# **Part 6: Advanced Web Scraping Techniques – JavaScript Rendering, Fingerprinting, and Large-Scale Data Processing**
**Duration:** ~60 minutes
**Hashtags:** #AdvancedScraping #JavaScriptRendering #BrowserFingerprinting #DataPipelines #LegalCompliance #ScrapingOptimization #EnterpriseScraping #WebScraping #DataEngineering #TechInnovation
---
## **Table of Contents**
1. [Advanced JavaScript Rendering Techniques](#javascript-rendering)
2. [Browser Fingerprinting Countermeasures](#fingerprinting-countermeasures)
3. [Large-Scale Data Processing Pipelines](#data-processing-pipelines)
4. [Legal Risk Assessment Frameworks](#legal-risk-assessment)
5. [Scraping Performance Optimization](#performance-optimization)
6. [Case Study: Enterprise-Scale Scraping Platform](#enterprise-case-study)
7. [Quiz: Advanced Scraping Mastery](#advanced-quiz)
8. [Conclusion and Future of Web Scraping](#part6-conclusion)
---
## **1. Advanced JavaScript Rendering Techniques** 🖥️
Modern websites increasingly rely on JavaScript frameworks that require sophisticated rendering approaches beyond basic headless browsers.
### **JavaScript Framework Detection**
#### **1. Framework Detection System**
```python
# framework_detection.py
import re
import requests
from bs4 import BeautifulSoup
import tldextract
class FrameworkDetector:
    """Detect client-side JavaScript frameworks used by a web page.

    Evidence is collected from three sources: the raw HTML, inline
    ``<script>`` bodies, and HTTP response headers.  Results are cached per
    URL, and ``get_rendering_strategy`` turns a detection result into a
    scraping recommendation (plain HTTP requests vs. a headless browser).
    """

    def __init__(self):
        self.framework_signatures = self._load_framework_signatures()
        # url -> detection result; avoids refetching/reanalyzing a page
        self.cache = {}

    def _load_framework_signatures(self):
        """Load signatures for detecting JavaScript frameworks.

        Returns:
            Dict mapping framework name to regex pattern lists for HTML
            content ('html_patterns') and inline scripts ('js_patterns'),
            plus (header-name, substring) pairs for headers ('headers').
        """
        return {
            'react': {
                'html_patterns': [
                    r'data-reactroot',
                    r'data-reactid',
                    r'__reactFiber\$',
                    r'__reactProps\$'
                ],
                'js_patterns': [
                    r'webpackJsonp.*react',
                    r'facebook\.com[/\\]react',
                    r'ReactDOM\.render'
                ],
                'headers': [
                    ('x-powered-by', 'react')
                ]
            },
            'angular': {
                # NOTE: the original list contained 'ng_' twice, which
                # double-counted that single piece of evidence (+0.4 instead
                # of +0.2); the duplicate has been removed.
                'html_patterns': [
                    r'ng-',
                    r'ng_',
                    r'data-ng-',
                    r'x-ng-',
                    r'ng:',
                    r'ng\$'
                ],
                'js_patterns': [
                    r'angular\.module',
                    r'ngRoute',
                    r'angular\.version'
                ],
                'headers': [
                    ('x-powered-by', 'angular')
                ]
            },
            'vue': {
                'html_patterns': [
                    r'vue-',
                    r'data-v-',
                    r'__vue__',
                    r'vuejs'
                ],
                'js_patterns': [
                    r'Vue\.prototype',
                    r'Vue\.extend',
                    r'vue\.runtime'
                ],
                'headers': [
                    ('x-powered-by', 'vue')
                ]
            },
            'svelte': {
                'html_patterns': [
                    r'svelte-',
                    r'data-svelte'
                ],
                'js_patterns': [
                    r'__sveltets_',
                    r'svelte\.internal'
                ],
                'headers': []
            },
            'nextjs': {
                'html_patterns': [
                    r'__NEXT_DATA__',
                    r'next-script',
                    r'__next'
                ],
                'js_patterns': [
                    r'next\.js',
                    r'__NEXT_RELAY_ENV__'
                ],
                'headers': [
                    ('server', 'next')
                ]
            },
            'nuxt': {
                'html_patterns': [
                    r'__NUXT__',
                    r'nuxt-'
                ],
                'js_patterns': [
                    r'nuxt\.js',
                    r'NUXT\.config'
                ],
                'headers': []
            }
        }

    def detect_frameworks(self, url, timeout=10):
        """Detect JavaScript frameworks used by a website.

        Args:
            url: Page to fetch and analyze.
            timeout: HTTP request timeout in seconds.

        Returns:
            List of detected frameworks with confidence scores, sorted by
            confidence (highest first).  Empty list on any fetch failure.
        """
        # Serve repeated lookups from the cache.
        if url in self.cache:
            return self.cache[url]
        try:
            # Registrable domain, available for subresource analysis.
            extracted = tldextract.extract(url)
            domain = f"{extracted.domain}.{extracted.suffix}"
            # Fetch page
            response = requests.get(url, timeout=timeout)
            response.raise_for_status()
            # Analyze content
            detected = self._analyze_content(
                response.text,
                response.headers,
                domain
            )
            # Store in cache
            self.cache[url] = detected
            return detected
        except Exception as e:
            # Best-effort: report the failure, treat as "nothing detected".
            print(f"Error detecting frameworks for {url}: {str(e)}")
            return []

    def _analyze_content(self, html, headers, domain):
        """Analyze HTML and headers for framework signatures.

        ``domain`` is accepted for future subresource analysis but is not
        consulted by the current scoring.
        """
        detected = []
        # Parse the document ONCE; the original re-parsed the whole page
        # with BeautifulSoup for every framework in the signature table.
        soup = BeautifulSoup(html, 'html.parser')
        scripts = [script.string for script in soup.find_all('script') if script.string]
        for framework, patterns in self.framework_signatures.items():
            confidence = 0
            # Check HTML patterns (+0.2 each)
            for pattern in patterns['html_patterns']:
                if re.search(pattern, html, re.IGNORECASE):
                    confidence += 0.2
            # Check inline JS patterns (+0.3 at most once per script)
            for script in scripts:
                for pattern in patterns['js_patterns']:
                    if re.search(pattern, script, re.IGNORECASE):
                        confidence += 0.3
                        break
            # Check headers (+0.5 each)
            for header, value in patterns['headers']:
                if header in headers and value in headers[header].lower():
                    confidence += 0.5
            # Keep only reasonably confident matches.
            if confidence > 0.3:
                detected.append({
                    'framework': framework,
                    'confidence': min(1.0, confidence),
                    'evidence': self._get_evidence(framework, html, headers, scripts)
                })
        # Sort by confidence, strongest first.
        detected.sort(key=lambda x: x['confidence'], reverse=True)
        return detected

    def _get_evidence(self, framework, html, headers, scripts=None):
        """Get specific evidence for a detected framework.

        Args:
            scripts: Optional pre-extracted inline script bodies; when None
                (backward-compatible default) the HTML is parsed here.
        """
        evidence = []
        patterns = self.framework_signatures[framework]
        # Check HTML patterns
        for pattern in patterns['html_patterns']:
            if re.search(pattern, html, re.IGNORECASE):
                evidence.append(f"HTML pattern match: {pattern}")
        # Reuse the caller's parse when provided.
        if scripts is None:
            soup = BeautifulSoup(html, 'html.parser')
            scripts = [script.string for script in soup.find_all('script') if script.string]
        for script in scripts:
            for pattern in patterns['js_patterns']:
                if re.search(pattern, script, re.IGNORECASE):
                    evidence.append(f"JS pattern match: {pattern}")
                    break
        # Check headers
        for header, value in patterns['headers']:
            if header in headers and value in headers[header].lower():
                evidence.append(f"Header match: {header}: {headers[header]}")
        return evidence

    def get_rendering_strategy(self, url):
        """Determine optimal rendering strategy based on detected frameworks.

        Returns:
            Dictionary with 'strategy', a human-readable 'reason', a
            'confidence' score, and (when applicable) the 'framework'.
        """
        frameworks = self.detect_frameworks(url)
        if not frameworks:
            return {
                'strategy': 'requests',
                'reason': 'No JavaScript frameworks detected',
                'confidence': 1.0
            }
        # Base the recommendation on the strongest detection.
        primary_framework = frameworks[0]['framework']
        confidence = frameworks[0]['confidence']
        if primary_framework in ['react', 'angular', 'vue', 'nextjs', 'nuxt']:
            return {
                'strategy': 'headless_browser',
                'framework': primary_framework,
                'reason': f'Detected {primary_framework} framework requiring JS execution',
                'confidence': confidence
            }
        if primary_framework == 'svelte':
            return {
                'strategy': 'static_rendering',
                'framework': primary_framework,
                'reason': 'Svelte can often be statically rendered with proper configuration',
                'confidence': confidence * 0.8
            }
        return {
            'strategy': 'headless_browser',
            'framework': primary_framework,
            'reason': 'Unknown framework requiring JS execution',
            'confidence': confidence * 0.9
        }
```
### **Advanced Rendering Techniques**
#### **1. Custom Browser Profiles for JavaScript Rendering**
```python
# browser_profiles.py
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
import os
import json
import time
import random
class BrowserProfileManager:
    """Create, persist, and apply randomized browser fingerprint profiles.

    A profile is a JSON-serializable dict of fingerprint surface values
    (user agent, screen size, WebGL strings, ...).  Profiles are stored as
    ``<profile_id>.json`` under ``profile_dir`` and can be turned into
    Chrome options or a stealth-configured Selenium WebDriver.
    """

    def __init__(self, profile_dir='browser_profiles'):
        self.profile_dir = profile_dir
        os.makedirs(profile_dir, exist_ok=True)
        self.profiles = self._load_profiles()

    def _load_profiles(self):
        """Load browser profiles from disk.

        Returns:
            Dict mapping profile id (filename minus '.json') to profile data.
        """
        profiles = {}
        for filename in os.listdir(self.profile_dir):
            if filename.endswith('.json'):
                with open(os.path.join(self.profile_dir, filename), 'r') as f:
                    profile_id = filename[:-5]  # strip '.json'
                    profiles[profile_id] = json.load(f)
        return profiles

    def save_profile(self, profile_id, profile_data):
        """Save browser profile to disk and register it in memory."""
        with open(os.path.join(self.profile_dir, f"{profile_id}.json"), 'w') as f:
            json.dump(profile_data, f, indent=2)
        self.profiles[profile_id] = profile_data

    def get_profile(self, profile_id=None):
        """Get browser profile by ID, or a random one when ID is absent/unknown."""
        if profile_id and profile_id in self.profiles:
            return self.profiles[profile_id]
        # Return random profile if ID not specified
        if self.profiles:
            return random.choice(list(self.profiles.values()))
        # First run: nothing on disk yet.
        return self.create_default_profile()

    def create_default_profile(self):
        """Create, persist, and return a default randomized browser profile."""
        profile = {
            'user_agent': self._generate_user_agent(),
            'screen_resolution': self._generate_screen_resolution(),
            'timezone': self._generate_timezone(),
            'language': self._generate_language(),
            'webgl_vendor': self._generate_webgl_vendor(),
            'webgl_renderer': self._generate_webgl_renderer(),
            'audio_context': self._generate_audio_context(),
            'font_list': self._generate_font_list(),
            'plugins': self._generate_plugins(),
            'touch_support': self._generate_touch_support(),
            'hardware_concurrency': self._generate_hardware_concurrency(),
            'device_memory': self._generate_device_memory(),
            'webdriver': False,
            'chrome': {
                'app': {'version': '1.0.0'},
                'runtime': {'platform': 'Win32'},
                'webstore': {}
            }
        }
        # Save as default profile
        self.save_profile('default', profile)
        return profile

    def _generate_user_agent(self):
        """Generate a realistic Chrome user agent string."""
        browsers = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{version} Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{version} Safari/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{version} Safari/537.36'
        ]
        versions = [
            '90.0.4430.212', '91.0.4472.114', '91.0.4472.124',
            '91.0.4472.164', '92.0.4515.107', '92.0.4515.131'
        ]
        browser_template = random.choice(browsers)
        version = random.choice(versions)
        return browser_template.format(version=version)

    def _generate_screen_resolution(self):
        """Generate a realistic screen resolution as 'WIDTHxHEIGHT'."""
        resolutions = [
            '1920x1080', '1366x768', '1536x864',
            '1440x900', '2560x1440', '3840x2160'
        ]
        return random.choice(resolutions)

    def _generate_timezone(self):
        """Generate a realistic IANA timezone identifier."""
        timezones = [
            'America/New_York', 'America/Chicago', 'America/Denver',
            'America/Los_Angeles', 'Europe/London', 'Europe/Paris',
            'Asia/Tokyo', 'Australia/Sydney'
        ]
        return random.choice(timezones)

    def _generate_language(self):
        """Generate a realistic Accept-Language style setting."""
        languages = [
            'en-US,en;q=0.9', 'en-GB,en;q=0.8', 'es-ES,es;q=0.9',
            'fr-FR,fr;q=0.9', 'de-DE,de;q=0.9', 'ja-JP,ja;q=0.9'
        ]
        return random.choice(languages)

    def _generate_webgl_vendor(self):
        """Generate a realistic WebGL vendor string."""
        vendors = [
            'Google Inc.', 'Intel Inc.', 'NVIDIA Corporation',
            'AMD Corporation', 'ARM Limited', 'Imagination Technologies'
        ]
        return random.choice(vendors)

    def _generate_webgl_renderer(self):
        """Generate a realistic WebGL renderer string."""
        renderers = [
            'ANGLE (Intel, Intel(R) UHD Graphics 630 Direct3D11 vs_5_0 ps_5_0)',
            'ANGLE (NVIDIA, NVIDIA GeForce GTX 1080 Ti Direct3D11 vs_5_0 ps_5_0)',
            'WebKit WebGL',
            'Mozilla Labs - WebGL'
        ]
        return random.choice(renderers)

    def _generate_audio_context(self):
        """Generate realistic AudioContext parameters."""
        return {
            'sampleRate': random.choice([44100, 48000]),
            'channelCount': 2,
            'maxChannelCount': 2,
            'contextType': random.choice(['webaudio', 'audioworklet'])
        }

    def _generate_font_list(self):
        """Generate a realistic random subset of common fonts."""
        fonts = [
            'Arial', 'Times New Roman', 'Helvetica', 'Courier New',
            'Verdana', 'Georgia', 'Comic Sans MS', 'Trebuchet MS',
            'Arial Black', 'Impact', 'Lucida Console', 'Tahoma'
        ]
        return random.sample(fonts, k=random.randint(5, 10))

    def _generate_plugins(self):
        """Generate a realistic plugin list (sometimes empty, like real users)."""
        plugins = [
            {'name': 'Chrome PDF Viewer', 'description': 'Portable Document Format'},
            {'name': 'Widevine Content Decryption Module', 'description': 'Enables Widevine licenses'},
            {'name': 'Native Client', 'description': 'Runs native code inside Chrome'}
        ]
        return plugins if random.random() > 0.3 else []

    def _generate_touch_support(self):
        """Generate realistic touch support parameters."""
        return {
            'maxTouchPoints': random.choice([0, 5, 10]),
            'touchEvent': random.random() > 0.7,
            'touchStart': random.random() > 0.7
        }

    def _generate_hardware_concurrency(self):
        """Generate a realistic logical-CPU count."""
        return random.choice([2, 4, 8, 16])

    def _generate_device_memory(self):
        """Generate a realistic navigator.deviceMemory value (GiB)."""
        return random.choice([2, 4, 8])

    def create_chrome_options(self, profile=None):
        """Create Chrome options with profile settings applied."""
        if not profile:
            profile = self.get_profile()
        chrome_options = Options()
        # Basic options
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument('--disable-gpu')
        # FIX: Chrome's switch is '--window-size=WIDTH,HEIGHT' (comma
        # separated); the profile stores 'WIDTHxHEIGHT', so translate it.
        width, height = profile['screen_resolution'].split('x')
        chrome_options.add_argument(f'--window-size={width},{height}')
        chrome_options.add_argument(f'--user-agent={profile["user_agent"]}')
        # Language and timezone
        chrome_options.add_argument(f'--lang={profile["language"].split(",")[0]}')
        # NOTE(review): '--timezone' is not a documented Chrome switch; the
        # reliable mechanisms are the TZ env var or CDP
        # Emulation.setTimezoneOverride — confirm before relying on this.
        chrome_options.add_argument(f'--timezone={profile["timezone"]}')
        # Prevent detection via the automation banner / navigator.webdriver
        chrome_options.add_argument('--disable-blink-features=AutomationControlled')
        chrome_options.add_experimental_option('excludeSwitches', ['enable-automation'])
        chrome_options.add_experimental_option('useAutomationExtension', False)
        # Add prefs to prevent detection
        prefs = {
            'profile.default_content_setting_values': {
                'images': 2,  # Block images to speed up rendering
                'javascript': 1  # Enable JavaScript
            },
            'intl.accept_languages': profile['language'],
            'credentials_enable_service': False,
            'profile.password_manager_enabled': False
        }
        chrome_options.add_experimental_option('prefs', prefs)
        return chrome_options

    def create_stealth_driver(self, profile=None):
        """Create a stealthy WebDriver with evasion techniques applied."""
        if not profile:
            profile = self.get_profile()
        chrome_options = self.create_chrome_options(profile)
        # Create driver
        driver = webdriver.Chrome(
            service=Service(ChromeDriverManager().install()),
            options=chrome_options
        )
        # Hide navigator.webdriver before any page script runs.
        driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {
            'source': '''
                Object.defineProperty(navigator, 'webdriver', {
                    get: () => undefined
                })
            '''
        })
        # Additional evasion techniques
        self._apply_stealth_techniques(driver, profile)
        return driver

    def _apply_stealth_techniques(self, driver, profile):
        """Apply advanced stealth techniques to a live WebDriver session."""
        lang_parts = profile['language'].split(',')
        primary_lang = lang_parts[0]
        # FIX: the original indexed lang_parts[1] blindly and raised
        # IndexError for single-locale strings; fall back to the primary tag.
        secondary_lang = lang_parts[1].split(';')[0] if len(lang_parts) > 1 else primary_lang
        # Override navigator properties with profile values.
        driver.execute_script(f'''
            Object.defineProperty(navigator, 'plugins', {{
                get: () => [
                    {{ name: 'Chrome PDF Viewer', description: 'Portable Document Format' }},
                    {{ name: 'Widevine Content Decryption Module', description: 'Enables Widevine licenses' }}
                ]
            }});
            Object.defineProperty(navigator, 'languages', {{
                get: () => ['{primary_lang}', '{secondary_lang}']
            }});
            Object.defineProperty(navigator, 'hardwareConcurrency', {{
                get: () => {profile['hardware_concurrency']}
            }});
            Object.defineProperty(navigator, 'deviceMemory', {{
                get: () => {profile['device_memory']}
            }});
            Object.defineProperty(navigator, 'platform', {{
                get: () => 'Win32'
            }});
            Object.defineProperty(HTMLCanvasElement.prototype, 'toDataURL', {{
                value: function() {{
                    return 'data:image/png;base64,UNRECOGNIZED';
                }}
            }});
        ''')
        # Spoof the UNMASKED_VENDOR_WEBGL / UNMASKED_RENDERER_WEBGL params.
        driver.execute_script(f'''
            const getParameter = WebGLRenderingContext.prototype.getParameter;
            WebGLRenderingContext.prototype.getParameter = function(parameter) {{
                if (parameter === 37445) return '{profile['webgl_vendor']}';
                if (parameter === 37446) return '{profile['webgl_renderer']}';
                return getParameter.apply(this, [parameter]);
            }};
        ''')
        # Spoof the AudioContext sample rate.
        driver.execute_script(f'''
            const AudioContext = window.AudioContext || window.webkitAudioContext;
            const originalAudioContext = AudioContext;
            class CustomAudioContext extends originalAudioContext {{
                constructor() {{
                    super();
                    this.sampleRate = {profile['audio_context']['sampleRate']};
                }}
            }}
            window.AudioContext = CustomAudioContext;
            window.webkitAudioContext = CustomAudioContext;
        ''')
```
#### **2. Headless Browser Optimization**
```python
# browser_optimization.py
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
class BrowserOptimizer:
    """Speed up Selenium page loads by blocking resources and throttling,
    while tracking load times and resource usage for later analysis."""

    def __init__(self, driver):
        self.driver = driver
        self.load_times = []      # seconds per successfully loaded page
        self.resource_usage = []  # one _get_resource_usage() dict per load

    def optimize_page_load(self, url, max_wait=30, resource_threshold=80):
        """Optimize page loading by blocking unnecessary resources.

        Args:
            url: Page to load.
            max_wait: Max seconds to wait for critical elements / JS settle.
            resource_threshold: CPU/memory percentage above which aggressive
                optimizations are applied.

        Returns:
            Boolean indicating success (critical elements appeared in time).
        """
        start_time = time.time()
        # Block unnecessary resources
        self._block_resources()
        # Navigate to URL
        self.driver.get(url)
        # Wait for critical elements
        if not self._wait_for_critical_elements(max_wait):
            return False
        # Measure load time
        load_time = time.time() - start_time
        self.load_times.append(load_time)
        # Check resource usage
        usage = self._get_resource_usage()
        self.resource_usage.append(usage)
        # Apply additional optimizations if needed
        if usage['cpu'] > resource_threshold or usage['memory'] > resource_threshold:
            self._apply_aggressive_optimizations()
        return True

    def _block_resources(self):
        """Block unnecessary resources to speed up rendering."""
        # FIX: Network.setBlockedURLs REPLACES the blocked list, so images
        # and trackers must go in a single call — the original issued two
        # calls and the second silently undid the image blocking.
        blocked = [
            # images
            '*.png', '*.jpg', '*.jpeg', '*.gif', '*.svg',
            # trackers
            '*doubleclick.net*', '*googleadservices.com*',
            '*analytics.js*', '*facebook.net*', '*taboola.com*'
        ]
        self.driver.execute_cdp_cmd('Network.setBlockedURLs', {'urls': blocked})
        # Enable network conditions throttling
        self.driver.execute_cdp_cmd('Network.emulateNetworkConditions', {
            'offline': False,
            'latency': 50,  # ms
            'downloadThroughput': 150 * 1024,  # 150 KB/s
            'uploadThroughput': 50 * 1024  # 50 KB/s
        })

    def _wait_for_critical_elements(self, max_wait):
        """Wait for critical page elements to load.

        Returns True once <body> (and, best-effort, a main content
        container) is present and JS activity has settled.
        """
        try:
            # Wait for body to be present
            WebDriverWait(self.driver, max_wait).until(
                EC.presence_of_element_located((By.TAG_NAME, 'body'))
            )
            # Wait for main content container (customizable per site);
            # both lookups are best-effort and may simply time out.
            try:
                WebDriverWait(self.driver, max_wait).until(
                    EC.presence_of_element_located((By.ID, 'main-content'))
                )
            except Exception:
                try:
                    WebDriverWait(self.driver, max_wait).until(
                        EC.presence_of_element_located((By.CLASS_NAME, 'content'))
                    )
                except Exception:
                    pass
            # Wait for JavaScript to stabilize
            self._wait_for_js_stabilization(max_wait)
            return True
        except Exception as e:
            print(f"Error waiting for critical elements: {str(e)}")
            return False

    def _wait_for_js_stabilization(self, max_wait):
        """Wait until the document height stops changing (JS has settled)."""
        start_time = time.time()
        last_height = 0
        stable_count = 0
        while time.time() - start_time < max_wait:
            current_height = self.driver.execute_script(
                "return document.body.scrollHeight"
            )
            # Height stable within 10px counts as one quiet check.
            if abs(current_height - last_height) < 10:
                stable_count += 1
                if stable_count >= 3:  # Stable for 3 consecutive checks
                    return
            else:
                stable_count = 0
            last_height = current_height
            time.sleep(0.5)

    def _get_resource_usage(self):
        """Get current resource usage metrics from CDP Performance.getMetrics.

        Returns:
            Dict with 'cpu' (percent), 'memory' (percent of a nominal
            256 MiB JS heap), and 'network_requests'.
        """
        metrics = self.driver.execute_cdp_cmd('Performance.getMetrics', {})
        cpu_usage = next(
            (m['value'] for m in metrics['metrics'] if m['name'] == 'CPUUtilization'),
            0
        ) * 100
        memory_usage = next(
            (m['value'] for m in metrics['metrics'] if m['name'] == 'JSHeapUsedSize'),
            0
        )
        # Convert bytes to a percentage of a typical 256 MiB heap limit.
        memory_percent = min(100, (memory_usage / (256 * 1024 * 1024)) * 100)
        return {
            'cpu': cpu_usage,
            'memory': memory_percent,
            'network_requests': self._get_network_request_count()
        }

    def _get_network_request_count(self):
        """Get count of network requests made."""
        # Implementation would track network requests (e.g. via CDP
        # Network.requestWillBeSent events).
        return 0  # Simplified

    def _apply_aggressive_optimizations(self):
        """Apply aggressive optimizations when resources are constrained."""
        # Disable JavaScript for non-critical pages
        self.driver.execute_cdp_cmd('Emulation.setScriptExecutionDisabled', {
            'value': True
        })
        # Throttle the network further.
        self.driver.execute_cdp_cmd('Network.emulateNetworkConditions', {
            'offline': False,
            'latency': 100,  # ms
            'downloadThroughput': 50 * 1024,  # 50 KB/s
            'uploadThroughput': 20 * 1024  # 20 KB/s
        })
        # setBlockedURLs replaces the list, so re-send the baseline blocks
        # together with the additional asset types.
        self.driver.execute_cdp_cmd('Network.setBlockedURLs', {
            'urls': [
                '*.png', '*.jpg', '*.jpeg', '*.gif', '*.svg',
                '*doubleclick.net*', '*googleadservices.com*',
                '*analytics.js*', '*facebook.net*', '*taboola.com*',
                '*.css', '*.js', '*.woff', '*.woff2'
            ]
        })

    def analyze_performance(self):
        """Analyze browser performance metrics gathered so far.

        Returns:
            Summary dict with average/max load time, average resource
            usage, and textual recommendations; None when nothing loaded.
        """
        if not self.load_times:
            return None
        return {
            'avg_load_time': sum(self.load_times) / len(self.load_times),
            'max_load_time': max(self.load_times),
            'resource_usage': {
                'avg_cpu': sum(u['cpu'] for u in self.resource_usage) / len(self.resource_usage),
                'avg_memory': sum(u['memory'] for u in self.resource_usage) / len(self.resource_usage)
            },
            'recommendations': self._generate_optimization_recommendations()
        }

    def _generate_optimization_recommendations(self):
        """Generate optimization recommendations based on performance data."""
        recommendations = []
        # Load time recommendations
        avg_load_time = sum(self.load_times) / len(self.load_times)
        if avg_load_time > 10:
            recommendations.append(
                "Page load time is high (>10s). Consider blocking more resources "
                "or using a simpler rendering strategy."
            )
        # Resource usage recommendations
        avg_cpu = sum(u['cpu'] for u in self.resource_usage) / len(self.resource_usage)
        if avg_cpu > 70:
            recommendations.append(
                "High CPU usage detected. Consider reducing JavaScript execution "
                "or using resource throttling."
            )
        avg_memory = sum(u['memory'] for u in self.resource_usage) / len(self.resource_usage)
        if avg_memory > 80:
            recommendations.append(
                "High memory usage detected. Consider closing browser after use "
                "or using lighter browser profiles."
            )
        return recommendations

    def cleanup(self):
        """Reset network emulation/blocking and quit the browser."""
        # Reset network conditions
        self.driver.execute_cdp_cmd('Network.clearBrowserCaches', {})
        self.driver.execute_cdp_cmd('Network.setBlockedURLs', {'urls': []})
        self.driver.execute_cdp_cmd('Network.emulateNetworkConditions', {
            'offline': False,
            'latency': 0,
            'downloadThroughput': -1,  # -1 disables throttling
            'uploadThroughput': -1
        })
        # Close browser
        self.driver.quit()
```
---
## **2. Browser Fingerprinting Countermeasures** 🖌️
Modern websites use sophisticated browser fingerprinting techniques that require advanced countermeasures.
### **Fingerprinting Detection System**
#### **1. Fingerprinting Detection Engine**
```python
# fingerprint_detection.py
import re
from selenium import webdriver
from selenium.webdriver.common.by import By
import time
class FingerprintingDetector:
    """Detect fingerprinting techniques used by a page and apply spoofing
    countermeasures through a live Selenium WebDriver.

    Static detection scans the page source for suspicious API references;
    dynamic detection executes probe scripts in the page context.
    """

    def __init__(self, driver):
        self.driver = driver
        # Populated by detect_fingerprinting(); consumed by
        # get_countermeasures()/apply_countermeasures().
        self.detected_techniques = []
        self.suspicious_patterns = self._load_suspicious_patterns()

    def _load_suspicious_patterns(self):
        """Load patterns that indicate fingerprinting attempts.

        Returns:
            List of dicts, each with 'name', a regex 'pattern' matched
            against the page source, a confidence 'weight', and a
            human-readable 'description'.
        """
        return [
            # Canvas fingerprinting
            {
                'name': 'canvas_fingerprinting',
                'pattern': r'CanvasRenderingContext2D|toDataURL|getImageData',
                'weight': 0.8,
                'description': 'Canvas-based fingerprinting technique'
            },
            # WebGL fingerprinting
            {
                'name': 'webgl_fingerprinting',
                'pattern': r'WebGLRenderingContext|getParameter|getSupportedExtensions',
                'weight': 0.9,
                'description': 'WebGL-based fingerprinting technique'
            },
            # AudioContext fingerprinting
            {
                'name': 'audio_fingerprinting',
                'pattern': r'AudioContext|createOscillator|analyserNode',
                'weight': 0.7,
                'description': 'AudioContext-based fingerprinting technique'
            },
            # Font detection
            {
                'name': 'font_detection',
                'pattern': r'div\.style\.fontFamily|document\.fonts',
                'weight': 0.6,
                'description': 'Font enumeration fingerprinting'
            },
            # Plugin detection
            {
                'name': 'plugin_detection',
                'pattern': r'navigator\.plugins|navigator\.mimeTypes',
                'weight': 0.5,
                'description': 'Browser plugin enumeration'
            },
            # Touch support detection
            {
                'name': 'touch_detection',
                'pattern': r'\'ontouchstart\' in document|window\.TouchEvent',
                'weight': 0.4,
                'description': 'Touch capability detection'
            },
            # Hardware concurrency
            {
                'name': 'hardware_concurrency',
                'pattern': r'navigator\.hardwareConcurrency',
                'weight': 0.7,
                'description': 'Hardware concurrency detection'
            },
            # Device memory
            {
                'name': 'device_memory',
                'pattern': r'navigator\.deviceMemory',
                'weight': 0.6,
                'description': 'Device memory detection'
            },
            # Battery status
            {
                'name': 'battery_status',
                'pattern': r'navigator\.getBattery',
                'weight': 0.3,
                'description': 'Battery status API detection'
            },
            # WebRTC IP leakage
            {
                'name': 'webrtc_ip_leak',
                'pattern': r'RTCPeerConnection|createDataChannel',
                'weight': 0.9,
                'description': 'WebRTC IP address leakage'
            }
        ]

    def detect_fingerprinting(self, url, timeout=15):
        """Detect fingerprinting techniques on a website.

        NOTE(review): ``timeout`` is currently unused — the page is only
        given a fixed 2-second settle below; confirm the intended wait.

        Returns:
            List of detected fingerprinting techniques (static pattern
            matches plus dynamic probes), also stored on the instance.
        """
        # Navigate to URL
        self.driver.get(url)
        # Wait for page to load (fixed settle; see NOTE above)
        time.sleep(2)
        # Get page source
        page_source = self.driver.page_source
        # Analyze for suspicious patterns
        detected = []
        for pattern in self.suspicious_patterns:
            if re.search(pattern['pattern'], page_source):
                detected.append({
                    'technique': pattern['name'],
                    'confidence': pattern['weight'],
                    'description': pattern['description'],
                    'evidence': self._get_evidence(pattern['pattern'], page_source)
                })
        # Check for dynamic fingerprinting (requires executing JS)
        dynamic_techniques = self._detect_dynamic_fingerprinting()
        detected.extend(dynamic_techniques)
        # Store results for get_countermeasures()/apply_countermeasures()
        self.detected_techniques = detected
        return detected

    def _get_evidence(self, pattern, page_source):
        """Get specific evidence (matched substrings) for a detected pattern."""
        matches = re.findall(pattern, page_source)
        return matches[:3]  # Return up to 3 examples

    def _detect_dynamic_fingerprinting(self):
        """Detect dynamically executed fingerprinting techniques.

        NOTE(review): these probes run OUR scripts in the page and report
        whether the canvas/WebGL/audio APIs are functional — they appear to
        test the browser's capability surface rather than observing scripts
        the page itself executed; confirm this is the intended signal.
        """
        detected = []
        # Check for canvas fingerprinting
        canvas_result = self.driver.execute_script('''
            try {
                var canvas = document.createElement('canvas');
                var ctx = canvas.getContext('2d');
                ctx.textBaseline = "top";
                ctx.font = "14px 'Arial'";
                ctx.textAlign = "none";
                ctx.fillStyle = "#f60";
                ctx.fillText("BrowserLeaks", 2, 15);
                ctx.fillStyle = "rgba(102, 204, 0, 0.7)";
                ctx.fillText("com", 4, 17);
                return canvas.toDataURL();
            } catch (e) {
                return null;
            }
        ''')
        if canvas_result and 'data:image/png;base64' in canvas_result:
            detected.append({
                'technique': 'canvas_fingerprinting',
                'confidence': 0.9,
                'description': 'Active canvas fingerprinting detected',
                'evidence': [canvas_result[:50] + '...']
            })
        # Check for WebGL fingerprinting
        webgl_result = self.driver.execute_script('''
            try {
                var canvas = document.createElement('canvas');
                var gl = canvas.getContext('webgl') || canvas.getContext('experimental-webgl');
                if (!gl) return null;
                var info = {
                    vendor: gl.getParameter(gl.VENDOR),
                    renderer: gl.getParameter(gl.RENDERER),
                    version: gl.getParameter(gl.VERSION),
                    shadingLanguageVersion: gl.getParameter(gl.SHADING_LANGUAGE_VERSION)
                };
                return JSON.stringify(info);
            } catch (e) {
                return null;
            }
        ''')
        if webgl_result:
            detected.append({
                'technique': 'webgl_fingerprinting',
                'confidence': 0.95,
                'description': 'Active WebGL fingerprinting detected',
                'evidence': [webgl_result]
            })
        # Check for AudioContext fingerprinting
        audio_result = self.driver.execute_script('''
            try {
                var audioCtx = new (window.AudioContext || window.webkitAudioContext)();
                var oscillator = audioCtx.createOscillator();
                var analyser = audioCtx.createAnalyser();
                oscillator.connect(analyser);
                analyser.connect(audioCtx.destination);
                oscillator.start();
                var bufferLength = analyser.frequencyBinCount;
                var dataArray = new Uint8Array(bufferLength);
                analyser.getByteFrequencyData(dataArray);
                return JSON.stringify({
                    sampleRate: audioCtx.sampleRate,
                    frequencyData: Array.from(dataArray).slice(0, 10)
                });
            } catch (e) {
                return null;
            }
        ''')
        if audio_result:
            detected.append({
                'technique': 'audio_fingerprinting',
                'confidence': 0.85,
                'description': 'Active AudioContext fingerprinting detected',
                'evidence': [audio_result]
            })
        return detected

    def get_countermeasures(self):
        """Get recommended countermeasures for detected techniques.

        Returns:
            List of dicts, each with the 'technique' name, a description of
            the 'countermeasure', and the bound method ('implementation')
            that applies it.
        """
        countermeasures = []
        for technique in self.detected_techniques:
            if technique['technique'] == 'canvas_fingerprinting':
                countermeasures.append({
                    'technique': 'canvas_fingerprinting',
                    'countermeasure': 'Spoof canvas rendering results',
                    'implementation': self._spoof_canvas
                })
            elif technique['technique'] == 'webgl_fingerprinting':
                countermeasures.append({
                    'technique': 'webgl_fingerprinting',
                    'countermeasure': 'Spoof WebGL parameters',
                    'implementation': self._spoof_webgl
                })
            elif technique['technique'] == 'audio_fingerprinting':
                countermeasures.append({
                    'technique': 'audio_fingerprinting',
                    'countermeasure': 'Spoof AudioContext results',
                    'implementation': self._spoof_audio
                })
            # Add more countermeasures as needed
        return countermeasures

    def _spoof_canvas(self):
        """Spoof canvas fingerprinting by overriding toDataURL."""
        self.driver.execute_script('''
            const toDataURL = HTMLCanvasElement.prototype.toDataURL;
            HTMLCanvasElement.prototype.toDataURL = function() {
                return 'data:image/png;base64,spoofed';
            };
        ''')

    def _spoof_webgl(self):
        """Spoof WebGL fingerprinting by overriding getParameter for the
        unmasked vendor/renderer parameter codes (37445/37446)."""
        self.driver.execute_script('''
            const getParameter = WebGLRenderingContext.prototype.getParameter;
            WebGLRenderingContext.prototype.getParameter = function(parameter) {
                if (parameter === 37445) return 'Spoofed Vendor';
                if (parameter === 37446) return 'Spoofed Renderer';
                return getParameter.apply(this, [parameter]);
            };
        ''')

    def _spoof_audio(self):
        """Spoof AudioContext fingerprinting with a deterministic analyser."""
        self.driver.execute_script('''
            const AudioContext = window.AudioContext || window.webkitAudioContext;
            const originalAudioContext = AudioContext;
            class CustomAudioContext extends originalAudioContext {
                constructor() {
                    super();
                    this.sampleRate = 44100;
                }
                createAnalyser() {
                    const analyser = super.createAnalyser();
                    const originalGetByteFrequencyData = analyser.getByteFrequencyData;
                    analyser.getByteFrequencyData = function(array) {
                        // Fill with consistent values
                        for (let i = 0; i < array.length; i++) {
                            array[i] = i % 256;
                        }
                        return originalGetByteFrequencyData.call(this, array);
                    };
                    return analyser;
                }
            }
            window.AudioContext = CustomAudioContext;
            window.webkitAudioContext = CustomAudioContext;
        ''')

    def apply_countermeasures(self):
        """Apply all recommended countermeasures; returns how many ran."""
        countermeasures = self.get_countermeasures()
        for cm in countermeasures:
            cm['implementation']()
        return len(countermeasures)
```
### **Advanced Fingerprinting Countermeasures**
#### **1. Comprehensive Fingerprint Spoofing System**
```python
# fingerprint_spoofing.py
import random
import string
from selenium import webdriver
class FingerprintSpoofingSystem:
    """Apply browser-fingerprint spoofing to a Selenium Chrome/Chromium session.

    Each ``spoof_*`` method overrides one fingerprint surface — timezone,
    language, resolution, user agent, WebGL, canvas, audio, fonts, plugins,
    touch, hardware, webdriver flag, WebRTC — either through the Chrome
    DevTools Protocol (``execute_cdp_cmd``) or by injecting JavaScript
    (``execute_script``). Every applied value is recorded in
    ``self.spoofed_values`` for later inspection.

    NOTE: ``json`` must be importable at module scope — it is used by
    ``spoof_fonts`` and ``spoof_plugins`` (the original file omitted the
    import, which raised NameError at runtime).
    """

    def __init__(self, driver):
        # driver: a Selenium webdriver that supports CDP (Chrome/Chromium).
        self.driver = driver
        # Record of every override applied, keyed by fingerprint surface.
        self.spoofed_values = {}

    def spoof_all(self):
        """Apply all spoofing techniques with randomized defaults."""
        self.spoof_timezone()
        self.spoof_language()
        self.spoof_resolution()
        self.spoof_user_agent()
        self.spoof_webgl()
        self.spoof_canvas()
        self.spoof_audio()
        self.spoof_fonts()
        self.spoof_plugins()
        self.spoof_touch()
        self.spoof_hardware()
        self.spoof_webdriver()
        self.spoof_webrtc()

    def spoof_timezone(self, timezone=None):
        """Spoof timezone information via CDP.

        Args:
            timezone: IANA timezone id; a random common zone if omitted.
        """
        timezones = [
            'America/New_York', 'America/Chicago', 'America/Denver',
            'America/Los_Angeles', 'Europe/London', 'Europe/Paris',
            'Asia/Tokyo', 'Australia/Sydney'
        ]
        if not timezone:
            timezone = random.choice(timezones)
        self.driver.execute_cdp_cmd('Emulation.setTimezoneOverride', {
            'timezoneId': timezone
        })
        self.spoofed_values['timezone'] = timezone

    def spoof_language(self, language=None):
        """Spoof language preferences.

        Args:
            language: Accept-Language style string; random if omitted.
                      Only the primary locale (before the first comma) is
                      passed to CDP.
        """
        languages = [
            'en-US,en;q=0.9', 'en-GB,en;q=0.8', 'es-ES,es;q=0.9',
            'fr-FR,fr;q=0.9', 'de-DE,de;q=0.9', 'ja-JP,ja;q=0.9'
        ]
        if not language:
            language = random.choice(languages)
        self.driver.execute_cdp_cmd('Emulation.setLocaleOverride', {
            'locale': language.split(',')[0]
        })
        self.spoofed_values['language'] = language

    def spoof_resolution(self, width=None, height=None):
        """Spoof screen resolution via device-metrics override.

        Args:
            width, height: pixel dimensions; a random common desktop
                           resolution if either is omitted.
        """
        resolutions = [
            (1920, 1080), (1366, 768), (1536, 864),
            (1440, 900), (2560, 1440), (3840, 2160)
        ]
        if width is None or height is None:
            width, height = random.choice(resolutions)
        self.driver.execute_cdp_cmd('Emulation.setDeviceMetricsOverride', {
            'width': width,
            'height': height,
            'deviceScaleFactor': 1,
            'mobile': False
        })
        self.spoofed_values['resolution'] = f"{width}x{height}"

    def spoof_user_agent(self, user_agent=None):
        """Spoof the user-agent string via CDP network override."""
        user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1'
        ]
        if not user_agent:
            user_agent = random.choice(user_agents)
        self.driver.execute_cdp_cmd('Network.setUserAgentOverride', {
            'userAgent': user_agent
        })
        self.spoofed_values['user_agent'] = user_agent

    def spoof_webgl(self, vendor=None, renderer=None):
        """Spoof WebGL vendor/renderer fingerprinting parameters.

        Patches ``WebGLRenderingContext.getParameter`` so the
        UNMASKED_VENDOR_WEBGL (37445) and UNMASKED_RENDERER_WEBGL (37446)
        queries return the chosen strings.
        """
        vendors = [
            'Google Inc.', 'Intel Inc.', 'NVIDIA Corporation',
            'AMD Corporation', 'ARM Limited', 'Imagination Technologies'
        ]
        renderers = [
            'ANGLE (Intel, Intel(R) UHD Graphics 630 Direct3D11 vs_5_0 ps_5_0)',
            'ANGLE (NVIDIA, NVIDIA GeForce GTX 1080 Ti Direct3D11 vs_5_0 ps_5_0)',
            'WebKit WebGL',
            'Mozilla Labs - WebGL'
        ]
        if not vendor:
            vendor = random.choice(vendors)
        if not renderer:
            renderer = random.choice(renderers)
        self.driver.execute_script(f'''
            const getParameter = WebGLRenderingContext.prototype.getParameter;
            WebGLRenderingContext.prototype.getParameter = function(parameter) {{
                if (parameter === 37445) return '{vendor}';
                if (parameter === 37446) return '{renderer}';
                return getParameter.apply(this, [parameter]);
            }};
        ''')
        self.spoofed_values['webgl'] = {'vendor': vendor, 'renderer': renderer}

    def spoof_canvas(self):
        """Spoof canvas fingerprinting results.

        ``toDataURL`` returns a fixed string and ``getImageData`` is
        flattened to constant red pixels, so every canvas hash is identical.
        """
        self.driver.execute_script('''
            const toDataURL = HTMLCanvasElement.prototype.toDataURL;
            HTMLCanvasElement.prototype.toDataURL = function() {
                return 'data:image/png;base64,spoofed';
            };
            const getImageData = CanvasRenderingContext2D.prototype.getImageData;
            CanvasRenderingContext2D.prototype.getImageData = function() {
                const data = getImageData.apply(this, arguments);
                // Modify pixel data to be consistent
                for (let i = 0; i < data.data.length; i += 4) {
                    data.data[i] = 255; // R
                    data.data[i+1] = 0; // G
                    data.data[i+2] = 0; // B
                    data.data[i+3] = 255; // A
                }
                return data;
            };
        ''')
        self.spoofed_values['canvas'] = 'spoofed'

    def spoof_audio(self):
        """Spoof AudioContext fingerprinting results.

        Replaces AudioContext with a subclass whose analysers fill
        frequency-data buffers with a deterministic ramp.
        """
        self.driver.execute_script('''
            const AudioContext = window.AudioContext || window.webkitAudioContext;
            const originalAudioContext = AudioContext;
            class CustomAudioContext extends originalAudioContext {
                constructor() {
                    super();
                    this.sampleRate = 44100;
                }
                createAnalyser() {
                    const analyser = super.createAnalyser();
                    const originalGetByteFrequencyData = analyser.getByteFrequencyData;
                    analyser.getByteFrequencyData = function(array) {
                        // Fill with consistent values
                        for (let i = 0; i < array.length; i++) {
                            array[i] = i % 256;
                        }
                        return originalGetByteFrequencyData.call(this, array);
                    };
                    return analyser;
                }
            }
            window.AudioContext = CustomAudioContext;
            window.webkitAudioContext = CustomAudioContext;
        ''')
        self.spoofed_values['audio'] = 'spoofed'

    def spoof_fonts(self, font_list=None):
        """Spoof the available-font list seen by JS font probes.

        Args:
            font_list: fonts to report; a random sample of common fonts if
                       omitted. Requires the module-level ``json`` import.
        """
        default_fonts = [
            'Arial', 'Times New Roman', 'Helvetica', 'Courier New',
            'Verdana', 'Georgia', 'Comic Sans MS', 'Trebuchet MS'
        ]
        if not font_list:
            font_list = random.sample(default_fonts, k=random.randint(5, 8))
        self.driver.execute_script(f'''
            // Override document.fonts
            const originalFonts = document.fonts;
            document.fonts = new Proxy(originalFonts, {{
                get: function(target, prop) {{
                    if (prop === 'check') {{
                        return function() {{ return true; }};
                    }}
                    if (prop === 'keys' || prop === 'values' || prop === 'entries') {{
                        return function* () {{
                            for (const font of {json.dumps(font_list)}) {{
                                yield font;
                            }}
                        }};
                    }}
                    return target[prop];
                }},
                has: function(target, prop) {{
                    return prop === 'check' || prop === 'keys' ||
                        prop === 'values' || prop === 'entries' ||
                        target.hasOwnProperty(prop);
                }}
            }});
            // Override getComputedStyle for font detection
            const originalGetComputedStyle = window.getComputedStyle;
            window.getComputedStyle = function(element, pseudoElt) {{
                const style = originalGetComputedStyle(element, pseudoElt);
                return new Proxy(style, {{
                    get: function(target, prop) {{
                        if (prop === 'fontFamily') {{
                            return '{", ".join(font_list)}';
                        }}
                        return target[prop];
                    }}
                }});
            }};
        ''')
        self.spoofed_values['fonts'] = font_list

    def spoof_plugins(self, plugins=None):
        """Spoof ``navigator.plugins`` / ``navigator.mimeTypes``.

        Args:
            plugins: list of {'name', 'description'} dicts; a random sample
                     of common Chrome plugins if omitted. Requires the
                     module-level ``json`` import.
        """
        default_plugins = [
            {'name': 'Chrome PDF Viewer', 'description': 'Portable Document Format'},
            {'name': 'Widevine Content Decryption Module', 'description': 'Enables Widevine licenses'},
            {'name': 'Native Client', 'description': 'Runs native code inside Chrome'}
        ]
        if plugins is None:
            plugins = random.sample(default_plugins, k=random.randint(1, 3))
        plugins_script = json.dumps(plugins)
        self.driver.execute_script(f'''
            // Override navigator.plugins
            Object.defineProperty(navigator, 'plugins', {{
                get: function() {{
                    return {plugins_script}.map(plugin => ({{
                        name: plugin.name,
                        description: plugin.description,
                        item: function() {{ return this; }},
                        namedItem: function() {{ return this; }}
                    }}));
                }}
            }});
            // Override navigator.mimeTypes
            Object.defineProperty(navigator, 'mimeTypes', {{
                get: function() {{
                    return {plugins_script}.map(plugin => ({{
                        type: 'application/pdf',
                        description: plugin.description,
                        suffixes: 'pdf',
                        enabledPlugin: {{
                            name: plugin.name,
                            description: plugin.description
                        }}
                    }}));
                }}
            }});
        ''')
        self.spoofed_values['plugins'] = plugins

    def spoof_touch(self, max_touch_points=None, has_touch=False):
        """Spoof touch capabilities (maxTouchPoints, ontouchstart, TouchEvent)."""
        if max_touch_points is None:
            max_touch_points = random.choice([0, 5, 10])
        self.driver.execute_script(f'''
            // Override touch events
            ['touchstart', 'touchmove', 'touchend', 'touchcancel'].forEach(function(event) {{
                document.addEventListener(event, function(e) {{
                    e.preventDefault();
                }}, {{ passive: false }});
            }});
            // Override navigator.maxTouchPoints
            Object.defineProperty(navigator, 'maxTouchPoints', {{
                get: function() {{ return {max_touch_points}; }}
            }});
            // Override 'ontouchstart' in document
            Object.defineProperty(document, 'ontouchstart', {{
                get: function() {{ return {str(has_touch).lower()}; }},
                set: function() {{ }}
            }});
            // Override window.TouchEvent
            window.TouchEvent = {str(has_touch).lower()};
        ''')
        self.spoofed_values['touch'] = {
            'max_touch_points': max_touch_points,
            'has_touch': has_touch
        }

    def spoof_hardware(self, hardware_concurrency=None, device_memory=None):
        """Spoof hardware capabilities (CPU core count and device memory)."""
        if hardware_concurrency is None:
            hardware_concurrency = random.choice([2, 4, 8, 16])
        if device_memory is None:
            device_memory = random.choice([2, 4, 8])
        self.driver.execute_script(f'''
            // Override hardwareConcurrency
            Object.defineProperty(navigator, 'hardwareConcurrency', {{
                get: function() {{ return {hardware_concurrency}; }}
            }});
            // Override deviceMemory
            Object.defineProperty(navigator, 'deviceMemory', {{
                get: function() {{ return {device_memory}; }}
            }});
        ''')
        self.spoofed_values['hardware'] = {
            'hardware_concurrency': hardware_concurrency,
            'device_memory': device_memory
        }

    def spoof_webdriver(self):
        """Hide WebDriver automation markers.

        Installed via Page.addScriptToEvaluateOnNewDocument so it runs
        before any page script on every navigation: clears
        navigator.webdriver, fakes window.navigator.chrome, and patches
        the notifications permission query.
        """
        self.driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {
            'source': '''
                Object.defineProperty(navigator, 'webdriver', {
                    get: () => undefined
                });
                window.navigator.chrome = {
                    app: {isInstalled: false},
                    webstore: {onInstallStageChanged: {}, onDownloadProgress: {}},
                    runtime: {PlatformOs: {MAC: 'mac', WIN: 'win', ANDROID: 'android', CROS: 'cros', LINUX: 'linux', OPENBSD: 'openbsd'}, PlatformArch: {ARM: 'arm', X86_32: 'x86-32', X86_64: 'x86-64'}, PlatformNaclArch: {ARM: 'arm', X86_32: 'x86-32', X86_64: 'x86-64'}, RefreshType: {NONE: 0, FORMAL: 1, OPTIONAL: 2}}
                };
                const originalQuery = window.navigator.permissions.query;
                window.navigator.permissions.query = (parameters) => (
                    parameters.name === 'notifications' ?
                    Promise.resolve({ state: Notification.permission }) :
                    originalQuery(parameters)
                );
            '''
        })
        self.spoofed_values['webdriver'] = 'spoofed'

    def spoof_webrtc(self):
        """Spoof WebRTC to prevent local-IP leakage.

        Rewrites SDP c= lines in offers/answers to 0.0.0.0. NOTE(review):
        this only scrubs ``c=IN IP4`` lines — ICE candidate lines
        (``a=candidate``) may still leak addresses; confirm against the
        target's detection method.
        """
        self.driver.execute_script('''
            // Override WebRTC to prevent IP leakage
            const originalGetUserMedia = navigator.mediaDevices.getUserMedia;
            navigator.mediaDevices.getUserMedia = function(constraints) {
                return originalGetUserMedia(constraints)
                    .then(stream => {
                        // Modify the stream to prevent IP leakage
                        return stream;
                    });
            };
            // Override RTCPeerConnection
            const originalRTCPeerConnection = window.RTCPeerConnection;
            window.RTCPeerConnection = function(...args) {
                const peerConnection = new originalRTCPeerConnection(...args);
                const originalCreateOffer = peerConnection.createOffer;
                peerConnection.createOffer = function(...args) {
                    return originalCreateOffer.apply(this, args)
                        .then(offer => {
                            // Remove local IP addresses
                            offer.sdp = offer.sdp.replace(/(c=IN IP4)(\\s+\\d+\\.\\d+\\.\\d+\\.\\d+)/g, '$1 0.0.0.0');
                            return offer;
                        });
                };
                const originalCreateAnswer = peerConnection.createAnswer;
                peerConnection.createAnswer = function(...args) {
                    return originalCreateAnswer.apply(this, args)
                        .then(answer => {
                            // Remove local IP addresses
                            answer.sdp = answer.sdp.replace(/(c=IN IP4)(\\s+\\d+\\.\\d+\\.\\d+\\.\\d+)/g, '$1 0.0.0.0');
                            return answer;
                        });
                };
                return peerConnection;
            };
        ''')
        self.spoofed_values['webrtc'] = 'spoofed'
```
---
## **3. Large-Scale Data Processing Pipelines** 📊
Handling massive amounts of scraped data requires robust data processing infrastructure.
### **Distributed Data Processing Architecture**
#### **1. Data Pipeline Orchestration System**
```python
# data_pipeline.py
import time
import uuid
import json
from datetime import datetime
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
class DataPipelineOrchestrator:
    """Runs scraped data through an ordered list of processing stages.

    Stages are registered with :meth:`add_stage` and executed in order by
    :meth:`process`; each stage's output feeds the next. Per-stage and
    overall metrics are collected, and failures are recorded in an error
    log instead of aborting the pipeline.
    """

    def __init__(self, max_workers=10, batch_size=100):
        # Registered stage descriptors, executed in insertion order.
        self.stages = []
        # max_workers is only consumed by create_distributed_pipeline().
        self.max_workers = max_workers
        self.batch_size = batch_size
        self.pipeline_id = str(uuid.uuid4())
        self.metrics = {
            'start_time': None,
            'end_time': None,
            'total_items': 0,
            'processed_items': 0,
            'failed_items': 0,
            'stage_metrics': defaultdict(dict)
        }
        self.error_log = []

    def add_stage(self, stage_name, processor, batched=False):
        """Add a processing stage to the pipeline.

        Args:
            stage_name: label used in metrics and error logs.
            processor: callable. If batched, takes a list and returns an
                iterable of results; otherwise takes one item and returns
                a result (None means "drop this item").
            batched: process items in chunks of ``batch_size``.
        """
        self.stages.append({
            'name': stage_name,
            'processor': processor,
            'batched': batched,
            'start_time': None,
            'end_time': None,
            'items_processed': 0,
            'items_failed': 0
        })

    def process(self, data_stream):
        """
        Process data through the pipeline.

        Args:
            data_stream: Iterable of data items
        Returns:
            List of items that survived every stage.
        """
        self.metrics['start_time'] = datetime.utcnow()
        self.metrics['total_items'] = 0
        output = []
        current_data = data_stream
        for stage_index, stage in enumerate(self.stages):
            stage['start_time'] = datetime.utcnow()
            stage['items_processed'] = 0
            stage['items_failed'] = 0
            print(f"Starting stage: {stage['name']}")
            # BUG FIX: only the first stage consumes the raw input, so only
            # it contributes to total_items. The original incremented the
            # counter at every stage, inflating the total by a factor of
            # len(self.stages) and corrupting processed_items.
            count_input = stage_index == 0
            if stage['batched']:
                # Accumulate items into fixed-size batches.
                batch = []
                for item in current_data:
                    if count_input:
                        self.metrics['total_items'] += 1
                    batch.append(item)
                    if len(batch) >= self.batch_size:
                        self._process_batch(stage, batch, output)
                        batch = []
                # Flush the final partial batch.
                if batch:
                    self._process_batch(stage, batch, output)
            else:
                for item in current_data:
                    if count_input:
                        self.metrics['total_items'] += 1
                    self._process_item(stage, item, output)
            stage['end_time'] = datetime.utcnow()
            self.metrics['stage_metrics'][stage['name']] = {
                'duration': (stage['end_time'] - stage['start_time']).total_seconds(),
                'items_processed': stage['items_processed'],
                'items_failed': stage['items_failed']
            }
            # This stage's output becomes the next stage's input.
            current_data = output
            output = []
        self.metrics['end_time'] = datetime.utcnow()
        self.metrics['processed_items'] = self.metrics['total_items'] - self.metrics['failed_items']
        print(f"Pipeline completed. Processed {self.metrics['processed_items']}/{self.metrics['total_items']} items.")
        return current_data

    def _process_batch(self, stage, batch, output_queue):
        """Run one batch through a stage; on failure, count every batch item as failed."""
        try:
            results = stage['processor'](batch)
            for result in results:
                output_queue.append(result)
                stage['items_processed'] += 1
            return True
        except Exception as e:
            self._log_error(stage['name'], batch, str(e))
            for item in batch:
                self.metrics['failed_items'] += 1
                stage['items_failed'] += 1
            return False

    def _process_item(self, stage, item, output_queue):
        """Run one item through a stage; None results are silently dropped."""
        try:
            result = stage['processor'](item)
            if result is not None:
                output_queue.append(result)
                stage['items_processed'] += 1
            return True
        except Exception as e:
            self._log_error(stage['name'], item, str(e))
            self.metrics['failed_items'] += 1
            stage['items_failed'] += 1
            return False

    def _log_error(self, stage, data, error):
        """Append a structured error record and echo it to stdout."""
        error_id = str(uuid.uuid4())
        timestamp = datetime.utcnow().isoformat()
        self.error_log.append({
            'error_id': error_id,
            'timestamp': timestamp,
            'stage': stage,
            'error': error,
            'data_sample': str(data)[:500]  # Truncate large data
        })
        print(f"ERROR in stage {stage}: {error}")
        print(f"  Error ID: {error_id}")
        print(f"  Data sample: {str(data)[:100]}...")

    def get_metrics(self):
        """Return pipeline performance metrics as a plain dict."""
        return {
            'pipeline_id': self.pipeline_id,
            'total_items': self.metrics['total_items'],
            'processed_items': self.metrics['processed_items'],
            'failed_items': self.metrics['failed_items'],
            'total_duration': (self.metrics['end_time'] - self.metrics['start_time']).total_seconds() if self.metrics['end_time'] else None,
            'stage_metrics': dict(self.metrics['stage_metrics'])
        }

    def get_error_log(self):
        """Return the error log for troubleshooting."""
        return self.error_log

    def save_metrics(self, file_path):
        """Save metrics to a JSON file."""
        with open(file_path, 'w') as f:
            json.dump(self.get_metrics(), f, indent=2)

    def save_error_log(self, file_path):
        """Save the error log to a JSON file."""
        with open(file_path, 'w') as f:
            json.dump(self.get_error_log(), f, indent=2)

    def create_distributed_pipeline(self, num_nodes=3):
        """Clone this pipeline's stages onto ``num_nodes`` orchestrators.

        Returns:
            A DistributedPipeline wrapping the clones.
        """
        pipelines = []
        for _ in range(num_nodes):
            pipeline = DataPipelineOrchestrator(
                max_workers=self.max_workers,
                batch_size=self.batch_size
            )
            for stage in self.stages:
                pipeline.add_stage(
                    stage['name'],
                    stage['processor'],
                    stage['batched']
                )
            pipelines.append(pipeline)
        return DistributedPipeline(pipelines)
class DistributedPipeline:
    """Fan-out wrapper that runs several pipeline nodes in parallel.

    Input is sharded round-robin across the wrapped pipelines; each shard
    is processed on its own thread and the outputs are merged (completion
    order, so no ordering guarantee across shards).
    """

    def __init__(self, pipelines):
        self.pipelines = pipelines

    def process(self, data_stream):
        """Shard ``data_stream`` across nodes and merge their results."""
        shards = self._split_data(data_stream, len(self.pipelines))
        merged = []
        with ThreadPoolExecutor(max_workers=len(self.pipelines)) as pool:
            pending = {}
            for idx, (node, shard) in enumerate(zip(self.pipelines, shards)):
                pending[pool.submit(node.process, shard)] = idx
            for done in as_completed(pending):
                idx = pending[done]
                try:
                    merged.extend(done.result())
                except Exception as exc:
                    # A failed node loses only its own shard.
                    print(f"Pipeline {idx} failed: {str(exc)}")
        return merged

    def _split_data(self, data_stream, num_chunks):
        """Distribute items over ``num_chunks`` shards round-robin."""
        shards = [[] for _ in range(num_chunks)]
        position = 0
        for item in data_stream:
            shards[position].append(item)
            position = (position + 1) % num_chunks
        return shards

    def get_metrics(self):
        """Return the metrics dict of every wrapped pipeline."""
        collected = []
        for node in self.pipelines:
            collected.append(node.get_metrics())
        return collected

    def get_error_logs(self):
        """Return the error log of every wrapped pipeline."""
        collected = []
        for node in self.pipelines:
            collected.append(node.get_error_log())
        return collected
```
#### **2. Schema Evolution Manager**
```python
# schema_evolution.py
import json
import re
from datetime import datetime
class SchemaEvolutionManager:
    """Registry of versioned record schemas with compatibility checking.

    Schemas are dicts with at least ``type`` and ``fields`` keys; each
    field is ``{'name': ..., 'type': ..., 'default': ...?}``. Updates are
    gated by backward/forward/full compatibility rules, and
    :meth:`evolve_data` migrates records between schema versions.
    """

    def __init__(self):
        self.schemas = {}  # {schema_id: current schema definition}
        self.version_history = {}  # {schema_id: [version records]}
        # Dispatch table for check_compatibility().
        self.compatibility_rules = {
            'backward': self._check_backward_compatibility,
            'forward': self._check_forward_compatibility,
            'full': self._check_full_compatibility
        }

    def register_schema(self, schema_id, schema_definition, compatibility='backward'):
        """
        Register a new schema or update an existing one.

        Args:
            schema_id: logical schema name.
            schema_definition: dict with 'type' and 'fields'.
            compatibility: rule the update must satisfy against the
                current version ('backward', 'forward', or 'full').
        Returns:
            Schema version number (e.g. '1.1.0').
        Raises:
            ValueError: invalid definition or incompatible update.
        """
        if not self._validate_schema(schema_definition):
            raise ValueError("Invalid schema definition")
        # Updates to an existing schema must pass the compatibility gate.
        if schema_id in self.schemas:
            current_schema = self.schemas[schema_id]
            if not self.check_compatibility(current_schema, schema_definition, compatibility):
                raise ValueError(f"Schema update is not {compatibility}-compatible")
        version = self._create_version(schema_id)
        if schema_id not in self.schemas:
            self.schemas[schema_id] = schema_definition
            self.version_history[schema_id] = []
        self.version_history[schema_id].append({
            'version': version,
            'schema': schema_definition,
            'timestamp': datetime.utcnow().isoformat(),
            'compatibility': compatibility
        })
        self.schemas[schema_id] = schema_definition
        return version

    def _validate_schema(self, schema):
        """Minimal structural check: 'type' and 'fields' keys must exist."""
        required_fields = ['type', 'fields']
        return all(field in schema for field in required_fields)

    def _create_version(self, schema_id):
        """Return '1.0.0' for a new schema, else bump the minor version."""
        if schema_id not in self.version_history or not self.version_history[schema_id]:
            return '1.0.0'
        current_version = self.version_history[schema_id][-1]['version']
        major, minor, patch = map(int, current_version.split('.'))
        return f"{major}.{minor+1}.0"

    def get_schema(self, schema_id, version=None):
        """Return a schema definition (latest, or a specific version).

        Raises:
            ValueError: unknown schema_id or version.
        """
        if schema_id not in self.schemas:
            raise ValueError(f"Schema {schema_id} not found")
        if version:
            # Search newest-first so duplicates resolve to the latest entry.
            for ver_info in reversed(self.version_history[schema_id]):
                if ver_info['version'] == version:
                    return ver_info['schema']
            raise ValueError(f"Version {version} not found for schema {schema_id}")
        return self.schemas[schema_id]

    def get_version_history(self, schema_id):
        """Return the version history list (empty for unknown ids)."""
        if schema_id not in self.version_history:
            return []
        return self.version_history[schema_id]

    def check_compatibility(self, old_schema, new_schema, compatibility_type='backward'):
        """Check whether replacing old_schema with new_schema satisfies the rule."""
        if compatibility_type not in self.compatibility_rules:
            raise ValueError(f"Unknown compatibility type: {compatibility_type}")
        return self.compatibility_rules[compatibility_type](old_schema, new_schema)

    def _check_backward_compatibility(self, old_schema, new_schema):
        """
        Check if new schema is backward compatible with old schema
        (new consumers can read old data): fields may be added but not
        removed, and retained fields need compatible type widening.
        """
        old_fields = {f['name'] for f in old_schema['fields']}
        new_fields = {f['name'] for f in new_schema['fields']}
        removed_fields = old_fields - new_fields
        if removed_fields:
            return False
        for field in old_schema['fields']:
            new_field = next((f for f in new_schema['fields'] if f['name'] == field['name']), None)
            if new_field and not self._is_type_compatible(field['type'], new_field['type']):
                return False
        return True

    def _check_forward_compatibility(self, old_schema, new_schema):
        """
        Check if new schema is forward compatible with old schema
        (old consumers can read new data): fields may be removed but not
        added, and retained fields need compatible type widening.
        """
        old_fields = {f['name'] for f in old_schema['fields']}
        new_fields = {f['name'] for f in new_schema['fields']}
        added_fields = new_fields - old_fields
        if added_fields:
            return False
        for field in new_schema['fields']:
            old_field = next((f for f in old_schema['fields'] if f['name'] == field['name']), None)
            if old_field and not self._is_type_compatible(field['type'], old_field['type']):
                return False
        return True

    def _check_full_compatibility(self, old_schema, new_schema):
        """Fully compatible = both backward and forward compatible."""
        return (
            self._check_backward_compatibility(old_schema, new_schema) and
            self._check_forward_compatibility(old_schema, new_schema)
        )

    def _is_type_compatible(self, old_type, new_type):
        """Return True if old_type values may be widened to new_type."""
        compatibility_map = {
            'int': ['int', 'long', 'float', 'double'],
            'long': ['long', 'float', 'double'],
            'float': ['float', 'double'],
            'string': ['string', 'bytes'],
            'bytes': ['bytes', 'string'],
            'boolean': ['boolean'],
            'null': ['null', 'string', 'bytes']
        }
        return new_type in compatibility_map.get(old_type, [])

    def evolve_data(self, data, from_schema_id, to_schema_id, from_version=None, to_version=None):
        """
        Evolve a data record from one schema version to another.

        Returns:
            Transformed data dict.
        Raises:
            ValueError: schemas unknown or evolution impossible.
        """
        from_schema = self.get_schema(from_schema_id, from_version)
        to_schema = self.get_schema(to_schema_id, to_version)
        if not self._can_evolve(from_schema, to_schema):
            raise ValueError("Cannot evolve data between these schemas")
        return self._transform_data(data, from_schema, to_schema)

    def _can_evolve(self, from_schema, to_schema):
        """Check if data can be evolved between schemas (simplified: always True)."""
        return True

    def _transform_data(self, data, from_schema, to_schema):
        """Map/convert fields from the source record into the target schema."""
        transformed = {}
        for to_field in to_schema['fields']:
            from_field = next(
                (f for f in from_schema['fields'] if f['name'] == to_field['name']),
                None
            )
            if from_field and from_field['name'] in data:
                value = self._convert_value(
                    data[from_field['name']],
                    from_field['type'],
                    to_field['type']
                )
                transformed[to_field['name']] = value
            elif to_field.get('default') is not None:
                # Absent in the source record: fall back to the declared default.
                transformed[to_field['name']] = to_field['default']
        return transformed

    def _convert_value(self, value, from_type, to_type):
        """Convert a single value between schema types.

        BUG FIX: None is handled first. The original checked for None only
        after the string/bytes branches, so a null 'string' value raised
        AttributeError on .encode() instead of propagating as None.
        """
        if value is None:
            return None
        if from_type == to_type:
            return value
        if from_type in ['int', 'long', 'float', 'double'] and to_type in ['int', 'long', 'float', 'double']:
            return self._convert_numeric(value, from_type, to_type)
        if from_type == 'string' and to_type == 'bytes':
            return value.encode('utf-8')
        if from_type == 'bytes' and to_type == 'string':
            return value.decode('utf-8')
        raise ValueError(f"Cannot convert from {from_type} to {to_type}")

    def _convert_numeric(self, value, from_type, to_type):
        """Convert numeric values between int/long/float/double."""
        if from_type == 'int':
            numeric = int(value)
        elif from_type == 'long':
            numeric = int(value)
        elif from_type in ['float', 'double']:
            numeric = float(value)
        else:
            raise ValueError(f"Unsupported numeric type: {from_type}")
        if to_type == 'int':
            return int(numeric)
        elif to_type == 'long':
            return int(numeric)
        elif to_type in ['float', 'double']:
            return float(numeric)
        raise ValueError(f"Unsupported numeric type: {to_type}")

    def generate_schema_diff(self, schema_id, version1, version2):
        """Return added/removed/type-modified fields between two versions."""
        schema1 = self.get_schema(schema_id, version1)
        schema2 = self.get_schema(schema_id, version2)
        changes = {
            'added_fields': [],
            'removed_fields': [],
            'modified_fields': []
        }
        fields1 = {f['name'] for f in schema1['fields']}
        fields2 = {f['name'] for f in schema2['fields']}
        changes['added_fields'] = list(fields2 - fields1)
        changes['removed_fields'] = list(fields1 - fields2)
        for field in fields1 & fields2:
            f1 = next(f for f in schema1['fields'] if f['name'] == field)
            f2 = next(f for f in schema2['fields'] if f['name'] == field)
            if f1['type'] != f2['type']:
                changes['modified_fields'].append({
                    'field': field,
                    'from_type': f1['type'],
                    'to_type': f2['type']
                })
        return changes
```
---
## Continued in the next section…