Issue Priority
Priority: 1 (data loss / total loss of function)
Issue Components
Component: Python SDK
Component: Java SDK
Component: Go SDK
Component: Typescript SDK
Component: IO connector
Component: Beam YAML
Component: Beam examples
Component: Beam playground
Component: Beam katas
Component: Website
Component: Infrastructure
Component: Spark Runner
Component: Flink Runner
Component: Samza Runner
Component: Twister2 Runner
Component: Hazelcast Jet Runner
Component: Google Cloud Dataflow Runner
* [#32245] Copy bytes sent from the State API.
* Mention #32245 in changes.md
* remove unnecessary chagned line
* weird copypasta
---------
Co-authored-by: lostluck <[email protected]>
What happened?
When using the Beam State API in the Go SDK, data can become corrupted when performing a Write.
A change to the Go compiler in Go 1.23 made the issue more frequent, more severe, and easier to reproduce.
It is not recommended to use Go 1.23+ for Beam Go pipelines using the State API until this issue is resolved. If your job doesn't use the State API, then it is unaffected.
It's not known at this time whether the behavior can cause a similar issue elsewhere in the SDK. Other calls, such as writing data to data sinks, first write to a buffer or properly copy into one, which avoids the same problem or at least makes it less likely.
In particular, the data written to the State API might be overwritten or zeroed out before actually being sent over the gRPC channel, corrupting the write. Consider a length prefix byte that is zeroed or changed prior to being sent to the runner.
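To make the length prefix case concrete, here is a minimal sketch of how a single zeroed byte can silently change what the receiver decodes. The helper names `encodeLengthPrefixed` and `decodeLengthPrefixed` are hypothetical and are not Beam's coders; the sketch only assumes a uvarint length followed by the payload.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeLengthPrefixed returns payload preceded by its length as a uvarint.
// Hypothetical helper for illustration only.
func encodeLengthPrefixed(payload []byte) []byte {
	out := binary.AppendUvarint(nil, uint64(len(payload)))
	return append(out, payload...)
}

// decodeLengthPrefixed reads a uvarint length and returns that many bytes.
func decodeLengthPrefixed(b []byte) ([]byte, error) {
	n, read := binary.Uvarint(b)
	if read <= 0 {
		return nil, fmt.Errorf("bad length prefix")
	}
	rest := b[read:]
	if uint64(len(rest)) < n {
		return nil, fmt.Errorf("short payload: want %d bytes, have %d", n, len(rest))
	}
	return rest[:n], nil
}

func main() {
	msg := encodeLengthPrefixed([]byte("hello"))
	good, _ := decodeLengthPrefixed(msg)
	fmt.Printf("intact: %q\n", good) // intact: "hello"

	msg[0] = 0 // simulate the prefix byte being zeroed before the send
	bad, err := decodeLengthPrefixed(msg)
	fmt.Printf("corrupted: %q, err=%v\n", bad, err) // corrupted: "", err=<nil>
}
```

Note that the corrupted message decodes without an error: a zero prefix is a valid length, so the value is simply lost rather than rejected.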
The root cause of the issue is that Beam Go incorrectly implements Go's https://pkg.go.dev/io#Writer interface for writing to State calls. In particular, it incorrectly retains a reference to the input byte buffer passed into the Write call.
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/harness/statemgr.go#L481
The buffer is simply placed in the FnAPI State API write request, which is then sent on another channel to be actually sent on the State channel.
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/harness/statemgr.go#L498
This send happens asynchronously: the request is handed to a different goroutine for serialization.
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/harness/statemgr.go#L704
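The `io.Writer` documentation states that implementations must not retain `p`. The sketch below shows, in isolation, what goes wrong when a writer keeps the caller's slice and the caller then reuses its buffer. `retainingWriter` and `fill` are hypothetical names for illustration and are not the Beam state manager code.

```go
package main

import (
	"fmt"
	"io"
)

// retainingWriter is a hypothetical io.Writer that breaks the contract:
// it keeps the caller's slice instead of copying it.
type retainingWriter struct {
	pending [][]byte
}

func (w *retainingWriter) Write(p []byte) (int, error) {
	w.pending = append(w.pending, p) // BUG: p still aliases the caller's memory
	return len(p), nil
}

// fill writes the bytes 1, 2, 3 one at a time, reusing a single scratch
// buffer, which the io.Writer contract allows the caller to do.
func fill(w io.Writer) {
	scratch := make([]byte, 1)
	for _, b := range []byte{1, 2, 3} {
		scratch[0] = b
		w.Write(scratch)
	}
}

func main() {
	w := &retainingWriter{}
	fill(w)
	fmt.Println(w.pending) // [[3] [3] [3]]
}
```

Every retained slice points at the same scratch memory, so all three pending writes show the last value written. In this sketch the corruption is deterministic; when the consumer is another goroutine, it depends on timing.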
While the send is ultimately a blocking call, the other half of the problem is that Beam Go lightly subverts Go's escape analysis in the name of performance, using ioutilx.WriteUnsafe.
For example, it is used here: https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/graph/coder/varint.go#L44
This was deemed required for performance for small writes, in order to avoid usually unnecessary allocations to the heap. Typically these buffers would be stack allocated, and then not re-used.
However, in Go 1.23 a change to the compiler "can now overlap stack frame slots of local variables". It's my suspicion that this allowed the stack frame slot referred to by the byte buffer to be zeroed out or re-assigned, corrupting the data.
The fix is to copy the bytes properly when sending them over the state API. This avoids the bytes being changed prior to being sent.
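A minimal sketch of the copying approach, assuming a writer that hands each write to another goroutine over a channel. `copyingWriter` and `sendAll` are hypothetical names for illustration; this is not the actual patch.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// copyingWriter is a hypothetical writer that forwards each write on a
// channel, as an asynchronous sender might.
type copyingWriter struct {
	out chan<- []byte
}

// Compile-time check that copyingWriter satisfies io.Writer.
var _ io.Writer = (*copyingWriter)(nil)

func (w *copyingWriter) Write(p []byte) (int, error) {
	// Take a private copy so the caller may reuse or release p immediately.
	w.out <- bytes.Clone(p)
	return len(p), nil
}

// sendAll writes each value through a reused scratch buffer and returns what
// the receiving side observes.
func sendAll(values []byte) [][]byte {
	ch := make(chan []byte, len(values)) // buffered so Write does not block here
	w := &copyingWriter{out: ch}
	scratch := make([]byte, 1)
	for _, v := range values {
		scratch[0] = v
		w.Write(scratch)
	}
	close(ch)
	var got [][]byte
	for b := range ch {
		got = append(got, b)
	}
	return got
}

func main() {
	fmt.Println(sendAll([]byte{1, 2, 3})) // [[1] [2] [3]]
}
```

The cost is one small allocation per write, which is the trade-off the unsafe write path was meant to avoid; copying only at the point where the bytes are handed off keeps that cost confined to State writes.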
A more robust fix would be to re-write coder handling entirely, to avoid using the io.Reader and io.Writer interfaces, which opened up the issue in the first place.