### **Top 100 Flask Developer Interview Questions & Answers (Part 5: Questions 401–482)**
---
#### **401. How do you prevent SQL injection when using SQLAlchemy's `text()` constructs?**
**Answer:** Always use **parameterized queries** even with `text()`:
```python
from sqlalchemy import text
# SAFE:
result = db.session.execute(
text("SELECT * FROM users WHERE name = :name"),
{"name": user_input}
)
# UNSAFE (AVOID):
result = db.session.execute(
text(f"SELECT * FROM users WHERE name = '{user_input}'")
)
```
*Never* format SQL strings with user input.
---
#### **402. How do you implement "time-based access tokens" with short-lived JWTs?**
**Answer:**
```python
from datetime import timedelta
from flask_jwt_extended import create_access_token
@app.route('/token')
def get_token():
# Short-lived token (5 minutes)
access_token = create_access_token(
identity=current_user.id,
expires_delta=timedelta(minutes=5)
)
# Long-lived refresh token (30 days)
refresh_token = create_refresh_token(
identity=current_user.id,
expires_delta=timedelta(days=30)
)
return jsonify(access_token=access_token, refresh_token=refresh_token)
```
*Refresh tokens should be stored in a secure HTTP-only cookie.*
---
#### **403. How do you debug "context mismatch" errors in Flask async views?**
**Answer:**
- **Error**: `RuntimeError: Working outside of request context`
- **Fix**:
1. Ensure async views use `async def`
2. Use `app.ensure_sync()` for sync functions:
```python
@app.route('/async')
async def async_view():
result = await app.ensure_sync(sync_function)()
return result
```
3. Never store `request` objects for later use
---
#### **404. How do you implement "secure password hashing" with Argon2?**
**Answer:** Use `passlib` with Argon2:
```python
from passlib.hash import argon2
# Hash password
hash = argon2.using(
memory_cost=65536,
time_cost=3,
parallelism=1
).hash("password")
# Verify
if argon2.verify("password", hash):
# Valid
```
*Configuration*:
- `memory_cost`: 64MB (minimum for 2023)
- `time_cost`: 3 iterations
- `parallelism`: 1 (adjust based on CPU cores)
---
#### **405. How do you handle "timing attacks" on string comparisons?**
**Answer:** Always use `hmac.compare_digest()`:
```python
from werkzeug.security import safe_str_cmp
# SAFE:
if safe_str_cmp(user_provided_token, stored_token):
# Valid
# UNSAFE (AVOID):
if user_provided_token == stored_token:
# Vulnerable to timing attacks
```
*Applies to:*
- Token validation
- Password checks
- Signature verification
---
#### **406. How do you implement "secure session fixation" protection?**
**Answer:** Regenerate session ID on privilege changes:
```python
from flask import session
@app.route('/login', methods=['POST'])
def login():
user = authenticate(request.form['username'], request.form['password'])
if user:
# Regenerate session ID to prevent fixation
session.regenerate()
login_user(user)
return redirect(url_for('dashboard'))
```
*Flask-Login* handles this automatically with `fresh_login_required`.
---
#### **407. How do you prevent "HTTP Header Injection" in Flask responses?**
**Answer:** Flask automatically validates headers, but:
1. **Never** use user input in header names/values:
```python
# UNSAFE:
response.headers[user_input] = "value"
# SAFE:
if user_input in ALLOWED_HEADERS:
response.headers[user_input] = sanitize_value(value)
```
2. Use `werkzeug.datastructures.Headers` for manual header construction
---
#### **408. How do you implement "secure file download" with content disposition?**
**Answer:**
```python
from flask import send_file
@app.route('/download/<file_id>')
@login_required
def download(file_id):
file = get_authorized_file(file_id)
return send_file(
file.path,
as_attachment=True,
download_name=file.safe_name,
mimetype=file.mimetype,
# Prevent XSS in filename
attachment_filename=sanitize_filename(file.name)
)
def sanitize_filename(name):
return secure_filename(os.path.basename(name))
```
---
#### **409. How do you handle "cross-site scripting" (XSS) in Flask templates?**
**Answer:**
1. **Auto-escaping**: Enabled by default in Jinja2
2. **Manual escaping** when needed:
```jinja
{{ user_input | e }} {# Equivalent to {{ user_input }} #}
```
3. **Mark safe HTML only when absolutely necessary**:
```python
Markup("<strong>Trusted content</strong>")
```
*Never* mark user-generated content as safe.
---
#### **410. How do you implement "secure password recovery" with rate limiting?**
**Answer:**
```python
from flask_limiter import Limiter
limiter = Limiter(
app,
key_func=lambda: request.remote_addr,
default_limits=["5/hour"]
)
@app.route('/recover', methods=['POST'])
@limiter.limit("1/minute")
def recover():
# Validate email format
if not validate_email(request.form['email']):
return "Invalid email", 400
# Generate token (15-minute expiry)
token = generate_recovery_token(request.form['email'])
# Send email (async)
send_recovery_email.delay(request.form['email'], token)
# Always return 200 to prevent email enumeration
return "If email exists, recovery instructions were sent", 200
```
---
#### **411. How do you prevent "insecure direct object references" (IDOR)?**
**Answer:** Always verify ownership:
```python
@app.route('/documents/<int:doc_id>')
@login_required
def view_document(doc_id):
# Critical: Check ownership
document = Document.query.filter_by(
id=doc_id,
user_id=current_user.id # Ownership check
).first_or_404()
return render_template('document.html', doc=document)
```
*Never* use: `Document.query.get(doc_id)` without ownership check.
---
#### **412. How do you implement "secure API authentication" with mutual TLS?**
**Answer:**
1. Configure Nginx for mTLS:
```nginx
server {
ssl_client_certificate /path/to/ca.pem;
ssl_verify_client on;
ssl_verify_depth 2;
}
```
2. In Flask:
```python
@app.before_request
def require_mtls():
if not request.environ.get('SSL_CLIENT_VERIFY') == 'SUCCESS':
abort(401, "mTLS required")
# Optional: Verify specific client certificate
cert = request.environ['SSL_CLIENT_CERT']
if not is_authorized_cert(cert):
abort(403, "Unauthorized client")
```
---
#### **413. How do you handle "mass assignment" vulnerabilities in Flask?**
**Answer:**
1. **Never** use `form.populate_obj()` directly
2. Use explicit field mapping:
```python
def update_profile():
user.name = form.name.data
user.email = form.email.data
# NO: user.role = form.role.data (sensitive field)
```
3. Or use whitelist with `only` parameter:
```python
form.populate_obj(user, only=['name', 'email'])
```
---
#### **414. How do you implement "secure file upload" with virus scanning?**
**Answer:**
```python
import pyclamd
@app.route('/upload', methods=['POST'])
def upload():
file = request.files['file']
# Validate type (server-side)
if not allowed_file(file.filename):
return "Invalid file type", 400
# Scan for viruses
if not scan_virus(file):
return "Virus detected", 403
# Save with random name
filename = secure_filename(file.filename)
safe_path = os.path.join(UPLOAD_FOLDER, str(uuid4()))
file.save(safe_path)
return "Uploaded", 201
def scan_virus(file):
file.seek(0) # Reset stream position
cd = pyclamd.ClamdNetworkSocket()
result = cd.scan_stream(file.read())
file.seek(0) # Reset again for saving
return result is None
```
---
#### **415. How do you prevent "clickjacking" attacks in Flask?**
**Answer:** Set `X-Frame-Options` header:
```python
from flask_talisman import Talisman
Talisman(
app,
frame_options='DENY', # Or 'SAMEORIGIN'
content_security_policy={
'frame-ancestors': "'none'" # Modern alternative
}
)
```
*Alternative:* Nginx configuration:
```nginx
add_header X-Frame-Options "DENY";
```
---
#### **416. How do you implement "secure password policies" with complexity checks?**
**Answer:**
```python
import re
def validate_password(password):
# Minimum length
if len(password) < 12:
return False, "Password must be at least 12 characters"
# Complexity requirements
if not re.search(r'[A-Z]', password):
return False, "Password must contain uppercase letter"
if not re.search(r'[a-z]', password):
return False, "Password must contain lowercase letter"
if not re.search(r'[0-9]', password):
return False, "Password must contain a number"
if not re.search(r'[^A-Za-z0-9]', password):
return False, "Password must contain special character"
# Check against common passwords
if password.lower() in COMMON_PASSWORDS:
return False, "Password too common"
return True, ""
```
---
#### **417. How do you handle "sensitive data exposure" in Flask logs?**
**Answer:**
1. **Mask sensitive data** in logs:
```python
import re
from flask import request
@app.before_request
def mask_sensitive_data():
if request.path == '/login':
request.masked_data = {
k: '***' if k in ['password', 'token'] else v
for k, v in request.form.items()
}
@app.after_request
def log_request(response):
app.logger.info(
f"{request.method} {request.path} {request.masked_data} → {response.status}"
)
return response
```
2. Use structured logging with field filtering
---
#### **418. How do you implement "secure session management" for mobile apps?**
**Answer:**
```python
app.config.update(
# Required for mobile WebView
SESSION_COOKIE_SAMESITE="None",
SESSION_COOKIE_SECURE=True,
SESSION_COOKIE_HTTPONLY=True,
# Shorter lifetime for mobile
PERMANENT_SESSION_LIFETIME=timedelta(days=7),
# Use Redis for distributed sessions
SESSION_TYPE="redis",
SESSION_REDIS=redis.from_url(os.getenv("REDIS_URL"))
)
# Additional mobile-specific security
@app.before_request
def mobile_security():
# Verify custom header for mobile clients
if request.headers.get('X-Mobile-Client'):
if not validate_mobile_token(request.headers['X-Mobile-Token']):
abort(401)
```
---
#### **419. How do you prevent "insecure deserialization" attacks?**
**Answer:**
1. **NEVER** use `pickle` with untrusted data
2. For sessions:
- Use `Flask-Session` with Redis (not filesystem)
- Configure `SESSION_USE_SIGNER=True`
3. For APIs:
- Prefer JSON over pickle/YAML
- Validate all input types:
```python
def validate_data(data):
if isinstance(data, (list, dict)):
# Recursively validate
...
```
---
#### **420. How do you implement "secure API key management" with rotation?**
**Answer:**
```python
class APIKey(db.Model):
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
key_hash = db.Column(db.String(128))
created_at = db.Column(db.DateTime, default=datetime.utcnow)
revoked_at = db.Column(db.DateTime, nullable=True)
last_used = db.Column(db.DateTime)
@app.route('/api/keys', methods=['POST'])
@login_required
def create_api_key():
# Generate random key
raw_key = secrets.token_urlsafe(32)
key_hash = generate_password_hash(raw_key)
# Store hashed key
api_key = APIKey(
user_id=current_user.id,
key_hash=key_hash
)
db.session.add(api_key)
db.session.commit()
# Return only to client once
return jsonify(api_key=raw_key), 201
@app.before_request
def authenticate_api_key():
key = request.headers.get('X-API-KEY')
if not key:
abort(401)
# Find active key
api_key = APIKey.query.filter(
APIKey.key_hash == key,
APIKey.revoked_at.is_(None)
).first()
if not api_key or not check_password_hash(api_key.key_hash, key):
abort(401)
# Update usage
api_key.last_used = datetime.utcnow()
db.session.commit()
```
---
#### **421. How do you handle "security misconfiguration" in Flask?**
**Answer:**
1. **Disable debug mode in production**:
```python
# Never do this in production:
app.run(debug=True)
```
2. **Set secure headers**:
```python
Talisman(app, content_security_policy=STRICT_CSP)
```
3. **Remove unnecessary headers**:
```python
@app.after_request
def remove_headers(response):
response.headers.pop('Server', None)
response.headers.pop('X-Powered-By', None)
return response
```
4. **Validate all configurations**:
```bash
flask config check
```
---
#### **422. How do you implement "secure CORS" for single-page applications?**
**Answer:**
```python
from flask_cors import CORS
# Configure with strict origins
CORS(app,
resources={r"/api/*": {
"origins": [
"https://trusted.example.com",
"https://staging.example.com"
],
"supports_credentials": True,
"allow_headers": ["Authorization", "Content-Type"],
"max_age": 86400
}},
supports_credentials=True
)
# Critical: Set SameSite cookie for credentials
app.config.update(
SESSION_COOKIE_SAMESITE="Strict",
SESSION_COOKIE_SECURE=True
)
```
*Never* use `origins="*"` with `supports_credentials=True`.
---
#### **423. How do you prevent "XML External Entity" (XXE) attacks?**
**Answer:** Disable entity resolution:
```python
from lxml import etree
def parse_xml(xml_data):
parser = etree.XMLParser(
resolve_entities=False,
no_network=True,
dtd_validation=False
)
return etree.fromstring(xml_data, parser)
# For stdlib xml:
import defusedxml.ElementTree as ET
ET.defuse_stdlib()
```
*Best practice:* Avoid XML; use JSON instead.
---
#### **424. How do you implement "secure password storage" with multiple algorithms?**
**Answer:**
```python
from passlib.context import CryptContext
pwd_context = CryptContext(
schemes=["argon2", "bcrypt"],
default="argon2",
argon2__memory_cost=65536,
argon2__time_cost=3,
argon2__parallelism=1,
bcrypt__rounds=15
)
def upgrade_hash_if_needed(user, password):
# Verify with current algorithm
if not pwd_context.verify(password, user.password_hash):
return False
# Check if needs upgrade
if pwd_context.needs_update(user.password_hash):
# Rehash with new algorithm
user.password_hash = pwd_context.hash(password)
db.session.commit()
return True
```
---
#### **425. How do you handle "insufficient logging & monitoring" security gaps?**
**Answer:** Implement comprehensive logging:
```python
import logging
from pythonjsonlogger import jsonlogger
# Configure JSON logging
log_handler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter(
'%(asctime)s %(levelname)s %(name)s %(message)s'
)
log_handler.setFormatter(formatter)
app.logger.addHandler(log_handler)
app.logger.setLevel(logging.INFO)
# Critical events to log
AUDIT_EVENTS = ['login', 'password_change', 'admin_access', 'data_export']
@app.after_request
def audit_log(response):
if request.path in AUDIT_EVENTS:
app.logger.info(
"audit_event",
extra={
"event": request.path,
"user": current_user.id if current_user.is_authenticated else None,
"ip": request.remote_addr,
"status": response.status_code
}
)
return response
```
---
#### **426. How do you implement "secure multi-tenancy" with database isolation?**
**Answer:**
1. **Schema-based isolation**:
```python
@app.before_request
def set_tenant():
tenant_id = request.headers.get('X-Tenant-ID')
if not tenant_id or not is_valid_tenant(tenant_id):
abort(401)
# Set search path for PostgreSQL
db.session.execute(f"SET search_path TO tenant_{tenant_id}")
```
2. **Row-level security**:
```sql
CREATE POLICY tenant_isolation ON users
USING (tenant_id = current_setting('app.current_tenant')::uuid);
```
---
#### **427. How do you prevent "business logic abuse" in Flask APIs?**
**Answer:**
```python
@app.route('/reset-password', methods=['POST'])
def reset_password():
# Rate limiting
if too_many_requests(request.remote_addr):
return "Too many requests", 429
# Business rule: Only allow 1 reset per email per hour
last_reset = PasswordReset.query.filter(
PasswordReset.email == request.json['email'],
PasswordReset.created_at > datetime.utcnow() - timedelta(hours=1)
).first()
if last_reset:
return "Reset already requested", 429
# Process reset...
```
*Always validate business rules server-side.*
---
#### **428. How do you implement "secure API versioning" with deprecation?**
**Answer:**
```python
@app.before_request
def check_api_version():
version = request.headers.get('Accept', 'v1').split('/')[-1]
# Check if deprecated
if version in app.config['DEPRECATED_API_VERSIONS']:
response = jsonify(
error="API version deprecated",
supported_versions=app.config['SUPPORTED_API_VERSIONS']
)
response.headers['Deprecation'] = 'true'
response.headers['Sunset'] = 'Wed, 31 Dec 2023 23:59:59 GMT'
return response, 429
# Store version for request
g.api_version = version
```
---
#### **429. How do you handle "insecure APIs" with proper rate limiting?**
**Answer:** Use `Flask-Limiter` with multiple strategies:
```python
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
limiter = Limiter(
app,
key_func=get_remote_address,
default_limits=["200/hour"],
storage_uri="redis://localhost:6379"
)
# Custom limits per endpoint
@limiter.limit("5/second", override_defaults=False)
@limiter.limit("100/minute", override_defaults=False)
@app.route('/api/expensive')
def expensive_api():
# ...
# User-based limits
@limiter.limit(lambda: f"{get_current_user_id() or 'anon'}",
limit="1000/hour")
@app.route('/api/user-data')
def user_data():
# ...
```
---
#### **430. How do you implement "secure authentication flow" with OAuth2 PKCE?**
**Answer:**
```python
import secrets
import base64
import hashlib
@app.route('/login')
def login():
# Generate code verifier and challenge
code_verifier = secrets.token_urlsafe(96)
code_challenge = base64.urlsafe_b64encode(
hashlib.sha256(code_verifier.encode()).digest()
).rstrip(b'=').decode('utf-8')
# Store in session
session['code_verifier'] = code_verifier
# Redirect to auth provider
return redirect(
f"https://auth.example.com/authorize"
f"?response_type=code"
f"&client_id={CLIENT_ID}"
f"&redirect_uri={REDIRECT_URI}"
f"&code_challenge={code_challenge}"
f"&code_challenge_method=S256"
)
@app.route('/callback')
def callback():
# Exchange code for token
response = requests.post(
"https://auth.example.com/token",
data={
"grant_type": "authorization_code",
"code": request.args.get('code'),
"redirect_uri": REDIRECT_URI,
"client_id": CLIENT_ID,
"code_verifier": session.pop('code_verifier', '')
}
)
# Process token...
```
---
#### **431. How do you prevent "mass email compromise" via contact forms?**
**Answer:**
```python
@app.route('/contact', methods=['POST'])
def contact():
# Rate limiting
if too_many_submissions(request.remote_addr):
return "Too many submissions", 429
# Validate email format
if not validate_email(request.form['email']):
return "Invalid email", 400
# CAPTCHA verification
if not verify_captcha(request.form['captcha']):
return "CAPTCHA failed", 400
# Content scanning
if contains_spam(request.form['message']):
return "Suspicious content", 400
# Send email (async)
send_contact_email.delay(
to=app.config['CONTACT_EMAIL'],
reply_to=request.form['email'],
message=request.form['message']
)
return "Message sent", 200
```
---
#### **432. How do you implement "secure data export" with GDPR compliance?**
**Answer:**
```python
@app.route('/data-export', methods=['POST'])
@login_required
def data_export():
# Verify password (re-authentication)
if not current_user.verify_password(request.form['password']):
return "Invalid password", 401
# Generate export asynchronously
task = generate_data_export.delay(current_user.id)
return jsonify(
status="processing",
task_id=task.id,
expires_at=datetime.utcnow() + timedelta(hours=24)
), 202
@celery.task
def generate_data_export(user_id):
user = User.query.get(user_id)
# Anonymize PII in logs
with mask_pii_logging():
# Generate export
export = {
"account": user.safe_data,
"activity": [a.safe_data for a in user.activity]
}
# Store encrypted
encrypted = encrypt_export(export)
save_to_secure_storage(encrypted)
```
---
#### **433. How do you handle "insecure third-party dependencies" in Flask?**
**Answer:**
1. **Dependency scanning**:
```bash
pip install safety
safety check -r requirements.txt
```
2. **Automated updates**: Dependabot/GitHub Actions
3. **Lock files**: `requirements.txt` with pinned versions
4. **Runtime protection**:
```python
# Check critical dependencies at startup
if pkg_resources.get_distribution("flask").version < "2.0.0":
raise RuntimeError("Flask 2.0+ required for security fixes")
```
---
#### **434. How do you implement "secure admin interface" with additional protections?**
**Answer:**
```python
admin = Blueprint('admin', __name__, url_prefix='/admin')
@admin.before_request
def require_admin():
if not current_user.is_authenticated or not current_user.is_admin:
abort(403)
# Additional admin-specific security
if not request.headers.get('X-Admin-OTP'):
abort(401, "Admin OTP required")
# Verify from trusted network
if request.remote_addr not in app.config['TRUSTED_ADMIN_NETWORKS']:
abort(403, "Admin access restricted to corporate network")
# Rate limiting for admin
@admin.route('/users')
@limiter.limit("10/minute")
def list_users():
# ...
```
---
#### **435. How do you prevent "credential stuffing" attacks in Flask?**
**Answer:**
```python
@app.route('/login', methods=['POST'])
def login():
# Rate limiting by username
username = request.form['username']
if too_many_attempts(username):
return "Account temporarily locked", 423
# Check breached passwords
if is_password_breached(request.form['password']):
return "Password compromised. Please reset.", 403
# Verify credentials
user = User.query.filter_by(username=username).first()
if user and user.check_password(request.form['password']):
# Reset failed attempts
reset_failed_attempts(username)
login_user(user)
return redirect(url_for('dashboard'))
# Increment failed attempts
increment_failed_attempts(username)
# Generic message to prevent username enumeration
return "Invalid credentials", 401
def is_password_breached(password):
# Check against HIBP API (k-anonymity)
sha1_hash = hashlib.sha1(password.encode()).hexdigest().upper()
prefix, suffix = sha1_hash[:5], sha1_hash[5:]
response = requests.get(f"https://api.pwnedpasswords.com/range/{prefix}")
return suffix in response.text
```
---
#### **436. How do you implement "secure session timeout" with idle detection?**
**Answer:**
```python
@app.before_request
def check_session_timeout():
if not current_user.is_authenticated:
return
# Check last activity
last_active = session.get('last_active', datetime.utcnow())
idle_time = datetime.utcnow() - last_active
# Update last active
session['last_active'] = datetime.utcnow()
# Enforce idle timeout
if idle_time > app.config['IDLE_TIMEOUT']:
logout_user()
return redirect(url_for('login') + '?reason=idle_timeout')
# In templates (client-side idle detection)
@app.context_processor
def inject_idle_timeout():
return {
'IDLE_TIMEOUT': int(app.config['IDLE_TIMEOUT'].total_seconds())
}
```
*Client-side JavaScript should also handle idle timeout.*
---
#### **437. How do you handle "insecure file permissions" in Flask deployments?**
**Answer:**
1. **During build**:
```dockerfile
# Set non-root user
RUN adduser -D flaskuser
USER flaskuser
# Set secure permissions
RUN chmod 700 /app && \
find /app -type d -exec chmod 700 {} \; && \
find /app -type f -exec chmod 600 {} \;
```
2. **At runtime**:
```python
# Verify critical files
def check_file_permissions():
if not os.access(CONFIG_FILE, os.R_OK | os.W_OK):
raise RuntimeError("Invalid config file permissions")
# Should be 600 for secrets
mode = os.stat(SECRETS_FILE).st_mode
if mode & 0o777 != 0o600:
app.logger.critical("Insecure secrets file permissions!")
```
---
#### **438. How do you implement "secure audit logging" with immutable records?**
**Answer:**
```python
class AuditLog(db.Model):
id = db.Column(db.Integer, primary_key=True)
timestamp = db.Column(db.DateTime, default=datetime.utcnow)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
action = db.Column(db.String(50))
details = db.Column(db.JSON)
# Cryptographic chain
previous_hash = db.Column(db.String(64))
current_hash = db.Column(db.String(64), unique=True)
def log_audit(action, details=None):
# Get last log entry
last_log = AuditLog.query.order_by(AuditLog.timestamp.desc()).first()
# Create new entry
audit = AuditLog(
user_id=current_user.id if current_user.is_authenticated else None,
action=action,
details=details,
previous_hash=last_log.current_hash if last_log else None
)
# Calculate hash (includes previous hash)
audit.current_hash = calculate_hash(audit)
db.session.add(audit)
db.session.commit()
def calculate_hash(log_entry):
data = f"{log_entry.timestamp}{log_entry.user_id}{log_entry.action}" \
f"{json.dumps(log_entry.details)}{log_entry.previous_hash}"
return hashlib.sha256(data.encode()).hexdigest()
# Verification
def verify_audit_chain():
logs = AuditLog.query.order_by(AuditLog.timestamp).all()
for i, log in enumerate(logs[1:], 1):
if log.previous_hash != calculate_hash(logs[i-1]):
return False
return True
```
---
#### **439. How do you prevent "host header attacks" in Flask?**
**Answer:**
```python
@app.before_request
def validate_host():
allowed_hosts = app.config['ALLOWED_HOSTS']
# Get host from Host header (not request.host which is from WSGI)
host = request.headers.get('Host', '')
# Remove port if present
host = host.split(':')[0]
# Check against allowed hosts
if host not in allowed_hosts:
app.logger.warning(f"Invalid host header: {host}")
abort(400, "Invalid host")
# Additional check for HTTPS
if request.is_secure and host not in app.config['SECURE_HOSTS']:
abort(400, "HTTPS required for this host")
```
---
#### **440. How do you implement "secure passwordless authentication"?**
**Answer:**
```python
@app.route('/magic-link', methods=['POST'])
def magic_link():
# Rate limiting
if too_many_requests(request.remote_addr):
return "Too many requests", 429
# Validate email
if not validate_email(request.form['email']):
return "Invalid email", 400
# Generate token (5-minute expiry)
token = generate_magic_token(request.form['email'])
# Store hashed token
store_magic_token(token)
# Send email
send_magic_link.delay(request.form['email'], token)
# Always return 200 to prevent email enumeration
return "If email exists, magic link was sent", 200
@app.route('/auth/<token>')
def auth_with_token(token):
# Verify token
email = verify_magic_token(token)
if not email:
return "Invalid or expired token", 400
# Log in user
user = User.query.filter_by(email=email).first()
if user:
login_user(user, remember=True)
return redirect(url_for('dashboard'))
```
---
#### **441. How do you handle "insecure redirects and forwards" in Flask?**
**Answer:**
```python
@app.route('/redirect')
def safe_redirect():
# Get next URL from query parameter
next_url = request.args.get('next')
# Validate as absolute URL
if not next_url or not is_safe_url(next_url):
return redirect(url_for('home'))
return redirect(next_url)
def is_safe_url(url):
# Parse URL
parsed = urlparse(url)
# Must be same domain
if parsed.netloc and parsed.netloc != request.host:
return False
# Must use same scheme
if parsed.scheme and parsed.scheme != request.scheme:
return False
# Must not contain fragments
if parsed.fragment:
return False
# Must be valid path
return bool(urlparse(url).path)
```
---
#### **442. How do you implement "secure two-factor authentication" with TOTP?**
**Answer:**
```python
import pyotp
@app.route('/setup-2fa', methods=['GET'])
@login_required
def setup_2fa():
# Generate secret
secret = pyotp.random_base32()
current_user.totp_secret = secret
db.session.commit()
# Generate QR code URL
totp = pyotp.TOTP(secret)
provisioning_uri = totp.provisioning_uri(
name=current_user.email,
issuer_name="MyApp"
)
return render_template('setup_2fa.html',
qr_uri=provisioning_uri)
@app.route('/verify-2fa', methods=['POST'])
@login_required
def verify_2fa():
token = request.form['token']
totp = pyotp.TOTP(current_user.totp_secret)
# Verify with 1-interval leeway
if totp.verify(token, valid_window=1):
# Mark 2FA as verified for this session
session['2fa_verified'] = True
return redirect(url_for('dashboard'))
return "Invalid token", 400
@app.before_request
def require_2fa():
# Skip for setup/verify routes
if request.endpoint in ['setup_2fa', 'verify_2fa']:
return
# Require 2FA for sensitive routes
if current_user.is_authenticated and current_user.totp_secret:
if request.endpoint in ['account_settings', 'payment'] and not session.get('2fa_verified'):
return redirect(url_for('verify_2fa'))
```
---
#### **443. How do you prevent "HTTP Request Smuggling" in Flask apps?**
**Answer:** Mitigate at proxy level:
1. **Nginx configuration**:
```nginx
proxy_http_version 1.1;
proxy_set_header Connection "";
proxy_request_buffering off;
```
2. **Gunicorn configuration**:
```ini
[server]
forwarded_allow_ips = *
```
3. **Flask validation**:
```python
@app.before_request
def validate_request():
# Check for multiple Content-Length headers
if len(request.headers.getlist('Content-Length')) > 1:
abort(400, "Multiple Content-Length headers")
# Check for Transfer-Encoding with Content-Length
if 'Content-Length' in request.headers and 'Transfer-Encoding' in request.headers:
abort(400, "Conflicting transfer encodings")
```
---
#### **444. How do you implement "secure data masking" for PII in development?**
**Answer:**
```python
def mask_pii(value, field_name):
"""Mask PII based on field type"""
if field_name in ['ssn', 'national_id']:
return value[:3] + 'XXX' + value[-4:]
elif field_name in ['email', 'username']:
parts = value.split('@')
return parts[0][0] + '***@' + parts[1] if len(parts) > 1 else '***'
elif field_name in ['phone']:
return re.sub(r'\d', 'X', value[:-4]) + value[-4:]
return value
def mask_query_results(query, pii_fields):
"""Mask PII in query results"""
results = query.all()
for obj in results:
for field in pii_fields:
if hasattr(obj, field):
setattr(obj, field, mask_pii(getattr(obj, field), field))
return results
# Usage in development
if app.config['ENV'] == 'development':
@app.before_request
def mask_development_data():
if request.path.startswith('/admin'):
g.pii_fields = ['email', 'phone', 'ssn']
# Monkey patch query to mask results
db.Query.all = lambda q: mask_query_results(q, g.pii_fields)
```
---
#### **445. How do you handle "insecure cryptography" in Flask applications?**
**Answer:**
1. **Use vetted libraries**:
- `cryptography` instead of `pycrypto`
- `passlib` for password hashing
2. **Avoid custom crypto**:
- Never implement your own encryption
- Use high-level APIs (Fernet)
3. **Secure key management**:
```python
from cryptography.fernet import Fernet
# Get key from secure source
key = os.getenv('ENCRYPTION_KEY') or load_from_vault()
fernet = Fernet(key)
# Encrypt
encrypted = fernet.encrypt(b"Secret data")
# Decrypt
decrypted = fernet.decrypt(encrypted)
```
4. **Algorithm rotation**:
```python
def decrypt_data(encrypted_data):
# Try current key
try:
return fernet.decrypt(encrypted_data)
except InvalidToken:
# Try previous key for rotation
return old_fernet.decrypt(encrypted_data)
```
---
#### **446. How do you implement "secure user impersonation" for support teams?**
**Answer:**
```python
@admin.route('/impersonate/<int:user_id>')
@require_admin
def impersonate(user_id):
# Log the action
log_impersonation(current_user.id, user_id)
# Create special session token
impersonation_token = create_impersonation_token(
admin_id=current_user.id,
user_id=user_id,
expires_in=timedelta(minutes=15)
)
# Set in secure cookie
response = redirect(url_for('user_dashboard'))
response.set_cookie(
'impersonation_token',
impersonation_token,
httponly=True,
secure=True,
samesite='Strict'
)
return response
@app.before_request
def handle_impersonation():
token = request.cookies.get('impersonation_token')
if token:
try:
data = verify_impersonation_token(token)
# Set impersonated user
g.impersonated_user = User.query.get(data['user_id'])
g.original_admin = User.query.get(data['admin_id'])
except:
# Invalid token - clear cookie
response = make_response(redirect(url_for('login')))
response.set_cookie('impersonation_token', '', expires=0)
return response
# In templates
@app.context_processor
def inject_impersonation():
return {
'is_impersonating': hasattr(g, 'impersonated_user'),
'original_admin': g.original_admin if hasattr(g, 'original_admin') else None
}
```
---
#### **447. How do you prevent "account enumeration" via timing attacks?**
**Answer:**
```python
@app.route('/login', methods=['POST'])
def login():
# Always take same time regardless of user existence
start_time = time.time()
# Get user (but don't use yet)
user = User.query.filter_by(email=request.form['email']).first()
# Simulate password check time
fake_hash = generate_fake_password_hash()
safe_str_cmp(request.form['password'], fake_hash)
# Now do real check if user exists
if user and safe_str_cmp(
request.form['password'],
user.password_hash
):
login_user(user)
return redirect(url_for('dashboard'))
# Ensure constant time
elapsed = time.time() - start_time
if elapsed < MIN_LOGIN_TIME:
time.sleep(MIN_LOGIN_TIME - elapsed)
# Always return same message
return "Invalid credentials", 401
def generate_fake_password_hash():
"""Generate fake hash that takes similar time to verify"""
return generate_password_hash("fake_password_" + secrets.token_hex(8))
```
---
#### **448. How do you implement "secure data retention policies" with GDPR?**
**Answer:**
```python
@celery.task
def enforce_data_retention():
# Delete inactive accounts
cutoff = datetime.utcnow() - timedelta(days=365)
inactive_users = User.query.filter(
User.last_active < cutoff,
User.is_active == False
).all()
for user in inactive_users:
# Anonymize instead of delete (for legal requirements)
anonymize_user_data(user)
db.session.delete(user)
# Delete old audit logs
log_cutoff = datetime.utcnow() - timedelta(days=180)
AuditLog.query.filter(AuditLog.timestamp < log_cutoff).delete()
db.session.commit()
def anonymize_user_data(user):
"""Replace PII with random data while preserving structure"""
user.email = f"anonymized-{secrets.token_hex(8)}@example.com"
user.phone = f"+44 7{secrets.randbelow(999999999):09d}"
user.first_name = random_first_name()
user.last_name = random_last_name()
# Clear sensitive fields
user.address = None
user.payment_info = None
```
---
#### **449. How do you handle "insecure CORS policy" misconfigurations?**
**Answer:**
```python
from flask_cors import CORS
# Strict default configuration
CORS(app,
resources={r"/api/*": {
"origins": app.config['TRUSTED_ORIGINS'],
"methods": ["GET", "POST", "PUT", "DELETE"],
"allow_headers": ["Authorization", "Content-Type", "X-Requested-With"],
"supports_credentials": True,
"max_age": 86400,
"expose_headers": ["X-RateLimit-Limit", "X-RateLimit-Remaining"]
}},
supports_credentials=True
)
# Critical security checks at startup
@app.before_first_request
def check_cors_security():
# Verify no wildcard origins with credentials
if '*' in app.config['TRUSTED_ORIGINS'] and app.config['CORS_SUPPORTS_CREDENTIALS']:
raise RuntimeError("CORS misconfiguration: Cannot use wildcard with credentials")
# Verify secure headers are set
if not app.config.get('SESSION_COOKIE_SAMESITE') == 'Strict':
app.logger.warning("CORS security: Session cookie should use SameSite=Strict")
```
---
#### **450. How do you implement "secure API key validation" with HMAC?**
**Answer:**
```python
import hmac
import hashlib
from functools import wraps
def require_api_key(f):
@wraps(f)
def decorated(*args, **kwargs):
# Get API key from header
api_key = request.headers.get('X-API-Key')
signature = request.headers.get('X-Signature')
if not api_key or not signature:
abort(401, "API key and signature required")
# Get secret for this key
api_secret = get_api_secret(api_key)
if not api_secret:
abort(401, "Invalid API key")
# Create expected signature
body = request.get_data()
expected_sig = hmac.new(
api_secret.encode(),
body,
hashlib.sha256
).hexdigest()
# Constant-time comparison
if not hmac.compare_digest(signature, expected_sig):
abort(401, "Invalid signature")
return f(*args, **kwargs)
return decorated
# Usage
@app.route('/secure-api', methods=['POST'])
@require_api_key
def secure_api():
# Process request
return jsonify(success=True)
```
---
#### **451. How do you prevent "server-side request forgery" (SSRF) in Flask?**
**Answer:**
```python
import ipaddress
from urllib.parse import urlparse
def is_internal_ip(ip):
"""Check if IP is in private/reserved ranges"""
try:
return ipaddress.ip_address(ip).is_private
except ValueError:
return False
def is_blocked_domain(domain):
"""Check against blocked domains"""
blocked = ['localhost', 'docker', 'kubernetes']
return any(b in domain for b in blocked)
def validate_url(url):
"""Validate URL for SSRF protection"""
parsed = urlparse(url)
# Check scheme
if parsed.scheme not in ['http', 'https']:
return False
# Check domain
if is_blocked_domain(parsed.netloc):
return False
# Check IP (if resolved)
try:
ip = socket.gethostbyname(parsed.netloc)
if is_internal_ip(ip):
return False
except:
return False
# Check port
if parsed.port and parsed.port not in [80, 443]:
return False
return True
@app.route('/fetch-external')
def fetch_external():
url = request.args.get('url')
if not url or not validate_url(url):
return "Invalid URL", 400
# Fetch with timeout
response = requests.get(url, timeout=5)
return response.text
```
---
#### **452. How do you implement "secure session storage" with Redis encryption?**
**Answer:**
```python
from cryptography.fernet import Fernet
from flask_session import Session
# Generate key from secure source
encryption_key = os.getenv('SESSION_ENCRYPTION_KEY') or Fernet.generate_key()
cipher = Fernet(encryption_key)
class EncryptedRedisSessionInterface(SessionInterface):
def __init__(self, redis=None, prefix='session:',
serialization_format='json', **kwargs):
self.redis = redis or Redis()
self.prefix = prefix
self.serialization_format = serialization_format
def open_session(self, app, request):
sid = request.cookies.get(app.session_cookie_name)
if not sid:
return self.session_class()
val = self.redis.get(self.prefix + sid)
if val is not None:
try:
# Decrypt session data
val = cipher.decrypt(val)
if self.serialization_format == 'json':
data = json.loads(val)
return self.session_class(data, sid, permanent=True)
except:
return self.session_class()
return self.session_class()
def save_session(self, app, session, response):
domain = self.get_cookie_domain(app)
path = self.get_cookie_path(app)
# Remove session if empty
if not session:
if session.modified:
self.redis.delete(self.prefix + session.sid)
response.delete_cookie(
app.session_cookie_name,
domain=domain,
path=path
)
return
# Encrypt session data
if self.serialization_format == 'json':
val = json.dumps(dict(session)).encode('utf-8')
val = cipher.encrypt(val)
# Save to Redis
self.redis.setex(
self.prefix + session.sid,
app.permanent_session_lifetime,
val
)
# Set cookie
response.set_cookie(
app.session_cookie_name,
session.sid,
expires=self.get_expiration_time(app, session),
httponly=True,
domain=domain,
path=path,
secure=app.config['SESSION_COOKIE_SECURE'],
samesite=app.config['SESSION_COOKIE_SAMESITE']
)
# Configure Flask
app.config['SESSION_TYPE'] = 'redis'
app.config['SESSION_REDIS'] = redis.from_url('redis://localhost:6379')
app.session_interface = EncryptedRedisSessionInterface()
```
---
#### **453. How do you handle "insecure deserialization of user data"?**
**Answer:**
```python
import json
from jsonschema import validate, ValidationError
# Schema for user-submitted data
USER_DATA_SCHEMA = {
"type": "object",
"properties": {
"name": {"type": "string", "maxLength": 100},
"preferences": {
"type": "object",
"properties": {
"theme": {"type": "string", "enum": ["light", "dark"]},
"language": {"type": "string", "pattern": "^[a-z]{2}$"}
}
}
},
"required": ["name"]
}
def safe_load_user_data(data):
"""Safely load and validate user data"""
try:
# Parse JSON
parsed = json.loads(data)
# Validate against schema
validate(instance=parsed, schema=USER_DATA_SCHEMA)
# Additional custom validation
if len(parsed['name']) < 2:
raise ValueError("Name too short")
return parsed
except (json.JSONDecodeError, ValidationError, ValueError) as e:
# Log the error (without sensitive data)
app.logger.warning(f"Invalid user data: {str(e)[:100]}")
raise InvalidUserData("Invalid data format")
# Usage
@app.route('/update-profile', methods=['POST'])
def update_profile():
try:
data = safe_load_user_data(request.data)
# Process valid data
except InvalidUserData:
return "Invalid data format", 400
```
---
#### **454. How do you implement "secure multi-factor authentication" with WebAuthn?**
**Answer:**
```python
from flask_webauthn import (
WebAuthn,
RegistrationRejectedException,
AuthenticationRejectedException
)
webauthn = WebAuthn(
app,
rp_name="MyApp",
rp_id="example.com",
url_prefix="/webauthn"
)
@app.route('/webauthn/begin-registration', methods=['POST'])
@login_required
def begin_registration():
# Generate challenge
registration_data, state = webauthn.registration_begin(
user={
'id': current_user.id_bytes,
'name': current_user.email,
'displayName': current_user.name
}
)
# Store state in session
session['webauthn_registration_state'] = state
return jsonify(registration_data)
@app.route('/webauthn/finish-registration', methods=['POST'])
@login_required
def finish_registration():
# Verify registration
try:
webauthn.registration_finish(
session.pop('webauthn_registration_state'),
request.json
)
except RegistrationRejectedException as e:
return jsonify({"status": "error", "message": str(e)}), 400
# Save credential to database
credential = WebAuthnCredential(
user_id=current_user.id,
credential_data=request.json['credentialData']
)
db.session.add(credential)
db.session.commit()
return jsonify({"status": "ok"})
# Authentication flow similar to registration
```
---
#### **455. How do you prevent "insecure direct object references" in API responses?**
**Answer:**
```python
def filter_response_data(data, user):
"""Filter response data based on user permissions"""
if isinstance(data, dict):
# Remove sensitive fields
if 'password' in data:
del data['password']
if 'api_key' in data and not user.is_admin:
del data['api_key']
# Filter nested objects
for key, value in data.items():
data[key] = filter_response_data(value, user)
elif isinstance(data, list):
return [filter_response_data(item, user) for item in data]
return data
@app.after_request
def secure_api_response(response):
# Only process JSON API responses
if 'application/json' in response.content_type:
try:
data = json.loads(response.get_data(as_text=True))
filtered = filter_response_data(data, current_user)
response.set_data(json.dumps(filtered))
except:
pass
return response
```
---
#### **456. How do you implement "secure password rotation" policies?**
**Answer:**
```python
@app.before_request
def check_password_rotation():
if not current_user.is_authenticated:
return
# Check if password needs rotation
days_since_change = (datetime.utcnow() - current_user.password_changed_at).days
if days_since_change > app.config['PASSWORD_ROTATION_DAYS']:
# Redirect to change password
if request.endpoint not in ['change_password', 'logout']:
return redirect(url_for('change_password',
reason='rotation'))
@app.route('/change-password', methods=['GET', 'POST'])
@login_required
def change_password():
if request.method == 'POST':
# Verify current password
if not current_user.verify_password(request.form['current_password']):
flash("Current password is incorrect", "error")
return redirect(url_for('change_password'))
# Validate new password
is_valid, message = validate_password(request.form['new_password'])
if not is_valid:
flash(message, "error")
return redirect(url_for('change_password'))
# Update password
current_user.set_password(request.form['new_password'])
current_user.password_changed_at = datetime.utcnow()
db.session.commit()
# Force re-authentication
logout_user()
flash("Password changed. Please log in again.", "success")
return redirect(url_for('login'))
return render_template('change_password.html')
```
---
#### **457. How do you handle "insecure API documentation" exposure?**
**Answer:**
```python
# Configure Swagger/OpenAPI
from flasgger import Swagger
# Secure documentation with auth
@app.before_request
def require_docs_auth():
if request.path.startswith('/apidocs') and not current_user.is_authenticated:
return redirect(url_for('login', next=request.url))
# Configure Swagger with security
template = {
"swagger": "2.0",
"info": {
"title": "Secure API",
"description": "Authenticated API documentation"
},
"securityDefinitions": {
"APIKeyHeader": {
"type": "apiKey",
"name": "X-API-Key",
"in": "header"
}
},
"security": [
{"APIKeyHeader": []}
]
}
swagger = Swagger(app, template=template)
# Hide sensitive endpoints
@app.route('/internal-api', methods=['POST'])
@swag_from('docs/internal_api.yml', validation=False) # Don't document
def internal_api():
# ...
```
---
#### **458. How do you implement "secure data sanitization" for user-generated content?**
**Answer:**
```python
from bs4 import BeautifulSoup
import bleach
ALLOWED_TAGS = [
'p', 'br', 'strong', 'em', 'u', 'a', 'ul', 'ol', 'li', 'blockquote'
]
ALLOWED_ATTRIBUTES = {
'a': ['href', 'title'],
'img': ['src', 'alt']
}
def sanitize_html(html):
"""Sanitize HTML content using bleach"""
# First pass: BeautifulSoup to fix malformed HTML
soup = BeautifulSoup(html, 'html.parser')
fixed_html = str(soup)
# Second pass: Bleach for sanitization
clean = bleach.clean(
fixed_html,
tags=ALLOWED_TAGS,
attributes=ALLOWED_ATTRIBUTES,
protocols=['http', 'https'],
strip=True
)
# Third pass: Additional XSS protection
clean = re.sub(r'javascript:', '', clean, flags=re.IGNORECASE)
return clean
@app.route('/submit-content', methods=['POST'])
@login_required
def submit_content():
# Sanitize all user-generated content
content = sanitize_html(request.form['content'])
# Additional validation
if len(content) > MAX_CONTENT_LENGTH:
return "Content too long", 400
# Save sanitized content
post = Post(content=content)
db.session.add(post)
db.session.commit()
return redirect(url_for('view_post', id=post.id))
```
---
#### **459. How do you prevent "insecure file upload" path traversal?**
**Answer:**
```python
import os
from werkzeug.utils import secure_filename
def save_upload(file, user_id):
# Validate file type
if not allowed_file(file.filename):
raise ValueError("Invalid file type")
# Generate safe filename
ext = os.path.splitext(file.filename)[1]
safe_name = f"{secrets.token_hex(16)}{ext}"
# Define upload path (outside web root if possible)
upload_dir = os.path.join(app.config['UPLOAD_FOLDER'], str(user_id))
os.makedirs(upload_dir, exist_ok=True)
# Full path with safe name
file_path = os.path.join(upload_dir, safe_name)
# Save file
file.save(file_path)
# Verify file was saved in expected location
if not os.path.realpath(file_path).startswith(os.path.realpath(upload_dir)):
os.remove(file_path)
raise ValueError("Invalid file path")
return file_path
def allowed_file(filename):
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in app.config['ALLOWED_EXTENSIONS']
```
---
#### **460. How do you implement "secure audit trails" for sensitive operations?**
**Answer:**
```python
from functools import wraps
from datetime import datetime
def audit_log(action, category="security"):
"""Decorator for audit logging sensitive operations"""
def decorator(f):
@wraps(f)
def decorated(*args, **kwargs):
# Get user (if authenticated)
user_id = current_user.id if current_user.is_authenticated else None
# Log before operation
audit = AuditLog(
timestamp=datetime.utcnow(),
user_id=user_id,
ip_address=request.remote_addr,
user_agent=request.user_agent.string,
action=action,
category=category,
status="started",
request_data={
"path": request.path,
"method": request.method,
"params": request.args.to_dict()
}
)
db.session.add(audit)
db.session.flush() # Get audit.id
try:
# Execute the function
result = f(*args, **kwargs)
# Update success
audit.status = "completed"
audit.response_data = {"result": "success"}
db.session.commit()
return result
except Exception as e:
# Update failure
audit.status = "failed"
audit.response_data = {
"error": str(e),
"type": e.__class__.__name__
}
db.session.commit()
raise
return decorated
return decorator
# Usage
@app.route('/admin/delete-user/<int:user_id>', methods=['POST'])
@audit_log("user_delete", category="admin")
@require_admin
def delete_user(user_id):
# Delete user logic
return jsonify(status="success")
```
---
#### **461. How do you handle "insecure cryptography storage" in Flask apps?**
**Answer:**
```python
import os
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
def derive_key(master_password, salt):
"""Derive encryption key from master password"""
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=100000,
backend=default_backend()
)
return kdf.derive(master_password.encode())
def get_encryption_key():
"""Get encryption key from secure source"""
# 1. Try environment variable (for production)
if key_env := os.getenv('APP_ENCRYPTION_KEY'):
return key_env
# 2. Try key management service (AWS KMS, HashiCorp Vault)
if key_kms := get_key_from_kms():
return key_kms
# 3. Fallback to file (for development only)
if app.config['ENV'] == 'development':
return load_key_from_file()
raise RuntimeError("Encryption key not configured")
def load_key_from_file():
"""Securely load key from file (development only)"""
key_path = os.path.join(app.instance_path, 'encryption.key')
# Verify file permissions (must be 600)
if os.path.exists(key_path):
mode = os.stat(key_path).st_mode
if mode & 0o777 != 0o600:
raise RuntimeError("Insecure key file permissions")
# Read or generate key
if not os.path.exists(key_path):
key = Fernet.generate_key()
with open(key_path, 'wb') as f:
f.write(key)
os.chmod(key_path, 0o600)
else:
with open(key_path, 'rb') as f:
key = f.read()
return key
```
---
#### **462. How do you implement "secure cross-domain authentication" for microservices?**
**Answer:**
```python
import jwt
from datetime import datetime, timedelta
def create_service_token(audience, duration=timedelta(minutes=5)):
"""Create JWT for service-to-service authentication"""
payload = {
'iss': app.config['SERVICE_NAME'],
'aud': audience,
'iat': datetime.utcnow(),
'exp': datetime.utcnow() + duration,
'jti': secrets.token_urlsafe(16)
}
return jwt.encode(
payload,
app.config['SERVICE_PRIVATE_KEY'],
algorithm='RS256'
)
def verify_service_token(token, issuer):
"""Verify JWT from another service"""
try:
return jwt.decode(
token,
app.config['SERVICE_PUBLIC_KEYS'][issuer],
algorithms=['RS256'],
audience=app.config['SERVICE_NAME']
)
except jwt.InvalidTokenError:
return None
# Usage in service A
@app.route('/api/data')
@require_auth
def get_data():
# Call service B
token = create_service_token(audience='service-b')
response = requests.get(
'https://service-b/api/internal-data',
headers={'Authorization': f'Bearer {token}'}
)
return response.json()
# Usage in service B
@app.route('/api/internal-data')
def internal_data():
token = request.headers.get('Authorization', '').replace('Bearer ', '')
if not verify_service_token(token, issuer='service-a'):
abort(401, "Invalid service token")
# Return data
```
---
#### **463. How do you prevent "insecure API endpoint exposure"?**
**Answer:**
```python
# Define endpoint security levels
API_SECURITY = {
'public': {'roles': []},
'user': {'roles': ['user']},
'admin': {'roles': ['admin']},
'internal': {'roles': [], 'networks': ['10.0.0.0/8', '172.16.0.0/12']}
}
def require_api_access(level):
"""Decorator for API endpoint security"""
def decorator(f):
@wraps(f)
def decorated(*args, **kwargs):
# Check security level
if level == 'internal':
# Verify from internal network
if not is_internal_ip(request.remote_addr):
abort(403, "Internal API restricted to private network")
elif level in ['user', 'admin']:
# Verify authentication
if not current_user.is_authenticated:
abort(401, "Authentication required")
# Verify role
if level == 'admin' and not current_user.is_admin:
abort(403, "Admin access required")
return f(*args, **kwargs)
return decorated
return decorator
# Usage
@app.route('/api/public-data')
@require_api_access('public')
def public_data():
# No auth required
return jsonify(data="public")
@app.route('/api/user-profile')
@require_api_access('user')
def user_profile():
# User auth required
return jsonify(profile=current_user.safe_data)
@app.route('/api/admin-stats')
@require_api_access('admin')
def admin_stats():
# Admin auth required
return jsonify(stats=get_admin_stats())
@app.route('/internal/health')
@require_api_access('internal')
def internal_health():
# Internal network only
return jsonify(status="ok")
```
---
#### **464. How do you implement "secure data anonymization" for backups?**
**Answer:**
```python
def anonymize_database():
"""Anonymize PII in database for backups"""
# Anonymize users
for user in User.query.all():
user.email = f"user{user.id}@example.com"
user.phone = f"+44 7{user.id:09d}"
user.first_name = fake.first_name()
user.last_name = fake.last_name()
user.address = fake.address()
# Anonymize logs
LogEntry.query.update({
LogEntry.ip_address: func.md5(LogEntry.ip_address)
}, synchronize_session=False)
# Clear sensitive fields
PaymentInfo.query.update({
PaymentInfo.card_number: None,
PaymentInfo.cvv: None
}, synchronize_session=False)
db.session.commit()
def create_anonymized_backup():
"""Create anonymized backup"""
# Create temporary anonymized database
temp_db = create_temp_database()
try:
# Copy data to temp DB
copy_database(app.config['SQLALCHEMY_DATABASE_URI'], temp_db)
# Anonymize
with app.app_context():
app.config['SQLALCHEMY_DATABASE_URI'] = temp_db
db.init_app(app)
anonymize_database()
# Create backup
backup_path = f"/backups/anonymized_{datetime.now().isoformat()}.sql"
os.system(f"pg_dump {temp_db} > {backup_path}")
# Encrypt backup
encrypt_file(backup_path, app.config['BACKUP_ENCRYPTION_KEY'])
return backup_path
finally:
# Clean up
drop_temp_database(temp_db)
```
---
#### **465. How do you handle "insecure error handling" that leaks information?**
**Answer:**
```python
@app.errorhandler(500)
def handle_500(e):
"""Generic 500 error handler"""
# Generate unique error ID for tracking
error_id = generate_error_id()
# Log full error details
app.logger.error(
"Server Error: %s\n%s",
error_id,
traceback.format_exc()
)
# Return generic message
return render_template(
'errors/500.html',
error_id=error_id,
support_email=app.config['SUPPORT_EMAIL']
), 500
@app.errorhandler(404)
def handle_404(e):
"""Custom 404 handler"""
# Log suspicious 404s
if is_suspicious_404(request.path):
app.logger.warning("Suspicious 404: %s from %s",
request.path,
request.remote_addr)
return render_template('errors/404.html'), 404
def is_suspicious_404(path):
"""Check if 404 might be probing for vulnerabilities"""
suspicious_patterns = [
r'\/wp-admin',
r'\/phpmyadmin',
r'\/.env',
r'\/backup\.(?:sql|zip)'
]
return any(re.search(pattern, path) for pattern in suspicious_patterns)
```
---
#### **466. How do you implement "secure API rate limiting" with sliding window?**
**Answer:**
```python
import time
import redis
class SlidingWindowRateLimiter:
def __init__(self, redis_client, limit=100, window=60):
self.redis = redis_client
self.limit = limit
self.window = window
def is_allowed(self, key):
now = time.time()
window_start = now - self.window
# Create pipeline for atomic operations
pipe = self.redis.pipeline()
# Remove old timestamps
pipe.zremrangebyscore(key, 0, window_start)
# Add new timestamp
pipe.zadd(key, {now: now})
# Set expiration
pipe.expire(key, self.window)
# Get current count
pipe.zcard(key)
# Execute pipeline
_, _, _, count = pipe.execute()
return count <= self.limit
# Usage
limiter = SlidingWindowRateLimiter(
redis.from_url(app.config['REDIS_URL']),
limit=100,
window=60
)
@app.before_request
def rate_limit():
if request.path.startswith('/api/'):
user_key = f"rate_limit:{get_current_user_id() or request.remote_addr}"
if not limiter.is_allowed(user_key):
return "Rate limit exceeded", 429
```
---
#### **467. How do you prevent "insecure direct object references" in background tasks?**
**Answer:**
```python
@celery.task
def process_user_data(user_id, requesting_user_id):
"""Process user data with security checks"""
# Verify requesting user has permission
requesting_user = User.query.get(requesting_user_id)
if not requesting_user or not requesting_user.is_admin:
raise SecurityError("Unauthorized task request")
# Verify target user exists
user = User.query.get(user_id)
if not user:
raise ValueError("User not found")
# Process data
# ...
# Secure task invocation
@app.route('/process-data/<int:user_id>')
@require_admin
def start_processing(user_id):
# Verify admin has permission to access this user
if not current_user.can_access_user(user_id):
abort(403, "Unauthorized user access")
# Start task with both user IDs
process_user_data.delay(
user_id=user_id,
requesting_user_id=current_user.id
)
return "Processing started", 202
```
---
#### **468. How do you implement "secure data validation" for complex inputs?**
**Answer:**
```python
from jsonschema import validate, FormatChecker
from jsonschema.exceptions import ValidationError
# Custom format checker
format_checker = FormatChecker()
format_checker.checks("date-time")(lambda s: True) # Override default
# Schema with custom validation
USER_SCHEMA = {
"type": "object",
"properties": {
"email": {
"type": "string",
"format": "email",
"maxLength": 254
},
"birthdate": {
"type": "string",
"format": "date",
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}$"
},
"preferences": {
"type": "object",
"properties": {
"theme": {"type": "string", "enum": ["light", "dark", "system"]},
"language": {"type": "string", "pattern": "^[a-z]{2}(-[A-Z]{2})?$"}
}
}
},
"required": ["email"],
"additionalProperties": False
}
def validate_user_data(data):
"""Validate user data against schema"""
try:
validate(
instance=data,
schema=USER_SCHEMA,
format_checker=format_checker
)
# Custom business logic validation
if 'birthdate' in data:
birthdate = datetime.strptime(data['birthdate'], '%Y-%m-%d')
if birthdate > datetime.now() - timedelta(days=18*365):
raise ValueError("User must be at least 18 years old")
return True
except ValidationError as e:
# Map JSON Schema errors to user-friendly messages
if e.validator == 'format':
if e.schema.get('format') == 'email':
raise ValueError("Invalid email format")
elif e.schema.get('format') == 'date':
raise ValueError("Date must be in YYYY-MM-DD format")
elif e.validator == 'pattern':
if 'language' in e.schema_path:
raise ValueError("Invalid language code")
raise ValueError(f"Invalid data: {e.message}")
# Usage
@app.route('/update-profile', methods=['POST'])
def update_profile():
try:
data = request.get_json()
validate_user_data(data)
# Process valid data
except ValueError as e:
return jsonify(error=str(e)), 400
```
---
#### **469. How do you handle "insecure cryptography algorithms" in legacy systems?**
**Answer:**
```python
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
import base64
# Supported algorithms with deprecation status
SUPPORTED_CIPHERS = {
'AES-256-GCM': {
'class': algorithms.AES,
'mode': lambda nonce: modes.GCM(nonce),
'key_size': 32,
'nonce_size': 12,
'deprecated': False
},
'AES-128-CBC': {
'class': algorithms.AES,
'mode': lambda iv: modes.CBC(iv),
'key_size': 16,
'nonce_size': 16,
'deprecated': True
}
}
def encrypt_data(data, algorithm='AES-256-GCM'):
"""Encrypt data with specified algorithm"""
if algorithm not in SUPPORTED_CIPHERS:
raise ValueError("Unsupported encryption algorithm")
config = SUPPORTED_CIPHERS[algorithm]
key = get_encryption_key(config['key_size'])
nonce = os.urandom(config['nonce_size'])
cipher = Cipher(
config['class'](key),
config['mode'](nonce),
backend=default_backend()
)
encryptor = cipher.encryptor()
# Add padding for CBC
if algorithm == 'AES-128-CBC':
padder = padding.PKCS7(128).padder()
data = padder.update(data) + padder.finalize()
ciphertext = encryptor.update(data) + encryptor.finalize()
# For GCM, include tag
if algorithm.startswith('AES') and 'GCM' in algorithm:
ciphertext += encryptor.tag
# Return format: algorithm|nonce|ciphertext
return f"{algorithm}|{base64.b64encode(nonce).decode()}|{base64.b64encode(ciphertext).decode()}"
def decrypt_data(encrypted_data):
"""Decrypt data with automatic algorithm detection"""
algorithm, nonce_b64, ciphertext_b64 = encrypted_data.split('|')
if algorithm not in SUPPORTED_CIPHERS:
raise ValueError("Unsupported encryption algorithm")
config = SUPPORTED_CIPHERS[algorithm]
nonce = base64.b64decode(nonce_b64)
ciphertext = base64.b64decode(ciphertext_b64)
key = get_encryption_key(config['key_size'])
# Separate tag for GCM
if algorithm.endswith('GCM'):
ciphertext, tag = ciphertext[:-16], ciphertext[-16:]
cipher = Cipher(
config['class'](key),
config['mode'](nonce),
backend=default_backend()
)
decryptor = cipher.decryptor()
plaintext = decryptor.update(ciphertext) + decryptor.finalize_with_tag(tag)
else:
cipher = Cipher(
config['class'](key),
config['mode'](nonce),
backend=default_backend()
)
decryptor = cipher.decryptor()
plaintext = decryptor.update(ciphertext) + decryptor.finalize()
# Remove padding for CBC
if algorithm == 'AES-128-CBC':
unpadder = padding.PKCS7(128).unpadder()
plaintext = unpadder.update(plaintext) + unpadder.finalize()
# Log deprecated algorithm usage for migration
if config['deprecated']:
app.logger.warning(f"Deprecated encryption algorithm used: {algorithm}")
return plaintext
```
---
#### **470. How do you implement "secure data redaction" for logs and monitoring?**
**Answer:**
```python
import re
from logging import Filter
# Patterns to redact
REDACT_PATTERNS = [
(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', '***@***.***'),
(r'\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b', '****-****-****-****'),
(r'\b[A-Za-z0-9+/=]{40,}\b', '***SECRET***'),
(r'Bearer [A-Za-z0-9\-_\.]+', 'Bearer ***')
]
class RedactingFilter(Filter):
"""Log filter that redacts sensitive information"""
def filter(self, record):
if isinstance(record.msg, str):
for pattern, replacement in REDACT_PATTERNS:
record.msg = re.sub(pattern, replacement, record.msg)
# Process log arguments
if record.args:
args = list(record.args)
for i, arg in enumerate(args):
if isinstance(arg, str):
for pattern, replacement in REDACT_PATTERNS:
args[i] = re.sub(pattern, replacement, arg)
record.args = tuple(args)
return True
# Configure logging
def configure_logging():
logger = logging.getLogger()
logger.addFilter(RedactingFilter())
# For JSON logging
if app.config['JSON_LOGGING']:
formatter = JsonFormatter(
json_encoder=CustomJsonEncoder,
# Redact fields in JSON
extra={
'email': '***@***.***',
'ssn': '***-**-****',
'api_key': '***'
}
)
# Usage in routes
@app.route('/sensitive-route')
def sensitive_route():
app.logger.info(
"Processing sensitive data for user %s with token %s",
current_user.email,
request.headers.get('X-API-Key')
)
# The logged message will have redacted values
```
---
#### **471. How do you prevent "insecure API versioning" that exposes deprecated endpoints?**
**Answer:**
```python
from functools import wraps
# Track deprecated API versions
DEPRECATED_VERSIONS = {
'v1': {
'sunset': datetime(2023, 12, 31),
'replacement': 'v2'
},
'v2': {
'sunset': datetime(2024, 6, 30),
'replacement': 'v3'
}
}
def api_version(version):
"""Decorator for API version control"""
def decorator(f):
@wraps(f)
def decorated(*args, **kwargs):
# Check if version is deprecated
if version in DEPRECATED_VERSIONS:
deprecation = DEPRECATED_VERSIONS[version]
response = make_response(f(*args, **kwargs))
# Add deprecation headers
response.headers['Deprecation'] = 'true'
response.headers['Sunset'] = deprecation['sunset'].strftime('%a, %d %b %Y %H:%M:%S GMT')
response.headers['Link'] = (
f'<{url_for("api_docs", version=deprecation["replacement"])}>; rel="successor-version"'
)
# Log deprecation usage
app.logger.warning(
"Deprecated API version used: %s from %s",
version,
request.remote_addr
)
return response
return f(*args, **kwargs)
return decorated
return decorator
# Automatic version routing
@app.before_request
def route_api_version():
# Get version from header or URL
version = request.headers.get('Accept', '').split('/')[-1] or request.view_args.get('version', 'v1')
# Check if version exists
if version not in app.view_functions:
return jsonify(error="API version not supported"), 404
# Redirect deprecated versions
if version in DEPRECATED_VERSIONS:
return redirect(
request.url.replace(version, DEPRECATED_VERSIONS[version]['replacement']),
code=301
)
# Usage
@app.route('/api/v1/users')
@api_version('v1')
def get_users_v1():
# Legacy implementation
return jsonify(users=convert_to_v1_format(User.query.all()))
@app.route('/api/v2/users')
@api_version('v2')
def get_users_v2():
# Current implementation
return jsonify(data=UserSchema(many=True).dump(User.query.all()))
```
---
#### **472. How do you implement "secure data sanitization" for CSV exports?**
**Answer:**
```python
import csv
import io
from flask import Response
def sanitize_csv_value(value):
"""Sanitize value for CSV to prevent formula injection"""
# Check for formula injection attempts
if isinstance(value, str) and value.startswith(('=', '+', '-', '@')):
return "'" + value # Prepend apostrophe to treat as text
# Escape double quotes
if isinstance(value, str):
return value.replace('"', '""')
return value
def generate_sanitized_csv(query, field_names):
"""Generate sanitized CSV from query results"""
output = io.StringIO()
writer = csv.writer(output, quoting=csv.QUOTE_MINIMAL)
# Write header
writer.writerow(field_names)
# Write data
for obj in query:
row = [
sanitize_csv_value(getattr(obj, field))
for field in field_names
]
writer.writerow(row)
return output.getvalue()
@app.route('/export/users.csv')
@login_required
def export_users():
# Verify permissions
if not current_user.can_export_users:
abort(403)
# Get data
users = User.query.all()
# Generate sanitized CSV
csv_data = generate_sanitized_csv(
users,
['id', 'email', 'name', 'created_at']
)
# Return as attachment
return Response(
csv_data,
mimetype='text/csv',
headers={
'Content-Disposition': 'attachment; filename=users_export.csv',
'Content-Security-Policy': "default-src 'none'"
}
)
```
---
#### **473. How do you handle "insecure deserialization of session data"?**
**Answer:**
```python
from flask import Flask, session
from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired
import pickle
class SecureSessionInterface:
"""Secure session interface with encrypted, signed session data"""
def __init__(self, secret_key, salt='secure-session',
serialization_format='pickle', max_age=86400):
self.serializer = URLSafeTimedSerializer(
secret_key,
salt=salt,
serializer=pickle if serialization_format == 'pickle' else None
)
self.max_age = max_age
def open_session(self, app, request):
session_cookie = request.cookies.get(app.session_cookie_name)
if not session_cookie:
return None
try:
# Verify and load session data
data = self.serializer.loads(
session_cookie,
max_age=self.max_age
)
return data
except (BadSignature, SignatureExpired):
return None
def save_session(self, app, session, response):
domain = self.get_cookie_domain(app)
path = self.get_cookie_path(app)
# Remove session if empty
if not session:
if session.modified:
response.delete_cookie(
app.session_cookie_name,
domain=domain,
path=path
)
return
# Serialize and sign session data
session_cookie = self.serializer.dumps(dict(session))
# Set cookie
response.set_cookie(
app.session_cookie_name,
session_cookie,
expires=self.get_expiration_time(app, session),
httponly=True,
domain=domain,
path=path,
secure=app.config['SESSION_COOKIE_SECURE'],
samesite=app.config['SESSION_COOKIE_SAMESITE']
)
# Configure Flask
app = Flask(__name__)
app.session_interface = SecureSessionInterface(
secret_key=app.config['SECRET_KEY'],
serialization_format='json' # Critical: Use JSON instead of pickle
)
app.config.update(
SESSION_COOKIE_SECURE=True,
SESSION_COOKIE_HTTPONLY=True,
SESSION_COOKIE_SAMESITE='Lax',
SESSION_COOKIE_SAMESITE='None' if app.config['USE_MOBILE'] else 'Lax'
)
```
---
#### **474. How do you implement "secure multi-tenancy" with row-level security?**
**Answer:**
```python
from sqlalchemy import event
from flask import g
# Track current tenant
@app.before_request
def set_tenant():
tenant_id = request.headers.get('X-Tenant-ID')
if not tenant_id or not is_valid_tenant(tenant_id):
abort(401, "Invalid tenant ID")
g.tenant_id = tenant_id
# SQLAlchemy event listener for row-level security
@event.listens_for(db.session, 'before_flush')
def before_flush(session, flush_context, instances):
# Apply tenant filter to all queries
for query in session.query_history:
if hasattr(query.column_descriptions[0]['type'], 'tenant_id'):
query.filter(query.column_descriptions[0]['type'].tenant_id == g.tenant_id)
# Base model for multi-tenancy
class TenantModel(db.Model):
__abstract__ = True
tenant_id = db.Column(db.Integer, nullable=False)
@classmethod
def query(cls):
return super().query.filter_by(tenant_id=g.tenant_id)
# Usage in models
class Document(TenantModel):
id = db.Column(db.Integer, primary_key=True)
content = db.Column(db.Text)
# tenant_id inherited from TenantModel
# Alternative: PostgreSQL Row Level Security
"""
CREATE POLICY tenant_isolation ON documents
USING (tenant_id = current_setting('app.current_tenant')::integer);
-- Set tenant in session
SET app.current_tenant = 123;
"""
```
---
#### **475. How do you prevent "insecure direct object references" in file storage?**
**Answer:**
```python
import uuid
import os
from flask import send_file, abort
@app.route('/files/<file_id>')
@login_required
def get_file(file_id):
# Verify file exists in database
file_record = File.query.get(file_id)
if not file_record:
abort(404)
# Verify user has access
if not current_user.can_access_file(file_record):
abort(403)
# Generate secure path
secure_path = os.path.join(
app.config['STORAGE_ROOT'],
file_record.storage_path
)
# Verify path is within storage root
if not os.path.realpath(secure_path).startswith(os.path.realpath(app.config['STORAGE_ROOT'])):
app.logger.critical(f"Path traversal attempt: {secure_path}")
abort(403)
# Send file with proper headers
return send_file(
secure_path,
as_attachment=True,
download_name=file_record.original_name,
mimetype=file_record.mime_type,
conditional=True
)
# Secure file upload
@app.route('/upload', methods=['POST'])
@login_required
def upload_file():
if 'file' not in request.files:
return "No file provided", 400
file = request.files['file']
if file.filename == '':
return "Empty filename", 400
# Generate secure storage path
storage_id = str(uuid.uuid4())
storage_path = os.path.join(
current_user.id,
storage_id[:2],
storage_id
)
# Save to secure location
full_path = os.path.join(app.config['STORAGE_ROOT'], storage_path)
os.makedirs(os.path.dirname(full_path), exist_ok=True)
file.save(full_path)
# Create database record
file_record = File(
user_id=current_user.id,
original_name=file.filename,
storage_path=storage_path,
mime_type=file.mimetype
)
db.session.add(file_record)
db.session.commit()
return jsonify(file_id=file_record.id), 201
```
---
#### **476. How do you implement "secure API key rotation" with zero downtime?**
**Answer:**
```python
class APIKey(db.Model):
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
key_hash = db.Column(db.String(128))
created_at = db.Column(db.DateTime, default=datetime.utcnow)
rotated_at = db.Column(db.DateTime)
status = db.Column(db.String(20), default='active') # active, rotating, rotated, revoked
@app.route('/api/keys/rotate', methods=['POST'])
@login_required
def rotate_api_keys():
# Create new key
new_key = create_api_key(current_user)
# Mark old keys as rotating
APIKey.query.filter(
APIKey.user_id == current_user.id,
APIKey.status == 'active'
).update({'status': 'rotating'})
db.session.commit()
# Return both keys for transition
return jsonify(
new_key=new_key,
rotation_period=app.config['API_KEY_ROTATION_PERIOD']
), 200
@app.before_request
def authenticate_api_key():
key = request.headers.get('X-API-KEY')
if not key:
return
# Try active keys first
api_key = APIKey.query.filter(
APIKey.key_hash == key,
APIKey.status == 'active'
).first()
# If not found, try rotating keys (during transition)
if not api_key:
api_key = APIKey.query.filter(
APIKey.key_hash == key,
APIKey.status == 'rotating',
APIKey.rotated_at > datetime.utcnow() - app.config['API_KEY_ROTATION_PERIOD']
).first()
if not api_key or not check_password_hash(api_key.key_hash, key):
abort(401)
# Update usage
api_key.last_used = datetime.utcnow()
db.session.commit()
@celery.task
def complete_key_rotation():
"""Complete rotation after transition period"""
cutoff = datetime.utcnow() - app.config['API_KEY_ROTATION_PERIOD']
APIKey.query.filter(
APIKey.status == 'rotating',
APIKey.rotated_at < cutoff
).update({'status': 'rotated'})
db.session.commit()
# Revoke unused old keys
APIKey.query.filter(
APIKey.status == 'rotated',
APIKey.last_used < cutoff
).update({'status': 'revoked'})
db.session.commit()
```
---
#### **477. How do you handle "insecure third-party API integrations"?**
**Answer:**
```python
import requests
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from requests.exceptions import Timeout, ConnectionError
# Configure retry strategy
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10),
retry=retry_if_exception_type((Timeout, ConnectionError))
)
def safe_api_call(url, method='get', **kwargs):
"""Make safe API call with retries and timeouts"""
# Validate URL
if not is_safe_url(url):
raise ValueError("Invalid API URL")
# Add security headers
headers = kwargs.get('headers', {})
headers.update({
'User-Agent': 'MyApp/1.0',
'X-Request-ID': str(uuid.uuid4())
})
# Make request with timeout
response = requests.request(
method,
url,
timeout=(3.05, 10), # Connect timeout, read timeout
headers=headers,
**kwargs
)
# Check for suspicious responses
if response.status_code == 418 or 'X-Debug' in response.headers:
raise SecurityError("Suspicious API response")
# Verify response content type
if 'application/json' in response.headers.get('Content-Type', ''):
try:
return response.json()
except ValueError:
raise SecurityError("Invalid JSON response")
return response.text
# Usage
@app.route('/external-data')
def get_external_data():
try:
data = safe_api_call(
'https://api.trusted-partner.com/data',
params={'api_key': current_user.external_api_key},
verify=True # Verify SSL
)
return jsonify(data)
except (SecurityError, ValueError) as e:
app.logger.error("API integration failed: %s", str(e))
return "Service temporarily unavailable", 503
```
---
#### **478. How do you implement "secure data validation" for file uploads?**
**Answer:**
```python
import magic
import imghdr
from werkzeug.utils import secure_filename
ALLOWED_MIME_TYPES = {
'image/jpeg': ['jpg', 'jpeg'],
'image/png': ['png'],
'application/pdf': ['pdf']
}
def validate_upload(file):
"""Validate file upload with multiple checks"""
# Check filename
if not file.filename:
return False, "Empty filename"
# Check extension
ext = os.path.splitext(file.filename)[1][1:].lower()
if ext not in {ext for extensions in ALLOWED_MIME_TYPES.values() for ext in extensions}:
return False, f"Invalid file type: .{ext}"
# Check MIME type from content
file.seek(0)
mime = magic.from_buffer(file.read(2048), mime=True)
file.seek(0)
if mime not in ALLOWED_MIME_TYPES:
return False, f"Invalid MIME type: {mime}"
# Additional validation by type
if mime.startswith('image/'):
# Check image dimensions
img = Image.open(file)
if img.width > 5000 or img.height > 5000:
return False, "Image too large"
file.seek(0)
elif mime == 'application/pdf':
# Check PDF metadata for scripts
if contains_javascript(file):
return False, "PDF contains JavaScript"
file.seek(0)
return True, ""
def contains_javascript(file):
"""Check if PDF contains JavaScript"""
content = file.read()
file.seek(0)
return b'/JS' in content or b'/JavaScript' in content
# Usage
@app.route('/upload', methods=['POST'])
def upload():
if 'file' not in request.files:
return "No file provided", 400
file = request.files['file']
is_valid, message = validate_upload(file)
if not is_valid:
return message, 400
# Save file
filename = f"{secrets.token_hex(16)}.{ext}"
file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
return "File uploaded", 201
```
---
#### **479. How do you prevent "insecure direct object references" in caching?**
**Answer:**
```python
from functools import wraps
def cache_per_user(ttl=300):
"""Cache decorator that includes user ID in cache key"""
def decorator(f):
@wraps(f)
def decorated(*args, **kwargs):
# Generate cache key with user ID
user_id = current_user.id if current_user.is_authenticated else 'anon'
cache_key = f"{user_id}:{request.path}:{json.dumps(kwargs)}"
# Try cache
cached = cache.get(cache_key)
if cached is not None:
return cached
# Execute function
result = f(*args, **kwargs)
# Cache result
cache.set(cache_key, result, ttl=ttl)
return result
return decorated
return decorator
def cache_per_tenant(ttl=300):
"""Cache decorator that includes tenant ID in cache key"""
def decorator(f):
@wraps(f)
def decorated(*args, **kwargs):
# Get tenant ID from request
tenant_id = request.headers.get('X-Tenant-ID', 'default')
cache_key = f"{tenant_id}:{request.path}:{json.dumps(kwargs)}"
# Cache logic similar to above
# ...
return decorated
return decorator
# Usage
@app.route('/user-data')
@login_required
@cache_per_user(ttl=60)
def user_data():
# Expensive operation
return jsonify(data=get_user_data(current_user.id))
@app.route('/tenant-data')
@cache_per_tenant(ttl=300)
def tenant_data():
# Tenant-specific data
return jsonify(data=get_tenant_data(g.tenant_id))
```
---
#### **480. How do you implement "secure data sanitization" for search queries?**
**Answer:**
```python
import re
from sqlalchemy import or_
def sanitize_search_query(query):
"""Sanitize search query to prevent injection"""
# Remove potentially dangerous characters
query = re.sub(r'[;\'"\\|&$`!]', '', query)
# Limit length
query = query[:100]
# Escape special characters for LIKE
query = re.sub(r'([%_])', r'\\\1', query)
return query
def search_users(term):
"""Search users with sanitized query"""
sanitized = sanitize_search_query(term)
# Use parameterized query
return User.query.filter(
or_(
User.name.ilike(f'%{sanitized}%'),
User.email.ilike(f'%{sanitized}%')
)
).limit(50).all()
@app.route('/search')
def search():
term = request.args.get('q', '')
if not term:
return jsonify(results=[])
# Rate limiting
if too_many_searches(request.remote_addr):
return jsonify(error="Too many searches"), 429
# Log sanitized query
app.logger.info("Search query: %s", sanitize_search_query(term))
# Execute search
results = search_users(term)
# Return sanitized results
return jsonify(results=[
{
'id': user.id,
'name': escape(user.name),
'email': escape(user.email)
} for user in results
])
```
---
#### **481. How do you handle "insecure cryptography key management"?**
**Answer:**
```python
import os
import base64
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend
class KeyManager:
"""Secure key management with rotation support"""
def __init__(self, key_store):
self.key_store = key_store
self.current_key = None
self.previous_keys = []
# Load keys at startup
self.load_keys()
def load_keys(self):
"""Load encryption keys from secure storage"""
# Get current key
current_key_data = self.key_store.get('current_key')
if current_key_data:
self.current_key = self.deserialize_key(current_key_data)
# Get previous keys
previous_keys_data = self.key_store.get('previous_keys', [])
self.previous_keys = [
self.deserialize_key(data) for data in previous_keys_data
]
def rotate_keys(self):
"""Rotate encryption keys"""
# Store current key as previous
if self.current_key:
self.previous_keys.append(self.current_key)
# Keep only last 5 keys
self.previous_keys = self.previous_keys[-5:]
# Generate new key
self.current_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
# Save keys
self.key_store.set('current_key', self.serialize_key(self.current_key))
self.key_store.set('previous_keys', [
self.serialize_key(key) for key in self.previous_keys
])
def serialize_key(self, key):
"""Serialize key to secure format"""
return key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.BestAvailableEncryption(
os.urandom(32) # Generate new encryption key
)
)
def deserialize_key(self, data):
"""Deserialize key from secure storage"""
return serialization.load_pem_private_key(
data,
password=None, # Decryption handled by key store
backend=default_backend()
)
def get_encryption_key(self):
"""Get current encryption key"""
if not self.current_key:
self.rotate_keys()
return self.current_key
def decrypt_data(self, encrypted_data, key_id=None):
"""Decrypt data using appropriate key"""
# Determine which key to use
if key_id and key_id == 'current':
key = self.current_key
elif key_id and key_id.startswith('previous_'):
idx = int(key_id.split('_')[1])
key = self.previous_keys[idx]
else:
# Try current key first
try:
return self._decrypt_with_key(encrypted_data, self.current_key)
except:
# Try previous keys
for key in self.previous_keys:
try:
return self._decrypt_with_key(encrypted_data, key)
except:
continue
raise ValueError("Could not decrypt data")
return self._decrypt_with_key(encrypted_data, key)
def _decrypt_with_key(self, encrypted_data, key):
"""Decrypt data with specific key"""
# Implementation depends on encryption method
# ...
# Usage
key_manager = KeyManager(VaultKeyStore())
encrypted = encrypt_data(data, key_manager.get_encryption_key())
decrypted = key_manager.decrypt_data(encrypted)
```
---
#### **482. How do you implement "secure data validation" for webhooks?**
**Answer:**
```python
import hmac
import hashlib
from functools import wraps
def verify_webhook(f):
"""Decorator to verify webhook signatures"""
@wraps(f)
def decorated(*args, **kwargs):
# Get signature from header
signature = request.headers.get('X-Hook-Signature')
if not signature:
abort(401, "Missing signature")
# Get secret for this provider
provider = request.headers.get('X-Provider')
secret = get_webhook_secret(provider)
if not secret:
abort(401, "Invalid provider")
# Calculate expected signature
body = request.get_data()
expected = hmac.new(
secret.encode(),
body,
hashlib.sha256
).hexdigest()
# Constant-time comparison
if not hmac.compare_digest(signature, expected):
abort(401, "Invalid signature")
# Verify timestamp (prevent replay attacks)
timestamp = request.headers.get('X-Timestamp')
if not timestamp or abs(time.time() - int(timestamp)) > 300:
abort(401, "Expired webhook")
return f(*args, **kwargs)
return decorated
# Usage
@app.route('/webhooks/github', methods=['POST'])
@verify_webhook
def github_webhook():
# Process GitHub webhook
event = request.headers.get('X-GitHub-Event')
payload = request.json
# Verify event type
if event not in ALLOWED_GITHUB_EVENTS:
return "Event not supported", 400
# Process event
process_github_event(event, payload)
return "OK", 200
def get_webhook_secret(provider):
"""Get webhook secret for provider"""
secrets = {
'github': app.config['GITHUB_WEBHOOK_SECRET'],
'stripe': app.config['STRIPE_WEBHOOK_SECRET']
}
return secrets.get(provider)
```
---