Skip to content

Commit

Permalink
Revert "Migrate Beam Go to use the new Opaque Protocol Buffers (apach…
Browse files Browse the repository at this point in the history
…e#33434)" (apache#33628)

This reverts commit 4797d75.
  • Loading branch information
lostluck authored Jan 17, 2025
1 parent 4057463 commit 71df963
Show file tree
Hide file tree
Showing 124 changed files with 12,021 additions and 52,793 deletions.
2 changes: 1 addition & 1 deletion sdks/go/cmd/beamctl/cmd/artifact.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func listFn(cmd *cobra.Command, args []string) error {
}

for _, a := range md.GetManifest().GetArtifact() {
cmd.Println(a.GetName())
cmd.Println(a.Name)
}
return nil
}
2 changes: 1 addition & 1 deletion sdks/go/container/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func main() {

enableGoogleCloudProfiler := strings.Contains(options, enableGoogleCloudProfilerOption)
if enableGoogleCloudProfiler {
err := configureGoogleCloudProfilerEnvVars(ctx, logger, info.GetMetadata())
err := configureGoogleCloudProfilerEnvVars(ctx, logger, info.Metadata)
if err != nil {
logger.Printf(ctx, "could not configure Google Cloud Profiler variables, got %v", err)
}
Expand Down
38 changes: 19 additions & 19 deletions sdks/go/container/boot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ import (
)

func TestEnsureEndpointsSet_AllSet(t *testing.T) {
provisionInfo := fnpb.ProvisionInfo_builder{
LoggingEndpoint: pipepb.ApiServiceDescriptor_builder{Url: "testLoggingEndpointUrl"}.Build(),
ArtifactEndpoint: pipepb.ApiServiceDescriptor_builder{Url: "testArtifactEndpointUrl"}.Build(),
ControlEndpoint: pipepb.ApiServiceDescriptor_builder{Url: "testControlEndpointUrl"}.Build(),
}.Build()
provisionInfo := &fnpb.ProvisionInfo{
LoggingEndpoint: &pipepb.ApiServiceDescriptor{Url: "testLoggingEndpointUrl"},
ArtifactEndpoint: &pipepb.ApiServiceDescriptor{Url: "testArtifactEndpointUrl"},
ControlEndpoint: &pipepb.ApiServiceDescriptor{Url: "testControlEndpointUrl"},
}
*loggingEndpoint = ""
*artifactEndpoint = ""
*controlEndpoint = ""
Expand All @@ -53,11 +53,11 @@ func TestEnsureEndpointsSet_AllSet(t *testing.T) {
}

func TestEnsureEndpointsSet_OneMissing(t *testing.T) {
provisionInfo := fnpb.ProvisionInfo_builder{
LoggingEndpoint: pipepb.ApiServiceDescriptor_builder{Url: "testLoggingEndpointUrl"}.Build(),
ArtifactEndpoint: pipepb.ApiServiceDescriptor_builder{Url: "testArtifactEndpointUrl"}.Build(),
ControlEndpoint: pipepb.ApiServiceDescriptor_builder{Url: ""}.Build(),
}.Build()
provisionInfo := &fnpb.ProvisionInfo{
LoggingEndpoint: &pipepb.ApiServiceDescriptor{Url: "testLoggingEndpointUrl"},
ArtifactEndpoint: &pipepb.ApiServiceDescriptor{Url: "testArtifactEndpointUrl"},
ControlEndpoint: &pipepb.ApiServiceDescriptor{Url: ""},
}
*loggingEndpoint = ""
*artifactEndpoint = ""
*controlEndpoint = ""
Expand Down Expand Up @@ -85,7 +85,7 @@ func TestGetGoWorkerArtifactName_NoArtifacts(t *testing.T) {

func TestGetGoWorkerArtifactName_OneArtifact(t *testing.T) {
artifact := constructArtifactInformation(t, artifact.URNGoWorkerBinaryRole, "test/path", "sha")
artifacts := []*pipepb.ArtifactInformation{artifact}
artifacts := []*pipepb.ArtifactInformation{&artifact}

val, err := getGoWorkerArtifactName(context.Background(), &tools.Logger{}, artifacts)
if err != nil {
Expand All @@ -99,7 +99,7 @@ func TestGetGoWorkerArtifactName_OneArtifact(t *testing.T) {
func TestGetGoWorkerArtifactName_MultipleArtifactsFirstIsWorker(t *testing.T) {
artifact1 := constructArtifactInformation(t, artifact.URNGoWorkerBinaryRole, "test/path", "sha")
artifact2 := constructArtifactInformation(t, "other role", "test/path2", "sha")
artifacts := []*pipepb.ArtifactInformation{artifact1, artifact2}
artifacts := []*pipepb.ArtifactInformation{&artifact1, &artifact2}

val, err := getGoWorkerArtifactName(context.Background(), &tools.Logger{}, artifacts)
if err != nil {
Expand All @@ -113,7 +113,7 @@ func TestGetGoWorkerArtifactName_MultipleArtifactsFirstIsWorker(t *testing.T) {
func TestGetGoWorkerArtifactName_MultipleArtifactsSecondIsWorker(t *testing.T) {
artifact1 := constructArtifactInformation(t, "other role", "test/path", "sha")
artifact2 := constructArtifactInformation(t, artifact.URNGoWorkerBinaryRole, "test/path2", "sha")
artifacts := []*pipepb.ArtifactInformation{artifact1, artifact2}
artifacts := []*pipepb.ArtifactInformation{&artifact1, &artifact2}

val, err := getGoWorkerArtifactName(context.Background(), &tools.Logger{}, artifacts)
if err != nil {
Expand All @@ -127,7 +127,7 @@ func TestGetGoWorkerArtifactName_MultipleArtifactsSecondIsWorker(t *testing.T) {
func TestGetGoWorkerArtifactName_MultipleArtifactsLegacyWay(t *testing.T) {
artifact1 := constructArtifactInformation(t, "other role", "test/path", "sha")
artifact2 := constructArtifactInformation(t, "other role", "worker", "sha")
artifacts := []*pipepb.ArtifactInformation{artifact1, artifact2}
artifacts := []*pipepb.ArtifactInformation{&artifact1, &artifact2}

val, err := getGoWorkerArtifactName(context.Background(), &tools.Logger{}, artifacts)
if err != nil {
Expand All @@ -141,7 +141,7 @@ func TestGetGoWorkerArtifactName_MultipleArtifactsLegacyWay(t *testing.T) {
func TestGetGoWorkerArtifactName_MultipleArtifactsNoneMatch(t *testing.T) {
artifact1 := constructArtifactInformation(t, "other role", "test/path", "sha")
artifact2 := constructArtifactInformation(t, "other role", "test/path2", "sha")
artifacts := []*pipepb.ArtifactInformation{artifact1, artifact2}
artifacts := []*pipepb.ArtifactInformation{&artifact1, &artifact2}

_, err := getGoWorkerArtifactName(context.Background(), &tools.Logger{}, artifacts)
if err == nil {
Expand Down Expand Up @@ -193,16 +193,16 @@ func TestCopyExe(t *testing.T) {
}
}

func constructArtifactInformation(t *testing.T, roleUrn string, path string, sha string) *pipepb.ArtifactInformation {
func constructArtifactInformation(t *testing.T, roleUrn string, path string, sha string) pipepb.ArtifactInformation {
t.Helper()

typePayload, _ := proto.Marshal(pipepb.ArtifactFilePayload_builder{Path: path, Sha256: sha}.Build())
typePayload, _ := proto.Marshal(&pipepb.ArtifactFilePayload{Path: path, Sha256: sha})

return pipepb.ArtifactInformation_builder{
return pipepb.ArtifactInformation{
RoleUrn: roleUrn,
TypeUrn: artifact.URNFileArtifact,
TypePayload: typePayload,
}.Build()
}
}

func TestConfigureGoogleCloudProfilerEnvVars(t *testing.T) {
Expand Down
22 changes: 11 additions & 11 deletions sdks/go/container/pool/workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,24 +80,24 @@ func (s *Process) StartWorker(_ context.Context, req *fnpb.StartWorkerRequest) (
s.mu.Lock()
defer s.mu.Unlock()
if s.workers == nil {
return fnpb.StartWorkerResponse_builder{
return &fnpb.StartWorkerResponse{
Error: "worker pool shutting down",
}.Build(), nil
}, nil
}

if _, ok := s.workers[req.GetWorkerId()]; ok {
return fnpb.StartWorkerResponse_builder{
return &fnpb.StartWorkerResponse{
Error: fmt.Sprintf("worker with ID %q already exists", req.GetWorkerId()),
}.Build(), nil
}, nil
}
if req.GetLoggingEndpoint() == nil {
return fnpb.StartWorkerResponse_builder{Error: fmt.Sprintf("Missing logging endpoint for worker %v", req.GetWorkerId())}.Build(), nil
return &fnpb.StartWorkerResponse{Error: fmt.Sprintf("Missing logging endpoint for worker %v", req.GetWorkerId())}, nil
}
if req.GetControlEndpoint() == nil {
return fnpb.StartWorkerResponse_builder{Error: fmt.Sprintf("Missing control endpoint for worker %v", req.GetWorkerId())}.Build(), nil
return &fnpb.StartWorkerResponse{Error: fmt.Sprintf("Missing control endpoint for worker %v", req.GetWorkerId())}, nil
}
if req.GetLoggingEndpoint().HasAuthentication() || req.GetControlEndpoint().HasAuthentication() {
return fnpb.StartWorkerResponse_builder{Error: "[BEAM-10610] Secure endpoints not supported."}.Build(), nil
if req.GetLoggingEndpoint().Authentication != nil || req.GetControlEndpoint().Authentication != nil {
return &fnpb.StartWorkerResponse{Error: "[BEAM-10610] Secure endpoints not supported."}, nil
}

ctx := grpcx.WriteWorkerID(s.root, req.GetWorkerId())
Expand All @@ -118,7 +118,7 @@ func (s *Process) StartWorker(_ context.Context, req *fnpb.StartWorkerRequest) (
cmd.Env = nil // Use the current environment.

if err := cmd.Start(); err != nil {
return fnpb.StartWorkerResponse_builder{Error: fmt.Sprintf("Unable to start boot for worker %v: %v", req.GetWorkerId(), err)}.Build(), nil
return &fnpb.StartWorkerResponse{Error: fmt.Sprintf("Unable to start boot for worker %v: %v", req.GetWorkerId(), err)}, nil
}
return &fnpb.StartWorkerResponse{}, nil
}
Expand All @@ -137,9 +137,9 @@ func (s *Process) StopWorker(_ context.Context, req *fnpb.StopWorkerRequest) (*f
delete(s.workers, req.GetWorkerId())
return &fnpb.StopWorkerResponse{}, nil
}
return fnpb.StopWorkerResponse_builder{
return &fnpb.StopWorkerResponse{
Error: fmt.Sprintf("no worker with id %q running", req.GetWorkerId()),
}.Build(), nil
}, nil

}

Expand Down
60 changes: 30 additions & 30 deletions sdks/go/container/pool/workerpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ func TestProcess(t *testing.T) {
t.Skip("Binary `true` doesn't exist, skipping tests.")
}

endpoint := pipepb.ApiServiceDescriptor_builder{
endpoint := &pipepb.ApiServiceDescriptor{
Url: "localhost:0",
}.Build()
secureEndpoint := pipepb.ApiServiceDescriptor_builder{
}
secureEndpoint := &pipepb.ApiServiceDescriptor{
Url: "localhost:0",
Authentication: pipepb.AuthenticationSpec_builder{
Authentication: &pipepb.AuthenticationSpec{
Urn: "beam:authentication:oauth2_client_credentials_grant:v1",
}.Build(),
}.Build()
},
}

ctx, cancelFn := context.WithCancel(context.Background())
t.Cleanup(cancelFn)
Expand All @@ -54,60 +54,60 @@ func TestProcess(t *testing.T) {
errExpected bool
}{
{
req: fnpb.StartWorkerRequest_builder{
req: &fnpb.StartWorkerRequest{
WorkerId: "Worker1",
ControlEndpoint: endpoint,
LoggingEndpoint: endpoint,
}.Build(),
},
}, {
req: fnpb.StartWorkerRequest_builder{
req: &fnpb.StartWorkerRequest{
WorkerId: "Worker2",
ControlEndpoint: endpoint,
LoggingEndpoint: endpoint,
}.Build(),
},
}, {
req: fnpb.StartWorkerRequest_builder{
req: &fnpb.StartWorkerRequest{
WorkerId: "Worker1",
ControlEndpoint: endpoint,
LoggingEndpoint: endpoint,
}.Build(),
},
errExpected: true, // Repeated start
}, {
req: fnpb.StartWorkerRequest_builder{
req: &fnpb.StartWorkerRequest{
WorkerId: "missingControl",
LoggingEndpoint: endpoint,
}.Build(),
},
errExpected: true,
}, {
req: fnpb.StartWorkerRequest_builder{
req: &fnpb.StartWorkerRequest{
WorkerId: "missingLogging",
ControlEndpoint: endpoint,
}.Build(),
},
errExpected: true,
}, {
req: fnpb.StartWorkerRequest_builder{
req: &fnpb.StartWorkerRequest{
WorkerId: "secureLogging",
LoggingEndpoint: secureEndpoint,
ControlEndpoint: endpoint,
}.Build(),
},
errExpected: true,
}, {
req: fnpb.StartWorkerRequest_builder{
req: &fnpb.StartWorkerRequest{
WorkerId: "secureControl",
LoggingEndpoint: endpoint,
ControlEndpoint: secureEndpoint,
}.Build(),
},
errExpected: true,
},
}
for _, test := range startTests {
resp, err := server.StartWorker(ctx, test.req)
if test.errExpected {
if err != nil || resp.GetError() == "" {
if err != nil || resp.Error == "" {
t.Errorf("Expected error starting %v: err: %v, resp: %v", test.req.GetWorkerId(), err, resp)
}
} else {
if err != nil || resp.GetError() != "" {
if err != nil || resp.Error != "" {
t.Errorf("Unexpected error starting %v: err: %v, resp: %v", test.req.GetWorkerId(), err, resp)
}
}
Expand All @@ -117,29 +117,29 @@ func TestProcess(t *testing.T) {
errExpected bool
}{
{
req: fnpb.StopWorkerRequest_builder{
req: &fnpb.StopWorkerRequest{
WorkerId: "Worker1",
}.Build(),
},
}, {
req: fnpb.StopWorkerRequest_builder{
req: &fnpb.StopWorkerRequest{
WorkerId: "Worker1",
}.Build(),
},
errExpected: true,
}, {
req: fnpb.StopWorkerRequest_builder{
req: &fnpb.StopWorkerRequest{
WorkerId: "NonExistent",
}.Build(),
},
errExpected: true,
},
}
for _, test := range stopTests {
resp, err := server.StopWorker(ctx, test.req)
if test.errExpected {
if err != nil || resp.GetError() == "" {
if err != nil || resp.Error == "" {
t.Errorf("Expected error starting %v: err: %v, resp: %v", test.req.GetWorkerId(), err, resp)
}
} else {
if err != nil || resp.GetError() != "" {
if err != nil || resp.Error != "" {
t.Errorf("Unexpected error starting %v: err: %v, resp: %v", test.req.GetWorkerId(), err, resp)
}
}
Expand Down
24 changes: 12 additions & 12 deletions sdks/go/container/tools/buffered_logging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ func TestBufferedLogger(t *testing.T) {

received := catcher.msgs[0].GetLogEntries()[0]

if got, want := received.GetMessage(), "test message"; got != want {
if got, want := received.Message, "test message"; got != want {
t.Errorf("got message %q, want %q", got, want)
}

if got, want := received.GetSeverity(), fnpb.LogEntry_Severity_DEBUG; got != want {
if got, want := received.Severity, fnpb.LogEntry_Severity_DEBUG; got != want {
t.Errorf("got severity %v, want %v", got, want)
}
})
Expand Down Expand Up @@ -96,11 +96,11 @@ func TestBufferedLogger(t *testing.T) {
received := catcher.msgs[0].GetLogEntries()

for i, message := range received {
if got, want := message.GetMessage(), messages[i]; got != want {
if got, want := message.Message, messages[i]; got != want {
t.Errorf("got message %q, want %q", got, want)
}

if got, want := message.GetSeverity(), fnpb.LogEntry_Severity_DEBUG; got != want {
if got, want := message.Severity, fnpb.LogEntry_Severity_DEBUG; got != want {
t.Errorf("got severity %v, want %v", got, want)
}
}
Expand All @@ -125,11 +125,11 @@ func TestBufferedLogger(t *testing.T) {

received := catcher.msgs[0].GetLogEntries()[0]

if got, want := received.GetMessage(), "test error"; got != want {
if got, want := received.Message, "test error"; got != want {
t.Errorf("got message %q, want %q", got, want)
}

if got, want := received.GetSeverity(), fnpb.LogEntry_Severity_ERROR; got != want {
if got, want := received.Severity, fnpb.LogEntry_Severity_ERROR; got != want {
t.Errorf("got severity %v, want %v", got, want)
}
})
Expand Down Expand Up @@ -158,11 +158,11 @@ func TestBufferedLogger(t *testing.T) {
received := catcher.msgs[0].GetLogEntries()

for i, message := range received {
if got, want := message.GetMessage(), messages[i]; got != want {
if got, want := message.Message, messages[i]; got != want {
t.Errorf("got message %q, want %q", got, want)
}

if got, want := message.GetSeverity(), fnpb.LogEntry_Severity_ERROR; got != want {
if got, want := message.Severity, fnpb.LogEntry_Severity_ERROR; got != want {
t.Errorf("got severity %v, want %v", got, want)
}
}
Expand All @@ -177,11 +177,11 @@ func TestBufferedLogger(t *testing.T) {

received := catcher.msgs[0].GetLogEntries()[0]

if got, want := received.GetMessage(), "foo bar"; got != want {
if got, want := received.Message, "foo bar"; got != want {
t.Errorf("l.Printf(\"foo %%v\", \"bar\"): got message %q, want %q", got, want)
}

if got, want := received.GetSeverity(), fnpb.LogEntry_Severity_DEBUG; got != want {
if got, want := received.Severity, fnpb.LogEntry_Severity_DEBUG; got != want {
t.Errorf("l.Printf(\"foo %%v\", \"bar\"): got severity %v, want %v", got, want)
}
})
Expand Down Expand Up @@ -229,11 +229,11 @@ func TestBufferedLogger(t *testing.T) {
messages = append(messages, lastMessage)

for i, message := range received {
if got, want := message.GetMessage(), messages[i]; got != want {
if got, want := message.Message, messages[i]; got != want {
t.Errorf("got message %q, want %q", got, want)
}

if got, want := message.GetSeverity(), fnpb.LogEntry_Severity_DEBUG; got != want {
if got, want := message.Severity, fnpb.LogEntry_Severity_DEBUG; got != want {
t.Errorf("got severity %v, want %v", got, want)
}
}
Expand Down
Loading

0 comments on commit 71df963

Please sign in to comment.