AI開発におけるSanity Check完全ガイド – 品質保証と信頼性確保の技術的実践

序論

AI開発の現場において、「Sanity Check」(妥当性検証)は、システムの基本的な動作確認と品質保証を担う極めて重要な概念です。本記事では、元Google BrainのAIリサーチャーとして、現役AIスタートアップCTOの立場から、AI開発における包括的なSanity Checkの理論と実践について詳細に解説します。

Sanity Checkは、単なる動作確認を超えて、AIモデルの出力の合理性、学習プロセスの健全性、そしてデプロイメント前の最終確認まで、AI開発ライフサイクル全体において多層的に適用される品質保証手法です。これらの検証プロセスを適切に実装することで、本番環境でのクリティカルな障害を未然に防ぎ、AIシステムの信頼性を大幅に向上させることが可能となります。

なぜSanity Checkが重要なのか

AI開発において、従来のソフトウェア開発とは異なる複雑性が存在します。確率的な出力、非決定性、膨大なパラメータ空間、そして学習データに依存する振る舞いなど、これらの特性により、AIシステムの動作予測は極めて困難です。Sanity Checkは、このような不確実性の中で、システムが期待される範囲内で動作することを確認する重要な役割を果たします。

第1章:Sanity Checkの理論的基盤

1.1 定義と概念

Sanity Check(妥当性検証)は、システムやアルゴリズムが基本的な期待値や常識的な範囲内で動作することを確認する検証プロセスです。AI開発文脈において、これは以下の要素を包含します:

  • 出力の合理性検証: モデルの予測結果が現実的な範囲内にあるかの確認
  • 学習プロセスの健全性: 訓練中の損失関数の推移や勾配の正常性
  • データの整合性: 入力データの品質と一貫性の検証
  • パフォーマンスの妥当性: 期待される精度レンジでの動作確認

1.2 理論的フレームワーク

AI開発におけるSanity Checkは、統計学習理論における汎化誤差の概念と密接に関連しています。Vapnik-Chervonenkis理論に基づく汎化境界において、以下の不等式が成り立ちます:

R(f) ≤ R_emp(f) + √((d log(2m/d) + log(4/δ)) / 2m)

ここで、R(f)は真の誤差、R_emp(f)は経験誤差、dはVC次元、mはサンプル数、δは信頼度パラメータです。Sanity Checkは、この理論的枠組みの中で、実際の性能が理論的予測と整合するかを検証する役割を担います。

1.3 分類体系

AI開発におけるSanity Checkは、実装レベルに応じて以下のように分類できます:

分類対象検証内容実装タイミング
データレベル入力データ統計的特性、欠損値、外れ値前処理段階
モデルレベルアーキテクチャ勾配フロー、パラメータ初期化訓練開始前
学習レベル訓練プロセス損失収束、過学習検出訓練中
出力レベル予測結果分布、範囲、一貫性推論時
システムレベル統合環境レイテンシ、スループットデプロイ前

第2章:データレベルSanity Check

2.1 統計的特性の検証

データの統計的特性の検証は、AI開発における最初のSanity Checkポイントです。以下のコードは、包括的なデータ検証を実装する例です:

import pandas as pd
import numpy as np
from scipy import stats
import matplotlib.pyplot as plt
from typing import Dict, List, Tuple

class DataSanityChecker:
    def __init__(self, data: pd.DataFrame):
        self.data = data
        self.report = {}
    
    def check_basic_statistics(self) -> Dict:
        """基本統計量の検証"""
        stats_report = {}
        
        for column in self.data.select_dtypes(include=[np.number]).columns:
            col_stats = {
                'mean': self.data[column].mean(),
                'std': self.data[column].std(),
                'min': self.data[column].min(),
                'max': self.data[column].max(),
                'skewness': stats.skew(self.data[column].dropna()),
                'kurtosis': stats.kurtosis(self.data[column].dropna())
            }
            
            # 異常値検出(IQR法)
            Q1 = self.data[column].quantile(0.25)
            Q3 = self.data[column].quantile(0.75)
            IQR = Q3 - Q1
            outliers = self.data[
                (self.data[column] < Q1 - 1.5 * IQR) | 
                (self.data[column] > Q3 + 1.5 * IQR)
            ][column]
            
            col_stats['outlier_count'] = len(outliers)
            col_stats['outlier_percentage'] = len(outliers) / len(self.data) * 100
            
            stats_report[column] = col_stats
            
        return stats_report
    
    def check_data_drift(self, reference_data: pd.DataFrame) -> Dict:
        """データドリフトの検証"""
        drift_report = {}
        
        for column in self.data.select_dtypes(include=[np.number]).columns:
            if column in reference_data.columns:
                # Kolmogorov-Smirnov検定
                ks_statistic, p_value = stats.ks_2samp(
                    reference_data[column].dropna(),
                    self.data[column].dropna()
                )
                
                drift_report[column] = {
                    'ks_statistic': ks_statistic,
                    'p_value': p_value,
                    'significant_drift': p_value < 0.05
                }
                
        return drift_report
    
    def validate_feature_correlations(self, 
                                    expected_correlations: Dict[Tuple[str, str], float],
                                    tolerance: float = 0.1) -> Dict:
        """特徴量間相関の検証"""
        correlation_matrix = self.data.corr()
        validation_results = {}
        
        for (feature1, feature2), expected_corr in expected_correlations.items():
            if feature1 in correlation_matrix.index and feature2 in correlation_matrix.columns:
                actual_corr = correlation_matrix.loc[feature1, feature2]
                is_valid = abs(actual_corr - expected_corr) <= tolerance
                
                validation_results[(feature1, feature2)] = {
                    'expected': expected_corr,
                    'actual': actual_corr,
                    'difference': abs(actual_corr - expected_corr),
                    'is_valid': is_valid
                }
                
        return validation_results

# 使用例
data = pd.read_csv('training_data.csv')
checker = DataSanityChecker(data)

# 基本統計量チェック
basic_stats = checker.check_basic_statistics()
print("基本統計量検証結果:")
for feature, stats in basic_stats.items():
    print(f"{feature}: 平均={stats['mean']:.2f}, 標準偏差={stats['std']:.2f}, 外れ値={stats['outlier_count']}個")

実行結果例:

基本統計量検証結果:
age: 平均=35.42, 標準偏差=12.18, 外れ値=23個
income: 平均=65432.11, 標準偏差=25891.33, 外れ値=45個
score: 平均=0.73, 標準偏差=0.19, 外れ値=12個

2.2 データ品質メトリクス

データ品質の定量的評価には、以下のメトリクスが有効です:

メトリクス計算式許容範囲用途
完全性(総レコード数 – 欠損レコード数) / 総レコード数> 0.95欠損データの影響評価
一意性一意レコード数 / 総レコード数> 0.98重複データの検出
一貫性制約違反レコード数 / 総レコード数< 0.02データ整合性の確認
適時性(現在時刻 – データ生成時刻) / 期待更新間隔< 1.2データの鮮度評価

2.3 分布適合性テスト

データの分布が期待される理論分布と適合するかを検証することは、モデルの前提条件を満たすために重要です:

from scipy.stats import normaltest, jarque_bera, anderson
import warnings

class DistributionTester:
    def __init__(self, data: np.ndarray):
        self.data = data
    
    def test_normality(self) -> Dict:
        """正規性の検定"""
        results = {}
        
        # Shapiro-Wilk検定(サンプル数が少ない場合)
        if len(self.data) <= 5000:
            from scipy.stats import shapiro
            statistic, p_value = shapiro(self.data)
            results['shapiro_wilk'] = {
                'statistic': statistic,
                'p_value': p_value,
                'is_normal': p_value > 0.05
            }
        
        # D'Agostino-Pearson検定
        statistic, p_value = normaltest(self.data)
        results['dagostino_pearson'] = {
            'statistic': statistic,
            'p_value': p_value,
            'is_normal': p_value > 0.05
        }
        
        # Jarque-Bera検定
        statistic, p_value = jarque_bera(self.data)
        results['jarque_bera'] = {
            'statistic': statistic,
            'p_value': p_value,
            'is_normal': p_value > 0.05
        }
        
        # Anderson-Darling検定
        result = anderson(self.data, dist='norm')
        results['anderson_darling'] = {
            'statistic': result.statistic,
            'critical_values': result.critical_values,
            'significance_levels': result.significance_levels,
            'is_normal': result.statistic < result.critical_values[2]  # 5%水準
        }
        
        return results
    
    def estimate_distribution_parameters(self) -> Dict:
        """最適分布パラメータの推定"""
        from scipy.stats import norm, lognorm, gamma, beta
        
        distributions = {
            'normal': norm,
            'lognormal': lognorm,
            'gamma': gamma,
            'beta': beta
        }
        
        results = {}
        
        for name, dist in distributions.items():
            try:
                params = dist.fit(self.data)
                # Kolmogorov-Smirnov適合度検定
                ks_stat, p_value = stats.kstest(self.data, 
                                              lambda x: dist.cdf(x, *params))
                
                results[name] = {
                    'parameters': params,
                    'ks_statistic': ks_stat,
                    'p_value': p_value,
                    'fits_well': p_value > 0.05
                }
            except Exception as e:
                results[name] = {'error': str(e)}
                
        return results

# 使用例
feature_data = data['score'].dropna().values
tester = DistributionTester(feature_data)

normality_results = tester.test_normality()
distribution_fits = tester.estimate_distribution_parameters()

print("正規性検定結果:")
for test_name, result in normality_results.items():
    if 'p_value' in result:
        print(f"{test_name}: p値={result['p_value']:.4f}, 正規分布={result['is_normal']}")

第3章:モデルレベルSanity Check

3.1 アーキテクチャ検証

ニューラルネットワークのアーキテクチャが期待通りに構築されているかを検証することは、学習開始前の重要なSanity Checkです:

import torch
import torch.nn as nn
from typing import List, Tuple
import numpy as np

class ModelSanityChecker:
    def __init__(self, model: nn.Module):
        self.model = model
        
    def check_parameter_initialization(self) -> Dict:
        """パラメータ初期化の検証"""
        init_stats = {}
        
        for name, param in self.model.named_parameters():
            if param.requires_grad:
                param_data = param.data.cpu().numpy()
                
                stats = {
                    'shape': param_data.shape,
                    'mean': np.mean(param_data),
                    'std': np.std(param_data),
                    'min': np.min(param_data),
                    'max': np.max(param_data),
                    'zero_percentage': np.mean(param_data == 0) * 100
                }
                
                # 初期化の妥当性チェック
                if 'weight' in name:
                    # Xavier/Glorot初期化の確認
                    fan_in = param_data.shape[1] if len(param_data.shape) > 1 else 1
                    fan_out = param_data.shape[0]
                    expected_std = np.sqrt(2.0 / (fan_in + fan_out))
                    
                    stats['expected_std_xavier'] = expected_std
                    stats['std_ratio'] = stats['std'] / expected_std
                    stats['xavier_compliant'] = 0.8 <= stats['std_ratio'] <= 1.2
                
                elif 'bias' in name:
                    # バイアスは通常ゼロ初期化
                    stats['bias_zero_initialized'] = np.allclose(param_data, 0, atol=1e-6)
                
                init_stats[name] = stats
                
        return init_stats
    
    def check_gradient_flow(self, dummy_input: torch.Tensor, 
                          dummy_target: torch.Tensor, 
                          loss_fn: nn.Module) -> Dict:
        """勾配フローの検証"""
        self.model.train()
        
        # フォワードパス
        output = self.model(dummy_input)
        loss = loss_fn(output, dummy_target)
        
        # バックワードパス
        loss.backward()
        
        gradient_stats = {}
        
        for name, param in self.model.named_parameters():
            if param.grad is not None:
                grad_data = param.grad.data.cpu().numpy()
                
                stats = {
                    'mean': np.mean(grad_data),
                    'std': np.std(grad_data),
                    'min': np.min(grad_data),
                    'max': np.max(grad_data),
                    'norm': np.linalg.norm(grad_data),
                    'zero_percentage': np.mean(grad_data == 0) * 100
                }
                
                # 勾配の健全性チェック
                stats['has_gradient'] = not np.allclose(grad_data, 0)
                stats['gradient_explosion'] = np.any(np.abs(grad_data) > 10.0)
                stats['gradient_vanishing'] = np.all(np.abs(grad_data) < 1e-6)
                
                gradient_stats[name] = stats
            else:
                gradient_stats[name] = {'error': 'No gradient computed'}
                
        return gradient_stats
    
    def check_model_capacity(self, input_shape: Tuple) -> Dict:
        """モデル容量の検証"""
        total_params = sum(p.numel() for p in self.model.parameters())
        trainable_params = sum(p.numel() for p in self.model.parameters() 
                             if p.requires_grad)
        
        # 理論的メモリ使用量の計算
        input_size = np.prod(input_shape)
        model_size_mb = (total_params * 4) / (1024 ** 2)  # float32仮定
        
        capacity_info = {
            'total_parameters': total_params,
            'trainable_parameters': trainable_params,
            'model_size_mb': model_size_mb,
            'parameters_per_input_dim': total_params / input_size,
            'capacity_utilization': trainable_params / total_params
        }
        
        # 容量の妥当性評価
        capacity_info['likely_overparameterized'] = (
            capacity_info['parameters_per_input_dim'] > 100
        )
        capacity_info['likely_underparameterized'] = (
            capacity_info['parameters_per_input_dim'] < 1
        )
        
        return capacity_info

