[AIRFLOW-3044] Dataflow operators accept templated job_name param #3887

Merged (1 commit, Sep 15, 2018)
42 changes: 18 additions & 24 deletions airflow/contrib/hooks/gcp_dataflow_hook.py
@@ -190,11 +190,9 @@ def get_conn(self):
return build(
'dataflow', 'v1b3', http=http_authorized, cache_discovery=False)

def _start_dataflow(self, task_id, variables, name,
command_prefix, label_formatter):
def _start_dataflow(self, variables, name, command_prefix, label_formatter):
variables = self._set_variables(variables)
cmd = command_prefix + self._build_cmd(task_id, variables,
label_formatter)
cmd = command_prefix + self._build_cmd(variables, label_formatter)
job_id = _Dataflow(cmd).wait_for_done()
_DataflowJob(self.get_conn(), variables['project'], name,
variables['region'],
@@ -208,58 +206,54 @@ def _set_variables(variables):
variables['region'] = DEFAULT_DATAFLOW_LOCATION
return variables

def start_java_dataflow(self, task_id, variables, dataflow, job_class=None,
def start_java_dataflow(self, job_name, variables, dataflow, job_class=None,
append_job_name=True):
name = self._build_dataflow_job_name(task_id, append_job_name)
name = self._build_dataflow_job_name(job_name, append_job_name)
variables['jobName'] = name

def label_formatter(labels_dict):
return ['--labels={}'.format(
json.dumps(labels_dict).replace(' ', ''))]
command_prefix = (["java", "-cp", dataflow, job_class] if job_class
else ["java", "-jar", dataflow])
self._start_dataflow(task_id, variables, name,
command_prefix, label_formatter)
self._start_dataflow(variables, name, command_prefix, label_formatter)

def start_template_dataflow(self, task_id, variables, parameters, dataflow_template,
def start_template_dataflow(self, job_name, variables, parameters, dataflow_template,
append_job_name=True):
name = self._build_dataflow_job_name(task_id, append_job_name)
name = self._build_dataflow_job_name(job_name, append_job_name)
self._start_template_dataflow(
name, variables, parameters, dataflow_template)

def start_python_dataflow(self, task_id, variables, dataflow, py_options,
def start_python_dataflow(self, job_name, variables, dataflow, py_options,
append_job_name=True):
name = self._build_dataflow_job_name(task_id, append_job_name)
name = self._build_dataflow_job_name(job_name, append_job_name)
variables['job_name'] = name

def label_formatter(labels_dict):
return ['--labels={}={}'.format(key, value)
for key, value in labels_dict.items()]
# TODO: Change python2 to python when Beam supports both python 2 and 3
# Remember to change the test case too
self._start_dataflow(task_id, variables, name,
["python2"] + py_options + [dataflow],
self._start_dataflow(variables, name, ["python2"] + py_options + [dataflow],
label_formatter)

@staticmethod
def _build_dataflow_job_name(task_id, append_job_name=True):
task_id = str(task_id).replace('_', '-')
def _build_dataflow_job_name(job_name, append_job_name=True):
base_job_name = str(job_name).replace('_', '-')

if not re.match(r"^[a-z]([-a-z0-9]*[a-z0-9])?$", task_id):
if not re.match(r"^[a-z]([-a-z0-9]*[a-z0-9])?$", base_job_name):
raise ValueError(
'Invalid job_name ({}); the name must consist of'
'only the characters [-a-z0-9], starting with a '
'letter and ending with a letter or number '.format(task_id))
'letter and ending with a letter or number '.format(base_job_name))

if append_job_name:
job_name = task_id + "-" + str(uuid.uuid4())[:8]
safe_job_name = base_job_name + "-" + str(uuid.uuid4())[:8]
Review comment (Member):
Do we need appending if a user specifies job_name 🤔 ?

Reply (Contributor Author):
Good question. Provided job name could still be non-unique, but yeah, could be handled by the caller in that case. Would still be useful during testing, IMO.

else:
job_name = task_id
safe_job_name = base_job_name

return job_name
return safe_job_name

@staticmethod
def _build_cmd(task_id, variables, label_formatter):
def _build_cmd(variables, label_formatter):
command = ["--runner=DataflowRunner"]
if variables is not None:
for attr, value in variables.items():
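
For reference, a minimal standalone sketch of the naming rules that _build_dataflow_job_name enforces after this change; the helper name build_job_name and the sample values are illustrative, not part of the hook API:

import re
import uuid


def build_job_name(job_name, append_job_name=True):
    # Dataflow job names allow only lowercase letters, digits and hyphens,
    # so underscores (common in Airflow task ids) are converted first.
    base_job_name = str(job_name).replace('_', '-')
    if not re.match(r"^[a-z]([-a-z0-9]*[a-z0-9])?$", base_job_name):
        raise ValueError('Invalid job_name ({})'.format(base_job_name))
    if append_job_name:
        # An 8-character UUID suffix keeps repeated runs of the same task unique.
        return base_job_name + '-' + str(uuid.uuid4())[:8]
    return base_job_name


print(build_job_name('my_pipeline'))                         # e.g. my-pipeline-1a2b3c4d
print(build_job_name('my_pipeline', append_job_name=False))  # my-pipeline
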
36 changes: 26 additions & 10 deletions airflow/contrib/operators/dataflow_operator.py
@@ -37,8 +37,12 @@ class DataFlowJavaOperator(BaseOperator):
For more detail on job submission have a look at the reference:
https://cloud.google.com/dataflow/pipelines/specifying-exec-params

:param jar: The reference to a self executing DataFlow jar.
:param jar: The reference to a self executing DataFlow jar (templated).
:type jar: str
:param job_name: The 'jobName' to use when executing the DataFlow job
(templated). This ends up being set in the pipeline options, so any entry
with key ``'jobName'`` in ``options`` will be overwritten.
:type job_name: str
:param dataflow_default_options: Map of default job options.
:type dataflow_default_options: dict
:param options: Map of job specific options.
@@ -58,7 +62,7 @@ class DataFlowJavaOperator(BaseOperator):
is often not the main class configured in the dataflow jar file.
:type job_class: str

Both ``jar`` and ``options`` are templated so you can use variables in them.
``jar``, ``options``, and ``job_name`` are templated so you can use variables in them.

Note that both
``dataflow_default_options`` and ``options`` will be merged to specify pipeline
@@ -100,13 +104,14 @@ class DataFlowJavaOperator(BaseOperator):
dag=my-dag)

"""
template_fields = ['options', 'jar']
template_fields = ['options', 'jar', 'job_name']
ui_color = '#0273d4'

@apply_defaults
def __init__(
self,
jar,
job_name='{{task.task_id}}',
dataflow_default_options=None,
options=None,
gcp_conn_id='google_cloud_default',
@@ -124,6 +129,7 @@ def __init__(
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
self.jar = jar
self.job_name = job_name
self.dataflow_default_options = dataflow_default_options
self.options = options
self.poll_sleep = poll_sleep
@@ -140,7 +146,7 @@ def execute(self, context):
dataflow_options = copy.copy(self.dataflow_default_options)
dataflow_options.update(self.options)

hook.start_java_dataflow(self.task_id, dataflow_options,
hook.start_java_dataflow(self.job_name, dataflow_options,
self.jar, self.job_class)
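
A hedged usage sketch of the new parameter (the bucket, project, and class names below are placeholders, not taken from the PR); job_name defaults to '{{task.task_id}}', so DAGs written before this change keep their existing job names:

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataFlowJavaOperator

with DAG('example_dataflow_java', start_date=datetime(2018, 9, 1), schedule_interval=None) as dag:
    wordcount = DataFlowJavaOperator(
        task_id='wordcount',
        jar='gs://my-bucket/pipelines/wordcount.jar',          # placeholder path
        job_name='wordcount-{{ ds_nodash }}',                  # rendered per run, then UUID-suffixed by the hook
        dataflow_default_options={'project': 'my-project', 'region': 'us-central1'},
        options={'stagingLocation': 'gs://my-bucket/staging'},
        job_class='org.example.WordCount',                     # placeholder class
    )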


@@ -151,6 +157,8 @@ class DataflowTemplateOperator(BaseOperator):

:param template: The reference to the DataFlow template.
:type template: str
:param job_name: The 'jobName' to use when executing the DataFlow template
(templated).
:param dataflow_default_options: Map of default job environment options.
:type dataflow_default_options: dict
:param parameters: Map of job specific parameters for the template.
@@ -201,8 +209,8 @@ class DataflowTemplateOperator(BaseOperator):
gcp_conn_id='gcp-airflow-service-account',
dag=my-dag)

``template``, ``dataflow_default_options`` and ``parameters`` are templated so you can
use variables in them.
``template``, ``dataflow_default_options``, ``parameters``, and ``job_name`` are
templated so you can use variables in them.

Note that ``dataflow_default_options`` is expected to save high-level options
for project information, which apply to all dataflow operators in the DAG.
@@ -214,13 +222,14 @@ class DataflowTemplateOperator(BaseOperator):
For more detail on job template execution have a look at the reference:
https://cloud.google.com/dataflow/docs/templates/executing-templates
"""
template_fields = ['parameters', 'dataflow_default_options', 'template']
template_fields = ['parameters', 'dataflow_default_options', 'template', 'job_name']
ui_color = '#0273d4'

@apply_defaults
def __init__(
self,
template,
job_name='{{task.task_id}}',
dataflow_default_options=None,
parameters=None,
gcp_conn_id='google_cloud_default',
@@ -238,14 +247,15 @@ def __init__(
self.dataflow_default_options = dataflow_default_options
self.poll_sleep = poll_sleep
self.template = template
self.job_name = job_name
self.parameters = parameters

def execute(self, context):
hook = DataFlowHook(gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
poll_sleep=self.poll_sleep)

hook.start_template_dataflow(self.task_id, self.dataflow_default_options,
hook.start_template_dataflow(self.job_name, self.dataflow_default_options,
self.parameters, self.template)


Expand All @@ -264,6 +274,10 @@ class DataFlowPythonOperator(BaseOperator):
:param py_file: Reference to the python dataflow pipeline file.py, e.g.,
/some/local/file/path/to/your/python/pipeline/file.
:type py_file: str
:param job_name: The 'job_name' to use when executing the DataFlow job
(templated). This ends up being set in the pipeline options, so any entry
with key ``'jobName'`` or ``'job_name'`` in ``options`` will be overwritten.
:type job_name: str
:param py_options: Additional python options.
:type py_options: list of strings, e.g., ["-m", "-v"].
:param dataflow_default_options: Map of default job options.
@@ -282,12 +296,13 @@ def __init__(
JOB_STATE_RUNNING state.
:type poll_sleep: int
"""
template_fields = ['options', 'dataflow_default_options']
template_fields = ['options', 'dataflow_default_options', 'job_name']

@apply_defaults
def __init__(
self,
py_file,
job_name='{{task.task_id}}',
py_options=None,
dataflow_default_options=None,
options=None,
@@ -300,6 +315,7 @@ def __init__(
super(DataFlowPythonOperator, self).__init__(*args, **kwargs)

self.py_file = py_file
self.job_name = job_name
self.py_options = py_options or []
self.dataflow_default_options = dataflow_default_options or {}
self.options = options or {}
@@ -325,7 +341,7 @@ def execute(self, context):
formatted_options = {camel_to_snake(key): dataflow_options[key]
for key in dataflow_options}
hook.start_python_dataflow(
self.task_id, formatted_options,
self.job_name, formatted_options,
self.py_file, self.py_options)


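A similar sketch for the Python operator (paths and project values are placeholders). Leaving job_name unset is also fine: it defaults to '{{task.task_id}}', so the rendered name matches the pre-change behaviour, with underscores converted to hyphens by the hook:

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator

with DAG('example_dataflow_python', start_date=datetime(2018, 9, 1), schedule_interval=None) as dag:
    transform = DataFlowPythonOperator(
        task_id='transform_events',                             # default job_name renders to this and becomes 'transform-events-<uuid>'
        py_file='/home/airflow/dags/pipelines/transform.py',   # placeholder path
        dataflow_default_options={'project': 'my-project', 'region': 'us-central1'},
        options={'staging_location': 'gs://my-bucket/staging'},
    )
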
44 changes: 22 additions & 22 deletions tests/contrib/hooks/test_gcp_dataflow_hook.py
@@ -35,6 +35,7 @@


TASK_ID = 'test-dataflow-operator'
JOB_NAME = 'test-dataflow-pipeline'
TEMPLATE = 'gs://dataflow-templates/wordcount/template_file'
PARAMETERS = {
'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
@@ -92,14 +93,14 @@ def test_start_python_dataflow(self, mock_conn,
dataflowjob_instance = mock_dataflowjob.return_value
dataflowjob_instance.wait_for_done.return_value = None
self.dataflow_hook.start_python_dataflow(
task_id=TASK_ID, variables=DATAFLOW_OPTIONS_PY,
job_name=JOB_NAME, variables=DATAFLOW_OPTIONS_PY,
dataflow=PY_FILE, py_options=PY_OPTIONS)
EXPECTED_CMD = ['python2', '-m', PY_FILE,
'--region=us-central1',
'--runner=DataflowRunner', '--project=test',
'--labels=foo=bar',
'--staging_location=gs://test/staging',
'--job_name={}-{}'.format(TASK_ID, MOCK_UUID)]
'--job_name={}-{}'.format(JOB_NAME, MOCK_UUID)]
self.assertListEqual(sorted(mock_dataflow.call_args[0][0]),
sorted(EXPECTED_CMD))

@@ -116,14 +117,14 @@ def test_start_java_dataflow(self, mock_conn,
dataflowjob_instance = mock_dataflowjob.return_value
dataflowjob_instance.wait_for_done.return_value = None
self.dataflow_hook.start_java_dataflow(
task_id=TASK_ID, variables=DATAFLOW_OPTIONS_JAVA,
job_name=JOB_NAME, variables=DATAFLOW_OPTIONS_JAVA,
dataflow=JAR_FILE)
EXPECTED_CMD = ['java', '-jar', JAR_FILE,
'--region=us-central1',
'--runner=DataflowRunner', '--project=test',
'--stagingLocation=gs://test/staging',
'--labels={"foo":"bar"}',
'--jobName={}-{}'.format(TASK_ID, MOCK_UUID)]
'--jobName={}-{}'.format(JOB_NAME, MOCK_UUID)]
self.assertListEqual(sorted(mock_dataflow.call_args[0][0]),
sorted(EXPECTED_CMD))

@@ -140,18 +141,17 @@ def test_start_java_dataflow_with_job_class(
dataflowjob_instance = mock_dataflowjob.return_value
dataflowjob_instance.wait_for_done.return_value = None
self.dataflow_hook.start_java_dataflow(
task_id=TASK_ID, variables=DATAFLOW_OPTIONS_JAVA,
job_name=JOB_NAME, variables=DATAFLOW_OPTIONS_JAVA,
dataflow=JAR_FILE, job_class=JOB_CLASS)
EXPECTED_CMD = ['java', '-cp', JAR_FILE, JOB_CLASS,
'--region=us-central1',
'--runner=DataflowRunner', '--project=test',
'--stagingLocation=gs://test/staging',
'--labels={"foo":"bar"}',
'--jobName={}-{}'.format(TASK_ID, MOCK_UUID)]
'--jobName={}-{}'.format(JOB_NAME, MOCK_UUID)]
self.assertListEqual(sorted(mock_dataflow.call_args[0][0]),
sorted(EXPECTED_CMD))


@mock.patch('airflow.contrib.hooks.gcp_dataflow_hook._Dataflow.log')
@mock.patch('subprocess.Popen')
@mock.patch('select.select')
@@ -178,18 +178,18 @@ def poll_resp_error():

def test_valid_dataflow_job_name(self):
job_name = self.dataflow_hook._build_dataflow_job_name(
task_id=TASK_ID, append_job_name=False
job_name=JOB_NAME, append_job_name=False
)

self.assertEquals(job_name, TASK_ID)
self.assertEquals(job_name, JOB_NAME)

def test_fix_underscore_in_task_id(self):
task_id_with_underscore = 'test_example'
fixed_job_name = task_id_with_underscore.replace(
def test_fix_underscore_in_job_name(self):
job_name_with_underscore = 'test_example'
fixed_job_name = job_name_with_underscore.replace(
'_', '-'
)
job_name = self.dataflow_hook._build_dataflow_job_name(
task_id=task_id_with_underscore, append_job_name=False
job_name=job_name_with_underscore, append_job_name=False
)

self.assertEquals(job_name, fixed_job_name)
@@ -201,7 +201,7 @@ def test_invalid_dataflow_job_name(self):

with self.assertRaises(ValueError) as e:
self.dataflow_hook._build_dataflow_job_name(
task_id=invalid_job_name, append_job_name=False
job_name=invalid_job_name, append_job_name=False
)
# Test whether the job_name is present in the Error msg
self.assertIn('Invalid job_name ({})'.format(fixed_name),
@@ -210,37 +210,37 @@ def test_dataflow_job_regex_check(self):
def test_dataflow_job_regex_check(self):

self.assertEquals(self.dataflow_hook._build_dataflow_job_name(
task_id='df-job-1', append_job_name=False
job_name='df-job-1', append_job_name=False
), 'df-job-1')

self.assertEquals(self.dataflow_hook._build_dataflow_job_name(
task_id='df-job', append_job_name=False
job_name='df-job', append_job_name=False
), 'df-job')

self.assertEquals(self.dataflow_hook._build_dataflow_job_name(
task_id='dfjob', append_job_name=False
job_name='dfjob', append_job_name=False
), 'dfjob')

self.assertEquals(self.dataflow_hook._build_dataflow_job_name(
task_id='dfjob1', append_job_name=False
job_name='dfjob1', append_job_name=False
), 'dfjob1')

self.assertRaises(
ValueError,
self.dataflow_hook._build_dataflow_job_name,
task_id='1dfjob', append_job_name=False
job_name='1dfjob', append_job_name=False
)

self.assertRaises(
ValueError,
self.dataflow_hook._build_dataflow_job_name,
task_id='dfjob@', append_job_name=False
job_name='dfjob@', append_job_name=False
)

self.assertRaises(
ValueError,
self.dataflow_hook._build_dataflow_job_name,
task_id='df^jo', append_job_name=False
job_name='df^jo', append_job_name=False
)


@@ -254,7 +254,7 @@ def setUp(self):
@mock.patch(DATAFLOW_STRING.format('DataFlowHook._start_template_dataflow'))
def test_start_template_dataflow(self, internal_dataflow_mock):
self.dataflow_hook.start_template_dataflow(
task_id=TASK_ID, variables=DATAFLOW_OPTIONS_TEMPLATE, parameters=PARAMETERS,
job_name=JOB_NAME, variables=DATAFLOW_OPTIONS_TEMPLATE, parameters=PARAMETERS,
dataflow_template=TEMPLATE)
internal_dataflow_mock.assert_called_once_with(
mock.ANY, DATAFLOW_OPTIONS_TEMPLATE, PARAMETERS, TEMPLATE)