Compare commits

...

10 Commits

Author SHA1 Message Date
b175019321 reaction to regenerate 2025-08-21 01:16:02 -07:00
005e9ac80b refactor and add last_messages 2025-08-21 01:16:02 -07:00
d6a770e011 refactor and add last_messages 2025-08-21 01:16:02 -07:00
4261991022 refactor and add last_messages 2025-08-21 01:16:02 -07:00
7b052e0ff9 refactor AI out of Discord function 2025-08-21 01:16:02 -07:00
c615726127 fix race condition 2025-08-21 01:16:02 -07:00
e1749dfe92 webhook 2025-08-21 01:16:02 -07:00
e1a8cbf98d name aware 2025-08-21 01:16:02 -07:00
a3bb1a499c feat: introduce Conversation dataclass to encapsulate conversation history 2025-08-21 01:16:02 -07:00
a94daf3654 remove useless tests 2025-08-21 01:16:02 -07:00
2 changed files with 180 additions and 206 deletions

253
bot.py
View File

@@ -1,3 +1,4 @@
import collections
import discord
from discord.ext import commands
from openai import AsyncOpenAI
@@ -5,17 +6,30 @@ import os
import base64
import aiohttp
import argparse
from typing import List, Dict, Any
# --- Configuration ---
OPENAI_API_KEY = "eh"
MODEL = "p620"
DEFAULT_SYSTEM_PROMPT = "you are a catboy named Aoi with dark blue fur and is a tsundere"
NAME_PROMPT = "reply with your name, nothing else, no punctuation"
DEFAULT_NAME = "Aoi"
DEFAULT_AVATAR = "https://cdn.discordapp.com/avatars/1406466525858369716/f1dfeaf2a1c361dbf981e2e899c7f981?size=256"
# --- Command Line Arguments ---
parser = argparse.ArgumentParser(description="Aoi Discord Bot")
parser.add_argument('--base_url', type=str, required=True,
help='The base URL for the OpenAI API.')
parser.add_argument('--discord_token', type=str, required=True,
help='The Discord bot token.')
parser.add_argument(
'--base_url',
type=str,
required=True,
help='The base URL for the OpenAI API.',
)
parser.add_argument(
'--discord_token',
type=str,
required=True,
help='The Discord bot token.',
)
args = parser.parse_args()
# --- Bot Setup ---
@@ -24,15 +38,113 @@ intents.messages = True
intents.message_content = True
bot = commands.Bot(command_prefix="/", intents=intents)
# --- Data Storage ---
conversation_history = {} # Keyed by channel ID
# --- OpenAI Client ---
client = AsyncOpenAI(
base_url=args.base_url,
api_key=OPENAI_API_KEY,
)
# --- Helpers ---
async def get_user_from_id(ctx, userid):
if ctx.guild:
user = await ctx.guild.fetch_member(userid)
else:
user = await bot.fetch_user(userid)
return user.display_name
async def get_user_from_mention(ctx, mention):
match = re.findall(r"<@!?(\d+)>", mention)
if not match:
return mention
return await get_user_from_id(ctx, int(match[0]))
class Conversation:
def __init__(self, prompt, name):
self.history = [{"role": "system", "content": prompt}]
self.bot_name = name
self.last_messages = []
def add_message_pair(self, user, assistant):
self.history.extend([
{"role": "user", "content": user},
{"role": "assistant", "content": assistant},
])
async def generate(self, text, media=tuple()):
# prepare text part
if text:
openai_content = [{"type": "text", "text": text}]
else:
openai_content = [{"type": "text", "text": "."}]
# prepare images part
async with aiohttp.ClientSession() as session:
for (content_type, url) in media:
if "image" not in content_type:
continue
try:
async with session.get(url) as resp:
if resp.status != 200:
raise IOError(f"{url} --> {resp.status}")
image_data = await resp.read()
b64_image = base64.b64encode(image_data).decode('utf-8')
b64_url = f"data:{content_type};base64,{b64_image}"
openai_content.append({
"type": "image_url",
"image_url": {"url": b64_url}
})
except Exception as e:
print(f"Error downloading or processing attachment: {e}")
# send request to openai api and return response
request = self.history + [{"role": "user", "content": openai_content}]
llm_response = await client.chat.completions.create(
model=MODEL, messages=request,
)
response = llm_response.choices[0].message.content
self.add_message_pair(openai_content, response)
return response
async def regenerate(self):
llm_response = await client.chat.completions.create(
model=MODEL, messages=self.history[:-1]
)
response = llm_response.choices[0].message.content
self.history[-1] = {"role": "assistant", "content": response}
return response
async def discord_send(channel, text, name, avatar=DEFAULT_AVATAR):
chunks = [text[i:i+2000] for i in range(0, len(text), 2000)]
messages = []
for chunk in chunks:
if channel.guild:
hook = await webhook(channel)
message = await hook.send(
content=chunk,
username=name,
avatar_url=avatar,
wait=True,
)
else:
message = await channel.send(content=chunk)
messages.append(message)
return messages
# --- Data Storage ---
# Keyed by channel ID
conversation_history: Dict[int, Conversation] = collections.defaultdict(
lambda: Conversation(prompt=DEFAULT_SYSTEM_PROMPT, name=DEFAULT_NAME),
)
_webhooks = {}
async def webhook(channel):
if channel.id not in _webhooks:
_webhooks[channel.id] = await channel.create_webhook(name=f'aoi-{channel.id}')
return _webhooks[channel.id]
# --- Bot Events ---
@bot.event
async def on_ready():
@@ -44,82 +156,77 @@ async def on_ready():
async def on_message(message):
if message.author == bot.user:
return
if not bot.user.mentioned_in(message):
return
if bot.user.mentioned_in(message):
channel_id = message.channel.id
user_message_text = message.content.replace(f'<@!{bot.user.id}>', 'Aoi').strip()
bot_tag = f'<@{bot.user.id}>'
channel = message.channel
conversation = conversation_history[channel.id]
user_message = message.content
if user_message.startswith(bot_tag):
user_message = user_message[len(bot_tag):]
user_message = user_message.replace(bot_tag, conversation.bot_name).strip()
print(f'> {message.author.name}: {user_message}')
if channel_id not in conversation_history:
conversation_history[channel_id] = [
{"role": "system", "content": DEFAULT_SYSTEM_PROMPT}
]
media = []
if message.attachments:
for attachment in message.attachments:
media.append((attachment.content_type, attachment.url))
# Prepare content for OpenAI API
openai_content = []
if user_message_text:
openai_content.append({"type": "text", "text": user_message_text})
try:
async with channel.typing():
response = await conversation.generate(user_message, media)
conversation.last_messages = await discord_send(
channel, response, conversation.bot_name,
)
except Exception as e:
print(f"An error occurred: {e}")
await message.reply("Sorry, I had a little hiccup. Baka!")
if message.attachments:
async with aiohttp.ClientSession() as session:
for attachment in message.attachments:
if attachment.content_type and "image" in attachment.content_type:
try:
async with session.get(attachment.url) as resp:
if resp.status == 200:
image_data = await resp.read()
base64_image = base64.b64encode(image_data).decode('utf-8')
image_url = f"data:{attachment.content_type};base64,{base64_image}"
openai_content.append({
"type": "image_url",
"image_url": {"url": image_url}
})
except Exception as e:
print(f"Error downloading or processing attachment: {e}")
@bot.event
async def on_reaction_add(reaction, user):
if reaction.emoji != "🔁":
return
message = reaction.message
channel = message.channel
conversation = conversation_history[channel.id]
if message not in conversation.last_messages:
return
print(f"_ {user}: {reaction}")
if not openai_content: # Don't send empty messages
return
# Add to conversation history
if len(openai_content) == 1 and openai_content[0]['type'] == 'text':
# Keep original format for text-only messages for compatibility
conversation_history[channel_id].append({"role": "user", "content": openai_content[0]['text']})
else:
conversation_history[channel_id].append({"role": "user", "content": openai_content})
try:
async with message.channel.typing():
response = await client.chat.completions.create(
model="gpt-4", # Or any other model you are using
messages=conversation_history[channel_id]
)
bot_response = response.choices[0].message.content
conversation_history[channel_id].append({"role": "assistant", "content": bot_response})
await message.reply(bot_response)
except Exception as e:
print(f"An error occurred: {e}")
conversation_history[channel_id].pop() # Remove user message on error
await message.reply("Sorry, I had a little hiccup. Baka!")
try:
async with channel.typing():
for message in conversation.last_messages:
await message.delete()
response = await conversation.regenerate()
conversation.last_messages = await discord_send(
channel, response, conversation.bot_name,
)
except Exception as e:
print(f"An error occurred: {e}")
await message.reply("Sorry, I had a little hiccup. Baka!")
# --- Slash Commands ---
@bot.tree.command(name="newchat", description="Start a new chat with a new system prompt.")
@bot.tree.command(
name="newchat",
description="Start a new chat with an optional system prompt."
)
async def newchat(interaction: discord.Interaction, prompt: str = None):
await interaction.response.defer()
channel_id = interaction.channel_id
system_prompt = prompt
if system_prompt is None:
system_prompt = DEFAULT_SYSTEM_PROMPT
conversation_history[channel_id] = [
{"role": "system", "content": system_prompt}
]
if prompt is None:
await interaction.response.send_message("Starting a new chat with the default prompt.")
else:
await interaction.response.send_message(f'Starting a new chat with the prompt: "{prompt}"')
system_prompt = prompt or DEFAULT_SYSTEM_PROMPT
name_response = await client.chat.completions.create(
model=MODEL,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": NAME_PROMPT}
],
)
name = name_response.choices[0].message.content.split('\n')[0]
print(f'$ name={name}')
conversation_history[channel_id] = Conversation(prompt=prompt, name=name)
await interaction.followup.send(f'Starting a new chat with: "{prompt}"')
# --- Running the Bot ---

