Skip to content

Commit

Permalink
update canal source connector
Browse files Browse the repository at this point in the history
  • Loading branch information
xwm1992 committed May 15, 2024
1 parent 2302346 commit d1c4689
Show file tree
Hide file tree
Showing 21 changed files with 328 additions and 239 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.eventmesh.common.config.connector.offset;

import org.apache.eventmesh.common.remote.job.DataSourceType;

import java.util.Map;

import lombok.Data;
Expand All @@ -29,4 +31,8 @@ public class OffsetStorageConfig {
private String offsetStorageAddr;

private Map<String, String> extensions;

private DataSourceType dataSourceType;

private DataSourceType dataSinkType;
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,39 +27,30 @@ public class RecordOffset {
* if pull message from mq key=queueOffset,
* value=queueOffset value
*/
private Map<String, ?> offset = new HashMap<>();
// private Map<String, ?> offset = new HashMap<>();
private Class<? extends RecordOffset> clazz;

public RecordOffset() {

}

public RecordOffset(Map<String, ?> offset) {
this.offset = offset;
}

public Map<String, ?> getOffset() {
return offset;
}
// public RecordOffset(Map<String, ?> offset) {
// this.offset = offset;
// }
//
// public Map<String, ?> getOffset() {
// return offset;
// }

public Class<? extends RecordOffset> getRecordOffsetClass() {
return RecordOffset.class;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof RecordOffset)) {
return false;
}
RecordOffset offset1 = (RecordOffset) o;
return Objects.equals(offset, offset1.offset);
public Class<? extends RecordOffset> getClazz() {
return clazz;
}

@Override
public int hashCode() {
return Objects.hash(offset);
public void setClazz(Class<? extends RecordOffset> clazz) {
this.clazz = clazz;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,45 +23,30 @@

public class RecordPartition {

private Map<String, ?> partitionMap = new HashMap<>();
// private Map<String, ?> partitionMap = new HashMap<>();

public RecordPartition() {

}
private Class<? extends RecordPartition> clazz;

public RecordPartition(Map<String, ?> partition) {
this.partitionMap = partition;
}
public RecordPartition() {

public Map<String, ?> getPartitionMap() {
return partitionMap;
}
// public RecordPartition(Map<String, ?> partition) {
// this.partitionMap = partition;
// }
//
// public Map<String, ?> getPartitionMap() {
// return partitionMap;
// }

public Class<? extends RecordPartition> getRecordPartitionClass() {
return RecordPartition.class;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
RecordPartition that = (RecordPartition) o;
return Objects.equals(partitionMap, that.partitionMap);
}

@Override
public int hashCode() {
return Objects.hash(partitionMap);
public Class<? extends RecordPartition> getClazz() {
return clazz;
}

@Override
public String toString() {
return "RecordPartition{" +
"partitionMap=" + partitionMap +
'}';
public void setClazz(Class<? extends RecordPartition> clazz) {
this.clazz = clazz;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@

public class RecordPosition {

private final RecordPartition recordPartition;
private RecordPartition recordPartition;

private final Class<? extends RecordPartition> recordPartitionClazz;
private Class<? extends RecordPartition> recordPartitionClazz;

private final RecordOffset recordOffset;
private RecordOffset recordOffset;

private final Class<? extends RecordOffset> recordOffsetClazz;
private Class<? extends RecordOffset> recordOffsetClazz;

public RecordPosition(){

}

public RecordPosition(
RecordPartition recordPartition, RecordOffset recordOffset) {
Expand All @@ -37,6 +41,22 @@ public RecordPosition(
this.recordOffsetClazz = recordOffset.getRecordOffsetClass();
}

public void setRecordPartition(RecordPartition recordPartition) {
this.recordPartition = recordPartition;
}

public void setRecordPartitionClazz(Class<? extends RecordPartition> recordPartitionClazz) {
this.recordPartitionClazz = recordPartitionClazz;
}

public void setRecordOffset(RecordOffset recordOffset) {
this.recordOffset = recordOffset;
}

public void setRecordOffsetClazz(Class<? extends RecordOffset> recordOffsetClazz) {
this.recordOffsetClazz = recordOffsetClazz;
}

public RecordPartition getPartition() {
return recordPartition;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@
@Data
@EqualsAndHashCode(callSuper = true)
public class FetchJobRequest extends BaseGrpcRequest {
String jobID;
private String jobID;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.apache.eventmesh.common.remote.request;

import org.apache.eventmesh.common.remote.job.DataSourceType;
import org.apache.eventmesh.common.remote.offset.RecordPosition;

import lombok.Data;
import lombok.EqualsAndHashCode;

@Data
@EqualsAndHashCode(callSuper = true)
public class FetchPositionRequest extends BaseGrpcRequest {

private String jobID;

private String address;

private RecordPosition recordPosition;

private DataSourceType dataSourceType;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package org.apache.eventmesh.common.remote.response;

import org.apache.eventmesh.common.remote.JobState;
import org.apache.eventmesh.common.remote.Position;
import org.apache.eventmesh.common.remote.exception.ErrorCode;
import org.apache.eventmesh.common.remote.job.JobTransportType;
import org.apache.eventmesh.common.remote.offset.RecordPosition;

import java.util.Map;

import lombok.Data;
import lombok.EqualsAndHashCode;

@Data
@EqualsAndHashCode(callSuper = true)
public class FetchPositionResponse extends BaseGrpcResponse {

private RecordPosition recordPosition;

public static FetchPositionResponse successResponse() {
FetchPositionResponse response = new FetchPositionResponse();
response.setSuccess(true);
response.setErrorCode(ErrorCode.SUCCESS);
return response;
}

public static FetchPositionResponse failResponse(int code, String desc) {
FetchPositionResponse response = new FetchPositionResponse();
response.setSuccess(false);
response.setErrorCode(code);
response.setDesc(desc);
return response;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
import org.apache.eventmesh.openconnect.api.source.Source;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetStorageReader;

import java.net.InetSocketAddress;
import java.util.Arrays;
Expand Down Expand Up @@ -61,6 +62,8 @@ public class CanalSourceConnector implements Source, ConnectorCreateService<Sour

private CanalSourceConfig sourceConfig;

private OffsetStorageReader offsetStorageReader;

private CanalServerWithEmbedded canalServer;

private ClientIdentity clientIdentity;
Expand All @@ -84,6 +87,7 @@ public void init(Config config) throws Exception {
public void init(ConnectorContext connectorContext) throws Exception {
SourceConnectorContext sourceConnectorContext = (SourceConnectorContext) connectorContext;
this.sourceConfig = (CanalSourceConfig) sourceConnectorContext.getSourceConfig();
this.offsetStorageReader = sourceConnectorContext.getOffsetStorageReader();
// init source database connection
DatabaseConnection.sourceConfig = sourceConfig;
DatabaseConnection.initSourceConnection();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,19 @@ public void start() throws Exception {

for (MessageQueue messageQueue : mqDivided) {
try {
Map<String, String> partitionMap = new HashMap<>();
partitionMap.put("topic", messageQueue.getTopic());
partitionMap.put("brokerName", messageQueue.getBrokerName());
partitionMap.put("queueId", messageQueue.getQueueId() + "");
RecordPartition recordPartition = new RocketMQRecordPartition(partitionMap);
// Map<String, String> partitionMap = new HashMap<>();
// partitionMap.put("topic", messageQueue.getTopic());
// partitionMap.put("brokerName", messageQueue.getBrokerName());
// partitionMap.put("queueId", messageQueue.getQueueId() + "");
RocketMQRecordPartition recordPartition = new RocketMQRecordPartition();
recordPartition.setBroker(messageQueue.getBrokerName());
recordPartition.setTopic(messageQueue.getTopic());
recordPartition.setQueueId(messageQueue.getQueueId() + "");
recordPartition.setClazz(recordPartition.getRecordPartitionClass());
RecordOffset recordOffset = offsetStorageReader.readOffset(recordPartition);
log.info("assigned messageQueue {}, recordOffset {}", messageQueue, recordOffset);
if (recordOffset != null) {
long pollOffset = (Long) recordOffset.getOffset().get("queueOffset");
long pollOffset = ((RocketMQRecordOffset) recordOffset).getQueueOffset();
if (pollOffset != 0) {
consumer.seek(messageQueue, pollOffset);
}
Expand Down Expand Up @@ -189,13 +193,13 @@ private List<MessageQueue> getMessageQueueList(String topic) throws MQClientExce
@Override
public void commit(ConnectRecord record) {
// send success, commit offset
Map<String, ?> map = record.getPosition().getPartition().getPartitionMap();
String brokerName = (String) map.get("brokerName");
String topic = (String) map.get("topic");
int queueId = Integer.parseInt((String) map.get("queueId"));
RocketMQRecordPartition rocketMQRecordPartition = (RocketMQRecordPartition)(record.getPosition().getPartition());
String brokerName = rocketMQRecordPartition.getBroker();
String topic = rocketMQRecordPartition.getTopic();
int queueId = Integer.parseInt(rocketMQRecordPartition.getQueueId());
MessageQueue mq = new MessageQueue(topic, brokerName, queueId);
Map<String, ?> offsetMap = record.getPosition().getOffset().getOffset();
long offset = Long.parseLong((String) offsetMap.get("queueOffset"));
RocketMQRecordOffset rocketMQRecordOffset = (RocketMQRecordOffset)record.getPosition().getOffset();
long offset = rocketMQRecordOffset.getQueueOffset();
long canCommitOffset = removeMessage(mq, offset);
log.info("commit record {}|mq {}|canCommitOffset {}", record, mq, canCommitOffset);
// commit offset to prepareCommitOffset
Expand Down Expand Up @@ -235,17 +239,19 @@ public List<ConnectRecord> poll() {
}

public static RecordOffset convertToRecordOffset(Long offset) {
Map<String, String> offsetMap = new HashMap<>();
offsetMap.put("queueOffset", offset + "");
return new RocketMQRecordOffset(offsetMap);
RocketMQRecordOffset rocketMQRecordOffset = new RocketMQRecordOffset();
rocketMQRecordOffset.setQueueOffset(offset);
return rocketMQRecordOffset;
}

public static RecordPartition convertToRecordPartition(String topic, String brokerName, int queueId) {
Map<String, String> map = new HashMap<>();
map.put("topic", topic);
map.put("brokerName", brokerName);
map.put("queueId", queueId + "");
return new RocketMQRecordPartition(map);
RocketMQRecordPartition rocketMQRecordPartition = new RocketMQRecordPartition();
rocketMQRecordPartition.setBroker(brokerName);
rocketMQRecordPartition.setTopic(topic);
rocketMQRecordPartition.setQueueId(queueId + "");
rocketMQRecordPartition.setClazz(rocketMQRecordPartition.getRecordPartitionClass());

return rocketMQRecordPartition;
}

private void putPulledQueueOffset(MessageExt messageExt) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ public void init() {
.map(storageType -> EventMeshExtensionFactory.getExtension(OffsetManagementService.class, storageType))
.orElse(new DefaultOffsetManagementServiceImpl());
this.offsetManagementService.initialize(offsetStorageConfig);
this.offsetStorageWriter = new OffsetStorageWriterImpl(source.name(), offsetManagementService);
this.offsetStorageReader = new OffsetStorageReaderImpl(source.name(), offsetManagementService);
this.offsetStorageWriter = new OffsetStorageWriterImpl(offsetManagementService);
this.offsetStorageReader = new OffsetStorageReaderImpl(offsetManagementService);
}

@Override
Expand Down
Loading

0 comments on commit d1c4689

Please sign in to comment.