# **Part 5: Specialized Web Scraping – Social Media, Mobile Apps, Dark Web, and Advanced Data Extraction** **Duration:** ~60 minutes **Hashtags:** #SocialMediaScraping #MobileScraping #DarkWeb #FinancialData #MediaExtraction #AuthScraping #ScrapingSaaS #APIReverseEngineering #EthicalScraping #DataScience --- ## **Table of Contents** 1. [Advanced Social Media Scraping with API Reverse Engineering](#social-media-scraping) 2. [Mobile App Scraping and Reverse Engineering](#mobile-app-scraping) 3. [Dark Web Data Extraction (Ethical Considerations)](#dark-web-extraction) 4. [Real-Time Financial Data Scraping](#financial-data-scraping) 5. [Image and Video Content Extraction at Scale](#media-content-extraction) 6. [Scraping Behind Complex Authentication Systems](#complex-auth-scraping) 7. [Building Scraping-as-a-Service Platforms](#scraping-saas) 8. [Quiz: Specialized Scraping Mastery](#specialized-quiz) 9. [Conclusion and Future of Web Scraping](#part5-conclusion) --- ## **1. Advanced Social Media Scraping with API Reverse Engineering** 📱 Social media platforms present unique challenges with their sophisticated anti-scraping measures and constantly changing APIs. ### **The Social Media Scraping Landscape** #### **Platform-Specific Challenges** | Platform | Key Challenges | Data Value | |----------|----------------|------------| | **Facebook** | Graph API restrictions, complex authentication, shadow banning | User behavior, page insights, ad data | | **Twitter/X** | Rate limits, API costs, real-time streaming complexity | Public sentiment, trending topics, influencer data | | **Instagram** | App-only API, image-heavy content, strict rate limits | Visual trends, influencer marketing, engagement metrics | | **LinkedIn** | Strict ToS, complex authentication, professional data sensitivity | Professional networks, job market trends, company insights | | **TikTok** | Mobile-first API, sophisticated anti-bot measures | Viral content, emerging trends, youth demographics | ### **API Reverse Engineering Techniques** #### **1. Network Traffic Analysis** ```python # network_analysis.py import mitmproxy.http from mitmproxy import ctx import json import re class SocialMediaAnalyzer: def __init__(self): self.endpoints = {} self.auth_tokens = {} self.request_patterns = {} def request(self, flow: mitmproxy.http.HTTPFlow): """Process outgoing requests""" url = flow.request.pretty_url headers = dict(flow.request.headers) method = flow.request.method # Extract platform from URL platform = self._identify_platform(url) if not platform: return # Track endpoints self._track_endpoint(platform, url, method) # Extract authentication tokens self._extract_auth_tokens(platform, headers) # Log request patterns self._log_request_pattern(platform, url, flow.request.content) def response(self, flow: mitmproxy.http.HTTPFlow): """Process incoming responses""" url = flow.request.pretty_url platform = self._identify_platform(url) if not platform: return # Analyze response structure self._analyze_response_structure(platform, flow.response.content) # Detect anti-scraping measures self._detect_anti_scraping_measures(platform, flow.response) def _identify_platform(self, url): """Identify social media platform from URL""" patterns = { 'facebook': r'facebook\.com|graph\.facebook\.com', 'twitter': r'twitter\.com|api\.twitter\.com', 'instagram': r'instagram\.com|i\.instagram\.com', 'linkedin': r'linkedin\.com|api\.linkedin\.com', 'tiktok': r'tiktok\.com|api2\.musical\.ly|api-t2\.musical\.ly' } for platform, pattern in patterns.items(): if re.search(pattern, url): return platform return None def _track_endpoint(self, platform, url, method): """Track API endpoints and their usage patterns""" if platform not in self.endpoints: self.endpoints[platform] = {} # Normalize URL (remove IDs, etc.) normalized = re.sub(r'/\d+/', '/{id}/', url) normalized = re.sub(r'\?[\w=&]+', '', normalized) if normalized not in self.endpoints[platform]: self.endpoints[platform][normalized] = { 'method': method, 'count': 0, 'last_seen': None } self.endpoints[platform][normalized]['count'] += 1 self.endpoints[platform][normalized]['last_seen'] = time.time() def _extract_auth_tokens(self, platform, headers): """Extract authentication tokens from headers""" tokens = {} # Facebook/Instagram if 'x-fb-http-engine' in headers: tokens['fb_dtsg'] = self._extract_fb_dtsg(headers.get('cookie', '')) tokens['jazoest'] = self._extract_jazoest(headers.get('cookie', '')) # Twitter if 'x-csrf-token' in headers: tokens['csrf_token'] = headers['x-csrf-token'] # TikTok if 'x-tt-token' in headers: tokens['tt_token'] = headers['x-tt-token'] if tokens: self.auth_tokens[platform] = tokens def _extract_fb_dtsg(self, cookie): """Extract Facebook DTSG token from cookie""" match = re.search(r'\"dtsg\":\{\""token\"":\""([^"]+)\""}', cookie) return match.group(1) if match else None def _extract_jazoest(self, cookie): """Extract Facebook jazoest token""" match = re.search(r'jazoest=(\d+)', cookie) return match.group(1) if match else None def _log_request_pattern(self, platform, url, content): """Log patterns in request content""" if not content: return try: # Try parsing as JSON data = json.loads(content) self._analyze_json_structure(platform, data) except: # Analyze as form data self._analyze_form_data(platform, content) def _analyze_json_structure(self, platform, data): """Analyze JSON request structure""" if platform not in self.request_patterns: self.request_patterns[platform] = { 'fields': {}, 'endpoints': {} } # Track fields used in requests for key in data.keys(): self.request_patterns[platform]['fields'][key] = ( self.request_patterns[platform]['fields'].get(key, 0) + 1 ) def _analyze_response_structure(self, platform, content): """Analyze API response structure""" try: data = json.loads(content) # Track common response patterns # Implementation would extract schema patterns except: pass def _detect_anti_scraping_measures(self, platform, response): """Detect anti-scraping measures in responses""" # Check for CAPTCHA responses if b'captcha' in response.content.lower(): ctx.log.info(f"{platform}: CAPTCHA detected") # Check for rate limiting if response.status_code == 429: ctx.log.info(f"{platform}: Rate limit hit") # Extract retry-after header retry_after = response.headers.get('retry-after') if retry_after: ctx.log.info(f"{platform}: Retry after {retry_after} seconds") # Check for shadow banning indicators if platform == 'twitter' and response.status_code == 200: try: data = json.loads(response.content) if 'timeline' in data and not data['timeline'].get('instructions'): ctx.log.info("Twitter: Possible shadow ban - empty timeline") except: pass def get_api_map(self): """Generate API map for a platform""" return { 'endpoints': self.endpoints, 'auth_tokens': self.auth_tokens, 'request_patterns': self.request_patterns } # Usage with mitmproxy # Run with: mitmproxy -s network_analysis.py ``` ### **Platform-Specific Scraping Strategies** #### **1. Twitter/X API Reverse Engineering** ```python # twitter_api.py import requests import json import time import random from datetime import datetime class TwitterAPI: def __init__(self, cookies=None, proxy=None): self.session = requests.Session() self.proxy = proxy # Twitter API endpoints (from reverse engineering) self.endpoints = { 'guest_token': 'https://api.twitter.com/1.1/guest/activate.json', 'user_tweets': 'https://api.twitter.com/2/timeline/profile/{user_id}.json', 'search': 'https://api.twitter.com/2/search/adaptive.json', 'tweet_details': 'https://api.twitter.com/2/timeline/conversation/{tweet_id}.json' } # Required headers self.headers = { 'authorization': 'Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA', 'x-guest-token': '', 'x-twitter-auth-type': 'OAuth2Session', 'x-twitter-active-user': 'yes', 'x-twitter-client-language': 'en', 'content-type': 'application/json' } # Query parameters self.params = { 'user_tweets': { 'include_profile_interstitial_type': '1', 'include_blocking': '1', 'include_blocked_by': '1', 'include_followed_by': '1', 'include_want_retweets': '1', 'include_mute_edge': '1', 'include_can_dm': '1', 'include_can_media_tag': '1', 'skip_status': '1', 'cards_platform': 'Web-12', 'include_cards': '1', 'include_ext_alt_text': 'true', 'include_quote_count': 'true', 'include_reply_count': '1', 'tweet_mode': 'extended', 'include_entities': 'true', 'include_user_entities': 'true', 'include_ext_media_color': 'true', 'include_ext_media_availability': 'true', 'send_error_codes': 'true', 'simple_quoted_tweets': 'false', 'count': '20', 'ext': 'mediaStats,highlightedLabel' }, 'search': { 'q': '', 'product': 'Latest', 'count': '20', 'include_entities': 'true', 'tweet_mode': 'extended', 'simple_quoted_tweets': 'false', 'spelling_corrections': '1', 'ext': 'mediaStats,highlightedLabel' } } # Get guest token self._get_guest_token() # Add cookies if provided if cookies: self.session.cookies.update(cookies) def _get_guest_token(self): """Get guest token for API requests""" response = self.session.post( self.endpoints['guest_token'], headers={ 'authorization': self.headers['authorization'] }, proxies=self.proxy ) if response.status_code == 200: guest_token = response.json()['guest_token'] self.headers['x-guest-token'] = guest_token return guest_token raise Exception(f"Failed to get guest token: {response.status_code}") def _refresh_guest_token(self): """Refresh guest token when expired""" time.sleep(1) return self._get_guest_token() def _make_request(self, endpoint, params=None, json_data=None, method='GET'): """Make authenticated API request""" try: if method == 'GET': response = self.session.get( endpoint, headers=self.headers, params=params, proxies=self.proxy ) else: response = self.session.post( endpoint, headers=self.headers, params=params, json=json_data, proxies=self.proxy ) # Handle rate limiting if response.status_code == 429: retry_after = int(response.headers.get('x-rate-limit-reset', 60)) time.sleep(retry_after + 5) return self._make_request(endpoint, params, json_data, method) # Handle guest token expiration if 'errors' in response.text and 'guest token' in response.text.lower(): self._refresh_guest_token() return self._make_request(endpoint, params, json_data, method) response.raise_for_status() return response.json() except Exception as e: print(f"API request failed: {str(e)}") raise def get_user_id(self, username): """Get user ID from username (requires authenticated session)""" # This would typically require an authenticated session # Implementation varies based on Twitter's current API structure pass def get_user_tweets(self, user_id, count=20): """Get tweets from a user's profile""" endpoint = self.endpoints['user_tweets'].format(user_id=user_id) params = self.params['user_tweets'].copy() params['count'] = str(count) return self._make_request(endpoint, params=params) def search_tweets(self, query, count=20): """Search for tweets matching a query""" params = self.params['search'].copy() params['q'] = query params['count'] = str(count) return self._make_request(self.endpoints['search'], params=params) def get_tweet_details(self, tweet_id): """Get details for a specific tweet""" endpoint = self.endpoints['tweet_details'].format(tweet_id=tweet_id) params = { 'include_conversation_info': 'true', 'include_profile_interstitial_type': '1', 'include_blocking': '1', 'include_blocked_by': '1', 'include_followed_by': '1', 'include_want_retweets': '1', 'include_mute_edge': '1', 'include_can_dm': '1', 'include_can_media_tag': '1', 'skip_status': '1', 'cards_platform': 'Web-12', 'include_cards': '1', 'include_ext_alt_text': 'true', 'include_quote_count': 'true', 'include_reply_count': '1', 'tweet_mode': 'extended', 'include_entities': 'true', 'include_user_entities': 'true', 'include_ext_media_color': 'true', 'include_ext_media_availability': 'true', 'send_error_codes': 'true', 'simple_quoted_tweets': 'false', 'count': '20', 'ext': 'mediaStats,highlightedLabel' } return self._make_request(endpoint, params=params) def extract_tweet_data(self, tweet_json): """Extract structured data from tweet JSON""" # Implementation would parse Twitter's complex response structure pass ``` #### **2. Instagram Mobile API Scraping** ```python # instagram_api.py import requests import json import time import re from urllib.parse import urlparse, parse_qs class InstagramAPI: def __init__(self, cookies=None, proxy=None): self.session = requests.Session() self.proxy = proxy self.device_id = self._generate_device_id() # Instagram API endpoints self.endpoints = { 'login': 'https://i.instagram.com/api/v1/accounts/login/', 'user_id': 'https://i.instagram.com/api/v1/users/{username}/usernameinfo/', 'user_feed': 'https://i.instagram.com/api/v1/feed/user/{user_id}/', 'post_details': 'https://i.instagram.com/api/v1/media/{media_id}/info/', 'search_users': 'https://i.instagram.com/api/v1/users/search/?query={query}&count=50' } # Required headers self.headers = { 'user-agent': self._get_ig_user_agent(), 'x-ig-capabilities': '3brTvw==', 'x-ig-connection-type': 'WIFI', 'x-ig-app-id': '567067343352427', 'x-ig-www-claim': 'hmac.AR2Fvp7vD8dH2J6-8i8iG5QjG0JdY0pYJZ2Z3Q1e5b6c7d8e', 'accept-language': 'en-US', 'accept-encoding': 'gzip, deflate', 'x-fb-http-engine': 'Liger' } # Add cookies if provided if cookies: self.session.cookies.update(cookies) def _generate_device_id(self): """Generate Instagram device ID""" return f"android-{random.randint(0, 1e12):x}:{int(time.time())}" def _get_ig_user_agent(self): """Generate Instagram-compatible User-Agent""" versions = [ '192.168.3.11.123', '192.168.3.11.125', '192.168.3.11.129', '234.1.0.0.132' ] return ( f"Instagram {random.choice(versions)} Android " "(29/10; 560dpi; 1440x2891; samsung; SM-G975F; " "star2qltesq; exynos9820; en_US; 361658937)" ) def _make_request(self, endpoint, params=None, data=None, method='GET'): """Make authenticated API request""" # Add common parameters if params is None: params = {} params['device_id'] = self.device_id params['guid'] = self._generate_guid() params['adid'] = self._generate_guid() # Add Instagram signature if signature = self._generate_signature(data) params['signed_body'] = f"{signature}.{json.dumps(data)}" try: if method == 'GET': response = self.session.get( endpoint, headers=self.headers, params=params, proxies=self.proxy ) else: response = self.session.post( endpoint, headers=self.headers, params=params, data=data, proxies=self.proxy ) # Handle rate limiting if response.status_code == 429: retry_after = int(response.headers.get('x-ratelimit-reset', 60)) time.sleep(retry_after + 5) return self._make_request(endpoint, params, data, method) # Handle invalid session if 'login_required' in response.text: raise Exception("Login required - session expired") response.raise_for_status() return response.json() except Exception as e: print(f"API request failed: {str(e)}") raise def _generate_guid(self): """Generate Instagram GUID""" return str(uuid.uuid4()) def _generate_signature(self, data): """Generate Instagram request signature""" # Instagram uses a specific signature algorithm # This is a simplified version data_json = json.dumps(data, separators=(',', ':')) return hmac.new( b'ig_secret_key', data_json.encode(), hashlib.sha256 ).hexdigest() def get_user_id(self, username): """Get user ID from username""" endpoint = self.endpoints['user_id'].format(username=username) response = self._make_request(endpoint) return response['user']['pk'] def get_user_feed(self, user_id, count=12): """Get a user's feed/posts""" endpoint = self.endpoints['user_feed'].format(user_id=user_id) params = { 'count': str(count), 'max_id': '', 'min_timestamp': '', 'rank_token': self._generate_guid(), 'include_fixed_destinations': 'true' } return self._make_request(endpoint, params=params) def get_post_details(self, media_id): """Get details for a specific post""" endpoint = self.endpoints['post_details'].format(media_id=media_id) return self._make_request(endpoint) def search_users(self, query, count=10): """Search for users by query""" endpoint = self.endpoints['search_users'].format(query=query) response = self._make_request(endpoint) return response['users'][:count] def extract_post_data(self, post_json): """Extract structured data from post JSON""" # Implementation would parse Instagram's response structure pass def extract_user_data(self, user_json): """Extract structured data from user JSON""" # Implementation would parse user profile data pass ``` ### **Ethical Considerations for Social Media Scraping** #### **Social Media Scraping Policy Framework** ```python # social_media_policy.py from datetime import datetime, timedelta class SocialMediaPolicy: def __init__(self): self.platform_policies = { 'facebook': self._facebook_policy(), 'twitter': self._twitter_policy(), 'instagram': self._instagram_policy(), 'linkedin': self._linkedin_policy(), 'tiktok': self._tiktok_policy() } self.user_consent = {} self.compliance_log = [] def _facebook_policy(self): """Facebook scraping policy based on ToS and GDPR""" return { 'allowed_data': [ 'public_page_info', 'public_post_content', 'public_event_info' ], 'prohibited_data': [ 'private_messages', 'user_contact_info', 'friend_lists', 'non-public_content' ], 'rate_limits': { 'requests_per_hour': 200, 'delay_between_requests': 3.0 }, 'consent_required': True, 'data_retention': timedelta(days=30), 'gdpr_compliance': True, 'ccpa_compliance': True } def _twitter_policy(self): """Twitter scraping policy based on ToS and GDPR""" return { 'allowed_data': [ 'public_tweets', 'public_user_profiles', 'trending_topics' ], 'prohibited_data': [ 'direct_messages', 'non-public_metrics', 'user_email_addresses' ], 'rate_limits': { 'requests_per_15min': 900, # Standard API limit 'delay_between_requests': 1.0 }, 'consent_required': False, # Public data is generally allowed 'data_retention': timedelta(days=7), 'gdpr_compliance': True, 'ccpa_compliance': True } def _instagram_policy(self): """Instagram scraping policy based on ToS and GDPR""" return { 'allowed_data': [ 'public_profiles', 'public_posts', 'public_comments' ], 'prohibited_data': [ 'private_messages', 'follower_lists', 'user_location_data' ], 'rate_limits': { 'requests_per_hour': 200, 'delay_between_requests': 3.0 }, 'consent_required': True, 'data_retention': timedelta(days=30), 'gdpr_compliance': True, 'ccpa_compliance': True } def _linkedin_policy(self): """LinkedIn scraping policy based on ToS and GDPR""" return { 'allowed_data': [ 'public_company_pages', 'public_job_listings' ], 'prohibited_data': [ 'user_profiles', 'connection_data', 'messaging_data', 'non-public_content' ], 'rate_limits': { 'requests_per_hour': 100, 'delay_between_requests': 5.0 }, 'consent_required': True, 'data_retention': timedelta(days=14), 'gdpr_compliance': True, 'ccpa_compliance': True } def _tiktok_policy(self): """TikTok scraping policy based on ToS and GDPR""" return { 'allowed_data': [ 'public_videos', 'public_profiles', 'public_hashtags' ], 'prohibited_data': [ 'user_contacts', 'private_messages', 'user_location_history' ], 'rate_limits': { 'requests_per_hour': 300, 'delay_between_requests': 2.0 }, 'consent_required': True, 'data_retention': timedelta(days=30), 'gdpr_compliance': True, 'ccpa_compliance': True } def check_compliance(self, platform, data_request, user_id=None): """ Check if data request complies with platform policy Returns: Tuple of (is_compliant, reason, recommended_action) """ policy = self.platform_policies.get(platform) if not policy: return (False, "Unsupported platform", "Do not proceed") # Check data types unauthorized_data = [ d for d in data_request['data_types'] if d in policy['prohibited_data'] ] if unauthorized_ return ( False, f"Requesting prohibited data: {', '.join(unauthorized_data)}", "Remove prohibited data types from request" ) # Check consent (if required) if policy['consent_required'] and not self._has_valid_consent(user_id, platform): return ( False, "Valid consent required but not provided", "Obtain explicit user consent" ) # Check rate limits if not self._check_rate_limits(platform): return ( False, "Exceeding rate limits for platform", "Implement request throttling" ) # Check data retention retention_days = (data_request.get('retention_date', datetime.utcnow()) - datetime.utcnow()).days if retention_days > policy['data_retention'].days: return ( False, f"Requested data retention ({retention_days} days) exceeds policy limit " f"({policy['data_retention'].days} days)", f"Reduce retention period to {policy['data_retention'].days} days" ) return (True, "Request complies with platform policy", "Proceed with request") def _has_valid_consent(self, user_id, platform): """Check if valid consent exists for user and platform""" if not user_id: return False consent = self.user_consent.get(user_id, {}).get(platform) if not consent: return False # Check consent expiration if consent['expiration'] < datetime.utcnow(): return False return True def _check_rate_limits(self, platform): """Check if current request stays within rate limits""" # Implementation would track request history return True def log_compliance_check(self, platform, request, result): """Log compliance check for auditing""" self.compliance_log.append({ 'timestamp': datetime.utcnow().isoformat(), 'platform': platform, 'request': request, 'result': result, 'user_id': request.get('user_id') }) def get_policy_summary(self, platform): """Get human-readable policy summary for a platform""" policy = self.platform_policies.get(platform) if not policy: return "Policy information not available for this platform" return ( f"Scraping Policy for {platform.capitalize()}:\n\n" f"ALLOWED DATA:\n- " + "\n- ".join(policy['allowed_data']) + "\n\n" f"PROHIBITED DATA:\n- " + "\n- ".join(policy['prohibited_data']) + "\n\n" f"RATE LIMITS: {policy['rate_limits']['requests_per_hour']} requests/hour\n" f"DATA RETENTION: Up to {policy['data_retention'].days} days\n" f"CONSENT REQUIRED: {'Yes' if policy['consent_required'] else 'No'}\n\n" f"COMPLIANCE: GDPR {'✓' if policy['gdpr_compliance'] else '✗'}, " f"CCPA {'✓' if policy['ccpa_compliance'] else '✗'}" ) ``` --- ## **2. Mobile App Scraping and Reverse Engineering** 📲 Mobile apps present unique challenges with their native code, encrypted traffic, and device-specific protections. ### **Mobile Traffic Interception Techniques** #### **1. SSL Pinning Bypass** ```python # ssl_pinning_bypass.py import frida import sys import time class SSLPinningBypasser: def __init__(self, package_name): self.package_name = package_name self.device = frida.get_usb_device() self.session = None self.script = None def start(self): """Start the SSL pinning bypass""" print(f"Attaching to {self.package_name}...") # Spawn the app pid = self.device.spawn([self.package_name]) self.session = self.device.attach(pid) # Load bypass script with open('ssl_bypass.js') as f: js_code = f.read() # Create and load script self.script = self.session.create_script(js_code) self.script.on('message', self._on_message) self.script.load() # Resume the app self.device.resume(pid) print(f"SSL pinning bypass active for {self.package_name}") return True def _on_message(self, message, data): """Handle messages from Frida script""" if message['type'] == 'send': print(f"[FRIDA] {message['payload']}") else: print(message) def stop(self): """Stop the SSL pinning bypass""" if self.session: self.session.detach() self.session = None print("SSL pinning bypass stopped") def interact(self): """Interactive mode to keep the script running""" print("Press Ctrl+C to stop...") try: while True: time.sleep(1) except KeyboardInterrupt: self.stop() # SSL bypass script (ssl_bypass.js) """ // SSL pinning bypass for Android and iOS console.log("SSL pinning bypass script loaded"); // Android bypass if (Java.available) { Java.perform(function() { console.log("Android environment detected"); // OkHttp bypass var OkHttpClient = Java.use('okhttp3.OkHttpClient'); OkHttpClient$Builder = Java.use('okhttp3.OkHttpClient$Builder'); OkHttpClient$Builder.build.implementation = function() { this.hostnameVerifier({verify: function() { return true; }}); this.sslSocketFactory({}, {}); return this.build(); }; // TrustManager bypass var TrustManagerImpl = Java.use('com.android.org.conscrypt.TrustManagerImpl'); TrustManagerImpl.checkTrustedRecursive.implementation = function() { console.log("[SSL] Bypassing TrustManager check"); return; }; console.log("Android SSL pinning bypass installed"); }); } // iOS bypass if (ObjC.available) { console.log("iOS environment detected"); // NSURLSession bypass var NSURLSession = ObjC.classes.NSURLSession; var NSURLSessionTask = ObjC.classes.NSURLSessionTask; Interceptor.attach(NSURLSessionTask['- _willPerformHTTPRedirection:'], { onEnter: function(args) { console.log("[SSL] Bypassing redirection SSL check"); var newRequest = args[2]; Interceptor.attach(newRequest['- URL'], { onLeave: function(retval) { retval.replace(ObjC.classes.NSURL.URLWithString_("https://example.com")); } }); } }); console.log("iOS SSL pinning bypass installed"); } """ # Usage if __name__ == "__main__": if len(sys.argv) != 2: print("Usage: python ssl_pinning_bypass.py <package_name>") sys.exit(1) bypasser = SSLPinningBypasser(sys.argv[1]) if bypasser.start(): bypasser.interact() ``` #### **2. Mobile API Reverse Engineering** ```python # mobile_api.py import requests import json import re from urllib.parse import urlparse class MobileAPIAnalyzer: def __init__(self, proxy='http://localhost:8080'): self.proxy = {'http': proxy, 'https': proxy} self.session = requests.Session() self.endpoints = {} self.auth_mechanisms = {} self.request_patterns = {} def intercept_traffic(self, duration=60): """ Intercept mobile app traffic for analysis Args: duration: How long to intercept traffic (seconds) """ print(f"Starting traffic interception for {duration} seconds...") print("Please use the mobile app during this time") # This would typically connect to a proxy like Burp Suite # For demonstration, we'll simulate traffic capture time.sleep(duration) # Analyze captured traffic self._analyze_captured_traffic() return { 'endpoints': self.endpoints, 'auth_mechanisms': self.auth_mechanisms, 'request_patterns': self.request_patterns } def _analyze_captured_traffic(self): """Analyze captured HTTP/HTTPS traffic""" # In a real implementation, this would process captured traffic # Here we simulate with sample data self._process_sample_traffic() def _process_sample_traffic(self): """Process sample traffic for demonstration""" # Simulated endpoints self.endpoints = { 'auth': { 'url': 'https://api.example.com/v1/auth/login', 'methods': ['POST'], 'params': ['username', 'password', 'device_id'] }, 'user_profile': { 'url': 'https://api.example.com/v1/users/{user_id}', 'methods': ['GET'], 'params': ['user_id'], 'auth_required': True }, 'feed': { 'url': 'https://api.example.com/v1/feed', 'methods': ['GET'], 'params': ['limit', 'cursor'], 'auth_required': True } } # Simulated auth mechanisms self.auth_mechanisms = { 'type': 'JWT', 'header': 'Authorization', 'format': 'Bearer {token}', 'token_lifetime': 3600 } # Simulated request patterns self.request_patterns = { 'common_headers': [ 'User-Agent', 'Content-Type', 'Authorization', 'X-Device-ID', 'X-App-Version' ], 'device_identifiers': [ 'device_id', 'advertising_id', 'device_model', 'os_version' ] } def generate_client_sdk(self, language='python'): """ Generate client SDK for the mobile API Returns: Code for API client in specified language """ if language == 'python': return self._generate_python_sdk() elif language == 'javascript': return self._generate_javascript_sdk() else: raise ValueError(f"Unsupported language: {language}") def _generate_python_sdk(self): """Generate Python SDK for the mobile API""" endpoints = self.endpoints auth = self.auth_mechanisms sdk_code = f"""# Auto-generated mobile API client import requests import json import time class MobileAPIClient: def __init__(self, base_url="{self._extract_base_url()}", api_key=None): self.base_url = base_url self.api_key = api_key self.session = requests.Session() self.token = None self.token_expiry = 0 self.device_id = self._generate_device_id() def _generate_device_id(self): \"\"\"Generate device identifier\"\"\" return f"device-{{int(time.time())}}-{{hash(str(time.time()))}}" def _ensure_authenticated(self): \"\"\"Ensure we have a valid authentication token\"\"\" if self.token and time.time() < self.token_expiry: return if not self.api_key: raise ValueError("API key required for authentication") # Get new token response = self.session.post( f"{{self.base_url}}/auth/token", json={{'api_key': self.api_key, 'device_id': self.device_id}} ) response.raise_for_status() data = response.json() self.token = data['token'] self.token_expiry = time.time() + data.get('expires_in', 3600) - 60 def _make_request(self, endpoint, method='GET', params=None, data=None): \"\"\"Make authenticated API request\"\"\" self._ensure_authenticated() url = f"{{self.base_url}}{{endpoint}}" headers = {{ 'Authorization': '{auth['format'].replace('{token}', '{{self.token}}')}', 'Content-Type': 'application/json', 'X-Device-ID': self.device_id }} if method.upper() == 'GET': response = self.session.get( url, headers=headers, params=params ) else: response = self.session.request( method, url, headers=headers, params=params, json=data ) response.raise_for_status() return response.json() """ # Add endpoint methods for name, endpoint in endpoints.items(): if 'auth_required' in endpoint and not endpoint['auth_required']: auth_comment = "# No authentication required" else: auth_comment = "# Authentication required" sdk_code += f" # {endpoint['url']} - {auth_comment}\n" sdk_code += f" def {self._snake_case(name)}(self" # Add parameters if 'params' in endpoint: for param in endpoint['params']: sdk_code += f", {param}=None" sdk_code += "):\n" sdk_code += f" \"\"\"{endpoint['url']}\"\"\"\n" # Build params dictionary if 'params' in endpoint: sdk_code += " params = {}\n" for param in endpoint['params']: sdk_code += f" if {param} is not None:\n" sdk_code += f" params['{param}'] = {param}\n" # Make request sdk_code += f" return self._make_request(\n" sdk_code += f" '{endpoint['url'].replace(self._extract_base_url(), '')}',\n" sdk_code += f" method='{endpoint['methods'][0]}',\n" if 'params' in endpoint: sdk_code += " params=params\n" else: sdk_code += " params=None\n" sdk_code += " )\n\n" return sdk_code def _extract_base_url(self): """Extract base URL from endpoints""" if not self.endpoints: return "https://api.example.com/v1" # Get first endpoint URL first_url = list(self.endpoints.values())[0]['url'] parsed = urlparse(first_url) return f"{parsed.scheme}://{parsed.netloc}{parsed.path.rsplit('/', 1)[0]}" def _snake_case(self, name): """Convert name to snake_case""" s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name) return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower() ``` ### **Mobile-Specific Scraping Challenges** #### **1. Device Fingerprinting Countermeasures** ```python # device_fingerprint.py import uuid import random import string from datetime import datetime class MobileDeviceFingerprint: def __init__(self, platform='android'): self.platform = platform self.fingerprint = self._generate_fingerprint() def _generate_fingerprint(self): """Generate realistic mobile device fingerprint""" if self.platform == 'android': return self._generate_android_fingerprint() elif self.platform == 'ios': return self._generate_ios_fingerprint() else: raise ValueError(f"Unsupported platform: {self.platform}") def _generate_android_fingerprint(self): """Generate Android-specific device fingerprint""" manufacturers = ['samsung', 'google', 'xiaomi', 'huawei', 'oneplus'] models = { 'samsung': ['SM-G975F', 'SM-N975F', 'SM-A505FN'], 'google': ['Pixel 3', 'Pixel 4', 'Pixel 5'], 'xiaomi': ['MI 9', 'Redmi Note 8', 'Mi 10'], 'huawei': ['P30 Pro', 'Mate 20', 'Nova 5T'], 'oneplus': ['ONEPLUS A6003', 'ONEPLUS 7T', 'ONEPLUS 8'] } android_versions = ['10', '11', '12'] manufacturer = random.choice(manufacturers) model = random.choice(models[manufacturer]) android_version = random.choice(android_versions) return { 'platform': 'android', 'manufacturer': manufacturer, 'model': model, 'android_version': android_version, 'device_id': self._generate_device_id(), 'advertising_id': self._generate_advertising_id(), 'screen_resolution': self._generate_screen_resolution(), 'locale': self._generate_locale(), 'user_agent': self._generate_user_agent(manufacturer, model, android_version), 'build_fingerprint': self._generate_build_fingerprint(manufacturer, model, android_version) } def _generate_ios_fingerprint(self): """Generate iOS-specific device fingerprint""" models = ['iPhone12,1', 'iPhone12,3', 'iPhone12,5', 'iPhone13,2', 'iPhone13,4'] ios_versions = ['14.4', '14.5', '15.0', '15.1'] model = random.choice(models) ios_version = random.choice(ios_versions) return { 'platform': 'ios', 'model': model, 'ios_version': ios_version, 'device_id': self._generate_device_id(), 'advertising_id': self._generate_advertising_id(), 'screen_resolution': self._generate_screen_resolution(), 'locale': self._generate_locale(), 'user_agent': self._generate_user_agent(model, ios_version), 'idfv': self._generate_idfv() } def _generate_device_id(self): """Generate device ID""" return f"android-{random.randint(0, 1e12):x}:{int(time.time())}" def _generate_advertising_id(self): """Generate advertising ID""" return str(uuid.uuid4()) def _generate_screen_resolution(self): """Generate realistic screen resolution""" resolutions = [ '1080x1920', '1440x2960', '720x1280', '1080x2340', '1125x2436', '1284x2778' ] return random.choice(resolutions) def _generate_locale(self): """Generate locale information""" locales = [ 'en-US', 'en-GB', 'es-ES', 'fr-FR', 'de-DE', 'ja-JP', 'ko-KR', 'zh-CN' ] return random.choice(locales) def _generate_user_agent(self, *args): """Generate platform-specific user agent""" if self.platform == 'android': manufacturer, model, android_version = args return ( f"Mozilla/5.0 (Linux; Android {android_version}; {model}) " "AppleWebKit/537.36 (KHTML, like Gecko) Mobile " "Chrome/91.0.4472.120 Safari/537.36" ) else: model, ios_version = args return ( f"Mozilla/5.0 (iPhone; CPU iPhone OS {ios_version.replace('.', '_')} " f"like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) " "Mobile/15E148 [FBAN/FBIOS;FBDV/{model};FBMD/iPhone;FBSN/iOS;" f"FBSV/{ios_version};FBSS/3;FBCR/;FBID/phone;FBLC/en_US;FBOP/5]" ) def _generate_build_fingerprint(self, manufacturer, model, android_version): """Generate Android build fingerprint""" build_ids = { '10': 'QQ2A.200501.001.B3', '11': 'RQ3A.210705.001', '12': 'SP2A.220505.008' } return ( f"{manufacturer}/{model}/{model}:{android_version}/" f"{build_ids[android_version]}/user/release-keys" ) def _generate_idfv(self): """Generate iOS Identifier for Vendors (IDFV)""" return str(uuid.uuid4()) def get_headers(self): """Get headers for mobile API requests""" fingerprint = self.fingerprint headers = { 'User-Agent': fingerprint['user_agent'], 'X-Device-ID': fingerprint['device_id'], 'X-Advertising-ID': fingerprint['advertising_id'], 'X-Device-Model': fingerprint['model'], 'X-OS-Version': fingerprint['android_version'] if self.platform == 'android' else fingerprint['ios_version'], 'Accept-Language': fingerprint['locale'].replace('-', '_') + ',en-US;q=0.9', 'Content-Type': 'application/json; charset=utf-8', 'X-Requested-With': 'XMLHttpRequest' } # Platform-specific headers if self.platform == 'android': headers.update({ 'X-Manufacturer': fingerprint['manufacturer'], 'X-Android-Version': fingerprint['android_version'], 'X-Screen-Resolution': fingerprint['screen_resolution'] }) else: headers.update({ 'X-Device-Type': 'iPhone', 'X-IDFV': fingerprint['idfv'] }) return headers def emulate_device(self, driver): """Emulate device in Selenium/WebDriver""" fingerprint = self.fingerprint # Set mobile emulation options mobile_emulation = { 'deviceMetrics': { 'width': int(fingerprint['screen_resolution'].split('x')[0]), 'height': int(fingerprint['screen_resolution'].split('x')[1]), 'pixelRatio': 3.0 }, 'userAgent': fingerprint['user_agent'] } # Configure Chrome options driver.execute_cdp_cmd('Emulation.setDeviceMetricsOverride', mobile_emulation) driver.execute_cdp_cmd('Network.setUserAgentOverride', { 'userAgent': fingerprint['user_agent'] }) # Set additional device properties driver.execute_script(""" Object.defineProperty(navigator, 'platform', { get: () => '%s' }); Object.defineProperty(navigator, 'deviceMemory', { get: () => 8 }); Object.defineProperty(navigator, 'hardwareConcurrency', { get: () => 8 }); """ % (fingerprint['model'])) ``` --- ## **3. Dark Web Data Extraction (Ethical Considerations)** 🌑 The dark web presents unique opportunities and risks for data extraction. This section focuses on ethical approaches to dark web data collection. ### **Understanding the Dark Web Landscape** #### **Dark Web Structure** ``` Surface Web (Indexed by search engines) → 90% of web ↓ Deep Web (Unindexed but legal content) → 9% of web ↓ Dark Web (Intentionally hidden services) → 0.005% of web ↓ ├── Tor Network (.onion sites) ├── I2P Network (.i2p sites) ├── Freenet └── Other decentralized networks ``` ### **Ethical Framework for Dark Web Research** #### **1. Ethical Decision Matrix** ```python # dark_web_ethics.py from datetime import datetime, timedelta class DarkWebEthics: def __init__(self, researcher_id, institution): self.researcher_id = researcher_id self.institution = institution self.approval_history = [] self.data_inventory = [] self.current_session = None def request_access(self, target, purpose, duration=1): """ Request ethical approval for dark web access Args: target: Dark web resource to access purpose: Research purpose duration: Duration of access in days Returns: Approval status and token """ # Validate researcher credentials if not self._validate_researcher(): return { 'approved': False, 'reason': 'Researcher not validated', 'token': None } # Check institutional approval if not self._check_institutional_approval(): return { 'approved': False, 'reason': 'Institutional approval required', 'token': None } # Evaluate ethical considerations ethics_check = self._evaluate_ethical_risks(target, purpose) if not ethics_check['approved']: return { **ethics_check, 'token': None } # Create access token access_token = self._generate_access_token( target, purpose, duration ) # Record approval self.approval_history.append({ 'timestamp': datetime.utcnow().isoformat(), 'researcher': self.researcher_id, 'institution': self.institution, 'target': target, 'purpose': purpose, 'duration': duration, 'token': access_token, 'expires': (datetime.utcnow() + timedelta(days=duration)).isoformat() }) # Start session self.current_session = { 'token': access_token, 'target': target, 'start_time': datetime.utcnow().isoformat(), 'end_time': (datetime.utcnow() + timedelta(days=duration)).isoformat() } return { 'approved': True, 'reason': 'Access approved', 'token': access_token, 'expires': self.current_session['end_time'] } def _validate_researcher(self): """Validate researcher credentials""" # Implementation would check against institutional database return True # Simplified for example def _check_institutional_approval(self): """Check for institutional review board approval""" # Implementation would verify IRB approval return True # Simplified for example def _evaluate_ethical_risks(self, target, purpose): """Evaluate ethical risks of dark web access""" # Check target against prohibited categories prohibited_categories = [ 'child_sexual_abuse_material', 'active_malware_distribution', 'human_trafficking', 'drug_marketplaces', 'weapon_marketplaces' ] # Analyze target URL target_category = self._categorize_target(target) # Check if target is in prohibited categories if target_category in prohibited_categories: return { 'approved': False, 'reason': f'Access to {target_category} is prohibited', 'risk_level': 'critical' } # Evaluate purpose valid_purposes = [ 'cybersecurity_research', 'threat_intelligence', 'academic_study', 'law_enforcement' ] if purpose not in valid_purposes: return { 'approved': False, 'reason': 'Invalid research purpose', 'risk_level': 'high' } # Check for minimal intrusion principle if not self._follows_minimal_intrusion(purpose): return { 'approved': False, 'reason': 'Does not follow minimal intrusion principle', 'risk_level': 'medium' } return { 'approved': True, 'reason': 'Ethical requirements met', 'risk_level': 'low' } def _categorize_target(self, target): """Categorize dark web target""" # Implementation would analyze target content # This is a simplified version if 'child' in target or 'cp' in target: return 'child_sexual_abuse_material' if 'malware' in target or 'exploit' in target: return 'active_malware_distribution' if 'drugs' in target or 'narcotics' in target: return 'drug_marketplaces' if 'weapons' in target or 'guns' in target: return 'weapon_marketplaces' return 'other' def _follows_minimal_intrusion(self, purpose): """Check if research follows minimal intrusion principle""" # Implementation would evaluate research methodology return True # Simplified for example def _generate_access_token(self, target, purpose, duration): """Generate secure access token""" import uuid import hashlib import time token_data = f"{self.researcher_id}|{target}|{purpose}|{duration}|{time.time()}" return hashlib.sha256(token_data.encode()).hexdigest() def log_data_collection(self, data_type, volume, sensitivity): """ Log data collection activities Args: data_type: Type of data collected volume: Volume of data (in MB) sensitivity: Sensitivity level (1-5) """ if not self.current_session: raise ValueError("No active session") # Validate sensitivity level if sensitivity < 1 or sensitivity > 5: raise ValueError("Sensitivity must be between 1-5") # Record data collection self.data_inventory.append({ 'timestamp': datetime.utcnow().isoformat(), 'session_token': self.current_session['token'], 'data_type': data_type, 'volume_mb': volume, 'sensitivity': sensitivity, 'researcher': self.researcher_id }) def get_ethical_guidelines(self): """Get comprehensive ethical guidelines for dark web research""" return { 'principles': [ 'Respect for persons: Recognize autonomy and protect those with diminished autonomy', 'Beneficence: Maximize possible benefits and minimize possible harms', 'Justice: Ensure fair distribution of research benefits and burdens', 'Minimal Intrusion: Collect only the minimum data necessary for research', 'Transparency: Document all research methods and data handling procedures', 'Accountability: Maintain clear audit trails of all research activities' ], 'prohibited_activities': [ 'Accessing or distributing child sexual abuse material', 'Participating in illegal marketplaces', 'Engaging in or facilitating cybercrime', 'Collecting personally identifiable information without consent', 'Causing disruption to dark web services' ], 'data_handling_requirements': [ 'All data must be encrypted at rest and in transit', 'Sensitive data must be stored in secure, access-controlled environments', 'Data retention must not exceed research needs', 'Personal data must be anonymized where possible', 'Regular security audits must be conducted' ] } def validate_session(self, token): """Validate if session token is valid""" if not self.current_session or self.current_session['token'] != token: return False # Check expiration if datetime.utcnow() > datetime.fromisoformat(self.current_session['end_time']): return False return True ``` ### **Technical Implementation for Safe Exploration** #### **1. Secure Dark Web Access Environment** ```python # dark_web_access.py import stem from stem.control import Controller import requests from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry import os import tempfile import subprocess import time class DarkWebAccess: def __init__(self, tor_port=9050, control_port=9051): self.tor_port = tor_port self.control_port = control_port self.tor_process = None self.session = None self.temp_dir = tempfile.mkdtemp() def start_tor(self, socks_port=None, control_port=None): """ Start a dedicated Tor instance for research Returns: Tor process object """ print("Starting isolated Tor instance for research...") # Create Tor configuration torrc = f""" SocksPort {socks_port or self.tor_port} ControlPort {control_port or self.control_port} DataDirectory {self.temp_dir}/tor_data Log notice stdout AvoidDiskWrites 1 """ # Write config to temporary file config_path = os.path.join(self.temp_dir, 'torrc') with open(config_path, 'w') as f: f.write(torrc) # Start Tor process self.tor_process = subprocess.Popen( ['tor', '-f', config_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE ) # Wait for Tor to initialize time.sleep(5) # Verify Tor is running if not self._is_tor_running(): self.stop_tor() raise RuntimeError("Failed to start Tor instance") print(f"Tor instance started on SOCKS port {self.tor_port}") return self.tor_process def _is_tor_running(self): """Check if Tor is running and ready""" try: with Controller.from_port(port=self.control_port) as controller: controller.authenticate() return controller.is_new_consensus_available() except: return False def create_session(self): """ Create requests session with Tor proxy Returns: Configured requests session """ # Ensure Tor is running if not self.tor_process or not self._is_tor_running(): self.start_tor() # Create session with retry logic session = requests.Session() # Configure Tor proxy session.proxies = { 'http': f'socks5h://127.0.0.1:{self.tor_port}', 'https': f'socks5h://127.0.0.1:{self.tor_port}' } # Add retry mechanism retry = Retry( total=3, backoff_factor=0.5, status_forcelist=[500, 502, 503, 504] ) adapter = HTTPAdapter(max_retries=retry) session.mount('http://', adapter) session.mount('https://', adapter) # Set safe headers session.headers.update({ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', 'Accept-Language': 'en-US,en;q=0.9', 'Accept-Encoding': 'gzip, deflate', 'Connection': 'keep-alive', 'Upgrade-Insecure-Requests': '1' }) self.session = session return session def get_onion_service(self, onion_url): """ Access an onion service with safety measures Returns: Response object """ # Validate URL format if not onion_url.endswith('.onion'): raise ValueError("Invalid onion URL - must end with .onion") # Create session if needed if not self.session: self.create_session() # Add safety headers self.session.headers.update({ 'Tor-Connection': 'Isolated', 'Research-Purpose': 'Academic/Cybersecurity' }) try: # Make request with timeout response = self.session.get( onion_url, timeout=30 ) # Check for dangerous content self._check_for_dangerous_content(response) return response except Exception as e: print(f"Error accessing {onion_url}: {str(e)}") raise def _check_for_dangerous_content(self, response): """Check response for prohibited content""" # Check content type content_type = response.headers.get('Content-Type', '') if 'application/octet-stream' in content_type: raise ValueError("Binary content detected - potential malware") # Check for known CSAM indicators (simplified) if self._contains_csam_indicators(response.text): raise ValueError("Potential child sexual abuse material detected") def _contains_csam_indicators(self, content): """Check for indicators of prohibited content""" # This is a simplified example - real implementation would be more sophisticated csam_keywords = [ 'child porn', 'cp', 'lolicon', 'shotacon', 'teen porn', 'young porn', 'minor porn' ] content_lower = content.lower() return any(keyword in content_lower for keyword in csam_keywords) def rotate_identity(self): """Rotate Tor identity for anonymity""" with Controller.from_port(port=self.control_port) as controller: controller.authenticate() controller.signal(stem.Signal.NEWNYM) print("Tor identity rotated") def stop_tor(self): """Stop the Tor instance and clean up""" if self.tor_process: self.tor_process.terminate() self.tor_process.wait() self.tor_process = None # Clean up temporary directory if os.path.exists(self.temp_dir): import shutil shutil.rmtree(self.temp_dir) print("Tor instance stopped and cleaned up") def __enter__(self): """Context manager entry""" self.start_tor() return self def __exit__(self, exc_type, exc_val, exc_tb): """Context manager exit""" self.stop_tor() ``` --- ## **4. Real-Time Financial Data Scraping** 💰 Financial data scraping requires precision, speed, and regulatory compliance. ### **Financial Data Sources and Regulations** #### **Key Financial Data Regulations** | Regulation | Scope | Key Requirements | |------------|-------|------------------| | **SEC Rule 15c3-5** | US Broker-dealers | Market access controls, risk management | | **MiFID II** | EU Financial Markets | Best execution, transaction reporting | | **Regulation NMS** | US Equity Markets | Order protection, market data | | **GDPR** | EU Personal Data | Data protection, consent | | **FINRA Rules** | US Broker-dealers | Supervision, recordkeeping | ### **Real-Time Financial Data Pipeline** #### **1. Market Data Collection System** ```python # market_data.py import websocket import json import time import threading from datetime import datetime import pandas as pd from regulatory_compliance import FinancialCompliance class MarketDataCollector: def __init__(self, compliance_system): self.compliance = compliance_system self.connections = {} self.data_buffer = [] self.buffer_size = 1000 self.flush_interval = 5 # seconds self.flush_thread = None self.running = False def connect_to_exchange(self, exchange, symbols, callback=None): """ Connect to a financial exchange data feed Args: exchange: Exchange name (e.g., 'binance', 'nyse') symbols: List of symbols to track callback: Optional callback for real-time data """ # Check regulatory compliance if not self.compliance.check_market_data_access(exchange, symbols): raise RegulatoryViolationError( f"Access to {exchange} data for {symbols} not compliant" ) # Create connection based on exchange if exchange in ['binance', 'coinbase', 'kraken']: conn = self._connect_to_crypto_exchange(exchange, symbols, callback) elif exchange in ['nyse', 'nasdaq', 'amex']: conn = self._connect_to_stock_exchange(exchange, symbols, callback) else: raise ValueError(f"Unsupported exchange: {exchange}") self.connections[exchange] = conn return conn def _connect_to_crypto_exchange(self, exchange, symbols, callback): """Connect to cryptocurrency exchange WebSocket""" # Exchange-specific WebSocket URLs urls = { 'binance': 'wss://stream.binance.com:9443/stream', 'coinbase': 'wss://ws-feed.pro.coinbase.com', 'kraken': 'wss://ws.kraken.com' } # Format symbols for exchange if exchange == 'binance': stream_names = [f"{s.lower()}@ticker" for s in symbols] payload = { "method": "SUBSCRIBE", "params": stream_names, "id": 1 } elif exchange == 'coinbase': product_ids = [s.replace('-', '-') for s in symbols] payload = { "type": "subscribe", "product_ids": product_ids, "channels": ["ticker"] } elif exchange == 'kraken': pair_ids = [s.replace('-', '').lower() for s in symbols] payload = { "event": "subscribe", "pair": pair_ids, "subscription": {"name": "ticker"} } # Create WebSocket connection ws = websocket.WebSocketApp( urls[exchange], on_open=lambda ws: ws.send(json.dumps(payload)), on_message=lambda ws, msg: self._handle_crypto_message(ws, msg, exchange, callback), on_error=lambda ws, err: print(f"{exchange} error: {err}"), on_close=lambda ws, close_status_code, close_msg: print(f"{exchange} connection closed") ) # Start in separate thread thread = threading.Thread(target=ws.run_forever) thread.daemon = True thread.start() return { 'websocket': ws, 'thread': thread, 'exchange': exchange, 'symbols': symbols } def _handle_crypto_message(self, ws, message, exchange, callback): """Handle cryptocurrency market data message""" try: data = json.loads(message) # Process based on exchange format if exchange == 'binance': if 'data' in data and data['stream'].endswith('@ticker'): ticker = data['data'] symbol = ticker['s'] processed = { 'exchange': exchange, 'symbol': symbol, 'price': float(ticker['c']), 'volume': float(ticker['v']), 'timestamp': datetime.fromtimestamp(ticker['E'] / 1000) } elif exchange == 'coinbase': if data['type'] == 'ticker': processed = { 'exchange': exchange, 'symbol': data['product_id'], 'price': float(data['price']), 'volume': float(data['volume_24h']), 'timestamp': datetime.fromisoformat(data['time']) } elif exchange == 'kraken': if 'channelID' in data and 'ticker' in data.get('channelName', ''): pair = data['pair'] ticker = data['data'] processed = { 'exchange': exchange, 'symbol': pair, 'price': float(ticker['c'][0]), 'volume': float(ticker['v'][1]), 'timestamp': datetime.fromtimestamp(ticker['t'][1]) } # Add to buffer self._add_to_buffer(processed) # Execute callback if callback: callback(processed) except Exception as e: print(f"Error processing {exchange} message: {str(e)}") def _connect_to_stock_exchange(self, exchange, symbols, callback): """Connect to stock exchange data feed""" # Implementation would connect to financial data providers # like Polygon, Alpha Vantage, or exchange direct feeds pass def _add_to_buffer(self, data_point): """Add data point to buffer and flush if needed""" self.data_buffer.append(data_point) # Flush if buffer is full if len(self.data_buffer) >= self.buffer_size: self._flush_buffer() def start_buffer_flushing(self): """Start background thread for periodic buffer flushing""" self.running = True self.flush_thread = threading.Thread(target=self._buffer_flushing_loop) self.flush_thread.daemon = True self.flush_thread.start() def _buffer_flushing_loop(self): """Loop that periodically flushes the buffer""" while self.running: time.sleep(self.flush_interval) if self.data_buffer: self._flush_buffer() def _flush_buffer(self): """Flush buffer to storage with regulatory compliance""" if not self.data_buffer: return # Apply regulatory compliance checks compliant_data = [] for data in self.data_buffer: if self.compliance.check_data_storage_compliance(data): compliant_data.append(data) # Store compliant data self._store_data(compliant_data) # Clear buffer self.data_buffer = [] def _store_data(self, data_points): """Store data points in compliant storage""" # Implementation would store in regulated database pass def get_historical_data(self, exchange, symbol, start_date, end_date): """ Get historical data with regulatory compliance Returns: DataFrame of historical data """ # Check compliance if not self.compliance.check_historical_data_access(exchange, symbol): raise RegulatoryViolationError( f"Historical data access for {symbol} not compliant" ) # Get data (implementation would connect to historical data source) # This is a simplified example dates = pd.date_range(start=start_date, end=end_date) prices = [100 + random.gauss(0, 2) for _ in dates] return pd.DataFrame({ 'timestamp': dates, 'price': prices, 'volume': [random.randint(1000, 10000) for _ in dates] }) def stop(self): """Stop all connections and buffer flushing""" self.running = False # Close all WebSocket connections for conn in self.connections.values(): if 'websocket' in conn: conn['websocket'].close() # Flush remaining data if self.data_buffer: self._flush_buffer() ``` #### **2. Regulatory Compliance System** ```python # regulatory_compliance.py from datetime import datetime, timedelta class FinancialCompliance: def __init__(self): # Regulatory requirements by jurisdiction self.regulations = { 'sec': self._sec_rules(), 'finra': self._finra_rules(), 'mifid_ii': self._mifid_ii_rules(), 'gdpr': self._gdpr_rules() } self.user_permissions = {} self.data_access_log = [] def _sec_rules(self): """SEC regulatory requirements""" return { 'market_data_access': { 'allowed_exchanges': ['nyse', 'nasdaq', 'amex'], 'allowed_data_types': ['price', 'volume', 'trades'], 'prohibited_data_types': ['order_book_depth', 'individual_trades'], 'rate_limits': { 'requests_per_minute': 100 } }, 'data_storage': { 'retention_period': timedelta(days=30), 'encryption_required': True, 'audit_trail_required': True }, 'user_requirements': { 'certification_required': True, 'supervision_required': True } } def _finra_rules(self): """FINRA regulatory requirements""" return { 'market_data_access': { 'allowed_exchanges': ['nyse', 'nasdaq', 'amex', 'arca'], 'allowed_data_types': ['price', 'volume', 'trades', 'quotes'], 'prohibited_data_types': ['order_book_depth'], 'rate_limits': { 'requests_per_minute': 200 } }, 'data_storage': { 'retention_period': timedelta(days=90), 'encryption_required': True, 'audit_trail_required': True }, 'user_requirements': { 'certification_required': True, 'supervision_required': True } } def _mifid_ii_rules(self): """MiFID II regulatory requirements""" return { 'market_data_access': { 'allowed_exchanges': ['lse', 'euronext', 'deutsche_boerse'], 'allowed_data_types': ['price', 'volume', 'trades', 'quotes'], 'prohibited_data_types': [], 'rate_limits': { 'requests_per_minute': 150 } }, 'data_storage': { 'retention_period': timedelta(days=7), 'encryption_required': True, 'audit_trail_required': True }, 'user_requirements': { 'certification_required': False, 'supervision_required': True } } def _gdpr_rules(self): """GDPR requirements for financial data""" return { 'market_data_access': { 'allowed_data_types': ['aggregated_market_data'], 'prohibited_data_types': ['personal_trading_data'], 'consent_required': True }, 'data_storage': { 'retention_period': timedelta(days=30), 'encryption_required': True, 'right_to_erasure': True }, 'user_requirements': { 'data_protection_officer': True } } def check_market_data_access(self, exchange, symbols, user_id=None): """ Check if market data access is compliant Returns: Boolean indicating compliance """ # Determine applicable regulations regulations = self._determine_applicable_regulations(exchange) # Check each regulation for reg in regulations: if not self._check_market_data_rules(reg, exchange, symbols, user_id): return False return True def _determine_applicable_regulations(self, exchange): """Determine which regulations apply to an exchange""" exchange_reg_map = { 'nyse': ['sec', 'finra'], 'nasdaq': ['sec', 'finra'], 'amex': ['sec', 'finra'], 'lse': ['mifid_ii'], 'euronext': ['mifid_ii'], 'deutsche_boerse': ['mifid_ii'], 'binance': ['gdpr'], 'coinbase': ['gdpr', 'sec'] } return exchange_reg_map.get(exchange, []) def _check_market_data_rules(self, regulation, exchange, symbols, user_id): """Check compliance with specific market data rules""" rules = self.regulations[regulation]['market_data_access'] # Check exchange permission if exchange not in rules['allowed_exchanges']: self._log_compliance_violation( regulation, f"Access to {exchange} not permitted" ) return False # Check data types (simplified) # In reality, would analyze what data is being requested # Check rate limits if not self._check_rate_limits(regulation): self._log_compliance_violation( regulation, "Exceeding rate limits" ) return False # Check user permissions if user_id and not self._check_user_permissions(regulation, user_id): self._log_compliance_violation( regulation, f"User {user_id} lacks required permissions" ) return False return True def _check_rate_limits(self, regulation): """Check if current request stays within rate limits""" # Implementation would track request history return True def _check_user_permissions(self, regulation, user_id): """Check if user has required permissions""" # Check user's regulatory certifications user = self.user_permissions.get(user_id, {}) # Check required certifications if (self.regulations[regulation]['user_requirements'].get('certification_required') and 'regulatory_certification' not in user.get('credentials', [])): return False return True def check_data_storage_compliance(self, data_point): """ Check if data storage complies with regulations Returns: Boolean indicating compliance """ # Determine applicable regulations # This would analyze the data_point to determine regulations # For simplicity, assume SEC rules apply rules = self.regulations['sec']['data_storage'] # Check encryption if rules['encryption_required'] and not data_point.get('encrypted'): self._log_compliance_violation( 'sec', "Storing unencrypted data" ) return False # Check retention period if (datetime.utcnow() - data_point['timestamp']) > rules['retention_period']: self._log_compliance_violation( 'sec', "Exceeding data retention period" ) return False return True def check_historical_data_access(self, exchange, symbol): """ Check if historical data access is compliant Returns: Boolean indicating compliance """ # Determine applicable regulations regulations = self._determine_applicable_regulations(exchange) # Check each regulation for reg in regulations: if not self._check_historical_data_rules(reg, exchange, symbol): return False return True def _check_historical_data_rules(self, regulation, exchange, symbol): """Check compliance with historical data access rules""" # Implementation would check specific historical data rules return True def _log_compliance_violation(self, regulation, violation): """Log compliance violation for auditing""" self.data_access_log.append({ 'timestamp': datetime.utcnow().isoformat(), 'regulation': regulation, 'violation': violation, 'severity': 'high' }) print(f"COMPLIANCE VIOLATION: {regulation} - {violation}") ``` --- ## **5. Image and Video Content Extraction at Scale** 🖼️ Extracting and analyzing visual content requires specialized techniques beyond traditional web scraping. ### **Computer Vision for Content Analysis** #### **1. Image Analysis Pipeline** ```python # image_analysis.py import cv2 import numpy as np import requests from io import BytesIO from PIL import Image import pytesseract import tensorflow as tf from tensorflow.keras.applications import MobileNetV2 from tensorflow.keras.applications.mobilenet_v2 import preprocess_input from tensorflow.keras.preprocessing.image import img_to_array class ImageAnalysisPipeline: def __init__(self, object_detection_model='yolov5', text_recognition=True, nsfw_detection=True): self.object_detection_model = self._load_object_detection_model(object_detection_model) self.text_recognition = text_recognition self.nsfw_detection = nsfw_detection self.nsfw_classifier = self._load_nsfw_classifier() if nsfw_detection else None def _load_object_detection_model(self, model_name): """Load object detection model""" if model_name == 'yolov5': # In practice, would load YOLOv5 model return lambda img: self._mock_yolo_detection(img) elif model_name == 'ssd': # In practice, would load SSD model return lambda img: self._mock_ssd_detection(img) else: raise ValueError(f"Unsupported object detection model: {model_name}") def _load_nsfw_classifier(self): """Load NSFW classification model""" # In practice, would load a pre-trained NSFW model return lambda img: self._mock_nsfw_classification(img) def _mock_yolo_detection(self, image): """Mock YOLO object detection for demonstration""" # Return mock detection results return [ {'label': 'person', 'confidence': 0.95, 'bbox': [100, 50, 200, 300]}, {'label': 'car', 'confidence': 0.85, 'bbox': [300, 100, 150, 100]} ] def _mock_ssd_detection(self, image): """Mock SSD object detection for demonstration""" return [ {'label': 'person', 'confidence': 0.92, 'bbox': [105, 55, 195, 295]}, {'label': 'car', 'confidence': 0.82, 'bbox': [305, 105, 145, 95]} ] def _mock_nsfw_classification(self, image): """Mock NSFW classification for demonstration""" return { 'sexual': 0.1, 'drawing': 0.05, 'hentai': 0.02, 'porn': 0.01, 'sexy': 0.03 } def download_image(self, url, timeout=10): """ Download image from URL Returns: PIL Image object """ try: response = requests.get(url, timeout=timeout) response.raise_for_status() return Image.open(BytesIO(response.content)) except Exception as e: print(f"Error downloading image {url}: {str(e)}") return None def preprocess_image(self, image, target_size=(224, 224)): """ Preprocess image for analysis Returns: Preprocessed image array """ # Convert to RGB if necessary if image.mode != 'RGB': image = image.convert('RGB') # Resize image = image.resize(target_size) # Convert to array image_array = img_to_array(image) # Preprocess for model return preprocess_input(image_array) def extract_text(self, image): """ Extract text from image using OCR Returns: Extracted text """ if not self.text_recognition: return "" # Convert PIL image to OpenCV format cv_image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR) # Convert to grayscale gray = cv2.cvtColor(cv_image, cv2.COLOR_BGR2GRAY) # Apply thresholding thresh = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)[1] # Perform OCR text = pytesseract.image_to_string(thresh) return text.strip() def detect_objects(self, image): """ Detect objects in image Returns: List of detected objects with labels and bounding boxes """ # Convert PIL image to array image_array = np.array(image) # Run object detection return self.object_detection_model(image_array) def classify_nsfw(self, image): """ Classify image for NSFW content Returns: NSFW classification scores """ if not self.nsfw_detection: return None # Convert PIL image to array image_array = np.array(image) # Run NSFW classification return self.nsfw_classifier(image_array) def analyze_image(self, url=None, image=None): """ Analyze image from URL or PIL Image object Returns: Dictionary of analysis results """ # Download image if URL provided if url and not image: image = self.download_image(url) if not image: return None # Get image dimensions width, height = image.size # Extract text text = self.extract_text(image) if self.text_recognition else "" # Detect objects objects = self.detect_objects(image) # NSFW classification nsfw_scores = self.classify_nsfw(image) if self.nsfw_detection else None return { 'url': url, 'dimensions': {'width': width, 'height': height}, 'text_content': text, 'detected_objects': objects, 'nsfw_classification': nsfw_scores, 'timestamp': datetime.utcnow().isoformat() } def process_batch(self, image_urls, max_workers=10): """ Process multiple images in parallel Returns: List of analysis results """ from concurrent.futures import ThreadPoolExecutor results = [] with ThreadPoolExecutor(max_workers=max_workers) as executor: # Submit tasks future_to_url = { executor.submit(self.analyze_image, url): url for url in image_urls } # Process results as they complete for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: result = future.result() if result: results.append(result) except Exception as e: print(f"Error processing {url}: {str(e)}") return results ``` #### **2. Video Analysis System** ```python # video_analysis.py import cv2 import numpy as np import os from moviepy.editor import VideoFileClip from image_analysis import ImageAnalysisPipeline class VideoAnalysisSystem: def __init__(self, frame_skip=30, analysis_interval=5.0): self.frame_skip = frame_skip # Process every Nth frame self.analysis_interval = analysis_interval # Seconds between keyframe analysis self.image_analyzer = ImageAnalysisPipeline() def download_video(self, url, output_path=None): """ Download video from URL Returns: Path to downloaded video file """ if not output_path: output_path = f"video_{int(time.time())}.mp4" try: response = requests.get(url, stream=True) response.raise_for_status() with open(output_path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) return output_path except Exception as e: print(f"Error downloading video {url}: {str(e)}") return None def extract_keyframes(self, video_path, method='interval'): """ Extract keyframes from video Returns: List of PIL Image objects """ if method == 'interval': return self._extract_by_interval(video_path) elif method == 'scene_change': return self._extract_by_scene_change(video_path) else: raise ValueError(f"Unsupported keyframe extraction method: {method}") def _extract_by_interval(self, video_path): """Extract keyframes at fixed time intervals""" clip = VideoFileClip(video_path) frames = [] # Extract frames at specified intervals for t in np.arange(0, clip.duration, self.analysis_interval): frame = clip.get_frame(t) frames.append(Image.fromarray(frame)) clip.close() return frames def _extract_by_scene_change(self, video_path): """Extract keyframes based on scene changes""" cap = cv2.VideoCapture(video_path) frames = [] prev_frame = None frame_count = 0 while cap.isOpened(): ret, frame = cap.read() if not ret: break # Process every Nth frame to reduce computation if frame_count % self.frame_skip == 0: # Convert to grayscale gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) # Detect scene changes if prev_frame is not None: diff = cv2.absdiff(prev_frame, gray) non_zero_count = np.count_nonzero(diff) # If significant difference, consider it a scene change if non_zero_count > (gray.size * 0.2): frames.append(Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))) prev_frame = gray frame_count += 1 cap.release() return frames def analyze_video(self, url=None, video_path=None): """ Analyze video from URL or file path Returns: Dictionary of video analysis results """ # Download video if URL provided if url and not video_path: video_path = self.download_video(url) if not video_path: return None # Extract keyframes keyframes = self.extract_keyframes(video_path) # Analyze keyframes frame_analysis = [] for i, frame in enumerate(keyframes): analysis = self.image_analyzer.analyze_image(image=frame) if analysis: analysis['frame_index'] = i analysis['timestamp'] = i * self.analysis_interval frame_analysis.append(analysis) # Get video metadata clip = VideoFileClip(video_path) metadata = { 'duration': clip.duration, 'fps': clip.fps, 'size': (clip.w, clip.h), 'codec': clip.reader.infos.get('codec', 'unknown') } clip.close() # Clean up downloaded video if url and video_path: os.remove(video_path) return { 'metadata': metadata, 'keyframe_analysis': frame_analysis, 'timestamp': datetime.utcnow().isoformat() } def extract_product_appearances(self, video_analysis, product_logos): """ Extract product appearances from video analysis Returns: List of product appearance events """ appearances = [] for frame in video_analysis['keyframe_analysis']: for obj in frame['detected_objects']: if obj['label'] in product_logos and obj['confidence'] > 0.7: appearances.append({ 'timestamp': frame['timestamp'], 'product': obj['label'], 'confidence': obj['confidence'], 'bbox': obj['bbox'] }) return appearances def summarize_video_content(self, video_analysis): """ Create content summary from video analysis Returns: Content summary string """ # Extract main objects object_counts = {} for frame in video_analysis['keyframe_analysis']: for obj in frame['detected_objects']: object_counts[obj['label']] = object_counts.get(obj['label'], 0) + 1 # Get top objects top_objects = sorted( object_counts.items(), key=lambda x: x[1], reverse=True )[:5] # Extract text content all_text = " ".join( frame['text_content'] for frame in video_analysis['keyframe_analysis'] if frame['text_content'] ) return { 'main_objects': [obj[0] for obj in top_objects], 'object_frequency': {obj[0]: obj[1] for obj in top_objects}, 'text_summary': self._summarize_text(all_text), 'total_duration': video_analysis['metadata']['duration'] } def _summarize_text(self, text, max_sentences=3): """Create simple text summary""" if not text: return "" # Split into sentences sentences = re.split(r'(?<=[.!?]) +', text) # Return top sentences by length (simple heuristic) return " ".join(sorted(sentences, key=len, reverse=True)[:max_sentences]) ``` ## Continued in next section ..