[DO NOT APPROVE; testing purposes only] added test for KMS key #25902

Closed
wants to merge 15 commits
1,095 changes: 1,095 additions & 0 deletions examples/notebooks/get-started/learn_beam_basics_by_doing.ipynb

Large diffs are not rendered by default.

@@ -663,7 +663,7 @@ message Elements {
// (Optional) Represents a part of a logical byte stream. Elements within
// the logical byte stream are encoded in the nested context and
// concatenated together.
bytes data = 3;
bytes data = 3 [ctype = CORD];

// (Optional) Set this bit to indicate that this is the last data block
// for the given instruction and transform, ending the stream.
@@ -4790,10 +4790,6 @@ GBKTransform:
- from_runner_api_parameter
- to_runner_api_parameter
GcpTestIOError: {}
GcsDownloader:
methods:
- get_range
- size
GCSFileSystem:
methods:
- checksum
@@ -4837,10 +4833,6 @@ GcsIOError: {}
GcsIOOverrides:
methods:
- retry_func
GcsUploader:
methods:
- finish
- put
GeneralPurposeConsumerSet:
methods:
- flush
27 changes: 11 additions & 16 deletions sdks/go/pkg/beam/core/runtime/exec/sdf.go
@@ -297,10 +297,10 @@ func (n *TruncateSizedRestriction) StartBundle(ctx context.Context, id string, d
// Input Diagram:
//
// *FullValue {
// Elm: *FullValue {
// Elm: *FullValue (original input)
// Elm: *FullValue { -- mainElm
// Elm: *FullValue (original input) -- inp
// Elm2: *FullValue {
// Elm: Restriction
// Elm: Restriction -- rest
// Elm2: Watermark estimator state
// }
// }
@@ -325,24 +325,19 @@ func (n *TruncateSizedRestriction) StartBundle(ctx context.Context, id string, d
// }
func (n *TruncateSizedRestriction) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error {
mainElm := elm.Elm.(*FullValue)
inp := mainElm.Elm
// For the main element, the way we fill it out depends on whether the input element
// is a KV or single-element. Single-elements might have been lifted out of
// their FullValue if they were decoded, so we need to have a case for that.
// TODO(https://github.com/apache/beam/issues/20196): Optimize this so it's decided in exec/translate.go
// instead of checking per-element.
if e, ok := mainElm.Elm.(*FullValue); ok {
mainElm = e
inp = e
}
rest := elm.Elm.(*FullValue).Elm2.(*FullValue).Elm

// If receiving directly from a datasource,
// the element may not be wrapped in a *FullValue
inp := convertIfNeeded(mainElm.Elm, &FullValue{})

rest := mainElm.Elm2.(*FullValue).Elm

rt, err := n.ctInv.Invoke(ctx, rest)
if err != nil {
return err
}

newRest, err := n.truncateInv.Invoke(ctx, rt, mainElm)
newRest, err := n.truncateInv.Invoke(ctx, rt, inp)
if err != nil {
return err
}
@@ -351,7 +346,7 @@ func (n *TruncateSizedRestriction) ProcessElement(ctx context.Context, elm *Full
return nil
}

size, err := n.sizeInv.Invoke(ctx, mainElm, newRest)
size, err := n.sizeInv.Invoke(ctx, inp, newRest)
if err != nil {
return err
}
24 changes: 13 additions & 11 deletions sdks/go/test/integration/primitives/drain.go
@@ -28,7 +28,7 @@ import (
)

func init() {
register.DoFn3x1[*sdf.LockRTracker, []byte, func(int64), sdf.ProcessContinuation](&TruncateFn{})
register.DoFn4x1[context.Context, *sdf.LockRTracker, []byte, func(int64), sdf.ProcessContinuation](&TruncateFn{})

register.Emitter1[int64]()
}
@@ -83,39 +83,41 @@ func (fn *TruncateFn) SplitRestriction(_ []byte, rest offsetrange.Restriction) [
}

// TruncateRestriction truncates the restriction during drain.
func (fn *TruncateFn) TruncateRestriction(rt *sdf.LockRTracker, _ []byte) offsetrange.Restriction {
start := rt.GetRestriction().(offsetrange.Restriction).Start
func (fn *TruncateFn) TruncateRestriction(ctx context.Context, rt *sdf.LockRTracker, _ []byte) offsetrange.Restriction {
rest := rt.GetRestriction().(offsetrange.Restriction)
start := rest.Start
newEnd := start + 20

done, remaining := rt.GetProgress()
log.Infof(ctx, "Draining at: done %v, remaining %v, start %v, end %v, newEnd %v", done, remaining, start, rest.End, newEnd)

return offsetrange.Restriction{
Start: start,
End: newEnd,
}
}

// ProcessElement continually gets the start position of the restriction and emits the element as it is.
func (fn *TruncateFn) ProcessElement(rt *sdf.LockRTracker, _ []byte, emit func(int64)) sdf.ProcessContinuation {
func (fn *TruncateFn) ProcessElement(ctx context.Context, rt *sdf.LockRTracker, _ []byte, emit func(int64)) sdf.ProcessContinuation {
position := rt.GetRestriction().(offsetrange.Restriction).Start
counter := 0
for {
if rt.TryClaim(position) {
log.Infof(ctx, "Claimed position: %v", position)
// Successful claim, emit the value and move on.
emit(position)
position++
counter++
} else if rt.GetError() != nil || rt.IsDone() {
// Stop processing on error or completion
if err := rt.GetError(); err != nil {
log.Errorf(context.Background(), "error in restriction tracker, got %v", err)
log.Errorf(ctx, "error in restriction tracker, got %v", err)
}
log.Infof(ctx, "Restriction done at position %v.", position)
return sdf.StopProcessing()
} else {
log.Infof(ctx, "Checkpointed at position %v, resuming later.", position)
// Resume later.
return sdf.ResumeProcessingIn(5 * time.Second)
}

if counter >= 10 {
return sdf.ResumeProcessingIn(1 * time.Second)
}
time.Sleep(1 * time.Second)
}
}
@@ -329,7 +329,7 @@ public long getElementByteSize() {

@Override
public PCollection<KV<K, Iterable<InputT>>> expand(PCollection<KV<K, InputT>> input) {

Duration allowedLateness = input.getWindowingStrategy().getAllowedLateness();
checkArgument(
input.getCoder() instanceof KvCoder,
"coder specified in the input PCollection is not a KvCoder");
@@ -344,6 +344,7 @@ public PCollection<KV<K, Iterable<InputT>>> expand(PCollection<KV<K, InputT>> in
params.getBatchSizeBytes(),
weigher,
params.getMaxBufferingDuration(),
allowedLateness,
valueCoder)));
}

@@ -357,12 +358,20 @@ private static class GroupIntoBatchesDoFn<K, InputT>
@Nullable private final SerializableFunction<InputT, Long> weigher;
private final Duration maxBufferingDuration;

private final Duration allowedLateness;

// The following timer is no longer set. We maintain the spec for update compatibility.
private static final String END_OF_WINDOW_ID = "endOFWindow";

@TimerId(END_OF_WINDOW_ID)
private final TimerSpec windowTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);

// This timer manages the watermark hold if there is no buffering timer.
private static final String TIMER_HOLD_ID = "watermarkHold";

@TimerId(TIMER_HOLD_ID)
private final TimerSpec holdTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

// This timer expires when it's time to batch and output the buffered data.
private static final String END_OF_BUFFERING_ID = "endOfBuffering";

@@ -410,11 +419,13 @@ private static class GroupIntoBatchesDoFn<K, InputT>
long batchSizeBytes,
@Nullable SerializableFunction<InputT, Long> weigher,
Duration maxBufferingDuration,
Duration allowedLateness,
Coder<InputT> inputValueCoder) {
this.batchSize = batchSize;
this.batchSizeBytes = batchSizeBytes;
this.weigher = weigher;
this.maxBufferingDuration = maxBufferingDuration;
this.allowedLateness = allowedLateness;
this.batchSpec = StateSpecs.bag(inputValueCoder);

Combine.BinaryCombineLongFn sumCombineFn =
@@ -452,9 +463,18 @@ public long apply(long left, long right) {
this.prefetchFrequency = ((batchSize / 5) <= 1) ? Long.MAX_VALUE : (batchSize / 5);
}

@Override
public Duration getAllowedTimestampSkew() {
// This is required since flush is sometimes called from processElement. This is safe because
// a watermark hold will always be set using timer.withOutputTimestamp.
return Duration.millis(Long.MAX_VALUE);
}

@ProcessElement
public void processElement(
@TimerId(END_OF_BUFFERING_ID) Timer bufferingTimer,
@TimerId(TIMER_HOLD_ID) Timer holdTimer,
@StateId(BATCH_ID) BagState<InputT> batch,
@StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[], Long> storedBatchSize,
@StateId(NUM_BYTES_IN_BATCH_ID) CombiningState<Long, long[], Long> storedBatchSizeBytes,
@@ -473,9 +493,10 @@ public void processElement(
storedBatchSizeBytes.readLater();
}
storedBatchSize.readLater();
if (shouldCareAboutMaxBufferingDuration) {
minBufferedTs.readLater();
}
minBufferedTs.readLater();

// Make sure we always include the current timestamp in the minBufferedTs.
minBufferedTs.add(elementTs.getMillis());

LOG.debug("*** BATCH *** Add element for window {} ", window);
if (shouldCareAboutWeight) {
@@ -505,23 +526,26 @@ public void processElement(
timerTs,
minBufferedTs);
bufferingTimer.clear();
holdTimer.clear();
}
storedBatchSizeBytes.add(elementWeight);
}
batch.add(element.getValue());
// Blind add is supported with combiningState
storedBatchSize.add(1L);
// Add the timestamp back into minBufferedTs as it might be cleared by flushBatch above.
minBufferedTs.add(elementTs.getMillis());

final long num = storedBatchSize.read();
if (shouldCareAboutMaxBufferingDuration) {
long oldOutputTs =
MoreObjects.firstNonNull(
minBufferedTs.read(), BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
minBufferedTs.add(elementTs.getMillis());
// If this is the first element in the batch or if the timer's output timestamp needs
// modifying, then set a
// timer.
if (num == 1 || minBufferedTs.read() != oldOutputTs) {

// If this is the first element in the batch or if the timer's output timestamp needs
// modifying, then set a timer.
long oldOutputTs =
MoreObjects.firstNonNull(
minBufferedTs.read(), BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
boolean needsNewTimer = num == 1 || minBufferedTs.read() != oldOutputTs;
if (needsNewTimer) {
if (shouldCareAboutMaxBufferingDuration) {
long targetTs =
MoreObjects.firstNonNull(
timerTs.read(),
@@ -530,6 +554,12 @@ public void processElement(
bufferingTimer
.withOutputTimestamp(Instant.ofEpochMilli(minBufferedTs.read()))
.set(Instant.ofEpochMilli(targetTs));
} else {
// The only way to hold the watermark is to set a timer. Since there is no buffering
// timer, we set a dummy timer at the end of the window to manage the hold.
Instant windowEnd = window.maxTimestamp().plus(allowedLateness);
holdTimer.withOutputTimestamp(Instant.ofEpochMilli(minBufferedTs.read())).set(windowEnd);
}
}

@@ -585,6 +615,11 @@ public void onWindowExpiration(
receiver, key, batch, storedBatchSize, storedBatchSizeBytes, timerTs, minBufferedTs);
}

@OnTimer(TIMER_HOLD_ID)
public void onHoldTimer() {
// Do nothing. The associated watermark hold will be automatically removed.
}

// We no longer set this timer, since OnWindowExpiration takes care of this. However we leave
// the callback in place for existing jobs that have already set these timers.
@@ -618,7 +653,8 @@ private void flushBatch(
Iterable<InputT> values = batch.read();
// When the timer fires, batch state might be empty
if (!Iterables.isEmpty(values)) {
receiver.output(KV.of(key, values));
receiver.outputWithTimestamp(
KV.of(key, values), Instant.ofEpochMilli(minBufferedTs.read()));
}
clearState(batch, storedBatchSize, storedBatchSizeBytes, timerTs, minBufferedTs);
}
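
For orientation, here is a minimal usage sketch of the transform being modified above. It is not part of this diff; it assumes the Beam Java SDK's public GroupIntoBatches.ofSize and withMaxBufferingDuration builders and a runner on the classpath. The relevant behavior from the changes above is that, while elements are buffered, an event-time timer set with withOutputTimestamp holds the output watermark at the minimum buffered element timestamp, so flushBatch can emit at that timestamp without the output being treated as late.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class GroupIntoBatchesSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    PCollection<KV<String, Long>> input =
        p.apply(Create.of(KV.of("user", 1L), KV.of("user", 2L), KV.of("user", 3L)));
    // Batches of up to 2 elements per key and window. With a buffering deadline, the DoFn
    // above sets an event-time timer whose output timestamp holds the watermark at the
    // minimum buffered element timestamp until flushBatch emits the batch.
    PCollection<KV<String, Iterable<Long>>> batched =
        input.apply(
            GroupIntoBatches.<String, Long>ofSize(2)
                .withMaxBufferingDuration(Duration.standardSeconds(10)));
    p.run().waitUntilFinish();
  }
}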
@@ -865,6 +865,8 @@ private WriteResult writeResult(Pipeline p, PCollection<TableDestination> succes
new TupleTag<>("successfulInserts"),
successfulWrites,
null,
null,
null,
null);
}

@@ -2094,6 +2094,7 @@ public static <T> Write<T> write() {
.setAutoSchemaUpdate(false)
.setDeterministicRecordIdFn(null)
.setMaxRetryJobs(1000)
.setPropagateSuccessfulStorageApiWrites(false)
.build();
}

@@ -2211,6 +2212,8 @@ public enum Method {

abstract int getNumStorageWriteApiStreams();

abstract boolean getPropagateSuccessfulStorageApiWrites();

abstract int getMaxFilesPerPartition();

abstract long getMaxBytesPerPartition();
@@ -2306,6 +2309,9 @@ abstract Builder<T> setAvroSchemaFactory(

abstract Builder<T> setNumStorageWriteApiStreams(int numStorageApiStreams);

abstract Builder<T> setPropagateSuccessfulStorageApiWrites(
boolean propagateSuccessfulStorageApiWrites);

abstract Builder<T> setMaxFilesPerPartition(int maxFilesPerPartition);

abstract Builder<T> setMaxBytesPerPartition(long maxBytesPerPartition);
@@ -2763,6 +2769,17 @@ public Write<T> withNumStorageWriteApiStreams(int numStorageWriteApiStreams) {
return toBuilder().setNumStorageWriteApiStreams(numStorageWriteApiStreams).build();
}

/**
* If set to true, then all successful writes will be propagated to {@link WriteResult} and
* accessible via the {@link WriteResult#getSuccessfulStorageApiInserts} method.
*/
public Write<T> withPropagateSuccessfulStorageApiWrites(
boolean propagateSuccessfulStorageApiWrites) {
return toBuilder()
.setPropagateSuccessfulStorageApiWrites(propagateSuccessfulStorageApiWrites)
.build();
}

/**
* Provides a custom location on GCS for storing temporary files to be loaded via BigQuery batch
* load jobs. See "Usage with templates" in {@link BigQueryIO} documentation for discussion.
@@ -3270,6 +3287,9 @@ private <DestinationT> WriteResult continueExpandTyped(
checkArgument(
getSchemaUpdateOptions() == null || getSchemaUpdateOptions().isEmpty(),
"SchemaUpdateOptions are not supported when method == STREAMING_INSERTS");
checkArgument(
!getPropagateSuccessfulStorageApiWrites(),
"withPropagateSuccessfulStorageApiWrites only supported when using storage api writes.");

RowWriterFactory.TableRowWriterFactory<T, DestinationT> tableRowWriterFactory =
(RowWriterFactory.TableRowWriterFactory<T, DestinationT>) rowWriterFactory;
@@ -3301,6 +3321,9 @@ private <DestinationT> WriteResult continueExpandTyped(
rowWriterFactory.getOutputType() == OutputType.AvroGenericRecord,
"useAvroLogicalTypes can only be set with Avro output.");
}
checkArgument(
!getPropagateSuccessfulStorageApiWrites(),
"withPropagateSuccessfulStorageApiWrites only supported when using storage api writes.");

// Batch load jobs currently support JSON data insertion only with CSV files
if (getJsonSchema() != null && getJsonSchema().isAccessible()) {
@@ -3406,7 +3429,8 @@ private <DestinationT> WriteResult continueExpandTyped(
method == Method.STORAGE_API_AT_LEAST_ONCE,
getAutoSharding(),
getAutoSchemaUpdate(),
getIgnoreUnknownValues());
getIgnoreUnknownValues(),
getPropagateSuccessfulStorageApiWrites());
return input.apply("StorageApiLoads", storageApiLoads);
} else {
throw new RuntimeException("Unexpected write method " + method);
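
For illustration, a hedged usage sketch of the new option wired through above. It is not taken from this PR; it assumes BigQueryIO.writeTableRows(), the STORAGE_WRITE_API write method, and that WriteResult#getSuccessfulStorageApiInserts (the method referenced in the javadoc above) returns the committed rows as a PCollection<TableRow>. The project, dataset, and table names are placeholders.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.values.PCollection;

public class PropagateSuccessfulWritesSketch {
  // Writes `rows` with the Storage Write API and returns the rows that were committed.
  // The table name and create disposition are placeholders, not values taken from this PR.
  static PCollection<TableRow> writeAndGetSuccessfulRows(PCollection<TableRow> rows) {
    WriteResult result =
        rows.apply(
            "WriteToBigQuery",
            BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.my_table")
                .withMethod(Method.STORAGE_WRITE_API)
                .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                // New option from this diff: propagate each successful write to WriteResult.
                .withPropagateSuccessfulStorageApiWrites(true));
    return result.getSuccessfulStorageApiInserts();
  }
}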