diff --git a/python/utils/worker.py b/python/utils/worker.py index 426ee1fe8..3d11b3cae 100644 --- a/python/utils/worker.py +++ b/python/utils/worker.py @@ -201,8 +201,24 @@ def producer( extra={ "processor_name": processor_name, "stream_address": indexer_grpc_data_service_address, + "error": str(e), + "next_version_to_fetch": next_version_to_fetch, + "ending_version": ending_version, }, ) + # Datastream error can happen when we fail to deserialize deeply nested types. + # Skip the batch, log the error, and continue processing. + is_success = True + next_version_to_fetch += 1 + response_stream = get_grpc_stream( + indexer_grpc_data_service_address, + indexer_grpc_data_stream_api_key, + indexer_grpc_http2_ping_interval, + indexer_grpc_http2_ping_timeout, + next_version_to_fetch, + ending_version, + processor_name, + ) # Check if we're at the end of the stream reached_ending_version = ( @@ -369,7 +385,8 @@ async def consumer_impl( "service_type": PROCESSOR_SERVICE_TYPE, }, ) - os._exit(1) + # Gaps are possible because we skipped versions + # os._exit(1) last_fetched_version = transactions[-1].version transaction_batches.append(transactions) @@ -415,7 +432,8 @@ async def consumer_impl( "service_type": PROCESSOR_SERVICE_TYPE, }, ) - os._exit(1) + # Gaps are possible because we skip versions + # os._exit(1) prev_start = result.start_version prev_end = result.end_version