Skip to content

Commit

Permalink
refactor(pubsublite): add context to error messages for leaf services (
Browse files Browse the repository at this point in the history
…#3649)

To disambiguate error messages from leaf services when they are propagated to the top-level publisher and subscriber clients.
  • Loading branch information
tmdiep authored Feb 4, 2021
1 parent 38b944a commit b7438c2
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 5 deletions.
3 changes: 2 additions & 1 deletion pubsublite/internal/test/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ import (

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"golang.org/x/xerrors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// ErrorEqual compares two errors for equivalence.
func ErrorEqual(got, want error) bool {
if got == want {
if xerrors.Is(got, want) {
return true
}
return cmp.Equal(got, want, cmpopts.EquateErrors())
Expand Down
4 changes: 3 additions & 1 deletion pubsublite/internal/wire/assigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type partitionAssignmentReceiver func(partitionSet) error
type assigner struct {
// Immutable after creation.
assignmentClient *vkit.PartitionAssignmentClient
subscription string
initialReq *pb.PartitionAssignmentRequest
receiveAssignment partitionAssignmentReceiver
metadata pubsubMetadata
Expand All @@ -81,6 +82,7 @@ func newAssigner(ctx context.Context, assignmentClient *vkit.PartitionAssignment

a := &assigner{
assignmentClient: assignmentClient,
subscription: subscriptionPath,
initialReq: &pb.PartitionAssignmentRequest{
Request: &pb.PartitionAssignmentRequest_Initial{
Initial: &pb.InitialPartitionAssignmentRequest{
Expand Down Expand Up @@ -164,7 +166,7 @@ func (a *assigner) handleAssignment(assignment *pb.PartitionAssignment) error {
}

func (a *assigner) unsafeInitiateShutdown(targetStatus serviceStatus, err error) {
if !a.unsafeUpdateStatus(targetStatus, err) {
if !a.unsafeUpdateStatus(targetStatus, wrapError("assigner", a.subscription, err)) {
return
}
// No data to send. Immediately terminate the stream.
Expand Down
4 changes: 3 additions & 1 deletion pubsublite/internal/wire/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const commitCursorPeriod = 50 * time.Millisecond
type committer struct {
// Immutable after creation.
cursorClient *vkit.CursorClient
subscription subscriptionPartition
initialReq *pb.StreamingCommitCursorRequest
metadata pubsubMetadata

Expand All @@ -59,6 +60,7 @@ func newCommitter(ctx context.Context, cursor *vkit.CursorClient, settings Recei

c := &committer{
cursorClient: cursor,
subscription: subscription,
initialReq: &pb.StreamingCommitCursorRequest{
Request: &pb.StreamingCommitCursorRequest_Initial{
Initial: &pb.InitialCommitCursorRequest{
Expand Down Expand Up @@ -201,7 +203,7 @@ func (c *committer) unsafeCommitOffsetToStream() {
}

func (c *committer) unsafeInitiateShutdown(targetStatus serviceStatus, err error) {
if !c.unsafeUpdateStatus(targetStatus, err) {
if !c.unsafeUpdateStatus(targetStatus, wrapError("committer", c.subscription.String(), err)) {
return
}

Expand Down
9 changes: 9 additions & 0 deletions pubsublite/internal/wire/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package wire
import (
"errors"
"fmt"

"golang.org/x/xerrors"
)

// Errors exported from this package.
Expand All @@ -41,3 +43,10 @@ var (
// stopping.
ErrServiceStopped = errors.New("pubsublite: service has stopped or is stopping")
)

func wrapError(context, resource string, err error) error {
if err != nil {
return xerrors.Errorf("%s(%s): %w", context, resource, err)
}
return err
}
2 changes: 1 addition & 1 deletion pubsublite/internal/wire/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func (pp *singlePartitionPublisher) onResponse(response interface{}) {
//
// Expected to be called with singlePartitionPublisher.mu held.
func (pp *singlePartitionPublisher) unsafeInitiateShutdown(targetStatus serviceStatus, err error) {
if !pp.unsafeUpdateStatus(targetStatus, err) {
if !pp.unsafeUpdateStatus(targetStatus, wrapError("publisher", pp.topic.String(), err)) {
return
}

Expand Down
8 changes: 8 additions & 0 deletions pubsublite/internal/wire/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,15 @@ type topicPartition struct {
Partition int
}

func (tp topicPartition) String() string {
return fmt.Sprintf("%s/partitions/%d", tp.Path, tp.Partition)
}

type subscriptionPartition struct {
Path string
Partition int
}

func (sp subscriptionPartition) String() string {
return fmt.Sprintf("%s/partitions/%d", sp.Path, sp.Partition)
}
2 changes: 1 addition & 1 deletion pubsublite/internal/wire/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func (s *subscribeStream) unsafeSendFlowControl(req *pb.FlowControlRequest) {
}

func (s *subscribeStream) unsafeInitiateShutdown(targetStatus serviceStatus, err error) {
if !s.unsafeUpdateStatus(targetStatus, err) {
if !s.unsafeUpdateStatus(targetStatus, wrapError("subscriber", s.subscription.String(), err)) {
return
}

Expand Down

0 comments on commit b7438c2

Please sign in to comment.