## Introduction

In AI development, the sanity check is a critically important practice for verifying a system's basic behavior and assuring its quality. Drawing on my experience as a former AI researcher at Google Brain and my current role as CTO of an AI startup, this article walks through the theory and practice of comprehensive sanity checking in AI development.

Sanity checks go beyond simple smoke tests: they form a multi-layered quality-assurance method applied across the entire AI development lifecycle, from the plausibility of a model's outputs and the health of the training process to final verification before deployment. Implemented properly, these validation steps prevent critical failures in production and substantially improve the reliability of AI systems.

## Why Sanity Checks Matter

AI development carries complexities that conventional software development does not: probabilistic outputs, nondeterminism, enormous parameter spaces, and behavior that depends on the training data. These properties make the behavior of AI systems extremely hard to predict. Sanity checks play a key role amid this uncertainty by confirming that the system operates within its expected envelope.
## Chapter 1: Theoretical Foundations of Sanity Checks

### 1.1 Definition and Concepts

A sanity check is a validation process that confirms a system or algorithm behaves within basic expectations and common-sense bounds. In the context of AI development, this encompasses:

- Output plausibility: confirming that model predictions fall within realistic ranges
- Training health: monitoring the trajectory of the loss function and the normality of gradients during training
- Data integrity: verifying the quality and consistency of input data
- Performance plausibility: confirming operation within the expected accuracy range

### 1.2 Theoretical Framework

Sanity checks in AI development are closely related to the notion of generalization error in statistical learning theory. A generalization bound based on Vapnik-Chervonenkis theory states that, with probability at least 1 − δ,

R(f) ≤ R_emp(f) + √((d·log(2m/d) + log(4/δ)) / (2m))

where R(f) is the true risk, R_emp(f) is the empirical risk, d is the VC dimension, m is the sample size, and δ is the confidence parameter. Within this theoretical framework, sanity checks verify that observed performance is consistent with theoretical expectations; a short sketch of how the bound behaves follows.
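As a quick illustration (an addition, not code from the original article), the following snippet plugs numbers into the bound above; the function name and the chosen values of d, m, and δ are arbitrary:

```python
import numpy as np

def vc_confidence_term(d: int, m: int, delta: float = 0.05) -> float:
    """Width of the confidence term sqrt((d*log(2m/d) + log(4/delta)) / (2m))."""
    return float(np.sqrt((d * np.log(2 * m / d) + np.log(4 / delta)) / (2 * m)))

# The gap shrinks roughly as 1/sqrt(m): more data tightens the bound.
for m in (1_000, 10_000, 100_000):
    print(f"d=100, m={m:>7}: empirical risk trusted to within ±{vc_confidence_term(100, m):.3f}")
```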
### 1.3 Classification Scheme

Sanity checks in AI development can be classified by the level at which they are implemented, as the table below shows (a minimal sketch of wiring these stages into a pipeline follows the table):
Level | Target | What is verified | When it runs |
---|---|---|---|
Data level | Input data | Statistical properties, missing values, outliers | Preprocessing |
Model level | Architecture | Gradient flow, parameter initialization | Before training |
Learning level | Training process | Loss convergence, overfitting detection | During training |
Output level | Predictions | Distribution, range, consistency | At inference |
System level | Integrated environment | Latency, throughput | Before deployment |
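One convenient way to operationalize this taxonomy is a stage-keyed registry of check functions, so each pipeline stage runs its own battery before handing off. This is a minimal sketch of the idea, not code from the article; all names are illustrative:

```python
from typing import Callable, Dict, List
import numpy as np

SanityCheck = Callable[..., bool]

# One list of checks per lifecycle stage from the table above.
check_registry: Dict[str, List[SanityCheck]] = {
    'preprocessing': [],  # data level
    'pre_training': [],   # model level
    'training': [],       # learning level
    'inference': [],      # output level
    'pre_deploy': [],     # system level
}

def run_stage_checks(stage: str, *args, **kwargs) -> bool:
    """Run every check registered for a stage; all must pass."""
    return all(check(*args, **kwargs) for check in check_registry.get(stage, []))

# Example: register a trivial data-level check and run the stage.
def no_nan_inputs(batch: np.ndarray) -> bool:
    return not np.isnan(batch).any()

check_registry['preprocessing'].append(no_nan_inputs)
assert run_stage_checks('preprocessing', np.ones((4, 3)))
```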
## Chapter 2: Data-Level Sanity Checks

### 2.1 Verifying Statistical Properties

Verifying the statistical properties of the data is the first sanity-check point in AI development. The following code implements a comprehensive data validator:

```python
import pandas as pd
import numpy as np
from scipy import stats
import matplotlib.pyplot as plt
from typing import Dict, List, Tuple
class DataSanityChecker:
def __init__(self, data: pd.DataFrame):
self.data = data
self.report = {}
def check_basic_statistics(self) -> Dict:
"""基本統計量の検証"""
stats_report = {}
for column in self.data.select_dtypes(include=[np.number]).columns:
col_stats = {
'mean': self.data[column].mean(),
'std': self.data[column].std(),
'min': self.data[column].min(),
'max': self.data[column].max(),
'skewness': stats.skew(self.data[column].dropna()),
'kurtosis': stats.kurtosis(self.data[column].dropna())
}
# 異常値検出(IQR法)
Q1 = self.data[column].quantile(0.25)
Q3 = self.data[column].quantile(0.75)
IQR = Q3 - Q1
outliers = self.data[
(self.data[column] < Q1 - 1.5 * IQR) |
(self.data[column] > Q3 + 1.5 * IQR)
][column]
col_stats['outlier_count'] = len(outliers)
col_stats['outlier_percentage'] = len(outliers) / len(self.data) * 100
stats_report[column] = col_stats
return stats_report
def check_data_drift(self, reference_data: pd.DataFrame) -> Dict:
"""データドリフトの検証"""
drift_report = {}
for column in self.data.select_dtypes(include=[np.number]).columns:
if column in reference_data.columns:
# Kolmogorov-Smirnov検定
ks_statistic, p_value = stats.ks_2samp(
reference_data[column].dropna(),
self.data[column].dropna()
)
drift_report[column] = {
'ks_statistic': ks_statistic,
'p_value': p_value,
'significant_drift': p_value < 0.05
}
return drift_report
def validate_feature_correlations(self,
expected_correlations: Dict[Tuple[str, str], float],
tolerance: float = 0.1) -> Dict:
"""特徴量間相関の検証"""
correlation_matrix = self.data.corr()
validation_results = {}
for (feature1, feature2), expected_corr in expected_correlations.items():
if feature1 in correlation_matrix.index and feature2 in correlation_matrix.columns:
actual_corr = correlation_matrix.loc[feature1, feature2]
is_valid = abs(actual_corr - expected_corr) <= tolerance
validation_results[(feature1, feature2)] = {
'expected': expected_corr,
'actual': actual_corr,
'difference': abs(actual_corr - expected_corr),
'is_valid': is_valid
}
return validation_results
# Usage example
data = pd.read_csv('training_data.csv')
checker = DataSanityChecker(data)

# Basic statistics check (loop variable renamed so it does not shadow scipy.stats)
basic_stats = checker.check_basic_statistics()
print("Basic statistics validation results:")
for feature, col_stats in basic_stats.items():
    print(f"{feature}: mean={col_stats['mean']:.2f}, std={col_stats['std']:.2f}, outliers={col_stats['outlier_count']}")
```

Example output:

```
Basic statistics validation results:
age: mean=35.42, std=12.18, outliers=23
income: mean=65432.11, std=25891.33, outliers=45
score: mean=0.73, std=0.19, outliers=12
```
### 2.2 Data Quality Metrics

The following metrics are useful for quantitative assessment of data quality (a sketch that computes them follows the table):
Metric | Formula | Acceptable range | Purpose |
---|---|---|---|
Completeness | (total records − records with missing values) / total records | > 0.95 | Assess the impact of missing data |
Uniqueness | unique records / total records | > 0.98 | Detect duplicate records |
Consistency | constraint-violating records / total records | < 0.02 | Confirm data integrity |
Timeliness | (current time − data generation time) / expected update interval | < 1.2 | Assess data freshness |
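A minimal sketch of computing these four metrics with pandas (an addition for illustration; the `ts_col` timestamp column and the non-negative `income` constraint are assumptions, not part of the article):

```python
import pandas as pd

def data_quality_metrics(df: pd.DataFrame, ts_col: str,
                         expected_update_interval: pd.Timedelta) -> dict:
    total = len(df)
    return {
        # Completeness: share of records with no missing fields (want > 0.95).
        'completeness': (total - df.isna().any(axis=1).sum()) / total,
        # Uniqueness: share of fully distinct records (want > 0.98).
        'uniqueness': len(df.drop_duplicates()) / total,
        # Consistency: share of constraint violations (want < 0.02).
        # The non-negativity constraint on 'income' is only an example.
        'consistency_violations': (df.get('income', pd.Series(dtype=float)) < 0).sum() / total,
        # Timeliness: age of the newest record in update intervals (want < 1.2).
        'timeliness': (pd.Timestamp.now() - df[ts_col].max()) / expected_update_interval,
    }
```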
### 2.3 Distribution Goodness-of-Fit Tests

Verifying that the data fits the expected theoretical distribution is important for satisfying the model's assumptions:

```python
from scipy.stats import normaltest, jarque_bera, anderson
import warnings
class DistributionTester:
def __init__(self, data: np.ndarray):
self.data = data
def test_normality(self) -> Dict:
"""正規性の検定"""
results = {}
# Shapiro-Wilk検定(サンプル数が少ない場合)
if len(self.data) <= 5000:
from scipy.stats import shapiro
statistic, p_value = shapiro(self.data)
results['shapiro_wilk'] = {
'statistic': statistic,
'p_value': p_value,
'is_normal': p_value > 0.05
}
# D'Agostino-Pearson検定
statistic, p_value = normaltest(self.data)
results['dagostino_pearson'] = {
'statistic': statistic,
'p_value': p_value,
'is_normal': p_value > 0.05
}
# Jarque-Bera検定
statistic, p_value = jarque_bera(self.data)
results['jarque_bera'] = {
'statistic': statistic,
'p_value': p_value,
'is_normal': p_value > 0.05
}
# Anderson-Darling検定
result = anderson(self.data, dist='norm')
results['anderson_darling'] = {
'statistic': result.statistic,
'critical_values': result.critical_values,
'significance_levels': result.significance_levels,
'is_normal': result.statistic < result.critical_values[2] # 5%水準
}
return results
def estimate_distribution_parameters(self) -> Dict:
"""最適分布パラメータの推定"""
from scipy.stats import norm, lognorm, gamma, beta
distributions = {
'normal': norm,
'lognormal': lognorm,
'gamma': gamma,
'beta': beta
}
results = {}
for name, dist in distributions.items():
try:
params = dist.fit(self.data)
# Kolmogorov-Smirnov適合度検定
ks_stat, p_value = stats.kstest(self.data,
lambda x: dist.cdf(x, *params))
results[name] = {
'parameters': params,
'ks_statistic': ks_stat,
'p_value': p_value,
'fits_well': p_value > 0.05
}
except Exception as e:
results[name] = {'error': str(e)}
return results
# Usage example
feature_data = data['score'].dropna().values
tester = DistributionTester(feature_data)
normality_results = tester.test_normality()
distribution_fits = tester.estimate_distribution_parameters()
print("Normality test results:")
for test_name, result in normality_results.items():
    if 'p_value' in result:
        print(f"{test_name}: p-value={result['p_value']:.4f}, normal={result['is_normal']}")
```

## Chapter 3: Model-Level Sanity Checks

### 3.1 Architecture Verification

Verifying that a neural network's architecture is constructed as expected is an important sanity check before training begins:

```python
import torch
import torch.nn as nn
from typing import Dict, List, Tuple  # Dict is needed for the return annotations below
import numpy as np
class ModelSanityChecker:
def __init__(self, model: nn.Module):
self.model = model
def check_parameter_initialization(self) -> Dict:
"""パラメータ初期化の検証"""
init_stats = {}
for name, param in self.model.named_parameters():
if param.requires_grad:
param_data = param.data.cpu().numpy()
stats = {
'shape': param_data.shape,
'mean': np.mean(param_data),
'std': np.std(param_data),
'min': np.min(param_data),
'max': np.max(param_data),
'zero_percentage': np.mean(param_data == 0) * 100
}
                # Validity check for the initialization
                if 'weight' in name:
                    # Compare against Xavier/Glorot statistics. Note that
                    # PyTorch's default Linear init is Kaiming uniform, so this
                    # check assumes Xavier init was applied explicitly.
fan_in = param_data.shape[1] if len(param_data.shape) > 1 else 1
fan_out = param_data.shape[0]
expected_std = np.sqrt(2.0 / (fan_in + fan_out))
stats['expected_std_xavier'] = expected_std
stats['std_ratio'] = stats['std'] / expected_std
stats['xavier_compliant'] = 0.8 <= stats['std_ratio'] <= 1.2
elif 'bias' in name:
# バイアスは通常ゼロ初期化
stats['bias_zero_initialized'] = np.allclose(param_data, 0, atol=1e-6)
init_stats[name] = stats
return init_stats
def check_gradient_flow(self, dummy_input: torch.Tensor,
dummy_target: torch.Tensor,
loss_fn: nn.Module) -> Dict:
"""勾配フローの検証"""
self.model.train()
# フォワードパス
output = self.model(dummy_input)
loss = loss_fn(output, dummy_target)
# バックワードパス
loss.backward()
gradient_stats = {}
for name, param in self.model.named_parameters():
if param.grad is not None:
grad_data = param.grad.data.cpu().numpy()
stats = {
'mean': np.mean(grad_data),
'std': np.std(grad_data),
'min': np.min(grad_data),
'max': np.max(grad_data),
'norm': np.linalg.norm(grad_data),
'zero_percentage': np.mean(grad_data == 0) * 100
}
# 勾配の健全性チェック
stats['has_gradient'] = not np.allclose(grad_data, 0)
stats['gradient_explosion'] = np.any(np.abs(grad_data) > 10.0)
stats['gradient_vanishing'] = np.all(np.abs(grad_data) < 1e-6)
gradient_stats[name] = stats
else:
gradient_stats[name] = {'error': 'No gradient computed'}
return gradient_stats
def check_model_capacity(self, input_shape: Tuple) -> Dict:
"""モデル容量の検証"""
total_params = sum(p.numel() for p in self.model.parameters())
trainable_params = sum(p.numel() for p in self.model.parameters()
if p.requires_grad)
# 理論的メモリ使用量の計算
input_size = np.prod(input_shape)
model_size_mb = (total_params * 4) / (1024 ** 2) # float32仮定
capacity_info = {
'total_parameters': total_params,
'trainable_parameters': trainable_params,
'model_size_mb': model_size_mb,
'parameters_per_input_dim': total_params / input_size,
'capacity_utilization': trainable_params / total_params
}
# 容量の妥当性評価
capacity_info['likely_overparameterized'] = (
capacity_info['parameters_per_input_dim'] > 100
)
capacity_info['likely_underparameterized'] = (
capacity_info['parameters_per_input_dim'] < 1
)
return capacity_info
# Usage example
class SimpleNN(nn.Module):
def __init__(self, input_dim: int, hidden_dim: int, output_dim: int):
super().__init__()
self.layers = nn.Sequential(
nn.Linear(input_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, output_dim)
)
def forward(self, x):
return self.layers(x)
# Model initialization
model = SimpleNN(input_dim=784, hidden_dim=256, output_dim=10)
checker = ModelSanityChecker(model)

# Parameter initialization check
init_results = checker.check_parameter_initialization()
print("Parameter initialization validation:")
for layer_name, stats in init_results.items():
    if 'xavier_compliant' in stats:
        print(f"{layer_name}: std ratio={stats['std_ratio']:.3f}, Xavier-compliant={stats['xavier_compliant']}")

# Gradient flow check
dummy_input = torch.randn(32, 784)
dummy_target = torch.randint(0, 10, (32,))
loss_fn = nn.CrossEntropyLoss()
gradient_results = checker.check_gradient_flow(dummy_input, dummy_target, loss_fn)
print("\nGradient flow validation:")
for layer_name, stats in gradient_results.items():
    if 'has_gradient' in stats:
        print(f"{layer_name}: has gradient={stats['has_gradient']}, norm={stats['norm']:.6f}")
```

Example output:

```
Parameter initialization validation:
layers.0.weight: std ratio=0.987, Xavier-compliant=True
layers.2.weight: std ratio=1.034, Xavier-compliant=True
layers.4.weight: std ratio=0.956, Xavier-compliant=True

Gradient flow validation:
layers.0.weight: has gradient=True, norm=0.023451
layers.2.weight: has gradient=True, norm=0.018732
layers.4.weight: has gradient=True, norm=0.041289
```
### 3.2 Output Range Verification

Confirming that model outputs stay within the expected range is especially important for regression tasks and probabilistic outputs. (For classification, the checks below assume the model emits softmax probabilities rather than raw logits.)

```python
class OutputSanityChecker:
def __init__(self, model: nn.Module, task_type: str = 'classification'):
self.model = model
self.task_type = task_type
def check_output_range(self, test_inputs: List[torch.Tensor]) -> Dict:
"""出力範囲の検証"""
self.model.eval()
all_outputs = []
with torch.no_grad():
for batch in test_inputs:
outputs = self.model(batch)
all_outputs.append(outputs.cpu().numpy())
all_outputs = np.concatenate(all_outputs, axis=0)
range_stats = {
'min': np.min(all_outputs),
'max': np.max(all_outputs),
'mean': np.mean(all_outputs),
'std': np.std(all_outputs),
'shape': all_outputs.shape
}
# タスク固有の検証
if self.task_type == 'classification':
# ソフトマックス出力の場合
if all_outputs.shape[1] > 1: # 多クラス分類
sum_check = np.allclose(np.sum(all_outputs, axis=1), 1.0, atol=1e-4)
range_stats['probabilities_sum_to_one'] = sum_check
range_stats['all_non_negative'] = np.all(all_outputs >= 0)
range_stats['all_less_than_one'] = np.all(all_outputs <= 1)
elif self.task_type == 'regression':
# 回帰の場合、異常な範囲の出力を検出
q1, q3 = np.percentile(all_outputs, [25, 75])
iqr = q3 - q1
outlier_threshold_low = q1 - 3 * iqr
outlier_threshold_high = q3 + 3 * iqr
outliers = (all_outputs < outlier_threshold_low) | (all_outputs > outlier_threshold_high)
range_stats['outlier_percentage'] = np.mean(outliers) * 100
range_stats['extreme_outliers_detected'] = range_stats['outlier_percentage'] > 5.0
return range_stats
def check_prediction_consistency(self, test_input: torch.Tensor,
num_runs: int = 10) -> Dict:
"""予測一貫性の検証(確率的モデル用)"""
self.model.eval()
predictions = []
for _ in range(num_runs):
with torch.no_grad():
output = self.model(test_input)
predictions.append(output.cpu().numpy())
predictions = np.array(predictions)
consistency_stats = {
'mean_prediction': np.mean(predictions, axis=0),
'std_prediction': np.std(predictions, axis=0),
'max_std': np.max(np.std(predictions, axis=0)),
'min_std': np.min(np.std(predictions, axis=0))
}
# 一貫性の評価
consistency_stats['highly_consistent'] = consistency_stats['max_std'] < 0.01
consistency_stats['moderately_consistent'] = consistency_stats['max_std'] < 0.1
consistency_stats['inconsistent'] = consistency_stats['max_std'] > 0.1
return consistency_stats
# Usage example
output_checker = OutputSanityChecker(model, task_type='classification')

# Output-range check on test data
test_batches = [torch.randn(16, 784) for _ in range(5)]
range_results = output_checker.check_output_range(test_batches)
print("Output range validation results:")
print(f"min: {range_results['min']:.4f}")
print(f"max: {range_results['max']:.4f}")
if 'probabilities_sum_to_one' in range_results:
    print(f"probabilities sum to 1: {range_results['probabilities_sum_to_one']}")
```

### 3.3 Detecting Anomalies in Learning Curves

Monitoring the trajectory of the loss function during training and detecting abnormal patterns is an important sanity check:

```python
import matplotlib.pyplot as plt
import numpy as np  # used throughout for loss statistics
from collections import deque
from typing import Dict, List, Optional
class LearningCurveSanityChecker:
def __init__(self, window_size: int = 100):
self.window_size = window_size
self.train_losses = deque(maxlen=window_size)
self.val_losses = deque(maxlen=window_size)
self.learning_rates = deque(maxlen=window_size)
def add_metrics(self, train_loss: float, val_loss: Optional[float] = None,
lr: Optional[float] = None):
"""メトリクスの追加"""
self.train_losses.append(train_loss)
if val_loss is not None:
self.val_losses.append(val_loss)
if lr is not None:
self.learning_rates.append(lr)
def detect_anomalies(self) -> Dict:
"""学習曲線の異常検出"""
if len(self.train_losses) < 10:
return {'error': 'Insufficient data points'}
train_losses_array = np.array(self.train_losses)
anomalies = {}
        # 1. Detect loss explosion (guard against the empty baseline slice
        # that occurs when exactly 10 points are available)
        recent_losses = train_losses_array[-10:]
        baseline = train_losses_array[:-10] if len(train_losses_array) > 10 else train_losses_array
        loss_explosion = np.any(recent_losses > 10 * np.median(baseline))
        anomalies['loss_explosion'] = loss_explosion
# 2. 学習停滞の検出
if len(train_losses_array) >= 50:
recent_50 = train_losses_array[-50:]
early_50 = train_losses_array[-100:-50] if len(train_losses_array) >= 100 else train_losses_array[:-50]
improvement_rate = (np.mean(early_50) - np.mean(recent_50)) / np.mean(early_50)
anomalies['learning_stagnation'] = improvement_rate < 0.01
# 3. 振動パターンの検出
if len(train_losses_array) >= 20:
recent_losses = train_losses_array[-20:]
volatility = np.std(recent_losses) / np.mean(recent_losses)
anomalies['high_volatility'] = volatility > 0.5
# 4. 過学習の検出
if len(self.val_losses) >= 10:
val_losses_array = np.array(self.val_losses)
train_val_gap = np.mean(val_losses_array[-10:]) - np.mean(train_losses_array[-10:])
anomalies['potential_overfitting'] = train_val_gap > 0.5
# 5. 学習率の適切性
if len(self.learning_rates) >= 10:
lr_array = np.array(self.learning_rates)
recent_lr = np.mean(lr_array[-10:])
# 損失改善率と学習率の関係
if len(train_losses_array) >= 20:
loss_improvement = (train_losses_array[-20] - train_losses_array[-1]) / train_losses_array[-20]
anomalies['lr_too_high'] = recent_lr > 0.1 and loss_improvement < 0.01
anomalies['lr_too_low'] = recent_lr < 1e-6 and loss_improvement < 0.001
return anomalies
    def get_recommendations(self, anomalies: Dict) -> List[str]:
        """Recommendations based on detected anomalies"""
        recommendations = []
        if anomalies.get('loss_explosion', False):
            recommendations.append("Reduce the learning rate to 1/10 of its current value")
            recommendations.append("Consider introducing gradient clipping")
        if anomalies.get('learning_stagnation', False):
            recommendations.append("Revisit the learning-rate schedule")
            recommendations.append("Consider switching optimizers (e.g., Adam to SGD)")
        if anomalies.get('high_volatility', False):
            recommendations.append("Consider increasing the batch size")
            recommendations.append("Reduce the learning rate")
        if anomalies.get('potential_overfitting', False):
            recommendations.append("Strengthen regularization (Dropout, weight decay)")
            recommendations.append("Introduce early stopping")
            recommendations.append("Consider data augmentation")
        if anomalies.get('lr_too_high', False):
            recommendations.append("Cut the learning rate to 1/5-1/10 of its current value")
        if anomalies.get('lr_too_low', False):
            recommendations.append("Increase the learning rate by 5-10x")
        return recommendations
def plot_learning_curves(self, save_path: Optional[str] = None):
"""学習曲線の可視化"""
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
# 訓練損失
axes[0, 0].plot(list(self.train_losses))
axes[0, 0].set_title('Training Loss')
axes[0, 0].set_xlabel('Epoch')
axes[0, 0].set_ylabel('Loss')
# 検証損失(利用可能な場合)
if self.val_losses:
axes[0, 1].plot(list(self.val_losses), label='Validation')
axes[0, 1].plot(list(self.train_losses), label='Training')
axes[0, 1].set_title('Training vs Validation Loss')
axes[0, 1].set_xlabel('Epoch')
axes[0, 1].set_ylabel('Loss')
axes[0, 1].legend()
# 学習率
if self.learning_rates:
axes[1, 0].plot(list(self.learning_rates))
axes[1, 0].set_title('Learning Rate')
axes[1, 0].set_xlabel('Epoch')
axes[1, 0].set_ylabel('Learning Rate')
axes[1, 0].set_yscale('log')
# 損失の移動平均
if len(self.train_losses) > 10:
window = min(10, len(self.train_losses) // 4)
train_losses_array = np.array(self.train_losses)
moving_avg = np.convolve(train_losses_array, np.ones(window)/window, mode='valid')
axes[1, 1].plot(moving_avg)
axes[1, 1].set_title(f'Training Loss (Moving Average, window={window})')
axes[1, 1].set_xlabel('Epoch')
axes[1, 1].set_ylabel('Loss')
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches='tight')
return fig
# Usage example (integration into a training loop)
curve_checker = LearningCurveSanityChecker(window_size=200)

# Simulated training loop
for epoch in range(100):
    # Mock training process
    train_loss = 2.5 * np.exp(-epoch * 0.05) + 0.1 * np.random.random()
    val_loss = train_loss + 0.2 + 0.1 * np.random.random()
    lr = 0.001 * (0.95 ** (epoch // 10))
    curve_checker.add_metrics(train_loss, val_loss, lr)
    # Periodic anomaly detection
    if epoch % 20 == 0 and epoch > 0:
        anomalies = curve_checker.detect_anomalies()
        recommendations = curve_checker.get_recommendations(anomalies)
        print(f"Epoch {epoch} - anomaly detection results:")
        for anomaly, detected in anomalies.items():
            if detected:
                print(f"  ⚠️ {anomaly}: detected")
        if recommendations:
            print("Recommendations:")
            for rec in recommendations:
                print(f"  • {rec}")
        print()
```

## Chapter 4: Training-Process Sanity Checks

### 4.1 Gradient Monitoring System

Continuously monitoring the state of the gradients during training helps keep the learning process healthy:

```python
import torch
import numpy as np
from typing import Dict, List, Optional
from collections import defaultdict
import logging
class GradientMonitor:
def __init__(self, model: nn.Module, log_interval: int = 100):
self.model = model
self.log_interval = log_interval
self.gradient_history = defaultdict(list)
self.step_count = 0
# ログ設定
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
def monitor_gradients(self) -> Dict:
"""勾配の監視と分析"""
gradient_stats = {}
for name, param in self.model.named_parameters():
if param.grad is not None:
grad = param.grad.data
# 勾配統計の計算
grad_norm = torch.norm(grad).item()
grad_mean = torch.mean(grad).item()
grad_std = torch.std(grad).item()
grad_max = torch.max(torch.abs(grad)).item()
stats = {
'norm': grad_norm,
'mean': grad_mean,
'std': grad_std,
'max_abs': grad_max,
'shape': grad.shape
}
# 異常検出
stats['vanishing'] = grad_norm < 1e-7
stats['exploding'] = grad_norm > 100.0
stats['dead_neurons'] = torch.sum(torch.abs(grad) < 1e-8).item() / grad.numel()
gradient_stats[name] = stats
# 履歴の保存
self.gradient_history[name].append({
'step': self.step_count,
'norm': grad_norm,
'mean': grad_mean,
'std': grad_std
})
# 履歴サイズの制限
if len(self.gradient_history[name]) > 1000:
self.gradient_history[name] = self.gradient_history[name][-500:]
self.step_count += 1
# 定期的なログ出力
if self.step_count % self.log_interval == 0:
self._log_gradient_health(gradient_stats)
return gradient_stats
    def _log_gradient_health(self, gradient_stats: Dict):
        """Log the health of the gradients"""
        issues = []
        for name, stats in gradient_stats.items():
            if stats['vanishing']:
                issues.append(f"Vanishing gradient detected: {name} (norm={stats['norm']:.2e})")
            if stats['exploding']:
                issues.append(f"Exploding gradient detected: {name} (norm={stats['norm']:.2e})")
            if stats['dead_neurons'] > 0.5:
                issues.append(f"Dead neurons detected: {name} ({stats['dead_neurons']*100:.1f}%)")
        if issues:
            self.logger.warning(f"Step {self.step_count} - gradient issues:")
            for issue in issues:
                self.logger.warning(f"  {issue}")
        else:
            self.logger.info(f"Step {self.step_count} - gradients healthy")
def analyze_gradient_trends(self, layer_name: str,
window_size: int = 50) -> Dict:
"""勾配トレンドの分析"""
if layer_name not in self.gradient_history:
return {'error': f'No history for layer {layer_name}'}
history = self.gradient_history[layer_name]
if len(history) < window_size:
return {'error': 'Insufficient history for trend analysis'}
recent_data = history[-window_size:]
older_data = history[-2*window_size:-window_size] if len(history) >= 2*window_size else history[:-window_size]
# トレンド分析
recent_norms = [h['norm'] for h in recent_data]
older_norms = [h['norm'] for h in older_data]
trend_analysis = {
'recent_mean_norm': np.mean(recent_norms),
'older_mean_norm': np.mean(older_norms),
'norm_trend': 'increasing' if np.mean(recent_norms) > np.mean(older_norms) else 'decreasing',
'norm_stability': np.std(recent_norms) / np.mean(recent_norms) if np.mean(recent_norms) > 0 else float('inf'),
'concerning_trend': False
}
# 懸念すべきトレンドの検出
norm_ratio = trend_analysis['recent_mean_norm'] / trend_analysis['older_mean_norm'] if trend_analysis['older_mean_norm'] > 0 else float('inf')
if norm_ratio > 2.0: # 勾配ノルムが2倍以上増加
trend_analysis['concerning_trend'] = True
trend_analysis['concern_type'] = 'gradient_increasing'
elif norm_ratio < 0.5: # 勾配ノルムが半分以下に減少
trend_analysis['concerning_trend'] = True
trend_analysis['concern_type'] = 'gradient_decreasing'
elif trend_analysis['norm_stability'] > 1.0: # 勾配が不安定
trend_analysis['concerning_trend'] = True
trend_analysis['concern_type'] = 'gradient_unstable'
return trend_analysis
def get_gradient_health_report(self) -> Dict:
"""勾配健全性の総合レポート"""
if not self.gradient_history:
return {'error': 'No gradient history available'}
report = {
'total_steps': self.step_count,
'monitored_layers': len(self.gradient_history),
'layer_health': {}
}
for layer_name in self.gradient_history.keys():
layer_report = self.analyze_gradient_trends(layer_name)
if 'error' not in layer_report:
report['layer_health'][layer_name] = layer_report
# 全体的な健全性スコア
concerning_layers = sum(1 for layer_health in report['layer_health'].values()
if layer_health.get('concerning_trend', False))
total_layers = len(report['layer_health'])
if total_layers > 0:
report['health_score'] = (total_layers - concerning_layers) / total_layers
report['overall_health'] = 'good' if report['health_score'] > 0.8 else 'moderate' if report['health_score'] > 0.5 else 'poor'
return report
# Usage example (integration with a training loop)
model = SimpleNN(input_dim=784, hidden_dim=256, output_dim=10)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
gradient_monitor = GradientMonitor(model, log_interval=50)

# Mock training loop
for epoch in range(100):
    for batch_idx in range(50):  # 50 batches per epoch
        # Forward pass
        inputs = torch.randn(32, 784)
        targets = torch.randint(0, 10, (32,))
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = nn.CrossEntropyLoss()(outputs, targets)
        # Backward pass
        loss.backward()
        # Gradient monitoring
        grad_stats = gradient_monitor.monitor_gradients()
        # Optimizer step
        optimizer.step()
    # End-of-epoch report
    if epoch % 20 == 0:
        health_report = gradient_monitor.get_gradient_health_report()
        print(f"\nEpoch {epoch} - gradient health report:")
        print(f"Overall health: {health_report.get('overall_health', 'unknown')}")
        print(f"Health score: {health_report.get('health_score', 0):.3f}")
```

### 4.2 Memory Usage Monitoring

A sanity check that monitors GPU/CPU memory usage to catch memory leaks and prevent out-of-memory (OOM) errors:

```python
import psutil
import torch
import gc
import numpy as np  # used below for trend estimation (np.polyfit, np.mean)
from typing import Dict, Optional
import threading
import time
class MemoryMonitor:
def __init__(self, device: Optional[torch.device] = None):
self.device = device if device else torch.device('cuda' if torch.cuda.is_available() else 'cpu')
self.memory_history = []
self.monitoring = False
self.monitor_thread = None
def get_memory_stats(self) -> Dict:
"""現在のメモリ使用状況を取得"""
stats = {}
# CPUメモリ
cpu_memory = psutil.virtual_memory()
stats['cpu'] = {
'total': cpu_memory.total / (1024**3), # GB
'available': cpu_memory.available / (1024**3),
'used': cpu_memory.used / (1024**3),
'percentage': cpu_memory.percent
}
# GPUメモリ(CUDA利用可能な場合)
if torch.cuda.is_available() and self.device.type == 'cuda':
gpu_memory = torch.cuda.memory_stats(self.device)
allocated = torch.cuda.memory_allocated(self.device) / (1024**3)
reserved = torch.cuda.memory_reserved(self.device) / (1024**3)
max_allocated = torch.cuda.max_memory_allocated(self.device) / (1024**3)
stats['gpu'] = {
'allocated': allocated,
'reserved': reserved,
'max_allocated': max_allocated,
'free': reserved - allocated,
'utilization': allocated / reserved if reserved > 0 else 0
}
return stats
def start_monitoring(self, interval: float = 1.0):
"""メモリ監視を開始"""
if self.monitoring:
return
self.monitoring = True
self.monitor_thread = threading.Thread(
target=self._monitor_loop,
args=(interval,),
daemon=True
)
self.monitor_thread.start()
def stop_monitoring(self):
"""メモリ監視を停止"""
self.monitoring = False
if self.monitor_thread:
self.monitor_thread.join()
def _monitor_loop(self, interval: float):
"""監視ループ"""
while self.monitoring:
stats = self.get_memory_stats()
stats['timestamp'] = time.time()
self.memory_history.append(stats)
# 履歴サイズの制限
if len(self.memory_history) > 1000:
self.memory_history = self.memory_history[-500:]
# メモリ使用量チェック
self._check_memory_health(stats)
time.sleep(interval)
    def _check_memory_health(self, stats: Dict):
        """Check memory health"""
        warnings = []
        # CPU memory check
        if stats['cpu']['percentage'] > 90:
            warnings.append(f"High CPU memory usage: {stats['cpu']['percentage']:.1f}%")
        # GPU memory check
        if 'gpu' in stats:
            if stats['gpu']['utilization'] > 0.9:
                warnings.append(f"High GPU memory usage: {stats['gpu']['utilization']*100:.1f}%")
            if stats['gpu']['free'] < 0.5:  # less than 0.5 GB free
                warnings.append(f"Low free GPU memory: {stats['gpu']['free']:.2f}GB")
        # Memory-leak detection
        if len(self.memory_history) >= 10:
            recent_usage = [h['cpu']['used'] for h in self.memory_history[-10:]]
            if len(set(recent_usage)) > 1:  # usage is changing
                trend = np.polyfit(range(len(recent_usage)), recent_usage, 1)[0]
                if trend > 0.1:  # growing by 0.1 GB (100 MB) or more per sample
                    warnings.append(f"Possible CPU memory leak: {trend:.3f}GB/step")
        if warnings:
            print("⚠️ Memory warnings:")
            for warning in warnings:
                print(f"  {warning}")
def force_garbage_collection(self) -> Dict:
"""強制的なガベージコレクション"""
before_stats = self.get_memory_stats()
# Python ガベージコレクション
collected = gc.collect()
# PyTorch キャッシュクリア
if torch.cuda.is_available():
torch.cuda.empty_cache()
after_stats = self.get_memory_stats()
return {
'python_objects_collected': collected,
'cpu_memory_freed': before_stats['cpu']['used'] - after_stats['cpu']['used'],
'gpu_memory_freed': (before_stats.get('gpu', {}).get('allocated', 0) -
after_stats.get('gpu', {}).get('allocated', 0)) if 'gpu' in before_stats else 0
}
def get_memory_report(self) -> Dict:
"""メモリ使用状況レポート"""
if not self.memory_history:
return self.get_memory_stats()
report = {
'current': self.get_memory_stats(),
'history_length': len(self.memory_history),
'monitoring_duration': (
self.memory_history[-1]['timestamp'] - self.memory_history[0]['timestamp']
) / 3600 if len(self.memory_history) > 1 else 0 # 時間
}
# CPU メモリトレンド
cpu_usage_history = [h['cpu']['used'] for h in self.memory_history]
report['cpu_trend'] = {
'min': min(cpu_usage_history),
'max': max(cpu_usage_history),
'mean': np.mean(cpu_usage_history),
'std': np.std(cpu_usage_history)
}
# GPU メモリトレンド(利用可能な場合)
if 'gpu' in self.memory_history[0]:
gpu_usage_history = [h['gpu']['allocated'] for h in self.memory_history]
report['gpu_trend'] = {
'min': min(gpu_usage_history),
'max': max(gpu_usage_history),
'mean': np.mean(gpu_usage_history),
'std': np.std(gpu_usage_history)
}
return report
# Usage example
memory_monitor = MemoryMonitor()
memory_monitor.start_monitoring(interval=2.0)

# Mock training process
model = SimpleNN(input_dim=784, hidden_dim=512, output_dim=10)
if torch.cuda.is_available():
    model = model.cuda()
optimizer = torch.optim.Adam(model.parameters())

print("Starting memory monitoring...")
initial_stats = memory_monitor.get_memory_stats()
print("Initial memory usage:")
print(f"  CPU: {initial_stats['cpu']['used']:.2f}GB ({initial_stats['cpu']['percentage']:.1f}%)")
if 'gpu' in initial_stats:
    print(f"  GPU: {initial_stats['gpu']['allocated']:.2f}GB")

# Training loop
for epoch in range(50):
    for batch in range(20):
        inputs = torch.randn(128, 784)
        targets = torch.randint(0, 10, (128,))
        if torch.cuda.is_available():
            inputs, targets = inputs.cuda(), targets.cuda()
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = nn.CrossEntropyLoss()(outputs, targets)
        loss.backward()
        optimizer.step()
    # Periodic memory check
    if epoch % 10 == 0:
        current_stats = memory_monitor.get_memory_stats()
        print(f"\nEpoch {epoch} memory usage:")
        print(f"  CPU: {current_stats['cpu']['used']:.2f}GB ({current_stats['cpu']['percentage']:.1f}%)")
        if 'gpu' in current_stats:
            print(f"  GPU: {current_stats['gpu']['allocated']:.2f}GB ({current_stats['gpu']['utilization']*100:.1f}%)")

# Final report
memory_monitor.stop_monitoring()
final_report = memory_monitor.get_memory_report()
print("\nFinal memory report:")
print(f"Monitoring duration: {final_report['monitoring_duration']:.2f} hours")
print(f"CPU usage - mean: {final_report['cpu_trend']['mean']:.2f}GB, max: {final_report['cpu_trend']['max']:.2f}GB")
```

## Chapter 5: Output-Quality Sanity Checks

### 5.1 Validating Prediction Plausibility

A comprehensive validation system for confirming that model outputs are realistic and within plausible bounds:

```python
import numpy as np
import pandas as pd
from typing import Dict, List, Tuple, Optional, Callable
from scipy import stats
import warnings
class PredictionSanityChecker:
def __init__(self, task_type: str, expected_range: Optional[Tuple[float, float]] = None):
self.task_type = task_type # 'classification', 'regression', 'ranking'
self.expected_range = expected_range
self.validation_history = []
def validate_predictions(self, predictions: np.ndarray,
ground_truth: Optional[np.ndarray] = None,
metadata: Optional[Dict] = None) -> Dict:
"""予測値の包括的な妥当性検証"""
validation_results = {
'timestamp': pd.Timestamp.now(),
'prediction_stats': self._compute_prediction_statistics(predictions),
'range_validation': self._validate_range(predictions),
'distribution_validation': self._validate_distribution(predictions),
'consistency_validation': self._validate_consistency(predictions),
}
if ground_truth is not None:
validation_results['accuracy_validation'] = self._validate_accuracy(predictions, ground_truth)
if metadata:
validation_results['metadata'] = metadata
# 全体的な健全性スコア
validation_results['overall_health_score'] = self._compute_health_score(validation_results)
# 履歴に追加
self.validation_history.append(validation_results)
return validation_results
def _compute_prediction_statistics(self, predictions: np.ndarray) -> Dict:
"""予測値の基本統計量"""
if predictions.ndim > 1:
# 多次元の場合、各次元の統計量を計算
stats = {}
for i in range(predictions.shape[1]):
dim_stats = {
'mean': np.mean(predictions[:, i]),
'std': np.std(predictions[:, i]),
'min': np.min(predictions[:, i]),
'max': np.max(predictions[:, i]),
'median': np.median(predictions[:, i]),
'q25': np.percentile(predictions[:, i], 25),
'q75': np.percentile(predictions[:, i], 75)
}
stats[f'dimension_{i}'] = dim_stats
return stats
else:
return {
'mean': np.mean(predictions),
'std': np.std(predictions),
'min': np.min(predictions),
'max': np.max(predictions),
'median': np.median(predictions),
'q25': np.percentile(predictions, 25),
'q75': np.percentile(predictions, 75),
'unique_values': len(np.unique(predictions)),
'nan_count': np.sum(np.isnan(predictions)),
'inf_count': np.sum(np.isinf(predictions))
}
def _validate_range(self, predictions: np.ndarray) -> Dict:
"""範囲の妥当性検証"""
range_validation = {}
if self.task_type == 'classification':
# 分類タスクの場合
if predictions.ndim > 1: # 多クラス分類(確率出力)
# 確率の合計が1に近いかチェック
prob_sums = np.sum(predictions, axis=1)
range_validation['probability_sum_valid'] = np.allclose(prob_sums, 1.0, atol=1e-3)
range_validation['all_non_negative'] = np.all(predictions >= 0)
range_validation['all_less_than_one'] = np.all(predictions <= 1)
range_validation['sum_deviation'] = np.mean(np.abs(prob_sums - 1.0))
else: # バイナリ分類
range_validation['all_in_valid_range'] = np.all((predictions >= 0) & (predictions <= 1))
elif self.task_type == 'regression':
# 回帰タスクの場合
if self.expected_range:
min_val, max_val = self.expected_range
range_validation['within_expected_range'] = np.all(
(predictions >= min_val) & (predictions <= max_val)
)
range_validation['range_violations'] = np.sum(
(predictions < min_val) | (predictions > max_val)
)
# 異常値の検出
q1, q3 = np.percentile(predictions, [25, 75])
iqr = q3 - q1
outlier_bounds = (q1 - 3 * iqr, q3 + 3 * iqr)
outliers = (predictions < outlier_bounds[0]) | (predictions > outlier_bounds[1])
range_validation['outlier_percentage'] = np.mean(outliers) * 100
# 数値的な問題
range_validation['has_nan'] = np.any(np.isnan(predictions))
range_validation['has_inf'] = np.any(np.isinf(predictions))
range_validation['numerical_stability'] = not (range_validation['has_nan'] or range_validation['has_inf'])
return range_validation
def _validate_distribution(self, predictions: np.ndarray) -> Dict:
"""分布の妥当性検証"""
distribution_validation = {}
if predictions.ndim == 1:
# 1次元の場合の分布検証
# 正規性の検定
if len(predictions) > 8 and np.std(predictions) > 1e-10:
try:
_, p_value = stats.normaltest(predictions)
distribution_validation['normality_p_value'] = p_value
distribution_validation['appears_normal'] = p_value > 0.05
                except Exception:
                    distribution_validation['normality_test_failed'] = True
# 単峰性の検証(簡易版)
hist, bin_edges = np.histogram(predictions, bins=20)
peak_count = len([i for i in range(1, len(hist)-1)
if hist[i] > hist[i-1] and hist[i] > hist[i+1]])
distribution_validation['peak_count'] = peak_count
distribution_validation['likely_unimodal'] = peak_count <= 2
# エントロピーの計算
hist_normalized = hist / np.sum(hist)
entropy = -np.sum(hist_normalized * np.log(hist_normalized + 1e-10))
distribution_validation['entropy'] = entropy
else:
# 多次元の場合
for i in range(predictions.shape[1]):
dim_predictions = predictions[:, i]
if np.std(dim_predictions) > 1e-10:
try:
_, p_value = stats.normaltest(dim_predictions)
distribution_validation[f'dim_{i}_normality_p_value'] = p_value
                    except Exception:
                        distribution_validation[f'dim_{i}_normality_test_failed'] = True
return distribution_validation
def _validate_consistency(self, predictions: np.ndarray) -> Dict:
"""一貫性の検証"""
consistency_validation = {}
# 予測値の変動係数
if predictions.ndim == 1:
cv = np.std(predictions) / np.abs(np.mean(predictions)) if np.mean(predictions) != 0 else float('inf')
consistency_validation['coefficient_of_variation'] = cv
consistency_validation['high_variability'] = cv > 2.0
# 極端な値の割合
if len(predictions) > 10:
q1, q3 = np.percentile(predictions.flatten(), [25, 75])
iqr = q3 - q1
extreme_threshold = 5 * iqr
extreme_count = np.sum(np.abs(predictions.flatten() - np.median(predictions.flatten())) > extreme_threshold)
consistency_validation['extreme_value_percentage'] = extreme_count / len(predictions.flatten()) * 100
return consistency_validation
def _validate_accuracy(self, predictions: np.ndarray, ground_truth: np.ndarray) -> Dict:
"""精度の妥当性検証"""
accuracy_validation = {}
if self.task_type == 'classification':
if predictions.ndim > 1: # 多クラス分類
pred_classes = np.argmax(predictions, axis=1)
accuracy = np.mean(pred_classes == ground_truth)
accuracy_validation['accuracy'] = accuracy
# クラス別の精度
unique_classes = np.unique(ground_truth)
class_accuracies = {}
for cls in unique_classes:
cls_mask = ground_truth == cls
if np.sum(cls_mask) > 0:
cls_acc = np.mean(pred_classes[cls_mask] == cls)
class_accuracies[f'class_{cls}'] = cls_acc
accuracy_validation['class_accuracies'] = class_accuracies
# 信頼度と精度の相関
max_probs = np.max(predictions, axis=1)
correct_predictions = pred_classes == ground_truth
# 高信頼度での精度
high_conf_mask = max_probs > 0.9
if np.sum(high_conf_mask) > 0:
high_conf_accuracy = np.mean(correct_predictions[high_conf_mask])
accuracy_validation['high_confidence_accuracy'] = high_conf_accuracy
else: # バイナリ分類
pred_binary = (predictions > 0.5).astype(int)
accuracy = np.mean(pred_binary == ground_truth)
accuracy_validation['accuracy'] = accuracy
# AUC的な指標(簡易版)
if len(np.unique(ground_truth)) == 2:
try:
from sklearn.metrics import roc_auc_score
auc = roc_auc_score(ground_truth, predictions)
accuracy_validation['auc'] = auc
except ImportError:
# sklearn が利用できない場合の簡易AUC計算
pass
elif self.task_type == 'regression':
# 回帰メトリクス
mse = np.mean((predictions - ground_truth) ** 2)
mae = np.mean(np.abs(predictions - ground_truth))
if np.std(ground_truth) > 1e-10:
r2 = 1 - (np.sum((ground_truth - predictions) ** 2) /
np.sum((ground_truth - np.mean(ground_truth)) ** 2))
accuracy_validation['r2_score'] = r2
accuracy_validation['mse'] = mse
accuracy_validation['mae'] = mae
accuracy_validation['rmse'] = np.sqrt(mse)
# 残差の分析
residuals = predictions - ground_truth
accuracy_validation['residual_mean'] = np.mean(residuals)
accuracy_validation['residual_std'] = np.std(residuals)
accuracy_validation['residual_bias'] = np.abs(np.mean(residuals)) > 0.1 * np.std(ground_truth)
return accuracy_validation
def _compute_health_score(self, validation_results: Dict) -> float:
"""全体的な健全性スコアの計算"""
score = 1.0
# 範囲検証のペナルティ
range_val = validation_results['range_validation']
if not range_val.get('numerical_stability', True):
score -= 0.3
if range_val.get('has_nan', False) or range_val.get('has_inf', False):
score -= 0.2
# 分類タスク固有のペナルティ
if self.task_type == 'classification':
if not range_val.get('probability_sum_valid', True):
score -= 0.2
if not range_val.get('all_non_negative', True):
score -= 0.1
# 一貫性のペナルティ
consistency_val = validation_results['consistency_validation']
if consistency_val.get('high_variability', False):
score -= 0.1
if consistency_val.get('extreme_value_percentage', 0) > 10:
score -= 0.1
# 精度のボーナス
if 'accuracy_validation' in validation_results:
acc_val = validation_results['accuracy_validation']
if self.task_type == 'classification':
accuracy = acc_val.get('accuracy', 0)
if accuracy > 0.8:
score += 0.1
elif accuracy < 0.5:
score -= 0.2
elif self.task_type == 'regression':
r2 = acc_val.get('r2_score', 0)
if r2 > 0.8:
score += 0.1
elif r2 < 0.3:
score -= 0.2
return max(0.0, min(1.0, score))
def generate_validation_report(self, recent_window: int = 10) -> Dict:
"""検証レポートの生成"""
if not self.validation_history:
return {'error': 'No validation history available'}
recent_validations = self.validation_history[-recent_window:]
report = {
'validation_count': len(self.validation_history),
'recent_window_size': len(recent_validations),
'average_health_score': np.mean([v['overall_health_score'] for v in recent_validations]),
'health_trend': self._analyze_health_trend(recent_validations),
'common_issues': self._identify_common_issues(recent_validations),
'recommendations': []
}
        # Generate recommendations
        if report['average_health_score'] < 0.7:
            report['recommendations'].append("Prediction quality needs improvement: consider revisiting the model architecture or the training data")
        if report['health_trend'] == 'declining':
            report['recommendations'].append("Declining quality trend detected: retraining the model or tuning its parameters is recommended")
return report
def _analyze_health_trend(self, validations: List[Dict]) -> str:
"""健全性スコアのトレンド分析"""
if len(validations) < 3:
return 'insufficient_data'
scores = [v['overall_health_score'] for v in validations]
# 線形回帰によるトレンド分析
x = np.arange(len(scores))
slope = np.polyfit(x, scores, 1)[0]
if slope > 0.05:
return 'improving'
elif slope < -0.05:
return 'declining'
else:
return 'stable'
def _identify_common_issues(self, validations: List[Dict]) -> List[str]:
"""共通の問題の特定"""
issues = []
# 数値的安定性の問題
numerical_issues = sum(1 for v in validations
if not v['range_validation'].get('numerical_stability', True))
if numerical_issues > len(validations) * 0.3:
issues.append('numerical_instability')
# 範囲違反の問題
range_issues = sum(1 for v in validations
if not v['range_validation'].get('within_expected_range', True))
if range_issues > len(validations) * 0.2:
issues.append('range_violations')
# 高い変動性
high_var_issues = sum(1 for v in validations
if v['consistency_validation'].get('high_variability', False))
if high_var_issues > len(validations) * 0.4:
issues.append('high_variability')
return issues
# Usage examples and test cases
def test_prediction_sanity_checker():
    """Test the prediction sanity checker"""
    # Classification-task test
    print("=== Classification-task validation ===")
    classifier_checker = PredictionSanityChecker(task_type='classification')
    # Well-formed probability outputs
    normal_probs = np.random.dirichlet([1, 1, 1], size=100)  # 3-class classification
    ground_truth_cls = np.random.randint(0, 3, size=100)
    validation_result = classifier_checker.validate_predictions(
        normal_probs, ground_truth_cls
    )
    print(f"Health score: {validation_result['overall_health_score']:.3f}")
    print(f"Probability-sum validity: {validation_result['range_validation']['probability_sum_valid']}")
    print(f"Accuracy: {validation_result['accuracy_validation']['accuracy']:.3f}")
    # Malformed probability outputs
    print("\n--- Malformed probability outputs ---")
    abnormal_probs = np.random.random((100, 3))  # rows do not sum to 1
    abnormal_probs[50:60] = np.nan  # contains NaN values
    abnormal_validation = classifier_checker.validate_predictions(abnormal_probs)
    print(f"Health score for malformed data: {abnormal_validation['overall_health_score']:.3f}")
    print(f"NaN detected: {abnormal_validation['range_validation']['has_nan']}")
    # Regression-task test
    print("\n=== Regression-task validation ===")
    regressor_checker = PredictionSanityChecker(
        task_type='regression',
        expected_range=(0, 100)
    )
    # Well-behaved regression outputs
    true_values = np.random.normal(50, 15, 100)
    predictions = true_values + np.random.normal(0, 5, 100)  # add noise
    reg_validation = regressor_checker.validate_predictions(predictions, true_values)
    print(f"Regression health score: {reg_validation['overall_health_score']:.3f}")
    print(f"R² score: {reg_validation['accuracy_validation']['r2_score']:.3f}")
    print(f"RMSE: {reg_validation['accuracy_validation']['rmse']:.3f}")
    # Trend analysis over repeated validations
    print("\n=== Trend-analysis test ===")
    for i in range(15):
        # Simulate quality degrading over time
        noise_level = 2 + i * 0.5  # noise grows gradually
        noisy_predictions = true_values + np.random.normal(0, noise_level, 100)
        regressor_checker.validate_predictions(noisy_predictions, true_values)
    trend_report = regressor_checker.generate_validation_report()
    print(f"Average health score: {trend_report['average_health_score']:.3f}")
    print(f"Health trend: {trend_report['health_trend']}")
    print(f"Common issues: {trend_report['common_issues']}")
    if trend_report['recommendations']:
        print("Recommendations:")
        for rec in trend_report['recommendations']:
            print(f"  • {rec}")

# Run the test
test_prediction_sanity_checker()
```

### 5.2 Alignment Verification

A system for verifying that AI outputs are consistent with human values and expectations:
```python
from typing import List, Dict, Optional, Callable
import re
import json
import numpy as np   # used below to average confidence scores
import pandas as pd  # used below for pd.Timestamp in check results
class AlignmentSanityChecker:
def __init__(self, domain: str = 'general'):
self.domain = domain
self.alignment_rules = self._load_alignment_rules()
self.violation_history = []
def _load_alignment_rules(self) -> Dict:
"""ドメイン固有のアライメントルールを読み込み"""
rules = {
'general': {
'safety_keywords': [
'violence', 'harm', 'illegal', 'dangerous', 'toxic'
],
'bias_indicators': [
'always', 'never', 'all [group]', 'no [group]'
],
'uncertainty_expressions': [
'might', 'could', 'possibly', 'perhaps', 'uncertain'
],
'confidence_thresholds': {
'high_stakes': 0.95,
'medium_stakes': 0.85,
'low_stakes': 0.7
}
},
'medical': {
'prohibited_claims': [
'cure', 'guaranteed', 'miracle', 'instant'
],
'required_disclaimers': [
'consult', 'doctor', 'professional', 'medical advice'
],
'confidence_thresholds': {
'diagnosis': 0.99,
'treatment': 0.95,
'general_info': 0.8
}
},
'financial': {
'prohibited_claims': [
'guaranteed profit', 'risk-free', 'get rich quick'
],
'required_disclaimers': [
'risk', 'past performance', 'financial advisor'
],
'confidence_thresholds': {
'investment_advice': 0.98,
'market_prediction': 0.9,
'general_info': 0.8
}
}
}
return rules.get(self.domain, rules['general'])
def check_content_safety(self, text_output: str) -> Dict:
"""コンテンツの安全性チェック"""
safety_results = {
'is_safe': True,
'safety_violations': [],
'risk_level': 'low',
'confidence_score': 1.0
}
# 危険なキーワードの検出
text_lower = text_output.lower()
detected_keywords = []
for keyword in self.alignment_rules.get('safety_keywords', []):
if keyword.lower() in text_lower:
detected_keywords.append(keyword)
safety_results['is_safe'] = False
if detected_keywords:
safety_results['safety_violations'] = detected_keywords
safety_results['risk_level'] = 'high' if len(detected_keywords) > 2 else 'medium'
safety_results['confidence_score'] = max(0.0, 1.0 - len(detected_keywords) * 0.2)
# コンテキスト分析
safety_results['context_analysis'] = self._analyze_context_safety(text_output)
return safety_results
def check_bias_indicators(self, text_output: str) -> Dict:
"""バイアス指標のチェック"""
bias_results = {
'bias_detected': False,
'bias_types': [],
'severity': 'none',
'specific_issues': []
}
# 絶対的表現の検出
absolute_patterns = [
r'\ball\s+\w+\s+are\b',
r'\bno\s+\w+\s+can\b',
r'\balways\s+\w+\b',
r'\bnever\s+\w+\b'
]
for pattern in absolute_patterns:
matches = re.findall(pattern, text_output, re.IGNORECASE)
if matches:
bias_results['bias_detected'] = True
bias_results['bias_types'].append('absolute_statements')
bias_results['specific_issues'].extend(matches)
# ステレオタイプの検出(簡易版)
stereotype_patterns = {
'gender': [
r'\b(men|women|males|females)\s+are\s+(better|worse|more|less)\b',
r'\b(he|she)\s+must\s+be\b'
],
'ethnic': [
r'\b[A-Z][a-z]+\s+people\s+are\s+(always|never|typically)\b'
],
'age': [
r'\b(young|old)\s+people\s+(can\'t|cannot|always)\b'
]
}
for bias_type, patterns in stereotype_patterns.items():
for pattern in patterns:
if re.search(pattern, text_output, re.IGNORECASE):
bias_results['bias_detected'] = True
if bias_type not in bias_results['bias_types']:
bias_results['bias_types'].append(bias_type)
# 深刻度の評価
if bias_results['bias_detected']:
severity_score = len(bias_results['bias_types']) + len(bias_results['specific_issues']) * 0.5
if severity_score > 3:
bias_results['severity'] = 'high'
elif severity_score > 1:
bias_results['severity'] = 'medium'
else:
bias_results['severity'] = 'low'
return bias_results
def check_uncertainty_handling(self, text_output: str,
confidence_scores: Optional[List[float]] = None) -> Dict:
"""不確実性の適切な処理チェック"""
uncertainty_results = {
'appropriate_uncertainty': True,
'uncertainty_expressions_found': [],
'overconfidence_detected': False,
'underconfidence_detected': False,
'alignment_score': 1.0
}
# 不確実性表現の検出
uncertainty_patterns = [
r'\b(might|may|could|possibly|perhaps|likely|probably)\b',
r'\b(uncertain|unclear|ambiguous|depends|varies)\b',
r'\b(in my opinion|i think|i believe|it seems)\b'
]
found_expressions = []
for pattern in uncertainty_patterns:
matches = re.findall(pattern, text_output, re.IGNORECASE)
found_expressions.extend(matches)
uncertainty_results['uncertainty_expressions_found'] = list(set(found_expressions))
# 信頼度スコアとの整合性チェック
if confidence_scores:
avg_confidence = np.mean(confidence_scores)
has_uncertainty_expressions = len(found_expressions) > 0
# 高信頼度なのに不確実性表現が多い場合
if avg_confidence > 0.9 and len(found_expressions) > 5:
uncertainty_results['inappropriate_uncertainty'] = True
uncertainty_results['alignment_score'] -= 0.2
# 低信頼度なのに断定的な表現の場合
definitive_patterns = [
r'\b(definitely|certainly|absolutely|guaranteed|proven)\b',
r'\b(fact|truth|proven|established)\b'
]
definitive_expressions = []
for pattern in definitive_patterns:
definitive_expressions.extend(
re.findall(pattern, text_output, re.IGNORECASE)
)
if avg_confidence < 0.7 and len(definitive_expressions) > 2:
uncertainty_results['overconfidence_detected'] = True
uncertainty_results['alignment_score'] -= 0.3
return uncertainty_results
def check_domain_compliance(self, text_output: str,
output_type: str = 'general') -> Dict:
"""ドメイン固有のコンプライアンスチェック"""
compliance_results = {
'compliant': True,
'violations': [],
'missing_disclaimers': [],
'prohibited_claims': [],
'compliance_score': 1.0
}
domain_rules = self.alignment_rules
# 禁止されたクレームの検出
prohibited_claims = domain_rules.get('prohibited_claims', [])
for claim in prohibited_claims:
if claim.lower() in text_output.lower():
compliance_results['compliant'] = False
compliance_results['prohibited_claims'].append(claim)
compliance_results['compliance_score'] -= 0.2
# 必要な免責事項の確認
required_disclaimers = domain_rules.get('required_disclaimers', [])
if required_disclaimers and output_type in ['advice', 'recommendation']:
missing_disclaimers = []
for disclaimer in required_disclaimers:
if disclaimer.lower() not in text_output.lower():
missing_disclaimers.append(disclaimer)
if missing_disclaimers:
compliance_results['missing_disclaimers'] = missing_disclaimers
compliance_results['compliance_score'] -= len(missing_disclaimers) * 0.15
return compliance_results
def _analyze_context_safety(self, text_output: str) -> Dict:
"""コンテキストベースの安全性分析"""
context_analysis = {
'instructional_content': False,
'hypothetical_scenario': False,
'educational_context': False,
'safety_modifier': 1.0
}
# 教育的コンテキストの検出
educational_indicators = [
'for educational purposes', 'to understand', 'for learning',
'academic', 'research', 'study'
]
for indicator in educational_indicators:
if indicator.lower() in text_output.lower():
context_analysis['educational_context'] = True
context_analysis['safety_modifier'] = 0.7 # リスクを軽減
break
# 仮想的シナリオの検出
hypothetical_indicators = [
'hypothetically', 'imagine if', 'suppose that',
'in a fictional scenario', 'theoretically'
]
for indicator in hypothetical_indicators:
if indicator.lower() in text_output.lower():
context_analysis['hypothetical_scenario'] = True
context_analysis['safety_modifier'] = 0.8
break
return context_analysis
def comprehensive_alignment_check(self, text_output: str,
confidence_scores: Optional[List[float]] = None,
output_type: str = 'general',
metadata: Optional[Dict] = None) -> Dict:
"""包括的なアライメントチェック"""
comprehensive_results = {
'timestamp': pd.Timestamp.now(),
'safety_check': self.check_content_safety(text_output),
'bias_check': self.check_bias_indicators(text_output),
'uncertainty_check': self.check_uncertainty_handling(text_output, confidence_scores),
'compliance_check': self.check_domain_compliance(text_output, output_type),
'metadata': metadata or {}
}
# 全体的なアライメントスコア計算
safety_score = comprehensive_results['safety_check']['confidence_score']
bias_penalty = 0.3 if comprehensive_results['bias_check']['bias_detected'] else 0
uncertainty_score = comprehensive_results['uncertainty_check']['alignment_score']
compliance_score = comprehensive_results['compliance_check']['compliance_score']
overall_alignment_score = (
safety_score * 0.3 +
(1 - bias_penalty) * 0.25 +
uncertainty_score * 0.25 +
compliance_score * 0.2
)
comprehensive_results['overall_alignment_score'] = max(0.0, min(1.0, overall_alignment_score))
# アライメント品質の評価
if overall_alignment_score > 0.85:
comprehensive_results['alignment_quality'] = 'excellent'
elif overall_alignment_score > 0.7:
comprehensive_results['alignment_quality'] = 'good'
elif overall_alignment_score > 0.5:
comprehensive_results['alignment_quality'] = 'moderate'
else:
comprehensive_results['alignment_quality'] = 'poor'
# 違反履歴に追加
self.violation_history.append(comprehensive_results)
return comprehensive_results
def generate_alignment_report(self, window_size: int = 100) -> Dict:
"""アライメントレポートの生成"""
if not self.violation_history:
return {'error': 'No alignment check history available'}
recent_checks = self.violation_history[-window_size:]
report = {
'total_checks': len(self.violation_history),
'analysis_window': len(recent_checks),
'average_alignment_score': np.mean([
check['overall_alignment_score'] for check in recent_checks
]),
'alignment_trend': self._calculate_alignment_trend(recent_checks),
'violation_summary': self._summarize_violations(recent_checks),
'improvement_recommendations': []
}
        # Generate improvement recommendations
        if report['average_alignment_score'] < 0.7:
            report['improvement_recommendations'].append(
                "Alignment quality needs improvement: strengthen content filtering and post-processing"
            )
        if report['violation_summary']['safety_violation_rate'] > 0.1:
            report['improvement_recommendations'].append(
                "High safety-violation rate: strengthen safety-check functionality"
            )
        if report['violation_summary']['bias_detection_rate'] > 0.15:
            report['improvement_recommendations'].append(
                "High bias-detection rate: revisit training-data diversity and bias-mitigation techniques"
            )
return report
def _calculate_alignment_trend(self, checks: List[Dict]) -> str:
"""アライメントスコアのトレンド計算"""
if len(checks) < 10:
return 'insufficient_data'
scores = [check['overall_alignment_score'] for check in checks]
# 移動平均によるトレンド分析
window_size = min(10, len(scores) // 3)
early_avg = np.mean(scores[:window_size])
recent_avg = np.mean(scores[-window_size:])
improvement = (recent_avg - early_avg) / early_avg
if improvement > 0.05:
return 'improving'
elif improvement < -0.05:
return 'declining'
else:
return 'stable'
def _summarize_violations(self, checks: List[Dict]) -> Dict:
"""違反の要約統計"""
summary = {
'safety_violation_rate': 0,
'bias_detection_rate': 0,
'compliance_violation_rate': 0,
'most_common_issues': []
}
safety_violations = sum(1 for check in checks
if not check['safety_check']['is_safe'])
bias_detections = sum(1 for check in checks
if check['bias_check']['bias_detected'])
compliance_violations = sum(1 for check in checks
if not check['compliance_check']['compliant'])
total_checks = len(checks)
summary['safety_violation_rate'] = safety_violations / total_checks
summary['bias_detection_rate'] = bias_detections / total_checks
summary['compliance_violation_rate'] = compliance_violations / total_checks
# 最も一般的な問題の特定
issue_counts = {}
for check in checks:
if not check['safety_check']['is_safe']:
issue_counts['safety_violations'] = issue_counts.get('safety_violations', 0) + 1
if check['bias_check']['bias_detected']:
for bias_type in check['bias_check']['bias_types']:
issue_counts[f'bias_{bias_type}'] = issue_counts.get(f'bias_{bias_type}', 0) + 1
if not check['compliance_check']['compliant']:
issue_counts['compliance_violations'] = issue_counts.get('compliance_violations', 0) + 1
summary['most_common_issues'] = sorted(
issue_counts.items(),
key=lambda x: x[1],
reverse=True
)[:5]
return summary
# Usage examples and test cases. The sample texts are in English because the
# checker's rule patterns are English regexes; the original Japanese samples
# could never match them.
def test_alignment_checker():
    """Test the alignment checker"""
    print("=== Alignment validation test ===")
    # General-domain test
    general_checker = AlignmentSanityChecker(domain='general')
    # Normal text
    normal_text = """
    Machine learning is a complex field, and many factors could influence the results.
    This method is generally effective, but outcomes may vary depending on data quality and volume.
    Consulting a domain expert is recommended.
    """
    normal_result = general_checker.comprehensive_alignment_check(
        normal_text,
        confidence_scores=[0.8, 0.75, 0.82],
        output_type='general'
    )
    print(f"Alignment score for normal text: {normal_result['overall_alignment_score']:.3f}")
    print(f"Alignment quality: {normal_result['alignment_quality']}")
    # Biased text
    biased_text = """
    All men are good at math, and women are always emotional.
    Young people can't understand technology, but old people always resist change.
    This is a proven fact.
    """
    biased_result = general_checker.comprehensive_alignment_check(
        biased_text,
        confidence_scores=[0.95, 0.9, 0.88],
        output_type='general'
    )
    print(f"\nAlignment score for biased text: {biased_result['overall_alignment_score']:.3f}")
    print(f"Bias detected: {biased_result['bias_check']['bias_detected']}")
    print(f"Detected bias types: {biased_result['bias_check']['bias_types']}")
    # Medical-domain test
    print("\n=== Medical domain test ===")
    medical_checker = AlignmentSanityChecker(domain='medical')
    inappropriate_medical = """
    This drug is guaranteed to cure cancer instantly, with no side effects.
    Expect a miracle recovery right away.
    """
    medical_result = medical_checker.comprehensive_alignment_check(
        inappropriate_medical,
        output_type='advice'
    )
    print(f"Score for inappropriate medical text: {medical_result['overall_alignment_score']:.3f}")
    print(f"Prohibited claims detected: {medical_result['compliance_check']['prohibited_claims']}")
    # Trend analysis over repeated checks
    print("\n=== Trend analysis test ===")
    test_texts = [
        "This is a normal, balanced text without issues.",
        "All people are exactly the same in every way.",
        "Some groups might have different characteristics, but this depends on many factors.",
        "Everyone always behaves predictably based on stereotypes.",
        "Individual differences exist, and generalizations can be problematic.",
    ]
    for i, text in enumerate(test_texts * 5):  # run several times
        general_checker.comprehensive_alignment_check(text)
    alignment_report = general_checker.generate_alignment_report()
    print(f"Average alignment score: {alignment_report['average_alignment_score']:.3f}")
    print(f"Alignment trend: {alignment_report['alignment_trend']}")
    print(f"Bias detection rate: {alignment_report['violation_summary']['bias_detection_rate']:.3f}")
    if alignment_report['improvement_recommendations']:
        print("Improvement recommendations:")
        for rec in alignment_report['improvement_recommendations']:
            print(f"  • {rec}")

# Run the test
test_alignment_checker()
```

## Chapter 6: System-Level Sanity Checks

### 6.1 Performance Monitoring

A system that continuously monitors the performance of an AI system in production and detects anomalies:

```python
import time
import threading
import queue
from collections import deque
from typing import Dict, List, Optional, Callable
import numpy as np
import psutil
import json
from datetime import datetime, timedelta
class PerformanceMonitor:
    def __init__(self, max_history_size: int = 10000):
        self.max_history_size = max_history_size
        self.metrics_history = deque(maxlen=max_history_size)
        self.real_time_queue = queue.Queue()
        self.monitoring_active = False
        self.monitor_thread = None
        self.alert_callbacks = []

        # Performance thresholds. Throughput is a "lower is worse" metric:
        # its alerts fire when the value drops BELOW the threshold
        # (see _check_thresholds).
        self.thresholds = {
            'response_time': {
                'warning': 2.0,   # 2 seconds
                'critical': 5.0   # 5 seconds
            },
            'throughput': {
                'warning': 10,    # 10 requests/second
                'critical': 5     # 5 requests/second
            },
            'error_rate': {
                'warning': 0.05,  # 5%
                'critical': 0.1   # 10%
            },
            'memory_usage': {
                'warning': 0.8,   # 80%
                'critical': 0.9   # 90%
            },
            'cpu_usage': {
                'warning': 0.8,   # 80%
                'critical': 0.9   # 90%
            }
        }
        # Metrics with inverted alert semantics (below threshold = bad)
        self.lower_is_worse = {'throughput'}

        # Current state of metrics collection
        self.current_metrics = {
            'active_requests': 0,
            'total_requests': 0,
            'total_errors': 0,
            'response_times': deque(maxlen=1000),
            'request_timestamps': deque(maxlen=10000),  # used for throughput
            'last_request_time': None
        }
    def start_monitoring(self, collection_interval: float = 1.0):
        """Start performance monitoring."""
        if self.monitoring_active:
            return
        self.monitoring_active = True
        self.monitor_thread = threading.Thread(
            target=self._monitoring_loop,
            args=(collection_interval,),
            daemon=True
        )
        self.monitor_thread.start()
        print("Performance monitoring started")

    def stop_monitoring(self):
        """Stop performance monitoring."""
        self.monitoring_active = False
        if self.monitor_thread:
            self.monitor_thread.join()
        print("Performance monitoring stopped")

    def _monitoring_loop(self, interval: float):
        """Main monitoring loop."""
        while self.monitoring_active:
            try:
                metrics = self._collect_system_metrics()
                self.metrics_history.append(metrics)
                self._check_thresholds(metrics)
                time.sleep(interval)
            except Exception as e:
                print(f"Monitoring error: {e}")
                time.sleep(interval)
    def _collect_system_metrics(self) -> Dict:
        """Collect system-level metrics."""
        current_time = time.time()

        # CPU and memory utilization
        cpu_percent = psutil.cpu_percent(interval=0.1)
        memory_info = psutil.virtual_memory()

        # Request-related metrics
        if self.current_metrics['response_times']:
            avg_response_time = np.mean(list(self.current_metrics['response_times']))
            p95_response_time = np.percentile(list(self.current_metrics['response_times']), 95)
        else:
            avg_response_time = 0
            p95_response_time = 0

        # Throughput: count actual requests recorded in the last minute
        # (counting metric snapshots, as a naive version might, would measure
        # the collection cadence rather than request volume)
        time_window = 60  # 1-minute window
        recent_requests = sum(
            1 for ts in self.current_metrics['request_timestamps']
            if current_time - ts <= time_window
        )
        throughput = recent_requests / time_window

        # Error rate
        error_rate = (self.current_metrics['total_errors'] /
                      max(1, self.current_metrics['total_requests']))

        # GPU metrics (if available)
        gpu_metrics = self._collect_gpu_metrics()

        metrics = {
            'timestamp': current_time,
            'cpu_usage': cpu_percent / 100.0,
            'memory_usage': memory_info.percent / 100.0,
            'memory_used_gb': memory_info.used / (1024**3),
            'avg_response_time': avg_response_time,
            'p95_response_time': p95_response_time,
            'throughput': throughput,
            'error_rate': error_rate,
            'active_requests': self.current_metrics['active_requests'],
            'total_requests': self.current_metrics['total_requests'],
            'total_errors': self.current_metrics['total_errors']
        }
        if gpu_metrics:
            metrics.update(gpu_metrics)
        return metrics
    def _collect_gpu_metrics(self) -> Optional[Dict]:
        """Collect GPU memory metrics (if PyTorch and CUDA are available)."""
        try:
            import torch
            if torch.cuda.is_available():
                allocated = torch.cuda.memory_allocated() / (1024**3)
                reserved = torch.cuda.memory_reserved() / (1024**3)
                return {
                    'gpu_memory_allocated_gb': allocated,
                    'gpu_memory_reserved_gb': reserved,
                    # Ratio of allocated to reserved memory; a proxy, not
                    # true compute utilization
                    'gpu_utilization': allocated / reserved if reserved > 0 else 0
                }
        except ImportError:
            pass
        return None

    def record_request(self, response_time: float, success: bool = True):
        """Record a completed request."""
        now = time.time()
        self.current_metrics['total_requests'] += 1
        self.current_metrics['response_times'].append(response_time)
        self.current_metrics['request_timestamps'].append(now)  # for throughput
        self.current_metrics['last_request_time'] = now
        if not success:
            self.current_metrics['total_errors'] += 1

    def start_request(self) -> str:
        """Record the start of a request."""
        request_id = f"req_{time.time()}_{self.current_metrics['total_requests']}"
        self.current_metrics['active_requests'] += 1
        return request_id

    def end_request(self, request_id: str, success: bool = True):
        """Record the end of a request."""
        self.current_metrics['active_requests'] = max(0, self.current_metrics['active_requests'] - 1)
        # A full implementation would use request_id to compute the exact response time
    def _check_thresholds(self, metrics: Dict):
        """Check metric values against thresholds and emit alerts."""
        alerts = []
        for metric_name, thresholds in self.thresholds.items():
            if metric_name not in metrics:
                continue
            value = metrics[metric_name]
            if metric_name in self.lower_is_worse:
                # Low-throughput alarms only make sense once traffic exists
                if metrics.get('total_requests', 0) == 0:
                    continue
                critical_breached = value <= thresholds['critical']
                warning_breached = value <= thresholds['warning']
            else:
                critical_breached = value >= thresholds['critical']
                warning_breached = value >= thresholds['warning']
            if critical_breached:
                alerts.append({
                    'level': 'critical',
                    'metric': metric_name,
                    'value': value,
                    'threshold': thresholds['critical'],
                    'timestamp': metrics['timestamp']
                })
            elif warning_breached:
                alerts.append({
                    'level': 'warning',
                    'metric': metric_name,
                    'value': value,
                    'threshold': thresholds['warning'],
                    'timestamp': metrics['timestamp']
                })
        # Dispatch alerts
        for alert in alerts:
            self._handle_alert(alert)

    def _handle_alert(self, alert: Dict):
        """Handle a single alert."""
        alert_message = (
            f"🚨 {alert['level'].upper()} ALERT: "
            f"{alert['metric']} = {alert['value']:.3f} "
            f"(threshold: {alert['threshold']:.3f})"
        )
        print(alert_message)
        # Invoke registered callbacks
        for callback in self.alert_callbacks:
            try:
                callback(alert)
            except Exception as e:
                print(f"Alert callback error: {e}")

    def add_alert_callback(self, callback: Callable[[Dict], None]):
        """Register an alert callback."""
        self.alert_callbacks.append(callback)
    def get_performance_summary(self, time_window_minutes: int = 60) -> Dict:
        """Summarize performance over the most recent time window."""
        if not self.metrics_history:
            return {'error': 'No metrics data available'}
        current_time = time.time()
        window_start = current_time - (time_window_minutes * 60)

        # Metrics that fall inside the requested window
        window_metrics = [
            metric for metric in self.metrics_history
            if metric['timestamp'] >= window_start
        ]
        if not window_metrics:
            return {'error': f'No data in the last {time_window_minutes} minutes'}

        # Requests actually recorded inside the window (the lifetime total
        # would mislabel this field)
        requests_in_window = sum(
            1 for ts in self.current_metrics['request_timestamps']
            if ts >= window_start
        )

        # Aggregate statistics
        summary = {
            'time_window_minutes': time_window_minutes,
            'data_points': len(window_metrics),
            'avg_response_time': np.mean([m['avg_response_time'] for m in window_metrics]),
            'p95_response_time': np.mean([m['p95_response_time'] for m in window_metrics]),
            'avg_throughput': np.mean([m['throughput'] for m in window_metrics]),
            'avg_error_rate': np.mean([m['error_rate'] for m in window_metrics]),
            'avg_cpu_usage': np.mean([m['cpu_usage'] for m in window_metrics]),
            'avg_memory_usage': np.mean([m['memory_usage'] for m in window_metrics]),
            'peak_cpu_usage': np.max([m['cpu_usage'] for m in window_metrics]),
            'peak_memory_usage': np.max([m['memory_usage'] for m in window_metrics]),
            'current_active_requests': self.current_metrics['active_requests'],
            'total_requests_in_window': requests_in_window
        }

        # GPU metrics (if available)
        gpu_metrics = [m for m in window_metrics if 'gpu_utilization' in m]
        if gpu_metrics:
            summary['avg_gpu_utilization'] = np.mean([m['gpu_utilization'] for m in gpu_metrics])
            summary['peak_gpu_utilization'] = np.max([m['gpu_utilization'] for m in gpu_metrics])

        # Performance grade
        summary['performance_grade'] = self._calculate_performance_grade(summary)
        return summary
    def _calculate_performance_grade(self, summary: Dict) -> str:
        """Convert summary statistics into a letter grade."""
        score = 100
        # Response-time penalty
        if summary['avg_response_time'] > self.thresholds['response_time']['critical']:
            score -= 30
        elif summary['avg_response_time'] > self.thresholds['response_time']['warning']:
            score -= 15
        # Error-rate penalty
        if summary['avg_error_rate'] > self.thresholds['error_rate']['critical']:
            score -= 25
        elif summary['avg_error_rate'] > self.thresholds['error_rate']['warning']:
            score -= 10
        # CPU-usage penalty
        if summary['avg_cpu_usage'] > self.thresholds['cpu_usage']['critical']:
            score -= 20
        elif summary['avg_cpu_usage'] > self.thresholds['cpu_usage']['warning']:
            score -= 10
        # Memory-usage penalty
        if summary['avg_memory_usage'] > self.thresholds['memory_usage']['critical']:
            score -= 20
        elif summary['avg_memory_usage'] > self.thresholds['memory_usage']['warning']:
            score -= 10
        # Throughput bonus
        if summary['avg_throughput'] > self.thresholds['throughput']['warning']:
            score += 5
        # Assign the grade
        if score >= 90:
            return 'A'
        elif score >= 80:
            return 'B'
        elif score >= 70:
            return 'C'
        elif score >= 60:
            return 'D'
        else:
            return 'F'
    def detect_anomalies(self, window_size: int = 100) -> Dict:
        """Detect anomalies in the most recent metrics."""
        if len(self.metrics_history) < window_size:
            return {'error': 'Insufficient data for anomaly detection'}
        recent_metrics = list(self.metrics_history)[-window_size:]
        anomalies = {'detected_anomalies': [], 'anomaly_score': 0}

        # Response-time spikes
        response_times = [m['avg_response_time'] for m in recent_metrics]
        rt_mean = np.mean(response_times)
        rt_std = np.std(response_times)
        if rt_std > 0:
            recent_rt = response_times[-10:]  # latest 10 data points
            z_scores = [(rt - rt_mean) / rt_std for rt in recent_rt]
            for i, z_score in enumerate(z_scores):
                if abs(z_score) > 3:  # beyond 3 sigma
                    anomalies['detected_anomalies'].append({
                        'type': 'response_time_spike',
                        'severity': 'high' if abs(z_score) > 4 else 'medium',
                        'z_score': z_score,
                        'value': recent_rt[i]
                    })

        # Throughput drops
        throughputs = [m['throughput'] for m in recent_metrics]
        tp_mean = np.mean(throughputs)
        recent_tp = np.mean(throughputs[-10:])
        if tp_mean > 0 and recent_tp < tp_mean * 0.5:  # more than a 50% drop
            anomalies['detected_anomalies'].append({
                'type': 'throughput_drop',
                'severity': 'high',
                'expected': tp_mean,
                'actual': recent_tp,
                'drop_percentage': (tp_mean - recent_tp) / tp_mean * 100
            })

        # Error-rate spikes
        error_rates = [m['error_rate'] for m in recent_metrics]
        recent_error_rate = np.mean(error_rates[-10:])
        baseline_error_rate = np.mean(error_rates[:-10])
        if recent_error_rate > baseline_error_rate * 2 and recent_error_rate > 0.01:
            anomalies['detected_anomalies'].append({
                'type': 'error_rate_spike',
                'severity': 'critical',
                'baseline': baseline_error_rate,
                'current': recent_error_rate,
                'increase_factor': recent_error_rate / max(baseline_error_rate, 0.001)
            })

        # Aggregate anomaly score
        anomalies['anomaly_score'] = len(anomalies['detected_anomalies']) / 10.0
        return anomalies
    def export_metrics(self, filepath: str, format: str = 'json'):
        """Export collected metrics to a file."""
        if format == 'json':
            data = {
                'export_timestamp': datetime.now().isoformat(),
                'total_data_points': len(self.metrics_history),
                'metrics': list(self.metrics_history)
            }
            with open(filepath, 'w') as f:
                json.dump(data, f, indent=2)
        elif format == 'csv':
            import pandas as pd
            df = pd.DataFrame(list(self.metrics_history))
            df.to_csv(filepath, index=False)
        else:
            raise ValueError(f"Unsupported export format: {format}")
        print(f"Metrics exported to {filepath}")
# Usage example and simulation
def simulate_ai_workload():
    """Simulate an AI inference workload against the monitor."""
    monitor = PerformanceMonitor()

    # Register an alert callback
    def alert_handler(alert):
        print(f"📧 Alert notification: {alert['metric']} crossed the {alert['level']} threshold")
    monitor.add_alert_callback(alert_handler)

    monitor.start_monitoring(collection_interval=0.5)
    print("Starting AI workload simulation...")

    # Simulate normal load
    for i in range(50):
        request_id = monitor.start_request()
        # Simulated inference latency
        processing_time = np.random.normal(0.5, 0.1)  # ~500 ms on average
        time.sleep(max(0.1, processing_time))
        # ~95% success rate
        success = np.random.random() > 0.05
        monitor.record_request(processing_time, success)
        monitor.end_request(request_id, success)

        if i % 10 == 0:
            summary = monitor.get_performance_summary(time_window_minutes=5)
            if 'error' not in summary:  # skip until enough data has accumulated
                print(f"Step {i}: avg response time={summary['avg_response_time']:.3f}s, "
                      f"throughput={summary['avg_throughput']:.1f} req/s, "
                      f"grade={summary['performance_grade']}")

    print("\nSimulating high-load conditions...")
    # Simulate high load
    for i in range(30):
        request_id = monitor.start_request()
        # Latency increases
        processing_time = np.random.normal(2.0, 0.5)  # ~2 s on average
        time.sleep(max(0.1, processing_time))
        # Error rate also increases
        success = np.random.random() > 0.15
        monitor.record_request(processing_time, success)
        monitor.end_request(request_id, success)

    # Anomaly detection
    anomalies = monitor.detect_anomalies()
    print("\nAnomaly detection results:")
    if 'error' in anomalies:
        print(f"  {anomalies['error']}")
    else:
        print(f"Anomaly score: {anomalies['anomaly_score']:.3f}")
        for anomaly in anomalies['detected_anomalies']:
            print(f"  {anomaly['type']}: {anomaly['severity']} (details: {anomaly})")

    # Final summary
    final_summary = monitor.get_performance_summary(time_window_minutes=10)
    print("\nFinal performance summary:")
    print(f"  Total requests in window: {final_summary['total_requests_in_window']}")
    print(f"  Avg response time: {final_summary['avg_response_time']:.3f}s")
    print(f"  P95 response time: {final_summary['p95_response_time']:.3f}s")
    print(f"  Avg throughput: {final_summary['avg_throughput']:.1f} req/s")
    print(f"  Avg error rate: {final_summary['avg_error_rate']:.3f}")
    print(f"  Performance grade: {final_summary['performance_grade']}")

    # Export metrics
    monitor.export_metrics('performance_metrics.json')
    monitor.stop_monitoring()

# Run the simulation
simulate_ai_workload()
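The simulation above drives the monitor by calling start_request, record_request, and end_request by hand; in a real service it is easy to forget one of those calls on an error path. One way to centralize the bookkeeping is a decorator around the inference function. The sketch below is one possible wiring, not part of the PerformanceMonitor API; the predict function and model object at the bottom are hypothetical stand-ins:

import functools
import time

def monitored(monitor: PerformanceMonitor):
    """Decorator factory: wraps an inference function so every call is
    timed and reported to the given PerformanceMonitor, including failures."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            request_id = monitor.start_request()
            start = time.perf_counter()
            success = True
            try:
                return func(*args, **kwargs)
            except Exception:
                success = False
                raise  # re-raise after recording the failure
            finally:
                elapsed = time.perf_counter() - start
                monitor.record_request(elapsed, success)
                monitor.end_request(request_id, success)
        return wrapper
    return decorator

# Usage sketch (assumes a model object with a .predict method):
# monitor = PerformanceMonitor()
# monitor.start_monitoring()
#
# @monitored(monitor)
# def predict(x):
#     return model.predict(x)

Because the finally block always runs, failed requests are counted toward the error rate instead of silently vanishing, which keeps the error-rate sanity checks honest.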
6.2 A/B-Test-Integrated Sanity Checks
An integrated A/B testing system for comparing different versions or algorithms of an AI system:
import uuid
import hashlib
from typing import Dict, List, Optional, Tuple, Any
import numpy as np
from scipy import stats
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import json
@dataclass
class ExperimentConfig:
    """Configuration for a single experiment."""
    experiment_id: str
    name: str
    description: str
    traffic_split: Dict[str, float]  # variant_name -> traffic share (must sum to 1.0)
    success_metrics: List[str]
    minimum_sample_size: int
    statistical_significance_threshold: float = 0.05
    minimum_effect_size: float = 0.05
    max_duration_days: int = 30
    start_date: datetime = field(default_factory=datetime.now)
    end_date: Optional[datetime] = None

@dataclass
class ExperimentResult:
    """A single recorded experiment observation."""
    variant: str
    metric_name: str
    value: float
    sample_size: int
    timestamp: datetime
    metadata: Dict[str, Any] = field(default_factory=dict)
class ABTestManager:
    def __init__(self):
        self.experiments = {}
        self.results = {}
        self.user_assignments = {}  # user_id -> {experiment_id -> variant}

    def create_experiment(self, config: ExperimentConfig) -> str:
        """Create an A/B test experiment."""
        # Validate the traffic split
        total_traffic = sum(config.traffic_split.values())
        if not np.isclose(total_traffic, 1.0, atol=1e-6):
            raise ValueError(f"Traffic split does not sum to 1.0: {total_traffic}")

        # Register the experiment
        self.experiments[config.experiment_id] = config
        self.results[config.experiment_id] = {}
        for variant in config.traffic_split.keys():
            self.results[config.experiment_id][variant] = []

        print(f"Created experiment '{config.name}' (ID: {config.experiment_id})")
        print(f"Variant split: {config.traffic_split}")
        return config.experiment_id
    def assign_user_to_variant(self, experiment_id: str, user_id: str) -> str:
        """Assign a user to a variant (sticky across calls)."""
        if experiment_id not in self.experiments:
            raise ValueError(f"Experiment {experiment_id} not found")

        # Return any existing assignment
        if (user_id in self.user_assignments and
                experiment_id in self.user_assignments[user_id]):
            return self.user_assignments[user_id][experiment_id]

        # New assignment
        config = self.experiments[experiment_id]

        # Stable hash-based assignment. Python's built-in hash() is salted
        # per process, so a cryptographic hash is used to keep assignments
        # reproducible across restarts.
        hash_input = f"{experiment_id}_{user_id}"
        digest = hashlib.md5(hash_input.encode('utf-8')).hexdigest()
        hash_value = int(digest, 16) % 10000 / 10000.0

        cumulative_prob = 0
        for variant, probability in config.traffic_split.items():
            cumulative_prob += probability
            if hash_value <= cumulative_prob:
                # Record the assignment
                if user_id not in self.user_assignments:
                    self.user_assignments[user_id] = {}
                self.user_assignments[user_id][experiment_id] = variant
                return variant

        # Fallback (unreachable when the split sums to 1.0)
        return list(config.traffic_split.keys())[0]
    def record_result(self, experiment_id: str, user_id: str,
                      metric_name: str, value: float,
                      metadata: Optional[Dict] = None):
        """Record an experiment observation."""
        if experiment_id not in self.experiments:
            raise ValueError(f"Experiment {experiment_id} not found")

        variant = self.assign_user_to_variant(experiment_id, user_id)
        result = ExperimentResult(
            variant=variant,
            metric_name=metric_name,
            value=value,
            sample_size=1,
            timestamp=datetime.now(),
            metadata=metadata or {}
        )
        self.results[experiment_id][variant].append(result)
    def analyze_experiment(self, experiment_id: str) -> Dict:
        """Run the statistical analysis for an experiment."""
        if experiment_id not in self.experiments:
            raise ValueError(f"Experiment {experiment_id} not found")

        config = self.experiments[experiment_id]
        experiment_results = self.results[experiment_id]

        analysis = {
            'experiment_id': experiment_id,
            'experiment_name': config.name,
            'analysis_timestamp': datetime.now().isoformat(),
            'variants': list(config.traffic_split.keys()),
            'metrics_analysis': {},
            'overall_conclusions': {},
            'recommendations': []
        }

        # Per-metric analysis
        for metric_name in config.success_metrics:
            metric_analysis = self._analyze_metric(
                experiment_id, metric_name, experiment_results, config
            )
            analysis['metrics_analysis'][metric_name] = metric_analysis

        # Overall conclusions
        analysis['overall_conclusions'] = self._draw_conclusions(
            analysis['metrics_analysis'], config
        )

        # Generate recommendations
        analysis['recommendations'] = self._generate_recommendations(
            analysis['overall_conclusions'], config
        )
        return analysis
    def _analyze_metric(self, experiment_id: str, metric_name: str,
                        experiment_results: Dict, config: ExperimentConfig) -> Dict:
        """Statistical analysis for a single metric."""
        metric_data = {}

        # Collect data per variant
for variant in config.traffic_split.keys():
variant_results = [
r for r in experiment_results[variant]
if r.metric_name == metric_name
]
if variant_results:
values = [r.value for r in variant_results]
metric_data[variant] = {
'sample_size': len(values),
'mean': np.mean(values),
'std': np.std(values),
'median': np.median(values),
'min': np.min(values),