[GOBBLIN-1766] Define metric to measure lag from producing to consume… #3625

Merged · 5 commits · Jan 24, 2023 · Changes from all commits
@@ -9,9 +9,14 @@
       "doc" : "Primary key for the store",
       "compliance" : "NONE"
     }, {
-      "name" : "timestamp",
+      "name" : "txId",
       "type" : "string",
+      "doc" : "ID to uniquely identify the transaction. Used for identifying duplicate messages with different timestamps for the same transaction.",
+      "compliance" : "NONE"
+    }, {
+      "name" : "produceTimestampMillis",
+      "type" : "long",
-      "doc" : "Time the change occurred",
+      "doc" : "Time the change was produced to topic (separate than the time of the update to the store)",
       "compliance" : "NONE"
     }, {
       "name": "operationType",
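For orientation, here is a minimal sketch of the producer-side contract the two new schema fields imply. The field names come from the schema above; the surrounding class is hypothetical, not part of this PR:

```java
import java.util.UUID;

// Sketch only: illustrates what each new schema field carries.
final class ChangeEventStamp {
  // Stable across producer retries of the same transaction, so consumers can
  // deduplicate redeliveries that arrive with different timestamps.
  final String txId = UUID.randomUUID().toString();
  // Stamped when the message is produced to the topic -- deliberately distinct
  // from the time the underlying store was updated.
  final long produceTimestampMillis = System.currentTimeMillis();
}
```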
@@ -22,6 +22,7 @@
 import java.util.Properties;
 
 import org.mockito.Mockito;
+import org.testng.Assert;
 import org.testng.annotations.AfterSuite;
 import org.testng.annotations.BeforeSuite;
 import org.testng.annotations.Test;
@@ -155,6 +156,26 @@ public void testConsumerManualOffsetCommit() throws Exception {
     consumer.shutDown();
   }
 
+  @Test
+  public void testCalculateProduceToConsumeLag() {
+    Properties consumerProps = new Properties();
+    consumerProps.setProperty(ConfigurationKeys.KAFKA_BROKERS, _kafkaBrokers);
+    consumerProps.setProperty(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+    consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT + KAFKA_AUTO_OFFSET_RESET_KEY, "earliest");
+    //Generate a brand new consumer group id to ensure there are no previously committed offsets for this group id
+    String consumerGroupId = Joiner.on("-").join(TOPIC, "auto", System.currentTimeMillis());
+    consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT + HighLevelConsumer.GROUP_ID_KEY, consumerGroupId);
+    consumerProps.setProperty(HighLevelConsumer.ENABLE_AUTO_COMMIT_KEY, "true");
+    MockedHighLevelConsumer consumer = new MockedHighLevelConsumer(TOPIC, ConfigUtils.propertiesToConfig(consumerProps),
+        NUM_PARTITIONS) {
+      @Override public Long calcMillisSince(Long timestamp) {
+        return 1234L - timestamp;
+      }
+    };
+    Long produceTimestamp = 1000L;
+    Assert.assertTrue(consumer.calcMillisSince(produceTimestamp).equals(234L));
+  }
+
   private List<byte[]> createByteArrayMessages() {
     List<byte[]> records = Lists.newArrayList();
 
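The test pins "now" by overriding calcMillisSince in an anonymous subclass, which makes the expected lag deterministic (1234 - 1000 = 234). A hypothetical alternative, assuming one were free to change the production signature, would inject java.time.Clock instead of subclassing:

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;

// Hypothetical variant, not what this PR does: clock injection avoids the
// anonymous-subclass override in tests.
class LagCalc {
  private final Clock clock;

  LagCalc(Clock clock) { this.clock = clock; }

  Long calcMillisSince(Long produceTimestampMillis) {
    return clock.millis() - produceTimestampMillis;
  }
}

// Usage in a test: a fixed clock makes the result deterministic.
// new LagCalc(Clock.fixed(Instant.ofEpochMilli(1234L), ZoneOffset.UTC))
//     .calcMillisSince(1000L)  // == 234L
```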
@@ -333,4 +333,8 @@ public void run() {
       }
     }
   }
+
+  public Long calcMillisSince(Long timestamp) {
+    return System.currentTimeMillis() - timestamp;
+  }
 }
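calcMillisSince is the shared helper both monitors below rely on. The overall pattern: each consumed message overwrites a volatile lag field, and a gauge reads that field lazily when metrics are reported. A minimal self-contained sketch, with a plain Supplier standing in for Gobblin's ContextAwareGauge:

```java
import java.util.function.Supplier;

// Sketch of the lag-gauge pattern this PR wires into the monitors.
class LagGaugeSketch {
  // -1 signals "no message consumed yet"; volatile because the consumer thread
  // writes it and the metrics-reporter thread reads it.
  private volatile long produceToConsumeDelayMillis = -1L;

  long calcMillisSince(long produceTimestampMillis) {
    return System.currentTimeMillis() - produceTimestampMillis;
  }

  void onMessage(long produceTimestampMillis) {
    produceToConsumeDelayMillis = calcMillisSince(produceTimestampMillis);
  }

  // Stand-in for ContextAwareGauge: the reporter polls this supplier, so the
  // gauge always reflects the most recently consumed message.
  Supplier<Long> gauge() {
    return () -> produceToConsumeDelayMillis;
  }
}
```

Note the gauge reports only the latest message's lag, one value across all partitions, rather than a per-partition distribution.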
@@ -36,15 +36,19 @@ public class RuntimeMetrics {
   public static final String GOBBLIN_JOB_MONITOR_SLAEVENT_REJECTEDEVENTS = "gobblin.jobMonitor.slaevent.rejectedevents";
   public static final String GOBBLIN_JOB_MONITOR_KAFKA_MESSAGE_PARSE_FAILURES =
       "gobblin.jobMonitor.kafka.messageParseFailures";
-  public static final String GOBBLIN_SPEC_STORE_MONITOR_SUCCESSFULLY_ADDED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.successful.added.specs";
-  public static final String GOBBLIN_SPEC_STORE_MONITOR_FAILED_ADDED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.failed.added.specs";
-  public static final String GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.deleted.specs";
-  public static final String GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.unexpected.errors";
-  public static final String GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.message.processed";
-  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.dagActionStore.kills.invoked";
-  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.dagActionStoreMonitor.message.processed";
-  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.dagActionStore.resumes.invoked";
-  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.dagActionStore.unexpected.errors";
+  public static final String GOBBLIN_SPEC_STORE_MONITOR_SUCCESSFULLY_ADDED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.successful.added.specs";
+  public static final String GOBBLIN_SPEC_STORE_MONITOR_FAILED_ADDED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.failed.added.specs";
+  public static final String GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.deleted.specs";
+  public static final String GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.unexpected.errors";
+  public static final String GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.message.processed";
+  public static final String GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS =
+      ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specstoreMonitor.produce.to.consume.delay";
+  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.kills.invoked";
+  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.message.processed";
+  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.resumes.invoked";
+  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.unexpected.errors";
+  public static final String
+      GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.produce.to.consume.delay";
 
   public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.unexpected.errors";
   public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_QUOTA_REQUESTS_EXCEEDED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.quotaRequests.exceeded";
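Beyond adding the two delay metrics, this hunk fixes the metric-name concatenation: the old constants appended "gobblin.…" directly to GOBBLIN_SERVICE_PREFIX with no separator. A comment-only illustration; the literal prefix value is an assumption, not shown in this diff:

```java
// Assuming ServiceMetricNames.GOBBLIN_SERVICE_PREFIX == "GobblinService" (not shown here):
// before: GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.deleted.specs"
//         -> "GobblinServicegobblin.specStoreMonitor.deleted.specs"   (prefix fused, no dot)
// after:  GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.deleted.specs"
//         -> "GobblinService.specStoreMonitor.deleted.specs"
```

The two untouched mysql.quota.manager constants at the bottom of the hunk still use the old pattern.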
@@ -31,6 +31,7 @@
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.metrics.ContextAwareGauge;
 import org.apache.gobblin.metrics.ContextAwareMeter;
 import org.apache.gobblin.runtime.api.DagActionStore;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
@@ -53,6 +54,9 @@ public class DagActionStoreChangeMonitor extends HighLevelConsumer {
   private ContextAwareMeter resumesInvoked;
   private ContextAwareMeter unexpectedErrors;
   private ContextAwareMeter messageProcessedMeter;
+  private ContextAwareGauge produceToConsumeDelayMillis; // Reports delay from all partitions in one gauge
+
+  private volatile Long produceToConsumeDelayValue = -1L;
 
   protected CacheLoader<String, String> cacheLoader = new CacheLoader<String, String>() {
     @Override
@@ -98,18 +102,20 @@ protected void processMessage(DecodeableKafkaRecord message) {
     String key = (String) message.getKey();
     DagActionStoreChangeEvent value = (DagActionStoreChangeEvent) message.getValue();
 
-    Long timestamp = value.getChangeEventIdentifier().getTimestamp();
+    String tid = value.getChangeEventIdentifier().getTxId();
+    Long produceTimestamp = value.getChangeEventIdentifier().getProduceTimestampMillis();
     String operation = value.getChangeEventIdentifier().getOperationType().name();
     String flowGroup = value.getFlowGroup();
     String flowName = value.getFlowName();
     String flowExecutionId = value.getFlowExecutionId();
 
-    log.debug("Processing Dag Action message for flow group: {} name: {} executionId: {} timestamp {} operation {}",
-        flowGroup, flowName, flowExecutionId, timestamp, operation);
+    produceToConsumeDelayValue = calcMillisSince(produceTimestamp);
+    log.debug("Processing Dag Action message for flow group: {} name: {} executionId: {} tid: {} operation: {} lag: {}",
+        flowGroup, flowName, flowExecutionId, tid, operation, produceToConsumeDelayValue);
 
-    String changeIdentifier = timestamp + key;
+    String changeIdentifier = tid + key;
     if (!ChangeMonitorUtils.shouldProcessMessage(changeIdentifier, dagActionsSeenCache, operation,
-        timestamp.toString())) {
+        produceTimestamp.toString())) {
       return;
     }
 
@@ -119,15 +125,17 @@ protected void processMessage(DecodeableKafkaRecord message) {
     try {
       dagAction = dagActionStore.getDagAction(flowGroup, flowName, flowExecutionId).getDagActionValue();
     } catch (IOException e) {
-      log.warn("Encountered IOException trying to retrieve dagAction for flow group: {} name: {} executionId: {}. " + "Exception: {}", flowGroup, flowName, flowExecutionId, e);
+      log.error("Encountered IOException trying to retrieve dagAction for flow group: {} name: {} executionId: {}. " + "Exception: {}", flowGroup, flowName, flowExecutionId, e);
       this.unexpectedErrors.mark();
+      return;
     } catch (SpecNotFoundException e) {
-      log.warn("DagAction not found for flow group: {} name: {} executionId: {} Exception: {}", flowGroup, flowName,
+      log.error("DagAction not found for flow group: {} name: {} executionId: {} Exception: {}", flowGroup, flowName,
          flowExecutionId, e);
       this.unexpectedErrors.mark();
+      return;
     } catch (SQLException throwables) {
-      log.warn("Encountered SQLException trying to retrieve dagAction for flow group: {} name: {} executionId: {}. " + "Exception: {}", flowGroup, flowName, flowExecutionId, throwables);
-      throwables.printStackTrace();
+      log.error("Encountered SQLException trying to retrieve dagAction for flow group: {} name: {} executionId: {}. " + "Exception: {}", flowGroup, flowName, flowExecutionId, throwables);
+      return;
     }
   }
 
@@ -176,6 +184,7 @@ protected void createMetrics() {
     this.resumesInvoked = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED);
     this.unexpectedErrors = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS);
     this.messageProcessedMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED);
+    this.produceToConsumeDelayMillis = this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS, () -> produceToConsumeDelayValue);
   }
 
 }
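Switching the dedup key from timestamp + key to tid + key is what the new txId field exists for: producer retries re-send the same transaction with a fresh timestamp, so a timestamp-based identifier would treat each retry as a new event. A minimal sketch of that idea; the cache configuration here is assumed, not Gobblin's actual setup:

```java
import java.util.concurrent.TimeUnit;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

// Sketch: keying the seen-cache on txId collapses redelivered retries.
class DedupSketch {
  private final Cache<String, String> seenCache =
      CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.MINUTES).build();

  boolean shouldProcess(String txId, String key) {
    String changeIdentifier = txId + key;        // stable across producer retries
    if (seenCache.getIfPresent(changeIdentifier) != null) {
      return false;                              // duplicate delivery; skip
    }
    seenCache.put(changeIdentifier, changeIdentifier);
    return true;
  }
}
```

SpecStoreChangeMonitor below applies the same change, reading the fields directly off GenericStoreChangeEvent rather than a nested change-event identifier.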
@@ -33,6 +33,7 @@
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.metrics.ContextAwareGauge;
 import org.apache.gobblin.metrics.ContextAwareMeter;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.Spec;
@@ -58,6 +59,9 @@ public class SpecStoreChangeMonitor extends HighLevelConsumer {
   private ContextAwareMeter failedAddedSpecs;
   private ContextAwareMeter deletedSpecs;
   private ContextAwareMeter unexpectedErrors;
+  private ContextAwareGauge produceToConsumeDelayMillis; // Reports delay from all partitions in one gauge
+
+  private volatile Long produceToConsumeDelayValue = -1L;
 
   protected CacheLoader<String, String> cacheLoader = new CacheLoader<String, String>() {
     @Override
@@ -102,14 +106,17 @@ protected void processMessage(DecodeableKafkaRecord message) {
     String key = (String) message.getKey();
     GenericStoreChangeEvent value = (GenericStoreChangeEvent) message.getValue();
 
-    Long timestamp = value.getTimestamp();
+    String tid = value.getTxId();
+    Long produceTimestamp = value.getProduceTimestampMillis();
     String operation = value.getOperationType().name();
 
-    log.debug("Processing message where specUri is {} timestamp: {} operation: {}", key, timestamp, operation);
+    produceToConsumeDelayValue = calcMillisSince(produceTimestamp);
+    log.debug("Processing message where specUri is {} tid: {} operation: {} delay: {}", key, tid, operation,
+        produceToConsumeDelayValue);
 
-    String changeIdentifier = timestamp + key;
+    String changeIdentifier = tid + key;
     if (!ChangeMonitorUtils.shouldProcessMessage(changeIdentifier, specChangesSeenCache, operation,
-        timestamp.toString())) {
+        produceTimestamp.toString())) {
       return;
     }
 
@@ -158,6 +165,7 @@ protected void processMessage(DecodeableKafkaRecord message) {
     } catch (Exception e) {
       log.warn("Ran into unexpected error processing SpecStore changes. Reexamine scheduler. Error: {}", e);
       this.unexpectedErrors.mark();
+      return;
     }
 
     specChangesSeenCache.put(changeIdentifier, changeIdentifier);
@@ -171,5 +179,6 @@ protected void createMetrics() {
     this.deletedSpecs = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS);
     this.unexpectedErrors = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS);
     this.messageProcessedMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED);
+    this.produceToConsumeDelayMillis = this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS, () -> produceToConsumeDelayValue);
   }
 }