Skip to content

Commit

Permalink
more
Browse files Browse the repository at this point in the history
  • Loading branch information
sodaRyCN committed May 21, 2024
1 parent 6e70ae7 commit b94a392
Show file tree
Hide file tree
Showing 18 changed files with 49 additions and 66 deletions.
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
package com.apache.eventmesh.admin.server;

import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.eventmesh.common.remote.Task;
import org.apache.eventmesh.common.remote.offset.RecordPosition;
import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.common.utils.PagedList;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.canal.CanalRecordOffset;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.canal.CanalRecordPartition;

public interface Admin extends ComponentLifeCycle {
/**
Expand All @@ -23,31 +18,4 @@ public interface Admin extends ComponentLifeCycle {
* support for task
*/
void reportHeartbeat(ReportHeartBeatRequest heartBeat);


static void main(String[] args) {
// ReportPositionRequest request = new ReportPositionRequest();
// request.setJobID("1");
// request.setAddress("1");
// request.setState(JobState.RUNNING);
// request.setDataSourceType(DataSourceType.MYSQL);
RecordPosition recordPosition = new RecordPosition();
CanalRecordOffset recordOffset = new CanalRecordOffset();
recordOffset.setOffset(12345L);
recordPosition.setRecordOffset(recordOffset);
CanalRecordPartition partition = new CanalRecordPartition();
partition.setJournalName("demo-binary-log-01");
partition.setTimeStamp(System.currentTimeMillis());
recordPosition.setRecordPartition(partition);
// ArrayList<RecordPosition> list = new ArrayList<>();
// list.add(recordPosition);
// request.setRecordPositionList(list);
String bytes = JsonUtils.toJSONString(recordPosition);

RecordPosition object1 = JsonUtils.parseTypeReferenceObject(bytes, new TypeReference<RecordPosition>() {});
RecordPosition object2 = JsonUtils.parseObject(bytes, RecordPosition.class);
System.out.println(object1);
System.out.println(object2);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
import org.apache.eventmesh.common.remote.request.FetchPositionRequest;
import org.apache.eventmesh.common.remote.request.ReportPositionRequest;
import org.apache.eventmesh.common.remote.response.FetchPositionResponse;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.canal.CanalRecordOffset;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.canal.CanalRecordPartition;
import org.apache.eventmesh.common.remote.offset.canal.CanalRecordOffset;
import org.apache.eventmesh.common.remote.offset.canal.CanalRecordPartition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Component;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,46 @@

package org.apache.eventmesh.common.remote.offset;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.eventmesh.common.remote.offset.S3.S3RecordOffset;
import org.apache.eventmesh.common.remote.offset.S3.S3RecordPartition;
import org.apache.eventmesh.common.remote.offset.canal.CanalRecordOffset;
import org.apache.eventmesh.common.remote.offset.canal.CanalRecordPartition;
import org.apache.eventmesh.common.remote.offset.file.FileRecordOffset;
import org.apache.eventmesh.common.remote.offset.file.FileRecordPartition;
import org.apache.eventmesh.common.remote.offset.kafka.KafkaRecordOffset;
import org.apache.eventmesh.common.remote.offset.kafka.KafkaRecordPartition;
import org.apache.eventmesh.common.remote.offset.pulsar.PulsarRecordOffset;
import org.apache.eventmesh.common.remote.offset.pulsar.PulsarRecordPartition;
import org.apache.eventmesh.common.remote.offset.rocketmq.RocketMQRecordOffset;
import org.apache.eventmesh.common.remote.offset.rocketmq.RocketMQRecordPartition;

import java.util.Objects;

public class RecordPosition {
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY)
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS)
@JsonSubTypes({
@JsonSubTypes.Type(value = CanalRecordPartition.class, name = "CanalRecordPartition"),
@JsonSubTypes.Type(value = FileRecordPartition.class, name = "FileRecordPartition"),
@JsonSubTypes.Type(value = S3RecordPartition.class, name = "S3RecordPartition"),
@JsonSubTypes.Type(value = KafkaRecordPartition.class, name = "KafkaRecordPartition"),
@JsonSubTypes.Type(value = PulsarRecordPartition.class, name = "PulsarRecordPartition"),
@JsonSubTypes.Type(value = RocketMQRecordPartition.class, name = "RocketMQRecordPartition"),
})
private RecordPartition recordPartition;

private Class<? extends RecordPartition> recordPartitionClazz;

@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY)
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS)
@JsonSubTypes({
@JsonSubTypes.Type(value = CanalRecordOffset.class, name = "CanalRecordOffset"),
@JsonSubTypes.Type(value = FileRecordOffset.class, name = "FileRecordOffset"),
@JsonSubTypes.Type(value = S3RecordOffset.class, name = "S3RecordOffset"),
@JsonSubTypes.Type(value = KafkaRecordOffset.class, name = "KafkaRecordOffset"),
@JsonSubTypes.Type(value = PulsarRecordOffset.class, name = "PulsarRecordOffset"),
@JsonSubTypes.Type(value = RocketMQRecordOffset.class, name = "RocketMQRecordOffset"),
})
private RecordOffset recordOffset;

