Commit 88c7d2e

Dataflow operators don't always create a virtualenv (#10373)
1 parent 30f4617 commit 88c7d2e

4 files changed: +80 −4 lines changed
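In short, after this change the hook builds a virtualenv only when py_requirements is an actual list: None means "run with the system interpreter", a non-empty list (or an empty list with py_system_site_packages=True) means "prepare a temporary virtualenv", and an empty list without system site packages is rejected outright. A minimal sketch of that decision, assuming the simplified stand-in signature below (the real logic lives in DataflowHook.start_python_dataflow, shown in the first diff):

from typing import List, Optional


class AirflowException(Exception):
    """Stand-in for airflow.exceptions.AirflowException."""


def resolve_interpreter(
    py_interpreter: str = 'python3',
    py_requirements: Optional[List[str]] = None,
    py_system_site_packages: bool = False,
) -> str:
    """Illustrative reduction of the new branching in start_python_dataflow."""
    if py_requirements is None:
        # No requirements at all: skip virtualenv creation and run the
        # job with the configured system interpreter.
        return py_interpreter
    if not py_requirements and not py_system_site_packages:
        # An empty requirements list with system packages excluded can
        # never produce an environment containing apache-beam.
        raise AirflowException("Invalid method invocation. ...")
    # Otherwise a temporary virtualenv is prepared and its interpreter used.
    return '/tmp/dataflow-venv/bin/python'  # placeholder path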

airflow/providers/google/cloud/hooks/dataflow.py

Lines changed: 15 additions & 1 deletion

@@ -24,6 +24,7 @@
 import select
 import shlex
 import subprocess
+import textwrap
 import time
 import uuid
 import warnings
@@ -633,7 +634,7 @@ def start_python_dataflow(  # pylint: disable=too-many-arguments
         :param py_system_site_packages: Whether to include system_site_packages in your virtualenv.
             See virtualenv documentation for more information.

-            This option is only relevant if the ``py_requirements`` parameter is passed.
+            This option is only relevant if the ``py_requirements`` parameter is not None.
         :type py_interpreter: str
         :param append_job_name: True if unique suffix has to be appended to job name.
         :type append_job_name: bool
@@ -653,6 +654,19 @@ def label_formatter(labels_dict):
                     for key, value in labels_dict.items()]

         if py_requirements is not None:
+            if not py_requirements and not py_system_site_packages:
+                warning_invalid_environment = textwrap.dedent(
+                    """\
+                    Invalid method invocation. You have disabled inclusion of system packages and empty list
+                    required for installation, so it is not possible to create a valid virtual environment.
+                    In the virtual environment, apache-beam package must be installed for your job to be \
+                    executed. To fix this problem:
+                    * install apache-beam on the system, then set parameter py_system_site_packages to True,
+                    * add apache-beam to the list of required packages in parameter py_requirements.
+                    """
+                )
+                raise AirflowException(warning_invalid_environment)
+
             with TemporaryDirectory(prefix='dataflow-venv') as tmp_dir:
                 py_interpreter = prepare_virtualenv(
                     venv_directory=tmp_dir,
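For illustration, hypothetical calls against the patched hook behave as follows; the hook construction and the job/file values are placeholders, not taken from the diff, and a default GCP project is assumed to be configured (the hook's tests make the same assumption via the fallback decorator):

from airflow.providers.google.cloud.hooks.dataflow import DataflowHook

hook = DataflowHook(gcp_conn_id='google_cloud_default')

# Runs with the system interpreter; no virtualenv is created.
hook.start_python_dataflow(
    job_name='example-job', variables={}, dataflow='/path/to/pipeline.py',
    py_options=[], py_requirements=None,
)

# Prepares a temporary virtualenv and installs the listed packages into it.
hook.start_python_dataflow(
    job_name='example-job', variables={}, dataflow='/path/to/pipeline.py',
    py_options=[], py_requirements=['apache-beam[gcp]'],
)

# Raises AirflowException: empty requirements with system packages excluded.
hook.start_python_dataflow(
    job_name='example-job', variables={}, dataflow='/path/to/pipeline.py',
    py_options=[], py_requirements=[], py_system_site_packages=False,
)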

airflow/providers/google/cloud/operators/dataflow.py

Lines changed: 2 additions & 2 deletions

@@ -470,7 +470,7 @@ class DataflowCreatePythonJobOperator(BaseOperator):
     :param py_system_site_packages: Whether to include system_site_packages in your virtualenv.
         See virtualenv documentation for more information.

-        This option is only relevant if the ``py_requirements`` parameter is passed.
+        This option is only relevant if the ``py_requirements`` parameter is not None.
     :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
     :type gcp_conn_id: str
     :param project_id: Optional, the GCP project ID in which to start a job.
@@ -517,7 +517,7 @@ def __init__(  # pylint: disable=too-many-arguments
         self.options.setdefault('labels', {}).update(
             {'airflow-version': 'v' + version.replace('.', '-').replace('+', '-')})
         self.py_interpreter = py_interpreter
-        self.py_requirements = py_requirements or []
+        self.py_requirements = py_requirements
         self.py_system_site_packages = py_system_site_packages
         self.project_id = project_id
         self.location = location
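From the DAG author's side, the operator now passes py_requirements through unchanged, so omitting it (or passing None) runs the pipeline with the system interpreter instead of building an empty virtualenv. A hypothetical usage sketch; the task IDs and GCS path are made up, and py_file is assumed to be the operator's pipeline-file argument:

from airflow import DAG
from airflow.providers.google.cloud.operators.dataflow import DataflowCreatePythonJobOperator
from airflow.utils.dates import days_ago

with DAG('dataflow_example', start_date=days_ago(1)) as dag:
    # py_requirements=None (the pass-through default): the job runs with
    # the system interpreter, so apache-beam must already be installed
    # on the worker.
    DataflowCreatePythonJobOperator(
        task_id='run_with_system_interpreter',
        py_file='gs://bucket/pipeline.py',
        py_requirements=None,
    )

    # To keep the pre-commit behavior of an isolated environment, list the
    # packages explicitly (or set py_system_site_packages=True).
    DataflowCreatePythonJobOperator(
        task_id='run_in_virtualenv',
        py_file='gs://bucket/pipeline.py',
        py_requirements=['apache-beam[gcp]'],
    )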

tests/providers/google/cloud/hooks/test_dataflow.py

Lines changed: 62 additions & 0 deletions

@@ -314,6 +314,68 @@ def test_start_python_dataflow_with_custom_interpreter(
         self.assertListEqual(sorted(mock_dataflow.call_args[1]["cmd"]),
                              sorted(expected_cmd))

+    @parameterized.expand([
+        (['foo-bar'], False),
+        (['foo-bar'], True),
+        ([], True),
+    ])
+    @mock.patch(DATAFLOW_STRING.format('prepare_virtualenv'))
+    @mock.patch(DATAFLOW_STRING.format('uuid.uuid4'))
+    @mock.patch(DATAFLOW_STRING.format('_DataflowJobsController'))
+    @mock.patch(DATAFLOW_STRING.format('_DataflowRunner'))
+    @mock.patch(DATAFLOW_STRING.format('DataflowHook.get_conn'))
+    def test_start_python_dataflow_with_non_empty_py_requirements_and_without_system_packages(
+        self,
+        current_py_requirements,
+        current_py_system_site_packages,
+        mock_conn,
+        mock_dataflow,
+        mock_dataflowjob,
+        mock_uuid,
+        mock_virtualenv,
+    ):
+        mock_uuid.return_value = MOCK_UUID
+        mock_conn.return_value = None
+        dataflow_instance = mock_dataflow.return_value
+        dataflow_instance.wait_for_done.return_value = None
+        dataflowjob_instance = mock_dataflowjob.return_value
+        dataflowjob_instance.wait_for_done.return_value = None
+        mock_virtualenv.return_value = '/dummy_dir/bin/python'
+        self.dataflow_hook.start_python_dataflow(  # pylint: disable=no-value-for-parameter
+            job_name=JOB_NAME, variables=DATAFLOW_VARIABLES_PY,
+            dataflow=PY_FILE, py_options=PY_OPTIONS,
+            py_requirements=current_py_requirements,
+            py_system_site_packages=current_py_system_site_packages
+        )
+        expected_cmd = ['/dummy_dir/bin/python', '-m', PY_FILE,
+                        '--region=us-central1',
+                        '--runner=DataflowRunner', '--project=test',
+                        '--labels=foo=bar',
+                        '--staging_location=gs://test/staging',
+                        '--job_name={}-{}'.format(JOB_NAME, MOCK_UUID)]
+        self.assertListEqual(sorted(mock_dataflow.call_args[1]["cmd"]),
+                             sorted(expected_cmd))
+
+    @mock.patch(DATAFLOW_STRING.format('uuid.uuid4'))
+    @mock.patch(DATAFLOW_STRING.format('_DataflowJobsController'))
+    @mock.patch(DATAFLOW_STRING.format('_DataflowRunner'))
+    @mock.patch(DATAFLOW_STRING.format('DataflowHook.get_conn'))
+    def test_start_python_dataflow_with_empty_py_requirements_and_without_system_packages(
+        self, mock_conn, mock_dataflow, mock_dataflowjob, mock_uuid
+    ):
+        mock_uuid.return_value = MOCK_UUID
+        mock_conn.return_value = None
+        dataflow_instance = mock_dataflow.return_value
+        dataflow_instance.wait_for_done.return_value = None
+        dataflowjob_instance = mock_dataflowjob.return_value
+        dataflowjob_instance.wait_for_done.return_value = None
+        with self.assertRaisesRegex(AirflowException, "Invalid method invocation."):
+            self.dataflow_hook.start_python_dataflow(  # pylint: disable=no-value-for-parameter
+                job_name=JOB_NAME, variables=DATAFLOW_VARIABLES_PY,
+                dataflow=PY_FILE, py_options=PY_OPTIONS,
+                py_requirements=[]
+            )
+
     @mock.patch(DATAFLOW_STRING.format('uuid.uuid4'))
     @mock.patch(DATAFLOW_STRING.format('_DataflowJobsController'))
     @mock.patch(DATAFLOW_STRING.format('_DataflowRunner'))

tests/providers/google/cloud/operators/test_dataflow.py

Lines changed: 1 addition & 1 deletion

@@ -114,7 +114,7 @@ def test_exec(self, gcs_hook, dataflow_mock):
             dataflow=mock.ANY,
             py_options=PY_OPTIONS,
             py_interpreter=PY_INTERPRETER,
-            py_requirements=[],
+            py_requirements=None,
             py_system_site_packages=False,
             on_new_job_id_callback=mock.ANY,
             project_id=None,
