GCP PubSub Source: Recurring error logs #22304

Open
ibeauvais opened this issue Jan 27, 2025 · 0 comments
Labels
source: gcp_pubsub (Anything `gcp_pubsub` source related)
type: bug (A code related bug.)

Comments

@ibeauvais

A note for the community

  • Please vote on this issue by adding a 👍 reaction to the original issue to help the community and maintainers prioritize this request
  • If you are interested in working on this issue or have submitted a pull request, please leave a comment

Problem

With a Pub/Sub source configured in Vector, we regularly see errors in the logs.
Events still seem to reach the sink, but we wonder whether some are lost when this happens.

The error message:

 ERROR vector::internal_events::gcp_pubsub: Failed to fetch events. error=status: Unavailable, message: "The service was unable to fulfill your request. Please try again. [code=8a75]", details: [], metadata: MetadataMap { headers: {} } error_code="failed_fetching_events" error_type="request_failed" stage="receiving" internal_log_rate_limit=true

The error occurs even when there are no messages in the subscription. I wonder whether that is in fact the cause of the error (pulling when no messages are available).

Configuration

sources:
  logs_from_gcp:
    type: gcp_pubsub
    project: "my-gcp-project"
    subscription: "my-subscription"
    decoding:
      codec: json

Version

Vector version v0.44.0

Debug Output

2025-01-27T10:00:38.981173Z TRACE Connection{peer=Client}:poll:FramedRead::poll_next:FramedRead::decode_frame{offset=91}:hpack::decode: h2::hpack::decoder: rem=69 kind=LiteralWithIndexing
2025-01-27T10:00:38.981197Z DEBUG Connection{peer=Client}:poll:FramedRead::poll_next: h2::codec::framed_read: received frame=Headers { stream_id: StreamId(1), flags: (0x5: END_HEADERS | END_STREAM) }
2025-01-27T10:00:38.981204Z TRACE Connection{peer=Client}:poll: h2::proto::connection: recv HEADERS frame=Headers { stream_id: StreamId(1), flags: (0x5: END_HEADERS | END_STREAM) }
2025-01-27T10:00:38.981216Z TRACE Connection{peer=Client}:poll: h2::proto::streams::streams: recv_headers; stream=StreamId(1); state=State { inner: Open { local: Streaming, remote: Streaming } }
2025-01-27T10:00:38.981649Z TRACE Connection{peer=Client}:poll: h2::proto::streams::state: recv_close: Open => HalfClosedRemote(Streaming)
2025-01-27T10:00:38.981715Z TRACE Connection{peer=Client}:poll: h2::proto::streams::counts: transition_after; stream=StreamId(1); state=State { inner: HalfClosedRemote(Streaming) }; is_closed=false; pending_send_empty=true; buffered_send_data=0; num_recv=0; num_send=1
2025-01-27T10:00:38.981747Z TRACE Connection{peer=Client}:poll:FramedRead::poll_next: h2::codec::framed_read: poll
2025-01-27T10:00:38.981756Z TRACE Connection{peer=Client}:poll:FramedRead::poll_next: h2::codec::framed_read: read.bytes=13
2025-01-27T10:00:38.981761Z TRACE Connection{peer=Client}:poll:FramedRead::poll_next:FramedRead::decode_frame{offset=13}: h2::codec::framed_read: decoding frame from 13B
2025-01-27T10:00:38.981764Z TRACE Connection{peer=Client}:poll:FramedRead::poll_next:FramedRead::decode_frame{offset=13}: h2::codec::framed_read: frame.kind=Reset
2025-01-27T10:00:38.981782Z DEBUG Connection{peer=Client}:poll:FramedRead::poll_next: h2::codec::framed_read: received frame=Reset { stream_id: StreamId(1), error_code: NO_ERROR }
2025-01-27T10:00:38.982097Z TRACE Connection{peer=Client}:poll: h2::proto::connection: recv RST_STREAM frame=Reset { stream_id: StreamId(1), error_code: NO_ERROR }
2025-01-27T10:00:38.982401Z TRACE Connection{peer=Client}:poll: h2::proto::streams::state: recv_reset; frame=Reset { stream_id: StreamId(1), error_code: NO_ERROR }; state=HalfClosedRemote(Streaming); queued=false
2025-01-27T10:00:38.982558Z TRACE Connection{peer=Client}:poll: h2::proto::streams::counts: transition_after; stream=StreamId(1); state=State { inner: Closed(Error(Reset(StreamId(1), NO_ERROR, Remote))) }; is_closed=true; pending_send_empty=true; buffered_send_data=0; num_recv=0; num_send=1
2025-01-27T10:00:38.982611Z TRACE Connection{peer=Client}:poll: h2::proto::streams::counts: dec_num_streams; stream=StreamId(1)
2025-01-27T10:00:38.982627Z TRACE Connection{peer=Client}:poll:FramedRead::poll_next: h2::codec::framed_read: poll
2025-01-27T10:00:38.982630Z TRACE Connection{peer=Client}:poll:FramedRead::poll_next: h2::codec::framed_read: read.bytes=17
2025-01-27T10:00:38.982634Z TRACE Connection{peer=Client}:poll:FramedRead::poll_next:FramedRead::decode_frame{offset=17}: h2::codec::framed_read: decoding frame from 17B
2025-01-27T10:00:38.982641Z TRACE Connection{peer=Client}:poll:FramedRead::poll_next:FramedRead::decode_frame{offset=17}: h2::codec::framed_read: frame.kind=Ping
2025-01-27T10:00:38.982661Z TRACE h2::proto::streams::streams: drop_stream_ref; stream=Stream { id: StreamId(1), state: State { inner: Closed(Error(Reset(StreamId(1), NO_ERROR, Remote))) }, is_counted: false, ref_count: 3, next_pending_send: None, is_pending_send: false, send_flow: FlowControl { window_size: Window(1048467), available: Window(0) }, requested_send_capacity: 0, buffered_send_data: 0, send_task: None, pending_send: Deque { indices: None }, next_pending_send_capacity: None, is_pending_send_capacity: false, send_capacity_inc: true, next_open: None, is_pending_open: false, is_pending_push: false, next_pending_accept: None, is_pending_accept: false, recv_flow: FlowControl { window_size: Window(2097152), available: Window(2097152) }, in_flight_recv_data: 0, next_window_update: None, is_pending_window_update: false, reset_at: None, next_reset_expire: None, pending_recv: Deque { indices: Some(Indices { head: 1, tail: 1 }) }, is_recv: true, recv_task: None, pending_push_promises: Queue { indices: None, _p: PhantomData<h2::proto::streams::stream::NextAccept> }, content_length: Omitted }
2025-01-27T10:00:38.983524Z TRACE h2::proto::streams::counts: transition_after; stream=StreamId(1); state=State { inner: Closed(Error(Reset(StreamId(1), NO_ERROR, Remote))) }; is_closed=true; pending_send_empty=true; buffered_send_data=0; num_recv=0; num_send=0
2025-01-27T10:00:38.982645Z DEBUG Connection{peer=Client}:poll:FramedRead::poll_next: h2::codec::framed_read: received frame=Ping { ack: false, payload: [0, 0, 0, 0, 0, 0, 2, 224] }
2025-01-27T10:00:38.984287Z TRACE Connection{peer=Client}:poll: h2::proto::connection: recv PING frame=Ping { ack: false, payload: [0, 0, 0, 0, 0, 0, 2, 224] }
2025-01-27T10:00:38.984379Z DEBUG Connection{peer=Client}:poll:poll_ready:FramedWrite::buffer{frame=Ping { ack: true, payload: [0, 0, 0, 0, 0, 0, 2, 224] }}: h2::codec::framed_write: send frame=Ping { ack: true, payload: [0, 0, 0, 0, 0, 0, 2, 224] }
2025-01-27T10:00:38.984411Z TRACE Connection{peer=Client}:poll:poll_ready:FramedWrite::buffer{frame=Ping { ack: true, payload: [0, 0, 0, 0, 0, 0, 2, 224] }}: h2::frame::ping: encoding PING; ack=true len=8
2025-01-27T10:00:38.985049Z TRACE Connection{peer=Client}:poll:poll_ready:FramedWrite::buffer{frame=Ping { ack: true, payload: [0, 0, 0, 0, 0, 0, 2, 224] }}: h2::codec::framed_write: encoded ping rem=17
2025-01-27T10:00:38.985079Z TRACE Connection{peer=Client}:poll:FramedRead::poll_next: h2::codec::framed_read: poll
2025-01-27T10:00:38.985129Z TRACE Connection{peer=Client}:poll: h2::proto::streams::prioritize: poll_complete
2025-01-27T10:00:38.985139Z TRACE Connection{peer=Client}:poll: h2::proto::streams::prioritize: schedule_pending_open
2025-01-27T10:00:38.985165Z TRACE Connection{peer=Client}:poll:FramedWrite::flush: h2::codec::framed_write: queued_data_frame=false
2025-01-27T10:00:38.985418Z TRACE Connection{peer=Client}:poll:FramedWrite::flush: h2::codec::framed_write: flushing buffer
2025-01-27T10:00:38.985712Z TRACE h2::proto::streams::streams: drop_stream_ref; stream=Stream { id: StreamId(1), state: State { inner: Closed(Error(Reset(StreamId(1), NO_ERROR, Remote))) }, is_counted: false, ref_count: 2, next_pending_send: None, is_pending_send: false, send_flow: FlowControl { window_size: Window(1048467), available: Window(0) }, requested_send_capacity: 0, buffered_send_data: 0, send_task: None, pending_send: Deque { indices: None }, next_pending_send_capacity: None, is_pending_send_capacity: false, send_capacity_inc: true, next_open: None, is_pending_open: false, is_pending_push: false, next_pending_accept: None, is_pending_accept: false, recv_flow: FlowControl { window_size: Window(2097152), available: Window(2097152) }, in_flight_recv_data: 0, next_window_update: None, is_pending_window_update: false, reset_at: None, next_reset_expire: None, pending_recv: Deque { indices: Some(Indices { head: 1, tail: 1 }) }, is_recv: true, recv_task: None, pending_push_promises: Queue { indices: None, _p: PhantomData<h2::proto::streams::stream::NextAccept> }, content_length: Omitted }
2025-01-27T10:00:38.985739Z TRACE h2::proto::streams::counts: transition_after; stream=StreamId(1); state=State { inner: Closed(Error(Reset(StreamId(1), NO_ERROR, Remote))) }; is_closed=true; pending_send_empty=true; buffered_send_data=0; num_recv=0; num_send=0
2025-01-27T10:00:38.985847Z DEBUG hyper::proto::h2::client: client request body error: error writing a body to connection: send stream capacity unexpectedly closed
2025-01-27T10:00:38.986679Z ERROR vector::internal_events::gcp_pubsub: Failed to fetch events. error=status: Unavailable, message: "The service was unable to fulfill your request. Please try again. [code=8a75]", details: [], metadata: MetadataMap { headers: {} } error_code="failed_fetching_events" error_type="request_failed" stage="receiving" internal_log_rate_limit=true
2025-01-27T10:00:38.987031Z TRACE h2::proto::streams::streams: drop_stream_ref; stream=Stream { id: StreamId(1), state: State { inner: Closed(Error(Reset(StreamId(1), NO_ERROR, Remote))) }, is_counted: false, ref_count: 1, next_pending_send: None, is_pending_send: false, send_flow: FlowControl { window_size: Window(1048467), available: Window(0) }, requested_send_capacity: 0, buffered_send_data: 0, send_task: None, pending_send: Deque { indices: None }, next_pending_send_capacity: None, is_pending_send_capacity: false, send_capacity_inc: true, next_open: None, is_pending_open: false, is_pending_push: false, next_pending_accept: None, is_pending_accept: false, recv_flow: FlowControl { window_size: Window(2097152), available: Window(2097152) }, in_flight_recv_data: 0, next_window_update: None, is_pending_window_update: false, reset_at: None, next_reset_expire: None, pending_recv: Deque { indices: None }, is_recv: false, recv_task: None, pending_push_promises: Queue { indices: None, _p: PhantomData<h2::proto::streams::stream::NextAccept> }, content_length: Omitted }
2025-01-27T10:00:38.987055Z TRACE h2::proto::streams::counts: transition_after; stream=StreamId(1); state=State { inner: Closed(Error(Reset(StreamId(1), NO_ERROR, Remote))) }; is_closed=true; pending_send_empty=true; buffered_send_data=0; num_recv=0; num_send=0
2025-01-27T10:00:38.987161Z  INFO vector::sources::gcp_pubsub: Retrying after timeout. timeout_secs=1.0
2025-01-27T10:00:38.987229Z TRACE Connection{peer=Client}:poll: h2::proto::connection: connection.state=Open
2025-01-27T10:00:38.987266Z TRACE Connection{peer=Client}:poll:FramedRead::poll_next: h2::codec::framed_read: poll
2025-01-27T10:00:38.987280Z TRACE Connection{peer=Client}:poll: h2::proto::streams::prioritize: poll_complete
2025-01-27T10:00:38.987298Z TRACE Connection{peer=Client}:poll: h2::proto::streams::prioritize: schedule_pending_open
2025-01-27T10:00:38.987303Z TRACE Connection{peer=Client}:poll:FramedWrite::flush: h2::codec::framed_write: flushing buffer
2025-01-27T10:00:38.987281Z TRACE tower::buffer::worker: worker polling for next message
2025-01-27T10:00:38.987357Z TRACE tower::buffer::worker: buffer already closed
2025-01-27T10:00:38.988277Z TRACE hyper::proto::h2::client: client::dispatch::Sender dropped
2025-01-27T10:00:38.988408Z TRACE Connection{peer=Client}:poll: h2::proto::connection: connection.state=Open
2025-01-27T10:00:38.988433Z TRACE Connection{peer=Client}:poll:FramedRead::poll_next: h2::codec::framed_read: poll
2025-01-27T10:00:38.988443Z TRACE Connection{peer=Client}:poll: h2::proto::streams::prioritize: poll_complete
2025-01-27T10:00:38.988447Z TRACE Connection{peer=Client}:poll: h2::proto::streams::prioritize: schedule_pending_open
2025-01-27T10:00:38.988457Z TRACE Connection{peer=Client}:poll:FramedWrite::flush: h2::codec::framed_write: flushing buffer
2025-01-27T10:00:38.988521Z TRACE hyper::proto::h2::client: send_request dropped, starting conn shutdown
2025-01-27T10:00:38.988589Z TRACE Connection{peer=Client}:poll: h2::proto::connection: connection.state=Open
2025-01-27T10:00:38.988623Z DEBUG Connection{peer=Client}:poll:FramedWrite::buffer{frame=GoAway { error_code: NO_ERROR, last_stream_id: StreamId(0) }}: h2::codec::framed_write: send frame=GoAway { error_code: NO_ERROR, last_stream_id: StreamId(0) }
2025-01-27T10:00:38.988643Z TRACE Connection{peer=Client}:poll:FramedWrite::buffer{frame=GoAway { error_code: NO_ERROR, last_stream_id: StreamId(0) }}: h2::frame::go_away: encoding GO_AWAY; code=NO_ERROR
2025-01-27T10:00:38.988656Z TRACE Connection{peer=Client}:poll:FramedWrite::buffer{frame=GoAway { error_code: NO_ERROR, last_stream_id: StreamId(0) }}: h2::codec::framed_write: encoded go_away rem=17
2025-01-27T10:00:38.988671Z DEBUG Connection{peer=Client}:poll: h2::proto::connection: Connection::poll; connection error error=GoAway(b"", NO_ERROR, Library)
2025-01-27T10:00:38.989263Z TRACE Connection{peer=Client}:poll: h2::proto::connection:     -> already going away
2025-01-27T10:00:38.989286Z TRACE Connection{peer=Client}:poll: h2::proto::connection: connection.state=Closing(NO_ERROR, Library)
2025-01-27T10:00:38.989292Z TRACE Connection{peer=Client}:poll: h2::proto::connection: connection closing after flush
2025-01-27T10:00:38.989298Z TRACE Connection{peer=Client}:poll:FramedWrite::flush: h2::codec::framed_write: queued_data_frame=false
2025-01-27T10:00:38.989368Z TRACE Connection{peer=Client}:poll:FramedWrite::flush: h2::codec::framed_write: flushing buffer
2025-01-27T10:00:38.989427Z DEBUG Connection{peer=Client}:poll: rustls::common_state: Sending warning alert CloseNotify    
2025-01-27T10:00:38.990027Z TRACE Connection{peer=Client}:poll: h2::proto::connection: connection.state=Closed(NO_ERROR, Library)
2025-01-27T10:00:38.990099Z TRACE h2::proto::streams::streams: Streams::recv_eof
2025-01-27T10:00:38.990142Z TRACE mio::poll: deregistering event source from poller    
2025-01-27T10:00:39.221740Z TRACE vector: Beep.
2025-01-27T10:00:39.221737Z DEBUG sink{component_kind="sink" component_id=out component_type=file}: vector::utilization: utilization=0.000000057917434190489975
2025-01-27T10:00:39.991256Z TRACE tonic::transport::service::reconnect: poll_ready; idle
2025-01-27T10:00:39.991545Z TRACE tonic::transport::service::reconnect: poll_ready; connecting
2025-01-27T10:00:39.993391Z TRACE hyper::client::connect::http: Http::connect; scheme=Some("https"), host=Some("pubsub.googleapis.com"), port=None
2025-01-27T10:00:39.996864Z TRACE tonic::transport::service::reconnect: poll_ready; not ready
2025-01-27T10:00:39.997980Z DEBUG hyper::client::connect::dns: resolving host="pubsub.googleapis.com"
2025-01-27T10:00:40.032100Z TRACE tonic::transport::service::reconnect: poll_ready; connecting
2025-01-27T10:00:40.032164Z DEBUG hyper::client::connect::http: connecting to 142.250.179.74:443
2025-01-27T10:00:40.033732Z TRACE mio::poll: registering event source with poller: token=Token(281473266221440), interests=READABLE | WRITABLE    
2025-01-27T10:00:40.033794Z TRACE tonic::transport::service::reconnect: poll_ready; not ready
2025-01-27T10:00:40.042362Z TRACE tonic::transport::service::reconnect: poll_ready; connecting
2025-01-27T10:00:40.042504Z DEBUG hyper::client::connect::http: connected to 142.250.179.74:443
2025-01-27T10:00:40.042984Z DEBUG rustls::client::hs: Resuming session
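
To check whether every failed fetch in a log like the one above is followed by a retry, the two messages can be tallied. The following is a minimal sketch in Python, assuming log lines in the format shown in this issue; the function name `tally_fetch_errors` and the keys of the returned dictionary are hypothetical and not part of Vector. It only counts log lines, so on its own it cannot prove that no events were lost.

```python
import re
from collections import Counter

# Message fragments copied from the log lines in this issue.
ERROR_MARKER = "Failed to fetch events."
RETRY_MARKER = "Retrying after timeout."

# Captures the value after `status:` in the error line, e.g. "Unavailable".
STATUS_RE = re.compile(r"error=status: (\w+)")


def tally_fetch_errors(lines):
    """Count fetch errors (grouped by status) and retry messages."""
    statuses = Counter()
    retries = 0
    for line in lines:
        if ERROR_MARKER in line:
            match = STATUS_RE.search(line)
            statuses[match.group(1) if match else "unknown"] += 1
        elif RETRY_MARKER in line:
            retries += 1
    return {
        "errors": sum(statuses.values()),
        "by_status": dict(statuses),
        "retries": retries,
    }


if __name__ == "__main__":
    # Two lines taken from the debug output in this issue.
    sample = [
        "2025-01-27T10:00:38.986679Z ERROR vector::internal_events::gcp_pubsub: "
        "Failed to fetch events. error=status: Unavailable, message: "
        '"The service was unable to fulfill your request. Please try again. '
        '[code=8a75]", details: [], metadata: MetadataMap { headers: {} } '
        'error_code="failed_fetching_events" error_type="request_failed" '
        'stage="receiving" internal_log_rate_limit=true',
        "2025-01-27T10:00:38.987161Z  INFO vector::sources::gcp_pubsub: "
        "Retrying after timeout. timeout_secs=1.0",
    ]
    print(tally_fetch_errors(sample))
```

If the error and retry counts match over a long run, that suggests each failure was followed by a reconnect attempt. Note that the error line carries `internal_log_rate_limit=true`, so some occurrences may be suppressed and the error count may be lower than the number of actual failures.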

Example Data

No response

Additional Context

The Pub/Sub subscription is configured with an acknowledgement deadline of 60 seconds. No other options are configured.

References

No response

@ibeauvais added the `type: bug` label on Jan 27, 2025
@pront added the `source: gcp_pubsub` label on Jan 27, 2025