
Subject: KafkaConsumer API Performance Challenges and Optimization Suggestions (Kafka clients 3.6.1) #1971

Closed
gmungi opened this issue Nov 21, 2024 · 5 comments
Assignees
Labels
invalid, status-triage_done (Initial triage done, will be further handled by the driver team)

Comments

@gmungi

gmungi commented Nov 21, 2024

Hi All,

We have been observing that the KafkaConsumer API is significantly slower than the previous low-level Kafka API we were using (e.g., FetchRequest, FetchResponse, ByteBufferMessageSet). Below is a detailed overview of the issue and the current implementation, along with an explanation of the bottlenecks and potential optimization suggestions.

Performance Issues
Use Case:

The application requires fetching 1,000 messages starting from a specific user-provided offset and returning the next offset (1001) in the response. This offset will then be used as input for subsequent requests.
Despite using MAX_POLL_RECORDS_CONFIG=1000, the consumer API fetches only ~300 records per poll in ~2 seconds. Fetching 1,000 records typically takes ~4 polls, resulting in a total time of ~8–10 seconds.

I have tried different consumer settings such as MAX_PARTITION_FETCH_BYTES_CONFIG, FETCH_MIN_BYTES_CONFIG, and MAX_POLL_RECORDS_CONFIG, and have also tried increasing max poll records. Within a 2-second poll the consumer is still not able to fetch 1,000 records and sometimes returns 0 records.
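
For what it is worth, MAX_POLL_RECORDS_CONFIG only caps how many already-fetched records a single poll() may hand back; how much data each network fetch brings over is governed by the fetch-related settings, so they usually have to be raised together. A minimal sketch, using the same props object that createConsumer below builds (the values are illustrative assumptions, not recommendations):

// Illustrative values only - the right numbers depend on message size and broker latency.
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");              // cap per poll()
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1048576");            // let the broker batch ~1 MB...
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");              // ...or respond after 500 ms, whichever comes first
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5242880");  // allow ~5 MB from one partition (default 1 MB)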

Observed Delays:

Consumer Assignment and Seeking:
The time taken for consumer.assign() and consumer.seek() operations adds to the overall latency.
Polling:
The consumer.poll() call often returns fewer records than expected, resulting in multiple iterations to achieve the required batch size.

Comparison with Low-Level API:
The low-level Kafka API (e.g., FetchRequest and FetchResponse) performs better, with reduced latency for fetching records. It appears to bypass some of the high-level abstractions (e.g., consumer group coordination and offset management) that introduce overhead.
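
On the group-coordination point: when a consumer uses assign(), it never joins a consumer group, so there are no rebalances or coordinator round trips to bypass in the first place; the first poll() still pays for connection setup and a cluster metadata lookup. A minimal sketch of the bare flow (topicName, partition, startOffset and consumer are assumed to come from the surrounding code):

// assign() and seek() are local bookkeeping only; the network work
// (metadata lookup and the actual fetch) happens inside the first poll().
TopicPartition tp = new TopicPartition(topicName, partition);
consumer.assign(Collections.singletonList(tp));
consumer.seek(tp, startOffset);
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));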

Consumer Creation Method

public static KafkaConsumer<String, String> createConsumer(String clientName, int fetchSize) {
    Properties props = new Properties();
    String kafkaBrokerStr = Config.getConsumerPropValue("kafkabrokerslist");
    String groupId = Config.getConsumerPropValue("group.id");
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerStr);
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientName);
    props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
    String uname = System.getenv(NextGenKafkaConstants.KAFKA_USER);
    String pwrd = System.getenv(NextGenKafkaConstants.KAFKA_PASS);
    log.info("KafkaUser:{} kafkaBrokerStr:{} GroupID:{}", uname, kafkaBrokerStr, groupId);
    String jaasCfg = String.format(jaasTemplate, uname, pwrd);
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
    props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasCfg);
    //props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, fetchSize);
    //props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
    //props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
    return new KafkaConsumer<>(props);
}
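
One observation about this factory (a hypothesis, not a confirmed cause of the slowness): every call builds a brand-new KafkaConsumer, so the connection setup, the SASL_SSL/TLS handshake, and the first metadata fetch are paid again on every request before any records can arrive. A minimal sketch of reusing one consumer per thread instead; KafkaConsumer is not thread-safe, so the instance must stay confined to its thread, and the fetchSize of 0 is just a placeholder:

// Sketch: cache one consumer per thread so connection setup, the SASL_SSL handshake and
// the first metadata fetch are paid once, not on every consume() call. Close them on shutdown.
private static final ThreadLocal<KafkaConsumer<String, String>> CACHED_CONSUMER =
        ThreadLocal.withInitial(() -> KafkaConsumerFactory.createConsumer(
                "crt_consumer_" + Thread.currentThread().getName(), 0));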

public List<JSONObject> consume(long offset, String topicName, int partition, CEConsumeRequest inputReq) throws CustomException {
    List<JSONObject> msglist = new ArrayList<>();

	int waitTime = Integer.valueOf(Config
			.getConsumerPropValue("pull.wait.time"));
	int limit = Integer.valueOf(Config.getConsumerPropValue("pull.size.limit"));
	int emptyLoopLimit = Integer.valueOf(Config
			.getConsumerPropValue("pull.empty.loop.limit"));
	int fetchSize = Integer.valueOf(Config
			.getConsumerPropValue("pull.each.fetch.size"));
	
	
	TopicPartition topicPartition = new TopicPartition(topicName, partition);
	/*
	 * User input offset.
	 */
	
	long readOffset = offset;
	clientName = "crt_consumer_" + Thread.currentThread().getName();
    try (KafkaConsumer<String,String> consumer = KafkaConsumerFactory.createConsumer(clientName,fetchSize)){
    	consumer.assign(Collections.singletonList(topicPartition));
	
    
	if (readOffset == 0 && readOffset < kafkaEarliestOffset) {
		log.warn("Resetting the offset to earliest available offset in kafka.");
		readOffset = kafkaEarliestOffset;
	}

	boolean end = false;
	long startTime = Calendar.getInstance().getTimeInMillis();
	long endTime = Calendar.getInstance().getTimeInMillis();
	int emptyFetchCount = 0;
	consumer.seek(topicPartition, readOffset);
	do {
		JSONObject obj = null;
		ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(2000));
		log.info("Poll Records Count :{}",records.count());
		  for (ConsumerRecord<String, String> consumerRecord : records) {
			  long currentOffset = consumerRecord.offset();
			  if (currentOffset < readOffset) {
                    log.warn("Found an old offset: {}, Expecting: {}", currentOffset, readOffset);
                    continue;
                }
			  String message = consumerRecord.value();
			  log.debug(
						"client name : {} , Offset is : {} , Message is : {} ",
						clientName, readOffset,
						message);
			  CONSUME_LOG.debug(topicName + "\t" + partition + "\t"
						+ String.valueOf(currentOffset));
			  obj = new JSONObject(message);
			   msglist.add(obj);
				readOffset = currentOffset + 1;
		  }
		endTime = Calendar.getInstance().getTimeInMillis();
		if (msglist.size() >= Math.round(limit
				/ inputReq.getApplicationArea().getReqInfo().size())
				|| (endTime - startTime) >= waitTime) {
			log.info(
					"Wait condition has been met... exiting the fetch loop. recordCount - {}, time exhausted - {} ms.",
					msglist.size(), (endTime - startTime));
			end = true;
			consumer.commitSync();
		} else if (records.isEmpty()) {
			emptyFetchCount++;
			try {
				if(emptyFetchCount == emptyLoopLimit) {
					log.info("No messages were found in {} successive fetches. Stopping the consume process here.", emptyLoopLimit);
					end = true;
				} else {
					Thread.sleep(1000);
				}
			} catch (InterruptedException ie) {
				CONSUME_LOG.warn(ie.getMessage(), ie);
			}
		} else {
			emptyFetchCount = 0;
		}
	} while (!end);
	
	return msglist;
    }
}
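
For comparison, a leaner shape for the fetch loop (a sketch only; fetchUpTo is a hypothetical helper and it omits the empty-fetch limit, logging, and commit handling from the method above): poll with a short timeout against an absolute deadline, so each iteration returns as soon as data is available instead of always waiting up to 2 seconds.

// Hypothetical helper for illustration: fetch up to `target` records starting at `startOffset`,
// bounded by a total time budget, using short polls so ready data is returned immediately.
private List<JSONObject> fetchUpTo(KafkaConsumer<String, String> consumer, TopicPartition tp,
                                   long startOffset, int target, long budgetMs) {
    List<JSONObject> out = new ArrayList<>(target);
    consumer.assign(Collections.singletonList(tp));
    consumer.seek(tp, startOffset);
    long deadline = System.currentTimeMillis() + budgetMs;
    while (out.size() < target && System.currentTimeMillis() < deadline) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
        for (ConsumerRecord<String, String> record : records) {
            out.add(new JSONObject(record.value()));
            if (out.size() >= target) {
                break;
            }
        }
    }
    return out;
}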

Please suggest how to improve this.

@gmungi gmungi added the bug label Nov 21, 2024
@github-actions github-actions bot changed the title Subject: KafkaConsumer API Performance Challenges and Optimization Suggestions SNOW-1820229: Subject: KafkaConsumer API Performance Challenges and Optimization Suggestions Nov 21, 2024
@gmungi gmungi changed the title SNOW-1820229: Subject: KafkaConsumer API Performance Challenges and Optimization Suggestions Subject: KafkaConsumer API Performance Challenges and Optimization Suggestions ( Kafka clients 3.6.1) Nov 21, 2024
@sfc-gh-dszmolka
Contributor

Hi - I'm wondering whether this issue was intended for the Snowflake Kafka connector library instead?

@gmungi
Author

gmungi commented Nov 22, 2024

Hi - I'm wondering whether this issue was intended for the Snowflake Kafka connector library instead?

@sfc-gh-dszmolka
I am not able to get the Kafka consumer link. Can you please help with this?

@sfc-gh-sghosh
Contributor

Hello @gmungi ,

Could you please create the issue in https://github.com/snowflakedb/snowflake-kafka-connector

Regards,
Sujan

@sfc-gh-sghosh sfc-gh-sghosh self-assigned this Nov 25, 2024
@sfc-gh-sghosh sfc-gh-sghosh removed the bug label Nov 25, 2024
@sfc-gh-sghosh
Contributor

Hello @gmungi ,

This issue falls outside the scope of Snowflake Kafka Connector support, as it pertains to tuning the Kafka Connect framework versus other Kafka APIs. For these types of questions, Confluent support is a good resource.

Regards,
Sujan

@sfc-gh-sghosh sfc-gh-sghosh added the status-triage_done Initial triage done, will be further handled by the driver team label Nov 25, 2024
@sfc-gh-dszmolka
Contributor

Closing this issue as it does not belong here.
