diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 4f98b8d63fe0..c60e88e7c070 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -27,6 +27,8 @@
 import json
 import logging
 import os
+import random
+
 import pkg_resources
 import re
 import sys
@@ -496,6 +498,11 @@ def __init__(self, options, proto_pipeline):
         self.proto.labels.additionalProperties.append(
             dataflow.Job.LabelsValue.AdditionalProperty(key=key, value=value))
 
+    # Client Request ID
+    self.proto.clientRequestId = '{}-{}'.format(
+        datetime.utcnow().strftime('%Y%m%d%H%M%S%f'),
+        random.randrange(9000) + 1000)
+
     self.base64_str_re = re.compile(r'^[A-Za-z0-9+/]*=*$')
     self.coder_str_re = re.compile(r'^([A-Za-z]+\$)([A-Za-z0-9+/]*=*)$')
 
@@ -795,6 +802,20 @@ def submit_job_description(self, job):
           self.google_cloud_options.dataflow_endpoint)
       _LOGGER.fatal('details of server error: %s', e)
       raise
+
+    if response.clientRequestId and \
+        response.clientRequestId != job.proto.clientRequestId:
+      if self.google_cloud_options.update:
+        raise DataflowJobAlreadyExistsError(
+            "The job named %s with id: %s has already been updated into job "
+            "id: %s and cannot be updated again." %
+            (response.name, job.proto.replaceJobId, response.id))
+      else:
+        raise DataflowJobAlreadyExistsError(
+            'There is already active job named %s with id: %s. If you want to '
+            'submit a second job, try again by setting a different name using '
+            '--job_name.' % (response.name, response.id))
+
     _LOGGER.info('Create job: %s', response)
     # The response is a Job proto with the id for the new job.
     _LOGGER.info('Created job with id: [%s]', response.id)
@@ -1029,6 +1050,13 @@ def get_sdk_package_name():
   return shared_names.BEAM_PACKAGE_NAME
 
 
+class DataflowJobAlreadyExistsError(retry.PermanentException):
+  """Indicates that a job with the given name already exists."""
+  # Inherits retry.PermanentException to avoid retries in
+  # DataflowApplicationClient.submit_job_description.
+  pass
+
+
 def to_split_int(n):
   res = dataflow.SplitInt64()
   res.lowBits = n & 0xffffffff
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index a4f81a31f06a..41e79f3e509e 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -1039,6 +1039,86 @@ def test_graph_is_uploaded(self):
         mock.ANY, "dataflow_graph.json", mock.ANY)
     client.create_job_description.assert_called_once()
 
+  def test_create_job_returns_existing_job(self):
+    pipeline_options = PipelineOptions([
+        '--project',
+        'test_project',
+        '--job_name',
+        'test_job_name',
+        '--temp_location',
+        'gs://test-location/temp',
+    ])
+    job = apiclient.Job(pipeline_options, FAKE_PIPELINE_URL)
+    self.assertTrue(job.proto.clientRequestId)  # asserts non-empty string
+    pipeline_options.view_as(GoogleCloudOptions).no_auth = True
+    client = apiclient.DataflowApplicationClient(pipeline_options)
+
+    response = dataflow.Job()
+    # different clientRequestId from `job`
+    response.clientRequestId = "20210821081910123456-1234"
+    response.name = 'test_job_name'
+    response.id = '2021-08-19_21_18_43-9756917246311111021'
+
+    with mock.patch.object(client._client.projects_locations_jobs,
+                           'Create',
+                           side_effect=[response]):
+      with mock.patch.object(client,
+                             'create_job_description',
+                             side_effect=None):
+        with self.assertRaises(
+            apiclient.DataflowJobAlreadyExistsError) as context:
+          client.create_job(job)
+
+    self.assertEqual(
+        str(context.exception),
+        'There is already active job named %s with id: %s. If you want to '
+        'submit a second job, try again by setting a different name using '
+        '--job_name.' % ('test_job_name', response.id))
+
+  def test_update_job_returns_existing_job(self):
+    pipeline_options = PipelineOptions([
+        '--project',
+        'test_project',
+        '--job_name',
+        'test_job_name',
+        '--temp_location',
+        'gs://test-location/temp',
+        '--region',
+        'us-central1',
+        '--update',
+    ])
+    replace_job_id = '2021-08-21_00_00_01-6081497447916622336'
+    with mock.patch('apache_beam.runners.dataflow.internal.apiclient'
+                    '.DataflowApplicationClient.job_id_for_name',
+                    return_value=replace_job_id) as job_id_for_name_mock:
+      job = apiclient.Job(pipeline_options, FAKE_PIPELINE_URL)
+    job_id_for_name_mock.assert_called_once()
+
+    self.assertTrue(job.proto.clientRequestId)  # asserts non-empty string
+
+    pipeline_options.view_as(GoogleCloudOptions).no_auth = True
+    client = apiclient.DataflowApplicationClient(pipeline_options)
+
+    response = dataflow.Job()
+    # different clientRequestId from `job`
+    response.clientRequestId = "20210821083254123456-1234"
+    response.name = 'test_job_name'
+    response.id = '2021-08-19_21_29_07-5725551945600207770'
+
+    with mock.patch.object(client, 'create_job_description', side_effect=None):
+      with mock.patch.object(client._client.projects_locations_jobs,
+                             'Create',
+                             side_effect=[response]):
+        with self.assertRaises(
+            apiclient.DataflowJobAlreadyExistsError) as context:
+          client.create_job(job)
+
+    self.assertEqual(
+        str(context.exception),
+        'The job named %s with id: %s has already been updated into job '
+        'id: %s and cannot be updated again.' %
+        ('test_job_name', replace_job_id, response.id))
+
   def test_template_file_generation_with_upload_graph(self):
     pipeline_options = PipelineOptions([
         '--project',
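
Note on the mechanism (not part of the patch): the guard relies on Dataflow echoing back the clientRequestId of whichever request actually created the job, so comparing the echoed value against the id this submission minted reveals whether another submission already owns the job name. A minimal, self-contained sketch of that logic, with all names local to this example:

    from datetime import datetime
    import random


    def mint_client_request_id():
      # Timestamp to microsecond precision plus a 4-digit random suffix,
      # mirroring what Job.__init__ now assigns to proto.clientRequestId.
      return '{}-{}'.format(
          datetime.utcnow().strftime('%Y%m%d%H%M%S%f'),
          random.randrange(9000) + 1000)


    def is_duplicate(submitted_id, echoed_id):
      # An empty echoed id means the service reported nothing to compare;
      # a mismatch means an earlier submission created the job under this
      # name, which is exactly when the patch raises
      # DataflowJobAlreadyExistsError.
      return bool(echoed_id) and echoed_id != submitted_id


    ours = mint_client_request_id()
    print(is_duplicate(ours, ours))  # False: this submission created the job
    print(is_duplicate(ours, '20210821081910123456-1234'))  # True: duplicate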