Skip to content

Commit

Permalink
[BEAM-12751] Set clientRequestId for Dataflow python job creation
Browse files Browse the repository at this point in the history
  • Loading branch information
baeminbo committed Aug 22, 2021
1 parent af59192 commit 81fe195
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 0 deletions.
28 changes: 28 additions & 0 deletions sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import json
import logging
import os
import random

import pkg_resources
import re
import sys
Expand Down Expand Up @@ -496,6 +498,11 @@ def __init__(self, options, proto_pipeline):
self.proto.labels.additionalProperties.append(
dataflow.Job.LabelsValue.AdditionalProperty(key=key, value=value))

# Client Request ID
self.proto.clientRequestId = '{}-{}'.format(
datetime.utcnow().strftime('%Y%m%d%H%M%S%f'),
random.randrange(9000) + 1000)

self.base64_str_re = re.compile(r'^[A-Za-z0-9+/]*=*$')
self.coder_str_re = re.compile(r'^([A-Za-z]+\$)([A-Za-z0-9+/]*=*)$')

Expand Down Expand Up @@ -794,6 +801,20 @@ def submit_job_description(self, job):
self.google_cloud_options.dataflow_endpoint)
_LOGGER.fatal('details of server error: %s', e)
raise

if response.clientRequestId and \
response.clientRequestId != job.proto.clientRequestId:
if self.google_cloud_options.update:
raise DataflowJobAlreadyExistsError(
"The job named %s with id: %s has already been updated into job "
"id: %s and cannot be updated again." %
(response.name, job.proto.replaceJobId, response.id))
else:
raise DataflowJobAlreadyExistsError(
'There is already active job named %s with id: %s. If you want to '
'submit a second job, try again by setting a different name using '
'--job_name.' % (response.name, response.id))

_LOGGER.info('Create job: %s', response)
# The response is a Job proto with the id for the new job.
_LOGGER.info('Created job with id: [%s]', response.id)
Expand Down Expand Up @@ -1028,6 +1049,13 @@ def get_sdk_package_name():
return shared_names.BEAM_PACKAGE_NAME


class DataflowJobAlreadyExistsError(retry.PermanentException):
"""A non-retryable exception that a job with the given name already exists."""
# Inherits retry.PermanentException to avoid retry in
# DataflowApplicationClient.submit_job_description
pass


def to_split_int(n):
res = dataflow.SplitInt64()
res.lowBits = n & 0xffffffff
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1039,6 +1039,86 @@ def test_graph_is_uploaded(self):
mock.ANY, "dataflow_graph.json", mock.ANY)
client.create_job_description.assert_called_once()

def test_create_job_returns_existing_job(self):
pipeline_options = PipelineOptions([
'--project',
'test_project',
'--job_name',
'test_job_name',
'--temp_location',
'gs://test-location/temp',
])
job = apiclient.Job(pipeline_options, FAKE_PIPELINE_URL)
self.assertTrue(job.proto.clientRequestId) # asserts non-empty string
pipeline_options.view_as(GoogleCloudOptions).no_auth = True
client = apiclient.DataflowApplicationClient(pipeline_options)

response = dataflow.Job()
# different clientRequestId from `job`
response.clientRequestId = "20210821081910123456-1234"
response.name = 'test_job_name'
response.id = '2021-08-19_21_18_43-9756917246311111021'

with mock.patch.object(client._client.projects_locations_jobs,
'Create',
side_effect=[response]):
with mock.patch.object(client, 'create_job_description',
side_effect=None):
with self.assertRaises(
apiclient.DataflowJobAlreadyExistsError) as context:
client.create_job(job)

self.assertEqual(
str(context.exception),
'There is already active job named %s with id: %s. If you want to '
'submit a second job, try again by setting a different name using '
'--job_name.' % ('test_job_name', response.id))

def test_update_job_returns_existing_job(self):
pipeline_options = PipelineOptions([
'--project',
'test_project',
'--job_name',
'test_job_name',
'--temp_location',
'gs://test-location/temp',
'--region',
'us-central1',
'--update',
])
replace_job_id = '2021-08-21_00_00_01-6081497447916622336'
with mock.patch('apache_beam.runners.dataflow.internal.apiclient'
'.DataflowApplicationClient.job_id_for_name',
return_value=replace_job_id) as job_id_for_name_mock:
job = apiclient.Job(pipeline_options, FAKE_PIPELINE_URL)
job_id_for_name_mock.assert_called_once()

self.assertTrue(job.proto.clientRequestId) # asserts non-empty string

pipeline_options.view_as(GoogleCloudOptions).no_auth = True
client = apiclient.DataflowApplicationClient(pipeline_options)

response = dataflow.Job()
# different clientRequestId from `job`
response.clientRequestId = "20210821083254123456-1234"
response.name = 'test_job_name'
response.id = '2021-08-19_21_29_07-5725551945600207770'

with mock.patch.object(client, 'create_job_description', side_effect=None):
with mock.patch.object(client._client.projects_locations_jobs,
'Create',
side_effect=[response]):

with self.assertRaises(
apiclient.DataflowJobAlreadyExistsError) as context:
client.create_job(job)

self.assertEqual(
str(context.exception),
'The job named %s with id: %s has already been updated into job '
'id: %s and cannot be updated again.' %
('test_job_name', replace_job_id, response.id))

def test_template_file_generation_with_upload_graph(self):
pipeline_options = PipelineOptions([
'--project',
Expand Down

0 comments on commit 81fe195

Please sign in to comment.