diff --git a/build.gradle b/build.gradle index b60c27d4..7066ea1f 100644 --- a/build.gradle +++ b/build.gradle @@ -41,10 +41,11 @@ allprojects { compile 'net.savantly:graphite-client:1.1.0-RELEASE' compile 'com.timgroup:java-statsd-client:3.0.1' compile 'com.signalfx.public:signalfx-codahale:0.0.47' - compile group: 'org.apache.kafka', name: 'kafka_2.12', version: '2.4.0' - compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.3.1' + compile group: 'org.apache.kafka', name: 'kafka_2.12', version: '2.8.2' + compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.8.2' compile 'org.apache.commons:commons-lang3:3.12.0' compile 'com.linkedin.avroutil1:helper-all:0.2.81' + compile 'org.apache.zookeeper:zookeeper:3.8.0' testCompile 'org.mockito:mockito-core:2.24.0' testCompile 'org.testng:testng:6.8.8' } diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/OffsetCommitService.java b/src/main/java/com/linkedin/xinfra/monitor/services/OffsetCommitService.java index 18c31803..f8a4ece1 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/OffsetCommitService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/OffsetCommitService.java @@ -103,30 +103,37 @@ public class OffsetCommitService implements Service { int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG); String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG); - LogContext logContext = new LogContext("[Consumer clientId=" + clientId + "] "); + List bootstrapServers = config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG); List addresses = ClientUtils.parseAndValidateAddresses(bootstrapServers, ClientDnsLookup.DEFAULT); - ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, _time); + + LogContext logContext = new LogContext("[Consumer clientId=" + clientId + "] "); + + ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, _time, logContext); LOGGER.info("Bootstrap servers config: {} | broker addresses: {}", bootstrapServers, addresses); Metadata metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG), logContext, new ClusterResourceListeners()); - metadata.bootstrap(addresses, _time.milliseconds()); + metadata.bootstrap(addresses); Selector selector = new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), new Metrics(), _time, METRIC_GRP_PREFIX, channelBuilder, logContext); - KafkaClient kafkaClient = new NetworkClient(selector, metadata, clientId, MAX_INFLIGHT_REQUESTS_PER_CONNECTION, + KafkaClient kafkaClient = new NetworkClient( + selector, metadata, clientId, MAX_INFLIGHT_REQUESTS_PER_CONNECTION, config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG), config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG), config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG), config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG), - config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), ClientDnsLookup.DEFAULT, _time, true, + config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), + config.getInt(ConsumerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG), config.getInt(ConsumerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG), + ClientDnsLookup.DEFAULT, _time, true, new ApiVersions(), logContext); + LOGGER.debug("The network client active: {}", kafkaClient.active()); LOGGER.debug("The network client has in flight requests: {}", kafkaClient.hasInFlightRequests()); LOGGER.debug("The network client in flight request count: {}", kafkaClient.inFlightRequestCount());