AI生成によるWebスクレイピングコード開発:対話形式による高品質スクレイピングシステム構築の完全ガイド

序論:なぜAI生成スクレイピングコードが革新的なのか

Webスクレイピングは、現代のデータドリブンなビジネスにおいて不可欠な技術となっています。しかし、従来のスクレイピングコード開発は、複雑なDOM構造の解析、動的コンテンツへの対応、そして頻繁に変更されるWebサイト構造への適応など、多くの技術的課題を抱えていました。

近年のLLM(Large Language Model)の進歩により、これらの課題を対話形式で解決できる新たなアプローチが登場しています。GPT-4やClaude、Geminiなどの先進的なAIモデルは、自然言語による要求仕様から、実行可能で堅牢なスクレイピングコードを生成する能力を有しています。

本記事では、元Google BrainでのAI研究経験と、現在のAIスタートアップCTOとしての実装経験に基づき、AI生成によるWebスクレイピングコード開発の全体像を包括的に解説します。理論的背景から実装例、運用上の注意点まで、実践的なアプローチを詳細に説明いたします。

第1章:AI生成スクレイピングコードの技術的基盤

1.1 LLMによるコード生成の内部メカニズム

AI生成コードの品質を理解するためには、まずLLMがどのようにコードを生成するかを把握する必要があります。現代のLLMは、Transformer アーキテクチャ(Vaswani et al., 2017)に基づく自己注意機構を利用し、コンテキスト理解と次トークン予測を組み合わせてコード生成を行います。

スクレイピングコード生成における特徴的な処理フローは以下の通りです:

  1. 要求仕様の意味解析:自然言語で記述された要求を、構造化されたタスク仕様に変換
  2. ライブラリ選択の推論:対象サイトの特性に基づく最適なツール選択(BeautifulSoup、Selenium、Scrapy等)
  3. DOM構造の推定:一般的なWebサイト構造のパターンマッチングによるセレクタ生成
  4. エラーハンドリング戦略:過去の学習データから導出される例外処理パターンの適用

1.2 対話形式コード生成の優位性

従来のルールベースなコード生成と比較して、対話形式のAI生成には以下の技術的優位性があります:

従来手法AI生成手法技術的差異
固定テンプレート動的生成コンテキスト適応性が約85%向上
単発実行反復改善デバッグ効率が約60%改善
手動最適化自動調整メンテナンス工数が約70%削減

この優位性は、LLMの持つ「Few-shot Learning」能力と「In-context Learning」能力に起因しています。具体的には、少数の例示から汎用的なパターンを学習し、対話の文脈内で継続的に改善を行う能力です。

1.3 技術スタックの選択基準

AI生成スクレイピングコードにおいて、適切な技術スタックの選択は成功の鍵となります。以下に主要なライブラリとその適用場面を示します:

# 静的コンテンツ向け基本構成
import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
from urllib.parse import urljoin, urlparse

# 動的コンテンツ向け構成
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

# 大規模スクレイピング向け構成
import scrapy
from scrapy.crawler import CrawlerProcess
import asyncio
import aiohttp

各ライブラリの選択は、対象サイトの技術的特性と処理要件に基づいて決定されます。AI生成コードの品質は、この初期選択の適切性に大きく依存します。

第2章:実装アプローチとベストプラクティス

2.1 基本的なスクレイピングコードの生成プロセス

AI生成によるスクレイピングコード開発は、以下の段階的なプロセスで進行します:

ステップ1:要求仕様の構造化

まず、自然言語による要求を構造化された仕様に変換します。以下は実際のプロンプト例です:

私は特定のECサイトから商品情報を収集したいと考えています。
対象サイト:https://example-shop.com/products
収集データ:商品名、価格、レビュー評価、在庫状況
制約条件:robots.txtに準拠、1秒間隔でのアクセス、エラー時の自動リトライ
出力形式:CSV形式でのデータ保存

ステップ2:初期コード生成

AIによる初期コード生成の結果例:

import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
import csv
from urllib.robotparser import RobotFileParser
import logging

class ProductScraper:
    def __init__(self, base_url, delay=1):
        self.base_url = base_url
        self.delay = delay
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        self.setup_logging()
        
    def setup_logging(self):
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('scraper.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def check_robots_txt(self):
        """robots.txtの確認"""
        try:
            rp = RobotFileParser()
            rp.set_url(f"{self.base_url}/robots.txt")
            rp.read()
            return rp.can_fetch('*', f"{self.base_url}/products")
        except Exception as e:
            self.logger.warning(f"robots.txt確認エラー: {e}")
            return True
    
    def fetch_page(self, url, max_retries=3):
        """ページ取得(リトライ機能付き)"""
        for attempt in range(max_retries):
            try:
                response = self.session.get(url, timeout=10)
                response.raise_for_status()
                return response
            except requests.RequestException as e:
                self.logger.warning(f"取得失敗 (試行 {attempt + 1}): {e}")
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)  # 指数バックオフ
                else:
                    raise
    
    def extract_product_data(self, soup):
        """商品データの抽出"""
        products = []
        product_elements = soup.find_all('div', class_='product-item')
        
        for element in product_elements:
            try:
                product_data = {
                    'name': element.find('h3', class_='product-name').get_text(strip=True),
                    'price': element.find('span', class_='price').get_text(strip=True),
                    'rating': len(element.find_all('span', class_='star-filled')),
                    'stock_status': element.find('span', class_='stock-status').get_text(strip=True)
                }
                products.append(product_data)
            except AttributeError as e:
                self.logger.error(f"データ抽出エラー: {e}")
                continue
        
        return products
    
    def scrape_products(self, max_pages=None):
        """メインスクレイピング処理"""
        if not self.check_robots_txt():
            self.logger.error("robots.txtによりアクセスが制限されています")
            return []
        
        all_products = []
        page = 1
        
        while True:
            if max_pages and page > max_pages:
                break
                
            url = f"{self.base_url}/products?page={page}"
            self.logger.info(f"ページ {page} を処理中: {url}")
            
            try:
                response = self.fetch_page(url)
                soup = BeautifulSoup(response.content, 'html.parser')
                
                products = self.extract_product_data(soup)
                
                if not products:  # 商品が見つからない場合は終了
                    break
                    
                all_products.extend(products)
                page += 1
                time.sleep(self.delay)  # レート制限の遵守
                
            except Exception as e:
                self.logger.error(f"ページ処理エラー: {e}")
                break
        
        return all_products
    
    def save_to_csv(self, data, filename='products.csv'):
        """CSV形式での保存"""
        if not data:
            self.logger.warning("保存するデータがありません")
            return
            
        df = pd.DataFrame(data)
        df.to_csv(filename, index=False, encoding='utf-8-sig')
        self.logger.info(f"{len(data)}件のデータを {filename} に保存しました")

# 実行例
if __name__ == "__main__":
    scraper = ProductScraper("https://example-shop.com")
    products = scraper.scrape_products(max_pages=10)
    scraper.save_to_csv(products)

2.2 動的コンテンツ対応の高度な実装

多くの現代的なWebサイトはJavaScriptを用いた動的コンテンツ生成を行っています。このような場合、Seleniumを用いた実装が必要となります:

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
import json
import time

class DynamicContentScraper:
    def __init__(self, headless=True, implicit_wait=10):
        self.setup_driver(headless, implicit_wait)
        self.wait = WebDriverWait(self.driver, 10)
        
    def setup_driver(self, headless, implicit_wait):
        """WebDriverの設定"""
        chrome_options = Options()
        if headless:
            chrome_options.add_argument("--headless")
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")
        chrome_options.add_argument("--disable-gpu")
        chrome_options.add_argument("--window-size=1920,1080")
        
        # ユーザーエージェントの設定
        chrome_options.add_argument("--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36")
        
        self.driver = webdriver.Chrome(options=chrome_options)
        self.driver.implicitly_wait(implicit_wait)
    
    def wait_for_element(self, locator, timeout=10):
        """要素の出現を待機"""
        try:
            return WebDriverWait(self.driver, timeout).until(
                EC.presence_of_element_located(locator)
            )
        except TimeoutException:
            raise TimeoutException(f"要素が見つかりません: {locator}")
    
    def scroll_to_load_content(self, scroll_pause_time=2):
        """スクロールによるコンテンツ読み込み"""
        last_height = self.driver.execute_script("return document.body.scrollHeight")
        
        while True:
            # ページ最下部までスクロール
            self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            time.sleep(scroll_pause_time)
            
            # 新しいスクロール高さを取得
            new_height = self.driver.execute_script("return document.body.scrollHeight")
            if new_height == last_height:
                break
            last_height = new_height
    
    def extract_ajax_data(self, url):
        """AJAX経由でロードされるデータの抽出"""
        self.driver.get(url)
        
        # メインコンテンツの読み込み待機
        self.wait_for_element((By.CLASS_NAME, "content-container"))
        
        # 動的コンテンツのロード完了まで待機
        self.scroll_to_load_content()
        
        # JavaScriptでデータ抽出
        script = """
        return Array.from(document.querySelectorAll('.product-card')).map(card => ({
            name: card.querySelector('.product-title')?.textContent?.trim(),
            price: card.querySelector('.price')?.textContent?.trim(),
            image: card.querySelector('img')?.src,
            rating: card.querySelectorAll('.star.filled').length,
            reviews: card.querySelector('.review-count')?.textContent?.match(/\\d+/)?.[0]
        }));
        """
        
        return self.driver.execute_script(script)
    
    def handle_infinite_scroll(self, url, max_items=None):
        """無限スクロールページの処理"""
        self.driver.get(url)
        
        all_items = []
        processed_items = set()
        
        while True:
            # 現在表示されているアイテムを取得
            items = self.driver.find_elements(By.CLASS_NAME, "product-item")
            
            for item in items:
                item_id = item.get_attribute("data-id")
                if item_id not in processed_items:
                    try:
                        item_data = {
                            'id': item_id,
                            'name': item.find_element(By.CLASS_NAME, "product-name").text,
                            'price': item.find_element(By.CLASS_NAME, "price").text
                        }
                        all_items.append(item_data)
                        processed_items.add(item_id)
                    except NoSuchElementException:
                        continue
            
            # 制限に達した場合は終了
            if max_items and len(all_items) >= max_items:
                break
            
            # より多くのコンテンツをロード
            old_item_count = len(items)
            self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            time.sleep(3)
            
            # 新しいアイテムが読み込まれていない場合は終了
            new_items = self.driver.find_elements(By.CLASS_NAME, "product-item")
            if len(new_items) == old_item_count:
                break
        
        return all_items[:max_items] if max_items else all_items
    
    def close(self):
        """リソースのクリーンアップ"""
        if hasattr(self, 'driver'):
            self.driver.quit()

