From 608d16904930b19f6240b44539e9b340fae780a0 Mon Sep 17 00:00:00 2001 From: JiangShuJu Date: Tue, 9 Apr 2024 00:14:43 +0800 Subject: [PATCH] [ISSUE #4047] Join parse request support --- .../config/ChatGPTSourceConnectorConfig.java | 2 + .../connector/ChatGPTSourceConnector.java | 141 +++++------------- .../chatgpt/source/dto/ChatGPTRequestDTO.java | 24 ++- .../source/enums/ChatGPTRequestType.java | 25 ++++ .../chatgpt/source/handlers/ChatHandler.java | 76 ++++++++++ .../chatgpt/source/handlers/ParseHandler.java | 43 ++++++ .../source/managers/OpenaiManager.java | 124 +++++++++++++++ .../src/main/resources/prompt | 18 +-- .../src/main/resources/source-config.yml | 1 + .../connector/ChatGPTSourceConnectorTest.java | 8 +- .../src/test/resources/source-config.yml | 1 + 11 files changed, 347 insertions(+), 116 deletions(-) create mode 100644 eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/enums/ChatGPTRequestType.java create mode 100644 eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/handlers/ChatHandler.java create mode 100644 eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/handlers/ParseHandler.java create mode 100644 eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/managers/OpenaiManager.java diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/ChatGPTSourceConnectorConfig.java b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/ChatGPTSourceConnectorConfig.java index f958bc2acf..8e1eba1410 100644 --- a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/ChatGPTSourceConnectorConfig.java +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/ChatGPTSourceConnectorConfig.java @@ -32,4 +32,6 @@ public class ChatGPTSourceConnectorConfig { private boolean proxyEnable; + private String parsePromptFileName; + } diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java index c28644065b..96ffa5f802 100644 --- a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java @@ -17,21 +17,14 @@ package org.apache.eventmesh.connector.chatgpt.source.connector; - -import okhttp3.OkHttpClient; -import retrofit2.Retrofit; - -import static com.theokanning.openai.service.OpenAiService.defaultObjectMapper; -import static com.theokanning.openai.service.OpenAiService.defaultRetrofit; - import org.apache.eventmesh.common.ThreadPoolFactory; import org.apache.eventmesh.common.exception.EventMeshException; import org.apache.eventmesh.common.utils.AssertUtils; -import org.apache.eventmesh.common.utils.JsonUtils; -import org.apache.eventmesh.connector.chatgpt.source.config.OpenaiConfig; -import org.apache.eventmesh.connector.chatgpt.source.config.OpenaiProxyConfig; import org.apache.eventmesh.connector.chatgpt.source.config.ChatGPTSourceConfig; import org.apache.eventmesh.connector.chatgpt.source.dto.ChatGPTRequestDTO; +import org.apache.eventmesh.connector.chatgpt.source.handlers.ChatHandler; +import org.apache.eventmesh.connector.chatgpt.source.handlers.ParseHandler; +import org.apache.eventmesh.connector.chatgpt.source.managers.OpenaiManager; import org.apache.eventmesh.openconnect.api.config.Config; import org.apache.eventmesh.openconnect.api.connector.ConnectorContext; import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext; @@ -39,21 +32,20 @@ import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; import org.apache.eventmesh.openconnect.util.CloudEventUtil; -import java.net.InetSocketAddress; -import java.net.Proxy; -import java.net.URI; -import java.time.Duration; -import java.time.ZonedDateTime; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; -import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import io.cloudevents.CloudEvent; -import io.cloudevents.core.builder.CloudEventBuilder; import io.netty.handler.codec.http.HttpResponseStatus; import io.vertx.core.Vertx; import io.vertx.core.http.HttpMethod; @@ -63,14 +55,6 @@ import io.vertx.ext.web.Router; import io.vertx.ext.web.handler.BodyHandler; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.theokanning.openai.client.OpenAiApi; -import com.theokanning.openai.completion.chat.ChatCompletionRequest; -import com.theokanning.openai.completion.chat.ChatCompletionRequest.ChatCompletionRequestBuilder; -import com.theokanning.openai.completion.chat.ChatMessage; -import com.theokanning.openai.completion.chat.ChatMessageRole; -import com.theokanning.openai.service.OpenAiService; - import lombok.extern.slf4j.Slf4j; @Slf4j @@ -81,14 +65,16 @@ public class ChatGPTSourceConnector implements Source { private ChatGPTSourceConfig sourceConfig; private BlockingQueue queue; private HttpServer server; - private OpenAiService openAiService; private final ExecutorService chatgptSourceExecutorService = ThreadPoolFactory.createThreadPoolExecutor( Runtime.getRuntime().availableProcessors() * 2, Runtime.getRuntime().availableProcessors() * 2, "ChatGPTSourceThread"); - private String chatCompletionRequestTemplateStr; + private OpenaiManager openaiManager; + private String parsePromptTemplateStr; + private ChatHandler chatHandler; + private ParseHandler parseHandler; @Override public Class configClass() { @@ -108,85 +94,26 @@ public void init(ConnectorContext connectorContext) { doInit(); } - private void initOpenAi() { - OpenaiConfig openaiConfig = sourceConfig.getOpenaiConfig(); - AssertUtils.isTrue(openaiConfig.getTimeout() > 0, "openaiTimeout must be >= 0"); - boolean proxyEnable = sourceConfig.connectorConfig.isProxyEnable(); - if (proxyEnable) { - OpenaiProxyConfig chatgptProxyConfig = sourceConfig.openaiProxyConfig; - if (chatgptProxyConfig.getHost() == null) { - throw new IllegalStateException("chatgpt proxy config 'host' cannot be null"); - } - ObjectMapper mapper = defaultObjectMapper(); - Proxy proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(chatgptProxyConfig.getHost(), chatgptProxyConfig.getPort())); - OkHttpClient client = OpenAiService - .defaultClient(openaiConfig.getToken(), Duration.ofSeconds(openaiConfig.getTimeout())) - .newBuilder() - .proxy(proxy) - .build(); - Retrofit retrofit = defaultRetrofit(client, mapper); - OpenAiApi api = retrofit.create(OpenAiApi.class); - this.openAiService = new OpenAiService(api); - } else { - this.openAiService = - new OpenAiService(openaiConfig.getToken(), Duration.ofSeconds(openaiConfig.getTimeout())); - } - ChatCompletionRequestBuilder builder = ChatCompletionRequest - .builder() - .model(openaiConfig.getModel()); - AssertUtils.notNull(openaiConfig.getModel(), "model cannot be null"); - builder = builder.model(openaiConfig.getModel()); - if (openaiConfig.getUser() != null) { - builder = builder.user(openaiConfig.getUser()); - } - if (openaiConfig.getPresencePenalty() != null) { - builder = builder.presencePenalty(openaiConfig.getPresencePenalty()); - } - if (openaiConfig.getFrequencyPenalty() != null) { - builder = builder.frequencyPenalty(openaiConfig.getFrequencyPenalty()); - } - if (openaiConfig.getMaxTokens() != null) { - builder = builder.maxTokens(openaiConfig.getMaxTokens()); - } - if (openaiConfig.getTemperature() != null) { - builder = builder.temperature(openaiConfig.getTemperature()); - } - if (openaiConfig.getLogitBias() != null && !openaiConfig.getLogitBias().isEmpty()) { - builder = builder.logitBias(openaiConfig.getLogitBias()); - } - if (openaiConfig.getStop() != null && !openaiConfig.getStop().isEmpty()) { - builder = builder.stop(openaiConfig.getStop()); - } - this.chatCompletionRequestTemplateStr = JsonUtils.toJSONString(builder.build()); - } - - public ChatCompletionRequest newChatCompletionRequest(List chatMessages) { - ChatCompletionRequest request = JsonUtils.parseObject(chatCompletionRequestTemplateStr, ChatCompletionRequest.class); - request.setMessages(chatMessages); - return request; - } - - private CloudEvent genGptConnectRecord(ChatGPTRequestDTO event) { - List chatMessages = new ArrayList<>(); - chatMessages.add(new ChatMessage(ChatMessageRole.USER.value(), event.getPrompt())); - ChatCompletionRequest req = newChatCompletionRequest(chatMessages); - StringBuilder gptData = new StringBuilder(); - + public void initParsePrompt() { + String parsePromptFileName = sourceConfig.getConnectorConfig().getParsePromptFileName(); + URL resource = this.getClass().getClassLoader().getResource(parsePromptFileName); + AssertUtils.notNull(resource, String.format("cannot find file %s", parsePromptFileName)); try { - openAiService.createChatCompletion(req).getChoices() - .forEach(chatCompletionChoice -> gptData.append(chatCompletionChoice.getMessage().getContent())); - } catch (Exception e) { - log.error("Failed to generate GPT connection record: {}", e.getMessage()); + this.parsePromptTemplateStr = new String(Files.readAllBytes(Paths.get(resource.toURI()))); + } catch (URISyntaxException e) { + throw new IllegalStateException("The file path is invalid", e); + } catch (IOException e) { + throw new IllegalStateException("Unable to read file", e); } - - return CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withSource(URI.create(event.getSource())).withType(event.getType()) - .withTime(ZonedDateTime.now().toOffsetDateTime()).withData(gptData.toString().getBytes()).withSubject(event.getSubject()) - .withDataContentType(event.getDataContentType()).build(); } + @SuppressWarnings("checkstyle:WhitespaceAround") private void doInit() { - initOpenAi(); + initParsePrompt(); + this.openaiManager = new OpenaiManager(sourceConfig); + this.chatHandler = new ChatHandler(this.openaiManager); + this.parseHandler = new ParseHandler(openaiManager, parsePromptTemplateStr); this.queue = new LinkedBlockingQueue<>(1024); final Vertx vertx = Vertx.vertx(); final Router router = Router.router(vertx); @@ -194,12 +121,22 @@ private void doInit() { try { RequestBody body = ctx.body(); ChatGPTRequestDTO bodyObject = body.asPojo(ChatGPTRequestDTO.class); - if (bodyObject.getSubject() == null || bodyObject.getDataContentType() == null || bodyObject.getPrompt() == null) { + if (bodyObject.getSubject() == null || bodyObject.getDataContentType() == null || bodyObject.getText() == null) { throw new IllegalStateException("Attributes 'subject', 'datacontenttype', and 'prompt' cannot be null"); } chatgptSourceExecutorService.execute(() -> { try { - CloudEvent cloudEvent = genGptConnectRecord(bodyObject); + CloudEvent cloudEvent; + switch (bodyObject.getRequestType()) { + case CHAT: + cloudEvent = chatHandler.invoke(bodyObject); + break; + case PARSE: + cloudEvent = parseHandler.invoke(bodyObject); + break; + default: + throw new IllegalStateException("the request type is illegal"); + } queue.add(cloudEvent); log.info("[ChatGPTSourceConnector] Succeed to convert payload into CloudEvent."); ctx.response().setStatusCode(HttpResponseStatus.OK.code()).end(); diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/dto/ChatGPTRequestDTO.java b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/dto/ChatGPTRequestDTO.java index d99ebf1837..83ae2900c0 100644 --- a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/dto/ChatGPTRequestDTO.java +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/dto/ChatGPTRequestDTO.java @@ -1,10 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.eventmesh.connector.chatgpt.source.dto; +import org.apache.eventmesh.connector.chatgpt.source.enums.ChatGPTRequestType; + import com.fasterxml.jackson.annotation.JsonProperty; import lombok.AllArgsConstructor; import lombok.Data; -import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; @Data @@ -12,6 +30,8 @@ @NoArgsConstructor public class ChatGPTRequestDTO { + private ChatGPTRequestType requestType; + private String source; private String subject; @@ -21,6 +41,6 @@ public class ChatGPTRequestDTO { private String type; - private String prompt; + private String text; } diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/enums/ChatGPTRequestType.java b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/enums/ChatGPTRequestType.java new file mode 100644 index 0000000000..9930525651 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/enums/ChatGPTRequestType.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.chatgpt.source.enums; + + +public enum ChatGPTRequestType { + + CHAT, PARSE; + +} diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/handlers/ChatHandler.java b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/handlers/ChatHandler.java new file mode 100644 index 0000000000..924729b6c4 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/handlers/ChatHandler.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.chatgpt.source.handlers; + + +import org.apache.eventmesh.connector.chatgpt.source.dto.ChatGPTRequestDTO; +import org.apache.eventmesh.connector.chatgpt.source.managers.OpenaiManager; + +import java.net.URI; +import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; + +import com.theokanning.openai.completion.chat.ChatCompletionRequest; +import com.theokanning.openai.completion.chat.ChatMessage; +import com.theokanning.openai.completion.chat.ChatMessageRole; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class ChatHandler { + + private final OpenaiManager openaiManager; + + public ChatHandler(OpenaiManager openaiManager) { + this.openaiManager = openaiManager; + } + + public CloudEvent invoke(ChatGPTRequestDTO event) { + return genGptConnectRecord(event); + } + + private CloudEvent genGptConnectRecord(ChatGPTRequestDTO event) { + List chatMessages = new ArrayList<>(); + chatMessages.add(new ChatMessage(ChatMessageRole.USER.value(), event.getText())); + ChatCompletionRequest req = openaiManager.newChatCompletionRequest(chatMessages); + StringBuilder gptData = new StringBuilder(); + + try { + openaiManager.getOpenAiService().createChatCompletion(req).getChoices() + .forEach(chatCompletionChoice -> gptData.append(chatCompletionChoice.getMessage().getContent())); + } catch (Exception e) { + log.error("Failed to generate GPT connection record: {}", e.getMessage()); + } + + return CloudEventBuilder.v1() + .withId(UUID.randomUUID().toString()) + .withSource(URI.create(event.getSource())) + .withType(event.getType()) + .withTime(ZonedDateTime.now().toOffsetDateTime()) + .withData(gptData.toString().getBytes()) + .withSubject(event.getSubject()) + .withDataContentType(event.getDataContentType()) + .build(); + } + +} diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/handlers/ParseHandler.java b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/handlers/ParseHandler.java new file mode 100644 index 0000000000..599a5b715f --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/handlers/ParseHandler.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.chatgpt.source.handlers; + + +import org.apache.eventmesh.connector.chatgpt.source.dto.ChatGPTRequestDTO; +import org.apache.eventmesh.connector.chatgpt.source.managers.OpenaiManager; + +import io.cloudevents.CloudEvent; + +public class ParseHandler { + + private final OpenaiManager openaiManager; + + private final String promptTemplate; + + public ParseHandler(OpenaiManager openaiManager, String promptTemplate) { + this.openaiManager = openaiManager; + this.promptTemplate = promptTemplate; + } + + public CloudEvent invoke(ChatGPTRequestDTO event) { + // todo use StringSubstitutor event and promptTemplate translate to final prompt + + return null; + } + +} diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/managers/OpenaiManager.java b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/managers/OpenaiManager.java new file mode 100644 index 0000000000..9e97e6cdbc --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/managers/OpenaiManager.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.chatgpt.source.managers; + +import static com.theokanning.openai.service.OpenAiService.defaultObjectMapper; +import static com.theokanning.openai.service.OpenAiService.defaultRetrofit; + +import org.apache.eventmesh.common.utils.AssertUtils; +import org.apache.eventmesh.common.utils.JsonUtils; +import org.apache.eventmesh.connector.chatgpt.source.config.ChatGPTSourceConfig; +import org.apache.eventmesh.connector.chatgpt.source.config.OpenaiConfig; +import org.apache.eventmesh.connector.chatgpt.source.config.OpenaiProxyConfig; + +import java.net.InetSocketAddress; +import java.net.Proxy; +import java.time.Duration; +import java.util.List; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.theokanning.openai.client.OpenAiApi; +import com.theokanning.openai.completion.chat.ChatCompletionRequest; +import com.theokanning.openai.completion.chat.ChatCompletionRequest.ChatCompletionRequestBuilder; +import com.theokanning.openai.completion.chat.ChatMessage; +import com.theokanning.openai.service.OpenAiService; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import okhttp3.OkHttpClient; +import retrofit2.Retrofit; + + +@Slf4j +public class OpenaiManager { + + @Getter + private OpenAiService openAiService; + + private String chatCompletionRequestTemplateStr; + + public OpenaiManager(ChatGPTSourceConfig sourceConfig) { + initOpenAi(sourceConfig); + } + + public String getResult(ChatCompletionRequest req) { + StringBuilder gptData = new StringBuilder(); + try { + openAiService.createChatCompletion(req).getChoices() + .forEach(chatCompletionChoice -> gptData.append(chatCompletionChoice.getMessage().getContent())); + } catch (Exception e) { + log.error("Failed to generate GPT connection record: {}", e.getMessage()); + } + return gptData.toString(); + } + + public ChatCompletionRequest newChatCompletionRequest(List chatMessages) { + ChatCompletionRequest request = JsonUtils.parseObject(chatCompletionRequestTemplateStr, ChatCompletionRequest.class); + request.setMessages(chatMessages); + return request; + } + + private void initOpenAi(ChatGPTSourceConfig sourceConfig) { + OpenaiConfig openaiConfig = sourceConfig.getOpenaiConfig(); + AssertUtils.isTrue(openaiConfig.getTimeout() > 0, "openaiTimeout must be >= 0"); + boolean proxyEnable = sourceConfig.connectorConfig.isProxyEnable(); + if (proxyEnable) { + OpenaiProxyConfig chatgptProxyConfig = sourceConfig.openaiProxyConfig; + if (chatgptProxyConfig.getHost() == null) { + throw new IllegalStateException("chatgpt proxy config 'host' cannot be null"); + } + ObjectMapper mapper = defaultObjectMapper(); + Proxy proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(chatgptProxyConfig.getHost(), chatgptProxyConfig.getPort())); + OkHttpClient client = + OpenAiService.defaultClient(openaiConfig.getToken(), Duration.ofSeconds(openaiConfig.getTimeout())).newBuilder().proxy(proxy).build(); + Retrofit retrofit = defaultRetrofit(client, mapper); + OpenAiApi api = retrofit.create(OpenAiApi.class); + this.openAiService = new OpenAiService(api); + } else { + this.openAiService = new OpenAiService(openaiConfig.getToken(), Duration.ofSeconds(openaiConfig.getTimeout())); + } + ChatCompletionRequestBuilder builder = ChatCompletionRequest.builder().model(openaiConfig.getModel()); + AssertUtils.notNull(openaiConfig.getModel(), "model cannot be null"); + builder = builder.model(openaiConfig.getModel()); + if (openaiConfig.getUser() != null) { + builder = builder.user(openaiConfig.getUser()); + } + if (openaiConfig.getPresencePenalty() != null) { + builder = builder.presencePenalty(openaiConfig.getPresencePenalty()); + } + if (openaiConfig.getFrequencyPenalty() != null) { + builder = builder.frequencyPenalty(openaiConfig.getFrequencyPenalty()); + } + if (openaiConfig.getMaxTokens() != null) { + builder = builder.maxTokens(openaiConfig.getMaxTokens()); + } + if (openaiConfig.getTemperature() != null) { + builder = builder.temperature(openaiConfig.getTemperature()); + } + if (openaiConfig.getLogitBias() != null && !openaiConfig.getLogitBias().isEmpty()) { + builder = builder.logitBias(openaiConfig.getLogitBias()); + } + if (openaiConfig.getStop() != null && !openaiConfig.getStop().isEmpty()) { + builder = builder.stop(openaiConfig.getStop()); + } + this.chatCompletionRequestTemplateStr = JsonUtils.toJSONString(builder.build()); + } + + +} diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/prompt b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/prompt index 69a8852fb6..661bcb315a 100644 --- a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/prompt +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/prompt @@ -4,17 +4,17 @@ For the following text, extract the following information: Create a CloudEvents-formatted JSON object with the following fields: - specversion: Set to "1.0" (the current CloudEvents specification version) -- type: Set to \\\ {type} \\\ -- source: Set to \\\ {source} \\\ -- id: Set to \\\ {id} \\\ (Generate a unique identifier for the event (e.g., "A234-1234-1234")) -- time: Set to \\\ {time} \\\ (the current timestamp in ISO 8601 format. e.g:"2023-03-25T12:34:56.789Z") -- datacontenttype: Set to \\\ {datacontenttype} \\\ +- type: Set to \\\ ${type} \\\ +- source: Set to \\\ ${source} \\\ +- id: Set to \\\ ${id} \\\ (Generate a unique identifier for the event.) +- time: Set to \\\ ${time} \\\ +- datacontenttype: Set to \\\ ${datacontenttype} \\\ (e.g.,application/json) - data: Set to the input text provided by the user -\\\ -{fields} -\\\ + \\\ + ${fields} + \\\ -text: \\\ {text} \\\ +text: \\\ ${text} \\\ If any of the fields marked as \\\ {} \\\ are null or empty, use a default value. diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/source-config.yml b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/source-config.yml index 23c70bd32b..28476ee85a 100644 --- a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/source-config.yml +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/source-config.yml @@ -30,6 +30,7 @@ connectorConfig: port: 3756 idleTimeout: 999 proxyEnable: true + parsePromptFileName: prompt # https://platform.openai.com/docs/api-reference/chat/create openaiConfig: diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnectorTest.java b/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnectorTest.java index 656e870357..67c31984aa 100644 --- a/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnectorTest.java +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnectorTest.java @@ -77,7 +77,7 @@ void testPoll() throws Exception { event.type = "com.example.someevent"; event.source = "/mycontext"; event.datacontenttype = "text/plain"; - event.prompt = expectedMessage; + event.text = expectedMessage; invalidPost.setEntity(new StringEntity(JsonUtils.toJSONString(event))); HttpResponse resp = httpClient.execute(invalidPost); Assertions.assertEquals(HttpStatus.SC_BAD_REQUEST, resp.getStatusLine().getStatusCode()); @@ -91,7 +91,8 @@ HttpResponse mockStructuredRequest() throws Exception { event.source = "/mycontext"; event.subject = "test"; event.datacontenttype = "text/plain"; - event.prompt = expectedMessage; + event.text = expectedMessage; + event.requestType = "CHAT"; httpPost.setEntity(new StringEntity(JsonUtils.toJSONString(event))); return httpClient.execute(httpPost); @@ -105,10 +106,11 @@ void tearDown() throws Exception { class TestEvent { + public String requestType; public String type; public String source; public String subject; public String datacontenttype; - public String prompt; + public String text; } } \ No newline at end of file diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/resources/source-config.yml b/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/resources/source-config.yml index a3c0187515..ee4fd9dee8 100644 --- a/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/resources/source-config.yml +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/resources/source-config.yml @@ -30,6 +30,7 @@ connectorConfig: port: 3756 idleTimeout: 999 proxyEnable: true + parsePromptFileName: prompt # https://platform.openai.com/docs/api-reference/chat/create openaiConfig: