Skip to content

Commit

Permalink
[ISSUE apache#4047] Join parse request support
Browse files Browse the repository at this point in the history
  • Loading branch information
JiangShuJu committed Apr 8, 2024
1 parent 317ffb4 commit 608d169
Show file tree
Hide file tree
Showing 11 changed files with 347 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,6 @@ public class ChatGPTSourceConnectorConfig {

private boolean proxyEnable;

private String parsePromptFileName;

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,43 +17,35 @@

package org.apache.eventmesh.connector.chatgpt.source.connector;


import okhttp3.OkHttpClient;
import retrofit2.Retrofit;

import static com.theokanning.openai.service.OpenAiService.defaultObjectMapper;
import static com.theokanning.openai.service.OpenAiService.defaultRetrofit;

import org.apache.eventmesh.common.ThreadPoolFactory;
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.common.utils.AssertUtils;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.connector.chatgpt.source.config.OpenaiConfig;
import org.apache.eventmesh.connector.chatgpt.source.config.OpenaiProxyConfig;
import org.apache.eventmesh.connector.chatgpt.source.config.ChatGPTSourceConfig;
import org.apache.eventmesh.connector.chatgpt.source.dto.ChatGPTRequestDTO;
import org.apache.eventmesh.connector.chatgpt.source.handlers.ChatHandler;
import org.apache.eventmesh.connector.chatgpt.source.handlers.ParseHandler;
import org.apache.eventmesh.connector.chatgpt.source.managers.OpenaiManager;
import org.apache.eventmesh.openconnect.api.config.Config;
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
import org.apache.eventmesh.openconnect.api.source.Source;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.eventmesh.openconnect.util.CloudEventUtil;

import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.URI;
import java.time.Duration;
import java.time.ZonedDateTime;

import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpMethod;
Expand All @@ -63,14 +55,6 @@
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.theokanning.openai.client.OpenAiApi;
import com.theokanning.openai.completion.chat.ChatCompletionRequest;
import com.theokanning.openai.completion.chat.ChatCompletionRequest.ChatCompletionRequestBuilder;
import com.theokanning.openai.completion.chat.ChatMessage;
import com.theokanning.openai.completion.chat.ChatMessageRole;
import com.theokanning.openai.service.OpenAiService;

import lombok.extern.slf4j.Slf4j;

@Slf4j
Expand All @@ -81,14 +65,16 @@ public class ChatGPTSourceConnector implements Source {
private ChatGPTSourceConfig sourceConfig;
private BlockingQueue<CloudEvent> queue;
private HttpServer server;
private OpenAiService openAiService;
private final ExecutorService chatgptSourceExecutorService =
ThreadPoolFactory.createThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() * 2,
Runtime.getRuntime().availableProcessors() * 2,
"ChatGPTSourceThread");

private String chatCompletionRequestTemplateStr;
private OpenaiManager openaiManager;
private String parsePromptTemplateStr;
private ChatHandler chatHandler;
private ParseHandler parseHandler;

@Override
public Class<? extends Config> configClass() {
Expand All @@ -108,98 +94,49 @@ public void init(ConnectorContext connectorContext) {
doInit();
}

private void initOpenAi() {
OpenaiConfig openaiConfig = sourceConfig.getOpenaiConfig();
AssertUtils.isTrue(openaiConfig.getTimeout() > 0, "openaiTimeout must be >= 0");
boolean proxyEnable = sourceConfig.connectorConfig.isProxyEnable();
if (proxyEnable) {
OpenaiProxyConfig chatgptProxyConfig = sourceConfig.openaiProxyConfig;
if (chatgptProxyConfig.getHost() == null) {
throw new IllegalStateException("chatgpt proxy config 'host' cannot be null");
}
ObjectMapper mapper = defaultObjectMapper();
Proxy proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(chatgptProxyConfig.getHost(), chatgptProxyConfig.getPort()));
OkHttpClient client = OpenAiService
.defaultClient(openaiConfig.getToken(), Duration.ofSeconds(openaiConfig.getTimeout()))
.newBuilder()
.proxy(proxy)
.build();
Retrofit retrofit = defaultRetrofit(client, mapper);
OpenAiApi api = retrofit.create(OpenAiApi.class);
this.openAiService = new OpenAiService(api);
} else {
this.openAiService =
new OpenAiService(openaiConfig.getToken(), Duration.ofSeconds(openaiConfig.getTimeout()));
}
ChatCompletionRequestBuilder builder = ChatCompletionRequest
.builder()
.model(openaiConfig.getModel());
AssertUtils.notNull(openaiConfig.getModel(), "model cannot be null");
builder = builder.model(openaiConfig.getModel());
if (openaiConfig.getUser() != null) {
builder = builder.user(openaiConfig.getUser());
}
if (openaiConfig.getPresencePenalty() != null) {
builder = builder.presencePenalty(openaiConfig.getPresencePenalty());
}
if (openaiConfig.getFrequencyPenalty() != null) {
builder = builder.frequencyPenalty(openaiConfig.getFrequencyPenalty());
}
if (openaiConfig.getMaxTokens() != null) {
builder = builder.maxTokens(openaiConfig.getMaxTokens());
}
if (openaiConfig.getTemperature() != null) {
builder = builder.temperature(openaiConfig.getTemperature());
}
if (openaiConfig.getLogitBias() != null && !openaiConfig.getLogitBias().isEmpty()) {
builder = builder.logitBias(openaiConfig.getLogitBias());
}
if (openaiConfig.getStop() != null && !openaiConfig.getStop().isEmpty()) {
builder = builder.stop(openaiConfig.getStop());
}
this.chatCompletionRequestTemplateStr = JsonUtils.toJSONString(builder.build());
}

public ChatCompletionRequest newChatCompletionRequest(List<ChatMessage> chatMessages) {
ChatCompletionRequest request = JsonUtils.parseObject(chatCompletionRequestTemplateStr, ChatCompletionRequest.class);
request.setMessages(chatMessages);
return request;
}

private CloudEvent genGptConnectRecord(ChatGPTRequestDTO event) {
List<ChatMessage> chatMessages = new ArrayList<>();
chatMessages.add(new ChatMessage(ChatMessageRole.USER.value(), event.getPrompt()));
ChatCompletionRequest req = newChatCompletionRequest(chatMessages);
StringBuilder gptData = new StringBuilder();

public void initParsePrompt() {
String parsePromptFileName = sourceConfig.getConnectorConfig().getParsePromptFileName();
URL resource = this.getClass().getClassLoader().getResource(parsePromptFileName);
AssertUtils.notNull(resource, String.format("cannot find file %s", parsePromptFileName));
try {
openAiService.createChatCompletion(req).getChoices()
.forEach(chatCompletionChoice -> gptData.append(chatCompletionChoice.getMessage().getContent()));
} catch (Exception e) {
log.error("Failed to generate GPT connection record: {}", e.getMessage());
this.parsePromptTemplateStr = new String(Files.readAllBytes(Paths.get(resource.toURI())));
} catch (URISyntaxException e) {
throw new IllegalStateException("The file path is invalid", e);
} catch (IOException e) {
throw new IllegalStateException("Unable to read file", e);
}

return CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withSource(URI.create(event.getSource())).withType(event.getType())
.withTime(ZonedDateTime.now().toOffsetDateTime()).withData(gptData.toString().getBytes()).withSubject(event.getSubject())
.withDataContentType(event.getDataContentType()).build();
}


@SuppressWarnings("checkstyle:WhitespaceAround")
private void doInit() {
initOpenAi();
initParsePrompt();
this.openaiManager = new OpenaiManager(sourceConfig);
this.chatHandler = new ChatHandler(this.openaiManager);
this.parseHandler = new ParseHandler(openaiManager, parsePromptTemplateStr);
this.queue = new LinkedBlockingQueue<>(1024);
final Vertx vertx = Vertx.vertx();
final Router router = Router.router(vertx);
router.route().path(this.sourceConfig.connectorConfig.getPath()).method(HttpMethod.POST).handler(BodyHandler.create()).handler(ctx -> {
try {
RequestBody body = ctx.body();
ChatGPTRequestDTO bodyObject = body.asPojo(ChatGPTRequestDTO.class);
if (bodyObject.getSubject() == null || bodyObject.getDataContentType() == null || bodyObject.getPrompt() == null) {
if (bodyObject.getSubject() == null || bodyObject.getDataContentType() == null || bodyObject.getText() == null) {
throw new IllegalStateException("Attributes 'subject', 'datacontenttype', and 'prompt' cannot be null");
}
chatgptSourceExecutorService.execute(() -> {
try {
CloudEvent cloudEvent = genGptConnectRecord(bodyObject);
CloudEvent cloudEvent;
switch (bodyObject.getRequestType()) {
case CHAT:
cloudEvent = chatHandler.invoke(bodyObject);
break;
case PARSE:
cloudEvent = parseHandler.invoke(bodyObject);
break;
default:
throw new IllegalStateException("the request type is illegal");
}
queue.add(cloudEvent);
log.info("[ChatGPTSourceConnector] Succeed to convert payload into CloudEvent.");
ctx.response().setStatusCode(HttpResponseStatus.OK.code()).end();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.chatgpt.source.dto;

import org.apache.eventmesh.connector.chatgpt.source.enums.ChatGPTRequestType;

import com.fasterxml.jackson.annotation.JsonProperty;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class ChatGPTRequestDTO {

private ChatGPTRequestType requestType;

private String source;

private String subject;
Expand All @@ -21,6 +41,6 @@ public class ChatGPTRequestDTO {

private String type;

private String prompt;
private String text;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.chatgpt.source.enums;


public enum ChatGPTRequestType {

CHAT, PARSE;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.chatgpt.source.handlers;


import org.apache.eventmesh.connector.chatgpt.source.dto.ChatGPTRequestDTO;
import org.apache.eventmesh.connector.chatgpt.source.managers.OpenaiManager;

import java.net.URI;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;

import com.theokanning.openai.completion.chat.ChatCompletionRequest;
import com.theokanning.openai.completion.chat.ChatMessage;
import com.theokanning.openai.completion.chat.ChatMessageRole;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ChatHandler {

private final OpenaiManager openaiManager;

public ChatHandler(OpenaiManager openaiManager) {
this.openaiManager = openaiManager;
}

public CloudEvent invoke(ChatGPTRequestDTO event) {
return genGptConnectRecord(event);
}

private CloudEvent genGptConnectRecord(ChatGPTRequestDTO event) {
List<ChatMessage> chatMessages = new ArrayList<>();
chatMessages.add(new ChatMessage(ChatMessageRole.USER.value(), event.getText()));
ChatCompletionRequest req = openaiManager.newChatCompletionRequest(chatMessages);
StringBuilder gptData = new StringBuilder();

try {
openaiManager.getOpenAiService().createChatCompletion(req).getChoices()
.forEach(chatCompletionChoice -> gptData.append(chatCompletionChoice.getMessage().getContent()));
} catch (Exception e) {
log.error("Failed to generate GPT connection record: {}", e.getMessage());
}

return CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withSource(URI.create(event.getSource()))
.withType(event.getType())
.withTime(ZonedDateTime.now().toOffsetDateTime())
.withData(gptData.toString().getBytes())
.withSubject(event.getSubject())
.withDataContentType(event.getDataContentType())
.build();
}

}
Loading

0 comments on commit 608d169

Please sign in to comment.