Skip to content

Commit

Permalink
[Improve][Connector-V2] RocketMQ Sink add message tag config (apache#…
Browse files Browse the repository at this point in the history
  • Loading branch information
wengys authored and eyys committed Nov 13, 2024
1 parent 53c4046 commit 087898f
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 8 deletions.
1 change: 1 addition & 0 deletions docs/en/connector-v2/sink/RocketMQ.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ Write Rows to a Apache RocketMQ topic.
| access.key | String | no | | When ACL_ENABLED is true, access key cannot be empty |
| secret.key | String | no | | When ACL_ENABLED is true, secret key cannot be empty |
| producer.group | String | no | SeaTunnel-producer-Group | SeaTunnel-producer-Group |
| tag | String | no | - | `RocketMQ` message tag. |
| partition.key.fields | array | no | - | - |
| format | String | no | json | Data format. The default format is json. Optional text format. The default field separator is ",".If you customize the delimiter, add the "field_delimiter" option. |
| field.delimiter | String | no | , | Customize the field delimiter for data format. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ public class ProducerConfig extends Config {
.noDefaultValue()
.withDescription("RocketMq topic name. ");

public static final Option<String> TAG =
Options.key("tag")
.stringType()
.noDefaultValue()
.withDescription("RocketMq message tag.");

public static final Option<String> PRODUCER_GROUP =
Options.key("producer.group")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,37 +37,44 @@
@Slf4j
public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer<byte[], byte[]> {
private final String topic;
private final String tag;
private final SerializationSchema keySerialization;
private final SerializationSchema valueSerialization;

public DefaultSeaTunnelRowSerializer(
String topic,
String tag,
SeaTunnelRowType seaTunnelRowType,
SchemaFormat format,
String delimiter) {
this(
topic,
tag,
element -> null,
createSerializationSchema(seaTunnelRowType, format, delimiter));
}

public DefaultSeaTunnelRowSerializer(
String topic,
String tag,
List<String> keyFieldNames,
SeaTunnelRowType seaTunnelRowType,
SchemaFormat format,
String delimiter) {
this(
topic,
tag,
createKeySerializationSchema(keyFieldNames, seaTunnelRowType),
createSerializationSchema(seaTunnelRowType, format, delimiter));
}

public DefaultSeaTunnelRowSerializer(
String topic,
String tag,
SerializationSchema keySerialization,
SerializationSchema valueSerialization) {
this.topic = topic;
this.tag = tag;
this.keySerialization = keySerialization;
this.valueSerialization = valueSerialization;
}
Expand Down Expand Up @@ -123,6 +130,6 @@ public Message serializeRow(SeaTunnelRow row) {
return null;
}
byte[] key = keySerialization.serialize(row);
return new Message(topic, null, key == null ? null : new String(key), value);
return new Message(topic, tag, key == null ? null : new String(key), value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public class ProducerMetadata implements Serializable {
private RocketMqBaseConfiguration configuration;
/** send topic */
private String topic;
/** message tag */
private String tag;

/** partition key fields */
private List<String> partitionKeyFields;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ public void prepare(Config config) throws PrepareFailException {
}
producerMetadata = new ProducerMetadata();
producerMetadata.setTopic(config.getString(ProducerConfig.TOPIC.key()));
if (config.hasPath(ProducerConfig.TAG.key())) {
producerMetadata.setTag(config.getString(ProducerConfig.TAG.key()));
}
RocketMqBaseConfiguration.Builder baseConfigurationBuilder =
RocketMqBaseConfiguration.newBuilder()
.producer()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(
SeaTunnelRowType seaTunnelRowType) {
return new DefaultSeaTunnelRowSerializer(
producerMetadata.getTopic(),
producerMetadata.getTag(),
getPartitionKeyFields(seaTunnelRowType),
seaTunnelRowType,
producerMetadata.getFormat(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.e2e.connector.rocketmq;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
@AllArgsConstructor
public class RocketMqConsumerMessage {
private String value;
private String tag;
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ public void startUp() throws Exception {
DefaultSeaTunnelRowSerializer serializer =
new DefaultSeaTunnelRowSerializer(
"test_topic_source",
null,
SEATUNNEL_ROW_TYPE,
DEFAULT_FORMAT,
DEFAULT_FIELD_DELIMITER);
Expand Down Expand Up @@ -183,7 +184,7 @@ public void testSinkRocketMq(TestContainer container) throws IOException, Interr
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());

String topicName = "test_topic";
Map<String, String> data = getRocketMqConsumerData(topicName);
Map<String, RocketMqConsumerMessage> data = getRocketMqConsumerData(topicName);
ObjectMapper objectMapper = new ObjectMapper();
String key = data.keySet().iterator().next();
ObjectNode objectNode = objectMapper.readValue(key, ObjectNode.class);
Expand All @@ -199,7 +200,7 @@ public void testTextFormatSinkRocketMq(TestContainer container)
container.executeJob("/rocketmq-text-sink_fake_to_rocketmq.conf");
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
String topicName = "test_text_topic";
Map<String, String> data = getRocketMqConsumerData(topicName);
Map<String, RocketMqConsumerMessage> data = getRocketMqConsumerData(topicName);
Assertions.assertEquals(10, data.size());
}

Expand All @@ -209,6 +210,7 @@ public void testSourceRocketMqTextToConsole(TestContainer container)
DefaultSeaTunnelRowSerializer serializer =
new DefaultSeaTunnelRowSerializer(
"test_topic_text",
null,
SEATUNNEL_ROW_TYPE,
SchemaFormat.TEXT,
DEFAULT_FIELD_DELIMITER);
Expand All @@ -228,6 +230,7 @@ public void testSourceRocketMqTextToConsoleWithOffsetCheck(TestContainer contain
DefaultSeaTunnelRowSerializer serializer =
new DefaultSeaTunnelRowSerializer(
"test_topic_text_offset_check",
null,
SEATUNNEL_ROW_TYPE,
SchemaFormat.TEXT,
DEFAULT_FIELD_DELIMITER);
Expand All @@ -245,6 +248,7 @@ public void testSourceRocketMqJsonToConsole(TestContainer container)
DefaultSeaTunnelRowSerializer serializer =
new DefaultSeaTunnelRowSerializer(
"test_topic_json",
null,
SEATUNNEL_ROW_TYPE,
DEFAULT_FORMAT,
DEFAULT_FIELD_DELIMITER);
Expand Down Expand Up @@ -292,13 +296,33 @@ public void testSourceRocketMqStartConfig(TestContainer container)
DefaultSeaTunnelRowSerializer serializer =
new DefaultSeaTunnelRowSerializer(
"test_topic_group",
null,
SEATUNNEL_ROW_TYPE,
DEFAULT_FORMAT,
DEFAULT_FIELD_DELIMITER);
generateTestData(row -> serializer.serializeRow(row), "test_topic_group", 100, 150);
testRocketMqGroupOffsetsToConsole(container);
}

@TestTemplate
public void testSinkRocketMqMessageTag(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult =
container.executeJob("/rocketmq-sink_fake_to_rocketmq_message_tag.conf");
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());

String topicName = "test_topic_message_tag";
String tag = "test_tag";
Map<String, RocketMqConsumerMessage> data = getRocketMqConsumerData(topicName);
ObjectMapper objectMapper = new ObjectMapper();
String key = data.keySet().iterator().next();
ObjectNode objectNode = objectMapper.readValue(key, ObjectNode.class);
Assertions.assertTrue(objectNode.has("c_map"));
Assertions.assertTrue(objectNode.has("c_string"));
Assertions.assertEquals(10, data.size());
Assertions.assertEquals(tag, data.get(key).getTag());
}

public void testRocketMqGroupOffsetsToConsole(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult =
Expand Down Expand Up @@ -334,8 +358,8 @@ private void generateTestData(
}
}

private Map<String, String> getRocketMqConsumerData(String topicName) {
Map<String, String> data = new HashMap<>();
private Map<String, RocketMqConsumerMessage> getRocketMqConsumerData(String topicName) {
Map<String, RocketMqConsumerMessage> data = new HashMap<>();
try {
DefaultLitePullConsumer consumer =
RocketMqAdminUtil.initDefaultLitePullConsumer(newConfiguration(), false);
Expand Down Expand Up @@ -373,9 +397,11 @@ private Map<String, String> getRocketMqConsumerData(String topicName) {
break;
}
for (MessageExt message : messages) {
data.put(
message.getKeys(),
new String(message.getBody(), StandardCharsets.UTF_8));
RocketMqConsumerMessage consumerMessage =
new RocketMqConsumerMessage(
new String(message.getBody(), StandardCharsets.UTF_8),
message.getTags());
data.put(message.getKeys(), consumerMessage);
consumer.getOffsetStore()
.updateConsumeOffsetToBroker(
new MessageQueue(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
parallelism = 1
job.mode = "BATCH"

#spark config
spark.app.name = "SeaTunnel"
spark.executor.instances = 1
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.master = local
}

source {
FakeSource {
row.num = 10
map.size = 10
array.size = 10
bytes.length = 10
string.length = 10
schema = {
fields {
c_map = "map<string, smallint>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(30, 8)"
c_null = "null"
c_bytes = bytes
c_date = date
c_timestamp = timestamp
}
}
}
}

transform {
}

sink {
Rocketmq {
name.srv.addr = "rocketmq-e2e:9876"
topic = "test_topic_message_tag"
partition.key.fields = ["c_map", "c_string"]
producer.send.sync = true
tag = "test_tag"
}
}

0 comments on commit 087898f

Please sign in to comment.