Skip to content

Commit

Permalink
fix ack offset read & persist
Browse files Browse the repository at this point in the history
  • Loading branch information
xwm1992 committed Jun 28, 2024
1 parent 5f1c1af commit 8f4748f
Show file tree
Hide file tree
Showing 14 changed files with 110 additions and 57 deletions.
15 changes: 3 additions & 12 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,10 @@ tasks.register('dist') {
dependsOn('generateDistLicense', 'generateDistNotice')
def includedProjects =
["eventmesh-common",
"eventmesh-admin-server",
"eventmesh-meta:eventmesh-meta-api",
"eventmesh-metrics-plugin:eventmesh-metrics-api",
"eventmesh-openconnect:eventmesh-openconnect-java",
"eventmesh-openconnect:eventmesh-openconnect-offsetmgmt-plugin:eventmesh-openconnect-offsetmgmt-api",
"eventmesh-protocol-plugin:eventmesh-protocol-api",
"eventmesh-registry:eventmesh-registry-api",
"eventmesh-retry:eventmesh-retry-api",
Expand Down Expand Up @@ -663,16 +664,6 @@ subprojects {

dependencyManagement {
dependencies {
dependencySet(group: 'org.springframework', version: '5.3.31') {
entry 'spring-aop'
entry 'spring-beans'
entry 'spring-context'
entry 'spring-core'
entry 'spring-expression'
entry 'spring-jcl'
entry 'spring-jdbc'
entry 'spring-tx'
}

dependency "org.apache.commons:commons-lang3:3.6"
dependency "org.apache.commons:commons-collections4:4.4"
Expand Down Expand Up @@ -723,7 +714,7 @@ subprojects {
dependency "com.mebigfatguy.fb-contrib:fb-contrib:7.6.0"
dependency "com.jayway.jsonpath:json-path:2.9.0"

dependency "org.springframework.boot:spring-boot-starter-web:2.7.18"
dependency "org.springframework.boot:spring-boot-starter-web:2.5.4"
dependency "io.openmessaging:registry-server:0.0.1"

dependency "org.junit.jupiter:junit-jupiter:5.6.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.eventmesh.common.remote.job.JobTransportType;
import org.apache.eventmesh.common.remote.offset.RecordPosition;

import java.util.List;
import java.util.Map;

import lombok.Data;
Expand All @@ -42,7 +43,7 @@ public class EventMeshJobDetail {

private String sinkConnectorDesc;

private RecordPosition position;
private List<RecordPosition> position;

private JobState state;
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.eventmesh.common.remote.request.FetchPositionRequest;
import org.apache.eventmesh.common.remote.request.ReportPositionRequest;

import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

Expand All @@ -38,7 +40,7 @@ public class EventMeshPositionBizService {
PositionHandlerFactory factory;

// called isValidateReportRequest before call this
public RecordPosition getPosition(FetchPositionRequest request, Metadata metadata) {
public List<RecordPosition> getPosition(FetchPositionRequest request, Metadata metadata) {
if (request == null) {
return null;
}
Expand Down Expand Up @@ -68,7 +70,7 @@ public boolean reportPosition(ReportPositionRequest request, Metadata metadata)
return handler.handler(request, metadata);
}

public RecordPosition getPositionByJobID(Integer jobID, DataSourceType type) {
public List<RecordPosition> getPositionByJobID(Integer jobID, DataSourceType type) {
if (jobID == null || type == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
import org.apache.eventmesh.common.remote.offset.RecordPosition;
import org.apache.eventmesh.common.remote.request.FetchPositionRequest;

import java.util.List;

/**
* IFetchPositionHandler
*/
public interface IFetchPositionHandler {

RecordPosition handler(FetchPositionRequest request, Metadata metadata);
List<RecordPosition> handler(FetchPositionRequest request, Metadata metadata);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.eventmesh.common.remote.request.ReportPositionRequest;
import org.apache.eventmesh.common.utils.JsonUtils;

import java.util.ArrayList;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -142,20 +143,21 @@ public boolean handler(ReportPositionRequest request, Metadata metadata) {
}

@Override
public RecordPosition handler(FetchPositionRequest request, Metadata metadata) {
EventMeshMysqlPosition position = positionService.getOne(Wrappers.<EventMeshMysqlPosition>query().eq("jobID",
public List<RecordPosition> handler(FetchPositionRequest request, Metadata metadata) {
List<EventMeshMysqlPosition> positionList = positionService.list(Wrappers.<EventMeshMysqlPosition>query().eq("jobID",
request.getJobID()));
RecordPosition recordPosition = null;
if (position != null) {
List<RecordPosition> recordPositionList = new ArrayList<>();
for (EventMeshMysqlPosition position : positionList) {
RecordPosition recordPosition = new RecordPosition();
CanalRecordPartition partition = new CanalRecordPartition();
partition.setTimeStamp(position.getTimestamp());
partition.setJournalName(position.getJournalName());
CanalRecordOffset offset = new CanalRecordOffset();
offset.setOffset(position.getPosition());
recordPosition = new RecordPosition();
recordPosition.setRecordPartition(partition);
recordPosition.setRecordOffset(offset);
recordPositionList.add(recordPosition);
}
return recordPosition;
return recordPositionList;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ public class CanalSourceConfig extends SourceConfig {

private Long batchTimeout = -1L;

private String tableFilter;

private String fieldFilter;

private List<RecordPosition> recordPositions;

// ================================= channel parameter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.eventmesh.common.remote.job.JobTransportType;
import org.apache.eventmesh.common.remote.offset.RecordPosition;

import java.util.List;
import java.util.Map;

import lombok.Data;
Expand All @@ -45,7 +46,7 @@ public class FetchJobResponse extends BaseRemoteResponse {

private String sinkConnectorDesc;

private RecordPosition position;
private List<RecordPosition> position;

private JobState state;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@
import org.apache.eventmesh.common.remote.exception.ErrorCode;
import org.apache.eventmesh.common.remote.offset.RecordPosition;

import java.util.List;

import lombok.Data;
import lombok.EqualsAndHashCode;

@Data
@EqualsAndHashCode(callSuper = true)
public class FetchPositionResponse extends BaseRemoteResponse {

private RecordPosition recordPosition;
private List<RecordPosition> recordPosition;

public static FetchPositionResponse successResponse() {
FetchPositionResponse response = new FetchPositionResponse();
Expand All @@ -36,7 +38,7 @@ public static FetchPositionResponse successResponse() {
return response;
}

public static FetchPositionResponse successResponse(RecordPosition recordPosition) {
public static FetchPositionResponse successResponse(List<RecordPosition> recordPosition) {
FetchPositionResponse response = successResponse();
response.setRecordPosition(recordPosition);
return response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ dependencies {
implementation project(":eventmesh-common")
implementation canal
implementation "com.alibaba:druid:1.2.6"
// implementation "org.apache.ddlutils:ddlutils:1.0"
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation "org.mockito:mockito-core"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -46,11 +47,12 @@
@Slf4j
public class EntryParser {

public List<CanalConnectRecord> parse(CanalSourceConfig sourceConfig, List<Entry> datas) {
public Map<Long, List<CanalConnectRecord>> parse(CanalSourceConfig sourceConfig, List<Entry> datas) {
List<CanalConnectRecord> recordList = new ArrayList<>();
List<Entry> transactionDataBuffer = new ArrayList<>();
// need check weather the entry is loopback
boolean needSync;
Map<Long, List<CanalConnectRecord>> recordMap = new HashMap<>();
try {
for (Entry entry : datas) {
switch (entry.getEntryType()) {
Expand All @@ -63,17 +65,19 @@ public List<CanalConnectRecord> parse(CanalSourceConfig sourceConfig, List<Entry
break;
case TRANSACTIONEND:
parseRecordListWithEntryBuffer(sourceConfig, recordList, transactionDataBuffer);
if (!recordList.isEmpty()) {
recordMap.put(entry.getHeader().getLogfileOffset(), recordList);
}
transactionDataBuffer.clear();
break;
default:
break;
}
}
parseRecordListWithEntryBuffer(sourceConfig, recordList, transactionDataBuffer);
} catch (Exception e) {
throw new RuntimeException(e);
}
return recordList;
return recordMap;
}

private void parseRecordListWithEntryBuffer(CanalSourceConfig sourceConfig, List<CanalConnectRecord> recordList,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,18 @@
import org.apache.eventmesh.openconnect.api.source.Source;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;

import org.apache.commons.lang3.StringUtils;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;


import com.alibaba.otter.canal.instance.core.CanalInstance;
import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator;
import com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager;
Expand Down Expand Up @@ -74,7 +76,9 @@ public class CanalSourceConnector implements Source, ConnectorCreateService<Sour

private ClientIdentity clientIdentity;

private String filter = null;
private String tableFilter = null;

private String fieldFilter = null;

private volatile boolean running = false;

Expand All @@ -95,6 +99,16 @@ public void init(Config config) throws Exception {
public void init(ConnectorContext connectorContext) throws Exception {
SourceConnectorContext sourceConnectorContext = (SourceConnectorContext) connectorContext;
this.sourceConfig = (CanalSourceConfig) sourceConnectorContext.getSourceConfig();
if (sourceConnectorContext.getRecordPositionList() != null) {
this.sourceConfig.setRecordPositions(sourceConnectorContext.getRecordPositionList());
}

if (StringUtils.isNotEmpty(sourceConfig.getTableFilter())) {
tableFilter = sourceConfig.getTableFilter();
}
if (StringUtils.isNotEmpty(sourceConfig.getFieldFilter())) {
fieldFilter = sourceConfig.getFieldFilter();
}

canalServer = CanalServerWithEmbedded.instance();

Expand All @@ -103,7 +117,7 @@ public void init(ConnectorContext connectorContext) throws Exception {
public CanalInstance generate(String destination) {
Canal canal = buildCanal(sourceConfig);

CanalInstanceWithManager instance = new CanalInstanceWithManager(canal, filter) {
CanalInstanceWithManager instance = new CanalInstanceWithManager(canal, tableFilter) {

protected CanalHAController initHaController() {
return super.initHaController();
Expand All @@ -118,6 +132,9 @@ protected void startEventParserInternal(CanalEventParser parser, boolean isGroup
((MysqlEventParser) eventParser).setSupportBinlogImages("FULL");
MysqlEventParser mysqlEventParser = (MysqlEventParser) eventParser;
mysqlEventParser.setParallel(false);
if (StringUtils.isNotEmpty(fieldFilter)) {
mysqlEventParser.setFieldFilter(fieldFilter);
}

CanalHAController haController = mysqlEventParser.getHaController();
if (!haController.isStart()) {
Expand Down Expand Up @@ -204,7 +221,7 @@ public void start() throws Exception {
canalServer.start();

canalServer.start(sourceConfig.getDestination());
this.clientIdentity = new ClientIdentity(sourceConfig.getDestination(), sourceConfig.getClientId(), filter);
this.clientIdentity = new ClientIdentity(sourceConfig.getDestination(), sourceConfig.getClientId(), tableFilter);
canalServer.subscribe(clientIdentity);

running = true;
Expand Down Expand Up @@ -274,23 +291,31 @@ public List<ConnectRecord> poll() {
EntryParser entryParser = new EntryParser();

List<ConnectRecord> result = new ArrayList<>();

List<CanalConnectRecord> connectRecordList = entryParser.parse(sourceConfig, entries);

if (connectRecordList != null && !connectRecordList.isEmpty()) {
CanalConnectRecord lastRecord = connectRecordList.get(connectRecordList.size() - 1);

CanalRecordPartition canalRecordPartition = new CanalRecordPartition();
canalRecordPartition.setJournalName(lastRecord.getJournalName());
canalRecordPartition.setTimeStamp(lastRecord.getExecuteTime());

CanalRecordOffset canalRecordOffset = new CanalRecordOffset();
canalRecordOffset.setOffset(lastRecord.getBinLogOffset());

ConnectRecord connectRecord = new ConnectRecord(canalRecordPartition, canalRecordOffset, System.currentTimeMillis());
connectRecord.addExtension("messageId", String.valueOf(message.getId()));
connectRecord.setData(connectRecordList);
result.add(connectRecord);
// key: Xid offset
Map<Long, List<CanalConnectRecord>> connectorRecordMap = entryParser.parse(sourceConfig, entries);

if (!connectorRecordMap.isEmpty()) {
Set<Map.Entry<Long, List<CanalConnectRecord>>> entrySet = connectorRecordMap.entrySet();
for (Map.Entry<Long, List<CanalConnectRecord>> entry : entrySet) {
// Xid offset
Long binLogOffset = entry.getKey();
List<CanalConnectRecord> connectRecordList = entry.getValue();
CanalConnectRecord lastRecord = entry.getValue().get(connectRecordList.size() - 1);
CanalRecordPartition canalRecordPartition = new CanalRecordPartition();
canalRecordPartition.setJournalName(lastRecord.getJournalName());
canalRecordPartition.setTimeStamp(lastRecord.getExecuteTime());

CanalRecordOffset canalRecordOffset = new CanalRecordOffset();
canalRecordOffset.setOffset(binLogOffset);

ConnectRecord connectRecord = new ConnectRecord(canalRecordPartition, canalRecordOffset, System.currentTimeMillis());
connectRecord.addExtension("messageId", String.valueOf(message.getId()));
connectRecord.setData(connectRecordList);
result.add(connectRecord);
}
} else {
// for the message has been filtered need ack message
canalServer.ack(clientIdentity, message.getId());
}

return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
package org.apache.eventmesh.openconnect.api.connector;

import org.apache.eventmesh.common.config.connector.SourceConfig;
import org.apache.eventmesh.common.remote.offset.RecordPosition;
import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetStorageReader;

import java.util.List;

import lombok.Data;

/**
Expand All @@ -32,4 +35,7 @@ public class SourceConnectorContext implements ConnectorContext {

public SourceConfig sourceConfig;

// initial record position
public List<RecordPosition> recordPositionList;

}
Loading

0 comments on commit 8f4748f

Please sign in to comment.