Skip to content

Commit

Permalink
Merge pull request #1228 from apache/revert-1226-master
Browse files Browse the repository at this point in the history
  • Loading branch information
qqeasonchen authored Sep 6, 2022
2 parents 7ce2c45 + a43d935 commit 3aae182
Show file tree
Hide file tree
Showing 10 changed files with 15 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,4 @@ public class Constants {
public static final String AND = "&";

public static final String EMPTY = "";

/**
* GRPC PROTOCOL
*/
public static final String PROTOCOL_GRPC = "grpc";

/**
* application/cloudevents+json Content-type
*/
public static final String CONTENT_TYPE_CLOUDEVENTS_JSON = "application/cloudevents+json";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public static CloudEvent generateCloudEventV1Async() {
.withId(UUID.randomUUID().toString())
.withSubject(ExampleConstants.EVENTMESH_TCP_ASYNC_TEST_TOPIC)
.withSource(URI.create("/"))
.withDataContentType(ExampleConstants.CLOUDEVENT_CONTENT_TYPE)
.withDataContentType("application/cloudevents+json")
.withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
.withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8))
.withExtension("ttl", "30000")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public Map<String, Object> toMap() {
String.format("DateContentType:%s is not supported", dataContentType));
pkg.setBody(eventFormat.serialize(cloudEvent));
return pkg;
} else if (StringUtils.equals(Constants.PROTOCOL_GRPC, protocolDesc)) {
} else if (StringUtils.equals("grpc", protocolDesc)) {
return GrpcMessageProtocolResolver.buildSimpleMessage(cloudEvent);
} else {
throw new ProtocolHandleException(String.format("Unsupported protocolDesc: %s", protocolDesc));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.eventmesh.protocol.cloudevents.resolver.grpc;

import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.grpc.common.ProtocolKey;
import org.apache.eventmesh.common.protocol.grpc.common.SimpleMessageWrapper;
import org.apache.eventmesh.common.protocol.grpc.protos.BatchMessage;
Expand All @@ -41,7 +40,7 @@ public class GrpcMessageProtocolResolver {
public static CloudEvent buildEvent(SimpleMessage message) {
String cloudEventJson = message.getContent();

String contentType = message.getPropertiesOrDefault(ProtocolKey.CONTENT_TYPE, Constants.CONTENT_TYPE_CLOUDEVENTS_JSON);
String contentType = message.getPropertiesOrDefault(ProtocolKey.CONTENT_TYPE, "application/cloudevents+json");
EventFormat eventFormat = EventFormatProvider.getInstance().resolveFormat(contentType);
CloudEvent event = eventFormat.deserialize(cloudEventJson.getBytes(StandardCharsets.UTF_8));

Expand Down Expand Up @@ -116,7 +115,7 @@ public static SimpleMessageWrapper buildSimpleMessage(CloudEvent cloudEvent) {
String sys = cloudEvent.getExtension(ProtocolKey.SYS) == null ? "sys123" : cloudEvent.getExtension(ProtocolKey.SYS).toString();
String userName = cloudEvent.getExtension(ProtocolKey.USERNAME) == null ? "user" : cloudEvent.getExtension(ProtocolKey.USERNAME).toString();
String passwd = cloudEvent.getExtension(ProtocolKey.PASSWD) == null ? "pass" : cloudEvent.getExtension(ProtocolKey.PASSWD).toString();
String language = cloudEvent.getExtension(ProtocolKey.LANGUAGE) == null ? Constants.LANGUAGE_JAVA : cloudEvent.getExtension(ProtocolKey.LANGUAGE).toString();
String language = cloudEvent.getExtension(ProtocolKey.LANGUAGE) == null ? "JAVA" : cloudEvent.getExtension(ProtocolKey.LANGUAGE).toString();
String protocol = cloudEvent.getExtension(ProtocolKey.PROTOCOL_TYPE) == null ? "protocol" :
cloudEvent.getExtension(ProtocolKey.PROTOCOL_TYPE).toString();
String protocolDesc = cloudEvent.getExtension(ProtocolKey.PROTOCOL_DESC) == null ? "protocolDesc" :
Expand Down Expand Up @@ -167,7 +166,7 @@ public static List<CloudEvent> buildBatchEvents(BatchMessage batchMessage) {
for (BatchMessage.MessageItem item : batchMessage.getMessageItemList()) {
String cloudEventJson = item.getContent();

String contentType = item.getPropertiesOrDefault(ProtocolKey.CONTENT_TYPE, Constants.CONTENT_TYPE_CLOUDEVENTS_JSON);
String contentType = item.getPropertiesOrDefault(ProtocolKey.CONTENT_TYPE, "application/cloudevents+json");
EventFormat eventFormat = EventFormatProvider.getInstance().resolveFormat(contentType);
CloudEvent event = eventFormat.deserialize(cloudEventJson.getBytes(StandardCharsets.UTF_8));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public Map<String, Object> toMap() {
body.toMap();
httpCommand.setBody(body);
return httpCommand;
} else if (StringUtils.equals(Constants.PROTOCOL_GRPC, protocolDesc)) {
} else if (StringUtils.equals("grpc", protocolDesc)) {
return GrpcMessageProtocolResolver.buildSimpleMessage(cloudEvent);
} else if (StringUtils.equals("tcp", protocolDesc)) {
return TcpMessageProtocolResolver.buildEventMeshMessage(cloudEvent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.eventmesh.protocol.meshmessage.resolver.grpc;

import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.grpc.common.ProtocolKey;
import org.apache.eventmesh.common.protocol.grpc.common.SimpleMessageWrapper;
import org.apache.eventmesh.common.protocol.grpc.protos.BatchMessage;
Expand Down Expand Up @@ -228,7 +227,7 @@ public static SimpleMessageWrapper buildSimpleMessage(CloudEvent cloudEvent) {
String sys = cloudEvent.getExtension(ProtocolKey.SYS) == null ? "sys" : cloudEvent.getExtension(ProtocolKey.SYS).toString();
String userName = cloudEvent.getExtension(ProtocolKey.USERNAME) == null ? "user" : cloudEvent.getExtension(ProtocolKey.USERNAME).toString();
String passwd = cloudEvent.getExtension(ProtocolKey.PASSWD) == null ? "pass" : cloudEvent.getExtension(ProtocolKey.PASSWD).toString();
String language = cloudEvent.getExtension(ProtocolKey.LANGUAGE) == null ? Constants.LANGUAGE_JAVA : cloudEvent.getExtension(ProtocolKey.LANGUAGE).toString();
String language = cloudEvent.getExtension(ProtocolKey.LANGUAGE) == null ? "JAVA" : cloudEvent.getExtension(ProtocolKey.LANGUAGE).toString();
String protocol = cloudEvent.getExtension(ProtocolKey.PROTOCOL_TYPE) == null ? "" :
cloudEvent.getExtension(ProtocolKey.PROTOCOL_TYPE).toString();
String protocolDesc = cloudEvent.getExtension(ProtocolKey.PROTOCOL_DESC) == null ? "" :
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public void tryPushRequest() {
builder.addHeader(ProtocolKey.PROTOCOL_DESC, requestHeader.getProtocolDesc());
builder.addHeader(ProtocolKey.PROTOCOL_VERSION, requestHeader.getProtocolVersion());
builder.addHeader(ProtocolKey.CONTENT_TYPE, simpleMessage.getPropertiesOrDefault(ProtocolKey.CONTENT_TYPE,
Constants.CONTENT_TYPE_CLOUDEVENTS_JSON));
"application/cloudevents+json"));

List<NameValuePair> body = new ArrayList<>();
body.add(new BasicNameValuePair(PushMessageRequestBody.CONTENT, simpleMessage.getContent()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ private CloudEvent enhanceCloudEvent(final CloudEvent cloudEvent, String timeout
.withExtension(ProtocolKey.IP, IPUtils.getLocalAddress())
.withExtension(ProtocolKey.PID, Long.toString(ThreadUtils.getPID()))
.withExtension(ProtocolKey.SYS, clientConfig.getSys())
.withExtension(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
.withExtension(ProtocolKey.LANGUAGE, "JAVA")
.withExtension(ProtocolKey.PROTOCOL_TYPE, PROTOCOL_TYPE)
.withExtension(ProtocolKey.PROTOCOL_DESC, Constants.PROTOCOL_GRPC)
.withExtension(ProtocolKey.PROTOCOL_DESC, "grpc")
.withExtension(ProtocolKey.PROTOCOL_VERSION, cloudEvent.getSpecVersion().toString())
.withExtension(ProtocolKey.UNIQUE_ID, RandomStringUtils.generateNum(30))
.withExtension(ProtocolKey.SEQ_NUM, RandomStringUtils.generateNum(30))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public static RequestHeader buildHeader(EventMeshGrpcClientConfig clientConfig,
.setUsername(clientConfig.getUserName())
.setPassword(clientConfig.getPassword())
.setProtocolType(protocolType)
.setProtocolDesc(Constants.PROTOCOL_GRPC)
.setProtocolDesc("grpc")
// default CloudEvents version is V1
.setProtocolVersion(SpecVersion.V1.toString())
.build();
Expand Down Expand Up @@ -117,7 +117,7 @@ public static <T> SimpleMessage buildSimpleMessage(T message, EventMeshGrpcClien
String protocolType) {
if (EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME.equals(protocolType)) {
CloudEvent cloudEvent = (CloudEvent) message;
String contentType = StringUtils.isEmpty(cloudEvent.getDataContentType()) ? Constants.CONTENT_TYPE_CLOUDEVENTS_JSON
String contentType = StringUtils.isEmpty(cloudEvent.getDataContentType()) ? "application/cloudevents+json"
: cloudEvent.getDataContentType();
byte[] bodyByte = EventFormatProvider.getInstance().resolveFormat(contentType)
.serialize(cloudEvent);
Expand Down Expand Up @@ -183,7 +183,7 @@ public static <T> BatchMessage buildBatchMessages(List<T> messageList, EventMesh
.setTopic(events.get(0).getSubject());

for (CloudEvent event : events) {
String contentType = StringUtils.isEmpty(event.getDataContentType()) ? Constants.CONTENT_TYPE_CLOUDEVENTS_JSON
String contentType = StringUtils.isEmpty(event.getDataContentType()) ? "application/cloudevents+json"
: event.getDataContentType();
byte[] bodyByte = EventFormatProvider.getInstance().resolveFormat(contentType)
.serialize(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void testBuildHeader() {
.hasFieldOrPropertyWithValue("username", clientConfig.getUserName())
.hasFieldOrPropertyWithValue("password", clientConfig.getPassword())
.hasFieldOrPropertyWithValue("protocolType", "protocolType")
.hasFieldOrPropertyWithValue("protocolDesc", Constants.PROTOCOL_GRPC)
.hasFieldOrPropertyWithValue("protocolDesc", "grpc")
.hasFieldOrPropertyWithValue("protocolVersion", SpecVersion.V1.toString());
}

Expand Down

0 comments on commit 3aae182

Please sign in to comment.