Skip to content

Commit

Permalink
Adds an SdkHarnessOption that controls whether logging is redirected …
Browse files Browse the repository at this point in the history
…through the FnApi (#33418)

* Add an SdkHarnessOption that controls whether logging is redirected through the FnApi logging service. Redirection through the logging service is enabled by default.

* Add an SdkHarnessOption that controls whether logging is redirected through the FnApi logging service. Redirection through the FnApi is enabled by default.

* include license in new files

* fix ManagedChannel dep

* fix invalid conversions

* fix missing imports

* fix type mismatch

* fix up tests

* continue to use anyOf when logViaFnApi is enabled

* More comments on the new SdkHarnessOption. DataflowRunner.run() forces
the option to 'enabled'.
  • Loading branch information
t2h6 authored Jan 7, 2025
1 parent e0b04bf commit e657d6c
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.options.SdkHarnessOptions;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverride;
Expand Down Expand Up @@ -1252,6 +1253,8 @@ public DataflowPipelineJob run(Pipeline pipeline) {
experiments.add("use_portable_job_submission");
}
options.setExperiments(ImmutableList.copyOf(experiments));
// Ensure that logging via the FnApi is enabled
options.as(SdkHarnessOptions.class).setEnableLogViaFnApi(true);
}

logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,16 @@ enum LogLevel {

void setLogMdc(boolean value);

/** This option controls whether logging will be redirected through the FnApi. */
@Description(
"Controls whether logging will be redirected through the FnApi. In normal usage, setting "
+ "this to a non-default value will cause log messages to be dropped.")
@Default.Boolean(true)
@Hidden
boolean getEnableLogViaFnApi();

void setEnableLogViaFnApi(boolean enableLogViaFnApi);

/**
* Size (in MB) of each grouping table used to pre-combine elements. Larger values may reduce the
* amount of data shuffled. If unset, defaults to 100 MB.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.fn.harness.logging.BeamFnLoggingClient;
import org.apache.beam.fn.harness.logging.BeamFnLoggingMDC;
import org.apache.beam.fn.harness.logging.LoggingClient;
import org.apache.beam.fn.harness.logging.LoggingClientFactory;
import org.apache.beam.fn.harness.logging.QuotaEvent;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
Expand Down Expand Up @@ -80,7 +82,7 @@ public void onCompleted() {
/** Setup a simple logging service and configure the {@link BeamFnLoggingClient}. */
@State(Scope.Benchmark)
public static class ManageLoggingClientAndService {
public final BeamFnLoggingClient loggingClient;
public final LoggingClient loggingClient;
public final CallCountLoggingService loggingService;
public final Server server;

Expand All @@ -98,7 +100,7 @@ public ManageLoggingClientAndService() {
.build();
server.start();
loggingClient =
BeamFnLoggingClient.createAndStart(
LoggingClientFactory.createAndStart(
PipelineOptionsFactory.create(),
apiServiceDescriptor,
managedChannelFactory::forDescriptor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@
import org.apache.beam.fn.harness.control.ProcessBundleHandler;
import org.apache.beam.fn.harness.data.BeamFnDataGrpcClient;
import org.apache.beam.fn.harness.debug.DataSampler;
import org.apache.beam.fn.harness.logging.BeamFnLoggingClient;
import org.apache.beam.fn.harness.logging.LoggingClient;
import org.apache.beam.fn.harness.logging.LoggingClientFactory;
import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache;
import org.apache.beam.fn.harness.status.BeamFnStatusClient;
import org.apache.beam.fn.harness.stream.HarnessStreamObserverFactories;
Expand All @@ -62,6 +63,7 @@
import org.apache.beam.sdk.options.ExecutorOptions;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.SdkHarnessOptions;
import org.apache.beam.sdk.util.construction.CoderTranslation;
import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.TextFormat;
Expand Down Expand Up @@ -283,8 +285,8 @@ public static void main(

// The logging client variable is not used per se, but during its lifetime (until close()) it
// intercepts logging and sends it to the logging service.
try (BeamFnLoggingClient logging =
BeamFnLoggingClient.createAndStart(
try (LoggingClient logging =
LoggingClientFactory.createAndStart(
options, loggingApiServiceDescriptor, channelFactory::forDescriptor)) {
LOG.info("Fn Harness started");
// Register standard file systems.
Expand Down Expand Up @@ -410,7 +412,11 @@ private BeamFnApi.ProcessBundleDescriptor loadDescriptor(String id) {
outboundObserverFactory,
executorService,
handlers);
CompletableFuture.anyOf(control.terminationFuture(), logging.terminationFuture()).get();
if (options.as(SdkHarnessOptions.class).getEnableLogViaFnApi()) {
CompletableFuture.anyOf(control.terminationFuture(), logging.terminationFuture()).get();
} else {
control.terminationFuture().get();
}
if (beamFnStatusClient != null) {
beamFnStatusClient.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
/**
* Configures {@link java.util.logging} to send all {@link LogRecord}s via the Beam Fn Logging API.
*/
public class BeamFnLoggingClient implements AutoCloseable {
public class BeamFnLoggingClient implements LoggingClient {
private static final String ROOT_LOGGER_NAME = "";
private static final ImmutableMap<Level, BeamFnApi.LogEntry.Severity.Enum> LOG_LEVEL_MAP =
ImmutableMap.<Level, BeamFnApi.LogEntry.Severity.Enum>builder()
Expand Down Expand Up @@ -119,7 +119,7 @@ public class BeamFnLoggingClient implements AutoCloseable {
*/
private @Nullable Thread logEntryHandlerThread = null;

public static BeamFnLoggingClient createAndStart(
static BeamFnLoggingClient createAndStart(
PipelineOptions options,
Endpoints.ApiServiceDescriptor apiServiceDescriptor,
Function<Endpoints.ApiServiceDescriptor, ManagedChannel> channelFactory) {
Expand Down Expand Up @@ -383,6 +383,7 @@ void flushFinalLogs(@UnderInitialization BeamFnLoggingClient this) {
}
}

@Override
public CompletableFuture<?> terminationFuture() {
checkNotNull(bufferedLogConsumer, "BeamFnLoggingClient not fully started");
return bufferedLogConsumer;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.fn.harness.logging;

import java.util.concurrent.CompletableFuture;

public interface LoggingClient extends AutoCloseable {

CompletableFuture<?> terminationFuture();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.fn.harness.logging;

import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.SdkHarnessOptions;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;

/**
* A factory for {@link LoggingClient}s. Provides {@link BeamFnLoggingClient} if the logging service
* is enabled, otherwise provides a no-op client.
*/
public class LoggingClientFactory {

private LoggingClientFactory() {}

/**
* A factory for {@link LoggingClient}s. Provides {@link BeamFnLoggingClient} if the logging
* service is enabled, otherwise provides a no-op client.
*/
public static LoggingClient createAndStart(
PipelineOptions options,
Endpoints.ApiServiceDescriptor apiServiceDescriptor,
Function<Endpoints.ApiServiceDescriptor, ManagedChannel> channelFactory) {
if (options.as(SdkHarnessOptions.class).getEnableLogViaFnApi()) {
return BeamFnLoggingClient.createAndStart(options, apiServiceDescriptor, channelFactory);
} else {
return new NoOpLoggingClient();
}
}

static final class NoOpLoggingClient implements LoggingClient {
@Override
public CompletableFuture<?> terminationFuture() {
return CompletableFuture.completedFuture(new Object());
}

@Override
public void close() throws Exception {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@
import org.apache.beam.fn.harness.control.ExecutionStateSampler;
import org.apache.beam.fn.harness.control.ExecutionStateSampler.ExecutionStateTracker;
import org.apache.beam.fn.harness.debug.DataSampler;
import org.apache.beam.fn.harness.logging.BeamFnLoggingClient;
import org.apache.beam.fn.harness.logging.BeamFnLoggingMDC;
import org.apache.beam.fn.harness.logging.LoggingClient;
import org.apache.beam.fn.harness.logging.LoggingClientFactory;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleDescriptor;
import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
Expand Down Expand Up @@ -647,8 +648,8 @@ public StreamObserver<BeamFnApi.LogEntry.List> logging(
// Start the test within the logging context. This reroutes logging through to the boiler-plate
// that was set up
// earlier.
try (BeamFnLoggingClient ignored =
BeamFnLoggingClient.createAndStart(
try (LoggingClient ignored =
LoggingClientFactory.createAndStart(
PipelineOptionsFactory.create(),
apiServiceDescriptor,
(Endpoints.ApiServiceDescriptor descriptor) -> channel)) {
Expand Down

0 comments on commit e657d6c

Please sign in to comment.