commit 647928a0a1bb9d0ef96ea649f80d21c8dd7a0441 Author: Rodin Date: Sun May 10 22:45:03 2026 -0700 Initial commit: 9 security patterns for code review Fundamentals: secure-defaults, input-validation, credential-handling, audit-logging Identity: authentication, authorization Attack Prevention: injection-prevention, dos-prevention, prompt-injection diff --git a/README.md b/README.md new file mode 100644 index 0000000..888fb5c --- /dev/null +++ b/README.md @@ -0,0 +1,44 @@ +# Security Patterns + +Scannable patterns for security code review. Each file has: +- **Rule** — what to do +- **Correct Pattern** — code that works (Python) +- **Incorrect Pattern** — common mistakes +- **Edge Cases** — gotchas + +## Patterns + +### Fundamentals + +| File | Topic | +|------|-------| +| [secure-defaults.md](secure-defaults.md) | Fail closed, deny by default, defense in depth | +| [input-validation.md](input-validation.md) | Allowlist > blocklist, validate at boundaries | +| [credential-handling.md](credential-handling.md) | No hardcoded secrets, environment/secret manager | +| [audit-logging.md](audit-logging.md) | What to log, what not to log | + +### Identity + +| File | Topic | +|------|-------| +| [authentication.md](authentication.md) | Passwords, tokens, MFA, brute force protection | +| [authorization.md](authorization.md) | Permission checks, IDOR prevention, privilege escalation | + +### Attack Prevention + +| File | Topic | +|------|-------| +| [injection-prevention.md](injection-prevention.md) | SQL, command, template, path traversal | +| [dos-prevention.md](dos-prevention.md) | Rate limiting, resource bounds, algorithmic complexity | +| [prompt-injection.md](prompt-injection.md) | LLM security, data/instruction separation | + +## Sources + +- [OWASP Cheat Sheet Series](https://cheatsheetseries.owasp.org/) +- [OWASP Top 10](https://owasp.org/Top10/) +- [OWASP LLM Top 10](https://owasp.org/www-project-top-10-for-large-language-model-applications/) +- [CWE (Common Weakness Enumeration)](https://cwe.mitre.org/) + +## Usage + +Reference these patterns when building or reviewing systems. Code examples are in Python for universal model comprehension; concepts apply to any language. diff --git a/audit-logging.md b/audit-logging.md new file mode 100644 index 0000000..b4b4988 --- /dev/null +++ b/audit-logging.md @@ -0,0 +1,134 @@ +# Audit Logging + +## Rule + +Log security-relevant events. Never log secrets. + +**Source:** [OWASP Logging Cheat Sheet](https://cheatsheetseries.owasp.org/cheatsheets/Logging_Cheat_Sheet.html) + +## What to Log + +| Event | Log Level | Required Fields | +|-------|-----------|-----------------| +| Authentication success/failure | INFO/WARN | user_id, ip, timestamp, method | +| Authorization failure | WARN | user_id, resource, action, ip | +| Input validation failure | WARN | endpoint, validation_error, ip | +| Privilege escalation | WARN | user_id, old_role, new_role, by_whom | +| Data access (sensitive) | INFO | user_id, resource_type, resource_id | +| Configuration change | INFO | user_id, setting, old_value, new_value | +| Security control disabled | ALERT | user_id, control, reason | + +## Correct Pattern + +```python +import logging +import hashlib +from datetime import datetime + +# Structured logging +security_logger = logging.getLogger("security") + +def log_auth_attempt(user_id: str, success: bool, ip: str, method: str): + security_logger.info( + "authentication_attempt", + extra={ + "event_type": "auth", + "user_id": user_id, + "success": success, + "ip_address": ip, + "auth_method": method, + "timestamp": datetime.utcnow().isoformat(), + } + ) + +def log_access(user_id: str, resource: str, action: str, allowed: bool): + level = logging.INFO if allowed else logging.WARNING + security_logger.log( + level, + "access_attempt", + extra={ + "event_type": "access", + "user_id": user_id, + "resource": resource, + "action": action, + "allowed": allowed, + "timestamp": datetime.utcnow().isoformat(), + } + ) + +# Mask sensitive data in logs +def mask_sensitive(data: dict) -> dict: + """Mask sensitive fields for logging.""" + sensitive_keys = {"password", "token", "secret", "api_key", "ssn", "credit_card"} + masked = {} + for key, value in data.items(): + if any(s in key.lower() for s in sensitive_keys): + masked[key] = "[REDACTED]" + elif isinstance(value, dict): + masked[key] = mask_sensitive(value) + else: + masked[key] = value + return masked +``` + +## Incorrect Pattern + +```python +# Wrong: logging secrets +logging.info(f"User login with password: {password}") +logging.debug(f"API call with key: {api_key}") + +# Wrong: no context +logging.warning("Invalid input") # Which input? Where? Who? + +# Wrong: user-controlled data in log format string +logging.info(user_input) # Log injection possible + +# Wrong: logging PII without purpose +logging.info(f"User {name} with SSN {ssn} logged in") +``` + +## Log Injection Prevention + +```python +# Wrong: allows log injection +def log_user_action(action: str): + logging.info(f"User action: {action}") + # Input: "action\n2024-01-01 INFO: Admin granted" + +# Correct: escape or use structured logging +def log_user_action(action: str): + # Option 1: escape newlines + safe_action = action.replace("\n", "\\n").replace("\r", "\\r") + logging.info(f"User action: {safe_action}") + + # Option 2: structured logging (preferred) + logging.info("user_action", extra={"action": action}) +``` + +## Retention and Protection + +```python +# Log retention policy +RETENTION_DAYS = { + "security": 365, # Keep security logs 1 year + "access": 90, # Access logs 90 days + "debug": 7, # Debug logs 7 days +} + +# Tamper detection +def log_with_hash(event: dict): + """Append hash for integrity verification.""" + event["_hash"] = hashlib.sha256( + json.dumps(event, sort_keys=True).encode() + ).hexdigest() + security_logger.info(event) +``` + +## Edge Cases + +- Logs themselves become attack surface (log4shell) +- PII in logs may violate GDPR/CCPA +- High-volume logging can be used for DOS +- Stack traces may leak sensitive info +- Correlation IDs needed for distributed tracing diff --git a/authentication.md b/authentication.md new file mode 100644 index 0000000..bb916d3 --- /dev/null +++ b/authentication.md @@ -0,0 +1,159 @@ +# Authentication + +## Rule + +Verify identity before granting access. Use proven libraries, not DIY crypto. + +**Source:** [OWASP Authentication Cheat Sheet](https://cheatsheetseries.owasp.org/cheatsheets/Authentication_Cheat_Sheet.html) + +## Password Handling + +### Correct Pattern + +```python +import bcrypt +import secrets + +def hash_password(password: str) -> bytes: + """Hash password using bcrypt with automatic salt.""" + return bcrypt.hashpw(password.encode(), bcrypt.gensalt(rounds=12)) + +def verify_password(password: str, hashed: bytes) -> bool: + """Verify password against hash. Constant-time comparison.""" + return bcrypt.checkpw(password.encode(), hashed) + +# Password requirements +MIN_PASSWORD_LENGTH = 12 +COMMON_PASSWORDS = load_common_passwords() # Top 10k list + +def validate_password(password: str) -> list[str]: + """Return list of validation errors.""" + errors = [] + if len(password) < MIN_PASSWORD_LENGTH: + errors.append(f"Password must be at least {MIN_PASSWORD_LENGTH} characters") + if password.lower() in COMMON_PASSWORDS: + errors.append("Password is too common") + return errors +``` + +### Incorrect Pattern + +```python +# Wrong: plain text storage +user.password = password + +# Wrong: weak hashing +user.password = hashlib.md5(password.encode()).hexdigest() + +# Wrong: SHA without salt +user.password = hashlib.sha256(password.encode()).hexdigest() + +# Wrong: reversible encryption +user.password = encrypt(password, key) + +# Wrong: timing attack vulnerable +if user.password == submitted_password: + grant_access() +``` + +## Token Management + +### Correct Pattern + +```python +import secrets +from datetime import datetime, timedelta + +def generate_token() -> str: + """Generate cryptographically secure token.""" + return secrets.token_urlsafe(32) + +def generate_session(user_id: str) -> dict: + """Create session with expiration.""" + return { + "token": generate_token(), + "user_id": user_id, + "created_at": datetime.utcnow(), + "expires_at": datetime.utcnow() + timedelta(hours=24), + } + +def validate_session(session: dict) -> bool: + """Check session validity.""" + if datetime.utcnow() > session["expires_at"]: + return False + return True +``` + +### Incorrect Pattern + +```python +# Wrong: predictable tokens +token = f"session_{user_id}_{int(time.time())}" + +# Wrong: no expiration +session = {"token": token, "user_id": user_id} + +# Wrong: client-controlled expiration +if request.cookies.get("expires") > now: # User can modify! + grant_access() +``` + +## Multi-Factor Authentication + +```python +import pyotp + +def setup_totp(user_id: str) -> str: + """Generate TOTP secret for user.""" + secret = pyotp.random_base32() + store_totp_secret(user_id, secret) + return secret + +def verify_totp(user_id: str, code: str) -> bool: + """Verify TOTP code with time window.""" + secret = get_totp_secret(user_id) + totp = pyotp.TOTP(secret) + return totp.verify(code, valid_window=1) # ±30 seconds +``` + +## Brute Force Protection + +```python +from collections import defaultdict +import time + +class LoginRateLimiter: + def __init__(self): + self.attempts = defaultdict(list) + self.lockouts = {} + + def record_attempt(self, identifier: str, success: bool): + now = time.time() + + if not success: + self.attempts[identifier].append(now) + # Clean old attempts + self.attempts[identifier] = [ + t for t in self.attempts[identifier] + if now - t < 3600 # 1 hour window + ] + + # Lockout after 5 failures + if len(self.attempts[identifier]) >= 5: + self.lockouts[identifier] = now + 900 # 15 min lockout + else: + self.attempts[identifier] = [] + self.lockouts.pop(identifier, None) + + def is_locked(self, identifier: str) -> bool: + lockout_until = self.lockouts.get(identifier, 0) + return time.time() < lockout_until +``` + +## Edge Cases + +- Timing attacks on username enumeration +- Account lockout as DOS vector +- Session fixation attacks +- Token leakage in logs/URLs +- Password reset token reuse diff --git a/authorization.md b/authorization.md new file mode 100644 index 0000000..9b6d7e8 --- /dev/null +++ b/authorization.md @@ -0,0 +1,134 @@ +# Authorization + +## Rule + +Verify permissions on every request. Default deny. Check at the resource, not just the route. + +**Source:** [OWASP Authorization Cheat Sheet](https://cheatsheetseries.owasp.org/cheatsheets/Authorization_Cheat_Sheet.html) + +## Correct Pattern + +```python +from enum import Enum +from functools import wraps + +class Permission(Enum): + READ = "read" + WRITE = "write" + DELETE = "delete" + ADMIN = "admin" + +def check_permission(user_id: str, resource_type: str, + resource_id: str, permission: Permission) -> bool: + """Check if user has permission on specific resource.""" + # Get user's roles + roles = get_user_roles(user_id) + + # Check resource-level permissions + resource_perms = get_resource_permissions(resource_type, resource_id) + + for role in roles: + if permission in resource_perms.get(role, []): + return True + + # Check ownership + if get_resource_owner(resource_type, resource_id) == user_id: + if permission in [Permission.READ, Permission.WRITE]: + return True + + return False # Default deny + +def require_permission(resource_type: str, permission: Permission): + """Decorator to enforce authorization.""" + def decorator(func): + @wraps(func) + def wrapper(*args, **kwargs): + user_id = get_current_user_id() + resource_id = kwargs.get("resource_id") or args[0] + + if not check_permission(user_id, resource_type, resource_id, permission): + log_access(user_id, f"{resource_type}/{resource_id}", + permission.value, allowed=False) + raise PermissionDenied() + + log_access(user_id, f"{resource_type}/{resource_id}", + permission.value, allowed=True) + return func(*args, **kwargs) + return wrapper + return decorator + +@require_permission("document", Permission.READ) +def get_document(resource_id: str): + return Document.query.get(resource_id) +``` + +## Incorrect Pattern + +```python +# Wrong: checking only authentication, not authorization +@login_required +def delete_document(doc_id): + Document.query.get(doc_id).delete() # Any logged-in user can delete! + +# Wrong: client-side only checks +if user.role == "admin": # Checked in JavaScript only + show_admin_panel() + +# Wrong: IDOR vulnerability +@app.route("/api/users//profile") +def get_profile(user_id): + return User.query.get(user_id).to_dict() # No ownership check! + +# Wrong: relying on hidden URLs +@app.route("/admin/secret/delete-all") # Security through obscurity +def delete_all(): + ... +``` + +## IDOR Prevention + +```python +# Insecure Direct Object Reference - always verify ownership + +# Wrong +@app.route("/api/orders/") +def get_order(order_id): + return Order.query.get(order_id) # Any user can view any order + +# Correct +@app.route("/api/orders/") +def get_order(order_id): + order = Order.query.get(order_id) + if order.user_id != current_user.id: + if not current_user.has_permission("orders.view_all"): + raise PermissionDenied() + return order +``` + +## Privilege Escalation Prevention + +```python +def update_user_role(actor_id: str, target_user_id: str, new_role: str): + """Prevent privilege escalation.""" + actor = get_user(actor_id) + + # Can't grant roles higher than your own + if ROLE_HIERARCHY[new_role] > ROLE_HIERARCHY[actor.role]: + raise PermissionDenied("Cannot grant role higher than your own") + + # Can't modify users with higher roles + target = get_user(target_user_id) + if ROLE_HIERARCHY[target.role] >= ROLE_HIERARCHY[actor.role]: + raise PermissionDenied("Cannot modify user with equal or higher role") + + target.role = new_role + log_role_change(actor_id, target_user_id, target.role, new_role) +``` + +## Edge Cases + +- Time-of-check to time-of-use (TOCTOU) race conditions +- Horizontal privilege escalation (user A accesses user B's data) +- Vertical privilege escalation (user becomes admin) +- Permission caching leading to stale authz +- Implicit permissions from group membership diff --git a/credential-handling.md b/credential-handling.md new file mode 100644 index 0000000..e006fc3 --- /dev/null +++ b/credential-handling.md @@ -0,0 +1,90 @@ +# Credential Handling + +## Rule + +Never hardcode secrets. Load from environment or secret manager at runtime. + +**Source:** [CWE-798: Use of Hard-coded Credentials](https://cwe.mitre.org/data/definitions/798.html) + +## Correct Pattern + +```python +import os +from functools import lru_cache + +@lru_cache(maxsize=1) +def get_api_key() -> str: + """Load API key from environment. Fail fast if missing.""" + key = os.environ.get("API_KEY") + if not key: + raise RuntimeError("API_KEY environment variable not set") + return key + +# For cloud environments, use secret manager +def get_secret(name: str) -> str: + """Load secret from cloud secret manager.""" + from google.cloud import secretmanager + client = secretmanager.SecretManagerServiceClient() + response = client.access_secret_version(name=name) + return response.payload.data.decode("UTF-8") +``` + +## Incorrect Pattern + +```python +# Wrong: hardcoded secret +API_KEY = "sk-1234567890abcdef" + +# Wrong: secret in config file checked into git +config = {"api_key": "sk-1234567890abcdef"} + +# Wrong: secret in default argument +def call_api(key="sk-1234567890abcdef"): + ... + +# Wrong: secret in error message +def validate_key(key): + if key != expected_key: + raise ValueError(f"Invalid key: {key}") # Leaks the key! + +# Wrong: secret in log +logging.info(f"Using API key: {api_key}") +``` + +## Secret Detection + +Block these patterns in CI: + +```python +import re + +SECRET_PATTERNS = [ + r'(?i)(api[_-]?key|apikey)\s*[=:]\s*["\'][^"\']+["\']', + r'(?i)(secret|password|passwd|pwd)\s*[=:]\s*["\'][^"\']+["\']', + r'(?i)bearer\s+[a-zA-Z0-9_-]+', + r'sk-[a-zA-Z0-9]{32,}', # OpenAI-style keys + r'ghp_[a-zA-Z0-9]{36}', # GitHub PAT +] + +def scan_for_secrets(content: str) -> list[str]: + findings = [] + for pattern in SECRET_PATTERNS: + if re.search(pattern, content): + findings.append(f"Potential secret: {pattern}") + return findings +``` + +## Environment Separation + +| Environment | Source | Notes | +|-------------|--------|-------| +| Development | `.env` file (gitignored) | Never commit | +| CI | CI secrets / vault | Injected at runtime | +| Production | Secret manager | Rotated automatically | + +## Edge Cases + +- Secrets in Docker build args leak to image history +- Environment variables visible in `/proc` on Linux +- Secrets in URLs get logged by proxies/load balancers +- Clipboard managers may capture pasted secrets diff --git a/dos-prevention.md b/dos-prevention.md new file mode 100644 index 0000000..2bd5ac2 --- /dev/null +++ b/dos-prevention.md @@ -0,0 +1,180 @@ +# Denial of Service Prevention + +## Rule + +Bound all resource consumption. Assume attackers will send worst-case input. + +**Source:** [CWE-400: Uncontrolled Resource Consumption](https://cwe.mitre.org/data/definitions/400.html) + +## Request Limits + +### Correct Pattern + +```python +from functools import wraps +import time + +# Rate limiting +class RateLimiter: + def __init__(self, max_requests: int, window_seconds: int): + self.max_requests = max_requests + self.window = window_seconds + self.requests = {} # ip -> [timestamps] + + def is_allowed(self, ip: str) -> bool: + now = time.time() + cutoff = now - self.window + + # Clean old entries + self.requests[ip] = [ + t for t in self.requests.get(ip, []) + if t > cutoff + ] + + if len(self.requests[ip]) >= self.max_requests: + return False + + self.requests[ip].append(now) + return True + +# Request size limits +MAX_BODY_SIZE = 10 * 1024 * 1024 # 10MB + +@app.before_request +def limit_request_size(): + if request.content_length and request.content_length > MAX_BODY_SIZE: + abort(413) # Payload too large +``` + +### Incorrect Pattern + +```python +# Wrong: no size limit +data = request.get_data() # Could be gigabytes + +# Wrong: unbounded loop based on user input +for i in range(int(request.args["count"])): + process_item(i) + +# Wrong: no timeout +response = requests.get(user_url) # Hangs forever +``` + +## Algorithmic Complexity + +### Correct Pattern + +```python +# Limit input size before expensive operations +MAX_ITEMS = 10000 + +def process_list(items: list) -> list: + if len(items) > MAX_ITEMS: + raise ValueError(f"Too many items: {len(items)} > {MAX_ITEMS}") + return sorted(items) # O(n log n) but bounded + +# Use timeouts for expensive operations +import signal + +def timeout_handler(signum, frame): + raise TimeoutError("Operation timed out") + +def with_timeout(seconds: int): + def decorator(func): + @wraps(func) + def wrapper(*args, **kwargs): + signal.signal(signal.SIGALRM, timeout_handler) + signal.alarm(seconds) + try: + return func(*args, **kwargs) + finally: + signal.alarm(0) + return wrapper + return decorator + +@with_timeout(5) +def expensive_operation(data): + ... +``` + +### Incorrect Pattern + +```python +# Wrong: O(n²) or worse on unbounded input +def find_duplicates(items): + for i in items: + for j in items: # O(n²) + if i == j: + yield i + +# Wrong: regex with catastrophic backtracking +import re +pattern = re.compile(r'(a+)+$') # ReDoS vulnerable +pattern.match('a' * 30 + 'b') # Hangs +``` + +## Memory Limits + +### Correct Pattern + +```python +# Stream large files instead of loading into memory +def process_large_file(path: str): + with open(path, 'r') as f: + for line in f: # Streaming, constant memory + process_line(line) + +# Limit collection sizes +class BoundedCache: + def __init__(self, max_size: int = 1000): + self.max_size = max_size + self.cache = {} + + def set(self, key, value): + if len(self.cache) >= self.max_size: + # Evict oldest + oldest = next(iter(self.cache)) + del self.cache[oldest] + self.cache[key] = value +``` + +### Incorrect Pattern + +```python +# Wrong: loading entire file into memory +data = open(path).read() # Could be huge + +# Wrong: unbounded cache +cache = {} +def get_or_compute(key): + if key not in cache: + cache[key] = expensive_compute(key) # Grows forever + return cache[key] +``` + +## Connection Limits + +```python +# Limit concurrent connections per IP +MAX_CONNECTIONS_PER_IP = 10 + +# Timeouts on all network operations +import socket +socket.setdefaulttimeout(30) + +# Connection pooling with limits +from urllib3 import PoolManager +http = PoolManager( + maxsize=100, + block=True, + timeout=30 +) +``` + +## Edge Cases + +- Zip bombs (small file, huge uncompressed) +- XML entity expansion (billion laughs attack) +- Hash collision attacks (hash flooding) +- Slowloris (slow, incomplete requests) +- Amplification attacks (small request, large response) diff --git a/injection-prevention.md b/injection-prevention.md new file mode 100644 index 0000000..995e117 --- /dev/null +++ b/injection-prevention.md @@ -0,0 +1,138 @@ +# Injection Prevention + +## Rule + +Never concatenate untrusted input into commands, queries, or templates. Use parameterized APIs. + +**Source:** [OWASP Injection](https://owasp.org/Top10/A03_2021-Injection/) + +## SQL Injection + +### Correct Pattern + +```python +# Parameterized query — safe +def get_user(user_id: int): + cursor.execute( + "SELECT * FROM users WHERE id = %s", + (user_id,) + ) + return cursor.fetchone() + +# ORM — safe +def get_user(user_id: int): + return User.query.filter_by(id=user_id).first() +``` + +### Incorrect Pattern + +```python +# Wrong: string concatenation +def get_user(user_id): + cursor.execute(f"SELECT * FROM users WHERE id = {user_id}") + # Input: "1; DROP TABLE users; --" + +# Wrong: string formatting +query = "SELECT * FROM users WHERE name = '%s'" % name +``` + +## Command Injection + +### Correct Pattern + +```python +import subprocess +import shlex + +# Use list form — shell=False prevents injection +def run_command(filename: str): + result = subprocess.run( + ["ls", "-la", filename], + capture_output=True, + shell=False # Critical! + ) + return result.stdout + +# If you must use shell, validate strictly +VALID_FILENAME = re.compile(r'^[a-zA-Z0-9._-]+$') + +def safe_filename(name: str) -> str: + if not VALID_FILENAME.match(name): + raise ValueError("Invalid filename") + return name +``` + +### Incorrect Pattern + +```python +# Wrong: shell=True with user input +subprocess.run(f"ls -la {filename}", shell=True) +# Input: "file.txt; rm -rf /" + +# Wrong: os.system +os.system(f"convert {input_file} {output_file}") +``` + +## Template Injection + +### Correct Pattern + +```python +# Use auto-escaping templates +from jinja2 import Environment, select_autoescape + +env = Environment(autoescape=select_autoescape(['html', 'xml'])) +template = env.get_template("page.html") +output = template.render(user_name=user_input) # Auto-escaped +``` + +### Incorrect Pattern + +```python +# Wrong: rendering user input as template +template = Template(user_input) # SSTI vulnerability + +# Wrong: disabling auto-escape +template.render(content=Markup(user_input)) +``` + +## Path Traversal + +### Correct Pattern + +```python +import os +from pathlib import Path + +UPLOAD_DIR = Path("/app/uploads").resolve() + +def safe_path(filename: str) -> Path: + """Ensure path stays within allowed directory.""" + # Resolve to absolute, normalized path + requested = (UPLOAD_DIR / filename).resolve() + + # Verify it's still under UPLOAD_DIR + if not requested.is_relative_to(UPLOAD_DIR): + raise ValueError("Path traversal detected") + + return requested +``` + +### Incorrect Pattern + +```python +# Wrong: direct concatenation +path = f"/app/uploads/{filename}" +# Input: "../../../etc/passwd" + +# Wrong: checking for ".." without resolving +if ".." not in filename: # Can bypass with encoding + open(f"/uploads/{filename}") +``` + +## Edge Cases + +- Second-order injection (stored, then executed later) +- Polyglot payloads (valid in multiple contexts) +- Encoding bypasses (URL, Unicode, hex) +- Blind injection (no visible output) diff --git a/input-validation.md b/input-validation.md new file mode 100644 index 0000000..af27c70 --- /dev/null +++ b/input-validation.md @@ -0,0 +1,102 @@ +# Input Validation + +## Rule + +Validate all input. Allowlist > blocklist. + +**Source:** [OWASP Input Validation Cheat Sheet](https://cheatsheetseries.owasp.org/cheatsheets/Input_Validation_Cheat_Sheet.html) + +## Correct Pattern + +```python +import re +from typing import Optional + +# Allowlist: only permit known-good patterns +VALID_USERNAME = re.compile(r'^[a-zA-Z0-9_]{3,20}$') +VALID_EMAIL = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$') + +def validate_username(username: str) -> Optional[str]: + """Return sanitized username or None if invalid.""" + if not username: + return None + username = username.strip() + if VALID_USERNAME.match(username): + return username + return None + +def validate_positive_int(value: str, max_value: int = 10000) -> Optional[int]: + """Parse and validate positive integer with upper bound.""" + try: + n = int(value) + if 0 < n <= max_value: + return n + except (ValueError, TypeError): + pass + return None +``` + +## Incorrect Pattern + +```python +# Wrong: blocklist approach (attackers find bypasses) +def sanitize(s): + bad = ["