Skip to content

Commit

Permalink
merge new features openmessaging#47
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Jul 20, 2022
2 parents 2fa5bb1 + 58a9143 commit 7036d48
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,19 @@ public class SinkRecord extends ConnectRecord<SinkRecord> {
private final long queueOffset;

public SinkRecord(String brokerName, long queueOffset,String topic, Integer queueId, Schema schema, Object data) {
this(brokerName, queueOffset, topic, queueId, null, schema, data ,null );
this(brokerName, queueOffset, topic, queueId, null, null, schema, data ,null );
}

public SinkRecord(String brokerName, long queueOffset,String topic, Integer queueId, Schema schema, Object data, KeyValue extensions) {
this(brokerName, queueOffset, topic, queueId, null, schema, data ,extensions );
public SinkRecord(String brokerName, long queueOffset, String topic, Integer queueId, Schema keySchema, Object key, Schema schema, Object data) {
this(brokerName, queueOffset, topic, queueId, null, keySchema, key, schema, data ,null );
}

public SinkRecord(String brokerName, long queueOffset,String topic, Integer queueId, Long timestamp, Schema schema, Object data) {
this(brokerName, queueOffset, topic, queueId, timestamp, schema, data ,null );
public SinkRecord(String brokerName, long queueOffset, String topic, Integer queueId, Schema keySchema, Object key, Schema schema, Object data, KeyValue extensions) {
this(brokerName, queueOffset, topic, queueId, null, keySchema, key, schema, data ,extensions );
}

public SinkRecord(String brokerName, long queueOffset, String topic, Integer queueId, Long timestamp, Schema schema, Object data, KeyValue extensions) {
super(topic, timestamp, schema, data, extensions);
public SinkRecord(String brokerName, long queueOffset, String topic, Integer queueId, Long timestamp, Schema keySchema, Object key, Schema schema, Object data, KeyValue extensions) {
super(topic, timestamp, keySchema, key, schema, data, extensions);
this.brokerName = brokerName;
this.queueOffset = queueOffset;
this.queueId = queueId;
Expand Down Expand Up @@ -71,8 +71,8 @@ public long queueOffset(){
* @return
*/
@Override
public SinkRecord newRecord(String topic, Schema schema, Object data, Long timestamp) {
return newRecord(topic, schema,data,timestamp, null);
public SinkRecord newRecord(String topic, Long timestamp, Schema keySchema, Object key,Schema schema, Object data) {
return newRecord(topic, timestamp, keySchema, key, schema, data, null);
}

/**
Expand All @@ -86,8 +86,8 @@ public SinkRecord newRecord(String topic, Schema schema, Object data, Long times
* @return
*/
@Override
public SinkRecord newRecord(String topic, Schema schema, Object data, Long timestamp, KeyValue extensions) {
return new SinkRecord(brokerName(), queueOffset(), topic, queueId(), timestamp, schema, data, extensions);
public SinkRecord newRecord(String topic, Long timestamp, Schema keySchema, Object key, Schema schema, Object data, KeyValue extensions) {
return new SinkRecord(brokerName(), queueOffset(), topic, queueId(), timestamp, keySchema, key, schema, data, extensions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,20 @@ public class SourceRecord extends ConnectRecord<SourceRecord> {

private final RecordPosition position;

public SourceRecord(RecordPartition recordPartition, RecordOffset recordOffset, String topic, Schema schema, Object data, KeyValue extensions) {
this(recordPartition, recordOffset, topic, null,schema , data, extensions);
public SourceRecord(RecordPartition recordPartition, RecordOffset recordOffset,String topic, Schema schema, Object data) {
this(recordPartition, recordOffset, topic, null, null, null, schema , data, null);
}

public SourceRecord(RecordPartition recordPartition, RecordOffset recordOffset,String topic, Schema schema, Object data) {
this(recordPartition, recordOffset, topic, null, schema , data, null);
public SourceRecord(RecordPartition recordPartition, RecordOffset recordOffset,String topic, Schema keySchema, Object key, Schema schema, Object data) {
this(recordPartition, recordOffset, topic, keySchema, key, schema , data, null);
}

public SourceRecord(RecordPartition recordPartition, RecordOffset recordOffset,String topic, Long timestamp, Schema schema, Object data) {
this(recordPartition, recordOffset, topic, timestamp, schema , data, null);
public SourceRecord(RecordPartition recordPartition, RecordOffset recordOffset, String topic, Schema keySchema, Object key, Schema valueSchema, Object value, KeyValue extensions) {
this(recordPartition, recordOffset, topic, null, keySchema, key, valueSchema, value, extensions);
}

public SourceRecord(RecordPartition recordPartition, RecordOffset recordOffset, String topic, Long timestamp, Schema schema, Object data, KeyValue extensions) {
super(topic, timestamp, schema, data, extensions);
public SourceRecord(RecordPartition recordPartition, RecordOffset recordOffset, String topic, Long timestamp, Schema keySchema, Object key, Schema schema, Object data, KeyValue extensions) {
super(topic, timestamp, keySchema, key, schema, data, extensions);
this.position = new RecordPosition(recordPartition, recordOffset);
}

Expand All @@ -61,8 +61,8 @@ public RecordPosition position() {
* @return
*/
@Override
public SourceRecord newRecord(String topic, Schema schema, Object data, Long timestamp) {
return newRecord(topic, schema , data, timestamp, null );
public SourceRecord newRecord(String topic, Long timestamp, Schema keySchema, Object key,Schema schema, Object data) {
return newRecord(topic, timestamp, keySchema, key, schema , data, null );
}

/**
Expand All @@ -76,8 +76,8 @@ public SourceRecord newRecord(String topic, Schema schema, Object data, Long tim
* @return
*/
@Override
public SourceRecord newRecord(String topic, Schema schema, Object data, Long timestamp, KeyValue extensions) {
return new SourceRecord(position().getPartition(), position().getOffset(), topic, timestamp, schema, data, extensions);
public SourceRecord newRecord(String topic, Long timestamp, Schema keySchema, Object key,Schema schema, Object data, KeyValue extensions) {
return new SourceRecord(position().getPartition(), position().getOffset(), topic, timestamp, keySchema, key, schema, data, extensions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

import io.openmessaging.connector.api.component.task.Task;
import io.openmessaging.connector.api.data.ConnectRecord;

import java.util.List;
import java.util.Map;

/**
* The source task API definition is used to define the logic for data pulling
Expand All @@ -39,20 +41,19 @@ public void init(SourceTaskContext sourceTaskContext) {
public abstract List<SourceRecord> poll() throws InterruptedException;

/**
* <p>
* Commit an individual {@link ConnectRecord} when the callback from the producer client is received.
* </p>
* <p>
* SourceTasks are not required to implement this functionality;Connect System will record offsets
* automatically. This hook is provided for systems that also need to store offsets internally
* in their own system.
* </p>
*
* @throws InterruptedException task thread interupt exception
* @param records connect records
* batch commit
* @param records
* @param metadata
*/
public void commit(final List<SourceRecord> records) throws InterruptedException {
commit();
public void commit(final List<SourceRecord> records, Map<String,String> metadata) throws InterruptedException {
}
/**
* commit record
* @param record
* @param metadata
*/
public void commit(final SourceRecord record, Map<String,String> metadata) throws InterruptedException {

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@ public abstract class ConnectRecord<R extends ConnectRecord<R>> {
*/
private Long timestamp;

/**
* key schema
*/
private Schema keySchema;

/**
* Payload of the key entry.
*/
private Object key;

/**
* Schema of the data entry.
*/
Expand All @@ -52,12 +62,27 @@ public ConnectRecord(String topic, Long timestamp, Schema schema, Object data) {
this(topic, timestamp, schema, data, null);
}

public ConnectRecord(String topic, Long timestamp, Schema schema, Object data,KeyValue extensions) {
public ConnectRecord(String topic, Long timestamp, Schema schema, Object data, KeyValue extensions) {
this(topic, timestamp, null, null, schema, data, extensions);
}

public ConnectRecord(String topic, Long timestamp,Schema keySchema, Object key, Schema schema, Object data) {
this(topic, timestamp, keySchema, key, schema, data, null);
}

public ConnectRecord(String topic, Long timestamp, Schema keySchema, Object key, Schema schema, Object data , KeyValue extensions) {
this.topic = topic;
this.schema = schema;
this.timestamp = timestamp;
// key
this.keySchema = keySchema;
this.key = key;

// value
this.schema = schema;
this.data = data;
// extension
this.extensions = extensions;

}


Expand All @@ -69,7 +94,7 @@ public ConnectRecord(String topic, Long timestamp, Schema schema, Object data,Ke
* @param timestamp
* @return
*/
public abstract R newRecord(String topic, Schema schema, Object data, Long timestamp);
public abstract R newRecord(String topic, Long timestamp, Schema keySchema, Object key,Schema schema, Object data);

/**
* new record
Expand All @@ -80,7 +105,7 @@ public ConnectRecord(String topic, Long timestamp, Schema schema, Object data,Ke
* @param extensions
* @return
*/
public abstract R newRecord(String topic, Schema schema, Object data, Long timestamp, KeyValue extensions);
public abstract R newRecord(String topic, Long timestamp, Schema keySchema, Object key,Schema schema, Object data, KeyValue extensions);


public String getTopic() {
Expand All @@ -99,6 +124,22 @@ public void setTimestamp(Long timestamp) {
this.timestamp = timestamp;
}

public Schema getKeySchema() {
return keySchema;
}

public void setKeySchema(Schema keySchema) {
this.keySchema = keySchema;
}

public Object getKey() {
return key;
}

public void setKey(Object key) {
this.key = key;
}

public Schema getSchema() {
return schema;
}
Expand All @@ -123,6 +164,10 @@ public void setExtensions(KeyValue extensions) {
this.extensions = extensions;
}

/**
* add extension by KeyValue
* @param extensions
*/
public void addExtension(KeyValue extensions) {
if (this.extensions == null) {
this.extensions = new DefaultKeyValue();
Expand All @@ -133,39 +178,50 @@ public void addExtension(KeyValue extensions) {
}
}

/**
* add extension by key and value
* @param key
* @param value
*/
public void addExtension(String key, String value) {
if (this.extensions == null) {
this.extensions = new DefaultKeyValue();
}
this.extensions.put(key, value);
}

/**
* get extension value
* @param key
* @return
*/
public String getExtension(String key) {
if (this.extensions == null) {
return null;
}
return this.extensions.getString(key);
}


@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof ConnectRecord)) return false;
ConnectRecord<?> that = (ConnectRecord<?>) o;
return Objects.equals(topic, that.topic) && Objects.equals(timestamp, that.timestamp) && Objects.equals(schema, that.schema) && Objects.equals(data, that.data) && Objects.equals(extensions, that.extensions);
return Objects.equals(topic, that.topic) && Objects.equals(timestamp, that.timestamp) && Objects.equals(keySchema, that.keySchema) && Objects.equals(key, that.key) && Objects.equals(schema, that.schema) && Objects.equals(data, that.data) && Objects.equals(extensions, that.extensions);
}

@Override
public int hashCode() {
return Objects.hash(topic, timestamp, schema, data, extensions);
return Objects.hash(topic, timestamp, keySchema, key, schema, data, extensions);
}

@Override
public String toString() {
return "ConnectRecord{" +
"topic='" + topic + '\'' +
", timestamp=" + timestamp +
", keySchema=" + keySchema +
", key=" + key +
", schema=" + schema +
", data=" + data +
", extensions=" + extensions +
Expand Down

0 comments on commit 7036d48

Please sign in to comment.