Skip to content

Commit

Permalink
[prism] worker PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
lostluck committed Feb 19, 2023
1 parent 0c7676b commit aae628b
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 12 deletions.
9 changes: 3 additions & 6 deletions sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@ import (
"bytes"
"context"
"fmt"
"io"
"net"
"sync"
"sync/atomic"

"io"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
Expand Down Expand Up @@ -58,8 +57,6 @@ type W struct {
// These are the ID sources
inst, bund uint64

// descs map[string]*fnpb.ProcessBundleDescriptor

InstReqs chan *fnpb.InstructionRequest
DataReqs chan *fnpb.Elements

Expand Down Expand Up @@ -168,7 +165,7 @@ func (wk *W) Logging(stream fnpb.BeamFnLogging_LoggingServer) error {
func toSlogSev(sev fnpb.LogEntry_Severity_Enum) slog.Level {
switch sev {
case fnpb.LogEntry_Severity_TRACE:
return slog.Level(-8) //
return slog.Level(-8)
case fnpb.LogEntry_Severity_DEBUG:
return slog.LevelDebug // -4
case fnpb.LogEntry_Severity_INFO:
Expand Down Expand Up @@ -418,7 +415,7 @@ func (d *DataService) Commit(tent engine.TentativeData) {
}
}

// Hack for Side Inputs until watermarks are sorted out.
// GetAllData is a hack for Side Inputs until watermarks are sorted out.
func (d *DataService) GetAllData(colID string) [][]byte {
return d.raw[colID]
}
12 changes: 6 additions & 6 deletions sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/test/bufconn"
)

Expand All @@ -49,15 +50,15 @@ func TestWorker_NextInst(t *testing.T) {
}
}

func TestWorker_NextBund(t *testing.T) {
func TestWorker_NextStage(t *testing.T) {
w := New("test")

stageIDs := map[string]struct{}{}
for i := 0; i < 100; i++ {
stageIDs[w.NextStage()] = struct{}{}
}
if got, want := len(stageIDs), 100; got != want {
t.Errorf("calling w.NextInst() got %v unique ids, want %v", got, want)
t.Errorf("calling w.NextStage() got %v unique ids, want %v", got, want)
}
}

Expand All @@ -83,7 +84,7 @@ func TestWorker_GetProcessBundleDescriptor(t *testing.T) {
ProcessBundleDescriptorId: "unknown",
})
if err == nil {
t.Errorf(" GetProcessBundleDescriptor(%q) = %v, want error", "unknown", pbd)
t.Errorf("got GetProcessBundleDescriptor(%q) = %v, want error", "unknown", pbd)
}
}

Expand All @@ -100,7 +101,7 @@ func serveTestWorker(t *testing.T) (context.Context, *W, *grpc.ClientConn) {

clientConn, err := grpc.DialContext(ctx, "", grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) {
return lis.DialContext(ctx)
}), grpc.WithInsecure(), grpc.WithBlock())
}), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
if err != nil {
t.Fatal("couldn't create bufconn grpc connection:", err)
}
Expand All @@ -124,7 +125,7 @@ func TestWorker_Logging(t *testing.T) {
})

// TODO: Connect to the job management service.
// At this point job messages are just logged to whereever the prism runner executes
// At this point job messages are just logged to wherever the prism runner executes
// But this should pivot to anyone connecting to the Job Management service for the
// job.
// In the meantime, sleep to validate execution via coverage.
Expand Down Expand Up @@ -273,7 +274,6 @@ func TestWorker_State_Iterable(t *testing.T) {
if got, want := resp.GetGet().GetData(), []byte{42}; !bytes.Equal(got, want) {
t.Fatalf("didn't receive expected state response data: got %v, want %v", got, want)
}
resp.GetId()

if err := stateStream.CloseSend(); err != nil {
t.Errorf("stateStream.CloseSend() = %v", err)
Expand Down

0 comments on commit aae628b

Please sign in to comment.