From c447a573b1bda59f1e89a834baabb3efeb1ade67 Mon Sep 17 00:00:00 2001
From: Arek Burdach
Date: Tue, 7 Feb 2023 18:18:35 +0100
Subject: [PATCH 1/2] Kafka documentation refactor: Concepts -> Connection ->
 Messaging -> Details

---
 docs/integration/KafkaIntegration.md | 184 ++++++++++++++++-----------
 1 file changed, 107 insertions(+), 77 deletions(-)

diff --git a/docs/integration/KafkaIntegration.md b/docs/integration/KafkaIntegration.md
index 6dbd2dae74d..ac8af2d60bf 100644
--- a/docs/integration/KafkaIntegration.md
+++ b/docs/integration/KafkaIntegration.md
@@ -8,7 +8,7 @@ sidebar_position: 1
 To better understand how Nussknacker works with Kafka, it's recommended to read the following first:
 * [Kafka introduction](https://kafka.apache.org/intro)
-* [Role of Schema Registry](/about/TypicalImplementation)
+* [Role of Schema Registry](/about/TypicalImplementationStreaming)
 * [Confluent Schema Registry](https://docs.confluent.io/platform/current/schema-registry/index.html)
 
 If you want to use Flink engine, this is also recommended:
@@ -16,40 +16,92 @@ If you want to use Flink engine, this is also recommended:
 
 ## Concepts
 
+### Sources and sinks
+
 Kafka topics are native streaming data input to Nussknacker and the native output where results of Nussknacker scenarios processing are placed.
-In Nussknacker terminology input topics are called *sources*, output topics are called *sinks*. This section provides important details of Nussknacker's integration with Kafka and Schema Registry.
+In Nussknacker terminology, input topics are handled by *source* components and output topics by *sink* components.
+This section provides important details of Nussknacker's integration with Kafka and Schema Registry.
 
-## Schema Registry integration
+### Schemas
 
-Nussknacker integrates with Schema Registries. It is the source of knowledge about:
-- Which topics are available in Kafka sources and sinks
-- What schema versions are available for topic
-- How code completions and validations on types described by schemas should work
-- How messages will be serialized and deserialized
+Nussknacker uses schemas for code completions and validations of messages.
+The schema of a message can be described in [Avro Schema format](https://avro.apache.org/docs/#schemas) or in [JSON Schema format](https://json-schema.org) (the latter with Confluent Schema Registry only).
 
-Currently, Nussknacker supports two implementations of schema registries: based on *Confluent Schema Registry* and based on *Azure Schema Registry*.
-During runtime, it serializes and deserializes messages in the way that is compatible with standard Kafka serializers and deserializers
-delivered by those schema registries providers. Thanks to that you should be able to send messages to Nussknacker and read messages
-produced by Nussknacker using standard tooling available around those Schema Registries.
+Schemas are retrieved from Schema Registry - *Confluent Schema Registry* and *Azure Schema Registry* are supported.
 
-Given implementation is picked based on `schema.registry.url` property. By default, Confluent-based implementation is used.
-For urls ended with `.servicebus.windows.net` Azure-based implementation is used.
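+
+For illustration only, a minimal Avro schema for the value of a hypothetical `transactions` topic could look like the sketch below (the record and field names are made up for this example):
+
+```
+{
+  "type": "record",
+  "name": "TransactionsValue",
+  "fields": [
+    { "name": "transactionId", "type": "string" },
+    { "name": "amount", "type": "double" }
+  ]
+}
+```
+
+Note that, with the Azure naming convention described below, the schema name `TransactionsValue` would associate this schema with the value of the `transactions` topic.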
+
+To preview schemas or add a new schema version, you can use the tools available on your cloud platform or a tool like [AKHQ](https://akhq.io).
+
+#### Association between schema and topic
+
+To properly present information about topics and schema versions, and to recognize which schema is assigned to a version, Nussknacker follows these conventions:
+- For the Confluent-based implementation it uses [TopicNameStrategy for subject names](https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#subject-name-strategy).
+  It means that it looks for schemas available at the `<topic-name>-(key or value)` *subject*. For example, for the topic `transactions`, it looks for key schemas at the `transactions-key` *subject* and for value schemas at the `transactions-value` *subject*.
+- In the Azure Schema Registry, the *subject* concept doesn't exist - schemas are grouped by the same *schema name*. Because of that, Nussknacker introduces its
+  own convention for the association between schema and topic: the *schema name* should be in the format `CamelCasedTopicNameKey` for keys and `CamelCasedTopicNameValue` for values.
+  For example, for the `input-events` topic, the schema should be named `InputEventsKey` for the key or `InputEventsValue` for the value. Be aware that this may require changing the schema name
+  not only in the Azure portal but also inside the schema content - those names should be the same to make serialization work correctly.
+
+## Connection and Authentication Configuration
 
-### Connection configuration
+Under the hood, Nussknacker uses `kafkaProperties` to configure the standard Kafka client. It means that all standard Kafka client properties will be respected.
+For detailed instructions on where this configuration should be placed inside Nussknacker's configuration, take a look at the [Configuration details section](#configuration-details).
 
-Configuration necessary to connect with Schema registry should be placed inside `kafkaProperties`. For detailed instruction
-how configuration is handled take a look at [Configuration paragraph](#configuration)
+### Kafka - Connection
 
-#### Confluent-based implementation
+To configure the connection to Kafka, you need to set at least the `bootstrap.servers` property. It should contain a comma-separated list of Kafka broker addresses.
+
+#### Kafka - Authentication
+
+A Kafka cluster has multiple options for configuring authentication. Take a look at the [Kafka security documentation](https://kafka.apache.org/090/documentation.html#security)
+to see detailed examples of how those options should be translated into properties. For example, for the typical `SASL_SSL` configuration with
+credentials in `JAAS` format, you should provide a configuration similar to this one:
+
+```
+kafkaProperties {
+  "schema.registry.url": "http://schemaregistry:8081"
+  "bootstrap.servers": "broker1:9092,broker2:9092"
+  "security.protocol": "SASL_SSL"
+  "sasl.mechanism": "PLAIN"
+  "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"some_user\" password=\"some_password\";"
+}
+```
+
+If you use Azure Event Hubs (which uses this mode), the username will be `$ConnectionString` and the password will be the connection string, starting with `Endpoint=sb://`.
+
+If you use your own CA and client+server certificate authentication, you should additionally provide:
+`ssl.keystore.location`, `ssl.keystore.password`, `ssl.key.password`, `ssl.truststore.location`, `ssl.truststore.password`.
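+
+As an illustration only, a client certificate (mTLS) variant of the above could look like the sketch below - all paths and passwords are placeholders that you need to replace with your own values:
+
+```
+kafkaProperties {
+  "schema.registry.url": "http://schemaregistry:8081"
+  "bootstrap.servers": "broker1:9093,broker2:9093"
+  # mutual TLS instead of SASL - the brokers authenticate the client by its certificate
+  "security.protocol": "SSL"
+  "ssl.keystore.location": "/opt/nussknacker/conf/client.keystore.jks"
+  "ssl.keystore.password": "keystore-password"
+  "ssl.key.password": "key-password"
+  "ssl.truststore.location": "/opt/nussknacker/conf/client.truststore.jks"
+  "ssl.truststore.password": "truststore-password"
+}
+```
+
+The same keystore/truststore values can be verified first with the kafka-cli tools mentioned below, before copying them to Nussknacker's configuration.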
+
+To make sure that your configuration is correct, you can test it with standard kafka-cli commands
+like `kafka-console-consumer`, `kafka-console-producer` or [kcat](https://github.com/edenhill/kcat).
+
+Some tutorials showing how to do that:
+- [Kafka quickstart](https://kafka.apache.org/documentation/#quickstart_send)
+- [Azure Event Hubs for Kafka quickstart](https://github.com/Azure/azure-event-hubs-for-kafka/tree/master/quickstart/kafka-cli)
+
+After you get a properly working set of properties, you just need to copy them to Nussknacker's configuration.
+
+### Schema Registry - Connection
+
+To configure the connection to Schema Registry, you need to set at least the `schema.registry.url` property. It should contain a comma-separated list of Schema Registry urls.
+For a single node installation, it will be just one url. Be aware that, contrary to Kafka brokers, Schema Registry urls should start with `https://` or `http://`.
+
+Currently, Nussknacker supports two implementations of Schema Registries: based on *Confluent Schema Registry* and based on *Azure Schema Registry*.
+Given implementation is picked based on `schema.registry.url` property. By default, Confluent-based implementation is used.
+For urls ended with `.servicebus.windows.net` Azure-based implementation is used.
 
-#### Confluent-based implementation
+#### Confluent-based Schema Registry - Connection and Authentication
 
 For Confluent-based implementation you should provide at least `schema.registry.url`. If your schema registry is secured
-by user and password, you should additionally provide `"basic.auth.credentials.source": USER_INFO` and `basic.auth.user.info` properties.
+by user and password, you should additionally provide `"basic.auth.credentials.source": USER_INFO` and `"basic.auth.user.info": "some_user:some_password"` entries.
 To read more see [Schema registry documentation](https://docs.confluent.io/platform/current/schema-registry/security/index.html#governance)
 
-#### Azure-based implementation
+To make sure that your configuration is correct, you can test it with the `kafka-avro-console-consumer` and `kafka-avro-console-producer` tools available
+in the Confluent Schema Registry distribution. After you get a properly working set of properties, you just need to copy them to Nussknacker's configuration.
 
-For Azure-based implementation, firstly you should provide `schema.registry.url` and `schema.group` properties. First one should be
-`https://<namespace>.servicebus.windows.net`, the second one should be the name of schema groups where will be
+#### Azure-based Schema Registry - Connection and Authentication
+
+For the Azure-based implementation, you should first provide the `schema.registry.url` and `schema.group` properties. The first one should be the
+`https://<namespace>.servicebus.windows.net` url, the second one should be the name of the schema group where will be
 located all schemas used by Nussknacker.
 
 Regarding authentication, a couple of options can be used - you can provide credential via:
@@ -57,55 +109,63 @@ Regarding authentication, a couple of options can be used - you can provide cred
 by Azure's [DefaultAzureCredential](https://learn.microsoft.com/en-us/java/api/overview/azure/identity-readme?view=azure-java-stable#defaultazurecredential).
 For example via Azure CLI or Azure PowerShell.
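+
+Putting the Azure properties together, a minimal configuration could look like the sketch below. The namespace, schema group and connection string values are placeholders only, and we assume here that the Schema Registry credential is picked up by `DefaultAzureCredential` (e.g. from an Azure CLI login):
+
+```
+kafkaProperties {
+  # Azure Schema Registry endpoint and the schema group holding the schemas used by Nussknacker
+  "schema.registry.url": "https://my-namespace.servicebus.windows.net"
+  "schema.group": "nussknacker-schemas"
+  # Kafka API of Azure Event Hubs, authenticated with the connection string as described above
+  "bootstrap.servers": "my-namespace.servicebus.windows.net:9093"
+  "security.protocol": "SASL_SSL"
+  "sasl.mechanism": "PLAIN"
+  "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://my-namespace.servicebus.windows.net/;SharedAccessKeyName=...;SharedAccessKey=...\";"
+}
+```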
 
-### Association between schema with topic
+## Messaging
 
-To properly present information about topics and version and to recognize which schema is assigned to version, Nussknacker follow conventions:
-- For Confluent-based implementation it uses [TopicNameStrategy for subject names](https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#subject-name-strategy).
-  It means that it looks for schemas available at `\<topic name\>-(key or value)`. For example for topic `transactions`, it looks for schemas at `transactions-key` for key and `transactions-value` subject for value
-- In Azure Schema Registry, subjects concept doesn't exist - schemas are grouped by the same schema name. Because of that, Nussknacker introduces
-  own convention for association between schema and topic: schema name should be in format: CamelCasedTopicNameKey for keys and CamelCasedTopicNameValue for values.
-  For example for `input-events` topic, schema name should be named `InputEventsKey` for key or `InputEventsValue` for value. Be aware that it may require change of schema name
-  not only in Azure portal but also inside schema content - those names should be the same to make serialization works correctly
-
-### Schema and payload types
+To communicate with running Nussknacker's scenarios via Kafka sources and sinks you can use for example standard kafka-cli commands
+like `kafka-console-consumer`, `kafka-console-producer`, [kcat](https://github.com/edenhill/kcat), Confluent's
+`kafka-avro-console-consumer`, `kafka-avro-console-producer` commands for Confluent-based Avro encoded messages or graphical tools like [AKHQ](https://akhq.io).
+
+Be aware that Azure-based Avro encoded messages have a little different format than Confluent once - Schema ID is passed in headers instead of payload.
+Such messages can be less well supported by some of the available tools. See [Schema Registry comparison section](#schema-registry-comparison) for details.
+
+### Message Payload
 
 By default, Nussknacker supports two combinations of schema type and payload type:
-* Avro schema + Avro payload
-* JSON schema + JSON payload
+* Avro schema + Avro payload (binary format)
+* JSON schema + JSON payload (human readable, text format)
+
+Avro payloads are more concise, because messages contain only the values and the schema id - without information about the message structure, such as field names.
 
-If you prefer using JSON payload with Avro schema, you can use `avroAsJsonSerialization` configuration setting to change that behaviour ([see Configuration for details](/docs/installation_configuration_guide/ModelConfiguration#common-kafka-configuration)).
+The Avro payload is compatible with the standard Kafka serializers and deserializers delivered by Schema Registry providers. Thanks to that, you should be able to send messages to Nussknacker and read messages
+produced by Nussknacker using the standard tooling available around those Schema Registries. To see how those formats differ, take a look at the [Schema Registry comparison section](#schema-registry-comparison).
+
+In some situations it might be helpful to use a JSON payload with an Avro schema, especially when your Schema Registry doesn't support JSON schemas.
+You can do that by enabling the `avroAsJsonSerialization` configuration setting.
 
 ### Schema ID
 
-Nussknacker supports schema evolution.
+Each topic can contain messages written using different schema versions. Schema versions are identified by *Schema ID*.
+Nussknacker need to know what was the schema used during writing to make message validation and schema evolution possible.
+Because of that it need to extract *Schema ID* from the message.
 
-For sources and sinks, you can choose which schema version should be used for syntax [suggestions and validation](/docs/integration/DataTypingAndSchemasHandling.md).
+Additionally, in sources and sinks, you can choose which schema version should be used during reading/writing.
+Thanks to the schema evolution mechanism, a message in the original format will be evolved to the desired format.
+This desired schema will be used in [code completion and validation](/docs/integration/DataTypingAndSchemasHandling.md).
 
 At runtime Nussknacker determines the schema version of a message value and key in the following way:
 1. It checks the `key.schemaId`, `value.schemaId` and Azure-specific `content-type` headers;
 2. If no such headers are provided, it looks for the magic byte (0x00) and a schema id in the message, [in a format used by Confluent](https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format);
 3. If the magic byte is not found, it assumes the schema version chosen by the user in the scenario.
 
-### Comparison
+### Schema Registry comparison
 
 Below you can find a quick comparison of how the given schema registry types are handled:
 
-| Schema registry type | What is used for association between schema with topic | Convention | The way how schema id is passed in message |
-|----------------------|--------------------------------------------------------|-----------------------------------------------------|---------------------------------------------------------------------------------------------------------------|
-| Confluent | Subjects | Subject = \<topic name\>-(key or value) | For Avro: in payload in format: 0x00, 4 bytes schema id, Avro payload |
-| | | | For JSON: in `key.schemaId` or `value.schemaId` headers |
-| Azure | Schema names | Schema name = \<CamelCasedTopicName\>(Key or Value) | For Avro: In header: content-type: avro/binary+schemaId |
-| | | | For JSON: in `key.schemaId` or `value.schemaId` headers (only when `avroAsJsonSerialization` option enabled) |
+| Schema registry type | What is used for association between schema and topic | Convention | How schema id is passed in message | Payload content |
+|----------------------|--------------------------------------------------------|-----------------------------------------------------|-------------------------------------------------------------------------------------------------------------------|---------------------------------------|
+| Confluent | Subjects | Subject = `<topic-name>-(key or value)` | For Avro: in payload in format: 0x00, 4 bytes schema id, Avro payload | 0x00, 4 bytes schema id, Avro payload |
+| | | | For JSON: in `key.schemaId` or `value.schemaId` headers | JSON payload |
+| Azure | Schema names | Schema name = `<CamelCasedTopicName>(Key or Value)` | For Avro: In header: content-type: avro/binary+schemaId | Avro payload |
+| | | | For JSON: in `key.schemaId` or `value.schemaId` headers (only when `avroAsJsonSerialization` option enabled) | JSON payload |
 
-## Configuration
+## Configuration details
 
 ### Common part
 
 The Kafka configuration is part of the Model configuration. All the settings below should be placed relative to the `scenarioTypes.ScenarioTypeName.modelConfig` key. You can find the high level structure of the configuration file [here](/docs/installation_configuration_guide/#configuration-areas).
 
 Both streaming Engines (Lite and Flink) share some common Kafka settings; this section describes them. See the respective sections below for details on configuring Kafka for a particular Engine (e.g. the keys where the common settings should be placed).
 
-### Kafka connection configuration
+### Available configuration options
 
 Important thing to remember is that Kafka server addresses/Schema Registry addresses have to be resolvable from:
 - Nussknacker Designer host (to enable schema discovery and scenario testing)
@@ -132,36 +192,7 @@ Important thing to remember is that Kafka server addresses/Schema Registry addre
 | schemaRegistryCacheConfig.parsedSchemaAccessExpirationTime | Low | duration | 2 hours | How long parsed schema will be cached after first access to it |
 | schemaRegistryCacheConfig.maximumSize | Low | number | 10000 | Maximum entries size for each caches: available schemas cache and parsed schema cache |
 | lowLevelComponentsEnabled | Medium | boolean | false | Add low level (deprecated) Kafka components: 'kafka-json', 'kafka-avro', 'kafka-registry-typed-json' |
-| avroAsJsonSerialization | Low | boolean | false | Send and receive json messages serialized using Avro schema |
-
-### Authentication
-
-Under the hood Nussknacker uses `kafkaProperties` to configure standard kafka client. It means that all standard client properties
-will be respected. Take a look at [Kafka security documentation](https://kafka.apache.org/090/documentation.html#security)
-to see detailed examples how connection should be configured. For example for the typical `SASL_SSL` configuration with
-credential in `JAAS` format should provide configuration similar to this one:
-
-```
-kafkaProperties {
-  "schema.registry.url": "http://schemaregistry:8081"
-  "bootstrap.servers": "broker1:9092,broker2:9092"
-  "security.protocol": "SASL_SSL"
-  "sasl.mechanism": "PLAIN"
-  "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"some_user\" password=\"some_user\";"
-}
-```
-
-In case if you use your own CA and client+server certificates authentication, you should additionally provide:
-`ssl.keystore.location`, `ssl.keystore.password`, `ssl.key.password`, `ssl.truststore.location`, `ssl.truststore.password`.
-
-To make sure if your configuration is correct, you can test it with standard kafka-cli commands
-like `kafka-console-consumer`, `kafka-console-producer`, `kafka-avro-console-consumer`, `kafka-avro-console-producer` or [kcat](https://github.com/edenhill/kcat).
-
-Some examples:
-- [Kafka quickstart](https://kafka.apache.org/documentation/#quickstart_send)
-- [Azure Event Hubs for Kafka quickstart](https://github.com/Azure/azure-event-hubs-for-kafka/tree/master/quickstart/kafka-cli)
-
-After you'll get properly working set of properties, you just need to move it to Nussknacker's configuration.
+| avroAsJsonSerialization | Low | boolean | false | Send and receive json messages desribed using Avro schema |
 
 ### Exception handling
 
 Errors can be sent to a specified Kafka topic in the following json format (see below for detailed configuration):
 
 ```
 {
 ...
 }
 ```
 
-
 Following properties can be configured (please look at correct engine page: [Lite](/docs/installation_configuration_guide/model/Lite#exception-handling) or [Flink](/docs/installation_configuration_guide/model/Flink#configuring-exception-handling), to see where they should be set):

From 9c41a51ea3ced29c5ff43314f60decdfad2a5b63 Mon Sep 17 00:00:00 2001
From: Arek Burdach
Date: Fri, 10 Feb 2023 12:38:59 +0100
Subject: [PATCH 2/2] review

---
 docs/integration/KafkaIntegration.md | 26 ++++++++++++++------------
 1 file changed, 14 insertions(+), 12 deletions(-)

diff --git a/docs/integration/KafkaIntegration.md b/docs/integration/KafkaIntegration.md
index ac8af2d60bf..bc6e02041fa 100644
--- a/docs/integration/KafkaIntegration.md
+++ b/docs/integration/KafkaIntegration.md
@@ -24,10 +24,11 @@
 
 ### Schemas
 
-Nussknacker uses schemas for code completions and validations of messages.
+A schema defines the format of data. Nussknacker expects that messages in topics are described by a schema.
+Nussknacker uses information contained in schemas for code completion and validation of messages.
 The schema of a message can be described in [Avro Schema format](https://avro.apache.org/docs/#schemas) or in [JSON Schema format](https://json-schema.org) (the latter with Confluent Schema Registry only).
 
-Schemas are retrieved from Schema Registry - *Confluent Schema Registry* and *Azure Schema Registry* are supported.
+Schemas are managed by Schema Registry - *Confluent Schema Registry* and *Azure Schema Registry* are supported.
 
 To preview schemas or add a new schema version, you can use the tools available on your cloud platform or a tool like [AKHQ](https://akhq.io).
@@ -82,12 +83,13 @@ After you get a properly working set of properties, you just need to copy them to
 
 ### Schema Registry - Connection
 
+Currently, Nussknacker supports two Schema Registry implementations: one based on *Confluent Schema Registry* and one based on *Azure Schema Registry*.
+
 To configure the connection to Schema Registry, you need to set at least the `schema.registry.url` property. It should contain a comma-separated list of Schema Registry urls.
 For a single node installation, it will be just one url. Be aware that, contrary to Kafka brokers, Schema Registry urls should start with `https://` or `http://`.
 
-Currently, Nussknacker supports two implementations of Schema Registries: based on *Confluent Schema Registry* and based on *Azure Schema Registry*.
-Given implementation is picked based on `schema.registry.url` property. By default, Confluent-based implementation is used.
-For urls ended with `.servicebus.windows.net` Azure-based implementation is used.
+Nussknacker determines which registry implementation (Confluent or Azure) is used from the `schema.registry.url` property.
+If the URL ends with `.servicebus.windows.net`, Nussknacker assumes that the Azure schema registry is used; if not, the Confluent schema registry is assumed.
 
 #### Confluent-based Schema Registry - Connection and Authentication
@@ -111,13 +113,11 @@ For example via Azure CLI or Azure PowerShell.
 ## Messaging
 
-To communicate with running Nussknacker's scenarios via Kafka sources and sinks you can use for example standard kafka-cli commands
-like `kafka-console-consumer`, `kafka-console-producer`, [kcat](https://github.com/edenhill/kcat), Confluent's
-`kafka-avro-console-consumer`, `kafka-avro-console-producer` commands for Confluent-based Avro encoded messages or graphical tools like [AKHQ](https://akhq.io).
+You can use standard kafka-cli commands like `kafka-console-consumer` and `kafka-console-producer`, [kcat](https://github.com/edenhill/kcat), Confluent's
+`kafka-avro-console-consumer` and `kafka-avro-console-producer` commands (for Confluent-based Avro encoded messages) or graphical tools like [AKHQ](https://akhq.io)
+to interact with the Kafka source and sink topics used in Nussknacker scenarios.
 
-Be aware that Azure-based Avro encoded messages have a little different format than Confluent once - Schema ID is passed in headers instead of payload.
+Be aware that Azure-based Avro encoded messages have a slightly different format than Confluent ones - the Schema ID is passed in headers instead of in the payload.
 Such messages can be less well supported by some of the available tools. See [Schema Registry comparison section](#schema-registry-comparison) for details.
 
 ### Message Payload
@@ -136,8 +138,8 @@ You can do that by enabling the `avroAsJsonSerialization` configuration setting.
 
 ### Schema ID
 
 Each topic can contain messages written using different schema versions. Schema versions are identified by *Schema ID*.
-Nussknacker need to know what was the schema used during writing to make message validation and schema evolution possible.
-Because of that it need to extract *Schema ID* from the message.
+Nussknacker needs to know which schema was used during writing to make message validation and schema evolution possible.
+Because of that, Nussknacker needs to extract the *Schema ID* from the message.
 
 Additionally, in sources and sinks, you can choose which schema version should be used during reading/writing.
@@ -192,7 +194,7 @@ Important thing to remember is that Kafka server addresses/Schema Registry addre
 | schemaRegistryCacheConfig.parsedSchemaAccessExpirationTime | Low | duration | 2 hours | How long parsed schema will be cached after first access to it |
 | schemaRegistryCacheConfig.maximumSize | Low | number | 10000 | Maximum entries size for each caches: available schemas cache and parsed schema cache |
 | lowLevelComponentsEnabled | Medium | boolean | false | Add low level (deprecated) Kafka components: 'kafka-json', 'kafka-avro', 'kafka-registry-typed-json' |
-| avroAsJsonSerialization | Low | boolean | false | Send and receive json messages desribed using Avro schema |
+| avroAsJsonSerialization | Low | boolean | false | Send and receive json messages described using Avro schema |
 
 ### Exception handling