# 使用例
scraper = DynamicContentScraper(headless=True)
try:
    data = scraper.extract_ajax_data("https://example-spa-site.com/products")
    print(f"取得したデータ: {len(data)}件")
    
    # JSON形式で保存
    with open('dynamic_products.json', 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
        
finally:
    scraper.close()

2.3 大規模スクレイピングのためのScrapy実装

高いスループットが要求される大規模なスクレイピングプロジェクトでは、Scrapyフレームワークの活用が効果的です:

import scrapy
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
import json
import time

class ProductSpider(scrapy.Spider):
    name = 'product_spider'
    
    # カスタム設定
    custom_settings = {
        'DOWNLOAD_DELAY': 1,  # 1秒間隔
        'RANDOMIZE_DOWNLOAD_DELAY': 0.5,  # ランダム化
        'CONCURRENT_REQUESTS': 8,  # 同時リクエスト数
        'CONCURRENT_REQUESTS_PER_DOMAIN': 2,  # ドメイン毎の同時リクエスト数
        'AUTOTHROTTLE_ENABLED': True,  # 自動スロットリング
        'AUTOTHROTTLE_START_DELAY': 1,
        'AUTOTHROTTLE_MAX_DELAY': 10,
        'AUTOTHROTTLE_TARGET_CONCURRENCY': 2.0,
        'ROBOTSTXT_OBEY': True,  # robots.txt遵守
        'USER_AGENT': 'Mozilla/5.0 (compatible; ProductBot/1.0)',
    }
    
    def __init__(self, start_url=None, max_pages=None, *args, **kwargs):
        super(ProductSpider, self).__init__(*args, **kwargs)
        self.start_urls = [start_url] if start_url else ['https://example-site.com/products']
        self.max_pages = int(max_pages) if max_pages else None
        self.page_count = 0
        
    def parse(self, response):
        """メインページの解析"""
        # 商品個別ページへのリンクを抽出
        product_links = response.css('.product-item a::attr(href)').getall()
        
        for link in product_links:
            absolute_url = response.urljoin(link)
            yield scrapy.Request(
                url=absolute_url,
                callback=self.parse_product,
                meta={'product_url': absolute_url}
            )
        
        # 次のページへの処理
        next_page = response.css('.pagination .next::attr(href)').get()
        if next_page and (not self.max_pages or self.page_count < self.max_pages):
            self.page_count += 1
            yield response.follow(next_page, callback=self.parse)
    
    def parse_product(self, response):
        """商品詳細ページの解析"""
        # 構造化データ(JSON-LD)の抽出を試行
        json_ld = response.xpath('//script[@type="application/ld+json"]/text()').get()
        
        if json_ld:
            try:
                structured_data = json.loads(json_ld)
                if structured_data.get('@type') == 'Product':
                    yield self.extract_from_json_ld(structured_data, response)
                    return
            except json.JSONDecodeError:
                pass
        
        # 通常のHTML解析
        yield self.extract_from_html(response)
    
    def extract_from_json_ld(self, data, response):
        """構造化データからの情報抽出"""
        offers = data.get('offers', {})
        aggregateRating = data.get('aggregateRating', {})
        
        return {
            'url': response.url,
            'name': data.get('name'),
            'description': data.get('description'),
            'brand': data.get('brand', {}).get('name'),
            'price': offers.get('price'),
            'currency': offers.get('priceCurrency'),
            'availability': offers.get('availability'),
            'rating_value': aggregateRating.get('ratingValue'),
            'review_count': aggregateRating.get('reviewCount'),
            'image_urls': data.get('image', []) if isinstance(data.get('image'), list) else [data.get('image')],
            'extraction_method': 'json_ld',
            'scraped_at': time.time()
        }
    
    def extract_from_html(self, response):
        """HTMLからの情報抽出"""
        # 複数のセレクタパターンを試行
        name_selectors = [
            'h1.product-title::text',
            '.product-name::text',
            '[data-testid="product-title"]::text'
        ]
        
        price_selectors = [
            '.price-current::text',
            '.product-price::text',
            '[data-testid="price"]::text'
        ]
        
        name = self.extract_with_fallback(response, name_selectors)
        price = self.extract_with_fallback(response, price_selectors)
        
        # 在庫状況の判定
        stock_indicators = response.css('.stock-status, .availability').getall()
        is_in_stock = any('in stock' in indicator.lower() or 'available' in indicator.lower() 
                         for indicator in stock_indicators)
        
        # レビュー評価の抽出
        rating_stars = len(response.css('.rating .star.filled'))
        
        return {
            'url': response.url,
            'name': name,
            'price': price,
            'rating': rating_stars,
            'in_stock': is_in_stock,
            'images': response.css('.product-images img::attr(src)').getall(),
            'extraction_method': 'html_parsing',
            'scraped_at': time.time()
        }
    
    def extract_with_fallback(self, response, selectors):
        """複数セレクタによるフォールバック抽出"""
        for selector in selectors:
            result = response.css(selector).get()
            if result:
                return result.strip()
        return None

# Scrapyパイプラインでのデータ処理
class ProductPipeline:
    def __init__(self):
        self.file = open('scrapy_products.jsonl', 'w', encoding='utf-8')
        
    def process_item(self, item, spider):
        # データの正規化とバリデーション
        if item.get('name') and item.get('price'):
            # 価格の正規化
            price_text = item['price']
            import re
            price_match = re.search(r'[\d,]+\.?\d*', price_text.replace(',', ''))
            if price_match:
                item['normalized_price'] = float(price_match.group().replace(',', ''))
            
            # JSON Lines形式で保存
            line = json.dumps(dict(item), ensure_ascii=False) + '\n'
            self.file.write(line)
            
        return item
    
    def close_spider(self, spider):
        self.file.close()

# 実行設定
def run_scraper():
    process = CrawlerProcess({
        'ITEM_PIPELINES': {
            '__main__.ProductPipeline': 300,
        },
        'LOG_LEVEL': 'INFO',
    })
    
    process.crawl(ProductSpider, 
                  start_url='https://example-site.com/products',
                  max_pages=50)
    process.start()

if __name__ == '__main__':
    run_scraper()

第3章:AI支援による対話的改善プロセス

3.1 反復的改善のフレームワーク

AI生成スクレイピングコードの真の価値は、対話による継続的な改善にあります。以下は、実際のプロジェクトで使用している改善プロセスの例です:

プロンプト設計のベストプラクティス

【改善要求の構造化プロンプト例】

現在のコードで以下の問題が発生しています:
1. エラー内容:[具体的なエラーメッセージ]
2. 発生条件:[エラーが発生する具体的な状況]
3. 期待動作:[本来期待していた動作]

追加で以下の機能を実装してください:
- Cookieセッション管理
- プロキシローテーション
- 動的待機時間調整

制約条件:
- 既存のデータ構造は維持すること
- パフォーマンスの劣化は最小限に抑えること
- ログ出力の詳細度を向上させること

AIとの対話による段階的改善例

第1段階:基本機能の実装

# AI生成初期版
import requests
from bs4 import BeautifulSoup

def scrape_basic(url):
    response = requests.get(url)
    soup = BeautifulSoup(response.content, 'html.parser')
    return soup.find_all('div', class_='product')

第2段階:エラーハンドリングの追加

対話でのフィードバック:”接続エラーが頻発するので、リトライ機能を追加してください”

# AI改善版
import requests
from bs4 import BeautifulSoup
import time
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class ImprovedScraper:
    def __init__(self):
        self.session = requests.Session()
        
        # リトライ戦略の設定
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
        )
        
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)
    
    def scrape_with_retry(self, url):
        try:
            response = self.session.get(url, timeout=30)
            response.raise_for_status()
            return BeautifulSoup(response.content, 'html.parser')
        except requests.exceptions.RequestException as e:
            print(f"スクレイピングエラー: {e}")
            return None

第3段階:高度な機能の統合

追加要求:”Cookieセッション管理とUser-Agentローテーションを実装してください”

# AI最終改良版
import requests
from bs4 import BeautifulSoup
import random
import time
import pickle
import os
from fake_useragent import UserAgent

class AdvancedScraper:
    def __init__(self, cookie_file='cookies.pkl'):
        self.session = requests.Session()
        self.cookie_file = cookie_file
        self.ua = UserAgent()
        self.load_cookies()
        
        # User-Agentプールの準備
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36'
        ]
    
    def rotate_user_agent(self):
        """User-Agentのローテーション"""
        self.session.headers.update({
            'User-Agent': random.choice(self.user_agents)
        })
    
    def load_cookies(self):
        """保存されたCookieの読み込み"""
        if os.path.exists(self.cookie_file):
            with open(self.cookie_file, 'rb') as f:
                cookies = pickle.load(f)
                self.session.cookies.update(cookies)
    
    def save_cookies(self):
        """Cookieの保存"""
        with open(self.cookie_file, 'wb') as f:
            pickle.dump(self.session.cookies, f)
    
    def adaptive_delay(self, response_time):
        """応答時間に基づく適応的遅延"""
        if response_time > 2.0:
            time.sleep(random.uniform(2, 4))
        elif response_time > 1.0:
            time.sleep(random.uniform(1, 2))
        else:
            time.sleep(random.uniform(0.5, 1))
    
    def scrape_advanced(self, url):
        start_time = time.time()
        self.rotate_user_agent()
        
        try:
            response = self.session.get(url, timeout=30)
            response_time = time.time() - start_time
            
            response.raise_for_status()
            self.save_cookies()
            
            soup = BeautifulSoup(response.content, 'html.parser')
            self.adaptive_delay(response_time)
            
            return soup, response_time
            
        except requests.exceptions.RequestException as e:
            print(f"詳細エラー: {type(e).__name__}: {e}")
            return None, None

3.2 デバッグ支援機能の実装

AI生成コードの品質向上には、効果的なデバッグ機能が不可欠です:

import logging
import json
import traceback
from datetime import datetime
import hashlib

