Skip to content

Commit

Permalink
update http push request and adaptor (apache#606)
Browse files Browse the repository at this point in the history
1.update http push request
2.update http in protocol adaptor
  • Loading branch information
xwm1992 committed Dec 27, 2021
1 parent 0c6a6fe commit ccf584b
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,17 @@
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.common.protocol.tcp.Header;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.protocol.api.ProtocolAdaptor;
import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException;
import org.apache.eventmesh.protocol.cloudevents.resolver.http.SendMessageBatchProtocolResolver;
import org.apache.eventmesh.protocol.cloudevents.resolver.http.SendMessageBatchV2ProtocolResolver;
import org.apache.eventmesh.protocol.cloudevents.resolver.http.SendMessageRequestProtocolResolver;
import org.apache.eventmesh.protocol.cloudevents.resolver.tcp.TcpMessageProtocolResolver;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* CloudEvents protocol adaptor, used to transform CloudEvents message to CloudEvents message.
Expand Down Expand Up @@ -93,9 +96,18 @@ public List<CloudEvent> toBatchCloudEvent(ProtocolTransportObject protocol)
public ProtocolTransportObject fromCloudEvent(CloudEvent cloudEvent) throws ProtocolHandleException {
String protocolDesc = cloudEvent.getExtension(Constants.PROTOCOL_DESC).toString();
if (StringUtils.equals("http", protocolDesc)) {
// todo: return command, set cloudEvent.getData() to content?
return null;
// return new String(cloudEvent.getData().toBytes(), StandardCharsets.UTF_8);
HttpCommand httpCommand = new HttpCommand();
Body body = new Body() {
final Map<String, Object> map = new HashMap<>();
@Override
public Map<String, Object> toMap() {
map.put("content", JsonUtils.serialize(cloudEvent));
return map;
}
};
body.toMap();
httpCommand.setBody(body);
return httpCommand;
} else if (StringUtils.equals("tcp", protocolDesc)) {
Package pkg = new Package();
pkg.setBody(cloudEvent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.common.protocol.tcp.Header;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.protocol.api.ProtocolAdaptor;
import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException;
import org.apache.eventmesh.protocol.meshmessage.resolver.http.SendMessageBatchProtocolResolver;
Expand All @@ -33,7 +34,10 @@

import org.apache.commons.lang3.StringUtils;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import io.cloudevents.CloudEvent;

Expand Down Expand Up @@ -90,9 +94,18 @@ public ProtocolTransportObject fromCloudEvent(CloudEvent cloudEvent) throws Prot
String protocolDesc = cloudEvent.getExtension(Constants.PROTOCOL_DESC).toString();

if (StringUtils.equals("http", protocolDesc)) {
// todo: return command, set cloudEvent.getData() to content?
return null;
// return new String(cloudEvent.getData().toBytes(), StandardCharsets.UTF_8);
HttpCommand httpCommand = new HttpCommand();
Body body = new Body() {
final Map<String, Object> map = new HashMap<>();
@Override
public Map<String, Object> toMap() {
map.put("content", new String(cloudEvent.getData().toBytes(), StandardCharsets.UTF_8));
return map;
}
};
body.toMap();
httpCommand.setBody(body);
return httpCommand;
} else if (StringUtils.equals("tcp", protocolDesc)) {
return TcpMessageProtocolResolver.buildEventMeshMessage(cloudEvent);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.http.HttpCommand;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.common.protocol.ProtocolTransportObject;
import org.apache.eventmesh.common.utils.RandomStringUtils;
Expand Down Expand Up @@ -120,12 +121,9 @@ public void tryHTTPRequest() {

ProtocolAdaptor<ProtocolTransportObject> protocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(protocolType);

// todo
ProtocolTransportObject protocolTransportObject =
protocolAdaptor.fromCloudEvent(handleMsgContext.getEvent());

// content =
// new String(handleMsgContext.getEvent().getData().toBytes(), EventMeshConstants.DEFAULT_CHARSET);
content = ((HttpCommand) protocolTransportObject).getBody().toMap().get("content").toString();
} catch (Exception ex) {
return;
}
Expand Down

0 comments on commit ccf584b

Please sign in to comment.