Skip to content

Commit

Permalink
Add flag to control automatic exception sampling in Java (disabled) (#…
Browse files Browse the repository at this point in the history
…28403)

* Add flag to control automatic exception sampling in Java (disabled)

* Fix NPE in FnHarness with DataSampler

* trigger tests

* trigger tests

* trigger tests

* trigger tests

---------

Co-authored-by: Sam Rohde <[email protected]>
  • Loading branch information
rohdesamuel and Sam Rohde authored Sep 14, 2023
1 parent 8f60924 commit 5df59a9
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ public class FnHarness {
private static final String PIPELINE_OPTIONS_FILE = "PIPELINE_OPTIONS_FILE";
private static final String PIPELINE_OPTIONS = "PIPELINE_OPTIONS";
private static final String RUNNER_CAPABILITIES = "RUNNER_CAPABILITIES";
private static final String ENABLE_DATA_SAMPLING_EXPERIMENT = "enable_data_sampling";
private static final Logger LOG = LoggerFactory.getLogger(FnHarness.class);

private static Endpoints.ApiServiceDescriptor getApiServiceDescriptor(String descriptor)
Expand Down Expand Up @@ -248,7 +247,8 @@ public static void main(
options.as(ExecutorOptions.class).getScheduledExecutorService();
ExecutionStateSampler executionStateSampler =
new ExecutionStateSampler(options, System::currentTimeMillis);
final DataSampler dataSampler = new DataSampler();

final @Nullable DataSampler dataSampler = DataSampler.create(options);

// The logging client variable is not used per se, but during its lifetime (until close()) it
// intercepts logging and sends it to the logging service.
Expand Down Expand Up @@ -276,10 +276,6 @@ public static void main(

FinalizeBundleHandler finalizeBundleHandler = new FinalizeBundleHandler(executorService);

// Create the sampler, if the experiment is enabled.
boolean shouldSample =
ExperimentalOptions.hasExperiment(options, ENABLE_DATA_SAMPLING_EXPERIMENT);

// Retrieves the ProcessBundleDescriptor from cache. Requests the PBD from the Runner if it
// doesn't exist. Additionally, runs any graph modifications.
Function<String, BeamFnApi.ProcessBundleDescriptor> getProcessBundleDescriptor =
Expand Down Expand Up @@ -314,7 +310,7 @@ private BeamFnApi.ProcessBundleDescriptor loadDescriptor(String id) {
metricsShortIds,
executionStateSampler,
processWideCache,
shouldSample ? dataSampler : null);
dataSampler);
logging.setProcessBundleHandler(processBundleHandler);

BeamFnStatusClient beamFnStatusClient = null;
Expand Down Expand Up @@ -363,7 +359,12 @@ private BeamFnApi.ProcessBundleDescriptor loadDescriptor(String id) {
InstructionRequest.RequestCase.HARNESS_MONITORING_INFOS,
processWideHandler::harnessMonitoringInfos);
handlers.put(
InstructionRequest.RequestCase.SAMPLE_DATA, dataSampler::handleDataSampleRequest);
InstructionRequest.RequestCase.SAMPLE_DATA,
request ->
dataSampler == null
? BeamFnApi.InstructionResponse.newBuilder()
.setSampleData(BeamFnApi.SampleDataResponse.newBuilder())
: dataSampler.handleDataSampleRequest(request));

JvmInitializers.runBeforeProcessing(options);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -37,19 +40,66 @@
*/
public class DataSampler {
private static final Logger LOG = LoggerFactory.getLogger(DataSampler.class);
private static final String ENABLE_DATA_SAMPLING_EXPERIMENT = "enable_data_sampling";
private static final String ENABLE_ALWAYS_ON_EXCEPTION_SAMPLING_EXPERIMENT =
"enable_always_on_exception_sampling";
private static final String DISABLE_ALWAYS_ON_EXCEPTION_SAMPLING_EXPERIMENT =
"disable_always_on_exception_sampling";

/**
* Optionally returns a DataSampler if the experiment "enable_data_sampling" is present or
* "enable_always_on_exception_sampling" is present. Returns null is data sampling is not enabled
* or "disable_always_on_exception_sampling" experiment is given.
*
* @param options the pipeline options given to this SDK Harness.
* @return the DataSampler if enabled or null, otherwise.
*/
public static @Nullable DataSampler create(PipelineOptions options) {
boolean disableAlwaysOnExceptionSampling =
ExperimentalOptions.hasExperiment(options, DISABLE_ALWAYS_ON_EXCEPTION_SAMPLING_EXPERIMENT);
boolean enableAlwaysOnExceptionSampling =
ExperimentalOptions.hasExperiment(options, ENABLE_ALWAYS_ON_EXCEPTION_SAMPLING_EXPERIMENT);
boolean enableDataSampling =
ExperimentalOptions.hasExperiment(options, ENABLE_DATA_SAMPLING_EXPERIMENT);
// Enable exception sampling, unless the user specifies for it to be disabled.
enableAlwaysOnExceptionSampling =
enableAlwaysOnExceptionSampling && !disableAlwaysOnExceptionSampling;

// If no sampling is enabled, don't create the DataSampler.
if (enableDataSampling || enableAlwaysOnExceptionSampling) {
// For performance reasons, sampling all elements should only be done when the user requests
// it.
// But, exception sampling doesn't need to worry about performance implications, since the SDK
// is already in a bad state. Thus, enable only exception sampling when the user does not
// request for the sampling of all elements.
boolean onlySampleExceptions = enableAlwaysOnExceptionSampling && !enableDataSampling;
return new DataSampler(onlySampleExceptions);
} else {
return null;
}
}

/**
* Creates a DataSampler to sample every 1000 elements while keeping a maximum of 10 in memory.
*/
public DataSampler() {
this(10, 1000);
this(10, 1000, false);
}

/**
* Creates a DataSampler to sample every 1000 elements while keeping a maximum of 10 in memory.
*
* @param onlySampleExceptions If true, only samples elements from exceptions.
*/
public DataSampler(Boolean onlySampleExceptions) {
this(10, 1000, onlySampleExceptions);
}

/**
* @param maxSamples Sets the maximum number of samples held in memory at once.
* @param sampleEveryN Sets how often to sample.
*/
public DataSampler(int maxSamples, int sampleEveryN) {
public DataSampler(int maxSamples, int sampleEveryN, Boolean onlySampleExceptions) {
checkArgument(
maxSamples > 0,
"Expected positive number of samples, did you mean to disable data sampling?");
Expand All @@ -58,6 +108,7 @@ public DataSampler(int maxSamples, int sampleEveryN) {
"Expected positive number for sampling period, did you mean to disable data sampling?");
this.maxSamples = maxSamples;
this.sampleEveryN = sampleEveryN;
this.onlySampleExceptions = onlySampleExceptions;
}

// Maximum number of elements in buffer.
Expand All @@ -66,6 +117,9 @@ public DataSampler(int maxSamples, int sampleEveryN) {
// Sampling rate.
private final int sampleEveryN;

// If true, only takes samples when exceptions in UDFs occur.
private final Boolean onlySampleExceptions;

// The fully-qualified type is: Map[PCollectionId, OutputSampler]. In order to sample
// on a PCollection-basis and not per-bundle, this keeps track of shared samples between states.
private final Map<String, OutputSampler<?>> outputSamplers = new ConcurrentHashMap<>();
Expand All @@ -86,7 +140,10 @@ public DataSampler(int maxSamples, int sampleEveryN) {
public <T> OutputSampler<T> sampleOutput(String pcollectionId, Coder<T> coder) {
return (OutputSampler<T>)
outputSamplers.computeIfAbsent(
pcollectionId, k -> new OutputSampler<>(coder, this.maxSamples, this.sampleEveryN));
pcollectionId,
k ->
new OutputSampler<>(
coder, this.maxSamples, this.sampleEveryN, this.onlySampleExceptions));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,19 @@ public class OutputSampler<T> {
// Index into the buffer of where to overwrite samples.
private int resampleIndex = 0;

// If true, only takes samples when exceptions in UDFs occur.
private final Boolean onlySampleExceptions;

@Nullable private final Coder<T> valueCoder;

@Nullable private final Coder<WindowedValue<T>> windowedValueCoder;

public OutputSampler(Coder<?> coder, int maxElements, int sampleEveryN) {
public OutputSampler(
Coder<?> coder, int maxElements, int sampleEveryN, boolean onlySampleExceptions) {
this.maxElements = maxElements;
this.sampleEveryN = sampleEveryN;
this.buffer = new ArrayList<>(this.maxElements);
this.onlySampleExceptions = onlySampleExceptions;

// The samples taken and encoded should match exactly to the specification from the
// ProcessBundleDescriptor. The coder given can either be a WindowedValueCoder, in which the
Expand Down Expand Up @@ -103,7 +108,7 @@ public ElementSample<T> sample(WindowedValue<T> element) {

ElementSample<T> elementSample =
new ElementSample<>(ThreadLocalRandom.current().nextInt(), element);
if (samples > 10 && samples % sampleEveryN != 0) {
if (onlySampleExceptions || (samples > 10 && samples % sampleEveryN != 0)) {
return elementSample;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.ByteArrayOutputStream;
Expand All @@ -32,10 +33,13 @@
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -117,6 +121,21 @@ void assertHasSamples(
assertTrue(elementList.getElementsList().containsAll(expectedSamples));
}

void assertHasSamples(
BeamFnApi.InstructionResponse response,
String pcollection,
List<BeamFnApi.SampledElement> elements) {
Map<String, BeamFnApi.SampleDataResponse.ElementList> elementSamplesMap =
response.getSampleData().getElementSamplesMap();

assertFalse(elementSamplesMap.isEmpty());

BeamFnApi.SampleDataResponse.ElementList elementList = elementSamplesMap.get(pcollection);
assertNotNull(elementList);

assertTrue(elementList.getElementsList().containsAll(elements));
}

/**
* Smoke test that a samples show in the output map.
*
Expand Down Expand Up @@ -203,7 +222,7 @@ void generateStringSamples(DataSampler sampler) {
*/
@Test
public void testFiltersSinglePCollectionId() throws Exception {
DataSampler sampler = new DataSampler(10, 10);
DataSampler sampler = new DataSampler(10, 10, false);
generateStringSamples(sampler);

BeamFnApi.InstructionResponse samples = getSamplesForPCollection(sampler, "a");
Expand All @@ -219,7 +238,7 @@ public void testFiltersSinglePCollectionId() throws Exception {
public void testFiltersMultiplePCollectionIds() throws Exception {
List<String> pcollectionIds = ImmutableList.of("a", "c");

DataSampler sampler = new DataSampler(10, 10);
DataSampler sampler = new DataSampler(10, 10, false);
generateStringSamples(sampler);

BeamFnApi.InstructionResponse samples = getSamplesForPCollections(sampler, pcollectionIds);
Expand Down Expand Up @@ -275,4 +294,87 @@ public void testConcurrentNewSampler() throws Exception {
sampleThread.join();
}
}

/**
* Tests that including the "enable_always_on_exception_sampling" can sample.
*
* @throws Exception
*/
@Test
public void testEnableAlwaysOnExceptionSampling() throws Exception {
ExperimentalOptions experimentalOptions = PipelineOptionsFactory.as(ExperimentalOptions.class);
experimentalOptions.setExperiments(
Collections.singletonList("enable_always_on_exception_sampling"));
DataSampler sampler = DataSampler.create(experimentalOptions);
assertNotNull(sampler);

VarIntCoder coder = VarIntCoder.of();
OutputSampler<Integer> outputSampler = sampler.sampleOutput("pcollection-id", coder);
ElementSample<Integer> elementSample = outputSampler.sample(globalWindowedValue(1));
outputSampler.exception(elementSample, new RuntimeException(), "", "");

outputSampler.sample(globalWindowedValue(2));

BeamFnApi.InstructionResponse samples = getAllSamples(sampler);
List<SampledElement> expectedSamples =
ImmutableList.of(
SampledElement.newBuilder()
.setElement(ByteString.copyFrom(encodeInt(1)))
.setException(
SampledElement.Exception.newBuilder()
.setError(new RuntimeException().toString()))
.build());
assertHasSamples(samples, "pcollection-id", expectedSamples);
}

/**
* Tests that "disable_always_on_exception_sampling" overrides the always on experiment.
*
* @throws Exception
*/
@Test
public void testDisableAlwaysOnExceptionSampling() throws Exception {
ExperimentalOptions experimentalOptions = PipelineOptionsFactory.as(ExperimentalOptions.class);
experimentalOptions.setExperiments(
ImmutableList.of(
"enable_always_on_exception_sampling", "disable_always_on_exception_sampling"));
DataSampler sampler = DataSampler.create(experimentalOptions);
assertNull(sampler);
}

/**
* Tests that the "enable_data_sampling" experiment overrides
* "disable_always_on_exception_sampling".
*
* @throws Exception
*/
@Test
public void testDisableAlwaysOnExceptionSamplingWithEnableDataSampling() throws Exception {
ExperimentalOptions experimentalOptions = PipelineOptionsFactory.as(ExperimentalOptions.class);
experimentalOptions.setExperiments(
ImmutableList.of(
"enable_data_sampling",
"enable_always_on_exception_sampling",
"disable_always_on_exception_sampling"));
DataSampler sampler = DataSampler.create(experimentalOptions);
assertNotNull(sampler);

VarIntCoder coder = VarIntCoder.of();
OutputSampler<Integer> outputSampler = sampler.sampleOutput("pcollection-id", coder);
ElementSample<Integer> elementSample = outputSampler.sample(globalWindowedValue(1));
outputSampler.exception(elementSample, new RuntimeException(), "", "");

outputSampler.sample(globalWindowedValue(2));

BeamFnApi.InstructionResponse samples = getAllSamples(sampler);
List<SampledElement> expectedSamples =
ImmutableList.of(
SampledElement.newBuilder()
.setElement(ByteString.copyFrom(encodeInt(1)))
.setException(
SampledElement.Exception.newBuilder()
.setError(new RuntimeException().toString()))
.build());
assertHasSamples(samples, "pcollection-id", expectedSamples);
}
}
Loading

0 comments on commit 5df59a9

Please sign in to comment.