diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkRecord.java b/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkRecord.java index b949520..5319082 100644 --- a/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkRecord.java +++ b/connector/src/main/java/io/openmessaging/connector/api/component/task/sink/SinkRecord.java @@ -1,3 +1,17 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package io.openmessaging.connector.api.component.task.sink; import io.openmessaging.KeyValue; @@ -11,7 +25,7 @@ */ public class SinkRecord extends ConnectRecord { - private Integer queueId; + private final Integer queueId; private final String brokerName; private final long queueOffset; @@ -51,22 +65,20 @@ public long queueOffset(){ * new record * * @param topic - * @param queueId * @param schema * @param data * @param timestamp * @return */ @Override - public SinkRecord newRecord(String topic, Integer queueId, Schema schema, Object data, Long timestamp) { - return newRecord(topic,queueId,schema,data,timestamp, null); + public SinkRecord newRecord(String topic, Schema schema, Object data, Long timestamp) { + return newRecord(topic, schema,data,timestamp, null); } /** * new record * * @param topic - * @param queueId * @param schema * @param data * @param timestamp @@ -74,8 +86,8 @@ public SinkRecord newRecord(String topic, Integer queueId, Schema schema, Object * @return */ @Override - public SinkRecord newRecord(String topic, Integer queueId, Schema schema, Object data, Long timestamp, KeyValue extensions) { - return new SinkRecord(brokerName(), queueOffset(), topic, queueId, timestamp, schema, data, extensions); + public SinkRecord newRecord(String topic, Schema schema, Object data, Long timestamp, KeyValue extensions) { + return new SinkRecord(brokerName(), queueOffset(), topic, queueId(), timestamp, schema, data, extensions); } @Override @@ -84,18 +96,19 @@ public boolean equals(Object o) { if (!(o instanceof SinkRecord)) return false; if (!super.equals(o)) return false; SinkRecord that = (SinkRecord) o; - return queueOffset == that.queueOffset && Objects.equals(brokerName, that.brokerName); + return queueOffset == that.queueOffset && Objects.equals(queueId, that.queueId) && Objects.equals(brokerName, that.brokerName); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), brokerName, queueOffset); + return Objects.hash(super.hashCode(), queueId, brokerName, queueOffset); } @Override public String toString() { return "SinkRecord{" + - "brokerName='" + brokerName + '\'' + + "queueId=" + queueId + + ", brokerName='" + brokerName + '\'' + ", queueOffset=" + queueOffset + "} " + super.toString(); } diff --git a/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceRecord.java b/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceRecord.java index 571fdad..155b22b 100644 --- a/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceRecord.java +++ b/connector/src/main/java/io/openmessaging/connector/api/component/task/source/SourceRecord.java @@ -1,3 +1,17 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package io.openmessaging.connector.api.component.task.source; import io.openmessaging.KeyValue; @@ -41,22 +55,20 @@ public RecordPosition position() { * new record * * @param topic - * @param queueId * @param schema * @param data * @param timestamp * @return */ @Override - public SourceRecord newRecord(String topic, Integer queueId, Schema schema, Object data, Long timestamp) { - return newRecord(topic, queueId, schema , data, timestamp, null ); + public SourceRecord newRecord(String topic, Schema schema, Object data, Long timestamp) { + return newRecord(topic, schema , data, timestamp, null ); } /** * new record * * @param topic - * @param queueId * @param schema * @param data * @param timestamp @@ -64,8 +76,8 @@ public SourceRecord newRecord(String topic, Integer queueId, Schema schema, Obje * @return */ @Override - public SourceRecord newRecord(String topic, Integer queueId, Schema schema, Object data, Long timestamp, KeyValue extensions) { - return new SourceRecord(position().getPartition(), position().getOffset(), topic, queueId, timestamp, schema, data, extensions); + public SourceRecord newRecord(String topic, Schema schema, Object data, Long timestamp, KeyValue extensions) { + return new SourceRecord(position().getPartition(), position().getOffset(), topic, timestamp, schema, data, extensions); } @Override diff --git a/connector/src/main/java/io/openmessaging/connector/api/data/ConnectRecord.java b/connector/src/main/java/io/openmessaging/connector/api/data/ConnectRecord.java index d7d619e..c1c2e29 100644 --- a/connector/src/main/java/io/openmessaging/connector/api/data/ConnectRecord.java +++ b/connector/src/main/java/io/openmessaging/connector/api/data/ConnectRecord.java @@ -64,25 +64,23 @@ public ConnectRecord(String topic, Long timestamp, Schema schema, Object data,Ke /** * new record * @param topic - * @param queueId * @param schema * @param data * @param timestamp * @return */ - public abstract R newRecord(String topic, Integer queueId, Schema schema, Object data, Long timestamp); + public abstract R newRecord(String topic, Schema schema, Object data, Long timestamp); /** * new record * @param topic - * @param queueId * @param schema * @param data * @param timestamp * @param extensions * @return */ - public abstract R newRecord(String topic, Integer queueId, Schema schema, Object data, Long timestamp, KeyValue extensions); + public abstract R newRecord(String topic, Schema schema, Object data, Long timestamp, KeyValue extensions); public String getTopic() {