Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into iceberg_hash
Browse files Browse the repository at this point in the history
  • Loading branch information
regadas committed Jan 25, 2024
2 parents 713fcfe + 80c6d3c commit ae52478
Showing 1 changed file with 47 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,11 @@ public Read<T> from(List<String> inputDirectories) {
return toBuilder().setInputDirectories(inputDirectories).build();
}

/**
 * Sets the Avro {@link AvroDatumFactory} used for reading.
 *
 * @param datumFactory the datum factory handed to the builder
 * @return the {@code Read} produced by this instance's builder with the factory applied
 */
public Read<T> withDatumFactory(AvroDatumFactory<T> datumFactory) {
  return toBuilder()
      .setDatumFactory(datumFactory)
      .build();
}

/** Specifies the input filename suffix. */
public Read<T> withSuffix(String filenameSuffix) {
return toBuilder().setFilenameSuffix(filenameSuffix).build();
Expand Down Expand Up @@ -345,6 +350,18 @@ abstract static class Builder<K1, K2, T extends IndexedRecord> {
abstract Write<K1, K2, T> build();
}

/**
 * Sets the directory that output is written to.
 *
 * @param outputDirectory the output location, resolved via {@link FileSystems#matchNewResource}
 * @return the {@code Write} produced by this instance's builder with the output directory applied
 */
public Write<K1, K2, T> to(String outputDirectory) {
  // The spec is always resolved as a directory rather than a file.
  final boolean isDirectory = true;
  return toBuilder()
      .setOutputDirectory(FileSystems.matchNewResource(outputDirectory, isDirectory))
      .build();
}

/**
 * Sets the Avro {@link AvroDatumFactory} used for writing.
 *
 * @param datumFactory the datum factory handed to the builder
 * @return the {@code Write} produced by this instance's builder with the factory applied
 */
public Write<K1, K2, T> withDatumFactory(AvroDatumFactory<T> datumFactory) {
  return toBuilder()
      .setDatumFactory(datumFactory)
      .build();
}

/** Specifies the number of buckets for partitioning. */
public Write<K1, K2, T> withNumBuckets(int numBuckets) {
return toBuilder().setNumBuckets(numBuckets).build();
Expand All @@ -365,20 +382,38 @@ public Write<K1, K2, T> withMetadata(Map<String, Object> metadata) {
return toBuilder().setMetadata(metadata).build();
}

/**
 * Sets the directory that output is written to.
 *
 * @param outputDirectory the output location, resolved via {@link FileSystems#matchNewResource}
 * @return the {@code Write} produced by this instance's builder with the output directory applied
 */
public Write<K1, K2, T> to(String outputDirectory) {
  // The spec is always resolved as a directory rather than a file.
  final boolean isDirectory = true;
  return toBuilder()
      .setOutputDirectory(FileSystems.matchNewResource(outputDirectory, isDirectory))
      .build();
}

/**
 * Sets the temporary directory used for writing. Defaults to --tempLocation if not set.
 *
 * @param tempDirectory the temporary location, resolved via {@link FileSystems#matchNewResource}
 * @return the {@code Write} produced by this instance's builder with the temp directory applied
 */
public Write<K1, K2, T> withTempDirectory(String tempDirectory) {
  // The spec is always resolved as a directory rather than a file.
  final boolean isDirectory = true;
  return toBuilder()
      .setTempDirectory(FileSystems.matchNewResource(tempDirectory, isDirectory))
      .build();
}

/**
 * Sets the suffix of output filenames.
 *
 * @param filenameSuffix the suffix handed to the builder
 * @return the {@code Write} produced by this instance's builder with the suffix applied
 */
public Write<K1, K2, T> withSuffix(String filenameSuffix) {
  return toBuilder()
      .setFilenameSuffix(filenameSuffix)
      .build();
}

/**
 * Sets the prefix of output filenames (e.g. "bucket" or "part").
 *
 * @param filenamePrefix the prefix handed to the builder
 * @return the {@code Write} produced by this instance's builder with the prefix applied
 */
public Write<K1, K2, T> withFilenamePrefix(String filenamePrefix) {
  return toBuilder()
      .setFilenamePrefix(filenamePrefix)
      .build();
}

/**
 * Sets the sorter memory, in MB.
 *
 * @param sorterMemoryMb the sorter memory value handed to the builder
 * @return the {@code Write} produced by this instance's builder with the sorter memory applied
 */
public Write<K1, K2, T> withSorterMemoryMb(int sorterMemoryMb) {
  return toBuilder()
      .setSorterMemoryMb(sorterMemoryMb)
      .build();
}

/**
 * Sets the size of an optional key-to-hash cache in the ExtractKeys transform.
 *
 * @param keyCacheSize the cache size handed to the builder
 * @return the {@code Write} produced by this instance's builder with the cache size applied
 */
public Write<K1, K2, T> withKeyCacheOfSize(int keyCacheSize) {
  return toBuilder()
      .setKeyCacheSize(keyCacheSize)
      .build();
}

/**
 * Sets the {@link CodecFactory} used for output files.
 *
 * @param codec the codec handed to the builder
 * @return the {@code Write} produced by this instance's builder with the codec applied
 */
public Write<K1, K2, T> withCodec(CodecFactory codec) {
  return toBuilder()
      .setCodec(codec)
      .build();
}

@SuppressWarnings("unchecked")
@Override
public FileOperations<T> getFileOperations() {
Expand All @@ -405,31 +440,6 @@ BucketMetadata<K1, K2, T> getBucketMetadata() {
throw new IllegalStateException(e);
}
}

/**
 * Sets the suffix of output filenames.
 *
 * @param filenameSuffix the suffix handed to the builder
 * @return the {@code Write} produced by this instance's builder with the suffix applied
 */
public Write<K1, K2, T> withSuffix(String filenameSuffix) {
  return toBuilder()
      .setFilenameSuffix(filenameSuffix)
      .build();
}

/**
 * Sets the prefix of output filenames (e.g. "bucket" or "part").
 *
 * @param filenamePrefix the prefix handed to the builder
 * @return the {@code Write} produced by this instance's builder with the prefix applied
 */
public Write<K1, K2, T> withFilenamePrefix(String filenamePrefix) {
  return toBuilder()
      .setFilenamePrefix(filenamePrefix)
      .build();
}

/**
 * Sets the sorter memory, in MB.
 *
 * @param sorterMemoryMb the sorter memory value handed to the builder
 * @return the {@code Write} produced by this instance's builder with the sorter memory applied
 */
public Write<K1, K2, T> withSorterMemoryMb(int sorterMemoryMb) {
  return toBuilder()
      .setSorterMemoryMb(sorterMemoryMb)
      .build();
}

/**
 * Sets the size of an optional key-to-hash cache in the ExtractKeys transform.
 *
 * @param keyCacheSize the cache size handed to the builder
 * @return the {@code Write} produced by this instance's builder with the cache size applied
 */
public Write<K1, K2, T> withKeyCacheOfSize(int keyCacheSize) {
  return toBuilder()
      .setKeyCacheSize(keyCacheSize)
      .build();
}

/**
 * Sets the {@link CodecFactory} used for output files.
 *
 * @param codec the codec handed to the builder
 * @return the {@code Write} produced by this instance's builder with the codec applied
 */
public Write<K1, K2, T> withCodec(CodecFactory codec) {
  return toBuilder()
      .setCodec(codec)
      .build();
}
}

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -491,6 +501,11 @@ public TransformOutput<K1, K2, T> to(String outputDirectory) {
.build();
}

/**
 * Sets the Avro {@link AvroDatumFactory} used for reading and writing.
 *
 * @param datumFactory the datum factory handed to the builder
 * @return the {@code TransformOutput} produced by this instance's builder with the factory
 *     applied
 */
public TransformOutput<K1, K2, T> withDatumFactory(AvroDatumFactory<T> datumFactory) {
  return toBuilder()
      .setDatumFactory(datumFactory)
      .build();
}

/** Specifies the temporary directory for writing. Defaults to --tempLocation if not set. */
public TransformOutput<K1, K2, T> withTempDirectory(String tempDirectory) {
return toBuilder()
Expand Down

0 comments on commit ae52478

Please sign in to comment.