Skip to content

Commit

Permalink
update HeartBeat and connector runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
xwm1992 committed Apr 26, 2024
1 parent 0d9e744 commit 00a8fb7
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@

@Data
public class HeartBeat {

private String address;

private String reportedTimeStamp;

private String jobID;
private Position position;
private JobState state;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.apache.eventmesh.common.adminserver.request;

import org.apache.eventmesh.common.adminserver.JobState;
import org.apache.eventmesh.common.adminserver.Position;

import lombok.Data;

@Data
public class ReportPositionRequest {

private String jobID;

private Position position;

private JobState state;

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.apache.eventmesh.common.adminserver.response;

import org.apache.eventmesh.common.adminserver.JobState;
import org.apache.eventmesh.common.adminserver.Position;
import org.apache.eventmesh.common.adminserver.job.JobTransportType;

import java.util.Map;
Expand All @@ -23,5 +25,8 @@ public class FetchJobResponse extends BaseResponse {

private String sinkConnectorDesc;

private Position position;

private JobState state;

}
1 change: 1 addition & 0 deletions eventmesh-runtime-v2/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ dependencies {
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'

api project (":eventmesh-openconnect:eventmesh-openconnect-offsetmgmt-plugin:eventmesh-openconnect-offsetmgmt-api")
implementation project(":eventmesh-openconnect:eventmesh-openconnect-java")
implementation project(":eventmesh-common")
implementation project(":eventmesh-meta:eventmesh-meta-api")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.eventmesh.common.config.ConfigService;
import org.apache.eventmesh.common.config.connector.SinkConfig;
import org.apache.eventmesh.common.config.connector.SourceConfig;
import org.apache.eventmesh.common.config.offset.OffsetStorageConfig;
import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc;
import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc.AdminServiceBlockingStub;
import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc.AdminServiceStub;
Expand All @@ -24,12 +25,19 @@
import org.apache.eventmesh.openconnect.api.sink.Sink;
import org.apache.eventmesh.openconnect.api.source.Source;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffsetManagement;
import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.DefaultOffsetManagementServiceImpl;
import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetManagementService;
import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetStorageReaderImpl;
import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetStorageWriterImpl;
import org.apache.eventmesh.openconnect.util.ConfigUtil;
import org.apache.eventmesh.runtime.Runtime;
import org.apache.eventmesh.runtime.RuntimeInstanceConfig;
import org.apache.eventmesh.spi.EventMeshExtensionFactory;

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -67,15 +75,25 @@ public class ConnectorRuntime implements Runtime {

private Sink sinkConnector;

private OffsetStorageWriterImpl offsetStorageWriter;

private OffsetStorageReaderImpl offsetStorageReader;

private OffsetManagementService offsetManagementService;

private RecordOffsetManagement offsetManagement;

private volatile RecordOffsetManagement.CommittableOffsets committableOffsets;

private Producer producer;

private Consumer consumer;

private final ExecutorService sourceService =
ThreadPoolFactory.createSingleExecutor("eventMesh-sourceService");

private final ExecutorService startService =
ThreadPoolFactory.createSingleExecutor("eventMesh-sourceWorker-startService");
private final ExecutorService sinkService =
ThreadPoolFactory.createSingleExecutor("eventMesh-sinkService");

private final ScheduledExecutorService heartBeatExecutor = Executors.newSingleThreadScheduledExecutor();

Expand All @@ -91,6 +109,15 @@ public ConnectorRuntime(RuntimeInstanceConfig runtimeInstanceConfig) {

@Override
public void init() throws Exception {

initAdminService();

initStorageService();

initConnectorService();
}

private void initAdminService() {
// create gRPC channel
channel = ManagedChannelBuilder.forTarget(runtimeInstanceConfig.getAdminServerAddr())
.usePlaintext()
Expand All @@ -100,10 +127,6 @@ public void init() throws Exception {

adminServiceBlockingStub = AdminServiceGrpc.newBlockingStub(channel).withWaitForReady();

producer = StoragePluginFactory.getMeshMQProducer(runtimeInstanceConfig.getStoragePluginType());

consumer = StoragePluginFactory.getMeshMQPushConsumer(runtimeInstanceConfig.getStoragePluginType());

responseObserver = new StreamObserver<Payload>() {
@Override
public void onNext(Payload response) {
Expand All @@ -122,6 +145,17 @@ public void onCompleted() {
};

requestObserver = adminServiceStub.invokeBiStream(responseObserver);
}

private void initStorageService() {

producer = StoragePluginFactory.getMeshMQProducer(runtimeInstanceConfig.getStoragePluginType());

consumer = StoragePluginFactory.getMeshMQPushConsumer(runtimeInstanceConfig.getStoragePluginType());

}

private void initConnectorService() throws Exception {

connectorRuntimeConfig = ConfigService.getInstance().buildConfigInstance(ConnectorRuntimeConfig.class);

Expand All @@ -146,7 +180,19 @@ public void onCompleted() {
SourceConfig sourceConfig = (SourceConfig) ConfigUtil.parse(connectorRuntimeConfig.getSourceConnectorConfig(), sourceConnector.configClass());
SourceConnectorContext sourceConnectorContext = new SourceConnectorContext();
sourceConnectorContext.setSourceConfig(sourceConfig);
// sourceConnectorContext.setOffsetStorageReader(offsetStorageReader);
sourceConnectorContext.setOffsetStorageReader(offsetStorageReader);

// spi load offsetMgmtService
this.offsetManagement = new RecordOffsetManagement();
this.committableOffsets = RecordOffsetManagement.CommittableOffsets.EMPTY;
OffsetStorageConfig offsetStorageConfig = sourceConfig.getOffsetStorageConfig();
this.offsetManagementService = Optional.ofNullable(offsetStorageConfig)
.map(OffsetStorageConfig::getOffsetStorageType)
.map(storageType -> EventMeshExtensionFactory.getExtension(OffsetManagementService.class, storageType))
.orElse(new DefaultOffsetManagementServiceImpl());
this.offsetManagementService.initialize(offsetStorageConfig);
this.offsetStorageWriter = new OffsetStorageWriterImpl(sourceConnector.name(), offsetManagementService);
this.offsetStorageReader = new OffsetStorageReaderImpl(sourceConnector.name(), offsetManagementService);

sourceConnector.init(sourceConnectorContext);

Expand All @@ -157,6 +203,7 @@ public void onCompleted() {
SinkConnectorContext sinkConnectorContext = new SinkConnectorContext();
sinkConnectorContext.setSinkConfig(sinkConfig);
sinkConnector.init(sinkConnectorContext);

}

private FetchJobResponse fetchJobConfig() {
Expand Down Expand Up @@ -188,7 +235,7 @@ public void start() throws Exception {
HeartBeat heartBeat = new HeartBeat();
heartBeat.setAddress(IPUtils.getLocalAddress());
heartBeat.setReportedTimeStamp(String.valueOf(System.currentTimeMillis()));
// TODO: add more info for heartBeat
heartBeat.setJobID(connectorRuntimeConfig.getJobID());

Metadata metadata = Metadata.newBuilder()
.setType(HeartBeat.class.getSimpleName())
Expand All @@ -203,20 +250,35 @@ public void start() throws Exception {
requestObserver.onNext(request);
}, 5, 5, TimeUnit.SECONDS);

// sourceConnector.start();
// sourceConnector.poll();
// start offsetMgmtService
// offsetManagementService.start();

// start offsetMgmtService
offsetManagementService.start();
isRunning = true;
// pollService.execute(this::startPollAndSend);
sinkService.execute(
() -> {
try {
startSinkConnector();
} catch (Exception e) {
log.error("sink connector [{}] start fail", sinkConnector.name(), e);
try {
this.stop();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
});

sourceService.execute(
() -> {
try {
startSourceConnector();
} catch (Exception e) {
log.error("source connector [{}] start fail", sourceConnector.name(), e);
// this.stop();
try {
this.stop();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
});

Expand All @@ -237,10 +299,18 @@ private void startSourceConnector() throws Exception {
while (isRunning) {
List<ConnectRecord> connectorRecordList = sourceConnector.poll();
if (connectorRecordList != null && !connectorRecordList.isEmpty()) {
for (ConnectRecord record : connectorRecordList) {
queue.put(record);
}
// for (ConnectRecord record : connectorRecordList) {
// queue.put(record);
// }
// TODO: producer pub to storage
}
}
}

private void startSinkConnector() throws Exception {
sinkConnector.start();
while (isRunning) {
// TODO: consumer sub from storage
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
@Config(path = "classPath://connector.yaml")
public class ConnectorRuntimeConfig {

private String connectorName;

private String connectorRuntimeInstanceId;

private String jobID;
Expand Down

0 comments on commit 00a8fb7

Please sign in to comment.