序論
現代のオンラインコミュニティにおいて、Discord は最も重要なコミュニケーションプラットフォームの一つとして確立されています。しかし、アクティブなサーバーでは1日に数千から数万のメッセージが投稿され、重要な情報や議論の流れを追跡することが困難になっています。この課題に対する解決策として、AI技術を活用した自動要約システムの構築が注目されています。
本記事では、現役AIスタートアップCTOとしての実践経験に基づき、Discord Bot を使用したAI要約システムの完全な実装方法を解説します。単なる機能実装に留まらず、自然言語処理の理論的背景、実装時の技術的課題、本番運用における最適化手法まで、包括的に説明します。
技術的背景と必要性
Discord の会話要約における技術的課題は、主に以下の3つの側面に分類されます。
1. リアルタイム性の要求 Discordのようなリアルタイムコミュニケーションでは、会話の文脈が時系列で変化するため、従来のバッチ処理型要約では不十分です。会話の流れを理解し、適切なタイミングで要約を生成する必要があります。
2. マルチモーダルコンテンツの処理 現代のDiscord会話では、テキスト、画像、リンク、絵文字、メンションなど多様な要素が混在しています。これらの要素を適切に処理し、要約に反映させる技術が求められます。
3. コンテキスト保持と文脈理解 長時間にわたる会話において、話題の変遷や参加者の意図を正確に把握し、意味のある要約を生成することは、従来の単純な抽出型要約では困難です。
Discord Bot アーキテクチャの設計原理
システム全体の構成
Discord Bot AI要約システムは、以下の5つの主要コンポーネントから構成されます。
コンポーネント | 役割 | 使用技術 |
---|---|---|
Message Collector | Discord APIからのメッセージ収集 | discord.py, WebSocket |
Context Manager | 会話文脈の管理・保持 | Redis, MongoDB |
AI Processor | 自然言語処理・要約生成 | Transformer Models, LangChain |
Summary Formatter | 要約結果の整形・表示 | Discord Embed, Markdown |
Configuration Manager | 設定管理・ユーザー権限制御 | SQLite, JSON Schema |
メッセージ収集システムの実装
Discord APIを使用したメッセージ収集の実装では、WebSocketベースのリアルタイム通信と、REST APIを使用した履歴取得の両方を適切に組み合わせる必要があります。
import discord
from discord.ext import commands
import asyncio
from typing import List, Dict, Optional
import logging
from datetime import datetime, timedelta
class MessageCollector:
def __init__(self, bot: commands.Bot):
self.bot = bot
self.message_buffer: Dict[int, List[discord.Message]] = {}
self.collection_config = {
'max_buffer_size': 1000,
'collection_window': 300, # 5分間
'min_messages_for_summary': 10
}
async def collect_channel_messages(self, channel: discord.TextChannel,
limit: int = 100,
time_window: Optional[timedelta] = None) -> List[discord.Message]:
"""
指定されたチャンネルから条件に基づいてメッセージを収集
Args:
channel: 対象チャンネル
limit: 取得メッセージ数の上限
time_window: 取得対象の時間範囲
Returns:
収集されたメッセージのリスト
"""
messages = []
cutoff_time = datetime.utcnow() - time_window if time_window else None
try:
async for message in channel.history(limit=limit):
# 時間範囲チェック
if cutoff_time and message.created_at < cutoff_time:
break
# フィルタリング条件
if self._should_include_message(message):
messages.append(message)
return messages[::-1] # 時系列順に並び替え
except discord.errors.Forbidden:
logging.error(f"チャンネル {channel.name} へのアクセス権限がありません")
return []
def _should_include_message(self, message: discord.Message) -> bool:
"""
メッセージが要約対象に含まれるべきかを判定
Args:
message: 判定対象のメッセージ
Returns:
要約対象に含むかどうかのブール値
"""
# Bot自身のメッセージは除外
if message.author.bot:
return False
# システムメッセージは除外
if message.type != discord.MessageType.default:
return False
# 空のメッセージや短すぎるメッセージは除外
if len(message.content.strip()) < 3:
return False
# 特定のプレフィックスで始まるメッセージは除外(コマンドなど)
excluded_prefixes = ['!', '/', '.', '>']
if any(message.content.startswith(prefix) for prefix in excluded_prefixes):
return False
return True
async def get_conversation_context(self, channel: discord.TextChannel,
current_message: discord.Message,
context_length: int = 50) -> List[discord.Message]:
"""
現在のメッセージを中心とした会話コンテキストを取得
Args:
channel: 対象チャンネル
current_message: 基準となるメッセージ
context_length: 取得するコンテキストの長さ
Returns:
コンテキストメッセージのリスト
"""
context_messages = []
# 現在のメッセージより前のメッセージを取得
async for message in channel.history(limit=context_length,
before=current_message):
if self._should_include_message(message):
context_messages.append(message)
context_messages.reverse() # 時系列順に並び替え
return context_messages
自然言語処理エンジンの実装
AI要約システムの核心部分である自然言語処理エンジンでは、複数のアプローチを組み合わせて最適な要約を生成します。
import openai
from transformers import pipeline, AutoTokenizer, AutoModel
import torch
from typing import List, Dict, Any, Tuple
import re
from dataclasses import dataclass
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import KMeans
@dataclass
class SummaryConfig:
max_input_length: int = 4000
summary_length_ratio: float = 0.3
min_summary_sentences: int = 2
max_summary_sentences: int = 10
use_extractive: bool = True
use_abstractive: bool = True
topic_clustering: bool = True
class NLPProcessor:
def __init__(self, config: SummaryConfig):
self.config = config
self.tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
# 抽出型要約用のパイプライン初期化
self.extractive_summarizer = pipeline(
"summarization",
model="facebook/bart-large-cnn",
device=0 if torch.cuda.is_available() else -1
)
# OpenAI API設定(抽象型要約用)
self.openai_client = openai.OpenAI()
# TF-IDF vectorizer for topic clustering
self.tfidf_vectorizer = TfidfVectorizer(
max_features=1000,
stop_words='english',
ngram_range=(1, 2)
)
async def process_messages(self, messages: List[discord.Message]) -> Dict[str, Any]:
"""
メッセージリストを処理して包括的な要約を生成
Args:
messages: 処理対象のメッセージリスト
Returns:
処理結果を含む辞書
"""
if not messages:
return {"error": "処理対象のメッセージがありません"}
# メッセージの前処理
processed_text = self._preprocess_messages(messages)
# トピッククラスタリング
topics = await self._extract_topics(processed_text) if self.config.topic_clustering else None
# 抽出型要約
extractive_summary = await self._generate_extractive_summary(processed_text)
# 抽象型要約
abstractive_summary = await self._generate_abstractive_summary(processed_text, topics)
# 統計情報の計算
stats = self._calculate_conversation_stats(messages)
return {
"extractive_summary": extractive_summary,
"abstractive_summary": abstractive_summary,
"topics": topics,
"statistics": stats,
"message_count": len(messages),
"time_span": self._calculate_time_span(messages)
}
def _preprocess_messages(self, messages: List[discord.Message]) -> str:
"""
Discord メッセージを自然言語処理用に前処理
Args:
messages: 前処理対象のメッセージリスト
Returns:
前処理済みのテキスト
"""
processed_lines = []
for message in messages:
content = message.content
# Discord特有の要素の処理
content = self._clean_discord_markup(content)
# ユーザー名の追加(文脈理解向上のため)
author_name = message.author.display_name
processed_line = f"{author_name}: {content}"
processed_lines.append(processed_line)
return "\n".join(processed_lines)
def _clean_discord_markup(self, text: str) -> str:
"""
Discord特有のマークアップをクリーンアップ
Args:
text: クリーンアップ対象のテキスト
Returns:
クリーンアップされたテキスト
"""
# メンション除去 (<@123456789> -> @username)
text = re.sub(r'<@!?(\d+)>', r'@user', text)
# チャンネルメンション除去 (<#123456789> -> #channel)
text = re.sub(r'<#(\d+)>', r'#channel', text)
# カスタム絵文字除去 (<:emoji_name:123456789> -> :emoji_name:)
text = re.sub(r'<:([^:]+):\d+>', r':\1:', text)
# URLの簡略化
text = re.sub(r'https?://[^\s]+', '[URL]', text)
# 連続する空白の除去
text = re.sub(r'\s+', ' ', text)
return text.strip()
async def _extract_topics(self, text: str) -> List[Dict[str, Any]]:
"""
TF-IDFとクラスタリングを使用してトピックを抽出
Args:
text: 分析対象のテキスト
Returns:
抽出されたトピックのリスト
"""
# テキストを文単位に分割
sentences = text.split('\n')
sentences = [s.strip() for s in sentences if len(s.strip()) > 10]
if len(sentences) < 3:
return []
# TF-IDF特徴量の計算
tfidf_matrix = self.tfidf_vectorizer.fit_transform(sentences)
# クラスタ数の決定(最大5個)
n_clusters = min(5, max(2, len(sentences) // 10))
# K-meansクラスタリング
kmeans = KMeans(n_clusters=n_clusters, random_state=42)
cluster_labels = kmeans.fit_predict(tfidf_matrix)
# 各クラスタの代表的なキーワードを抽出
topics = []
feature_names = self.tfidf_vectorizer.get_feature_names_out()
for i in range(n_clusters):
# クラスタ中心に近い特徴量を取得
cluster_center = kmeans.cluster_centers_[i]
top_indices = cluster_center.argsort()[-10:][::-1]
top_keywords = [feature_names[idx] for idx in top_indices]
# クラスタに属する文を取得
cluster_sentences = [sentences[j] for j in range(len(sentences))
if cluster_labels[j] == i]
topics.append({
"topic_id": i,
"keywords": top_keywords[:5],
"representative_messages": cluster_sentences[:3],
"message_count": len(cluster_sentences)
})
return topics
async def _generate_extractive_summary(self, text: str) -> str:
"""
抽出型要約の生成
Args:
text: 要約対象のテキスト
Returns:
生成された抽出型要約
"""
try:
# テキスト長の調整
if len(text) > self.config.max_input_length:
text = text[:self.config.max_input_length]
# BART モデルを使用した要約生成
summary_result = self.extractive_summarizer(
text,
max_length=min(150, len(text.split()) // 3),
min_length=30,
do_sample=False
)
return summary_result[0]['summary_text']
except Exception as e:
logging.error(f"抽出型要約生成エラー: {e}")
return "要約生成中にエラーが発生しました。"
async def _generate_abstractive_summary(self, text: str,
topics: List[Dict[str, Any]] = None) -> str:
"""
抽象型要約の生成(OpenAI GPT使用)
Args:
text: 要約対象のテキスト
topics: 抽出されたトピック情報
Returns:
生成された抽象型要約
"""
try:
# プロンプトの構築
system_prompt = """あなたはDiscordの会話を要約する専門AIです。
以下の会話内容を、重要なポイントを含めて簡潔に要約してください。
要約の条件:
1. 主要な議題や決定事項を明確に示す
2. 参加者の主要な意見や提案を含める
3. 3-5文程度で簡潔にまとめる
4. 日本語で出力する"""
user_prompt = f"会話内容:\n{text}"
# トピック情報がある場合は追加
if topics:
topic_info = "\n".join([
f"トピック{t['topic_id']}: {', '.join(t['keywords'][:3])}"
for t in topics
])
user_prompt += f"\n\n主要トピック:\n{topic_info}"
response = await self.openai_client.chat.completions.acreate(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
max_tokens=300,
temperature=0.3
)
return response.choices[0].message.content.strip()
except Exception as e:
logging.error(f"抽象型要約生成エラー: {e}")
return "高度な要約生成中にエラーが発生しました。"
def _calculate_conversation_stats(self, messages: List[discord.Message]) -> Dict[str, Any]:
"""
会話の統計情報を計算
Args:
messages: 分析対象のメッセージリスト
Returns:
統計情報の辞書
"""
if not messages:
return {}
# 参加者分析
participants = {}
total_chars = 0
for message in messages:
author = message.author.display_name
if author not in participants:
participants[author] = {
"message_count": 0,
"total_chars": 0,
"avg_message_length": 0
}
participants[author]["message_count"] += 1
participants[author]["total_chars"] += len(message.content)
total_chars += len(message.content)
# 平均メッセージ長の計算
for participant in participants.values():
if participant["message_count"] > 0:
participant["avg_message_length"] = (
participant["total_chars"] / participant["message_count"]
)
# 最も活発な参加者
most_active = max(participants.items(),
key=lambda x: x[1]["message_count"])
return {
"total_participants": len(participants),
"total_characters": total_chars,
"average_message_length": total_chars / len(messages),
"most_active_participant": {
"name": most_active[0],
"message_count": most_active[1]["message_count"]
},
"participants_detail": participants
}
def _calculate_time_span(self, messages: List[discord.Message]) -> Dict[str, str]:
"""
会話の時間範囲を計算
Args:
messages: 時間範囲計算対象のメッセージリスト
Returns:
時間範囲情報の辞書
"""
if not messages:
return {}
start_time = min(msg.created_at for msg in messages)
end_time = max(msg.created_at for msg in messages)
duration = end_time - start_time
return {
"start_time": start_time.strftime("%Y-%m-%d %H:%M:%S UTC"),
"end_time": end_time.strftime("%Y-%m-%d %H:%M:%S UTC"),
"duration_minutes": int(duration.total_seconds() / 60),
"duration_formatted": str(duration).split('.')[0] # 秒以下切り捨て
}
Bot コマンドシステムの実装
Discord Bot のユーザーインターフェースとなるコマンドシステムでは、直感的な操作性と高度な機能性を両立させる必要があります。
from discord.ext import commands
import discord
from typing import Optional, Union
import asyncio
from datetime import timedelta
import json
class SummaryBot(commands.Bot):
def __init__(self):
intents = discord.Intents.default()
intents.message_content = True
intents.guild_messages = True
super().__init__(
command_prefix='!',
intents=intents,
description="Discord会話AI要約Bot"
)
self.message_collector = MessageCollector(self)
self.nlp_processor = NLPProcessor(SummaryConfig())
async def on_ready(self):
print(f'{self.user} としてログインしました')
print(f'Bot ID: {self.user.id}')
# スラッシュコマンドの同期
try:
synced = await self.tree.sync()
print(f'{len(synced)} 個のスラッシュコマンドを同期しました')
except Exception as e:
print(f'スラッシュコマンドの同期に失敗しました: {e}')
@commands.hybrid_command(name="summarize", description="チャンネルの会話を要約します")
async def summarize_command(self, ctx: commands.Context,
messages: Optional[int] = 50,
hours: Optional[int] = None):
"""
チャンネルの会話を要約するメインコマンド
Args:
ctx: コマンドコンテキスト
messages: 取得するメッセージ数(デフォルト: 50)
hours: 取得対象の時間範囲(時間単位)
"""
await ctx.defer() # 処理時間を確保
try:
# 権限チェック
if not await self._check_permissions(ctx):
await ctx.followup.send("❌ このコマンドを実行する権限がありません。")
return
# メッセージ収集
time_window = timedelta(hours=hours) if hours else None
collected_messages = await self.message_collector.collect_channel_messages(
ctx.channel,
limit=messages,
time_window=time_window
)
if len(collected_messages) < 5:
await ctx.followup.send("❌ 要約するのに十分なメッセージがありません(最低5件必要)。")
return
# 処理中メッセージの送信
processing_embed = discord.Embed(
title="🤖 AI要約処理中...",
description=f"{len(collected_messages)}件のメッセージを分析しています",
color=discord.Color.blue()
)
processing_msg = await ctx.followup.send(embed=processing_embed)
# AI処理の実行
result = await self.nlp_processor.process_messages(collected_messages)
# 結果の整形と送信
await self._send_summary_result(ctx, result, processing_msg)
except Exception as e:
error_embed = discord.Embed(
title="❌ エラーが発生しました",
description=f"要約処理中にエラーが発生しました: {str(e)}",
color=discord.Color.red()
)
await ctx.followup.send(embed=error_embed)
logging.error(f"要約コマンドでエラー: {e}")
@commands.hybrid_command(name="quick_summary", description="直近の会話を簡単に要約します")
async def quick_summary_command(self, ctx: commands.Context):
"""
直近20件のメッセージを簡単に要約
Args:
ctx: コマンドコンテキスト
"""
await ctx.defer()
try:
# 直近20件のメッセージを取得
messages = await self.message_collector.collect_channel_messages(
ctx.channel,
limit=20
)
if len(messages) < 3:
await ctx.followup.send("❌ 要約するメッセージが不足しています。")
return
# 抽出型要約のみ実行(高速化のため)
processed_text = self.nlp_processor._preprocess_messages(messages)
summary = await self.nlp_processor._generate_extractive_summary(processed_text)
# 簡易形式で結果を送信
embed = discord.Embed(
title="📝 クイック要約",
description=summary,
color=discord.Color.green()
)
embed.add_field(
name="📊 統計",
value=f"メッセージ数: {len(messages)}件",
inline=True
)
await ctx.followup.send(embed=embed)
except Exception as e:
await ctx.followup.send(f"❌ クイック要約でエラーが発生しました: {str(e)}")
@commands.hybrid_command(name="topic_analysis", description="会話のトピック分析を実行します")
async def topic_analysis_command(self, ctx: commands.Context,
messages: Optional[int] = 100):
"""
会話のトピック分析を実行
Args:
ctx: コマンドコンテキスト
messages: 分析対象のメッセージ数
"""
await ctx.defer()
try:
# メッセージ収集
collected_messages = await self.message_collector.collect_channel_messages(
ctx.channel,
limit=messages
)
if len(collected_messages) < 10:
await ctx.followup.send("❌ トピック分析には最低10件のメッセージが必要です。")
return
# トピック分析の実行
processed_text = self.nlp_processor._preprocess_messages(collected_messages)
topics = await self.nlp_processor._extract_topics(processed_text)
if not topics:
await ctx.followup.send("❌ 明確なトピックを抽出できませんでした。")
return
# 結果の整形
embed = discord.Embed(
title="🎯 トピック分析結果",
description=f"{len(collected_messages)}件のメッセージから{len(topics)}個のトピックを抽出",
color=discord.Color.purple()
)
for i, topic in enumerate(topics[:5]): # 最大5個まで表示
keywords_str = ", ".join(topic["keywords"][:3])
embed.add_field(
name=f"トピック {i+1}",
value=f"**キーワード:** {keywords_str}\n**関連メッセージ:** {topic['message_count']}件",
inline=False
)
await ctx.followup.send(embed=embed)
except Exception as e:
await ctx.followup.send(f"❌ トピック分析でエラーが発生しました: {str(e)}")
async def _check_permissions(self, ctx: commands.Context) -> bool:
"""
コマンド実行権限をチェック
Args:
ctx: コマンドコンテキスト
Returns:
権限があるかどうかのブール値
"""
# 管理者権限または特定の役職をチェック
if ctx.author.guild_permissions.administrator:
return True
# 特定の役職名をチェック(設定可能)
allowed_roles = ["Moderator", "Summary Bot User"]
user_roles = [role.name for role in ctx.author.roles]
return any(role in allowed_roles for role in user_roles)
async def _send_summary_result(self, ctx: commands.Context,
result: Dict[str, Any],
processing_msg: discord.Message):
"""
要約結果を整形して送信
Args:
ctx: コマンドコンテキスト
result: AI処理結果
processing_msg: 処理中メッセージ(更新用)
"""
try:
# メイン要約の埋め込み
main_embed = discord.Embed(
title="📋 AI会話要約",
color=discord.Color.gold()
)
# 抽象型要約(メイン)
if result.get("abstractive_summary"):
main_embed.add_field(
name="🎯 要約",
value=result["abstractive_summary"],
inline=False
)
# 統計情報
stats = result.get("statistics", {})
if stats:
stats_text = f"""
**参加者数:** {stats.get('total_participants', 'N/A')}人
**メッセージ数:** {result.get('message_count', 'N/A')}件
**最も活発:** {stats.get('most_active_participant', {}).get('name', 'N/A')}
**期間:** {result.get('time_span', {}).get('duration_formatted', 'N/A')}
"""
main_embed.add_field(
name="📊 統計情報",
value=stats_text.strip(),
inline=True
)
# トピック情報
topics = result.get("topics", [])
if topics:
topic_text = ""
for i, topic in enumerate(topics[:3]): # 最大3個まで
keywords = ", ".join(topic["keywords"][:2])
topic_text += f"**{i+1}.** {keywords} ({topic['message_count']}件)\n"
main_embed.add_field(
name="🎯 主要トピック",
value=topic_text.strip(),
inline=True
)
# タイムスタンプ
main_embed.timestamp = discord.utils.utcnow()
main_embed.set_footer(text="AI要約システム | Powered by GPT & BART")
# 処理中メッセージを結果で更新
await processing_msg.edit(embed=main_embed)
# 詳細情報があれば追加のメッセージとして送信
if result.get("extractive_summary") and len(result["extractive_summary"]) > 100:
detail_embed = discord.Embed(
title="📖 詳細要約",
description=result["extractive_summary"],
color=discord.Color.blue()
)
await ctx.followup.send(embed=detail_embed)
except Exception as e:
error_embed = discord.Embed(
title="❌ 結果表示エラー",
description=f"要約結果の表示中にエラーが発生しました: {str(e)}",
color=discord.Color.red()
)
await processing_msg.edit(embed=error_embed)
# Bot インスタンスの作成と実行
def main():
bot = SummaryBot()
# 環境変数からトークンを取得
import os
token = os.getenv('DISCORD_BOT_TOKEN')
if not token:
print("❌ DISCORD_BOT_TOKEN 環境変数が設定されていません")
return
try:
bot.run(token)
except Exception as e:
print(f"❌ Bot の起動に失敗しました: {e}")
if __name__ == "__main__":
main()
高度な機能実装
リアルタイム要約システム
継続的に更新される会話に対して、リアルタイムで要約を更新するシステムの実装は、特に技術的な挑戦となります。
import asyncio
from collections import deque
from datetime import datetime, timedelta
import threading
from typing import Dict, Set
class RealtimeSummaryManager:
def __init__(self, bot: SummaryBot):
self.bot = bot
self.active_channels: Dict[int, 'ChannelSummarySession'] = {}
self.update_queue = asyncio.Queue()
self.is_running = False
async def start_realtime_summary(self, channel: discord.TextChannel,
update_interval: int = 300) -> bool:
"""
チャンネルのリアルタイム要約を開始
Args:
channel: 対象チャンネル
update_interval: 更新間隔(秒)
Returns:
開始成功の可否
"""
if channel.id in self.active_channels:
return False # 既に開始済み
session = ChannelSummarySession(channel, update_interval, self.bot)
self.active_channels[channel.id] = session
# バックグラウンドタスクの開始
asyncio.create_task(self._run_summary_session(session))
return True
async def stop_realtime_summary(self, channel_id: int) -> bool:
"""
リアルタイム要約を停止
Args:
channel_id: 停止対象のチャンネルID
Returns:
停止成功の可否
"""
if channel_id not in self.active_channels:
return False
session = self.active_channels[channel_id]
session.stop()
del self.active_channels[channel_id]
return True
async def _run_summary_session(self, session: 'ChannelSummarySession'):
"""
要約セッションのメインループ
Args:
session: 実行対象のセッション
"""
while session.is_active():
try:
await session.update_summary()
await asyncio.sleep(session.update_interval)
except Exception as e:
logging.error(f"リアルタイム要約セッションエラー: {e}")
break
# セッション終了処理
if session.channel.id in self.active_channels:
del self.active_channels[session.channel.id]
class ChannelSummarySession:
def __init__(self, channel: discord.TextChannel,
update_interval: int, bot: SummaryBot):
self.channel = channel
self.update_interval = update_interval
self.bot = bot
self.message_buffer = deque(maxlen=200) # 最大200件のメッセージを保持
self.last_summary_time = datetime.utcnow()
self.current_summary = ""
self.summary_message: Optional[discord.Message] = None
self._active = True
def is_active(self) -> bool:
return self._active
def stop(self):
self._active = False
async def add_message(self, message: discord.Message):
"""
新しいメッセージをバッファに追加
Args:
message: 追加するメッセージ
"""
if self.bot.message_collector._should_include_message(message):
self.message_buffer.append(message)
async def update_summary(self):
"""
要約の更新を実行
"""
if len(self.message_buffer) < 5:
return # メッセージが少なすぎる場合はスキップ
try:
# 最近のメッセージから要約を生成
recent_messages = list(self.message_buffer)[-50:] # 最新50件
# 簡易要約の生成(処理時間短縮のため)
processed_text = self.bot.nlp_processor._preprocess_messages(recent_messages)
new_summary = await self.bot.nlp_processor._generate_extractive_summary(
processed_text
)
# 要約が変更された場合のみ更新
if new_summary != self.current_summary:
self.current_summary = new_summary
await self._update_summary_message()
except Exception as e:
logging.error(f"要約更新エラー: {e}")
async def _update_summary_message(self):
"""
要約メッセージの更新または作成
"""
embed = discord.Embed(
title="🔄 リアルタイム要約",
description=self.current_summary,
color=discord.Color.green()
)
embed.add_field(
name="📊 情報",
value=f"対象メッセージ: {len(self.message_buffer)}件\n更新時刻: {datetime.utcnow().strftime('%H:%M:%S')}",
inline=False
)
embed.set_footer(text="自動更新中 | 停止するには !stop_realtime を使用")
try:
if self.summary_message:
# 既存メッセージを更新
await self.summary_message.edit(embed=embed)
else:
# 新しいメッセージを作成
self.summary_message = await self.channel.send(embed=embed)
except discord.errors.NotFound:
# メッセージが削除されていた場合は新規作成
self.summary_message = await self.channel.send(embed=embed)
except Exception as e:
logging.error(f"要約メッセージ更新エラー: {e}")
多言語対応システム
グローバルなDiscordコミュニティに対応するため、多言語での要約生成機能を実装します。
from googletrans import Translator
import langdetect
from typing import List, Tuple
class MultilingualProcessor:
def __init__(self):
self.translator = Translator()
self.supported_languages = {
'ja': '日本語',
'en': 'English',
'ko': '한국어',
'zh': '中文',
'es': 'Español',
'fr': 'Français',
'de': 'Deutsch'
}
def detect_dominant_language(self, messages: List[discord.Message]) -> str:
"""
メッセージ群の主要言語を検出
Args:
messages: 言語検出対象のメッセージリスト
Returns:
検出された言語コード
"""
combined_text = " ".join([msg.content for msg in messages
if len(msg.content.strip()) > 0])
try:
detected_lang = langdetect.detect(combined_text)
return detected_lang if detected_lang in self.supported_languages else 'en'
except:
return 'en' # デフォルトは英語
async def generate_multilingual_summary(self, text: str,
target_languages: List[str]) -> Dict[str, str]:
"""
複数言語での要約生成
Args:
text: 要約対象テキスト
target_languages: 出力対象言語のリスト
Returns:
言語別要約の辞書
"""
# まず英語で要約を生成
english_summary = await self._generate_summary_in_english(text)
results = {'en': english_summary}
# 他の言語に翻訳
for lang in target_languages:
if lang != 'en' and lang in self.supported_languages:
try:
translated = self.translator.translate(
english_summary,
dest=lang
).text
results[lang] = translated
except Exception as e:
logging.error(f"翻訳エラー ({lang}): {e}")
results[lang] = f"翻訳エラーが発生しました: {english_summary}"
return results
async def _generate_summary_in_english(self, text: str) -> str:
"""
英語での要約生成(内部処理用)
Args:
text: 要約対象テキスト
Returns:
英語での要約
"""
# 入力テキストが日本語の場合は英語に翻訳してから要約
detected_lang = langdetect.detect(text)
if detected_lang != 'en':
try:
english_text = self.translator.translate(text, dest='en').text
except:
english_text = text # 翻訳失敗時は元のテキストを使用
else:
english_text = text
# 英語での要約生成(OpenAI API使用)
system_prompt = """You are an AI specialized in summarizing Discord conversations.
Please provide a concise summary of the following conversation content.
Summary requirements:
1. Clearly indicate main topics and decisions
2. Include key opinions and suggestions from participants
3. Keep it concise within 3-5 sentences
4. Output in English"""
try:
response = await openai.ChatCompletion.acreate(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": f"Conversation content:\n{english_text}"}
],
max_tokens=300,
temperature=0.3
)
return response.choices[0].message.content.strip()
except Exception as e:
logging.error(f"英語要約生成エラー: {e}")
return "An error occurred while generating the summary."
パフォーマンス最適化と運用考慮事項
メモリ効率化
大量のメッセージを処理する際のメモリ使用量を最適化するための実装です。
import gc
import psutil
import weakref
from typing import Generator
class MemoryOptimizedProcessor:
def __init__(self, max_memory_mb: int = 500):
self.max_memory_mb = max_memory_mb
self.message_cache = weakref.WeakValueDictionary()
def monitor_memory_usage(self) -> Dict[str, float]:
"""
現在のメモリ使用量を監視
Returns:
メモリ使用量の詳細情報
"""
process = psutil.Process()
memory_info = process.memory_info()
return {
"rss_mb": memory_info.rss / 1024 / 1024, # 物理メモリ使用量
"vms_mb": memory_info.vms / 1024 / 1024, # 仮想メモリ使用量
"percent": process.memory_percent(), # メモリ使用率
"available_mb": psutil.virtual_memory().available / 1024 / 1024
}
async def process_messages_in_chunks(self, messages: List[discord.Message],
chunk_size: int = 50) -> List[str]:
"""
メモリ効率化のためのチャンク処理
Args:
messages: 処理対象メッセージ
chunk_size: チャンクサイズ
Returns:
チャンク別要約のリスト
"""
summaries = []
for i in range(0, len(messages), chunk_size):
chunk = messages[i:i + chunk_size]
# メモリ使用量チェック
memory_status = self.monitor_memory_usage()
if memory_status["rss_mb"] > self.max_memory_mb:
# ガベージコレクション実行
gc.collect()
logging.warning(f"メモリ使用量が上限に達しました: {memory_status['rss_mb']:.1f}MB")
# チャンクの処理
chunk_summary = await self._process_single_chunk(chunk)
summaries.append(chunk_summary)
# チャンク処理後のクリーンアップ
del chunk
return summaries
async def _process_single_chunk(self, messages: List[discord.Message]) -> str:
"""
単一チャンクの処理
Args:
messages: 処理対象のメッセージチャンク
Returns:
チャンクの要約
"""
# 軽量な前処理
text_content = []
for msg in messages:
clean_content = self._lightweight_clean(msg.content)
if len(clean_content) > 5:
text_content.append(f"{msg.author.display_name}: {clean_content}")
combined_text = "\n".join(text_content)
# 簡易要約生成
return await self._generate_lightweight_summary(combined_text)
def _lightweight_clean(self, text: str) -> str:
"""
軽量なテキストクリーニング
Args:
text: クリーニング対象テキスト
Returns:
クリーニング済みテキスト
"""
# 基本的なクリーニングのみ実行
import re
# URLの除去
text = re.sub(r'https?://\S+', '[URL]', text)
# メンション除去
text = re.sub(r'<@!?\d+>', '@user', text)
# 連続空白の正規化
text = re.sub(r'\s+', ' ', text)
return text.strip()
async def _generate_lightweight_summary(self, text: str) -> str:
"""
軽量な要約生成
Args:
text: 要約対象テキスト
Returns:
生成された要約
"""
# 文長制限
if len(text) > 1000:
text = text[:1000] + "..."
# 簡単な抽出型要約
sentences = text.split('.')
important_sentences = []
# キーワードベースの重要文抽出
keywords = ['決定', '結論', '重要', '提案', '問題', '解決', 'important', 'decided', 'conclusion']
for sentence in sentences:
if any(keyword in sentence.lower() for keyword in keywords):
important_sentences.append(sentence.strip())
if important_sentences:
return '. '.join(important_sentences[:3]) + '.'
else:
# フォールバック: 最初の2文を返す
return '. '.join(sentences[:2]) + '.'
エラーハンドリングと復旧システム
本番環境での安定稼働を実現するための包括的なエラーハンドリングシステムです。
import traceback
from enum import Enum
from dataclasses import dataclass
from typing import Optional, Callable, Any
import aiofiles
import json
from datetime import datetime
class ErrorSeverity(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
@dataclass
class ErrorContext:
command: str
channel_id: int
user_id: int
message_count: int
timestamp: datetime
additional_info: Dict[str, Any]
class RobustErrorHandler:
def __init__(self, bot: SummaryBot):
self.bot = bot
self.error_log_path = "logs/error_log.json"
self.retry_attempts = {
ErrorSeverity.LOW: 1,
ErrorSeverity.MEDIUM: 2,
ErrorSeverity.HIGH: 3,
ErrorSeverity.CRITICAL: 5
}
async def handle_error(self, error: Exception,
context: ErrorContext,
severity: ErrorSeverity = ErrorSeverity.MEDIUM) -> bool:
"""
エラーの包括的な処理
Args:
error: 発生したエラー
context: エラー発生時のコンテキスト
severity: エラーの重要度
Returns:
復旧成功の可否
"""
# エラーログの記録
await self._log_error(error, context, severity)
# 重要度に応じた処理
if severity == ErrorSeverity.CRITICAL:
await self._handle_critical_error(error, context)
return False
# リトライ処理
max_retries = self.retry_attempts[severity]
for attempt in range(max_retries):
try:
await self._attempt_recovery(context, attempt)
logging.info(f"エラー復旧成功 (試行 {attempt + 1}/{max_retries})")
return True
except Exception as retry_error:
logging.warning(f"復旧試行 {attempt + 1} 失敗: {retry_error}")
await asyncio.sleep(2 ** attempt) # 指数バックオフ
# 全ての復旧試行が失敗
await self._handle_recovery_failure(error, context)
return False
async def _log_error(self, error: Exception,
context: ErrorContext,
severity: ErrorSeverity):
"""
エラーログの詳細記録
Args:
error: 記録対象のエラー
context: エラーコンテキスト
severity: エラー重要度
"""
error_entry = {
"timestamp": context.timestamp.isoformat(),
"severity": severity.value,
"error_type": type(error).__name__,
"error_message": str(error),
"traceback": traceback.format_exc(),
"context": {
"command": context.command,
"channel_id": context.channel_id,
"user_id": context.user_id,
"message_count": context.message_count,
"additional_info": context.additional_info
}
}
try:
# 非同期でファイルに記録
async with aiofiles.open(self.error_log_path, 'a') as f:
await f.write(json.dumps(error_entry, ensure_ascii=False) + '\n')
except Exception as log_error:
logging.error(f"エラーログ記録失敗: {log_error}")
async def _attempt_recovery(self, context: ErrorContext, attempt: int):
"""
エラー復旧の試行
Args:
context: 復旧対象のコンテキスト
attempt: 試行回数
"""
# コンテキストに基づいて適切な復旧処理を実行
if context.command == "summarize":
await self._recover_summarize_command(context, attempt)
elif context.command == "topic_analysis":
await self._recover_topic_analysis(context, attempt)
else:
# 汎用復旧処理
await self._generic_recovery(context, attempt)
async def _recover_summarize_command(self, context: ErrorContext, attempt: int):
"""
要約コマンドの復旧処理
Args:
context: 復旧コンテキスト
attempt: 試行回数
"""
channel = self.bot.get_channel(context.channel_id)
if not channel:
raise Exception("チャンネルが見つかりません")
# より少ないメッセージ数で再試行
reduced_count = max(10, context.message_count // (2 ** (attempt + 1)))
messages = await self.bot.message_collector.collect_channel_messages(
channel,
limit=reduced_count
)
if len(messages) < 5:
raise Exception("要約に十分なメッセージがありません")
# 簡易要約のみで復旧試行
processed_text = self.bot.nlp_processor._preprocess_messages(messages)
summary = await self.bot.nlp_processor._generate_extractive_summary(processed_text)
# 復旧成功の通知
embed = discord.Embed(
title="⚠️ 復旧要約",
description=f"一部機能制限で要約を生成しました:\n\n{summary}",
color=discord.Color.orange()
)
embed.add_field(
name="制限事項",
value=f"メッセージ数: {len(messages)}件(通常より少ない可能性があります)",
inline=False
)
await channel.send(embed=embed)
async def _handle_critical_error(self, error: Exception, context: ErrorContext):
"""
クリティカルエラーの処理
Args:
error: クリティカルエラー
context: エラーコンテキスト
"""
# 管理者への通知
admin_channel_id = os.getenv('ADMIN_CHANNEL_ID')
if admin_channel_id:
admin_channel = self.bot.get_channel(int(admin_channel_id))
if admin_channel:
embed = discord.Embed(
title="🚨 クリティカルエラー発生",
description=f"重大なエラーが発生しました: {str(error)}",
color=discord.Color.red()
)
embed.add_field(
name="詳細",
value=f"コマンド: {context.command}\nチャンネル: <#{context.channel_id}>",
inline=False
)
await admin_channel.send(embed=embed)
# 一時的な機能停止
self.bot.maintenance_mode = True
logging.critical(f"クリティカルエラーによりメンテナンスモードに移行: {error}")
限界とリスク
Discord Bot AI要約システムの実装と運用において、以下のような技術的限界とリスクが存在します。
技術的限界
1. 自然言語処理の精度限界 現在のトランスフォーマーベースのモデルでも、文脈理解や意図推定において完璧ではありません。特に、皮肉や隠喩、文化的なニュアンスを含む表現の理解には限界があります。また、専門用語や略語が多用される技術的な議論では、要約の精度が低下する可能性があります。
2. リアルタイム処理の制約 Discord APIのレート制限により、大量のメッセージが短時間で投稿される場合、全てのメッセージをリアルタイムで処理することは困難です。また、AI処理自体の計算時間により、真の意味でのリアルタイム要約は技術的に困難です。
3. メモリとコストの制約 大規模なサーバーでは、長時間の会話履歴を保持することが困難であり、OpenAI APIの使用量に応じてコストが増大します。また、複数のサーバーで同時運用する場合、サーバーあたりのリソース制約が問題となります。
セキュリティリスク
1. プライベート情報の露出リスク 要約生成過程で、個人情報や機密情報が意図せず要約に含まれる可能性があります。また、API経由でのデータ送信により、外部サービスにセンシティブな情報が送信されるリスクがあります。
2. 権限昇格攻撃 Bot の管理者権限を悪用した攻撃や、設定ファイルの改ざんによる不正な動作の可能性があります。
運用リスク
1. API依存性リスク OpenAI APIやDiscord APIの仕様変更、サービス停止により、Bot の機能が使用不能になる可能性があります。また、APIキーの漏洩により、予期しない課金や悪用のリスクがあります。
2. 不適切なユースケース ハラスメントや誹謗中傷を含む会話の要約、法的問題のある内容の拡散、著作権侵害コンテンツの要約生成などのリスクがあります。
実装時のベストプラクティス
設定管理とセキュリティ
import os
from cryptography.fernet import Fernet
import yaml
from typing import Dict, Any
class SecureConfigManager:
def __init__(self, config_path: str = "config.yaml"):
self.config_path = config_path
self.encryption_key = os.getenv('CONFIG_ENCRYPTION_KEY')
self.cipher_suite = Fernet(self.encryption_key) if self.encryption_key else None
def load_config(self) -> Dict[str, Any]:
"""
設定ファイルの安全な読み込み
Returns:
設定内容の辞書
"""
try:
with open(self.config_path, 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
# 機密情報の復号化
if self.cipher_suite and 'encrypted' in config:
for key, encrypted_value in config['encrypted'].items():
decrypted_value = self.cipher_suite.decrypt(
encrypted_value.encode()
).decode()
config[key] = decrypted_value
del config['encrypted']
return config
except Exception as e:
logging.error(f"設定ファイル読み込みエラー: {e}")
return self._get_default_config()
def _get_default_config(self) -> Dict[str, Any]:
"""
デフォルト設定の取得
Returns:
デフォルト設定の辞書
"""
return {
"bot": {
"command_prefix": "!",
"max_message_length": 2000,
"default_summary_length": 5
},
"ai": {
"model": "gpt-3.5-turbo",
"max_tokens": 300,
"temperature": 0.3
},
"security": {
"allowed_roles": ["Admin", "Moderator"],
"max_requests_per_hour": 100,
"enable_content_filter": True
}
}
# 使用例
config_manager = SecureConfigManager()
config = config_manager.load_config()
パフォーマンス監視
import time
import asyncio
from functools import wraps
from typing import Callable, Any
import logging
class PerformanceMonitor:
def __init__(self):
self.metrics = {
"command_executions": 0,
"total_processing_time": 0,
"average_processing_time": 0,
"error_count": 0,
"memory_peaks": []
}
def monitor_performance(self, func_name: str = None):
"""