### **Top 100 Flask Developer Interview Questions & Answers (Part 5: Questions 401–482)**

---

#### **401. How do you prevent SQL injection when using SQLAlchemy's `text()` constructs?**

**Answer:**
Always use **parameterized queries**, even with `text()`:

```python
from sqlalchemy import text

# SAFE: the value travels as a bound parameter
result = db.session.execute(
    text("SELECT * FROM users WHERE name = :name"),
    {"name": user_input}
)

# UNSAFE (AVOID): user input is interpolated into the SQL string
result = db.session.execute(
    text(f"SELECT * FROM users WHERE name = '{user_input}'")
)
```

*Never* format SQL strings with user input.

---

#### **402. How do you implement "time-based access tokens" with short-lived JWTs?**

**Answer:**

```python
from datetime import timedelta

from flask import jsonify
from flask_jwt_extended import create_access_token, create_refresh_token

@app.route('/token')
def get_token():
    # current_user is assumed to come from the app's login layer
    # Short-lived token (5 minutes)
    access_token = create_access_token(
        identity=current_user.id,
        expires_delta=timedelta(minutes=5)
    )
    # Long-lived refresh token (30 days)
    refresh_token = create_refresh_token(
        identity=current_user.id,
        expires_delta=timedelta(days=30)
    )
    return jsonify(access_token=access_token, refresh_token=refresh_token)
```

*For browser clients, prefer delivering the refresh token in a secure, HTTP-only cookie rather than in the JSON body.*

---

#### **403. How do you debug "context mismatch" errors in Flask async views?**

**Answer:**
- **Error**: `RuntimeError: Working outside of request context`
- **Fix**:
  1. Define async views with `async def` (this needs the `flask[async]` extra)
  2. Call synchronous helpers directly inside an async view. `app.ensure_sync()` covers the opposite case: it lets synchronous code call a function that may be async, so its result is not awaited:
  ```python
  @app.route('/async')
  async def async_view():
      # A plain sync helper is just called; awaiting it would fail
      result = sync_function()
      return result
  ```
  3. Never store `request` objects for later use; copy the values you need while the request is active

---

#### **404. 
How do you implement "secure password hashing" with Argon2?**

**Answer:**
Use `passlib` with Argon2:

```python
from passlib.hash import argon2

# Hash password (memory_cost is given in KiB: 65536 KiB = 64 MiB)
password_hash = argon2.using(
    memory_cost=65536,
    time_cost=3,
    parallelism=1
).hash("password")

# Verify
if argon2.verify("password", password_hash):
    pass  # Valid
```

*Configuration*:
- `memory_cost`: 64 MiB (a common baseline; tune to your hardware)
- `time_cost`: 3 iterations
- `parallelism`: 1 (adjust based on CPU cores)

---

#### **405. How do you handle "timing attacks" on string comparisons?**

**Answer:**
Always use `hmac.compare_digest()`. Werkzeug's old `safe_str_cmp` helper was removed in Werkzeug 2.1, so it should not appear in new code:

```python
import hmac

# SAFE: comparison time does not depend on where the strings differ
if hmac.compare_digest(user_provided_token, stored_token):
    pass  # Valid

# UNSAFE (AVOID):
if user_provided_token == stored_token:
    pass  # Vulnerable to timing attacks
```

*Applies to:*
- Token validation
- Signature verification
- Password hash checks (normally done for you by the hashing library's verify function)

---

#### **406. How do you implement "secure session fixation" protection?**

**Answer:**
Discard the pre-login session on privilege changes. Flask has no `session.regenerate()`; with the default signed-cookie sessions, clearing the session is the equivalent step. With server-side sessions, also make sure a new session ID is issued.

```python
from flask import redirect, request, session, url_for
from flask_login import login_user

@app.route('/login', methods=['POST'])
def login():
    user = authenticate(request.form['username'], request.form['password'])
    if user:
        # Drop anything an attacker may have planted before login
        session.clear()
        login_user(user)
        return redirect(url_for('dashboard'))
    return "Invalid credentials", 401
```

*Flask-Login* adds a related safeguard through `login_manager.session_protection = "strong"`; `fresh_login_required` is a different feature that forces re-authentication for sensitive views.

---

#### **407. How do you prevent "HTTP Header Injection" in Flask responses?**

**Answer:**
Werkzeug rejects header values that contain newline characters, but:
1. **Never** use unvalidated user input in header names/values:
```python
# UNSAFE:
response.headers[user_input] = "value"

# SAFE: allow-list the name and sanitize the value
if user_input in ALLOWED_HEADERS:
    response.headers[user_input] = sanitize_value(value)
```
2. Use `werkzeug.datastructures.Headers` for manual header construction

---

#### **408. 
How do you implement "secure file download" with content disposition?**

**Answer:**

```python
import os

from flask import send_file
from flask_login import login_required
from werkzeug.utils import secure_filename

@app.route('/download/<file_id>')
@login_required
def download(file_id):
    file = get_authorized_file(file_id)
    return send_file(
        file.path,
        as_attachment=True,
        # Sanitized name for the Content-Disposition header.
        # `attachment_filename` was renamed to `download_name` in Flask 2.0,
        # so only one of them may be passed.
        download_name=sanitize_filename(file.name),
        mimetype=file.mimetype
    )

def sanitize_filename(name):
    return secure_filename(os.path.basename(name))
```

---

#### **409. How do you handle "cross-site scripting" (XSS) in Flask templates?**

**Answer:**
1. **Auto-escaping**: Enabled by default in Flask for `.html`, `.htm`, `.xml`, and `.xhtml` templates
2. **Manual escaping** when needed:
```jinja
{{ user_input | e }} {# Same output as {{ user_input }} when auto-escaping is on #}
```
3. **Mark safe HTML only when absolutely necessary**:
```python
from markupsafe import Markup

Markup("<strong>Trusted content</strong>")
```

*Never* mark user-generated content as safe.

---

#### **410. How do you implement "secure password recovery" with rate limiting?**

**Answer:**

```python
from flask import request
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

# Keyword arguments work across Flask-Limiter versions
limiter = Limiter(
    key_func=get_remote_address,
    app=app,
    default_limits=["5/hour"]
)

@app.route('/recover', methods=['POST'])
@limiter.limit("1/minute")
def recover():
    # Validate email format
    if not validate_email(request.form['email']):
        return "Invalid email", 400
    # Generate token (15-minute expiry)
    token = generate_recovery_token(request.form['email'])
    # Send email (async)
    send_recovery_email.delay(request.form['email'], token)
    # Always return 200 to prevent email enumeration
    return "If email exists, recovery instructions were sent", 200
```

---

#### **411. 
How do you prevent "insecure direct object references" (IDOR)?**

**Answer:**
Always verify ownership:

```python
@app.route('/documents/<int:doc_id>')
@login_required
def view_document(doc_id):
    # Critical: check ownership in the same query that loads the object
    document = Document.query.filter_by(
        id=doc_id,
        user_id=current_user.id  # Ownership check
    ).first_or_404()
    return render_template('document.html', doc=document)
```

*Never* use `Document.query.get(doc_id)` without an ownership check.

---

#### **412. How do you implement "secure API authentication" with mutual TLS?**

**Answer:**
1. Configure Nginx for mTLS and forward the verification result, since a proxied Flask app does not see Nginx's TLS variables on its own:
```nginx
server {
    ssl_client_certificate /path/to/ca.pem;
    ssl_verify_client on;
    ssl_verify_depth 2;
    # Header names below are an assumed convention, not a standard
    proxy_set_header X-SSL-Client-Verify $ssl_client_verify;
    proxy_set_header X-SSL-Client-Cert $ssl_client_escaped_cert;
}
```
2. In Flask (the app must be reachable only through the proxy, otherwise clients could forge these headers):
```python
from flask import abort, request

@app.before_request
def require_mtls():
    if request.headers.get('X-SSL-Client-Verify') != 'SUCCESS':
        abort(401, "mTLS required")
    # Optional: Verify specific client certificate
    cert = request.headers.get('X-SSL-Client-Cert')
    if not is_authorized_cert(cert):
        abort(403, "Unauthorized client")
```

---

#### **413. How do you handle "mass assignment" vulnerabilities in Flask?**

**Answer:**
1. **Never** call `form.populate_obj()` on a form that exposes sensitive fields; it copies every field onto the object
2. Use explicit field mapping:
```python
def update_profile():
    user.name = form.name.data
    user.email = form.email.data
    # NO: user.role = form.role.data (sensitive field)
```
3. Or copy an allow-list of fields. WTForms' `populate_obj()` accepts only the target object and has no `only` parameter, so the filtering is done by hand:
```python
for field_name in ('name', 'email'):
    setattr(user, field_name, form[field_name].data)
```

---

#### **414. 
How do you implement "secure file upload" with virus scanning?**

**Answer:**

```python
import os
from uuid import uuid4

import pyclamd
from flask import request

@app.route('/upload', methods=['POST'])
def upload():
    file = request.files['file']
    # Validate type (server-side)
    if not allowed_file(file.filename):
        return "Invalid file type", 400
    # Scan for viruses
    if not scan_virus(file):
        return "Virus detected", 403
    # Save with a random name; the client-supplied name is never used as a path
    safe_path = os.path.join(UPLOAD_FOLDER, str(uuid4()))
    file.save(safe_path)
    return "Uploaded", 201

def scan_virus(file):
    file.seek(0)  # Reset stream position
    cd = pyclamd.ClamdNetworkSocket()
    result = cd.scan_stream(file.read())
    file.seek(0)  # Reset again for saving
    return result is None  # None means nothing was detected
```

---

#### **415. How do you prevent "clickjacking" attacks in Flask?**

**Answer:**
Set the `X-Frame-Options` header:

```python
from flask_talisman import Talisman

Talisman(
    app,
    frame_options='DENY',  # Or 'SAMEORIGIN'
    content_security_policy={
        'frame-ancestors': "'none'"  # Modern alternative
    }
)
```

*Alternative:* Nginx configuration:

```nginx
add_header X-Frame-Options "DENY";
```

---

#### **416. How do you implement "secure password policies" with complexity checks?**

**Answer:**

```python
import re

def validate_password(password):
    # Minimum length
    if len(password) < 12:
        return False, "Password must be at least 12 characters"
    # Complexity requirements
    if not re.search(r'[A-Z]', password):
        return False, "Password must contain uppercase letter"
    if not re.search(r'[a-z]', password):
        return False, "Password must contain lowercase letter"
    if not re.search(r'[0-9]', password):
        return False, "Password must contain a number"
    if not re.search(r'[^A-Za-z0-9]', password):
        return False, "Password must contain special character"
    # Check against common passwords
    if password.lower() in COMMON_PASSWORDS:
        return False, "Password too common"
    return True, ""
```

---

#### **417. How do you handle "sensitive data exposure" in Flask logs?**

**Answer:**
1. **Mask sensitive data** in logs:
```python
from flask import g, request

SENSITIVE_FIELDS = {'password', 'token'}

@app.before_request
def mask_sensitive_data():
    # Build the masked copy on every request so the logging hook
    # below never reads an attribute that was not set
    g.masked_data = {
        k: '***' if k in SENSITIVE_FIELDS else v
        for k, v in request.form.items()
    }

@app.after_request
def log_request(response):
    app.logger.info(
        f"{request.method} {request.path} {g.get('masked_data', {})} → {response.status}"
    )
    return response
```
2. Use structured logging with field filtering

---

#### **418. How do you implement "secure session management" for mobile apps?**

**Answer:**

```python
import os
from datetime import timedelta

import redis
from flask import abort, request

app.config.update(
    # Required for mobile WebView
    SESSION_COOKIE_SAMESITE="None",
    SESSION_COOKIE_SECURE=True,
    SESSION_COOKIE_HTTPONLY=True,
    # Shorter lifetime for mobile
    PERMANENT_SESSION_LIFETIME=timedelta(days=7),
    # Use Redis for distributed sessions (these two keys are read by Flask-Session)
    SESSION_TYPE="redis",
    SESSION_REDIS=redis.from_url(os.getenv("REDIS_URL"))
)

# Additional mobile-specific security
@app.before_request
def mobile_security():
    # Verify custom header for mobile clients
    if request.headers.get('X-Mobile-Client'):
        if not validate_mobile_token(request.headers.get('X-Mobile-Token')):
            abort(401)
```

---

#### **419. How do you prevent "insecure deserialization" attacks?**

**Answer:**
1. **NEVER** use `pickle` with untrusted data
2. For sessions:
   - Use `Flask-Session` with Redis (not filesystem)
   - Configure `SESSION_USE_SIGNER=True`
3. For APIs:
   - Prefer JSON over pickle/YAML
   - Validate all input types:
   ```python
   def validate_data(data):
       if isinstance(data, (list, dict)):
           # Recursively validate
           ...
   ```

---

#### **420. 
How do you implement "secure API key management" with rotation?**

**Answer:**
Store only a digest of each key. A salted password hash cannot be used in a `WHERE` clause, so the sketch below looks keys up by an unsalted SHA-256 digest, which is acceptable because the keys are long random strings. Rotation means issuing a new key and setting `revoked_at` on the old one.

```python
import hashlib
import secrets
from datetime import datetime

class APIKey(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    key_hash = db.Column(db.String(128), index=True)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    revoked_at = db.Column(db.DateTime, nullable=True)
    last_used = db.Column(db.DateTime)

def hash_key(raw_key):
    # Helper introduced for this example
    return hashlib.sha256(raw_key.encode()).hexdigest()

@app.route('/api/keys', methods=['POST'])
@login_required
def create_api_key():
    # Generate random key and store only its digest
    raw_key = secrets.token_urlsafe(32)
    api_key = APIKey(user_id=current_user.id, key_hash=hash_key(raw_key))
    db.session.add(api_key)
    db.session.commit()
    # The raw key is returned to the client exactly once
    return jsonify(api_key=raw_key), 201

@app.before_request
def authenticate_api_key():
    if request.endpoint == 'create_api_key':
        return  # key creation is protected by the login session instead
    key = request.headers.get('X-API-KEY')
    if not key:
        abort(401)
    # Find active key by the digest of the presented value
    api_key = APIKey.query.filter(
        APIKey.key_hash == hash_key(key),
        APIKey.revoked_at.is_(None)
    ).first()
    if not api_key:
        abort(401)
    # Update usage
    api_key.last_used = datetime.utcnow()
    db.session.commit()
```

---

#### **421. How do you handle "security misconfiguration" in Flask?**

**Answer:**
1. **Disable debug mode in production**:
```python
# Never do this in production:
app.run(debug=True)
```
2. **Set secure headers**:
```python
Talisman(app, content_security_policy=STRICT_CSP)
```
3. **Remove unnecessary headers**. The `Server` header is usually added by the WSGI server or the proxy after Flask has finished, so strip it there as well:
```python
@app.after_request
def remove_headers(response):
    response.headers.pop('Server', None)
    response.headers.pop('X-Powered-By', None)
    return response
```
4. **Validate all configurations** at startup. Flask ships no built-in `flask config check` command, so assert the settings you rely on (for example that `SECRET_KEY` is set and `DEBUG` is off) when the app is created.

---

#### **422. 
How do you implement "secure CORS" for single-page applications?**

**Answer:**

```python
from flask_cors import CORS

# Configure with strict origins
CORS(app,
    resources={r"/api/*": {
        "origins": [
            "https://trusted.example.com",
            "https://staging.example.com"
        ],
        "supports_credentials": True,
        "allow_headers": ["Authorization", "Content-Type"],
        "max_age": 86400
    }},
    supports_credentials=True
)

# Critical: Set SameSite cookie for credentials
app.config.update(
    SESSION_COOKIE_SAMESITE="Strict",
    SESSION_COOKIE_SECURE=True
)
```

*Never* use `origins="*"` with `supports_credentials=True`. Note that `SameSite=Strict` only works when the SPA and the API share a site (as the `example.com` subdomains above do); a genuinely cross-site SPA needs `SameSite=None` together with `Secure`.

---

#### **423. How do you prevent "XML External Entity" (XXE) attacks?**

**Answer:**
Disable entity resolution:

```python
from lxml import etree

def parse_xml(xml_data):
    parser = etree.XMLParser(
        resolve_entities=False,
        no_network=True,
        dtd_validation=False
    )
    return etree.fromstring(xml_data, parser)

# For the stdlib ElementTree API, parse with defusedxml's drop-in module
import defusedxml.ElementTree as ET

def parse_xml_stdlib(xml_data):
    return ET.fromstring(xml_data)
```

*Best practice:* Avoid XML; use JSON instead.

---

#### **424. How do you implement "secure password storage" with multiple algorithms?**

**Answer:**

```python
from passlib.context import CryptContext

pwd_context = CryptContext(
    schemes=["argon2", "bcrypt"],
    default="argon2",
    # Mark every non-default scheme as deprecated so that
    # needs_update() flags old bcrypt hashes for migration
    deprecated="auto",
    argon2__memory_cost=65536,
    argon2__time_cost=3,
    argon2__parallelism=1,
    bcrypt__rounds=15
)

def upgrade_hash_if_needed(user, password):
    # Verify against whichever scheme produced the stored hash
    if not pwd_context.verify(password, user.password_hash):
        return False
    # Check if needs upgrade
    if pwd_context.needs_update(user.password_hash):
        # Rehash with the current default algorithm
        user.password_hash = pwd_context.hash(password)
        db.session.commit()
    return True
```

---

#### **425. 
How do you handle "insufficient logging & monitoring" security gaps?**

**Answer:**
Implement comprehensive logging:

```python
import logging

from flask import request
from flask_login import current_user
from pythonjsonlogger import jsonlogger

# Configure JSON logging
log_handler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter(
    '%(asctime)s %(levelname)s %(name)s %(message)s'
)
log_handler.setFormatter(formatter)
app.logger.addHandler(log_handler)
app.logger.setLevel(logging.INFO)

# Critical events to log, matched by endpoint name
# (request.path starts with "/", so it would never equal these strings)
AUDIT_EVENTS = ['login', 'password_change', 'admin_access', 'data_export']

@app.after_request
def audit_log(response):
    if request.endpoint in AUDIT_EVENTS:
        app.logger.info(
            "audit_event",
            extra={
                "event": request.endpoint,
                "user": current_user.id if current_user.is_authenticated else None,
                "ip": request.remote_addr,
                "status": response.status_code
            }
        )
    return response
```

---

#### **426. How do you implement "secure multi-tenancy" with database isolation?**

**Answer:**
1. **Schema-based isolation**. The tenant ID arrives in a client-controlled header, so it is validated strictly and passed as a bound parameter rather than formatted into SQL:
```python
import re

from flask import abort, request
from sqlalchemy import text

@app.before_request
def set_tenant():
    tenant_id = request.headers.get('X-Tenant-ID')
    if (not tenant_id
            or not re.fullmatch(r'[a-z0-9_]{1,32}', tenant_id)
            or not is_valid_tenant(tenant_id)):
        abort(401)
    # Set search path for PostgreSQL
    db.session.execute(
        text("SELECT set_config('search_path', :schema, false)"),
        {"schema": f"tenant_{tenant_id}"}
    )
```
2. **Row-level security**:
```sql
CREATE POLICY tenant_isolation ON users
USING (tenant_id = current_setting('app.current_tenant')::uuid);
```

---

#### **427. How do you prevent "business logic abuse" in Flask APIs?**

**Answer:**

```python
@app.route('/reset-password', methods=['POST'])
def reset_password():
    # Rate limiting
    if too_many_requests(request.remote_addr):
        return "Too many requests", 429
    # Business rule: Only allow 1 reset per email per hour
    last_reset = PasswordReset.query.filter(
        PasswordReset.email == request.json['email'],
        PasswordReset.created_at > datetime.utcnow() - timedelta(hours=1)
    ).first()
    if last_reset:
        return "Reset already requested", 429
    # Process reset...
```

*Always validate business rules server-side.*

---

#### **428. 
How do you implement "secure API versioning" with deprecation?**

**Answer:**

```python
from flask import g, jsonify, request

@app.before_request
def check_api_version():
    version = request.headers.get('Accept', 'v1').split('/')[-1]
    # Check if deprecated
    if version in app.config['DEPRECATED_API_VERSIONS']:
        response = jsonify(
            error="API version deprecated",
            supported_versions=app.config['SUPPORTED_API_VERSIONS']
        )
        response.headers['Deprecation'] = 'true'
        response.headers['Sunset'] = 'Sun, 31 Dec 2023 23:59:59 GMT'
        # 410 Gone fits a retired version; 429 is reserved for rate limiting
        return response, 410
    # Store version for request
    g.api_version = version
```

---

#### **429. How do you handle "insecure APIs" with proper rate limiting?**

**Answer:**
Use `Flask-Limiter` with multiple strategies. The route decorator must be outermost (listed first), otherwise Flask registers the undecorated function and the limits never apply:

```python
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

limiter = Limiter(
    key_func=get_remote_address,
    app=app,
    default_limits=["200/hour"],
    storage_uri="redis://localhost:6379"
)

# Custom limits per endpoint
@app.route('/api/expensive')
@limiter.limit("5/second", override_defaults=False)
@limiter.limit("100/minute", override_defaults=False)
def expensive_api():
    ...

# User-based limits: count per user instead of per IP
@app.route('/api/user-data')
@limiter.limit("1000/hour", key_func=lambda: str(get_current_user_id() or 'anon'))
def user_data():
    ...
```

---

#### **430. 
How do you implement "secure authentication flow" with OAuth2 PKCE?**

**Answer:**

```python
import base64
import hashlib
import secrets
from urllib.parse import urlencode

import requests
from flask import redirect, request, session

@app.route('/login')
def login():
    # Generate code verifier (128 characters, the PKCE maximum) and challenge
    code_verifier = secrets.token_urlsafe(96)
    code_challenge = base64.urlsafe_b64encode(
        hashlib.sha256(code_verifier.encode()).digest()
    ).rstrip(b'=').decode('utf-8')
    # Store in session
    session['code_verifier'] = code_verifier
    # Redirect to auth provider (urlencode escapes the redirect URI safely)
    params = urlencode({
        "response_type": "code",
        "client_id": CLIENT_ID,
        "redirect_uri": REDIRECT_URI,
        "code_challenge": code_challenge,
        "code_challenge_method": "S256",
    })
    return redirect(f"https://auth.example.com/authorize?{params}")

@app.route('/callback')
def callback():
    # Exchange code for token
    response = requests.post(
        "https://auth.example.com/token",
        data={
            "grant_type": "authorization_code",
            "code": request.args.get('code'),
            "redirect_uri": REDIRECT_URI,
            "client_id": CLIENT_ID,
            "code_verifier": session.pop('code_verifier', '')
        },
        timeout=10
    )
    # Process token...
```

---

#### **431. How do you prevent "mass email compromise" via contact forms?**

**Answer:**

```python
@app.route('/contact', methods=['POST'])
def contact():
    # Rate limiting
    if too_many_submissions(request.remote_addr):
        return "Too many submissions", 429
    # Validate email format
    if not validate_email(request.form['email']):
        return "Invalid email", 400
    # CAPTCHA verification
    if not verify_captcha(request.form['captcha']):
        return "CAPTCHA failed", 400
    # Content scanning
    if contains_spam(request.form['message']):
        return "Suspicious content", 400
    # Send email (async); the recipient is fixed by config, never by the form
    send_contact_email.delay(
        to=app.config['CONTACT_EMAIL'],
        reply_to=request.form['email'],
        message=request.form['message']
    )
    return "Message sent", 200
```

---

#### **432. 
How do you implement "secure data export" with GDPR compliance?**

**Answer:**

```python
@app.route('/data-export', methods=['POST'])
@login_required
def data_export():
    # Verify password (re-authentication)
    if not current_user.verify_password(request.form['password']):
        return "Invalid password", 401
    # Generate export asynchronously
    task = generate_data_export.delay(current_user.id)
    return jsonify(
        status="processing",
        task_id=task.id,
        expires_at=datetime.utcnow() + timedelta(hours=24)
    ), 202

@celery.task
def generate_data_export(user_id):
    user = User.query.get(user_id)
    # Anonymize PII in logs
    with mask_pii_logging():
        # Generate export
        export = {
            "account": user.safe_data,
            "activity": [a.safe_data for a in user.activity]
        }
        # Store encrypted
        encrypted = encrypt_export(export)
        save_to_secure_storage(encrypted)
```

---

#### **433. How do you handle "insecure third-party dependencies" in Flask?**

**Answer:**
1. **Dependency scanning**:
```bash
pip install safety
safety check -r requirements.txt
```
2. **Automated updates**: Dependabot/GitHub Actions
3. **Lock files**: `requirements.txt` with pinned versions
4. **Runtime protection**:
```python
from importlib.metadata import version

from packaging.version import Version

# Check critical dependencies at startup. Versions are parsed before
# comparing, because as plain strings "10.0.0" sorts before "2.0.0".
if Version(version("flask")) < Version("2.0.0"):
    raise RuntimeError("Flask 2.0+ required for security fixes")
```

---

#### **434. How do you implement "secure admin interface" with additional protections?**

**Answer:**

```python
import ipaddress

admin = Blueprint('admin', __name__, url_prefix='/admin')

@admin.before_request
def require_admin():
    if not current_user.is_authenticated or not current_user.is_admin:
        abort(403)
    # Additional admin-specific security
    # (presence check only; real code must also validate the OTP value)
    if not request.headers.get('X-Admin-OTP'):
        abort(401, "Admin OTP required")
    # Verify from trusted network; the config is assumed to hold
    # CIDR strings such as "10.0.0.0/8"
    client_ip = ipaddress.ip_address(request.remote_addr)
    if not any(client_ip in ipaddress.ip_network(net)
               for net in app.config['TRUSTED_ADMIN_NETWORKS']):
        abort(403, "Admin access restricted to corporate network")

# Rate limiting for admin
@admin.route('/users')
@limiter.limit("10/minute")
def list_users():
    ...
```

---

#### **435. How do you prevent "credential stuffing" attacks in Flask?**

**Answer:**

```python
import hashlib

import requests

@app.route('/login', methods=['POST'])
def login():
    # Rate limiting by username
    username = request.form['username']
    if too_many_attempts(username):
        return "Account temporarily locked", 423
    # Check breached passwords
    if is_password_breached(request.form['password']):
        return "Password compromised. Please reset.", 403
    # Verify credentials
    user = User.query.filter_by(username=username).first()
    if user and user.check_password(request.form['password']):
        # Reset failed attempts
        reset_failed_attempts(username)
        login_user(user)
        return redirect(url_for('dashboard'))
    # Increment failed attempts
    increment_failed_attempts(username)
    # Generic message to prevent username enumeration
    return "Invalid credentials", 401

def is_password_breached(password):
    # Check against HIBP API (k-anonymity): only the first five
    # characters of the SHA-1 hash ever leave the server
    sha1_hash = hashlib.sha1(password.encode()).hexdigest().upper()
    prefix, suffix = sha1_hash[:5], sha1_hash[5:]
    response = requests.get(
        f"https://api.pwnedpasswords.com/range/{prefix}", timeout=5
    )
    return suffix in response.text
```

---

#### **436. How do you implement "secure session timeout" with idle detection?**

**Answer:**

```python
import time

@app.before_request
def check_session_timeout():
    if not current_user.is_authenticated:
        return
    # Store a plain timestamp: a datetime read back from the cookie session
    # can come back timezone-aware and then fails to subtract from utcnow()
    now = time.time()
    idle_seconds = now - session.get('last_active', now)
    # Update last active
    session['last_active'] = now
    # Enforce idle timeout (IDLE_TIMEOUT is a timedelta)
    if idle_seconds > app.config['IDLE_TIMEOUT'].total_seconds():
        logout_user()
        return redirect(url_for('login', reason='idle_timeout'))

# In templates (client-side idle detection)
@app.context_processor
def inject_idle_timeout():
    return {
        'IDLE_TIMEOUT': int(app.config['IDLE_TIMEOUT'].total_seconds())
    }
```

*Client-side JavaScript should also handle idle timeout.*

---

#### **437. How do you handle "insecure file permissions" in Flask deployments?**

**Answer:**
1. **During build**. Ownership and permissions are set while still root; switching to the non-root user first would make the `chmod` calls fail and leave the files unreadable to the app:
```dockerfile
# Create non-root user
RUN adduser -D flaskuser
# Set ownership and secure permissions
RUN chown -R flaskuser /app && \
    find /app -type d -exec chmod 700 {} \; && \
    find /app -type f -exec chmod 600 {} \;
# Drop privileges for everything that follows
USER flaskuser
```
2. **At runtime**:
```python
import os

# Verify critical files
def check_file_permissions():
    if not os.access(CONFIG_FILE, os.R_OK | os.W_OK):
        raise RuntimeError("Invalid config file permissions")
    # Should be 600 for secrets
    mode = os.stat(SECRETS_FILE).st_mode
    if mode & 0o777 != 0o600:
        app.logger.critical("Insecure secrets file permissions!")
```

---

#### **438. How do you implement "secure audit logging" with immutable records?**

**Answer:**

```python
import hashlib
import json
from datetime import datetime

class AuditLog(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    timestamp = db.Column(db.DateTime, default=datetime.utcnow)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    action = db.Column(db.String(50))
    details = db.Column(db.JSON)
    # Cryptographic chain
    previous_hash = db.Column(db.String(64))
    current_hash = db.Column(db.String(64), unique=True)

def log_audit(action, details=None):
    # Get last log entry (ids are strictly increasing, timestamps may tie)
    last_log = AuditLog.query.order_by(AuditLog.id.desc()).first()
    # Create new entry
    audit = AuditLog(
        # Set explicitly: the column default is applied only at flush time,
        # which is after the hash below has been computed
        timestamp=datetime.utcnow(),
        user_id=current_user.id if current_user.is_authenticated else None,
        action=action,
        details=details,
        previous_hash=last_log.current_hash if last_log else None
    )
    # Calculate hash (includes previous hash)
    audit.current_hash = calculate_hash(audit)
    db.session.add(audit)
    db.session.commit()

def calculate_hash(log_entry):
    # Assumes the database stores timestamps without losing precision;
    # otherwise recomputed hashes will not match
    data = f"{log_entry.timestamp.isoformat()}{log_entry.user_id}{log_entry.action}" \
           f"{json.dumps(log_entry.details, sort_keys=True)}{log_entry.previous_hash}"
    return hashlib.sha256(data.encode()).hexdigest()

# Verification
def verify_audit_chain():
    logs = AuditLog.query.order_by(AuditLog.id).all()
    for i, log in enumerate(logs[1:], 1):
        if log.previous_hash != calculate_hash(logs[i-1]):
            return False
    return True
```

---

#### **439. 
How do you prevent "host header attacks" in Flask?**

**Answer:**

```python
@app.before_request
def validate_host():
    allowed_hosts = app.config['ALLOWED_HOSTS']
    # Read the raw Host header sent by the client
    host = request.headers.get('Host', '')
    # Remove port if present (this simple split does not handle IPv6 literals)
    host = host.split(':')[0]
    # Check against allowed hosts
    if host not in allowed_hosts:
        app.logger.warning(f"Invalid host header: {host}")
        abort(400, "Invalid host")
    # Hosts listed in SECURE_HOSTS must be reached over HTTPS
    if host in app.config['SECURE_HOSTS'] and not request.is_secure:
        abort(400, "HTTPS required for this host")
```

---

#### **440. How do you implement "secure passwordless authentication"?**

**Answer:**

```python
@app.route('/magic-link', methods=['POST'])
def magic_link():
    # Rate limiting
    if too_many_requests(request.remote_addr):
        return "Too many requests", 429
    # Validate email
    if not validate_email(request.form['email']):
        return "Invalid email", 400
    # Generate token (5-minute expiry)
    token = generate_magic_token(request.form['email'])
    # Store hashed token
    store_magic_token(token)
    # Send email
    send_magic_link.delay(request.form['email'], token)
    # Always return 200 to prevent email enumeration
    return "If email exists, magic link was sent", 200

@app.route('/auth/<token>')
def auth_with_token(token):
    # Verify token
    email = verify_magic_token(token)
    if not email:
        return "Invalid or expired token", 400
    # Log in user; an unknown email gets the same generic error
    # instead of falling through and returning nothing
    user = User.query.filter_by(email=email).first()
    if not user:
        return "Invalid or expired token", 400
    login_user(user, remember=True)
    return redirect(url_for('dashboard'))
```

---

#### **441. 
How do you handle "insecure redirects and forwards" in Flask?**

**Answer:**

```python
from urllib.parse import urlparse

from flask import redirect, request, url_for

@app.route('/redirect')
def safe_redirect():
    # Get next URL from query parameter
    next_url = request.args.get('next')
    # Allow only same-site targets
    if not next_url or not is_safe_url(next_url):
        return redirect(url_for('home'))
    return redirect(next_url)

def is_safe_url(url):
    # Browsers treat backslashes like slashes, so reject them outright
    if '\\' in url:
        return False
    # Parse URL
    parsed = urlparse(url)
    # Must be same domain
    if parsed.netloc and parsed.netloc != request.host:
        return False
    # Must use same scheme
    if parsed.scheme and parsed.scheme != request.scheme:
        return False
    # Must not contain fragments
    if parsed.fragment:
        return False
    # Must be valid path
    return bool(parsed.path)
```

---

#### **442. How do you implement "secure two-factor authentication" with TOTP?**

**Answer:**

```python
import pyotp

@app.route('/setup-2fa', methods=['GET'])
@login_required
def setup_2fa():
    # Generate secret
    secret = pyotp.random_base32()
    current_user.totp_secret = secret
    db.session.commit()
    # Generate QR code URL
    totp = pyotp.TOTP(secret)
    provisioning_uri = totp.provisioning_uri(
        name=current_user.email,
        issuer_name="MyApp"
    )
    return render_template('setup_2fa.html', qr_uri=provisioning_uri)

@app.route('/verify-2fa', methods=['POST'])
@login_required
def verify_2fa():
    token = request.form['token']
    totp = pyotp.TOTP(current_user.totp_secret)
    # Verify with 1-interval leeway
    if totp.verify(token, valid_window=1):
        # Mark 2FA as verified for this session
        session['2fa_verified'] = True
        return redirect(url_for('dashboard'))
    return "Invalid token", 400

@app.before_request
def require_2fa():
    # Skip for setup/verify routes
    if request.endpoint in ['setup_2fa', 'verify_2fa']:
        return
    # Require 2FA for sensitive routes
    if current_user.is_authenticated and current_user.totp_secret:
        if request.endpoint in ['account_settings', 'payment'] and not session.get('2fa_verified'):
            return redirect(url_for('verify_2fa'))
```

---

#### **443. 
How do you prevent "HTTP Request Smuggling" in Flask apps?**

**Answer:**
Mitigate at proxy level:
1. **Nginx configuration**:
```nginx
proxy_http_version 1.1;
proxy_set_header Connection "";
proxy_request_buffering off;
```
2. **Gunicorn configuration** (a Python file, not INI). Trust forwarded headers only from the proxy's address rather than `*`:
```python
# gunicorn.conf.py
forwarded_allow_ips = "127.0.0.1"
```
3. **Flask validation**. By the time a request reaches Flask, the server has already parsed it, so this is defense in depth only:
```python
@app.before_request
def validate_request():
    # Check for multiple Content-Length headers
    if len(request.headers.getlist('Content-Length')) > 1:
        abort(400, "Multiple Content-Length headers")
    # Check for Transfer-Encoding with Content-Length
    if 'Content-Length' in request.headers and 'Transfer-Encoding' in request.headers:
        abort(400, "Conflicting transfer encodings")
```

---

#### **444. How do you implement "secure data masking" for PII in development?**

**Answer:**

```python
import re

from flask import has_request_context, request

def mask_pii(value, field_name):
    """Mask PII based on field type"""
    if field_name in ['ssn', 'national_id']:
        return value[:3] + 'XXX' + value[-4:]
    elif field_name in ['email', 'username']:
        parts = value.split('@')
        return parts[0][0] + '***@' + parts[1] if len(parts) > 1 else '***'
    elif field_name in ['phone']:
        return re.sub(r'\d', 'X', value[:-4]) + value[-4:]
    return value

def mask_query_results(results, pii_fields):
    """Mask PII on already-loaded objects"""
    for obj in results:
        for field in pii_fields:
            if hasattr(obj, field):
                setattr(obj, field, mask_pii(getattr(obj, field), field))
    return results

# Usage in development (app.config['ENV'] was removed in Flask 2.3)
if app.debug:
    # Keep the real method; calling the patched one from inside
    # itself would recurse forever
    _original_all = db.Query.all

    def masked_all(query):
        results = _original_all(query)
        if has_request_context() and request.path.startswith('/admin'):
            # Masked objects are dirty in the session: never commit them
            return mask_query_results(results, ['email', 'phone', 'ssn'])
        return results

    # Monkey patch once at startup rather than on every request
    db.Query.all = masked_all
```

---

#### **445. How do you handle "insecure cryptography" in Flask applications?**

**Answer:**
1. **Use vetted libraries**:
   - `cryptography` instead of `pycrypto`
   - `passlib` for password hashing
2. **Avoid custom crypto**:
   - Never implement your own encryption
   - Use high-level APIs (Fernet)
3. **Secure key management**:
```python
import os

from cryptography.fernet import Fernet

# Get key from secure source
key = os.getenv('ENCRYPTION_KEY') or load_from_vault()
fernet = Fernet(key)

# Encrypt
encrypted = fernet.encrypt(b"Secret data")

# Decrypt
decrypted = fernet.decrypt(encrypted)
```
4. **Key rotation**:
```python
from cryptography.fernet import InvalidToken

def decrypt_data(encrypted_data):
    # Try current key
    try:
        return fernet.decrypt(encrypted_data)
    except InvalidToken:
        # Fall back to the previous key during rotation
        return old_fernet.decrypt(encrypted_data)
```

---

#### **446. How do you implement "secure user impersonation" for support teams?**

**Answer:**

```python
@admin.route('/impersonate/<int:user_id>')
@require_admin  # assumed here to be a decorator that enforces admin access
def impersonate(user_id):
    # Log the action
    log_impersonation(current_user.id, user_id)
    # Create special session token
    impersonation_token = create_impersonation_token(
        admin_id=current_user.id,
        user_id=user_id,
        expires_in=timedelta(minutes=15)
    )
    # Set in secure cookie
    response = redirect(url_for('user_dashboard'))
    response.set_cookie(
        'impersonation_token',
        impersonation_token,
        httponly=True,
        secure=True,
        samesite='Strict'
    )
    return response

@app.before_request
def handle_impersonation():
    token = request.cookies.get('impersonation_token')
    if token:
        try:
            data = verify_impersonation_token(token)
            # Set impersonated user
            g.impersonated_user = User.query.get(data['user_id'])
            g.original_admin = User.query.get(data['admin_id'])
        except Exception:
            # Invalid token - clear cookie
            response = make_response(redirect(url_for('login')))
            response.set_cookie('impersonation_token', '', expires=0)
            return response

# In templates
@app.context_processor
def inject_impersonation():
    return {
        'is_impersonating': hasattr(g, 'impersonated_user'),
        'original_admin': g.original_admin if hasattr(g, 'original_admin') else None
    }
```

---

#### **447. 
How do you prevent "account enumeration" via timing attacks?**

**Answer:**
Run exactly one real hash verification per attempt, whether or not the account exists. Comparing the submitted password to the stored hash as plain strings would never match, so the check goes through `check_password_hash`:

```python
import time

from werkzeug.security import check_password_hash, generate_password_hash

# Computed once at startup so unknown users cost the same as real ones
DUMMY_HASH = generate_password_hash("dummy-password")

@app.route('/login', methods=['POST'])
def login():
    start_time = time.time()
    # Get user (may be None)
    user = User.query.filter_by(email=request.form['email']).first()
    # Verify against the real hash, or the dummy one if there is no user
    stored_hash = user.password_hash if user else DUMMY_HASH
    password_ok = check_password_hash(stored_hash, request.form['password'])
    if user and password_ok:
        login_user(user)
        return redirect(url_for('dashboard'))
    # Pad failures to a minimum duration
    elapsed = time.time() - start_time
    if elapsed < MIN_LOGIN_TIME:
        time.sleep(MIN_LOGIN_TIME - elapsed)
    # Always return same message
    return "Invalid credentials", 401
```

---

#### **448. How do you implement "secure data retention policies" with GDPR?**

**Answer:**

```python
@celery.task
def enforce_data_retention():
    # Find inactive accounts
    cutoff = datetime.utcnow() - timedelta(days=365)
    inactive_users = User.query.filter(
        User.last_active < cutoff,
        User.is_active.is_(False)
    ).all()
    for user in inactive_users:
        # Anonymize instead of delete (for legal requirements);
        # the row is kept, so it must not also be deleted here
        anonymize_user_data(user)
    # Delete old audit logs
    log_cutoff = datetime.utcnow() - timedelta(days=180)
    AuditLog.query.filter(AuditLog.timestamp < log_cutoff).delete()
    db.session.commit()

def anonymize_user_data(user):
    """Replace PII with random data while preserving structure"""
    user.email = f"anonymized-{secrets.token_hex(8)}@example.com"
    user.phone = f"+44 7{secrets.randbelow(10**9):09d}"
    user.first_name = random_first_name()
    user.last_name = random_last_name()
    # Clear sensitive fields
    user.address = None
    user.payment_info = None
```

---

#### **449. 
How do you handle "insecure CORS policy" misconfigurations?**

**Answer:**
```python
from flask_cors import CORS

# Critical security checks at startup
# (before_first_request was removed in Flask 2.3, so call this explicitly)
def check_cors_security(app):
    # Verify no wildcard origins with credentials
    if '*' in app.config['TRUSTED_ORIGINS'] and app.config.get('CORS_SUPPORTS_CREDENTIALS'):
        raise RuntimeError("CORS misconfiguration: Cannot use wildcard with credentials")

    # Warn if the session cookie is not SameSite=Strict
    if app.config.get('SESSION_COOKIE_SAMESITE') != 'Strict':
        app.logger.warning("CORS security: Session cookie should use SameSite=Strict")

check_cors_security(app)

# Strict default configuration
CORS(app, resources={r"/api/*": {
    "origins": app.config['TRUSTED_ORIGINS'],
    "methods": ["GET", "POST", "PUT", "DELETE"],
    "allow_headers": ["Authorization", "Content-Type", "X-Requested-With"],
    "supports_credentials": True,
    "max_age": 86400,
    "expose_headers": ["X-RateLimit-Limit", "X-RateLimit-Remaining"]
}})
```

---

#### **450. How do you implement "secure API key validation" with HMAC?**

**Answer:**
```python
import hashlib
import hmac
from functools import wraps

from flask import abort, jsonify, request

def require_api_key(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        # Get API key and signature from headers
        api_key = request.headers.get('X-API-Key')
        signature = request.headers.get('X-Signature')

        if not api_key or not signature:
            abort(401, "API key and signature required")

        # Get secret for this key
        api_secret = get_api_secret(api_key)
        if not api_secret:
            abort(401, "Invalid API key")

        # Create expected signature over the raw request body
        body = request.get_data()
        expected_sig = hmac.new(
            api_secret.encode(),
            body,
            hashlib.sha256
        ).hexdigest()

        # Constant-time comparison (bytes avoid a TypeError on non-ASCII input)
        if not hmac.compare_digest(signature.encode(), expected_sig.encode()):
            abort(401, "Invalid signature")

        return f(*args, **kwargs)
    return decorated

# Usage
@app.route('/secure-api', methods=['POST'])
@require_api_key
def secure_api():
    # Process request
    return jsonify(success=True)
```

---

#### **451. 
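How do you prevent "server-side request forgery" (SSRF) in Flask?**

**Answer:**
```python
import ipaddress
import socket
from urllib.parse import urlparse

import requests
from flask import request

def is_internal_ip(ip):
    """Check if IP is in private/reserved ranges"""
    try:
        addr = ipaddress.ip_address(ip)
    except ValueError:
        return False
    return (addr.is_private or addr.is_loopback or addr.is_link_local
            or addr.is_reserved or addr.is_multicast or addr.is_unspecified)

def is_blocked_domain(domain):
    """Check against blocked domains"""
    blocked = ['localhost', 'docker', 'kubernetes']
    return any(b in domain for b in blocked)

def validate_url(url):
    """Validate URL for SSRF protection"""
    parsed = urlparse(url)

    # Check scheme
    if parsed.scheme not in ['http', 'https']:
        return False

    # Use hostname, not netloc: netloc still contains the port and credentials
    host = parsed.hostname
    if not host or is_blocked_domain(host):
        return False

    # Check port (accessing .port raises ValueError if it is malformed)
    try:
        port = parsed.port
    except ValueError:
        return False
    if port and port not in [80, 443]:
        return False

    # Check resolved IP
    try:
        ip = socket.gethostbyname(host)
    except (OSError, UnicodeError):
        return False
    return not is_internal_ip(ip)

@app.route('/fetch-external')
def fetch_external():
    url = request.args.get('url')
    if not url or not validate_url(url):
        return "Invalid URL", 400

    # Fetch with timeout; do not follow redirects to unvalidated hosts
    response = requests.get(url, timeout=5, allow_redirects=False)
    return response.text
```
*`requests` resolves the hostname again when it connects, so a DNS-rebinding attacker can still swap in an internal address after validation. For stronger guarantees, connect to the validated IP or route outbound traffic through an egress proxy.*

---

#### **452. 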
How do you implement "secure session storage" with Redis encryption?** **Answer:** ```python from cryptography.fernet import Fernet from flask_session import Session # Generate key from secure source encryption_key = os.getenv('SESSION_ENCRYPTION_KEY') or Fernet.generate_key() cipher = Fernet(encryption_key) class EncryptedRedisSessionInterface(SessionInterface): def __init__(self, redis=None, prefix='session:', serialization_format='json', **kwargs): self.redis = redis or Redis() self.prefix = prefix self.serialization_format = serialization_format def open_session(self, app, request): sid = request.cookies.get(app.session_cookie_name) if not sid: return self.session_class() val = self.redis.get(self.prefix + sid) if val is not None: try: # Decrypt session data val = cipher.decrypt(val) if self.serialization_format == 'json': data = json.loads(val) return self.session_class(data, sid, permanent=True) except: return self.session_class() return self.session_class() def save_session(self, app, session, response): domain = self.get_cookie_domain(app) path = self.get_cookie_path(app) # Remove session if empty if not session: if session.modified: self.redis.delete(self.prefix + session.sid) response.delete_cookie( app.session_cookie_name, domain=domain, path=path ) return # Encrypt session data if self.serialization_format == 'json': val = json.dumps(dict(session)).encode('utf-8') val = cipher.encrypt(val) # Save to Redis self.redis.setex( self.prefix + session.sid, app.permanent_session_lifetime, val ) # Set cookie response.set_cookie( app.session_cookie_name, session.sid, expires=self.get_expiration_time(app, session), httponly=True, domain=domain, path=path, secure=app.config['SESSION_COOKIE_SECURE'], samesite=app.config['SESSION_COOKIE_SAMESITE'] ) # Configure Flask app.config['SESSION_TYPE'] = 'redis' app.config['SESSION_REDIS'] = redis.from_url('redis://localhost:6379') app.session_interface = EncryptedRedisSessionInterface() ``` --- #### **453. 
How do you handle "insecure deserialization of user data"?** **Answer:** ```python import json from jsonschema import validate, ValidationError # Schema for user-submitted data USER_DATA_SCHEMA = { "type": "object", "properties": { "name": {"type": "string", "maxLength": 100}, "preferences": { "type": "object", "properties": { "theme": {"type": "string", "enum": ["light", "dark"]}, "language": {"type": "string", "pattern": "^[a-z]{2}$"} } } }, "required": ["name"] } def safe_load_user_data(data): """Safely load and validate user data""" try: # Parse JSON parsed = json.loads(data) # Validate against schema validate(instance=parsed, schema=USER_DATA_SCHEMA) # Additional custom validation if len(parsed['name']) < 2: raise ValueError("Name too short") return parsed except (json.JSONDecodeError, ValidationError, ValueError) as e: # Log the error (without sensitive data) app.logger.warning(f"Invalid user data: {str(e)[:100]}") raise InvalidUserData("Invalid data format") # Usage @app.route('/update-profile', methods=['POST']) def update_profile(): try: data = safe_load_user_data(request.data) # Process valid data except InvalidUserData: return "Invalid data format", 400 ``` --- #### **454. 
How do you implement "secure multi-factor authentication" with WebAuthn?** **Answer:** ```python from flask_webauthn import ( WebAuthn, RegistrationRejectedException, AuthenticationRejectedException ) webauthn = WebAuthn( app, rp_name="MyApp", rp_id="example.com", url_prefix="/webauthn" ) @app.route('/webauthn/begin-registration', methods=['POST']) @login_required def begin_registration(): # Generate challenge registration_data, state = webauthn.registration_begin( user={ 'id': current_user.id_bytes, 'name': current_user.email, 'displayName': current_user.name } ) # Store state in session session['webauthn_registration_state'] = state return jsonify(registration_data) @app.route('/webauthn/finish-registration', methods=['POST']) @login_required def finish_registration(): # Verify registration try: webauthn.registration_finish( session.pop('webauthn_registration_state'), request.json ) except RegistrationRejectedException as e: return jsonify({"status": "error", "message": str(e)}), 400 # Save credential to database credential = WebAuthnCredential( user_id=current_user.id, credential_data=request.json['credentialData'] ) db.session.add(credential) db.session.commit() return jsonify({"status": "ok"}) # Authentication flow similar to registration ``` --- #### **455. 
How do you prevent "insecure direct object references" in API responses?** **Answer:** ```python def filter_response_data(data, user): """Filter response data based on user permissions""" if isinstance(data, dict): # Remove sensitive fields if 'password' in data: del data['password'] if 'api_key' in data and not user.is_admin: del data['api_key'] # Filter nested objects for key, value in data.items(): data[key] = filter_response_data(value, user) elif isinstance(data, list): return [filter_response_data(item, user) for item in data] return data @app.after_request def secure_api_response(response): # Only process JSON API responses if 'application/json' in response.content_type: try: data = json.loads(response.get_data(as_text=True)) filtered = filter_response_data(data, current_user) response.set_data(json.dumps(filtered)) except: pass return response ``` --- #### **456. How do you implement "secure password rotation" policies?** **Answer:** ```python @app.before_request def check_password_rotation(): if not current_user.is_authenticated: return # Check if password needs rotation days_since_change = (datetime.utcnow() - current_user.password_changed_at).days if days_since_change > app.config['PASSWORD_ROTATION_DAYS']: # Redirect to change password if request.endpoint not in ['change_password', 'logout']: return redirect(url_for('change_password', reason='rotation')) @app.route('/change-password', methods=['GET', 'POST']) @login_required def change_password(): if request.method == 'POST': # Verify current password if not current_user.verify_password(request.form['current_password']): flash("Current password is incorrect", "error") return redirect(url_for('change_password')) # Validate new password is_valid, message = validate_password(request.form['new_password']) if not is_valid: flash(message, "error") return redirect(url_for('change_password')) # Update password current_user.set_password(request.form['new_password']) current_user.password_changed_at = 
datetime.utcnow() db.session.commit() # Force re-authentication logout_user() flash("Password changed. Please log in again.", "success") return redirect(url_for('login')) return render_template('change_password.html') ``` --- #### **457. How do you handle "insecure API documentation" exposure?** **Answer:** ```python # Configure Swagger/OpenAPI from flasgger import Swagger # Secure documentation with auth @app.before_request def require_docs_auth(): if request.path.startswith('/apidocs') and not current_user.is_authenticated: return redirect(url_for('login', next=request.url)) # Configure Swagger with security template = { "swagger": "2.0", "info": { "title": "Secure API", "description": "Authenticated API documentation" }, "securityDefinitions": { "APIKeyHeader": { "type": "apiKey", "name": "X-API-Key", "in": "header" } }, "security": [ {"APIKeyHeader": []} ] } swagger = Swagger(app, template=template) # Hide sensitive endpoints @app.route('/internal-api', methods=['POST']) @swag_from('docs/internal_api.yml', validation=False) # Don't document def internal_api(): # ... ``` --- #### **458. 
How do you implement "secure data sanitization" for user-generated content?**

**Answer:**
```python
from bs4 import BeautifulSoup
import bleach

ALLOWED_TAGS = [
    'p', 'br', 'strong', 'em', 'u', 'a',
    'ul', 'ol', 'li', 'blockquote'
]

# Only list attributes for tags that are actually allowed above
ALLOWED_ATTRIBUTES = {
    'a': ['href', 'title']
}

def sanitize_html(html):
    """Sanitize HTML content using bleach"""
    # First pass: BeautifulSoup to fix malformed HTML
    soup = BeautifulSoup(html, 'html.parser')
    fixed_html = str(soup)

    # Second pass: Bleach for sanitization. The protocol allowlist already
    # drops javascript: URLs, so no regex pass over the text is needed
    # (string replacement is bypassable and mangles legitimate content).
    return bleach.clean(
        fixed_html,
        tags=ALLOWED_TAGS,
        attributes=ALLOWED_ATTRIBUTES,
        protocols=['http', 'https'],
        strip=True
    )

@app.route('/submit-content', methods=['POST'])
@login_required
def submit_content():
    # Sanitize all user-generated content
    content = sanitize_html(request.form['content'])

    # Additional validation (MAX_CONTENT_LENGTH is an application-defined limit)
    if len(content) > MAX_CONTENT_LENGTH:
        return "Content too long", 400

    # Save sanitized content
    post = Post(content=content)
    db.session.add(post)
    db.session.commit()

    return redirect(url_for('view_post', id=post.id))
```

---

#### **459. 
How do you prevent "insecure file upload" path traversal?**

**Answer:**
```python
import os
import secrets

def save_upload(file, user_id):
    # Validate file type
    if not allowed_file(file.filename):
        raise ValueError("Invalid file type")

    # Generate a random filename; keep only the validated extension
    ext = file.filename.rsplit('.', 1)[1].lower()
    safe_name = f"{secrets.token_hex(16)}.{ext}"

    # Define upload path (outside web root if possible)
    upload_root = os.path.realpath(app.config['UPLOAD_FOLDER'])
    upload_dir = os.path.realpath(os.path.join(upload_root, str(user_id)))

    # Verify the destination is inside the upload root BEFORE writing anything.
    # commonpath avoids the prefix bug of startswith ("/uploads" vs "/uploads-evil").
    if os.path.commonpath([upload_root, upload_dir]) != upload_root:
        raise ValueError("Invalid file path")
    os.makedirs(upload_dir, exist_ok=True)

    # Save file under the generated name
    file_path = os.path.join(upload_dir, safe_name)
    file.save(file_path)
    return file_path

def allowed_file(filename):
    return '.' in filename and \
        filename.rsplit('.', 1)[1].lower() in app.config['ALLOWED_EXTENSIONS']
```

---

#### **460. How do you implement "secure audit trails" for sensitive operations?**

**Answer:**
```python
from datetime import datetime
from functools import wraps

from flask import jsonify, request
from flask_login import current_user

def audit_log(action, category="security"):
    """Decorator for audit logging sensitive operations"""
    def decorator(f):
        @wraps(f)
        def decorated(*args, **kwargs):
            # Get user (if authenticated)
            user_id = current_user.id if current_user.is_authenticated else None

            # Log before operation
            audit = AuditLog(
                timestamp=datetime.utcnow(),
                user_id=user_id,
                ip_address=request.remote_addr,
                user_agent=request.user_agent.string,
                action=action,
                category=category,
                status="started",
                request_data={
                    "path": request.path,
                    "method": request.method,
                    "params": request.args.to_dict()
                }
            )
            db.session.add(audit)
            # Commit now so the "started" row survives a later rollback
            db.session.commit()

            try:
                # Execute the function
                result = f(*args, **kwargs)

                # Update success
                audit.status = "completed"
                audit.response_data = {"result": "success"}
                db.session.commit()
                return result
            except Exception as e:
                # Discard the view's partial changes, then record the failure
                db.session.rollback()
                audit.status = "failed"
                audit.response_data = {
                    "error": str(e),
                    "type": e.__class__.__name__
                }
                db.session.commit()
                raise
        return decorated
    return decorator

# Usage
@app.route('/admin/delete-user/<int:user_id>', methods=['POST'])
@audit_log("user_delete", category="admin")
@require_admin
def delete_user(user_id):
    # Delete user logic
    return jsonify(status="success")
```

---

#### **461. How do you handle "insecure cryptography storage" in Flask apps?**

**Answer:**
```python
import os

from cryptography.fernet import Fernet
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC

def derive_key(master_password, salt):
    """Derive encryption key from master password"""
    kdf = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=salt,
        iterations=600000,  # OWASP guidance for PBKDF2-HMAC-SHA256
        backend=default_backend()
    )
    return kdf.derive(master_password.encode())

def get_encryption_key():
    """Get encryption key from secure source"""
    # 1. Try environment variable (for production)
    if key_env := os.getenv('APP_ENCRYPTION_KEY'):
        return key_env.encode()

    # 2. Try key management service (AWS KMS, HashiCorp Vault)
    if key_kms := get_key_from_kms():
        return key_kms

    # 3. Fallback to file (for development only).
    # app.config['ENV'] was removed in Flask 2.3, so check debug mode instead.
    if app.debug:
        return load_key_from_file()

    raise RuntimeError("Encryption key not configured")

def load_key_from_file():
    """Securely load key from file (development only)"""
    key_path = os.path.join(app.instance_path, 'encryption.key')

    # Generate the key on first use
    if not os.path.exists(key_path):
        key = Fernet.generate_key()
        with open(key_path, 'wb') as f:
            f.write(key)
        os.chmod(key_path, 0o600)
        return key

    # Verify file permissions (must be 600)
    mode = os.stat(key_path).st_mode
    if mode & 0o777 != 0o600:
        raise RuntimeError("Insecure key file permissions")

    with open(key_path, 'rb') as f:
        return f.read()
```

---

#### **462. 
How do you implement "secure cross-domain authentication" for microservices?** **Answer:** ```python import jwt from datetime import datetime, timedelta def create_service_token(audience, duration=timedelta(minutes=5)): """Create JWT for service-to-service authentication""" payload = { 'iss': app.config['SERVICE_NAME'], 'aud': audience, 'iat': datetime.utcnow(), 'exp': datetime.utcnow() + duration, 'jti': secrets.token_urlsafe(16) } return jwt.encode( payload, app.config['SERVICE_PRIVATE_KEY'], algorithm='RS256' ) def verify_service_token(token, issuer): """Verify JWT from another service""" try: return jwt.decode( token, app.config['SERVICE_PUBLIC_KEYS'][issuer], algorithms=['RS256'], audience=app.config['SERVICE_NAME'] ) except jwt.InvalidTokenError: return None # Usage in service A @app.route('/api/data') @require_auth def get_data(): # Call service B token = create_service_token(audience='service-b') response = requests.get( 'https://service-b/api/internal-data', headers={'Authorization': f'Bearer {token}'} ) return response.json() # Usage in service B @app.route('/api/internal-data') def internal_data(): token = request.headers.get('Authorization', '').replace('Bearer ', '') if not verify_service_token(token, issuer='service-a'): abort(401, "Invalid service token") # Return data ``` --- #### **463. 
How do you prevent "insecure API endpoint exposure"?** **Answer:** ```python # Define endpoint security levels API_SECURITY = { 'public': {'roles': []}, 'user': {'roles': ['user']}, 'admin': {'roles': ['admin']}, 'internal': {'roles': [], 'networks': ['10.0.0.0/8', '172.16.0.0/12']} } def require_api_access(level): """Decorator for API endpoint security""" def decorator(f): @wraps(f) def decorated(*args, **kwargs): # Check security level if level == 'internal': # Verify from internal network if not is_internal_ip(request.remote_addr): abort(403, "Internal API restricted to private network") elif level in ['user', 'admin']: # Verify authentication if not current_user.is_authenticated: abort(401, "Authentication required") # Verify role if level == 'admin' and not current_user.is_admin: abort(403, "Admin access required") return f(*args, **kwargs) return decorated return decorator # Usage @app.route('/api/public-data') @require_api_access('public') def public_data(): # No auth required return jsonify(data="public") @app.route('/api/user-profile') @require_api_access('user') def user_profile(): # User auth required return jsonify(profile=current_user.safe_data) @app.route('/api/admin-stats') @require_api_access('admin') def admin_stats(): # Admin auth required return jsonify(stats=get_admin_stats()) @app.route('/internal/health') @require_api_access('internal') def internal_health(): # Internal network only return jsonify(status="ok") ``` --- #### **464. 
How do you implement "secure data anonymization" for backups?** **Answer:** ```python def anonymize_database(): """Anonymize PII in database for backups""" # Anonymize users for user in User.query.all(): user.email = f"user{user.id}@example.com" user.phone = f"+44 7{user.id:09d}" user.first_name = fake.first_name() user.last_name = fake.last_name() user.address = fake.address() # Anonymize logs LogEntry.query.update({ LogEntry.ip_address: func.md5(LogEntry.ip_address) }, synchronize_session=False) # Clear sensitive fields PaymentInfo.query.update({ PaymentInfo.card_number: None, PaymentInfo.cvv: None }, synchronize_session=False) db.session.commit() def create_anonymized_backup(): """Create anonymized backup""" # Create temporary anonymized database temp_db = create_temp_database() try: # Copy data to temp DB copy_database(app.config['SQLALCHEMY_DATABASE_URI'], temp_db) # Anonymize with app.app_context(): app.config['SQLALCHEMY_DATABASE_URI'] = temp_db db.init_app(app) anonymize_database() # Create backup backup_path = f"/backups/anonymized_{datetime.now().isoformat()}.sql" os.system(f"pg_dump {temp_db} > {backup_path}") # Encrypt backup encrypt_file(backup_path, app.config['BACKUP_ENCRYPTION_KEY']) return backup_path finally: # Clean up drop_temp_database(temp_db) ``` --- #### **465. 
How do you handle "insecure error handling" that leaks information?**

**Answer:**
```python
import re
import traceback

from flask import render_template, request

@app.errorhandler(500)
def handle_500(e):
    """Generic 500 error handler"""
    # Generate unique error ID for tracking
    error_id = generate_error_id()

    # Log full error details (server side only)
    app.logger.error(
        "Server Error: %s\n%s",
        error_id,
        traceback.format_exc()
    )

    # Return generic message
    return render_template(
        'errors/500.html',
        error_id=error_id,
        support_email=app.config['SUPPORT_EMAIL']
    ), 500

@app.errorhandler(404)
def handle_404(e):
    """Custom 404 handler"""
    # Log suspicious 404s
    if is_suspicious_404(request.path):
        app.logger.warning("Suspicious 404: %s from %s", request.path, request.remote_addr)

    return render_template('errors/404.html'), 404

def is_suspicious_404(path):
    """Check if 404 might be probing for vulnerabilities"""
    suspicious_patterns = [
        r'/wp-admin',
        r'/phpmyadmin',
        r'/\.env',  # escape the dot so it matches a literal "."
        r'/backup\.(?:sql|zip)'
    ]
    return any(re.search(pattern, path) for pattern in suspicious_patterns)
```

---

#### **466. 
How do you implement "secure API rate limiting" with sliding window?** **Answer:** ```python import time import redis class SlidingWindowRateLimiter: def __init__(self, redis_client, limit=100, window=60): self.redis = redis_client self.limit = limit self.window = window def is_allowed(self, key): now = time.time() window_start = now - self.window # Create pipeline for atomic operations pipe = self.redis.pipeline() # Remove old timestamps pipe.zremrangebyscore(key, 0, window_start) # Add new timestamp pipe.zadd(key, {now: now}) # Set expiration pipe.expire(key, self.window) # Get current count pipe.zcard(key) # Execute pipeline _, _, _, count = pipe.execute() return count <= self.limit # Usage limiter = SlidingWindowRateLimiter( redis.from_url(app.config['REDIS_URL']), limit=100, window=60 ) @app.before_request def rate_limit(): if request.path.startswith('/api/'): user_key = f"rate_limit:{get_current_user_id() or request.remote_addr}" if not limiter.is_allowed(user_key): return "Rate limit exceeded", 429 ``` --- #### **467. How do you prevent "insecure direct object references" in background tasks?** **Answer:** ```python @celery.task def process_user_data(user_id, requesting_user_id): """Process user data with security checks""" # Verify requesting user has permission requesting_user = User.query.get(requesting_user_id) if not requesting_user or not requesting_user.is_admin: raise SecurityError("Unauthorized task request") # Verify target user exists user = User.query.get(user_id) if not user: raise ValueError("User not found") # Process data # ... # Secure task invocation @app.route('/process-data/<int:user_id>') @require_admin def start_processing(user_id): # Verify admin has permission to access this user if not current_user.can_access_user(user_id): abort(403, "Unauthorized user access") # Start task with both user IDs process_user_data.delay( user_id=user_id, requesting_user_id=current_user.id ) return "Processing started", 202 ``` --- #### **468. 
How do you implement "secure data validation" for complex inputs?** **Answer:** ```python from jsonschema import validate, FormatChecker from jsonschema.exceptions import ValidationError # Custom format checker format_checker = FormatChecker() format_checker.checks("date-time")(lambda s: True) # Override default # Schema with custom validation USER_SCHEMA = { "type": "object", "properties": { "email": { "type": "string", "format": "email", "maxLength": 254 }, "birthdate": { "type": "string", "format": "date", "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}$" }, "preferences": { "type": "object", "properties": { "theme": {"type": "string", "enum": ["light", "dark", "system"]}, "language": {"type": "string", "pattern": "^[a-z]{2}(-[A-Z]{2})?$"} } } }, "required": ["email"], "additionalProperties": False } def validate_user_data(data): """Validate user data against schema""" try: validate( instance=data, schema=USER_SCHEMA, format_checker=format_checker ) # Custom business logic validation if 'birthdate' in data: birthdate = datetime.strptime(data['birthdate'], '%Y-%m-%d') if birthdate > datetime.now() - timedelta(days=18*365): raise ValueError("User must be at least 18 years old") return True except ValidationError as e: # Map JSON Schema errors to user-friendly messages if e.validator == 'format': if e.schema.get('format') == 'email': raise ValueError("Invalid email format") elif e.schema.get('format') == 'date': raise ValueError("Date must be in YYYY-MM-DD format") elif e.validator == 'pattern': if 'language' in e.schema_path: raise ValueError("Invalid language code") raise ValueError(f"Invalid data: {e.message}") # Usage @app.route('/update-profile', methods=['POST']) def update_profile(): try: data = request.get_json() validate_user_data(data) # Process valid data except ValueError as e: return jsonify(error=str(e)), 400 ``` --- #### **469. 
How do you handle "insecure cryptography algorithms" in legacy systems?**

**Answer:**
```python
import base64
import os

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

# Supported algorithms with deprecation status
SUPPORTED_CIPHERS = {
    'AES-256-GCM': {
        'class': algorithms.AES,
        'mode': lambda nonce: modes.GCM(nonce),
        'key_size': 32,
        'nonce_size': 12,
        'deprecated': False
    },
    'AES-128-CBC': {
        'class': algorithms.AES,
        'mode': lambda iv: modes.CBC(iv),
        'key_size': 16,
        'nonce_size': 16,
        'deprecated': True
    }
}

def encrypt_data(data, algorithm='AES-256-GCM'):
    """Encrypt bytes with specified algorithm"""
    if algorithm not in SUPPORTED_CIPHERS:
        raise ValueError("Unsupported encryption algorithm")

    config = SUPPORTED_CIPHERS[algorithm]
    # Deprecated algorithms stay readable but must not protect new data
    if config['deprecated']:
        raise ValueError("Deprecated algorithm: decrypt-only")

    key = get_encryption_key(config['key_size'])
    nonce = os.urandom(config['nonce_size'])

    cipher = Cipher(
        config['class'](key),
        config['mode'](nonce),
        backend=default_backend()
    )
    encryptor = cipher.encryptor()
    ciphertext = encryptor.update(data) + encryptor.finalize()

    # For GCM, append the 16-byte authentication tag
    if algorithm.endswith('GCM'):
        ciphertext += encryptor.tag

    # Return format: algorithm|nonce|ciphertext
    return f"{algorithm}|{base64.b64encode(nonce).decode()}|{base64.b64encode(ciphertext).decode()}"

def decrypt_data(encrypted_data):
    """Decrypt data with automatic algorithm detection"""
    algorithm, nonce_b64, ciphertext_b64 = encrypted_data.split('|')

    if algorithm not in SUPPORTED_CIPHERS:
        raise ValueError("Unsupported encryption algorithm")

    config = SUPPORTED_CIPHERS[algorithm]
    nonce = base64.b64decode(nonce_b64)
    ciphertext = base64.b64decode(ciphertext_b64)
    key = get_encryption_key(config['key_size'])

    cipher = Cipher(
        config['class'](key),
        config['mode'](nonce),
        backend=default_backend()
    )
    decryptor = cipher.decryptor()

    if algorithm.endswith('GCM'):
        # Separate the tag; finalize_with_tag raises InvalidTag on tampering
        ciphertext, tag = ciphertext[:-16], ciphertext[-16:]
        plaintext = decryptor.update(ciphertext) + decryptor.finalize_with_tag(tag)
    else:
        plaintext = decryptor.update(ciphertext) + decryptor.finalize()

    # Remove padding for CBC
    if algorithm.endswith('CBC'):
        unpadder = padding.PKCS7(128).unpadder()
        plaintext = unpadder.update(plaintext) + unpadder.finalize()

    # Log deprecated algorithm usage for migration
    if config['deprecated']:
        app.logger.warning(f"Deprecated encryption algorithm used: {algorithm}")

    return plaintext
```

---

#### **470. How do you implement "secure data redaction" for logs and monitoring?**

**Answer:**
```python
import logging
import re
from logging import Filter

# Patterns to redact
REDACT_PATTERNS = [
    (r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '***@***.***'),
    (r'\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b', '****-****-****-****'),
    (r'\b[A-Za-z0-9+/=]{40,}\b', '***SECRET***'),
    (r'Bearer [A-Za-z0-9\-_\.]+', 'Bearer ***')
]

def redact(text):
    """Apply every redaction pattern in turn"""
    for pattern, replacement in REDACT_PATTERNS:
        text = re.sub(pattern, replacement, text)
    return text

class RedactingFilter(Filter):
    """Log filter that redacts sensitive information"""
    def filter(self, record):
        if isinstance(record.msg, str):
            record.msg = redact(record.msg)

        # Process positional log arguments
        if isinstance(record.args, tuple):
            record.args = tuple(
                redact(arg) if isinstance(arg, str) else arg
                for arg in record.args
            )
        return True

# Configure logging
def configure_logging():
    redactor = RedactingFilter()
    # Logger-level filters are skipped for records propagated from child
    # loggers, so attach to app.logger and to every root handler
    app.logger.addFilter(redactor)
    for handler in logging.getLogger().handlers:
        handler.addFilter(redactor)

# Usage in routes
@app.route('/sensitive-route')
def sensitive_route():
    app.logger.info(
        "Processing sensitive data for user %s with token %s",
        current_user.email,
        request.headers.get('X-API-Key')
    )
    # The email is redacted; the key is redacted only if it matches a pattern above
```

---

#### **471. How do you prevent "insecure API versioning" that exposes deprecated endpoints?**

**Answer:**
```python
from datetime import datetime
from functools import wraps

from flask import jsonify, make_response, request, url_for

# Track deprecated API versions
DEPRECATED_VERSIONS = {
    'v1': {
        'sunset': datetime(2023, 12, 31),
        'replacement': 'v2'
    },
    'v2': {
        'sunset': datetime(2024, 6, 30),
        'replacement': 'v3'
    }
}

def api_version(version):
    """Decorator for API version control"""
    def decorator(f):
        @wraps(f)
        def decorated(*args, **kwargs):
            # Check if version is deprecated
            if version in DEPRECATED_VERSIONS:
                deprecation = DEPRECATED_VERSIONS[version]
                response = make_response(f(*args, **kwargs))

                # Add deprecation headers
                response.headers['Deprecation'] = 'true'
                response.headers['Sunset'] = deprecation['sunset'].strftime('%a, %d %b %Y %H:%M:%S GMT')
                response.headers['Link'] = (
                    f'<{url_for("api_docs", version=deprecation["replacement"])}>; rel="successor-version"'
                )

                # Log deprecation usage
                app.logger.warning(
                    "Deprecated API version used: %s from %s",
                    version,
                    request.remote_addr
                )
                return response
            return f(*args, **kwargs)
        return decorated
    return decorator

# Retire versions once their sunset date has passed
@app.before_request
def block_sunset_versions():
    # Extract the version from paths such as /api/v1/users
    parts = request.path.split('/')
    version = parts[2] if len(parts) > 2 and parts[1] == 'api' else None

    deprecation = DEPRECATED_VERSIONS.get(version)
    if deprecation and datetime.utcnow() > deprecation['sunset']:
        return jsonify(
            error="API version retired",
            replacement=deprecation['replacement']
        ), 410

# Usage
@app.route('/api/v1/users')
@api_version('v1')
def get_users_v1():
    # Legacy implementation
    return jsonify(users=convert_to_v1_format(User.query.all()))

@app.route('/api/v2/users')
@api_version('v2')
def get_users_v2():
    # Current implementation
    return jsonify(data=UserSchema(many=True).dump(User.query.all()))
```

---

#### **472. How do you implement "secure data sanitization" for CSV exports?**

**Answer:**
```python
import csv
import io
from flask import Response

# Leading characters that spreadsheet applications interpret as formulas
FORMULA_PREFIXES = ('=', '+', '-', '@', '\t', '\r')

def sanitize_csv_value(value):
    """Sanitize value for CSV to prevent formula injection"""
    if isinstance(value, str) and value.startswith(FORMULA_PREFIXES):
        return "'" + value  # Prepend apostrophe to treat as text
    # Quote escaping is left to csv.writer; doing it here would double-escape
    return value

def generate_sanitized_csv(query, field_names):
    """Generate sanitized CSV from query results"""
    output = io.StringIO()
    writer = csv.writer(output, quoting=csv.QUOTE_MINIMAL)

    # Write header
    writer.writerow(field_names)

    # Write data
    for obj in query:
        row = [
            sanitize_csv_value(getattr(obj, field))
            for field in field_names
        ]
        writer.writerow(row)

    return output.getvalue()

@app.route('/export/users.csv')
@login_required
def export_users():
    # Verify permissions
    if not current_user.can_export_users:
        abort(403)

    # Get data
    users = User.query.all()

    # Generate sanitized CSV
    csv_data = generate_sanitized_csv(
        users,
        ['id', 'email', 'name', 'created_at']
    )

    # Return as attachment
    return Response(
        csv_data,
        mimetype='text/csv',
        headers={
            'Content-Disposition': 'attachment; filename=users_export.csv',
            'Content-Security-Policy': "default-src 'none'"
        }
    )
```

---

#### **473. 
How do you handle "insecure deserialization of session data"?**

**Answer:**
```python
from flask import Flask
from flask.sessions import SessionInterface, SessionMixin
from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired
from werkzeug.datastructures import CallbackDict

class SecureSession(CallbackDict, SessionMixin):
    """Dict-backed session that records modifications"""
    def __init__(self, initial=None):
        def on_update(session):
            session.modified = True
        super().__init__(initial, on_update)
        self.modified = False

class SecureSessionInterface(SessionInterface):
    """Session interface that stores signed (not encrypted) JSON in the cookie"""

    def __init__(self, salt='secure-session', max_age=86400):
        self.salt = salt
        self.max_age = max_age

    def get_serializer(self, app):
        # itsdangerous serializes with JSON by default. Never pass pickle here:
        # unpickling attacker-influenced data can execute arbitrary code.
        return URLSafeTimedSerializer(app.secret_key, salt=self.salt)

    def open_session(self, app, request):
        session_cookie = request.cookies.get(self.get_cookie_name(app))
        if not session_cookie:
            return SecureSession()
        try:
            # Verify and load session data
            data = self.get_serializer(app).loads(
                session_cookie,
                max_age=self.max_age
            )
        except (BadSignature, SignatureExpired):
            return SecureSession()  # Tampered or expired cookie: start fresh
        return SecureSession(data)

    def save_session(self, app, session, response):
        name = self.get_cookie_name(app)
        domain = self.get_cookie_domain(app)
        path = self.get_cookie_path(app)

        # Remove session if empty
        if not session:
            if session.modified:
                response.delete_cookie(name, domain=domain, path=path)
            return

        # Serialize and sign session data
        session_cookie = self.get_serializer(app).dumps(dict(session))

        # Set cookie
        response.set_cookie(
            name,
            session_cookie,
            expires=self.get_expiration_time(app, session),
            httponly=self.get_cookie_httponly(app),
            secure=self.get_cookie_secure(app),
            samesite=self.get_cookie_samesite(app),
            domain=domain,
            path=path
        )

# Configure Flask (SECRET_KEY must be set before the first request)
app = Flask(__name__)
app.config.update(
    SESSION_COOKIE_SECURE=True,
    SESSION_COOKIE_HTTPONLY=True,
    # SameSite=None is only for cross-site clients and requires Secure
    SESSION_COOKIE_SAMESITE='None' if app.config.get('USE_MOBILE') else 'Lax'
)
app.session_interface = SecureSessionInterface()  # Critical: JSON, not pickle
```

---

#### **474.
How do you implement "secure multi-tenancy" with row-level security?**

**Answer:**
```python
from flask import abort, g, request
from sqlalchemy import event
from sqlalchemy.orm import with_loader_criteria

# Track current tenant
@app.before_request
def set_tenant():
    # The header is client-controlled: is_valid_tenant() must also confirm
    # that the authenticated user belongs to this tenant
    tenant_id = request.headers.get('X-Tenant-ID', type=int)
    if tenant_id is None or not is_valid_tenant(tenant_id):
        abort(401, "Invalid tenant ID")
    g.tenant_id = tenant_id

# Base model for multi-tenancy
class TenantModel(db.Model):
    __abstract__ = True
    tenant_id = db.Column(db.Integer, nullable=False, index=True)

# SQLAlchemy event listener for row-level security (requires SQLAlchemy 1.4+)
@event.listens_for(db.session, 'do_orm_execute')
def add_tenant_filter(execute_state):
    # Apply the tenant filter to every ORM SELECT on TenantModel subclasses
    if (
        execute_state.is_select
        and not execute_state.is_column_load
        and not execute_state.is_relationship_load
    ):
        tenant_id = g.tenant_id
        execute_state.statement = execute_state.statement.options(
            with_loader_criteria(
                TenantModel,
                lambda cls: cls.tenant_id == tenant_id,
                include_aliases=True
            )
        )

# Usage in models
class Document(TenantModel):
    id = db.Column(db.Integer, primary_key=True)
    content = db.Column(db.Text)
    # tenant_id inherited from TenantModel

# Alternative: PostgreSQL Row Level Security
"""
ALTER TABLE documents ENABLE ROW LEVEL SECURITY;

CREATE POLICY tenant_isolation ON documents
    USING (tenant_id = current_setting('app.current_tenant')::integer);

-- Set tenant in session
SET app.current_tenant = 123;
"""
```

---

#### **475.
How do you prevent "insecure direct object references" in file storage?**

**Answer:**
```python
import os
import uuid

from flask import abort, jsonify, request, send_file

@app.route('/files/<file_id>')
@login_required
def get_file(file_id):
    # Verify file exists in database
    file_record = File.query.get(file_id)
    if not file_record:
        abort(404)

    # Verify user has access
    if not current_user.can_access_file(file_record):
        abort(403)

    # Generate secure path
    storage_root = os.path.realpath(app.config['STORAGE_ROOT'])
    secure_path = os.path.realpath(
        os.path.join(storage_root, file_record.storage_path)
    )

    # Verify path is within storage root. Compare whole path components:
    # a plain startswith() would accept a sibling such as /data/storage_evil
    # for a root of /data/storage.
    if os.path.commonpath([storage_root, secure_path]) != storage_root:
        app.logger.critical("Path traversal attempt: %s", secure_path)
        abort(403)

    # Send file with proper headers
    return send_file(
        secure_path,
        as_attachment=True,
        download_name=file_record.original_name,
        mimetype=file_record.mime_type,
        conditional=True
    )

# Secure file upload
@app.route('/upload', methods=['POST'])
@login_required
def upload_file():
    if 'file' not in request.files:
        return "No file provided", 400

    file = request.files['file']
    if file.filename == '':
        return "Empty filename", 400

    # Generate secure storage path (never derived from the client filename)
    storage_id = str(uuid.uuid4())
    storage_path = os.path.join(
        str(current_user.id),
        storage_id[:2],
        storage_id
    )

    # Save to secure location
    full_path = os.path.join(app.config['STORAGE_ROOT'], storage_path)
    os.makedirs(os.path.dirname(full_path), exist_ok=True)
    file.save(full_path)

    # Create database record
    file_record = File(
        user_id=current_user.id,
        original_name=file.filename,
        storage_path=storage_path,
        mime_type=file.mimetype
    )
    db.session.add(file_record)
    db.session.commit()

    return jsonify(file_id=file_record.id), 201
```

---

#### **476.
How do you implement "secure API key rotation" with zero downtime?**

**Answer:**
```python
import hashlib
from datetime import datetime

from flask import abort, jsonify, request
from sqlalchemy import and_, or_

def hash_api_key(key):
    # API keys are long random strings, so an unsalted SHA-256 digest is enough
    # and, unlike a salted password hash, can be looked up by equality.
    # create_api_key() is assumed to store this digest and return the plain key.
    return hashlib.sha256(key.encode()).hexdigest()

class APIKey(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    key_hash = db.Column(db.String(128), unique=True, index=True)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    rotated_at = db.Column(db.DateTime)
    last_used = db.Column(db.DateTime)
    status = db.Column(db.String(20), default='active')  # active, rotating, rotated, revoked

@app.route('/api/keys/rotate', methods=['POST'])
@login_required
def rotate_api_keys():
    # Mark old keys as rotating first, so the new key is not caught by the update
    APIKey.query.filter(
        APIKey.user_id == current_user.id,
        APIKey.status == 'active'
    ).update({'status': 'rotating', 'rotated_at': datetime.utcnow()})

    # Create new key
    new_key = create_api_key(current_user)
    db.session.commit()

    # Old keys keep working for the transition period
    return jsonify(
        new_key=new_key,
        rotation_period=app.config['API_KEY_ROTATION_PERIOD'].total_seconds()
    ), 200

@app.before_request
def authenticate_api_key():
    key = request.headers.get('X-API-KEY')
    if not key:
        return

    # Accept active keys, plus rotating keys still inside the transition period
    cutoff = datetime.utcnow() - app.config['API_KEY_ROTATION_PERIOD']
    api_key = APIKey.query.filter(
        APIKey.key_hash == hash_api_key(key),
        or_(
            APIKey.status == 'active',
            and_(APIKey.status == 'rotating', APIKey.rotated_at > cutoff)
        )
    ).first()

    if not api_key:
        abort(401)

    # Update usage
    api_key.last_used = datetime.utcnow()
    db.session.commit()

@celery.task
def complete_key_rotation():
    """Complete rotation after transition period"""
    cutoff = datetime.utcnow() - app.config['API_KEY_ROTATION_PERIOD']
    APIKey.query.filter(
        APIKey.status == 'rotating',
        APIKey.rotated_at < cutoff
    ).update({'status': 'rotated'})
    db.session.commit()

    # Revoke unused old keys
    APIKey.query.filter(
        APIKey.status == 'rotated',
        APIKey.last_used < cutoff
    ).update({'status': 'revoked'})
    db.session.commit()
```

---

#### **477.
How do you handle "insecure third-party API integrations"?**

**Answer:**
```python
import uuid

import requests
from flask import jsonify
from requests.exceptions import ConnectionError, RequestException, Timeout
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

class SecurityError(Exception):
    """Raised when an upstream response looks unsafe"""

# Configure retry strategy (reraise=True surfaces the original exception
# instead of tenacity's RetryError once the attempts are exhausted)
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception_type((Timeout, ConnectionError)),
    reraise=True
)
def safe_api_call(url, method='get', **kwargs):
    """Make safe API call with retries and timeouts"""
    # Validate URL (is_safe_url is an application helper, e.g. a host allowlist)
    if not is_safe_url(url):
        raise ValueError("Invalid API URL")

    # Merge caller headers with our own; pop them so that requests.request()
    # does not receive the `headers` keyword twice
    headers = {
        **kwargs.pop('headers', {}),
        'User-Agent': 'MyApp/1.0',
        'X-Request-ID': str(uuid.uuid4())
    }

    # Make request with timeout (connect timeout, read timeout)
    kwargs.setdefault('timeout', (3.05, 10))
    response = requests.request(method, url, headers=headers, **kwargs)

    # Check for suspicious responses
    if response.status_code == 418 or 'X-Debug' in response.headers:
        raise SecurityError("Suspicious API response")

    # Verify response content type
    if 'application/json' in response.headers.get('Content-Type', ''):
        try:
            return response.json()
        except ValueError:
            raise SecurityError("Invalid JSON response")

    return response.text

# Usage
@app.route('/external-data')
def get_external_data():
    try:
        data = safe_api_call(
            'https://api.trusted-partner.com/data',
            params={'api_key': current_user.external_api_key},
            verify=True  # Verify SSL
        )
        return jsonify(data)
    except (SecurityError, ValueError, RequestException) as e:
        app.logger.error("API integration failed: %s", str(e))
        return "Service temporarily unavailable", 503
```

---

#### **478.
How do you implement "secure data validation" for file uploads?**

**Answer:**
```python
import os
import secrets

import magic  # python-magic
from flask import request
from PIL import Image

ALLOWED_MIME_TYPES = {
    'image/jpeg': ['jpg', 'jpeg'],
    'image/png': ['png'],
    'application/pdf': ['pdf']
}

def validate_upload(file):
    """Validate file upload with multiple checks; returns (ok, message, extension)"""
    # Check filename
    if not file.filename:
        return False, "Empty filename", None

    # Check MIME type from content, not from the client-supplied header
    file.seek(0)
    mime = magic.from_buffer(file.read(2048), mime=True)
    file.seek(0)
    if mime not in ALLOWED_MIME_TYPES:
        return False, f"Invalid MIME type: {mime}", None

    # Check that the extension matches the detected type
    ext = os.path.splitext(file.filename)[1][1:].lower()
    if ext not in ALLOWED_MIME_TYPES[mime]:
        return False, f"Invalid file type: .{ext}", None

    # Additional validation by type
    if mime.startswith('image/'):
        # Check image dimensions
        try:
            img = Image.open(file)
        except OSError:
            return False, "Invalid image", None
        if img.width > 5000 or img.height > 5000:
            return False, "Image too large", None
        file.seek(0)
    elif mime == 'application/pdf':
        # Check PDF for embedded scripts
        if contains_javascript(file):
            return False, "PDF contains JavaScript", None
        file.seek(0)

    return True, "", ext

def contains_javascript(file):
    """Check if PDF contains JavaScript"""
    content = file.read()
    file.seek(0)
    return b'/JS' in content or b'/JavaScript' in content

# Usage
@app.route('/upload', methods=['POST'])
def upload():
    if 'file' not in request.files:
        return "No file provided", 400

    file = request.files['file']
    is_valid, message, ext = validate_upload(file)
    if not is_valid:
        return message, 400

    # Save file under a random name with the validated extension
    filename = f"{secrets.token_hex(16)}.{ext}"
    file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
    return "File uploaded", 201
```

---

#### **479.
How do you prevent "insecure direct object references" in caching?**

**Answer:**
```python
import json
from functools import wraps

from flask import g, jsonify, request
from flask_login import current_user, login_required

def _cached_call(owner, ttl, f, args, kwargs):
    """Shared cache logic; `owner` scopes the entry to one user or tenant"""
    # full_path includes the query string, so ?page=1 and ?page=2 do not collide
    cache_key = f"{owner}:{request.full_path}:{json.dumps(kwargs, sort_keys=True, default=str)}"

    # Try cache
    cached = cache.get(cache_key)
    if cached is not None:
        return cached

    # Execute function
    result = f(*args, **kwargs)

    # Cache result (assumes a Flask-Caching `cache`, whose set() takes `timeout`)
    cache.set(cache_key, result, timeout=ttl)
    return result

def cache_per_user(ttl=300):
    """Cache decorator that includes user ID in cache key"""
    def decorator(f):
        @wraps(f)
        def decorated(*args, **kwargs):
            user_id = current_user.id if current_user.is_authenticated else 'anon'
            return _cached_call(f"user:{user_id}", ttl, f, args, kwargs)
        return decorated
    return decorator

def cache_per_tenant(ttl=300):
    """Cache decorator that includes tenant ID in cache key"""
    def decorator(f):
        @wraps(f)
        def decorated(*args, **kwargs):
            # Use the tenant validated in before_request, not the raw header
            return _cached_call(f"tenant:{g.tenant_id}", ttl, f, args, kwargs)
        return decorated
    return decorator

# Usage
@app.route('/user-data')
@login_required
@cache_per_user(ttl=60)
def user_data():
    # Expensive operation
    return jsonify(data=get_user_data(current_user.id))

@app.route('/tenant-data')
@cache_per_tenant(ttl=300)
def tenant_data():
    # Tenant-specific data
    return jsonify(data=get_tenant_data(g.tenant_id))
```

---

#### **480.
How do you implement "secure data sanitization" for search queries?**

**Answer:**
```python
import re

from flask import jsonify, request
from markupsafe import escape
from sqlalchemy import or_

def sanitize_search_query(query):
    """Normalize a search term for use in a LIKE pattern"""
    # SQL injection is handled by bound parameters, so quotes can stay
    # (stripping them would break names such as O'Brien).
    # Drop control characters (this also keeps log lines intact) and limit length
    query = re.sub(r'[\x00-\x1f\x7f]', '', query)[:100]

    # Escape special characters for LIKE, including the escape character itself
    return re.sub(r'([\\%_])', r'\\\1', query)

def search_users(term):
    """Search users with sanitized query"""
    pattern = f'%{sanitize_search_query(term)}%'

    # Use parameterized query; escape= tells the database which character
    # marks a literal % or _
    return User.query.filter(
        or_(
            User.name.ilike(pattern, escape='\\'),
            User.email.ilike(pattern, escape='\\')
        )
    ).limit(50).all()

@app.route('/search')
def search():
    term = request.args.get('q', '')
    if not term:
        return jsonify(results=[])

    # Rate limiting
    if too_many_searches(request.remote_addr):
        return jsonify(error="Too many searches"), 429

    # Log sanitized query
    app.logger.info("Search query: %s", sanitize_search_query(term))

    # Execute search
    results = search_users(term)

    # Return sanitized results
    return jsonify(results=[
        {
            'id': user.id,
            'name': escape(user.name),
            'email': escape(user.email)
        }
        for user in results
    ])
```

---

#### **481.
How do you handle "insecure cryptography key management"?**

**Answer:**
```python
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend

class KeyManager:
    """Secure key management with rotation support"""

    def __init__(self, key_store):
        self.key_store = key_store
        self.current_key = None
        self.previous_keys = []

        # Load keys at startup
        self.load_keys()

    def load_keys(self):
        """Load encryption keys from secure storage"""
        # Get current key
        current_key_data = self.key_store.get('current_key')
        if current_key_data:
            self.current_key = self.deserialize_key(current_key_data)

        # Get previous keys
        previous_keys_data = self.key_store.get('previous_keys', [])
        self.previous_keys = [
            self.deserialize_key(data) for data in previous_keys_data
        ]

    def rotate_keys(self):
        """Rotate encryption keys"""
        # Store current key as previous
        if self.current_key:
            self.previous_keys.append(self.current_key)
            # Keep only last 5 keys
            self.previous_keys = self.previous_keys[-5:]

        # Generate new key
        self.current_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=2048,
            backend=default_backend()
        )

        # Save keys
        self.key_store.set('current_key', self.serialize_key(self.current_key))
        self.key_store.set('previous_keys', [
            self.serialize_key(key) for key in self.previous_keys
        ])

    def serialize_key(self, key):
        """Serialize key to PEM; the key store encrypts it at rest"""
        # Encrypting here with a throwaway random password would make the key
        # unrecoverable, because deserialize_key() could never supply it again
        return key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption()
        )

    def deserialize_key(self, data):
        """Deserialize key from secure storage"""
        return serialization.load_pem_private_key(
            data,
            password=None,  # Decryption handled by key store
            backend=default_backend()
        )

    def get_encryption_key(self):
        """Get current encryption key"""
        if not self.current_key:
            self.rotate_keys()
        return self.current_key

    def decrypt_data(self, encrypted_data, key_id=None):
        """Decrypt data using appropriate key"""
        # Use the named key when the caller knows which one encrypted the data
        if key_id == 'current':
            return self._decrypt_with_key(encrypted_data, self.current_key)
        if key_id and key_id.startswith('previous_'):
            idx = int(key_id.split('_')[1])
            return self._decrypt_with_key(encrypted_data, self.previous_keys[idx])

        # Otherwise try the current key first, then previous keys (newest first)
        for key in [self.current_key, *reversed(self.previous_keys)]:
            try:
                return self._decrypt_with_key(encrypted_data, key)
            except Exception:
                continue
        raise ValueError("Could not decrypt data")

    def _decrypt_with_key(self, encrypted_data, key):
        """Decrypt data with specific key"""
        # Implementation depends on encryption method
        # ...
        raise NotImplementedError

# Usage
key_manager = KeyManager(VaultKeyStore())
encrypted = encrypt_data(data, key_manager.get_encryption_key())
decrypted = key_manager.decrypt_data(encrypted)
```

---

#### **482. How do you implement "secure data validation" for webhooks?**

**Answer:**
```python
import hashlib
import hmac
import time
from functools import wraps

from flask import abort, request

def verify_webhook(f):
    """Decorator to verify webhook signatures"""
    # Generic scheme: real providers such as GitHub and Stripe define their
    # own header names and signed-message formats
    @wraps(f)
    def decorated(*args, **kwargs):
        # Get signature from header
        signature = request.headers.get('X-Hook-Signature')
        if not signature:
            abort(401, "Missing signature")

        # Get secret for this provider
        provider = request.headers.get('X-Provider')
        secret = get_webhook_secret(provider)
        if not secret:
            abort(401, "Invalid provider")

        # Verify timestamp (prevent replay attacks)
        timestamp = request.headers.get('X-Timestamp', '')
        if not timestamp.isdecimal() or abs(time.time() - int(timestamp)) > 300:
            abort(401, "Expired webhook")

        # Calculate expected signature over timestamp and body, so the timestamp
        # cannot be changed without invalidating the signature
        # (assumes the sender signs "<timestamp>.<body>")
        body = request.get_data()
        expected = hmac.new(
            secret.encode(),
            timestamp.encode() + b'.' + body,
            hashlib.sha256
        ).hexdigest()

        # Constant-time comparison (bytes, so non-ASCII input cannot raise)
        if not hmac.compare_digest(signature.encode(), expected.encode()):
            abort(401, "Invalid signature")

        return f(*args, **kwargs)
    return decorated

# Usage
@app.route('/webhooks/github',
           methods=['POST'])
@verify_webhook
def github_webhook():
    # Process GitHub webhook
    event = request.headers.get('X-GitHub-Event')
    payload = request.get_json()

    # Verify event type
    if event not in ALLOWED_GITHUB_EVENTS:
        return "Event not supported", 400

    # Process event
    process_github_event(event, payload)
    return "OK", 200

def get_webhook_secret(provider):
    """Get webhook secret for provider"""
    webhook_secrets = {
        'github': app.config['GITHUB_WEBHOOK_SECRET'],
        'stripe': app.config['STRIPE_WEBHOOK_SECRET']
    }
    return webhook_secrets.get(provider)
```

---
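*Sender-side sketch for 482:* to exercise a webhook verifier like the one above in tests, it helps to have the matching signing function. The sketch below uses only the standard library and assumes the sender signs `<timestamp>.<body>`, so the timestamp cannot be altered independently of the payload. The function names, the message layout, and the `now` parameter are illustrative assumptions, not any provider's actual scheme.

```python
import hashlib
import hmac
import time


def sign_webhook(secret: bytes, body: bytes, timestamp: int) -> str:
    """Return a hex HMAC-SHA256 over b"<timestamp>.<body>" (assumed layout)."""
    message = str(timestamp).encode() + b"." + body
    return hmac.new(secret, message, hashlib.sha256).hexdigest()


def verify_webhook_signature(secret, body, timestamp, signature,
                             max_age=300, now=None):
    """Check freshness first, then compare signatures in constant time."""
    now = time.time() if now is None else now
    if abs(now - timestamp) > max_age:
        return False  # Too old (or too far in the future): possible replay
    expected = sign_webhook(secret, body, timestamp)
    return hmac.compare_digest(expected.encode(), signature.encode())


# Example: a sender signs a payload, a receiver verifies it
secret = b"example-shared-secret"  # hypothetical secret for illustration
body = b'{"event": "push"}'
sent_at = int(time.time())
signature = sign_webhook(secret, body, sent_at)
is_valid = verify_webhook_signature(secret, body, sent_at, signature)
```

Passing `now` explicitly keeps the freshness check deterministic in unit tests; in production code it would simply default to the current time.

---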