Skip to content

Commit

Permalink
show/explain meta info better. ref KxSystems#105
Browse files Browse the repository at this point in the history
running test_consumer.q, test_producer.q now presents meta info better
test_consumer.q also now prevented from producing erroneous UNKNOWN_TOPIC_OR_PART msg
  • Loading branch information
sshanks-kx committed Jan 24, 2024
1 parent 0322cfc commit 50d0ca9
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 12 deletions.
9 changes: 8 additions & 1 deletion docs/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,14 @@ _Information about configuration of brokers and topics_
.kfk.Metadata id
```

Where `id` is a consumer or producer ID, returns a dictionary with information about the brokers and topics.
Where `id` is a consumer or producer ID, returns a dictionary populated with the following info:

- `orig_broker_id` (int): Broker originating this metadata
- `orig_broker_name` (symbol): Name of originating broker
- `brokers` (list of dictionaries): Info on current brokers
- `topics` (list of dictionaries): Info on current topics

e.g.

```q
q)show producer_meta:.kfk.Metadata producer
Expand Down
24 changes: 18 additions & 6 deletions examples/test_consumer.q
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,36 @@ kfk_cfg:(!) . flip(
client:.kfk.Consumer[kfk_cfg];

// Topics to subscribe to
topic1:`test1; topic2:`test2;
topic1:`test1;
topic2:`test2;

// Define datasets and topic callbacks for individual
// topic subscriptions `topic1 and `topic2
// Define datasets and topic callbacks for topic1
data1:();
topcb1:{[msg]
msg[`data]:"c"$msg[`data];
msg[`rcvtime]:.z.p;
data1,::enlist msg;}

// Define datasets and topic callbacks for topic2
data2:();
topcb2:{[msg]
msg[`data]:"c"$msg[`data];
msg[`rcvtime]:.z.t;
data2,::enlist msg;}

printmeta:{
-1 "==== MetaData provided by the following broker =======";
-1 "name:",string x`orig_broker_name;
-1 "id:",string x`orig_broker_id;
-1 "==== Brokers =========================================";
show each x`brokers;
-1 "==== Topics ==========================================";
$[count x`topics;show each x`topics;-1 "[None]"];
-1 "";}

printmeta .kfk.Metadata[client];

// Subscribe to topic1 and topic2 with different callbacks from a single client
-1 "Subscribing to topics called ",(string topic1)," and ",string topic2;
-1 "Consumed data will be placed in tables data1 and data2 when available (type 'data1' or 'data2' to view)";
.kfk.Subscribe[client;(topic1;topic2);enlist .kfk.PARTITION_UA;(topcb1;topcb2)]

client_meta:.kfk.Metadata[client];
show client_meta`topics;
18 changes: 13 additions & 5 deletions examples/test_producer.q
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
\l ../kfk.q

kfk_cfg:(!) . flip(
(`metadata.broker.list;`localhost:9092);
(`statistics.interval.ms;`10000);
Expand All @@ -12,12 +13,19 @@ topic2:.kfk.Topic[producer;`test2;()!()]
.z.ts:{n+:1;topic:$[n mod 2;topic1;topic2];
.kfk.Pub[topic;.kfk.PARTITION_UA;string x;""]}

printmeta:{
-1 "==== MetaData provided by the following broker =======";
-1 "name:",string x`orig_broker_name;
-1 "id:",string x`orig_broker_id;
-1 "==== Brokers =========================================";
show each x`brokers;
-1 "==== Topics ==========================================";
$[count x`topics;show each x`topics;-1 "[None]"];
-1 "";}

printmeta .kfk.Metadata[producer];

-1 "Publishing on topics:",string[.kfk.TopicName topic1],", ",string[.kfk.TopicName topic2];
-1 "Publishing single msg on topics: ",string[.kfk.TopicName topic1],", ",string[.kfk.TopicName topic2];
.kfk.Pub[;.kfk.PARTITION_UA;string .z.p;""]each(topic1;topic2);
-1 "Published one message for each topic";
producer_meta:.kfk.Metadata[producer];
show producer_meta`topics;
-1 "Set timer with \\t 500 to publish a message each second to each topic.";
-1 "Set timer with \\t 500 to publish a message every 1/2 second to each topic in turn";

0 comments on commit 50d0ca9

Please sign in to comment.