Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(#25314) Have boot loaders log to logging service where possible. #26035

Merged
merged 7 commits into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 17 additions & 16 deletions sdks/go/container/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ import (
"strings"
"time"

"github.com/apache/beam/sdks/v2/go/container/tools"
"github.com/apache/beam/sdks/v2/go/pkg/beam/artifact"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"

// Import gcs filesystem so that it can be used to upload heap dumps
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/provision"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/diagnostics"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
Expand All @@ -58,7 +58,7 @@ const (
enableGoogleCloudProfilerOption = "enable_google_cloud_profiler"
)

func configureGoogleCloudProfilerEnvVars(metadata map[string]string) error {
func configureGoogleCloudProfilerEnvVars(ctx context.Context, logger *tools.Logger, metadata map[string]string) error {
if metadata == nil {
return errors.New("enable_google_cloud_profiler is set to true, but no metadata is received from provision server, profiling will not be enabled")
}
Expand All @@ -72,7 +72,7 @@ func configureGoogleCloudProfilerEnvVars(metadata map[string]string) error {
}
os.Setenv(cloudProfilingJobName, jobName)
os.Setenv(cloudProfilingJobID, jobID)
log.Printf("Cloud Profiling Job Name: %v, Job IDL %v", jobName, jobID)
logger.Printf(ctx, "Cloud Profiling Job Name: %v, Job IDL %v", jobName, jobID)
return nil
}

Expand All @@ -87,7 +87,7 @@ func main() {

ctx := grpcx.WriteWorkerID(context.Background(), *id)

info, err := provision.Info(ctx, *provisionEndpoint)
info, err := tools.ProvisionInfo(ctx, *provisionEndpoint)
if err != nil {
log.Fatalf("Failed to obtain provisioning information: %v", err)
}
Expand All @@ -97,13 +97,14 @@ func main() {
if err != nil {
log.Fatalf("Endpoint not set: %v", err)
}
log.Printf("Initializing Go harness: %v", strings.Join(os.Args, " "))
logger := &tools.Logger{Endpoint: *loggingEndpoint}
logger.Printf(ctx, "Initializing Go harness: %v", strings.Join(os.Args, " "))

// (1) Obtain the pipeline options

options, err := provision.ProtoToJSON(info.GetPipelineOptions())
options, err := tools.ProtoToJSON(info.GetPipelineOptions())
if err != nil {
log.Fatalf("Failed to convert pipeline options: %v", err)
logger.Fatalf(ctx, "Failed to convert pipeline options: %v", err)
}

// (2) Retrieve the staged files.
Expand All @@ -115,19 +116,19 @@ func main() {
dir := filepath.Join(*semiPersistDir, "staged")
artifacts, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetDependencies(), info.GetRetrievalToken(), dir)
if err != nil {
log.Fatalf("Failed to retrieve staged files: %v", err)
logger.Fatalf(ctx, "Failed to retrieve staged files: %v", err)
}

name, err := getGoWorkerArtifactName(artifacts)
name, err := getGoWorkerArtifactName(ctx, logger, artifacts)
if err != nil {
log.Fatalf("Failed to get Go Worker Artifact Name: %v", err)
logger.Fatalf(ctx, "Failed to get Go Worker Artifact Name: %v", err)
}

// (3) The persist dir may be on a noexec volume, so we must
// copy the binary to a different location to execute.
const prog = "/bin/worker"
if err := copyExe(filepath.Join(dir, name), prog); err != nil {
log.Fatalf("Failed to copy worker binary: %v", err)
logger.Fatalf(ctx, "Failed to copy worker binary: %v", err)
}

args := []string{
Expand All @@ -148,9 +149,9 @@ func main() {

enableGoogleCloudProfiler := strings.Contains(options, enableGoogleCloudProfilerOption)
if enableGoogleCloudProfiler {
err := configureGoogleCloudProfilerEnvVars(info.Metadata)
err := configureGoogleCloudProfilerEnvVars(ctx, logger, info.Metadata)
if err != nil {
log.Printf("could not configure Google Cloud Profiler variables, got %v", err)
logger.Printf(ctx, "could not configure Google Cloud Profiler variables, got %v", err)
}
}

Expand All @@ -166,10 +167,10 @@ func main() {
}
}

log.Fatalf("User program exited: %v", err)
logger.Fatalf(ctx, "User program exited: %v", err)
}

func getGoWorkerArtifactName(artifacts []*pipepb.ArtifactInformation) (string, error) {
func getGoWorkerArtifactName(ctx context.Context, logger *tools.Logger, artifacts []*pipepb.ArtifactInformation) (string, error) {
const worker = "worker"
name := worker

Expand All @@ -190,7 +191,7 @@ func getGoWorkerArtifactName(artifacts []*pipepb.ArtifactInformation) (string, e
for _, a := range artifacts {
n, _ := artifact.MustExtractFilePayload(a)
if n == worker {
log.Printf("Go worker binary found with legacy name '%v'", worker)
logger.Printf(ctx, "Go worker binary found with legacy name '%v'", worker)
return n, nil
}
}
Expand Down
16 changes: 9 additions & 7 deletions sdks/go/container/boot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
package main

import (
"context"
"os"
"path/filepath"
"testing"

"github.com/apache/beam/sdks/v2/go/container/tools"
"github.com/apache/beam/sdks/v2/go/pkg/beam/artifact"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
Expand Down Expand Up @@ -75,7 +77,7 @@ func TestEnsureEndpointsSet_OneMissing(t *testing.T) {
}

func TestGetGoWorkerArtifactName_NoArtifacts(t *testing.T) {
_, err := getGoWorkerArtifactName([]*pipepb.ArtifactInformation{})
_, err := getGoWorkerArtifactName(context.Background(), &tools.Logger{}, []*pipepb.ArtifactInformation{})
if err == nil {
t.Fatalf("getGoWorkerArtifactName() = nil, want non-nil error")
}
Expand All @@ -85,7 +87,7 @@ func TestGetGoWorkerArtifactName_OneArtifact(t *testing.T) {
artifact := constructArtifactInformation(t, artifact.URNGoWorkerBinaryRole, "test/path", "sha")
artifacts := []*pipepb.ArtifactInformation{&artifact}

val, err := getGoWorkerArtifactName(artifacts)
val, err := getGoWorkerArtifactName(context.Background(), &tools.Logger{}, artifacts)
if err != nil {
t.Fatalf("getGoWorkerArtifactName() = %v, want nil", err)
}
Expand All @@ -99,7 +101,7 @@ func TestGetGoWorkerArtifactName_MultipleArtifactsFirstIsWorker(t *testing.T) {
artifact2 := constructArtifactInformation(t, "other role", "test/path2", "sha")
artifacts := []*pipepb.ArtifactInformation{&artifact1, &artifact2}

val, err := getGoWorkerArtifactName(artifacts)
val, err := getGoWorkerArtifactName(context.Background(), &tools.Logger{}, artifacts)
if err != nil {
t.Fatalf("getGoWorkerArtifactName() = %v, want nil", err)
}
Expand All @@ -113,7 +115,7 @@ func TestGetGoWorkerArtifactName_MultipleArtifactsSecondIsWorker(t *testing.T) {
artifact2 := constructArtifactInformation(t, artifact.URNGoWorkerBinaryRole, "test/path2", "sha")
artifacts := []*pipepb.ArtifactInformation{&artifact1, &artifact2}

val, err := getGoWorkerArtifactName(artifacts)
val, err := getGoWorkerArtifactName(context.Background(), &tools.Logger{}, artifacts)
if err != nil {
t.Fatalf("getGoWorkerArtifactName() = %v, want nil", err)
}
Expand All @@ -127,7 +129,7 @@ func TestGetGoWorkerArtifactName_MultipleArtifactsLegacyWay(t *testing.T) {
artifact2 := constructArtifactInformation(t, "other role", "worker", "sha")
artifacts := []*pipepb.ArtifactInformation{&artifact1, &artifact2}

val, err := getGoWorkerArtifactName(artifacts)
val, err := getGoWorkerArtifactName(context.Background(), &tools.Logger{}, artifacts)
if err != nil {
t.Fatalf("getGoWorkerArtifactName() = %v, want nil", err)
}
Expand All @@ -141,7 +143,7 @@ func TestGetGoWorkerArtifactName_MultipleArtifactsNoneMatch(t *testing.T) {
artifact2 := constructArtifactInformation(t, "other role", "test/path2", "sha")
artifacts := []*pipepb.ArtifactInformation{&artifact1, &artifact2}

_, err := getGoWorkerArtifactName(artifacts)
_, err := getGoWorkerArtifactName(context.Background(), &tools.Logger{}, artifacts)
if err == nil {
t.Fatalf("getGoWorkerArtifactName() = nil, want non-nil error")
}
Expand Down Expand Up @@ -243,7 +245,7 @@ func TestConfigureGoogleCloudProfilerEnvVars(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
t.Cleanup(os.Clearenv)
err := configureGoogleCloudProfilerEnvVars(test.inputMetadata)
err := configureGoogleCloudProfilerEnvVars(context.Background(), &tools.Logger{}, test.inputMetadata)
if err != nil {
if got, want := err.Error(), test.expectedError; got != want {
t.Errorf("got error %v, want error %v", got, want)
Expand Down
117 changes: 117 additions & 0 deletions sdks/go/container/tools/logging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tools

import (
"context"
"errors"
"fmt"
"log"
"os"
"sync"
"time"

fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
"google.golang.org/protobuf/types/known/timestamppb"
)

// Logger is a wrapper around the FnAPI Logging Client, intended for
// container boot loader use. Not intended for Beam end users.
type Logger struct {
Endpoint string

client logSender
closeFn func()
mu sync.Mutex // To protect Send in the rare case multiple goroutines are calling this logger.
}

type logSender interface {
Send(*fnpb.LogEntry_List) error
CloseSend() error
}

func (l *Logger) Close() {
if l.closeFn != nil {
l.client.CloseSend()
l.closeFn()
l.closeFn = nil
l.client = nil
}
}

// Log a message with the given severity.
func (l *Logger) Log(ctx context.Context, sev fnpb.LogEntry_Severity_Enum, message string) {
l.mu.Lock()
defer l.mu.Unlock()

var exitErr error
defer func() {
if exitErr != nil {
log.Println("boot.go: error logging message over FnAPI. endpoint", l.Endpoint, "error:", exitErr, "message follows")
log.Println(sev.String(), message)
}
}()
if l.client == nil {
if l.Endpoint == "" {
exitErr = errors.New("no logging endpoint set")
return
}
cc, err := grpcx.Dial(ctx, l.Endpoint, 2*time.Minute)
if err != nil {
exitErr = err
return
}
l.closeFn = func() { cc.Close() }

l.client, err = fnpb.NewBeamFnLoggingClient(cc).Logging(ctx)
if err != nil {
exitErr = err
l.Close()
return
}
}

err := l.client.Send(&fnpb.LogEntry_List{
LogEntries: []*fnpb.LogEntry{
{
Severity: sev,
Timestamp: timestamppb.Now(),
Message: message,
},
},
})
if err != nil {
exitErr = err
return
}
}

// Printf logs the message with Debug severity.
func (l *Logger) Printf(ctx context.Context, format string, args ...any) {
l.Log(ctx, fnpb.LogEntry_Severity_DEBUG, fmt.Sprintf(format, args...))
}

// Warnf logs the message with Warning severity.
func (l *Logger) Warnf(ctx context.Context, format string, args ...any) {
l.Log(ctx, fnpb.LogEntry_Severity_WARN, fmt.Sprintf(format, args...))
}

// Fatalf logs the message with Critical severity, and then calls os.Exit(1).
func (l *Logger) Fatalf(ctx context.Context, format string, args ...any) {
l.Log(ctx, fnpb.LogEntry_Severity_CRITICAL, fmt.Sprintf(format, args...))
os.Exit(1)
}
Loading