diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GenericStoreChangeEvent.avsc b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GenericStoreChangeEvent.avsc index c2298f4608c..08d93826eb8 100644 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GenericStoreChangeEvent.avsc +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GenericStoreChangeEvent.avsc @@ -9,9 +9,14 @@ "doc" : "Primary key for the store", "compliance" : "NONE" }, { - "name" : "timestamp", + "name" : "txId", + "type" : "string", + "doc" : "ID to uniquely identify the transaction. Used for identifying duplicate messages with different timestamps for the same transaction.", + "compliance" : "NONE" + }, { + "name" : "produceTimestampMillis", "type" : "long", - "doc" : "Time the change occurred", + "doc" : "Time the change was produced to topic (separate than the time of the update to the store)", "compliance" : "NONE" }, { "name": "operationType", diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java index 8a28bf27019..bb0b96f903b 100644 --- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java +++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java @@ -22,6 +22,7 @@ import java.util.Properties; import org.mockito.Mockito; +import org.testng.Assert; import org.testng.annotations.AfterSuite; import org.testng.annotations.BeforeSuite; import org.testng.annotations.Test; @@ -155,6 +156,26 @@ public void testConsumerManualOffsetCommit() throws Exception { consumer.shutDown(); } + @Test + public void testCalculateProduceToConsumeLag() { + Properties consumerProps = new Properties(); + consumerProps.setProperty(ConfigurationKeys.KAFKA_BROKERS, _kafkaBrokers); + consumerProps.setProperty(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT + KAFKA_AUTO_OFFSET_RESET_KEY, "earliest"); + //Generate a brand new consumer group id to ensure there are no previously committed offsets for this group id + String consumerGroupId = Joiner.on("-").join(TOPIC, "auto", System.currentTimeMillis()); + consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT + HighLevelConsumer.GROUP_ID_KEY, consumerGroupId); + consumerProps.setProperty(HighLevelConsumer.ENABLE_AUTO_COMMIT_KEY, "true"); + MockedHighLevelConsumer consumer = new MockedHighLevelConsumer(TOPIC, ConfigUtils.propertiesToConfig(consumerProps), + NUM_PARTITIONS) { + @Override public Long calcMillisSince(Long timestamp) { + return 1234L - timestamp; + } + }; + Long produceTimestamp = 1000L; + Assert.assertTrue(consumer.calcMillisSince(produceTimestamp).equals(234L)); + } + private List createByteArrayMessages() { List records = Lists.newArrayList(); diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java index 7541503ee96..0e68ea9aaeb 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java @@ -333,4 +333,8 @@ public void run() { } } } + + public Long calcMillisSince(Long timestamp) { + return System.currentTimeMillis() - timestamp; + } } diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java index c2889ea95dd..1b7bb3ae573 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java @@ -36,15 +36,19 @@ public class RuntimeMetrics { public static final String GOBBLIN_JOB_MONITOR_SLAEVENT_REJECTEDEVENTS = "gobblin.jobMonitor.slaevent.rejectedevents"; public static final String GOBBLIN_JOB_MONITOR_KAFKA_MESSAGE_PARSE_FAILURES = "gobblin.jobMonitor.kafka.messageParseFailures"; - public static final String GOBBLIN_SPEC_STORE_MONITOR_SUCCESSFULLY_ADDED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.successful.added.specs"; - public static final String GOBBLIN_SPEC_STORE_MONITOR_FAILED_ADDED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.failed.added.specs"; - public static final String GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.deleted.specs"; - public static final String GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.unexpected.errors"; - public static final String GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.message.processed"; - public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.dagActionStore.kills.invoked"; - public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.dagActionStoreMonitor.message.processed"; - public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.dagActionStore.resumes.invoked"; - public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.dagActionStore.unexpected.errors"; + public static final String GOBBLIN_SPEC_STORE_MONITOR_SUCCESSFULLY_ADDED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.successful.added.specs"; + public static final String GOBBLIN_SPEC_STORE_MONITOR_FAILED_ADDED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.failed.added.specs"; + public static final String GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.deleted.specs"; + public static final String GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.unexpected.errors"; + public static final String GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.message.processed"; + public static final String GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS = + ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specstoreMonitor.produce.to.consume.delay"; + public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.kills.invoked"; + public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.message.processed"; + public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.resumes.invoked"; + public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.unexpected.errors"; + public static final String + GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.produce.to.consume.delay"; public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.unexpected.errors"; public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_QUOTA_REQUESTS_EXCEEDED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.quotaRequests.exceeded"; diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java index 8aecd000a60..a42a01e2314 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java @@ -31,6 +31,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.kafka.client.DecodeableKafkaRecord; +import org.apache.gobblin.metrics.ContextAwareGauge; import org.apache.gobblin.metrics.ContextAwareMeter; import org.apache.gobblin.runtime.api.DagActionStore; import org.apache.gobblin.runtime.api.SpecNotFoundException; @@ -53,6 +54,9 @@ public class DagActionStoreChangeMonitor extends HighLevelConsumer { private ContextAwareMeter resumesInvoked; private ContextAwareMeter unexpectedErrors; private ContextAwareMeter messageProcessedMeter; + private ContextAwareGauge produceToConsumeDelayMillis; // Reports delay from all partitions in one gauge + + private volatile Long produceToConsumeDelayValue = -1L; protected CacheLoader cacheLoader = new CacheLoader() { @Override @@ -98,18 +102,20 @@ protected void processMessage(DecodeableKafkaRecord message) { String key = (String) message.getKey(); DagActionStoreChangeEvent value = (DagActionStoreChangeEvent) message.getValue(); - Long timestamp = value.getChangeEventIdentifier().getTimestamp(); + String tid = value.getChangeEventIdentifier().getTxId(); + Long produceTimestamp = value.getChangeEventIdentifier().getProduceTimestampMillis(); String operation = value.getChangeEventIdentifier().getOperationType().name(); String flowGroup = value.getFlowGroup(); String flowName = value.getFlowName(); String flowExecutionId = value.getFlowExecutionId(); - log.debug("Processing Dag Action message for flow group: {} name: {} executionId: {} timestamp {} operation {}", - flowGroup, flowName, flowExecutionId, timestamp, operation); + produceToConsumeDelayValue = calcMillisSince(produceTimestamp); + log.debug("Processing Dag Action message for flow group: {} name: {} executionId: {} tid: {} operation: {} lag: {}", + flowGroup, flowName, flowExecutionId, tid, operation, produceToConsumeDelayValue); - String changeIdentifier = timestamp + key; + String changeIdentifier = tid + key; if (!ChangeMonitorUtils.shouldProcessMessage(changeIdentifier, dagActionsSeenCache, operation, - timestamp.toString())) { + produceTimestamp.toString())) { return; } @@ -119,15 +125,17 @@ protected void processMessage(DecodeableKafkaRecord message) { try { dagAction = dagActionStore.getDagAction(flowGroup, flowName, flowExecutionId).getDagActionValue(); } catch (IOException e) { - log.warn("Encountered IOException trying to retrieve dagAction for flow group: {} name: {} executionId: {}. " + "Exception: {}", flowGroup, flowName, flowExecutionId, e); + log.error("Encountered IOException trying to retrieve dagAction for flow group: {} name: {} executionId: {}. " + "Exception: {}", flowGroup, flowName, flowExecutionId, e); this.unexpectedErrors.mark(); + return; } catch (SpecNotFoundException e) { - log.warn("DagAction not found for flow group: {} name: {} executionId: {} Exception: {}", flowGroup, flowName, + log.error("DagAction not found for flow group: {} name: {} executionId: {} Exception: {}", flowGroup, flowName, flowExecutionId, e); this.unexpectedErrors.mark(); + return; } catch (SQLException throwables) { - log.warn("Encountered SQLException trying to retrieve dagAction for flow group: {} name: {} executionId: {}. " + "Exception: {}", flowGroup, flowName, flowExecutionId, throwables); - throwables.printStackTrace(); + log.error("Encountered SQLException trying to retrieve dagAction for flow group: {} name: {} executionId: {}. " + "Exception: {}", flowGroup, flowName, flowExecutionId, throwables); + return; } } @@ -176,6 +184,7 @@ protected void createMetrics() { this.resumesInvoked = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED); this.unexpectedErrors = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS); this.messageProcessedMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED); + this.produceToConsumeDelayMillis = this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS, () -> produceToConsumeDelayValue); } } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java index 1e834c76d80..4f7ac6e866c 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java @@ -33,6 +33,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.kafka.client.DecodeableKafkaRecord; +import org.apache.gobblin.metrics.ContextAwareGauge; import org.apache.gobblin.metrics.ContextAwareMeter; import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.api.Spec; @@ -58,6 +59,9 @@ public class SpecStoreChangeMonitor extends HighLevelConsumer { private ContextAwareMeter failedAddedSpecs; private ContextAwareMeter deletedSpecs; private ContextAwareMeter unexpectedErrors; + private ContextAwareGauge produceToConsumeDelayMillis; // Reports delay from all partitions in one gauge + + private volatile Long produceToConsumeDelayValue = -1L; protected CacheLoader cacheLoader = new CacheLoader() { @Override @@ -102,14 +106,17 @@ protected void processMessage(DecodeableKafkaRecord message) { String key = (String) message.getKey(); GenericStoreChangeEvent value = (GenericStoreChangeEvent) message.getValue(); - Long timestamp = value.getTimestamp(); + String tid = value.getTxId(); + Long produceTimestamp = value.getProduceTimestampMillis(); String operation = value.getOperationType().name(); - log.debug("Processing message where specUri is {} timestamp: {} operation: {}", key, timestamp, operation); + produceToConsumeDelayValue = calcMillisSince(produceTimestamp); + log.debug("Processing message where specUri is {} tid: {} operation: {} delay: {}", key, tid, operation, + produceToConsumeDelayValue); - String changeIdentifier = timestamp + key; + String changeIdentifier = tid + key; if (!ChangeMonitorUtils.shouldProcessMessage(changeIdentifier, specChangesSeenCache, operation, - timestamp.toString())) { + produceTimestamp.toString())) { return; } @@ -158,6 +165,7 @@ protected void processMessage(DecodeableKafkaRecord message) { } catch (Exception e) { log.warn("Ran into unexpected error processing SpecStore changes. Reexamine scheduler. Error: {}", e); this.unexpectedErrors.mark(); + return; } specChangesSeenCache.put(changeIdentifier, changeIdentifier); @@ -171,5 +179,6 @@ protected void createMetrics() { this.deletedSpecs = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS); this.unexpectedErrors = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS); this.messageProcessedMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED); + this.produceToConsumeDelayMillis = this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS, () -> produceToConsumeDelayValue); } } \ No newline at end of file