Skip to content

Commit

Permalink
Callbacks now work on bad config. (KxSystems#102)
Browse files Browse the repository at this point in the history
Also removed invalid fetch.wait.max.ms for producers
  • Loading branch information
sshanks-kx authored Sep 8, 2023
1 parent 5af24b7 commit e3d8a39
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 6 deletions.
3 changes: 1 addition & 2 deletions examples/test_offsetp.q
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
kfk_cfg:(!) . flip(
(`metadata.broker.list;`localhost:9092);
(`statistics.interval.ms;`10000);
(`queue.buffering.max.ms;`1);
(`fetch.wait.max.ms;`10)
(`queue.buffering.max.ms;`1)
);
producer:.kfk.Producer[kfk_cfg]

Expand Down
3 changes: 1 addition & 2 deletions examples/test_producer.q
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
kfk_cfg:(!) . flip(
(`metadata.broker.list;`localhost:9092);
(`statistics.interval.ms;`10000);
(`queue.buffering.max.ms;`1);
(`fetch.wait.max.ms;`10)
(`queue.buffering.max.ms;`1)
);
producer:.kfk.Producer[kfk_cfg]

Expand Down
4 changes: 2 additions & 2 deletions kfk.c
Original file line number Diff line number Diff line change
Expand Up @@ -223,15 +223,15 @@ EXP K2(kfkClient){
return krr((S) b);
if(!(rk= rd_kafka_new(type, conf, b, sizeof(b))))
return krr(b);
/* Redirect logs to main queue */
rd_kafka_set_log_queue(rk,NULL);
/* Redirect rd_kafka_poll() to consumer_poll() */
if(type == RD_KAFKA_CONSUMER){
rd_kafka_poll_set_consumer(rk);
rd_kafka_queue_io_event_enable(rd_kafka_queue_get_consumer(rk),spair[1],"X",1);
}
else
rd_kafka_queue_io_event_enable(rd_kafka_queue_get_main(rk),spair[1],"X",1);
/* Redirect logs to main queue */
rd_kafka_set_log_queue(rk,NULL);
js(&clients, (S) rk);
return ki(clients->n - 1);
}
Expand Down

0 comments on commit e3d8a39

Please sign in to comment.