Merge remote-tracking branch 'upstream/master'
* upstream/master:
  [GOBBLIN-1710]  Codecov should be optional in CI and not fail Github Actions (apache#3562)
  Define basics for collecting Iceberg metadata for the current snapshot (apache#3559)
  [GOBBLIN-1698] Fast fail during work unit generation based on config. (apache#3542)
phet committed Sep 27, 2022
2 parents 6930a69 + e385ea4 commit 8219430
Showing 62 changed files with 3,356 additions and 118 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build_and_test.yaml
@@ -87,7 +87,7 @@ jobs:
restore-keys: ${{ runner.os }}-gradle
- name: Run CheckStyle and FindBugs
run: |
./gradlew --no-daemon javadoc findbugsMain checkstyleMain checkstyleTest checkstyleJmh
./gradlew --no-daemon javadoc findbugsMain checkstyleMain checkstyleTest checkstyleJmh
run_tests:
timeout-minutes: 120
@@ -172,5 +172,5 @@ jobs:
uses: codecov/codecov-action@v2
with:
files: ${{ env.jacoco_reports }}
-      fail_ci_if_error: true
+      fail_ci_if_error: false
verbose: true
@@ -296,6 +296,8 @@ public class ConfigurationKeys {
public static final String WORK_UNIT_STATE_ACTUAL_HIGH_WATER_MARK_KEY = "workunit.state.actual.high.water.mark";
public static final String WORK_UNIT_DATE_PARTITION_KEY = "workunit.source.date.partition";
public static final String WORK_UNIT_DATE_PARTITION_NAME = "workunit.source.date.partitionName";
public static final String WORK_UNIT_GENERATOR_FAILURE_IS_FATAL = "workunit.generator.failure.is.fatal";
public static final boolean DEFAULT_WORK_UNIT_FAST_FAIL_ENABLED = true;

/**
* Task execution properties.
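A quick illustrative sketch (not part of this commit) of how a source can consult the new key; the actual wiring appears in the getWorkunits hunk of the copy source further down. The helper and class names below are hypothetical; getPropAsBoolean is the standard Gobblin State accessor.

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;

public class FastFailFlagSketch {
  /** Hypothetical helper: resolve the fast-fail flag, defaulting to true. */
  static boolean isWorkUnitGeneratorFailureFatal(SourceState state) {
    return state.getPropAsBoolean(
        ConfigurationKeys.WORK_UNIT_GENERATOR_FAILURE_IS_FATAL,    // "workunit.generator.failure.is.fatal"
        ConfigurationKeys.DEFAULT_WORK_UNIT_FAST_FAIL_ENABLED);    // true
  }
}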
@@ -25,6 +25,7 @@ public class DatasetConstants {
public static final String PLATFORM_HIVE = "hive";
public static final String PLATFORM_SALESFORCE = "salesforce";
public static final String PLATFORM_MYSQL = "mysql";
public static final String PLATFORM_ICEBERG = "iceberg";

/** Common metadata */
public static final String BRANCH = "branch";
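For context, platform constants like these are typically used to tag dataset descriptors (for example in lineage events). A small illustrative sketch; the two-argument DatasetDescriptor constructor and the dataset name are assumptions, not part of this commit:

import org.apache.gobblin.dataset.DatasetConstants;
import org.apache.gobblin.dataset.DatasetDescriptor;

public class IcebergPlatformSketch {
  public static void main(String[] args) {
    DatasetDescriptor descriptor = new DatasetDescriptor(DatasetConstants.PLATFORM_ICEBERG, "db.table_name");
    System.out.println(descriptor.getPlatform());   // iceberg
  }
}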
@@ -152,6 +152,7 @@ public class ServiceConfigKeys {

public static final int MAX_FLOW_NAME_LENGTH = 128; // defined in FlowId.pdl
public static final int MAX_FLOW_GROUP_LENGTH = 128; // defined in FlowId.pdl
public static final int MAX_FLOW_EXECUTION_ID_LENGTH = 13; // length of flowExecutionId which is epoch timestamp
public static final int MAX_JOB_NAME_LENGTH = 374;
public static final int MAX_JOB_GROUP_LENGTH = 374;
public static final String STATE_STORE_TABLE_SUFFIX = "gst";
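The new limit of 13 follows from flowExecutionId being an epoch timestamp in milliseconds, which is currently 13 digits long. A quick illustrative check:

public class FlowExecutionIdLengthCheck {
  public static void main(String[] args) {
    long flowExecutionId = System.currentTimeMillis();             // e.g. 1664236800000 for late Sep 2022
    System.out.println(Long.toString(flowExecutionId).length());   // 13 (millisecond epochs stay 13 digits until the year 2286)
  }
}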
@@ -29,12 +29,14 @@
import java.util.function.Function;

import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.api.listeners.ControllerChangeListener;
import org.apache.helix.api.listeners.LiveInstanceChangeListener;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.messaging.handling.MultiTypeMessageHandlerFactory;
@@ -86,6 +88,9 @@ public class GobblinHelixMultiManager implements StandardMetricsBridge {
@Getter
private HelixManager jobClusterHelixManager = null;

@Getter
private HelixAdmin jobClusterHelixAdmin = null;

/**
* Helix manager to handle planning job distribution.
* Corresponds to cluster with key name {@link GobblinClusterConfigurationKeys#HELIX_CLUSTER_NAME_KEY}.
@@ -170,6 +175,16 @@ protected static HelixManager buildHelixManager(Config config, String clusterNam
config.getString(clusterName), helixInstanceName, type, zkConnectionString);
}

/**
* Build the {@link org.apache.helix.HelixAdmin} for the AM
*/
protected static HelixAdmin buildHelixAdmin(Config cfg) {
String zkConnectionString = cfg.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
return new ZKHelixAdmin.Builder()
.setZkAddress(zkConnectionString)
.build();
}

public void initialize() {
if (this.dedicatedManagerCluster) {
Preconditions.checkArgument(this.config.hasPath(GobblinClusterConfigurationKeys.MANAGER_CLUSTER_NAME_KEY));
@@ -183,6 +198,7 @@ public void initialize() {
this.jobClusterHelixManager = buildHelixManager(this.config,
GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY,
InstanceType.ADMINISTRATOR);
this.jobClusterHelixAdmin = buildHelixAdmin(this.config);

// This will create a dedicated controller for job distribution
this.dedicatedJobClusterController = ConfigUtils.getBoolean(
@@ -225,6 +241,7 @@ public void initialize() {
GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY,
isHelixClusterManaged ? InstanceType.PARTICIPANT : InstanceType.CONTROLLER);
this.jobClusterHelixManager = this.managerClusterHelixManager;
this.jobClusterHelixAdmin = buildHelixAdmin(this.config);
}
}

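This excerpt does not show how the new jobClusterHelixAdmin handle gets used later in the commit; as a rough sketch, a HelixAdmin built the same way as buildHelixAdmin() above gives direct administrative access to the Helix cluster (listing clusters and instances, enabling or disabling instances, and so on). The ZK address and cluster name below are placeholders:

import org.apache.helix.HelixAdmin;
import org.apache.helix.manager.zk.ZKHelixAdmin;

public class HelixAdminSketch {
  public static void main(String[] args) {
    HelixAdmin admin = new ZKHelixAdmin.Builder()
        .setZkAddress("localhost:2181")            // placeholder ZK connect string
        .build();
    System.out.println(admin.getClusters());       // clusters registered in this ZK ensemble
    System.out.println(admin.getInstancesInCluster("GobblinJobCluster"));   // placeholder cluster name
    admin.close();
  }
}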
4 changes: 4 additions & 0 deletions gobblin-data-management/build.gradle
@@ -46,6 +46,10 @@ dependencies {
compile externalDependency.testng
compile externalDependency.junit

testCompile(group: 'org.apache.iceberg', name: 'iceberg-hive-metastore', version: '0.11.1', classifier: 'tests') {
transitive = false
}
testCompile('org.apache.hadoop:hadoop-common:2.6.0')
testCompile project(":gobblin-test-utils")
testCompile project(path: ":gobblin-metastore", configuration: "testFixtures")

@@ -214,6 +214,7 @@ public List<WorkUnit> getWorkunits(final SourceState state) {
failJobIfAllRequestsRejected(allocator, prioritizedFileSets);

String filesetWuGeneratorAlias = state.getProp(ConfigurationKeys.COPY_SOURCE_FILESET_WU_GENERATOR_CLASS, FileSetWorkUnitGenerator.class.getName());
boolean shouldWuGeneratorFailureBeFatal = state.getPropAsBoolean(ConfigurationKeys.WORK_UNIT_GENERATOR_FAILURE_IS_FATAL, ConfigurationKeys.DEFAULT_WORK_UNIT_FAST_FAIL_ENABLED);
Iterator<Callable<Void>> callableIterator =
Iterators.transform(prioritizedFileSets, new Function<FileSet<CopyEntity>, Callable<Void>>() {
@Nullable
@@ -239,6 +240,9 @@ public Callable<Void> apply(FileSet<CopyEntity> input) {
future.get();
} catch (ExecutionException exc) {
log.error("Failed to get work units for dataset.", exc.getCause());
if (shouldWuGeneratorFailureBeFatal) {
throw new RuntimeException("Failed to get work units for dataset.", exc.getCause());
}
}
}
} catch (InterruptedException ie) {
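Since fast-fail is now the default (DEFAULT_WORK_UNIT_FAST_FAIL_ENABLED above is true), a copy job that prefers the previous behavior, where a failed dataset is only logged and skipped, has to opt out explicitly. An illustrative snippet (not part of this commit), using the key from the ConfigurationKeys hunk:

import java.util.Properties;

public class FastFailOptOutSketch {
  public static void main(String[] args) {
    Properties jobProps = new Properties();
    // Keep generating work units for the remaining datasets even if one dataset fails.
    jobProps.setProperty("workunit.generator.failure.is.fatal", "false");
    // With the default of true, the ExecutionException above is rethrown as a RuntimeException
    // and work-unit generation for the whole job aborts.
    System.out.println(jobProps);
  }
}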
@@ -17,6 +17,7 @@

package org.apache.gobblin.data.management.copy;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
@@ -208,8 +209,8 @@ protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path, PathFilter f
try {
return FileListUtils
.listFilesToCopyAtPath(fs, path, fileFilter, applyFilterToDirectories, includeEmptyDirectories);
-    } catch (IOException e) {
-      log.warn(String.format("Could not find any files on fs %s path %s due to the following exception. Returning an empty list of files.", fs.getUri(), path), e);
+    } catch (FileNotFoundException fnfe) {
+      log.warn(String.format("Could not find any files on fs %s path %s due to the following exception. Returning an empty list of files.", fs.getUri(), path), fnfe);
return Lists.newArrayList();
}
}
@@ -134,9 +134,40 @@ protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path, PathFilter f
return recursivelyGetFilesAtDatePath(fs, path, "", fileFilter, 1, startDate, endDate, formatter);
}

/**
* Checks if the datePath provided is in the range of the start and end dates.
* Rounds startDate and endDate to the same granularity as datePath prior to comparing.
* @param startDate
* @param endDate
* @param datePath
* @param datePathFormat (This is the user set desired format)
* @param level
* @return true if the datePath provided is in the range of start and end dates, inclusive.
*/
public static Boolean checkPathDateTimeValidity(LocalDateTime startDate, LocalDateTime endDate, String datePath,
String datePathFormat, int level) {
String [] datePathFormatArray = datePathFormat.split("/");
String datePathPattern = String.join(FileSystems.getDefault().getSeparator(), Arrays.asList(datePathFormatArray).subList(0, level - 1));
try {
DateTimeFormatter formatGranularity = DateTimeFormat.forPattern(datePathPattern);
LocalDateTime traversedDatePathRound = formatGranularity.parseLocalDateTime(datePath);
LocalDateTime startDateRound = formatGranularity.parseLocalDateTime(startDate.toString(datePathPattern));
LocalDateTime endDateRound = formatGranularity.parseLocalDateTime(endDate.toString(datePathPattern));
return !traversedDatePathRound.isBefore(startDateRound) && !traversedDatePathRound.isAfter(endDateRound);
} catch (IllegalArgumentException e) {
log.error(String.format("Cannot parse path provided %s, expected in format of %s", datePath, datePathFormat));
return false;
}
}

private List<FileStatus> recursivelyGetFilesAtDatePath(FileSystem fs, Path path, String traversedDatePath, PathFilter fileFilter,
int level, LocalDateTime startDate, LocalDateTime endDate, DateTimeFormatter formatter) throws IOException {
List<FileStatus> fileStatuses = Lists.newArrayList();
if (!traversedDatePath.isEmpty()) {
if (!checkPathDateTimeValidity(startDate, endDate, traversedDatePath, this.datePattern, level)) {
return fileStatuses;
}
}
Iterator<FileStatus> folderIterator;
try {
if (!fs.exists(path)) {
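A usage sketch of the new validity check. The enclosing class is not named in this hunk, so TimeAwareRecursiveCopyableDataset is assumed below; the example also assumes a Unix-style "/" path separator, since the method joins the truncated pattern with FileSystems.getDefault().getSeparator(). At traversal level 3 only the first two pattern components ("yyyy/MM") are compared, so the path and both bounds all round to the month:

import org.joda.time.LocalDateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

// Class name assumed; the static method is the one added in the hunk above.
import org.apache.gobblin.data.management.copy.TimeAwareRecursiveCopyableDataset;

public class DatePathValiditySketch {
  public static void main(String[] args) {
    DateTimeFormatter fullFormat = DateTimeFormat.forPattern("yyyy/MM/dd/HH");
    LocalDateTime start = fullFormat.parseLocalDateTime("2022/09/26/10");
    LocalDateTime end = fullFormat.parseLocalDateTime("2022/09/27/23");
    // "2022/09" rounds to 2022-09-01T00:00, and so do both bounds, so the path is considered in range.
    boolean inRange = TimeAwareRecursiveCopyableDataset.checkPathDateTimeValidity(
        start, end, "2022/09", "yyyy/MM/dd/HH", 3);
    System.out.println(inRange);   // true
  }
}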
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.data.management.copy.iceberg;


/**
* Any catalog from which to access {@link IcebergTable}s.
*/
public interface IcebergCatalog {
IcebergTable openTable(String dbName, String tableName);
}
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.data.management.copy.iceberg;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hive.HiveCatalog;

import com.google.common.collect.Maps;


/**
* Provides an {@link IcebergCatalog}.
*/
public class IcebergCatalogFactory {
public static IcebergCatalog create(Configuration configuration) {
HiveCatalog hcat = new HiveCatalog();
hcat.setConf(configuration);
hcat.initialize("hive", Maps.newHashMap());
return new IcebergHiveCatalog(hcat);
}
}
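An illustrative end-to-end sketch of the two new pieces above. The database and table names are placeholders, and the Hive metastore connection details are expected to come from the Hadoop Configuration / hive-site.xml; per the commit message, IcebergTable (defined elsewhere in this commit) is the entry point for collecting metadata of the current snapshot:

import org.apache.hadoop.conf.Configuration;

import org.apache.gobblin.data.management.copy.iceberg.IcebergCatalog;
import org.apache.gobblin.data.management.copy.iceberg.IcebergCatalogFactory;
import org.apache.gobblin.data.management.copy.iceberg.IcebergTable;

public class IcebergCatalogUsageSketch {
  public static void main(String[] args) {
    IcebergCatalog catalog = IcebergCatalogFactory.create(new Configuration());
    IcebergTable table = catalog.openTable("placeholder_db", "placeholder_table");
    // 'table' exposes the current-snapshot metadata collection introduced by this commit.
    System.out.println(table);
  }
}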