序論:Streamlitアプリケーションにおける認証の重要性と技術的課題
AIアプリケーションの開発において、Streamlitは迅速なプロトタイピングと展開を可能にする強力なフレームワークです。しかし、本格的な運用環境では、適切な認証機能の実装が不可欠となります。特に、機密性の高いAIモデルや企業内データを扱うアプリケーションでは、未認証のアクセスを防ぐセキュリティ機構が求められます。
本記事では、Streamlitにおける認証機能の実装方法を、基本的なパスワード認証から、セッション管理、OAuth連携まで、実装の複雑性と安全性のレベル別に体系的に解説します。筆者が実際のAIスタートアップ環境で経験した成功事例と失敗事例を踏まえ、実用的な実装コードとともに、各手法の技術的背景と適用場面を詳述します。
Streamlitの認証における固有の技術的課題
Streamlitは、その設計思想上、ステートレスなWebアプリケーションフレームワークです。各ユーザーのリクエストは独立して処理され、デフォルトではセッション状態の永続化機能を持ちません。この特性は、従来のWebフレームワーク(Django、Flask等)とは異なる認証戦略を要求します。
具体的な技術的課題として以下が挙げられます:
- セッション状態の管理: Streamlitの
st.session_state
は、ブラウザのタブ単位で管理されるため、複数タブ間での認証状態の共有に制約があります。 - 認証状態の永続化: ページの再読み込みやブラウザの再起動後も認証状態を維持するための機構が必要です。
- セキュリティヘッダーの制御: CSRFトークンやSecure Cookieの実装における制約事項があります。
第1章:基本認証の実装 – シンプルパスワード方式
1.1 最小構成での認証実装
最も単純な認証方式として、固定パスワードによる認証機能を実装します。この手法は、開発初期段階や内部テスト用途に適用可能です。
import streamlit as st
import hashlib
import time
def hash_password(password: str) -> str:
"""パスワードをSHA-256でハッシュ化"""
return hashlib.sha256(password.encode()).hexdigest()
def check_password(password: str, correct_hash: str) -> bool:
"""パスワードの照合"""
return hash_password(password) == correct_hash
def login_form():
"""ログインフォームの表示"""
st.title("🔐 アプリケーション認証")
with st.form("login_form"):
username = st.text_input("ユーザー名")
password = st.text_input("パスワード", type="password")
submit_button = st.form_submit_button("ログイン")
if submit_button:
# 実際の環境では環境変数から取得
CORRECT_USERNAME = "admin"
CORRECT_PASSWORD_HASH = hash_password("secure_password_123")
if username == CORRECT_USERNAME and check_password(password, CORRECT_PASSWORD_HASH):
st.session_state.authenticated = True
st.session_state.username = username
st.success("ログインに成功しました!")
time.sleep(1)
st.rerun()
else:
st.error("ユーザー名またはパスワードが正しくありません。")
def main_app():
"""メインアプリケーション"""
st.title(f"Welcome, {st.session_state.username}!")
# ログアウト機能
if st.button("ログアウト"):
for key in list(st.session_state.keys()):
del st.session_state[key]
st.rerun()
# ここにメインのアプリケーション機能を実装
st.write("認証が完了しました。アプリケーションを利用できます。")
def main():
"""メイン関数"""
# 認証状態の初期化
if 'authenticated' not in st.session_state:
st.session_state.authenticated = False
# 認証状態による分岐
if st.session_state.authenticated:
main_app()
else:
login_form()
if __name__ == "__main__":
main()
1.2 環境変数を活用したセキュアな設定管理
上記の実装では、認証情報がソースコードに直接記述されているため、セキュリティ上の問題があります。実際の運用環境では、環境変数またはSecretsファイルを使用した管理が必要です。
import os
from typing import Dict, Optional
class AuthConfig:
"""認証設定の管理クラス"""
@staticmethod
def get_credentials() -> Dict[str, str]:
"""環境変数から認証情報を取得"""
try:
# Streamlit Secrets(推奨)
credentials = st.secrets["auth"]
return {
"username": credentials["username"],
"password_hash": credentials["password_hash"]
}
except KeyError:
# 環境変数からのフォールバック
username = os.getenv("STREAMLIT_USERNAME")
password = os.getenv("STREAMLIT_PASSWORD")
if not username or not password:
raise ValueError("認証設定が見つかりません。")
return {
"username": username,
"password_hash": hash_password(password)
}
# .streamlit/secrets.toml の例
"""
[auth]
username = “admin” password_hash = “ef92b778bafe771e89245b89ecbc08a44a4e166c06659911881f383d4473e94f” “””
1.3 認証試行回数制限による総当たり攻撃対策
セキュリティを向上させるため、認証失敗時の試行回数制限機能を実装します。
import time
from collections import defaultdict
from typing import Dict, Tuple
class LoginAttemptTracker:
"""ログイン試行の追跡と制限"""
def __init__(self, max_attempts: int = 5, lockout_duration: int = 300):
self.max_attempts = max_attempts
self.lockout_duration = lockout_duration # 秒
self.attempts: Dict[str, list] = defaultdict(list)
def is_locked_out(self, identifier: str) -> Tuple[bool, int]:
"""アカウントがロックアウトされているかチェック"""
current_time = time.time()
attempts = self.attempts[identifier]
# 古い試行記録をクリーンアップ
attempts[:] = [attempt_time for attempt_time in attempts
if current_time - attempt_time < self.lockout_duration]
if len(attempts) >= self.max_attempts:
remaining_time = int(self.lockout_duration - (current_time - attempts[0]))
return True, max(0, remaining_time)
return False, 0
def record_failed_attempt(self, identifier: str):
"""失敗した試行を記録"""
self.attempts[identifier].append(time.time())
# 使用例
login_tracker = LoginAttemptTracker()
def enhanced_login_form():
"""拡張ログインフォーム(試行回数制限付き)"""
st.title("🔐 セキュア認証システム")
# クライアントIPアドレスの取得(簡易版)
client_ip = st.context.headers.get("x-forwarded-for", "unknown")
# ロックアウト状態のチェック
is_locked, remaining_time = login_tracker.is_locked_out(client_ip)
if is_locked:
st.error(f"⚠️ 複数回の認証失敗により、アカウントがロックされています。")
st.warning(f"再試行まで残り時間: {remaining_time}秒")
return
# 残り試行回数の表示
current_attempts = len(login_tracker.attempts[client_ip])
remaining_attempts = login_tracker.max_attempts - current_attempts
if current_attempts > 0:
st.info(f"残り試行回数: {remaining_attempts}回")
with st.form("secure_login_form"):
username = st.text_input("ユーザー名")
password = st.text_input("パスワード", type="password")
submit_button = st.form_submit_button("ログイン")
if submit_button:
config = AuthConfig.get_credentials()
if username == config["username"] and check_password(password, config["password_hash"]):
st.session_state.authenticated = True
st.session_state.username = username
st.success("認証に成功しました!")
time.sleep(1)
st.rerun()
else:
login_tracker.record_failed_attempt(client_ip)
st.error("認証に失敗しました。ユーザー名とパスワードを確認してください。")
第2章:セッション管理の高度な実装
2.1 暗号化Cookieによる認証状態の永続化
Streamlitの標準的なst.session_state
は、ブラウザセッション内でのみ有効です。より堅牢な認証システムを構築するため、暗号化されたCookieを使用したセッション管理を実装します。
import jwt
import datetime
from cryptography.fernet import Fernet
import base64
import json
class SecureSessionManager:
"""セキュアなセッション管理クラス"""
def __init__(self, secret_key: str):
self.secret_key = secret_key
# Fernetキーの生成(本来は環境変数から取得)
key = base64.urlsafe_b64encode(secret_key.encode()[:32].ljust(32, b'0'))
self.cipher = Fernet(key)
def create_session_token(self, username: str, expires_hours: int = 24) -> str:
"""セッショントークンの生成"""
payload = {
'username': username,
'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=expires_hours),
'iat': datetime.datetime.utcnow()
}
token = jwt.encode(payload, self.secret_key, algorithm='HS256')
return token
def validate_session_token(self, token: str) -> Optional[str]:
"""セッショントークンの検証"""
try:
payload = jwt.decode(token, self.secret_key, algorithms=['HS256'])
return payload.get('username')
except jwt.ExpiredSignatureError:
return None
except jwt.InvalidTokenError:
return None
def encrypt_session_data(self, data: dict) -> str:
"""セッションデータの暗号化"""
json_data = json.dumps(data)
encrypted_data = self.cipher.encrypt(json_data.encode())
return base64.urlsafe_b64encode(encrypted_data).decode()
def decrypt_session_data(self, encrypted_data: str) -> Optional[dict]:
"""セッションデータの復号化"""
try:
encrypted_bytes = base64.urlsafe_b64decode(encrypted_data.encode())
decrypted_data = self.cipher.decrypt(encrypted_bytes)
return json.loads(decrypted_data.decode())
except Exception:
return None
# JavaScript Cookie操作のためのヘルパー関数
def inject_cookie_script(cookie_name: str, cookie_value: str, expires_days: int = 7):
"""Cookieを設定するJavaScriptを注入"""
script = f"""
<script>
document.cookie = "{cookie_name}={cookie_value}; expires=" +
new Date(Date.now() + {expires_days * 24 * 60 * 60 * 1000}).toUTCString() +
"; path=/; SameSite=Strict; Secure";
</script>
"""
st.components.v1.html(script, height=0)
def get_cookie_value(cookie_name: str) -> Optional[str]:
"""Cookieの値を取得するJavaScript"""
script = f"""
<script>
function getCookie(name) {{
const value = `; ${{document.cookie}}`;
const parts = value.split(`; ${{name}}=`);
if (parts.length === 2) return parts.pop().split(';').shift();
return null;
}}
const cookieValue = getCookie("{cookie_name}");
if (cookieValue) {{
window.parent.postMessage({{
type: "cookie_value",
name: "{cookie_name}",
value: cookieValue
}}, "*");
}}
</script>
"""
# 実際の実装では、st.components.v1.htmlを使用してメッセージを受信
# ここでは簡略化
return None
# 拡張認証システムの実装
session_manager = SecureSessionManager(st.secrets.get("session_secret", "default_secret"))
def persistent_login_check():
"""永続的なログイン状態のチェック"""
# セッション状態の初期化
if 'authenticated' not in st.session_state:
st.session_state.authenticated = False
# Cookieからセッショントークンを取得(簡易実装)
# 実際の実装では、より複雑なCookie操作が必要
if not st.session_state.authenticated:
# ここでCookieからトークンを取得し、検証する
# 実装の詳細は環境により異なる
pass
2.2 データベース連携による複数ユーザー認証
より本格的なアプリケーションでは、複数ユーザーの管理とデータベース連携が必要になります。
import sqlite3
import bcrypt
from typing import Optional, List, Dict
from dataclasses import dataclass
from datetime import datetime
@dataclass
class User:
"""ユーザー情報のデータクラス"""
id: int
username: str
email: str
password_hash: str
created_at: datetime
last_login: Optional[datetime] = None
is_active: bool = True
class UserDatabase:
"""ユーザー管理データベースクラス"""
def __init__(self, db_path: str = "users.db"):
self.db_path = db_path
self.init_database()
def init_database(self):
"""データベースの初期化"""
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
email TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_login TIMESTAMP,
is_active BOOLEAN DEFAULT 1
)
""")
# セッション管理テーブル
conn.execute("""
CREATE TABLE IF NOT EXISTS sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
session_token TEXT UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
expires_at TIMESTAMP NOT NULL,
is_active BOOLEAN DEFAULT 1,
FOREIGN KEY (user_id) REFERENCES users (id)
)
""")
def hash_password(self, password: str) -> str:
"""bcryptを使用したパスワードハッシュ化"""
salt = bcrypt.gensalt()
return bcrypt.hashpw(password.encode('utf-8'), salt).decode('utf-8')
def verify_password(self, password: str, password_hash: str) -> bool:
"""パスワードの検証"""
return bcrypt.checkpw(password.encode('utf-8'), password_hash.encode('utf-8'))
def create_user(self, username: str, email: str, password: str) -> bool:
"""新規ユーザーの作成"""
password_hash = self.hash_password(password)
try:
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)",
(username, email, password_hash)
)
return True
except sqlite3.IntegrityError:
return False
def authenticate_user(self, username: str, password: str) -> Optional[User]:
"""ユーザー認証"""
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.execute(
"SELECT * FROM users WHERE username = ? AND is_active = 1",
(username,)
)
row = cursor.fetchone()
if row and self.verify_password(password, row['password_hash']):
# 最終ログイン時刻の更新
conn.execute(
"UPDATE users SET last_login = CURRENT_TIMESTAMP WHERE id = ?",
(row['id'],)
)
return User(
id=row['id'],
username=row['username'],
email=row['email'],
password_hash=row['password_hash'],
created_at=datetime.fromisoformat(row['created_at']),
last_login=datetime.fromisoformat(row['last_login']) if row['last_login'] else None,
is_active=bool(row['is_active'])
)
return None
def create_session(self, user_id: int, expires_hours: int = 24) -> str:
"""セッションの作成"""
session_token = session_manager.create_session_token(str(user_id), expires_hours)
expires_at = datetime.utcnow() + datetime.timedelta(hours=expires_hours)
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"INSERT INTO sessions (user_id, session_token, expires_at) VALUES (?, ?, ?)",
(user_id, session_token, expires_at)
)
return session_token
# 使用例
user_db = UserDatabase()
def multi_user_login_form():
"""複数ユーザー対応のログインフォーム"""
st.title("🔐 ユーザー認証システム")
tab1, tab2 = st.tabs(["ログイン", "新規登録"])
with tab1:
with st.form("login_form"):
username = st.text_input("ユーザー名")
password = st.text_input("パスワード", type="password")
remember_me = st.checkbox("ログイン状態を保持")
submit_login = st.form_submit_button("ログイン")
if submit_login:
user = user_db.authenticate_user(username, password)
if user:
st.session_state.authenticated = True
st.session_state.user = user
if remember_me:
session_token = user_db.create_session(user.id, expires_hours=168) # 1週間
# Cookieに保存(実装は環境依存)
st.success(f"ようこそ、{user.username}さん!")
time.sleep(1)
st.rerun()
else:
st.error("ユーザー名またはパスワードが正しくありません。")
with tab2:
with st.form("register_form"):
new_username = st.text_input("新しいユーザー名")
new_email = st.text_input("メールアドレス")
new_password = st.text_input("パスワード", type="password")
confirm_password = st.text_input("パスワード確認", type="password")
submit_register = st.form_submit_button("登録")
if submit_register:
if new_password != confirm_password:
st.error("パスワードが一致しません。")
elif len(new_password) < 8:
st.error("パスワードは8文字以上で設定してください。")
elif user_db.create_user(new_username, new_email, new_password):
st.success("ユーザー登録が完了しました。ログインしてください。")
else:
st.error("ユーザー名またはメールアドレスが既に使用されています。")
def multi_user_main_app():
"""複数ユーザー対応のメインアプリ"""
user = st.session_state.user
st.title(f"Welcome, {user.username}!")
# ユーザー情報の表示
with st.expander("ユーザー情報"):
st.write(f"**ユーザーID:** {user.id}")
st.write(f"**メールアドレス:** {user.email}")
st.write(f"**登録日:** {user.created_at.strftime('%Y-%m-%d %H:%M:%S')}")
st.write(f"**最終ログイン:** {user.last_login.strftime('%Y-%m-%d %H:%M:%S') if user.last_login else 'N/A'}")
# ログアウト機能
if st.button("ログアウト"):
for key in list(st.session_state.keys()):
del st.session_state[key]
st.rerun()
# ここにユーザー固有のアプリケーション機能を実装
st.write("認証が完了しました。アプリケーションを利用できます。")
第3章:OAuth認証の実装
3.1 Google OAuth 2.0による認証
より高度なセキュリティを要求する場合、OAuth 2.0を使用した外部認証プロバイダーとの連携が有効です。ここでは、Google OAuth 2.0を例に実装方法を解説します。
from google_auth_oauthlib.flow import Flow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import requests
from urllib.parse import urlencode
import secrets
class GoogleOAuthManager:
"""Google OAuth 2.0認証管理クラス"""
def __init__(self, client_id: str, client_secret: str, redirect_uri: str):
self.client_id = client_id
self.client_secret = client_secret
self.redirect_uri = redirect_uri
self.scope = ['openid', 'email', 'profile']
def get_authorization_url(self, state: str) -> str:
"""認証URLの生成"""
params = {
'client_id': self.client_id,
'redirect_uri': self.redirect_uri,
'scope': ' '.join(self.scope),
'response_type': 'code',
'state': state,
'access_type': 'offline',
'prompt': 'consent'
}
base_url = 'https://accounts.google.com/o/oauth2/v2/auth'
return f"{base_url}?{urlencode(params)}"
def exchange_code_for_token(self, code: str) -> Optional[dict]:
"""認証コードをアクセストークンに交換"""
token_url = 'https://oauth2.googleapis.com/token'
data = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'code': code,
'grant_type': 'authorization_code',
'redirect_uri': self.redirect_uri
}
response = requests.post(token_url, data=data)
if response.status_code == 200:
return response.json()
return None
def get_user_info(self, access_token: str) -> Optional[dict]:
"""アクセストークンを使用してユーザー情報を取得"""
user_info_url = 'https://www.googleapis.com/oauth2/v2/userinfo'
headers = {'Authorization': f'Bearer {access_token}'}
response = requests.get(user_info_url, headers=headers)
if response.status_code == 200:
return response.json()
return None
# OAuth設定(実際は環境変数から取得)
oauth_manager = GoogleOAuthManager(
client_id=st.secrets["google_oauth"]["client_id"],
client_secret=st.secrets["google_oauth"]["client_secret"],
redirect_uri=st.secrets["google_oauth"]["redirect_uri"]
)
def oauth_login_page():
"""OAuth認証ページ"""
st.title("🔐 Google OAuth認証")
# URL パラメータから認証コードを取得
query_params = st.experimental_get_query_params()
if 'code' in query_params:
# 認証コードが返されてきた場合の処理
code = query_params['code'][0]
state = query_params.get('state', [None])[0]
# CSRF攻撃防止のためのstate検証
if state != st.session_state.get('oauth_state'):
st.error("セキュリティエラー: 不正なリクエストです。")
return
# アクセストークンの取得
token_data = oauth_manager.exchange_code_for_token(code)
if token_data:
access_token = token_data['access_token']
user_info = oauth_manager.get_user_info(access_token)
if user_info:
# ユーザー情報をセッションに保存
st.session_state.authenticated = True
st.session_state.user_info = user_info
st.session_state.oauth_token = token_data
st.success(f"ようこそ、{user_info['name']}さん!")
st.rerun()
else:
st.error("ユーザー情報の取得に失敗しました。")
else:
st.error("認証に失敗しました。")
else:
# 初回アクセス時の認証ボタン表示
st.write("Googleアカウントでログインしてください。")
if st.button("🔐 Googleでログイン"):
# CSRF攻撃防止のためのランダムなstate生成
oauth_state = secrets.token_urlsafe(32)
st.session_state.oauth_state = oauth_state
# Google認証URLにリダイレクト
auth_url = oauth_manager.get_authorization_url(oauth_state)
# JavaScript を使用してリダイレクト
redirect_script = f"""
<script>
window.location.href = "{auth_url}";
</script>
"""
st.components.v1.html(redirect_script, height=0)
def oauth_main_app():
"""OAuth認証後のメインアプリ"""
user_info = st.session_state.user_info
st.title(f"Welcome, {user_info['name']}!")
# ユーザー情報の表示
col1, col2 = st.columns([1, 3])
with col1:
if 'picture' in user_info:
st.image(user_info['picture'], width=100)
with col2:
st.write(f"**名前:** {user_info['name']}")
st.write(f"**メール:** {user_info['email']}")
st.write(f"**Google ID:** {user_info['id']}")
# ログアウト機能
if st.button("ログアウト"):
# Googleからのトークン取り消し
oauth_token = st.session_state.get('oauth_token')
if oauth_token and 'access_token' in oauth_token:
revoke_url = f"https://oauth2.googleapis.com/revoke?token={oauth_token['access_token']}"
requests.post(revoke_url)
# セッションクリア
for key in list(st.session_state.keys()):
del st.session_state[key]
st.rerun()
st.write("OAuth認証が完了しました。アプリケーションを利用できます。")
3.2 認証プロバイダーの拡張とマルチプロバイダー対応
複数の認証プロバイダーに対応するため、抽象化されたインターフェースを実装します。
from abc import ABC, abstractmethod
from enum import Enum
from typing import Union
class AuthProvider(Enum):
"""認証プロバイダーの種類"""
GOOGLE = "google"
GITHUB = "github"
MICROSOFT = "microsoft"
LOCAL = "local"
class BaseAuthProvider(ABC):
"""認証プロバイダーの基底クラス"""
@abstractmethod
def get_authorization_url(self, state: str) -> str:
"""認証URLの取得"""
pass
@abstractmethod
def exchange_code_for_token(self, code: str) -> Optional[dict]:
"""認証コードをトークンに交換"""
pass
@abstractmethod
def get_user_info(self, access_token: str) -> Optional[dict]:
"""ユーザー情報の取得"""
pass
@abstractmethod
def get_provider_name(self) -> str:
"""プロバイダー名の取得"""
pass
class GitHubOAuthProvider(BaseAuthProvider):
"""GitHub OAuth認証プロバイダー"""
def __init__(self, client_id: str, client_secret: str, redirect_uri: str):
self.client_id = client_id
self.client_secret = client_secret
self.redirect_uri = redirect_uri
def get_authorization_url(self, state: str) -> str:
params = {
'client_id': self.client_id,
'redirect_uri': self.redirect_uri,
'scope': 'user:email',
'state': state
}
return f"https://github.com/login/oauth/authorize?{urlencode(params)}"
def exchange_code_for_token(self, code: str) -> Optional[dict]:
token_url = 'https://github.com/login/oauth/access_token'
headers = {'Accept': 'application/json'}
data = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'code': code
}
response = requests.post(token_url, data=data, headers=headers)
return response.json() if response.status_code == 200 else None
def get_user_info(self, access_token: str) -> Optional[dict]:
headers = {'Authorization': f'token {access_token}'}
user_response = requests.get('https://api.github.com/user', headers=headers)
if user_response.status_code == 200:
user_data = user_response.json()
# メールアドレスの取得(別途APIコール必要)
email_response = requests.get('https://api.github.com/user/emails', headers=headers)
if email_response.status_code == 200:
emails = email_response.json()
primary_email = next((email['email'] for email in emails if email['primary']), None)
user_data['email'] = primary_email
return user_data
return None
def get_provider_name(self) -> str:
return "GitHub"
class MultiProviderAuthManager:
"""複数認証プロバイダー管理クラス"""
def __init__(self):
self.providers: Dict[AuthProvider, BaseAuthProvider] = {}
def register_provider(self, provider_type: AuthProvider, provider: BaseAuthProvider):
"""認証プロバイダーの登録"""
self.providers[provider_type] = provider
def get_provider(self, provider_type: AuthProvider) -> Optional[BaseAuthProvider]:
"""認証プロバイダーの取得"""
return self.providers.get(provider_type)
def get_available_providers(self) -> List[AuthProvider]:
"""利用可能なプロバイダーのリスト"""
return list(self.providers.keys())
# 使用例
auth_manager = MultiProviderAuthManager()
# プロバイダーの登録
auth_manager.register_provider(
AuthProvider.GOOGLE,
GoogleOAuthManager(
client_id=st.secrets["google_oauth"]["client_id"],
client_secret=st.secrets["google_oauth"]["client_secret"],
redirect_uri=st.secrets["google_oauth"]["redirect_uri"]
)
)
auth_manager.register_provider(
AuthProvider.GITHUB,
GitHubOAuthProvider(
client_id=st.secrets["github_oauth"]["client_id"],
client_secret=st.secrets["github_oauth"]["client_secret"],
redirect_uri=st.secrets["github_oauth"]["redirect_uri"]
)
)
def multi_provider_login_page():
"""複数プロバイダー対応のログインページ"""
st.title("🔐 認証方法を選択")
available_providers = auth_manager.get_available_providers()
# プロバイダー選択ボタン
col_count = len(available_providers)
cols = st.columns(col_count)
for i, provider_type in enumerate(available_providers):
provider = auth_manager.get_provider(provider_type)
with cols[i]:
if st.button(f"{provider.get_provider_name()}でログイン", key=f"login_{provider_type.value}"):
oauth_state = secrets.token_urlsafe(32)
st.session_state.oauth_state = oauth_state
st.session_state.selected_provider = provider_type
auth_url = provider.get_authorization_url(oauth_state)
redirect_script = f"""
<script>
window.location.href = "{auth_url}";
</script>
"""
st.components.v1.html(redirect_script, height=0)
# 認証コード処理
query_params = st.experimental_get_query_params()
if 'code' in query_params:
handle_oauth_callback(query_params)
def handle_oauth_callback(query_params: dict):
"""OAuth認証コールバックの処理"""
code = query_params['code'][0]
state = query_params.get('state', [None])[0]
if state != st.session_state.get('oauth_state'):
st.error("セキュリティエラー: 不正なリクエストです。")
return
provider_type = st.session_state.get('selected_provider')
if not provider_type:
st.error("認証プロバイダーが選択されていません。")
return
provider = auth_manager.get_provider(provider_type)
if not provider:
st.error("認証プロバイダーが見つかりません。")
return
# トークン交換
token_data = provider.exchange_code_for_token(code)
if token_data:
access_token = token_data['access_token']
user_info = provider.get_user_info(access_token)
if user_info:
st.session_state.authenticated = True
st.session_state.user_info = user_info
st.session_state.auth_provider = provider_type
st.session_state.oauth_token = token_data
st.success(f"ようこそ、{user_info.get('name', user_info.get('login', 'ユーザー'))}さん!")
st.rerun()
else:
st.error("ユーザー情報の取得に失敗しました。")
else:
st.error("認証に失敗しました。")
第4章:高度なセキュリティ機能の実装
4.1 二要素認証(2FA)の実装
セキュリティをさらに強化するため、TOTP(Time-based One-Time Password)を使用した二要素認証を実装します。
import pyotp
import qrcode
from io import BytesIO
import base64
class TwoFactorAuth:
"""二要素認証管理クラス"""
def __init__(self, issuer_name: str = "Streamlit App"):
self.issuer_name = issuer_name
def generate_secret_key(self) -> str:
"""秘密鍵の生成"""
return pyotp.random_base32()
def generate_qr_code(self, username: str, secret_key: str) -> str:
"""QRコードの生成(Base64エンコード)"""
totp_uri = pyotp.totp.TOTP(secret_key).provisioning_uri(
name=username,
issuer_name=self.issuer_name
)
# QRコード生成
qr = qrcode.QRCode(version=1, box_size=10, border=5)
qr.add_data(totp_uri)
qr.make(fit=True)
# 画像をBase64に変換
img = qr.make_image(fill_color="black", back_color="white")
buffer = BytesIO()
img.save(buffer, format='PNG')
buffer.seek(0)
img_base64 = base64.b64encode(buffer.getvalue()).decode()
return f"data:image/png;base64,{img_base64}"
def verify_token(self, secret_key: str, token: str) -> bool:
"""TOTPトークンの検証"""
try:
totp = pyotp.TOTP(secret_key)
return totp.verify(token, valid_window=1) # 30秒の余裕を持たせる
except Exception:
return False
def get_current_token(self, secret_key: str) -> str:
"""現在のTOTPトークンを取得(テスト用)"""
totp = pyotp.TOTP(secret_key)
return totp.now()
# データベースにTOTP秘密鍵を保存するための拡張
class Enhanced2FAUserDatabase(UserDatabase):
"""2FA対応のユーザーデータベース"""
def init_database(self):
"""データベース初期化(2FAテーブル追加)"""
super().init_database()
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS user_2fa (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER UNIQUE,
secret_key TEXT NOT NULL,
is_enabled BOOLEAN DEFAULT 0,
backup_codes TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users (id)
)
""")
def setup_2fa(self, user_id: int, secret_key: str) -> bool:
"""2FAの設定"""
try:
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"INSERT OR REPLACE INTO user_2fa (user_id, secret_key) VALUES (?, ?)",
(user_id, secret_key)
)
return True
except Exception:
return False
def enable_2fa(self, user_id: int) -> bool:
"""2FAの有効化"""
try:
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"UPDATE user_2fa SET is_enabled = 1 WHERE user_id = ?",
(user_id,)
)
return True
except Exception:
return False
def get_2fa_secret(self, user_id: int) -> Optional[str]:
"""2FA秘密鍵の取得"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute(
"SELECT secret_key FROM user_2fa WHERE user_id = ? AND is_enabled = 1",
(user_id,)
)
row = cursor.fetchone()
return row[0] if row else None
def is_2fa_enabled(self, user_id: int) -> bool:
"""2FA有効状態の確認"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute(
"SELECT is_enabled FROM user_2fa WHERE user_id = ?",
(user_id,)
)
row = cursor.fetchone()
return bool(row[0]) if row else False
# 使用例
two_factor_auth = TwoFactorAuth()
user_db_2fa = Enhanced2FAUserDatabase()
def setup_2fa_page():
"""2FA設定ページ"""
st.title("🔐 二要素認証の設定")
user = st.session_state.user
if not user_db_2fa.is_2fa_enabled(user.id):
st.info("二要素認証を設定すると、ログイン時にスマートフォンアプリからの認証コードが必要になります。")
if st.button("2FAを設定する"):
# 秘密鍵の生成
secret_key = two_factor_auth.generate_secret_key()
# データベースに保存
if user_db_2fa.setup_2fa(user.id, secret_key):
st.session_state.setup_secret_key = secret_key
st.session_state.setup_step = 1
st.rerun()
# 設定プロセス
if 'setup_step' in st.session_state:
if st.session_state.setup_step == 1:
show_qr_code_step()
elif st.session_state.setup_step == 2:
verify_2fa_setup()
# 2FA無効化
if user_db_2fa.is_2fa_enabled(user.id):
st.success("✅ 二要素認証が有効になっています。")
if st.button("2FAを無効にする", type="secondary"):
with sqlite3.connect(user_db_2fa.db_path) as conn:
conn.execute(
"UPDATE user_2fa SET is_enabled = 0 WHERE user_id = ?",
(user.id,)
)
st.success("二要素認証を無効にしました。")
st.rerun()
def show_qr_code_step():
"""QRコード表示ステップ"""
st.subheader("ステップ1: 認証アプリでQRコードをスキャン")
secret_key = st.session_state.setup_secret_key
user = st.session_state.user
st.write("Google Authenticator、Authy、またはその他のTOTPアプリで以下のQRコードをスキャンしてください。")
# QRコード生成・表示
qr_code_data = two_factor_auth.generate_qr_code(user.username, secret_key)
st.markdown(
f'<img src="{qr_code_data}" alt="2FA QR Code" style="display: block; margin: auto;">',
unsafe_allow_html=True
)
# 手動入力用の秘密鍵表示
with st.expander("手動で設定する場合"):
st.code(secret_key)
st.write("上記のキーを認証アプリに手動で入力してください。")
if st.button("設定完了、次へ"):
st.session_state.setup_step = 2
st.rerun()
def verify_2fa_setup():
"""2FA設定の検証ステップ"""
st.subheader("ステップ2: 認証コードの確認")
secret_key = st.session_state.setup_secret_key
user = st.session_state.user
st.write("認証アプリに表示されている6桁のコードを入力してください。")
with st.form("verify_2fa_form"):
token = st.text_input("認証コード", max_chars=6, placeholder="123456")
submit_button = st.form_submit_button("確認")
if submit_button:
if two_factor_auth.verify_token(secret_key, token):
# 2FAを有効化
if user_db_2fa.enable_2fa(user.id):
st.success("🎉 二要素認証の設定が完了しました!")
# セッションクリア
del st.session_state.setup_secret_key
del st.session_state.setup_step
time.sleep(2)
st.rerun()
else:
st.error("データベースの更新に失敗しました。")
else:
st.error("認証コードが正しくありません。再度確認してください。")
def enhanced_login_with_2fa():
"""2FA対応のログインフォーム"""
st.title("🔐 ログイン")
# 段階的ログイン処理
if 'login_step' not in st.session_state:
st.session_state.login_step = 1
if st.session_state.login_step == 1:
# 第1段階: ユーザー名・パスワード認証
with st.form("login_form_step1"):
username = st.text_input("ユーザー名")
password = st.text_input("パスワード", type="password")
submit_button = st.form_submit_button("ログイン")
if submit_button:
user = user_db_2fa.authenticate_user(username, password)
if user:
st.session_state.temp_user = user
if user_db_2fa.is_2fa_enabled(user.id):
st.session_state.login_step = 2
st.rerun()
else:
# 2FAが無効な場合は直接ログイン完了
complete_login(user)
else:
st.error("ユーザー名またはパスワードが正しくありません。")
elif st.session_state.login_step == 2:
# 第2段階: 2FA認証
user = st.session_state.temp_user
st.success(f"ようこそ、{user.username}さん")
st.info("二要素認証が有効になっています。認証コードを入力してください。")
with st.form("login_form_step2"):
token = st.text_input("認証コード", max_chars=6, placeholder="123456")
submit_button = st.form_submit_button("認証")
if submit_button:
secret_key = user_db_2fa.get_2fa_secret(user.id)
if secret_key and two_factor_auth.verify_token(secret_key, token):
complete_login(user)
else:
st.error("認証コードが正しくありません。")
if st.button("戻る"):
st.session_state.login_step = 1
if 'temp_user' in st.session_state:
del st.session_state.temp_user
st.rerun()
def complete_login(user: User):
"""ログイン完了処理"""
st.session_state.authenticated = True
st.session_state.user = user
# ログイン関連の一時的なセッション変数をクリア
for key in ['login_step', 'temp_user']:
if key in st.session_state:
del st.session_state[key]
st.success("ログインが完了しました!")
time.sleep(1)
st.rerun()
4.2 IP制限とジオロケーション制御
特定のIPアドレスや地理的な場所からのアクセスを制限する機能を実装します。
import geoip2.database
import ipaddress
from typing import Set, List, Tuple
import requests
class IPAccessController:
"""IPアクセス制御クラス"""
def __init__(self, geoip_db_path: Optional[str] = None):
self.allowed_ip_ranges: Set[ipaddress.IPv4Network] = set()
self.blocked_ip_ranges: Set[ipaddress.IPv4Network] = set()
self.allowed_countries: Set[str] = set()
self.blocked_countries: Set[str] = set()
# GeoIP2データベースの初期化
self.geoip_reader = None
if geoip_db_path:
try:
self.geoip_reader = geoip2.database.Reader(geoip_db_path)
except Exception as e:
st.warning(f"GeoIP2データベースの読み込みに失敗しました: {e}")
def add_allowed_ip_range(self, ip_range: str):
"""許可IPレンジの追加"""
try:
network = ipaddress.IPv4Network(ip_range, strict=False)
self.allowed_ip_ranges.add(network)
except ValueError as e:
raise ValueError(f"無効なIPレンジです: {ip_range} - {e}")
def add_blocked_ip_range(self, ip_range: str):
"""ブロックIPレンジの追加"""
try:
network = ipaddress.IPv4Network(ip_range, strict=False)
self.blocked_ip_ranges.add(network)
except ValueError as e:
raise ValueError(f"無効なIPレンジです: {ip_range} - {e}")
def add_allowed_country(self, country_code: str):
"""許可国の追加(ISO 3166-1 alpha-2コード)"""
self.allowed_countries.add(country_code.upper())
def add_blocked_country(self, country_code: str):
"""ブロック国の追加(ISO 3166-1 alpha-2コード)"""
self.blocked_countries.add(country_code.upper())
def get_client_ip(self) -> str:
"""クライアントIPアドレスの取得"""
# StreamlitのヘッダーからクライアントIPを取得
headers = st.context.headers
# プロキシ経由の場合の実IPアドレス取得
forwarded_for = headers.get("x-forwarded-for")
if forwarded_for:
return forwarded_for.split(",")[0].strip()
real_ip = headers.get("x-real-ip")
if real_ip:
return real_ip
remote_addr = headers.get("remote-addr")
if remote_addr:
return remote_addr
return "unknown"
def get_ip_location(self, ip_address: str) -> Optional[dict]:
"""IPアドレスの地理的情報を取得"""
if not self.geoip_reader:
return None
try:
response = self.geoip_reader.city(ip_address)
return {
"country_code": response.country.iso_code,
"country_name": response.country.name,
"city": response.city.name,
"latitude": float(response.location.latitude) if response.location.latitude else None,
"longitude": float(response.location.longitude) if response.location.longitude else None
}
except Exception:
return None
def is_ip_allowed(self, ip_address: str) -> Tuple[bool, str]:
"""IPアドレスがアクセス許可されているかチェック"""
try:
ip = ipaddress.IPv4Address(ip_address)
except ValueError:
return False, "無効なIPアドレス"
# ブロックリストのチェック
for blocked_range in self.blocked_ip_ranges:
if ip in blocked_range:
return False, f"IPアドレス {ip_address} はブロックされています"
# 許可リストが設定されている場合のチェック
if self.allowed_ip_ranges:
for allowed_range in self.allowed_ip_ranges:
if ip in allowed_range:
break
else:
return False, f"IPアドレス {ip_address} は許可されていません"
# 地理的制限のチェック
if self.allowed_countries or self.blocked_countries:
location = self.get_ip_location(ip_address)
if location:
country_code = location["country_code"]
# ブロック国のチェック
if country_code in self.blocked_countries:
return False, f"国 {location['country_name']} からのアクセスはブロックされています"
# 許可国のチェック
if self.allowed_countries and country_code not in self.allowed_countries:
return False, f"国 {location['country_name']} からのアクセスは許可されていません"
return True, "アクセス許可"
# 使用例
ip_controller = IPAccessController()
# 設定例
ip_controller.add_allowed_ip_range("192.168.1.0/24") # 社内ネットワーク
ip_controller.add_allowed_ip_range("10.0.0.0/8") # VPNネットワーク
ip_controller.add_blocked_country("CN") # 中国からのアクセスをブロック
ip_controller.add_allowed_country("JP") # 日本からのアクセスのみ許可
def ip_access_check():
"""IPアクセス制御チェック"""
client_ip = ip_controller.get_client_ip()
if client_ip == "unknown":
st.error("クライアントIPアドレスを取得できませんでした。")
return False
is_allowed, message = ip_controller.is_ip_allowed(client_ip)
if not is_allowed:
st.error(f"🚫 アクセス拒否: {message}")
st.info(f"あなたのIPアドレス: {client_ip}")
# 地理的情報の表示
location = ip_controller.get_ip_location(client_ip)
if location:
st.write(f"**国:** {location['country_name']} ({location['country_code']})")
if location['city']:
st.write(f"**都市:** {location['city']}")
return False
return True
def geo_restricted_main():
"""地理的制限付きメインアプリケーション"""
# IPアクセス制御
if not ip_access_check():
return
# 通常の認証フロー
if 'authenticated' not in st.session_state:
st.session_state.authenticated = False
if st.session_state.authenticated:
main_app()
else:
enhanced_login_with_2fa()
第5章:認証システムの運用とモニタリング
5.1 ログ機能とセキュリティ監査
本格的な運用環境では、認証イベントのログ記録と監査機能が必要です。
import logging
from datetime import datetime, timedelta
from typing import Dict, Any
import json
class AuthenticationLogger:
"""認証関連のログ管理クラス"""
def __init__(self, log_file: str = "auth.log"):
# ログ設定
self.logger = logging.getLogger("auth_system")
self.logger.setLevel(logging.INFO)
# ファイルハンドラー
file_handler = logging.FileHandler(log_file, encoding='utf-8')
file_handler.setLevel(logging.INFO)
# フォーマッター
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
file_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
def log_authentication_attempt(self, username: str, ip_address: str,
success: bool, method: str = "password",
additional_info: Dict[str, Any] = None):
"""認証試行のログ記録"""
log_data = {
"event": "authentication_attempt",
"username": username,
"ip_address": ip_address,
"success": success,
"method": method,
"timestamp": datetime.utcnow().isoformat()
}
if additional_info:
log_data.update(additional_info)
level = logging.INFO if success else logging.WARNING
self.logger.log(level, json.dumps(log_data, ensure_ascii=False))
def log_session_event(self, username: str, event: str,
additional_info: Dict[str, Any] = None):
"""セッション関連イベントのログ記録"""
log_data = {
"event": f"session_{event}",
"username": username,
"timestamp": datetime.utcnow().isoformat()
}
if additional_info:
log_data.update(additional_info)
self.logger.info(json.dumps(log_data, ensure_ascii=False