Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Question] How to elegantly use the extension field in ConnectRecord to sink messages? #4567

Closed
1 task done
hhuang1231 opened this issue Nov 19, 2023 · 11 comments
Closed
1 task done
Labels
question Further information is requested

Comments

@hhuang1231
Copy link
Contributor

Search before asking

  • I had searched in the issues and found no similar issues.

Question

Currently I try to use connector-spring's soouce to send messages and then sink in DingTalk.

目前我尝试使用connector-spring的soouce来发送消息,然后在钉钉中下沉。

  • Since DingTalk requires special extension fields, I first need to add org.apache.eventmesh.connector.spring.source.connector.SpringSourceConnector#send(java.lang.Object, org.apache.eventmesh.openconnect.api. callback.SendMessageCallback) method, add the required extension fields to ConnectRecord

    由于钉钉需要特殊的扩展字段,所以我首先需要在org.apache.eventmesh.connector.spring.source.connector.SpringSourceConnector#send(java.lang.Object, org.apache.eventmesh.openconnect.api.callback.SendMessageCallback)方法,将需要的扩展字段添加进ConnectRecord

  • And due to CloudEvent's name conventions, I also have to convert the dingdingtemplatetypekey to all lowercase before I can put it into the extension of CloudEvent

    并且由于CloudEvent的命名约束,我也必须将dingdingtemplatetypekey转为全部小写,才能将其放入CloudEventextension

    image

  • And had to modify the source code of org.apache.eventmesh.openconnect.SourceWorker#convertRecordToEvent so that it can get extension from ConnectRecord and add it to CloudEvent

    并且不得不修改org.apache.eventmesh.openconnect.SourceWorker#convertRecordToEvent的源代码使其可以从ConnectRecord中得到extension并添加到CloudEvent

    image

    • In addition, I found that sinkWoker will call org.apache.eventmesh.openconnect.util.CloudEventUtil#convertEventToRecord when receiving events, thereby completing the conversion from CloudEvent's extension to ConnectorRecord's extension, and sourceWoker But they didn't do this. Why?

      另外我发现sinkWoker接收事件的过程中将会调用org.apache.eventmesh.openconnect.util.CloudEventUtil#convertEventToRecord,从而完成从CloudEventextension转换为ConnectorRecordextension,而sourceWoker却没有这么做,这是为什么呢?

      image

My doubts should be similar to #4560, and I hope to understand the community's current plans for this area.

我的疑惑与#4560 应该是类似的,我希望能够了解社区目前对于这一块的计划。

@hhuang1231 hhuang1231 added the question Further information is requested label Nov 19, 2023
@pandaapo
Copy link
Member

  • And had to modify the source code of org.apache.eventmesh.openconnect.SourceWorker#convertRecordToEvent so that it can get extension from ConnectRecord and add it to CloudEvent

In your multiple questions, regarding the aforementioned point, I think you can create an issue and PR to make SourceWorker#convertRecordToEvent() able to add the extension from ConnectRecord to CloudEvent.

The remaining questions are waiting for responses from others in the community.

在您的多个疑问中,关于上述这一点,我觉得您可以创建个issue和PR,使SourceWorker#convertRecordToEvent()可以将ConnectRecord中的extension添加到CloudEvent中。
剩下的疑问,等待社区其他人的回复。

@xwm1992
Copy link
Contributor

xwm1992 commented Nov 19, 2023

SourceWorker是由connectorRecord转为cloudevents协议,再发给EventMesh;SinkWorker是由cloudevents转为connectorRecord再调用connect api. 因此connect api部分都是connectorRecord,与eventmesh交互部分都是cloudevents.
整体链路是:source->connectorRecord->sourceWorker->cloudevents->mesh->cloudevents->sinkWorker->connectorRecord->sink

@hhuang1231
Copy link
Contributor Author

hhuang1231 commented Nov 20, 2023

