Skip to content

Commit

Permalink
(#25314) Have boot loaders log to logging service where possible. (#2…
Browse files Browse the repository at this point in the history
…6035)

* (#25314) Have boot loaders log to logging service where possible.

* Fix Go container tests.

* Update logging in typescript boot.go

* Migrate provision to be based on container/tools

* Migrated missed call to provision package.

* Last local use of provision.

* Update Go and Staticheck versions.

---------

Co-authored-by: lostluck <[email protected]>
  • Loading branch information
lostluck and lostluck authored Mar 31, 2023
1 parent 5e368b4 commit 492e2c9
Show file tree
Hide file tree
Showing 12 changed files with 549 additions and 146 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/go_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ jobs:
fetch-depth: 2
- uses: actions/setup-go@v3
with:
go-version: '1.19'
go-version: '1.20'
- name: Delete old coverage
run: "cd sdks/go/pkg && rm -rf .coverage || :"
- name: Run coverage
Expand All @@ -62,6 +62,6 @@ jobs:
go vet --copylocks=false --unsafeptr=false ./...
- name: Run Staticcheck
run: |
go install "honnef.co/go/tools/cmd/staticcheck@2022.1"
go install "honnef.co/go/tools/cmd/staticcheck@2023.1.3"
cd sdks/go/pkg/beam
$(go env GOPATH)/bin/staticcheck ./...
33 changes: 17 additions & 16 deletions sdks/go/container/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ import (
"strings"
"time"

"github.com/apache/beam/sdks/v2/go/container/tools"
"github.com/apache/beam/sdks/v2/go/pkg/beam/artifact"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"

// Import gcs filesystem so that it can be used to upload heap dumps
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/provision"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/diagnostics"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
Expand All @@ -58,7 +58,7 @@ const (
enableGoogleCloudProfilerOption = "enable_google_cloud_profiler"
)

func configureGoogleCloudProfilerEnvVars(metadata map[string]string) error {
func configureGoogleCloudProfilerEnvVars(ctx context.Context, logger *tools.Logger, metadata map[string]string) error {
if metadata == nil {
return errors.New("enable_google_cloud_profiler is set to true, but no metadata is received from provision server, profiling will not be enabled")
}
Expand All @@ -72,7 +72,7 @@ func configureGoogleCloudProfilerEnvVars(metadata map[string]string) error {
}
os.Setenv(cloudProfilingJobName, jobName)
os.Setenv(cloudProfilingJobID, jobID)
log.Printf("Cloud Profiling Job Name: %v, Job IDL %v", jobName, jobID)
logger.Printf(ctx, "Cloud Profiling Job Name: %v, Job IDL %v", jobName, jobID)
return nil
}

Expand All @@ -87,7 +87,7 @@ func main() {

ctx := grpcx.WriteWorkerID(context.Background(), *id)

info, err := provision.Info(ctx, *provisionEndpoint)
info, err := tools.ProvisionInfo(ctx, *provisionEndpoint)
if err != nil {
log.Fatalf("Failed to obtain provisioning information: %v", err)
}
Expand All @@ -97,13 +97,14 @@ func main() {
if err != nil {
log.Fatalf("Endpoint not set: %v", err)
}
log.Printf("Initializing Go harness: %v", strings.Join(os.Args, " "))
logger := &tools.Logger{Endpoint: *loggingEndpoint}
logger.Printf(ctx, "Initializing Go harness: %v", strings.Join(os.Args, " "))

// (1) Obtain the pipeline options

options, err := provision.ProtoToJSON(info.GetPipelineOptions())
options, err := tools.ProtoToJSON(info.GetPipelineOptions())
if err != nil {
log.Fatalf("Failed to convert pipeline options: %v", err)
logger.Fatalf(ctx, "Failed to convert pipeline options: %v", err)
}

// (2) Retrieve the staged files.
Expand All @@ -115,19 +116,19 @@ func main() {
dir := filepath.Join(*semiPersistDir, "staged")
artifacts, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetDependencies(), info.GetRetrievalToken(), dir)
if err != nil {
log.Fatalf("Failed to retrieve staged files: %v", err)
logger.Fatalf(ctx, "Failed to retrieve staged files: %v", err)
}

name, err := getGoWorkerArtifactName(artifacts)
name, err := getGoWorkerArtifactName(ctx, logger, artifacts)
if err != nil {
log.Fatalf("Failed to get Go Worker Artifact Name: %v", err)
logger.Fatalf(ctx, "Failed to get Go Worker Artifact Name: %v", err)
}

// (3) The persist dir may be on a noexec volume, so we must
// copy the binary to a different location to execute.
const prog = "/bin/worker"
if err := copyExe(filepath.Join(dir, name), prog); err != nil {
log.Fatalf("Failed to copy worker binary: %v", err)
logger.Fatalf(ctx, "Failed to copy worker binary: %v", err)
}

args := []string{
Expand All @@ -148,9 +149,9 @@ func main() {

enableGoogleCloudProfiler := strings.Contains(options, enableGoogleCloudProfilerOption)
if enableGoogleCloudProfiler {
err := configureGoogleCloudProfilerEnvVars(info.Metadata)
err := configureGoogleCloudProfilerEnvVars(ctx, logger, info.Metadata)
if err != nil {
log.Printf("could not configure Google Cloud Profiler variables, got %v", err)
logger.Printf(ctx, "could not configure Google Cloud Profiler variables, got %v", err)
}
}

Expand All @@ -166,10 +167,10 @@ func main() {
}
}

log.Fatalf("User program exited: %v", err)
logger.Fatalf(ctx, "User program exited: %v", err)
}

func getGoWorkerArtifactName(artifacts []*pipepb.ArtifactInformation) (string, error) {
func getGoWorkerArtifactName(ctx context.Context, logger *tools.Logger, artifacts []*pipepb.ArtifactInformation) (string, error) {
const worker = "worker"
name := worker

Expand All @@ -190,7 +191,7 @@ func getGoWorkerArtifactName(artifacts []*pipepb.ArtifactInformation) (string, e
for _, a := range artifacts {
n, _ := artifact.MustExtractFilePayload(a)
if n == worker {
log.Printf("Go worker binary found with legacy name '%v'", worker)
logger.Printf(ctx, "Go worker binary found with legacy name '%v'", worker)
return n, nil
}
}
Expand Down
16 changes: 9 additions & 7 deletions sdks/go/container/boot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
package main

import (
"context"
"os"
"path/filepath"
"testing"

"github.com/apache/beam/sdks/v2/go/container/tools"
"github.com/apache/beam/sdks/v2/go/pkg/beam/artifact"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
Expand Down Expand Up @@ -75,7 +77,7 @@ func TestEnsureEndpointsSet_OneMissing(t *testing.T) {
}

func TestGetGoWorkerArtifactName_NoArtifacts(t *testing.T) {
_, err := getGoWorkerArtifactName([]*pipepb.ArtifactInformation{})
_, err := getGoWorkerArtifactName(context.Background(), &tools.Logger{}, []*pipepb.ArtifactInformation{})
if err == nil {
t.Fatalf("getGoWorkerArtifactName() = nil, want non-nil error")
}
Expand All @@ -85,7 +87,7 @@ func TestGetGoWorkerArtifactName_OneArtifact(t *testing.T) {
artifact := constructArtifactInformation(t, artifact.URNGoWorkerBinaryRole, "test/path", "sha")
artifacts := []*pipepb.ArtifactInformation{&artifact}

val, err := getGoWorkerArtifactName(artifacts)
val, err := getGoWorkerArtifactName(context.Background(), &tools.Logger{}, artifacts)
if err != nil {
t.Fatalf("getGoWorkerArtifactName() = %v, want nil", err)
}
Expand All @@ -99,7 +101,7 @@ func TestGetGoWorkerArtifactName_MultipleArtifactsFirstIsWorker(t *testing.T) {
artifact2 := constructArtifactInformation(t, "other role", "test/path2", "sha")
artifacts := []*pipepb.ArtifactInformation{&artifact1, &artifact2}

val, err := getGoWorkerArtifactName(artifacts)
val, err := getGoWorkerArtifactName(context.Background(), &tools.Logger{}, artifacts)
if err != nil {
t.Fatalf("getGoWorkerArtifactName() = %v, want nil", err)
}
Expand All @@ -113,7 +115,7 @@ func TestGetGoWorkerArtifactName_MultipleArtifactsSecondIsWorker(t *testing.T) {
artifact2 := constructArtifactInformation(t, artifact.URNGoWorkerBinaryRole, "test/path2", "sha")
artifacts := []*pipepb.ArtifactInformation{&artifact1, &artifact2}

val, err := getGoWorkerArtifactName(artifacts)
val, err := getGoWorkerArtifactName(context.Background(), &tools.Logger{}, artifacts)
if err != nil {
t.Fatalf("getGoWorkerArtifactName() = %v, want nil", err)
}
Expand All @@ -127,7 +129,7 @@ func TestGetGoWorkerArtifactName_MultipleArtifactsLegacyWay(t *testing.T) {
artifact2 := constructArtifactInformation(t, "other role", "worker", "sha")
artifacts := []*pipepb.ArtifactInformation{&artifact1, &artifact2}

val, err := getGoWorkerArtifactName(artifacts)
val, err := getGoWorkerArtifactName(context.Background(), &tools.Logger{}, artifacts)
if err != nil {
t.Fatalf("getGoWorkerArtifactName() = %v, want nil", err)
}
Expand All @@ -141,7 +143,7 @@ func TestGetGoWorkerArtifactName_MultipleArtifactsNoneMatch(t *testing.T) {
artifact2 := constructArtifactInformation(t, "other role", "test/path2", "sha")
artifacts := []*pipepb.ArtifactInformation{&artifact1, &artifact2}

_, err := getGoWorkerArtifactName(artifacts)
_, err := getGoWorkerArtifactName(context.Background(), &tools.Logger{}, artifacts)
if err == nil {
t.Fatalf("getGoWorkerArtifactName() = nil, want non-nil error")
}
Expand Down Expand Up @@ -243,7 +245,7 @@ func TestConfigureGoogleCloudProfilerEnvVars(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
t.Cleanup(os.Clearenv)
err := configureGoogleCloudProfilerEnvVars(test.inputMetadata)
err := configureGoogleCloudProfilerEnvVars(context.Background(), &tools.Logger{}, test.inputMetadata)
if err != nil {
if got, want := err.Error(), test.expectedError; got != want {
t.Errorf("got error %v, want error %v", got, want)
Expand Down
117 changes: 117 additions & 0 deletions sdks/go/container/tools/logging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tools

import (
"context"
"errors"
"fmt"
"log"
"os"
"sync"
"time"

fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
"google.golang.org/protobuf/types/known/timestamppb"
)

// Logger is a wrapper around the FnAPI Logging Client, intended for
// container boot loader use. Not intended for Beam end users.
type Logger struct {
Endpoint string

client logSender
closeFn func()
mu sync.Mutex // To protect Send in the rare case multiple goroutines are calling this logger.
}

type logSender interface {
Send(*fnpb.LogEntry_List) error
CloseSend() error
}

func (l *Logger) Close() {
if l.closeFn != nil {
l.client.CloseSend()
l.closeFn()
l.closeFn = nil
l.client = nil
}
}

// Log a message with the given severity.
func (l *Logger) Log(ctx context.Context, sev fnpb.LogEntry_Severity_Enum, message string) {
l.mu.Lock()
defer l.mu.Unlock()

var exitErr error
defer func() {
if exitErr != nil {
log.Println("boot.go: error logging message over FnAPI. endpoint", l.Endpoint, "error:", exitErr, "message follows")
log.Println(sev.String(), message)
}
}()
if l.client == nil {
if l.Endpoint == "" {
exitErr = errors.New("no logging endpoint set")
return
}
cc, err := grpcx.Dial(ctx, l.Endpoint, 2*time.Minute)
if err != nil {
exitErr = err
return
}
l.closeFn = func() { cc.Close() }

l.client, err = fnpb.NewBeamFnLoggingClient(cc).Logging(ctx)
if err != nil {
exitErr = err
l.Close()
return
}
}

err := l.client.Send(&fnpb.LogEntry_List{
LogEntries: []*fnpb.LogEntry{
{
Severity: sev,
Timestamp: timestamppb.Now(),
Message: message,
},
},
})
if err != nil {
exitErr = err
return
}
}

// Printf logs the message with Debug severity.
func (l *Logger) Printf(ctx context.Context, format string, args ...any) {
l.Log(ctx, fnpb.LogEntry_Severity_DEBUG, fmt.Sprintf(format, args...))
}

// Warnf logs the message with Warning severity.
func (l *Logger) Warnf(ctx context.Context, format string, args ...any) {
l.Log(ctx, fnpb.LogEntry_Severity_WARN, fmt.Sprintf(format, args...))
}

// Fatalf logs the message with Critical severity, and then calls os.Exit(1).
func (l *Logger) Fatalf(ctx context.Context, format string, args ...any) {
l.Log(ctx, fnpb.LogEntry_Severity_CRITICAL, fmt.Sprintf(format, args...))
os.Exit(1)
}
Loading

0 comments on commit 492e2c9

Please sign in to comment.