Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-1736] Add metrics for change stream monitor and mysql quota manager #3593

Merged
merged 10 commits into from
Nov 14, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,16 @@ public class RuntimeMetrics {
public static final String GOBBLIN_SPEC_STORE_MONITOR_FAILED_ADDED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.failed.added.specs";
public static final String GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.deleted.specs";
public static final String GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.unexpected.errors";
public static final String GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.message.processed";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.dagActionStore.kills.invoked";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.dagActionStoreMonitor.message.processed";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.dagActionStore.resumes.invoked";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.dagActionStore.unexpected.errors";

public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.unexpected.errors";
public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_QUOTA_EXCEEDS_REQUESTS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.quotaExceeds.requests";
public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_CHECK_QUOTA_TIME = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.check.quota.time";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The names are a bit hard to understand without reading the code. Consider GOBBLIN_MYSQL_QUOTA_MANAGER_QUOTA_REQUESTS_EXCEEDED for the first, so that it implies the number of requests exceeding the quota, and gobblin_mysql_quota_manager_time_to_check_quota for the second. Is it measuring the amount of time it takes to do the whole check?


// Metadata keys
public static final String TOPIC = "topic";
public static final String GROUP_ID = "groupId";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.util.ConfigUtils;


Expand All @@ -42,10 +44,13 @@ abstract public class AbstractUserQuotaManager implements UserQuotaManager {
public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
private final Map<String, Integer> perUserQuota;
private final Map<String, Integer> perFlowGroupQuota;
protected MetricContext metricContext;

private final int defaultQuota;

public AbstractUserQuotaManager(Config config) {
this.metricContext = Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(ConfigUtils.configToProperties(config)),
this.getClass());
this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, DEFAULT_USER_JOB_QUOTA);
ImmutableMap.Builder<String, Integer> userMapBuilder = ImmutableMap.builder();
ImmutableMap.Builder<String, Integer> flowGroupMapBuilder = ImmutableMap.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.gobblin.service.modules.orchestration;

import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
Expand All @@ -38,6 +40,7 @@
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.exception.QuotaExceededException;
import org.apache.gobblin.metastore.MysqlStateStore;
import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
Expand All @@ -52,13 +55,17 @@
public class MysqlUserQuotaManager extends AbstractUserQuotaManager {
public final MysqlQuotaStore quotaStore;
public final RunningDagIdsStore runningDagIds;
private Meter quotaExceedsRequests;
private Meter failedQuotaCheck;


/**
 * Builds a MySQL-backed user quota manager.
 *
 * <p>After delegating to the parent constructor, this creates the quota store and the
 * running-dag-id store from {@code config}, and then obtains the two meters this class marks:
 * one for unexpected errors during a quota check and one for requests rejected because a quota
 * was exceeded.
 *
 * @param config configuration passed to the parent constructor and to both store factories
 * @throws IOException declared by this constructor; presumably propagated from store creation
 *     (TODO confirm against createQuotaStore / createRunningDagStore)
 */
@Inject
public MysqlUserQuotaManager(Config config) throws IOException {
super(config);

// Backing stores are created first, in the same order as before.
quotaStore = createQuotaStore(config);
runningDagIds = createRunningDagStore(config);

// metricContext is the protected field inherited from AbstractUserQuotaManager.
failedQuotaCheck =
metricContext.contextAwareMeter(RuntimeMetrics.GOBBLIN_MYSQL_QUOTA_MANAGER_UNEXPECTED_ERRORS);
quotaExceedsRequests =
metricContext.contextAwareMeter(RuntimeMetrics.GOBBLIN_MYSQL_QUOTA_MANAGER_QUOTA_EXCEEDS_REQUESTS);
}

void addDagId(Connection connection, String dagId) throws IOException {
Expand All @@ -80,18 +87,21 @@ public void init(Collection<Dag<JobExecutionPlan>> dags) {

@Override
public void checkQuota(Collection<Dag.DagNode<JobExecutionPlan>> dagNodes) throws IOException {
try (Connection connection = this.quotaStore.dataSource.getConnection()) {
try (Connection connection = this.quotaStore.dataSource.getConnection();
Timer.Context context = metricContext.timer(RuntimeMetrics.GOBBLIN_MYSQL_QUOTA_MANAGER_CHECK_QUOTA_TIME).time()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when do we call stop for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The timer context is declared in the try-with-resources clause, so it is closed (and the timer stopped) automatically when the try block finishes.

connection.setAutoCommit(false);

for (Dag.DagNode<JobExecutionPlan> dagNode : dagNodes) {
QuotaCheck quotaCheck = increaseAndCheckQuota(connection, dagNode);
if ((!quotaCheck.proxyUserCheck || !quotaCheck.requesterCheck || !quotaCheck.flowGroupCheck)) {
connection.rollback();
quotaExceedsRequests.mark();
throw new QuotaExceededException(quotaCheck.requesterMessage);
}
}
connection.commit();
} catch (SQLException e) {
this.failedQuotaCheck.mark();
throw new IOException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.gobblin.service.monitoring;

import com.codahale.metrics.Meter;
import java.io.IOException;
import java.sql.SQLException;
import java.util.UUID;
Expand Down Expand Up @@ -50,9 +51,10 @@ public class DagActionStoreChangeMonitor extends HighLevelConsumer {
public static final String DAG_ACTION_CHANGE_MONITOR_PREFIX = "dagActionChangeStore";

// Metrics
ContextAwareMeter killsInvoked;
ContextAwareMeter resumesInvoked;
ContextAwareMeter unexpectedErrors;
private ContextAwareMeter killsInvoked;
private ContextAwareMeter resumesInvoked;
private ContextAwareMeter unexpectedErrors;
private Meter messageProcessedMeter;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you use a normal Meter instead of a ContextAwareMeter? Also, we should either end all the names in "Meter" or none of them, e.g. messagesProcessed or killsInvokedMeter.


protected CacheLoader<String, String> cacheLoader = new CacheLoader<String, String>() {
@Override
Expand Down Expand Up @@ -92,7 +94,8 @@ protected void assignTopicPartitions() {
partitioned and processed by only one thread (and corresponding queue).
*/
protected void processMessage(DecodeableKafkaRecord message) {
// TODO: Add metric that service is healthy and we're continuously processing messages.
// This will also include the healthCheck message so that we can rely on this to monitor the health of this Monitor
messageProcessedMeter.mark();
String key = (String) message.getKey();
DagActionStoreChangeEvent value = (DagActionStoreChangeEvent) message.getValue();

Expand Down Expand Up @@ -171,6 +174,7 @@ protected void createMetrics() {
this.killsInvoked = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED);
this.resumesInvoked = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED);
this.unexpectedErrors = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS);
this.messageProcessedMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class SpecStoreChangeMonitor extends HighLevelConsumer {

// Metrics
private Meter successfullyAddedSpecs;
private Meter messageProcessedMeter;
private Meter failedAddedSpecs;
private Meter deletedSpecs;
private Meter unexpectedErrors;
Expand Down Expand Up @@ -97,7 +98,8 @@ protected void assignTopicPartitions() {
associated with it), a given message itself will be partitioned and assigned to only one queue.
*/
protected void processMessage(DecodeableKafkaRecord message) {
// TODO: Add metric that service is healthy and we're continuously processing messages.
// This will also include the healthCheck message so that we can rely on this to monitor the health of this Monitor
messageProcessedMeter.mark();
String key = (String) message.getKey();
GenericStoreChangeEvent value = (GenericStoreChangeEvent) message.getValue();

Expand Down Expand Up @@ -168,5 +170,6 @@ protected void createMetrics() {
this.failedAddedSpecs = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MONITOR_FAILED_ADDED_SPECS);
this.deletedSpecs = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS);
this.unexpectedErrors = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS);
this.messageProcessedMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED);
}
}