Commit

Merge remote-tracking branch 'upstream/master'
* upstream/master: (124 commits)
  [GOBBLIN-1699] Log progress of reducer task for visibility with slow compaction jobs apache#3552
  fix helix job wait completion bug when job goes to STOPPING state (apache#3556)
  [GOBBLIN-1695] Fix: Failure to add spec executors doesn't block deployment (apache#3551)
  [GOBBLIN-1701] Replace jcenter with either maven central or gradle plugin portal (apache#3554)
  [GOBBLIN-1700] Remove unused coveralls-gradle-plugin dependency
  add MysqlUserQuotaManager (apache#3545)
  [GOBBLIN-1689] Decouple compiler from scheduler in warm standby mode (apache#3544)
  Add GMCE topic explicitly to hive commit event (apache#3547)
  [GOBBLIN-1678] Refactor git flowgraph component to be extensible (apache#3536)
  [GOBBLIN-1690] Added logging to ORC writer
  Allow all iceberg exceptions to be fault tolerant (apache#3541)
  Guard against exists fs call as well (apache#3538)
  Add error handling for timeaware finder to handle scenarios where fil… (apache#3537)
  [GOBBLIN-1675] Add pagination for GaaS on server side (apache#3533)
  [GOBBLIN-1672] Refactor metrics from DagManager into its own class, add metrics per … (apache#3532)
  [GOBBLIN-1677] Fix timezone property to read from key correctly (apache#3535)
  [Gobblin-931] Fix typo in gobblin CLI usage (apache#3530)
  [GOBBLIN-1671] : Fix gobblin.sh script to add external jars as colon separated to HADOOP_CLASSPATH (apache#3531)
  [GOBBLIN-1656] Return a http status 503 on GaaS when quota is exceeded for user or flowgroup (apache#3516)
  [GOBBLIN-1669] Clean up TimeAwareRecursiveCopyableDataset to support seconds in time… (apache#3528)
  [GOBBLIN-1670] Remove rat tasks and unneeded checkstyles blocking build pipeline (apache#3529)
  [GOBBLIN-1668] Add audit counts for iceberg registration (apache#3527)
  [GOBBLIN-1667] Create new predicate - ExistingPartitionSkipPredicate (apache#3526)
  Calculate requested container count based on adding allocated count and outstanding ContainerRequests in Yarn (apache#3524)
  make the requestedContainerCountMap correctly update the container count (apache#3523)
  Fix running counts for retried flows (apache#3520)
  Allow table to flush after write failure (apache#3522)
  [GOBBLIN-1652]Add more log in the KafkaJobStatusMonitor in case it fails to process one GobblinTrackingEvent (apache#3513)
  Make Yarn container and helix instance allocation group by tag (apache#3519)
  [GOBBLIN-1657] Update completion watermark on change_property in IcebergMetadataWriter (apache#3517)
  [GOBBLIN-1654] Add capacity floor to avoid aggressively requesting resource and small files. (apache#3515)
  [GOBBLIN-1653] Shorten job name length if it exceeds 255 characters (apache#3514)
  [GOBBLIN-1650] Implement flowGroup quotas for the DagManager (apache#3511)
  [GOBBLIN-1648] Complete use of JDBC `DataSource` 'read-only' validation query by incorporating where previously omitted (apache#3509)
  Add config to set close timeout in HiveRegister (apache#3512)
  add an API in AbstractBaseKafkaConsumerClient to list selected topics (apache#3501)
  [GOBBLIN-1649] Revert gobblin-1633 (apache#3510)
  [GOBBLIN-1639] Prevent metrics reporting if configured, clean up workunit count metric (apache#3500)
  [GOBBLIN-1647] Add hive commit GTE to HiveMetadataWriter (apache#3508)
  [GOBBLIN-1633] Fix compaction actions on job failure not retried if compaction succeeds (apache#3494)
  [GOBBLIN-1646] Revert yarn container / helix tag group changes (apache#3507)
  [GOBBLIN-1641] Add meter for sla exceeded flows (apache#3502)
  GOBBLIN-1644 (apache#3506)
  [GOBBLIN-1645]Change the prefix of dagManager heartbeat to make it consistent with other metrics (apache#3505)
  Fix bug when shrinking the container in Yarn service (apache#3504)
  [GOBBLIN-1637] Add writer, operation, and partition info to failed metadata writer events (apache#3498)
  [GOBBLIN-1638] Fix unbalanced running count metrics due to Azkaban failures (apache#3499)
  [GOBBLIN-1634] Add retries on flow sla kills (apache#3495)
  [GOBBLIN-1620]Make yarn container allocation group by helix tag (apache#3487)
  [GOBBLIN-1636] Close DatasetCleaner after clean task (apache#3497)
  [GOBBLIN-1635] Avoid loading env configuration when using config store to improve the performance (apache#3496)
  use user supplied props to create FileSystem in DatasetCleanerTask (apache#3483)
  [GOBBLIN-1619] WriterUtils.mkdirsWithRecursivePermission contains race condition and puts unnecessary load on filesystem (apache#3477)
  use data node aliases to figure out data node names before using DMAS (apache#3493)
  [GOBBLIN-1630] Remove flow level metrics for adhoc flows (apache#3491)
  [GOBBLIN-1631]Emit heartbeat for dagManagerThread (apache#3492)
  [GOBBLIN-1624] Refactor quota management, fix various bugs in accounting of running … (apache#3481)
  [GOBBLIN-1613] Add metadata writers field to GMCE schema (apache#3490)
  Update README.md
  [GOBBLIN-1629] Make GobblinMCEWriter be able to catch error when calculating hive specs (apache#3489)
  Add/fix some fields of MetadataWriterFailureEvent (apache#3485)
  [GOBBLIN-1627] provide option to convert datanodes names (apache#3484)
  Add coverage for edge cases when table paths do not exist, check parents (apache#3482)
  [GOBBLIN-1616] Add close connection logic in salseforceSource (apache#3486)
  [GOBBLIN-1621] Make HelixRetriggeringJobCallable emit job skip event when job is dropped due to previous job is running (apache#3478)
  [GOBBLIN-1623] Fix NPE when try to close RestApiConnector (apache#3480)
  Clear bad mysql packages from cache in CI/CD machines (apache#3479)
  [GOBBLIN-1617] pass configurations to some HadoopUtils APIs (apache#3475)
  [GOBBLIN-1616] Make RestApiConnector be able to close the connection finally (apache#3474)
  add config to set log level for any class (apache#3473)
  Fix bug where partitioned tables would always return the wrong equality in paths (apache#3472)
  [GOBBLIN-1602] Change hive table location and partition check to validate using FS r… (apache#3459)
  Don't flush on change_property operation (apache#3467)
  Fix case where error GTE is incorrectly sent from MCE writer (apache#3466)
  partial rollback of PR 3464 (apache#3465)
  [GOBBLIN-1604] Throw exception if there are no allocated requests due to lack of res… (apache#3461)
  [GOBBLIN-1603] Throws error if configured when encountering an IO exception while co… (apache#3460)
  [GOBBLIN-1606] change DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE value (apache#3464)
  Upgraded dropwizard metrics library version from 3.2.3 -> 4.1.2 and added a new wrapper class on dropwizard Timer.Context class to handle the code compatibility as the newer version of this class implements AutoClosable instead of Closable. (apache#3463)
  [GOBBLIN-1605] Fix mysql ubuntu download 404 not found for Github Actions CI/CD (apache#3462)
  [GOBBLIN-1601] implement ChangePermissionCommitStep (apache#3457)
  [GOBBLIN-1598]Fix metrics already exist issue in dag manager (apache#3454)
  [GOBBLIN-1597] Add error handling in dagmanager to continue if dag fails to process,… (apache#3452)
  GOBBLIN-1579 Fail job on hive existing target table location mismatch (apache#3433)
  [GOBBLIN-1596] Ignore already exists exception if the table has already been created… (apache#3451)
  [GOBBLIn-1595]Fix the dead lock during hive registration (apache#3450)
  Add guard in DagManager for improperly formed SLA (apache#3449)
  [GOBBLIN-1588] Send failure events for write failures when watermark is advanced in MCE writer (apache#3441)
  [GOBBLIN-1593] Fix bugs in dag manager about metric reporting and job status monitor (apache#3448)
  Fix bug in `JobSpecSerializer` of inadequately preventing access errors (within `MysqlJobCatalog`) (apache#3447)
  [GOBBLIN-1583] Add System level job start SLA (apache#3437)
  [GOBBLIN-1592] Make hive copy be able to apply filter on directory (apache#3446)
  [GOBBLIN-1585]GaaS (DagManager) keep retrying a failed job beyond max attempt number (apache#3439)
  [GOBBLIN-1590] Add low/high watermark information in event emitted by Gobblin cluster (apache#3443)
  [HotFix]Try to fix the mysql dependency issue in Github action (apache#3445)
  Lazily initialize FileContext and do not store a handle of it so it can be GC'ed when required (apache#3444)
  [GOBBLIN-1584] Add replace record logic for Mysql writer (apache#3438)
  Bump up code cov version (apache#3440)
  [GOBBLIN-1581] Iterate over Sql ResultSet in Only the Forward Direction (apache#3435)
  [GOBBLIN-1575] use reference count in helix manager, so that connect/disconnect are called once and at the right time (apache#3427)
  ...
phet committed Sep 12, 2022
2 parents 384dc09 + cfb47d1 commit c28544c
Showing 291 changed files with 11,112 additions and 2,917 deletions.
15 changes: 12 additions & 3 deletions .github/workflows/build_and_test.yaml
@@ -57,7 +57,8 @@ jobs:
restore-keys: ${{ runner.os }}-gradle
- name: Build repository
run: |
./gradlew --no-daemon clean build -x test -x javadoc -x findbugsMain -x findbugsTest -x checkstyleMain -x checkstyleJmh -x checkstyleTest -Dorg.gradle.parallel=true
./gradlew --no-daemon clean build -x test -x rat -x javadoc -x findbugsMain -x findbugsTest -x checkstyleMain \
-x checkstyleJmh -x checkstyleTest -x checkstyleMainGeneratedDataTemplate -x checkstyleMainGeneratedRest -Dorg.gradle.parallel=true
- name: Verify Dependencies
run: |
# Since dependencies are cached, check after building if they are valid or not
@@ -86,7 +87,7 @@ jobs:
restore-keys: ${{ runner.os }}-gradle
- name: Run CheckStyle and FindBugs
run: |
./gradlew --no-daemon javadoc findbugsMain checkstyleMain checkstyleTest checkstyleJmh
./gradlew --no-daemon javadoc findbugsMain checkstyleMain checkstyleTest checkstyleJmh
run_tests:
timeout-minutes: 120
@@ -124,6 +125,14 @@ jobs:
echo -e "$(ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1)\t$(hostname -f) $(hostname -s)" | sudo tee -a /etc/hosts
- name: Verify mysql connection
run: |
sudo apt-get clean
sudo apt-get --fix-missing update
sudo apt-get -f install -o Dpkg::Options::="--force-overwrite"
sudo apt-get purge mysql\*
sudo rm -rf /var/lib/mysql
sudo rm -rf /etc/mysql
sudo dpkg -l | grep -i mysql
sudo apt-get clean
sudo apt-get install -y mysql-client
mysql --host 127.0.0.1 --port 3306 -uroot -ppassword -e "SHOW DATABASES"
- name: Cache Gradle Dependencies
@@ -160,7 +169,7 @@ jobs:
JACOCO_REPORTS=$(find . -name "jacoco*.xml" -exec printf ',{}' \; | cut -c2- )
echo "jacoco_reports=$JACOCO_REPORTS" >> $GITHUB_ENV
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v1
uses: codecov/codecov-action@v2
with:
files: ${{ env.jacoco_reports }}
fail_ci_if_error: true
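Note on the `JACOCO_REPORTS` line in this hunk: it builds the comma-separated value that the Codecov step reads through `files:`. The sketch below reproduces the same `find ... | cut -c2-` pipeline against a throwaway directory. The module names and report file names are hypothetical, and the pipeline assumes a `find` that substitutes `{}` inside a larger argument (GNU find does; POSIX leaves this implementation-defined).

```shell
#!/usr/bin/env bash
# Hypothetical layout for illustration only: two modules, one report each.
work_dir=$(mktemp -d)
mkdir -p "$work_dir/module-a" "$work_dir/module-b"
touch "$work_dir/module-a/jacocoTestReport.xml" \
      "$work_dir/module-b/jacocoTestReport.xml"

# printf emits ",<path>" for every match; cut -c2- then drops the leading comma,
# leaving a comma-separated list of report paths.
JACOCO_REPORTS=$(cd "$work_dir" && find . -name "jacoco*.xml" -exec printf ',{}' \; | cut -c2-)

echo "jacoco_reports=$JACOCO_REPORTS"
rm -rf "$work_dir"
```

The order of the paths depends on directory traversal order, so nothing downstream should rely on it. Paths containing `%` would be misread by `printf` as format directives.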
2 changes: 1 addition & 1 deletion README.md
@@ -60,7 +60,7 @@ The distribution will be created in build/gobblin-distribution/distributions dir
* [Running Gobblin on Docker from your laptop](https://github.com/apache/gobblin/blob/master/gobblin-docs/user-guide/Docker-Integration.md)
* [Getting started guide](https://gobblin.apache.org/docs/Getting-Started/)
* [Gobblin architecture](https://gobblin.apache.org/docs/Gobblin-Architecture/)
* Community Slack: [Get your invite]( https://join.slack.com/t/apache-gobblin/shared_invite/zt-vqgdztup-UUq8S6gGJqE6L5~9~JelNg)
* Community Slack: [Get your invite](https://join.slack.com/t/apache-gobblin/shared_invite/zt-1723tsdhd-ITcAEsaQNpQvuQUFOgfHbQ)
* [List of companies known to use Gobblin](https://gobblin.apache.org/docs/Powered-By/)
* [Sample project](https://github.com/apache/gobblin/tree/master/gobblin-example)
* [How to build Gobblin from source code](https://gobblin.apache.org/docs/user-guide/Building-Gobblin/)
9 changes: 5 additions & 4 deletions bin/gobblin.sh
@@ -106,7 +106,7 @@ function print_gobblin_cli_usage() {
job-store-schema-manager Database job history store schema manager
gobblin-classpath shows the constructed gobblin classpath"
echo ""
echo " --conf-dir <gobblin-conf-dir-path> Gobblon config path. default is '\$GOBBLIN_HOME/conf/cli'."
echo " --conf-dir <gobblin-conf-dir-path> Gobblin config path. default is '\$GOBBLIN_HOME/conf/cli'."
echo " --log4j-conf <path-of-log4j-file> default is '<gobblin-conf-dir-path>/cli/log4j.properties'."
echo " --jvmopts <jvm or gc options> JVM or GC parameters for the java process to append to the default params: \"$JVM_OPTS\"."
echo " --jars <csv list of extra jars> Column-separated list of extra jars to put on the CLASSPATH."
@@ -133,7 +133,7 @@ function print_gobblin_service_usage() {
echo " --jt <resource manager URL> Only for mapreduce mode: Job submission URL, if not set, taken from \${HADOOP_HOME}/conf."
echo " --fs <file system URL> Only for mapreduce mode: Target file system, if not set, taken from \${HADOOP_HOME}/conf."
echo " --job-conf-file <job-conf-file-path> Only for mapreduce mode: configuration file for the job to run"
echo " --log-to-stdout Outputs to stdout rather than to a log file"
echo " --log-to-stdout Outputs to stdout rather than to a log file"
echo " --help Display this help."
echo " --verbose Display full command used to start the process."
}
@@ -301,7 +301,7 @@ fi

function build_classpath(){
GOBBLIN_CLASSPATH=''
# Build classpth
# Build classpath
GOBBLIN_JARS=''
GOBBLIN_HADOOP_JARS=''
GOBBLIN_CLASSPATH=''
@@ -332,7 +332,8 @@ function build_classpath(){
GOBBLIN_CLASSPATH=${GOBBLIN_JARS}:${GOBBLIN_HADOOP_CLASSPATH}

if [[ -n "$EXTRA_JARS" ]]; then
GOBBLIN_CLASSPATH=${GOBBLIN_CLASSPATH}:"$EXTRA_JARS"
# EXTRA_JARS has comma separated jars. Replace commas with colon for the classpath.
GOBBLIN_CLASSPATH=${GOBBLIN_CLASSPATH}:${EXTRA_JARS//,/:}
fi

GOBBLIN_CLASSPATH=${GOBBLIN_CONF}:${GOBBLIN_CLASSPATH}
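Note on the `EXTRA_JARS` change in this hunk: the new line relies on bash pattern substitution, `${var//pattern/replacement}`, which replaces every occurrence of the pattern. A minimal sketch of the effect follows; the jar paths are hypothetical values chosen for illustration and are not taken from the commit.

```shell
#!/usr/bin/env bash
# Hypothetical inputs for illustration only.
GOBBLIN_CLASSPATH="/opt/gobblin/lib/a.jar:/opt/gobblin/lib/b.jar"
EXTRA_JARS="/tmp/x.jar,/tmp/y.jar"

if [[ -n "$EXTRA_JARS" ]]; then
  # The double slash means "replace all matches", so every comma becomes a colon.
  GOBBLIN_CLASSPATH=${GOBBLIN_CLASSPATH}:${EXTRA_JARS//,/:}
fi

echo "$GOBBLIN_CLASSPATH"
# prints /opt/gobblin/lib/a.jar:/opt/gobblin/lib/b.jar:/tmp/x.jar:/tmp/y.jar
```

With the previous form, `"$EXTRA_JARS"` was appended verbatim, so a value such as `/tmp/x.jar,/tmp/y.jar` ended up as a single classpath entry containing a comma rather than two separate entries.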
1 change: 0 additions & 1 deletion build.gradle
@@ -31,7 +31,6 @@ buildscript {
dependencies {
classpath 'org.apache.ant:ant:1.9.4'
classpath 'gradle.plugin.org.inferred:gradle-processors:1.1.2'
classpath 'org.kt3k.gradle.plugin:coveralls-gradle-plugin:1.0.2'
classpath 'io.spring.gradle:dependency-management-plugin:1.0.11.RELEASE'
classpath 'me.champeau.gradle:jmh-gradle-plugin:0.4.8'
classpath "gradle.plugin.nl.javadude.gradle.plugins:license-gradle-plugin:0.14.0"
@@ -156,6 +156,8 @@ public class ConfigurationKeys {
public static final String JOB_GROUP_KEY = "job.group";
public static final String JOB_TAG_KEY = "job.tag";
public static final String JOB_DESCRIPTION_KEY = "job.description";
public static final String JOB_CURRENT_ATTEMPTS = "job.currentAttempts";
public static final String JOB_CURRENT_GENERATION = "job.currentGeneration";
// Job launcher type
public static final String JOB_LAUNCHER_TYPE_KEY = "launcher.type";
public static final String JOB_SCHEDULE_KEY = "job.schedule";
@@ -216,6 +218,11 @@ public class ConfigurationKeys {
public static final String QUEUED_TASK_TIME_MAX_AGE = "taskexecutor.queued_task_time.history.max_age";
public static final long DEFAULT_QUEUED_TASK_TIME_MAX_AGE = TimeUnit.HOURS.toMillis(1);

/**
* Optional property to specify whether existing data in databases can be overwritten during ingestion jobs
*/
public static final String ALLOW_JDBC_RECORD_OVERWRITE = "allow.jdbc.record.overwrite";

/**
* Optional property to specify a default Authenticator class for a job
*/
@@ -318,6 +325,12 @@ public class ConfigurationKeys {
public static final boolean DEFAULT_EXTRACT_LIMIT_ENABLED = false;
public static final String EXTRACT_ID_TIME_ZONE = "extract.extractIdTimeZone";
public static final String DEFAULT_EXTRACT_ID_TIME_ZONE = "UTC";
public static final String EXTRACT_SALESFORCE_BULK_API_MIN_WAIT_TIME_IN_MILLIS_KEY =
"extract.salesforce.bulkApi.minWaitTimeInMillis";
public static final long DEFAULT_EXTRACT_SALESFORCE_BULK_API_MIN_WAIT_TIME_IN_MILLIS = 60 * 1000L; // 1 min
public static final String EXTRACT_SALESFORCE_BULK_API_MAX_WAIT_TIME_IN_MILLIS_KEY =
"extract.salesforce.bulkApi.maxWaitTimeInMillis";
public static final long DEFAULT_EXTRACT_SALESFORCE_BULK_API_MAX_WAIT_TIME_IN_MILLIS = 10 * 60 * 1000L; // 10 min

/**
* Converter configuration properties.
@@ -882,8 +895,6 @@ public class ConfigurationKeys {
public static final String KAFKA_SOURCE_AVG_FETCH_TIME_CAP = "kakfa.source.avgFetchTimeCap";
public static final int DEFAULT_KAFKA_SOURCE_AVG_FETCH_TIME_CAP = 100;
public static final String SHARED_KAFKA_CONFIG_PREFIX = "gobblin.kafka.sharedConfig";
public static final String KAFKA_JOB_STATUS_MONITOR_RETRY_TIME_OUT_MINUTES =
"gobblin.kafka.jobStatusMonitor.retry.timeOut.minutes";

/**
* Kafka schema registry HTTP client configuration
@@ -996,13 +1007,21 @@ public class ConfigurationKeys {
public static final String DEFAULT_GOBBLIN_FLOW_SLA_TIME_UNIT = "MINUTES";
public static final String GOBBLIN_JOB_START_SLA_TIME = "gobblin.job.start.sla.time";
public static final String GOBBLIN_JOB_START_SLA_TIME_UNIT = "gobblin.job.start.sla.timeunit";
public static final long DEFAULT_GOBBLIN_JOB_START_SLA_TIME = 10L;
public static final String DEFAULT_GOBBLIN_JOB_START_SLA_TIME_UNIT = "MINUTES";
public static final long FALLBACK_GOBBLIN_JOB_START_SLA_TIME = 10L;
public static final String FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT = "MINUTES";
public static final String DATASET_SUBPATHS_KEY = "gobblin.flow.dataset.subPaths";
public static final String DATASET_BASE_INPUT_PATH_KEY = "gobblin.flow.dataset.baseInputPath";
public static final String DATASET_BASE_OUTPUT_PATH_KEY = "gobblin.flow.dataset.baseOutputPath";
public static final String DATASET_COMBINE_KEY = "gobblin.flow.dataset.combine";
public static final String WHITELISTED_EDGE_IDS = "gobblin.flow.whitelistedEdgeIds";
public static final String GOBBLIN_OUTPUT_JOB_LEVEL_METRICS = "gobblin.job.outputJobLevelMetrics";

/**
* Configuration properties related to flowGraphs
*/

public static final String JAVA_PROPS_EXTENSIONS = "javaPropsExtensions";
public static final String HOCON_FILE_EXTENSIONS = "hoconFileExtensions";

/***
* Configuration properties related to TopologySpec Store
@@ -1120,4 +1139,8 @@ public class ConfigurationKeys {
* */
public static final String TROUBLESHOOTER_IN_MEMORY_ISSUE_REPOSITORY_MAX_SIZE = "gobblin.troubleshooter.inMemoryIssueRepository.maxSize";
public static final int DEFAULT_TROUBLESHOOTER_IN_MEMORY_ISSUE_REPOSITORY_MAX_SIZE = 100;

public static final String JOB_METRICS_REPORTER_CLASS_KEY = "gobblin.job.metrics.reporter.class";
public static final String DEFAULT_JOB_METRICS_REPORTER_CLASS = "org.apache.gobblin.runtime.metrics.DefaultGobblinJobMetricReporter";

}
@@ -26,6 +26,7 @@ public class ServiceConfigKeys {

public static final String GOBBLIN_SERVICE_PREFIX = "gobblin.service.";
public static final String GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS = "org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler";
public static final String GOBBLIN_ORCHESTRATOR_LISTENER_CLASS = "org.apache.gobblin.service.modules.orchestration.Orchestrator";

// Gobblin Service Manager Keys
public static final String GOBBLIN_SERVICE_TOPOLOGY_CATALOG_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "topologyCatalog.enabled";
@@ -36,7 +37,9 @@ public class ServiceConfigKeys {
public static final String GOBBLIN_SERVICE_TOPOLOGY_SPEC_FACTORY_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "topologySpecFactory.enabled";
public static final String GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "gitConfigMonitor.enabled";
public static final String GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "dagManager.enabled";
public static final boolean DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED = false;
public static final String GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "jobStatusMonitor.enabled";
public static final String GOBBLIN_SERVICE_WARM_STANDBY_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "warmStandby.enabled";
// If true, will mark up/down d2 servers on leadership so that all requests will be routed to the leader node
public static final String GOBBLIN_SERVICE_D2_ONLY_ANNOUNCE_LEADER = GOBBLIN_SERVICE_PREFIX + "d2.onlyAnnounceLeader";

@@ -46,6 +49,7 @@ public class ServiceConfigKeys {
public static final String HELIX_INSTANCE_NAME_OPTION_NAME = "helix_instance_name";
public static final String HELIX_INSTANCE_NAME_KEY = GOBBLIN_SERVICE_PREFIX + "helixInstanceName";
public static final String GOBBLIN_SERVICE_FLOWSPEC = GOBBLIN_SERVICE_PREFIX + "flowSpec";
public static final String GOBBLIN_SERVICE_FLOWGRAPH_CLASS_KEY = GOBBLIN_SERVICE_PREFIX + "flowGraph.class";

// Helix message sub types for FlowSpec
public static final String HELIX_FLOWSPEC_ADD = "FLOWSPEC_ADD";
@@ -55,6 +59,7 @@ public class ServiceConfigKeys {
// Flow Compiler Keys
public static final String GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY = GOBBLIN_SERVICE_PREFIX + "flowCompiler.class";
public static final String COMPILATION_SUCCESSFUL = "compilation.successful";
public static final String COMPILATION_RESPONSE = "compilation.response";

// Flow Catalog Keys
public static final String GOBBLIN_SERVICE_FLOW_CATALOG_LOCAL_COMMIT = GOBBLIN_SERVICE_PREFIX + "flowCatalog.localCommit";
@@ -137,6 +142,10 @@ public class ServiceConfigKeys {

public static final String FORCE_LEADER = GOBBLIN_SERVICE_PREFIX + "forceLeader";
public static final boolean DEFAULT_FORCE_LEADER = false;

public static final String QUOTA_MANAGER_CLASS = GOBBLIN_SERVICE_PREFIX + "quotaManager.class";
public static final String DEFAULT_QUOTA_MANAGER = "org.apache.gobblin.service.modules.orchestration.InMemoryUserQuotaManager";

// Group Membership authentication service
public static final String GROUP_OWNERSHIP_SERVICE_CLASS = GOBBLIN_SERVICE_PREFIX + "groupOwnershipService.class";
public static final String DEFAULT_GROUP_OWNERSHIP_SERVICE = "org.apache.gobblin.service.NoopGroupOwnershipService";
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.source;

import com.google.common.eventbus.EventBus;
import org.apache.gobblin.annotation.Alpha;


/**
* An interface for infinite source, where source should be able to detect the work unit change
* and post the change through eventBus
*
* @author Zihan Li
*
* @param <S> output schema type
* @param <D> output record type
*/
@Alpha
public interface InfiniteSource<S, D> extends Source<S, D>{

/**
* Return the eventBus where it will post {@link org.apache.gobblin.stream.WorkUnitChangeEvent} when workUnit change
*/
EventBus getEventBus();

}
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.stream;

import java.util.List;
import lombok.Getter;
import org.apache.gobblin.source.workunit.WorkUnit;

/**
* The event for {@link org.apache.gobblin.source.InfiniteSource} to indicate there is a change in work units
* Job launcher should then be able to handle this event
*/
public class WorkUnitChangeEvent {
@Getter
private final List<String> oldTaskIds;
@Getter
private final List<WorkUnit> newWorkUnits;
public WorkUnitChangeEvent(List<String> oldTaskIds, List<WorkUnit> newWorkUnits) {
this.oldTaskIds = oldTaskIds;
this.newWorkUnits = newWorkUnits;
}
}
4 changes: 3 additions & 1 deletion gobblin-aws/build.gradle
@@ -58,7 +58,9 @@ dependencies {
compile externalDependency.hadoopYarnClient
compile externalDependency.avroMapredH2
compile externalDependency.findBugsAnnotations
compile externalDependency.helix
compile (externalDependency.helix) {
exclude group: 'io.dropwizard.metrics', module: 'metrics-core'
}

testCompile project(path: ':gobblin-cluster', configuration: 'tests')
testCompile project(":gobblin-example")
@@ -30,7 +30,7 @@
import org.apache.helix.NotificationContext;
import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
import org.apache.helix.messaging.handling.MultiTypeMessageHandlerFactory;
import org.apache.helix.model.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,10 +83,10 @@ public GobblinAWSClusterManager(String clusterName, String applicationId, Config
}

/**
* A custom {@link MessageHandlerFactory} for {@link ControllerUserDefinedMessageHandler}s that
* A custom {@link MultiTypeMessageHandlerFactory} for {@link ControllerUserDefinedMessageHandler}s that
* handle messages of type {@link org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}.
*/
private static class ControllerUserDefinedMessageHandlerFactory implements MessageHandlerFactory {
private static class ControllerUserDefinedMessageHandlerFactory implements MultiTypeMessageHandlerFactory {

@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
@@ -122,7 +122,7 @@ public ControllerUserDefinedMessageHandler(Message message, NotificationContext
}

@Override
public HelixTaskResult handleMessage() throws InterruptedException {
public HelixTaskResult handleMessage() {
LOGGER.warn(String
.format("No handling setup for %s message of subtype: %s", Message.MessageType.USER_DEFINE_MSG.toString(),
this._message.getMsgSubType()));
4 changes: 3 additions & 1 deletion gobblin-cluster/build.gradle
@@ -47,7 +47,9 @@ dependencies {
compile externalDependency.hadoopCommon
compile externalDependency.avroMapredH2
compile externalDependency.findBugsAnnotations
compile externalDependency.helix
compile (externalDependency.helix) {
exclude group: 'io.dropwizard.metrics', module: 'metrics-core'
}

runtimeOnly project(":gobblin-modules:gobblin-service-kafka")
