Skip to content

Commit 1416ac4

Browse files
authored
PubSubPullSensor: Remove project and return_immediately (#23231)
* `PubSubPullSensor`: Remove `project` and `return_immediately`
1 parent 06dfc25 commit 1416ac4

File tree

2 files changed

+11
-37
lines changed

2 files changed

+11
-37
lines changed

β€Žairflow/providers/google/CHANGELOG.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ Breaking changes
4242

4343
* ``GCSObjectsWtihPrefixExistenceSensor`` removed. Please use ``GCSObjectsWithPrefixExistenceSensor``.
4444

45+
* ``PubSubPullSensor``: Remove ``project``. Please use ``project_id``
46+
47+
* ``PubSubPullSensor``: Remove ``return_immediately``
48+
4549
6.8.0
4650
.....
4751

β€Žairflow/providers/google/cloud/sensors/pubsub.py

Lines changed: 7 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
# specific language governing permissions and limitations
1717
# under the License.
1818
"""This module contains a Google PubSub sensor."""
19-
import warnings
2019
from typing import TYPE_CHECKING, Any, Callable, List, Optional, Sequence, Union
2120

2221
from google.cloud.pubsub_v1.types import ReceivedMessage
@@ -49,23 +48,18 @@ class PubSubPullSensor(BaseSensorOperator):
4948
acknowledged before being returned, otherwise, downstream tasks will be
5049
responsible for acknowledging them.
5150
52-
``project`` and ``subscription`` are templated so you can use
51+
If you want a non-blocking task that does not to wait for messages, please use
52+
:class:`~airflow.providers.google.cloud.operators.pubsub.PubSubPullOperator`
53+
instead.
54+
55+
``project_id`` and ``subscription`` are templated so you can use
5356
variables in them.
5457
55-
:param project: the Google Cloud project ID for the subscription (templated)
58+
:param project_id: the Google Cloud project ID for the subscription (templated)
5659
:param subscription: the Pub/Sub subscription name. Do not include the
5760
full subscription path.
5861
:param max_messages: The maximum number of messages to retrieve per
5962
PubSub pull request
60-
:param return_immediately:
61-
(Deprecated) This is an underlying PubSub API implementation detail.
62-
It has no real effect on Sensor behaviour other than some internal wait time before retrying
63-
on empty queue.
64-
The Sensor task will (by definition) always wait for a message, regardless of this argument value.
65-
66-
If you want a non-blocking task that does not to wait for messages, please use
67-
:class:`~airflow.providers.google.cloud.operators.pubsub.PubSubPullOperator`
68-
instead.
6963
:param ack_messages: If True, each message will be acknowledged
7064
immediately rather than by any downstream tasks
7165
:param gcp_conn_id: The connection ID to use connecting to
@@ -101,44 +95,20 @@ def __init__(
10195
project_id: str,
10296
subscription: str,
10397
max_messages: int = 5,
104-
return_immediately: bool = True,
10598
ack_messages: bool = False,
10699
gcp_conn_id: str = 'google_cloud_default',
107100
messages_callback: Optional[Callable[[List[ReceivedMessage], "Context"], Any]] = None,
108101
delegate_to: Optional[str] = None,
109-
project: Optional[str] = None,
110102
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
111103
**kwargs,
112104
) -> None:
113-
# To preserve backward compatibility
114-
# TODO: remove one day
115-
if project:
116-
warnings.warn(
117-
"The project parameter has been deprecated. You should pass the project_id parameter.",
118-
DeprecationWarning,
119-
stacklevel=2,
120-
)
121-
project_id = project
122-
123-
if not return_immediately:
124-
warnings.warn(
125-
"The return_immediately parameter is deprecated.\n"
126-
" It exposes what is really just an implementation detail of underlying PubSub API.\n"
127-
" It has no effect on PubSubPullSensor behaviour.\n"
128-
" It should be left as default value of True.\n"
129-
" If is here only because of backwards compatibility.\n"
130-
" If may be removed in the future.\n",
131-
DeprecationWarning,
132-
stacklevel=2,
133-
)
134105

135106
super().__init__(**kwargs)
136107
self.gcp_conn_id = gcp_conn_id
137108
self.delegate_to = delegate_to
138109
self.project_id = project_id
139110
self.subscription = subscription
140111
self.max_messages = max_messages
141-
self.return_immediately = return_immediately
142112
self.ack_messages = ack_messages
143113
self.messages_callback = messages_callback
144114
self.impersonation_chain = impersonation_chain
@@ -161,7 +131,7 @@ def poke(self, context: "Context") -> bool:
161131
project_id=self.project_id,
162132
subscription=self.subscription,
163133
max_messages=self.max_messages,
164-
return_immediately=self.return_immediately,
134+
return_immediately=True,
165135
)
166136

167137
handle_messages = self.messages_callback or self._default_message_callback

0 commit comments

Comments
 (0)