From 4fa6c36aba14f62790b351b57abf465de609724b Mon Sep 17 00:00:00 2001 From: xiaoyi Date: Wed, 20 Jul 2022 14:23:58 +0800 Subject: [PATCH] [ISSUE #53]Connectrecord add a key field to identify the unique data (#54) * SinkTask and SourceTask implement the validate method https://github.com/apache/rocketmq-connect/issues/85 * Adjust the init and start methods of the component interface * Set pause and resume to deprecated methods. It feels like they can be removed * Add struct object and optimize schema and schema builder API #41 * add offset storage writer #41 * add getter and setter method #41 * add SchemaAndValue #41 * add logical type #41 * Schemabuilder add required method * schema add hashCode and equals method * fixed doc method * Field add equals and hashcode method * optimize api #85 * Optimize transform api #45 * Optimize transform api and add RecordConverter * Connectrecord add a key field to identify the unique data #53 --- .../connector/api/data/ConnectRecord.java | 105 ++++++++++++++---- 1 file changed, 82 insertions(+), 23 deletions(-) diff --git a/connector/src/main/java/io/openmessaging/connector/api/data/ConnectRecord.java b/connector/src/main/java/io/openmessaging/connector/api/data/ConnectRecord.java index cf79206..67f8411 100644 --- a/connector/src/main/java/io/openmessaging/connector/api/data/ConnectRecord.java +++ b/connector/src/main/java/io/openmessaging/connector/api/data/ConnectRecord.java @@ -29,6 +29,16 @@ public class ConnectRecord { */ private Long timestamp; + /** + * key schema + */ + private Schema keySchema; + + /** + * Payload of the key entry. + */ + private Object key; + /** * Schema of the data entry. */ @@ -63,6 +73,22 @@ public ConnectRecord(RecordPartition recordPartition, RecordOffset recordOffset, this.data = data; } + public ConnectRecord(RecordPartition recordPartition, RecordOffset recordOffset, + Long timestamp,Schema keySchema, Object key, Schema schema, + Object data) { + this.position = new RecordPosition(recordPartition, recordOffset); + this.timestamp = timestamp; + + // key + this.keySchema = keySchema; + this.key = key; + + // value + this.schema = schema; + this.data = data; + + } + public Long getTimestamp() { return timestamp; } @@ -71,6 +97,22 @@ public void setTimestamp(Long timestamp) { this.timestamp = timestamp; } + public Schema getKeySchema() { + return keySchema; + } + + public void setKeySchema(Schema keySchema) { + this.keySchema = keySchema; + } + + public Object getKey() { + return key; + } + + public void setKey(Object key) { + this.key = key; + } + public Schema getSchema() { return schema; } @@ -95,6 +137,18 @@ public void setExtensions(KeyValue extensions) { this.extensions = extensions; } + public RecordPosition getPosition() { + return position; + } + + public void setPosition(RecordPosition position) { + this.position = position; + } + + /** + * add extension by KeyValue + * @param extensions + */ public void addExtension(KeyValue extensions) { if (this.extensions == null) { this.extensions = new DefaultKeyValue(); @@ -105,6 +159,11 @@ public void addExtension(KeyValue extensions) { } } + /** + * add extension by key and value + * @param key + * @param value + */ public void addExtension(String key, String value) { if (this.extensions == null) { this.extensions = new DefaultKeyValue(); @@ -112,6 +171,11 @@ public void addExtension(String key, String value) { this.extensions.put(key, value); } + /** + * get extension value + * @param key + * @return + */ public String getExtension(String key) { if (this.extensions == null) { return null; @@ -119,34 +183,29 @@ public String getExtension(String key) { return this.extensions.getString(key); } - public RecordPosition getPosition() { - return position; - } - - public void setPosition(RecordPosition position) { - this.position = position; - } - - @Override public boolean equals(Object o) { - if (this == o) - return true; - if (!(o instanceof ConnectRecord)) - return false; + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof ConnectRecord)) return false; ConnectRecord that = (ConnectRecord) o; - return Objects.equals(timestamp, that.timestamp) && Objects.equals(schema, that.schema) && Objects.equals(data, that.data) && Objects.equals(position, that.position) && Objects.equals(extensions, that.extensions); + return Objects.equals(timestamp, that.timestamp) && Objects.equals(keySchema, that.keySchema) && Objects.equals(key, that.key) && Objects.equals(schema, that.schema) && Objects.equals(data, that.data) && Objects.equals(position, that.position) && Objects.equals(extensions, that.extensions); } - @Override public int hashCode() { - return Objects.hash(timestamp, schema, data, position, extensions); + @Override + public int hashCode() { + return Objects.hash(timestamp, keySchema, key, schema, data, position, extensions); } - @Override public String toString() { + @Override + public String toString() { return "ConnectRecord{" + - "timestamp=" + timestamp + - ", schema=" + schema + - ", data=" + data + - ", position=" + position + - ", extensions=" + extensions + - '}'; + "timestamp=" + timestamp + + ", keySchema=" + keySchema + + ", key=" + key + + ", schema=" + schema + + ", data=" + data + + ", position=" + position + + ", extensions=" + extensions + + '}'; } }