private Class<? extends RecordOffset> recordOffsetClazz;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.eventmesh.openconnect.offsetmgmt.api.data.S3;
package org.apache.eventmesh.common.remote.offset.S3;

import org.apache.eventmesh.common.remote.offset.RecordOffset;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.eventmesh.openconnect.offsetmgmt.api.data.S3;
package org.apache.eventmesh.common.remote.offset.S3;

import org.apache.eventmesh.common.remote.offset.RecordPartition;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@

package org.apache.eventmesh.common.remote.offset.canal;

import lombok.Data;
import lombok.ToString;
import org.apache.eventmesh.common.remote.offset.RecordPartition;

import java.util.Objects;

import lombok.Data;
import lombok.ToString;


@Data
@ToString
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.eventmesh.openconnect.offsetmgmt.api.data.file;
package org.apache.eventmesh.common.remote.offset.file;

import org.apache.eventmesh.common.remote.offset.RecordOffset;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.eventmesh.openconnect.offsetmgmt.api.data.file;
package org.apache.eventmesh.common.remote.offset.file;

import org.apache.eventmesh.common.remote.offset.RecordPartition;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.eventmesh.openconnect.offsetmgmt.api.data.kafka;
package org.apache.eventmesh.common.remote.offset.kafka;

import org.apache.eventmesh.common.remote.offset.RecordOffset;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.eventmesh.openconnect.offsetmgmt.api.data.kafka;
package org.apache.eventmesh.common.remote.offset.kafka;

import org.apache.eventmesh.common.remote.offset.RecordPartition;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.eventmesh.openconnect.offsetmgmt.api.data.pulsar;
package org.apache.eventmesh.common.remote.offset.pulsar;

import org.apache.eventmesh.common.remote.offset.RecordOffset;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.eventmesh.openconnect.offsetmgmt.api.data.pulsar;
package org.apache.eventmesh.common.remote.offset.pulsar;

import org.apache.eventmesh.common.remote.offset.RecordPartition;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
import org.apache.eventmesh.openconnect.api.source.Source;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.file.FileRecordPartition;
import org.apache.eventmesh.common.remote.offset.file.FileRecordPartition;

import java.io.BufferedReader;
import java.io.File;
Expand All @@ -35,9 +35,7 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import lombok.extern.slf4j.Slf4j;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
import org.apache.eventmesh.openconnect.api.source.Source;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.kafka.KafkaRecordOffset;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.kafka.KafkaRecordPartition;
import org.apache.eventmesh.common.remote.offset.kafka.KafkaRecordOffset;
import org.apache.eventmesh.common.remote.offset.kafka.KafkaRecordPartition;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand All @@ -36,9 +36,7 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class KafkaSourceConnector implements Source {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@

import org.apache.eventmesh.common.config.connector.Config;
import org.apache.eventmesh.common.config.connector.mq.pulsar.PulsarSourceConfig;
import org.apache.eventmesh.common.remote.offset.RecordPartition;
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
import org.apache.eventmesh.openconnect.api.source.Source;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.pulsar.PulsarRecordPartition;
import org.apache.eventmesh.common.remote.offset.pulsar.PulsarRecordPartition;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
Expand All @@ -34,9 +33,7 @@

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import lombok.extern.slf4j.Slf4j;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,12 @@
import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
import org.apache.eventmesh.openconnect.api.source.Source;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.S3.S3RecordOffset;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.S3.S3RecordPartition;
import org.apache.eventmesh.common.remote.offset.S3.S3RecordOffset;
import org.apache.eventmesh.common.remote.offset.S3.S3RecordPartition;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import org.apache.eventmesh.common.remote.offset.RecordOffset;
import org.apache.eventmesh.common.remote.offset.RecordPartition;
import org.apache.eventmesh.common.remote.offset.RecordPosition;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.rocketmq.RocketMQRecordOffset;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.rocketmq.RocketMQRecordPartition;

import java.util.Objects;
import java.util.Set;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

import org.apache.eventmesh.common.remote.offset.RecordOffset;
import org.apache.eventmesh.common.remote.offset.RecordPartition;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.rocketmq.RocketMQRecordOffset;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.rocketmq.RocketMQRecordPartition;

import java.util.Collection;
import java.util.HashMap;
Expand Down

0 comments on commit b94a392

Please sign in to comment.