@xwm1992 假如我想在应用程序中直接调用eventmesh提供的api来发送事件到mesh,目前我只知道两种方式:

  1. 利用原生的java sdk在应用程序中new CloudEvent后,再发送给mesh,mesh再根据topic发送给sink。
  2. 利用connector-spring的source来发送。特别地,它提供的send方法将ConnectRecord隐藏了起来,那么用户将无法为其添加额外的extension。这样将无法满足将消息发送到钉钉那种需要特殊字段标识的sink中的条件

我主要想知道如果想让用户使用更方便的话,是不是应该对在应用程序中发送CloudEvent做更多的优化,例如丰富spring的支持。
除此之外我还想知道目前connector中的spring和#2314 中提到的利用spring简化java sdk的使用有什么区别。

@pandaapo
Copy link
Member

pandaapo commented Nov 20, 2023

除此之外我还想知道目前connector中的spring和#2314 中提到的利用spring简化java sdk的使用有什么区别。

关于上述这一小点的回复。
首先,connector是区别于EventMesh核心的相对独立的组件,你可以将connector大概地理解为实现事件、消息、数据传输的工具组件。EventMesh的核心主要是eventmesh-runtime(服务端)和eventmesh-sdk(客户端)。
connector中的Spring,是将connector和Spring做了集成。#2314 中讨论的是将EventMesh的核心与Spring集成,讨论的焦点是在eventmesh-runtime做Spring的集成,还是在eventmesh-sdk做Spring的集成,以及集成方式。
剩下的疑问,请等待@xwm1992回复。

@xwm1992
Copy link
Contributor

xwm1992 commented Nov 21, 2023

@xwm1992 假如我想在应用程序中直接调用eventmesh提供的api来发送事件到mesh,目前我只知道两种方式:

  1. 利用原生的java sdk在应用程序中new CloudEvent后,再发送给mesh,mesh再根据topic发送给sink。
  2. 利用connector-spring的source来发送。特别地,它提供的send方法将ConnectRecord隐藏了起来,那么用户将无法为其添加额外的extension。这样将无法满足将消息发送到钉钉那种需要特殊字段标识的sink中的条件

我主要想知道如果想让用户使用更方便的话,是不是应该对在应用程序中发送CloudEvent做更多的优化,例如丰富spring的支持。 除此之外我还想知道目前connector中的spring和#2314 中提到的利用spring简化java sdk的使用有什么区别。

  1. 首先通过api调用来发送事件到mesh的两种方式描述的不是很对,一种是通过sdk的方式,另外一种是采用restful api的方式;你说的connector-spring,这个只是connector集成了eventmesh的sdk来进行了封装,任何一种connector都是集成了eventmesh sdk,无非spring是直接面向应用程序的,这个点正是我们想要体现的,让用户看到的是spring的方式,至于 #2314中提到的和 @pandaapo 回复的内容是一致的,因为mesh原本是netty server实现的,该issue讨论的是是否用spring的方式来实现mesh。
  2. 对于你说的spring connector和dingding connector无法很好的配合使用这个点,我理解主要问题点在于spring connector现有的source设计是直接将json化后的message作为event发给了mesh,并没有考虑相关的扩展字段的问题,这个点是可以考虑进行优化的,这个问题背景是spring connector是先提交实现的,没有考虑到和其他connector结合的问题,因此没有保留扩展属性。
    @yanrongzhen 也可以看一下这里的讨论,看看你的想法。

@pandaapo
Copy link
Member

通过api调用来发送事件到mesh的两种方式……,一种是通过sdk的方式,另外一种是采用restful api的方式;

@xwm1992 restful api的方式我不是很确定,是指像这个issue中“Management Interface in EventMesh”部分描述的方式吗 #4400 (comment) ?如果是的,是不是还在完善和补充中?

@yanrongzhen
Copy link
Contributor

