diff --git a/docs/cn/features/spi.md b/docs/cn/features/spi.md index b990d5956e..7499e655b1 100644 --- a/docs/cn/features/spi.md +++ b/docs/cn/features/spi.md @@ -101,7 +101,7 @@ public class RocketMQProducerImpl implements MeshMQProducer { ``` 同时,还需要在eventmesh-connector-rocketmq模块中resource/META-INF/eventmesh目录下创建文件名为SPI接口全限定名的文件 -org.apache.eventmesh.api.producer.MeshMQProducer +org.apache.eventmesh.api.producer.Producer 文件内容为扩展实例名和对应的实例全类名 diff --git a/docs/en/features/spi.md b/docs/en/features/spi.md index 1c26d17ed5..af1d62725e 100644 --- a/docs/en/features/spi.md +++ b/docs/en/features/spi.md @@ -105,7 +105,7 @@ public class RocketMQProducerImpl implements MeshMQProducer { At the same time, we need to create a file with the full qualified name of the SPI interface under the resource/META-INF/eventmesh directory in the eventmesh-connector-rocketmq module. -org.apache.eventmesh.api.producer.MeshMQProducer +org.apache.eventmesh.api.producer.Producer The content of the file is the extension instance name and the corresponding instance full class name diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/build.gradle b/eventmesh-connector-plugin/eventmesh-connector-api/build.gradle index 81d1568ec0..3c2c9ede9a 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/build.gradle +++ b/eventmesh-connector-plugin/eventmesh-connector-api/build.gradle @@ -19,7 +19,7 @@ dependencies { implementation project(":eventmesh-spi") implementation project(":eventmesh-common") api 'io.cloudevents:cloudevents-core' - api 'io.openmessaging:openmessaging-api' +// api 'io.openmessaging:openmessaging-api' api 'io.dropwizard.metrics:metrics-core' api "io.dropwizard.metrics:metrics-healthchecks" api "io.dropwizard.metrics:metrics-annotation" diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/RRCallback.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/RRCallback.java index 788f9f3906..eba4acdbaa 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/RRCallback.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/RRCallback.java @@ -1,28 +1,28 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eventmesh.api; - -import io.openmessaging.api.Message; - -public interface RRCallback { - - public void onSuccess(Message msg); - - public void onException(Throwable e); - -} +///* +// * Licensed to the Apache Software Foundation (ASF) under one or more +// * contributor license agreements. See the NOTICE file distributed with +// * this work for additional information regarding copyright ownership. +// * The ASF licenses this file to You under the Apache License, Version 2.0 +// * (the "License"); you may not use this file except in compliance with +// * the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +// +//package org.apache.eventmesh.api; +// +//import io.openmessaging.api.Message; +// +//public interface RRCallback { +// +// public void onSuccess(Message msg); +// +// public void onException(Throwable e); +// +//} diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java index dcf558b685..8747cd9945 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java @@ -1,44 +1,40 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eventmesh.api.consumer; - -import java.util.List; -import java.util.Properties; - -import io.openmessaging.api.AsyncMessageListener; -import io.openmessaging.api.Consumer; -import io.openmessaging.api.Message; - -import org.apache.eventmesh.api.AbstractContext; -import org.apache.eventmesh.spi.EventMeshExtensionType; -import org.apache.eventmesh.spi.EventMeshSPI; - -@EventMeshSPI(isSingleton = false, eventMeshExtensionType = EventMeshExtensionType.CONNECTOR) -public interface MeshMQPushConsumer extends Consumer { - - void init(Properties keyValue) throws Exception; - - void updateOffset(List msgs, AbstractContext context); - -// void registerMessageListener(MessageListenerConcurrently messageListenerConcurrently); - - void subscribe(String topic, final AsyncMessageListener listener) throws Exception; - - @Override - void unsubscribe(String topic); -} +///* +// * Licensed to the Apache Software Foundation (ASF) under one or more +// * contributor license agreements. See the NOTICE file distributed with +// * this work for additional information regarding copyright ownership. +// * The ASF licenses this file to You under the Apache License, Version 2.0 +// * (the "License"); you may not use this file except in compliance with +// * the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +// +//package org.apache.eventmesh.api.consumer; +// +//import java.util.List; +//import java.util.Properties; +// +//import org.apache.eventmesh.api.AbstractContext; +//import org.apache.eventmesh.spi.EventMeshExtensionType; +//import org.apache.eventmesh.spi.EventMeshSPI; +// +//@EventMeshSPI(isSingleton = false, eventMeshExtensionType = EventMeshExtensionType.CONNECTOR) +//public interface MeshMQPushConsumer extends Consumer { +// +// void init(Properties keyValue) throws Exception; +// +// void updateOffset(List msgs, AbstractContext context); +// +//// void registerMessageListener(MessageListenerConcurrently messageListenerConcurrently); +// +// void subscribe(String topic, final AsyncMessageListener listener) throws Exception; +// +// @Override +// void unsubscribe(String topic); +//} diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java index d87be7d842..e44bd50a8e 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java @@ -1,45 +1,45 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eventmesh.api.producer; - -import java.util.Properties; - -import io.openmessaging.api.Message; -import io.openmessaging.api.Producer; -import io.openmessaging.api.SendCallback; - -import org.apache.eventmesh.api.RRCallback; -import org.apache.eventmesh.spi.EventMeshExtensionType; -import org.apache.eventmesh.spi.EventMeshSPI; - -@EventMeshSPI(isSingleton = false, eventMeshExtensionType = EventMeshExtensionType.CONNECTOR) -public interface MeshMQProducer extends Producer { - - void init(Properties properties) throws Exception; - - void send(Message message, SendCallback sendCallback) throws Exception; - - void request(Message message, RRCallback rrCallback, long timeout) throws Exception; - - boolean reply(final Message message, final SendCallback sendCallback) throws Exception; - - void checkTopicExist(String topic) throws Exception; - - void setExtFields(); - -} +///* +// * Licensed to the Apache Software Foundation (ASF) under one or more +// * contributor license agreements. See the NOTICE file distributed with +// * this work for additional information regarding copyright ownership. +// * The ASF licenses this file to You under the Apache License, Version 2.0 +// * (the "License"); you may not use this file except in compliance with +// * the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +// +//package org.apache.eventmesh.api.producer; +// +//import java.util.Properties; +// +//import io.openmessaging.api.Message; +//import io.openmessaging.api.Producer; +//import io.openmessaging.api.SendCallback; +// +//import org.apache.eventmesh.api.RRCallback; +//import org.apache.eventmesh.spi.EventMeshExtensionType; +//import org.apache.eventmesh.spi.EventMeshSPI; +// +//@EventMeshSPI(isSingleton = false, eventMeshExtensionType = EventMeshExtensionType.CONNECTOR) +//public interface MeshMQProducer extends Producer { +// +// void init(Properties properties) throws Exception; +// +// void send(Message message, SendCallback sendCallback) throws Exception; +// +// void request(Message message, RRCallback rrCallback, long timeout) throws Exception; +// +// boolean reply(final Message message, final SendCallback sendCallback) throws Exception; +// +// void checkTopicExist(String topic) throws Exception; +// +// void setExtFields(); +// +//} diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/Producer.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/Producer.java index 7a9e0320e5..5f14582eac 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/Producer.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/Producer.java @@ -18,7 +18,6 @@ package org.apache.eventmesh.api.producer; import org.apache.eventmesh.api.LifeCycle; -import org.apache.eventmesh.api.RRCallback; import org.apache.eventmesh.api.RequestReplyCallback; import org.apache.eventmesh.api.SendCallback; import org.apache.eventmesh.api.SendResult; @@ -45,7 +44,7 @@ public interface Producer extends LifeCycle { void sendAsync(final CloudEvent cloudEvent, final SendCallback sendCallback); - void request(CloudEvent cloudEvent, RRCallback rrCallback, long timeout) throws Exception; +// void request(CloudEvent cloudEvent, RRCallback rrCallback, long timeout) throws Exception; void request(CloudEvent cloudEvent, RequestReplyCallback rrCallback, long timeout) throws Exception; diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/MessagingAccessPointImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/MessagingAccessPointImpl.java index 8770b7d484..06999487d3 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/MessagingAccessPointImpl.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/MessagingAccessPointImpl.java @@ -1,90 +1,90 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eventmesh.connector.rocketmq; - -import java.util.Properties; - -import io.openmessaging.api.Consumer; -import io.openmessaging.api.MessagingAccessPoint; -import io.openmessaging.api.Producer; -import io.openmessaging.api.PullConsumer; -import io.openmessaging.api.batch.BatchConsumer; -import io.openmessaging.api.order.OrderConsumer; -import io.openmessaging.api.order.OrderProducer; -import io.openmessaging.api.transaction.LocalTransactionChecker; -import io.openmessaging.api.transaction.TransactionProducer; - -public class MessagingAccessPointImpl implements MessagingAccessPoint { - - private Properties accessPointProperties; - - public MessagingAccessPointImpl(final Properties accessPointProperties) { - this.accessPointProperties = accessPointProperties; - } - - @Override - public String version() { - return null; - } - - @Override - public Properties attributes() { - return accessPointProperties; - } - - @Override - public Producer createProducer(Properties properties) { - return null; - } - - @Override - public OrderProducer createOrderProducer(Properties properties) { - return null; - } - - @Override - public TransactionProducer createTransactionProducer(Properties properties, LocalTransactionChecker checker) { - return null; - } - - @Override - public TransactionProducer createTransactionProducer(Properties properties) { - return null; - } - - @Override - public Consumer createConsumer(Properties properties) { - return null; - } - - @Override - public PullConsumer createPullConsumer(Properties properties) { - return null; - } - - @Override - public BatchConsumer createBatchConsumer(Properties properties) { - return null; - } - - @Override - public OrderConsumer createOrderedConsumer(Properties properties) { - return null; - } - -} +///* +// * Licensed to the Apache Software Foundation (ASF) under one or more +// * contributor license agreements. See the NOTICE file distributed with +// * this work for additional information regarding copyright ownership. +// * The ASF licenses this file to You under the Apache License, Version 2.0 +// * (the "License"); you may not use this file except in compliance with +// * the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +// +//package org.apache.eventmesh.connector.rocketmq; +// +//import java.util.Properties; +// +//import io.openmessaging.api.Consumer; +//import io.openmessaging.api.MessagingAccessPoint; +//import io.openmessaging.api.Producer; +//import io.openmessaging.api.PullConsumer; +//import io.openmessaging.api.batch.BatchConsumer; +//import io.openmessaging.api.order.OrderConsumer; +//import io.openmessaging.api.order.OrderProducer; +//import io.openmessaging.api.transaction.LocalTransactionChecker; +//import io.openmessaging.api.transaction.TransactionProducer; +// +//public class MessagingAccessPointImpl implements MessagingAccessPoint { +// +// private Properties accessPointProperties; +// +// public MessagingAccessPointImpl(final Properties accessPointProperties) { +// this.accessPointProperties = accessPointProperties; +// } +// +// @Override +// public String version() { +// return null; +// } +// +// @Override +// public Properties attributes() { +// return accessPointProperties; +// } +// +// @Override +// public Producer createProducer(Properties properties) { +// return null; +// } +// +// @Override +// public OrderProducer createOrderProducer(Properties properties) { +// return null; +// } +// +// @Override +// public TransactionProducer createTransactionProducer(Properties properties, LocalTransactionChecker checker) { +// return null; +// } +// +// @Override +// public TransactionProducer createTransactionProducer(Properties properties) { +// return null; +// } +// +// @Override +// public Consumer createConsumer(Properties properties) { +// return null; +// } +// +// @Override +// public PullConsumer createPullConsumer(Properties properties) { +// return null; +// } +// +// @Override +// public BatchConsumer createBatchConsumer(Properties properties) { +// return null; +// } +// +// @Override +// public OrderConsumer createOrderedConsumer(Properties properties) { +// return null; +// } +// +//} diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfig.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfig.java index aad0b49a53..f03a38a4a9 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfig.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfig.java @@ -17,11 +17,10 @@ package org.apache.eventmesh.connector.rocketmq.config; -import io.openmessaging.api.OMSBuiltinKeys; import org.apache.eventmesh.connector.rocketmq.domain.NonStandardKeys; -public class ClientConfig implements OMSBuiltinKeys, NonStandardKeys { +public class ClientConfig implements NonStandardKeys { private String driverImpl; private String accessPoints; private String namespace; diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java index 7258131c23..dbc07030ea 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java @@ -50,7 +50,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import io.cloudevents.CloudEvent; -import io.openmessaging.api.exception.OMSRuntimeException; public class PushConsumerImpl { private final DefaultMQPushConsumer rocketmqPushConsumer; @@ -105,7 +104,7 @@ public void start() { try { this.rocketmqPushConsumer.start(); } catch (Exception e) { - throw new OMSRuntimeException(e.getMessage()); + throw new ConnectorRuntimeException(e.getMessage()); } } } @@ -137,8 +136,7 @@ public void subscribe(String topic, String subExpression, EventListener listener try { this.rocketmqPushConsumer.subscribe(topic, subExpression); } catch (MQClientException e) { - throw new OMSRuntimeException(-1, - String.format("RocketMQ push consumer can't attach to %s.", topic)); + throw new ConnectorRuntimeException(String.format("RocketMQ push consumer can't attach to %s.", topic)); } } @@ -148,8 +146,7 @@ public void unsubscribe(String topic) { try { this.rocketmqPushConsumer.unsubscribe(topic); } catch (Exception e) { - throw new OMSRuntimeException(-1, - String.format("RocketMQ push consumer fails to unsubscribe topic: %s", topic)); + throw new ConnectorRuntimeException(String.format("RocketMQ push consumer fails to unsubscribe topic: %s", topic)); } } @@ -186,8 +183,7 @@ public EventMeshConsumeConcurrentlyStatus handleMessage(MessageExt msg, EventListener listener = PushConsumerImpl.this.subscribeTable.get(msg.getTopic()); if (listener == null) { - throw new OMSRuntimeException(-1, - String.format("The topic/queue %s isn't attached to this consumer", + throw new ConnectorRuntimeException(String.format("The topic/queue %s isn't attached to this consumer", msg.getTopic())); } @@ -245,8 +241,7 @@ public EventMeshConsumeConcurrentlyStatus handleMessage(MessageExt msg, EventListener listener = PushConsumerImpl.this.subscribeTable.get(msg.getTopic()); if (listener == null) { - throw new OMSRuntimeException(-1, - String.format("The topic/queue %s isn't attached to this consumer", + throw new ConnectorRuntimeException(String.format("The topic/queue %s isn't attached to this consumer", msg.getTopic())); } diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java index 9626e2c0d4..1b3fe482ce 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java @@ -20,22 +20,13 @@ import org.apache.eventmesh.api.AbstractContext; import org.apache.eventmesh.api.EventListener; import org.apache.eventmesh.api.consumer.Consumer; -import org.apache.eventmesh.api.consumer.MeshMQPushConsumer; -import org.apache.eventmesh.connector.rocketmq.MessagingAccessPointImpl; import org.apache.eventmesh.connector.rocketmq.common.Constants; import org.apache.eventmesh.connector.rocketmq.common.EventMeshConstants; import org.apache.eventmesh.connector.rocketmq.config.ClientConfiguration; import org.apache.eventmesh.connector.rocketmq.config.ConfigurationWrapper; -import org.apache.eventmesh.connector.rocketmq.patch.EventMeshConsumeConcurrentlyContext; -import org.apache.eventmesh.connector.rocketmq.utils.OMSUtil; - -import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService; -import org.apache.rocketmq.client.impl.consumer.ConsumeMessageService; -import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import java.io.File; -import java.util.ArrayList; import java.util.List; import java.util.Properties; @@ -43,13 +34,6 @@ import org.slf4j.LoggerFactory; import io.cloudevents.CloudEvent; -import io.openmessaging.api.AsyncGenericMessageListener; -import io.openmessaging.api.AsyncMessageListener; -import io.openmessaging.api.GenericMessageListener; -import io.openmessaging.api.Message; -import io.openmessaging.api.MessageListener; -import io.openmessaging.api.MessageSelector; -import io.openmessaging.api.MessagingAccessPoint; public class RocketMQConsumerImpl implements Consumer { diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java index ec04b58bed..7831639ad6 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java @@ -17,7 +17,7 @@ package org.apache.eventmesh.connector.rocketmq.producer; -import org.apache.eventmesh.api.RRCallback; +import org.apache.eventmesh.api.RequestReplyCallback; import org.apache.eventmesh.api.SendCallback; import org.apache.eventmesh.api.SendResult; import org.apache.eventmesh.api.exception.ConnectorRuntimeException; @@ -117,7 +117,7 @@ public void sendAsync(CloudEvent cloudEvent, SendCallback sendCallback) { } } - public void request(CloudEvent cloudEvent, RRCallback rrCallback, long timeout) + public void request(CloudEvent cloudEvent, RequestReplyCallback rrCallback, long timeout) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { this.checkProducerServiceState(this.rocketmqProducer.getDefaultMQProducerImpl()); @@ -142,12 +142,12 @@ public boolean reply(final CloudEvent cloudEvent, final SendCallback sendCallbac } - private RequestCallback rrCallbackConvert(final Message message, final RRCallback rrCallback) { + private RequestCallback rrCallbackConvert(final Message message, final RequestReplyCallback rrCallback) { return new RequestCallback() { @Override public void onSuccess(org.apache.rocketmq.common.message.Message message) { - io.openmessaging.api.Message openMessage = OMSUtil.msgConvert((MessageExt) message); - rrCallback.onSuccess(openMessage); + CloudEvent event = RocketMQMessageFactory.createReader(message).toEvent(); + rrCallback.onSuccess(event); } @Override diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java index 54d06a0bf9..8ab3d969e4 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java @@ -17,12 +17,10 @@ package org.apache.eventmesh.connector.rocketmq.producer; -import org.apache.eventmesh.api.RRCallback; import org.apache.eventmesh.api.RequestReplyCallback; import org.apache.eventmesh.api.SendCallback; import org.apache.eventmesh.api.SendResult; import org.apache.eventmesh.api.producer.Producer; -import org.apache.eventmesh.connector.rocketmq.MessagingAccessPointImpl; import org.apache.eventmesh.connector.rocketmq.common.EventMeshConstants; import org.apache.eventmesh.connector.rocketmq.config.ClientConfiguration; import org.apache.eventmesh.connector.rocketmq.config.ConfigurationWrapper; @@ -38,7 +36,6 @@ import org.slf4j.LoggerFactory; import io.cloudevents.CloudEvent; -import io.openmessaging.api.MessagingAccessPoint; public class RocketMQProducerImpl implements Producer { @@ -94,15 +91,15 @@ public void publish(CloudEvent message, SendCallback sendCallback) throws Except } @Override - public void request(CloudEvent message, RRCallback rrCallback, long timeout) + public void request(CloudEvent message, RequestReplyCallback rrCallback, long timeout) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { producer.request(message, rrCallback, timeout); } - @Override - public void request(CloudEvent cloudEvent, RequestReplyCallback rrCallback, long timeout) throws Exception { - - } +// @Override +// public void request(CloudEvent cloudEvent, RequestReplyCallback rrCallback, long timeout) throws Exception { +// +// } @Override public boolean reply(final CloudEvent message, final SendCallback sendCallback) throws Exception { diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/promise/DefaultPromise.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/promise/DefaultPromise.java deleted file mode 100644 index cc30eb5ab0..0000000000 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/promise/DefaultPromise.java +++ /dev/null @@ -1,227 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eventmesh.connector.rocketmq.promise; - -import java.util.ArrayList; -import java.util.List; - -import io.openmessaging.api.Future; -import io.openmessaging.api.FutureListener; -import io.openmessaging.api.exception.OMSRuntimeException; - -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; - -public class DefaultPromise implements Future { - private static final InternalLogger LOG = InternalLoggerFactory.getLogger(DefaultPromise.class); - private final Object lock = new Object(); - private volatile FutureState state = FutureState.DOING; - private V result = null; - private long timeout; - private long createTime; - private Throwable exception = null; - private List> promiseListenerList; - - public DefaultPromise() { - createTime = System.currentTimeMillis(); - promiseListenerList = new ArrayList<>(); - timeout = 5000; - } - - @Override - public boolean cancel(final boolean mayInterruptIfRunning) { - return false; - } - - @Override - public boolean isCancelled() { - return state.isCancelledState(); - } - - @Override - public boolean isDone() { - return state.isDoneState(); - } - - @Override - public V get() { - return result; - } - - @Override - public V get(final long timeout) { - synchronized (lock) { - if (!isDoing()) { - return getValueOrThrowable(); - } - - if (timeout <= 0) { - try { - lock.wait(); - } catch (Exception e) { - cancel(e); - } - return getValueOrThrowable(); - } else { - long waitTime = timeout - (System.currentTimeMillis() - createTime); - if (waitTime > 0) { - for (; ; ) { - try { - lock.wait(waitTime); - } catch (InterruptedException e) { - LOG.error("promise get value interrupted,excepiton:{}", e.getMessage()); - } - - if (!isDoing()) { - break; - } else { - waitTime = timeout - (System.currentTimeMillis() - createTime); - if (waitTime <= 0) { - break; - } - } - } - } - - if (isDoing()) { - timeoutSoCancel(); - } - } - return getValueOrThrowable(); - } - } - - public boolean set(final V value) { - if (value == null) { - return false; - } - this.result = value; - return done(); - } - - public boolean setFailure(final Throwable cause) { - if (cause == null) { - return false; - } - this.exception = cause; - return done(); - } - - @Override - public void addListener(final FutureListener listener) { - if (listener == null) { - throw new NullPointerException("FutureListener is null"); - } - - boolean notifyNow = false; - synchronized (lock) { - if (!isDoing()) { - notifyNow = true; - } else { - if (promiseListenerList == null) { - promiseListenerList = new ArrayList<>(); - } - promiseListenerList.add(listener); - } - } - - if (notifyNow) { - notifyListener(listener); - } - } - - @Override - public Throwable getThrowable() { - return exception; - } - - private void notifyListeners() { - if (promiseListenerList != null) { - for (FutureListener listener : promiseListenerList) { - notifyListener(listener); - } - } - } - - private boolean isSuccess() { - return isDone() && (exception == null); - } - - private void timeoutSoCancel() { - synchronized (lock) { - if (!isDoing()) { - return; - } - state = FutureState.CANCELLED; - exception = new RuntimeException("Get request result is timeout or interrupted"); - lock.notifyAll(); - } - notifyListeners(); - } - - private V getValueOrThrowable() { - if (exception != null) { - Throwable e = exception.getCause() != null ? exception.getCause() : exception; - throw new OMSRuntimeException("-1", e); - } - notifyListeners(); - return result; - } - - private boolean isDoing() { - return state.isDoingState(); - } - - private boolean done() { - synchronized (lock) { - if (!isDoing()) { - return false; - } - - state = FutureState.DONE; - lock.notifyAll(); - } - - notifyListeners(); - return true; - } - - private void notifyListener(final FutureListener listener) { - try { - listener.operationComplete(this); - } catch (Throwable t) { - LOG.error("notifyListener {} Error:{}", listener.getClass().getSimpleName(), t); - } - } - - private boolean cancel(Exception e) { - synchronized (lock) { - if (!isDoing()) { - return false; - } - - state = FutureState.CANCELLED; - exception = e; - lock.notifyAll(); - } - - notifyListeners(); - return true; - } -} - diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/OMSUtil.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/OMSUtil.java index 00bcb85f2a..d6a1ae965a 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/OMSUtil.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/OMSUtil.java @@ -24,10 +24,6 @@ import java.util.Properties; import java.util.Set; -import io.openmessaging.api.Message; -import io.openmessaging.api.OMSBuiltinKeys; -import io.openmessaging.api.SendResult; -import io.openmessaging.api.exception.OMSRuntimeException; import org.apache.eventmesh.common.Constants; import org.apache.rocketmq.common.UtilAll; @@ -45,252 +41,252 @@ public static String buildInstanceName() { return Integer.toString(UtilAll.getPid()) + "%OpenMessaging" + "%" + System.nanoTime(); } - public static org.apache.rocketmq.common.message.Message msgConvert(Message omsMessage) { - org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message(); - if (omsMessage == null) { - throw new OMSRuntimeException("'message' is null"); - } else { - if (omsMessage.getTopic() != null) { - rmqMessage.setTopic(omsMessage.getTopic()); - } - if (omsMessage.getKey() != null) { - rmqMessage.setKeys(omsMessage.getKey()); - } - if (omsMessage.getTag() != null) { - rmqMessage.setTags(omsMessage.getTag()); - } - if (omsMessage.getStartDeliverTime() > 0L) { - rmqMessage.putUserProperty("TIMER_DELIVER_MS", String.valueOf(omsMessage.getStartDeliverTime())); - } - - if (omsMessage.getBody() != null) { - rmqMessage.setBody(omsMessage.getBody()); - } - - if (omsMessage.getShardingKey() != null && !omsMessage.getShardingKey().isEmpty()) { - rmqMessage.putUserProperty("__SHARDINGKEY", omsMessage.getShardingKey()); - } - } - Properties systemProperties = omsMessage.getSystemProperties(); - Properties userProperties = omsMessage.getUserProperties(); - - //All destinations in RocketMQ use Topic -// rmqMessage.setTopic(systemProperties.getProperty(BuiltinKeys.DESTINATION)); - -// if (sysHeaders.containsKey(BuiltinKeys.START_TIME)) { -// long deliverTime = sysHeaders.getLong(BuiltinKeys.START_TIME, 0); -// if (deliverTime > 0) { -// rmqMessage.putUserProperty(RocketMQConstants.START_DELIVER_TIME, String.valueOf(deliverTime)); +// public static org.apache.rocketmq.common.message.Message msgConvert(Message omsMessage) { +// org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message(); +// if (omsMessage == null) { +// throw new OMSRuntimeException("'message' is null"); +// } else { +// if (omsMessage.getTopic() != null) { +// rmqMessage.setTopic(omsMessage.getTopic()); +// } +// if (omsMessage.getKey() != null) { +// rmqMessage.setKeys(omsMessage.getKey()); +// } +// if (omsMessage.getTag() != null) { +// rmqMessage.setTags(omsMessage.getTag()); +// } +// if (omsMessage.getStartDeliverTime() > 0L) { +// rmqMessage.putUserProperty("TIMER_DELIVER_MS", String.valueOf(omsMessage.getStartDeliverTime())); +// } +// +// if (omsMessage.getBody() != null) { +// rmqMessage.setBody(omsMessage.getBody()); +// } +// +// if (omsMessage.getShardingKey() != null && !omsMessage.getShardingKey().isEmpty()) { +// rmqMessage.putUserProperty("__SHARDINGKEY", omsMessage.getShardingKey()); // } // } - - for (String key : userProperties.stringPropertyNames()) { - MessageAccessor.putProperty(rmqMessage, key, userProperties.getProperty(key)); - } - - //System headers has a high priority - for (String key : systemProperties.stringPropertyNames()) { - MessageAccessor.putProperty(rmqMessage, key, systemProperties.getProperty(key)); - } - - return rmqMessage; - } - - public static Message msgConvert(MessageExt rmqMsg) { - Message message = new Message(); - if (rmqMsg.getTopic() != null) { - message.setTopic(rmqMsg.getTopic()); - } - - if (rmqMsg.getKeys() != null) { - message.setKey(rmqMsg.getKeys()); - } - - if (rmqMsg.getTags() != null) { - message.setTag(rmqMsg.getTags()); - } - - if (rmqMsg.getBody() != null) { - message.setBody(rmqMsg.getBody()); - } - - if (rmqMsg.getUserProperty("TIMER_DELIVER_MS") != null) { - long ms = Long.parseLong(rmqMsg.getUserProperty("TIMER_DELIVER_MS")); - rmqMsg.getProperties().remove("TIMER_DELIVER_MS"); - message.setStartDeliverTime(ms); - } - - Properties systemProperties = new Properties(); - Properties userProperties = new Properties(); - - - final Set> entries = rmqMsg.getProperties().entrySet(); - - for (final Map.Entry entry : entries) { - if (isOMSHeader(entry.getKey())) { - //sysHeader - systemProperties.put(entry.getKey(), entry.getValue()); - } else { - //userHeader - userProperties.put(entry.getKey(), entry.getValue()); - } - } - - if (rmqMsg.getMsgId() != null){ - systemProperties.put(Constants.PROPERTY_MESSAGE_MESSAGE_ID, rmqMsg.getMsgId()); - } - - if (rmqMsg.getTopic() != null){ - systemProperties.put(Constants.PROPERTY_MESSAGE_DESTINATION, rmqMsg.getTopic()); - } - -// omsMsg.putSysHeaders(BuiltinKeys.SEARCH_KEYS, rmqMsg.getKeys()); - systemProperties.put(Constants.PROPERTY_MESSAGE_BORN_HOST, String.valueOf(rmqMsg.getBornHost())); - systemProperties.put(Constants.PROPERTY_MESSAGE_BORN_TIMESTAMP, rmqMsg.getBornTimestamp()); - systemProperties.put(Constants.PROPERTY_MESSAGE_STORE_HOST, String.valueOf(rmqMsg.getStoreHost())); - systemProperties.put("STORE_TIMESTAMP", rmqMsg.getStoreTimestamp()); - - //use in manual ack - userProperties.put(Constants.PROPERTY_MESSAGE_QUEUE_ID, rmqMsg.getQueueId()); - userProperties.put(Constants.PROPERTY_MESSAGE_QUEUE_OFFSET, rmqMsg.getQueueOffset()); - - message.setSystemProperties(systemProperties); - message.setUserProperties(userProperties); - - return message; - } - - public static org.apache.rocketmq.common.message.MessageExt msgConvertExt(Message omsMessage) { - - org.apache.rocketmq.common.message.MessageExt rmqMessageExt = new org.apache.rocketmq.common.message.MessageExt(); - try { - if (omsMessage.getKey() != null) { - rmqMessageExt.setKeys(omsMessage.getKey()); - } - if (omsMessage.getTag() != null) { - rmqMessageExt.setTags(omsMessage.getTag()); - } - if (omsMessage.getStartDeliverTime() > 0L) { - rmqMessageExt.putUserProperty("TIMER_DELIVER_MS", String.valueOf(omsMessage.getStartDeliverTime())); - } - - if (omsMessage.getBody() != null) { - rmqMessageExt.setBody(omsMessage.getBody()); - } - - if (omsMessage.getShardingKey() != null && !omsMessage.getShardingKey().isEmpty()) { - rmqMessageExt.putUserProperty("__SHARDINGKEY", omsMessage.getShardingKey()); - } - - Properties systemProperties = omsMessage.getSystemProperties(); - Properties userProperties = omsMessage.getUserProperties(); - - //All destinations in RocketMQ use Topic - rmqMessageExt.setTopic(omsMessage.getTopic()); - - int queueId = (int) userProperties.get(Constants.PROPERTY_MESSAGE_QUEUE_ID); - long queueOffset = (long) userProperties.get(Constants.PROPERTY_MESSAGE_QUEUE_OFFSET); - //use in manual ack - rmqMessageExt.setQueueId(queueId); - rmqMessageExt.setQueueOffset(queueOffset); - - for (String key : userProperties.stringPropertyNames()) { - MessageAccessor.putProperty(rmqMessageExt, key, userProperties.getProperty(key)); - } - - //System headers has a high priority - for (String key : systemProperties.stringPropertyNames()) { - MessageAccessor.putProperty(rmqMessageExt, key, systemProperties.getProperty(key)); - } - - } catch (Exception e) { - e.printStackTrace(); - } - return rmqMessageExt; - - } - - public static boolean isOMSHeader(String value) { - for (Field field : OMSBuiltinKeys.class.getDeclaredFields()) { - try { - if (field.get(OMSBuiltinKeys.class).equals(value)) { - return true; - } - } catch (IllegalAccessException e) { - return false; - } - } - return false; - } - - /** - * Convert a RocketMQ SEND_OK SendResult instance to a OMS SendResult. - * - * @param rmqResult RocketMQ result - * @return send result - */ - public static SendResult sendResultConvert(org.apache.rocketmq.client.producer.SendResult rmqResult) { - SendResult sendResult = new SendResult(); - sendResult.setTopic(rmqResult.getMessageQueue().getTopic()); - sendResult.setMessageId(rmqResult.getMsgId()); - return sendResult; - } - -// public static KeyValue buildKeyValue(KeyValue... keyValues) { -// KeyValue keyValue = OMS.newKeyValue(); -// for (KeyValue properties : keyValues) { -// for (String key : properties.keySet()) { -// keyValue.put(key, properties.getString(key)); +// Properties systemProperties = omsMessage.getSystemProperties(); +// Properties userProperties = omsMessage.getUserProperties(); +// +// //All destinations in RocketMQ use Topic +//// rmqMessage.setTopic(systemProperties.getProperty(BuiltinKeys.DESTINATION)); +// +//// if (sysHeaders.containsKey(BuiltinKeys.START_TIME)) { +//// long deliverTime = sysHeaders.getLong(BuiltinKeys.START_TIME, 0); +//// if (deliverTime > 0) { +//// rmqMessage.putUserProperty(RocketMQConstants.START_DELIVER_TIME, String.valueOf(deliverTime)); +//// } +//// } +// +// for (String key : userProperties.stringPropertyNames()) { +// MessageAccessor.putProperty(rmqMessage, key, userProperties.getProperty(key)); +// } +// +// //System headers has a high priority +// for (String key : systemProperties.stringPropertyNames()) { +// MessageAccessor.putProperty(rmqMessage, key, systemProperties.getProperty(key)); +// } +// +// return rmqMessage; +// } +// +// public static Message msgConvert(MessageExt rmqMsg) { +// Message message = new Message(); +// if (rmqMsg.getTopic() != null) { +// message.setTopic(rmqMsg.getTopic()); +// } +// +// if (rmqMsg.getKeys() != null) { +// message.setKey(rmqMsg.getKeys()); +// } +// +// if (rmqMsg.getTags() != null) { +// message.setTag(rmqMsg.getTags()); +// } +// +// if (rmqMsg.getBody() != null) { +// message.setBody(rmqMsg.getBody()); +// } +// +// if (rmqMsg.getUserProperty("TIMER_DELIVER_MS") != null) { +// long ms = Long.parseLong(rmqMsg.getUserProperty("TIMER_DELIVER_MS")); +// rmqMsg.getProperties().remove("TIMER_DELIVER_MS"); +// message.setStartDeliverTime(ms); +// } +// +// Properties systemProperties = new Properties(); +// Properties userProperties = new Properties(); +// +// +// final Set> entries = rmqMsg.getProperties().entrySet(); +// +// for (final Map.Entry entry : entries) { +// if (isOMSHeader(entry.getKey())) { +// //sysHeader +// systemProperties.put(entry.getKey(), entry.getValue()); +// } else { +// //userHeader +// userProperties.put(entry.getKey(), entry.getValue()); // } // } -// return keyValue; +// +// if (rmqMsg.getMsgId() != null){ +// systemProperties.put(Constants.PROPERTY_MESSAGE_MESSAGE_ID, rmqMsg.getMsgId()); +// } +// +// if (rmqMsg.getTopic() != null){ +// systemProperties.put(Constants.PROPERTY_MESSAGE_DESTINATION, rmqMsg.getTopic()); +// } +// +//// omsMsg.putSysHeaders(BuiltinKeys.SEARCH_KEYS, rmqMsg.getKeys()); +// systemProperties.put(Constants.PROPERTY_MESSAGE_BORN_HOST, String.valueOf(rmqMsg.getBornHost())); +// systemProperties.put(Constants.PROPERTY_MESSAGE_BORN_TIMESTAMP, rmqMsg.getBornTimestamp()); +// systemProperties.put(Constants.PROPERTY_MESSAGE_STORE_HOST, String.valueOf(rmqMsg.getStoreHost())); +// systemProperties.put("STORE_TIMESTAMP", rmqMsg.getStoreTimestamp()); +// +// //use in manual ack +// userProperties.put(Constants.PROPERTY_MESSAGE_QUEUE_ID, rmqMsg.getQueueId()); +// userProperties.put(Constants.PROPERTY_MESSAGE_QUEUE_OFFSET, rmqMsg.getQueueOffset()); +// +// message.setSystemProperties(systemProperties); +// message.setUserProperties(userProperties); +// +// return message; +// } +// +// public static org.apache.rocketmq.common.message.MessageExt msgConvertExt(Message omsMessage) { +// +// org.apache.rocketmq.common.message.MessageExt rmqMessageExt = new org.apache.rocketmq.common.message.MessageExt(); +// try { +// if (omsMessage.getKey() != null) { +// rmqMessageExt.setKeys(omsMessage.getKey()); +// } +// if (omsMessage.getTag() != null) { +// rmqMessageExt.setTags(omsMessage.getTag()); +// } +// if (omsMessage.getStartDeliverTime() > 0L) { +// rmqMessageExt.putUserProperty("TIMER_DELIVER_MS", String.valueOf(omsMessage.getStartDeliverTime())); +// } +// +// if (omsMessage.getBody() != null) { +// rmqMessageExt.setBody(omsMessage.getBody()); +// } +// +// if (omsMessage.getShardingKey() != null && !omsMessage.getShardingKey().isEmpty()) { +// rmqMessageExt.putUserProperty("__SHARDINGKEY", omsMessage.getShardingKey()); +// } +// +// Properties systemProperties = omsMessage.getSystemProperties(); +// Properties userProperties = omsMessage.getUserProperties(); +// +// //All destinations in RocketMQ use Topic +// rmqMessageExt.setTopic(omsMessage.getTopic()); +// +// int queueId = (int) userProperties.get(Constants.PROPERTY_MESSAGE_QUEUE_ID); +// long queueOffset = (long) userProperties.get(Constants.PROPERTY_MESSAGE_QUEUE_OFFSET); +// //use in manual ack +// rmqMessageExt.setQueueId(queueId); +// rmqMessageExt.setQueueOffset(queueOffset); +// +// for (String key : userProperties.stringPropertyNames()) { +// MessageAccessor.putProperty(rmqMessageExt, key, userProperties.getProperty(key)); +// } +// +// //System headers has a high priority +// for (String key : systemProperties.stringPropertyNames()) { +// MessageAccessor.putProperty(rmqMessageExt, key, systemProperties.getProperty(key)); +// } +// +// } catch (Exception e) { +// e.printStackTrace(); +// } +// return rmqMessageExt; +// +// } +// +// public static boolean isOMSHeader(String value) { +// for (Field field : OMSBuiltinKeys.class.getDeclaredFields()) { +// try { +// if (field.get(OMSBuiltinKeys.class).equals(value)) { +// return true; +// } +// } catch (IllegalAccessException e) { +// return false; +// } +// } +// return false; +// } +// +// /** +// * Convert a RocketMQ SEND_OK SendResult instance to a OMS SendResult. +// * +// * @param rmqResult RocketMQ result +// * @return send result +// */ +// public static SendResult sendResultConvert(org.apache.rocketmq.client.producer.SendResult rmqResult) { +// SendResult sendResult = new SendResult(); +// sendResult.setTopic(rmqResult.getMessageQueue().getTopic()); +// sendResult.setMessageId(rmqResult.getMsgId()); +// return sendResult; +// } +// +//// public static KeyValue buildKeyValue(KeyValue... keyValues) { +//// KeyValue keyValue = OMS.newKeyValue(); +//// for (KeyValue properties : keyValues) { +//// for (String key : properties.keySet()) { +//// keyValue.put(key, properties.getString(key)); +//// } +//// } +//// return keyValue; +//// } +// +// /** +// * Returns an iterator that cycles indefinitely over the elements of {@code Iterable}. +// * +// * @param Target type +// * @return Iterator +// */ +// public static Iterator cycle(final Iterable iterable) { +// return new Iterator() { +// Iterator iterator = new Iterator() { +// @Override +// public synchronized boolean hasNext() { +// return false; +// } +// +// @Override +// public synchronized T next() { +// throw new NoSuchElementException(); +// } +// +// @Override +// public synchronized void remove() { +// //Ignore +// } +// }; +// +// @Override +// public synchronized boolean hasNext() { +// return iterator.hasNext() || iterable.iterator().hasNext(); +// } +// +// @Override +// public synchronized T next() { +// if (!iterator.hasNext()) { +// iterator = iterable.iterator(); +// if (!iterator.hasNext()) { +// throw new NoSuchElementException(); +// } +// } +// return iterator.next(); +// } +// +// @Override +// public synchronized void remove() { +// iterator.remove(); +// } +// }; // } - - /** - * Returns an iterator that cycles indefinitely over the elements of {@code Iterable}. - * - * @param Target type - * @return Iterator - */ - public static Iterator cycle(final Iterable iterable) { - return new Iterator() { - Iterator iterator = new Iterator() { - @Override - public synchronized boolean hasNext() { - return false; - } - - @Override - public synchronized T next() { - throw new NoSuchElementException(); - } - - @Override - public synchronized void remove() { - //Ignore - } - }; - - @Override - public synchronized boolean hasNext() { - return iterator.hasNext() || iterable.iterator().hasNext(); - } - - @Override - public synchronized T next() { - if (!iterator.hasNext()) { - iterator = iterable.iterator(); - if (!iterator.hasNext()) { - throw new NoSuchElementException(); - } - } - return iterator.next(); - } - - @Override - public synchronized void remove() { - iterator.remove(); - } - }; - } } diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.MeshMQPushConsumer b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.Consumer similarity index 100% rename from eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.MeshMQPushConsumer rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.Consumer diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.MeshMQProducer b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.Producer similarity index 100% rename from eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.MeshMQProducer rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.Producer diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/resources/META-INF/services/org.apache.io.openmessaging.MessagingAccessPoint b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/resources/META-INF/services/org.apache.io.openmessaging.MessagingAccessPoint deleted file mode 100644 index e326c919c7..0000000000 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/resources/META-INF/services/org.apache.io.openmessaging.MessagingAccessPoint +++ /dev/null @@ -1,20 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -org.apache.eventmesh.connector.rocketmq.MessagingAccessPointImpl \ No newline at end of file diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/StandaloneBroker.java b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/StandaloneBroker.java index 72a9b58e65..99135aa7e8 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/StandaloneBroker.java +++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/StandaloneBroker.java @@ -18,7 +18,6 @@ package org.apache.eventmesh.connector.standalone.broker; import io.cloudevents.CloudEvent; -import io.openmessaging.api.Message; import org.apache.commons.lang3.tuple.Pair; import org.apache.eventmesh.connector.standalone.broker.model.MessageEntity; import org.apache.eventmesh.connector.standalone.broker.model.TopicMetadata; diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/model/MessageEntity.java b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/model/MessageEntity.java index 968053aba9..dd7468f03b 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/model/MessageEntity.java +++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/model/MessageEntity.java @@ -18,7 +18,6 @@ package org.apache.eventmesh.connector.standalone.broker.model; import io.cloudevents.CloudEvent; -import io.openmessaging.api.Message; import java.io.Serializable; diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/producer/StandaloneProducer.java b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/producer/StandaloneProducer.java index 57c8a2a749..b2a6122d91 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/producer/StandaloneProducer.java +++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/producer/StandaloneProducer.java @@ -17,7 +17,6 @@ package org.apache.eventmesh.connector.standalone.producer; -import org.apache.eventmesh.api.RRCallback; import org.apache.eventmesh.api.RequestReplyCallback; import org.apache.eventmesh.api.SendCallback; import org.apache.eventmesh.api.SendResult; @@ -37,7 +36,7 @@ import io.cloudevents.CloudEvent; -public class StandaloneProducer implements Producer { +public class StandaloneProducer { private Logger logger = LoggerFactory.getLogger(StandaloneProducer.class); @@ -50,32 +49,26 @@ public StandaloneProducer(Properties properties) { this.isStarted = new AtomicBoolean(false); } - @Override public boolean isStarted() { return isStarted.get(); } - @Override public boolean isClosed() { return !isStarted.get(); } - @Override public void start() { isStarted.compareAndSet(false, true); } - @Override public void shutdown() { isStarted.compareAndSet(true, false); } - @Override - public void init(Properties properties) throws Exception { - + public StandaloneProducer init(Properties properties) throws Exception { + return new StandaloneProducer(properties); } - @Override public SendResult publish(CloudEvent cloudEvent) { Preconditions.checkNotNull(cloudEvent); try { @@ -91,7 +84,6 @@ public SendResult publish(CloudEvent cloudEvent) { } } - @Override public void publish(CloudEvent cloudEvent, SendCallback sendCallback) throws Exception { Preconditions.checkNotNull(cloudEvent); Preconditions.checkNotNull(sendCallback); @@ -108,12 +100,10 @@ public void publish(CloudEvent cloudEvent, SendCallback sendCallback) throws Exc } } - @Override public void sendOneway(CloudEvent cloudEvent) { publish(cloudEvent); } - @Override public void sendAsync(CloudEvent cloudEvent, SendCallback sendCallback) { Preconditions.checkNotNull(cloudEvent); Preconditions.checkNotNull(sendCallback); @@ -130,22 +120,19 @@ public void sendAsync(CloudEvent cloudEvent, SendCallback sendCallback) { } } - @Override - public void request(CloudEvent cloudEvent, RRCallback rrCallback, long timeout) throws Exception { - throw new ConnectorRuntimeException("Request is not supported"); - } +// @Override +// public void request(CloudEvent cloudEvent, RequestReplyCallback rrCallback, long timeout) throws Exception { +// throw new ConnectorRuntimeException("Request is not supported"); +// } - @Override public void request(CloudEvent cloudEvent, RequestReplyCallback rrCallback, long timeout) throws Exception { throw new ConnectorRuntimeException("Request is not supported"); } - @Override public boolean reply(CloudEvent cloudEvent, SendCallback sendCallback) throws Exception { throw new ConnectorRuntimeException("Reply is not supported"); } - @Override public void checkTopicExist(String topic) throws Exception { boolean exist = standaloneBroker.checkTopicExist(topic); if (!exist) { @@ -153,7 +140,6 @@ public void checkTopicExist(String topic) throws Exception { } } - @Override public void setExtFields() { } diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/producer/StandaloneProducerAdaptor.java b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/producer/StandaloneProducerAdaptor.java index d60968cafc..8cff7e4f82 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/producer/StandaloneProducerAdaptor.java +++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/producer/StandaloneProducerAdaptor.java @@ -17,7 +17,6 @@ package org.apache.eventmesh.connector.standalone.producer; -import org.apache.eventmesh.api.RRCallback; import org.apache.eventmesh.api.RequestReplyCallback; import org.apache.eventmesh.api.SendCallback; import org.apache.eventmesh.api.SendResult; @@ -61,7 +60,7 @@ public void shutdown() { @Override public void init(Properties properties) throws Exception { - standaloneProducer.init(properties); + standaloneProducer = new StandaloneProducer(properties); } @Override @@ -84,10 +83,10 @@ public void sendAsync(CloudEvent cloudEvent, SendCallback sendCallback) { standaloneProducer.sendAsync(cloudEvent, sendCallback); } - @Override - public void request(CloudEvent cloudEvent, RRCallback rrCallback, long timeout) throws Exception { - standaloneProducer.request(cloudEvent, rrCallback, timeout); - } +// @Override +// public void request(CloudEvent cloudEvent, RRCallback rrCallback, long timeout) throws Exception { +// standaloneProducer.request(cloudEvent, rrCallback, timeout); +// } @Override public void request(CloudEvent cloudEvent, RequestReplyCallback rrCallback, long timeout) throws Exception { diff --git a/eventmesh-examples/build.gradle b/eventmesh-examples/build.gradle index 3cb42bce3e..6e57ad6722 100644 --- a/eventmesh-examples/build.gradle +++ b/eventmesh-examples/build.gradle @@ -26,6 +26,7 @@ dependencies { implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'io.netty:netty-all' implementation "io.cloudevents:cloudevents-core" + implementation "io.openmessaging:openmessaging-api" compileOnly 'org.projectlombok:lombok:1.18.22' annotationProcessor 'org.projectlombok:lombok:1.18.22' diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java index bd7f7cbcaa..d6f30e8502 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java @@ -22,6 +22,7 @@ import static org.apache.eventmesh.tcp.common.EventMeshTestCaseTopicSet.TOPIC_PRX_WQ2ClientBroadCast; import static org.apache.eventmesh.tcp.common.EventMeshTestCaseTopicSet.TOPIC_PRX_WQ2ClientUniCast; +import org.apache.eventmesh.client.tcp.common.MessageUtils; import org.apache.eventmesh.common.protocol.tcp.Command; import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; import org.apache.eventmesh.common.protocol.tcp.Header; @@ -33,8 +34,9 @@ public class EventMeshTestUtils { private static final int seqLength = 10; + // generate pub-client public static UserAgent generateClient1() { - return UserAgent.builder() + UserAgent agent = UserAgent.builder() .env("test") .host("127.0.0.1") .password(generateRandomString(8)) @@ -48,10 +50,12 @@ public static UserAgent generateClient1() { .version("2.0.11") .idc("FT") .build(); + return MessageUtils.generatePubClient(agent); } + // generate sub-client public static UserAgent generateClient2() { - return UserAgent.builder() + UserAgent agent = UserAgent.builder() .env("test") .host("127.0.0.1") .password(generateRandomString(8)) @@ -65,6 +69,7 @@ public static UserAgent generateClient2() { .version("2.0.11") .idc("FT") .build(); + return MessageUtils.generateSubClient(agent); } public static Package syncRR() { diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java index cb773f4eb4..bf762ebd92 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java @@ -20,7 +20,7 @@ import org.apache.eventmesh.api.acl.AclPropertyKeys; import org.apache.eventmesh.api.acl.AclService; import org.apache.eventmesh.api.exception.AclException; -import org.apache.eventmesh.api.producer.MeshMQProducer; +//import org.apache.eventmesh.api.producer.MeshMQProducer; import org.apache.eventmesh.common.protocol.tcp.UserAgent; import org.apache.eventmesh.spi.EventMeshExtensionFactory; import org.slf4j.Logger; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java index 58b3151578..f781071126 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java @@ -19,7 +19,6 @@ import java.util.Properties; -import org.apache.eventmesh.api.RRCallback; import org.apache.eventmesh.api.RequestReplyCallback; import org.apache.eventmesh.api.SendCallback; import org.apache.eventmesh.api.factory.ConnectorPluginFactory; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/cloudevent/OMSMessageFactory.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/cloudevent/OMSMessageFactory.java deleted file mode 100644 index fcb2026fb2..0000000000 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/cloudevent/OMSMessageFactory.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eventmesh.runtime.core.protocol.cloudevent; - -import io.cloudevents.core.message.MessageReader; -import io.cloudevents.core.message.MessageWriter; -import io.cloudevents.core.message.impl.GenericStructuredMessageReader; -import io.cloudevents.core.message.impl.MessageUtils; -import io.cloudevents.lang.Nullable; -import io.cloudevents.rw.CloudEventRWException; -import io.cloudevents.rw.CloudEventWriter; -import io.openmessaging.api.Message; -import org.apache.eventmesh.runtime.core.protocol.cloudevent.impl.OMSBinaryMessageReader; -import org.apache.eventmesh.runtime.core.protocol.cloudevent.impl.OMSHeaders; -import org.apache.eventmesh.runtime.core.protocol.cloudevent.impl.OMSMessageWriter; - -import javax.annotation.ParametersAreNonnullByDefault; -import java.util.Properties; - -/** - * This class provides a collection of methods to create {@link io.cloudevents.core.message.MessageReader} - * and {@link io.cloudevents.core.message.MessageWriter} - * manually serialize/deserialize {@link io.cloudevents.CloudEvent} messages. - */ -@ParametersAreNonnullByDefault -public final class OMSMessageFactory { - - private OMSMessageFactory() { - // prevent instantiation - } - - /** - * create reader by message - * @param message - * @return - * @throws CloudEventRWException - */ - public static MessageReader createReader(final Message message) throws CloudEventRWException { - return createReader(message.getUserProperties(), message.getBody()); - } - - - public static MessageReader createReader(final Properties props, @Nullable final byte[] body) throws CloudEventRWException { - - return MessageUtils.parseStructuredOrBinaryMessage( - () -> props.getOrDefault(OMSHeaders.CONTENT_TYPE,"").toString(), - format -> new GenericStructuredMessageReader(format, body), - () -> props.getOrDefault(OMSHeaders.SPEC_VERSION,"").toString(), - sv -> new OMSBinaryMessageReader(sv, props, body) - ); - } - - - /** - * create writer by topic - * @param topic - * @return - */ - public static MessageWriter, Message> createWriter(String topic) { - return new OMSMessageWriter<>(topic); - } - - /** - * create writer by topic,keys - * @param topic - * @param keys - * @return - */ - public static MessageWriter, Message> createWriter(String topic, String keys) { - return new OMSMessageWriter<>(topic, keys); - } - - /** - * create writer by topic,keys,tags - * @param topic - * @param keys - * @param tags - * @return - */ - public static MessageWriter, Message> createWriter(String topic, String keys, String tags) { - return new OMSMessageWriter<>(topic, keys, tags); - } - -} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/cloudevent/impl/OMSBinaryMessageReader.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/cloudevent/impl/OMSBinaryMessageReader.java deleted file mode 100644 index ae035c7be8..0000000000 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/cloudevent/impl/OMSBinaryMessageReader.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eventmesh.runtime.core.protocol.cloudevent.impl; - -import io.cloudevents.SpecVersion; -import io.cloudevents.core.data.BytesCloudEventData; -import io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl; - -import java.util.Objects; -import java.util.Properties; -import java.util.function.BiConsumer; - -/** - * binary message reader - */ -public class OMSBinaryMessageReader extends BaseGenericBinaryMessageReaderImpl { - - private final Properties headers; - - public OMSBinaryMessageReader(SpecVersion version, Properties headers, byte[] payload) { - super(version, payload != null && payload.length > 0 ? BytesCloudEventData.wrap(payload) : null); - - Objects.requireNonNull(headers); - this.headers = headers; - } - - /** - * whether header key is content type - * @param key - * @return - */ - @Override - protected boolean isContentTypeHeader(String key) { - return key.equals(OMSHeaders.CONTENT_TYPE); - } - - /** - * whether message header is cloudEvent header - * @param key - * @return - */ - @Override - protected boolean isCloudEventsHeader(String key) { - return key.length() > 3 && key.substring(0, OMSHeaders.CE_PREFIX.length()).startsWith(OMSHeaders.CE_PREFIX); - } - - /** - * parse message header to cloudEvent attribute - * @param key - * @return - */ - @Override - protected String toCloudEventsKey(String key) { - return key.substring(OMSHeaders.CE_PREFIX.length()).toLowerCase(); - } - - /** - * - * @param fn - */ - @Override - protected void forEachHeader(BiConsumer fn) { - this.headers.forEach((k, v) -> { - if (k != null && v != null) { - fn.accept(k.toString(), v.toString()); - } - - }); - } - - @Override - protected String toCloudEventsValue(String value) { - return value; - } -} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/cloudevent/impl/OMSHeaders.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/cloudevent/impl/OMSHeaders.java deleted file mode 100644 index 7747175165..0000000000 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/cloudevent/impl/OMSHeaders.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eventmesh.runtime.core.protocol.cloudevent.impl; - -import io.cloudevents.core.message.impl.MessageUtils; -import io.cloudevents.core.v1.CloudEventV1; - -import java.util.Map; - -/** - * Define the value of CE attribute in the header of ons - */ -public class OMSHeaders { - - /** - * CE prefix - */ - public static final String CE_PREFIX = "ce_"; - - /** - * Prefix each value - */ - protected static final Map ATTRIBUTES_TO_HEADERS = MessageUtils.generateAttributesToHeadersMapping(v -> CE_PREFIX + v); - - public static final String CONTENT_TYPE = ATTRIBUTES_TO_HEADERS.get(CloudEventV1.DATACONTENTTYPE); - - public static final String SPEC_VERSION = ATTRIBUTES_TO_HEADERS.get(CloudEventV1.SPECVERSION); - -} - diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/cloudevent/impl/OMSMessageWriter.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/cloudevent/impl/OMSMessageWriter.java deleted file mode 100644 index 0d74331a06..0000000000 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/cloudevent/impl/OMSMessageWriter.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eventmesh.runtime.core.protocol.cloudevent.impl; - -import io.cloudevents.CloudEventData; -import io.cloudevents.SpecVersion; -import io.cloudevents.core.format.EventFormat; -import io.cloudevents.core.message.MessageWriter; -import io.cloudevents.rw.CloudEventContextWriter; -import io.cloudevents.rw.CloudEventRWException; -import io.cloudevents.rw.CloudEventWriter; -import io.openmessaging.api.Message; -import org.apache.commons.lang3.StringUtils; - -/** - * write ce to ons - * @param - */ -public final class OMSMessageWriter implements MessageWriter, Message>, CloudEventWriter { - - private Message message; - - - public OMSMessageWriter(String topic) { - message = new Message(); - message.setTopic(topic); - } - - public OMSMessageWriter(String topic, String key) { - message = new Message(); - message.setTopic(topic); - if (key != null && key.length() > 0) { - message.setKey(key); - } - } - - public OMSMessageWriter(String topic, String key, String tag) { - message = new Message(); - message.setTopic(topic); - if (StringUtils.isNotEmpty(tag)) { - message.setTag(tag); - } - - if (StringUtils.isNotEmpty(key)) { - message.setKey(key); - } - } - - - @Override - public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException { - - String propName = OMSHeaders.ATTRIBUTES_TO_HEADERS.get(name); - if (propName == null) { - propName = OMSHeaders.CE_PREFIX + name; - } - message.putUserProperties(propName, value); - return this; - } - - @Override - public OMSMessageWriter create(final SpecVersion version) { - message.putUserProperties(OMSHeaders.SPEC_VERSION, version.toString()); - return this; - } - - @Override - public Message setEvent(final EventFormat format, final byte[] value) throws CloudEventRWException { - message.putUserProperties(OMSHeaders.CONTENT_TYPE, format.serializedContentType()); - message.setBody(value); - return message; - } - - @Override - public Message end(final CloudEventData data) throws CloudEventRWException { - message.setBody(data.toBytes()); - return message; - } - - @Override - public Message end() { - message.setBody(null); - return message; - } -} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java index 29cf4c81c5..34c8d7d5d2 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java @@ -20,7 +20,6 @@ import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.eventmesh.api.RRCallback; import org.apache.eventmesh.api.RequestReplyCallback; import org.apache.eventmesh.api.SendCallback; import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/SessionSender.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/SessionSender.java index 4fbea05a01..e5db1ab7e3 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/SessionSender.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/SessionSender.java @@ -20,7 +20,6 @@ import io.cloudevents.core.builder.CloudEventBuilder; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateFormatUtils; -import org.apache.eventmesh.api.RRCallback; import org.apache.eventmesh.api.RequestReplyCallback; import org.apache.eventmesh.api.SendCallback; import org.apache.eventmesh.common.protocol.tcp.Command; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HelloTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HelloTask.java index df4a6062b4..10e8f1ad41 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HelloTask.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HelloTask.java @@ -55,7 +55,7 @@ public void run() { UserAgent user = (UserAgent) pkg.getBody(); try { //do acl check in connect - if(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerSecurityEnable){ + if (eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerSecurityEnable) { String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); Acl.doAclCheckInTcpConnect(remoteAddr, user, HELLO_REQUEST.value()); } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java index 035e9d4c97..22400f7c3d 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java @@ -38,7 +38,6 @@ import com.fasterxml.jackson.databind.SerializationFeature; import io.cloudevents.CloudEvent; -import io.openmessaging.api.Message; import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.common.Constants; @@ -200,42 +199,42 @@ public static Map getEventProp(CloudEvent event) { // return accessMessage; // } - public static EventMeshMessage encodeMessage(Message omsMessage) throws Exception { - - EventMeshMessage eventMeshMessage = new EventMeshMessage(); - eventMeshMessage.setBody(new String(omsMessage.getBody(), StandardCharsets.UTF_8)); - - Properties sysHeaders = omsMessage.getSystemProperties(); - Properties userHeaders = omsMessage.getUserProperties(); - - //All destinations in RocketMQ use Topic - eventMeshMessage.setTopic(sysHeaders.getProperty(Constants.PROPERTY_MESSAGE_DESTINATION)); - - if (sysHeaders.containsKey("START_TIME")) { - long deliverTime; - if (StringUtils.isBlank(sysHeaders.getProperty("START_TIME"))) { - deliverTime = 0; - } else { - deliverTime = Long.parseLong(sysHeaders.getProperty("START_TIME")); - } - - if (deliverTime > 0) { -// rmqMessage.putUserProperty(RocketMQConstants.START_DELIVER_TIME, String.valueOf(deliverTime)); - eventMeshMessage.getProperties().put("START_TIME", String.valueOf(deliverTime)); - } - } - - for (String key : userHeaders.stringPropertyNames()) { - eventMeshMessage.getProperties().put(key, userHeaders.getProperty(key)); - } - - //System headers has a high priority - for (String key : sysHeaders.stringPropertyNames()) { - eventMeshMessage.getProperties().put(key, sysHeaders.getProperty(key)); - } - - return eventMeshMessage; - } +// public static EventMeshMessage encodeMessage(Message omsMessage) throws Exception { +// +// EventMeshMessage eventMeshMessage = new EventMeshMessage(); +// eventMeshMessage.setBody(new String(omsMessage.getBody(), StandardCharsets.UTF_8)); +// +// Properties sysHeaders = omsMessage.getSystemProperties(); +// Properties userHeaders = omsMessage.getUserProperties(); +// +// //All destinations in RocketMQ use Topic +// eventMeshMessage.setTopic(sysHeaders.getProperty(Constants.PROPERTY_MESSAGE_DESTINATION)); +// +// if (sysHeaders.containsKey("START_TIME")) { +// long deliverTime; +// if (StringUtils.isBlank(sysHeaders.getProperty("START_TIME"))) { +// deliverTime = 0; +// } else { +// deliverTime = Long.parseLong(sysHeaders.getProperty("START_TIME")); +// } +// +// if (deliverTime > 0) { +//// rmqMessage.putUserProperty(RocketMQConstants.START_DELIVER_TIME, String.valueOf(deliverTime)); +// eventMeshMessage.getProperties().put("START_TIME", String.valueOf(deliverTime)); +// } +// } +// +// for (String key : userHeaders.stringPropertyNames()) { +// eventMeshMessage.getProperties().put(key, userHeaders.getProperty(key)); +// } +// +// //System headers has a high priority +// for (String key : sysHeaders.stringPropertyNames()) { +// eventMeshMessage.getProperties().put(key, sysHeaders.getProperty(key)); +// } +// +// return eventMeshMessage; +// } public static String getLocalAddr() { //priority of networkInterface when generating client ip diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPClient.java index ceaa4ce655..8ed1ea85b8 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPClient.java @@ -41,8 +41,8 @@ public EventMeshMessageTCPClient(EventMeshTCPClientConfig eventMeshTcpClientConf @Override public void init() throws EventMeshException { - eventMeshMessageTCPPubClient.init(); eventMeshMessageTCPSubClient.init(); + eventMeshMessageTCPPubClient.init(); } @Override diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java index 794cea66e5..a3dd6a41bd 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java @@ -56,7 +56,7 @@ class EventMeshMessageTCPPubClient extends TcpClient implements EventMeshTCPPubC public EventMeshMessageTCPPubClient(EventMeshTCPClientConfig eventMeshTcpClientConfig) { super(eventMeshTcpClientConfig); - this.userAgent = eventMeshTcpClientConfig.getUserAgent(); + this.userAgent = MessageUtils.generatePubClient(eventMeshTcpClientConfig.getUserAgent()); } @Override diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java index 7939066e92..add12a5d85 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java @@ -55,7 +55,7 @@ class EventMeshMessageTCPSubClient extends TcpClient implements EventMeshTCPSubC public EventMeshMessageTCPSubClient(EventMeshTCPClientConfig eventMeshTcpClientConfig) { super(eventMeshTcpClientConfig); - this.userAgent = eventMeshTcpClientConfig.getUserAgent(); + this.userAgent = MessageUtils.generateSubClient(eventMeshTcpClientConfig.getUserAgent()); } @Override