Skip to content

Commit

Permalink
Service Factory + Individual Services' Factory Implementations (#257)
Browse files Browse the repository at this point in the history
Service Factory + Individual Services' Factory Implementations

Because of the early design, architectural decisions of Xinfra Monitor, the KafkaMonitor class enforces individual Xinfra Monitor Services to carry the same set of constructor parameters.

This is a large caveat as not all Service classes don't need the identical constructor arguments.

This is not enforced the codebase. It is partially and only silently enforced by convention.

Unmaintainable code + non-robust code.
Non-evolvable code: each Service's constructor implementations aren't allowed to be evolved independently. 3. Change(s) in one Service's constructor absolutely forces the other Services' constructors to also change its parameters, unfortunately.

Signed off by : Andrew Choi
  • Loading branch information
Andrew Choi authored Jun 2, 2020
1 parent 2fc22fd commit 10045c5
Show file tree
Hide file tree
Showing 14 changed files with 472 additions and 49 deletions.
48 changes: 14 additions & 34 deletions src/main/java/com/linkedin/kmf/XinfraMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,25 @@
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/

package com.linkedin.kmf;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.kmf.apps.App;
import com.linkedin.kmf.services.ConsumerFactory;
import com.linkedin.kmf.services.ConsumerFactoryImpl;
import com.linkedin.kmf.services.Service;
import com.linkedin.kmf.services.ServiceFactory;
import java.io.BufferedReader;
import java.io.FileReader;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
Expand All @@ -42,9 +40,6 @@
*/
public class XinfraMonitor {
private static final Logger LOG = LoggerFactory.getLogger(XinfraMonitor.class);
public static final String CLASS_NAME_CONFIG = "class.name";
private static final String METRIC_GROUP_NAME = "kafka-monitor";
private static final String JMX_PREFIX = "kmf";

/** This is concurrent because healthCheck() can modify this map, but awaitShutdown() can be called at any time by
* a different thread.
Expand All @@ -63,54 +58,39 @@ public class XinfraMonitor {
* @param allClusterProps the properties of ALL kafka clusters for which apps and services need to be appended.
* @throws Exception when exception occurs while assigning Apps and Services
*/
@SuppressWarnings({"rawtypes", "unchecked"})

@SuppressWarnings({"rawtypes"})
public XinfraMonitor(Map<String, Map> allClusterProps) throws Exception {
_apps = new ConcurrentHashMap<>();
_services = new ConcurrentHashMap<>();

for (Map.Entry<String, Map> clusterProperty : allClusterProps.entrySet()) {
String name = clusterProperty.getKey();
Map props = clusterProperty.getValue();
if (!props.containsKey(CLASS_NAME_CONFIG))
throw new IllegalArgumentException(name + " is not configured with " + CLASS_NAME_CONFIG);
String className = (String) props.get(CLASS_NAME_CONFIG);
if (!props.containsKey(XinfraMonitorConstants.CLASS_NAME_CONFIG))
throw new IllegalArgumentException(name + " is not configured with " + XinfraMonitorConstants.CLASS_NAME_CONFIG);
String className = (String) props.get(XinfraMonitorConstants.CLASS_NAME_CONFIG);

Class<?> aClass = Class.forName(className);
if (App.class.isAssignableFrom(aClass)) {
App clusterApp = (App) Class.forName(className).getConstructor(Map.class, String.class).newInstance(props, name);
_apps.put(name, clusterApp);
} else if (Service.class.isAssignableFrom(aClass)) {
Constructor<?>[] constructors = Class.forName(className).getConstructors();
if (this.constructorContainsClass(constructors, CompletableFuture.class)) {
// for ConsumeService public constructor
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
completableFuture.complete(null);
ConsumerFactoryImpl consumerFactory = new ConsumerFactoryImpl(props);
Service service = (Service) Class.forName(className)
.getConstructor(String.class, CompletableFuture.class, ConsumerFactory.class)
.newInstance(name, completableFuture, consumerFactory);
_services.put(name, service);
} else if (this.constructorContainsClass(constructors, AdminClient.class)) {
// for KafkaMetricsReporterService constructor
AdminClient adminClient = AdminClient.create(props);
Service service = (Service) Class.forName(className)
.getConstructor(Map.class, String.class, AdminClient.class)
.newInstance(props, name, adminClient);
_services.put(name, service);
} else {
Service service = (Service) Class.forName(className).getConstructor(Map.class, String.class).newInstance(props, name);
_services.put(name, service);
}
ServiceFactory serviceFactory = (ServiceFactory) Class.forName(className + XinfraMonitorConstants.FACTORY)
.getConstructor(Map.class, String.class)
.newInstance(props, name);
Service service = serviceFactory.createService();
_services.put(name, service);
} else {
throw new IllegalArgumentException(className + " should implement either " + App.class.getSimpleName() + " or " + Service.class.getSimpleName());
}
}
_executor = Executors.newSingleThreadScheduledExecutor();
_offlineRunnables = new ConcurrentHashMap<>();
List<MetricsReporter> reporters = new ArrayList<>();
reporters.add(new JmxReporter(JMX_PREFIX));
reporters.add(new JmxReporter(XinfraMonitorConstants.JMX_PREFIX));
Metrics metrics = new Metrics(new MetricConfig(), reporters, new SystemTime());
metrics.addMetric(metrics.metricName("offline-runnable-count", METRIC_GROUP_NAME, "The number of Service/App that are not fully running"),
metrics.addMetric(metrics.metricName("offline-runnable-count", XinfraMonitorConstants.METRIC_GROUP_NAME, "The number of Service/App that are not fully running"),
(config, now) -> _offlineRunnables.size());
}

Expand Down
30 changes: 30 additions & 0 deletions src/main/java/com/linkedin/kmf/XinfraMonitorConstants.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* Copyright 2020 LinkedIn Corp. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
* file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/

package com.linkedin.kmf;

/**
* Constant variables in Xinfra Monitor repo.
*/
public class XinfraMonitorConstants {

public XinfraMonitorConstants() {
}

static final String FACTORY = "Factory";

static final String CLASS_NAME_CONFIG = "class.name";

static final String METRIC_GROUP_NAME = "kafka-monitor";

static final String JMX_PREFIX = "kmf";


}
39 changes: 39 additions & 0 deletions src/main/java/com/linkedin/kmf/services/ConsumeServiceFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
* Copyright 2020 LinkedIn Corp. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
* file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/

package com.linkedin.kmf.services;

import java.util.Map;
import java.util.concurrent.CompletableFuture;


/**
* Factory that constructs the ConsumeService.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public class ConsumeServiceFactory implements ServiceFactory {
private final Map _props;
private final String _name;

public ConsumeServiceFactory(Map props, String name) {
_props = props;
_name = name;
}

@Override
public Service createService() throws Exception {

CompletableFuture<Void> topicPartitionResult = new CompletableFuture<>();
topicPartitionResult.complete(null);
ConsumerFactoryImpl consumerFactory = new ConsumerFactoryImpl(_props);

return new ConsumeService(_name, topicPartitionResult, consumerFactory);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* Copyright 2020 LinkedIn Corp. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
* file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/

package com.linkedin.kmf.services;

import java.util.Map;


/**
* Factory class which instantiates a DefaultMetricsReporterService.
*/
@SuppressWarnings("rawtypes")
public class DefaultMetricsReporterServiceFactory implements ServiceFactory {
private final Map _properties;
private final String _serviceName;

public DefaultMetricsReporterServiceFactory(Map properties, String serviceName) {

_properties = properties;
_serviceName = serviceName;
}

@SuppressWarnings("unchecked")
@Override
public Service createService() throws Exception {
return new DefaultMetricsReporterService(_properties, _serviceName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Copyright 2020 LinkedIn Corp. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
* file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/

package com.linkedin.kmf.services;

import java.util.Map;


/**
* Factory class which instantiates a GraphiteMetricsReporterServiceFactory service.
*/
@SuppressWarnings("rawtypes")
public class GraphiteMetricsReporterServiceFactory implements ServiceFactory {

private final Map _properties;
private final String _serviceName;

public GraphiteMetricsReporterServiceFactory(Map properties, String serviceName) {

_properties = properties;
_serviceName = serviceName;
}

@SuppressWarnings("unchecked")
@Override
public Service createService() throws Exception {
return new GraphiteMetricsReporterService(_properties, _serviceName);
}
}
36 changes: 36 additions & 0 deletions src/main/java/com/linkedin/kmf/services/JolokiaServiceFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Copyright 2020 LinkedIn Corp. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
* file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/

package com.linkedin.kmf.services;

import java.util.Map;


/**
* Factory class which instantiates a JolokiaService service.
*/
@SuppressWarnings("rawtypes")
public class JolokiaServiceFactory implements ServiceFactory {

private final Map _properties;
private final String _serviceName;

public JolokiaServiceFactory(Map properties, String serviceName) {

_properties = properties;
_serviceName = serviceName;
}

@SuppressWarnings("unchecked")
@Override
public Service createService() throws Exception {
return new JolokiaService(_properties, _serviceName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Copyright 2020 LinkedIn Corp. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
* file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/

package com.linkedin.kmf.services;

import java.util.Map;
import org.apache.kafka.clients.admin.AdminClient;


/**
* Factory class which instantiates a KafkaMetricsReporterService service object.
*/
@SuppressWarnings("rawtypes")
public class KafkaMetricsReporterServiceFactory implements ServiceFactory {

private final Map _properties;
private final String _serviceName;

public KafkaMetricsReporterServiceFactory(Map properties, String serviceName) {

_properties = properties;
_serviceName = serviceName;
}

@SuppressWarnings("unchecked")
@Override
public Service createService() throws Exception {

AdminClient adminClient = AdminClient.create(_properties);

return new KafkaMetricsReporterService(_properties, _serviceName, adminClient);

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Copyright 2020 LinkedIn Corp. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
* file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/

package com.linkedin.kmf.services;

import java.util.Map;


/**
* Factory which instantiates a MultiClusterTopicManagementService service object.
*/
@SuppressWarnings("rawtypes")
public class MultiClusterTopicManagementServiceFactory implements ServiceFactory {

private final Map _properties;
private final String _serviceName;

public MultiClusterTopicManagementServiceFactory(Map properties, String serviceName) {

_properties = properties;
_serviceName = serviceName;
}

@SuppressWarnings("unchecked")
@Override
public Service createService() throws Exception {
return new MultiClusterTopicManagementService(_properties, _serviceName);
}
}
Loading

0 comments on commit 10045c5

Please sign in to comment.