@xwm1992 假如我想在应用程序中直接调用eventmesh提供的api来发送事件到mesh,目前我只知道两种方式:

  1. 利用原生的java sdk在应用程序中new CloudEvent后,再发送给mesh,mesh再根据topic发送给sink。
  2. 利用connector-spring的source来发送。特别地,它提供的send方法将ConnectRecord隐藏了起来,那么用户将无法为其添加额外的extension。这样将无法满足将消息发送到钉钉那种需要特殊字段标识的sink中的条件

我主要想知道如果想让用户使用更方便的话,是不是应该对在应用程序中发送CloudEvent做更多的优化,例如丰富spring的支持。 除此之外我还想知道目前connector中的spring和#2314 中提到的利用spring简化java sdk的使用有什么区别。

  1. 首先通过api调用来发送事件到mesh的两种方式描述的不是很对,一种是通过sdk的方式,另外一种是采用restful api的方式;你说的connector-spring,这个只是connector集成了eventmesh的sdk来进行了封装,任何一种connector都是集成了eventmesh sdk,无非spring是直接面向应用程序的,这个点正是我们想要体现的,让用户看到的是spring的方式,至于 #2314中提到的和 @pandaapo 回复的内容是一致的,因为mesh原本是netty server实现的,该issue讨论的是是否用spring的方式来实现mesh。
  2. 对于你说的spring connector和dingding connector无法很好的配合使用这个点,我理解主要问题点在于spring connector现有的source设计是直接将json化后的message作为event发给了mesh,并没有考虑相关的扩展字段的问题,这个点是可以考虑进行优化的,这个问题背景是spring connector是先提交实现的,没有考虑到和其他connector结合的问题,因此没有保留扩展属性。
    @yanrongzhen 也可以看一下这里的讨论,看看你的想法。

我个人觉得spring可能需要支持extension, 一般使用spring的用户, 配置都在spring的environment里, 我觉得可以把spring environment和connector做一个联动.

@pandaapo
Copy link
Member

Since DingTalk requires special extension fields, I first need to add org.apache.eventmesh.connector.spring.source.connector.SpringSourceConnector#send(java.lang.Object, org.apache.eventmesh.openconnect.api. callback.SendMessageCallback) method, add the required extension fields to ConnectRecord

@hhuang1231
The current solution to this problem is #4582, which retrieves the properties in the configuration file with the prefix eventmesh.connector. through the spring source connector and automatically adds them to the extension of the ConnectRecord. You can experience this plan.

对于这个问题,目前的解决方案是#4582,通过spring source connector获取配置文件中以eventmesh.connector.为前缀的属性,自动将其添加到ConnectRecord的extension中。您可以体验一下该方案。

@hhuang1231
Copy link
Contributor Author

hhuang1231 commented Nov 28, 2023

@pandaapo @yanrongzhen
org.apache.eventmesh.connector.wecom.constants.ConnectRecordExtensionKeys的定义中,由于出现了:,这将无法正常写入properties文件中作为key
image

@pandaapo
Copy link
Member

pandaapo commented Nov 29, 2023

@pandaapo @yanrongzhenorg.apache.eventmesh.connector.wecom.constants.ConnectRecordExtensionKeys的定义中,由于出现了:,这将无法正常写入properties文件中作为key image

@hhuang1231 Thank you for your feedback and reporting this bug!

@xwm1992
Copy link
Contributor

xwm1992 commented Nov 29, 2023

通过api调用来发送事件到mesh的两种方式……,一种是通过sdk的方式,另外一种是采用restful api的方式;

@xwm1992 restful api的方式我不是很确定,是指像这个issue中“Management Interface in EventMesh”部分描述的方式吗 #4400 (comment) ?如果是的,是不是还在完善和补充中?

目前EventMesh 的http调用是有两种方式,一种是sdk,另外一种是直接调用eventmesh的uri,你可以看对应http协议下面的processor与uri之间的映射关系

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
question Further information is requested
Projects
None yet
Development

No branches or pull requests

4 participants