diff --git a/pubsublite/internal/test/util.go b/pubsublite/internal/test/util.go index 979c8cc012e4..6a35095afdf1 100644 --- a/pubsublite/internal/test/util.go +++ b/pubsublite/internal/test/util.go @@ -18,13 +18,14 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + "golang.org/x/xerrors" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) // ErrorEqual compares two errors for equivalence. func ErrorEqual(got, want error) bool { - if got == want { + if xerrors.Is(got, want) { return true } return cmp.Equal(got, want, cmpopts.EquateErrors()) diff --git a/pubsublite/internal/wire/assigner.go b/pubsublite/internal/wire/assigner.go index 14e71f6085e7..e5dcc4151584 100644 --- a/pubsublite/internal/wire/assigner.go +++ b/pubsublite/internal/wire/assigner.go @@ -63,6 +63,7 @@ type partitionAssignmentReceiver func(partitionSet) error type assigner struct { // Immutable after creation. assignmentClient *vkit.PartitionAssignmentClient + subscription string initialReq *pb.PartitionAssignmentRequest receiveAssignment partitionAssignmentReceiver metadata pubsubMetadata @@ -81,6 +82,7 @@ func newAssigner(ctx context.Context, assignmentClient *vkit.PartitionAssignment a := &assigner{ assignmentClient: assignmentClient, + subscription: subscriptionPath, initialReq: &pb.PartitionAssignmentRequest{ Request: &pb.PartitionAssignmentRequest_Initial{ Initial: &pb.InitialPartitionAssignmentRequest{ @@ -164,7 +166,7 @@ func (a *assigner) handleAssignment(assignment *pb.PartitionAssignment) error { } func (a *assigner) unsafeInitiateShutdown(targetStatus serviceStatus, err error) { - if !a.unsafeUpdateStatus(targetStatus, err) { + if !a.unsafeUpdateStatus(targetStatus, wrapError("assigner", a.subscription, err)) { return } // No data to send. Immediately terminate the stream. diff --git a/pubsublite/internal/wire/committer.go b/pubsublite/internal/wire/committer.go index c830f8730b9b..d9d24c01f01a 100644 --- a/pubsublite/internal/wire/committer.go +++ b/pubsublite/internal/wire/committer.go @@ -42,6 +42,7 @@ const commitCursorPeriod = 50 * time.Millisecond type committer struct { // Immutable after creation. cursorClient *vkit.CursorClient + subscription subscriptionPartition initialReq *pb.StreamingCommitCursorRequest metadata pubsubMetadata @@ -59,6 +60,7 @@ func newCommitter(ctx context.Context, cursor *vkit.CursorClient, settings Recei c := &committer{ cursorClient: cursor, + subscription: subscription, initialReq: &pb.StreamingCommitCursorRequest{ Request: &pb.StreamingCommitCursorRequest_Initial{ Initial: &pb.InitialCommitCursorRequest{ @@ -201,7 +203,7 @@ func (c *committer) unsafeCommitOffsetToStream() { } func (c *committer) unsafeInitiateShutdown(targetStatus serviceStatus, err error) { - if !c.unsafeUpdateStatus(targetStatus, err) { + if !c.unsafeUpdateStatus(targetStatus, wrapError("committer", c.subscription.String(), err)) { return } diff --git a/pubsublite/internal/wire/errors.go b/pubsublite/internal/wire/errors.go index 2953dca70115..55299e077837 100644 --- a/pubsublite/internal/wire/errors.go +++ b/pubsublite/internal/wire/errors.go @@ -16,6 +16,8 @@ package wire import ( "errors" "fmt" + + "golang.org/x/xerrors" ) // Errors exported from this package. @@ -41,3 +43,10 @@ var ( // stopping. ErrServiceStopped = errors.New("pubsublite: service has stopped or is stopping") ) + +func wrapError(context, resource string, err error) error { + if err != nil { + return xerrors.Errorf("%s(%s): %w", context, resource, err) + } + return err +} diff --git a/pubsublite/internal/wire/publisher.go b/pubsublite/internal/wire/publisher.go index 19a5a0255c79..f48e114ae17d 100644 --- a/pubsublite/internal/wire/publisher.go +++ b/pubsublite/internal/wire/publisher.go @@ -236,7 +236,7 @@ func (pp *singlePartitionPublisher) onResponse(response interface{}) { // // Expected to be called with singlePartitionPublisher.mu held. func (pp *singlePartitionPublisher) unsafeInitiateShutdown(targetStatus serviceStatus, err error) { - if !pp.unsafeUpdateStatus(targetStatus, err) { + if !pp.unsafeUpdateStatus(targetStatus, wrapError("publisher", pp.topic.String(), err)) { return } diff --git a/pubsublite/internal/wire/resources.go b/pubsublite/internal/wire/resources.go index 5df9e817cfea..5fa01a708313 100644 --- a/pubsublite/internal/wire/resources.go +++ b/pubsublite/internal/wire/resources.go @@ -149,7 +149,15 @@ type topicPartition struct { Partition int } +func (tp topicPartition) String() string { + return fmt.Sprintf("%s/partitions/%d", tp.Path, tp.Partition) +} + type subscriptionPartition struct { Path string Partition int } + +func (sp subscriptionPartition) String() string { + return fmt.Sprintf("%s/partitions/%d", sp.Path, sp.Partition) +} diff --git a/pubsublite/internal/wire/subscriber.go b/pubsublite/internal/wire/subscriber.go index 054747a362c7..18fc516ee115 100644 --- a/pubsublite/internal/wire/subscriber.go +++ b/pubsublite/internal/wire/subscriber.go @@ -325,7 +325,7 @@ func (s *subscribeStream) unsafeSendFlowControl(req *pb.FlowControlRequest) { } func (s *subscribeStream) unsafeInitiateShutdown(targetStatus serviceStatus, err error) { - if !s.unsafeUpdateStatus(targetStatus, err) { + if !s.unsafeUpdateStatus(targetStatus, wrapError("subscriber", s.subscription.String(), err)) { return }