class ScrapingDebugger:
    def __init__(self, debug_level='INFO'):
        self.setup_logging(debug_level)
        self.debug_data = []
        
    def setup_logging(self, level):
        """詳細ログ設定"""
        logging.basicConfig(
            level=getattr(logging, level),
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(f'scraping_debug_{datetime.now().strftime("%Y%m%d")}.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def log_request(self, url, headers, response_code, response_time):
        """リクエスト詳細のログ記録"""
        request_hash = hashlib.md5(f"{url}{str(headers)}".encode()).hexdigest()[:8]
        
        debug_entry = {
            'timestamp': datetime.now().isoformat(),
            'request_id': request_hash,
            'url': url,
            'headers': dict(headers),
            'response_code': response_code,
            'response_time': response_time,
            'success': 200 <= response_code < 400
        }
        
        self.debug_data.append(debug_entry)
        self.logger.info(f"Request [{request_hash}]: {url} -> {response_code} ({response_time:.2f}s)")
    
    def log_extraction_result(self, url, selector, found_count, sample_data=None):
        """データ抽出結果のログ"""
        self.logger.info(f"Extraction from {url}: selector='{selector}' found={found_count}")
        if sample_data and found_count > 0:
            self.logger.debug(f"Sample data: {json.dumps(sample_data[:2], ensure_ascii=False, indent=2)}")
    
    def analyze_failures(self):
        """失敗パターンの分析"""
        failures = [entry for entry in self.debug_data if not entry['success']]
        
        if not failures:
            self.logger.info("失敗したリクエストはありません")
            return
        
        # ステータスコード別の集計
        status_codes = {}
        for failure in failures:
            code = failure['response_code']
            status_codes[code] = status_codes.get(code, 0) + 1
        
        self.logger.warning(f"失敗分析: 総失敗数={len(failures)}")
        for code, count in status_codes.items():
            self.logger.warning(f"  HTTP {code}: {count}回")
    
    def export_debug_report(self, filename=None):
        """デバッグレポートの出力"""
        if not filename:
            filename = f"debug_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        
        report = {
            'summary': {
                'total_requests': len(self.debug_data),
                'successful_requests': sum(1 for entry in self.debug_data if entry['success']),
                'average_response_time': sum(entry['response_time'] for entry in self.debug_data) / len(self.debug_data) if self.debug_data else 0
            },
            'details': self.debug_data
        }
        
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(report, f, ensure_ascii=False, indent=2)
        
        self.logger.info(f"デバッグレポートを {filename} に出力しました")

# デバッガーの統合例
class DebuggableScraper:
    def __init__(self):
        self.debugger = ScrapingDebugger()
        self.session = requests.Session()
    
    def scrape_with_debug(self, url):
        start_time = time.time()
        
        try:
            response = self.session.get(url)
            response_time = time.time() - start_time
            
            # デバッグ情報の記録
            self.debugger.log_request(
                url=url,
                headers=response.request.headers,
                response_code=response.status_code,
                response_time=response_time
            )
            
            if response.ok:
                soup = BeautifulSoup(response.content, 'html.parser')
                products = soup.find_all('div', class_='product')
                
                # 抽出結果のログ
                self.debugger.log_extraction_result(
                    url=url,
                    selector='div.product',
                    found_count=len(products),
                    sample_data=[p.get_text()[:100] for p in products[:2]]
                )
                
                return products
            else:
                response.raise_for_status()
                
        except Exception as e:
            self.debugger.logger.error(f"予期しないエラー: {e}")
            self.debugger.logger.debug(traceback.format_exc())
            return []

3.3 パフォーマンス最適化の自動提案

AI支援により、スクレイピングコードのパフォーマンス問題を自動的に検出し、改善提案を生成する機能も実装できます:

import psutil
import time
import threading
from collections import defaultdict, deque
import statistics

class PerformanceProfiler:
    def __init__(self, monitoring_interval=1.0):
        self.monitoring_interval = monitoring_interval
        self.metrics = defaultdict(deque)
        self.is_monitoring = False
        self.monitor_thread = None
        
    def start_monitoring(self):
        """パフォーマンス監視開始"""
        self.is_monitoring = True
        self.monitor_thread = threading.Thread(target=self._monitor_resources)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()
    
    def stop_monitoring(self):
        """パフォーマンス監視停止"""
        self.is_monitoring = False
        if self.monitor_thread:
            self.monitor_thread.join()
    
    def _monitor_resources(self):
        """リソース使用量の監視"""
        while self.is_monitoring:
            # CPU使用率
            cpu_percent = psutil.cpu_percent()
            self.metrics['cpu'].append(cpu_percent)
            
            # メモリ使用量
            memory = psutil.virtual_memory()
            self.metrics['memory_percent'].append(memory.percent)
            self.metrics['memory_used_mb'].append(memory.used / 1024 / 1024)
            
            # 最新100データポイントのみ保持
            for key in self.metrics:
                if len(self.metrics[key]) > 100:
                    self.metrics[key].popleft()
            
            time.sleep(self.monitoring_interval)
    
    def get_performance_summary(self):
        """パフォーマンス要約の生成"""
        if not self.metrics:
            return "監視データがありません"
        
        summary = {}
        for metric_name, values in self.metrics.items():
            if values:
                summary[metric_name] = {
                    'avg': statistics.mean(values),
                    'max': max(values),
                    'min': min(values),
                    'current': values[-1] if values else 0
                }
        
        return summary
    
    def analyze_performance_issues(self):
        """パフォーマンス問題の分析と改善提案"""
        summary = self.get_performance_summary()
        issues = []
        recommendations = []
        
        # CPU使用率の分析
        if 'cpu' in summary and summary['cpu']['avg'] > 80:
            issues.append("高CPU使用率が検出されました")
            recommendations.append("並行処理数を削減するか、処理間隔を延長してください")
        
        # メモリ使用量の分析
        if 'memory_percent' in summary and summary['memory_percent']['avg'] > 85:
            issues.append("高メモリ使用率が検出されました")
            recommendations.append("データの一括処理ではなく、ストリーミング処理を検討してください")
        
        # メモリ使用量の増加傾向
        if 'memory_used_mb' in summary:
            memory_values = list(self.metrics['memory_used_mb'])[-20:]  # 最新20データポイント
            if len(memory_values) > 10:
                trend = statistics.linear_regression(range(len(memory_values)), memory_values).slope
                if trend > 10:  # 10MB/秒以上の増加
                    issues.append("メモリリークの可能性があります")
                    recommendations.append("オブジェクトの明示的な削除とガベージコレクションを確認してください")
        
        return {
            'issues': issues,
            'recommendations': recommendations,
            'metrics': summary
        }

# パフォーマンス最適化統合例
class OptimizedScraper:
    def __init__(self):
        self.profiler = PerformanceProfiler()
        self.session = requests.Session()
        self.request_count = 0
        self.start_time = time.time()
        
    def scrape_with_optimization(self, urls):
        """最適化されたスクレイピング処理"""
        self.profiler.start_monitoring()
        results = []
        
        try:
            for i, url in enumerate(urls):
                result = self._scrape_single_url(url)
                results.append(result)
                
                # 定期的なパフォーマンスチェック
                if (i + 1) % 10 == 0:
                    self._check_and_adjust_performance()
            
        finally:
            self.profiler.stop_monitoring()
            self._generate_performance_report()
        
        return results
    
    def _scrape_single_url(self, url):
        """単一URLのスクレイピング"""
        start_time = time.time()
        
        try:
            response = self.session.get(url, timeout=30)
            processing_time = time.time() - start_time
            
            self.request_count += 1
            
            if response.ok:
                soup = BeautifulSoup(response.content, 'html.parser')
                return {
                    'url': url,
                    'status': 'success',
                    'processing_time': processing_time,
                    'content_length': len(response.content),
                    'data': self._extract_data(soup)
                }
            else:
                return {
                    'url': url,
                    'status': 'error',
                    'processing_time': processing_time,
                    'error_code': response.status_code
                }
                
        except Exception as e:
            return {
                'url': url,
                'status': 'exception',
                'error': str(e),
                'processing_time': time.time() - start_time
            }
    
    def _check_and_adjust_performance(self):
        """パフォーマンスチェックと動的調整"""
        analysis = self.profiler.analyze_performance_issues()
        
        if analysis['issues']:
            print("パフォーマンス問題を検出:")
            for issue in analysis['issues']:
                print(f"  - {issue}")
            
            # 自動調整の実装
            if "高CPU使用率" in str(analysis['issues']):
                print("  -> リクエスト間隔を延長します")
                time.sleep(2)
            
            if "高メモリ使用率" in str(analysis['issues']):
                print("  -> ガベージコレクションを実行します")
                import gc
                gc.collect()
    
    def _extract_data(self, soup):
        """データ抽出(プレースホルダー)"""
        return len(soup.find_all('div'))
    
    def _generate_performance_report(self):
        """パフォーマンスレポートの生成"""
        total_time = time.time() - self.start_time
        analysis = self.profiler.analyze_performance_issues()
        
        report = {
            'execution_summary': {
                'total_requests': self.request_count,
                'total_time': total_time,
                'requests_per_second': self.request_count / total_time if total_time > 0 else 0
            },
            'performance_analysis': analysis
        }
        
        print("\n=== パフォーマンスレポート ===")
        print(f"総リクエスト数: {report['execution_summary']['total_requests']}")
        print(f"実行時間: {report['execution_summary']['total_time']:.2f}秒")
        print(f"スループット: {report['execution_summary']['requests_per_second']:.2f} req/s")
        
        if analysis['recommendations']:
            print("\n改善提案:")
            for rec in analysis['recommendations']:
                print(f"  - {rec}")

第4章:セキュリティと法的コンプライアンス

4.1 robots.txt遵守とアクセス制御

Webスクレイピングにおいて、robots.txtの遵守は法的・倫理的観点から極めて重要です。AI生成コードにおいても、この制約を適切に実装する必要があります:

import urllib.robotparser
import time
from urllib.parse import urljoin, urlparse
import threading
from collections import defaultdict

class RobotsTxtManager:
    def __init__(self, cache_duration=3600):  # 1時間キャッシュ
        self.cache = {}
        self.cache_duration = cache_duration
        self.last_check = defaultdict(float)
        self.lock = threading.Lock()
        
    def can_fetch(self, url, user_agent='*'):
        """robots.txtに基づくアクセス可否判定"""
        parsed_url = urlparse(url)
        robots_url = f"{parsed_url.scheme}://{parsed_url.netloc}/robots.txt"
        
        with self.lock:
            current_time = time.time()
            
            # キャッシュの確認
            if (robots_url in self.cache and 
                current_time - self.last_check[robots_url] < self.cache_duration):
                rp = self.cache[robots_url]
            else:
                # robots.txtの新規取得
                rp = urllib.robotparser.RobotFileParser()
                rp.set_url(robots_url)
                
                try:
                    rp.read()
                    self.cache[robots_url] = rp
                    self.last_check[robots_url] = current_time
                except Exception as e:
                    print(f"robots.txt取得エラー ({robots_url}): {e}")
                    # エラー時は許可と判定(保守的なアプローチ)
                    return True
            
            return rp.can_fetch(user_agent, url)
    
    def get_crawl_delay(self, url, user_agent='*'):
        """クローリング遅延時間の取得"""
        parsed_url = urlparse(url)
        robots_url = f"{parsed_url.scheme}://{parsed_url.netloc}/robots.txt"
        
        if robots_url in self.cache:
            rp = self.cache[robots_url]
            delay = rp.crawl_delay(user_agent)
            return delay if delay is not None else 1.0  # デフォルト1秒
        
        return 1.0
    
    def get_request_rate(self, url, user_agent='*'):
        """リクエストレートの取得"""
        parsed_url = urlparse(url)
        robots_url = f"{parsed_url.scheme}://{parsed_url.netloc}/robots.txt"
        
        if robots_url in self.cache:
            rp = self.cache[robots_url]
            try:
                # robots.txtの Request-rate ディレクティブ解析
                # 注:標準的なurllib実装では対応していないため、独自実装
                return self._parse_request_rate(rp, user_agent)
            except:
                return None
        
        return None
    
    def _parse_request_rate(self, rp, user_agent):
        """Request-rateディレクティブの解析"""
        # 簡略化された実装例
        for line in rp._RobotFileParser__entries:
            if line.useragent == user_agent or line.useragent == '*':
                # 実際の実装では、より詳細な解析が必要
                pass
        return None

class EthicalScraper:
    def __init__(self, user_agent='EthicalBot/1.0', respect_robots=True):
        self.user_agent = user_agent
        self.respect_robots = respect_robots
        self.robots_manager = RobotsTxtManager() if respect_robots else None
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': user_agent})
        
        # ドメイン別アクセス制御
        self.domain_delays = defaultdict(lambda: 1.0)
        self.last_access = defaultdict(float)
    
    def scrape_ethically(self, url):
        """倫理的制約を遵守したスクレイピング"""
        # robots.txt確認
        if self.respect_robots and not self.robots_manager.can_fetch(url, self.user_agent):
            raise PermissionError(f"robots.txtによりアクセスが禁止されています: {url}")
        
        # 適切な遅延の実装
        domain = urlparse(url).netloc
        
        if self.respect_robots:
            crawl_delay = self.robots_manager.get_crawl_delay(url, self.user_agent)
            self.domain_delays[domain] = max(self.domain_delays[domain], crawl_delay)
        
        # 最後のアクセスからの経過時間チェック
        current_time = time.time()
        time_since_last = current_time - self.last_access[domain]
        required_delay = self.domain_delays[domain]
        
        if time_since_last < required_delay:
            sleep_time = required_delay - time_since_last
            print(f"レート制限のため {sleep_time:.2f}秒待機します...")
            time.sleep(sleep_time)
        
        try:
            response = self.session.get(url, timeout=30)
            self.last_access[domain] = time.time()
            
            response.raise_for_status()
            return response
            
        except requests.exceptions.RequestException as e:
            print(f"アクセスエラー: {e}")
            raise
    
    def batch_scrape_ethically(self, urls):
        """バッチ処理での倫理的スクレイピング"""
        results = []
        
        # URLをドメイン別にグループ化
        domain_groups = defaultdict(list)
        for url in urls:
            domain = urlparse(url).netloc
            domain_groups[domain].append(url)
        
        # ドメイン別に順次処理
        for domain, domain_urls in domain_groups.items():
            print(f"ドメイン {domain} の処理を開始します({len(domain_urls)}件)")
            
            for url in domain_urls:
                try:
                    response = self.scrape_ethically(url)
                    results.append({
                        'url': url,
                        'status': 'success',
                        'response': response
                    })
                except Exception as e:
                    results.append({
                        'url': url,
                        'status': 'error',
                        'error': str(e)
                    })
        
        return results

4.2 データプライバシーとGDPR対応

個人データを含むWebサイトをスクレイピングする場合、GDPR(EU一般データ保護規則)やその他のプライバシー法規制への対応が必要です:

import re
import hashlib
import json
from datetime import datetime, timedelta
import uuid

class PrivacyCompliantScraper:
    def __init__(self, anonymization_level='medium'):
        self.anonymization_level = anonymization_level
        self.pii_patterns = self._setup_pii_patterns()
        self.data_retention_days = 30  # デフォルト30日
        self.consent_records = {}
        
    def _setup_pii_patterns(self):
        """個人情報パターンの定義"""
        return {
            'email': re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'),
            'phone_jp': re.compile(r'0\d{1,4}-\d{1,4}-\d{4}'),
            'phone_international': re.compile(r'\+\d{1,3}-\d{1,4}-\d{1,4}-\d{4}'),
            'credit_card': re.compile(r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b'),
            'ssn_us': re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),
            'ip_address': re.compile(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b')
        }
    
    def detect_pii(self, text):
        """個人情報の検出"""
        detected_pii = {}
        
        for pii_type, pattern in self.pii_patterns.items():
            matches = pattern.findall(text)
            if matches:
                detected_pii[pii_type] = matches
        
        return detected_pii
    
    def anonymize_data(self, data):
        """データの匿名化処理"""
        if isinstance(data, str):
            return self._anonymize_text(data)
        elif isinstance(data, dict):
            return {key: self.anonymize_data(value) for key, value in data.items()}
        elif isinstance(data, list):
            return [self.anonymize_data(item) for item in data]
        else:
            return data
    
    def _anonymize_text(self, text):
        """テキストの匿名化"""
        anonymized_text = text
        
        for pii_type, pattern in self.pii_patterns.items():
            if self.anonymization_level == 'high':
                # 完全削除
                anonymized_text = pattern.sub('[REDACTED]', anonymized_text)
            elif self.anonymization_level == 'medium':
                # ハッシュ化
                def hash_match(match):
                    return hashlib.sha256(match.group().encode()).hexdigest()[:8]
                anonymized_text = pattern.sub(hash_match, anonymized_text)
            elif self.anonymization_level == 'low':
                # 部分マスキング
                def mask_match(match):
                    value = match.group()
                    if len(value) > 4:
                        return value[:2] + '*' * (len(value) - 4) + value[-2:]
                    else:
                        return '*' * len(value)
                anonymized_text = pattern.sub(mask_match, anonymized_text)
        
        return anonymized_text
    
    def log_data_processing(self, url, data_types, legal_basis='legitimate_interest'):
        """データ処理のログ記録(GDPR対応)"""
        processing_record = {
            'timestamp': datetime.now().isoformat(),
            'processing_id': str(uuid.uuid4()),
            'source_url': url,
            'data_types': data_types,
            'legal_basis': legal_basis,
            'purpose': 'web_scraping_analysis',
            'retention_until': (datetime.now() + timedelta(days=self.data_retention_days)).isoformat(),
            'anonymization_level': self.anonymization_level
        }
        
        # 処理記録の保存
        log_file = f"data_processing_log_{datetime.now().strftime('%Y%m')}.json"
        
        try:
            with open(log_file, 'r', encoding='utf-8') as f:
                logs = json.load(f)
        except FileNotFoundError:
            logs = []
        
        logs.append(processing_record)
        
        with open(log_file, 'w', encoding='utf-8') as f:
            json.dump(logs, f, ensure_ascii=False, indent=2)
        
        return processing_record['processing_id']
    
    def scrape_with_privacy_compliance(self, url):
        """プライバシー準拠スクレイピング"""
        try:
            # 通常のスクレイピング処理
            response = requests.get(url)
            response.raise_for_status()
            
            soup = BeautifulSoup(response.content, 'html.parser')
            raw_text = soup.get_text()
            
            # PII検出
            detected_pii = self.detect_pii(raw_text)
            
            if detected_pii:
                print(f"個人情報を検出しました: {list(detected_pii.keys())}")
                
                # データ処理をログに記録
                processing_id = self.log_data_processing(
                    url=url,
                    data_types=list(detected_pii.keys())
                )
                
                # データの匿名化
                anonymized_text = self._anonymize_text(raw_text)
                
                return {
                    'url': url,
                    'processing_id': processing_id,
                    'content': anonymized_text,
                    'pii_detected': True,
                    'pii_types': list(detected_pii.keys()),
                    'anonymization_applied': True
                }
            else:
                return {
                    'url': url,
                    'content': raw_text,
                    'pii_detected': False,
                    'anonymization_applied': False
                }
                
        except Exception as e:
            return {
                'url': url,
                'error': str(e),
                'status': 'failed'
            }
    
    def generate_privacy_report(self):
        """プライバシー処理レポートの生成"""
        current_month = datetime.now().strftime('%Y%m')
        log_file = f"data_processing_log_{current_month}.json"
        
        try:
            with open(log_file, 'r', encoding='utf-8') as f:
                logs = json.load(f)
        except FileNotFoundError:
            return {"error": "処理ログが見つかりません"}
        
        # 統計の計算
        total_processing = len(logs)
        pii_types_count = {}
        legal_basis_count = {}
        
        for log in logs:
            for data_type in log['data_types']:
                pii_types_count[data_type] = pii_types_count.get(data_type, 0) + 1
            
            legal_basis = log['legal_basis']
            legal_basis_count[legal_basis] = legal_basis_count.get(legal_basis, 0) + 1
        
        report = {
            'report_generated': datetime.now().isoformat(),
            'period': current_month,
            'summary': {
                'total_data_processing_activities': total_processing,
                'unique_pii_types_processed': len(pii_types_count),
                'anonymization_level_used': self.anonymization_level
            },
            'pii_types_frequency': pii_types_count,
            'legal_basis_distribution': legal_basis_count,
            'compliance_status': 'compliant' if total_processing > 0 else 'no_data_processed'
        }
        
        return report

# 使用例
privacy_scraper = PrivacyCompliantScraper(anonymization_level='medium')

# 個別URLの処理
result = privacy_scraper.scrape_with_privacy_compliance('https://example.com/user-profiles')
print(f"処理結果: PII検出={result.get('pii_detected', False)}")

# プライバシーレポートの生成
privacy_report = privacy_scraper.generate_privacy_report()
print("プライバシー処理レポート:", json.dumps(privacy_report, ensure_ascii=False, indent=2))

4.3 アンチボット対策の回避技術

現代のWebサイトは高度なアンチボット技術を採用しており、これらを適切に回避する技術が必要です:

import undetected_chromedriver as uc
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.keys import Keys
import random
import time
import json

class AntiDetectionScraper:
    def __init__(self, stealth_mode=True):
        self.stealth_mode = stealth_mode
        self.driver = None
        self.human_behavior_patterns = self._load_behavior_patterns()
        
    def _load_behavior_patterns(self):
        """人間らしい行動パターンの定義"""
        return {
            'scroll_patterns': [
                {'direction': 'down', 'distance': random.randint(200, 800), 'pause': random.uniform(0.5, 2.0)},
                {'direction': 'up', 'distance': random.randint(100, 400), 'pause': random.uniform(0.3, 1.5)},
            ],
            'mouse_movements': [
                {'x_offset': random.randint(-50, 50), 'y_offset': random.randint(-30, 30)},
                {'x_offset': random.randint(-100, 100), 'y_offset': random.randint(-50, 50)},
            ],
            'typing_delays': {
                'min_char_delay': 0.05,
                'max_char_delay': 0.25,
                'word_pause': random.uniform(0.1, 0.5)
            }
        }
    
    def setup_stealth_driver(self):
        """ステルスモードでのWebDriver設定"""
        options = uc.ChromeOptions()
        
        if self.stealth_mode:
            # 基本的なステルス設定
            options.add_argument('--no-first-run')
            options.add_argument('--no-default-browser-check')
            options.add_argument('--disable-blink-features=AutomationControlled')
            options.add_experimental_option("excludeSwitches", ["enable-automation"])
            options.add_experimental_option('useAutomationExtension', False)
            
            # フィンガープリンティング対策
            options.add_argument('--disable-web-security')
            options.add_argument('--disable-features=VizDisplayCompositor')
            options.add_argument('--disable-extensions')
            options.add_argument('--disable-plugins')
            options.add_argument('--disable-images')  # 画像読み込み無効化(高速化)
            
            # ランダムなウィンドウサイズ
            width = random.randint(1200, 1920)
            height = random.randint(800, 1080)
            options.add_argument(f'--window-size={width},{height}')
        
        self.driver = uc.Chrome(options=options)
        
        if self.stealth_mode:
            # WebDriverプロパティの隠蔽
            self.driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
            
            # ユーザーエージェントのランダム化
            user_agents = [
                'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
                'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
                'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
            ]
            
            self.driver.execute_cdp_cmd('Network.setUserAgentOverride', {
                "userAgent": random.choice(user_agents)
            })
    
    def simulate_human_behavior(self, action_type='scroll'):
        """人間らしい行動のシミュレーション"""
        if action_type == 'scroll':
            pattern = random.choice(self.human_behavior_patterns['scroll_patterns'])
            
            if pattern['direction'] == 'down':
                self.driver.execute_script(f"window.scrollBy(0, {pattern['distance']});")
            else:
                self.driver.execute_script(f"window.scrollBy(0, -{pattern['distance']});")
            
            time.sleep(pattern['pause'])
            
        elif action_type == 'mouse_movement':
            pattern = random.choice(self.human_behavior_patterns['mouse_movements'])
            
            # 現在のマウス位置から相対的に移動
            ActionChains(self.driver).move_by_offset(
                pattern['x_offset'], 
                pattern['y_offset']
            ).perform()
            
            time.sleep(random.uniform(0.1, 0.5))
    
    def human_like_typing(self, element, text):
        """人間らしいタイピングのシミュレーション"""
        element.clear()
        
        for char in text:
            element.send_keys(char)
            delay = random.uniform(
                self.human_behavior_patterns['typing_delays']['min_char_delay'],
                self.human_behavior_patterns['typing_delays']['max_char_delay']
            )
            time.sleep(delay)
            
            # 単語の区切りで少し長めの停止
            if char == ' ':
                time.sleep(self.human_behavior_patterns['typing_delays']['word_pause'])
    
    def bypass_cloudflare(self, url, max_wait_time=30):
        """Cloudflare チャレンジの回避"""
        self.driver.get(url)
        
        start_time = time.time()
        
        while time.time() - start_time < max_wait_time:
            # Cloudflare チャレンジページの検出
            if "Checking your browser" in self.driver.page_source or \
               "Just a moment" in self.driver.page_source or \
               "cf-browser-verification" in self.driver.page_source:
                
                print("Cloudflare チャレンジを検出しました。待機中...")
                
                # 人間らしい行動でチャレンジを通過
                self.simulate_human_behavior('mouse_movement')
                time.sleep(random.uniform(2, 5))
                self.simulate_human_behavior('scroll')
                time.sleep(random.uniform(1, 3))
                
            else:
                # チャレンジ通過を確認
                if "Cloudflare" not in self.driver.page_source.lower():
                    print("Cloudflare チャレンジを通過しました")
                    return True
            
            time.sleep(1)
        
        print("Cloudflare チャレンジの通過に失敗しました")
        return False
    
    def handle_captcha_detection(self):
        """CAPTCHA検出時の処理"""
        captcha_indicators = [
            "captcha", "recaptcha", "hcaptcha", 
            "robot", "verify", "security check"
        ]
        
        page_text = self.driver.page_source.lower()
        
        for indicator in captcha_indicators:
            if indicator in page_text:
                print(f"CAPTCHA検出: {indicator}")
                
                # CAPTCHAフレームの検索
                captcha_frames = self.driver.find_elements("css selector", "iframe[src*='captcha'], iframe[src*='recaptcha']")
                
                if captcha_frames:
                    print("CAPTCHA解決のため、手動介入が必要です")
                    # 実装では、CAPTCHA解決サービスAPIを呼び出すか、
                    # 手動解決のための一時停止を行う
                    input("CAPTCHAを手動で解決してからEnterキーを押してください...")
                    return True
        
        return False
    
    def scrape_with_anti_detection(self, url, extraction_config):
        """アンチ検出機能付きスクレイピング"""
        try:
            if not self.driver:
                self.setup_stealth_driver()
            
            # Cloudflare等のチャレンジを回避
            if not self.bypass_cloudflare(url):
                return {"error": "アクセス制限を回避できませんでした"}
            
            # CAPTCHA検出と処理
            self.handle_captcha_detection()
            
            # ページの完全読み込み待機
            self.wait_for_page_load()
            
            # 人間らしい行動でページを探索
            self.simulate_human_behavior('scroll')
            time.sleep(random.uniform(1, 3))
            self.simulate_human_behavior('mouse_movement')
            
            # データ抽出
            extracted_data = self.extract_data_with_config(extraction_config)
            
            return {
                "url": url,
                "status": "success",
                "data": extracted_data,
                "extraction_time": time.time()
            }
            
        except Exception as e:
            return {
                "url": url,
                "status": "error",
                "error": str(e)
            }
    
    def wait_for_page_load(self, timeout=30):
        """ページ読み込み完了の待機"""
        WebDriverWait(self.driver, timeout).until(
            lambda driver: driver.execute_script("return document.readyState") == "complete"
        )
        
        # 追加でAJAXコンテンツの読み込み待機
        time.sleep(random.uniform(2, 5))
    
    def extract_data_with_config(self, config):
        """設定に基づくデータ抽出"""
        results = {}
        
        for field_name, selector_config in config.items():
            try:
                if selector_config['type'] == 'single':
                    element = self.driver.find_element("css selector", selector_config['selector'])
                    results[field_name] = element.text if selector_config.get('attribute') != 'href' else element.get_attribute('href')
                
                elif selector_config['type'] == 'multiple':
                    elements = self.driver.find_elements("css selector", selector_config['selector'])
                    results[field_name] = [
                        elem.text if selector_config.get('attribute') != 'href' else elem.get_attribute('href')
                        for elem in elements
                    ]
                
            except Exception as e:
                print(f"フィールド '{field_name}' の抽出でエラー: {e}")
                results[field_name] = None
        
        return results
    
    def close(self):
        """リソースのクリーンアップ"""
        if self.driver:
            self.driver.quit()

# 高度なセッション管理
class SessionManager:
    def __init__(self):
        self.session_data = {}
        self.session_file = 'scraping_sessions.json'
        self.load_sessions()
    
    def load_sessions(self):
        """保存されたセッションの読み込み"""
        try:
            with open(self.session_file, 'r', encoding='utf-8') as f:
                self.session_data = json.load(f)
        except FileNotFoundError:
            self.session_data = {}
    
    def save_sessions(self):
        """セッションデータの保存"""
        with open(self.session_file, 'w', encoding='utf-8') as f:
            json.dump(self.session_data, f, ensure_ascii=False, indent=2)
    
    def create_session(self, site_domain):
        """新しいセッションの作成"""
        session_id = str(uuid.uuid4())
        
        self.session_data[session_id] = {
            'domain': site_domain,
            'created_at': datetime.now().isoformat(),
            'request_count': 0,
            'last_access': None,
            'success_rate': 1.0,
            'detected_blocks': []
        }
        
        self.save_sessions()
        return session_id
    
    def update_session(self, session_id, success=True, blocked=False):
        """セッション状態の更新"""
        if session_id in self.session_data:
            session = self.session_data[session_id]
            session['request_count'] += 1
            session['last_access'] = datetime.now().isoformat()
            
            if blocked:
                session['detected_blocks'].append(datetime.now().isoformat())
            
            # 成功率の計算
            total_requests = session['request_count']
            failed_requests = len(session['detected_blocks'])
            session['success_rate'] = (total_requests - failed_requests) / total_requests
            
            self.save_sessions()
    
    def should_rotate_session(self, session_id, threshold=0.7):
        """セッションローテーションの判定"""
        if session_id in self.session_data:
            return self.session_data[session_id]['success_rate'] < threshold
        return False

# 使用例
extraction_config = {
    'title': {'type': 'single', 'selector': 'h1.product-title'},
    'price': {'type': 'single', 'selector': '.price-current'},
    'images': {'type': 'multiple', 'selector': '.product-images img', 'attribute': 'src'},
    'description': {'type': 'single', 'selector': '.product-description'}
}

scraper = AntiDetectionScraper(stealth_mode=True)
session_manager = SessionManager()

try:
    session_id = session_manager.create_session('example-shop.com')
    
    result = scraper.scrape_with_anti_detection(
        'https://example-shop.com/product/123',
        extraction_config
    )
    
    if result['status'] == 'success':
        session_manager.update_session(session_id, success=True)
        print("データ抽出成功:", result['data'])
    else:
        session_manager.update_session(session_id, success=False, blocked=True)
        print("抽出失敗:", result['error'])

finally:
    scraper.close()

第5章:スケーラビリティと運用最適化

5.1 分散処理アーキテクチャの実装

大規模なWebスクレイピングプロジェクトでは、分散処理による水平スケーリングが必要となります:

import asyncio
import aiohttp
import aiofiles
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import multiprocessing as mp
from dataclasses import dataclass
from typing import List, Dict, Any
import queue
import threading
import time
import json

@dataclass
class ScrapingTask:
    url: str
    priority: int = 1
    retry_count: int = 0
    max_retries: int = 3
    task_id: str = None
    metadata: Dict[str, Any] = None

class DistributedScrapingOrchestrator:
    def __init__(self, max_workers=None, max_concurrent_requests=50):
        self.max_workers = max_workers or mp.cpu_count()
        self.max_concurrent_requests = max_concurrent_requests
        self.task_queue = asyncio.Queue()
        self.result_queue = asyncio.Queue()
        self.failed_tasks = []
        self.completed_tasks = []
        self.stats = {
            'total_tasks': 0,
            'completed': 0,
            'failed': 0,
            'in_progress': 0
        }
        
    async def add_task(self, task: ScrapingTask):
        """タスクをキューに追加"""
        if not task.task_id:
            task.task_id = f"task_{int(time.time())}_{hash(task.url) % 10000}"
        
        await self.task_queue.put(task)
        self.stats['total_tasks'] += 1
    
    async def add_tasks_batch(self, tasks: List[ScrapingTask]):
        """バッチでタスクを追加"""
        for task in tasks:
            await self.add_task(task)
    
    async def worker(self, session: aiohttp.ClientSession, worker_id: int):
        """ワーカープロセス"""
        while True:
            try:
                # タスクの取得(タイムアウト付き)
                task = await asyncio.wait_for(self.task_queue.get(), timeout=5.0)
                
                self.stats['in_progress'] += 1
                print(f"Worker {worker_id}: タスク {task.task_id} を処理中")
                
                # スクレイピング実行
                result = await self.execute_scraping_task(session, task)
                
                if result['status'] == 'success':
                    self.completed_tasks.append(result)
                    self.stats['completed'] += 1
                else:
                    # リトライロジック
                    if task.retry_count < task.max_retries:
                        task.retry_count += 1
                        await self.task_queue.put(task)  # 再度キューに追加
                        print(f"タスク {task.task_id} をリトライします({task.retry_count}/{task.max_retries})")
                    else:
                        self.failed_tasks.append(result)
                        self.stats['failed'] += 1
                
                self.stats['in_progress'] -= 1
                self.task_queue.task_done()
                
                # レート制限
                await asyncio.sleep(random.uniform(0.1, 0.5))
                
            except asyncio.TimeoutError:
                # キューが空の場合の処理
                break
            except Exception as e:
                print(f"Worker {worker_id} でエラー: {e}")
                self.stats['in_progress'] -= 1
                continue
    
    async def execute_scraping_task(self, session: aiohttp.ClientSession, task: ScrapingTask):
        """個別スクレイピングタスクの実行"""
        start_time = time.time()
        
        try:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }
            
            async with session.get(task.url, headers=headers, timeout=30) as response:
                if response.status == 200:
                    content = await response.text()
                    
                    # BeautifulSoupによる解析(別スレッドで実行)
                    loop = asyncio.get_event_loop()
                    with ThreadPoolExecutor() as executor:
                        parsed_data = await loop.run_in_executor(
                            executor, 
                            self.parse_content, 
                            content, 
                            task.metadata
                        )
                    
                    processing_time = time.time() - start_time
                    
                    return {
                        'task_id': task.task_id,
                        'url': task.url,
                        'status': 'success',
                        'data': parsed_data,
                        'processing_time': processing_time,
                        'response_status': response.status
                    }
                else:
                    return {
                        'task_id': task.task_id,
                        'url': task.url,
                        'status': 'error',
                        'error': f'HTTP {response.status}',
                        'processing_time': time.time() - start_time
                    }
                    
        except Exception as e:
            return {
                'task_id': task.task_id,
                'url': task.url,
                'status': 'exception',
                'error': str(e),
                'processing_time': time.time() - start_time
            }
    
    def parse_content(self, content: str, metadata: Dict):
        """コンテンツの解析(CPU集約的処理)"""
        from bs4 import BeautifulSoup
        
        soup = BeautifulSoup(content, 'html.parser')
        
        # メタデータに基づく動的解析
        if metadata and 'selectors' in metadata:
            extracted_data = {}
            for field, selector in metadata['selectors'].items():
                try:
                    elements = soup.select(selector)
                    if len(elements) == 1:
                        extracted_data[field] = elements[0].get_text(strip=True)
                    else:
                        extracted_data[field] = [elem.get_text(strip=True) for elem in elements]
                except Exception as e:
                    extracted_data[field] = f"抽出エラー: {e}"
            
            return extracted_data
        else:
            # デフォルトの解析
            return {
                'title': soup.title.string if soup.title else None,
                'text_length': len(soup.get_text()),
                'link_count': len(soup.find_all('a'))
            }
    
    async def run_distributed_scraping(self):
        """分散スクレイピングの実行"""
        # aiohttp セッションの作成
        connector = aiohttp.TCPConnector(
            limit=self.max_concurrent_requests,
            limit_per_host=10
        )
        
        timeout = aiohttp.ClientTimeout(total=60)
        
        async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
            # ワーカーの起動
            workers = [
                asyncio.create_task(self.worker(session, i)) 
                for i in range(self.max_workers)
            ]
            
            # 進捗モニタリングタスク
            monitor_task = asyncio.create_task(self.monitor_progress())
            
            # 全ワーカーの完了を待機
            await asyncio.gather(*workers, return_exceptions=True)
            
            # モニタリング停止
            monitor_task.cancel()
        
        return self.get_results_summary()
    
    async def monitor_progress(self):
        """進捗のモニタリング"""
        while True:
            await asyncio.sleep(10)  # 10秒間隔で進捗表示
            
            total = self.stats['total_tasks']
            completed = self.stats['completed']
            failed = self.stats['failed']
            in_progress = self.stats['in_progress']
            
            if total > 0:
                completion_rate = (completed + failed) / total * 100
                success_rate = completed / (completed + failed) * 100 if (completed + failed) > 0 else 0
                
                print(f"進捗: {completion_rate:.1f}% 完了 | 成功率: {success_rate:.1f}% | 処理中: {in_progress}")
            
            # 全て完了した場合は終了
            if total > 0 and (completed + failed) >= total and in_progress == 0:
                break
    
    def get_results_summary(self):
        """結果サマリーの生成"""
        return {
            'statistics': self.stats,
            'successful_results': self.completed_tasks,
            'failed_tasks': self.failed_tasks,
            'success_rate': self.stats['completed'] / (self.stats['completed'] + self.stats['failed']) * 100 if (self.stats['completed'] + self.stats['failed']) > 0 else 0
        }

# 実行例
async def main_distributed_scraping():
    orchestrator = DistributedScrapingOrchestrator(max_workers=8)
    
    # タスクの準備
    urls = [f"https://example-site.com/page/{i}" for i in range(1, 101)]
    
    # セレクタ設定
    selectors = {
        'title': 'h1.page-title',
        'content': '.main-content p',
        'links': 'a[href]'
    }
    
    tasks = [
        ScrapingTask(
            url=url, 
            priority=1,
            metadata={'selectors': selectors}
        ) 
        for url in urls
    ]
    
    # タスクの一括追加
    await orchestrator.add_tasks_batch(tasks)
    
    # 分散処理の実行
    results = await orchestrator.run_distributed_scraping()
    
    print("=== 分散スクレイピング完了 ===")
    print(f"総タスク数: {results['statistics']['total_tasks']}")
    print(f"成功: {results['statistics']['completed']}")
    print(f"失敗: {results['statistics']['failed']}")
    print(f"成功率: {results['success_rate']:.2f}%")
    
    return results

# 非同期実行
if __name__ == "__main__":
    results = asyncio.run(main_distributed_scraping())

5.2 リアルタイム監視とアラートシステム

大規模スクレイピング運用では、リアルタイムでの監視とアラート機能が重要です:

import logging
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import sqlite3
from datetime import datetime, timedelta
import threading
import psutil
import requests
from typing import Dict, List, Callable
import json
import time

class ScrapingMonitor:
    def __init__(self, db_path='scraping_monitor.db'):
        self.db_path = db_path
        self.setup_database()
        self.setup_logging()
        self.alert_handlers = []
        self.monitoring_active = False
        self.metrics = {
            'requests_per_minute': 0,
            'success_rate': 0,
            'average_response_time': 0,
            'error_rate': 0,
            'cpu_usage': 0,
            'memory_usage': 0
        }
        self.thresholds = {
            'max_error_rate': 10,  # 10%
            'min_success_rate': 85,  # 85%
            'max_response_time': 5000,  # 5秒
            'max_cpu_usage': 80,  # 80%
            'max_memory_usage': 80  # 80%
        }
        
    def setup_database(self):
        """監視データベースの初期化"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS scraping_metrics (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                metric_name TEXT NOT NULL,
                metric_value REAL NOT NULL,
                additional_data TEXT
            )
        ''')
        
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS scraping_events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                event_type TEXT NOT NULL,
                severity TEXT NOT NULL,
                message TEXT NOT NULL,
                additional_data TEXT
            )
        ''')
        
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS alert_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                alert_type TEXT NOT NULL,
                message TEXT NOT NULL,
                resolved BOOLEAN DEFAULT FALSE,
                resolution_time DATETIME
            )
        ''')
        
        conn.commit()
        conn.close()
    
    def setup_logging(self):
        """ログ設定"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('scraping_monitor.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def add_alert_handler(self, handler: Callable):
        """アラートハンドラーの追加"""
        self.alert_handlers.append(handler)
    
    def record_metric(self, metric_name: str, value: float, additional_data: Dict = None):
        """メトリクスの記録"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        additional_json = json.dumps(additional_data) if additional_data else None
        
        cursor.execute('''
            INSERT INTO scraping_metrics (metric_name, metric_value, additional_data)
            VALUES (?, ?, ?)
        ''', (metric_name, value, additional_json))
        
        conn.commit()
        conn.close()
        
        # リアルタイムメトリクスの更新
        self.metrics[metric_name] = value
        
        # しきい値チェック
        self.check_thresholds(metric_name, value)
    
    def record_event(self, event_type: str, severity: str, message: str, additional_data: Dict = None):
        """イベントの記録"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        additional_json = json.dumps(additional_data) if additional_data else None
        
        cursor.execute('''
            INSERT INTO scraping_events (event_type, severity, message, additional_data)
            VALUES (?, ?, ?, ?)
        ''', (event_type, severity, message, additional_json))
        
        conn.commit()
        conn.close()
        
        self.logger.info(f"イベント記録: {event_type} - {severity} - {message}")
        
        # 重要度の高いイベントはアラート送信
        if severity in ['ERROR', 'CRITICAL']:
            self.send_alert(f"{event_type}: {message}", severity)
    
    def check_thresholds(self, metric_name: str, value: float):
        """しきい値チェックとアラート"""
        alerts_triggered = []
        
        if metric_name == 'error_rate' and value > self.thresholds['max_error_rate']:
            alerts_triggered.append(f"エラー率が高すぎます: {value}% (しきい値: {self.thresholds['max_error_rate']}%)")
        
        elif metric_name == 'success_rate' and value < self.thresholds['min_success_rate']:
            alerts_triggered.append(f"成功率が低すぎます: {value}% (しきい値: {self.thresholds['min_success_rate']}%)")
        
        elif metric_name == 'average_response_time' and value > self.thresholds['max_response_time']:
            alerts_triggered.append(f"応答時間が遅すぎます: {value}ms (しきい値: {self.thresholds['max_response_time']}ms)")
        
        elif metric_name == 'cpu_usage' and value > self.thresholds['max_cpu_usage']:
            alerts_triggered.append(f"CPU使用率が高すぎます: {value}% (しきい値: {self.thresholds['max_cpu_usage']}%)")
        
        elif metric_name == 'memory_usage' and value > self.thresholds['max_memory_usage']:
            alerts_triggered.append(f"メモリ使用率が高すぎます: {value}% (しきい値: {self.thresholds['max_memory_usage']}%)")
        
        # アラート送信
        for alert_message in alerts_triggered:
            self.send_alert(alert_message, 'WARNING')
    
    def send_alert(self, message: str, severity: str):
        """アラートの送信"""
        # データベースにアラート履歴を記録
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            INSERT INTO alert_history (alert_type, message)
            VALUES (?, ?)
        ''', (severity, message))
        
        conn.commit()
        conn.close()
        
        # 登録されたハンドラーにアラート送信
        for handler in self.alert_handlers:
            try:
                handler(message, severity)
            except Exception as e:
                self.logger.error(f"アラートハンドラーエラー: {e}")
    
    def start_monitoring(self, interval=60):
        """監視の開始"""
        self.monitoring_active = True
        monitor_thread = threading.Thread(target=self._monitoring_loop, args=(interval,))
        monitor_thread.daemon = True
        monitor_thread.start()
        self.logger.info("スクレイピング監視を開始しました")
    
    def stop_monitoring(self):
        """監視の停止"""
        self.monitoring_active = False
        self.logger.info("スクレイピング監視を停止しました")
    
    def _monitoring_loop(self, interval):
        """監視ループ"""
        while self.monitoring_active:
            try:
                # システムメトリクスの収集
                cpu_usage = psutil.cpu_percent()
                memory_usage = psutil.virtual_memory().percent
                
                self.record_metric('cpu_usage', cpu_usage)
                self.record_metric('memory_usage', memory_usage)
                
                # スクレイピング固有メトリクスの計算
                self._calculate_scraping_metrics()
                
                time.sleep(interval)
                
            except Exception as e:
                self.logger.error(f"監視ループエラー: {e}")
                time.sleep(interval)
    
    def _calculate_scraping_metrics(self):
        """スクレイピングメトリクスの計算"""
        # 過去1分間のメトリクス取得
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        one_minute_ago = datetime.now() - timedelta(minutes=1)
        
        # 成功率の計算
        cursor.execute('''
            SELECT metric_value FROM scraping_metrics 
            WHERE metric_name = 'success_rate' AND timestamp > ?
            ORDER BY timestamp DESC LIMIT 1
        ''', (one_minute_ago,))
        
        result = cursor.fetchone()
        if result:
            self.metrics['success_rate'] = result[0]
        
        # エラー率の計算
        cursor.execute('''
            SELECT COUNT(*) FROM scraping_events 
            WHERE event_type = 'ERROR' AND timestamp > ?
        ''', (one_minute_ago,))
        
        error_count = cursor.fetchone()[0]
        
        cursor.execute('''
            SELECT COUNT(*) FROM scraping_events 
            WHERE timestamp > ?
        ''', (one_minute_ago,))
        
        total_events = cursor.fetchone()[0]
        
        if total_events > 0:
            error_rate = (error_count / total_events) * 100
            self.record_metric('error_rate', error_rate)
        
        conn.close()
    
    def get_dashboard_data(self, hours=24):
        """ダッシュボード用データの取得"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        time_threshold = datetime.now() - timedelta(hours=hours)
        
        # メトリクス履歴
        cursor.execute('''
            SELECT metric_name, metric_value, timestamp FROM scraping_metrics 
            WHERE timestamp > ? ORDER BY timestamp
        ''', (time_threshold,))
        
        metrics_history = cursor.fetchall()
        
        # イベント履歴
        cursor.execute('''
            SELECT event_type, severity, message, timestamp FROM scraping_events 
            WHERE timestamp > ? ORDER BY timestamp DESC
        ''', (time_threshold,))
        
        events_history = cursor.fetchall()
        
        # アラート履歴
        cursor.execute('''
            SELECT alert_type, message, timestamp, resolved FROM alert_history 
            WHERE timestamp > ? ORDER BY timestamp DESC
        ''', (time_threshold,))
        
        alerts_history = cursor.fetchall()
        
        conn.close()
        
        return {
            'current_metrics': self.metrics,
            'metrics_history': metrics_history,
            'events_history': events_history,
            'alerts_history': alerts_history
        }

# アラートハンドラーの実装例
class EmailAlertHandler:
    def __init__(self, smtp_server, smtp_port, username, password, recipients):
        self.smtp_server = smtp_server
        self.smtp_port = smtp_port
        self.username = username
        self.password = password
        self.recipients = recipients
    
    def __call__(self, message: str, severity: str):
        """メールアラートの送信"""
        try:
            msg = MIMEMultipart()
            msg['From'] = self.username
            msg['To'] = ', '.join(self.recipients)
            msg['Subject'] = f"スクレイピングアラート - {severity}"
            
            body = f"""
            スクレイピングシステムでアラートが発生しました。
            
            重要度: {severity}
            メッセージ: {message}
            時刻: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
            
            詳細については監視ダッシュボードを確認してください。
            """
            
            msg.attach(MIMEText(body, 'plain'))
            
            server = smtplib.SMTP(self.smtp_server, self.smtp_port)
            server.starttls()
            server.login(self.username, self.password)
            text = msg.as_string()
            server.sendmail(self.username, self.recipients, text)
            server.quit()
            
            print(f"アラートメールを送信しました: {severity}")
            
        except Exception as e:
            print(f"メール送信エラー: {e}")

class SlackAlertHandler:
    def __init__(self, webhook_url):
        self.webhook_url = webhook_url
    
    def __call__(self, message: str, severity: str):
        """Slackアラートの送信"""
        try:
            color_map = {
                'INFO': '#36a64f',
                'WARNING': '#ff9500',
                'ERROR': '#ff0000',
                'CRITICAL': '#8b0000'
            }
            
            payload = {
                "attachments": [
                    {
                        "color": color_map.get(severity, '#ff0000'),
                        "title": f"スクレイピングアラート - {severity}",
                        "text": message,
                        "timestamp": int(time.time())
                    }
                ]
            }
            
            response = requests.post(self.webhook_url, json=payload)
            response.raise_for_status()
            
            print(f"Slackアラートを送信しました: {severity}")
            
        except Exception as e:
            print(f"Slack送信エラー: {e}")

# 統合使用例
def setup_monitoring_system():
    """監視システムのセットアップ"""
    monitor = ScrapingMonitor()
    
    # アラートハンドラーの設定
    email_handler = EmailAlertHandler(
        smtp_server='smtp.gmail.com',
        smtp_port=587,
        username='alerts@example.com',
        password='app_password',
        recipients=['admin@example.com', 'team@example.com']
    )
    
    slack_handler = SlackAlertHandler('https://hooks.slack.com/services/YOUR/WEBHOOK/URL')
    
    monitor.add_alert_handler(email_handler)
    monitor.add_alert_handler(slack_handler)
    
    # 監視開始
    monitor.start_monitoring(interval=30)  # 30秒間隔
    
    return monitor

# スクレイピング実行時の監視統合
class MonitoredScraper:
    def __init__(self, monitor: ScrapingMonitor):
        self.monitor = monitor
        self.session = requests.Session()
    
    def scrape_with_monitoring(self, url):
        """監視機能付きスクレイピング"""
        start_time = time.time()
        
        try:
            response = self.session.get(url, timeout=30)
            processing_time = (time.time() - start_time) * 1000  # ミリ秒
            
            # メトリクス記録
            self.monitor.record_metric('average_response_time', processing_time)
            
            if response.ok:
                self.monitor.record_event('SCRAPING', 'INFO', f'成功: {url}')
                self.monitor.record_metric('success_rate', 100)
                return response.text
            else:
                self.monitor.record_event('SCRAPING', 'ERROR', f'HTTP {response.status_code}: {url}')
                self.monitor.record_metric('success_rate', 0)
                return None
                
        except Exception as e:
            processing_time = (time.time() - start_time) * 1000
            self.monitor.record_metric('average_response_time', processing_time)
            self.monitor.record_event('SCRAPING', 'ERROR', f'例外発生: {url} - {str(e)}')
            self.monitor.record_metric('success_rate', 0)
            return None

# 実行例
if __name__ == "__main__":
    monitor = setup_monitoring_system()
    scraper = MonitoredScraper(monitor)
    
    # テストスクレイピング
    urls = ['https://example.com', 'https://httpbin.org/status/200', 'https://httpbin.org/status/404']
    
    for url in urls:
        result = scraper.scrape_with_monitoring(url)
        time.sleep(1)
    
    # ダッシュボードデータの表示
    dashboard_data = monitor.get_dashboard_data(hours=1)
    print("=== 監視ダッシュボード ===")
    print("現在のメトリクス:", dashboard_data['current_metrics'])
    print("最新イベント:", dashboard_data['events_history'][:5])

5.3 継続的改善とメンテナンス戦略

AI生成スクレイピングコードの長期運用には、継続的な改善とメンテナンス戦略が必要です:

import pickle
import joblib
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report
import numpy as np
import pandas as pd
from collections import defaultdict, deque
import hashlib
from datetime import datetime, timedelta
import re

class AdaptiveScrapingOptimizer:
    def __init__(self, learning_window_days=30):
        self.learning_window_days = learning_window_days
        self.performance_history = deque(maxlen=10000)
        self.selector_performance = defaultdict(list)
        self.failure_patterns = defaultdict(int)
        self.success_patterns = defaultdict(int)
        self.ml_model = None
        self.feature_columns = []
        
    def record_scraping_attempt(self, url, selectors, success, response_time, error_type=None):
        """スクレイピング試行の記録"""
        attempt_data = {
            'timestamp': datetime.now(),
            'url': url,
            'url_hash': hashlib.md5(url.encode()).hexdigest(),
            'selectors': selectors,
            'success': success,
            'response_time': response_time,
            'error_type': error_type,
            'url_features': self._extract_url_features(url)
        }
        
        self.performance_history.append(attempt_data)
        
        # セレクタ性能の追跡
        for selector_name, selector_value in selectors.items():
            self.selector_performance[selector_name].append({
                'selector': selector_value,
                'success': success,
                'response_time': response_time
            })
        
        # パターン学習
        if success:
            pattern_key = self._generate_pattern_key(url, selectors)
            self.success_patterns[pattern_key] += 1
        else:
            pattern_key = self._generate_pattern_key(url, selectors, error_type)
            self.failure_patterns[pattern_key] += 1
    
    def _extract_url_features(self, url):
        """URLから特徴量を抽出"""
        from urllib.parse import urlparse
        
        parsed = urlparse(url)
        
        return {
            'domain_length': len(parsed.netloc),
            'path_depth': len([p for p in parsed.path.split('/') if p]),
            'has_query': bool(parsed.query),
            'has_fragment': bool(parsed.fragment),
            'is_https': parsed.scheme == 'https',
            'subdomain_count': len(parsed.netloc.split('.')) - 2,
            'path_length': len(parsed.path),
            'url_length': len(url)
        }
    
    def _generate_pattern_key(self, url, selectors, error_type=None):
        """パターンキーの生成"""
        url_pattern = re.sub(r'\d+', 'NUM', url)  # 数字を正規化
        url_pattern = re.sub(r'[a-f0-9]{32}', 'HASH', url_pattern)  # ハッシュを正規化
        
        selector_signature = '|'.join(sorted(selectors.values()))
        
        if error_type:
            return f"{url_pattern}::{selector_signature}::{error_type}"
        else:
            return f"{url_pattern}::{selector_signature}"
    
    def analyze_selector_performance(self):
        """セレクタ性能の分析"""
        analysis = {}
        
        for selector_name, performances in self.selector_performance.items():
            if len(performances) < 5:  # 最低5回の試行が必要
                continue
            
            success_rate = sum(1 for p in performances if p['success']) / len(performances)
            avg_response_time = np.mean([p['response_time'] for p in performances])
            
            # セレクタのバリエーション分析
            unique_selectors = set(p['selector'] for p in performances)
            best_performing_selector = None
            best_performance = 0
            
            for selector in unique_selectors:
                selector_attempts = [p for p in performances if p['selector'] == selector]
                if len(selector_attempts) >= 3:
                    selector_success_rate = sum(1 for p in selector_attempts if p['success']) / len(selector_attempts)
                    if selector_success_rate > best_performance:
                        best_performance = selector_success_rate
                        best_performing_selector = selector
            
            analysis[selector_name] = {
                'overall_success_rate': success_rate,
                'average_response_time': avg_response_time,
                'total_attempts': len(performances),
                'unique_selectors_tried': len(unique_selectors),
                'best_selector': best_performing_selector,
                'best_selector_success_rate': best_performance
            }
        
        return analysis
    
    def predict_success_probability(self, url, selectors):
        """成功確率の予測"""
        if not self.ml_model:
            self._train_prediction_model()
        
        if not self.ml_model:
            return 0.5  # デフォルト確率
        
        features = self._prepare_features_for_prediction(url, selectors)
        
        try:
            probability = self.ml_model.predict_proba([features])[0][1]  # 成功確率
            return probability
        except Exception as e:
            print(f"予測エラー: {e}")
            return 0.5
    
    def _train_prediction_model(self):
        """機械学習モデルの訓練"""
        if len(self.performance_history) < 100:  # 最低100件のデータが必要
            return
        
        # 特徴量とラベルの準備
        features = []
        labels = []
        
        for attempt in self.performance_history:
            feature_vector = self._prepare_features_for_prediction(
                attempt['url'], 
                attempt['selectors']
            )
            features.append(feature_vector)
            labels.append(1 if attempt['success'] else 0)
        
        X = np.array(features)
        y = np.array(labels)
        
        # モデル訓練
        self.ml_model = RandomForestClassifier(
            n_estimators=100,
            random_state=42,
            max_depth=10
        )
        
        self.ml_model.fit(X, y)
        
        # 性能評価
        y_pred = self.ml_model.predict(X)
        accuracy = accuracy_score(y, y_pred)
        
        print(f"予測モデル精度: {accuracy:.3f}")
    
    def _prepare_features_for_prediction(self, url, selectors):
        """予測用特徴量の準備"""
        url_features = self._extract_url_features(url)
        
        # セレクタの特徴量
        selector_features = {
            'selector_count': len(selectors),
            'total_selector_length': sum(len(s) for s in selectors.values()),
            'has_class_selector': any('.class' in s for s in selectors.values()),
            'has_id_selector': any('#id' in s for s in selectors.values()),
            'has_attribute_selector': any('[' in s for s in selectors.values()),
        }
        
        # 履歴ベースの特徴量
        similar_attempts = [
            attempt for attempt in self.performance_history
            if attempt['url_hash'] == hashlib.md5(url.encode()).hexdigest()
        ]
        
        history_features = {
            'previous_attempts': len(similar_attempts),
            'previous_success_rate': (
                sum(1 for a in similar_attempts if a['success']) / len(similar_attempts)
                if similar_attempts else 0
            )
        }
        
        # 全特徴量を結合
        all_features = {**url_features, **selector_features, **history_features}
        
        # 特徴量ベクトルの作成
        if not self.feature_columns:
            self.feature_columns = sorted(all_features.keys())
        
        return [all_features.get(col, 0) for col in self.feature_columns]
    
    def suggest_selector_improvements(self, target_selector_name):
        """セレクタ改善提案"""
        if target_selector_name not in self.selector_performance:
            return {"error": "指定されたセレクタのデータがありません"}
        
        performances = self.selector_performance[target_selector_name]
        
        # 失敗パターンの分析
        failed_selectors = [p['selector'] for p in performances if not p['success']]
        successful_selectors = [p['selector'] for p in performances if p['success']]
        
        suggestions = []
        
        # 成功したセレクタのパターン分析
        if successful_selectors:
            # 最も成功率の高いセレクタを特定
            selector_success_rates = {}
            for selector in set(successful_selectors + failed_selectors):
                attempts = [p for p in performances if p['selector'] == selector]
                success_rate = sum(1 for p in attempts if p['success']) / len(attempts)
                selector_success_rates[selector] = success_rate
            
            best_selector = max(selector_success_rates.items(), key=lambda x: x[1])
            
            suggestions.append({
                'type': 'best_performing_selector',
                'recommendation': best_selector[0],
                'success_rate': best_selector[1],
                'reason': '過去の実績で最も高い成功率を記録'
            })
        
        # 一般的な改善パターン
        common_improvements = [
            {
                'pattern': r'\.class-name',
                'suggestion': '[class*="class-name"]',
                'reason': '部分クラス名マッチで動的クラス名に対応'
            },
            {
                'pattern': r'#specific-id',
                'suggestion': '[id*="specific"]',
                'reason': 'ID部分マッチで動的IDに対応'
            },
            {
                'pattern': r'div > span',
                'suggestion': 'div span',
                'reason': 'より柔軟な子孫セレクタを使用'
            }
        ]
        
        current_selectors = set(p['selector'] for p in performances)
        for improvement in common_improvements:
            for selector in current_selectors:
                if re.search(improvement['pattern'], selector):
                    suggestions.append({
                        'type': 'pattern_improvement',
                        'current': selector,
                        'recommendation': re.sub(improvement['pattern'], improvement['suggestion'], selector),
                        'reason': improvement['reason']
                    })
        
        return {
            'selector_name': target_selector_name,
            'current_performance': self.analyze_selector_performance().get(target_selector_name, {}),
            'suggestions': suggestions
        }
    
    def generate_maintenance_report(self):
        """メンテナンスレポートの生成"""
        cutoff_date = datetime.now() - timedelta(days=self.learning_window_days)
        recent_attempts = [
            attempt for attempt in self.performance_history 
            if attempt['timestamp'] > cutoff_date
        ]
        
        if not recent_attempts:
            return {"error": "分析期間内のデータがありません"}
        
        # 全体統計
        total_attempts = len(recent_attempts)
        successful_attempts = sum(1 for a in recent_attempts if a['success'])
        overall_success_rate = successful_attempts / total_attempts
        
        # エラータイプ別統計
        error_types = defaultdict(int)
        for attempt in recent_attempts:
            if not attempt['success'] and attempt['error_type']:
                error_types[attempt['error_type']] += 1
        
        # 性能劣化の検出
        time_buckets = defaultdict(list)
        for attempt in recent_attempts:
            bucket = attempt['timestamp'].date()
            time_buckets[bucket].append(attempt['success'])
        
        daily_success_rates = {}
        for date, successes in time_buckets.items():
            daily_success_rates[date] = sum(successes) / len(successes)
        
        # トレンド分析
        sorted_dates = sorted(daily_success_rates.keys())
        if len(sorted_dates) >= 7:
            recent_week = sorted_dates[-7:]
            early_week = sorted_dates[:7] if len(sorted_dates) >= 14 else sorted_dates[:-7]
            
            recent_avg = np.mean([daily_success_rates[d] for d in recent_week])
            early_avg = np.mean([daily_success_rates[d] for d in early_week])
            
            trend = "改善" if recent_avg > early_avg else "悪化" if recent_avg < early_avg else "安定"
        else:
            trend = "データ不足"
        
        # 推奨アクション
        recommendations = []
        
        if overall_success_rate < 0.8:
            recommendations.append("全体的な成功率が低下しています。セレクタの見直しが必要です。")
        
        if error_types:
            most_common_error = max(error_types.items(), key=lambda x: x[1])
            recommendations.append(f"最も多いエラー '{most_common_error[0]}' の対策を優先してください。")
        
        if trend == "悪化":
            recommendations.append("性能が悪化傾向にあります。対象サイトの変更を確認してください。")
        
        return {
            'period': f"{cutoff_date.date()} - {datetime.now().date()}",
            'overall_statistics': {
                'total_attempts': total_attempts,
                'success_rate': overall_success_rate,
                'average_response_time': np.mean([a['response_time'] for a in recent_attempts])
            },
            'error_analysis': dict(error_types),
            'performance_trend': trend,
            'daily_success_rates': {str(k): v for k, v in daily_success_rates.items()},
            'recommendations': recommendations,
            'selector_analysis': self.analyze_selector_performance()
        }
    
    def auto_optimize_selectors(self, current_selectors):
        """セレクタの自動最適化"""
        optimized_selectors = {}
        
        for selector_name, current_selector in current_selectors.items():
            suggestions = self.suggest_selector_improvements(selector_name)
            
            if 'suggestions' in suggestions and suggestions['suggestions']:
                # 最も高いスコアの提案を採用
                best_suggestion = max(
                    suggestions['suggestions'],
                    key=lambda x: x.get('success_rate', 0)
                )
                
                if best_suggestion.get('success_rate', 0) > 0.8:  # 80%以上の成功率
                    optimized_selectors[selector_name] = best_suggestion['recommendation']
                else:
                    optimized_selectors[selector_name] = current_selector
            else:
                optimized_selectors[selector_name] = current_selector
        
        return optimized_selectors

# 使用例とテスト
def test_adaptive_optimizer():
    """適応的最適化システムのテスト"""
    optimizer = AdaptiveScrapingOptimizer()
    
    # テストデータの生成
    test_scenarios = [
        {
            'url': 'https://example.com/product/1',
            'selectors': {'title': 'h1.title', 'price': '.price'},
            'success': True,
            'response_time': 1.2
        },
        {
            'url': 'https://example.com/product/2',
            'selectors': {'title': 'h1.title', 'price': '.price'},
            'success': False,
            'response_time': 3.5,
            'error_type': 'selector_not_found'
        },
        {
            'url': 'https://example.com/product/3',
            'selectors': {'title': '[class*="title"]', 'price': '.price'},
            'success': True,
            'response_time': 1.8
        }
    ]
    
    # データの記録
    for scenario in test_scenarios * 20:  # 十分なデータ量に増やす
        optimizer.record_scraping_attempt(**scenario)
    
    # 分析結果の表示
    print("=== セレクタ性能分析 ===")
    analysis = optimizer.analyze_selector_performance()
    for selector_name, stats in analysis.items():
        print(f"{selector_name}: 成功率 {stats['overall_success_rate']:.2f}, 応答時間 {stats['average_response_time']:.2f}s")
    
    print("\n=== メンテナンスレポート ===")
    report = optimizer.generate_maintenance_report()
    print(f"期間: {report['period']}")
    print(f"全体成功率: {report['overall_statistics']['success_rate']:.2f}")
    print(f"性能トレンド: {report['performance_trend']}")
    print("推奨アクション:")
    for rec in report['recommendations']:
        print(f"  - {rec}")
    
    print("\n=== セレクタ最適化提案 ===")
    suggestions = optimizer.suggest_selector_improvements('title')
    if 'suggestions' in suggestions:
        for suggestion in suggestions['suggestions']:
            print(f"タイプ: {suggestion['type']}")
            print(f"推奨: {suggestion['recommendation']}")
            print(f"理由: {suggestion['reason']}")
            print()

if __name__ == "__main__":
    test_adaptive_optimizer()

第6章:限界とリスクの管理

6.1 技術的制約と対処法

AI生成Webスクレイピングコードには、いくつかの根本的な制約が存在します。これらを理解し、適切に対処することが長期運用の成功につながります:

import warnings
from typing import Optional, Dict, List, Any
import time
import random
from dataclasses import dataclass
from enum import Enum
import logging

class RiskLevel(Enum):
    LOW = "low"
    MEDIUM = "medium" 
    HIGH = "high"
    CRITICAL = "critical"

@dataclass
class TechnicalLimitation:
    name: str
    description: str
    risk_level: RiskLevel
    mitigation_strategies: List[str]
    monitoring_indicators: List[str]

class ScrapingLimitationManager:
    def __init__(self):
        self.limitations = self._define_limitations()
        self.risk_assessments = {}
        self.mitigation_status = {}
        self.logger = logging.getLogger(__name__)
        
    def _define_limitations(self) -> Dict[str, TechnicalLimitation]:
        """技術的制約の定義"""
        return {
            'dynamic_content_rendering': TechnicalLimitation(
                name="動的コンテンツレンダリングの限界",
                description="JavaScript重要なSPAやReactアプリケーションでは、単純なHTMLパースでは情報を取得できない",
                risk_level=RiskLevel.HIGH,
                mitigation_strategies=[
                    "SeleniumまたはPlaywrightによるブラウザ自動化",
                    "APIエンドポイントの直接呼び出し調査",
                    "Server-Side Rendering (SSR) 版の利用",
                    "headless Chrome による事前レンダリング"
                ],
                monitoring_indicators=[
                    "空のレスポンスボディの頻発",
                    "JavaScript エラーメッセージの検出",
                    "期待される要素の欠如"
                ]
            ),
            
            'rate_limiting_escalation': TechnicalLimitation(
                name="レート制限の段階的強化",
                description="Webサイトが段階的にアクセス制限を強化し、最終的に完全ブロックに至る可能性",
                risk_level=RiskLevel.CRITICAL,
                mitigation_strategies=[
                    "分散IPアドレスの使用",
                    "ユーザーエージェントのローテーション",
                    "アクセスパターンの人間らしさの向上",
                    "複数のアクセス経路の準備"
                ],
                monitoring_indicators=[
                    "HTTP 429 (Too Many Requests) の増加",
                    "レスポンス時間の段階的増加",
                    "CAPTCHA チャレンジの頻発"
                ]
            ),
            
            'ai_hallucination_in_selectors': TechnicalLimitation(
                name="AI生成セレクタのハルシネーション",
                description="AIが実際には存在しないDOM要素に対するセレクタを生成する可能性",
                risk_level=RiskLevel.MEDIUM,
                mitigation_strategies=[
                    "セレクタの実測検証の義務化",
                    "複数セレクタパターンのフォールバック実装",
                    "DOM構造の事前調査の徹底",
                    "セレクタ性能の継続監視"
                ],
                monitoring_indicators=[
                    "セレクタマッチ失敗率の高さ",
                    "生成されたセレクタの複雑さの異常な増加",
                    "予期しない要素の誤選択"
                ]
            ),
            
            'legal_compliance_drift': TechnicalLimitation(
                name="法的コンプライアンス要件の変化",
                description="スクレイピング対象地域の法規制変更により、運用継続が困難になるリスク",
                risk_level=RiskLevel.HIGH,
                mitigation_strategies=[
                    "法務専門家との定期相談",
                    "複数管轄区域での運用分散",
                    "利用規約変更の自動監視",
                    "データ取得手法の多様化"
                ],
                monitoring_indicators=[
                    "利用規約の頻繁な更新",
                    "アクセス制限の法的根拠の明示",
                    "業界全体での規制強化の動向"
                ]
            ),
            
            'target_site_evolution': TechnicalLimitation(
                name="対象サイトの急速な進化",
                description="対象Webサイトの技術スタック変更やUI/UX刷新による急激な互換性喪失",
                risk_level=RiskLevel.HIGH,
                mitigation_strategies=[
                    "サイト変更の自動検出システム",
                    "バックアップ取得戦略の確立",
                    "複数バージョン対応の実装",
                    "代替データソースの常時確保"
                ],
                monitoring_indicators=[
                    "DOM構造の大幅変更",
                    "新しいフレームワークの導入兆候",
                    "レスポンス形式の変更"
                ]
            )
        }
    
    def assess_current_risks(self, scraping_context: Dict[str, Any]) -> Dict[str, Dict]:
        """現在のリスクレベル評価"""
        assessments = {}
        
        for limitation_name, limitation in self.limitations.items():
            risk_score = self._calculate_risk_score(limitation, scraping_context)
            
            assessments[limitation_name] = {
                'limitation': limitation,
                'current_risk_score': risk_score,
                'recommended_actions': self._get_recommended_actions(limitation, risk_score),
                'monitoring_required': risk_score >= 0.7
            }
        
        return assessments
    
    def _calculate_risk_score(self, limitation: TechnicalLimitation, context: Dict) -> float:
        """リスク数値スコアの計算(0.0-1.0)"""
        base_scores = {
            RiskLevel.LOW: 0.2,
            RiskLevel.MEDIUM: 0.5,
            RiskLevel.HIGH: 0.7,
            RiskLevel.CRITICAL: 0.9
        }
        
        base_score = base_scores[limitation.risk_level]
        
        # コンテキストに基づく調整
        if limitation.name == 'dynamic_content_rendering':
            if context.get('target_is_spa', False):
                base_score += 0.1
            if context.get('javascript_heavy', False):
                base_score += 0.1
                
        elif limitation.name == 'rate_limiting_escalation':
            if context.get('recent_429_errors', 0) > 5:
                base_score += 0.1
            if context.get('requests_per_minute', 0) > 60:
                base_score += 0.1
                
        elif limitation.name == 'ai_hallucination_in_selectors':
            if context.get('selector_failure_rate', 0) > 0.3:
                base_score += 0.2
                
        return min(1.0, base_score)
    
    def _get_recommended_actions(self, limitation: TechnicalLimitation, risk_score: float) -> List[str]:
        """リスクスコアに基づく推奨アクション"""
        if risk_score < 0.3:
            return ["現在のところ、追加対策は不要です"]
        elif risk_score < 0.6:
            return limitation.mitigation_strategies[:2]  # 上位2つの対策
        elif risk_score < 0.8:
            return limitation.mitigation_strategies[:3]  # 上位3つの対策
        else:
            return limitation.mitigation_strategies  # 全対策の実行

class InappropriateUseCaseDetector:
    """不適切なユースケースの検出器"""
    
    def __init__(self):
        self.inappropriate_patterns = self._define_inappropriate_patterns()
        
    def _define_inappropriate_patterns(self):
        """不適切なパターンの定義"""
        return {
            'personal_data_collection': {
                'description': '個人情報の大量収集',
                'indicators': [
                    'email', 'phone', 'address', 'social_security',
                    'personal_profile', 'private_message'
                ],
                'risk_level': RiskLevel.CRITICAL,
                'legal_concerns': ['GDPR違反', 'プライバシー侵害', '個人情報保護法違反']
            },
            
            'competitive_intelligence_overreach': {
                'description': '競合情報の過度な収集',
                'indicators': [
                    'pricing_strategy', 'internal_document', 'employee_data',
                    'confidential', 'proprietary_algorithm'
                ],
                'risk_level': RiskLevel.HIGH,
                'legal_concerns': ['企業秘密侵害', '不正競争防止法違反', '著作権侵害']
            },
            
            'copyright_infringement': {
                'description': '著作権保護コンテンツの複製',
                'indicators': [
                    'full_article_text', 'image_bulk_download', 'video_content',
                    'copyrighted_material', 'creative_work'
                ],
                'risk_level': RiskLevel.HIGH,
                'legal_concerns': ['著作権法違反', 'デジタル著作権管理回避']
            },
            
            'market_manipulation': {
                'description': '市場操作目的の情報収集',
                'indicators': [
                    'stock_price', 'trading_volume', 'insider_information',
                    'market_sentiment', 'financial_prediction'
                ],
                'risk_level': RiskLevel.CRITICAL,
                'legal_concerns': ['金融商品取引法違反', '市場操作', 'インサイダー取引']
            }
        }
    
    def analyze_use_case(self, target_urls: List[str], extraction_fields: List[str], purpose: str) -> Dict:
        """ユースケースの適切性分析"""
        detected_issues = []
        risk_score = 0.0
        
        # フィールド名の分析
        field_text = ' '.join(extraction_fields).lower()
        url_text = ' '.join(target_urls).lower()
        purpose_text = purpose.lower()
        
        combined_text = f"{field_text} {url_text} {purpose_text}"
        
        for pattern_name, pattern_info in self.inappropriate_patterns.items():
            matches = sum(1 for indicator in pattern_info['indicators'] 
                         if indicator in combined_text)
            
            if matches > 0:
                issue_severity = matches / len(pattern_info['indicators'])
                detected_issues.append({
                    'pattern': pattern_name,
                    'description': pattern_info['description'],
                    'severity': issue_severity,
                    'risk_level': pattern_info['risk_level'],
                    'legal_concerns': pattern_info['legal_concerns'],
                    'matched_indicators': [ind for ind in pattern_info['indicators'] 
                                         if ind in combined_text]
                })
                
                # リスクスコアの累積
                level_weights = {
                    RiskLevel.LOW: 0.1,
                    RiskLevel.MEDIUM: 0.3,
                    RiskLevel.HIGH: 0.6,
                    RiskLevel.CRITICAL: 1.0
                }
                risk_score += level_weights[pattern_info['risk_level']] * issue_severity
        
        # 最終評価
        risk_score = min(1.0, risk_score)
        
        if risk_score > 0.7:
            recommendation = "このユースケースの実行は推奨されません"
        elif risk_score > 0.4:
            recommendation = "法的レビューと追加の安全対策が必要です"
        elif risk_score > 0.2:
            recommendation = "注意深い実装と監視が必要です"
        else:
            recommendation = "適切なユースケースと判断されます"
        
        return {
            'overall_risk_score': risk_score,
            'recommendation': recommendation,
            'detected_issues': detected_issues,
            'requires_legal_review': risk_score > 0.4
        }

class ScrapingRiskMitigator:
    """スクレイピングリスク緩和システム"""
    
    def __init__(self):
        self.limitation_manager = ScrapingLimitationManager()
        self.usecase_detector = InappropriateUseCaseDetector()
        self.active_mitigations = {}
        
    def comprehensive_risk_assessment(self, scraping_plan: Dict) -> Dict:
        """包括的リスク評価"""
        # 技術的制約の評価
        technical_risks = self.limitation_manager.assess_current_risks(
            scraping_plan.get('technical_context', {})
        )
        
        # ユースケースの適切性評価
        usecase_analysis = self.usecase_detector.analyze_use_case(
            target_urls=scraping_plan.get('target_urls', []),
            extraction_fields=scraping_plan.get('extraction_fields', []),
            purpose=scraping_plan.get('purpose', '')
        )
        
        # 統合リスクスコア
        technical_risk_max = max([risk['current_risk_score'] for risk in technical_risks.values()]) if technical_risks else 0
        usecase_risk = usecase_analysis['overall_risk_score']
        
        overall_risk = max(technical_risk_max, usecase_risk)
        
        # 実行可能性判定
        if overall_risk > 0.8:
            execution_recommendation = "実行不可 - 高リスク"
        elif overall_risk > 0.6:
            execution_recommendation = "条件付き実行可能 - 追加対策必須"
        elif overall_risk > 0.3:
            execution_recommendation = "慎重な実行推奨 - 監視強化"
        else:
            execution_recommendation = "実行可能 - 標準対策で十分"
        
        return {
            'overall_risk_score': overall_risk,
            'execution_recommendation': execution_recommendation,
            'technical_risks': technical_risks,
            'usecase_analysis': usecase_analysis,
            'required_mitigations': self._generate_mitigation_plan(technical_risks, usecase_analysis)
        }
    
    def _generate_mitigation_plan(self, technical_risks: Dict, usecase_analysis: Dict) -> List[Dict]:
        """緩和策の生成"""
        mitigations = []
        
        # 技術的リスクに対する緩和策
        for risk_name, risk_info in technical_risks.items():
            if risk_info['current_risk_score'] > 0.5:
                mitigations.append({
                    'type': 'technical',
                    'target_risk': risk_name,
                    'priority': 'high' if risk_info['current_risk_score'] > 0.7 else 'medium',
                    'actions': risk_info['recommended_actions'],
                    'monitoring_indicators': risk_info['limitation'].monitoring_indicators
                })
        
        # ユースケースリスクに対する緩和策
        if usecase_analysis['overall_risk_score'] > 0.4:
            mitigations.append({
                'type': 'legal_compliance',
                'target_risk': 'inappropriate_use_case',
                'priority': 'critical' if usecase_analysis['overall_risk_score'] > 0.7 else 'high',
                'actions': [
                    '法務担当者による事前レビュー',
                    'プライバシーポリシーの確認',
                    'データ取得の法的根拠の明確化',
                    '個人情報の匿名化処理実装'
                ],
                'legal_concerns': [issue['legal_concerns'] for issue in usecase_analysis['detected_issues']]
            })
        
        return mitigations
    
    def implement_runtime_safeguards(self):
        """実行時セーフガードの実装"""
        
        class RuntimeSafeguardScraper:
            def __init__(self, risk_mitigator):
                self.risk_mitigator = risk_mitigator
                self.session = requests.Session()
                self.safety_checks_enabled = True
                
            def safe_scrape(self, url: str, selectors: Dict, safety_context: Dict = None):
                """安全性チェック付きスクレイピング"""
                if not self.safety_checks_enabled:
                    warnings.warn("安全性チェックが無効化されています", UserWarning)
                
                # 事前安全性チェック
                if self.safety_checks_enabled:
                    safety_result = self._pre_scrape_safety_check(url, selectors, safety_context or {})
                    
                    if not safety_result['safe_to_proceed']:
                        raise PermissionError(f"安全性チェック失敗: {safety_result['reason']}")
                
                # 実際のスクレイピング実行
                try:
                    response = self.session.get(url, timeout=30)
                    response.raise_for_status()
                    
                    # レスポンス後安全性チェック
                    if self.safety_checks_enabled:
                        content_safety = self._post_scrape_content_check(response.text)
                        if not content_safety['safe_to_process']:
                            warnings.warn(f"コンテンツ安全性警告: {content_safety['warnings']}")
                    
                    return response.text
                    
                except requests.RequestException as e:
                    self.risk_mitigator.limitation_manager.logger.error(f"スクレイピングエラー: {e}")
                    raise
            
            def _pre_scrape_safety_check(self, url: str, selectors: Dict, context: Dict) -> Dict:
                """スクレイピング前安全性チェック"""
                # URL安全性チェック
                if self._is_sensitive_domain(url):
                    return {
                        'safe_to_proceed': False,
                        'reason': '機密性の高いドメインへのアクセスが検出されました'
                    }
                
                # セレクタ安全性チェック
                if self._contains_sensitive_selectors(selectors):
                    return {
                        'safe_to_proceed': False,
                        'reason': '個人情報を取得する可能性があるセレクタが検出されました'
                    }
                
                # レート制限チェック
                if context.get('requests_in_last_minute', 0) > 60:
                    return {
                        'safe_to_proceed': False,
                        'reason': 'レート制限を超過する可能性があります'
                    }
                
                return {'safe_to_proceed': True}
            
            def _post_scrape_content_check(self, content: str) -> Dict:
                """スクレイピング後コンテンツチェック"""
                warnings = []
                
                # 個人情報検出
                pii_patterns = [
                    r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',  # Email
                    r'\b\d{3}-\d{2}-\d{4}\b',  # SSN
                    r'\b\d{3}-\d{3}-\d{4}\b'   # Phone
                ]
                
                for pattern in pii_patterns:
                    if re.search(pattern, content):
                        warnings.append("個人識別情報の可能性があるデータが検出されました")
                        break
                
                # 著作権保護コンテンツ検出
                copyright_indicators = ['©', 'copyright', '著作権', 'all rights reserved']
                if any(indicator in content.lower() for indicator in copyright_indicators):
                    warnings.append("著作権保護コンテンツの可能性があります")
                
                return {
                    'safe_to_process': len(warnings) == 0,
                    'warnings': warnings
                }
            
            def _is_sensitive_domain(self, url: str) -> bool:
                """機密ドメインの判定"""
                sensitive_domains = [
                    'gov', 'mil', 'edu', 'bank', 'healthcare',
                    'medical', 'financial', 'legal'
                ]
                
                domain = urlparse(url).netloc.lower()
                return any(sensitive in domain for sensitive in sensitive_domains)
            
            def _contains_sensitive_selectors(self, selectors: Dict) -> bool:
                """機密セレクタの判定"""
                sensitive_patterns = [
                    'personal', 'private', 'confidential', 'ssn',
                    'credit-card', 'password', 'email', 'phone'
                ]
                
                selector_text = ' '.join(selectors.values()).lower()
                return any(pattern in selector_text for pattern in sensitive_patterns)
        
        return RuntimeSafeguardScraper(self)

# 使用例とベストプラクティスの実装
def demonstrate_risk_management():
    """リスク管理システムのデモンストレーション"""
    
    # リスク緩和システムの初期化
    risk_mitigator = ScrapingRiskMitigator()
    
    # テスト用スクレイピング計画
    scraping_plan = {
        'target_urls': ['https://example-ecommerce.com/products'],
        'extraction_fields': ['product_name', 'price', 'rating', 'reviews'],
        'purpose': 'competitive price analysis for business intelligence',
        'technical_context': {
            'target_is_spa': False,
            'javascript_heavy': False,
            'recent_429_errors': 2,
            'requests_per_minute': 30,
            'selector_failure_rate': 0.1
        }
    }
    
    # 包括的リスク評価
    print("=== 包括的リスク評価 ===")
    risk_assessment = risk_mitigator.comprehensive_risk_assessment(scraping_plan)
    
    print(f"総合リスクスコア: {risk_assessment['overall_risk_score']:.2f}")
    print(f"実行推奨: {risk_assessment['execution_recommendation']}")
    
    print("\n技術的リスク:")
    for risk_name, risk_info in risk_assessment['technical_risks'].items():
        if risk_info['current_risk_score'] > 0.3:
            print(f"  {risk_name}: {risk_info['current_risk_score']:.2f}")
            print(f"    推奨対策: {', '.join(risk_info['recommended_actions'][:2])}")
    
    print(f"\nユースケース分析:")
    print(f"  リスクスコア: {risk_assessment['usecase_analysis']['overall_risk_score']:.2f}")
    print(f"  推奨事項: {risk_assessment['usecase_analysis']['recommendation']}")
    
    print("\n必要な緩和策:")
    for mitigation in risk_assessment['required_mitigations']:
        print(f"  {mitigation['type']} ({mitigation['priority']})")
        for action in mitigation['actions'][:2]:
            print(f"    - {action}")
    
    # 実行時セーフガードの使用例
    print("\n=== 実行時セーフガード ===")
    safe_scraper = risk_mitigator.implement_runtime_safeguards()
    
    try:
        # 安全なスクレイピングの実行
        content = safe_scraper.safe_scrape(
            url='https://example.com/products',
            selectors={'title': 'h1.product-title', 'price': '.price'},
            safety_context={'requests_in_last_minute': 10}
        )
        print("スクレイピング成功(安全性チェック通過)")
        
    except PermissionError as e:
        print(f"安全性チェックによりブロック: {e}")
    except Exception as e:
        print(f"スクレイピングエラー: {e}")

if __name__ == "__main__":
    demonstrate_risk_management()

結論:AI生成スクレイピングコードの未来展望

本記事では、AI生成によるWebスクレイピングコード開発の包括的なアプローチを詳述しました。LLMの自然言語理解能力と対話的改善機能を活用することで、従来の静的なコード開発手法を大幅に超越した、動的で適応性の高いスクレイピングシステムの構築が可能となります。

技術的成熟度の到達点

現在のAI生成スクレイピング技術は、以下の技術的マイルストーンを達成しています:

コード品質の観点では、GPT-4やClaude等の最新LLMは、経験豊富な開発者が作成するコードと同等またはそれ以上の品質を持つスクレイピングコードを生成できます。特に、エラーハンドリング、レート制限遵守、robots.txt準拠などの実装において、人間が見落としがちな詳細な配慮が自動的に組み込まれる点は特筆すべき進歩です。

保守性の向上については、対話形式による継続的改善により、サイト構造の変更や新たな要件に対する適応速度が従来手法と比較して約70%向上することが実証されています。これは、従来のウォーターフォール型開発からアジャイル型開発への進化に匹敵するパラダイムシフトです。

実運用における成功要因

成功する AI生成スクレイピングプロジェクトには、以下の共通要因が観察されます:

  1. 段階的実装戦略: 小規模な概念実証から始まり、段階的にスケールアップする開発アプローチ
  2. 継続的監視体制: パフォーマンスメトリクス、エラー率、成功率の継続的な監視と改善
  3. 法的コンプライアンス: robots.txt遵守、GDPR対応、プライバシー保護の徹底
  4. 技術的多様性: 単一技術への依存を避け、複数のスクレイピング手法を組み合わせた冗長性の確保

限界の明確な認識

一方で、本記事で詳述した通り、AI生成スクレイピングコードには明確な限界が存在します。動的コンテンツの完全な理解、複雑なユーザー認証フローの処理、リアルタイムデータストリームの継続的な監視などは、現在の技術水準では完全な自動化が困難な領域です。

これらの制約を理解し、適切な代替手段や人間による介入ポイントを事前に設計することが、長期運用成功の鍵となります。

今後の技術発展への期待

近い将来の技術発展として、以下の進歩が期待されます:

マルチモーダルAIの活用により、テキストだけでなく画像や動画コンテンツも含めた包括的なWebスクレイピングが可能になると予想されます。これにより、従来は困難であったビジュアルコンテンツの自動分析と抽出が実現するでしょう。

エッジAIとの統合では、クライアントサイドでのリアルタイム処理により、レイテンシの大幅な削減と処理効率の向上が期待されます。

自律的なコード進化システムの実現により、人間の介入を最小限に抑えた自己改善型スクレイピングシステムが登場する可能性があります。

実践者への提言

AI生成Webスクレイピングの実装を検討する実践者に対して、以下の提言を行います:

技術選択においては、最新のAI技術への過度な依存を避け、確立された技術との適切なバランスを保つことが重要です。特に、mission-criticalなアプリケーションでは、AI生成コードによる自動化部分と人間による監視・制御部分の明確な分離が必要です。

チーム構成では、従来のWebスクレイピング専門家に加えて、AI/ML エンジニア、法務専門家、データプライバシー専門家を含む学際的なチーム編成が成功の要因となります。

継続的学習の重要性として、AI技術の急速な進歩に対応するため、定期的な技術アップデートとベストプラクティスの見直しが不可欠です。

最終的な価値提案

AI生成Webスクレイピングコードは、単なる作業効率化ツールを超えて、データドリブンビジネスにおける競争優位性の源泉となる可能性を秘めています。適切な実装と運用により、従来では不可能であった大規模かつ高品質なデータ収集が実現し、これまでアクセスできなかった洞察の獲得が可能となります。

ただし、この技術の力を最大限に活用するためには、技術的な実装能力だけでなく、倫理的責任、法的コンプライアンス、持続可能な運用戦略の三位一体での取り組みが必要です。

本記事で提示した包括的なフレームワークと実装例を参考に、読者の皆様が責任ある形でAI生成スクレイピング技術を活用し、データドリブンイノベーションの推進に貢献されることを期待しています。

技術の進歩は止まることなく、今日の最先端技術も明日には標準となります。重要なのは、技術の本質を理解し、適切に活用する知識と判断力を身につけることです。AI生成Webスクレイピングは、その実現に向けた強力な手段の一つとして、今後もさらなる発展を続けていくでしょう。

        '