Update to librdkafka 2.2.0 (#1033)
GaryWilber authored Jul 21, 2023
1 parent a8c288d commit e7e4c6d
Showing 8 changed files with 28 additions and 34 deletions.
10 changes: 5 additions & 5 deletions README.md
@@ -17,7 +17,7 @@ I am looking for *your* help to make this project even better! If you're interes

The `node-rdkafka` library is a high-performance NodeJS client for [Apache Kafka](http://kafka.apache.org/) that wraps the native [librdkafka](https://github.com/edenhill/librdkafka) library. All the complexity of balancing writes across partitions and managing (possibly ever-changing) brokers should be encapsulated in the library.

- __This library currently uses `librdkafka` version `2.1.1`.__
+ __This library currently uses `librdkafka` version `2.2.0`.__

## Reference Docs

@@ -60,7 +60,7 @@ Using Alpine Linux? Check out the [docs](https://github.com/Blizzard/node-rdkafk

### Windows

- Windows build **is not** compiled from `librdkafka` source but it is rather linked against the appropriate version of [NuGet librdkafka.redist](https://www.nuget.org/packages/librdkafka.redist/) static binary that gets downloaded from `https://globalcdn.nuget.org/packages/librdkafka.redist.2.1.1.nupkg` during installation. This download link can be changed using the environment variable `NODE_RDKAFKA_NUGET_BASE_URL` that defaults to `https://globalcdn.nuget.org/packages/` when it's no set.
+ The Windows build **is not** compiled from `librdkafka` source; it is instead linked against the appropriate version of the [NuGet librdkafka.redist](https://www.nuget.org/packages/librdkafka.redist/) static binary, which gets downloaded from `https://globalcdn.nuget.org/packages/librdkafka.redist.2.2.0.nupkg` during installation. This download link can be changed using the environment variable `NODE_RDKAFKA_NUGET_BASE_URL`, which defaults to `https://globalcdn.nuget.org/packages/` when it's not set.

Requirements:
* [node-gyp for Windows](https://github.com/nodejs/node-gyp#on-windows)
@@ -97,7 +97,7 @@ const Kafka = require('node-rdkafka');

## Configuration

- You can pass many configuration options to `librdkafka`. A full list can be found in `librdkafka`'s [Configuration.md](https://github.com/edenhill/librdkafka/blob/v2.1.1/CONFIGURATION.md)
+ You can pass many configuration options to `librdkafka`. A full list can be found in `librdkafka`'s [Configuration.md](https://github.com/edenhill/librdkafka/blob/v2.2.0/CONFIGURATION.md)

Configuration keys that have the suffix `_cb` are designated as callbacks. Some
of these keys are informational and you can choose to opt-in (for example, `dr_cb`). Others are callbacks designed to
return a value, such as `partitioner_cb`; a sketch of the opt-in case follows.
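
A minimal sketch of opting in to delivery reports via `dr_cb` (the broker address is an assumption for illustration):

```js
const Kafka = require('node-rdkafka');

// `dr_cb: true` opts in to the informational delivery-report callback.
const producer = new Kafka.Producer({
  'metadata.broker.list': 'localhost:9092', // assumed broker address
  'dr_cb': true
});

// Reports then surface as 'delivery-report' events once the producer
// is connected and polling.
producer.on('delivery-report', (err, report) => {
  if (err) console.error(err);
  else console.log('delivered to', report.topic, report.partition);
});
```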
@@ -132,7 +132,7 @@ You can also get the version of `librdkafka`
const Kafka = require('node-rdkafka');
console.log(Kafka.librdkafkaVersion);

- // #=> 2.1.1
+ // #=> 2.2.0
```

## Sending Messages
@@ -145,7 +145,7 @@ const producer = new Kafka.Producer({
});
```

- A `Producer` requires only `metadata.broker.list` (the Kafka brokers) to be created. The values in this list are separated by commas. For other configuration options, see the [Configuration.md](https://github.com/edenhill/librdkafka/blob/v2.1.1/CONFIGURATION.md) file described previously.
+ A `Producer` requires only `metadata.broker.list` (the Kafka brokers) to be created. The values in this list are separated by commas. For other configuration options, see the [Configuration.md](https://github.com/edenhill/librdkafka/blob/v2.2.0/CONFIGURATION.md) file described previously.

The following example illustrates a list with several `librdkafka` options set.
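
A minimal sketch of what such a list can look like (the values are illustrative, not defaults or requirements):

```js
const producer = new Kafka.Producer({
  'client.id': 'kafka',                      // assumed client id
  'metadata.broker.list': 'localhost:9092',  // assumed broker address
  'compression.codec': 'gzip',
  'retry.backoff.ms': 200,
  'message.send.max.retries': 10,
  'socket.keepalive.enable': true,
  'queue.buffering.max.messages': 100000,
  'queue.buffering.max.ms': 1000,
  'batch.num.messages': 1000000,
  'dr_cb': true
});
```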

16 changes: 15 additions & 1 deletion config.d.ts
@@ -1,4 +1,4 @@
- // ====== Generated from librdkafka 2.1.1 file CONFIGURATION.md ======
+ // ====== Generated from librdkafka 2.2.0 file CONFIGURATION.md ======
// Code that generated this is a derivative work of the code from Nam Nguyen
// https://gist.github.com/ntgn81/066c2c8ec5b4238f85d1e9168a04e3fb

@@ -619,6 +619,13 @@ export interface GlobalConfig {
*/
"client.rack"?: string;

+ /**
+  * Controls how the client uses DNS lookups. By default, when the lookup returns multiple IP addresses for a hostname, they will all be attempted for connection before the connection is considered failed. This applies to both bootstrap and advertised servers. If the value is set to `resolve_canonical_bootstrap_servers_only`, each entry will be resolved and expanded into a list of canonical names. NOTE: Default here is different from the Java client's default behavior, which connects only to the first IP address returned for a hostname.
+  *
+  * @default use_all_dns_ips
+  */
+ "client.dns.lookup"?: 'use_all_dns_ips' | 'resolve_canonical_bootstrap_servers_only';

/**
* Enables or disables `event.*` emitting.
*
@@ -858,6 +865,13 @@ export interface ConsumerGlobalConfig extends GlobalConfig {
*/
"fetch.wait.max.ms"?: number;

+ /**
+  * How long to postpone the next fetch request for a topic+partition in case the current fetch queue thresholds (queued.min.messages or queued.max.messages.kbytes) have been exceeded. This property may need to be decreased if the queue thresholds are set low and the application is experiencing long (~1s) delays between messages. Low values may increase CPU utilization.
+  *
+  * @default 1000
+  */
+ "fetch.queue.backoff.ms"?: number;

/**
* Initial maximum number of bytes per topic+partition to request when fetching messages from the broker. If the client encounters a message larger than this value it will gradually try to increase it until the entire message can be fetched.
*
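The two added keys are ordinary configuration options, so they can be passed straight to a client constructor. A minimal sketch for a consumer (the group id, broker address, and chosen values are assumptions for illustration):

```js
const Kafka = require('node-rdkafka');

const consumer = new Kafka.KafkaConsumer({
  'group.id': 'example-group',               // assumed group id
  'metadata.broker.list': 'localhost:9092',  // assumed broker address
  'client.dns.lookup': 'use_all_dns_ips',    // the default, spelled out; the alternative is 'resolve_canonical_bootstrap_servers_only'
  'fetch.queue.backoff.ms': 500              // below the 1000 ms default: lower inter-message latency, more CPU
}, {});
```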
2 changes: 1 addition & 1 deletion deps/librdkafka
2 changes: 1 addition & 1 deletion errors.d.ts
@@ -1,4 +1,4 @@
- // ====== Generated from librdkafka 2.1.1 file src-cpp/rdkafkacpp.h ======
+ // ====== Generated from librdkafka 2.2.0 file src-cpp/rdkafkacpp.h ======
export const CODES: { ERRORS: {
/* Internal errors to rdkafka: */
/** Begin internal error codes (**-200**) */
2 changes: 1 addition & 1 deletion lib/error.js
@@ -27,7 +27,7 @@ LibrdKafkaError.wrap = errorWrap;
* @enum {number}
* @constant
*/
- // ====== Generated from librdkafka 2.1.1 file src-cpp/rdkafkacpp.h ======
+ // ====== Generated from librdkafka 2.2.0 file src-cpp/rdkafkacpp.h ======
LibrdKafkaError.codes = {

/* Internal errors to rdkafka: */
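
A hedged sketch of how these generated constants are typically consumed when handling errors (the consumer configuration is an assumption for illustration):

```js
const Kafka = require('node-rdkafka');

const consumer = new Kafka.KafkaConsumer({
  'group.id': 'example-group',              // assumed group id
  'metadata.broker.list': 'localhost:9092'  // assumed broker address
}, {});

consumer.on('event.error', (err) => {
  // `err.code` lines up with the generated codes in errors.d.ts / lib/error.js.
  if (err.code === Kafka.CODES.ERRORS.ERR__PARTITION_EOF) {
    console.log('reached the end of a partition');
  }
});
```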
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default.

6 changes: 3 additions & 3 deletions package.json
@@ -1,8 +1,8 @@
{
"name": "node-rdkafka",
"version": "v2.16.1",
"version": "v2.17.0",
"description": "Node.js bindings for librdkafka",
"librdkafka": "2.1.1",
"librdkafka": "2.2.0",
"main": "lib/index.js",
"scripts": {
"configure": "node-gyp configure",
@@ -45,4 +45,4 @@
"engines": {
"node": ">=6.0.0"
}
- }
+ }
20 changes: 0 additions & 20 deletions src/binding.cc
@@ -15,21 +15,8 @@ using NodeKafka::KafkaConsumer;
using NodeKafka::AdminClient;
using NodeKafka::Topic;

- using node::AtExit;
using RdKafka::ErrorCode;

- static void RdKafkaCleanup(void*) {  // NOLINT
-   /*
-    * Wait for RdKafka to decommission.
-    * This is not strictly needed but
-    * allows RdKafka to clean up all its resources before the application
-    * exits so that memory profilers such as valgrind wont complain about
-    * memory leaks.
-    */
-
-   RdKafka::wait_destroyed(5000);
- }

NAN_METHOD(NodeRdKafkaErr2Str) {
int points = Nan::To<int>(info[0]).FromJust();
// Cast to error code
@@ -74,13 +61,6 @@ void ConstantsInit(v8::Local<v8::Object> exports) {
}

void Init(v8::Local<v8::Object> exports, v8::Local<v8::Value> m_, void* v_) {
- #if NODE_MAJOR_VERSION <= 9 || (NODE_MAJOR_VERSION == 10 && NODE_MINOR_VERSION <= 15)
-   AtExit(RdKafkaCleanup);
- #else
-   v8::Local<v8::Context> context = Nan::GetCurrentContext();
-   node::Environment* env = node::GetCurrentEnvironment(context);
-   AtExit(env, RdKafkaCleanup, NULL);
- #endif
KafkaConsumer::Init(exports);
Producer::Init(exports);
AdminClient::Init(exports);
