Commit

Merge pull request #13 from apache/master

merge master

fengjian428 authored Jul 10, 2022
2 parents 424c1da + 63f95ab commit 4468648
Showing 87 changed files with 2,782 additions and 855 deletions.
2 changes: 1 addition & 1 deletion conf/hudi-defaults.conf.template
@@ -20,7 +20,7 @@

 # Example:
 # hoodie.datasource.hive_sync.jdbcurl            jdbc:hive2://localhost:10000
-# hoodie.datasource.hive_sync.use_jdbc           true
+# hoodie.datasource.hive_sync.mode               jdbc
 # hoodie.datasource.hive_sync.support_timestamp  false
 # hoodie.index.type                              BLOOM
 # hoodie.metadata.enable                         false
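The deprecated `use_jdbc` flag gives way to the more general `hoodie.datasource.hive_sync.mode` key. A minimal sketch of setting the new key programmatically, using plain `java.util.Properties` and key names taken from the template above ("jdbc" is one mode among others):

```java
import java.util.Properties;

public class HiveSyncModeExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    // New style: pick a sync mode explicitly.
    props.setProperty("hoodie.datasource.hive_sync.mode", "jdbc");
    props.setProperty("hoodie.datasource.hive_sync.jdbcurl", "jdbc:hive2://localhost:10000");
    // Old style, removed above: hoodie.datasource.hive_sync.use_jdbc=true
    System.out.println(props);
  }
}
```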
3 changes: 2 additions & 1 deletion docker/demo/config/hoodie-incr.properties
@@ -28,5 +28,6 @@ hoodie.deltastreamer.source.hoodieincr.path=/docker_hoodie_sync_valid_test
 hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=true
 # hive sync
 hoodie.datasource.hive_sync.table=docker_hoodie_sync_valid_test_2
+hoodie.datasource.hive_sync.mode=jdbc
 hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000
-hoodie.datasource.hive_sync.partition_fields=partition
+hoodie.datasource.hive_sync.partition_fields=partition
2 changes: 2 additions & 0 deletions docker/demo/sparksql-incremental.commands
@@ -47,6 +47,7 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
     option(HoodieWriteConfig.TBL_NAME.key(), "stock_ticks_derived_mor").
     option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks_derived_mor").
     option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default").
+    option(HiveSyncConfigHolder.HIVE_SYNC_MODE.key(), "jdbc").
     option(HiveSyncConfigHolder.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
     option(HiveSyncConfigHolder.HIVE_USER.key(), "hive").
     option(HiveSyncConfigHolder.HIVE_PASS.key(), "hive").
@@ -79,6 +80,7 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
     option(HoodieWriteConfig.TBL_NAME.key(), "stock_ticks_derived_mor_bs").
     option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks_derived_mor_bs").
     option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default").
+    option(HiveSyncConfigHolder.HIVE_SYNC_MODE.key(), "jdbc").
     option(HiveSyncConfigHolder.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
     option(HiveSyncConfigHolder.HIVE_USER.key(), "hive").
     option(HiveSyncConfigHolder.HIVE_PASS.key(), "hive").
@@ -35,7 +35,7 @@
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieBootstrapConfig;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieSavepointException;
@@ -538,7 +538,7 @@ private static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, Stri
 private static HoodieWriteConfig getWriteConfig(String basePath, Boolean rollbackUsingMarkers, boolean lazyCleanPolicy) {
   return HoodieWriteConfig.newBuilder().withPath(basePath)
       .withRollbackUsingMarkers(rollbackUsingMarkers)
-      .withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(lazyCleanPolicy ? HoodieFailedWritesCleaningPolicy.LAZY :
+      .withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(lazyCleanPolicy ? HoodieFailedWritesCleaningPolicy.LAZY :
           HoodieFailedWritesCleaningPolicy.EAGER).build())
       .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
 }
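The failed-writes cleaning policy moves from `HoodieCompactionConfig` to the new `HoodieCleanConfig`. A minimal sketch of the migrated builder call in isolation, using only names visible in this diff (`basePath` is a placeholder):

```java
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class CleanConfigMigration {
  public static HoodieWriteConfig lazyCleaningConfig(String basePath) {
    // Before: HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(...)
    // After:  the same policy now lives on HoodieCleanConfig.
    return HoodieWriteConfig.newBuilder().withPath(basePath)
        .withCleanConfig(HoodieCleanConfig.newBuilder()
            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
            .build())
        .build();
  }
}
```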
@@ -30,7 +30,8 @@
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.client.HoodieTimelineArchiver;
@@ -72,7 +73,8 @@ public void init() throws Exception {
   // Generate archive
   HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
       .withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
-      .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
+      .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
+      .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
       .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
           .withRemoteServerPort(timelineServicePort).build())
       .forTable("test-trip-table").build();
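`retainCommits` and `archiveCommitsWith` previously sat together on `HoodieCompactionConfig`; they are now split between `HoodieCleanConfig` and the new `HoodieArchivalConfig`. A sketch of the split in isolation, with builder names taken from this diff:

```java
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;

public class ArchivalCleanSplit {
  public static void main(String[] args) {
    // Cleaning: how many completed commits to retain data for.
    HoodieCleanConfig clean = HoodieCleanConfig.newBuilder().retainCommits(1).build();
    // Archival: bounds on the active timeline (min 2, max 3 instants kept here).
    HoodieArchivalConfig archival = HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build();
    System.out.println(clean.getProps());
    System.out.println(archival.getProps());
  }
}
```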
@@ -37,7 +37,8 @@
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.NumericUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.client.HoodieTimelineArchiver;
@@ -212,7 +213,8 @@ public void testShowArchivedCommits(boolean enableMetadataTable) throws Exceptio
   // Generate archive
   HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath1)
       .withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
-      .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
+      .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+      .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
       .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
           .withRemoteServerPort(timelineServicePort).build())
       .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
@@ -266,7 +268,8 @@ public void testShowArchivedCommitsWithMultiCommitsFile(boolean enableMetadataTa
   // Generate archive
   HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath1)
       .withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
-      .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
+      .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+      .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
       .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
           .withRemoteServerPort(timelineServicePort).build())
       .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
@@ -39,7 +39,8 @@
 import org.apache.hudi.common.testutils.CompactionTestUtils;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.table.HoodieSparkTable;
@@ -166,7 +167,8 @@ private void generateArchive() throws IOException {
   // Generate archive
   HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
       .withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
-      .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
+      .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+      .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
       .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
           .withRemoteServerPort(timelineServicePort).build())
       .forTable("test-trip-table").build();
@@ -39,6 +39,7 @@
 import org.apache.hudi.client.transaction.TransactionManager;
 import org.apache.hudi.client.utils.TransactionUtils;
 import org.apache.hudi.common.HoodiePendingRollbackInfo;
+import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
@@ -276,15 +277,21 @@ private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCom
     TableSchemaResolver schemaUtil = new TableSchemaResolver(table.getMetaClient());
     String historySchemaStr = schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElse("");
     FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(table.getMetaClient());
-    if (!historySchemaStr.isEmpty()) {
-      InternalSchema internalSchema = InternalSchemaUtils.searchSchema(Long.parseLong(instantTime),
-          SerDeHelper.parseSchemas(historySchemaStr));
+    if (!historySchemaStr.isEmpty() || Boolean.parseBoolean(config.getString(HoodieCommonConfig.RECONCILE_SCHEMA.key()))) {
+      InternalSchema internalSchema;
       Schema avroSchema = HoodieAvroUtils.createHoodieWriteSchema(new Schema.Parser().parse(config.getSchema()));
-      InternalSchema evolvedSchema = AvroSchemaEvolutionUtils.evolveSchemaFromNewAvroSchema(avroSchema, internalSchema);
+      if (historySchemaStr.isEmpty()) {
+        internalSchema = AvroInternalSchemaConverter.convert(avroSchema);
+        internalSchema.setSchemaId(Long.parseLong(instantTime));
+      } else {
+        internalSchema = InternalSchemaUtils.searchSchema(Long.parseLong(instantTime),
+            SerDeHelper.parseSchemas(historySchemaStr));
+      }
+      InternalSchema evolvedSchema = AvroSchemaEvolutionUtils.reconcileSchema(avroSchema, internalSchema);
       if (evolvedSchema.equals(internalSchema)) {
         metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, SerDeHelper.toJson(evolvedSchema));
         //TODO save history schema by metaTable
-        schemasManager.persistHistorySchemaStr(instantTime, historySchemaStr);
+        schemasManager.persistHistorySchemaStr(instantTime, historySchemaStr.isEmpty() ? SerDeHelper.inheritSchemas(evolvedSchema, "") : historySchemaStr);
       } else {
         evolvedSchema.setSchemaId(Long.parseLong(instantTime));
         String newSchemaStr = SerDeHelper.toJson(evolvedSchema);
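With `HoodieCommonConfig.RECONCILE_SCHEMA` enabled, `saveInternalSchema` now seeds an `InternalSchema` from the write schema even when no schema history exists yet, then reconciles it via `AvroSchemaEvolutionUtils.reconcileSchema`. A hedged sketch of turning the flag on through the write config; `withProps` usage is an assumption, and `basePath`/`schemaStr` are placeholders:

```java
import java.util.Collections;

import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class ReconcileSchemaExample {
  public static HoodieWriteConfig reconcilingConfig(String basePath, String schemaStr) {
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withSchema(schemaStr)
        // Opt in to schema reconciliation so the first commit persists an
        // initial InternalSchema even with an empty schema history.
        .withProps(Collections.singletonMap(HoodieCommonConfig.RECONCILE_SCHEMA.key(), "true"))
        .build();
  }
}
```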
@@ -0,0 +1,194 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.config;

import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;

import javax.annotation.concurrent.Immutable;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

/**
* Archival related config.
*/
@Immutable
@ConfigClassProperty(name = "Archival Configs",
groupName = ConfigGroups.Names.WRITE_CLIENT,
description = "Configurations that control archival.")
public class HoodieArchivalConfig extends HoodieConfig {

public static final ConfigProperty<String> AUTO_ARCHIVE = ConfigProperty
.key("hoodie.archive.automatic")
.defaultValue("true")
.withDocumentation("When enabled, the archival table service is invoked immediately after each commit,"
+ " to archive commits if we cross a maximum value of commits."
+ " It's recommended to enable this, to ensure number of active commits is bounded.");

public static final ConfigProperty<String> ASYNC_ARCHIVE = ConfigProperty
.key("hoodie.archive.async")
.defaultValue("false")
.sinceVersion("0.11.0")
.withDocumentation("Only applies when " + AUTO_ARCHIVE.key() + " is turned on. "
+ "When turned on runs archiver async with writing, which can speed up overall write performance.");

public static final ConfigProperty<String> MAX_COMMITS_TO_KEEP = ConfigProperty
.key("hoodie.keep.max.commits")
.defaultValue("30")
.withDocumentation("Archiving service moves older entries from timeline into an archived log after each write, to "
+ " keep the metadata overhead constant, even as the table size grows."
+ "This config controls the maximum number of instants to retain in the active timeline. ");

public static final ConfigProperty<Integer> DELETE_ARCHIVED_INSTANT_PARALLELISM_VALUE = ConfigProperty
.key("hoodie.archive.delete.parallelism")
.defaultValue(100)
.withDocumentation("Parallelism for deleting archived hoodie commits.");

public static final ConfigProperty<String> MIN_COMMITS_TO_KEEP = ConfigProperty
.key("hoodie.keep.min.commits")
.defaultValue("20")
.withDocumentation("Similar to " + MAX_COMMITS_TO_KEEP.key() + ", but controls the minimum number of"
+ "instants to retain in the active timeline.");

public static final ConfigProperty<String> COMMITS_ARCHIVAL_BATCH_SIZE = ConfigProperty
.key("hoodie.commits.archival.batch")
.defaultValue(String.valueOf(10))
.withDocumentation("Archiving of instants is batched in best-effort manner, to pack more instants into a single"
+ " archive log. This config controls such archival batch size.");

public static final ConfigProperty<Integer> ARCHIVE_MERGE_FILES_BATCH_SIZE = ConfigProperty
.key("hoodie.archive.merge.files.batch.size")
.defaultValue(10)
.withDocumentation("The number of small archive files to be merged at once.");

public static final ConfigProperty<Long> ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES = ConfigProperty
.key("hoodie.archive.merge.small.file.limit.bytes")
.defaultValue(20L * 1024 * 1024)
.withDocumentation("This config sets the archive file size limit below which an archive file becomes a candidate to be selected as such a small file.");

public static final ConfigProperty<Boolean> ARCHIVE_MERGE_ENABLE = ConfigProperty
.key("hoodie.archive.merge.enable")
.defaultValue(false)
.withDocumentation("When enable, hoodie will auto merge several small archive files into larger one. It's"
+ " useful when storage scheme doesn't support append operation.");

  /**
   * @deprecated Use {@link #MAX_COMMITS_TO_KEEP} and its methods instead
   */
  @Deprecated
  public static final String MAX_COMMITS_TO_KEEP_PROP = MAX_COMMITS_TO_KEEP.key();
  /**
   * @deprecated Use {@link #MIN_COMMITS_TO_KEEP} and its methods instead
   */
  @Deprecated
  public static final String MIN_COMMITS_TO_KEEP_PROP = MIN_COMMITS_TO_KEEP.key();
  /**
   * @deprecated Use {@link #COMMITS_ARCHIVAL_BATCH_SIZE} and its methods instead
   */
  @Deprecated
  public static final String COMMITS_ARCHIVAL_BATCH_SIZE_PROP = COMMITS_ARCHIVAL_BATCH_SIZE.key();
  /**
   * @deprecated Use {@link #MAX_COMMITS_TO_KEEP} and its methods instead
   */
  @Deprecated
  private static final String DEFAULT_MAX_COMMITS_TO_KEEP = MAX_COMMITS_TO_KEEP.defaultValue();
  /**
   * @deprecated Use {@link #MIN_COMMITS_TO_KEEP} and its methods instead
   */
  @Deprecated
  private static final String DEFAULT_MIN_COMMITS_TO_KEEP = MIN_COMMITS_TO_KEEP.defaultValue();
  /**
   * @deprecated Use {@link #COMMITS_ARCHIVAL_BATCH_SIZE} and its methods instead
   */
  @Deprecated
  private static final String DEFAULT_COMMITS_ARCHIVAL_BATCH_SIZE = COMMITS_ARCHIVAL_BATCH_SIZE.defaultValue();

  private HoodieArchivalConfig() {
    super();
  }

  public static HoodieArchivalConfig.Builder newBuilder() {
    return new HoodieArchivalConfig.Builder();
  }

  public static class Builder {

    private final HoodieArchivalConfig archivalConfig = new HoodieArchivalConfig();

    public HoodieArchivalConfig.Builder fromFile(File propertiesFile) throws IOException {
      try (FileReader reader = new FileReader(propertiesFile)) {
        this.archivalConfig.getProps().load(reader);
        return this;
      }
    }

    public HoodieArchivalConfig.Builder fromProperties(Properties props) {
      this.archivalConfig.getProps().putAll(props);
      return this;
    }

    public HoodieArchivalConfig.Builder withAutoArchive(Boolean autoArchive) {
      archivalConfig.setValue(AUTO_ARCHIVE, String.valueOf(autoArchive));
      return this;
    }

    public HoodieArchivalConfig.Builder withAsyncArchive(Boolean asyncArchive) {
      archivalConfig.setValue(ASYNC_ARCHIVE, String.valueOf(asyncArchive));
      return this;
    }

    public HoodieArchivalConfig.Builder archiveCommitsWith(int minToKeep, int maxToKeep) {
      archivalConfig.setValue(MIN_COMMITS_TO_KEEP, String.valueOf(minToKeep));
      archivalConfig.setValue(MAX_COMMITS_TO_KEEP, String.valueOf(maxToKeep));
      return this;
    }

    public HoodieArchivalConfig.Builder withArchiveMergeFilesBatchSize(int number) {
      archivalConfig.setValue(ARCHIVE_MERGE_FILES_BATCH_SIZE, String.valueOf(number));
      return this;
    }

    public HoodieArchivalConfig.Builder withArchiveMergeSmallFileLimit(long size) {
      archivalConfig.setValue(ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES, String.valueOf(size));
      return this;
    }

    public HoodieArchivalConfig.Builder withArchiveMergeEnable(boolean enable) {
      archivalConfig.setValue(ARCHIVE_MERGE_ENABLE, String.valueOf(enable));
      return this;
    }

    public HoodieArchivalConfig.Builder withArchiveDeleteParallelism(int archiveDeleteParallelism) {
      archivalConfig.setValue(DELETE_ARCHIVED_INSTANT_PARALLELISM_VALUE, String.valueOf(archiveDeleteParallelism));
      return this;
    }

    public HoodieArchivalConfig.Builder withCommitsArchivalBatchSize(int batchSize) {
      archivalConfig.setValue(COMMITS_ARCHIVAL_BATCH_SIZE, String.valueOf(batchSize));
      return this;
    }

    public HoodieArchivalConfig build() {
      archivalConfig.setDefaults(HoodieArchivalConfig.class.getName());
      return archivalConfig;
    }
  }
}
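A minimal usage sketch of the new builder, composed only of methods defined in the file above (the values are illustrative):

```java
import org.apache.hudi.config.HoodieArchivalConfig;

public class ArchivalConfigExample {
  public static void main(String[] args) {
    HoodieArchivalConfig archival = HoodieArchivalConfig.newBuilder()
        .withAutoArchive(true)            // archive right after each commit
        .withAsyncArchive(false)          // run archiver inline with writing
        .archiveCommitsWith(20, 30)       // min/max instants on the active timeline
        .withArchiveMergeEnable(true)     // merge small archive files (non-append stores)
        .withArchiveMergeFilesBatchSize(10)
        .build();
    System.out.println(archival.getProps());
  }
}
```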