diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java index 96ffa5f802..359aa08888 100644 --- a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java @@ -22,6 +22,7 @@ import org.apache.eventmesh.common.utils.AssertUtils; import org.apache.eventmesh.connector.chatgpt.source.config.ChatGPTSourceConfig; import org.apache.eventmesh.connector.chatgpt.source.dto.ChatGPTRequestDTO; +import org.apache.eventmesh.connector.chatgpt.source.enums.ChatGPTRequestType; import org.apache.eventmesh.connector.chatgpt.source.handlers.ChatHandler; import org.apache.eventmesh.connector.chatgpt.source.handlers.ParseHandler; import org.apache.eventmesh.connector.chatgpt.source.managers.OpenaiManager; @@ -53,6 +54,7 @@ import io.vertx.core.http.HttpServerOptions; import io.vertx.ext.web.RequestBody; import io.vertx.ext.web.Router; +import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.handler.BodyHandler; import lombok.extern.slf4j.Slf4j; @@ -121,39 +123,60 @@ private void doInit() { try { RequestBody body = ctx.body(); ChatGPTRequestDTO bodyObject = body.asPojo(ChatGPTRequestDTO.class); - if (bodyObject.getSubject() == null || bodyObject.getDataContentType() == null || bodyObject.getText() == null) { - throw new IllegalStateException("Attributes 'subject', 'datacontenttype', and 'prompt' cannot be null"); - } - chatgptSourceExecutorService.execute(() -> { - try { - CloudEvent cloudEvent; - switch (bodyObject.getRequestType()) { - case CHAT: - cloudEvent = chatHandler.invoke(bodyObject); - break; - case PARSE: - cloudEvent = parseHandler.invoke(bodyObject); - break; - default: - throw new IllegalStateException("the request type is illegal"); - } - queue.add(cloudEvent); - log.info("[ChatGPTSourceConnector] Succeed to convert payload into CloudEvent."); - ctx.response().setStatusCode(HttpResponseStatus.OK.code()).end(); - } catch (Exception e) { - log.error("[ChatGPTSourceConnector] Error processing request: {}", e.getMessage(), e); - ctx.response().setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()).end(); - } - }); + validateRequestDTO(bodyObject); + handleRequest(bodyObject, ctx); } catch (Exception e) { - log.error("[ChatGPTSourceConnector] Malformed request. StatusCode={}", HttpResponseStatus.BAD_REQUEST.code(), e); - ctx.response().setStatusCode(HttpResponseStatus.BAD_REQUEST.code()).end(); + handleError(e, ctx); } }); this.server = vertx.createHttpServer(new HttpServerOptions().setPort(this.sourceConfig.connectorConfig.getPort()) .setIdleTimeout(this.sourceConfig.connectorConfig.getIdleTimeout())).requestHandler(router); } + + private void validateRequestDTO(ChatGPTRequestDTO bodyObject) { + if (bodyObject.getSubject() == null || bodyObject.getDataContentType() == null || bodyObject.getText() == null) { + throw new IllegalArgumentException("Attributes 'subject', 'datacontenttype', and 'prompt' cannot be null"); + } + } + + private void handleRequest(ChatGPTRequestDTO bodyObject, RoutingContext ctx) { + chatgptSourceExecutorService.execute(() -> { + try { + ChatGPTRequestType chatgptRequestType = ChatGPTRequestType.valueOf(bodyObject.getRequestType()); + CloudEvent cloudEvent = invokeHandler(chatgptRequestType, bodyObject); + queue.add(cloudEvent); + log.info("[ChatGPTSourceConnector] Succeed to convert payload into CloudEvent."); + ctx.response().setStatusCode(HttpResponseStatus.OK.code()).end(); + } catch (IllegalArgumentException e) { + log.error("[ChatGPTSourceConnector] the request type is illegal: {}", e.getMessage(), e); + ctx.response() + .setStatusCode(HttpResponseStatus.BAD_REQUEST.code()) + .setStatusMessage(String.format("request type '%s' is not supported", bodyObject.getRequestType())) + .end(); + } catch (Exception e) { + log.error("[ChatGPTSourceConnector] Error processing request: {}", e.getMessage(), e); + ctx.response().setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()).end(); + } + }); + } + + private CloudEvent invokeHandler(ChatGPTRequestType chatgptRequestType, ChatGPTRequestDTO bodyObject) { + switch (chatgptRequestType) { + case CHAT: + return chatHandler.invoke(bodyObject); + case PARSE: + return parseHandler.invoke(bodyObject); + default: + throw new IllegalStateException("the request type is illegal"); + } + } + + private void handleError(Exception e, RoutingContext ctx) { + log.error("[ChatGPTSourceConnector] Malformed request.", e); + ctx.response().setStatusCode(HttpResponseStatus.BAD_REQUEST.code()).end(); + } + @Override public void start() { Throwable t = this.server.listen().cause(); diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/dto/ChatGPTRequestDTO.java b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/dto/ChatGPTRequestDTO.java index 83ae2900c0..3e339011d0 100644 --- a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/dto/ChatGPTRequestDTO.java +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/dto/ChatGPTRequestDTO.java @@ -19,6 +19,11 @@ import org.apache.eventmesh.connector.chatgpt.source.enums.ChatGPTRequestType; +import java.time.ZonedDateTime; +import java.util.List; +import java.util.UUID; + +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import lombok.AllArgsConstructor; @@ -30,7 +35,7 @@ @NoArgsConstructor public class ChatGPTRequestDTO { - private ChatGPTRequestType requestType; + private String requestType = ChatGPTRequestType.CHAT.name(); private String source; @@ -43,4 +48,15 @@ public class ChatGPTRequestDTO { private String text; + private String fields; + + @JsonInclude + private String id = UUID.randomUUID().toString(); + + @JsonInclude + private String time = ZonedDateTime.now().toOffsetDateTime().toString(); + + public String getFields() { + return fields.replace(";", "\n"); + } } diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/handlers/ChatHandler.java b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/handlers/ChatHandler.java index 924729b6c4..6d79a0559f 100644 --- a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/handlers/ChatHandler.java +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/handlers/ChatHandler.java @@ -53,21 +53,14 @@ private CloudEvent genGptConnectRecord(ChatGPTRequestDTO event) { List chatMessages = new ArrayList<>(); chatMessages.add(new ChatMessage(ChatMessageRole.USER.value(), event.getText())); ChatCompletionRequest req = openaiManager.newChatCompletionRequest(chatMessages); - StringBuilder gptData = new StringBuilder(); - - try { - openaiManager.getOpenAiService().createChatCompletion(req).getChoices() - .forEach(chatCompletionChoice -> gptData.append(chatCompletionChoice.getMessage().getContent())); - } catch (Exception e) { - log.error("Failed to generate GPT connection record: {}", e.getMessage()); - } + String chatResult = openaiManager.getResult(req); return CloudEventBuilder.v1() .withId(UUID.randomUUID().toString()) .withSource(URI.create(event.getSource())) .withType(event.getType()) .withTime(ZonedDateTime.now().toOffsetDateTime()) - .withData(gptData.toString().getBytes()) + .withData(chatResult.getBytes()) .withSubject(event.getSubject()) .withDataContentType(event.getDataContentType()) .build(); diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/handlers/ParseHandler.java b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/handlers/ParseHandler.java index 599a5b715f..008fc140e2 100644 --- a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/handlers/ParseHandler.java +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/handlers/ParseHandler.java @@ -18,26 +18,120 @@ package org.apache.eventmesh.connector.chatgpt.source.handlers; +import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.connector.chatgpt.source.dto.ChatGPTRequestDTO; import org.apache.eventmesh.connector.chatgpt.source.managers.OpenaiManager; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.text.StringSubstitutor; + +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import io.cloudevents.CloudEvent; +import io.cloudevents.jackson.JsonFormat; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.theokanning.openai.completion.chat.ChatCompletionRequest; +import com.theokanning.openai.completion.chat.ChatMessage; +import com.theokanning.openai.completion.chat.ChatMessageRole; + +import lombok.extern.slf4j.Slf4j; +@Slf4j public class ParseHandler { private final OpenaiManager openaiManager; private final String promptTemplate; + private static final JsonFormat jsonFormat = new JsonFormat(false, true); + + public ParseHandler(OpenaiManager openaiManager, String promptTemplate) { this.openaiManager = openaiManager; this.promptTemplate = promptTemplate; } + @SuppressWarnings("checkstyle:WhitespaceAfter") public CloudEvent invoke(ChatGPTRequestDTO event) { - // todo use StringSubstitutor event and promptTemplate translate to final prompt + Map map = convertToMap(event); + + StringSubstitutor substitute = new StringSubstitutor(map); + String finalPrompt = substitute.replace(promptTemplate); + List chatMessages = new ArrayList<>(); + chatMessages.add(new ChatMessage(ChatMessageRole.USER.value(), finalPrompt)); + ChatCompletionRequest req = openaiManager.newChatCompletionRequest(chatMessages); + String chatResult = openaiManager.getResult(req); + chatResult = StringUtils.removeFirst(chatResult, "```json"); + chatResult = StringUtils.removeEnd(chatResult, "```"); + CloudEvent cloudEvent; + try { + cloudEvent = jsonFormat.deserialize(chatResult.getBytes(Constants.DEFAULT_CHARSET)); + } catch (Exception e) { + throw new IllegalStateException("cloudEvent parse fail, please check your parse prompt file content", e); + } + return cloudEvent; + } + + public Map convertToMap(Object obj) { + Map map = new HashMap<>(); + Class clazz = obj.getClass(); + Field[] fields = clazz.getDeclaredFields(); + for (Field field : fields) { + if (field.isSynthetic()) { + continue; + } + if (Map.class.isAssignableFrom(field.getType()) || List.class.isAssignableFrom(field.getType())) { + continue; + } + try { + String key = field.getName(); + if (field.isAnnotationPresent(JsonProperty.class)) { + JsonProperty annotation = field.getAnnotation(JsonProperty.class); + key = annotation.value(); + } + Method getter = getGetter(field, clazz); + map.put(key, String.valueOf(getter.invoke(obj))); + } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { + throw new IllegalStateException("convert to Map is fail", e); + } + } + + return map; + } + + public Method getGetter(Field field, Class clazz) throws NoSuchMethodException { + boolean isBooleanField = false; + if (boolean.class.isAssignableFrom(field.getType()) || Boolean.class.isAssignableFrom(field.getType())) { + isBooleanField = true; + } + String handledFieldName = upperFirst(field.getName()); + String methodName; + if (isBooleanField) { + methodName = "is" + handledFieldName; + } else { + methodName = "get" + handledFieldName; + } + return clazz.getDeclaredMethod(methodName); + } - return null; + public String upperFirst(String str) { + if (null == str) { + return null; + } + if (!str.isEmpty()) { + char firstChar = str.charAt(0); + if (Character.isLowerCase(firstChar)) { + return Character.toUpperCase(firstChar) + StringUtils.substring(str, 1); + } + } + return str; } } diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/prompt b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/prompt index 661bcb315a..b598c8e973 100644 --- a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/prompt +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/prompt @@ -1,21 +1,27 @@ -You are an AI assistant named CloudEventsConverter. Your task is to convert input text provided by the user into a CloudEvents-formatted JSON object, avoid escape characters . +You are an AI assistant named CloudEventsConverter. avoid escape characters . +Your task is to construct a JSON object in CloudEvents format. Based on the field name and field description in the 'data' field of the CloudEvents formatted JSON object, convert the input text provided by the user into the content of the 'data' field, which must comply with the specifications of the content of the 'datacontenttype' field. +The role is : + - If the 'datacontenttype' field content is 'application/json', then the' data 'field content should be a JSON object, + - else If the 'datacontenttype' field content is not 'application/json' and is 'application/xml', then the' data 'field content should be a string in XML format and the outermost of XML format is , inside is the XML generated by you based on field info; + - else the 'datacontenttype' field content is not 'application/json' and 'application/xml', then the' data 'field content is string of the 'text' field content; +Except for the content of the data field, all other values should be set to and cannot be modified. Finally, return to me the JSON object in CloudEvents format that you constructed -For the following text, extract the following information: +The following text is the field name and field description in the 'data' field of the CloudEvents-formatted JSON object, extract the following information: + +${fields} + -Create a CloudEvents-formatted JSON object with the following fields: -- specversion: Set to "1.0" (the current CloudEvents specification version) -- type: Set to \\\ ${type} \\\ -- source: Set to \\\ ${source} \\\ -- id: Set to \\\ ${id} \\\ (Generate a unique identifier for the event.) -- time: Set to \\\ ${time} \\\ -- datacontenttype: Set to \\\ ${datacontenttype} \\\ (e.g.,application/json) -- data: Set to the input text provided by the user - \\\ - ${fields} - \\\ +text: ${text} -text: \\\ ${text} \\\ - -If any of the fields marked as \\\ {} \\\ are null or empty, use a default value. - -Return the CloudEvents-formatted JSON object to the user,The format of the data field matches the datacontenttype,Just need to return the JSON object, nothing else needs to be returned。 +The output should be a markdown code snippet formatted in the following schema, including the leading and trailing "```json" and "```": +```json +{ + "specversion": string, Set to "1.0" + "type": string, Set to ${type} + "source": string, Set to ${source} + "subject": string, Set to ${subject} + "id": string, Set to ${id} + "time": string, Set to ${time} + "datacontenttype": string, Set to ${datacontenttype} + "data": object or string +} \ No newline at end of file diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnectorTest.java b/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnectorTest.java index 67c31984aa..047b915ac3 100644 --- a/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnectorTest.java +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnectorTest.java @@ -46,6 +46,8 @@ class ChatGPTSourceConnectorTest { private String uri; private final String expectedMessage = "Hello, can you tell me a story."; + private final String expectedParseMessage = "User 13356288979 from Tianjin store placed an order with order number 11221122"; + @BeforeEach void setUp() throws Exception { connector = new ChatGPTSourceConnector(); @@ -64,13 +66,21 @@ void testPoll() throws Exception { final int batchSize = 10; for (int i = 0; i < batchSize; i++) { - HttpResponse resp = mockStructuredRequest(); + HttpResponse resp = mockStructuredChatRequest(); Assertions.assertEquals(resp.getStatusLine().getStatusCode(), HttpStatus.SC_OK); } List res = connector.poll(); Assertions.assertEquals(batchSize, res.size()); + for (int i = 0; i < batchSize; i++) { + HttpResponse resp = mockStructuredParseRequest(); + Assertions.assertEquals(resp.getStatusLine().getStatusCode(), HttpStatus.SC_OK); + } + + List res1 = connector.poll(); + Assertions.assertEquals(batchSize, res1.size()); + // test invalid requests HttpPost invalidPost = new HttpPost(uri); TestEvent event = new TestEvent(); @@ -84,7 +94,7 @@ void testPoll() throws Exception { } - HttpResponse mockStructuredRequest() throws Exception { + HttpResponse mockStructuredChatRequest() throws Exception { HttpPost httpPost = new HttpPost(uri); TestEvent event = new TestEvent(); event.type = "com.example.someevent"; @@ -98,6 +108,22 @@ HttpResponse mockStructuredRequest() throws Exception { return httpClient.execute(httpPost); } + + HttpResponse mockStructuredParseRequest() throws Exception { + HttpPost httpPost = new HttpPost(uri); + TestEvent event = new TestEvent(); + event.type = "com.example.someevent"; + event.source = "/mycontext"; + event.subject = "test"; + event.datacontenttype = "application/json"; + event.text = expectedParseMessage; + event.requestType = "PARSE1"; + event.fields = "orderNo:this is order number;address:this is a address;phone:this is phone number"; + httpPost.setEntity(new StringEntity(JsonUtils.toJSONString(event))); + + return httpClient.execute(httpPost); + } + @AfterEach void tearDown() throws Exception { connector.stop(); @@ -112,5 +138,6 @@ class TestEvent { public String subject; public String datacontenttype; public String text; + public String fields; } } \ No newline at end of file