序論:Kaggle Notebookが現代データサイエンスに果たす役割
Kaggle Notebookは、単なるJupyter Notebookのクラウド版にとどまらない、データサイエンティスト向けの統合開発環境として、世界中の機械学習実践者に広く利用されています。本記事では、Google BrainでのAI研究経験と現役AIスタートアップCTOとしての実践的知見を基に、Kaggle Notebookの基本操作から高度な活用法までを包括的に解説します。
Kaggle Notebookの技術的アーキテクチャと独自性
Kaggle Notebookは、Google Cloud Platform(GCP)上に構築されたコンテナベースの実行環境です。従来のローカル開発環境と比較して、以下の技術的優位性を持ちます:
- Docker化された標準化環境:依存関係の競合を大幅に削減
- GPU/TPUリソースの民主化:週次クォータの範囲内で高性能計算資源へ無料アクセス
- バージョン管理の統合:保存のたびに実行結果付きの版が自動記録され、GitHub連携も可能
- データセット共有機構:効率的なデータ配信システム
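なお、実行環境がコンテナベースであることや、どのようなリソースが割り当てられているかは、ノートブック上から簡単に確認できます。以下はKaggle関連の環境変数と割り当てリソースを表示する確認コードの一例です(出力内容は実行環境によって異なります)。
# 実行環境の概要を確認する簡単な例(出力は環境により異なる)
import os
import multiprocessing

# Kaggleが設定する環境変数を一覧表示
kaggle_env = {k: v for k, v in os.environ.items() if k.startswith('KAGGLE')}
for key, value in sorted(kaggle_env.items()):
    print(f"{key} = {value}")

# 割り当てられたCPUコア数とメモリ量の目安
print("CPU cores:", multiprocessing.cpu_count())
if os.path.exists('/proc/meminfo'):
    with open('/proc/meminfo') as f:
        print(f.readline().strip())  # MemTotal行を表示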
第1章:Kaggle Notebookの基本構造と環境セットアップ
1.1 アカウント作成とプラットフォーム理解
Kaggle Notebookを使用するためには、まずKaggleアカウントの作成が必要です。アカウント作成後、Phone Verificationを完了することで、GPUアクセス権限が付与されます。この認証プロセスは、リソース濫用防止のためのセキュリティ機構として機能しています。
# Kaggle環境の確認コード例
import os
# Kaggle API認証情報の確認(Kaggle APIを使う場合のみ必要。未設定なら "Not Set" と表示される)
print("Kaggle API Token Location:", os.path.expanduser("~/.kaggle/kaggle.json"))
print("Environment Variables:")
print(f"KAGGLE_USERNAME: {os.environ.get('KAGGLE_USERNAME', 'Not Set')}")
print(f"KAGGLE_KEY: {os.environ.get('KAGGLE_KEY', 'Not Set')}")
1.2 ノートブック作成とランタイム設定
新規ノートブック作成時の重要な設定項目を以下に示します:
設定項目 | 選択肢 | 推奨設定 | 理由 |
---|---|---|---|
Language | Python / R | Python | ライブラリとコミュニティのサポートが充実 |
Accelerator | None / GPU / TPU | GPU(P100 または T4 x2) | 深層学習タスクの高速化 |
Persistence | No persistence / Files only / Variables and Files | Files only 以上 | 作業ファイルの永続化 |
Internet | On / Off | On | 外部データソース・パッケージへのアクセス |
# 実行環境の確認と最適化
import sys
import torch
import tensorflow as tf
import numpy as np
print(f"Python Version: {sys.version}")
print(f"PyTorch Version: {torch.__version__}")
print(f"TensorFlow Version: {tf.__version__}")
print(f"NumPy Version: {np.__version__}")
# GPU利用可能性の確認
if torch.cuda.is_available():
print(f"CUDA Device: {torch.cuda.get_device_name(0)}")
print(f"CUDA Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")
else:
print("CUDA not available")
1.3 ファイルシステム構造の理解
Kaggle Notebookのファイルシステム構造は、効率的なデータ処理のために最適化されています:
/kaggle/
├── input/ # 読み取り専用入力データ
│ ├── competition-name/
│ └── dataset-name/
├── working/ # 書き込み可能な作業ディレクトリ
├── tmp/ # 一時ファイル(セッション終了で削除)
└── lib/ # カスタムライブラリ
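この構造は、次のような短いコードで実際に確認できます。以下は /kaggle/input 配下に添付されたデータセットとファイルを一覧表示する一例です(データセットを添付していない場合は何も表示されません)。
# /kaggle/input 配下のデータセットとファイルを確認する例
from pathlib import Path

input_dir = Path('/kaggle/input')
if input_dir.exists():
    for dataset_dir in sorted(input_dir.iterdir()):
        print(f"[Dataset] {dataset_dir.name}")
        # 各データセットの先頭5ファイルのみ表示
        for file_path in sorted(dataset_dir.rglob('*'))[:5]:
            if file_path.is_file():
                size_mb = file_path.stat().st_size / 1024**2
                print(f"  {file_path.relative_to(input_dir)} ({size_mb:.1f} MB)")
else:
    print("/kaggle/input が見つかりません(Kaggle環境外での実行)")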
第2章:データセットの効率的な読み込みと前処理
2.1 競技データセットの読み込み戦略
Kaggleコンペティションにおいて、データの読み込み効率は最終的な順位に直結する重要な要素です。以下のコードは、メモリ効率を最大化したデータ読み込みの実装例です:
import pandas as pd
import numpy as np
from pathlib import Path
import gc
def optimize_dataframe_memory(df):
"""
データフレームのメモリ使用量を最適化する関数
"""
start_mem = df.memory_usage(deep=True).sum() / 1024**2
for col in df.columns:
col_type = df[col].dtype
if col_type != object:
c_min = df[col].min()
c_max = df[col].max()
if str(col_type)[:3] == 'int':
if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
df[col] = df[col].astype(np.int8)
elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
df[col] = df[col].astype(np.int16)
elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
df[col] = df[col].astype(np.int32)
elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
df[col] = df[col].astype(np.int64)
else:
if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
df[col] = df[col].astype(np.float16)
elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
df[col] = df[col].astype(np.float32)
else:
df[col] = df[col].astype(np.float64)
end_mem = df.memory_usage(deep=True).sum() / 1024**2
reduction = (start_mem - end_mem) / start_mem * 100
print(f'Memory usage decreased from {start_mem:.2f} MB to {end_mem:.2f} MB ({reduction:.1f}% reduction)')
return df
# 実際のデータ読み込み例
train_path = '/kaggle/input/competition-name/train.csv'
test_path = '/kaggle/input/competition-name/test.csv'
# チャンク読み込みによる大容量データの処理
chunk_size = 50000
train_chunks = []
for chunk in pd.read_csv(train_path, chunksize=chunk_size):
chunk = optimize_dataframe_memory(chunk)
train_chunks.append(chunk)
train_df = pd.concat(train_chunks, ignore_index=True)
del train_chunks
gc.collect()
print(f"Training data shape: {train_df.shape}")
print(f"Memory usage: {train_df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
2.2 外部データセットの統合手法
Kaggle Notebookでは、公開データセットを簡単に追加できます。以下は、複数のデータソースを効率的に統合する実装例です:
# Kaggle APIを使用した外部データセット取得
# 注意:import kaggle はAPI認証情報が前提。環境変数 KAGGLE_USERNAME / KAGGLE_KEY か
# ~/.kaggle/kaggle.json を事前に設定していないと認証エラーになる
import kaggle
import zipfile
import shutil
def download_and_extract_dataset(dataset_name, extract_path='/kaggle/working/'):
"""
Kaggleデータセットのダウンロードと展開
"""
try:
kaggle.api.dataset_download_files(
dataset_name,
path=extract_path,
unzip=True
)
print(f"Successfully downloaded and extracted: {dataset_name}")
except Exception as e:
print(f"Error downloading {dataset_name}: {str(e)}")
# 使用例:外部特徴量データセットの取得
external_datasets = [
'username/feature-engineering-dataset',
'username/additional-training-data'
]
for dataset in external_datasets:
download_and_extract_dataset(dataset)
2.3 データバリデーションとクオリティチェック
データ品質の保証は、モデル性能の基盤となります。以下の包括的なデータバリデーション関数を実装しています:
def comprehensive_data_validation(df, dataset_name="Unknown"):
"""
包括的なデータ品質チェック関数
"""
print(f"=== Data Validation Report for {dataset_name} ===")
# 基本統計情報
print(f"Shape: {df.shape}")
print(f"Memory Usage: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
# 欠損値分析
missing_data = df.isnull().sum()
missing_percent = 100 * missing_data / len(df)
missing_table = pd.DataFrame({
'Column': df.columns,
'Missing Count': missing_data.values,
'Missing Percentage': missing_percent.values
})
missing_table = missing_table[missing_table['Missing Count'] > 0].sort_values('Missing Percentage', ascending=False)
if not missing_table.empty:
print("\nMissing Values Analysis:")
print(missing_table.to_string(index=False))
# データ型分析
dtype_counts = df.dtypes.value_counts()
print(f"\nData Types Distribution:")
for dtype, count in dtype_counts.items():
print(f" {dtype}: {count} columns")
# 数値列の統計サマリー
numeric_cols = df.select_dtypes(include=[np.number]).columns
if len(numeric_cols) > 0:
print(f"\nNumeric Columns Summary:")
print(df[numeric_cols].describe())
# カテゴリカル列の分析
categorical_cols = df.select_dtypes(include=['object']).columns
if len(categorical_cols) > 0:
print(f"\nCategorical Columns Analysis:")
for col in categorical_cols[:5]: # 最初の5列のみ表示
unique_count = df[col].nunique()
print(f" {col}: {unique_count} unique values")
return missing_table
# 実行例
validation_report = comprehensive_data_validation(train_df, "Training Data")
第3章:機械学習モデルの実装と最適化
3.1 ベースラインモデルの構築
Kaggleコンペティションにおける成功の鍵は、迅速なベースライン構築にあります。以下は、LightGBMを使用した堅牢なベースラインの実装例です:
import lightgbm as lgb
from sklearn.model_selection import StratifiedKFold, KFold, cross_val_score
from sklearn.metrics import accuracy_score, roc_auc_score, mean_squared_error
import optuna
class KaggleBaselineModel:
"""
Kaggleコンペティション用ベースラインモデルクラス
"""
def __init__(self, model_type='classification', n_splits=5, random_state=42):
self.model_type = model_type
self.n_splits = n_splits
self.random_state = random_state
self.models = []
self.feature_importance = None
def prepare_features(self, df, target_col=None):
"""
特徴量エンジニアリングとデータ準備
"""
features = df.copy()
# 基本的な特徴量エンジニアリング
numeric_cols = features.select_dtypes(include=[np.number]).columns
categorical_cols = features.select_dtypes(include=['object']).columns
# カテゴリカル変数のエンコーディング
for col in categorical_cols:
if col != target_col:
features[col] = pd.Categorical(features[col]).codes
# 欠損値の処理
for col in numeric_cols:
if features[col].isnull().sum() > 0:
                features[col] = features[col].fillna(features[col].median())
return features
def get_lgb_params(self, trial=None):
"""
LightGBMのハイパーパラメータ設定
"""
if trial is not None:
# Optuna最適化用
params = {
'objective': 'binary' if self.model_type == 'classification' else 'regression',
'metric': 'binary_logloss' if self.model_type == 'classification' else 'rmse',
'verbosity': -1,
'random_state': self.random_state,
'num_leaves': trial.suggest_int('num_leaves', 10, 300),
'feature_fraction': trial.suggest_float('feature_fraction', 0.4, 1.0),
'bagging_fraction': trial.suggest_float('bagging_fraction', 0.4, 1.0),
'bagging_freq': trial.suggest_int('bagging_freq', 1, 7),
'min_child_samples': trial.suggest_int('min_child_samples', 5, 100),
'lambda_l1': trial.suggest_float('lambda_l1', 1e-8, 10.0, log=True),
'lambda_l2': trial.suggest_float('lambda_l2', 1e-8, 10.0, log=True),
}
else:
# デフォルトパラメータ
params = {
'objective': 'binary' if self.model_type == 'classification' else 'regression',
'metric': 'binary_logloss' if self.model_type == 'classification' else 'rmse',
'verbosity': -1,
'random_state': self.random_state,
'num_leaves': 31,
'feature_fraction': 0.9,
'bagging_fraction': 0.8,
'bagging_freq': 5,
'min_child_samples': 20,
}
return params
def train_cv_model(self, X, y, optimize_params=True):
"""
交差検証を使用したモデル訓練
"""
if self.model_type == 'classification':
cv = StratifiedKFold(n_splits=self.n_splits, shuffle=True, random_state=self.random_state)
else:
cv = KFold(n_splits=self.n_splits, shuffle=True, random_state=self.random_state)
oof_predictions = np.zeros(len(X))
feature_importance_df = pd.DataFrame()
if optimize_params:
# Optuna最適化
def objective(trial):
params = self.get_lgb_params(trial)
cv_scores = []
for fold, (train_idx, val_idx) in enumerate(cv.split(X, y)):
X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]
train_data = lgb.Dataset(X_train, label=y_train)
val_data = lgb.Dataset(X_val, label=y_val, reference=train_data)
model = lgb.train(
params,
train_data,
valid_sets=[val_data],
num_boost_round=1000,
callbacks=[lgb.early_stopping(50), lgb.log_evaluation(0)]
)
val_pred = model.predict(X_val, num_iteration=model.best_iteration)
if self.model_type == 'classification':
score = roc_auc_score(y_val, val_pred)
else:
score = np.sqrt(mean_squared_error(y_val, val_pred))
cv_scores.append(score)
return np.mean(cv_scores)
study = optuna.create_study(direction='maximize' if self.model_type == 'classification' else 'minimize')
study.optimize(objective, n_trials=100, timeout=600) # 10分でタイムアウト
best_params = self.get_lgb_params(study.best_trial)
print(f"Best parameters: {study.best_params}")
print(f"Best CV score: {study.best_value}")
else:
best_params = self.get_lgb_params()
# 最適パラメータでの最終訓練
for fold, (train_idx, val_idx) in enumerate(cv.split(X, y)):
print(f"Training fold {fold + 1}/{self.n_splits}")
X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]
train_data = lgb.Dataset(X_train, label=y_train)
val_data = lgb.Dataset(X_val, label=y_val, reference=train_data)
model = lgb.train(
best_params,
train_data,
valid_sets=[val_data],
num_boost_round=1000,
callbacks=[lgb.early_stopping(50), lgb.log_evaluation(100)]
)
# OOF予測
oof_predictions[val_idx] = model.predict(X_val, num_iteration=model.best_iteration)
# 特徴量重要度の保存
fold_importance = pd.DataFrame({
'feature': X.columns,
'importance': model.feature_importance(),
'fold': fold
})
feature_importance_df = pd.concat([feature_importance_df, fold_importance], axis=0)
self.models.append(model)
# 最終評価
if self.model_type == 'classification':
cv_score = roc_auc_score(y, oof_predictions)
print(f"CV AUC Score: {cv_score:.6f}")
else:
cv_score = np.sqrt(mean_squared_error(y, oof_predictions))
print(f"CV RMSE Score: {cv_score:.6f}")
# 特徴量重要度の集約
self.feature_importance = feature_importance_df.groupby('feature')['importance'].mean().sort_values(ascending=False)
return oof_predictions, cv_score
# 使用例
model = KaggleBaselineModel(model_type='classification', n_splits=5)
features = model.prepare_features(train_df, target_col='target')
X = features.drop(['target'], axis=1)
y = features['target']
oof_pred, cv_score = model.train_cv_model(X, y, optimize_params=True)
3.2 アンサンブル手法の実装
複数モデルのアンサンブルは、Kaggleコンペティションにおける標準的な手法です。以下は、異なるアルゴリズムを組み合わせた堅牢なアンサンブルの実装例です:
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier
import catboost as cb
class KaggleEnsembleModel:
"""
複数アルゴリズムのアンサンブルモデルクラス
"""
def __init__(self, models_config, meta_learner=None, n_splits=5):
self.models_config = models_config
self.meta_learner = meta_learner
self.n_splits = n_splits
self.base_models = []
self.meta_model = None
def train_base_models(self, X, y):
"""
ベースモデルの訓練とOOF予測の生成
"""
cv = StratifiedKFold(n_splits=self.n_splits, shuffle=True, random_state=42)
oof_predictions = np.zeros((len(X), len(self.models_config)))
for model_idx, (model_name, model_params) in enumerate(self.models_config.items()):
print(f"Training {model_name}...")
fold_models = []
for fold, (train_idx, val_idx) in enumerate(cv.split(X, y)):
X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]
# モデル初期化
if model_name == 'lightgbm':
train_data = lgb.Dataset(X_train, label=y_train)
val_data = lgb.Dataset(X_val, label=y_val, reference=train_data)
model = lgb.train(
model_params,
train_data,
valid_sets=[val_data],
num_boost_round=1000,
callbacks=[lgb.early_stopping(50), lgb.log_evaluation(0)]
)
val_pred = model.predict(X_val, num_iteration=model.best_iteration)
                elif model_name == 'xgboost':
                    # xgboost 2.x系では early_stopping_rounds はコンストラクタ引数として渡す
                    model = XGBClassifier(**model_params, early_stopping_rounds=50)
                    model.fit(
                        X_train, y_train,
                        eval_set=[(X_val, y_val)],
                        verbose=False
                    )
val_pred = model.predict_proba(X_val)[:, 1]
elif model_name == 'catboost':
model = cb.CatBoostClassifier(**model_params, verbose=False)
model.fit(
X_train, y_train,
eval_set=(X_val, y_val),
early_stopping_rounds=50
)
val_pred = model.predict_proba(X_val)[:, 1]
elif model_name == 'randomforest':
model = RandomForestClassifier(**model_params)
model.fit(X_train, y_train)
val_pred = model.predict_proba(X_val)[:, 1]
oof_predictions[val_idx, model_idx] = val_pred
fold_models.append(model)
self.base_models.append(fold_models)
# 個別モデルのCV評価
cv_score = roc_auc_score(y, oof_predictions[:, model_idx])
print(f"{model_name} CV AUC: {cv_score:.6f}")
return oof_predictions
def train_meta_learner(self, oof_predictions, y):
"""
メタ学習器の訓練
"""
if self.meta_learner is None:
# 単純平均
final_predictions = np.mean(oof_predictions, axis=1)
else:
# メタ学習器による重み付き平均
self.meta_model = self.meta_learner
self.meta_model.fit(oof_predictions, y)
final_predictions = self.meta_model.predict_proba(oof_predictions)[:, 1]
return final_predictions
def predict(self, X_test):
"""
テストデータに対する予測
"""
test_predictions = np.zeros((len(X_test), len(self.base_models)))
for model_idx, fold_models in enumerate(self.base_models):
fold_predictions = []
for model in fold_models:
if hasattr(model, 'predict_proba'):
pred = model.predict_proba(X_test)[:, 1]
else:
pred = model.predict(X_test, num_iteration=model.best_iteration)
fold_predictions.append(pred)
test_predictions[:, model_idx] = np.mean(fold_predictions, axis=0)
if self.meta_model is None:
final_predictions = np.mean(test_predictions, axis=1)
else:
final_predictions = self.meta_model.predict_proba(test_predictions)[:, 1]
return final_predictions
# アンサンブル設定例
models_config = {
'lightgbm': {
'objective': 'binary',
'metric': 'binary_logloss',
'verbosity': -1,
'num_leaves': 31,
'feature_fraction': 0.9,
'bagging_fraction': 0.8,
'bagging_freq': 5,
},
'xgboost': {
'objective': 'binary:logistic',
'eval_metric': 'logloss',
'max_depth': 6,
'learning_rate': 0.1,
'subsample': 0.8,
'colsample_bytree': 0.8,
'random_state': 42,
},
'catboost': {
'objective': 'Logloss',
'depth': 6,
'learning_rate': 0.1,
'random_seed': 42,
},
'randomforest': {
'n_estimators': 100,
'max_depth': 10,
'random_state': 42,
'n_jobs': -1,
}
}
# メタ学習器としてLogistic Regressionを使用
from sklearn.linear_model import LogisticRegression
meta_learner = LogisticRegression(random_state=42)
# アンサンブルモデルの訓練
ensemble = KaggleEnsembleModel(models_config, meta_learner=meta_learner)
oof_pred = ensemble.train_base_models(X, y)
final_pred = ensemble.train_meta_learner(oof_pred, y)
print(f"Ensemble CV AUC: {roc_auc_score(y, final_pred):.6f}")
第4章:高度な特徴量エンジニアリング
4.1 自動特徴量生成システム
特徴量エンジニアリングは、Kaggleコンペティションにおける差別化要因の一つです。以下は、自動的に有効な特徴量を生成するシステムの実装例です:
import itertools
from sklearn.feature_selection import SelectKBest, f_classif, mutual_info_classif
from sklearn.preprocessing import PolynomialFeatures, StandardScaler
class AutoFeatureGenerator:
"""
自動特徴量生成クラス
"""
def __init__(self, max_features=100, selection_method='mutual_info'):
self.max_features = max_features
self.selection_method = selection_method
self.generated_features = []
self.feature_selector = None
self.scaler = StandardScaler()
def generate_arithmetic_features(self, df, numeric_cols):
"""
算術演算による特徴量生成
"""
new_features = df[numeric_cols].copy()
# 2つの列の組み合わせで算術演算
for col1, col2 in itertools.combinations(numeric_cols, 2):
if col1 != col2:
# 加算
new_features[f'{col1}_plus_{col2}'] = df[col1] + df[col2]
# 減算
new_features[f'{col1}_minus_{col2}'] = df[col1] - df[col2]
# 乗算
new_features[f'{col1}_multiply_{col2}'] = df[col1] * df[col2]
# 除算(ゼロ除算対策)
new_features[f'{col1}_divide_{col2}'] = df[col1] / (df[col2] + 1e-8)
# 比率
new_features[f'{col1}_ratio_{col2}'] = df[col1] / (df[col1] + df[col2] + 1e-8)
return new_features
def generate_statistical_features(self, df, numeric_cols, window_sizes=[3, 5, 7]):
"""
統計的特徴量の生成
"""
new_features = df[numeric_cols].copy()
for col in numeric_cols:
# 基本統計量
new_features[f'{col}_squared'] = df[col] ** 2
new_features[f'{col}_sqrt'] = np.sqrt(np.abs(df[col]))
new_features[f'{col}_log'] = np.log1p(np.abs(df[col]))
# 移動統計量(時系列データの場合)
for window in window_sizes:
if len(df) > window:
new_features[f'{col}_rolling_mean_{window}'] = df[col].rolling(window=window).mean()
new_features[f'{col}_rolling_std_{window}'] = df[col].rolling(window=window).std()
new_features[f'{col}_rolling_min_{window}'] = df[col].rolling(window=window).min()
new_features[f'{col}_rolling_max_{window}'] = df[col].rolling(window=window).max()
return new_features
def generate_categorical_features(self, df, categorical_cols, target_col=None):
"""
カテゴリカル特徴量の拡張
"""
new_features = pd.DataFrame(index=df.index)
for col in categorical_cols:
# カテゴリの出現頻度
value_counts = df[col].value_counts()
new_features[f'{col}_count'] = df[col].map(value_counts)
# カテゴリの出現率
new_features[f'{col}_frequency'] = new_features[f'{col}_count'] / len(df)
# レアカテゴリの判定
rare_threshold = 0.01 # 1%未満をレアとする
new_features[f'{col}_is_rare'] = (new_features[f'{col}_frequency'] < rare_threshold).astype(int)
# ターゲットエンコーディング(目的変数がある場合)
if target_col is not None and target_col in df.columns:
target_mean = df.groupby(col)[target_col].mean()
new_features[f'{col}_target_mean'] = df[col].map(target_mean)
target_std = df.groupby(col)[target_col].std()
new_features[f'{col}_target_std'] = df[col].map(target_std)
return new_features
def generate_interaction_features(self, df, cols, max_interactions=2):
"""
特徴量間の相互作用を生成
"""
poly = PolynomialFeatures(degree=max_interactions, interaction_only=True, include_bias=False)
interaction_features = poly.fit_transform(df[cols])
feature_names = poly.get_feature_names_out(cols)
interaction_df = pd.DataFrame(interaction_features, columns=feature_names, index=df.index)
# 元の特徴量を除去(相互作用のみ保持)
original_features = set(cols)
interaction_cols = [col for col in feature_names if col not in original_features]
return interaction_df[interaction_cols]
def select_best_features(self, X, y, method='mutual_info'):
"""
最適な特徴量の選択
"""
if method == 'mutual_info':
selector = SelectKBest(score_func=mutual_info_classif, k=min(self.max_features, X.shape[1]))
elif method == 'f_classif':
selector = SelectKBest(score_func=f_classif, k=min(self.max_features, X.shape[1]))
else:
raise ValueError(f"Unknown selection method: {method}")
X_selected = selector.fit_transform(X, y)
selected_features = X.columns[selector.get_support()]
self.feature_selector = selector
return X_selected, selected_features
def fit_transform(self, df, target_col, categorical_cols=None, numeric_cols=None):
"""
特徴量生成パイプラインの実行
"""
if numeric_cols is None:
numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
if target_col in numeric_cols:
numeric_cols.remove(target_col)
if categorical_cols is None:
categorical_cols = df.select_dtypes(include=['object']).columns.tolist()
print(f"Generating features from {len(numeric_cols)} numeric and {len(categorical_cols)} categorical columns...")
# 各種特徴量生成
arithmetic_features = self.generate_arithmetic_features(df, numeric_cols)
statistical_features = self.generate_statistical_features(df, numeric_cols)
categorical_features = self.generate_categorical_features(df, categorical_cols, target_col)
interaction_features = self.generate_interaction_features(df, numeric_cols[:10]) # 最初の10列のみ
# 特徴量結合
all_features = pd.concat([
arithmetic_features,
statistical_features,
categorical_features,
interaction_features
], axis=1)
# 無限値やNaNの処理
all_features = all_features.replace([np.inf, -np.inf], np.nan)
all_features = all_features.fillna(all_features.median())
print(f"Generated {all_features.shape[1]} features")
# 特徴量選択
y = df[target_col]
X_selected, selected_features = self.select_best_features(all_features, y, self.selection_method)
print(f"Selected {len(selected_features)} best features")
self.generated_features = selected_features.tolist()
return pd.DataFrame(X_selected, columns=selected_features, index=df.index)
def transform(self, df):
"""
学習済みの変換を新しいデータに適用
"""
if not self.generated_features:
raise ValueError("Model has not been fitted yet")
# 同じ特徴量生成プロセスを適用
numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
categorical_cols = df.select_dtypes(include=['object']).columns.tolist()
arithmetic_features = self.generate_arithmetic_features(df, numeric_cols)
statistical_features = self.generate_statistical_features(df, numeric_cols)
categorical_features = self.generate_categorical_features(df, categorical_cols)
interaction_features = self.generate_interaction_features(df, numeric_cols[:10])
all_features = pd.concat([
arithmetic_features,
statistical_features,
categorical_features,
interaction_features
], axis=1)
all_features = all_features.replace([np.inf, -np.inf], np.nan)
all_features = all_features.fillna(all_features.median())
        # 学習時に選択された特徴量のみ抽出(ターゲットエンコーディング列など、テスト側で生成されない列は0で補完)
        selected_features = all_features.reindex(columns=self.generated_features, fill_value=0)
return selected_features
# 使用例
feature_generator = AutoFeatureGenerator(max_features=100, selection_method='mutual_info')
enhanced_features = feature_generator.fit_transform(train_df, target_col='target')
print(f"Enhanced features shape: {enhanced_features.shape}")
print(f"Generated features: {feature_generator.generated_features[:10]}") # 最初の10個を表示
4.2 時系列特徴量エンジニアリング
時系列データに特化した特徴量エンジニアリング手法を以下に示します:
class TimeSeriesFeatureGenerator:
"""
時系列データ専用特徴量生成クラス
"""
def __init__(self, date_col, value_cols, entity_col=None):
self.date_col = date_col
self.value_cols = value_cols if isinstance(value_cols, list) else [value_cols]
self.entity_col = entity_col
def generate_lag_features(self, df, lags=[1, 2, 3, 7, 14, 30]):
"""
ラグ特徴量の生成
"""
df_lag = df.copy()
for col in self.value_cols:
for lag in lags:
if self.entity_col:
df_lag[f'{col}_lag_{lag}'] = df_lag.groupby(self.entity_col)[col].shift(lag)
else:
df_lag[f'{col}_lag_{lag}'] = df_lag[col].shift(lag)
return df_lag
    def generate_rolling_features(self, df, windows=[7, 14, 30], stats=['mean', 'std', 'min', 'max']):
        """
        移動統計量の生成
        """
        df_roll = df.copy()
        for col in self.value_cols:
            for window in windows:
                if self.entity_col:
                    rolling = df_roll.groupby(self.entity_col)[col].rolling(window=window)
                else:
                    rolling = df_roll[col].rolling(window=window)
                for stat in stats:
                    values = getattr(rolling, stat)()
                    if self.entity_col:
                        # groupby + rolling はMultiIndexを返すため、元のインデックスに揃える
                        values = values.reset_index(level=0, drop=True)
                    df_roll[f'{col}_rolling_{stat}_{window}'] = values
        return df_roll
def generate_trend_features(self, df):
"""
トレンド特徴量の生成
"""
df_trend = df.copy()
df_trend[self.date_col] = pd.to_datetime(df_trend[self.date_col])
for col in self.value_cols:
# 前期比
if self.entity_col:
df_trend[f'{col}_pct_change'] = df_trend.groupby(self.entity_col)[col].pct_change()
else:
df_trend[f'{col}_pct_change'] = df_trend[col].pct_change()
# 移動平均との差分
            if self.entity_col:
                # groupby + rolling のMultiIndexを元のインデックスに戻してから差分を取る
                ma_7 = df_trend.groupby(self.entity_col)[col].rolling(window=7).mean().reset_index(level=0, drop=True)
            else:
                ma_7 = df_trend[col].rolling(window=7).mean()
            df_trend[f'{col}_diff_ma7'] = df_trend[col] - ma_7
return df_trend
def generate_seasonal_features(self, df):
"""
季節性特徴量の生成
"""
df_seasonal = df.copy()
df_seasonal[self.date_col] = pd.to_datetime(df_seasonal[self.date_col])
# 日付から派生する特徴量
df_seasonal['year'] = df_seasonal[self.date_col].dt.year
df_seasonal['month'] = df_seasonal[self.date_col].dt.month
df_seasonal['day'] = df_seasonal[self.date_col].dt.day
df_seasonal['weekday'] = df_seasonal[self.date_col].dt.weekday
df_seasonal['quarter'] = df_seasonal[self.date_col].dt.quarter
df_seasonal['week_of_year'] = df_seasonal[self.date_col].dt.isocalendar().week
# 周期的特徴量(sin/cos変換)
df_seasonal['month_sin'] = np.sin(2 * np.pi * df_seasonal['month'] / 12)
df_seasonal['month_cos'] = np.cos(2 * np.pi * df_seasonal['month'] / 12)
df_seasonal['weekday_sin'] = np.sin(2 * np.pi * df_seasonal['weekday'] / 7)
df_seasonal['weekday_cos'] = np.cos(2 * np.pi * df_seasonal['weekday'] / 7)
        # 週末・月初・月末フラグ(祝日フラグは別途カレンダーデータが必要)
df_seasonal['is_weekend'] = df_seasonal['weekday'].isin([5, 6]).astype(int)
df_seasonal['is_month_start'] = (df_seasonal['day'] == 1).astype(int)
df_seasonal['is_month_end'] = (df_seasonal[self.date_col].dt.is_month_end).astype(int)
return df_seasonal
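上記クラスの使い方のイメージとして、日付列 date、店舗ID store_id、売上 sales を持つ仮想的なデータフレームに適用する最小限のスケッチを示します(列名やデータはあくまで説明用の例であり、実際のデータに合わせて読み替えてください)。
# 使用例:仮の時系列データへの適用(列名・データは説明用)
import numpy as np
import pandas as pd

# 店舗ごとの日次売上を想定したダミーデータ
dates = pd.date_range('2024-01-01', periods=60, freq='D')
ts_df_raw = pd.DataFrame({
    'date': np.tile(dates, 2),
    'store_id': np.repeat(['A', 'B'], len(dates)),
    'sales': np.random.rand(len(dates) * 2) * 100,
})

ts_generator = TimeSeriesFeatureGenerator(
    date_col='date',
    value_cols=['sales'],
    entity_col='store_id'
)

# ラグ → 移動統計量 → トレンド → 季節性の順に特徴量を追加
ts_df = ts_generator.generate_lag_features(ts_df_raw, lags=[1, 7, 28])
ts_df = ts_generator.generate_rolling_features(ts_df, windows=[7, 28], stats=['mean', 'std'])
ts_df = ts_generator.generate_trend_features(ts_df)
ts_df = ts_generator.generate_seasonal_features(ts_df)
print(f"Generated columns: {ts_df.shape[1]}")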
第5章:提出ファイルの生成と最適化
5.1 予測結果の後処理と最適化
Kaggleコンペティションでは、予測結果の後処理が最終スコアに大きく影響します。以下は、包括的な後処理システムの実装例です:
class PredictionPostProcessor:
"""
予測結果の後処理クラス
"""
def __init__(self, competition_type='classification'):
self.competition_type = competition_type
self.calibration_model = None
def apply_probability_calibration(self, predictions, y_true, method='platt'):
"""
確率較正の適用
"""
from sklearn.calibration import CalibratedClassifierCV, calibration_curve
from sklearn.isotonic import IsotonicRegression
if method == 'platt':
# Platt Scaling
from sklearn.linear_model import LogisticRegression
calibrator = LogisticRegression()
calibrator.fit(predictions.reshape(-1, 1), y_true)
self.calibration_model = calibrator
calibrated_pred = calibrator.predict_proba(predictions.reshape(-1, 1))[:, 1]
elif method == 'isotonic':
# Isotonic Regression
calibrator = IsotonicRegression(out_of_bounds='clip')
calibrated_pred = calibrator.fit_transform(predictions, y_true)
self.calibration_model = calibrator
return calibrated_pred
def apply_threshold_optimization(self, predictions, y_true, metric='f1'):
"""
最適閾値の探索
"""
from sklearn.metrics import f1_score, precision_recall_curve, roc_curve
if metric == 'f1':
precision, recall, thresholds = precision_recall_curve(y_true, predictions)
f1_scores = 2 * (precision * recall) / (precision + recall)
f1_scores = np.nan_to_num(f1_scores)
optimal_idx = np.argmax(f1_scores)
optimal_threshold = thresholds[optimal_idx]
elif metric == 'youden':
fpr, tpr, thresholds = roc_curve(y_true, predictions)
youden_j = tpr - fpr
optimal_idx = np.argmax(youden_j)
optimal_threshold = thresholds[optimal_idx]
return optimal_threshold
def apply_rank_averaging(self, predictions_list, weights=None):
"""
ランク平均による予測統合
"""
if weights is None:
weights = [1.0] * len(predictions_list)
# 予測値をランクに変換
ranked_predictions = []
for pred in predictions_list:
ranked_pred = pd.Series(pred).rank(pct=True).values
ranked_predictions.append(ranked_pred)
# 重み付き平均
weighted_ranks = np.average(ranked_predictions, axis=0, weights=weights)
return weighted_ranks
def apply_gaussian_rank_transformation(self, predictions):
"""
ガウシアンランク変換
"""
from scipy.stats import norm
# ランクに変換(0-1の範囲)
ranks = pd.Series(predictions).rank(pct=True)
# ガウシアン変換
# 極値を避けるために少し調整
ranks = np.clip(ranks, 1e-6, 1 - 1e-6)
gaussian_pred = norm.ppf(ranks)
# 元の範囲にスケール
min_pred, max_pred = predictions.min(), predictions.max()
gaussian_pred = (gaussian_pred - gaussian_pred.min()) / (gaussian_pred.max() - gaussian_pred.min())
gaussian_pred = gaussian_pred * (max_pred - min_pred) + min_pred
return gaussian_pred
def apply_target_distribution_matching(self, predictions, target_distribution):
"""
目的変数の分布に合わせた変換
"""
# 予測値をソート
sorted_indices = np.argsort(predictions)
sorted_predictions = predictions[sorted_indices]
# 目的変数の分布からサンプル
target_quantiles = np.linspace(0, 1, len(predictions))
target_values = np.quantile(target_distribution, target_quantiles)
# マッピング
adjusted_predictions = np.zeros_like(predictions)
adjusted_predictions[sorted_indices] = target_values
return adjusted_predictions
def create_submission_file(self, test_predictions, sample_submission_path, output_path):
"""
提出ファイルの作成
"""
# サンプル提出ファイルの読み込み
sample_sub = pd.read_csv(sample_submission_path)
# 予測結果の設定
if len(sample_sub.columns) == 2:
# 単一目的変数の場合
sample_sub.iloc[:, 1] = test_predictions
else:
# 複数目的変数の場合
for i, col in enumerate(sample_sub.columns[1:]):
sample_sub[col] = test_predictions[:, i] if test_predictions.ndim > 1 else test_predictions
# ファイルの保存
sample_sub.to_csv(output_path, index=False)
print(f"Submission file saved to: {output_path}")
# 基本統計の表示
pred_cols = sample_sub.columns[1:]
for col in pred_cols:
print(f"{col} statistics:")
print(f" Mean: {sample_sub[col].mean():.6f}")
print(f" Std: {sample_sub[col].std():.6f}")
print(f" Min: {sample_sub[col].min():.6f}")
print(f" Max: {sample_sub[col].max():.6f}")
return sample_sub
# 使用例
processor = PredictionPostProcessor(competition_type='classification')
# テストデータでの予測
test_features = feature_generator.transform(test_df)
test_predictions = ensemble.predict(test_features)
# 後処理の適用
if 'target' in train_df.columns:
# 確率較正(検証データがある場合)
calibrated_pred = processor.apply_probability_calibration(oof_pred, y, method='isotonic')
    # 較正モデル(ここではIsotonicRegression)をテスト予測に適用
    if processor.calibration_model is not None:
        test_predictions = processor.calibration_model.predict(test_predictions)
# ランク変換(回帰問題の場合)
if processor.competition_type == 'regression':
test_predictions = processor.apply_gaussian_rank_transformation(test_predictions)
# 提出ファイルの作成
submission = processor.create_submission_file(
test_predictions,
'/kaggle/input/competition-name/sample_submission.csv',
'/kaggle/working/submission.csv'
)
5.2 複数提出戦略とスコア追跡
Kaggleコンペティションでは、複数の提出戦略が重要です。以下は、系統的な提出管理システムの実装例です:
import json
from datetime import datetime
import hashlib
class SubmissionManager:
"""
提出管理クラス
"""
def __init__(self, log_file='/kaggle/working/submission_log.json'):
self.log_file = log_file
self.submissions = self.load_log()
def load_log(self):
"""
提出ログの読み込み
"""
try:
with open(self.log_file, 'r') as f:
return json.load(f)
except FileNotFoundError:
return []
def save_log(self):
"""
提出ログの保存
"""
with open(self.log_file, 'w') as f:
json.dump(self.submissions, f, indent=2)
def calculate_prediction_hash(self, predictions):
"""
予測結果のハッシュ値計算
"""
pred_str = str(predictions.tolist())
return hashlib.md5(pred_str.encode()).hexdigest()[:8]
def log_submission(self, submission_file, model_config, cv_score, notes=""):
"""
提出の記録
"""
# 提出ファイルの読み込み
submission_df = pd.read_csv(submission_file)
predictions = submission_df.iloc[:, 1].values
submission_info = {
'timestamp': datetime.now().isoformat(),
'file_path': submission_file,
'model_config': model_config,
'cv_score': cv_score,
'prediction_hash': self.calculate_prediction_hash(predictions),
'prediction_stats': {
'mean': float(predictions.mean()),
'std': float(predictions.std()),
'min': float(predictions.min()),
'max': float(predictions.max()),
'unique_values': int(len(np.unique(predictions)))
},
'notes': notes,
'public_score': None, # 後で更新
'private_score': None # 後で更新
}
self.submissions.append(submission_info)
self.save_log()
return len(self.submissions) - 1 # submission ID
def update_scores(self, submission_id, public_score=None, private_score=None):
"""
公開/プライベートスコアの更新
"""
if 0 <= submission_id < len(self.submissions):
if public_score is not None:
self.submissions[submission_id]['public_score'] = public_score
if private_score is not None:
self.submissions[submission_id]['private_score'] = private_score
self.save_log()
def get_best_submissions(self, score_type='cv_score', top_k=5):
"""
ベスト提出の取得
"""
scored_submissions = [(i, sub) for i, sub in enumerate(self.submissions)
if sub.get(score_type) is not None]
if score_type in ['cv_score', 'public_score', 'private_score']:
# スコアが高いほど良い場合
scored_submissions.sort(key=lambda x: x[1][score_type], reverse=True)
return scored_submissions[:top_k]
def analyze_submissions(self):
"""
提出分析レポート
"""
if not self.submissions:
print("No submissions found.")
return
print("=== Submission Analysis Report ===")
print(f"Total submissions: {len(self.submissions)}")
# CV Score分析
cv_scores = [sub['cv_score'] for sub in self.submissions if sub['cv_score'] is not None]
if cv_scores:
print(f"\nCV Score Statistics:")
print(f" Best: {max(cv_scores):.6f}")
print(f" Worst: {min(cv_scores):.6f}")
print(f" Mean: {np.mean(cv_scores):.6f}")
print(f" Std: {np.std(cv_scores):.6f}")
# Public Score分析
public_scores = [sub['public_score'] for sub in self.submissions if sub['public_score'] is not None]
if public_scores:
print(f"\nPublic Score Statistics:")
print(f" Best: {max(public_scores):.6f}")
print(f" Worst: {min(public_scores):.6f}")
print(f" Mean: {np.mean(public_scores):.6f}")
print(f" Std: {np.std(public_scores):.6f}")
# CV vs Public相関
if cv_scores and public_scores and len(cv_scores) == len(public_scores):
correlation = np.corrcoef(cv_scores, public_scores)[0, 1]
print(f"\nCV vs Public Score Correlation: {correlation:.4f}")
# ベスト提出の表示
print(f"\nTop 3 CV Submissions:")
for i, (idx, sub) in enumerate(self.get_best_submissions('cv_score', 3)):
print(f" {i+1}. ID {idx}: CV {sub['cv_score']:.6f}, Public {sub.get('public_score', 'N/A')}")
def create_ensemble_submission(self, submission_ids, weights=None, output_path='/kaggle/working/ensemble_submission.csv'):
"""
複数提出のアンサンブル
"""
if weights is None:
weights = [1.0] * len(submission_ids)
predictions_list = []
for sub_id in submission_ids:
file_path = self.submissions[sub_id]['file_path']
sub_df = pd.read_csv(file_path)
predictions_list.append(sub_df.iloc[:, 1].values)
# 重み付き平均
ensemble_pred = np.average(predictions_list, axis=0, weights=weights)
# 提出ファイル作成
        sample_sub = pd.read_csv(self.submissions[submission_ids[0]]['file_path'])
sample_sub.iloc[:, 1] = ensemble_pred
sample_sub.to_csv(output_path, index=False)
# アンサンブル情報の記録
model_config = {
'type': 'ensemble',
'submission_ids': submission_ids,
'weights': weights
}
cv_scores = [self.submissions[sub_id]['cv_score'] for sub_id in submission_ids]
ensemble_cv = np.average(cv_scores, weights=weights)
ensemble_id = self.log_submission(
output_path,
model_config,
ensemble_cv,
f"Ensemble of submissions: {submission_ids}"
)
return ensemble_id
# 使用例
submission_manager = SubmissionManager()
# 提出の記録
model_config = {
'type': 'ensemble',
'base_models': ['lightgbm', 'xgboost', 'catboost'],
'meta_learner': 'logistic_regression',
'feature_selection': 'mutual_info',
'n_features': len(feature_generator.generated_features)
}
submission_id = submission_manager.log_submission(
'/kaggle/working/submission.csv',
model_config,
cv_score,
"Enhanced ensemble with auto-generated features"
)
# 分析レポート
submission_manager.analyze_submissions()
第6章:限界とリスク、推奨事項
6.1 Kaggle Notebook使用時の技術的制約
Kaggle Notebookは強力なプラットフォームですが、以下の技術的制約があります:
制約項目 | 制限内容 | 対策・回避方法 |
---|---|---|
実行時間 | 1セッション最大12時間(TPUは9時間) | 効率的なコード設計、チェックポイントの活用(下記の例を参照) |
メモリ使用量 | CPU環境で約30GB、GPU/TPU環境では割り当てが異なる | メモリ効率的なデータ処理、チャンク処理の実装 |
ディスク容量 | /kaggle/working に保存できる出力は約20GB(一時領域は別途利用可能) | 不要ファイルの定期削除、圧縮の活用 |
GPU/TPUクォータ | 週あたりの利用上限あり(目安としてGPU約30時間、TPU約20時間) | アクセラレータが不要な処理はCPUセッションで実行 |
外部ライブラリ | セッションごとにpipインストールが必要(インターネット無効時は不可) | 事前インストール済みライブラリの活用、軽量代替ライブラリの選択 |
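実行時間の制約に対しては、途中結果を /kaggle/working に定期的に保存しておき、セッションが切れても再開できるようにするのが現実的な対策です。以下は、前処理済みデータなどを簡易的なチェックポイントとして保存・復元する最小限のスケッチです(ディレクトリ名やファイル名は一例です)。
# 簡易チェックポイントの例:途中結果の保存と復元(パスは一例)
import os
import joblib
import pandas as pd

CHECKPOINT_DIR = '/kaggle/working/checkpoints'
os.makedirs(CHECKPOINT_DIR, exist_ok=True)

def save_checkpoint(name, obj):
    """オブジェクトをチェックポイントとして保存"""
    path = os.path.join(CHECKPOINT_DIR, f'{name}.pkl')
    joblib.dump(obj, path)
    print(f"Saved checkpoint: {path}")

def load_checkpoint(name):
    """チェックポイントが存在すれば読み込み、なければNoneを返す"""
    path = os.path.join(CHECKPOINT_DIR, f'{name}.pkl')
    if os.path.exists(path):
        print(f"Loaded checkpoint: {path}")
        return joblib.load(path)
    return None

# 使用イメージ:前処理済みデータがあれば再利用し、なければ作り直して保存
processed = load_checkpoint('processed_train')
if processed is None:
    processed = pd.DataFrame({'example': [1, 2, 3]})  # 実際には前処理結果を入れる
    save_checkpoint('processed_train', processed)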
6.2 データリークと過学習のリスク
class DataLeakageDetector:
"""
    データリーケージ検出クラス
"""
def __init__(self, train_df, test_df, target_col):
self.train_df = train_df
self.test_df = test_df
self.target_col = target_col
def detect_feature_leakage(self):
"""
特徴量レベルでのリーケージ検出
"""
warnings = []
# 1. テストデータに存在しない特徴量の検出
train_cols = set(self.train_df.columns)
test_cols = set(self.test_df.columns)
missing_in_test = train_cols - test_cols - {self.target_col}
if missing_in_test:
warnings.append(f"Features missing in test data: {missing_in_test}")
# 2. 特徴量分布の著しい差異
common_cols = train_cols & test_cols
for col in common_cols:
if col != self.target_col and col in self.train_df.select_dtypes(include=[np.number]).columns:
train_mean = self.train_df[col].mean()
test_mean = self.test_df[col].mean()
if abs(train_mean - test_mean) / (abs(train_mean) + 1e-8) > 0.1: # 10%以上の差異
warnings.append(f"Large distribution shift in {col}: train_mean={train_mean:.4f}, test_mean={test_mean:.4f}")
# 3. 目的変数との相関が異常に高い特徴量
numeric_cols = self.train_df.select_dtypes(include=[np.number]).columns
for col in numeric_cols:
if col != self.target_col:
correlation = self.train_df[col].corr(self.train_df[self.target_col])
if abs(correlation) > 0.95:
warnings.append(f"Suspiciously high correlation with target: {col} (corr={correlation:.4f})")
return warnings
def detect_temporal_leakage(self, date_col=None):
"""
時系列データでの時間的リーケージ検出
"""
warnings = []
if date_col and date_col in self.train_df.columns:
train_dates = pd.to_datetime(self.train_df[date_col])
test_dates = pd.to_datetime(self.test_df[date_col])
if train_dates.max() > test_dates.min():
warnings.append("Temporal leakage detected: training data contains future information")
return warnings
# 使用例
leakage_detector = DataLeakageDetector(train_df, test_df, 'target')
feature_warnings = leakage_detector.detect_feature_leakage()
if feature_warnings:
print("Data Leakage Warnings:")
for warning in feature_warnings:
print(f" - {warning}")
6.3 不適切なユースケースと倫理的考慮事項
Kaggle Notebookの使用において、以下のユースケースは不適切であり、避けるべきです:
不適切なユースケース | リスク | 代替アプローチ |
---|---|---|
機密データの処理 | データ漏洩、コンプライアンス違反 | ローカル環境での処理、専用セキュアクラウド |
大規模プロダクション推論 | スケーラビリティ不足、SLA違反 | 専用推論サーバー、クラウドAPI |
リアルタイム処理システム | レイテンシ制約、可用性問題 | ストリーミング処理基盤、エッジコンピューティング |
長期運用システム | セッション制限、永続性の欠如 | 継続統合パイプライン、専用インフラ |
バイアスを含むデータでの無批判な学習 | 社会的差別の助長 | バイアス検出・軽減手法の実装 |
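上の表の最後に挙げたバイアスの問題については、学習前に簡単な集計で偏りの有無を確認するだけでも有効です。以下は、仮に sensitive_col という属性列がある場合に、グループごとの正例率を比較する最小限のスケッチです(列名と閾値は説明用の仮のものです)。
# グループごとの正例率を比較する簡単なバイアス確認の例(列名は仮)
def check_group_positive_rate(df, sensitive_col, target_col):
    """属性グループごとの正例率とサンプル数を集計して表示する"""
    summary = df.groupby(sensitive_col)[target_col].agg(['mean', 'count'])
    summary = summary.rename(columns={'mean': 'positive_rate', 'count': 'n_samples'})
    print(summary)
    # 正例率の最大・最小の差が大きい場合は注意を促す(閾値0.1は仮の値)
    gap = summary['positive_rate'].max() - summary['positive_rate'].min()
    if gap > 0.1:
        print(f"Warning: positive rate gap of {gap:.3f} across groups")
    return summary

# 使用イメージ(train_dfに'sensitive_col'と'target'がある場合)
# check_group_positive_rate(train_df, 'sensitive_col', 'target')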
6.4 セキュリティベストプラクティス
import os
import logging
from typing import Dict, Any
class KaggleSecurityManager:
"""
Kaggle Notebook用セキュリティ管理クラス
"""
def __init__(self):
self.logger = self._setup_logger()
self.sensitive_patterns = [
r'password', r'secret', r'token', r'key', r'credential'
]
def _setup_logger(self):
"""
セキュリティログの設定
"""
logger = logging.getLogger('kaggle_security')
logger.setLevel(logging.INFO)
handler = logging.FileHandler('/kaggle/working/security.log')
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def validate_data_sensitivity(self, df: pd.DataFrame) -> Dict[str, Any]:
"""
データの機密性レベル評価
"""
results = {
'sensitive_columns': [],
'pii_columns': [],
'risk_level': 'LOW'
}
import re
for col in df.columns:
col_lower = col.lower()
# 機密情報パターンの検出
for pattern in self.sensitive_patterns:
if re.search(pattern, col_lower):
results['sensitive_columns'].append(col)
self.logger.warning(f"Sensitive column detected: {col}")
# PII(個人識別情報)の検出
pii_patterns = [r'name', r'email', r'phone', r'address', r'ssn', r'id']
for pattern in pii_patterns:
if re.search(pattern, col_lower):
results['pii_columns'].append(col)
self.logger.warning(f"PII column detected: {col}")
# リスクレベルの判定
if results['pii_columns']:
results['risk_level'] = 'HIGH'
elif results['sensitive_columns']:
results['risk_level'] = 'MEDIUM'
return results
def sanitize_output(self, output_text: str) -> str:
"""
出力の機密情報サニタイズ
"""
import re
# 一般的な機密情報パターンをマスク
patterns = [
            (r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '[EMAIL_MASKED]'),
(r'\b\d{3}-\d{2}-\d{4}\b', '[SSN_MASKED]'),
(r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b', '[CARD_MASKED]'),
]
sanitized = output_text
for pattern, replacement in patterns:
sanitized = re.sub(pattern, replacement, sanitized)
return sanitized
def check_code_safety(self, code: str) -> Dict[str, Any]:
"""
コードの安全性チェック
"""
warnings = []
dangerous_patterns = [
(r'os\.system\(', 'Direct OS command execution detected'),
(r'subprocess\.(call|run|Popen)', 'Subprocess execution detected'),
(r'eval\(', 'Dynamic code evaluation detected'),
(r'exec\(', 'Dynamic code execution detected'),
(r'__import__\(', 'Dynamic import detected'),
]
import re
for pattern, message in dangerous_patterns:
if re.search(pattern, code):
warnings.append(message)
self.logger.warning(f"Security warning: {message}")
return {'warnings': warnings, 'safe': len(warnings) == 0}
# 使用例
security_manager = KaggleSecurityManager()
# データの機密性チェック
sensitivity_check = security_manager.validate_data_sensitivity(train_df)
print(f"Data sensitivity level: {sensitivity_check['risk_level']}")
# コード安全性チェック
code_sample = """
import pandas as pd
df = pd.read_csv('/kaggle/input/data.csv')
result = df.groupby('category').mean()
"""
safety_check = security_manager.check_code_safety(code_sample)
if not safety_check['safe']:
print("Security warnings found:")
for warning in safety_check['warnings']:
print(f" - {warning}")
第7章:高度な最適化テクニック
7.1 メモリ効率化とパフォーマンス最適化
大規模データセットを扱う際のメモリ効率化は、Kaggle Notebookでの成功に不可欠です。以下は、実践的な最適化手法の実装例です:
import psutil
import gc
from functools import wraps
import time
class KagglePerformanceOptimizer:
"""
Kaggle Notebook用パフォーマンス最適化クラス
"""
def __init__(self):
self.memory_snapshots = []
self.timing_logs = []
def memory_monitor(self, func):
"""
メモリ使用量監視デコレータ
"""
@wraps(func)
def wrapper(*args, **kwargs):
# 実行前のメモリ使用量
process = psutil.Process()
mem_before = process.memory_info().rss / 1024**2 # MB
# 関数実行
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
# 実行後のメモリ使用量
mem_after = process.memory_info().rss / 1024**2 # MB
execution_time = end_time - start_time
# ログ記録
log_entry = {
'function': func.__name__,
'memory_before': mem_before,
'memory_after': mem_after,
'memory_delta': mem_after - mem_before,
'execution_time': execution_time,
'timestamp': time.time()
}
self.timing_logs.append(log_entry)
print(f"{func.__name__}: {execution_time:.2f}s, Memory: {mem_before:.1f}MB → {mem_after:.1f}MB ({mem_after-mem_before:+.1f}MB)")
return result
return wrapper
def optimize_pandas_memory(self, df, aggressive=False):
"""
Pandasデータフレームのメモリ最適化
"""
start_memory = df.memory_usage(deep=True).sum() / 1024**2
for col in df.columns:
col_type = df[col].dtype
if col_type != object:
c_min = df[col].min()
c_max = df[col].max()
if str(col_type)[:3] == 'int':
if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
df[col] = df[col].astype(np.int8)
elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
df[col] = df[col].astype(np.int16)
elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
df[col] = df[col].astype(np.int32)
elif str(col_type)[:5] == 'float':
if aggressive:
if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
df[col] = df[col].astype(np.float16)
elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
df[col] = df[col].astype(np.float32)
else:
if c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
df[col] = df[col].astype(np.float32)
else:
# カテゴリカルデータの最適化
unique_count = df[col].nunique()
total_count = len(df)
if unique_count / total_count < 0.5: # ユニーク値が50%未満の場合
df[col] = df[col].astype('category')
end_memory = df.memory_usage(deep=True).sum() / 1024**2
reduction = (start_memory - end_memory) / start_memory * 100
print(f'Memory optimization: {start_memory:.1f}MB → {end_memory:.1f}MB ({reduction:.1f}% reduction)')
return df
def efficient_data_loading(self, file_path, chunk_size=50000, target_memory_mb=1000):
"""
メモリ効率的なデータ読み込み
"""
# ファイルサイズの確認
file_size_mb = os.path.getsize(file_path) / 1024**2
print(f"File size: {file_size_mb:.1f}MB")
if file_size_mb < target_memory_mb:
# 小さいファイルは一括読み込み
df = pd.read_csv(file_path)
return self.optimize_pandas_memory(df)
else:
# 大きいファイルはチャンク読み込み
print(f"Large file detected. Using chunk loading with size {chunk_size}")
chunks = []
total_rows = 0
for chunk in pd.read_csv(file_path, chunksize=chunk_size):
# チャンクごとに最適化
chunk = self.optimize_pandas_memory(chunk, aggressive=True)
chunks.append(chunk)
total_rows += len(chunk)
# メモリ使用量をチェック
current_memory = psutil.Process().memory_info().rss / 1024**2
if current_memory > target_memory_mb * 0.8: # 80%で警告
print(f"Warning: Memory usage {current_memory:.1f}MB approaching limit")
# チャンクを結合
df = pd.concat(chunks, ignore_index=True)
del chunks # メモリ解放
gc.collect()
print(f"Loaded {total_rows:,} rows")
return df
def create_performance_report(self):
"""
パフォーマンスレポートの生成
"""
if not self.timing_logs:
print("No performance data available")
return
print("=== Performance Analysis Report ===")
# 実行時間分析
total_time = sum(log['execution_time'] for log in self.timing_logs)
print(f"Total execution time: {total_time:.2f}s")
# 関数別実行時間
func_times = {}
for log in self.timing_logs:
func_name = log['function']
if func_name not in func_times:
func_times[func_name] = []
func_times[func_name].append(log['execution_time'])
print("\nFunction execution times:")
for func_name, times in sorted(func_times.items(), key=lambda x: sum(x[1]), reverse=True):
avg_time = np.mean(times)
total_func_time = sum(times)
print(f" {func_name}: {total_func_time:.2f}s total, {avg_time:.2f}s avg ({len(times)} calls)")
# メモリ使用量分析
memory_deltas = [log['memory_delta'] for log in self.timing_logs]
total_memory_change = sum(memory_deltas)
print(f"\nTotal memory change: {total_memory_change:+.1f}MB")
# メモリリークの検出
if total_memory_change > 100: # 100MB以上の増加
print("⚠️ Potential memory leak detected")
# リーク源の特定
leak_functions = [(log['function'], log['memory_delta'])
for log in self.timing_logs if log['memory_delta'] > 10]
if leak_functions:
print("Functions with high memory usage:")
for func_name, mem_delta in sorted(leak_functions, key=lambda x: x[1], reverse=True):
print(f" {func_name}: +{mem_delta:.1f}MB")
# 使用例
optimizer = KagglePerformanceOptimizer()
@optimizer.memory_monitor
def load_and_process_data(file_path):
"""
データ読み込みと前処理(監視対象)
"""
df = optimizer.efficient_data_loading(file_path)
# 基本的な前処理
df = df.dropna()
df = df.drop_duplicates()
return df
@optimizer.memory_monitor
def feature_engineering(df):
"""
特徴量エンジニアリング(監視対象)
"""
# 数値特徴量の作成
numeric_cols = df.select_dtypes(include=[np.number]).columns
for col in numeric_cols[:5]: # 最初の5列のみ
df[f'{col}_squared'] = df[col] ** 2
df[f'{col}_log'] = np.log1p(np.abs(df[col]))
return df
# 実行とパフォーマンス分析(コンペに合わせてパスを読み替える)
train_path = '/kaggle/input/competition-name/train.csv'
if os.path.exists(train_path):
    processed_df = load_and_process_data(train_path)
    enhanced_df = feature_engineering(processed_df)
# パフォーマンスレポート
optimizer.create_performance_report()
7.2 並列処理とGPUアクセラレーション
Kaggle NotebookでのGPUリソースを最大限活用するための実装例を以下に示します:
import concurrent.futures
from multiprocessing import Pool, cpu_count
import cupy as cp # GPU計算用
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
class KaggleGPUAccelerator:
"""
GPU加速処理クラス
"""
def __init__(self):
self.device = self._setup_device()
self.gpu_memory_fraction = 0.8
def _setup_device(self):
"""
デバイス設定の最適化
"""
if torch.cuda.is_available():
device = torch.device('cuda')
print(f"GPU detected: {torch.cuda.get_device_name(0)}")
print(f"GPU memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f}GB")
# GPU メモリクリア
torch.cuda.empty_cache()
return device
else:
print("GPU not available, using CPU")
return torch.device('cpu')
def parallel_feature_engineering(self, df, n_workers=None):
"""
並列特徴量エンジニアリング
"""
if n_workers is None:
n_workers = min(cpu_count(), 4) # Kaggleの制限を考慮
print(f"Using {n_workers} workers for parallel processing")
# データを分割
chunk_size = len(df) // n_workers
chunks = [df.iloc[i:i + chunk_size] for i in range(0, len(df), chunk_size)]
def process_chunk(chunk):
"""
チャンク処理関数
"""
processed_chunk = chunk.copy()
# 数値特徴量の処理
numeric_cols = chunk.select_dtypes(include=[np.number]).columns
for col in numeric_cols:
processed_chunk[f'{col}_rolling_mean_3'] = chunk[col].rolling(3).mean()
processed_chunk[f'{col}_ewm_alpha_0.1'] = chunk[col].ewm(alpha=0.1).mean()
processed_chunk[f'{col}_rank'] = chunk[col].rank(pct=True)
return processed_chunk
# 並列実行
with concurrent.futures.ThreadPoolExecutor(max_workers=n_workers) as executor:
future_to_chunk = {executor.submit(process_chunk, chunk): i for i, chunk in enumerate(chunks)}
results = []
for future in concurrent.futures.as_completed(future_to_chunk):
chunk_idx = future_to_chunk[future]
try:
result = future.result()
results.append((chunk_idx, result))
except Exception as exc:
print(f'Chunk {chunk_idx} generated an exception: {exc}')
# 結果を結合
results.sort(key=lambda x: x[0]) # インデックス順にソート
processed_chunks = [result[1] for result in results]
return pd.concat(processed_chunks, ignore_index=True)
def gpu_matrix_operations(self, X, operation='correlation'):
"""
GPU加速行列演算
"""
if not torch.cuda.is_available():
print("GPU not available, falling back to CPU")
if operation == 'correlation':
return np.corrcoef(X.T)
elif operation == 'covariance':
return np.cov(X.T)
try:
# データをGPUに転送
X_gpu = torch.tensor(X, dtype=torch.float32).to(self.device)
if operation == 'correlation':
# 相関行列の計算
X_centered = X_gpu - X_gpu.mean(dim=0)
correlation_matrix = torch.mm(X_centered.T, X_centered) / (X_gpu.shape[0] - 1)
# 標準化
std_dev = torch.sqrt(torch.diag(correlation_matrix))
correlation_matrix = correlation_matrix / torch.outer(std_dev, std_dev)
result = correlation_matrix.cpu().numpy()
elif operation == 'covariance':
# 共分散行列の計算
X_centered = X_gpu - X_gpu.mean(dim=0)
covariance_matrix = torch.mm(X_centered.T, X_centered) / (X_gpu.shape[0] - 1)
result = covariance_matrix.cpu().numpy()
elif operation == 'pca_transform':
# PCA変換(特異値分解使用)
X_centered = X_gpu - X_gpu.mean(dim=0)
U, S, V = torch.svd(X_centered)
result = torch.mm(X_centered, V).cpu().numpy()
# GPUメモリクリア
torch.cuda.empty_cache()
return result
except Exception as e:
print(f"GPU operation failed: {e}")
print("Falling back to CPU computation")
if operation == 'correlation':
return np.corrcoef(X.T)
elif operation == 'covariance':
return np.cov(X.T)
def gpu_neural_network_training(self, X_train, y_train, X_val, y_val,
hidden_dims=[128, 64, 32], epochs=100, batch_size=1024):
"""
GPU加速ニューラルネットワーク訓練
"""
class TabularNN(nn.Module):
def __init__(self, input_dim, hidden_dims, output_dim=1):
super(TabularNN, self).__init__()
layers = []
prev_dim = input_dim
for hidden_dim in hidden_dims:
layers.extend([
nn.Linear(prev_dim, hidden_dim),
nn.BatchNorm1d(hidden_dim),
nn.ReLU(),
nn.Dropout(0.3)
])
prev_dim = hidden_dim
layers.append(nn.Linear(prev_dim, output_dim))
layers.append(nn.Sigmoid())
self.network = nn.Sequential(*layers)
def forward(self, x):
return self.network(x)
# データの準備
X_train_tensor = torch.tensor(X_train.values, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train.values, dtype=torch.float32).reshape(-1, 1)
X_val_tensor = torch.tensor(X_val.values, dtype=torch.float32)
y_val_tensor = torch.tensor(y_val.values, dtype=torch.float32).reshape(-1, 1)
# データローダーの作成
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
# モデルの初期化
model = TabularNN(X_train.shape[1], hidden_dims).to(self.device)
criterion = nn.BCELoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=10, factor=0.5)
# 訓練ループ
best_val_loss = float('inf')
patience_counter = 0
for epoch in range(epochs):
model.train()
train_loss = 0.0
for batch_X, batch_y in train_loader:
batch_X, batch_y = batch_X.to(self.device), batch_y.to(self.device)
optimizer.zero_grad()
outputs = model(batch_X)
loss = criterion(outputs, batch_y)
loss.backward()
optimizer.step()
train_loss += loss.item()
# 検証評価
model.eval()
with torch.no_grad():
X_val_gpu = X_val_tensor.to(self.device)
y_val_gpu = y_val_tensor.to(self.device)
val_outputs = model(X_val_gpu)
val_loss = criterion(val_outputs, y_val_gpu).item()
scheduler.step(val_loss)
if val_loss < best_val_loss:
best_val_loss = val_loss
patience_counter = 0
# 最良モデルの保存
torch.save(model.state_dict(), '/kaggle/working/best_model.pth')
else:
patience_counter += 1
if epoch % 10 == 0:
print(f'Epoch {epoch}: Train Loss: {train_loss/len(train_loader):.6f}, Val Loss: {val_loss:.6f}')
# 早期停止
if patience_counter >= 20:
print(f'Early stopping at epoch {epoch}')
break
# 最良モデルの読み込み
model.load_state_dict(torch.load('/kaggle/working/best_model.pth'))
return model
def predict_with_gpu_model(self, model, X_test, batch_size=10000):
"""
GPU モデルでの予測
"""
model.eval()
predictions = []
# バッチ単位で予測
for i in range(0, len(X_test), batch_size):
batch_X = X_test.iloc[i:i+batch_size]
X_tensor = torch.tensor(batch_X.values, dtype=torch.float32).to(self.device)
with torch.no_grad():
batch_pred = model(X_tensor).cpu().numpy().flatten()
predictions.extend(batch_pred)
return np.array(predictions)
# 使用例
gpu_accelerator = KaggleGPUAccelerator()
# 並列特徴量エンジニアリング
if len(train_df) > 10000: # 大きなデータセットの場合のみ
print("Applying parallel feature engineering...")
enhanced_train = gpu_accelerator.parallel_feature_engineering(train_df)
else:
enhanced_train = train_df
# GPU加速相関行列計算
numeric_cols = enhanced_train.select_dtypes(include=[np.number]).columns
if len(numeric_cols) > 10:
print("Computing correlation matrix with GPU acceleration...")
correlation_matrix = gpu_accelerator.gpu_matrix_operations(
enhanced_train[numeric_cols].values,
operation='correlation'
)
# 高相関特徴量ペアの検出
high_corr_pairs = []
for i in range(len(numeric_cols)):
for j in range(i+1, len(numeric_cols)):
if abs(correlation_matrix[i, j]) > 0.9:
high_corr_pairs.append((numeric_cols[i], numeric_cols[j], correlation_matrix[i, j]))
if high_corr_pairs:
print("High correlation pairs detected:")
for col1, col2, corr in high_corr_pairs[:5]: # 上位5個
print(f" {col1} - {col2}: {corr:.4f}")
第8章:結論と実践ガイドライン
8.1 Kaggle Notebook マスタリーのロードマップ
Kaggle Notebookを完全にマスターするための段階的学習パスを以下に示します:
レベル | 習得すべきスキル | 推定学習時間 | 評価指標 |
---|---|---|---|
初級 | 基本操作、データ読み込み、簡単な可視化 | 20-30時間 | 初回提出完了 |
中級 | 特徴量エンジニアリング、クロスバリデーション、アンサンブル | 50-80時間 | Bronze Medal取得 |
上級 | 高度な最適化、カスタムモデル、GPU活用 | 100-150時間 | Silver Medal取得 |
エキスパート | 独自手法開発、効率的パイプライン構築 | 200時間以上 | Gold Medal取得 |
8.2 継続的スキル向上のための実践的アプローチ
class KaggleMasteryTracker:
"""
Kaggleスキル習得進捗管理クラス
"""
def __init__(self):
self.skill_metrics = {
'competitions_joined': 0,
'notebooks_published': 0,
'datasets_uploaded': 0,
'medals_earned': {'bronze': 0, 'silver': 0, 'gold': 0},
'techniques_mastered': [],
'learning_hours': 0
}
self.learning_plan = self._create_learning_plan()
def _create_learning_plan(self):
"""
個別化学習プランの作成
"""
return {
'beginner': [
'Kaggle環境の理解',
'pandas基本操作',
'基本的な機械学習モデル',
'クロスバリデーション',
'初回提出完了'
],
'intermediate': [
'特徴量エンジニアリング自動化',
'アンサンブル手法',
'ハイパーパラメータ最適化',
'メモリ効率化',
'GPU活用基礎'
],
'advanced': [
'カスタムモデル開発',
'高度な前処理パイプライン',
'マルチモーダル学習',
'擬似ラベリング',
'スタッキング・ブレンディング'
],
'expert': [
'独自アルゴリズム開発',
'論文再現実装',
'コミュニティ貢献',
'メンタリング活動',
'新しい評価指標提案'
]
}
def assess_current_level(self):
"""
現在のスキルレベル評価
"""
total_medals = sum(self.skill_metrics['medals_earned'].values())
competitions = self.skill_metrics['competitions_joined']
if total_medals == 0 and competitions < 3:
return 'beginner'
elif total_medals < 3 and competitions < 10:
return 'intermediate'
elif total_medals < 10 and competitions < 25:
return 'advanced'
else:
return 'expert'
def get_next_learning_objectives(self):
"""
次の学習目標の提示
"""
current_level = self.assess_current_level()
completed_techniques = set(self.skill_metrics['techniques_mastered'])
remaining_objectives = [
obj for obj in self.learning_plan[current_level]
if obj not in completed_techniques
]
return remaining_objectives[:3] # 次の3つの目標
def create_practice_schedule(self, weekly_hours=10):
"""
個人別練習スケジュール作成
"""
objectives = self.get_next_learning_objectives()
schedule = {
'weekly_commitment': f"{weekly_hours} hours",
'daily_sessions': f"{weekly_hours/7:.1f} hours/day",
'current_objectives': objectives,
'recommended_competitions': [],
'skill_building_exercises': []
}
# レベル別推奨コンペティション
current_level = self.assess_current_level()
if current_level == 'beginner':
schedule['recommended_competitions'] = [
'Tabular Playground Series',
'Getting Started competitions',
'Featured competitions (participation only)'
]
schedule['skill_building_exercises'] = [
'Daily pandas practice (30 min)',
'Kaggle Learn courses (2 hours/week)',
'Code reproduction from public notebooks (3 hours/week)'
]
elif current_level == 'intermediate':
schedule['recommended_competitions'] = [
'Featured competitions (top 50% target)',
'Research competitions',
'Community competitions'
]
schedule['skill_building_exercises'] = [
'Advanced feature engineering (3 hours/week)',
'Model ensemble practice (2 hours/week)',
'Performance optimization (2 hours/week)'
]
elif current_level == 'advanced':
schedule['recommended_competitions'] = [
'Featured competitions (medal target)',
'Research competitions (innovation focus)',
'Specialized domain competitions'
]
schedule['skill_building_exercises'] = [
'Custom model development (4 hours/week)',
'Academic paper implementation (3 hours/week)',
'Open-source contribution (2 hours/week)'
]
else: # expert
schedule['recommended_competitions'] = [
'All competition types (consistent medal target)',
'Host competitions or challenges',
'Industry collaborations'
]
schedule['skill_building_exercises'] = [
'Novel technique research (5 hours/week)',
'Community mentoring (3 hours/week)',
'Technical writing/blogging (2 hours/week)'
]
return schedule
# 使用例とマスタリー評価
mastery_tracker = KaggleMasteryTracker()
# 現在のレベル評価
current_level = mastery_tracker.assess_current_level()
print(f"Current skill level: {current_level.title()}")
# 次の学習目標
objectives = mastery_tracker.get_next_learning_objectives()
print(f"Next learning objectives:")
for i, obj in enumerate(objectives, 1):
print(f" {i}. {obj}")
# 個人別学習スケジュール
schedule = mastery_tracker.create_practice_schedule(weekly_hours=15)
print(f"\nRecommended weekly schedule ({schedule['weekly_commitment']}):")
print(f"Daily commitment: {schedule['daily_sessions']}")
print(f"\nRecommended competitions:")
for comp in schedule['recommended_competitions']:
print(f" • {comp}")
print(f"\nSkill building exercises:")
for exercise in schedule['skill_building_exercises']:
print(f" • {exercise}")
8.3 最終的な推奨事項と長期戦略
Kaggle Notebookでの成功を確実にするための包括的な推奨事項を以下に示します:
技術的推奨事項
- 段階的スキル構築: 基礎から高度な技術まで、体系的な学習アプローチを維持する
- コード再利用性: 汎用的な関数とクラスを開発し、個人ライブラリを構築する
- パフォーマンス意識: 常にメモリ使用量と実行時間を監視し、最適化を心がける
- バージョン管理: 実験結果とコードを系統的に記録し、再現可能性を確保する
- コミュニティ参加: 公開ノートブックへの貢献と他参加者との知識共有を積極的に行う
学習継続のためのマインドセット
- 失敗からの学習: 低いスコアや失敗した実験も貴重な学習機会として捉える
- 継続的改善: 小さな改善の積み重ねが大きな成果につながることを理解する
- 多様性の追求: 異なるドメインや問題タイプのコンペティションに挑戦する
- 知識の共有: 学んだことを他者と共有することで、自身の理解も深める
- 長期視点: 短期的な結果に一喜一憂せず、長期的なスキル向上に焦点を当てる
結論
本記事では、Kaggle Notebookの基本操作から高度な最適化技術まで、データサイエンスコンペティションで勝利するための包括的な知識を提供しました。元Google BrainでのAI研究経験と現役AIスタートアップCTOとしての実践的知見を基に、単なる操作説明を超えて、実際のコンペティションで使用可能な実装例と戦略的アプローチを詳述しました。
Kaggle Notebookは、単なる開発環境ではなく、世界中のデータサイエンティストが知識を共有し、切磋琢磨する学習プラットフォームです。本記事で紹介した技術と戦略を実践することで、読者の皆様がKaggleコミュニティにおいて継続的な成功を収め、データサイエンス分野での専門性を確立できることを確信しています。
重要なのは、技術的スキルの習得だけでなく、継続的な学習姿勢と実験精神を維持することです。Kaggle Notebookを最大限活用し、データサイエンスの最前線で活躍する専門家として成長していただければ幸いです。
参考文献・リソース
- Kaggle Official Documentation: https://www.kaggle.com/docs
- Pandas User Guide: https://pandas.pydata.org/pandas-docs/stable/user_guide/
- LightGBM Documentation: https://lightgbm.readthedocs.io/
- XGBoost Documentation: https://xgboost.readthedocs.io/
- PyTorch Documentation: https://pytorch.org/docs/stable/
- Scikit-learn User Guide: https://scikit-learn.org/stable/user_guide.html
- Feature Engineering for Machine Learning (Alice Zheng, Amanda Casari)
- Hands-On Machine Learning (Aurélien Géron)
- The Elements of Statistical Learning (Hastie, Tibshirani, Friedman)
- Kaggle Learn Courses: https://www.kaggle.com/learn