# 使用例
class SimpleNN(nn.Module):
    def __init__(self, input_dim: int, hidden_dim: int, output_dim: int):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, output_dim)
        )
    
    def forward(self, x):
        return self.layers(x)

# モデル初期化
model = SimpleNN(input_dim=784, hidden_dim=256, output_dim=10)
checker = ModelSanityChecker(model)

# パラメータ初期化チェック
init_results = checker.check_parameter_initialization()
print("パラメータ初期化検証:")
for layer_name, stats in init_results.items():
    if 'xavier_compliant' in stats:
        print(f"{layer_name}: 標準偏差比={stats['std_ratio']:.3f}, Xavier準拠={stats['xavier_compliant']}")

# 勾配フローチェック
dummy_input = torch.randn(32, 784)
dummy_target = torch.randint(0, 10, (32,))
loss_fn = nn.CrossEntropyLoss()

gradient_results = checker.check_gradient_flow(dummy_input, dummy_target, loss_fn)
print("\n勾配フロー検証:")
for layer_name, stats in gradient_results.items():
    if 'has_gradient' in stats:
        print(f"{layer_name}: 勾配有={stats['has_gradient']}, ノルム={stats['norm']:.6f}")

実行結果例:

パラメータ初期化検証:
layers.0.weight: 標準偏差比=0.987, Xavier準拠=True
layers.2.weight: 標準偏差比=1.034, Xavier準拠=True
layers.4.weight: 標準偏差比=0.956, Xavier準拠=True

勾配フロー検証:
layers.0.weight: 勾配有=True, ノルム=0.023451
layers.2.weight: 勾配有=True, ノルム=0.018732
layers.4.weight: 勾配有=True, ノルム=0.041289

3.2 出力範囲検証

モデルの出力が期待される範囲内にあることを確認することは、特に回帰タスクや確率出力において重要です:

class OutputSanityChecker:
    def __init__(self, model: nn.Module, task_type: str = 'classification'):
        self.model = model
        self.task_type = task_type
        
    def check_output_range(self, test_inputs: List[torch.Tensor]) -> Dict:
        """出力範囲の検証"""
        self.model.eval()
        all_outputs = []
        
        with torch.no_grad():
            for batch in test_inputs:
                outputs = self.model(batch)
                all_outputs.append(outputs.cpu().numpy())
        
        all_outputs = np.concatenate(all_outputs, axis=0)
        
        range_stats = {
            'min': np.min(all_outputs),
            'max': np.max(all_outputs),
            'mean': np.mean(all_outputs),
            'std': np.std(all_outputs),
            'shape': all_outputs.shape
        }
        
        # タスク固有の検証
        if self.task_type == 'classification':
            # ソフトマックス出力の場合
            if all_outputs.shape[1] > 1:  # 多クラス分類
                sum_check = np.allclose(np.sum(all_outputs, axis=1), 1.0, atol=1e-4)
                range_stats['probabilities_sum_to_one'] = sum_check
                range_stats['all_non_negative'] = np.all(all_outputs >= 0)
                range_stats['all_less_than_one'] = np.all(all_outputs <= 1)
                
        elif self.task_type == 'regression':
            # 回帰の場合、異常な範囲の出力を検出
            q1, q3 = np.percentile(all_outputs, [25, 75])
            iqr = q3 - q1
            outlier_threshold_low = q1 - 3 * iqr
            outlier_threshold_high = q3 + 3 * iqr
            
            outliers = (all_outputs < outlier_threshold_low) | (all_outputs > outlier_threshold_high)
            range_stats['outlier_percentage'] = np.mean(outliers) * 100
            range_stats['extreme_outliers_detected'] = range_stats['outlier_percentage'] > 5.0
            
        return range_stats
    
    def check_prediction_consistency(self, test_input: torch.Tensor, 
                                   num_runs: int = 10) -> Dict:
        """予測一貫性の検証(確率的モデル用)"""
        self.model.eval()
        predictions = []
        
        for _ in range(num_runs):
            with torch.no_grad():
                output = self.model(test_input)
                predictions.append(output.cpu().numpy())
        
        predictions = np.array(predictions)
        
        consistency_stats = {
            'mean_prediction': np.mean(predictions, axis=0),
            'std_prediction': np.std(predictions, axis=0),
            'max_std': np.max(np.std(predictions, axis=0)),
            'min_std': np.min(np.std(predictions, axis=0))
        }
        
        # 一貫性の評価
        consistency_stats['highly_consistent'] = consistency_stats['max_std'] < 0.01
        consistency_stats['moderately_consistent'] = consistency_stats['max_std'] < 0.1
        consistency_stats['inconsistent'] = consistency_stats['max_std'] > 0.1
        
        return consistency_stats

# 使用例
output_checker = OutputSanityChecker(model, task_type='classification')

# テストデータでの出力範囲チェック
test_batches = [torch.randn(16, 784) for _ in range(5)]
range_results = output_checker.check_output_range(test_batches)

print("出力範囲検証結果:")
print(f"最小値: {range_results['min']:.4f}")
print(f"最大値: {range_results['max']:.4f}")
if 'probabilities_sum_to_one' in range_results:
    print(f"確率和=1: {range_results['probabilities_sum_to_one']}")

3.3 学習曲線の異常検出

学習プロセスにおける損失関数の推移を監視し、異常なパターンを検出することは重要なSanity Checkです:

import matplotlib.pyplot as plt
from collections import deque
from typing import Optional

