diff --git a/eventmesh-sdk-go/common/constants.go b/eventmesh-sdk-go/common/constants.go new file mode 100644 index 0000000000..bdc085915f --- /dev/null +++ b/eventmesh-sdk-go/common/constants.go @@ -0,0 +1,33 @@ +package common + +var Constants = struct { + LANGUAGE_GO string + HTTP_PROTOCOL_PREFIX string + HTTPS_PROTOCOL_PREFIX string + PROTOCOL_TYPE string + PROTOCOL_VERSION string + PROTOCOL_DESC string + DEFAULT_HTTP_TIME_OUT int64 + EVENTMESH_MESSAGE_CONST_TTL string + + // Client heartbeat interval + HEARTBEAT int64 + + // Protocol type + CLOUD_EVENTS_PROTOCOL_NAME string + EM_MESSAGE_PROTOCOL_NAME string + OPEN_MESSAGE_PROTOCOL_NAME string +}{ + LANGUAGE_GO: "GO", + HTTP_PROTOCOL_PREFIX: "http://", + HTTPS_PROTOCOL_PREFIX: "https://", + PROTOCOL_TYPE: "protocoltype", + PROTOCOL_VERSION: "protocolversion", + PROTOCOL_DESC: "protocoldesc", + DEFAULT_HTTP_TIME_OUT: 15000, + EVENTMESH_MESSAGE_CONST_TTL: "ttl", + HEARTBEAT: 30 * 1000, + CLOUD_EVENTS_PROTOCOL_NAME: "cloudevents", + EM_MESSAGE_PROTOCOL_NAME: "eventmeshmessage", + OPEN_MESSAGE_PROTOCOL_NAME: "openmessage", +} diff --git a/eventmesh-sdk-go/common/protocol/http/body/body.go b/eventmesh-sdk-go/common/protocol/http/body/body.go new file mode 100644 index 0000000000..b0508204fa --- /dev/null +++ b/eventmesh-sdk-go/common/protocol/http/body/body.go @@ -0,0 +1,9 @@ +package body + +type Body struct { + ToMap map[string]interface{} +} + +func (b *Body) BuildBody(requestCode string, originalMap map[string]interface{}) *Body { + return nil +} diff --git a/eventmesh-sdk-go/common/protocol/http/body/client/heartbeat_request_body.go b/eventmesh-sdk-go/common/protocol/http/body/client/heartbeat_request_body.go new file mode 100644 index 0000000000..2ae5ab38f1 --- /dev/null +++ b/eventmesh-sdk-go/common/protocol/http/body/client/heartbeat_request_body.go @@ -0,0 +1,57 @@ +package client + +import ( + "eventmesh/common/protocol/http/body" +) + +var HeartbeatRequestBodyKey = struct { + CLIENTTYPE string + CONSUMERGROUP string + HEARTBEATENTITIES string +}{ + CLIENTTYPE: "clientType", + HEARTBEATENTITIES: "heartbeatEntities", + CONSUMERGROUP: "consumerGroup", +} + +type HeartbeatEntity struct { + Topic string `json:"topic"` + Url string `json:"url"` + ServiceId string `json:"serviceId"` + InstanceId string `json:"instanceId"` +} + +type HeartbeatRequestBody struct { + body.Body + consumerGroup string + clientType string + heartbeatEntities string +} + +func (h *HeartbeatRequestBody) ConsumerGroup() string { + return h.consumerGroup +} + +func (h *HeartbeatRequestBody) SetConsumerGroup(consumerGroup string) { + h.consumerGroup = consumerGroup +} + +func (h *HeartbeatRequestBody) ClientType() string { + return h.clientType +} + +func (h *HeartbeatRequestBody) SetClientType(clientType string) { + h.clientType = clientType +} + +func (h *HeartbeatRequestBody) HeartbeatEntities() string { + return h.heartbeatEntities +} + +func (h *HeartbeatRequestBody) SetHeartbeatEntities(heartbeatEntities string) { + h.heartbeatEntities = heartbeatEntities +} + +func (h *HeartbeatRequestBody) BuildBody(bodyParam map[string]interface{}) *HeartbeatRequestBody { + return nil +} diff --git a/eventmesh-sdk-go/common/protocol/http/body/client/subscribe_request_body.go b/eventmesh-sdk-go/common/protocol/http/body/client/subscribe_request_body.go new file mode 100644 index 0000000000..07d797d529 --- /dev/null +++ b/eventmesh-sdk-go/common/protocol/http/body/client/subscribe_request_body.go @@ -0,0 +1,23 @@ +package client + +import ( + "eventmesh/common/protocol" + "eventmesh/common/protocol/http/body" +) + +var SubscribeRequestBodyKey = struct { + TOPIC string + URL string + CONSUMERGROUP string +}{ + TOPIC: "topic", + URL: "url", + CONSUMERGROUP: "consumerGroup", +} + +type SubscribeRequestBody struct { + body.Body + topics []protocol.SubscriptionItem + url string + consumerGroup string +} diff --git a/eventmesh-sdk-go/common/protocol/http/common/client_type.go b/eventmesh-sdk-go/common/protocol/http/common/client_type.go new file mode 100644 index 0000000000..b72ba80339 --- /dev/null +++ b/eventmesh-sdk-go/common/protocol/http/common/client_type.go @@ -0,0 +1,20 @@ +package common + +type ClientType struct { + Type int `json:"type"` + Desc string `json:"desc"` +} + +var DefaultClientType = struct { + PUB ClientType + SUB ClientType +}{ + PUB: ClientType{ + Type: 1, + Desc: "Client for publishing", + }, + SUB: ClientType{ + Type: 2, + Desc: "Client for subscribing", + }, +} diff --git a/eventmesh-sdk-go/common/protocol/http/common/eventmesh_ret_code.go b/eventmesh-sdk-go/common/protocol/http/common/eventmesh_ret_code.go new file mode 100644 index 0000000000..3f26f827c8 --- /dev/null +++ b/eventmesh-sdk-go/common/protocol/http/common/eventmesh_ret_code.go @@ -0,0 +1,12 @@ +package common + +type EventMeshRetCode struct { + RetCode int `json:"retCode"` + ErrMsg string `json:"errMsg"` +} + +var DefaultEventMeshRetCode = struct { + SUCCESS EventMeshRetCode +}{ + SUCCESS: EventMeshRetCode{RetCode: 0, ErrMsg: "success"}, +} diff --git a/eventmesh-sdk-go/common/protocol/http/common/protocol_key.go b/eventmesh-sdk-go/common/protocol/http/common/protocol_key.go new file mode 100644 index 0000000000..6d67de8d0a --- /dev/null +++ b/eventmesh-sdk-go/common/protocol/http/common/protocol_key.go @@ -0,0 +1,70 @@ +package common + +type ClientInstanceKey struct { + //Protocol layer requester description + ENV string + IDC string + SYS string + PID string + IP string + USERNAME string + PASSWORD string + BIZSEQNO string + UNIQUEID string +} + +type EventMeshInstanceKey struct { + //Protocol layer EventMesh description + EVENTMESHCLUSTER string + EVENTMESHIP string + EVENTMESHENV string + EVENTMESHIDC string +} + +var ProtocolKey = struct { + REQUEST_CODE string + LANGUAGE string + VERSION string + PROTOCOL_TYPE string + PROTOCOL_VERSION string + PROTOCOL_DESC string + + ClientInstanceKey ClientInstanceKey + + EventMeshInstanceKey EventMeshInstanceKey + + //return of CLIENT <-> EventMesh + RETCODE string + RETMSG string + RESTIME string +}{ + REQUEST_CODE: "code", + LANGUAGE: "language", + VERSION: "version", + PROTOCOL_TYPE: "protocoltype", + PROTOCOL_VERSION: "protocolversion", + PROTOCOL_DESC: "protocoldesc", + + ClientInstanceKey: ClientInstanceKey{ + ENV: "env", + IDC: "idc", + SYS: "sys", + PID: "pid", + IP: "ip", + USERNAME: "username", + PASSWORD: "passwd", + BIZSEQNO: "bizseqno", + UNIQUEID: "uniqueid", + }, + + EventMeshInstanceKey: EventMeshInstanceKey{ + EVENTMESHCLUSTER: "eventmeshcluster", + EVENTMESHIP: "eventmeship", + EVENTMESHENV: "eventmeshenv", + EVENTMESHIDC: "eventmeshidc", + }, + + RETCODE: "retCode", + RETMSG: "retMsg", + RESTIME: "resTime", +} diff --git a/eventmesh-sdk-go/common/protocol/http/common/protocol_version.go b/eventmesh-sdk-go/common/protocol/http/common/protocol_version.go new file mode 100644 index 0000000000..f7fe6a5dda --- /dev/null +++ b/eventmesh-sdk-go/common/protocol/http/common/protocol_version.go @@ -0,0 +1,25 @@ +package common + +type ProtocolVersion struct { + version string +} + +func (p *ProtocolVersion) Version() string { + return p.version +} + +func (p *ProtocolVersion) SetVersion(version string) { + p.version = version +} + +var DefaultProtocolVersion = struct { + V1 ProtocolVersion + V2 ProtocolVersion +}{ + V1: ProtocolVersion{ + version: "1.0", + }, + V2: ProtocolVersion{ + version: "2.0", + }, +} diff --git a/eventmesh-sdk-go/common/protocol/http/common/request_code.go b/eventmesh-sdk-go/common/protocol/http/common/request_code.go new file mode 100644 index 0000000000..6ea3a66b43 --- /dev/null +++ b/eventmesh-sdk-go/common/protocol/http/common/request_code.go @@ -0,0 +1,80 @@ +package common + +type RequestCode struct { + RequestCode int `json:"requestCode"` + Desc string `json:"desc"` +} + +var DefaultRequestCode = struct { + MSG_BATCH_SEND RequestCode + MSG_BATCH_SEND_V2 RequestCode + MSG_SEND_SYNC RequestCode + MSG_SEND_ASYNC RequestCode + HTTP_PUSH_CLIENT_ASYNC RequestCode + HTTP_PUSH_CLIENT_SYNC RequestCode + REGISTER RequestCode + UNREGISTER RequestCode + HEARTBEAT RequestCode + SUBSCRIBE RequestCode + UNSUBSCRIBE RequestCode + REPLY_MESSAGE RequestCode + ADMIN_METRICS RequestCode + ADMIN_SHUTDOWN RequestCode +}{ + MSG_BATCH_SEND: RequestCode{ + RequestCode: 102, + Desc: "SEND BATCH MSG", + }, + MSG_BATCH_SEND_V2: RequestCode{ + RequestCode: 107, + Desc: "SEND BATCH MSG V2", + }, + MSG_SEND_SYNC: RequestCode{ + RequestCode: 101, + Desc: "SEND SINGLE MSG SYNC", + }, + MSG_SEND_ASYNC: RequestCode{ + RequestCode: 104, + Desc: "SEND SINGLE MSG ASYNC", + }, + HTTP_PUSH_CLIENT_ASYNC: RequestCode{ + RequestCode: 105, + Desc: "PUSH CLIENT BY HTTP POST", + }, + HTTP_PUSH_CLIENT_SYNC: RequestCode{ + RequestCode: 106, + Desc: "PUSH CLIENT BY HTTP POST", + }, + REGISTER: RequestCode{ + RequestCode: 201, + Desc: "REGISTER", + }, + UNREGISTER: RequestCode{ + RequestCode: 202, + Desc: "UNREGISTER", + }, + HEARTBEAT: RequestCode{ + RequestCode: 203, + Desc: "HEARTBEAT", + }, + SUBSCRIBE: RequestCode{ + RequestCode: 206, + Desc: "SUBSCRIBE", + }, + UNSUBSCRIBE: RequestCode{ + RequestCode: 207, + Desc: "UNSUBSCRIBE", + }, + REPLY_MESSAGE: RequestCode{ + RequestCode: 301, + Desc: "REPLY MESSAGE", + }, + ADMIN_METRICS: RequestCode{ + RequestCode: 603, + Desc: "ADMIN METRICS", + }, + ADMIN_SHUTDOWN: RequestCode{ + RequestCode: 601, + Desc: "ADMIN SHUTDOWN", + }, +} diff --git a/eventmesh-sdk-go/common/protocol/http/message/send_message_request_body.go b/eventmesh-sdk-go/common/protocol/http/message/send_message_request_body.go new file mode 100644 index 0000000000..0fc0744caf --- /dev/null +++ b/eventmesh-sdk-go/common/protocol/http/message/send_message_request_body.go @@ -0,0 +1,96 @@ +package message + +type SendMessageRequestBody struct { + topic string + bizSeqNo string + uniqueId string + ttl string + content string + tag string + extFields map[string]string + producerGroup string +} + +func (s *SendMessageRequestBody) Topic() string { + return s.topic +} + +func (s *SendMessageRequestBody) SetTopic(topic string) { + s.topic = topic +} + +func (s *SendMessageRequestBody) BizSeqNo() string { + return s.bizSeqNo +} + +func (s *SendMessageRequestBody) SetBizSeqNo(bizSeqNo string) { + s.bizSeqNo = bizSeqNo +} + +func (s *SendMessageRequestBody) UniqueId() string { + return s.uniqueId +} + +func (s *SendMessageRequestBody) SetUniqueId(uniqueId string) { + s.uniqueId = uniqueId +} + +func (s *SendMessageRequestBody) Ttl() string { + return s.ttl +} + +func (s *SendMessageRequestBody) SetTtl(ttl string) { + s.ttl = ttl +} + +func (s *SendMessageRequestBody) Content() string { + return s.content +} + +func (s *SendMessageRequestBody) SetContent(content string) { + s.content = content +} + +func (s *SendMessageRequestBody) Tag() string { + return s.tag +} + +func (s *SendMessageRequestBody) SetTag(tag string) { + s.tag = tag +} + +func (s *SendMessageRequestBody) ExtFields() map[string]string { + return s.extFields +} + +func (s *SendMessageRequestBody) SetExtFields(extFields map[string]string) { + s.extFields = extFields +} + +func (s *SendMessageRequestBody) ProducerGroup() string { + return s.producerGroup +} + +func (s *SendMessageRequestBody) SetProducerGroup(producerGroup string) { + s.producerGroup = producerGroup +} + +var SendMessageRequestBodyKey = struct { + TOPIC string + BIZSEQNO string + UNIQUEID string + CONTENT string + TTL string + TAG string + EXTFIELDS string + PRODUCERGROUP string +}{ + TOPIC: "topic", + BIZSEQNO: "bizseqno", + UNIQUEID: "uniqueid", + CONTENT: "content", + TTL: "ttl", + TAG: "tag", + EXTFIELDS: "extFields", + PRODUCERGROUP: "producergroup", +} diff --git a/eventmesh-sdk-go/common/protocol/message_type.go b/eventmesh-sdk-go/common/protocol/message_type.go new file mode 100644 index 0000000000..36f87e864b --- /dev/null +++ b/eventmesh-sdk-go/common/protocol/message_type.go @@ -0,0 +1,13 @@ +package protocol + +type MessageType string + +var DefaultMessageType = struct { + CloudEvent MessageType + OpenMessage MessageType + EventMeshMessage MessageType +}{ + CloudEvent: "CloudEvent", + OpenMessage: "OpenMessage", + EventMeshMessage: "EventMeshMessage", +} diff --git a/eventmesh-sdk-go/common/protocol/subscription_item.go b/eventmesh-sdk-go/common/protocol/subscription_item.go new file mode 100644 index 0000000000..3cfa3c0381 --- /dev/null +++ b/eventmesh-sdk-go/common/protocol/subscription_item.go @@ -0,0 +1,7 @@ +package protocol + +type SubscriptionItem struct { + Topic string `json:"topic"` + Mode SubscriptionMode `json:"mode"` + Type SubscriptionType `json:"type"` +} diff --git a/eventmesh-sdk-go/common/protocol/subscription_mode.go b/eventmesh-sdk-go/common/protocol/subscription_mode.go new file mode 100644 index 0000000000..1cf6961063 --- /dev/null +++ b/eventmesh-sdk-go/common/protocol/subscription_mode.go @@ -0,0 +1,11 @@ +package protocol + +type SubscriptionMode string + +var DefaultSubscriptionMode = struct { + BROADCASTING SubscriptionMode + CLUSTERING SubscriptionMode +}{ + BROADCASTING: "BROADCASTING", + CLUSTERING: "CLUSTERING", +} diff --git a/eventmesh-sdk-go/common/protocol/subscription_type.go b/eventmesh-sdk-go/common/protocol/subscription_type.go new file mode 100644 index 0000000000..3d09844730 --- /dev/null +++ b/eventmesh-sdk-go/common/protocol/subscription_type.go @@ -0,0 +1,11 @@ +package protocol + +type SubscriptionType string + +var DefaultSubscriptionType = struct { + SYNC SubscriptionType + ASYNC SubscriptionType +}{ + SYNC: "SYNC", + ASYNC: "ASYNC", +} diff --git a/eventmesh-sdk-go/common/protocol/tcp/codec/codec.go b/eventmesh-sdk-go/common/protocol/tcp/codec/codec.go new file mode 100644 index 0000000000..396ef7aca2 --- /dev/null +++ b/eventmesh-sdk-go/common/protocol/tcp/codec/codec.go @@ -0,0 +1,168 @@ +package codec + +import ( + "bytes" + "encoding/binary" + gcommon "eventmesh/common" + "eventmesh/common/protocol/tcp" + gutils "eventmesh/common/utils" + "eventmesh/tcp/common" + "log" +) + +const ( + MAGIC = "EventMesh" + VERSION = "0000" + LENGTH_SIZE = 4 +) + +func EncodePackage(message tcp.Package) *bytes.Buffer { + + header := message.Header + headerData := header.Marshal() + + var bodyData []byte + if header.GetProperty(gcommon.Constants.PROTOCOL_TYPE) != common.EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME { + bodyData = gutils.MarshalJsonBytes(message.Body) + } else { + bodyData = (message.Body).([]byte) + } + + headerLen := len(headerData) + bodyLen := len(bodyData) + + length := LENGTH_SIZE + LENGTH_SIZE + headerLen + bodyLen + + var out bytes.Buffer + out.WriteString(MAGIC) + out.WriteString(VERSION) + + lengthBytes := make([]byte, LENGTH_SIZE) + binary.BigEndian.PutUint32(lengthBytes, uint32(length)) + + headerLenBytes := make([]byte, LENGTH_SIZE) + binary.BigEndian.PutUint32(headerLenBytes, uint32(headerLen)) + + out.Write(lengthBytes) + out.Write(headerLenBytes) + out.Write(headerData) + out.Write(bodyData) + + return &out +} + +func DecodePackage(in *bytes.Buffer) tcp.Package { + flagBytes := parseFlag(in) + versionBytes := parseVersion(in) + validateFlag(flagBytes, versionBytes) + + length := parseLength(in) + headerLen := parseLength(in) + bodyLen := length - headerLen - LENGTH_SIZE - LENGTH_SIZE + header := parseHeader(in, int(headerLen)) + body := parseBody(in, header, int(bodyLen)) + return tcp.Package{Header: header, Body: body} +} + +func parseFlag(in *bytes.Buffer) []byte { + flagLen := len([]byte(MAGIC)) + flagBytes := make([]byte, flagLen) + n, err := in.Read(flagBytes) + if err != nil { + return nil + } + log.Printf("read %d bytes (flag) \n", n) + return flagBytes +} + +func parseVersion(in *bytes.Buffer) []byte { + verLen := len([]byte(VERSION)) + verBytes := make([]byte, verLen) + n, err := in.Read(verBytes) + if err != nil { + return nil + } + log.Printf("read %d bytes (version) \n", n) + return verBytes +} + +func parseLength(in *bytes.Buffer) uint32 { + lenBytes := make([]byte, 4) + n, err := in.Read(lenBytes) + if err != nil { + log.Fatal("Failed to parse length") + } + log.Printf("read %d bytes (length) \n", n) + return binary.BigEndian.Uint32(lenBytes) +} + +func parseHeader(in *bytes.Buffer, headerLen int) tcp.Header { + headerBytes := make([]byte, headerLen) + n, err := in.Read(headerBytes) + if err != nil { + log.Fatal("Failed to parse header") + } + log.Printf("read %d bytes (header) \n", n) + + var header tcp.Header + return header.Unmarshal(headerBytes) +} + +func parseBody(in *bytes.Buffer, header tcp.Header, bodyLen int) interface{} { + if bodyLen <= 0 { + return nil + } + + bodyBytes := make([]byte, bodyLen) + n, err := in.Read(bodyBytes) + if err != nil { + log.Fatal("Failed to parse body") + } + log.Printf("read %d bytes (body) \n", n) + + bodyStr := string(bodyBytes) + return deserializeBody(bodyStr, header) +} + +func deserializeBody(bodyStr string, header tcp.Header) interface{} { + command := header.Cmd + switch command { + case tcp.DefaultCommand.HELLO_REQUEST: + case tcp.DefaultCommand.RECOMMEND_REQUEST: + var useAgent tcp.UserAgent + gutils.UnMarshalJsonString(bodyStr, &useAgent) + return useAgent + case tcp.DefaultCommand.SUBSCRIBE_REQUEST: + case tcp.DefaultCommand.UNSUBSCRIBE_REQUEST: + return nil + //return OBJECT_MAPPER.readValue(bodyJsonString, Subscription.class); + case tcp.DefaultCommand.REQUEST_TO_SERVER: + case tcp.DefaultCommand.RESPONSE_TO_SERVER: + case tcp.DefaultCommand.ASYNC_MESSAGE_TO_SERVER: + case tcp.DefaultCommand.BROADCAST_MESSAGE_TO_SERVER: + case tcp.DefaultCommand.REQUEST_TO_CLIENT: + case tcp.DefaultCommand.RESPONSE_TO_CLIENT: + case tcp.DefaultCommand.ASYNC_MESSAGE_TO_CLIENT: + case tcp.DefaultCommand.BROADCAST_MESSAGE_TO_CLIENT: + case tcp.DefaultCommand.REQUEST_TO_CLIENT_ACK: + case tcp.DefaultCommand.RESPONSE_TO_CLIENT_ACK: + case tcp.DefaultCommand.ASYNC_MESSAGE_TO_CLIENT_ACK: + case tcp.DefaultCommand.BROADCAST_MESSAGE_TO_CLIENT_ACK: + // The message string will be deserialized by protocol plugin, if the event is cloudevents, the body is + // just a string. + return bodyStr + case tcp.DefaultCommand.REDIRECT_TO_CLIENT: + return nil + //return OBJECT_MAPPER.readValue(bodyJsonString, RedirectInfo.class); + default: + // FIXME improve codes + log.Printf("Invalidate TCP command: %s\n", command) + return nil + } + + return nil +} + +func validateFlag(flagBytes, versionBytes []byte) { + // TODO add check +} diff --git a/eventmesh-sdk-go/common/protocol/tcp/command.go b/eventmesh-sdk-go/common/protocol/tcp/command.go new file mode 100644 index 0000000000..5d0c668458 --- /dev/null +++ b/eventmesh-sdk-go/common/protocol/tcp/command.go @@ -0,0 +1,202 @@ +package tcp + +type Command string + +var DefaultCommand = struct { + //heartbeat + HEARTBEAT_REQUEST Command //client send heartbeat packet to server + HEARTBEAT_RESPONSE Command //server response heartbeat packet of client + + //handshake + HELLO_REQUEST Command //client send handshake request to server + HELLO_RESPONSE Command //server response handshake request of client + + //disconnection + CLIENT_GOODBYE_REQUEST Command //Notify server when client actively disconnects + CLIENT_GOODBYE_RESPONSE Command //Server replies to client's active disconnection notification + SERVER_GOODBYE_REQUEST Command //Notify client when server actively disconnects + SERVER_GOODBYE_RESPONSE Command //Client replies to server's active disconnection notification + + //subscription management + SUBSCRIBE_REQUEST Command //Subscription request sent by client to server + SUBSCRIBE_RESPONSE Command //Server replies to client's subscription request + UNSUBSCRIBE_REQUEST Command //Unsubscribe request from client to server + UNSUBSCRIBE_RESPONSE Command //Server replies to client's unsubscribe request + + //monitor + LISTEN_REQUEST Command //Request from client to server to start topic listening + LISTEN_RESPONSE Command //The server replies to the client's listening request + + //RR + REQUEST_TO_SERVER Command //The client sends the RR request to the server + REQUEST_TO_CLIENT Command //The server pushes the RR request to the client + REQUEST_TO_CLIENT_ACK Command //After receiving RR request, the client sends ACK to the server + RESPONSE_TO_SERVER Command //The client sends the RR packet back to the server + RESPONSE_TO_CLIENT Command //The server pushes the RR packet back to the client + RESPONSE_TO_CLIENT_ACK Command //After receiving the return packet, the client sends ACK to the server + + //Asynchronous events + ASYNC_MESSAGE_TO_SERVER Command //The client sends asynchronous events to the server + ASYNC_MESSAGE_TO_SERVER_ACK Command //After receiving the asynchronous event, the server sends ack to the client + ASYNC_MESSAGE_TO_CLIENT Command //The server pushes asynchronous events to the client + ASYNC_MESSAGE_TO_CLIENT_ACK Command //After the client receives the asynchronous event, the ACK is sent to the server + + //radio broadcast + BROADCAST_MESSAGE_TO_SERVER Command //The client sends the broadcast message to the server + BROADCAST_MESSAGE_TO_SERVER_ACK Command //After receiving the broadcast message, the server sends ACK to the client + BROADCAST_MESSAGE_TO_CLIENT Command //The server pushes the broadcast message to the client + BROADCAST_MESSAGE_TO_CLIENT_ACK Command //After the client receives the broadcast message, the ACK is sent to the server + + //Log reporting + SYS_LOG_TO_LOGSERVER Command //Business log reporting + + //RMB tracking log reporting + TRACE_LOG_TO_LOGSERVER Command //RMB tracking log reporting + + //Redirecting instruction + REDIRECT_TO_CLIENT Command //The server pushes the redirection instruction to the client + + //service register + REGISTER_REQUEST Command //Client sends registration request to server + REGISTER_RESPONSE Command //The server sends the registration result to the client + + //service unregister + UNREGISTER_REQUEST Command //The client sends a de registration request to the server + UNREGISTER_RESPONSE Command //The server will register the result to the client + + //The client asks which EventMesh to recommend + RECOMMEND_REQUEST Command //Client sends recommendation request to server + RECOMMEND_RESPONSE Command //The server will recommend the results to the client +}{ + //heartbeat + HEARTBEAT_REQUEST: "HEARTBEAT_REQUEST", + HEARTBEAT_RESPONSE: "HEARTBEAT_RESPONSE", + + //handshake + HELLO_REQUEST: "HELLO_REQUEST", + HELLO_RESPONSE: "HELLO_RESPONSE", + + //disconnection + CLIENT_GOODBYE_REQUEST: "CLIENT_GOODBYE_REQUEST", + CLIENT_GOODBYE_RESPONSE: "CLIENT_GOODBYE_RESPONSE", + SERVER_GOODBYE_REQUEST: "SERVER_GOODBYE_REQUEST", + SERVER_GOODBYE_RESPONSE: "SERVER_GOODBYE_RESPONSE", + + //subscription management + SUBSCRIBE_REQUEST: "SUBSCRIBE_REQUEST", + SUBSCRIBE_RESPONSE: "SUBSCRIBE_RESPONSE", + UNSUBSCRIBE_REQUEST: "UNSUBSCRIBE_REQUEST", + UNSUBSCRIBE_RESPONSE: "UNSUBSCRIBE_RESPONSE", + + //monitor + LISTEN_REQUEST: "LISTEN_REQUEST", + LISTEN_RESPONSE: "LISTEN_RESPONSE", + + //RR + REQUEST_TO_SERVER: "REQUEST_TO_SERVER", + REQUEST_TO_CLIENT: "REQUEST_TO_CLIENT", + REQUEST_TO_CLIENT_ACK: "REQUEST_TO_CLIENT_ACK", + RESPONSE_TO_SERVER: "RESPONSE_TO_SERVER", + RESPONSE_TO_CLIENT: "RESPONSE_TO_CLIENT", + RESPONSE_TO_CLIENT_ACK: "RESPONSE_TO_CLIENT_ACK", + + //Asynchronous events + ASYNC_MESSAGE_TO_SERVER: "ASYNC_MESSAGE_TO_SERVER", + ASYNC_MESSAGE_TO_SERVER_ACK: "ASYNC_MESSAGE_TO_SERVER_ACK", + ASYNC_MESSAGE_TO_CLIENT: "ASYNC_MESSAGE_TO_CLIENT", + ASYNC_MESSAGE_TO_CLIENT_ACK: "ASYNC_MESSAGE_TO_CLIENT_ACK", + + //radio broadcast + BROADCAST_MESSAGE_TO_SERVER: "BROADCAST_MESSAGE_TO_SERVER", + BROADCAST_MESSAGE_TO_SERVER_ACK: "BROADCAST_MESSAGE_TO_SERVER_ACK", + BROADCAST_MESSAGE_TO_CLIENT: "BROADCAST_MESSAGE_TO_CLIENT", + BROADCAST_MESSAGE_TO_CLIENT_ACK: "BROADCAST_MESSAGE_TO_CLIENT_ACK", + + //Log reporting + SYS_LOG_TO_LOGSERVER: "SYS_LOG_TO_LOGSERVER", + + //RMB tracking log reporting + TRACE_LOG_TO_LOGSERVER: "TRACE_LOG_TO_LOGSERVER", + + //Redirecting instruction + REDIRECT_TO_CLIENT: "REDIRECT_TO_CLIENT", + + //service register + REGISTER_REQUEST: "REGISTER_REQUEST", + REGISTER_RESPONSE: "REGISTER_RESPONSE", + + //service unregister + UNREGISTER_REQUEST: "UNREGISTER_REQUEST", + UNREGISTER_RESPONSE: "UNREGISTER_RESPONSE", + + //The client asks which EventMesh to recommend + RECOMMEND_REQUEST: "RECOMMEND_REQUEST", + RECOMMEND_RESPONSE: "RECOMMEND_RESPONSE", +} + +//{ +// //heartbeat +// HEARTBEAT_REQUEST: 0, +// HEARTBEAT_RESPONSE: 1, +// +// //handshake +// HELLO_REQUEST: 2, +// HELLO_RESPONSE: 3, +// +// //disconnection +// CLIENT_GOODBYE_REQUEST: 4, +// CLIENT_GOODBYE_RESPONSE: 5, +// SERVER_GOODBYE_REQUEST: 6, +// SERVER_GOODBYE_RESPONSE: 7, +// +// //subscription management +// SUBSCRIBE_REQUEST: 8, +// SUBSCRIBE_RESPONSE: 9, +// UNSUBSCRIBE_REQUEST: 10, +// UNSUBSCRIBE_RESPONSE: 11, +// +// //monitor +// LISTEN_REQUEST: 12, +// LISTEN_RESPONSE: 13, +// +// //RR +// REQUEST_TO_SERVER: 14, +// REQUEST_TO_CLIENT: 15, +// REQUEST_TO_CLIENT_ACK: 16, +// RESPONSE_TO_SERVER: 17, +// RESPONSE_TO_CLIENT: 18, +// RESPONSE_TO_CLIENT_ACK: 19, +// +// //Asynchronous events +// ASYNC_MESSAGE_TO_SERVER: 20, +// ASYNC_MESSAGE_TO_SERVER_ACK: 21, +// ASYNC_MESSAGE_TO_CLIENT: 22, +// ASYNC_MESSAGE_TO_CLIENT_ACK: 23, +// +// //radio broadcast +// BROADCAST_MESSAGE_TO_SERVER: 24, +// BROADCAST_MESSAGE_TO_SERVER_ACK: 25, +// BROADCAST_MESSAGE_TO_CLIENT: 26, +// BROADCAST_MESSAGE_TO_CLIENT_ACK: 27, +// +// //Log reporting +// SYS_LOG_TO_LOGSERVER: 28, +// +// //RMB tracking log reporting +// TRACE_LOG_TO_LOGSERVER: 29, +// +// //Redirecting instruction +// REDIRECT_TO_CLIENT: 30, +// +// //service register +// REGISTER_REQUEST: 31, +// REGISTER_RESPONSE: 32, +// +// //service unregister +// UNREGISTER_REQUEST: 33, +// UNREGISTER_RESPONSE: 34, +// +// //The client asks which EventMesh to recommend +// RECOMMEND_REQUEST: 35, +// RECOMMEND_RESPONSE: 36, +//} diff --git a/eventmesh-sdk-go/common/protocol/tcp/header.go b/eventmesh-sdk-go/common/protocol/tcp/header.go new file mode 100644 index 0000000000..56dc817ade --- /dev/null +++ b/eventmesh-sdk-go/common/protocol/tcp/header.go @@ -0,0 +1,80 @@ +package tcp + +import ( + "eventmesh/common/utils" +) + +type Header struct { + Cmd Command `json:"cmd"` + Code int `json:"code"` + Desc string `json:"desc"` + Seq string `json:"seq"` + Properties map[string]interface{} `json:"properties"` +} + +func (h Header) PutProperty(name string, value interface{}) { + h.Properties[name] = value +} + +func (h Header) GetProperty(name string) interface{} { + if h.Properties == nil { + return nil + } + + if val, ok := h.Properties[name]; ok { + return val + } + + return nil +} + +func (h Header) Marshal() []byte { + newHeader := make(map[string]interface{}) + newHeader["cmd"] = h.Cmd + // Compatible with Java Enum serialization + newHeader["command"] = h.Cmd + newHeader["code"] = h.Code + newHeader["desc"] = h.Desc + newHeader["seq"] = h.Seq + newHeader["properties"] = h.Properties + return utils.MarshalJsonBytes(newHeader) +} + +func (h Header) getVal(key string, headerDict map[string]interface{}) interface{} { + if val, ok := headerDict[key]; ok { + return val + } + return nil +} + +func (h Header) Unmarshal(header []byte) Header { + + var headerDict map[string]interface{} + utils.UnMarshalJsonBytes(header, &headerDict) + + if val := h.getVal("cmd", headerDict); val != nil { + h.Cmd = Command(val.(string)) + } + + if val := h.getVal("code", headerDict); val != nil { + h.Code = int(val.(float64)) + } + + if val := h.getVal("desc", headerDict); val != nil { + h.Desc = val.(string) + } + + if val := h.getVal("seq", headerDict); val != nil { + h.Seq = val.(string) + } + + if val := h.getVal("properties", headerDict); val != nil { + h.Properties = val.(map[string]interface{}) + } + + return h +} + +func NewHeader(cmd Command, code int, desc string, seq string) Header { + return Header{Cmd: cmd, Code: code, Desc: desc, Seq: seq, Properties: map[string]interface{}{}} +} diff --git a/eventmesh-sdk-go/common/protocol/tcp/package.go b/eventmesh-sdk-go/common/protocol/tcp/package.go new file mode 100644 index 0000000000..e4e05a7702 --- /dev/null +++ b/eventmesh-sdk-go/common/protocol/tcp/package.go @@ -0,0 +1,10 @@ +package tcp + +type Package struct { + Header Header `json:"header"` + Body interface{} `json:"body"` +} + +func NewPackage(header Header) Package { + return Package{Header: header} +} diff --git a/eventmesh-sdk-go/common/protocol/tcp/user_agent.go b/eventmesh-sdk-go/common/protocol/tcp/user_agent.go new file mode 100644 index 0000000000..b2da802357 --- /dev/null +++ b/eventmesh-sdk-go/common/protocol/tcp/user_agent.go @@ -0,0 +1,23 @@ +package tcp + +type UserAgent struct { + Env string `json:"env"` + Subsystem string `json:"subsystem"` + Path string `json:"path"` + Pid int `json:"pid"` + Host string `json:"host"` + Port int `json:"port"` + Version string `json:"version"` + Username string `json:"username"` + Password string `json:"password"` + Idc string `json:"idc"` + Group string `json:"group"` + Purpose string `json:"purpose"` + Unack int `json:"unack"` +} + +func NewUserAgent(env string, subsystem string, path string, pid int, host string, port int, version string, + username string, password string, idc string, producerGroup string, consumerGroup string) *UserAgent { + return &UserAgent{Env: env, Subsystem: subsystem, Path: path, Pid: pid, Host: host, Port: port, Version: version, + Username: username, Password: password, Idc: idc, Group: producerGroup} +} diff --git a/eventmesh-sdk-go/common/utils/json_utils.go b/eventmesh-sdk-go/common/utils/json_utils.go new file mode 100644 index 0000000000..8d1a093fa6 --- /dev/null +++ b/eventmesh-sdk-go/common/utils/json_utils.go @@ -0,0 +1,29 @@ +package utils + +import ( + "encoding/json" + "log" +) + +func MarshalJsonBytes(obj interface{}) []byte { + ret, err := json.Marshal(obj) + if err != nil { + log.Fatal("Failed to marshal json") + } + return ret +} + +func MarshalJsonString(obj interface{}) string { + return string(MarshalJsonBytes(obj)) +} + +func UnMarshalJsonBytes(data []byte, obj interface{}) { + err := json.Unmarshal(data, obj) + if err != nil { + log.Fatal("Failed to unmarshal json") + } +} + +func UnMarshalJsonString(data string, obj interface{}) { + UnMarshalJsonBytes([]byte(data), obj) +} diff --git a/eventmesh-sdk-go/examples/http/async_pub_cloudevents.go b/eventmesh-sdk-go/examples/http/async_pub_cloudevents.go new file mode 100644 index 0000000000..c1466424e2 --- /dev/null +++ b/eventmesh-sdk-go/examples/http/async_pub_cloudevents.go @@ -0,0 +1,52 @@ +package http + +import ( + "eventmesh/common" + "eventmesh/common/utils" + "eventmesh/http/conf" + "eventmesh/http/producer" + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/google/uuid" + "log" + "os" + "strconv" +) + +func AsyncPubCloudEvents() { + eventMeshIPPort := "127.0.0.1" + ":" + "10105" + producerGroup := "EventMeshTest-producerGroup" + topic := "TEST-TOPIC-HTTP-ASYNC" + env := "P" + idc := "FT" + subSys := "1234" + // FIXME Get ip dynamically + localIp := "127.0.0.1" + + // (Deep) Copy of default config + eventMeshClientConfig := conf.DefaultEventMeshHttpClientConfig + eventMeshClientConfig.SetLiteEventMeshAddr(eventMeshIPPort) + eventMeshClientConfig.SetProducerGroup(producerGroup) + eventMeshClientConfig.SetEnv(env) + eventMeshClientConfig.SetIdc(idc) + eventMeshClientConfig.SetSys(subSys) + eventMeshClientConfig.SetIp(localIp) + eventMeshClientConfig.SetPid(strconv.Itoa(os.Getpid())) + + // Make event to send + event := cloudevents.NewEvent() + event.SetID(uuid.New().String()) + event.SetSubject(topic) + event.SetSource("example/uri") + event.SetType(common.Constants.CLOUD_EVENTS_PROTOCOL_NAME) + event.SetExtension(common.Constants.EVENTMESH_MESSAGE_CONST_TTL, strconv.Itoa(4*1000)) + event.SetDataContentType(cloudevents.ApplicationCloudEventsJSON) + data := map[string]string{"hello": "EventMesh"} + err := event.SetData(cloudevents.ApplicationCloudEventsJSON, utils.MarshalJsonBytes(data)) + if err != nil { + log.Fatalf("Failed to set cloud event data, error: %v", err) + } + + // Publish event + httpProducer := producer.NewEventMeshHttpProducer(eventMeshClientConfig) + httpProducer.Publish(event) +} diff --git a/eventmesh-sdk-go/examples/http/sub_cloudevents.go b/eventmesh-sdk-go/examples/http/sub_cloudevents.go new file mode 100644 index 0000000000..35fd3e1d0e --- /dev/null +++ b/eventmesh-sdk-go/examples/http/sub_cloudevents.go @@ -0,0 +1,99 @@ +package http + +import ( + "eventmesh/common/protocol" + "eventmesh/common/utils" + "eventmesh/http/conf" + "eventmesh/http/consumer" + cloudevents "github.com/cloudevents/sdk-go/v2" + "log" + "net/http" + "os" + "strconv" + "strings" +) + +func SubCloudEvents() { + eventMeshIPPort := "127.0.0.1" + ":" + "10105" + consumerGroup := "EventMeshTest-consumerGroup" + topic := "TEST-TOPIC-HTTP-ASYNC" + env := "P" + idc := "FT" + subSys := "1234" + // FIXME Get ip dynamically + localIp := "127.0.0.1" + localPort := 8090 + + subscribeUrl := "http://" + localIp + ":" + strconv.Itoa(localPort) + "/hello" + topicList := []protocol.SubscriptionItem{ + { + Topic: topic, + Mode: protocol.DefaultSubscriptionMode.CLUSTERING, + Type: protocol.DefaultSubscriptionType.ASYNC, + }, + } + + // Callback handle + exit := make(chan bool) + go httpServer(localIp, localPort, exit) + + // (Deep) Copy of default config + eventMeshClientConfig := conf.DefaultEventMeshHttpClientConfig + eventMeshClientConfig.SetLiteEventMeshAddr(eventMeshIPPort) + eventMeshClientConfig.SetConsumerGroup(consumerGroup) + eventMeshClientConfig.SetEnv(env) + eventMeshClientConfig.SetIdc(idc) + eventMeshClientConfig.SetSys(subSys) + eventMeshClientConfig.SetIp(localIp) + eventMeshClientConfig.SetPid(strconv.Itoa(os.Getpid())) + + // Subscribe + eventMeshHttpConsumer := consumer.NewEventMeshHttpConsumer(eventMeshClientConfig) + eventMeshHttpConsumer.Subscribe(topicList, subscribeUrl) + eventMeshHttpConsumer.HeartBeat(topicList, subscribeUrl) + + // FIXME Add unsubscribe + + // Wait for exit + <-exit +} + +func httpServer(ip string, port int, exit chan<- bool) { + http.HandleFunc("/hello", hello) + err := http.ListenAndServe(ip+":"+strconv.Itoa(port), nil) + if err != nil { + log.Fatalf("Failed to launch a callback http server, error: %v", err) + } + + exit <- true +} + +func hello(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/hello" { + http.NotFound(w, r) + return + } + + switch r.Method { + case "POST": + contentType := r.Header.Get("Content-Type") + + // FIXME Now we only support post form + if strings.Contains(contentType, "application/x-www-form-urlencoded") { + err := r.ParseForm() + if err != nil { + log.Printf("Failed to parse post form parameter, error: %v", err) + } + content := r.FormValue("content") + event := cloudevents.NewEvent() + utils.UnMarshalJsonString(content, &event) + log.Printf("Received data from eventmesh server: %v", string(event.Data())) + return + } + + w.WriteHeader(http.StatusUnsupportedMediaType) + default: + w.WriteHeader(http.StatusNotImplemented) + w.Write([]byte(http.StatusText(http.StatusNotImplemented))) + } +} diff --git a/eventmesh-sdk-go/examples/tcp/async_pub_cloudevents.go b/eventmesh-sdk-go/examples/tcp/async_pub_cloudevents.go new file mode 100644 index 0000000000..62fd2c3052 --- /dev/null +++ b/eventmesh-sdk-go/examples/tcp/async_pub_cloudevents.go @@ -0,0 +1,45 @@ +package tcp + +import ( + "eventmesh/common" + "eventmesh/common/protocol" + gtcp "eventmesh/common/protocol/tcp" + "eventmesh/common/utils" + "eventmesh/tcp" + "time" + + //"eventmesh/tcp/common" + "eventmesh/tcp/conf" + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/google/uuid" + "strconv" +) + +func AsyncPubCloudEvents() { + eventMeshIp := "127.0.0.1" + eventMeshTcpPort := 10000 + topic := "TEST-TOPIC-TCP-ASYNC" + + // Init client + userAgent := gtcp.UserAgent{Env: "test", Subsystem: "5023", Path: "/data/app/umg_proxy", Pid: 32893, + Host: "127.0.0.1", Port: 8362, Version: "2.0.11", Username: "PU4283", Password: "PUPASS", Idc: "FT", + Group: "EventmeshTestGroup", Purpose: "pub"} + config := conf.NewEventMeshTCPClientConfig(eventMeshIp, eventMeshTcpPort, userAgent) + client := tcp.CreateEventMeshTCPClient(*config, protocol.DefaultMessageType.CloudEvent) + client.Init() + + // Make event to send + event := cloudevents.NewEvent() + event.SetID(uuid.New().String()) + event.SetSubject(topic) + event.SetSource("example/uri") + event.SetType(common.Constants.CLOUD_EVENTS_PROTOCOL_NAME) + event.SetExtension(common.Constants.EVENTMESH_MESSAGE_CONST_TTL, strconv.Itoa(4*1000)) + event.SetDataContentType(cloudevents.ApplicationCloudEventsJSON) + data := map[string]string{"hello": "EventMesh"} + event.SetData(cloudevents.ApplicationCloudEventsJSON, utils.MarshalJsonBytes(data)) + + // Publish event + client.Publish(event, 10000) + time.Sleep(10 * time.Second) +} diff --git a/eventmesh-sdk-go/go.mod b/eventmesh-sdk-go/go.mod new file mode 100644 index 0000000000..8a12b03067 --- /dev/null +++ b/eventmesh-sdk-go/go.mod @@ -0,0 +1,8 @@ +module eventmesh + +go 1.16 + +require ( + github.com/cloudevents/sdk-go/v2 v2.6.0 // indirect + github.com/google/uuid v1.3.0 // indirect +) diff --git a/eventmesh-sdk-go/go.sum b/eventmesh-sdk-go/go.sum new file mode 100644 index 0000000000..f490f71535 --- /dev/null +++ b/eventmesh-sdk-go/go.sum @@ -0,0 +1,39 @@ +github.com/cloudevents/sdk-go/v2 v2.6.0 h1:yp6zLEvhXSi6P25zzfgORgFI0quG2/NXoH9QoHzvKn8= +github.com/cloudevents/sdk-go/v2 v2.6.0/go.mod h1:nlXhgFkf0uTopxmRXalyMwS2LG70cRGPrxzmjJgSG0U= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68= +github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= +go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= +go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= +go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/eventmesh-sdk-go/http/abstract_http_client.go b/eventmesh-sdk-go/http/abstract_http_client.go new file mode 100644 index 0000000000..d31b09d399 --- /dev/null +++ b/eventmesh-sdk-go/http/abstract_http_client.go @@ -0,0 +1,43 @@ +package http + +import ( + gcommon "eventmesh/common" + "eventmesh/http/conf" + nethttp "net/http" + "time" +) + +type AbstractHttpClient struct { + EventMeshHttpClientConfig conf.EventMeshHttpClientConfig + HttpClient *nethttp.Client +} + +func NewAbstractHttpClient(eventMeshHttpClientConfig conf.EventMeshHttpClientConfig) *AbstractHttpClient { + c := &AbstractHttpClient{EventMeshHttpClientConfig: eventMeshHttpClientConfig} + c.HttpClient = c.SetHttpClient() + return c +} + +func (c *AbstractHttpClient) Close() { + // Http Client does not need to close explicitly +} + +func (c *AbstractHttpClient) SetHttpClient() *nethttp.Client { + if !c.EventMeshHttpClientConfig.UseTls() { + return &nethttp.Client{Timeout: 100 * time.Second} + } + + // Use TLS + return &nethttp.Client{Timeout: 100 * time.Second} +} + +func (c *AbstractHttpClient) SelectEventMesh() string { + // FIXME Add load balance support + uri := c.EventMeshHttpClientConfig.LiteEventMeshAddr() + + if c.EventMeshHttpClientConfig.UseTls() { + return gcommon.Constants.HTTPS_PROTOCOL_PREFIX + uri + } + + return gcommon.Constants.HTTP_PROTOCOL_PREFIX + uri +} diff --git a/eventmesh-sdk-go/http/conf/eventmesh_http_client_config.go b/eventmesh-sdk-go/http/conf/eventmesh_http_client_config.go new file mode 100644 index 0000000000..27ba603283 --- /dev/null +++ b/eventmesh-sdk-go/http/conf/eventmesh_http_client_config.go @@ -0,0 +1,144 @@ +package conf + +type EventMeshHttpClientConfig struct { + //The event server address list + // If it's a cluster, please use ; to split, and the address format is related to loadBalanceType. + // E.g. + // If you use Random strategy, the format like: 127.0.0.1:10105;127.0.0.2:10105 + // If you use weighted round robin or weighted random strategy, the format like: 127.0.0.1:10105:1;127.0.0.2:10105:2 + liteEventMeshAddr string + // TODO support load balance + //loadBalanceType string + consumeThreadCore int + consumeThreadMax int + env string + consumerGroup string + producerGroup string + idc string + ip string + pid string + sys string + userName string + password string + useTls bool +} + +func (e *EventMeshHttpClientConfig) LiteEventMeshAddr() string { + return e.liteEventMeshAddr +} + +func (e *EventMeshHttpClientConfig) SetLiteEventMeshAddr(liteEventMeshAddr string) { + e.liteEventMeshAddr = liteEventMeshAddr +} + +func (e *EventMeshHttpClientConfig) ConsumeThreadCore() int { + return e.consumeThreadCore +} + +func (e *EventMeshHttpClientConfig) SetConsumeThreadCore(consumeThreadCore int) { + e.consumeThreadCore = consumeThreadCore +} + +func (e *EventMeshHttpClientConfig) ConsumeThreadMax() int { + return e.consumeThreadMax +} + +func (e *EventMeshHttpClientConfig) SetConsumeThreadMax(consumeThreadMax int) { + e.consumeThreadMax = consumeThreadMax +} + +func (e *EventMeshHttpClientConfig) Env() string { + return e.env +} + +func (e *EventMeshHttpClientConfig) SetEnv(env string) { + e.env = env +} + +func (e *EventMeshHttpClientConfig) ConsumerGroup() string { + return e.consumerGroup +} + +func (e *EventMeshHttpClientConfig) SetConsumerGroup(consumerGroup string) { + e.consumerGroup = consumerGroup +} + +func (e *EventMeshHttpClientConfig) ProducerGroup() string { + return e.producerGroup +} + +func (e *EventMeshHttpClientConfig) SetProducerGroup(producerGroup string) { + e.producerGroup = producerGroup +} + +func (e *EventMeshHttpClientConfig) Idc() string { + return e.idc +} + +func (e *EventMeshHttpClientConfig) SetIdc(idc string) { + e.idc = idc +} + +func (e *EventMeshHttpClientConfig) Ip() string { + return e.ip +} + +func (e *EventMeshHttpClientConfig) SetIp(ip string) { + e.ip = ip +} + +func (e *EventMeshHttpClientConfig) Pid() string { + return e.pid +} + +func (e *EventMeshHttpClientConfig) SetPid(pid string) { + e.pid = pid +} + +func (e *EventMeshHttpClientConfig) Sys() string { + return e.sys +} + +func (e *EventMeshHttpClientConfig) SetSys(sys string) { + e.sys = sys +} + +func (e *EventMeshHttpClientConfig) UserName() string { + return e.userName +} + +func (e *EventMeshHttpClientConfig) SetUserName(userName string) { + e.userName = userName +} + +func (e *EventMeshHttpClientConfig) Password() string { + return e.password +} + +func (e *EventMeshHttpClientConfig) SetPassword(password string) { + e.password = password +} + +func (e *EventMeshHttpClientConfig) UseTls() bool { + return e.useTls +} + +func (e *EventMeshHttpClientConfig) SetUseTls(useTls bool) { + e.useTls = useTls +} + +var DefaultEventMeshHttpClientConfig = EventMeshHttpClientConfig{ + liteEventMeshAddr: "127.0.0.1:10105", + consumeThreadCore: 2, + consumeThreadMax: 5, + env: "", + consumerGroup: "DefaultConsumerGroup", + producerGroup: "DefaultProducerGroup", + idc: "", + ip: "", + pid: "", + sys: "", + userName: "", + password: "", + useTls: false, +} diff --git a/eventmesh-sdk-go/http/consumer/eventmesh_http_consumer.go b/eventmesh-sdk-go/http/consumer/eventmesh_http_consumer.go new file mode 100644 index 0000000000..62ba2a62fe --- /dev/null +++ b/eventmesh-sdk-go/http/consumer/eventmesh_http_consumer.go @@ -0,0 +1,95 @@ +package consumer + +import ( + gcommon "eventmesh/common" + "eventmesh/common/protocol" + "eventmesh/common/protocol/http/body/client" + "eventmesh/common/protocol/http/common" + gutils "eventmesh/common/utils" + "eventmesh/http" + "eventmesh/http/conf" + "eventmesh/http/model" + "eventmesh/http/utils" + "log" + nethttp "net/http" + "strconv" + "time" +) + +type EventMeshHttpConsumer struct { + *http.AbstractHttpClient + subscriptions []protocol.SubscriptionItem +} + +func NewEventMeshHttpConsumer(eventMeshHttpClientConfig conf.EventMeshHttpClientConfig) *EventMeshHttpConsumer { + c := &EventMeshHttpConsumer{AbstractHttpClient: http.NewAbstractHttpClient(eventMeshHttpClientConfig)} + c.subscriptions = make([]protocol.SubscriptionItem, 1000) + return c +} + +func (e *EventMeshHttpConsumer) HeartBeat(topicList []protocol.SubscriptionItem, subscribeUrl string) { + + // FIXME check topicList, subscribeUrl is not blank + + for range time.Tick(time.Duration(gcommon.Constants.HEARTBEAT) * time.Millisecond) { + + var heartbeatEntities []client.HeartbeatEntity + for _, item := range topicList { + entity := client.HeartbeatEntity{Topic: item.Topic, Url: subscribeUrl} + heartbeatEntities = append(heartbeatEntities, entity) + } + + requestParam := e.buildCommonRequestParam() + requestParam.AddHeader(common.ProtocolKey.REQUEST_CODE, strconv.Itoa(common.DefaultRequestCode.HEARTBEAT.RequestCode)) + // FIXME Java is name of SUB name + //requestParam.AddBody(client.HeartbeatRequestBodyKey.CLIENTTYPE, common.DefaultClientType.SUB.name()) + requestParam.AddBody(client.HeartbeatRequestBodyKey.CLIENTTYPE, "SUB") + requestParam.AddBody(client.HeartbeatRequestBodyKey.HEARTBEATENTITIES, gutils.MarshalJsonString(heartbeatEntities)) + + target := e.SelectEventMesh() + resp := utils.HttpPost(e.HttpClient, target, requestParam) + var ret http.EventMeshRetObj + gutils.UnMarshalJsonString(resp, &ret) + if ret.RetCode != common.DefaultEventMeshRetCode.SUCCESS.RetCode { + log.Fatalf("Request failed, error code: %d", ret.RetCode) + } + + } + +} + +func (e *EventMeshHttpConsumer) Subscribe(topicList []protocol.SubscriptionItem, subscribeUrl string) { + + // FIXME check topicList, subscribeUrl is not blank + + requestParam := e.buildCommonRequestParam() + requestParam.AddHeader(common.ProtocolKey.REQUEST_CODE, strconv.Itoa(common.DefaultRequestCode.SUBSCRIBE.RequestCode)) + requestParam.AddBody(client.SubscribeRequestBodyKey.TOPIC, gutils.MarshalJsonString(topicList)) + requestParam.AddBody(client.SubscribeRequestBodyKey.URL, subscribeUrl) + requestParam.AddBody(client.SubscribeRequestBodyKey.CONSUMERGROUP, e.EventMeshHttpClientConfig.ConsumerGroup()) + + target := e.SelectEventMesh() + resp := utils.HttpPost(e.HttpClient, target, requestParam) + var ret http.EventMeshRetObj + gutils.UnMarshalJsonString(resp, &ret) + if ret.RetCode != common.DefaultEventMeshRetCode.SUCCESS.RetCode { + log.Fatalf("Request failed, error code: %d", ret.RetCode) + } + e.subscriptions = append(e.subscriptions, topicList...) +} + +func (e *EventMeshHttpConsumer) buildCommonRequestParam() *model.RequestParam { + param := model.NewRequestParam(nethttp.MethodPost) + param.AddHeader(common.ProtocolKey.ClientInstanceKey.ENV, e.EventMeshHttpClientConfig.Env()) + param.AddHeader(common.ProtocolKey.ClientInstanceKey.IDC, e.EventMeshHttpClientConfig.Idc()) + param.AddHeader(common.ProtocolKey.ClientInstanceKey.IP, e.EventMeshHttpClientConfig.Ip()) + param.AddHeader(common.ProtocolKey.ClientInstanceKey.PID, e.EventMeshHttpClientConfig.Pid()) + param.AddHeader(common.ProtocolKey.ClientInstanceKey.SYS, e.EventMeshHttpClientConfig.Sys()) + param.AddHeader(common.ProtocolKey.ClientInstanceKey.USERNAME, e.EventMeshHttpClientConfig.UserName()) + param.AddHeader(common.ProtocolKey.ClientInstanceKey.PASSWORD, e.EventMeshHttpClientConfig.Password()) + param.AddHeader(common.ProtocolKey.VERSION, common.DefaultProtocolVersion.V1.Version()) + param.AddHeader(common.ProtocolKey.LANGUAGE, gcommon.Constants.LANGUAGE_GO) + param.SetTimeout(gcommon.Constants.DEFAULT_HTTP_TIME_OUT) + param.AddBody(client.HeartbeatRequestBodyKey.CONSUMERGROUP, e.EventMeshHttpClientConfig.ConsumerGroup()) + return param +} diff --git a/eventmesh-sdk-go/http/eventmesh_ret_obj.go b/eventmesh-sdk-go/http/eventmesh_ret_obj.go new file mode 100644 index 0000000000..9cab9fea94 --- /dev/null +++ b/eventmesh-sdk-go/http/eventmesh_ret_obj.go @@ -0,0 +1,7 @@ +package http + +type EventMeshRetObj struct { + ResTime int64 `json:"resTime"` + RetCode int `json:"retCode"` + RetMsg string `json:"retMsg"` +} diff --git a/eventmesh-sdk-go/http/model/request_param.go b/eventmesh-sdk-go/http/model/request_param.go new file mode 100644 index 0000000000..75b9d8a91c --- /dev/null +++ b/eventmesh-sdk-go/http/model/request_param.go @@ -0,0 +1,53 @@ +package model + +type HttpMethod string + +type RequestParam struct { + queryParams map[string][]string + httpMethod HttpMethod + body map[string]string + headers map[string]string + timeout int64 +} + +func NewRequestParam(httpMethod HttpMethod) *RequestParam { + return &RequestParam{httpMethod: httpMethod} +} + +func (r *RequestParam) QueryParams() map[string][]string { + return r.queryParams +} + +func (r *RequestParam) SetQueryParams(queryParams map[string][]string) { + r.queryParams = queryParams +} + +func (r *RequestParam) Body() map[string]string { + return r.body +} + +func (r *RequestParam) AddBody(key, value string) { + if r.body == nil { + r.body = make(map[string]string) + } + r.body[key] = value +} + +func (r *RequestParam) Headers() map[string]string { + return r.headers +} + +func (r *RequestParam) AddHeader(key string, object interface{}) { + if r.headers == nil { + r.headers = make(map[string]string) + } + r.headers[key] = object.(string) +} + +func (r *RequestParam) Timeout() int64 { + return r.timeout +} + +func (r *RequestParam) SetTimeout(timeout int64) { + r.timeout = timeout +} diff --git a/eventmesh-sdk-go/http/producer/cloudevent_producer.go b/eventmesh-sdk-go/http/producer/cloudevent_producer.go new file mode 100644 index 0000000000..17a178cc1e --- /dev/null +++ b/eventmesh-sdk-go/http/producer/cloudevent_producer.go @@ -0,0 +1,85 @@ +package producer + +import ( + gcommon "eventmesh/common" + "eventmesh/common/protocol/http/common" + "eventmesh/common/protocol/http/message" + gutils "eventmesh/common/utils" + "eventmesh/http" + "eventmesh/http/conf" + "eventmesh/http/model" + "eventmesh/http/utils" + cloudevents "github.com/cloudevents/sdk-go/v2" + "log" + nethttp "net/http" + "strconv" +) + +type CloudEventProducer struct { + *http.AbstractHttpClient +} + +func NewCloudEventProducer(eventMeshHttpClientConfig conf.EventMeshHttpClientConfig) *CloudEventProducer { + c := &CloudEventProducer{AbstractHttpClient: http.NewAbstractHttpClient(eventMeshHttpClientConfig)} + return c +} + +func (c *CloudEventProducer) Publish(event cloudevents.Event) { + enhancedEvent := c.enhanceCloudEvent(event) + requestParam := c.buildCommonPostParam(enhancedEvent) + requestParam.AddHeader(common.ProtocolKey.REQUEST_CODE, strconv.Itoa(common.DefaultRequestCode.MSG_SEND_ASYNC.RequestCode)) + + target := c.SelectEventMesh() + resp := utils.HttpPost(c.HttpClient, target, requestParam) + var ret http.EventMeshRetObj + gutils.UnMarshalJsonString(resp, &ret) + if ret.RetCode != common.DefaultEventMeshRetCode.SUCCESS.RetCode { + log.Fatalf("Request failed, error code: %d", ret.RetCode) + } +} + +func (c *CloudEventProducer) buildCommonPostParam(event cloudevents.Event) *model.RequestParam { + + eventBytes, err := event.MarshalJSON() + if err != nil { + log.Fatal("Failed to marshal cloudevent") + } + content := string(eventBytes) + + requestParam := model.NewRequestParam(nethttp.MethodPost) + requestParam.AddHeader(common.ProtocolKey.ClientInstanceKey.ENV, c.EventMeshHttpClientConfig.Env()) + requestParam.AddHeader(common.ProtocolKey.ClientInstanceKey.IDC, c.EventMeshHttpClientConfig.Idc()) + requestParam.AddHeader(common.ProtocolKey.ClientInstanceKey.IP, c.EventMeshHttpClientConfig.Ip()) + requestParam.AddHeader(common.ProtocolKey.ClientInstanceKey.PID, c.EventMeshHttpClientConfig.Pid()) + requestParam.AddHeader(common.ProtocolKey.ClientInstanceKey.SYS, c.EventMeshHttpClientConfig.Sys()) + requestParam.AddHeader(common.ProtocolKey.ClientInstanceKey.USERNAME, c.EventMeshHttpClientConfig.UserName()) + requestParam.AddHeader(common.ProtocolKey.ClientInstanceKey.PASSWORD, c.EventMeshHttpClientConfig.Password()) + requestParam.AddHeader(common.ProtocolKey.LANGUAGE, gcommon.Constants.LANGUAGE_GO) + // FIXME Improve constants + requestParam.AddHeader(common.ProtocolKey.PROTOCOL_TYPE, "cloudevents") + requestParam.AddHeader(common.ProtocolKey.PROTOCOL_DESC, "http") + requestParam.AddHeader(common.ProtocolKey.PROTOCOL_VERSION, event.SpecVersion()) + + // todo: move producerGroup tp header + requestParam.AddBody(message.SendMessageRequestBodyKey.PRODUCERGROUP, c.EventMeshHttpClientConfig.ProducerGroup()) + requestParam.AddBody(message.SendMessageRequestBodyKey.CONTENT, content) + + return requestParam +} + +func (c *CloudEventProducer) enhanceCloudEvent(event cloudevents.Event) cloudevents.Event { + event.SetExtension(common.ProtocolKey.ClientInstanceKey.ENV, c.EventMeshHttpClientConfig.Env()) + event.SetExtension(common.ProtocolKey.ClientInstanceKey.IDC, c.EventMeshHttpClientConfig.Idc()) + event.SetExtension(common.ProtocolKey.ClientInstanceKey.IP, c.EventMeshHttpClientConfig.Ip()) + event.SetExtension(common.ProtocolKey.ClientInstanceKey.PID, c.EventMeshHttpClientConfig.Pid()) + event.SetExtension(common.ProtocolKey.ClientInstanceKey.SYS, c.EventMeshHttpClientConfig.Sys()) + // FIXME Random string + event.SetExtension(common.ProtocolKey.ClientInstanceKey.BIZSEQNO, "333333") + event.SetExtension(common.ProtocolKey.ClientInstanceKey.UNIQUEID, "444444") + event.SetExtension(common.ProtocolKey.LANGUAGE, gcommon.Constants.LANGUAGE_GO) + // FIXME Java is name of spec version name + //event.SetExtension(common.ProtocolKey.PROTOCOL_DESC, event.SpecVersion()) + event.SetExtension(common.ProtocolKey.PROTOCOL_VERSION, event.SpecVersion()) + + return event +} diff --git a/eventmesh-sdk-go/http/producer/eventmesh_http_producer.go b/eventmesh-sdk-go/http/producer/eventmesh_http_producer.go new file mode 100644 index 0000000000..dbab7245d0 --- /dev/null +++ b/eventmesh-sdk-go/http/producer/eventmesh_http_producer.go @@ -0,0 +1,27 @@ +package producer + +import ( + "eventmesh/http/conf" + cloudevents "github.com/cloudevents/sdk-go/v2" +) + +type EventMeshHttpProducer struct { + cloudEventProducer *CloudEventProducer +} + +func NewEventMeshHttpProducer(eventMeshHttpClientConfig conf.EventMeshHttpClientConfig) *EventMeshHttpProducer { + return &EventMeshHttpProducer{ + cloudEventProducer: NewCloudEventProducer(eventMeshHttpClientConfig), + } +} + +func (e *EventMeshHttpProducer) Publish(eventMeshMessage interface{}) { + + // FIXME Check eventMeshMessage is not nil + + // CloudEvent + if _, ok := eventMeshMessage.(cloudevents.Event); ok { + event := eventMeshMessage.(cloudevents.Event) + e.cloudEventProducer.Publish(event) + } +} diff --git a/eventmesh-sdk-go/http/producer/eventmesh_protocol_producer.go b/eventmesh-sdk-go/http/producer/eventmesh_protocol_producer.go new file mode 100644 index 0000000000..ab70cb19f7 --- /dev/null +++ b/eventmesh-sdk-go/http/producer/eventmesh_protocol_producer.go @@ -0,0 +1,5 @@ +package producer + +type EventMeshProtocolProducer interface { + Publish(eventMeshMessage interface{}) +} diff --git a/eventmesh-sdk-go/http/utils/http_utils.go b/eventmesh-sdk-go/http/utils/http_utils.go new file mode 100644 index 0000000000..4333e84847 --- /dev/null +++ b/eventmesh-sdk-go/http/utils/http_utils.go @@ -0,0 +1,41 @@ +package utils + +import ( + "eventmesh/http/model" + "io/ioutil" + nethttp "net/http" + "net/url" + "strings" +) + +func HttpPost(client *nethttp.Client, uri string, requestParam *model.RequestParam) string { + + data := url.Values{} + body := requestParam.Body() + for key := range body { + data.Set(key, body[key]) + } + + req, err := nethttp.NewRequest(nethttp.MethodPost, uri, strings.NewReader(data.Encode())) + if err != nil { + } + + req.Header.Set("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8") + + headers := requestParam.Headers() + for header := range headers { + req.Header[header] = []string{headers[header]} + } + + resp, err := client.Do(req) + if err != nil { + } + + defer resp.Body.Close() + + respBody, err := ioutil.ReadAll(resp.Body) + if err != nil { + } + + return string(respBody) +} diff --git a/eventmesh-sdk-go/main.go b/eventmesh-sdk-go/main.go new file mode 100644 index 0000000000..1e04a6566e --- /dev/null +++ b/eventmesh-sdk-go/main.go @@ -0,0 +1,12 @@ +package main + +import "eventmesh/examples/tcp" + +func main() { + // HTTP Test + //http.AsyncPubCloudEvents() + //http.SubCloudEvents() + + // TCP Test + tcp.AsyncPubCloudEvents() +} diff --git a/eventmesh-sdk-go/tcp/cloudevent_tcp_client.go b/eventmesh-sdk-go/tcp/cloudevent_tcp_client.go new file mode 100644 index 0000000000..456399750a --- /dev/null +++ b/eventmesh-sdk-go/tcp/cloudevent_tcp_client.go @@ -0,0 +1,35 @@ +package tcp + +import ( + gtcp "eventmesh/common/protocol/tcp" + "eventmesh/tcp/conf" +) + +type CloudEventTCPClient struct { + cloudEventTCPPubClient *CloudEventTCPPubClient + cloudEventTCPSubClient *CloudEventTCPSubClient +} + +func NewCloudEventTCPClient(eventMeshTcpClientConfig conf.EventMeshTCPClientConfig) *CloudEventTCPClient { + return &CloudEventTCPClient{ + cloudEventTCPPubClient: NewCloudEventTCPPubClient(eventMeshTcpClientConfig), + cloudEventTCPSubClient: NewCloudEventTCPSubClient(eventMeshTcpClientConfig), + } +} + +func (c *CloudEventTCPClient) Init() { + c.cloudEventTCPPubClient.init() + c.cloudEventTCPSubClient.init() +} + +func (c *CloudEventTCPClient) Publish(message interface{}, timeout int64) gtcp.Package { + return c.cloudEventTCPPubClient.publish(message, timeout) +} + +func (c *CloudEventTCPClient) GetPubClient() EventMeshTCPPubClient { + return c.cloudEventTCPPubClient +} + +func (c *CloudEventTCPClient) GetSubClient() EventMeshTCPSubClient { + return c.cloudEventTCPSubClient +} diff --git a/eventmesh-sdk-go/tcp/cloudevent_tcp_pub_client.go b/eventmesh-sdk-go/tcp/cloudevent_tcp_pub_client.go new file mode 100644 index 0000000000..ea08329d75 --- /dev/null +++ b/eventmesh-sdk-go/tcp/cloudevent_tcp_pub_client.go @@ -0,0 +1,31 @@ +package tcp + +import ( + "eventmesh/common/protocol/tcp" + "eventmesh/tcp/conf" + "eventmesh/tcp/utils" +) + +type CloudEventTCPPubClient struct { + *BaseTCPClient +} + +func NewCloudEventTCPPubClient(eventMeshTcpClientConfig conf.EventMeshTCPClientConfig) *CloudEventTCPPubClient { + return &CloudEventTCPPubClient{BaseTCPClient: NewBaseTCPClient(eventMeshTcpClientConfig)} +} + +func (c CloudEventTCPPubClient) init() { + c.Open() + c.Hello() + c.Heartbeat() +} + +func (c CloudEventTCPPubClient) reconnect() { + c.Reconnect() + c.Heartbeat() +} + +func (c CloudEventTCPPubClient) publish(message interface{}, timeout int64) tcp.Package { + msg := utils.BuildPackage(message, tcp.DefaultCommand.ASYNC_MESSAGE_TO_SERVER) + return c.IO(msg, timeout) +} diff --git a/eventmesh-sdk-go/tcp/cloudevent_tcp_sub_client.go b/eventmesh-sdk-go/tcp/cloudevent_tcp_sub_client.go new file mode 100644 index 0000000000..c524f193bb --- /dev/null +++ b/eventmesh-sdk-go/tcp/cloudevent_tcp_sub_client.go @@ -0,0 +1,26 @@ +package tcp + +import ( + "eventmesh/common/protocol" + "eventmesh/tcp/conf" +) + +type CloudEventTCPSubClient struct { + *BaseTCPClient +} + +func NewCloudEventTCPSubClient(eventMeshTcpClientConfig conf.EventMeshTCPClientConfig) *CloudEventTCPSubClient { + return &CloudEventTCPSubClient{BaseTCPClient: NewBaseTCPClient(eventMeshTcpClientConfig)} +} + +func (c CloudEventTCPSubClient) init() { + //panic("implement me") +} + +func (c CloudEventTCPSubClient) subscribe(topic string, subscriptionMode protocol.SubscriptionMode, subscriptionType protocol.SubscriptionType) { + panic("implement me") +} + +func (c CloudEventTCPSubClient) unsubscribe() { + panic("implement me") +} diff --git a/eventmesh-sdk-go/tcp/common/eventmesh_common.go b/eventmesh-sdk-go/tcp/common/eventmesh_common.go new file mode 100644 index 0000000000..262f29e44a --- /dev/null +++ b/eventmesh-sdk-go/tcp/common/eventmesh_common.go @@ -0,0 +1,22 @@ +package common + +var EventMeshCommon = struct { + // Timeout time shared by the server + DEFAULT_TIME_OUT_MILLS int + + // User agent + USER_AGENT_PURPOSE_PUB string + USER_AGENT_PURPOSE_SUB string + + // Protocol type + CLOUD_EVENTS_PROTOCOL_NAME string + EM_MESSAGE_PROTOCOL_NAME string + OPEN_MESSAGE_PROTOCOL_NAME string +}{ + DEFAULT_TIME_OUT_MILLS: 20 * 1000, + USER_AGENT_PURPOSE_PUB: "pub", + USER_AGENT_PURPOSE_SUB: "sub", + CLOUD_EVENTS_PROTOCOL_NAME: "cloudevents", + EM_MESSAGE_PROTOCOL_NAME: "eventmeshmessage", + OPEN_MESSAGE_PROTOCOL_NAME: "openmessage", +} diff --git a/eventmesh-sdk-go/tcp/common/request_context.go b/eventmesh-sdk-go/tcp/common/request_context.go new file mode 100644 index 0000000000..4c7b681c7f --- /dev/null +++ b/eventmesh-sdk-go/tcp/common/request_context.go @@ -0,0 +1,60 @@ +package common + +import ( + "eventmesh/common/protocol/tcp" + "sync" +) + +type RequestContext struct { + key interface{} + request tcp.Package + response tcp.Package + wg sync.WaitGroup +} + +func (r *RequestContext) Key() interface{} { + return r.key +} + +func (r *RequestContext) SetKey(key interface{}) { + r.key = key +} + +func (r *RequestContext) Request() tcp.Package { + return r.request +} + +func (r *RequestContext) SetRequest(request tcp.Package) { + r.request = request +} + +func (r *RequestContext) Response() tcp.Package { + return r.response +} + +func (r *RequestContext) SetResponse(response tcp.Package) { + r.response = response +} + +func (r *RequestContext) Wg() sync.WaitGroup { + return r.wg +} + +func (r *RequestContext) SetWg(wg sync.WaitGroup) { + r.wg = wg +} + +func (r *RequestContext) Finish(message tcp.Package) { + r.response = message + //r.wg.Done() +} + +func GetRequestContextKey(request tcp.Package) interface{} { + return request.Header.Seq +} + +func NewRequestContext(key interface{}, request tcp.Package, latch int) *RequestContext { + ctx := &RequestContext{key: key, request: request} + //ctx.Wg().Add(latch) + return ctx +} diff --git a/eventmesh-sdk-go/tcp/conf/eventmesh_tcp_client_config.go b/eventmesh-sdk-go/tcp/conf/eventmesh_tcp_client_config.go new file mode 100644 index 0000000000..b353aa300b --- /dev/null +++ b/eventmesh-sdk-go/tcp/conf/eventmesh_tcp_client_config.go @@ -0,0 +1,37 @@ +package conf + +import "eventmesh/common/protocol/tcp" + +type EventMeshTCPClientConfig struct { + host string + port int + userAgent tcp.UserAgent +} + +func NewEventMeshTCPClientConfig(host string, port int, userAgent tcp.UserAgent) *EventMeshTCPClientConfig { + return &EventMeshTCPClientConfig{host: host, port: port, userAgent: userAgent} +} + +func (e *EventMeshTCPClientConfig) Host() string { + return e.host +} + +func (e *EventMeshTCPClientConfig) SetHost(host string) { + e.host = host +} + +func (e *EventMeshTCPClientConfig) Port() int { + return e.port +} + +func (e *EventMeshTCPClientConfig) SetPort(port int) { + e.port = port +} + +func (e *EventMeshTCPClientConfig) UserAgent() tcp.UserAgent { + return e.userAgent +} + +func (e *EventMeshTCPClientConfig) SetUserAgent(userAgent tcp.UserAgent) { + e.userAgent = userAgent +} diff --git a/eventmesh-sdk-go/tcp/eventmesh_tcp_client.go b/eventmesh-sdk-go/tcp/eventmesh_tcp_client.go new file mode 100644 index 0000000000..ffa341481c --- /dev/null +++ b/eventmesh-sdk-go/tcp/eventmesh_tcp_client.go @@ -0,0 +1,10 @@ +package tcp + +import gtcp "eventmesh/common/protocol/tcp" + +type EventMeshTCPClient interface { + Init() + Publish(msg interface{}, timeout int64) gtcp.Package + GetPubClient() EventMeshTCPPubClient + GetSubClient() EventMeshTCPSubClient +} diff --git a/eventmesh-sdk-go/tcp/eventmesh_tcp_client_factory.go b/eventmesh-sdk-go/tcp/eventmesh_tcp_client_factory.go new file mode 100644 index 0000000000..a8c8fc9fb1 --- /dev/null +++ b/eventmesh-sdk-go/tcp/eventmesh_tcp_client_factory.go @@ -0,0 +1,15 @@ +package tcp + +import ( + "eventmesh/common/protocol" + "eventmesh/tcp/conf" +) + +func CreateEventMeshTCPClient(eventMeshTcpClientConfig conf.EventMeshTCPClientConfig, messageType protocol.MessageType) EventMeshTCPClient { + + if messageType == protocol.DefaultMessageType.CloudEvent { + return NewCloudEventTCPClient(eventMeshTcpClientConfig) + } + + return nil +} diff --git a/eventmesh-sdk-go/tcp/eventmesh_tcp_pub_client.go b/eventmesh-sdk-go/tcp/eventmesh_tcp_pub_client.go new file mode 100644 index 0000000000..9bcabf44cb --- /dev/null +++ b/eventmesh-sdk-go/tcp/eventmesh_tcp_pub_client.go @@ -0,0 +1,9 @@ +package tcp + +import gtcp "eventmesh/common/protocol/tcp" + +type EventMeshTCPPubClient interface { + init() + reconnect() + publish(message interface{}, timeout int64) gtcp.Package +} diff --git a/eventmesh-sdk-go/tcp/eventmesh_tcp_sub_client.go b/eventmesh-sdk-go/tcp/eventmesh_tcp_sub_client.go new file mode 100644 index 0000000000..a496e49871 --- /dev/null +++ b/eventmesh-sdk-go/tcp/eventmesh_tcp_sub_client.go @@ -0,0 +1,9 @@ +package tcp + +import "eventmesh/common/protocol" + +type EventMeshTCPSubClient interface { + init() + subscribe(topic string, subscriptionMode protocol.SubscriptionMode, subscriptionType protocol.SubscriptionType) + unsubscribe() +} diff --git a/eventmesh-sdk-go/tcp/tcp_client.go b/eventmesh-sdk-go/tcp/tcp_client.go new file mode 100644 index 0000000000..79bf2765ad --- /dev/null +++ b/eventmesh-sdk-go/tcp/tcp_client.go @@ -0,0 +1,129 @@ +package tcp + +import ( + "bufio" + "bytes" + "eventmesh/common/protocol/tcp" + "eventmesh/common/protocol/tcp/codec" + "eventmesh/tcp/common" + "eventmesh/tcp/conf" + "eventmesh/tcp/utils" + "io" + "log" + "math/rand" + "net" + "strconv" +) + +type BaseTCPClient struct { + clientNo int + host string + port int + useAgent tcp.UserAgent + conn net.Conn +} + +func NewBaseTCPClient(eventMeshTcpClientConfig conf.EventMeshTCPClientConfig) *BaseTCPClient { + return &BaseTCPClient{ + clientNo: rand.Intn(10000), + host: eventMeshTcpClientConfig.Host(), + port: eventMeshTcpClientConfig.Port(), + useAgent: eventMeshTcpClientConfig.UserAgent(), + } +} + +func (c *BaseTCPClient) Open() { + eventMeshIpAndPort := c.host + ":" + strconv.Itoa(c.port) + conn, err := net.Dial("tcp", eventMeshIpAndPort) + if err != nil { + log.Fatal("Failed to dial") + } + c.conn = conn + + go c.read() +} + +func (c *BaseTCPClient) Close() { + if c.conn != nil { + err := c.conn.Close() + if err != nil { + log.Fatal("Failed to close connection") + } + c.Goodbye() + } +} + +func (c *BaseTCPClient) Heartbeat() { + msg := utils.BuildHeartBeatPackage() + c.IO(msg, 1000) +} + +func (c *BaseTCPClient) Hello() { + msg := utils.BuildHelloPackage(c.useAgent) + c.IO(msg, 1000) +} + +func (c *BaseTCPClient) Reconnect() { + +} + +func (c *BaseTCPClient) Goodbye() { + +} + +func (c *BaseTCPClient) IsActive() { + +} + +func (c *BaseTCPClient) read() error { + for { + var buf bytes.Buffer + for { + reader := bufio.NewReader(c.conn) + msg, isPrefix, err := reader.ReadLine() + if err != nil { + if err == io.EOF { + break + } + return err + } + + buf.Write(msg) + if !isPrefix { + break + } + } + + go c.handleRead(&buf) + } +} + +func (c *BaseTCPClient) handleRead(in *bytes.Buffer) { + decoded := codec.DecodePackage(in) + log.Printf("Read from server: %v\n", decoded) + // TODO Handle according to the command +} + +func (c *BaseTCPClient) write(message []byte) (int, error) { + writer := bufio.NewWriter(c.conn) + n, err := writer.Write(message) + if err == nil { + err = writer.Flush() + } + return n, err +} + +func (c *BaseTCPClient) Send(message tcp.Package) { + out := codec.EncodePackage(message) + _, err := c.write(out.Bytes()) + if err != nil { + log.Fatal("Failed to write to peer") + } +} + +func (c *BaseTCPClient) IO(message tcp.Package, timeout int64) tcp.Package { + key := common.GetRequestContextKey(message) + ctx := common.NewRequestContext(key, message, 1) + c.Send(message) + return ctx.Response() +} diff --git a/eventmesh-sdk-go/tcp/utils/message_utils.go b/eventmesh-sdk-go/tcp/utils/message_utils.go new file mode 100644 index 0000000000..1b4e6fb786 --- /dev/null +++ b/eventmesh-sdk-go/tcp/utils/message_utils.go @@ -0,0 +1,45 @@ +package utils + +import ( + gcommon "eventmesh/common" + "eventmesh/common/protocol/tcp" + "eventmesh/tcp/common" + cloudevents "github.com/cloudevents/sdk-go/v2" + "log" +) + +func BuildPackage(message interface{}, command tcp.Command) tcp.Package { + // FIXME Support random sequence + header := tcp.NewHeader(command, 0, "", "22222") + pkg := tcp.NewPackage(header) + + if _, ok := message.(cloudevents.Event); ok { + event := message.(cloudevents.Event) + eventBytes, err := event.MarshalJSON() + if err != nil { + log.Fatal("Failed to marshal cloud event") + } + + pkg.Header.PutProperty(gcommon.Constants.PROTOCOL_TYPE, common.EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME) + pkg.Header.PutProperty(gcommon.Constants.PROTOCOL_VERSION, event.SpecVersion()) + pkg.Header.PutProperty(gcommon.Constants.PROTOCOL_DESC, "tcp") + pkg.Body = eventBytes + } + + return pkg +} + +func BuildHelloPackage(agent tcp.UserAgent) tcp.Package { + // FIXME Support random sequence + header := tcp.NewHeader(tcp.DefaultCommand.HELLO_REQUEST, 0, "", "22222") + msg := tcp.NewPackage(header) + msg.Body = agent + return msg +} + +func BuildHeartBeatPackage() tcp.Package { + // FIXME Support random sequence + header := tcp.NewHeader(tcp.DefaultCommand.HEARTBEAT_REQUEST, 0, "", "22222") + msg := tcp.NewPackage(header) + return msg +}