Skip to content

Commit

Permalink
Skip versions that can't be deserialized (#352)
Browse files Browse the repository at this point in the history
  • Loading branch information
rtso authored Apr 19, 2024
1 parent d475078 commit 328210f
Showing 1 changed file with 20 additions and 2 deletions.
22 changes: 20 additions & 2 deletions python/utils/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,24 @@ def producer(
extra={
"processor_name": processor_name,
"stream_address": indexer_grpc_data_service_address,
"error": str(e),
"next_version_to_fetch": next_version_to_fetch,
"ending_version": ending_version,
},
)
# Datastream error can happen when we fail to deserialize deeply nested types.
# Skip the batch, log the error, and continue processing.
is_success = True
next_version_to_fetch += 1
response_stream = get_grpc_stream(
indexer_grpc_data_service_address,
indexer_grpc_data_stream_api_key,
indexer_grpc_http2_ping_interval,
indexer_grpc_http2_ping_timeout,
next_version_to_fetch,
ending_version,
processor_name,
)

# Check if we're at the end of the stream
reached_ending_version = (
Expand Down Expand Up @@ -369,7 +385,8 @@ async def consumer_impl(
"service_type": PROCESSOR_SERVICE_TYPE,
},
)
os._exit(1)
# Gaps are possible because we skipped versions
# os._exit(1)
last_fetched_version = transactions[-1].version
transaction_batches.append(transactions)

Expand Down Expand Up @@ -415,7 +432,8 @@ async def consumer_impl(
"service_type": PROCESSOR_SERVICE_TYPE,
},
)
os._exit(1)
# Gaps are possible because we skip versions
# os._exit(1)
prev_start = result.start_version
prev_end = result.end_version

Expand Down

0 comments on commit 328210f

Please sign in to comment.