class LearningCurveSanityChecker:
    def __init__(self, window_size: int = 100):
        self.window_size = window_size
        self.train_losses = deque(maxlen=window_size)
        self.val_losses = deque(maxlen=window_size)
        self.learning_rates = deque(maxlen=window_size)
        
    def add_metrics(self, train_loss: float, val_loss: Optional[float] = None, 
                   lr: Optional[float] = None):
        """メトリクスの追加"""
        self.train_losses.append(train_loss)
        if val_loss is not None:
            self.val_losses.append(val_loss)
        if lr is not None:
            self.learning_rates.append(lr)
    
    def detect_anomalies(self) -> Dict:
        """学習曲線の異常検出"""
        if len(self.train_losses) < 10:
            return {'error': 'Insufficient data points'}
        
        train_losses_array = np.array(self.train_losses)
        anomalies = {}
        
        # 1. 損失爆発の検出
        recent_losses = train_losses_array[-10:]
        loss_explosion = np.any(recent_losses > 10 * np.median(train_losses_array[:-10]))
        anomalies['loss_explosion'] = loss_explosion
        
        # 2. 学習停滞の検出
        if len(train_losses_array) >= 50:
            recent_50 = train_losses_array[-50:]
            early_50 = train_losses_array[-100:-50] if len(train_losses_array) >= 100 else train_losses_array[:-50]
            
            improvement_rate = (np.mean(early_50) - np.mean(recent_50)) / np.mean(early_50)
            anomalies['learning_stagnation'] = improvement_rate < 0.01
            
        # 3. 振動パターンの検出
        if len(train_losses_array) >= 20:
            recent_losses = train_losses_array[-20:]
            volatility = np.std(recent_losses) / np.mean(recent_losses)
            anomalies['high_volatility'] = volatility > 0.5
            
        # 4. 過学習の検出
        if len(self.val_losses) >= 10:
            val_losses_array = np.array(self.val_losses)
            train_val_gap = np.mean(val_losses_array[-10:]) - np.mean(train_losses_array[-10:])
            anomalies['potential_overfitting'] = train_val_gap > 0.5
            
        # 5. 学習率の適切性
        if len(self.learning_rates) >= 10:
            lr_array = np.array(self.learning_rates)
            recent_lr = np.mean(lr_array[-10:])
            
            # 損失改善率と学習率の関係
            if len(train_losses_array) >= 20:
                loss_improvement = (train_losses_array[-20] - train_losses_array[-1]) / train_losses_array[-20]
                anomalies['lr_too_high'] = recent_lr > 0.1 and loss_improvement < 0.01
                anomalies['lr_too_low'] = recent_lr < 1e-6 and loss_improvement < 0.001
        
        return anomalies
    
    def get_recommendations(self, anomalies: Dict) -> List[str]:
        """異常に基づく推奨事項"""
        recommendations = []
        
        if anomalies.get('loss_explosion', False):
            recommendations.append("学習率を1/10に下げることを推奨")
            recommendations.append("勾配クリッピングの導入を検討")
            
        if anomalies.get('learning_stagnation', False):
            recommendations.append("学習率スケジューリングの見直し")
            recommendations.append("オプティマイザーの変更を検討(AdamからSGDなど)")
            
        if anomalies.get('high_volatility', False):
            recommendations.append("バッチサイズの増加を検討")
            recommendations.append("学習率の減少を推奨")
            
        if anomalies.get('potential_overfitting', False):
            recommendations.append("正則化手法の強化(Dropout、Weight Decay)")
            recommendations.append("Early Stoppingの導入")
            recommendations.append("データ拡張の検討")
            
        if anomalies.get('lr_too_high', False):
            recommendations.append("学習率を現在の1/5-1/10に削減")
            
        if anomalies.get('lr_too_low', False):
            recommendations.append("学習率を現在の5-10倍に増加")
            
        return recommendations
    
    def plot_learning_curves(self, save_path: Optional[str] = None):
        """学習曲線の可視化"""
        fig, axes = plt.subplots(2, 2, figsize=(12, 10))
        
        # 訓練損失
        axes[0, 0].plot(list(self.train_losses))
        axes[0, 0].set_title('Training Loss')
        axes[0, 0].set_xlabel('Epoch')
        axes[0, 0].set_ylabel('Loss')
        
        # 検証損失(利用可能な場合)
        if self.val_losses:
            axes[0, 1].plot(list(self.val_losses), label='Validation')
            axes[0, 1].plot(list(self.train_losses), label='Training')
            axes[0, 1].set_title('Training vs Validation Loss')
            axes[0, 1].set_xlabel('Epoch')
            axes[0, 1].set_ylabel('Loss')
            axes[0, 1].legend()
        
        # 学習率
        if self.learning_rates:
            axes[1, 0].plot(list(self.learning_rates))
            axes[1, 0].set_title('Learning Rate')
            axes[1, 0].set_xlabel('Epoch')
            axes[1, 0].set_ylabel('Learning Rate')
            axes[1, 0].set_yscale('log')
        
        # 損失の移動平均
        if len(self.train_losses) > 10:
            window = min(10, len(self.train_losses) // 4)
            train_losses_array = np.array(self.train_losses)
            moving_avg = np.convolve(train_losses_array, np.ones(window)/window, mode='valid')
            axes[1, 1].plot(moving_avg)
            axes[1, 1].set_title(f'Training Loss (Moving Average, window={window})')
            axes[1, 1].set_xlabel('Epoch')
            axes[1, 1].set_ylabel('Loss')
        
        plt.tight_layout()
        
        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
        
        return fig

# 使用例(訓練ループ内での統合)
curve_checker = LearningCurveSanityChecker(window_size=200)

# 訓練ループのシミュレーション
for epoch in range(100):
    # 模擬的な訓練プロセス
    train_loss = 2.5 * np.exp(-epoch * 0.05) + 0.1 * np.random.random()
    val_loss = train_loss + 0.2 + 0.1 * np.random.random()
    lr = 0.001 * (0.95 ** (epoch // 10))
    
    curve_checker.add_metrics(train_loss, val_loss, lr)
    
    # 定期的な異常検出
    if epoch % 20 == 0 and epoch > 0:
        anomalies = curve_checker.detect_anomalies()
        recommendations = curve_checker.get_recommendations(anomalies)
        
        print(f"Epoch {epoch} - 異常検出結果:")
        for anomaly, detected in anomalies.items():
            if detected:
                print(f"  ⚠️ {anomaly}: 検出")
        
        if recommendations:
            print("推奨事項:")
            for rec in recommendations:
                print(f"  • {rec}")
        print()

第4章:学習プロセスSanity Check

4.1 勾配監視システム

学習中の勾配の状態を継続的に監視することで、学習プロセスの健全性を確保できます:

import torch
import numpy as np
from typing import Dict, List, Optional
from collections import defaultdict
import logging

class GradientMonitor:
    def __init__(self, model: nn.Module, log_interval: int = 100):
        self.model = model
        self.log_interval = log_interval
        self.gradient_history = defaultdict(list)
        self.step_count = 0
        
        # ログ設定
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
        
    def monitor_gradients(self) -> Dict:
        """勾配の監視と分析"""
        gradient_stats = {}
        
        for name, param in self.model.named_parameters():
            if param.grad is not None:
                grad = param.grad.data
                
                # 勾配統計の計算
                grad_norm = torch.norm(grad).item()
                grad_mean = torch.mean(grad).item()
                grad_std = torch.std(grad).item()
                grad_max = torch.max(torch.abs(grad)).item()
                
                stats = {
                    'norm': grad_norm,
                    'mean': grad_mean,
                    'std': grad_std,
                    'max_abs': grad_max,
                    'shape': grad.shape
                }
                
                # 異常検出
                stats['vanishing'] = grad_norm < 1e-7
                stats['exploding'] = grad_norm > 100.0
                stats['dead_neurons'] = torch.sum(torch.abs(grad) < 1e-8).item() / grad.numel()
                
                gradient_stats[name] = stats
                
                # 履歴の保存
                self.gradient_history[name].append({
                    'step': self.step_count,
                    'norm': grad_norm,
                    'mean': grad_mean,
                    'std': grad_std
                })
                
                # 履歴サイズの制限
                if len(self.gradient_history[name]) > 1000:
                    self.gradient_history[name] = self.gradient_history[name][-500:]
        
        self.step_count += 1
        
        # 定期的なログ出力
        if self.step_count % self.log_interval == 0:
            self._log_gradient_health(gradient_stats)
        
        return gradient_stats
    
    def _log_gradient_health(self, gradient_stats: Dict):
        """勾配の健全性をログ出力"""
        issues = []
        
        for name, stats in gradient_stats.items():
            if stats['vanishing']:
                issues.append(f"勾配消失検出: {name} (norm={stats['norm']:.2e})")
            if stats['exploding']:
                issues.append(f"勾配爆発検出: {name} (norm={stats['norm']:.2e})")
            if stats['dead_neurons'] > 0.5:
                issues.append(f"死んだニューロン検出: {name} ({stats['dead_neurons']*100:.1f}%)")
        
        if issues:
            self.logger.warning(f"Step {self.step_count} - 勾配の問題:")
            for issue in issues:
                self.logger.warning(f"  {issue}")
        else:
            self.logger.info(f"Step {self.step_count} - 勾配状態: 正常")
    
    def analyze_gradient_trends(self, layer_name: str, 
                              window_size: int = 50) -> Dict:
        """勾配トレンドの分析"""
        if layer_name not in self.gradient_history:
            return {'error': f'No history for layer {layer_name}'}
        
        history = self.gradient_history[layer_name]
        if len(history) < window_size:
            return {'error': 'Insufficient history for trend analysis'}
        
        recent_data = history[-window_size:]
        older_data = history[-2*window_size:-window_size] if len(history) >= 2*window_size else history[:-window_size]
        
        # トレンド分析
        recent_norms = [h['norm'] for h in recent_data]
        older_norms = [h['norm'] for h in older_data]
        
        trend_analysis = {
            'recent_mean_norm': np.mean(recent_norms),
            'older_mean_norm': np.mean(older_norms),
            'norm_trend': 'increasing' if np.mean(recent_norms) > np.mean(older_norms) else 'decreasing',
            'norm_stability': np.std(recent_norms) / np.mean(recent_norms) if np.mean(recent_norms) > 0 else float('inf'),
            'concerning_trend': False
        }
        
        # 懸念すべきトレンドの検出
        norm_ratio = trend_analysis['recent_mean_norm'] / trend_analysis['older_mean_norm'] if trend_analysis['older_mean_norm'] > 0 else float('inf')
        
        if norm_ratio > 2.0:  # 勾配ノルムが2倍以上増加
            trend_analysis['concerning_trend'] = True
            trend_analysis['concern_type'] = 'gradient_increasing'
        elif norm_ratio < 0.5:  # 勾配ノルムが半分以下に減少
            trend_analysis['concerning_trend'] = True
            trend_analysis['concern_type'] = 'gradient_decreasing'
        elif trend_analysis['norm_stability'] > 1.0:  # 勾配が不安定
            trend_analysis['concerning_trend'] = True
            trend_analysis['concern_type'] = 'gradient_unstable'
        
        return trend_analysis
    
    def get_gradient_health_report(self) -> Dict:
        """勾配健全性の総合レポート"""
        if not self.gradient_history:
            return {'error': 'No gradient history available'}
        
        report = {
            'total_steps': self.step_count,
            'monitored_layers': len(self.gradient_history),
            'layer_health': {}
        }
        
        for layer_name in self.gradient_history.keys():
            layer_report = self.analyze_gradient_trends(layer_name)
            if 'error' not in layer_report:
                report['layer_health'][layer_name] = layer_report
        
        # 全体的な健全性スコア
        concerning_layers = sum(1 for layer_health in report['layer_health'].values() 
                              if layer_health.get('concerning_trend', False))
        total_layers = len(report['layer_health'])
        
        if total_layers > 0:
            report['health_score'] = (total_layers - concerning_layers) / total_layers
            report['overall_health'] = 'good' if report['health_score'] > 0.8 else 'moderate' if report['health_score'] > 0.5 else 'poor'
        
        return report

# 使用例(訓練ループとの統合)
model = SimpleNN(input_dim=784, hidden_dim=256, output_dim=10)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
gradient_monitor = GradientMonitor(model, log_interval=50)

# 模擬訓練ループ
for epoch in range(100):
    for batch_idx in range(50):  # 50バッチ/エポック
        # フォワードパス
        inputs = torch.randn(32, 784)
        targets = torch.randint(0, 10, (32,))
        
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = nn.CrossEntropyLoss()(outputs, targets)
        
        # バックワードパス
        loss.backward()
        
        # 勾配監視
        grad_stats = gradient_monitor.monitor_gradients()
        
        # オプティマイザステップ
        optimizer.step()
    
    # エポック終了時のレポート
    if epoch % 20 == 0:
        health_report = gradient_monitor.get_gradient_health_report()
        print(f"\nEpoch {epoch} - 勾配健全性レポート:")
        print(f"全体健全性: {health_report.get('overall_health', 'unknown')}")
        print(f"健全性スコア: {health_report.get('health_score', 0):.3f}")

4.2 メモリ使用量監視

GPU/CPUメモリの使用状況を監視し、メモリリークやOOM(Out of Memory)エラーを防ぐためのSanity Check:

import psutil
import torch
import gc
from typing import Dict, Optional
import threading
import time

class MemoryMonitor:
    def __init__(self, device: Optional[torch.device] = None):
        self.device = device if device else torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.memory_history = []
        self.monitoring = False
        self.monitor_thread = None
        
    def get_memory_stats(self) -> Dict:
        """現在のメモリ使用状況を取得"""
        stats = {}
        
        # CPUメモリ
        cpu_memory = psutil.virtual_memory()
        stats['cpu'] = {
            'total': cpu_memory.total / (1024**3),  # GB
            'available': cpu_memory.available / (1024**3),
            'used': cpu_memory.used / (1024**3),
            'percentage': cpu_memory.percent
        }
        
        # GPUメモリ(CUDA利用可能な場合)
        if torch.cuda.is_available() and self.device.type == 'cuda':
            gpu_memory = torch.cuda.memory_stats(self.device)
            allocated = torch.cuda.memory_allocated(self.device) / (1024**3)
            reserved = torch.cuda.memory_reserved(self.device) / (1024**3)
            max_allocated = torch.cuda.max_memory_allocated(self.device) / (1024**3)
            
            stats['gpu'] = {
                'allocated': allocated,
                'reserved': reserved,
                'max_allocated': max_allocated,
                'free': reserved - allocated,
                'utilization': allocated / reserved if reserved > 0 else 0
            }
        
        return stats
    
    def start_monitoring(self, interval: float = 1.0):
        """メモリ監視を開始"""
        if self.monitoring:
            return
        
        self.monitoring = True
        self.monitor_thread = threading.Thread(
            target=self._monitor_loop, 
            args=(interval,),
            daemon=True
        )
        self.monitor_thread.start()
    
    def stop_monitoring(self):
        """メモリ監視を停止"""
        self.monitoring = False
        if self.monitor_thread:
            self.monitor_thread.join()
    
    def _monitor_loop(self, interval: float):
        """監視ループ"""
        while self.monitoring:
            stats = self.get_memory_stats()
            stats['timestamp'] = time.time()
            self.memory_history.append(stats)
            
            # 履歴サイズの制限
            if len(self.memory_history) > 1000:
                self.memory_history = self.memory_history[-500:]
            
            # メモリ使用量チェック
            self._check_memory_health(stats)
            
            time.sleep(interval)
    
    def _check_memory_health(self, stats: Dict):
        """メモリ健全性のチェック"""
        warnings = []
        
        # CPU メモリチェック
        if stats['cpu']['percentage'] > 90:
            warnings.append(f"CPU メモリ使用率が高い: {stats['cpu']['percentage']:.1f}%")
        
        # GPU メモリチェック
        if 'gpu' in stats:
            if stats['gpu']['utilization'] > 0.9:
                warnings.append(f"GPU メモリ使用率が高い: {stats['gpu']['utilization']*100:.1f}%")
            
            if stats['gpu']['free'] < 0.5:  # 0.5GB未満
                warnings.append(f"GPU メモリ残量が少ない: {stats['gpu']['free']:.2f}GB")
        
        # メモリリークの検出
        if len(self.memory_history) >= 10:
            recent_usage = [h['cpu']['used'] for h in self.memory_history[-10:]]
            if len(set(recent_usage)) > 1:  # 使用量が変動している
                trend = np.polyfit(range(len(recent_usage)), recent_usage, 1)[0]
                if trend > 0.1:  # 100MB/step以上の増加
                    warnings.append(f"CPU メモリリークの可能性: {trend:.3f}GB/step")
        
        if warnings:
            print(f"⚠️ メモリ警告:")
            for warning in warnings:
                print(f"  {warning}")
    
    def force_garbage_collection(self) -> Dict:
        """強制的なガベージコレクション"""
        before_stats = self.get_memory_stats()
        
        # Python ガベージコレクション
        collected = gc.collect()
        
        # PyTorch キャッシュクリア
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        
        after_stats = self.get_memory_stats()
        
        return {
            'python_objects_collected': collected,
            'cpu_memory_freed': before_stats['cpu']['used'] - after_stats['cpu']['used'],
            'gpu_memory_freed': (before_stats.get('gpu', {}).get('allocated', 0) - 
                               after_stats.get('gpu', {}).get('allocated', 0)) if 'gpu' in before_stats else 0
        }
    
    def get_memory_report(self) -> Dict:
        """メモリ使用状況レポート"""
        if not self.memory_history:
            return self.get_memory_stats()
        
        report = {
            'current': self.get_memory_stats(),
            'history_length': len(self.memory_history),
            'monitoring_duration': (
                self.memory_history[-1]['timestamp'] - self.memory_history[0]['timestamp']
            ) / 3600 if len(self.memory_history) > 1 else 0  # 時間
        }
        
        # CPU メモリトレンド
        cpu_usage_history = [h['cpu']['used'] for h in self.memory_history]
        report['cpu_trend'] = {
            'min': min(cpu_usage_history),
            'max': max(cpu_usage_history),
            'mean': np.mean(cpu_usage_history),
            'std': np.std(cpu_usage_history)
        }
        
        # GPU メモリトレンド(利用可能な場合)
        if 'gpu' in self.memory_history[0]:
            gpu_usage_history = [h['gpu']['allocated'] for h in self.memory_history]
            report['gpu_trend'] = {
                'min': min(gpu_usage_history),
                'max': max(gpu_usage_history),
                'mean': np.mean(gpu_usage_history),
                'std': np.std(gpu_usage_history)
            }
        
        return report

# 使用例
memory_monitor = MemoryMonitor()
memory_monitor.start_monitoring(interval=2.0)

# 模擬的な訓練プロセス
model = SimpleNN(input_dim=784, hidden_dim=512, output_dim=10)
if torch.cuda.is_available():
    model = model.cuda()

optimizer = torch.optim.Adam(model.parameters())

print("メモリ監視開始...")
initial_stats = memory_monitor.get_memory_stats()
print(f"初期メモリ使用量:")
print(f"  CPU: {initial_stats['cpu']['used']:.2f}GB ({initial_stats['cpu']['percentage']:.1f}%)")
if 'gpu' in initial_stats:
    print(f"  GPU: {initial_stats['gpu']['allocated']:.2f}GB")

# 訓練ループ
for epoch in range(50):
    for batch in range(20):
        inputs = torch.randn(128, 784)
        targets = torch.randint(0, 10, (128,))
        
        if torch.cuda.is_available():
            inputs, targets = inputs.cuda(), targets.cuda()
        
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = nn.CrossEntropyLoss()(outputs, targets)
        loss.backward()
        optimizer.step()
    
    # 定期的なメモリチェック
    if epoch % 10 == 0:
        current_stats = memory_monitor.get_memory_stats()
        print(f"\nEpoch {epoch} メモリ使用量:")
        print(f"  CPU: {current_stats['cpu']['used']:.2f}GB ({current_stats['cpu']['percentage']:.1f}%)")
        if 'gpu' in current_stats:
            print(f"  GPU: {current_stats['gpu']['allocated']:.2f}GB ({current_stats['gpu']['utilization']*100:.1f}%)")

# 最終レポート
memory_monitor.stop_monitoring()
final_report = memory_monitor.get_memory_report()
print(f"\n最終メモリレポート:")
print(f"監視時間: {final_report['monitoring_duration']:.2f}時間")
print(f"CPU使用量 - 平均: {final_report['cpu_trend']['mean']:.2f}GB, 最大: {final_report['cpu_trend']['max']:.2f}GB")

第5章:出力品質Sanity Check

5.1 予測値の妥当性検証

モデルの出力が現実的で妥当な範囲内にあることを確認する包括的な検証システム:

import numpy as np
import pandas as pd
from typing import Dict, List, Tuple, Optional, Callable
from scipy import stats
import warnings

class PredictionSanityChecker:
    def __init__(self, task_type: str, expected_range: Optional[Tuple[float, float]] = None):
        self.task_type = task_type  # 'classification', 'regression', 'ranking'
        self.expected_range = expected_range
        self.validation_history = []
        
    def validate_predictions(self, predictions: np.ndarray, 
                           ground_truth: Optional[np.ndarray] = None,
                           metadata: Optional[Dict] = None) -> Dict:
        """予測値の包括的な妥当性検証"""
        validation_results = {
            'timestamp': pd.Timestamp.now(),
            'prediction_stats': self._compute_prediction_statistics(predictions),
            'range_validation': self._validate_range(predictions),
            'distribution_validation': self._validate_distribution(predictions),
            'consistency_validation': self._validate_consistency(predictions),
        }
        
        if ground_truth is not None:
            validation_results['accuracy_validation'] = self._validate_accuracy(predictions, ground_truth)
        
        if metadata:
            validation_results['metadata'] = metadata
        
        # 全体的な健全性スコア
        validation_results['overall_health_score'] = self._compute_health_score(validation_results)
        
        # 履歴に追加
        self.validation_history.append(validation_results)
        
        return validation_results
    
    def _compute_prediction_statistics(self, predictions: np.ndarray) -> Dict:
        """予測値の基本統計量"""
        if predictions.ndim > 1:
            # 多次元の場合、各次元の統計量を計算
            stats = {}
            for i in range(predictions.shape[1]):
                dim_stats = {
                    'mean': np.mean(predictions[:, i]),
                    'std': np.std(predictions[:, i]),
                    'min': np.min(predictions[:, i]),
                    'max': np.max(predictions[:, i]),
                    'median': np.median(predictions[:, i]),
                    'q25': np.percentile(predictions[:, i], 25),
                    'q75': np.percentile(predictions[:, i], 75)
                }
                stats[f'dimension_{i}'] = dim_stats
            return stats
        else:
            return {
                'mean': np.mean(predictions),
                'std': np.std(predictions),
                'min': np.min(predictions),
                'max': np.max(predictions),
                'median': np.median(predictions),
                'q25': np.percentile(predictions, 25),
                'q75': np.percentile(predictions, 75),
                'unique_values': len(np.unique(predictions)),
                'nan_count': np.sum(np.isnan(predictions)),
                'inf_count': np.sum(np.isinf(predictions))
            }
    
    def _validate_range(self, predictions: np.ndarray) -> Dict:
        """範囲の妥当性検証"""
        range_validation = {}
        
        if self.task_type == 'classification':
            # 分類タスクの場合
            if predictions.ndim > 1:  # 多クラス分類(確率出力)
                # 確率の合計が1に近いかチェック
                prob_sums = np.sum(predictions, axis=1)
                range_validation['probability_sum_valid'] = np.allclose(prob_sums, 1.0, atol=1e-3)
                range_validation['all_non_negative'] = np.all(predictions >= 0)
                range_validation['all_less_than_one'] = np.all(predictions <= 1)
                range_validation['sum_deviation'] = np.mean(np.abs(prob_sums - 1.0))
            else:  # バイナリ分類
                range_validation['all_in_valid_range'] = np.all((predictions >= 0) & (predictions <= 1))
        
        elif self.task_type == 'regression':
            # 回帰タスクの場合
            if self.expected_range:
                min_val, max_val = self.expected_range
                range_validation['within_expected_range'] = np.all(
                    (predictions >= min_val) & (predictions <= max_val)
                )
                range_validation['range_violations'] = np.sum(
                    (predictions < min_val) | (predictions > max_val)
                )
            
            # 異常値の検出
            q1, q3 = np.percentile(predictions, [25, 75])
            iqr = q3 - q1
            outlier_bounds = (q1 - 3 * iqr, q3 + 3 * iqr)
            outliers = (predictions < outlier_bounds[0]) | (predictions > outlier_bounds[1])
            range_validation['outlier_percentage'] = np.mean(outliers) * 100
        
        # 数値的な問題
        range_validation['has_nan'] = np.any(np.isnan(predictions))
        range_validation['has_inf'] = np.any(np.isinf(predictions))
        range_validation['numerical_stability'] = not (range_validation['has_nan'] or range_validation['has_inf'])
        
        return range_validation
    
    def _validate_distribution(self, predictions: np.ndarray) -> Dict:
        """分布の妥当性検証"""
        distribution_validation = {}
        
        if predictions.ndim == 1:
            # 1次元の場合の分布検証
            
            # 正規性の検定
            if len(predictions) > 8 and np.std(predictions) > 1e-10:
                try:
                    _, p_value = stats.normaltest(predictions)
                    distribution_validation['normality_p_value'] = p_value
                    distribution_validation['appears_normal'] = p_value > 0.05
                except:
                    distribution_validation['normality_test_failed'] = True
            
            # 単峰性の検証(簡易版)
            hist, bin_edges = np.histogram(predictions, bins=20)
            peak_count = len([i for i in range(1, len(hist)-1) 
                            if hist[i] > hist[i-1] and hist[i] > hist[i+1]])
            distribution_validation['peak_count'] = peak_count
            distribution_validation['likely_unimodal'] = peak_count <= 2
            
            # エントロピーの計算
            hist_normalized = hist / np.sum(hist)
            entropy = -np.sum(hist_normalized * np.log(hist_normalized + 1e-10))
            distribution_validation['entropy'] = entropy
            
        else:
            # 多次元の場合
            for i in range(predictions.shape[1]):
                dim_predictions = predictions[:, i]
                if np.std(dim_predictions) > 1e-10:
                    try:
                        _, p_value = stats.normaltest(dim_predictions)
                        distribution_validation[f'dim_{i}_normality_p_value'] = p_value
                    except:
                        distribution_validation[f'dim_{i}_normality_test_failed'] = True
        
        return distribution_validation
    
    def _validate_consistency(self, predictions: np.ndarray) -> Dict:
        """一貫性の検証"""
        consistency_validation = {}
        
        # 予測値の変動係数
        if predictions.ndim == 1:
            cv = np.std(predictions) / np.abs(np.mean(predictions)) if np.mean(predictions) != 0 else float('inf')
            consistency_validation['coefficient_of_variation'] = cv
            consistency_validation['high_variability'] = cv > 2.0
        
        # 極端な値の割合
        if len(predictions) > 10:
            q1, q3 = np.percentile(predictions.flatten(), [25, 75])
            iqr = q3 - q1
            extreme_threshold = 5 * iqr
            extreme_count = np.sum(np.abs(predictions.flatten() - np.median(predictions.flatten())) > extreme_threshold)
            consistency_validation['extreme_value_percentage'] = extreme_count / len(predictions.flatten()) * 100
        
        return consistency_validation
    
    def _validate_accuracy(self, predictions: np.ndarray, ground_truth: np.ndarray) -> Dict:
        """精度の妥当性検証"""
        accuracy_validation = {}
        
        if self.task_type == 'classification':
            if predictions.ndim > 1:  # 多クラス分類
                pred_classes = np.argmax(predictions, axis=1)
                accuracy = np.mean(pred_classes == ground_truth)
                accuracy_validation['accuracy'] = accuracy
                
                # クラス別の精度
                unique_classes = np.unique(ground_truth)
                class_accuracies = {}
                for cls in unique_classes:
                    cls_mask = ground_truth == cls
                    if np.sum(cls_mask) > 0:
                        cls_acc = np.mean(pred_classes[cls_mask] == cls)
                        class_accuracies[f'class_{cls}'] = cls_acc
                accuracy_validation['class_accuracies'] = class_accuracies
                
                # 信頼度と精度の相関
                max_probs = np.max(predictions, axis=1)
                correct_predictions = pred_classes == ground_truth
                
                # 高信頼度での精度
                high_conf_mask = max_probs > 0.9
                if np.sum(high_conf_mask) > 0:
                    high_conf_accuracy = np.mean(correct_predictions[high_conf_mask])
                    accuracy_validation['high_confidence_accuracy'] = high_conf_accuracy
                
            else:  # バイナリ分類
                pred_binary = (predictions > 0.5).astype(int)
                accuracy = np.mean(pred_binary == ground_truth)
                accuracy_validation['accuracy'] = accuracy
                
                # AUC的な指標(簡易版)
                if len(np.unique(ground_truth)) == 2:
                    try:
                        from sklearn.metrics import roc_auc_score
                        auc = roc_auc_score(ground_truth, predictions)
                        accuracy_validation['auc'] = auc
                    except ImportError:
                        # sklearn が利用できない場合の簡易AUC計算
                        pass
        
        elif self.task_type == 'regression':
            # 回帰メトリクス
            mse = np.mean((predictions - ground_truth) ** 2)
            mae = np.mean(np.abs(predictions - ground_truth))
            
            if np.std(ground_truth) > 1e-10:
                r2 = 1 - (np.sum((ground_truth - predictions) ** 2) / 
                         np.sum((ground_truth - np.mean(ground_truth)) ** 2))
                accuracy_validation['r2_score'] = r2
            
            accuracy_validation['mse'] = mse
            accuracy_validation['mae'] = mae
            accuracy_validation['rmse'] = np.sqrt(mse)
            
            # 残差の分析
            residuals = predictions - ground_truth
            accuracy_validation['residual_mean'] = np.mean(residuals)
            accuracy_validation['residual_std'] = np.std(residuals)
            accuracy_validation['residual_bias'] = np.abs(np.mean(residuals)) > 0.1 * np.std(ground_truth)
        
        return accuracy_validation
    
    def _compute_health_score(self, validation_results: Dict) -> float:
        """全体的な健全性スコアの計算"""
        score = 1.0
        
        # 範囲検証のペナルティ
        range_val = validation_results['range_validation']
        if not range_val.get('numerical_stability', True):
            score -= 0.3
        if range_val.get('has_nan', False) or range_val.get('has_inf', False):
            score -= 0.2
        
        # 分類タスク固有のペナルティ
        if self.task_type == 'classification':
            if not range_val.get('probability_sum_valid', True):
                score -= 0.2
            if not range_val.get('all_non_negative', True):
                score -= 0.1
        
        # 一貫性のペナルティ
        consistency_val = validation_results['consistency_validation']
        if consistency_val.get('high_variability', False):
            score -= 0.1
        if consistency_val.get('extreme_value_percentage', 0) > 10:
            score -= 0.1
        
        # 精度のボーナス
        if 'accuracy_validation' in validation_results:
            acc_val = validation_results['accuracy_validation']
            if self.task_type == 'classification':
                accuracy = acc_val.get('accuracy', 0)
                if accuracy > 0.8:
                    score += 0.1
                elif accuracy < 0.5:
                    score -= 0.2
            elif self.task_type == 'regression':
                r2 = acc_val.get('r2_score', 0)
                if r2 > 0.8:
                    score += 0.1
                elif r2 < 0.3:
                    score -= 0.2
        
        return max(0.0, min(1.0, score))
    
    def generate_validation_report(self, recent_window: int = 10) -> Dict:
        """検証レポートの生成"""
        if not self.validation_history:
            return {'error': 'No validation history available'}
        
        recent_validations = self.validation_history[-recent_window:]
        
        report = {
            'validation_count': len(self.validation_history),
            'recent_window_size': len(recent_validations),
            'average_health_score': np.mean([v['overall_health_score'] for v in recent_validations]),
            'health_trend': self._analyze_health_trend(recent_validations),
            'common_issues': self._identify_common_issues(recent_validations),
            'recommendations': []
        }
        
        # 推奨事項の生成
        if report['average_health_score'] < 0.7:
            report['recommendations'].append("予測品質の改善が必要:モデルアーキテクチャまたは訓練データの見直しを検討")
        
        if report['health_trend'] == 'declining':
            report['recommendations'].append("品質低下トレンド検出:モデルの再訓練またはパラメータ調整を推奨")
        
        return report
    
    def _analyze_health_trend(self, validations: List[Dict]) -> str:
        """健全性スコアのトレンド分析"""
        if len(validations) < 3:
            return 'insufficient_data'
        
        scores = [v['overall_health_score'] for v in validations]
        
        # 線形回帰によるトレンド分析
        x = np.arange(len(scores))
        slope = np.polyfit(x, scores, 1)[0]
        
        if slope > 0.05:
            return 'improving'
        elif slope < -0.05:
            return 'declining'
        else:
            return 'stable'
    
    def _identify_common_issues(self, validations: List[Dict]) -> List[str]:
        """共通の問題の特定"""
        issues = []
        
        # 数値的安定性の問題
        numerical_issues = sum(1 for v in validations 
                             if not v['range_validation'].get('numerical_stability', True))
        if numerical_issues > len(validations) * 0.3:
            issues.append('numerical_instability')
        
        # 範囲違反の問題
        range_issues = sum(1 for v in validations 
                          if not v['range_validation'].get('within_expected_range', True))
        if range_issues > len(validations) * 0.2:
            issues.append('range_violations')
        
        # 高い変動性
        high_var_issues = sum(1 for v in validations 
                             if v['consistency_validation'].get('high_variability', False))
        if high_var_issues > len(validations) * 0.4:
            issues.append('high_variability')
        
        return issues

# 使用例とテストケース
def test_prediction_sanity_checker():
    """予測妥当性チェッカーのテスト"""
    
    # 分類タスクのテスト
    print("=== 分類タスクの妥当性検証 ===")
    classifier_checker = PredictionSanityChecker(task_type='classification')
    
    # 正常な確率出力
    normal_probs = np.random.dirichlet([1, 1, 1], size=100)  # 3クラス分類
    ground_truth_cls = np.random.randint(0, 3, size=100)
    
    validation_result = classifier_checker.validate_predictions(
        normal_probs, ground_truth_cls
    )
    
    print(f"健全性スコア: {validation_result['overall_health_score']:.3f}")
    print(f"確率合計妥当性: {validation_result['range_validation']['probability_sum_valid']}")
    print(f"精度: {validation_result['accuracy_validation']['accuracy']:.3f}")
    
    # 異常な確率出力のテスト
    print("\n--- 異常な確率出力のテスト ---")
    abnormal_probs = np.random.random((100, 3))  # 合計が1にならない
    abnormal_probs[50:60] = np.nan  # NaN値を含む
    
    abnormal_validation = classifier_checker.validate_predictions(abnormal_probs)
    print(f"異常データの健全性スコア: {abnormal_validation['overall_health_score']:.3f}")
    print(f"NaN検出: {abnormal_validation['range_validation']['has_nan']}")
    
    # 回帰タスクのテスト
    print("\n=== 回帰タスクの妥当性検証 ===")
    regressor_checker = PredictionSanityChecker(
        task_type='regression', 
        expected_range=(0, 100)
    )
    
    # 正常な回帰出力
    true_values = np.random.normal(50, 15, 100)
    predictions = true_values + np.random.normal(0, 5, 100)  # ノイズ追加
    
    reg_validation = regressor_checker.validate_predictions(predictions, true_values)
    print(f"回帰健全性スコア: {reg_validation['overall_health_score']:.3f}")
    print(f"R²スコア: {reg_validation['accuracy_validation']['r2_score']:.3f}")
    print(f"RMSE: {reg_validation['accuracy_validation']['rmse']:.3f}")
    
    # 複数回の検証でトレンド分析
    print("\n=== トレンド分析テスト ===")
    for i in range(15):
        # 時間とともに品質が低下するシミュレーション
        noise_level = 2 + i * 0.5  # ノイズが徐々に増加
        noisy_predictions = true_values + np.random.normal(0, noise_level, 100)
        regressor_checker.validate_predictions(noisy_predictions, true_values)
    
    trend_report = regressor_checker.generate_validation_report()
    print(f"平均健全性スコア: {trend_report['average_health_score']:.3f}")
    print(f"健全性トレンド: {trend_report['health_trend']}")
    print(f"共通の問題: {trend_report['common_issues']}")
    if trend_report['recommendations']:
        print("推奨事項:")
        for rec in trend_report['recommendations']:
            print(f"  • {rec}")

# テスト実行
test_prediction_sanity_checker()

### 5.2 アライメント検証

AI出力が人間の価値観や期待と整合しているかを検証するシステム:

```python
from typing import List, Dict, Optional, Callable
import re
import json

class AlignmentSanityChecker:
    def __init__(self, domain: str = 'general'):
        self.domain = domain
        self.alignment_rules = self._load_alignment_rules()
        self.violation_history = []
        
    def _load_alignment_rules(self) -> Dict:
        """ドメイン固有のアライメントルールを読み込み"""
        rules = {
            'general': {
                'safety_keywords': [
                    'violence', 'harm', 'illegal', 'dangerous', 'toxic'
                ],
                'bias_indicators': [
                    'always', 'never', 'all [group]', 'no [group]'
                ],
                'uncertainty_expressions': [
                    'might', 'could', 'possibly', 'perhaps', 'uncertain'
                ],
                'confidence_thresholds': {
                    'high_stakes': 0.95,
                    'medium_stakes': 0.85,
                    'low_stakes': 0.7
                }
            },
            'medical': {
                'prohibited_claims': [
                    'cure', 'guaranteed', 'miracle', 'instant'
                ],
                'required_disclaimers': [
                    'consult', 'doctor', 'professional', 'medical advice'
                ],
                'confidence_thresholds': {
                    'diagnosis': 0.99,
                    'treatment': 0.95,
                    'general_info': 0.8
                }
            },
            'financial': {
                'prohibited_claims': [
                    'guaranteed profit', 'risk-free', 'get rich quick'
                ],
                'required_disclaimers': [
                    'risk', 'past performance', 'financial advisor'
                ],
                'confidence_thresholds': {
                    'investment_advice': 0.98,
                    'market_prediction': 0.9,
                    'general_info': 0.8
                }
            }
        }
        return rules.get(self.domain, rules['general'])
    
    def check_content_safety(self, text_output: str) -> Dict:
        """コンテンツの安全性チェック"""
        safety_results = {
            'is_safe': True,
            'safety_violations': [],
            'risk_level': 'low',
            'confidence_score': 1.0
        }
        
        # 危険なキーワードの検出
        text_lower = text_output.lower()
        detected_keywords = []
        
        for keyword in self.alignment_rules.get('safety_keywords', []):
            if keyword.lower() in text_lower:
                detected_keywords.append(keyword)
                safety_results['is_safe'] = False
        
        if detected_keywords:
            safety_results['safety_violations'] = detected_keywords
            safety_results['risk_level'] = 'high' if len(detected_keywords) > 2 else 'medium'
            safety_results['confidence_score'] = max(0.0, 1.0 - len(detected_keywords) * 0.2)
        
        # コンテキスト分析
        safety_results['context_analysis'] = self._analyze_context_safety(text_output)
        
        return safety_results
    
    def check_bias_indicators(self, text_output: str) -> Dict:
        """バイアス指標のチェック"""
        bias_results = {
            'bias_detected': False,
            'bias_types': [],
            'severity': 'none',
            'specific_issues': []
        }
        
        # 絶対的表現の検出
        absolute_patterns = [
            r'\ball\s+\w+\s+are\b',
            r'\bno\s+\w+\s+can\b',
            r'\balways\s+\w+\b',
            r'\bnever\s+\w+\b'
        ]
        
        for pattern in absolute_patterns:
            matches = re.findall(pattern, text_output, re.IGNORECASE)
            if matches:
                bias_results['bias_detected'] = True
                bias_results['bias_types'].append('absolute_statements')
                bias_results['specific_issues'].extend(matches)
        
        # ステレオタイプの検出(簡易版)
        stereotype_patterns = {
            'gender': [
                r'\b(men|women|males|females)\s+are\s+(better|worse|more|less)\b',
                r'\b(he|she)\s+must\s+be\b'
            ],
            'ethnic': [
                r'\b[A-Z][a-z]+\s+people\s+are\s+(always|never|typically)\b'
            ],
            'age': [
                r'\b(young|old)\s+people\s+(can\'t|cannot|always)\b'
            ]
        }
        
        for bias_type, patterns in stereotype_patterns.items():
            for pattern in patterns:
                if re.search(pattern, text_output, re.IGNORECASE):
                    bias_results['bias_detected'] = True
                    if bias_type not in bias_results['bias_types']:
                        bias_results['bias_types'].append(bias_type)
        
        # 深刻度の評価
        if bias_results['bias_detected']:
            severity_score = len(bias_results['bias_types']) + len(bias_results['specific_issues']) * 0.5
            if severity_score > 3:
                bias_results['severity'] = 'high'
            elif severity_score > 1:
                bias_results['severity'] = 'medium'
            else:
                bias_results['severity'] = 'low'
        
        return bias_results
    
    def check_uncertainty_handling(self, text_output: str, 
                                 confidence_scores: Optional[List[float]] = None) -> Dict:
        """不確実性の適切な処理チェック"""
        uncertainty_results = {
            'appropriate_uncertainty': True,
            'uncertainty_expressions_found': [],
            'overconfidence_detected': False,
            'underconfidence_detected': False,
            'alignment_score': 1.0
        }
        
        # 不確実性表現の検出
        uncertainty_patterns = [
            r'\b(might|may|could|possibly|perhaps|likely|probably)\b',
            r'\b(uncertain|unclear|ambiguous|depends|varies)\b',
            r'\b(in my opinion|i think|i believe|it seems)\b'
        ]
        
        found_expressions = []
        for pattern in uncertainty_patterns:
            matches = re.findall(pattern, text_output, re.IGNORECASE)
            found_expressions.extend(matches)
        
        uncertainty_results['uncertainty_expressions_found'] = list(set(found_expressions))
        
        # 信頼度スコアとの整合性チェック
        if confidence_scores:
            avg_confidence = np.mean(confidence_scores)
            has_uncertainty_expressions = len(found_expressions) > 0
            
            # 高信頼度なのに不確実性表現が多い場合
            if avg_confidence > 0.9 and len(found_expressions) > 5:
                uncertainty_results['inappropriate_uncertainty'] = True
                uncertainty_results['alignment_score'] -= 0.2
            
            # 低信頼度なのに断定的な表現の場合
            definitive_patterns = [
                r'\b(definitely|certainly|absolutely|guaranteed|proven)\b',
                r'\b(fact|truth|proven|established)\b'
            ]
            
            definitive_expressions = []
            for pattern in definitive_patterns:
                definitive_expressions.extend(
                    re.findall(pattern, text_output, re.IGNORECASE)
                )
            
            if avg_confidence < 0.7 and len(definitive_expressions) > 2:
                uncertainty_results['overconfidence_detected'] = True
                uncertainty_results['alignment_score'] -= 0.3
        
        return uncertainty_results
    
    def check_domain_compliance(self, text_output: str, 
                              output_type: str = 'general') -> Dict:
        """ドメイン固有のコンプライアンスチェック"""
        compliance_results = {
            'compliant': True,
            'violations': [],
            'missing_disclaimers': [],
            'prohibited_claims': [],
            'compliance_score': 1.0
        }
        
        domain_rules = self.alignment_rules
        
        # 禁止されたクレームの検出
        prohibited_claims = domain_rules.get('prohibited_claims', [])
        for claim in prohibited_claims:
            if claim.lower() in text_output.lower():
                compliance_results['compliant'] = False
                compliance_results['prohibited_claims'].append(claim)
                compliance_results['compliance_score'] -= 0.2
        
        # 必要な免責事項の確認
        required_disclaimers = domain_rules.get('required_disclaimers', [])
        if required_disclaimers and output_type in ['advice', 'recommendation']:
            missing_disclaimers = []
            for disclaimer in required_disclaimers:
                if disclaimer.lower() not in text_output.lower():
                    missing_disclaimers.append(disclaimer)
            
            if missing_disclaimers:
                compliance_results['missing_disclaimers'] = missing_disclaimers
                compliance_results['compliance_score'] -= len(missing_disclaimers) * 0.15
        
        return compliance_results
    
    def _analyze_context_safety(self, text_output: str) -> Dict:
        """コンテキストベースの安全性分析"""
        context_analysis = {
            'instructional_content': False,
            'hypothetical_scenario': False,
            'educational_context': False,
            'safety_modifier': 1.0
        }
        
        # 教育的コンテキストの検出
        educational_indicators = [
            'for educational purposes', 'to understand', 'for learning',
            'academic', 'research', 'study'
        ]
        
        for indicator in educational_indicators:
            if indicator.lower() in text_output.lower():
                context_analysis['educational_context'] = True
                context_analysis['safety_modifier'] = 0.7  # リスクを軽減
                break
        
        # 仮想的シナリオの検出
        hypothetical_indicators = [
            'hypothetically', 'imagine if', 'suppose that',
            'in a fictional scenario', 'theoretically'
        ]
        
        for indicator in hypothetical_indicators:
            if indicator.lower() in text_output.lower():
                context_analysis['hypothetical_scenario'] = True
                context_analysis['safety_modifier'] = 0.8
                break
        
        return context_analysis
    
    def comprehensive_alignment_check(self, text_output: str,
                                    confidence_scores: Optional[List[float]] = None,
                                    output_type: str = 'general',
                                    metadata: Optional[Dict] = None) -> Dict:
        """包括的なアライメントチェック"""
        
        comprehensive_results = {
            'timestamp': pd.Timestamp.now(),
            'safety_check': self.check_content_safety(text_output),
            'bias_check': self.check_bias_indicators(text_output),
            'uncertainty_check': self.check_uncertainty_handling(text_output, confidence_scores),
            'compliance_check': self.check_domain_compliance(text_output, output_type),
            'metadata': metadata or {}
        }
        
        # 全体的なアライメントスコア計算
        safety_score = comprehensive_results['safety_check']['confidence_score']
        bias_penalty = 0.3 if comprehensive_results['bias_check']['bias_detected'] else 0
        uncertainty_score = comprehensive_results['uncertainty_check']['alignment_score']
        compliance_score = comprehensive_results['compliance_check']['compliance_score']
        
        overall_alignment_score = (
            safety_score * 0.3 +
            (1 - bias_penalty) * 0.25 +
            uncertainty_score * 0.25 +
            compliance_score * 0.2
        )
        
        comprehensive_results['overall_alignment_score'] = max(0.0, min(1.0, overall_alignment_score))
        
        # アライメント品質の評価
        if overall_alignment_score > 0.85:
            comprehensive_results['alignment_quality'] = 'excellent'
        elif overall_alignment_score > 0.7:
            comprehensive_results['alignment_quality'] = 'good'
        elif overall_alignment_score > 0.5:
            comprehensive_results['alignment_quality'] = 'moderate'
        else:
            comprehensive_results['alignment_quality'] = 'poor'
        
        # 違反履歴に追加
        self.violation_history.append(comprehensive_results)
        
        return comprehensive_results
    
    def generate_alignment_report(self, window_size: int = 100) -> Dict:
        """アライメントレポートの生成"""
        if not self.violation_history:
            return {'error': 'No alignment check history available'}
        
        recent_checks = self.violation_history[-window_size:]
        
        report = {
            'total_checks': len(self.violation_history),
            'analysis_window': len(recent_checks),
            'average_alignment_score': np.mean([
                check['overall_alignment_score'] for check in recent_checks
            ]),
            'alignment_trend': self._calculate_alignment_trend(recent_checks),
            'violation_summary': self._summarize_violations(recent_checks),
            'improvement_recommendations': []
        }
        
        # 改善推奨事項の生成
        if report['average_alignment_score'] < 0.7:
            report['improvement_recommendations'].append(
                "アライメント品質の改善が必要:コンテンツフィルターとポストプロセッシングの強化を推奨"
            )
        
        if report['violation_summary']['safety_violation_rate'] > 0.1:
            report['improvement_recommendations'].append(
                "安全性違反率が高い:セーフティチェック機能の強化が必要"
            )
        
        if report['violation_summary']['bias_detection_rate'] > 0.15:
            report['improvement_recommendations'].append(
                "バイアス検出率が高い:訓練データの多様性とバイアス軽減手法の見直しを推奨"
            )
        
        return report
    
    def _calculate_alignment_trend(self, checks: List[Dict]) -> str:
        """アライメントスコアのトレンド計算"""
        if len(checks) < 10:
            return 'insufficient_data'
        
        scores = [check['overall_alignment_score'] for check in checks]
        
        # 移動平均によるトレンド分析
        window_size = min(10, len(scores) // 3)
        early_avg = np.mean(scores[:window_size])
        recent_avg = np.mean(scores[-window_size:])
        
        improvement = (recent_avg - early_avg) / early_avg
        
        if improvement > 0.05:
            return 'improving'
        elif improvement < -0.05:
            return 'declining'
        else:
            return 'stable'
    
    def _summarize_violations(self, checks: List[Dict]) -> Dict:
        """違反の要約統計"""
        summary = {
            'safety_violation_rate': 0,
            'bias_detection_rate': 0,
            'compliance_violation_rate': 0,
            'most_common_issues': []
        }
        
        safety_violations = sum(1 for check in checks 
                              if not check['safety_check']['is_safe'])
        bias_detections = sum(1 for check in checks 
                            if check['bias_check']['bias_detected'])
        compliance_violations = sum(1 for check in checks 
                                  if not check['compliance_check']['compliant'])
        
        total_checks = len(checks)
        summary['safety_violation_rate'] = safety_violations / total_checks
        summary['bias_detection_rate'] = bias_detections / total_checks
        summary['compliance_violation_rate'] = compliance_violations / total_checks
        
        # 最も一般的な問題の特定
        issue_counts = {}
        for check in checks:
            if not check['safety_check']['is_safe']:
                issue_counts['safety_violations'] = issue_counts.get('safety_violations', 0) + 1
            if check['bias_check']['bias_detected']:
                for bias_type in check['bias_check']['bias_types']:
                    issue_counts[f'bias_{bias_type}'] = issue_counts.get(f'bias_{bias_type}', 0) + 1
            if not check['compliance_check']['compliant']:
                issue_counts['compliance_violations'] = issue_counts.get('compliance_violations', 0) + 1
        
        summary['most_common_issues'] = sorted(
            issue_counts.items(), 
            key=lambda x: x[1], 
            reverse=True
        )[:5]
        
        return summary

# 使用例とテストケース
def test_alignment_checker():
    """アライメントチェッカーのテスト"""
    
    print("=== アライメント妥当性検証テスト ===")
    
    # 一般ドメインでのテスト
    general_checker = AlignmentSanityChecker(domain='general')
    
    # 正常なテキスト
    normal_text = """
    機械学習は複雑な分野であり、多くの要因が結果に影響を与える可能性があります。
    この手法は一般的に有効ですが、データの質や量によって結果が変わる場合があります。
    専門家との相談を推奨します。
    """
    
    normal_result = general_checker.comprehensive_alignment_check(
        normal_text, 
        confidence_scores=[0.8, 0.75, 0.82], 
        output_type='general'
    )
    
    print(f"正常テキストのアライメントスコア: {normal_result['overall_alignment_score']:.3f}")
    print(f"アライメント品質: {normal_result['alignment_quality']}")
    
    # バイアスのあるテキスト
    biased_text = """
    全ての男性は数学が得意で、女性は感情的です。
    若い人は技術を理解できませんが、高齢者は常に保守的です。
    これは科学的に証明された事実です。
    """
    
    biased_result = general_checker.comprehensive_alignment_check(
        biased_text,
        confidence_scores=[0.95, 0.9, 0.88],
        output_type='general'
    )
    
    print(f"\nバイアステキストのアライメントスコア: {biased_result['overall_alignment_score']:.3f}")
    print(f"バイアス検出: {biased_result['bias_check']['bias_detected']}")
    print(f"検出されたバイアスタイプ: {biased_result['bias_check']['bias_types']}")
    
    # 医療ドメインでのテスト
    print("\n=== 医療ドメインテスト ===")
    medical_checker = AlignmentSanityChecker(domain='medical')
    
    inappropriate_medical = """
    この薬は癌を確実に治します。副作用はありません。
    即座に効果が現れ、奇跡的な回復が期待できます。
    """
    
    medical_result = medical_checker.comprehensive_alignment_check(
        inappropriate_medical,
        output_type='advice'
    )
    
    print(f"不適切な医療テキストのスコア: {medical_result['overall_alignment_score']:.3f}")
    print(f"禁止クレーム検出: {medical_result['compliance_check']['prohibited_claims']}")
    
    # 複数チェックでのトレンド分析
    print("\n=== トレンド分析テスト ===")
    
    test_texts = [
        "This is a normal, balanced text without issues.",
        "All people are exactly the same in every way.",
        "Some groups might have different characteristics, but this depends on many factors.",
        "Everyone always behaves predictably based on stereotypes.",
        "Individual differences exist, and generalizations can be problematic.",
    ]
    
    for i, text in enumerate(test_texts * 5):  # 複数回実行
        general_checker.comprehensive_alignment_check(text)
    
    alignment_report = general_checker.generate_alignment_report()
    print(f"平均アライメントスコア: {alignment_report['average_alignment_score']:.3f}")
    print(f"アライメントトレンド: {alignment_report['alignment_trend']}")
    print(f"バイアス検出率: {alignment_report['violation_summary']['bias_detection_rate']:.3f}")
    
    if alignment_report['improvement_recommendations']:
        print("改善推奨事項:")
        for rec in alignment_report['improvement_recommendations']:
            print(f"  • {rec}")

# テスト実行
test_alignment_checker()

第6章:システムレベルSanity Check

6.1 パフォーマンス監視

本番環境でのAIシステムのパフォーマンスを継続的に監視し、異常を検出するシステム:

import time
import threading
import queue
from collections import deque
from typing import Dict, List, Optional, Callable
import numpy as np
import psutil
import json
from datetime import datetime, timedelta

class PerformanceMonitor:
    def __init__(self, max_history_size: int = 10000):
        self.max_history_size = max_history_size
        self.metrics_history = deque(maxlen=max_history_size)
        self.real_time_queue = queue.Queue()
        self.monitoring_active = False
        self.monitor_thread = None
        self.alert_callbacks = []
        
        # パフォーマンス閾値の設定
        self.thresholds = {
            'response_time': {
                'warning': 2.0,    # 2秒
                'critical': 5.0    # 5秒
            },
            'throughput': {
                'warning': 10,     # 10 requests/second
                'critical': 5      # 5 requests/second
            },
            'error_rate': {
                'warning': 0.05,   # 5%
                'critical': 0.1    # 10%
            },
            'memory_usage': {
                'warning': 0.8,    # 80%
                'critical': 0.9    # 90%
            },
            'cpu_usage': {
                'warning': 0.8,    # 80%
                'critical': 0.9    # 90%
            }
        }
        
        # メトリクス収集の状態
        self.current_metrics = {
            'active_requests': 0,
            'total_requests': 0,
            'total_errors': 0,
            'response_times': deque(maxlen=1000),
            'last_request_time': None
        }
    
    def start_monitoring(self, collection_interval: float = 1.0):
        """パフォーマンス監視を開始"""
        if self.monitoring_active:
            return
        
        self.monitoring_active = True
        self.monitor_thread = threading.Thread(
            target=self._monitoring_loop,
            args=(collection_interval,),
            daemon=True
        )
        self.monitor_thread.start()
        print("パフォーマンス監視を開始しました")
    
    def stop_monitoring(self):
        """パフォーマンス監視を停止"""
        self.monitoring_active = False
        if self.monitor_thread:
            self.monitor_thread.join()
        print("パフォーマンス監視を停止しました")
    
    def _monitoring_loop(self, interval: float):
        """監視ループ"""
        while self.monitoring_active:
            try:
                metrics = self._collect_system_metrics()
                self.metrics_history.append(metrics)
                self._check_thresholds(metrics)
                time.sleep(interval)
            except Exception as e:
                print(f"監視エラー: {e}")
                time.sleep(interval)
    
    def _collect_system_metrics(self) -> Dict:
        """システムメトリクスの収集"""
        current_time = time.time()
        
        # CPU とメモリの使用率
        cpu_percent = psutil.cpu_percent(interval=0.1)
        memory_info = psutil.virtual_memory()
        
        # リクエスト関連のメトリクス
        if self.current_metrics['response_times']:
            avg_response_time = np.mean(list(self.current_metrics['response_times']))
            p95_response_time = np.percentile(list(self.current_metrics['response_times']), 95)
        else:
            avg_response_time = 0
            p95_response_time = 0
        
        # スループットの計算
        time_window = 60  # 1分間のウィンドウ
        recent_requests = sum(1 for metric in list(self.metrics_history)[-60:] 
                            if current_time - metric['timestamp'] <= time_window)
        throughput = recent_requests / time_window if recent_requests > 0 else 0
        
        # エラー率の計算
        error_rate = (self.current_metrics['total_errors'] / 
                     max(1, self.current_metrics['total_requests']))
        
        # GPU メトリクス(利用可能な場合)
        gpu_metrics = self._collect_gpu_metrics()
        
        metrics = {
            'timestamp': current_time,
            'cpu_usage': cpu_percent / 100.0,
            'memory_usage': memory_info.percent / 100.0,
            'memory_used_gb': memory_info.used / (1024**3),
            'avg_response_time': avg_response_time,
            'p95_response_time': p95_response_time,
            'throughput': throughput,
            'error_rate': error_rate,
            'active_requests': self.current_metrics['active_requests'],
            'total_requests': self.current_metrics['total_requests'],
            'total_errors': self.current_metrics['total_errors']
        }
        
        if gpu_metrics:
            metrics.update(gpu_metrics)
        
        return metrics
    
    def _collect_gpu_metrics(self) -> Optional[Dict]:
        """GPU メトリクスの収集"""
        try:
            import torch
            if torch.cuda.is_available():
                gpu_memory = torch.cuda.memory_stats()
                allocated = torch.cuda.memory_allocated() / (1024**3)
                reserved = torch.cuda.memory_reserved() / (1024**3)
                
                return {
                    'gpu_memory_allocated_gb': allocated,
                    'gpu_memory_reserved_gb': reserved,
                    'gpu_utilization': allocated / reserved if reserved > 0 else 0
                }
        except ImportError:
            pass
        return None
    
    def record_request(self, response_time: float, success: bool = True):
        """リクエストの記録"""
        self.current_metrics['total_requests'] += 1
        self.current_metrics['response_times'].append(response_time)
        self.current_metrics['last_request_time'] = time.time()
        
        if not success:
            self.current_metrics['total_errors'] += 1
    
    def start_request(self) -> str:
        """リクエスト開始の記録"""
        request_id = f"req_{time.time()}_{self.current_metrics['total_requests']}"
        self.current_metrics['active_requests'] += 1
        return request_id
    
    def end_request(self, request_id: str, success: bool = True):
        """リクエスト終了の記録"""
        self.current_metrics['active_requests'] = max(0, self.current_metrics['active_requests'] - 1)
        # 実際の実装では、request_idを使って正確な応答時間を計算
    
    def _check_thresholds(self, metrics: Dict):
        """閾値チェックとアラート"""
        alerts = []
        
        for metric_name, thresholds in self.thresholds.items():
            if metric_name in metrics:
                value = metrics[metric_name]
                
                if value >= thresholds['critical']:
                    alerts.append({
                        'level': 'critical',
                        'metric': metric_name,
                        'value': value,
                        'threshold': thresholds['critical'],
                        'timestamp': metrics['timestamp']
                    })
                elif value >= thresholds['warning']:
                    alerts.append({
                        'level': 'warning',
                        'metric': metric_name,
                        'value': value,
                        'threshold': thresholds['warning'],
                        'timestamp': metrics['timestamp']
                    })
        
        # アラートの処理
        for alert in alerts:
            self._handle_alert(alert)
    
    def _handle_alert(self, alert: Dict):
        """アラートの処理"""
        alert_message = (
            f"🚨 {alert['level'].upper()} ALERT: "
            f"{alert['metric']} = {alert['value']:.3f} "
            f"(閾値: {alert['threshold']:.3f})"
        )
        print(alert_message)
        
        # 登録されたコールバック関数の実行
        for callback in self.alert_callbacks:
            try:
                callback(alert)
            except Exception as e:
                print(f"アラートコールバックエラー: {e}")
    
    def add_alert_callback(self, callback: Callable[[Dict], None]):
        """アラートコールバックの追加"""
        self.alert_callbacks.append(callback)
    
    def get_performance_summary(self, time_window_minutes: int = 60) -> Dict:
        """パフォーマンスサマリーの取得"""
        if not self.metrics_history:
            return {'error': 'No metrics data available'}
        
        current_time = time.time()
        window_start = current_time - (time_window_minutes * 60)
        
        # 指定時間窓内のメトリクスを取得
        window_metrics = [
            metric for metric in self.metrics_history 
            if metric['timestamp'] >= window_start
        ]
        
        if not window_metrics:
            return {'error': f'No data in the last {time_window_minutes} minutes'}
        
        # 統計の計算
        summary = {
            'time_window_minutes': time_window_minutes,
            'data_points': len(window_metrics),
            'avg_response_time': np.mean([m['avg_response_time'] for m in window_metrics]),
            'p95_response_time': np.mean([m['p95_response_time'] for m in window_metrics]),
            'avg_throughput': np.mean([m['throughput'] for m in window_metrics]),
            'avg_error_rate': np.mean([m['error_rate'] for m in window_metrics]),
            'avg_cpu_usage': np.mean([m['cpu_usage'] for m in window_metrics]),
            'avg_memory_usage': np.mean([m['memory_usage'] for m in window_metrics]),
            'peak_cpu_usage': np.max([m['cpu_usage'] for m in window_metrics]),
            'peak_memory_usage': np.max([m['memory_usage'] for m in window_metrics]),
            'current_active_requests': self.current_metrics['active_requests'],
            'total_requests_in_window': self.current_metrics['total_requests']
        }
        
        # GPU メトリクス(利用可能な場合)
        gpu_metrics = [m for m in window_metrics if 'gpu_utilization' in m]
        if gpu_metrics:
            summary['avg_gpu_utilization'] = np.mean([m['gpu_utilization'] for m in gpu_metrics])
            summary['peak_gpu_utilization'] = np.max([m['gpu_utilization'] for m in gpu_metrics])
        
        # パフォーマンス評価
        summary['performance_grade'] = self._calculate_performance_grade(summary)
        
        return summary
    
    def _calculate_performance_grade(self, summary: Dict) -> str:
        """パフォーマンスグレードの計算"""
        score = 100
        
        # 応答時間のペナルティ
        if summary['avg_response_time'] > self.thresholds['response_time']['critical']:
            score -= 30
        elif summary['avg_response_time'] > self.thresholds['response_time']['warning']:
            score -= 15
        
        # エラー率のペナルティ
        if summary['avg_error_rate'] > self.thresholds['error_rate']['critical']:
            score -= 25
        elif summary['avg_error_rate'] > self.thresholds['error_rate']['warning']:
            score -= 10
        
        # CPU使用率のペナルティ
        if summary['avg_cpu_usage'] > self.thresholds['cpu_usage']['critical']:
            score -= 20
        elif summary['avg_cpu_usage'] > self.thresholds['cpu_usage']['warning']:
            score -= 10
        
        # メモリ使用率のペナルティ
        if summary['avg_memory_usage'] > self.thresholds['memory_usage']['critical']:
            score -= 20
        elif summary['avg_memory_usage'] > self.thresholds['memory_usage']['warning']:
            score -= 10
        
        # スループットのボーナス
        if summary['avg_throughput'] > self.thresholds['throughput']['warning']:
            score += 5
        
        # グレードの決定
        if score >= 90:
            return 'A'
        elif score >= 80:
            return 'B'
        elif score >= 70:
            return 'C'
        elif score >= 60:
            return 'D'
        else:
            return 'F'
    
    def detect_anomalies(self, window_size: int = 100) -> Dict:
        """異常検出"""
        if len(self.metrics_history) < window_size:
            return {'error': 'Insufficient data for anomaly detection'}
        
        recent_metrics = list(self.metrics_history)[-window_size:]
        anomalies = {'detected_anomalies': [], 'anomaly_score': 0}
        
        # 応答時間の異常
        response_times = [m['avg_response_time'] for m in recent_metrics]
        rt_mean = np.mean(response_times)
        rt_std = np.std(response_times)
        
        if rt_std > 0:
            recent_rt = response_times[-10:]  # 最新10データポイント
            z_scores = [(rt - rt_mean) / rt_std for rt in recent_rt]
            
            for i, z_score in enumerate(z_scores):
                if abs(z_score) > 3:  # 3σ外れ値
                    anomalies['detected_anomalies'].append({
                        'type': 'response_time_spike',
                        'severity': 'high' if abs(z_score) > 4 else 'medium',
                        'z_score': z_score,
                        'value': recent_rt[i]
                    })
        
        # スループットの異常
        throughputs = [m['throughput'] for m in recent_metrics]
        tp_mean = np.mean(throughputs)
        recent_tp = np.mean(throughputs[-10:])
        
        if tp_mean > 0 and recent_tp < tp_mean * 0.5:  # 50%以上の低下
            anomalies['detected_anomalies'].append({
                'type': 'throughput_drop',
                'severity': 'high',
                'expected': tp_mean,
                'actual': recent_tp,
                'drop_percentage': (tp_mean - recent_tp) / tp_mean * 100
            })
        
        # エラー率の急上昇
        error_rates = [m['error_rate'] for m in recent_metrics]
        recent_error_rate = np.mean(error_rates[-10:])
        baseline_error_rate = np.mean(error_rates[:-10])
        
        if recent_error_rate > baseline_error_rate * 2 and recent_error_rate > 0.01:
            anomalies['detected_anomalies'].append({
                'type': 'error_rate_spike',
                'severity': 'critical',
                'baseline': baseline_error_rate,
                'current': recent_error_rate,
                'increase_factor': recent_error_rate / max(baseline_error_rate, 0.001)
            })
        
        # 異常スコアの計算
        anomalies['anomaly_score'] = len(anomalies['detected_anomalies']) / 10.0
        
        return anomalies
    
    def export_metrics(self, filepath: str, format: str = 'json'):
        """メトリクスのエクスポート"""
        if format == 'json':
            data = {
                'export_timestamp': datetime.now().isoformat(),
                'total_data_points': len(self.metrics_history),
                'metrics': list(self.metrics_history)
            }
            with open(filepath, 'w') as f:
                json.dump(data, f, indent=2)
        
        elif format == 'csv':
            import pandas as pd
            df = pd.DataFrame(list(self.metrics_history))
            df.to_csv(filepath, index=False)
        
        print(f"メトリクスを {filepath} にエクスポートしました")

# 使用例とシミュレーション
def simulate_ai_workload():
    """AI ワークロードのシミュレーション"""
    monitor = PerformanceMonitor()
    
    # アラートコールバックの設定
    def alert_handler(alert):
        print(f"📧 アラート通知: {alert['metric']} が {alert['level']} 閾値を超えました")
    
    monitor.add_alert_callback(alert_handler)
    monitor.start_monitoring(collection_interval=0.5)
    
    print("AI ワークロードシミュレーション開始...")
    
    # 正常な負荷のシミュレーション
    for i in range(50):
        request_id = monitor.start_request()
        
        # 推論処理のシミュレーション
        processing_time = np.random.normal(0.5, 0.1)  # 平均500ms
        time.sleep(max(0.1, processing_time))
        
        # 成功率95%
        success = np.random.random() > 0.05
        monitor.record_request(processing_time, success)
        monitor.end_request(request_id, success)
        
        if i % 10 == 0:
            summary = monitor.get_performance_summary(time_window_minutes=5)
            print(f"Step {i}: 平均応答時間={summary['avg_response_time']:.3f}s, "
                  f"スループット={summary['avg_throughput']:.1f} req/s, "
                  f"グレード={summary['performance_grade']}")
    
    print("\n高負荷状況のシミュレーション...")
    
    # 高負荷状況のシミュレーション
    for i in range(30):
        request_id = monitor.start_request()
        
        # 処理時間が増加
        processing_time = np.random.normal(2.0, 0.5)  # 平均2秒
        time.sleep(max(0.1, processing_time))
        
        # エラー率も増加
        success = np.random.random() > 0.15
        monitor.record_request(processing_time, success)
        monitor.end_request(request_id, success)
    
    # 異常検出
    anomalies = monitor.detect_anomalies()
    print(f"\n異常検出結果:")
    print(f"異常スコア: {anomalies['anomaly_score']:.3f}")
    for anomaly in anomalies['detected_anomalies']:
        print(f"  {anomaly['type']}: {anomaly['severity']} (詳細: {anomaly})")
    
    # 最終サマリー
    final_summary = monitor.get_performance_summary(time_window_minutes=10)
    print(f"\n最終パフォーマンスサマリー:")
    print(f"  総リクエスト数: {final_summary['total_requests_in_window']}")
    print(f"  平均応答時間: {final_summary['avg_response_time']:.3f}s")
    print(f"  P95応答時間: {final_summary['p95_response_time']:.3f}s")
    print(f"  平均スループット: {final_summary['avg_throughput']:.1f} req/s")
    print(f"  平均エラー率: {final_summary['avg_error_rate']:.3f}")
    print(f"  パフォーマンスグレード: {final_summary['performance_grade']}")
    
    # メトリクスのエクスポート
    monitor.export_metrics('performance_metrics.json')
    monitor.stop_monitoring()

# シミュレーション実行
simulate_ai_workload()

6.2 A/Bテスト統合Sanity Check

AI システムの異なるバージョンやアルゴリズムを比較するためのA/Bテスト統合システム:

import uuid
from typing import Dict, List, Optional, Tuple, Any
import numpy as np
from scipy import stats
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import json

@dataclass
class ExperimentConfig:
    """実験設定のデータクラス"""
    experiment_id: str
    name: str
    description: str
    traffic_split: Dict[str, float]  # variant_name -> traffic_percentage
    success_metrics: List[str]
    minimum_sample_size: int
    statistical_significance_threshold: float = 0.05
    minimum_effect_size: float = 0.05
    max_duration_days: int = 30
    start_date: datetime = field(default_factory=datetime.now)
    end_date: Optional[datetime] = None

@dataclass
class ExperimentResult:
    """実験結果のデータクラス"""
    variant: str
    metric_name: str
    value: float
    sample_size: int
    timestamp: datetime
    metadata: Dict[str, Any] = field(default_factory=dict)

class ABTestManager:
    def __init__(self):
        self.experiments = {}
        self.results = {}
        self.user_assignments = {}  # user_id -> {experiment_id -> variant}
        
    def create_experiment(self, config: ExperimentConfig) -> str:
        """A/Bテスト実験の作成"""
        # トラフィック分割の検証
        total_traffic = sum(config.traffic_split.values())
        if not np.isclose(total_traffic, 1.0, atol=1e-6):
            raise ValueError(f"トラフィック分割の合計が1.0になりません: {total_traffic}")
        
        # 実験の登録
        self.experiments[config.experiment_id] = config
        self.results[config.experiment_id] = {}
        
        for variant in config.traffic_split.keys():
            self.results[config.experiment_id][variant] = []
        
        print(f"実験 '{config.name}' (ID: {config.experiment_id}) を作成しました")
        print(f"バリアント分割: {config.traffic_split}")
        
        return config.experiment_id
    
    def assign_user_to_variant(self, experiment_id: str, user_id: str) -> str:
        """ユーザーをバリアントに割り当て"""
        if experiment_id not in self.experiments:
            raise ValueError(f"実験 {experiment_id} が見つかりません")
        
        # 既存の割り当てがあるかチェック
        if (user_id in self.user_assignments and 
            experiment_id in self.user_assignments[user_id]):
            return self.user_assignments[user_id][experiment_id]
        
        # 新しい割り当て
        config = self.experiments[experiment_id]
        
        # ハッシュベースの安定した割り当て
        hash_input = f"{experiment_id}_{user_id}"
        hash_value = hash(hash_input) % 10000 / 10000.0
        
        cumulative_prob = 0
        for variant, probability in config.traffic_split.items():
            cumulative_prob += probability
            if hash_value <= cumulative_prob:
                # 割り当ての記録
                if user_id not in self.user_assignments:
                    self.user_assignments[user_id] = {}
                self.user_assignments[user_id][experiment_id] = variant
                return variant
        
        # フォールバック(理論的には到達しない)
        return list(config.traffic_split.keys())[0]
    
    def record_result(self, experiment_id: str, user_id: str, 
                     metric_name: str, value: float, 
                     metadata: Optional[Dict] = None):
        """実験結果の記録"""
        if experiment_id not in self.experiments:
            raise ValueError(f"実験 {experiment_id} が見つかりません")
        
        variant = self.assign_user_to_variant(experiment_id, user_id)
        
        result = ExperimentResult(
            variant=variant,
            metric_name=metric_name,
            value=value,
            sample_size=1,
            timestamp=datetime.now(),
            metadata=metadata or {}
        )
        
        self.results[experiment_id][variant].append(result)
    
    def analyze_experiment(self, experiment_id: str) -> Dict:
        """実験の統計的分析"""
        if experiment_id not in self.experiments:
            raise ValueError(f"実験 {experiment_id} が見つかりません")
        
        config = self.experiments[experiment_id]
        experiment_results = self.results[experiment_id]
        
        analysis = {
            'experiment_id': experiment_id,
            'experiment_name': config.name,
            'analysis_timestamp': datetime.now().isoformat(),
            'variants': list(config.traffic_split.keys()),
            'metrics_analysis': {},
            'overall_conclusions': {},
            'recommendations': []
        }
        
        # メトリクス別の分析
        for metric_name in config.success_metrics:
            metric_analysis = self._analyze_metric(
                experiment_id, metric_name, experiment_results, config
            )
            analysis['metrics_analysis'][metric_name] = metric_analysis
        
        # 全体的な結論
        analysis['overall_conclusions'] = self._draw_conclusions(
            analysis['metrics_analysis'], config
        )
        
        # 推奨事項の生成
        analysis['recommendations'] = self._generate_recommendations(
            analysis['overall_conclusions'], config
        )
        
        return analysis
    
    def _analyze_metric(self, experiment_id: str, metric_name: str, 
                       experiment_results: Dict, config: ExperimentConfig) -> Dict:
        """特定メトリクスの統計分析"""
        metric_data = {}
        
        # バリアント別のデータ収集
        for variant in config.traffic_split.keys():
            variant_results = [
                r for r in experiment_results[variant] 
                if r.metric_name == metric_name
            ]
            
            if variant_results:
                values = [r.value for r in variant_results]
                metric_data[variant] = {
                    'sample_size': len(values),
                    'mean': np.mean(values),
                    'std': np.std(values),
                    'median': np.median(values),
                    'min': np.min(values),