# **Part 3: Enterprise Web Scraping – Building Scalable, Compliant, and Future-Proof Data Extraction Systems** **Duration:** ~60 minutes **Hashtags:** #EnterpriseScraping #DataEngineering #ScrapyCluster #MachineLearning #RealTimeData #Compliance #WebScraping #BigData #CloudScraping #DataMonetization --- ## **Table of Contents** 1. [Distributed Scraping Architectures: Beyond Single-Node Processing](#distributed-scraping) 2. [Building Custom Proxy Networks: Infrastructure and Management](#custom-proxy-networks) 3. [Machine Learning for Data Extraction: Pattern Recognition and Adaptive Scraping](#ml-data-extraction) 4. [Real-Time Data Pipelines: Streaming, Processing, and Visualization](#real-time-pipelines) 5. [Enterprise Legal Compliance Frameworks: GDPR, CCPA, and Global Regulations](#legal-compliance) 6. [Monetizing Scraped Data: Business Models and Value Extraction](#data-monetization) 7. [Future-Proofing Against Anti-Scraping Technology: Strategic Defense](#future-proofing) 8. [Case Study: Building an Enterprise Scraping Platform from Scratch](#enterprise-case-study) 9. [Team Organization and Workflow for Professional Scraping Operations](#team-organization) 10. [Cost Analysis and ROI Calculation for Scraping Projects](#cost-analysis) 11. [Quiz: Enterprise Scraping Mastery](#enterprise-quiz) 12. [Conclusion and What's Next](#part3-conclusion) --- ## **1. Distributed Scraping Architectures: Beyond Single-Node Processing** 🌐 Enterprise-scale scraping requires moving beyond single-machine solutions. Let's explore architectures that handle millions of pages daily while maintaining reliability and compliance. ### **The Scaling Challenge** When scraping at enterprise scale, you'll face: - **IP blocking** at unprecedented rates - **Data volume** that exceeds local storage - **Processing bottlenecks** from complex extraction - **Monitoring complexity** across multiple data sources - **Compliance requirements** that vary by jurisdiction ### **Architecture Patterns for Distributed Scraping** #### **Pattern 1: Master-Worker Architecture** ``` [Master Node] → Coordinates tasks ↓ [Worker Nodes] → Execute scraping jobs ↓ [Data Store] → Centralized storage ``` **Implementation with Scrapy Cluster:** ```python # Master node (coordinator) import redis from kafka import KafkaProducer r = redis.Redis(host='redis-master', port=6379) producer = KafkaProducer(bootstrap_servers='kafka:9092') def distribute_urls(urls): for url in urls: # Determine appropriate worker group domain = extract_domain(url) group = get_worker_group(domain) # Send to Redis queue r.rpush(f'queue:{group}', url) # Also send to Kafka for monitoring producer.send('scraping-jobs', key=domain.encode(), value=url.encode()) # Worker node (scrapy cluster worker) import scrapy from scrapy_redis.spiders import RedisSpider class EnterpriseSpider(RedisSpider): name = 'enterprise' redis_key = 'queue:general' def parse(self, response): # Complex parsing logic data = self.extract_data(response) # Send to processing pipeline yield { 'url': response.url, 'data': data, 'timestamp': datetime.utcnow().isoformat() } ``` **Advantages:** - Simple to implement - Good for homogeneous scraping tasks - Easy to scale workers horizontally **Limitations:** - Single point of failure at master node - Network overhead for task distribution - Limited domain-specific optimization #### **Pattern 2: Domain-Specific Clusters** ``` [API Gateway] ↓ [Domain Router] → Routes to appropriate cluster ↓ [Amazon Cluster] → Specialized for Amazon [Ebay Cluster] → Specialized for Ebay [News Cluster] → Specialized for news sites ↓ [Unified Data Store] ``` **Implementation with Kubernetes:** ```yaml # amazon-scraper-deployment.yaml apiVersion: apps/v1 kind: Deployment metadata: name: amazon-scraper spec: replicas: 10 selector: matchLabels: app: amazon-scraper template: metadata: labels: app: amazon-scraper spec: containers: - name: scraper image: enterprise-scraper:latest env: - name: TARGET_DOMAIN value: "amazon.com" - name: PROXY_POOL value: "amazon-proxies" - name: HEADLESS value: "true" resources: limits: memory: "1Gi" cpu: "500m" ``` **Specialization Strategies:** - **Amazon Cluster:** Optimized for product pages, handles CAPTCHAs, respects Amazon's robots.txt - **News Cluster:** Handles RSS feeds, article extraction, handles paywalls - **Social Media Cluster:** Specialized for API reverse engineering, handles OAuth flows #### **Pattern 3: Hybrid Cloud-Edge Architecture** ``` [Edge Nodes] → Global locations, handle initial requests ↓ [Cloud Processing] → Centralized data transformation ↓ [AI Analysis Layer] → Business intelligence ``` **Implementation with AWS:** ```python # Edge node (Lambda@Edge) def lambda_handler(event, context): request = event['Records'][0]['cf']['request'] domain = request['headers']['host'][0]['value'] # Route to appropriate processing if domain in NEWS_SITES: return process_news(request) elif domain in ECOMMERCE_SITES: return process_ecommerce(request) # Default processing return request # Cloud processing (AWS Glue) class DataTransformer(GlueJob): def transform(self, df): # Apply domain-specific transformations df = df.filter(col("domain").isin(VALID_DOMAINS)) # Clean data based on domain rules df = df.withColumn("cleaned_data", when(col("domain") == "amazon.com", clean_amazon(col("raw"))) .when(col("domain") == "ebay.com", clean_ebay(col("raw"))) .otherwise(col("raw"))) return df ``` **Benefits:** - Reduced latency (processing near data source) - Better IP diversity (global edge locations) - Compliance with regional data laws ### **Load Balancing Strategies for Scraping Farms** #### **1. Domain-Based Load Balancing** ```python def get_worker_for_domain(domain): # Hash domain to consistent worker group hash_val = hash(domain) % 100 if hash_val < 30: return "group-a" elif hash_val < 70: return "group-b" else: return "group-c" ``` #### **2. Capacity-Based Load Balancing** ```python def get_available_worker(): # Check worker capacity metrics workers = get_worker_status() # Sort by available capacity workers.sort(key=lambda w: w['available_capacity'], reverse=True) # Return most available worker return workers[0]['id'] ``` #### **3. Compliance-Aware Routing** ```python def route_request(url, user_data): domain = extract_domain(url) # Check GDPR compliance requirements if user_data['region'] == 'EU' and domain in GDPR_RESTRICTED: # Route to EU-compliant cluster return "eu-cluster" # Check CCPA requirements if user_data['region'] == 'CA' and domain in CCPA_RESTRICTED: return "ca-cluster" return "default-cluster" ``` ### **Failure Handling at Scale** #### **Dead Letter Queues for Failed Requests** ```python def process_request(request): try: # Attempt to process result = scrape_page(request['url']) return result except CAPTCHAException: # Route to CAPTCHA-solving service send_to_captcha_queue(request) except ProxyBlockedException: # Mark proxy as bad mark_proxy_unavailable(request['proxy']) # Retry with new proxy request['proxy'] = get_new_proxy() retry_request(request) except Exception as e: # Log detailed error log_error(request, e) # Send to dead letter queue send_to_dlq(request, str(e)) ``` #### **Automated Failure Analysis** ```python def analyze_failures(): # Get recent failures from DLQ failures = get_dlq_messages(last_hour=True) # Group by failure type failure_types = {} for f in failures: f_type = determine_failure_type(f) failure_types[f_type] = failure_types.get(f_type, 0) + 1 # Identify patterns if failure_types.get('CAPTCHA', 0) > 100: trigger_captcha_strategy_update() if failure_types.get('403', 0) > 50: rotate_ip_blocks() if failure_types.get('timeout', 0) > 200: increase_timeout_thresholds() ``` --- ## **2. Building Custom Proxy Networks: Infrastructure and Management** 🌍 Commercial proxy services work for small-scale operations, but enterprises need custom solutions for cost efficiency and control. ### **Why Build Your Own Proxy Network?** | Benefit | Commercial Service | Custom Network | |---------|--------------------|----------------| | **Cost** | $10-15/GB | $0.50-2/GB | | **Control** | Limited | Complete | | **IP Diversity** | Good | Excellent | | **Customization** | Minimal | Full | | **Compliance** | Varies | Guaranteed | | **Scalability** | Bounded | Unlimited | ### **Proxy Network Architecture** ``` [Residential Nodes] → Home computers with opt-in software ↓ [Datacenter Transit] → Aggregation and filtering ↓ [API Gateway] → Authentication and rate limiting ↓ [Scraping Clients] → Your scraping infrastructure ``` ### **Residential Node Implementation** #### **Client Software (Node.js Example)** ```javascript // proxy-node.js const http = require('http'); const socks = require('socks'); const { checkCompliance } = require('./compliance'); // Configuration from central server const CONFIG = { server: 'proxy-manager.example.com', node_id: generateNodeId(), allowed_domains: ['example.com', 'scraping-targets.com'] }; // Register with management server async function register() { const response = await fetch(`${CONFIG.server}/register`, { method: 'POST', body: JSON.stringify({ node_id: CONFIG.node_id, ip: getPublicIp(), location: getGeolocation(), bandwidth: getAvailableBandwidth() }) }); const data = await response.json(); CONFIG.allowed_domains = data.allowed_domains; } // Handle proxy requests const server = http.createServer((req, res) => { // Check compliance if (!checkCompliance(req, CONFIG)) { res.writeHead(403); res.end('Compliance violation'); return; } // Forward request const target = new URL(req.url); socks.createConnection({ proxy: { ipaddress: target.hostname, port: target.port || 80, type: 5 }, destination: { host: target.hostname, port: target.port || 80 } }, (err, socket, info) => { if (err) { res.writeHead(500); res.end('Proxy error'); return; } // Pipe data req.pipe(socket); socket.pipe(res); }); }); server.listen(1080, () => { console.log('Proxy node running on port 1080'); register(); }); ``` #### **Compliance Checking Module** ```javascript // compliance.js const { GDPR_REGIONS, CCPA_REGIONS } = require('./regions'); function checkCompliance(request, config) { // Get client location const clientIp = request.connection.remoteAddress; const clientLocation = geolocateIp(clientIp); // Check robots.txt compliance const targetDomain = new URL(request.url).hostname; if (!config.allowed_domains.includes(targetDomain)) { logViolation('DOMAIN_NOT_ALLOWED', clientIp, targetDomain); return false; } // Check GDPR compliance if (GDPR_REGIONS.includes(clientLocation.country) && isPersonalDataRequest(request)) { if (!hasUserConsent(request)) { logViolation('GDPR_CONSENT_MISSING', clientIp, targetDomain); return false; } } // Check request rate if (exceedsRateLimit(clientIp)) { logViolation('RATE_LIMIT_EXCEEDED', clientIp, targetDomain); return false; } return true; } ``` ### **Datacenter Transit Layer** #### **Aggregation Server (Python Example)** ```python from flask import Flask, request import redis import requests app = Flask(__name__) r = redis.Redis(host='redis', port=6379) @app.route('/proxy', methods=['GET', 'POST']) def proxy(): # Authentication api_key = request.headers.get('X-API-Key') if not validate_api_key(api_key): return {'error': 'Unauthorized'}, 401 # Rate limiting if exceeds_rate_limit(api_key): return {'error': 'Rate limit exceeded'}, 429 # Get target URL target_url = request.args.get('url') if not target_url: return {'error': 'URL required'}, 400 # Select residential node node = select_residential_node(target_url, api_key) if not node: return {'error': 'No available nodes'}, 503 # Forward request try: response = requests.request( method=request.method, url=f"http://{node['ip']}:1080/{target_url}", headers={k: v for k, v in request.headers if k != 'Host'}, data=request.get_data(), timeout=30 ) # Record metrics record_metrics(api_key, node, response) # Return response return (response.content, response.status_code, dict(response.headers)) except Exception as e: mark_node_unavailable(node) return {'error': str(e)}, 500 def select_residential_node(url, api_key): """Smart node selection based on multiple factors""" domain = extract_domain(url) client_region = get_client_region(api_key) # Get available nodes for this domain nodes = r.smembers(f"nodes:{domain}") # Filter by region if required if client_region == 'EU': nodes = [n for n in nodes if n['region'] in EU_REGIONS] # Sort by performance metrics nodes.sort(key=lambda n: ( n['success_rate'], -n['response_time'], n['availability'] ), reverse=True) return nodes[0] if nodes else None ``` ### **Network Management System** #### **Real-Time Monitoring Dashboard** ```python # monitoring.py import dash from dash import dcc, html import plotly.express as px import pandas as pd from flask_sqlalchemy import SQLAlchemy app = dash.Dash(__name__) server = app.server db = SQLAlchemy(server) class ProxyMetrics(db.Model): id = db.Column(db.Integer, primary_key=True) timestamp = db.Column(db.DateTime, default=datetime.utcnow) node_id = db.Column(db.String(50)) domain = db.Column(db.String(100)) success_rate = db.Column(db.Float) response_time = db.Column(db.Float) request_count = db.Column(db.Integer) app.layout = html.Div([ html.H1("Proxy Network Dashboard"), dcc.DatePickerRange( id='date-range', start_date=datetime.today() - timedelta(days=7), end_date=datetime.today() ), dcc.Tabs([ dcc.Tab(label='Network Health', children=[ dcc.Graph(id='health-metrics'), dcc.Interval( id='health-interval', interval=60*1000, # Refresh every minute n_intervals=0 ) ]), dcc.Tab(label='Node Performance', children=[ dcc.Dropdown( id='node-selector', options=[{'label': n, 'value': n} for n in get_all_nodes()], multi=True ), dcc.Graph(id='node-performance') ]), dcc.Tab(label='Compliance Monitoring', children=[ dcc.Graph(id='compliance-chart') ]) ]) ]) @app.callback( Output('health-metrics', 'figure'), Input('health-interval', 'n_intervals'), Input('date-range', 'start_date'), Input('date-range', 'end_date') ) def update_health_metrics(n, start, end): # Query metrics from database metrics = ProxyMetrics.query.filter( ProxyMetrics.timestamp.between(start, end) ).all() df = pd.DataFrame([{ 'timestamp': m.timestamp, 'success_rate': m.success_rate, 'response_time': m.response_time, 'request_count': m.request_count } for m in metrics]) # Create visualization fig = px.line(df, x='timestamp', y=['success_rate', 'response_time']) fig.update_layout(title='Network Health Metrics') return fig ``` ### **Compliance and Ethical Considerations** #### **Residential Node Ethics Policy** When building a residential proxy network, implement: 1. **Explicit Consent Framework** - Clear opt-in with understandable terms - No hidden resource usage - Easy opt-out mechanism 2. **Bandwidth Management** - Limit to 20% of available bandwidth - Throttle during peak usage hours - Respect user's quality of service 3. **Data Privacy Protections** - Never store request content - Encrypt all traffic - Anonymize node identifiers 4. **Geographic Restrictions** - Block sensitive regions by default - Comply with local laws - Implement regional filtering #### **Compliance Dashboard Metrics** Track and report: - % of requests compliant with robots.txt - GDPR/CCPA compliance rate - Personal data handling incidents - User consent rate - Data retention compliance --- ## **3. Machine Learning for Data Extraction: Pattern Recognition and Adaptive Scraping** 🤖 Traditional scraping breaks when sites change. Machine learning creates resilient, adaptive scrapers. ### **The Problem with Static Selectors** ```python # Fragile traditional approach price = soup.select_one('.product-price').text ``` When the site changes from `.product-price` to `.item-cost`, your scraper breaks. ### **ML-Powered Data Extraction Approaches** #### **1. Template Learning (Supervised Approach)** **Concept:** Train a model to recognize patterns in HTML for specific data points. **Implementation with Scrapy + ML:** ```python import scrapy import joblib from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.ensemble import RandomForestClassifier # Load pre-trained model model = joblib.load('price_extractor.pkl') vectorizer = joblib.load('price_vectorizer.pkl') class MLSpider(scrapy.Spider): name = 'ml_spider' def parse(self, response): soup = BeautifulSoup(response.text, 'html.parser') # Extract all text elements elements = [] for tag in soup.find_all(text=True): if tag.parent.name not in ['script', 'style']: elements.append({ 'text': tag.strip(), 'tag': tag.parent.name, 'class': ' '.join(tag.parent.get('class', [])), 'id': tag.parent.get('id', ''), 'path': get_css_path(tag.parent) }) # Predict which elements contain price X = vectorizer.transform([format_element(e) for e in elements]) predictions = model.predict(X) # Extract predicted prices prices = [] for i, is_price in enumerate(predictions): if is_price: prices.append(clean_price(elements[i]['text'])) yield { 'url': response.url, 'prices': prices } # Feature engineering function def format_element(element): """Convert HTML element to feature string""" features = [ f"TEXT:{element['text'][:100]}", f"TAG:{element['tag']}", f"CLASS:{element['class']}", f"ID:{element['id']}", f"PATH:{element['path'].split('>')[-3:]}" ] return " ".join(features) ``` #### **2. Visual Layout Analysis (Computer Vision Approach)** **Concept:** Use computer vision to understand page structure regardless of HTML changes. **Implementation with Playwright + CV:** ```python from playwright.sync_api import sync_playwright import cv2 import numpy as np from PIL import Image def extract_prices_visual(url): with sync_playwright() as p: browser = p.chromium.launch() page = browser.new_page() page.goto(url) # Take screenshot screenshot = page.screenshot() img = Image.open(io.BytesIO(screenshot)) img_cv = np.array(img) img_cv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2BGR) # Detect text regions gray = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY) blur = cv2.GaussianBlur(gray, (3,3), 0) thresh = cv2.threshold(blur, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)[1] # Find contours contours = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) contours = contours[0] if len(contours) == 2 else contours[1] # Filter potential price regions prices = [] for c in contours: area = cv2.contourArea(c) if 100 < area < 5000: # Reasonable text size x, y, w, h = cv2.boundingRect(c) roi = img_cv[y:y+h, x:x+w] # Extract text with OCR text = pytesseract.image_to_string(roi).strip() # Check if price-like if re.search(r'\$\d+(\.\d{1,2})?', text): prices.append(clean_price(text)) browser.close() return prices ``` #### **3. DOM Tree Analysis (Graph-Based Approach)** **Concept:** Treat HTML as a graph and find data based on structural patterns. **Implementation with NetworkX:** ```python import networkx as nx from bs4 import BeautifulSoup def build_dom_tree(html): """Convert HTML to graph structure""" soup = BeautifulSoup(html, 'html.parser') G = nx.DiGraph() def add_node(element, parent=None): node_id = id(element) attributes = { 'tag': element.name, 'text': element.get_text().strip()[:50], 'class': ' '.join(element.get('class', [])), 'id': element.get('id', '') } G.add_node(node_id, **attributes) if parent: G.add_edge(parent, node_id) for child in element.children: if child.name: add_node(child, node_id) add_node(soup.html) return G def find_prices(graph): """Find price nodes using graph patterns""" price_nodes = [] for node in graph.nodes: attrs = graph.nodes[node] # Pattern 1: Contains price-like text if re.search(r'\$\d+', attrs.get('text', '')): price_nodes.append(node) continue # Pattern 2: Has class indicating price if 'price' in attrs.get('class', '').lower(): price_nodes.append(node) continue # Pattern 3: Child of product container if any('product' in p.lower() for p in get_path_to_root(graph, node)): children = list(graph.successors(node)) for child in children: child_attrs = graph.nodes[child] if re.search(r'\$\d+', child_attrs.get('text', '')): price_nodes.append(child) # Deduplicate and clean unique_prices = set() for node in price_nodes: text = graph.nodes[node]['text'] cleaned = clean_price(text) if cleaned: unique_prices.add(cleaned) return list(unique_prices) ``` ### **Training Data Collection Strategies** #### **1. Active Learning Pipeline** ```python def active_learning_pipeline(): # Start with small labeled dataset labeled = load_initial_dataset() model = train_model(labeled) # Process unlabeled pages unlabeled = get_unlabeled_pages() predictions = model.predict(unlabeled) # Identify uncertain predictions uncertainties = model.predict_proba(unlabeled).max(axis=1) uncertain_indices = np.where(uncertainties < 0.7)[0] # Request human labeling for uncertain cases if len(uncertain_indices) > 0: pages_to_label = [unlabeled[i] for i in uncertain_indices[:10]] labeled_new = request_human_labels(pages_to_label) # Update dataset and retrain labeled = labeled + labeled_new model = train_model(labeled) return model ``` #### **2. Synthetic Data Generation** ```python def generate_synthetic_data(real_examples, count=1000): """Create variations of real examples to expand training set""" synthetic = [] for _ in range(count): # Select random real example example = random.choice(real_examples) # Apply realistic variations variations = { 'price': apply_price_variations(example['price']), 'html': apply_html_variations(example['html']) } # Create new training example synthetic.append({ 'html': variations['html'], 'target': { 'price': variations['price'] } }) return synthetic def apply_html_variations(html): """Apply realistic HTML changes while preserving meaning""" soup = BeautifulSoup(html, 'html.parser') # Randomly change class names (preserve semantic meaning) for element in soup.find_all(class_=True): classes = element['class'] new_classes = [] for cls in classes: if 'price' in cls.lower(): # Keep price-related semantics prefix = random.choice(['item', 'product', '']) suffix = random.choice(['cost', 'value', 'amount', '']) new_classes.append(f"{prefix}price{suffix}") else: # Randomize non-critical classes if random.random() > 0.7: new_classes.append(f"gen-{random_string(8)}") else: new_classes.append(cls) element['class'] = new_classes # Randomly add wrapper divs if random.random() > 0.5: target = random.choice(soup.find_all()) new_wrapper = soup.new_tag("div", **{ 'class': [f"container-{random_string(5)}"] }) target.wrap(new_wrapper) return str(soup) ``` ### **Continuous Learning System** #### **Automated Model Retraining Pipeline** ```python def retraining_pipeline(): # Check for site changes if detect_site_changes(): # Gather new samples new_samples = collect_new_samples() # Evaluate current model current_metrics = evaluate_model(new_samples) # Determine if retraining needed if current_metrics['accuracy'] < RETRAINING_THRESHOLD: print("Model accuracy dropped below threshold. Retraining...") # Combine with existing data all_data = load_training_data() + new_samples # Train new model new_model = train_model(all_data) # Validate new model validation_metrics = evaluate_model(new_samples, new_model) # Deploy if better if validation_metrics['accuracy'] > current_metrics['accuracy']: deploy_model(new_model) update_training_data(all_data) print("New model deployed successfully") else: print("New model performed worse. Keeping current model") # Regular scheduled retraining elif should_perform_scheduled_retrain(): print("Performing scheduled model retraining") all_data = load_training_data() new_model = train_model(all_data) deploy_model(new_model) def detect_site_changes(): """Detect structural changes in target sites""" # Compare current HTML structure with historical patterns current_patterns = extract_html_patterns(CURRENT_URLS) historical_patterns = load_historical_patterns() # Calculate structural similarity similarity = calculate_similarity(current_patterns, historical_patterns) # Flag significant changes return similarity < CHANGE_DETECTION_THRESHOLD ``` #### **Human-in-the-Loop Verification** ```python def verify_extraction(url, extracted_data): """Send uncertain extractions for human verification""" # Calculate confidence scores confidence = calculate_confidence(extracted_data) # Identify low-confidence items low_confidence = [ (field, value) for field, value in extracted_data.items() if confidence[field] < HUMAN_VERIFICATION_THRESHOLD ] if low_confidence: # Create verification task task = { 'url': url, 'fields': low_confidence, 'current_extraction': extracted_data, 'html_snapshot': get_html_snapshot(url) } # Send to verification queue send_to_verification_queue(task) # Return partially verified data return { field: value for field, value in extracted_data.items() if confidence[field] >= HUMAN_VERIFICATION_THRESHOLD } return extracted_data def process_verification_results(): """Incorporate human verification results into training data""" results = get_completed_verifications() for result in results: # Create training example example = { 'html': result['html_snapshot'], 'target': result['corrected_data'] } # Add to training dataset add_to_training_data(example) # Flag major discrepancies for immediate attention if has_significant_discrepancy(result): trigger_site_analysis(result['url']) ``` --- ## **4. Real-Time Data Pipelines: Streaming, Processing, and Visualization** ⚡ Enterprise scraping requires moving beyond batch processing to real-time data streams. ### **The Real-Time Data Imperative** Why real-time matters: - **Competitive intelligence:** React to price changes within minutes - **Market monitoring:** Detect trends as they emerge - **Fraud prevention:** Identify suspicious patterns immediately - **Customer experience:** Update product information instantly ### **Real-Time Architecture Overview** ``` [Scraping Nodes] → Raw data extraction ↓ [Message Queue] → Kafka/Pulsar/RabbitMQ ↓ [Stream Processor] → Flink/Spark Streaming ↓ [Real-Time Database] → Cassandra/Redis ↓ [API Layer] → GraphQL/REST ↓ [Applications] → Dashboards, Alerts, ML Models ``` ### **Building the Data Pipeline** #### **1. Message Queue Configuration (Apache Kafka)** ```python # kafka_producer.py from confluent_kafka import Producer import json class ScrapingProducer: def __init__(self): self.producer = Producer({ 'bootstrap.servers': 'kafka-broker1:9092,kafka-broker2:9092', 'acks': 'all', 'compression.type': 'snappy', 'batch.size': 65536, 'linger.ms': 5, 'enable.idempotence': True }) def delivery_report(self, err, msg): if err: logger.error(f'Message delivery failed: {err}') else: logger.debug(f'Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}') def send_scrape_event(self, domain, data): """Send scraped data to appropriate topic""" topic = f"scraped-data.{domain.replace('.', '-')}" # Serialize data value = json.dumps({ 'url': data['url'], 'content': data['content'], 'timestamp': datetime.utcnow().isoformat(), 'metadata': data.get('metadata', {}) }).encode('utf-8') # Send to Kafka self.producer.produce( topic, value=value, callback=self.delivery_report ) # Ensure delivery self.producer.poll(0) def flush(self): """Wait for all messages to be delivered""" self.producer.flush() ``` #### **2. Stream Processing (Apache Flink)** ```java // DataProcessingJob.java public class DataProcessingJob { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // Configure checkpointing for fault tolerance env.enableCheckpointing(60000); // Every minute env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000); // Source: scraped data from Kafka Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "kafka:9092"); properties.setProperty("group.id", "data-processor"); FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>( "scraped-data.*", new SimpleStringSchema(), properties ); DataStream<ScrapedEvent> stream = env .addSource(kafkaSource) .map(new ParseScrapedData()) .keyBy(event -> event.getDomain()) .process(new DomainSpecificProcessing()); // Sink: write to database stream .addSink(new CassandraSink()) .name("Cassandra Output"); // Sink: real-time alerts stream .filter(event -> event.getPriority() > 0.7) .addSink(new AlertSink()) .name("Alert System"); env.execute("Real-time Data Processing"); } // Parse raw JSON to domain objects public static class ParseScrapedData implements MapFunction<String, ScrapedEvent> { @Override public ScrapedEvent map(String value) throws Exception { JsonNode json = new ObjectMapper().readTree(value); return new ScrapedEvent( json.get("domain").asText(), json.get("url").asText(), json.get("content"), json.get("timestamp").asText() ); } } // Domain-specific business logic public static class DomainSpecificProcessing extends KeyedProcessFunction<String, ScrapedEvent, ProcessedEvent> { // State for tracking changes private ValueState<DomainState> domainState; @Override public void open(Configuration parameters) { domainState = getRuntimeContext().getState( new ValueStateDescriptor<>("domainState", DomainState.class) ); } @Override public void processElement( ScrapedEvent event, Context ctx, Collector<ProcessedEvent> out ) { // Get current state DomainState state = domainState.value(); if (state == null) { state = new DomainState(event.getDomain()); } // Process based on domain ProcessedEvent processed = null; switch (event.getDomain()) { case "amazon.com": processed = processAmazon(event, state); break; case "ebay.com": processed = processEbay(event, state); break; // Other domains... } // Update state state.updateWith(event); domainState.update(state); // Emit processed event if (processed != null) { out.collect(processed); } } private ProcessedEvent processAmazon(ScrapedEvent event, DomainState state) { // Extract Amazon-specific data JsonNode content = event.getContent(); String asin = content.path("asin").asText(); double price = content.path("price").asDouble(); // Detect price changes Double lastPrice = state.getLastPrice(asin); if (lastPrice != null && Math.abs(lastPrice - price) > lastPrice * 0.1) { // Significant price change (10%+) return new PriceChangeEvent( asin, lastPrice, price, event.getTimestamp() ); } return new ProductUpdateEvent( asin, price, content.path("title").asText(), event.getTimestamp() ); } } } ``` #### **3. Real-Time Database (Apache Cassandra)** ```sql -- Schema for real-time product data CREATE KEYSPACE real_time WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1': 3}; USE real_time; -- Product current state (fast lookup) CREATE TABLE product_current ( domain text, asin text, title text, price decimal, last_updated timestamp, PRIMARY KEY ((domain), asin) ) WITH CLUSTERING ORDER BY (asin ASC) AND compaction = {'class': 'TimeWindowCompactionStrategy'}; -- Product price history (for analytics) CREATE TABLE price_history ( domain text, asin text, timestamp timestamp, price decimal, PRIMARY KEY ((domain, asin), timestamp) ) WITH CLUSTERING ORDER BY (timestamp DESC) AND compaction = {'class': 'TimeWindowCompactionStrategy'} AND default_time_to_live = 2592000; -- 30 days -- Real-time alerts CREATE TABLE alerts ( alert_id timeuuid PRIMARY KEY, domain text, asin text, alert_type text, old_value text, new_value text, timestamp timestamp, severity int ) WITH compaction = {'class': 'TimeWindowCompactionStrategy'}; ``` #### **4. Real-Time API Layer (GraphQL)** ```javascript // server.js const { ApolloServer } = require('apollo-server'); const { Kafka } = require('kafkajs'); const { makeExecutableSchema } = require('@graphql-tools/schema'); // GraphQL schema const typeDefs = ` type Product { asin: String! title: String! price: Float! lastUpdated: String! } type PriceChange { asin: String! oldPrice: Float! newPrice: Float! timestamp: String! percentChange: Float! } type Query { product(domain: String!, asin: String!): Product priceChanges(domain: String, limit: Int = 100): [PriceChange!]! } type Subscription { priceChange(domain: String): PriceChange! productUpdate(domain: String, asin: String): Product! } `; // Resolvers const resolvers = { Query: { product: (_, { domain, asin }, { dataSources }) => dataSources.cassandra.getProduct(domain, asin), priceChanges: (_, { domain, limit }, { dataSources }) => dataSources.cassandra.getPriceChanges(domain, limit) }, Subscription: { priceChange: { subscribe: (_, { domain }, { dataSources }) => dataSources.kafka.subscribeToPriceChanges(domain) }, productUpdate: { subscribe: (_, { domain, asin }, { dataSources }) => dataSources.kafka.subscribeToProductUpdates(domain, asin) } } }; // Data sources class CassandraDataSource { async getProduct(domain, asin) { const result = await this.client.execute( `SELECT * FROM product_current WHERE domain = ? AND asin = ?`, [domain, asin], { prepare: true } ); return result.rows[0]; } async getPriceChanges(domain, limit) { const query = domain ? `SELECT * FROM alerts WHERE domain = ? AND alert_type = 'PRICE_CHANGE' LIMIT ?` : `SELECT * FROM alerts WHERE alert_type = 'PRICE_CHANGE' LIMIT ?`; const params = domain ? [domain, limit] : [limit]; const result = await this.client.execute(query, params, { prepare: true }); return result.rows; } } class KafkaDataSource { constructor() { this.kafka = new Kafka({ brokers: ['kafka:9092'] }); this.consumer = this.kafka.consumer({ groupId: 'graphql-api' }); this.consumer.connect(); } async subscribeToPriceChanges(domain) { const topic = domain ? `alerts.price.${domain}` : 'alerts.price.*'; await this.consumer.subscribe({ topic, fromBeginning: false }); return { [Symbol.asyncIterator]: () => ({ next: async () => { const message = await this.consumer.receive(); return { value: parseAlertMessage(message), done: false }; } }) }; } } // Start server const server = new ApolloServer({ typeDefs, resolvers, dataSources: () => ({ cassandra: new CassandraDataSource(), kafka: new KafkaDataSource() }) }); server.listen().then(({ url }) => { console.log(`🚀 Server ready at ${url}`); }); ``` ### **Real-Time Visualization Dashboard** #### **Streamlit Dashboard Example** ```python # dashboard.py import streamlit as st import pandas as pd import plotly.express as px from kafka import KafkaConsumer import json from datetime import datetime, timedelta st.set_page_config(layout="wide") st.title("Real-Time Scraping Dashboard") # Initialize Kafka consumer @st.cache_resource def get_consumer(): return KafkaConsumer( 'alerts.price.*', bootstrap_servers='kafka:9092', group_id='dashboard', auto_offset_reset='latest', value_deserializer=lambda m: json.loads(m.decode('utf-8')) ) consumer = get_consumer() # Sidebar filters st.sidebar.header("Filters") selected_domain = st.sidebar.selectbox( "Domain", ["All", "amazon.com", "ebay.com", "walmart.com"] ) time_window = st.sidebar.slider( "Time Window (minutes)", 1, 60, 10 ) # Main dashboard tab1, tab2, tab3 = st.tabs(["Live Feed", "Price Trends", "Alerts"]) with tab1: st.header("Live Price Changes") live_container = st.empty() # Collect data for time window price_changes = [] start_time = datetime.utcnow() while datetime.utcnow() < start_time + timedelta(minutes=time_window): msg = consumer.poll(timeout_ms=1000) if msg: for topic_partition, messages in msg.items(): for message in messages: if selected_domain == "All" or message.value['domain'] == selected_domain: price_changes.append(message.value) # Display latest changes if price_changes: df = pd.DataFrame(price_changes) df = df.tail(20) # Show last 20 changes for _, change in df.iterrows(): with live_container.container(): col1, col2, col3, col4 = st.columns([2,1,1,1]) col1.metric("Product", change['asin'], change['title']) col2.metric("Old Price", f"${change['oldPrice']:.2f}") col3.metric("New Price", f"${change['newPrice']:.2f}", f"{change['percentChange']:.1f}%") col4.write(f"Updated: {change['timestamp']}") time.sleep(1) # Refresh rate with tab2: st.header("Price Trends") # Get historical data from Cassandra # (Implementation would connect to Cassandra) historical_data = get_historical_data(selected_domain) if not historical_data.empty: fig = px.line(historical_data, x='timestamp', y='price', color='asin', title='Price Trends') st.plotly_chart(fig, use_container_width=True) else: st.info("No historical data available") with tab3: st.header("Alerts Summary") # Get alert statistics alert_stats = get_alert_statistics(selected_domain, time_window) col1, col2, col3 = st.columns(3) col1.metric("Total Alerts", alert_stats['total']) col2.metric("Major Changes (>10%)", alert_stats['major']) col3.metric("New Products", alert_stats['new']) # Alert table alerts_df = get_recent_alerts(selected_domain) st.dataframe(alerts_df) ``` ### **Performance Optimization Techniques** #### **1. Data Compression Strategies** ```python # Before sending to Kafka import zlib import msgpack def compress_data(data): """Compress data using optimal method based on content type""" if isinstance(data, dict) and 'html' in data: # For HTML content, use specialized compression return zlib.compress( msgpack.packb({ 'url': data['url'], 'content': html_compress(data['html']), 'ts': data['timestamp'] }) ) else: # General purpose compression return zlib.compress(json.dumps(data).encode('utf-8'), level=9) def html_compress(html): """Specialized HTML compression removing unnecessary whitespace""" # Remove comments html = re.sub(r'<!--.*?-->', '', html, flags=re.DOTALL) # Collapse whitespace html = re.sub(r'\s+', ' ', html) # Remove optional tags html = re.sub(r'</?html>|</?body>|</?head>', '', html) return html ``` #### **2. Schema Evolution Management** ```java // Avro schema management public class SchemaRegistry { private final SchemaRegistryClient client; public SchemaRegistry() { this.client = new CachedSchemaRegistryClient( "http://schema-registry:8081", 100 ); } public <T> byte[] serialize(T data, String subject) { try { // Get latest schema Schema schema = client.getLatestSchemaMetadata(subject).getSchemaObject(); // Create serializer DatumWriter<T> writer = new GenericDatumWriter<>(schema); Encoder encoder = EncoderFactory.get().binaryEncoder( new ByteArrayOutputStream(), null); // Serialize writer.write(data, encoder); encoder.flush(); return ((ByteArrayOutputStream)encoder.out).toByteArray(); } catch (Exception e) { throw new RuntimeException("Serialization failed", e); } } public <T> T deserialize(byte[] data, String subject, Class<T> clazz) { try { // Get writer schema SchemaIdVersion schemaIdVersion = client.getSchemaIdVersion(subject, data); Schema writerSchema = client.getSchemaById(schemaIdVersion.getId()); // Get reader schema (compatible version) Schema readerSchema = getCompatibleSchema(subject, writerSchema); // Deserialize DatumReader<T> reader = new GenericDatumReader<>(writerSchema, readerSchema); Decoder decoder = DecoderFactory.get().binaryDecoder(data, null); return reader.read(null, decoder); } catch (Exception e) { throw new RuntimeException("Deserialization failed", e); } } private Schema getCompatibleSchema(String subject, Schema writerSchema) { // Logic to find compatible reader schema // Handles schema evolution (backward/forward compatibility) // ... } } ``` #### **3. Backpressure Handling** ```python # Adaptive processing rate control class BackpressureController: def __init__(self, target_latency=1000, max_rate=1000): self.target_latency = target_latency # ms self.max_rate = max_rate # messages/sec self.current_rate = max_rate self.latency_history = deque(maxlen=60) self.last_adjustment = time.time() def update(self, current_latency): """Adjust processing rate based on current latency""" self.latency_history.append(current_latency) # Only adjust every 5 seconds if time.time() - self.last_adjustment < 5: return self.current_rate avg_latency = sum(self.latency_history) / len(self.latency_history) # Calculate adjustment factor if avg_latency > self.target_latency * 1.5: # Severe overload - reduce rate significantly factor = 0.5 elif avg_latency > self.target_latency: # Moderate overload factor = 0.8 elif avg_latency < self.target_latency * 0.7: # Underutilized - increase rate factor = 1.2 else: # Stable - minor adjustment factor = 0.95 + (self.target_latency / avg_latency) * 0.05 # Apply rate change new_rate = max(10, min(self.max_rate, self.current_rate * factor)) self.current_rate = new_rate self.last_adjustment = time.time() return new_rate # In stream processor backpressure = BackpressureController() def process_message(message): start_time = time.time() # Process message... latency = (time.time() - start_time) * 1000 # Update backpressure controller current_rate = backpressure.update(latency) # Adjust consumer rate if needed if current_rate < backpressure.max_rate * 0.7: consumer.pause() elif current_rate > backpressure.max_rate * 0.9: consumer.resume() ``` --- ## **5. Enterprise Legal Compliance Frameworks: GDPR, CCPA, and Global Regulations** ⚖️ Enterprise scraping requires sophisticated compliance systems, not just basic robots.txt checks. ### **Global Data Regulation Landscape** | Regulation | Regions | Key Requirements | Penalties | |------------|---------|------------------|-----------| | **GDPR** | EU, EEA | Consent, Right to Erasure, Data Minimization | Up to 4% global revenue | | **CCPA/CPRA** | California | Opt-out, Access Requests, Sensitive Data Limits | $2,500/violation | | **PIPEDA** | Canada | Purpose Limitation, Accountability | CAD $100k/failure | | **LGPD** | Brazil | Data Protection Officer, Impact Assessments | 2% revenue | | **PDPA** | Singapore | Consent, Do Not Track | SGD $1M | | **POPIA** | South Africa | Data Processing Records | 10M ZAR | ### **Compliance Architecture Framework** ``` [Scraping Nodes] → Raw data ↓ [Compliance Filter] → Block non-compliant data ↓ [Data Classification] → Identify sensitive data ↓ [Storage Policy Engine] → Apply retention rules ↓ [Access Control] → Enforce data permissions ↓ [Audit Trail] → Track all data operations ``` ### **Compliance Filter Implementation** #### **Domain Compliance Checker** ```python # compliance_filter.py import re from urllib.parse import urlparse from regulations import GDPR, CCPA COMPLIANCE_RULES = { 'gdpr': { 'regions': GDPR.EU_COUNTRIES, 'prohibited_patterns': [ r'email\s*[:=]\s*[\w\.-]+@[\w\.-]+', r'phone\s*[:=]\s*[\d\-\+\(\)\s]{7,}', r'social\s*security\s*number', # Add more patterns ], 'required_consent': True }, 'ccpa': { 'regions': ['US-CA'], 'prohibited_patterns': [ r'social\s*security\s*number', r'driver\s*license\s*number', r'financial\s*account\s*number', ], 'opt_out_required': True } # Additional regulations... } class ComplianceFilter: def __init__(self, user_region=None): self.user_region = user_region or self.detect_region() self.active_regulations = self.determine_applicable_regulations() def detect_region(self): """Detect user region based on IP or other signals""" # Implementation would use geolocation service return 'US-CA' # Example def determine_applicable_regulations(self): """Determine which regulations apply to this request""" applicable = [] for regulation, config in COMPLIANCE_RULES.items(): if any(region in self.user_region for region in config['regions']): applicable.append(regulation) return applicable def filter_content(self, url, content): """Filter content to remove non-compliant data""" domain = urlparse(url).netloc filtered_content = content for regulation in self.active_regulations: # Check robots.txt compliance if not self.check_robots_txt(domain, regulation): raise ComplianceException( f"Scraping disallowed by robots.txt for {regulation}" ) # Remove prohibited patterns for pattern in COMPLIANCE_RULES[regulation]['prohibited_patterns']: filtered_content = re.sub( pattern, '[REDACTED]', filtered_content, flags=re.IGNORECASE ) # Check consent requirements if (COMPLIANCE_RULES[regulation].get('required_consent') and not self.has_valid_consent(url)): raise ComplianceException( f"Missing consent for {regulation} compliance" ) return filtered_content def check_robots_txt(self, domain, regulation): """Check if scraping is allowed under robots.txt""" robots_url = f"https://{domain}/robots.txt" try: response = requests.get(robots_url, timeout=5) if response.status_code == 200: # Parse robots.txt rules = parse_robots_txt(response.text) # Check if our user-agent is allowed return rules.is_allowed( user_agent='EnterpriseScraper/1.0', path='/' ) return True # No robots.txt = allowed except: return True # Assume allowed if can't check def has_valid_consent(self, url): """Check if valid consent exists for this domain""" # Implementation would check consent database return True ``` #### **Data Classification Engine** ```python # data_classifier.py import re from collections import defaultdict class DataClassifier: def __init__(self): self.patterns = { 'PERSONAL_NAME': [ r'\b[A-Z][a-z]+ [A-Z][a-z]+\b', # Basic name pattern r'\b(Mr\.|Mrs\.|Ms\.) [A-Z][a-z]+ [A-Z][a-z]+\b' ], 'EMAIL': [ r'\b[\w\.-]+@[\w\.-]+\.\w+\b' ], 'PHONE': [ r'\b(\+\d{1,3}[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b' ], 'SSN': [ r'\b\d{3}[-\s]?\d{2}[-\s]?\d{4}\b' ], 'CREDIT_CARD': [ r'\b(?:\d[ -]*?){13,16}\b' ], 'LOCATION': [ r'\b\d{1,5} .+, .+, [A-Z]{2} \d{5}(?:-\d{4})?\b' ] } def classify(self, text): """Classify all data elements in text""" findings = defaultdict(list) for category, patterns in self.patterns.items(): for pattern in patterns: matches = re.finditer(pattern, text, re.IGNORECASE) for match in matches: findings[category].append({ 'text': match.group(), 'start': match.start(), 'end': match.end(), 'context': self.get_context(text, match) }) return findings def get_context(self, text, match, window=20): """Get surrounding context for a match""" start = max(0, match.start() - window) end = min(len(text), match.end() + window) return text[start:end].replace('\n', ' ') def redact(self, text, findings=None): """Redact classified data from text""" if findings is None: findings = self.classify(text) redacted = text # Sort findings by position to avoid index shifting all_findings = sorted( [(f['start'], f['end'], cat) for cat, items in findings.items() for f in items], key=lambda x: x[0], reverse=True ) for start, end, category in all_findings: replacement = f"[{category}]" redacted = redacted[:start] + replacement + redacted[end:] return redacted ``` ### **Data Retention and Deletion Systems** #### **Retention Policy Engine** ```python # retention_policy.py from datetime import datetime, timedelta RETENTION_POLICIES = { 'default': { 'retention_days': 365, 'min_retention': 30, 'max_retention': 365 }, 'gdpr': { 'retention_days': 30, 'min_retention': 7, 'max_retention': 365 }, 'ccpa': { 'retention_days': 180, 'min_retention': 30, 'max_retention': 730 }, 'financial': { 'retention_days': 1825, # 5 years 'min_retention': 1825, 'max_retention': 1825 } } class RetentionPolicyEngine: def __init__(self, regulation='default', data_type='default'): self.regulation = regulation self.data_type = data_type self.policy = self._get_policy() def _get_policy(self): """Get effective policy based on regulation and data type""" # Start with regulation-specific policy policy = RETENTION_POLICIES.get(self.regulation, RETENTION_POLICIES['default']) # Override with data-type specific policy if exists data_policy = RETENTION_POLICIES.get(self.data_type) if data_policy: return { 'retention_days': data_policy['retention_days'], 'min_retention': max(policy['min_retention'], data_policy['min_retention']), 'max_retention': min(policy['max_retention'], data_policy['max_retention']) } return policy def calculate_expiration(self, created_at=None): """Calculate when data should expire""" created_at = created_at or datetime.utcnow() retention_days = self.policy['retention_days'] return created_at + timedelta(days=retention_days) def validate_retention(self, requested_days): """Validate requested retention period""" if requested_days < self.policy['min_retention']: return self.policy['min_retention'] if requested_days > self.policy['max_retention']: return self.policy['max_retention'] return requested_days def should_purge(self, created_at): """Check if data should be purged""" expiration = self.calculate_expiration(created_at) return datetime.utcnow() > expiration # Usage in data storage def store_data(data, regulation='default', data_type='default', retention_days=None): # Determine effective retention engine = RetentionPolicyEngine(regulation, data_type) effective_days = engine.validate_retention(retention_days or engine.policy['retention_days']) # Store with expiration db.store( data, expiration=datetime.utcnow() + timedelta(days=effective_days) ) ``` #### **Right to Erasure Implementation** ```python # erasure_system.py import uuid from concurrent.futures import ThreadPoolExecutor class ErasureRequest: def __init__(self, request_id, user_id, reason, sources=None): self.request_id = request_id self.user_id = user_id self.reason = reason self.sources = sources or ['all'] self.status = 'pending' self.created_at = datetime.utcnow() self.completed_at = None def to_dict(self): return { 'request_id': self.request_id, 'user_id': self.user_id, 'reason': self.reason, 'sources': self.sources, 'status': self.status, 'created_at': self.created_at.isoformat(), 'completed_at': self.completed_at.isoformat() if self.completed_at else None } class RightToErasureSystem: def __init__(self, data_sources): self.data_sources = data_sources self.request_store = RequestStore() self.executor = ThreadPoolExecutor(max_workers=20) def submit_erasure_request(self, user_id, reason, sources=None): """Submit a new erasure request""" request = ErasureRequest( request_id=str(uuid.uuid4()), user_id=user_id, reason=reason, sources=sources ) self.request_store.save(request) self.executor.submit(self.process_request, request) return request.request_id def process_request(self, request): """Process an erasure request across all data sources""" try: # Update status request.status = 'processing' self.request_store.update(request) # Process each data source futures = [] for source in self.data_sources: if request.sources == ['all'] or source.name in request.sources: futures.append( self.executor.submit( source.erase_data, request.user_id, request.reason ) ) # Wait for completion for future in futures: future.result(timeout=300) # 5 minute timeout per source # Mark as complete request.status = 'completed' request.completed_at = datetime.utcnow() self.request_store.update(request) except Exception as e: request.status = f'failed: {str(e)}' self.request_store.update(request) raise def get_request_status(self, request_id): """Get status of an erasure request""" return self.request_store.get(request_id) def list_requests(self, user_id=None, status=None, limit=100): """List erasure requests with filters""" return self.request_store.list(user_id, status, limit) # Data source interface class DataSource: def __init__(self, name): self.name = name def erase_data(self, user_id, reason): """Erase all data for user_id from this source""" raise NotImplementedError # Example implementation for database source class DatabaseSource(DataSource): def __init__(self, db_connection, tables): super().__init__("database") self.db = db_connection self.tables = tables def erase_data(self, user_id, reason): for table in self.tables: # Delete or anonymize based on table if table == 'user_profiles': self.db.execute( f"UPDATE {table} SET email = NULL, phone = NULL " f"WHERE user_id = %s", (user_id,) ) else: self.db.execute( f"DELETE FROM {table} WHERE user_id = %s", (user_id,) ) # Log the erasure self.db.execute( "INSERT INTO erasure_log (user_id, source, reason, timestamp) " "VALUES (%s, %s, %s, %s)", (user_id, self.name, reason, datetime.utcnow()) ) ``` ### **Compliance Reporting and Audit Systems** #### **Automated Compliance Reporting** ```python # compliance_reporting.py from jinja2 import Environment, FileSystemLoader import pdfkit class ComplianceReportGenerator: def __init__(self, template_dir='templates'): self.env = Environment(loader=FileSystemLoader(template_dir)) def generate_monthly_report(self, month, year): """Generate GDPR/CCPA compliance report for month/year""" # Gather data stats = self._collect_compliance_stats(month, year) incidents = self._get_compliance_incidents(month, year) requests = self._get_data_requests(month, year) # Render HTML template = self.env.get_template('compliance_report.html') html = template.render( month=month, year=year, stats=stats, incidents=incidents, requests=requests, company_info=COMPANY_INFO ) # Convert to PDF pdf = pdfkit.from_string(html, False) # Save to storage report_id = f"compliance_{year}_{month:02d}" storage.save( f"reports/{report_id}.pdf", pdf, metadata={ 'type': 'compliance', 'month': month, 'year': year, 'generated_at': datetime.utcnow().isoformat() } ) return report_id def _collect_compliance_stats(self, month, year): """Collect key compliance metrics""" return { 'total_requests': db.query( "SELECT COUNT(*) FROM data_requests " "WHERE EXTRACT(MONTH FROM created_at) = %s " "AND EXTRACT(YEAR FROM created_at) = %s", (month, year) ), 'erasure_requests': db.query( "SELECT COUNT(*) FROM erasure_requests " "WHERE EXTRACT(MONTH FROM created_at) = %s " "AND EXTRACT(YEAR FROM created_at) = %s", (month, year) ), 'compliance_rate': self._calculate_compliance_rate(month, year), 'data_sources': self._get_data_sources_compliance() } def _calculate_compliance_rate(self, month, year): """Calculate overall compliance rate""" # Implementation would check: # - robots.txt compliance rate # - data retention compliance # - consent management # - erasure request fulfillment # ... return 98.7 # Example value def generate_dsr_report(self, request_id): """Generate Data Subject Request report""" # Get request details request = db.get_request(request_id) # Get affected data data = db.get_user_data(request['user_id']) # Render report template = self.env.get_template('dsr_report.html') html = template.render( request=request, data=data, company_info=COMPANY_INFO ) return pdfkit.from_string(html, False) ``` #### **Audit Trail System** ```python # audit_trail.py import uuid from datetime import datetime class AuditEvent: def __init__(self, event_type, user_id, target, details=None, ip_address=None): self.event_id = str(uuid.uuid4()) self.event_type = event_type self.user_id = user_id self.target = target self.details = details or {} self.ip_address = ip_address self.timestamp = datetime.utcnow() def to_dict(self): return { 'event_id': self.event_id, 'event_type': self.event_type, 'user_id': self.user_id, 'target': self.target, 'details': self.details, 'ip_address': self.ip_address, 'timestamp': self.timestamp.isoformat() } class AuditTrail: def __init__(self, storage): self.storage = storage def log_event(self, event_type, user_id, target, **kwargs): """Log an audit event""" event = AuditEvent( event_type=event_type, user_id=user_id, target=target, details=kwargs.get('details', {}), ip_address=kwargs.get('ip_address') ) self.storage.save(event.to_dict()) return event.event_id def search_events(self, start_date=None, end_date=None, event_types=None, user_ids=None, limit=1000): """Search audit events with filters""" query = { 'timestamp': {} } if start_date: query['timestamp']['$gte'] = start_date if end_date: query['timestamp']['$lte'] = end_date if event_types: query['event_type'] = {'$in': event_types} if user_ids: query['user_id'] = {'$in': user_ids} return self.storage.search(query, limit=limit) def generate_compliance_report(self, start_date, end_date): """Generate compliance-focused audit report""" # Get relevant events events = self.search_events( start_date=start_date, end_date=end_date, event_types=[ 'data_access', 'data_erasure', 'consent_change', 'policy_update' ] ) # Organize by compliance area report = { 'gdpr': self._analyze_gdpr_events(events), 'ccpa': self._analyze_ccpa_events(events), 'overall': self._analyze_overall_compliance(events) } return report def _analyze_gdpr_events(self, events): """Analyze events for GDPR compliance""" return { 'data_access_requests': len([ e for e in events if e['event_type'] == 'data_access' ]), 'erasure_requests': len([ e for e in events if e['event_type'] == 'data_erasure' ]), 'consent_changes': len([ e for e in events if e['event_type'] == 'consent_change' ]), 'compliance_rate': self._calculate_gdpr_compliance(events) } ``` --- ## **6. Monetizing Scraped Data: Business Models and Value Extraction** 💰 Scraping is an investment. Let's explore how to generate ROI through strategic data monetization. ### **Data Monetization Framework** ``` [Raw Scraped Data] → Cost center ↓ [Processed Data] → Value creation ↓ [Data Products] → Revenue generation ↓ [Business Impact] → ROI realization ``` ### **Monetization Strategies** #### **1. Data-as-a-Service (DaaS)** **Concept:** Sell access to your processed data through APIs or data feeds. **Implementation Example:** ```python # data_api.py from flask import Flask, request, jsonify from functools import wraps import jwt from data_access import DataAccessLayer app = Flask(__name__) data_layer = DataAccessLayer() SECRET_KEY = "your-secret-key" def token_required(f): @wraps(f) def decorated(*args, **kwargs): token = request.headers.get('Authorization') if not token: return jsonify({'message': 'Token is missing!'}), 401 try: data = jwt.decode(token, SECRET_KEY, algorithms=["HS256"]) current_user = data_layer.get_user(data['user_id']) except: return jsonify({'message': 'Token is invalid!'}), 401 return f(current_user, *args, **kwargs) return decorated @app.route('/api/v1/products', methods=['GET']) @token_required def get_products(current_user): # Check subscription tier if not current_user.has_access('products'): return jsonify({'error': 'Insufficient permissions'}), 403 # Apply rate limiting based on subscription if not current_user.check_rate_limit(): return jsonify({'error': 'Rate limit exceeded'}), 429 # Get query parameters domain = request.args.get('domain') category = request.args.get('category') min_price = request.args.get('min_price', type=float) max_price = request.args.get('max_price', type=float) # Query data with user-specific permissions products = data_layer.get_products( user_id=current_user.id, domain=domain, category=category, min_price=min_price, max_price=max_price ) # Apply data masking based on subscription products = current_user.mask_data(products) # Record usage for billing data_layer.record_usage( user_id=current_user.id, endpoint='/api/v1/products', count=len(products) ) return jsonify({ 'data': products, 'metadata': { 'total': len(products), 'rate_limit_remaining': current_user.rate_limit_remaining } }) @app.route('/api/v1/subscribe', methods=['POST']) def subscribe(): """Handle new subscriptions""" plan = request.json.get('plan') payment_token = request.json.get('payment_token') # Process payment if not process_payment(payment_token, plan): return jsonify({'error': 'Payment failed'}), 400 # Create subscription user_id = request.json.get('user_id') subscription_id = data_layer.create_subscription( user_id=user_id, plan=plan, start_date=datetime.utcnow() ) # Generate API token token = jwt.encode({ 'user_id': user_id, 'exp': datetime.utcnow() + timedelta(days=30) }, SECRET_KEY, algorithm="HS256") return jsonify({ 'subscription_id': subscription_id, 'api_token': token, 'plan_details': PLAN_DETAILS[plan] }) if __name__ == '__main__': app.run() ``` **Pricing Models:** - **Tiered API Calls:** Free (100/day), Basic ($99/mo for 10k), Pro ($499/mo for 100k) - **Data Freshness:** Real-time (+50% premium), Hourly, Daily - **Data Depth:** Summary data, Full dataset, Historical archive #### **2. Value-Added Analytics** **Concept:** Process raw data into actionable insights. **Example: Competitive Intelligence Dashboard** ```python # analytics_engine.py import pandas as pd import numpy as np from sklearn.ensemble import IsolationForest from statsmodels.tsa.arima.model import ARIMA class AnalyticsEngine: def __init__(self, data_source): self.data_source = data_source def price_intelligence(self, domain, product_ids=None, competitors=None): """Generate price intelligence report""" # Get historical price data price_data = self.data_source.get_price_history( domain=domain, product_ids=product_ids, competitors=competitors ) # Convert to DataFrame df = pd.DataFrame(price_data) df['timestamp'] = pd.to_datetime(df['timestamp']) df = df.set_index('timestamp') # Calculate key metrics report = { 'price_distribution': self._calculate_price_distribution(df), 'price_trends': self._calculate_price_trends(df), 'competitor_analysis': self._analyze_competitor_pricing(df), 'anomalies': self._detect_price_anomalies(df), 'recommendations': self._generate_pricing_recommendations(df) } return report def _calculate_price_distribution(self, df): """Calculate price distribution statistics""" return { 'current_average': df.groupby('product_id')['price'].last().mean(), 'min_price': df['price'].min(), 'max_price': df['price'].max(), 'price_range': df.groupby('product_id').agg( min_price=('price', 'min'), max_price=('price', 'max') ).reset_index() } def _calculate_price_trends(self, df): """Analyze price trends over time""" # Calculate daily price changes daily_changes = df.groupby(['product_id', pd.Grouper(freq='D')])['price'].last().pct_change() return { 'average_daily_change': daily_changes.mean(), 'volatility': daily_changes.std(), 'trend_direction': 'increasing' if daily_changes.mean() > 0 else 'decreasing', 'price_cycles': self._detect_price_cycles(df) } def _detect_price_anomalies(self, df): """Detect unusual price changes""" # Calculate price velocity (change per hour) df = df.sort_index() df['price_velocity'] = df.groupby('product_id')['price'].diff() / df.index.to_series().diff().dt.total_seconds() # Use isolation forest to detect anomalies model = IsolationForest(contamination=0.05) anomalies = model.fit_predict(df[['price', 'price_velocity']].dropna()) # Get anomaly details anomaly_data = df[anomalies == -1].copy() anomaly_data['anomaly_score'] = model.decision_function( anomaly_data[['price', 'price_velocity']] ) return anomaly_data.reset_index().to_dict('records') def _generate_pricing_recommendations(self, df): """Generate actionable pricing recommendations""" # Get current prices current_prices = df.groupby('product_id')['price'].last() # Compare with competitors competitor_prices = self._get_competitor_comparison(df) recommendations = [] for product_id, current_price in current_prices.items(): comp_data = competitor_prices.get(product_id, {}) if not comp_data: continue # Calculate competitive position position = self._calculate_competitive_position( current_price, comp_data['competitor_prices'] ) # Generate recommendation if position == 'high': recommendations.append({ 'product_id': product_id, 'current_price': current_price, 'recommended_action': 'reduce', 'suggested_price': comp_data['median_price'] * 0.95, 'reason': 'Above competitive median' }) elif position == 'low': recommendations.append({ 'product_id': product_id, 'current_price': current_price, 'recommended_action': 'increase', 'suggested_price': comp_data['median_price'] * 1.05, 'reason': 'Below competitive median' }) return recommendations ``` **Monetization Approach:** - **Subscription Model:** $299-$999/month per dashboard - **Custom Reports:** One-time fees for specialized analyses - **Alert Services:** Premium pricing for real-time alerts #### **3. Data Licensing** **Concept:** License your data to third parties for integration into their products. **Implementation with Data Packaging:** ```python # data_packaging.py import zipfile import io import pandas as pd from datetime import datetime, timedelta class DataPackager: def __init__(self, data_source): self.data_source = data_source def create_license_package(self, license_id): """Create a data package for a specific license""" # Get license details license = self.data_source.get_license(license_id) # Determine data scope data_scope = self._determine_data_scope(license) # Generate the actual data data = self.data_source.get_data_for_license( license_id=license_id, scope=data_scope ) # Package the data return self._package_data(license, data) def _determine_data_scope(self, license): """Determine what data to include based on license""" scope = { 'domains': license.get('domains', ['all']), 'date_range': { 'start': license['start_date'], 'end': license.get('end_date', datetime.utcnow()) }, 'fields': license.get('fields', 'all'), 'format': license.get('format', 'csv') } # Apply any restrictions if license.get('max_records'): scope['max_records'] = license['max_records'] return scope def _package_data(self, license, data): """Package data according to license specifications""" # Convert to appropriate format if license['format'] == 'csv': buffer = io.StringIO() pd.DataFrame(data).to_csv(buffer, index=False) content = buffer.getvalue() mime_type = 'text/csv' elif license['format'] == 'json': content = json.dumps(data) mime_type = 'application/json' elif license['format'] == 'parquet': buffer = io.BytesIO() pd.DataFrame(data).to_parquet(buffer, index=False) content = buffer.getvalue() mime_type = 'application/octet-stream' # Create package with metadata package = io.BytesIO() with zipfile.ZipFile(package, 'w', zipfile.ZIP_DEFLATED) as zf: # Add data file zf.writestr(f"data.{license['format']}", content) # Add license file license_text = self._generate_license_text(license) zf.writestr("LICENSE.txt", license_text) # Add README readme = self._generate_readme(license) zf.writestr("README.md", readme) package.seek(0) return { 'content': package, 'mime_type': 'application/zip', 'filename': f"data_license_{license['id']}_{datetime.utcnow().strftime('%Y%m%d')}.zip" } def _generate_license_text(self, license): """Generate license agreement text""" return f"""DATA LICENSE AGREEMENT License ID: {license['id']} Effective Date: {license['start_date'].strftime('%Y-%m-%d')} Parties: Licensor: {COMPANY_NAME} Licensee: {license['client_name']} 1. GRANT OF LICENSE The Licensor grants to Licensee a non-exclusive, non-transferable license to use the data described in this agreement for the purposes specified herein. 2. DATA SCOPE - Domains: {', '.join(license.get('domains', ['all']))} - Date Range: {license['start_date'].strftime('%Y-%m-%d')} to {license.get('end_date', datetime.utcnow()).strftime('%Y-%m-%d')} - Fields: {license.get('fields', 'all')} - Format: {license.get('format', 'csv')} 3. RESTRICTIONS - No redistribution of raw data - No use for competitive intelligence against Licensor - Data may only be used by authorized personnel 4. TERM AND TERMINATION This license terminates on {license.get('end_date', datetime.utcnow()).strftime('%Y-%m-%d')} or upon breach of terms. 5. WARRANTY DISCLAIMER Data is provided "as is" without warranty of any kind. 6. LIMITATION OF LIABILITY Licensor's liability limited to license fee paid. Signed, _________________________ {license['client_name']} Date: {datetime.utcnow().strftime('%Y-%m-%d')} """ ``` **Licensing Models:** - **Exclusive Licensing:** High fees for exclusive data rights in a domain - **Non-Exclusive Bulk:** Lower per-unit cost for large volumes - **Field Licensing:** Charge based on data fields included --- ## **7. Future-Proofing Against Anti-Scraping Technology: Strategic Defense** 🛡️ The scraping arms race continues. Here's how to stay ahead of detection. ### **The Anti-Scraping Evolution Timeline** | Year | Detection Methods | Evasion Techniques | |------|-------------------|--------------------| | 2010 | Basic User-Agent checks | User-Agent rotation | | 2015 | Rate limiting, IP blocking | Proxy networks | | 2018 | Header analysis, JS challenges | Headless browsers | | 2020 | Canvas fingerprinting, WebRTC | Fingerprint spoofing | | 2022 | Behavioral analysis, ML detection | Human-like automation | | 2023 | Advanced browser fingerprinting | AI-generated behavior | | 2024 | Predictive blocking | Adaptive scraping | ### **Advanced Detection Methods to Counter** #### **1. Behavioral Analysis** Websites track: - Mouse movement patterns - Scroll velocity and patterns - Click timing and location - Tab switching behavior - Keystroke dynamics **Countermeasure: Human Behavior Simulation** ```python # human_behavior.py import random import time from selenium.webdriver.common.action_chains import ActionChains from selenium.webdriver.common.keys import Keys class HumanBehavior: def __init__(self, driver): self.driver = driver self.start_time = time.time() def random_mouse_move(self, element=None, duration=1.0): """Simulate realistic mouse movement to an element""" actions = ActionChains(self.driver) # Get screen dimensions width = self.driver.execute_script("return window.innerWidth") height = self.driver.execute_script("return window.innerHeight") # Current position current_x = random.randint(0, width) current_y = random.randint(0, height) # Target position if element: loc = element.location size = element.size target_x = loc['x'] + random.randint(0, size['width']) target_y = loc['y'] + random.randint(0, size['height']) else: target_x = random.randint(0, width) target_y = random.randint(0, height) # Number of intermediate points steps = random.randint(5, 15) # Generate Bezier curve points points = self._generate_bezier_points( (current_x, current_y), (target_x, target_y), steps ) # Move through points with variable speed start = time.time() for i, (x, y) in enumerate(points): # Variable speed along the path elapsed = time.time() - start if elapsed < duration: # Accelerate then decelerate progress = elapsed / duration eased = self._ease_out_quad(progress) # Move to current position actions.move_by_offset( int(x - current_x), int(y - current_y) ) current_x, current_y = x, y # Random pause between movements time.sleep(random.uniform(0.01, 0.05)) actions.perform() def _generate_bezier_points(self, start, end, steps): """Generate points along a quadratic Bezier curve""" points = [] p0 = start p1 = ( (start[0] + end[0]) / 2 + random.uniform(-100, 100), (start[1] + end[1]) / 2 + random.uniform(-100, 100) ) p2 = end for t in [i/steps for i in range(steps+1)]: x = (1-t)**2 * p0[0] + 2*(1-t)*t * p1[0] + t**2 * p2[0] y = (1-t)**2 * p0[1] + 2*(1-t)*t * p1[1] + t**2 * p2[1] points.append((x, y)) return points def _ease_out_quad(self, x): """Easing function for natural movement""" return 1 - (1 - x) * (1 - x) def random_scrolling(self, min_scroll=1, max_scroll=5): """Simulate realistic scrolling behavior""" scroll_count = random.randint(min_scroll, max_scroll) for _ in range(scroll_count): # Random scroll direction direction = random.choice([-1, 1]) # Random scroll amount (pixels) scroll_amount = random.randint(100, 800) * direction # Execute scroll with variable speed self.driver.execute_script( f"window.scrollBy({{top: 0, left: {scroll_amount}, behavior: 'smooth'}});" ) # Variable pause after scroll time.sleep(random.uniform(0.5, 2.0)) def human_like_typing(self, element, text): """Simulate human-like typing with errors and corrections""" actions = ActionChains(self.driver) actions.click(element) # Clear existing text if needed if element.get_attribute('value'): for _ in range(len(element.get_attribute('value'))): actions.send_keys(Keys.BACKSPACE) # Type with realistic timing and errors i = 0 while i < len(text): # Random typing speed time.sleep(random.uniform(0.05, 0.2)) # Random chance of error if i > 0 and random.random() < 0.05: # Type wrong character wrong_char = chr(random.randint(97, 122)) actions.send_keys(wrong_char) time.sleep(random.uniform(0.1, 0.3)) # Correct the error actions.send_keys(Keys.BACKSPACE) time.sleep(random.uniform(0.05, 0.15)) # Type correct character actions.send_keys(text[i]) i += 1 actions.perform() ``` #### **2. Advanced Fingerprinting** Modern sites collect: - AudioContext fingerprint - WebRTC IP leakage - Battery status API - Device memory and hardware concurency - WebGL rendering differences - Font enumeration **Countermeasure: Comprehensive Fingerprint Spoofing** ```python # fingerprint_spoofing.py from selenium import webdriver from selenium.webdriver.chrome.options import Options def configure_stealth_options(): """Configure Chrome options for maximum stealth""" options = Options() # Basic stealth options options.add_argument("--disable-blink-features=AutomationControlled") options.add_experimental_option("excludeSwitches", ["enable-automation"]) options.add_experimental_option('useAutomationExtension', False) # Realistic viewport options.add_argument("--window-size=1920,1080") # Geolocation and language options.add_argument("--lang=en-US") options.add_argument("--timezone=America/New_York") # Disable features that reveal automation options.add_argument("--disable-infobars") options.add_argument("--disable-extensions") options.add_argument("--no-sandbox") options.add_argument("--disable-dev-shm-usage") # Emulate real user behavior options.add_argument("--disable-gpu") options.add_argument("--start-maximized") # Add realistic capabilities prefs = { "profile.default_content_setting_values.geolocation": 1, "profile.default_content_setting_values.notifications": 2, "credentials_enable_service": False, "profile.password_manager_enabled": False } options.add_experimental_option("prefs", prefs) return options def setup_stealth_driver(): """Create a fully stealthed driver instance""" options = configure_stealth_options() driver = webdriver.Chrome(options=options) # Execute stealth scripts driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', { 'source': ''' // Override webdriver property Object.defineProperty(navigator, 'webdriver', { get: () => undefined }); // Spoof languages Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] }); // Spoof platform Object.defineProperty(navigator, 'platform', { get: () => 'Win32' }); // Spoof plugins Object.defineProperty(navigator, 'plugins', { get: () => ({ length: 3, 0: {name: 'Chrome PDF Viewer'}, 1: {name: 'Native Client'}, 2: {name: 'Widevine Content Decryption Module'}, item: n => n < 3 ? this[n] : null, namedItem: name => name === 'Chrome PDF Viewer' ? this[0] : name === 'Native Client' ? this[1] : name === 'Widevine Content Decryption Module' ? this[2] : null }) }); // Spoof WebRTC const originalGetUserMedia = navigator.mediaDevices.getUserMedia; navigator.mediaDevices.getUserMedia = function(constraints) { return originalGetUserMedia.call(this, constraints); }; // Spoof AudioContext const originalAudioContext = window.AudioContext; window.AudioContext = function() { const context = new originalAudioContext(); context.createAnalyser = function() { const analyser = originalAudioContext.prototype.createAnalyser.apply(this, arguments); Object.defineProperty(analyser, 'fftSize', { get: () => 2048 }); return analyser; }; return context; }; ''' }) return driver ``` #### **3. Machine Learning Detection** Sites now use ML models to: - Analyze request patterns - Detect non-human interaction sequences - Identify scraping infrastructure - Predict scraper behavior **Countermeasure: Adaptive Scraping with Reinforcement Learning** ```python # adaptive_scraping.py import numpy as np import random from collections import defaultdict class AdaptiveScraper: def __init__(self, domains, initial_params=None): self.domains = domains self.params = initial_params or self._default_params() self.q_table = defaultdict(lambda: defaultdict(float)) self.alpha = 0.1 # Learning rate self.gamma = 0.9 # Discount factor self.epsilon = 0.1 # Exploration rate def _default_params(self): """Default scraping parameters""" return { 'delay_min': 1.0, 'delay_max': 3.0, 'proxy_rotation': 10, 'headless': True, 'user_agent_rotation': 5 } def get_action(self, state): """Choose action using epsilon-greedy policy""" if random.uniform(0, 1) < self.epsilon: # Explore: choose random action return self._random_action() else: # Exploit: choose best known action if not self.q_table[state]: return self._random_action() return max(self.q_table[state], key=self.q_table[state].get) def _random_action(self): """Generate random scraping parameter adjustment""" action_type = random.choice([ 'increase_delay', 'decrease_delay', 'rotate_proxy', 'change_user_agent', 'toggle_headless' ]) if action_type in ['increase_delay', 'decrease_delay']: amount = random.uniform(0.1, 0.5) return (action_type, amount) return (action_type, None) def apply_action(self, action): """Apply action to scraping parameters""" action_type, value = action if action_type == 'increase_delay': self.params['delay_min'] += value self.params['delay_max'] += value * 1.5 elif action_type == 'decrease_delay': self.params['delay_min'] = max(0.5, self.params['delay_min'] - value) self.params['delay_max'] = max(1.0, self.params['delay_max'] - value * 1.5) elif action_type == 'rotate_proxy': # Implementation would rotate proxy pass elif action_type == 'change_user_agent': # Implementation would change user agent pass elif action_type == 'toggle_headless': self.params['headless'] = not self.params['headless'] def update_q_value(self, state, action, reward, next_state): """Update Q-value using Q-learning update rule""" current_q = self.q_table[state][action] max_next_q = max(self.q_table[next_state].values()) if self.q_table[next_state] else 0 new_q = current_q + self.alpha * (reward + self.gamma * max_next_q - current_q) self.q_table[state][action] = new_q def scrape_with_adaptation(self, url): """Scrape URL with adaptive parameter adjustment""" domain = self._extract_domain(url) state = self._get_state(domain) # Choose action action = self.get_action(state) self.apply_action(action) # Execute scrape with current parameters start_time = time.time() try: result = self._execute_scrape(url) success = True latency = time.time() - start_time except Exception as e: success = False latency = None # Calculate reward reward = self._calculate_reward(success, latency, action) # Update Q-table next_state = self._get_state(domain) self.update_q_value(state, action, reward, next_state) return result def _calculate_reward(self, success, latency, action): """Calculate reinforcement learning reward""" if not success: return -10.0 # Major penalty for failure # Base reward for success reward = 1.0 # Bonus for low latency if latency < 2.0: reward += 2.0 elif latency < 5.0: reward += 1.0 # Penalty for using resource-intensive actions if action[0] == 'rotate_proxy': reward -= 0.5 elif action[0] == 'change_user_agent': reward -= 0.2 return reward def _get_state(self, domain): """Get current state representation""" # State includes domain and recent success rate recent_success = self._get_recent_success_rate(domain) return (domain, round(recent_success, 1)) def _get_recent_success_rate(self, domain): """Calculate recent success rate for domain""" # Implementation would track success history return 0.85 # Example value ``` ## Continued in next section ..