# **Part 6: Advanced Web Scraping Techniques – JavaScript Rendering, Fingerprinting, and Large-Scale Data Processing** **Duration:** ~60 minutes **Hashtags:** #AdvancedScraping #JavaScriptRendering #BrowserFingerprinting #DataPipelines #LegalCompliance #ScrapingOptimization #EnterpriseScraping #WebScraping #DataEngineering #TechInnovation --- ## **Table of Contents** 1. [Advanced JavaScript Rendering Techniques](#javascript-rendering) 2. [Browser Fingerprinting Countermeasures](#fingerprinting-countermeasures) 3. [Large-Scale Data Processing Pipelines](#data-processing-pipelines) 4. [Legal Risk Assessment Frameworks](#legal-risk-assessment) 5. [Scraping Performance Optimization](#performance-optimization) 6. [Case Study: Enterprise-Scale Scraping Platform](#enterprise-case-study) 7. [Quiz: Advanced Scraping Mastery](#advanced-quiz) 8. [Conclusion and Future of Web Scraping](#part6-conclusion) --- ## **1. Advanced JavaScript Rendering Techniques** 🖥️ Modern websites increasingly rely on JavaScript frameworks that require sophisticated rendering approaches beyond basic headless browsers. ### **JavaScript Framework Detection** #### **1. Framework Detection System** ```python # framework_detection.py import re import requests from bs4 import BeautifulSoup import tldextract class FrameworkDetector: def __init__(self): self.framework_signatures = self._load_framework_signatures() self.cache = {} def _load_framework_signatures(self): """Load signatures for detecting JavaScript frameworks""" return { 'react': { 'html_patterns': [ r'data-reactroot', r'data-reactid', r'__reactFiber\$', r'__reactProps\$' ], 'js_patterns': [ r'webpackJsonp.*react', r'facebook\.com[/\\]react', r'ReactDOM\.render' ], 'headers': [ ('x-powered-by', 'react') ] }, 'angular': { 'html_patterns': [ r'ng-', r'ng_', r'data-ng-', r'x-ng-', r'ng:', r'ng_', r'ng\$' ], 'js_patterns': [ r'angular\.module', r'ngRoute', r'angular\.version' ], 'headers': [ ('x-powered-by', 'angular') ] }, 'vue': { 'html_patterns': [ r'vue-', r'data-v-', r'__vue__', r'vuejs' ], 'js_patterns': [ r'Vue\.prototype', r'Vue\.extend', r'vue\.runtime' ], 'headers': [ ('x-powered-by', 'vue') ] }, 'svelte': { 'html_patterns': [ r'svelte-', r'data-svelte' ], 'js_patterns': [ r'__sveltets_', r'svelte\.internal' ], 'headers': [] }, 'nextjs': { 'html_patterns': [ r'__NEXT_DATA__', r'next-script', r'__next' ], 'js_patterns': [ r'next\.js', r'__NEXT_RELAY_ENV__' ], 'headers': [ ('server', 'next') ] }, 'nuxt': { 'html_patterns': [ r'__NUXT__', r'nuxt-' ], 'js_patterns': [ r'nuxt\.js', r'NUXT\.config' ], 'headers': [] } } def detect_frameworks(self, url, timeout=10): """ Detect JavaScript frameworks used by a website Returns: List of detected frameworks with confidence scores """ # Check cache first if url in self.cache: return self.cache[url] try: # Get domain for subresource analysis extracted = tldextract.extract(url) domain = f"{extracted.domain}.{extracted.suffix}" # Fetch page response = requests.get(url, timeout=timeout) response.raise_for_status() # Analyze content detected = self._analyze_content( response.text, response.headers, domain ) # Store in cache self.cache[url] = detected return detected except Exception as e: print(f"Error detecting frameworks for {url}: {str(e)}") return [] def _analyze_content(self, html, headers, domain): """Analyze HTML and headers for framework signatures""" detected = [] # Check HTML content for framework, patterns in self.framework_signatures.items(): confidence = 0 # Check HTML patterns for pattern in patterns['html_patterns']: if re.search(pattern, html, re.IGNORECASE): confidence += 0.2 # Check JS patterns soup = BeautifulSoup(html, 'html.parser') scripts = [script.string for script in soup.find_all('script') if script.string] for script in scripts: for pattern in patterns['js_patterns']: if re.search(pattern, script, re.IGNORECASE): confidence += 0.3 break # Check headers for header, value in patterns['headers']: if header in headers and value in headers[header].lower(): confidence += 0.5 # Add to results if confidence is sufficient if confidence > 0.3: detected.append({ 'framework': framework, 'confidence': min(1.0, confidence), 'evidence': self._get_evidence(framework, html, headers) }) # Sort by confidence detected.sort(key=lambda x: x['confidence'], reverse=True) return detected def _get_evidence(self, framework, html, headers): """Get specific evidence for detected framework""" evidence = [] patterns = self.framework_signatures[framework] # Check HTML patterns for pattern in patterns['html_patterns']: if re.search(pattern, html, re.IGNORECASE): evidence.append(f"HTML pattern match: {pattern}") # Check JS patterns soup = BeautifulSoup(html, 'html.parser') scripts = [script.string for script in soup.find_all('script') if script.string] for script in scripts: for pattern in patterns['js_patterns']: if re.search(pattern, script, re.IGNORECASE): evidence.append(f"JS pattern match: {pattern}") break # Check headers for header, value in patterns['headers']: if header in headers and value in headers[header].lower(): evidence.append(f"Header match: {header}: {headers[header]}") return evidence def get_rendering_strategy(self, url): """ Determine optimal rendering strategy based on detected frameworks Returns: Dictionary with rendering strategy recommendations """ frameworks = self.detect_frameworks(url) if not frameworks: return { 'strategy': 'requests', 'reason': 'No JavaScript frameworks detected', 'confidence': 1.0 } # Determine best strategy primary_framework = frameworks[0]['framework'] confidence = frameworks[0]['confidence'] if primary_framework in ['react', 'angular', 'vue', 'nextjs', 'nuxt']: return { 'strategy': 'headless_browser', 'framework': primary_framework, 'reason': f'Detected {primary_framework} framework requiring JS execution', 'confidence': confidence } if primary_framework == 'svelte': return { 'strategy': 'static_rendering', 'framework': primary_framework, 'reason': 'Svelte can often be statically rendered with proper configuration', 'confidence': confidence * 0.8 } return { 'strategy': 'headless_browser', 'framework': primary_framework, 'reason': 'Unknown framework requiring JS execution', 'confidence': confidence * 0.9 } ``` ### **Advanced Rendering Techniques** #### **1. Custom Browser Profiles for JavaScript Rendering** ```python # browser_profiles.py from selenium import webdriver from selenium.webdriver.chrome.options import Options from selenium.webdriver.chrome.service import Service from webdriver_manager.chrome import ChromeDriverManager import os import json import time import random class BrowserProfileManager: def __init__(self, profile_dir='browser_profiles'): self.profile_dir = profile_dir os.makedirs(profile_dir, exist_ok=True) self.profiles = self._load_profiles() def _load_profiles(self): """Load browser profiles from disk""" profiles = {} for filename in os.listdir(self.profile_dir): if filename.endswith('.json'): with open(os.path.join(self.profile_dir, filename), 'r') as f: profile_id = filename[:-5] profiles[profile_id] = json.load(f) return profiles def save_profile(self, profile_id, profile_data): """Save browser profile to disk""" with open(os.path.join(self.profile_dir, f"{profile_id}.json"), 'w') as f: json.dump(profile_data, f, indent=2) self.profiles[profile_id] = profile_data def get_profile(self, profile_id=None): """Get browser profile by ID or randomly""" if profile_id and profile_id in self.profiles: return self.profiles[profile_id] # Return random profile if ID not specified if self.profiles: return random.choice(list(self.profiles.values())) # Create default profile if none exist return self.create_default_profile() def create_default_profile(self): """Create a default browser profile""" profile = { 'user_agent': self._generate_user_agent(), 'screen_resolution': self._generate_screen_resolution(), 'timezone': self._generate_timezone(), 'language': self._generate_language(), 'webgl_vendor': self._generate_webgl_vendor(), 'webgl_renderer': self._generate_webgl_renderer(), 'audio_context': self._generate_audio_context(), 'font_list': self._generate_font_list(), 'plugins': self._generate_plugins(), 'touch_support': self._generate_touch_support(), 'hardware_concurrency': self._generate_hardware_concurrency(), 'device_memory': self._generate_device_memory(), 'webdriver': False, 'chrome': { 'app': {'version': '1.0.0'}, 'runtime': {'platform': 'Win32'}, 'webstore': {} } } # Save as default profile self.save_profile('default', profile) return profile def _generate_user_agent(self): """Generate realistic user agent string""" browsers = [ 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{version} Safari/537.36', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{version} Safari/537.36', 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{version} Safari/537.36' ] versions = [ '90.0.4430.212', '91.0.4472.114', '91.0.4472.124', '91.0.4472.164', '92.0.4515.107', '92.0.4515.131' ] browser_template = random.choice(browsers) version = random.choice(versions) return browser_template.format(version=version) def _generate_screen_resolution(self): """Generate realistic screen resolution""" resolutions = [ '1920x1080', '1366x768', '1536x864', '1440x900', '2560x1440', '3840x2160' ] return random.choice(resolutions) def _generate_timezone(self): """Generate realistic timezone""" timezones = [ 'America/New_York', 'America/Chicago', 'America/Denver', 'America/Los_Angeles', 'Europe/London', 'Europe/Paris', 'Asia/Tokyo', 'Australia/Sydney' ] return random.choice(timezones) def _generate_language(self): """Generate realistic language setting""" languages = [ 'en-US,en;q=0.9', 'en-GB,en;q=0.8', 'es-ES,es;q=0.9', 'fr-FR,fr;q=0.9', 'de-DE,de;q=0.9', 'ja-JP,ja;q=0.9' ] return random.choice(languages) def _generate_webgl_vendor(self): """Generate realistic WebGL vendor""" vendors = [ 'Google Inc.', 'Intel Inc.', 'NVIDIA Corporation', 'AMD Corporation', 'ARM Limited', 'Imagination Technologies' ] return random.choice(vendors) def _generate_webgl_renderer(self): """Generate realistic WebGL renderer""" renderers = [ 'ANGLE (Intel, Intel(R) UHD Graphics 630 Direct3D11 vs_5_0 ps_5_0)', 'ANGLE (NVIDIA, NVIDIA GeForce GTX 1080 Ti Direct3D11 vs_5_0 ps_5_0)', 'WebKit WebGL', 'Mozilla Labs - WebGL' ] return random.choice(renderers) def _generate_audio_context(self): """Generate realistic AudioContext parameters""" return { 'sampleRate': random.choice([44100, 48000]), 'channelCount': 2, 'maxChannelCount': 2, 'contextType': random.choice(['webaudio', 'audioworklet']) } def _generate_font_list(self): """Generate realistic font list""" fonts = [ 'Arial', 'Times New Roman', 'Helvetica', 'Courier New', 'Verdana', 'Georgia', 'Comic Sans MS', 'Trebuchet MS', 'Arial Black', 'Impact', 'Lucida Console', 'Tahoma' ] return random.sample(fonts, k=random.randint(5, 10)) def _generate_plugins(self): """Generate realistic plugin list""" plugins = [ {'name': 'Chrome PDF Viewer', 'description': 'Portable Document Format'}, {'name': 'Widevine Content Decryption Module', 'description': 'Enables Widevine licenses'}, {'name': 'Native Client', 'description': 'Runs native code inside Chrome'} ] return plugins if random.random() > 0.3 else [] def _generate_touch_support(self): """Generate realistic touch support parameters""" return { 'maxTouchPoints': random.choice([0, 5, 10]), 'touchEvent': random.random() > 0.7, 'touchStart': random.random() > 0.7 } def _generate_hardware_concurrency(self): """Generate realistic hardware concurrency value""" return random.choice([2, 4, 8, 16]) def _generate_device_memory(self): """Generate realistic device memory value""" return random.choice([2, 4, 8]) def create_chrome_options(self, profile=None): """Create Chrome options with profile settings""" if not profile: profile = self.get_profile() chrome_options = Options() # Basic options chrome_options.add_argument('--no-sandbox') chrome_options.add_argument('--disable-dev-shm-usage') chrome_options.add_argument('--disable-gpu') chrome_options.add_argument('--window-size=' + profile['screen_resolution']) chrome_options.add_argument(f'--user-agent={profile["user_agent"]}') # Language and timezone chrome_options.add_argument(f'--lang={profile["language"].split(",")[0]}') chrome_options.add_argument(f'--timezone={profile["timezone"]}') # Prevent detection chrome_options.add_argument('--disable-blink-features=AutomationControlled') chrome_options.add_experimental_option('excludeSwitches', ['enable-automation']) chrome_options.add_experimental_option('useAutomationExtension', False) # Add prefs to prevent detection prefs = { 'profile.default_content_setting_values': { 'images': 2, # Block images to speed up rendering 'javascript': 1 # Enable JavaScript }, 'intl.accept_languages': profile['language'], 'credentials_enable_service': False, 'profile.password_manager_enabled': False } chrome_options.add_experimental_option('prefs', prefs) return chrome_options def create_stealth_driver(self, profile=None): """Create a stealthy WebDriver with evasion techniques""" if not profile: profile = self.get_profile() chrome_options = self.create_chrome_options(profile) # Create driver driver = webdriver.Chrome( service=Service(ChromeDriverManager().install()), options=chrome_options ) # Execute CDP commands to hide WebDriver driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', { 'source': ''' Object.defineProperty(navigator, 'webdriver', { get: () => undefined }) ''' }) # Additional evasion techniques self._apply_stealth_techniques(driver, profile) return driver def _apply_stealth_techniques(self, driver, profile): """Apply advanced stealth techniques to WebDriver""" # Override navigator properties driver.execute_script(f''' Object.defineProperty(navigator, 'plugins', {{ get: () => [ {{ name: 'Chrome PDF Viewer', description: 'Portable Document Format' }}, {{ name: 'Widevine Content Decryption Module', description: 'Enables Widevine licenses' }} ] }}); Object.defineProperty(navigator, 'languages', {{ get: () => ['{profile['language'].split(",")[0]}', '{profile['language'].split(",")[1].split(";")[0]}'] }}); Object.defineProperty(navigator, 'hardwareConcurrency', {{ get: () => {profile['hardware_concurrency']} }}); Object.defineProperty(navigator, 'deviceMemory', {{ get: () => {profile['device_memory']} }}); Object.defineProperty(navigator, 'platform', {{ get: () => 'Win32' }}); Object.defineProperty(HTMLCanvasElement.prototype, 'toDataURL', {{ value: function() {{ return ''; }} }}); ''') # Set WebGL parameters driver.execute_script(f''' const getParameter = WebGLRenderingContext.prototype.getParameter; WebGLRenderingContext.prototype.getParameter = function(parameter) {{ if (parameter === 37445) return '{profile['webgl_vendor']}'; if (parameter === 37446) return '{profile['webgl_renderer']}'; return getParameter.apply(this, [parameter]); }}; ''') # Spoof audio context driver.execute_script(f''' const AudioContext = window.AudioContext || window.webkitAudioContext; const originalAudioContext = AudioContext; class CustomAudioContext extends originalAudioContext {{ constructor() {{ super(); this.sampleRate = {profile['audio_context']['sampleRate']}; }} }} window.AudioContext = CustomAudioContext; window.webkitAudioContext = CustomAudioContext; ''') ``` #### **2. Headless Browser Optimization** ```python # browser_optimization.py import time from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC class BrowserOptimizer: def __init__(self, driver): self.driver = driver self.load_times = [] self.resource_usage = [] def optimize_page_load(self, url, max_wait=30, resource_threshold=80): """ Optimize page loading by blocking unnecessary resources Returns: Boolean indicating success """ start_time = time.time() # Block unnecessary resources self._block_resources() # Navigate to URL self.driver.get(url) # Wait for critical elements if not self._wait_for_critical_elements(max_wait): return False # Measure load time load_time = time.time() - start_time self.load_times.append(load_time) # Check resource usage usage = self._get_resource_usage() self.resource_usage.append(usage) # Apply additional optimizations if needed if usage['cpu'] > resource_threshold or usage['memory'] > resource_threshold: self._apply_aggressive_optimizations() return True def _block_resources(self): """Block unnecessary resources to speed up rendering""" # Block images self.driver.execute_cdp_cmd('Network.setBlockedURLs', { 'urls': ['*.png', '*.jpg', '*.jpeg', '*.gif', '*.svg'] }) # Block trackers trackers = [ '*doubleclick.net*', '*googleadservices.com*', '*analytics.js*', '*facebook.net*', '*taboola.com*' ] self.driver.execute_cdp_cmd('Network.setBlockedURLs', { 'urls': trackers }) # Enable network conditions throttling self.driver.execute_cdp_cmd('Network.emulateNetworkConditions', { 'offline': False, 'latency': 50, # ms 'downloadThroughput': 150 * 1024, # 150 KB/s 'uploadThroughput': 50 * 1024 # 50 KB/s }) def _wait_for_critical_elements(self, max_wait): """Wait for critical page elements to load""" try: # Wait for body to be present WebDriverWait(self.driver, max_wait).until( EC.presence_of_element_located((By.TAG_NAME, 'body')) ) # Wait for main content container (customizable per site) try: WebDriverWait(self.driver, max_wait).until( EC.presence_of_element_located((By.ID, 'main-content')) ) except: try: WebDriverWait(self.driver, max_wait).until( EC.presence_of_element_located((By.CLASS_NAME, 'content')) ) except: pass # Wait for JavaScript to stabilize self._wait_for_js_stabilization(max_wait) return True except Exception as e: print(f"Error waiting for critical elements: {str(e)}") return False def _wait_for_js_stabilization(self, max_wait): """Wait for JavaScript execution to stabilize""" start_time = time.time() last_height = 0 stable_count = 0 while time.time() - start_time < max_wait: # Check scroll height current_height = self.driver.execute_script( "return document.body.scrollHeight" ) # Check if height has stabilized if abs(current_height - last_height) < 10: stable_count += 1 if stable_count >= 3: # Stable for 3 consecutive checks return else: stable_count = 0 last_height = current_height time.sleep(0.5) def _get_resource_usage(self): """Get current resource usage metrics""" # Get CPU and memory usage from performance metrics metrics = self.driver.execute_cdp_cmd('Performance.getMetrics', {}) cpu_usage = next( (m['value'] for m in metrics['metrics'] if m['name'] == 'CPUUtilization'), 0 ) * 100 memory_usage = next( (m['value'] for m in metrics['metrics'] if m['name'] == 'JSHeapUsedSize'), 0 ) # Convert to percentage of typical limits memory_percent = min(100, (memory_usage / (256 * 1024 * 1024)) * 100) return { 'cpu': cpu_usage, 'memory': memory_percent, 'network_requests': self._get_network_request_count() } def _get_network_request_count(self): """Get count of network requests made""" # Implementation would track network requests return 0 # Simplified def _apply_aggressive_optimizations(self): """Apply aggressive optimizations when resources are constrained""" # Disable JavaScript for non-critical pages self.driver.execute_cdp_cmd('Emulation.setScriptExecutionDisabled', { 'value': True }) # Set lower resource thresholds self.driver.execute_cdp_cmd('Network.emulateNetworkConditions', { 'offline': False, 'latency': 100, # ms 'downloadThroughput': 50 * 1024, # 50 KB/s 'uploadThroughput': 20 * 1024 # 20 KB/s }) # Block more resources self.driver.execute_cdp_cmd('Network.setBlockedURLs', { 'urls': ['*.css', '*.js', '*.woff', '*.woff2'] }) def analyze_performance(self): """Analyze browser performance metrics""" if not self.load_times: return None return { 'avg_load_time': sum(self.load_times) / len(self.load_times), 'max_load_time': max(self.load_times), 'resource_usage': { 'avg_cpu': sum(u['cpu'] for u in self.resource_usage) / len(self.resource_usage), 'avg_memory': sum(u['memory'] for u in self.resource_usage) / len(self.resource_usage) }, 'recommendations': self._generate_optimization_recommendations() } def _generate_optimization_recommendations(self): """Generate optimization recommendations based on performance data""" recommendations = [] # Load time recommendations avg_load_time = sum(self.load_times) / len(self.load_times) if avg_load_time > 10: recommendations.append( "Page load time is high (>10s). Consider blocking more resources " "or using a simpler rendering strategy." ) # Resource usage recommendations avg_cpu = sum(u['cpu'] for u in self.resource_usage) / len(self.resource_usage) if avg_cpu > 70: recommendations.append( "High CPU usage detected. Consider reducing JavaScript execution " "or using resource throttling." ) avg_memory = sum(u['memory'] for u in self.resource_usage) / len(self.resource_usage) if avg_memory > 80: recommendations.append( "High memory usage detected. Consider closing browser after use " "or using lighter browser profiles." ) return recommendations def cleanup(self): """Clean up resources after browser usage""" # Reset network conditions self.driver.execute_cdp_cmd('Network.clearBrowserCaches', {}) self.driver.execute_cdp_cmd('Network.setBlockedURLs', {'urls': []}) self.driver.execute_cdp_cmd('Network.emulateNetworkConditions', { 'offline': False, 'latency': 0, 'downloadThroughput': -1, 'uploadThroughput': -1 }) # Close browser self.driver.quit() ``` --- ## **2. Browser Fingerprinting Countermeasures** 🖌️ Modern websites use sophisticated browser fingerprinting techniques that require advanced countermeasures. ### **Fingerprinting Detection System** #### **1. Fingerprinting Detection Engine** ```python # fingerprint_detection.py import re from selenium import webdriver from selenium.webdriver.common.by import By import time class FingerprintingDetector: def __init__(self, driver): self.driver = driver self.detected_techniques = [] self.suspicious_patterns = self._load_suspicious_patterns() def _load_suspicious_patterns(self): """Load patterns that indicate fingerprinting attempts""" return [ # Canvas fingerprinting { 'name': 'canvas_fingerprinting', 'pattern': r'CanvasRenderingContext2D|toDataURL|getImageData', 'weight': 0.8, 'description': 'Canvas-based fingerprinting technique' }, # WebGL fingerprinting { 'name': 'webgl_fingerprinting', 'pattern': r'WebGLRenderingContext|getParameter|getSupportedExtensions', 'weight': 0.9, 'description': 'WebGL-based fingerprinting technique' }, # AudioContext fingerprinting { 'name': 'audio_fingerprinting', 'pattern': r'AudioContext|createOscillator|analyserNode', 'weight': 0.7, 'description': 'AudioContext-based fingerprinting technique' }, # Font detection { 'name': 'font_detection', 'pattern': r'div\.style\.fontFamily|document\.fonts', 'weight': 0.6, 'description': 'Font enumeration fingerprinting' }, # Plugin detection { 'name': 'plugin_detection', 'pattern': r'navigator\.plugins|navigator\.mimeTypes', 'weight': 0.5, 'description': 'Browser plugin enumeration' }, # Touch support detection { 'name': 'touch_detection', 'pattern': r'\'ontouchstart\' in document|window\.TouchEvent', 'weight': 0.4, 'description': 'Touch capability detection' }, # Hardware concurrency { 'name': 'hardware_concurrency', 'pattern': r'navigator\.hardwareConcurrency', 'weight': 0.7, 'description': 'Hardware concurrency detection' }, # Device memory { 'name': 'device_memory', 'pattern': r'navigator\.deviceMemory', 'weight': 0.6, 'description': 'Device memory detection' }, # Battery status { 'name': 'battery_status', 'pattern': r'navigator\.getBattery', 'weight': 0.3, 'description': 'Battery status API detection' }, # WebRTC IP leakage { 'name': 'webrtc_ip_leak', 'pattern': r'RTCPeerConnection|createDataChannel', 'weight': 0.9, 'description': 'WebRTC IP address leakage' } ] def detect_fingerprinting(self, url, timeout=15): """ Detect fingerprinting techniques on a website Returns: List of detected fingerprinting techniques """ # Navigate to URL self.driver.get(url) # Wait for page to load time.sleep(2) # Get page source page_source = self.driver.page_source # Analyze for suspicious patterns detected = [] for pattern in self.suspicious_patterns: if re.search(pattern['pattern'], page_source): detected.append({ 'technique': pattern['name'], 'confidence': pattern['weight'], 'description': pattern['description'], 'evidence': self._get_evidence(pattern['pattern'], page_source) }) # Check for dynamic fingerprinting (requires executing JS) dynamic_techniques = self._detect_dynamic_fingerprinting() detected.extend(dynamic_techniques) # Store results self.detected_techniques = detected return detected def _get_evidence(self, pattern, page_source): """Get specific evidence for detected pattern""" matches = re.findall(pattern, page_source) return matches[:3] # Return up to 3 examples def _detect_dynamic_fingerprinting(self): """Detect dynamically executed fingerprinting techniques""" detected = [] # Check for canvas fingerprinting canvas_result = self.driver.execute_script(''' try { var canvas = document.createElement('canvas'); var ctx = canvas.getContext('2d'); ctx.textBaseline = "top"; ctx.font = "14px 'Arial'"; ctx.textAlign = "none"; ctx.fillStyle = "#f60"; ctx.fillText("BrowserLeaks", 2, 15); ctx.fillStyle = "rgba(102, 204, 0, 0.7)"; ctx.fillText("com", 4, 17); return canvas.toDataURL(); } catch (e) { return null; } ''') if canvas_result and 'data:image/png;base64' in canvas_result: detected.append({ 'technique': 'canvas_fingerprinting', 'confidence': 0.9, 'description': 'Active canvas fingerprinting detected', 'evidence': [canvas_result[:50] + '...'] }) # Check for WebGL fingerprinting webgl_result = self.driver.execute_script(''' try { var canvas = document.createElement('canvas'); var gl = canvas.getContext('webgl') || canvas.getContext('experimental-webgl'); if (!gl) return null; var info = { vendor: gl.getParameter(gl.VENDOR), renderer: gl.getParameter(gl.RENDERER), version: gl.getParameter(gl.VERSION), shadingLanguageVersion: gl.getParameter(gl.SHADING_LANGUAGE_VERSION) }; return JSON.stringify(info); } catch (e) { return null; } ''') if webgl_result: detected.append({ 'technique': 'webgl_fingerprinting', 'confidence': 0.95, 'description': 'Active WebGL fingerprinting detected', 'evidence': [webgl_result] }) # Check for AudioContext fingerprinting audio_result = self.driver.execute_script(''' try { var audioCtx = new (window.AudioContext || window.webkitAudioContext)(); var oscillator = audioCtx.createOscillator(); var analyser = audioCtx.createAnalyser(); oscillator.connect(analyser); analyser.connect(audioCtx.destination); oscillator.start(); var bufferLength = analyser.frequencyBinCount; var dataArray = new Uint8Array(bufferLength); analyser.getByteFrequencyData(dataArray); return JSON.stringify({ sampleRate: audioCtx.sampleRate, frequencyData: Array.from(dataArray).slice(0, 10) }); } catch (e) { return null; } ''') if audio_result: detected.append({ 'technique': 'audio_fingerprinting', 'confidence': 0.85, 'description': 'Active AudioContext fingerprinting detected', 'evidence': [audio_result] }) return detected def get_countermeasures(self): """Get recommended countermeasures for detected techniques""" countermeasures = [] for technique in self.detected_techniques: if technique['technique'] == 'canvas_fingerprinting': countermeasures.append({ 'technique': 'canvas_fingerprinting', 'countermeasure': 'Spoof canvas rendering results', 'implementation': self._spoof_canvas }) elif technique['technique'] == 'webgl_fingerprinting': countermeasures.append({ 'technique': 'webgl_fingerprinting', 'countermeasure': 'Spoof WebGL parameters', 'implementation': self._spoof_webgl }) elif technique['technique'] == 'audio_fingerprinting': countermeasures.append({ 'technique': 'audio_fingerprinting', 'countermeasure': 'Spoof AudioContext results', 'implementation': self._spoof_audio }) # Add more countermeasures as needed return countermeasures def _spoof_canvas(self): """Spoof canvas fingerprinting results""" self.driver.execute_script(''' const toDataURL = HTMLCanvasElement.prototype.toDataURL; HTMLCanvasElement.prototype.toDataURL = function() { return ''; }; ''') def _spoof_webgl(self): """Spoof WebGL fingerprinting results""" self.driver.execute_script(''' const getParameter = WebGLRenderingContext.prototype.getParameter; WebGLRenderingContext.prototype.getParameter = function(parameter) { if (parameter === 37445) return 'Spoofed Vendor'; if (parameter === 37446) return 'Spoofed Renderer'; return getParameter.apply(this, [parameter]); }; ''') def _spoof_audio(self): """Spoof AudioContext fingerprinting results""" self.driver.execute_script(''' const AudioContext = window.AudioContext || window.webkitAudioContext; const originalAudioContext = AudioContext; class CustomAudioContext extends originalAudioContext { constructor() { super(); this.sampleRate = 44100; } createAnalyser() { const analyser = super.createAnalyser(); const originalGetByteFrequencyData = analyser.getByteFrequencyData; analyser.getByteFrequencyData = function(array) { // Fill with consistent values for (let i = 0; i < array.length; i++) { array[i] = i % 256; } return originalGetByteFrequencyData.call(this, array); }; return analyser; } } window.AudioContext = CustomAudioContext; window.webkitAudioContext = CustomAudioContext; ''') def apply_countermeasures(self): """Apply all recommended countermeasures""" countermeasures = self.get_countermeasures() for cm in countermeasures: cm['implementation']() return len(countermeasures) ``` ### **Advanced Fingerprinting Countermeasures** #### **1. Comprehensive Fingerprint Spoofing System** ```python # fingerprint_spoofing.py import random import string from selenium import webdriver class FingerprintSpoofingSystem: def __init__(self, driver): self.driver = driver self.spoofed_values = {} def spoof_all(self): """Apply all spoofing techniques""" self.spoof_timezone() self.spoof_language() self.spoof_resolution() self.spoof_user_agent() self.spoof_webgl() self.spoof_canvas() self.spoof_audio() self.spoof_fonts() self.spoof_plugins() self.spoof_touch() self.spoof_hardware() self.spoof_webdriver() self.spoof_webrtc() def spoof_timezone(self, timezone=None): """Spoof timezone information""" timezones = [ 'America/New_York', 'America/Chicago', 'America/Denver', 'America/Los_Angeles', 'Europe/London', 'Europe/Paris', 'Asia/Tokyo', 'Australia/Sydney' ] if not timezone: timezone = random.choice(timezones) self.driver.execute_cdp_cmd('Emulation.setTimezoneOverride', { 'timezoneId': timezone }) self.spoofed_values['timezone'] = timezone def spoof_language(self, language=None): """Spoof language preferences""" languages = [ 'en-US,en;q=0.9', 'en-GB,en;q=0.8', 'es-ES,es;q=0.9', 'fr-FR,fr;q=0.9', 'de-DE,de;q=0.9', 'ja-JP,ja;q=0.9' ] if not language: language = random.choice(languages) self.driver.execute_cdp_cmd('Emulation.setLocaleOverride', { 'locale': language.split(',')[0] }) self.spoofed_values['language'] = language def spoof_resolution(self, width=None, height=None): """Spoof screen resolution""" resolutions = [ (1920, 1080), (1366, 768), (1536, 864), (1440, 900), (2560, 1440), (3840, 2160) ] if width is None or height is None: width, height = random.choice(resolutions) self.driver.execute_cdp_cmd('Emulation.setDeviceMetricsOverride', { 'width': width, 'height': height, 'deviceScaleFactor': 1, 'mobile': False }) self.spoofed_values['resolution'] = f"{width}x{height}" def spoof_user_agent(self, user_agent=None): """Spoof user agent string""" user_agents = [ 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', 'Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1' ] if not user_agent: user_agent = random.choice(user_agents) self.driver.execute_cdp_cmd('Network.setUserAgentOverride', { 'userAgent': user_agent }) self.spoofed_values['user_agent'] = user_agent def spoof_webgl(self, vendor=None, renderer=None): """Spoof WebGL fingerprinting parameters""" vendors = [ 'Google Inc.', 'Intel Inc.', 'NVIDIA Corporation', 'AMD Corporation', 'ARM Limited', 'Imagination Technologies' ] renderers = [ 'ANGLE (Intel, Intel(R) UHD Graphics 630 Direct3D11 vs_5_0 ps_5_0)', 'ANGLE (NVIDIA, NVIDIA GeForce GTX 1080 Ti Direct3D11 vs_5_0 ps_5_0)', 'WebKit WebGL', 'Mozilla Labs - WebGL' ] if not vendor: vendor = random.choice(vendors) if not renderer: renderer = random.choice(renderers) self.driver.execute_script(f''' const getParameter = WebGLRenderingContext.prototype.getParameter; WebGLRenderingContext.prototype.getParameter = function(parameter) {{ if (parameter === 37445) return '{vendor}'; if (parameter === 37446) return '{renderer}'; return getParameter.apply(this, [parameter]); }}; ''') self.spoofed_values['webgl'] = {'vendor': vendor, 'renderer': renderer} def spoof_canvas(self): """Spoof canvas fingerprinting results""" self.driver.execute_script(''' const toDataURL = HTMLCanvasElement.prototype.toDataURL; HTMLCanvasElement.prototype.toDataURL = function() { return ''; }; const getImageData = CanvasRenderingContext2D.prototype.getImageData; CanvasRenderingContext2D.prototype.getImageData = function() { const data = getImageData.apply(this, arguments); // Modify pixel data to be consistent for (let i = 0; i < data.data.length; i += 4) { data.data[i] = 255; // R data.data[i+1] = 0; // G data.data[i+2] = 0; // B data.data[i+3] = 255; // A } return data; }; ''') self.spoofed_values['canvas'] = 'spoofed' def spoof_audio(self): """Spoof AudioContext fingerprinting results""" self.driver.execute_script(''' const AudioContext = window.AudioContext || window.webkitAudioContext; const originalAudioContext = AudioContext; class CustomAudioContext extends originalAudioContext { constructor() { super(); this.sampleRate = 44100; } createAnalyser() { const analyser = super.createAnalyser(); const originalGetByteFrequencyData = analyser.getByteFrequencyData; analyser.getByteFrequencyData = function(array) { // Fill with consistent values for (let i = 0; i < array.length; i++) { array[i] = i % 256; } return originalGetByteFrequencyData.call(this, array); }; return analyser; } } window.AudioContext = CustomAudioContext; window.webkitAudioContext = CustomAudioContext; ''') self.spoofed_values['audio'] = 'spoofed' def spoof_fonts(self, font_list=None): """Spoof available font list""" default_fonts = [ 'Arial', 'Times New Roman', 'Helvetica', 'Courier New', 'Verdana', 'Georgia', 'Comic Sans MS', 'Trebuchet MS' ] if not font_list: font_list = random.sample(default_fonts, k=random.randint(5, 8)) self.driver.execute_script(f''' // Override document.fonts const originalFonts = document.fonts; document.fonts = new Proxy(originalFonts, {{ get: function(target, prop) {{ if (prop === 'check') {{ return function() {{ return true; }}; }} if (prop === 'keys' || prop === 'values' || prop === 'entries') {{ return function* () {{ for (const font of {json.dumps(font_list)}) {{ yield font; }} }}; }} return target[prop]; }}, has: function(target, prop) {{ return prop === 'check' || prop === 'keys' || prop === 'values' || prop === 'entries' || target.hasOwnProperty(prop); }} }}); // Override getComputedStyle for font detection const originalGetComputedStyle = window.getComputedStyle; window.getComputedStyle = function(element, pseudoElt) {{ const style = originalGetComputedStyle(element, pseudoElt); return new Proxy(style, {{ get: function(target, prop) {{ if (prop === 'fontFamily') {{ return '{", ".join(font_list)}'; }} return target[prop]; }} }}); }}; ''') self.spoofed_values['fonts'] = font_list def spoof_plugins(self, plugins=None): """Spoof browser plugin list""" default_plugins = [ {'name': 'Chrome PDF Viewer', 'description': 'Portable Document Format'}, {'name': 'Widevine Content Decryption Module', 'description': 'Enables Widevine licenses'}, {'name': 'Native Client', 'description': 'Runs native code inside Chrome'} ] if plugins is None: plugins = random.sample(default_plugins, k=random.randint(1, 3)) plugins_script = json.dumps(plugins) self.driver.execute_script(f''' // Override navigator.plugins Object.defineProperty(navigator, 'plugins', {{ get: function() {{ return {plugins_script}.map(plugin => ({{ name: plugin.name, description: plugin.description, item: function() {{ return this; }}, namedItem: function() {{ return this; }} })); }} }}); // Override navigator.mimeTypes Object.defineProperty(navigator, 'mimeTypes', {{ get: function() {{ return {plugins_script}.map(plugin => ({{ type: 'application/pdf', description: plugin.description, suffixes: 'pdf', enabledPlugin: {{ name: plugin.name, description: plugin.description }} })); }} }}); ''') self.spoofed_values['plugins'] = plugins def spoof_touch(self, max_touch_points=None, has_touch=False): """Spoof touch capabilities""" if max_touch_points is None: max_touch_points = random.choice([0, 5, 10]) self.driver.execute_script(f''' // Override touch events ['touchstart', 'touchmove', 'touchend', 'touchcancel'].forEach(function(event) {{ document.addEventListener(event, function(e) {{ e.preventDefault(); }}, {{ passive: false }}); }}); // Override navigator.maxTouchPoints Object.defineProperty(navigator, 'maxTouchPoints', {{ get: function() {{ return {max_touch_points}; }} }}); // Override 'ontouchstart' in document Object.defineProperty(document, 'ontouchstart', {{ get: function() {{ return {str(has_touch).lower()}; }}, set: function() {{ }} }}); // Override window.TouchEvent window.TouchEvent = {str(has_touch).lower()}; ''') self.spoofed_values['touch'] = { 'max_touch_points': max_touch_points, 'has_touch': has_touch } def spoof_hardware(self, hardware_concurrency=None, device_memory=None): """Spoof hardware capabilities""" if hardware_concurrency is None: hardware_concurrency = random.choice([2, 4, 8, 16]) if device_memory is None: device_memory = random.choice([2, 4, 8]) self.driver.execute_script(f''' // Override hardwareConcurrency Object.defineProperty(navigator, 'hardwareConcurrency', {{ get: function() {{ return {hardware_concurrency}; }} }}); // Override deviceMemory Object.defineProperty(navigator, 'deviceMemory', {{ get: function() {{ return {device_memory}; }} }}); ''') self.spoofed_values['hardware'] = { 'hardware_concurrency': hardware_concurrency, 'device_memory': device_memory } def spoof_webdriver(self): """Spoof WebDriver detection""" self.driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', { 'source': ''' Object.defineProperty(navigator, 'webdriver', { get: () => undefined }); window.navigator.chrome = { app: {isInstalled: false}, webstore: {onInstallStageChanged: {}, onDownloadProgress: {}}, runtime: {PlatformOs: {MAC: 'mac', WIN: 'win', ANDROID: 'android', CROS: 'cros', LINUX: 'linux', OPENBSD: 'openbsd'}, PlatformArch: {ARM: 'arm', X86_32: 'x86-32', X86_64: 'x86-64'}, PlatformNaclArch: {ARM: 'arm', X86_32: 'x86-32', X86_64: 'x86-64'}, RefreshType: {NONE: 0, FORMAL: 1, OPTIONAL: 2}} }; const originalQuery = window.navigator.permissions.query; window.navigator.permissions.query = (parameters) => ( parameters.name === 'notifications' ? Promise.resolve({ state: Notification.permission }) : originalQuery(parameters) ); ''' }) self.spoofed_values['webdriver'] = 'spoofed' def spoof_webrtc(self): """Spoof WebRTC to prevent IP leakage""" self.driver.execute_script(''' // Override WebRTC to prevent IP leakage const originalGetUserMedia = navigator.mediaDevices.getUserMedia; navigator.mediaDevices.getUserMedia = function(constraints) { return originalGetUserMedia(constraints) .then(stream => { // Modify the stream to prevent IP leakage return stream; }); }; // Override RTCPeerConnection const originalRTCPeerConnection = window.RTCPeerConnection; window.RTCPeerConnection = function(...args) { const peerConnection = new originalRTCPeerConnection(...args); const originalCreateOffer = peerConnection.createOffer; peerConnection.createOffer = function(...args) { return originalCreateOffer.apply(this, args) .then(offer => { // Remove local IP addresses offer.sdp = offer.sdp.replace(/(c=IN IP4)(\\s+\\d+\\.\\d+\\.\\d+\\.\\d+)/g, '$1 0.0.0.0'); return offer; }); }; const originalCreateAnswer = peerConnection.createAnswer; peerConnection.createAnswer = function(...args) { return originalCreateAnswer.apply(this, args) .then(answer => { // Remove local IP addresses answer.sdp = answer.sdp.replace(/(c=IN IP4)(\\s+\\d+\\.\\d+\\.\\d+\\.\\d+)/g, '$1 0.0.0.0'); return answer; }); }; return peerConnection; }; ''') self.spoofed_values['webrtc'] = 'spoofed' ``` --- ## **3. Large-Scale Data Processing Pipelines** 📊 Handling massive amounts of scraped data requires robust data processing infrastructure. ### **Distributed Data Processing Architecture** #### **1. Data Pipeline Orchestration System** ```python # data_pipeline.py import time import uuid import json from datetime import datetime from collections import defaultdict from concurrent.futures import ThreadPoolExecutor, as_completed class DataPipelineOrchestrator: def __init__(self, max_workers=10, batch_size=100): self.stages = [] self.max_workers = max_workers self.batch_size = batch_size self.pipeline_id = str(uuid.uuid4()) self.metrics = { 'start_time': None, 'end_time': None, 'total_items': 0, 'processed_items': 0, 'failed_items': 0, 'stage_metrics': defaultdict(dict) } self.error_log = [] def add_stage(self, stage_name, processor, batched=False): """Add a processing stage to the pipeline""" self.stages.append({ 'name': stage_name, 'processor': processor, 'batched': batched, 'start_time': None, 'end_time': None, 'items_processed': 0, 'items_failed': 0 }) def process(self, data_stream): """ Process data through the pipeline Args: data_stream: Iterable of data items Returns: Processed data stream """ self.metrics['start_time'] = datetime.utcnow() self.metrics['total_items'] = 0 # Initialize output queue output = [] # Process data through each stage current_data = data_stream for stage in self.stages: stage['start_time'] = datetime.utcnow() stage['items_processed'] = 0 stage['items_failed'] = 0 print(f"Starting stage: {stage['name']}") # Process data for this stage if stage['batched']: # Process in batches batch = [] for item in current_data: self.metrics['total_items'] += 1 batch.append(item) if len(batch) >= self.batch_size: self._process_batch(stage, batch, output) batch = [] # Process remaining items if batch: self._process_batch(stage, batch, output) else: # Process items individually for item in current_data: self.metrics['total_items'] += 1 self._process_item(stage, item, output) # Update metrics stage['end_time'] = datetime.utcnow() self.metrics['stage_metrics'][stage['name']] = { 'duration': (stage['end_time'] - stage['start_time']).total_seconds(), 'items_processed': stage['items_processed'], 'items_failed': stage['items_failed'] } # Prepare for next stage current_data = output output = [] self.metrics['end_time'] = datetime.utcnow() self.metrics['processed_items'] = self.metrics['total_items'] - self.metrics['failed_items'] print(f"Pipeline completed. Processed {self.metrics['processed_items']}/{self.metrics['total_items']} items.") return current_data def _process_batch(self, stage, batch, output_queue): """Process a batch of items through a stage""" try: # Process batch results = stage['processor'](batch) # Add successful results to output for result in results: output_queue.append(result) stage['items_processed'] += 1 return True except Exception as e: # Log error self._log_error(stage['name'], batch, str(e)) # Add failed items to error log for item in batch: self.metrics['failed_items'] += 1 stage['items_failed'] += 1 return False def _process_item(self, stage, item, output_queue): """Process a single item through a stage""" try: # Process item result = stage['processor'](item) # Add to output if successful if result is not None: output_queue.append(result) stage['items_processed'] += 1 return True except Exception as e: # Log error self._log_error(stage['name'], item, str(e)) # Track failure self.metrics['failed_items'] += 1 stage['items_failed'] += 1 return False def _log_error(self, stage, data, error): """Log processing error""" error_id = str(uuid.uuid4()) timestamp = datetime.utcnow().isoformat() self.error_log.append({ 'error_id': error_id, 'timestamp': timestamp, 'stage': stage, 'error': error, 'data_sample': str(data)[:500] # Truncate large data }) print(f"ERROR in stage {stage}: {error}") print(f" Error ID: {error_id}") print(f" Data sample: {str(data)[:100]}...") def get_metrics(self): """Get pipeline performance metrics""" return { 'pipeline_id': self.pipeline_id, 'total_items': self.metrics['total_items'], 'processed_items': self.metrics['processed_items'], 'failed_items': self.metrics['failed_items'], 'total_duration': (self.metrics['end_time'] - self.metrics['start_time']).total_seconds() if self.metrics['end_time'] else None, 'stage_metrics': dict(self.metrics['stage_metrics']) } def get_error_log(self): """Get error log for troubleshooting""" return self.error_log def save_metrics(self, file_path): """Save metrics to file""" with open(file_path, 'w') as f: json.dump(self.get_metrics(), f, indent=2) def save_error_log(self, file_path): """Save error log to file""" with open(file_path, 'w') as f: json.dump(self.get_error_log(), f, indent=2) def create_distributed_pipeline(self, num_nodes=3): """Create a distributed pipeline across multiple nodes""" pipelines = [] for i in range(num_nodes): pipeline = DataPipelineOrchestrator( max_workers=self.max_workers, batch_size=self.batch_size ) # Copy stages for stage in self.stages: pipeline.add_stage( stage['name'], stage['processor'], stage['batched'] ) pipelines.append(pipeline) return DistributedPipeline(pipelines) class DistributedPipeline: def __init__(self, pipelines): self.pipelines = pipelines def process(self, data_stream): """Process data using distributed pipelines""" # Split data among pipelines data_chunks = self._split_data(data_stream, len(self.pipelines)) # Process in parallel results = [] with ThreadPoolExecutor(max_workers=len(self.pipelines)) as executor: futures = { executor.submit(pipeline.process, chunk): i for i, (pipeline, chunk) in enumerate(zip(self.pipelines, data_chunks)) } for future in as_completed(futures): i = futures[future] try: chunk_results = future.result() results.extend(chunk_results) except Exception as e: print(f"Pipeline {i} failed: {str(e)}") return results def _split_data(self, data_stream, num_chunks): """Split data stream into chunks""" chunks = [[] for _ in range(num_chunks)] for i, item in enumerate(data_stream): chunks[i % num_chunks].append(item) return chunks def get_metrics(self): """Get metrics from all pipelines""" return [pipeline.get_metrics() for pipeline in self.pipelines] def get_error_logs(self): """Get error logs from all pipelines""" return [pipeline.get_error_log() for pipeline in self.pipelines] ``` #### **2. Schema Evolution Manager** ```python # schema_evolution.py import json import re from datetime import datetime class SchemaEvolutionManager: def __init__(self): self.schemas = {} # {schema_id: schema_definition} self.version_history = {} # {schema_id: [versions]} self.compatibility_rules = { 'backward': self._check_backward_compatibility, 'forward': self._check_forward_compatibility, 'full': self._check_full_compatibility } def register_schema(self, schema_id, schema_definition, compatibility='backward'): """ Register a new schema or update an existing one Returns: Schema version number """ # Validate schema if not self._validate_schema(schema_definition): raise ValueError("Invalid schema definition") # Check compatibility if updating existing schema if schema_id in self.schemas: current_schema = self.schemas[schema_id] if not self.check_compatibility(current_schema, schema_definition, compatibility): raise ValueError(f"Schema update is not {compatibility}-compatible") # Create version version = self._create_version(schema_id) # Store schema if schema_id not in self.schemas: self.schemas[schema_id] = schema_definition self.version_history[schema_id] = [] # Update version history self.version_history[schema_id].append({ 'version': version, 'schema': schema_definition, 'timestamp': datetime.utcnow().isoformat(), 'compatibility': compatibility }) # Update current schema self.schemas[schema_id] = schema_definition return version def _validate_schema(self, schema): """Validate schema structure""" # Implementation would validate against schema standard # This is a simplified version required_fields = ['type', 'fields'] return all(field in schema for field in required_fields) def _create_version(self, schema_id): """Create new version number""" if schema_id not in self.version_history or not self.version_history[schema_id]: return '1.0.0' # Get current version current_version = self.version_history[schema_id][-1]['version'] # Increment minor version major, minor, patch = map(int, current_version.split('.')) return f"{major}.{minor+1}.0" def get_schema(self, schema_id, version=None): """Get schema definition""" if schema_id not in self.schemas: raise ValueError(f"Schema {schema_id} not found") if version: # Find specific version for ver_info in reversed(self.version_history[schema_id]): if ver_info['version'] == version: return ver_info['schema'] raise ValueError(f"Version {version} not found for schema {schema_id}") # Return current schema return self.schemas[schema_id] def get_version_history(self, schema_id): """Get version history for a schema""" if schema_id not in self.version_history: return [] return self.version_history[schema_id] def check_compatibility(self, old_schema, new_schema, compatibility_type='backward'): """Check if schema update is compatible""" if compatibility_type not in self.compatibility_rules: raise ValueError(f"Unknown compatibility type: {compatibility_type}") return self.compatibility_rules[compatibility_type](old_schema, new_schema) def _check_backward_compatibility(self, old_schema, new_schema): """ Check if new schema is backward compatible with old schema (new consumers can read old data) """ # Fields can be added but not removed old_fields = {f['name'] for f in old_schema['fields']} new_fields = {f['name'] for f in new_schema['fields']} # Check for removed fields removed_fields = old_fields - new_fields if removed_fields: return False # Check field type compatibility for field in old_schema['fields']: new_field = next((f for f in new_schema['fields'] if f['name'] == field['name']), None) if new_field and not self._is_type_compatible(field['type'], new_field['type']): return False return True def _check_forward_compatibility(self, old_schema, new_schema): """ Check if new schema is forward compatible with old schema (old consumers can read new data) """ # Fields can be removed but not added old_fields = {f['name'] for f in old_schema['fields']} new_fields = {f['name'] for f in new_schema['fields']} # Check for added fields added_fields = new_fields - old_fields if added_fields: return False # Check field type compatibility for field in new_schema['fields']: old_field = next((f for f in old_schema['fields'] if f['name'] == field['name']), None) if old_field and not self._is_type_compatible(field['type'], old_field['type']): return False return True def _check_full_compatibility(self, old_schema, new_schema): """Check if schema update is fully compatible (both backward and forward)""" return ( self._check_backward_compatibility(old_schema, new_schema) and self._check_forward_compatibility(old_schema, new_schema) ) def _is_type_compatible(self, old_type, new_type): """Check if type conversion is compatible""" # Simplified type compatibility compatibility_map = { 'int': ['int', 'long', 'float', 'double'], 'long': ['long', 'float', 'double'], 'float': ['float', 'double'], 'string': ['string', 'bytes'], 'bytes': ['bytes', 'string'], 'boolean': ['boolean'], 'null': ['null', 'string', 'bytes'] } return new_type in compatibility_map.get(old_type, []) def evolve_data(self, data, from_schema_id, to_schema_id, from_version=None, to_version=None): """ Evolve data from one schema version to another Returns: Transformed data """ # Get schemas from_schema = self.get_schema(from_schema_id, from_version) to_schema = self.get_schema(to_schema_id, to_version) # Check compatibility path if not self._can_evolve(from_schema, to_schema): raise ValueError("Cannot evolve data between these schemas") # Transform data return self._transform_data(data, from_schema, to_schema) def _can_evolve(self, from_schema, to_schema): """Check if data can be evolved between schemas""" # Implementation would check compatibility path return True # Simplified def _transform_data(self, data, from_schema, to_schema): """Transform data from one schema to another""" transformed = {} # Map fields for to_field in to_schema['fields']: # Find matching field in source schema from_field = next( (f for f in from_schema['fields'] if f['name'] == to_field['name']), None ) if from_field and from_field['name'] in data: # Convert value if needed value = self._convert_value( data[from_field['name']], from_field['type'], to_field['type'] ) transformed[to_field['name']] = value elif to_field.get('default') is not None: # Use default value transformed[to_field['name']] = to_field['default'] return transformed def _convert_value(self, value, from_type, to_type): """Convert value between types""" if from_type == to_type: return value # Numeric conversions if from_type in ['int', 'long', 'float', 'double'] and to_type in ['int', 'long', 'float', 'double']: return self._convert_numeric(value, from_type, to_type) # String conversions if from_type == 'string' and to_type == 'bytes': return value.encode('utf-8') if from_type == 'bytes' and to_type == 'string': return value.decode('utf-8') # Null handling if value is None: return None raise ValueError(f"Cannot convert from {from_type} to {to_type}") def _convert_numeric(self, value, from_type, to_type): """Convert numeric values between types""" # Convert to float first for consistent handling if from_type == 'int': numeric = int(value) elif from_type == 'long': numeric = int(value) elif from_type in ['float', 'double']: numeric = float(value) else: raise ValueError(f"Unsupported numeric type: {from_type}") # Convert to target type if to_type == 'int': return int(numeric) elif to_type == 'long': return int(numeric) elif to_type in ['float', 'double']: return float(numeric) raise ValueError(f"Unsupported numeric type: {to_type}") def generate_schema_diff(self, schema_id, version1, version2): """Generate diff between two schema versions""" schema1 = self.get_schema(schema_id, version1) schema2 = self.get_schema(schema_id, version2) changes = { 'added_fields': [], 'removed_fields': [], 'modified_fields': [] } # Find added fields fields1 = {f['name'] for f in schema1['fields']} fields2 = {f['name'] for f in schema2['fields']} changes['added_fields'] = list(fields2 - fields1) changes['removed_fields'] = list(fields1 - fields2) # Find modified fields for field in fields1 & fields2: f1 = next(f for f in schema1['fields'] if f['name'] == field) f2 = next(f for f in schema2['fields'] if f['name'] == field) if f1['type'] != f2['type']: changes['modified_fields'].append({ 'field': field, 'from_type': f1['type'], 'to_type': f2['type'] }) return changes ``` --- ## Continued in next section ..