Skip to content

Commit

Permalink
Optimize ConnectRecord remove queueId field openmessaging#47
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Jul 13, 2022
1 parent ea7e3c0 commit e94593e
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.openmessaging.connector.api.component.task.sink;

import io.openmessaging.KeyValue;
Expand All @@ -11,7 +25,7 @@
*/
public class SinkRecord extends ConnectRecord<SinkRecord> {

private Integer queueId;
private final Integer queueId;
private final String brokerName;
private final long queueOffset;

Expand Down Expand Up @@ -51,31 +65,29 @@ public long queueOffset(){
* new record
*
* @param topic
* @param queueId
* @param schema
* @param data
* @param timestamp
* @return
*/
@Override
public SinkRecord newRecord(String topic, Integer queueId, Schema schema, Object data, Long timestamp) {
return newRecord(topic,queueId,schema,data,timestamp, null);
public SinkRecord newRecord(String topic, Schema schema, Object data, Long timestamp) {
return newRecord(topic, schema,data,timestamp, null);
}

/**
* new record
*
* @param topic
* @param queueId
* @param schema
* @param data
* @param timestamp
* @param extensions
* @return
*/
@Override
public SinkRecord newRecord(String topic, Integer queueId, Schema schema, Object data, Long timestamp, KeyValue extensions) {
return new SinkRecord(brokerName(), queueOffset(), topic, queueId, timestamp, schema, data, extensions);
public SinkRecord newRecord(String topic, Schema schema, Object data, Long timestamp, KeyValue extensions) {
return new SinkRecord(brokerName(), queueOffset(), topic, queueId(), timestamp, schema, data, extensions);
}

@Override
Expand All @@ -84,18 +96,19 @@ public boolean equals(Object o) {
if (!(o instanceof SinkRecord)) return false;
if (!super.equals(o)) return false;
SinkRecord that = (SinkRecord) o;
return queueOffset == that.queueOffset && Objects.equals(brokerName, that.brokerName);
return queueOffset == that.queueOffset && Objects.equals(queueId, that.queueId) && Objects.equals(brokerName, that.brokerName);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), brokerName, queueOffset);
return Objects.hash(super.hashCode(), queueId, brokerName, queueOffset);
}

@Override
public String toString() {
return "SinkRecord{" +
"brokerName='" + brokerName + '\'' +
"queueId=" + queueId +
", brokerName='" + brokerName + '\'' +
", queueOffset=" + queueOffset +
"} " + super.toString();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.openmessaging.connector.api.component.task.source;

import io.openmessaging.KeyValue;
Expand Down Expand Up @@ -41,31 +55,29 @@ public RecordPosition position() {
* new record
*
* @param topic
* @param queueId
* @param schema
* @param data
* @param timestamp
* @return
*/
@Override
public SourceRecord newRecord(String topic, Integer queueId, Schema schema, Object data, Long timestamp) {
return newRecord(topic, queueId, schema , data, timestamp, null );
public SourceRecord newRecord(String topic, Schema schema, Object data, Long timestamp) {
return newRecord(topic, schema , data, timestamp, null );
}

/**
* new record
*
* @param topic
* @param queueId
* @param schema
* @param data
* @param timestamp
* @param extensions
* @return
*/
@Override
public SourceRecord newRecord(String topic, Integer queueId, Schema schema, Object data, Long timestamp, KeyValue extensions) {
return new SourceRecord(position().getPartition(), position().getOffset(), topic, queueId, timestamp, schema, data, extensions);
public SourceRecord newRecord(String topic, Schema schema, Object data, Long timestamp, KeyValue extensions) {
return new SourceRecord(position().getPartition(), position().getOffset(), topic, timestamp, schema, data, extensions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,25 +64,23 @@ public ConnectRecord(String topic, Long timestamp, Schema schema, Object data,Ke
/**
* new record
* @param topic
* @param queueId
* @param schema
* @param data
* @param timestamp
* @return
*/
public abstract R newRecord(String topic, Integer queueId, Schema schema, Object data, Long timestamp);
public abstract R newRecord(String topic, Schema schema, Object data, Long timestamp);

/**
* new record
* @param topic
* @param queueId
* @param schema
* @param data
* @param timestamp
* @param extensions
* @return
*/
public abstract R newRecord(String topic, Integer queueId, Schema schema, Object data, Long timestamp, KeyValue extensions);
public abstract R newRecord(String topic, Schema schema, Object data, Long timestamp, KeyValue extensions);


public String getTopic() {
Expand Down

0 comments on commit e94593e

Please sign in to comment.