Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main'
Browse files Browse the repository at this point in the history
  • Loading branch information
jonmatthis committed Dec 31, 2023
2 parents 5925a1c + 5b568d9 commit 9d5ca80
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 33 deletions.
2 changes: 0 additions & 2 deletions src/interfaces/chat/discord/myDiscord.module.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import { Logger, Module } from '@nestjs/common';
import { DiscordPingService } from './services/discordPing.service';
import { NecordModule } from 'necord';
import { DiscordChatService } from './services/discordChat.service';
import { GcpModule } from '../../../shared/gcp/gcp.module';
import { NecordConfigService } from './services/necordConfig.service';
import { DiscordReadyLoggingService } from './services/discordReadyLogging.service';
import { DiscordThreadService } from './services/discordThread.service';
import { ChatbotService } from '../../../shared/chatbot-core/chatbot.service';
import { ChatbotCoreModule } from '../../../shared/chatbot-core/chatbotCore.module';

@Module({
Expand Down
106 changes: 83 additions & 23 deletions src/interfaces/chat/discord/services/discordThread.service.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
import { Injectable, Logger } from '@nestjs/common';
import { Injectable, Logger, OnModuleDestroy } from '@nestjs/common';
import { Context, Options, SlashCommand, SlashCommandContext } from 'necord';
import { DEV_GUILDS } from '../../../../shared/config/constants';
import { TextDto } from '../dto/textDto';
import { ChatbotService } from '../../../../shared/chatbot-core/chatbot.service';
import { Chatbot } from '../../../../shared/chatbot-core/chatbot.dto';
import {
ChatInputCommandInteraction,
Client,
Message,
TextChannel,
ThreadChannel,
} from 'discord.js';

@Injectable()
export class DiscordThreadService {
export class DiscordThreadService implements OnModuleDestroy {
constructor(
private readonly _logger: Logger,
private readonly _chatbotService: ChatbotService,
private readonly _client: Client,
) {}

@SlashCommand({
Expand All @@ -27,39 +34,92 @@ export class DiscordThreadService {
this._logger.log(
`Creating thread with starting text:'${text}' in channel: name= ${interaction.channel.name}, id=${interaction.channel.id} `,
);

// @ts-ignore
const thread = await interaction.channel.threads.create({
const channel = interaction.channel as TextChannel;
const thread = await channel.threads.create({
name: text || 'new thread',
autoArchiveDuration: 60,
reason: 'wow this is a thread',
});

await this._chatbotService.createChatbot(thread.id);

interaction.client.on('messageCreate', async (message) => {
this._logger.log(`Recieved message ${message.content}`);
this._beginWatchingIncomingMessages(interaction, channel, thread);
await this._sendInitialReply(interaction, channel, thread, text);
}

/**
* When the bot application dies, we remove all listeners.
*/
onModuleDestroy() {
this._client.removeAllListeners();
}

private _beginWatchingIncomingMessages(
interaction: ChatInputCommandInteraction,
channel: TextChannel,
thread: ThreadChannel,
) {
const t = { ...thread };
const handleMessageCreation = async (message: Message) => {
if (message.author.bot) {
return;
}
if (message.channelId !== thread.id) {
return;
}

const aiResponse = await this._chatbotService.generateAiResponse(
thread.id,
message.content,
// @ts-ignore
{ topic: interaction.channel.topic },
);
await message.reply(aiResponse);
});
this._logger.log(`Received message ${message.content}`);
await this._handleStream(channel, thread, message.content, message);
};

interaction.client.on('messageCreate', handleMessageCreation);
}

private async _sendInitialReply(
interaction: ChatInputCommandInteraction,
channel: TextChannel,
thread: ThreadChannel,
inputText: string,
) {
const initialMessage = await thread.send(inputText);
await interaction.editReply('Thread Created!');
await this._handleStream(channel, thread, inputText, initialMessage);
}

const initialMessage = await thread.send(startingText.text);
const aiResponse = await this._chatbotService.generateAiResponse(
private async _handleStream(
channel: TextChannel,
thread: ThreadChannel,
inputText: string,
message: Message<boolean>,
) {
const tokenStream = this._chatbotService.streamResponse(
thread.id,
startingText.text,
// @ts-ignore
{ topic: interaction.channel.topic },
inputText,
{
topic: channel.topic,
},
);
await initialMessage.reply(aiResponse);
await interaction.editReply('Thread Created!');
thread.sendTyping();

let initialReply: Message<boolean> = undefined;
let final = '';
for await (const current of tokenStream) {
const { theChunk, didResetOccur } = current;

if (!initialReply) {
initialReply = await message.reply(theChunk);
continue;
}

if (didResetOccur) {
initialReply = await message.reply(theChunk);
continue;
}

await initialReply.edit(theChunk);
final = current.data;
}

this._logger.debug(`Final thingy`, final);
}
}
74 changes: 66 additions & 8 deletions src/shared/chatbot-core/chatbot.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,16 @@ import { Injectable, Logger } from '@nestjs/common';
import { ChainBuilderService } from '../ai/langchain/chain-builder/chain-builder.service';
import { Chatbot } from './chatbot.dto';

class StreamResponseOptions {
/**
* Character limit to split the outgoing data
*/
splitAt: number = 1800;
}

@Injectable()
export class ChatbotService {
_chatbots: Map<string | number, Chatbot> = new Map();
_chatbots: Map<string, Chatbot> = new Map();
constructor(
private readonly _logger: Logger,
private readonly _chainBuilderService: ChainBuilderService,
Expand All @@ -14,7 +21,7 @@ export class ChatbotService {
const chain = await this._chainBuilderService.createChain(modelName);

const chatbot = { chain } as Chatbot;
this._chatbots.set(chatbotId, chatbot);
this._chatbots.set(String(chatbotId), chatbot);
return chatbot;
}

Expand All @@ -26,18 +33,69 @@ export class ChatbotService {
this._logger.log(
`Responding to message ${humanMessage} with chatbotId: ${chatbotId}`,
);
let chatbot: Chatbot;
const chatbot = this.getChatbotById(chatbotId);
return await chatbot.chain.invoke({
text: humanMessage,
...additionalArgs,
});
}

getChatbotById(chatbotId: string | number) {
try {
chatbot = this._chatbots.get(chatbotId);
return this._chatbots.get(String(chatbotId));
} catch (error) {
this._logger.error(
`Could not find 'chatbot' for 'chatbotId': ${chatbotId}`,
);
this._logger.error(`Could not find chatbot for chatbotId: ${chatbotId}`);
throw error;
}
return await chatbot.chain.invoke({
}
async *streamResponse(
chatbotId: string | number,
humanMessage: string,
additionalArgs: any,
options: StreamResponseOptions = new StreamResponseOptions(),
) {
const normalizedOptions = {
...new StreamResponseOptions(),
...options,
};
const { splitAt } = normalizedOptions;
const chatbot = this.getChatbotById(chatbotId);
const chatStream = await chatbot.chain.stream({
text: humanMessage,
...additionalArgs,
});

let streamedResult = '';
let subStreamResult = '';
let didResetOccur = false;
let tokens = 0;
for await (const chunk of chatStream) {
// the full message
streamedResult += chunk;
tokens++;
if (subStreamResult.length < splitAt) {
subStreamResult += chunk;
} else {
//
subStreamResult = subStreamResult.slice(subStreamResult.length * 0.95);
subStreamResult += chunk;
didResetOccur = true;
}

if (tokens === 30) {
yield {
data: streamedResult,
theChunk: subStreamResult,
didResetOccur,
};
tokens = 0;
didResetOccur = false;
}
}

yield {
data: streamedResult,
theChunk: subStreamResult,
};
}
}

0 comments on commit 9d5ca80

Please sign in to comment.