При необходимости можно устанавить параметры. Доступные параметры для продюсера помечены в колонке "C/P" символами "P" или "*" (см. https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md)
По умолчанию для продюсера можно не устанавливать никаких параметров.
УстановитьПараметр(ИмяПараметра, ЗначениеПараметра);
ИмяПараметра - Строка. Имя параметра, согласно колонке Property в данной таблице
ЗначениеПараметра - Строка. Доступные значения приведены в колонках Range и Default в данной таблице
ИнициализироватьПродюсера(Брокеры)
Брокеры - Строка. Список брокеров кластера, разделенные запятой.
Если в кластере несколько брокеров, то достаточно указать одного основного брокера. Брокер содержит все необходимую информацию об других связанных с ним брокерах в кластере.
Библиотека, на которой основана компонента, по своей природе имеет асинхронную архитектуру взаимодействия с брокерами.
Для асинхронной отправки сообщения необходимо использовать метод ОтправитьСообщение. Метод возвращает булево значение "Истина" практически во всех случаях, кроме следующих кейсов:
- Размер сообщения превышает установленный максимальный размер: message.max.bytes
- Запрошенный раздел (партиция) неизвестен в кластере Kafka
- Указанная тема не найдена в кластере Kafka (только если отключено автосоздание топиков на брокере)
- Произошла исключительная ситуация. Текст ошибки можно получить функцией ПолучитьСообщениеОбОшибке.
ОтправитьСообщение(Сообщение, Топик, Партиция, Ключ, Заголовки)
Сообщение - Строка, ДвоичныеДанные Тело сообщения, например json строка.
Топик - Строка.
Партиция - Число. Если не указано значение, то по умолчанию запись производится в 0 раздел.
Ключ - Строка. Необязательный параметр. Ключ сообщения, который может использоваться в темах с уплотнением сообщений по ключу.
Заголовки - Строка. Перечень заголовков, состоящих из ключа и значения в следующем формате "ключ1,значение1;ключ2,значение2"
В случае асинхронной отправки - результаты доставки можно получить с помощью следующих архитектурных подходов:
- При создании экземпляра продюсера, создаем так же экземпляр консьюмера, подписываемся на тему, куда производится отправка. После отправки каждого сообщения - получаем с помощью экземпляра консьюмера сообщение, вычисляем хэш отправленного и полученного сообщения (или же используем ключи сообщений). Сравниваем хэши или ключи - в случае совпадения - считаем сообщение доставленным.
- Настраиваем считывание из файла лога. Если при отправке был указан key сообщения, в лог будет выведен key и результат доставки. Если key не был указан - в лог будет выведен хэш (md5) сообщения.
Порядок полей в логе: Дата время: Хэш или ключ сообщения, Статус доставки:Причина, Размер сообщения в байтах, Топик, Смещение (-1001 если все плохо), Партиция (-1 если все плохо), Брокер (-1 если все плохо).
Примеры строк в логе доставки
2023-09-24 09:59:18.54614692 Key:205f03b5-9c64-4752-9fb4-0b8695789648, Status:Persisted, Details:Success, Size:28, Topic:testTopic, Offset:20553, Partition:0, BrokerID:0
2023-09-24 10:01:57.958599455 Hash:ed59c8ed6c3209e9e6e434c2d9fbe52a, Status:Persisted, Details:Success, Size:28, Topic:testTopic, Offset:20554, Partition:0, BrokerID:0
2023-09-24 10:18:32.495522885 Error: Configuration property "socket.blocking.max.ms" value 0 is outside allowed range 1..60000
2023-09-24 12:56:10.311876162 Hash:dbee3cf28689a286066db820f6a8583e, Status:NotPersisted, Details:Local Message timed out, Size:9546, Topic:testTopic, Offset:-1001, Partition:-1, BrokerID:-1
Для синхронной отправки необходимо использовать метод ОтправитьСообщениеСОжиданиемРезультата, состав параметров аналогичен методу асинхронной отправки.
Метод возвращает значение типа булево - "Истина", если при доставке все брокеры вернули подтверждение о приемке сообщения, "Ложь", если хотя бы один брокер не подтвердил получение или не было получено подтверждение от брокеров в течении 20 секунд. Текст ошибки можно получить функцией ПолучитьСообщениеОбОшибке.
Дополнительно, в рамках синхронной отправки рекомендуется устанавливать параметры queue.buffering.max.ms и socket.blocking.max.ms в самые минимальные значения, чтобы во внутреннем буфере сообщений не происходило накопление данных и сообщения могли отправляться сразу, минуя буфер.
Так же следует обратить внимание на параметр message.timeout.ms. Данным параметром мы указываем, сколько времени в мс. мы должны ждать подтверждения от брокеров. Брокеры могут и не отправить подтверждения, если, к примеру, они не доступны. В этом случае, рекомендуется устанавливать данный параметр в диапозоне от 1000 до 20000 мс. В противном случае - недоступности брокеров и включеном логировании в компоненте - в логе продюсера не будет никакой информации об проблеме.
УстановитьПараметр("queue.buffering.max.ms", "1");
УстановитьПараметр("socket.blocking.max.ms", "1");
УстановитьПараметр("message.timeout.ms", "5000");
ОтправитьСообщениеСОжиданиемРезультата(Сообщение, Топик, Партиция, Ключ, Заголовки)
Список параметров метода идентичен методу ОтправитьСообщение
После того как была произведена отправка всех необходимых сообщений и больше нет необходимости в использовании инстанса продюсера - необходимо обяхательно производить остановку продюсера.
ОстановитьПродюсера()
Если ТипЗнч(Данные) <> Тип("Массив") Тогда
Сообщения = ОбщегоНазначенияКлиентСервер.ЗначениеВМассиве(Данные);
Иначе
Сообщения = Данные;
КонецЕсли;
Если ТипЗнч(Параметры) = Тип("Соответствие") ИЛИ (ТипЗнч(Параметры) = Тип("Структура")) Тогда
Для каждого Параметр Из Параметры Цикл
Компонента.УстановитьПараметр(Строка(Параметр.Ключ), Строка(Параметр.Значение));
КонецЦикла;
КонецЕсли;
// инициализируем подключение к брокеру
// достаточно указать одного брокера из кластера, либо есть возможность перечислить брокеров через ,
РезультатИнициализации = Компонента.ИнициализироватьПродюсера(Брокеры);
Если РезультатИнициализации Тогда
Для каждого СообщениеВКафку Из Сообщения Цикл
// Parametr1 - Тело сообщения (строка)
// Parametr2 - Топик (строка)
// Parametr3 - Номер партиции, по умолчанию = -1 (число)
// Parametr4 - Произвольный ключ, идентифицирующий сообщение, например GUID (строка)
// Parametr5 - Заголовки (строка), "ключ1,значение1;ключ2,значение2"
РезультатОтправки = Компонента.ОтправитьСообщение(СообщениеВКафку, Топик,, Ключ, Заголовки);
Если Не РезультатОтправки Тогда
Сообщить(Компонента.ПолучитьСообщениеОбОшибке())
КонецЕсли;
КонецЦикла;
Компонента.ОстановитьПродюсера();
Компонента = Неопределено;
КонецЕсли;