
Silent failure with --runner=DataflowRunner #264

Open
feelingsonice opened this issue Mar 25, 2022 · 2 comments


feelingsonice commented Mar 25, 2022

System information

  • Environment: Google Colab, Vertex AI (KubeflowV2DagRunner)
  • TensorFlow version: 2.8.0
  • TFX Version: 1.7.0
  • Python version: 3.7.12

Here's my preprocessing_fn (redacted for clarity):

import tensorflow as tf
import tensorflow_transform as tft

_FEATURES = [
    # list of str (redacted)
]
_SPECIAL_IMPUTE = {
    'special_foo': 1,
}
HOURS = [1, 2, 3, 4]
DEVICE_TYPES = {
    # dict keyed by device type name (redacted)
}
TABLE_KEYS = {
    'XXX': ['XXX_1', 'XXX_2', 'XXX_3'],
    'YYY': ['YYY_1', 'YYY_2', 'YYY_3'],
}

@tf.function
def _divide(a, b):
  return tf.math.divide_no_nan(tf.cast(a, tf.float32), tf.cast(b, tf.float32))

def preprocessing_fn(inputs):
  x = {}

  for name, tensor in sorted(inputs.items()):
    if tensor.dtype == tf.bool:
      tensor = tf.cast(tensor, tf.int64)

    if isinstance(tensor, tf.sparse.SparseTensor):
      default_value = '' if tensor.dtype == tf.string else 0
      tensor = tft.sparse_tensor_to_dense_with_shape(tensor, [None, 1], default_value)

    x[name] = tensor

  x['foo'] = _divide((x['foo1'] - x['foo2']), x['foo_denom'])
  x['bar'] = tf.cast(x['bar'] > 0, tf.int64)

  for hour in HOURS:
    total = tf.constant(0, dtype=tf.int64)
    for device_type in DEVICE_TYPES.keys():
      # note: `total` is not used later in this redacted snippet
      total = total + x[f'some_device_{device_type}_{hour}h']

  # one-hot encode categorical values
  for name, keys in TABLE_KEYS.items():
    with tf.init_scope():
      initializer = tf.lookup.KeyValueTensorInitializer(
          tf.constant(keys),
          tf.constant(list(range(len(keys)))))
      table = tf.lookup.StaticHashTable(initializer, default_value=-1)

    indices = table.lookup(tf.squeeze(x[name], axis=1))
    one_hot = tf.one_hot(indices, len(keys), dtype=tf.int64)

    for i, _tensor in enumerate(tf.split(one_hot, num_or_size_splits=len(keys), axis=1)):
      x[f'{name}_{keys[i]}'] = _tensor

  return {name: tft.scale_to_0_1(x[name]) for name in _FEATURES}
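To make the one-hot step concrete: the StaticHashTable maps each categorical string to its index in `keys` (with `default_value=-1` for unknowns), and `tf.one_hot` turns an out-of-range index into an all-zero vector. Here's a pure-Python sketch of the same mapping (illustrative only; `one_hot_columns` is a hypothetical helper reusing the TABLE_KEYS above):

```python
TABLE_KEYS = {
    'XXX': ['XXX_1', 'XXX_2', 'XXX_3'],
    'YYY': ['YYY_1', 'YYY_2', 'YYY_3'],
}

def one_hot_columns(name, value):
    """Mimic StaticHashTable.lookup + tf.one_hot for a single string value."""
    keys = TABLE_KEYS[name]
    index = keys.index(value) if value in keys else -1  # default_value=-1
    # tf.one_hot with an out-of-range index yields an all-zero vector
    return {f'{name}_{key}': int(i == index) for i, key in enumerate(keys)}
```

So `one_hot_columns('XXX', 'XXX_2')` gives `{'XXX_XXX_1': 0, 'XXX_XXX_2': 1, 'XXX_XXX_3': 0}`, and an unknown value gives all zeros, mirroring the -1 default.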

Here's the beam_pipeline_args:

BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS = [
    '--project=' + GOOGLE_CLOUD_PROJECT,
    '--temp_location=' + os.path.join('gs://', GCS_BUCKET_NAME, 'tmp'),
    '--runner=DataflowRunner',
    '--region=us-central1',
    '--experiments=upload_graph',  # must be enabled, otherwise fails with 413
    '--dataflow_service_options=enable_prime',
    '--autoscaling_algorithm=THROUGHPUT_BASED',
]

Not sure if it's related, but with the above preprocessing_fn, my transform first failed with this error:

RuntimeError: The order of analyzers in your `preprocessing_fn` appears to be non-deterministic. This can be fixed either by changing your `preprocessing_fn` such that tf.Transform analyzers are encountered in a deterministic order or by passing a unique name to each analyzer API call.

I then added names to the tft.scale_to_0_1 analyzers:

return {name: tft.scale_to_0_1(x[name], name=f'{name}_scale_to_0_1') for name in _FEATURES}
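For context, this works because each analyzer call now carries a name derived from its feature, so every call is uniquely identified even if the trace order varies between runs. A trivial sketch of the naming scheme (placeholder `_FEATURES` and hypothetical `scale_names` helper; illustrative only):

```python
# Placeholder feature list; the real _FEATURES is redacted above.
_FEATURES = ['foo', 'bar', 'baz']

def scale_names(features):
    """One stable, unique analyzer name per feature, matching the
    f'{name}_scale_to_0_1' pattern used in preprocessing_fn."""
    return {name: f'{name}_scale_to_0_1' for name in features}

names = scale_names(_FEATURES)
# Every feature gets a distinct name regardless of iteration order.
assert len(set(names.values())) == len(_FEATURES)
```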

After which my transform just silently failed without logs (see first screenshot). I checked the worker logs, but there's nothing substantial, only warnings (see second screenshot).

It's worth noting that I have the enable_prime flag enabled.

Screen Shot 2022-03-25 at 2 01 59 PM

Screen Shot 2022-03-25 at 2 01 40 PM

@pindinagesh pindinagesh self-assigned this Mar 29, 2022
@pindinagesh

Hi @bli00

Could you please take a look at this issue and see if it helps resolve yours? You can also refer to this doc, which discusses the error you mentioned. Thanks!

@feelingsonice
Author

@pindinagesh After I removed the --dataflow_service_options=enable_prime flag, it worked fine. Are you planning to support Dataflow Prime?
