Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: linkedin/kafka-monitor
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 2.5.13
Choose a base ref
...
head repository: linkedin/kafka-monitor
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 2.5.14
Choose a head ref
  • 2 commits
  • 2 files changed
  • 1 contributor

Commits on Oct 21, 2022

  1. bump dependencies: kafka and zookeeper (#378)

    - 'kafka_2.12', version: '2.4.0' -> 'kafka_2.12', version: '2.8.2'
    - 'kafka-clients', version: '2.3.1' -> 'kafka-clients', version: '2.8.2'
    - zookeeper 3.5.6 -> 3.8.0
    
    ## Details
    
    new `NetworkClient` arguments:
    
    1. `long connectionSetupTimeoutMs`,
    2. `long connectionSetupTimeoutMaxMs`
    
    are described in https://issues.apache.org/jira/browse/KAFKA-9893 and corresponding PRs:
    
    1. apache/kafka#8544
    2. apache/kafka#8683
    
    ## Testing Done
    
    1. ./gradlew build
    mhratson authored Oct 21, 2022
    Copy the full SHA
    b6faf38 View commit details
  2. bump avro-util: 0.2.81 -> 0.2.118 (#380)

    ## Details
    
    [CVE-2022-42889](GHSA-599f-7c49-w659) in avro-util:
    
    > Arbitrary code execution in Apache Commons Text
    
    closes #379
    
    ## Testing Done
    
    1. ./gradlew build
    mhratson authored Oct 21, 2022
    Copy the full SHA
    25bd987 View commit details
Showing with 16 additions and 8 deletions.
  1. +4 −3 build.gradle
  2. +12 −5 src/main/java/com/linkedin/xinfra/monitor/services/OffsetCommitService.java
7 changes: 4 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -41,10 +41,11 @@ allprojects {
compile 'net.savantly:graphite-client:1.1.0-RELEASE'
compile 'com.timgroup:java-statsd-client:3.0.1'
compile 'com.signalfx.public:signalfx-codahale:0.0.47'
compile group: 'org.apache.kafka', name: 'kafka_2.12', version: '2.4.0'
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.3.1'
compile group: 'org.apache.kafka', name: 'kafka_2.12', version: '2.8.2'
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.8.2'
compile 'org.apache.commons:commons-lang3:3.12.0'
compile 'com.linkedin.avroutil1:helper-all:0.2.81'
compile 'com.linkedin.avroutil1:helper-all:0.2.118'
compile 'org.apache.zookeeper:zookeeper:3.8.0'
testCompile 'org.mockito:mockito-core:2.24.0'
testCompile 'org.testng:testng:6.8.8'
}
Original file line number Diff line number Diff line change
@@ -103,30 +103,37 @@ public class OffsetCommitService implements Service {
int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);

String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
LogContext logContext = new LogContext("[Consumer clientId=" + clientId + "] ");

List<String> bootstrapServers = config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
List<InetSocketAddress> addresses =
ClientUtils.parseAndValidateAddresses(bootstrapServers, ClientDnsLookup.DEFAULT);
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, _time);

LogContext logContext = new LogContext("[Consumer clientId=" + clientId + "] ");

ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, _time, logContext);

LOGGER.info("Bootstrap servers config: {} | broker addresses: {}", bootstrapServers, addresses);

Metadata metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG), logContext,
new ClusterResourceListeners());

metadata.bootstrap(addresses, _time.milliseconds());
metadata.bootstrap(addresses);

Selector selector =
new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), new Metrics(), _time,
METRIC_GRP_PREFIX, channelBuilder, logContext);

KafkaClient kafkaClient = new NetworkClient(selector, metadata, clientId, MAX_INFLIGHT_REQUESTS_PER_CONNECTION,
KafkaClient kafkaClient = new NetworkClient(
selector, metadata, clientId, MAX_INFLIGHT_REQUESTS_PER_CONNECTION,
config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG), config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), ClientDnsLookup.DEFAULT, _time, true,
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
config.getInt(ConsumerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG), config.getInt(ConsumerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
ClientDnsLookup.DEFAULT, _time, true,
new ApiVersions(), logContext);


LOGGER.debug("The network client active: {}", kafkaClient.active());
LOGGER.debug("The network client has in flight requests: {}", kafkaClient.hasInFlightRequests());
LOGGER.debug("The network client in flight request count: {}", kafkaClient.inFlightRequestCount());