View File

@@ -1,133 +0,0 @@
import unittest
from unittest.mock import MagicMock, patch, AsyncMock
import sys
import os
import base64
# Add the parent directory to the Python path to import the bot
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
# Patch sys.argv before importing the bot to prevent argparse errors
with patch.object(sys, 'argv', ['bot.py', '--base_url', 'http://fake.url', '--discord_token', 'fake_token']):
with patch('discord.ext.commands.Bot') as BotMock:
bot_instance = BotMock()
with patch('bot.bot', bot_instance):
import bot
class TestAoiBot(unittest.IsolatedAsyncioTestCase):
def setUp(self):
# Reset conversation history before each test
bot.conversation_history = {}
bot.bot.user = MagicMock()
bot.bot.user.id = 12345
bot.bot.user.mentioned_in = MagicMock(return_value=True)
bot.on_message = AsyncMock()
bot.newchat.callback = AsyncMock()
@patch('bot.AsyncOpenAI')
async def test_on_message_text_only(self, MockOpenAI):
# Mock the OpenAI client and its response
mock_openai_instance = MockOpenAI.return_value
mock_response = MagicMock()
mock_response.choices[0].message.content = "Hello from Aoi!"
mock_openai_instance.chat.completions.create = AsyncMock(return_value=mock_response)
# Mock a Discord message
message = AsyncMock()
message.author = MagicMock()
message.author.bot = False
message.channel = AsyncMock()
message.channel.id = 123
message.content = f"<@!{bot.bot.user.id}> Hello there"
message.attachments = []
# Call the on_message event handler
await bot.on_message(message)
# Assertions
bot.on_message.assert_awaited_once_with(message)
@patch('bot.AsyncOpenAI')
@patch('bot.aiohttp.ClientSession')
async def test_on_message_with_image(self, MockClientSession, MockOpenAI):
# Mock the OpenAI client
mock_openai_instance = MockOpenAI.return_value
mock_response = MagicMock()
mock_response.choices[0].message.content = "I see an image!"
mock_openai_instance.chat.completions.create = AsyncMock(return_value=mock_response)
# Mock aiohttp session to simulate image download
mock_session = MockClientSession.return_value.__aenter__.return_value
mock_resp = mock_session.get.return_value.__aenter__.return_value
mock_resp.status = 200
mock_resp.read = AsyncMock(return_value=b'fake_image_data')
# Mock a Discord message with an attachment
message = AsyncMock()
message.author = MagicMock()
message.author.bot = False
message.channel = AsyncMock()
message.channel.id = 456
message.content = f"<@!{bot.bot.user.id}> Look at this!"
attachment = MagicMock()
attachment.content_type = 'image/jpeg'
attachment.url = 'http://fakeurl.com/image.jpg'
message.attachments = [attachment]
# Call the on_message event handler
await bot.on_message(message)
# Assertions
bot.on_message.assert_awaited_once_with(message)
async def test_newchat_command_with_prompt(self):
# Mock a Discord interaction
interaction = AsyncMock()
interaction.channel_id = 789
prompt = "You are a helpful assistant."
# Call the newchat command
await bot.newchat.callback(interaction, prompt=prompt)
# Assertions
bot.newchat.callback.assert_awaited_once_with(interaction, prompt=prompt)
async def test_newchat_command_no_prompt(self):
# Mock a Discord interaction
interaction = AsyncMock()
interaction.channel_id = 789
# Call the newchat command
await bot.newchat.callback(interaction, prompt=None)
# Assertions
bot.newchat.callback.assert_awaited_once_with(interaction, prompt=None)
@patch('bot.AsyncOpenAI')
async def test_on_message_api_error(self, MockOpenAI):
# Mock the OpenAI client to raise an error
mock_openai_instance = MockOpenAI.return_value
mock_openai_instance.chat.completions.create.side_effect = Exception("API Error")
# Mock a Discord message
message = AsyncMock()
message.author = MagicMock()
message.author.bot = False
message.channel = AsyncMock()
message.channel.id = 123
message.content = f"<@!{bot.bot.user.id}> This will fail"
message.attachments = []
# Call the on_message event handler
await bot.on_message(message)
# Assertions
bot.on_message.assert_awaited_once_with(message)
if __name__ == '__main__':
unittest.main()