From 5b9f30e663a8ce916e09cca1e538a004f7018889 Mon Sep 17 00:00:00 2001 From: Rodin Date: Sun, 10 May 2026 23:17:54 -0700 Subject: [PATCH] Add SSRF, race conditions, JWT security patterns High-priority patterns from completeness review: - ssrf.md: metadata endpoints, DNS rebinding, webhook validation - race-conditions.md: TOCTOU, atomic operations, file/db races - jwt-security.md: algorithm confusion, kid injection, refresh tokens Now 16 patterns covering comprehensive web application security. --- README.md | 7 +- jwt-security.md | 166 ++++++++++++++++++++++++++++++++++++ race-conditions.md | 205 +++++++++++++++++++++++++++++++++++++++++++++ ssrf.md | 174 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 550 insertions(+), 2 deletions(-) create mode 100644 jwt-security.md create mode 100644 race-conditions.md create mode 100644 ssrf.md diff --git a/README.md b/README.md index badd266..f9fcd35 100644 --- a/README.md +++ b/README.md @@ -26,15 +26,18 @@ Based on OWASP Top 10:2025 and recent security research. |------|-------|------------| | [authentication.md](authentication.md) | Passwords, tokens, MFA, brute force protection | A07 | | [authorization.md](authorization.md) | Permission checks, IDOR prevention, privilege escalation | A01 | +| [jwt-security.md](jwt-security.md) | Algorithm confusion, weak secrets, expiration | A07 | ### Attack Prevention | File | Topic | OWASP 2025 | |------|-------|------------| | [injection-prevention.md](injection-prevention.md) | SQL, command, template, path traversal | A05 | +| [ssrf.md](ssrf.md) | Server-side request forgery, metadata endpoints | A10 | | [dos-prevention.md](dos-prevention.md) | Rate limiting, resource bounds, algorithmic complexity | — | | [prompt-injection.md](prompt-injection.md) | LLM security, data/instruction separation | — | | [deserialization.md](deserialization.md) | Untrusted data deserialization, pickle, yaml | A08 | +| [race-conditions.md](race-conditions.md) | TOCTOU, atomic check-and-act, database locks | — | ### Infrastructure @@ -53,10 +56,10 @@ Based on OWASP Top 10:2025 and recent security research. | A04 | Cryptographic Failures | cryptography.md | | A05 | Injection | injection-prevention.md | | A06 | Insecure Design | secure-defaults.md | -| A07 | Authentication Failures | authentication.md | +| A07 | Authentication Failures | authentication.md, jwt-security.md | | A08 | Software or Data Integrity Failures | deserialization.md | | A09 | Security Logging and Alerting Failures | audit-logging.md | -| A10 | Mishandling of Exceptional Conditions | error-handling.md | +| A10 | Mishandling of Exceptional Conditions | error-handling.md, ssrf.md | ## Sources diff --git a/jwt-security.md b/jwt-security.md new file mode 100644 index 0000000..dc17b30 --- /dev/null +++ b/jwt-security.md @@ -0,0 +1,166 @@ +# JWT Security + +## Rule + +Verify algorithm, signature, issuer, audience, and expiration. Never trust the header blindly. + +**Source:** [RFC 7519: JSON Web Token](https://datatracker.ietf.org/doc/html/rfc7519) + +## Common JWT Attacks + +| Attack | Description | Defense | +|--------|-------------|---------| +| alg=none | Header specifies no signature | Reject `none` algorithm | +| Algorithm confusion | RS256 → HS256 with public key as secret | Allowlist algorithms | +| Weak secret | Brute-forceable HMAC secret | Min 256-bit random secret | +| Missing expiration | Token valid forever | Require `exp` claim | +| kid injection | Header `kid` used in SQL/file path | Sanitize `kid` value | +| JKU/X5U injection | Fetch attacker's keys | Ignore or allowlist URLs | + +## Correct Pattern + +```python +import jwt +from datetime import datetime, timedelta + +# Configuration - fixed, not from token +ALGORITHM = "RS256" # Asymmetric preferred +PUBLIC_KEY = load_public_key("keys/public.pem") +PRIVATE_KEY = load_private_key("keys/private.pem") +ISSUER = "https://auth.example.com" +AUDIENCE = "https://api.example.com" + +def create_token(user_id: str, roles: list[str]) -> str: + """Create a JWT with proper claims.""" + now = datetime.utcnow() + payload = { + "sub": user_id, + "roles": roles, + "iat": now, + "exp": now + timedelta(hours=1), # Short expiration + "iss": ISSUER, + "aud": AUDIENCE, + } + return jwt.encode(payload, PRIVATE_KEY, algorithm=ALGORITHM) + +def verify_token(token: str) -> dict: + """Verify JWT with strict validation.""" + try: + payload = jwt.decode( + token, + PUBLIC_KEY, + algorithms=[ALGORITHM], # Allowlist, not from token! + issuer=ISSUER, + audience=AUDIENCE, + options={ + "require": ["exp", "iat", "sub", "iss", "aud"], + "verify_exp": True, + "verify_iat": True, + "verify_iss": True, + "verify_aud": True, + } + ) + return payload + except jwt.ExpiredSignatureError: + raise AuthError("Token expired") + except jwt.InvalidTokenError as e: + raise AuthError(f"Invalid token: {e}") +``` + +## Incorrect Pattern + +```python +import jwt + +# Wrong: algorithm from token header +def bad_verify(token: str) -> dict: + header = jwt.get_unverified_header(token) + alg = header["algorithm"] # Attacker controls this! + return jwt.decode(token, SECRET, algorithms=[alg]) + +# Wrong: no algorithm restriction +def bad_verify_2(token: str) -> dict: + return jwt.decode(token, SECRET) # Accepts any algorithm + +# Wrong: weak secret +SECRET = "secret123" # Trivially brute-forced + +# Wrong: no expiration check +def bad_verify_3(token: str) -> dict: + return jwt.decode(token, SECRET, options={"verify_exp": False}) + +# Wrong: kid used in file path +def get_key(token: str): + header = jwt.get_unverified_header(token) + kid = header["kid"] + # Path traversal! kid = "../../../etc/passwd" + return open(f"keys/{kid}.pem").read() +``` + +## Algorithm Confusion Attack + +```python +# Attack scenario: +# 1. Server uses RS256 (asymmetric) +# 2. Attacker changes header to HS256 (symmetric) +# 3. Attacker signs with the PUBLIC key as HMAC secret +# 4. Vulnerable server verifies with public key +# 5. Signature matches! Token accepted + +# Vulnerable code +def vulnerable_verify(token: str, public_key: str): + # If alg=HS256, this uses public_key as HMAC secret + return jwt.decode(token, public_key, algorithms=["RS256", "HS256"]) + +# Secure code - explicit algorithm +def secure_verify(token: str, public_key: str): + return jwt.decode(token, public_key, algorithms=["RS256"]) +``` + +## Refresh Token Pattern + +```python +from secrets import token_urlsafe + +# Access token: short-lived JWT (15 min) +# Refresh token: long-lived opaque token in database + +def issue_tokens(user_id: str) -> tuple[str, str]: + access_token = create_token(user_id, exp_minutes=15) + refresh_token = token_urlsafe(32) # Opaque, not JWT + + # Store refresh token in database with metadata + RefreshToken.create( + token_hash=hash(refresh_token), + user_id=user_id, + expires_at=datetime.utcnow() + timedelta(days=30), + device_info=get_device_info() + ) + + return access_token, refresh_token + +def refresh_access_token(refresh_token: str) -> str: + """Exchange refresh token for new access token.""" + stored = RefreshToken.query.filter_by( + token_hash=hash(refresh_token) + ).first() + + if not stored or stored.is_expired or stored.is_revoked: + raise AuthError("Invalid refresh token") + + # Rotate refresh token (one-time use) + stored.revoke() + new_access, new_refresh = issue_tokens(stored.user_id) + + return new_access, new_refresh +``` + +## Edge Cases + +- JWTs in URLs leak to logs and referrer headers +- Token storage: `httpOnly` cookies vs localStorage (XSS risk) +- Clock skew between servers affects `exp`/`iat` validation +- Long-lived tokens: implement revocation list +- `nbf` (not before) should be validated +- Nested JWTs (JWE wrapping JWS) need careful handling +- Don't put sensitive data in JWT payload (base64 is not encryption) diff --git a/race-conditions.md b/race-conditions.md new file mode 100644 index 0000000..a80a7f9 --- /dev/null +++ b/race-conditions.md @@ -0,0 +1,205 @@ +# Race Conditions and TOCTOU + +## Rule + +Check-then-act must be atomic. Never trust state between check and use. + +**Source:** [CWE-362: Concurrent Execution using Shared Resource with Improper Synchronization](https://cwe.mitre.org/data/definitions/362.html) + +## TOCTOU (Time-of-Check to Time-of-Use) + +``` +Thread A: check(x) --> use(x) +Thread B: modify(x) + ^-- state changes between check and use +``` + +## Correct Pattern + +```python +import threading +from contextlib import contextmanager + +# Pattern 1: Atomic check-and-act with locking +class BankAccount: + def __init__(self, balance: Decimal): + self.balance = balance + self._lock = threading.Lock() + + def withdraw(self, amount: Decimal) -> bool: + """Atomic withdrawal - no race window.""" + with self._lock: + if self.balance >= amount: + self.balance -= amount + return True + return False + +# Pattern 2: Database-level atomicity +def transfer_funds(conn, from_id: int, to_id: int, amount: Decimal): + """Use database transaction + row locks.""" + with conn.begin(): + # SELECT FOR UPDATE prevents concurrent modification + from_acct = conn.execute( + "SELECT balance FROM accounts WHERE id = %s FOR UPDATE", + (from_id,) + ).fetchone() + + if from_acct.balance < amount: + raise InsufficientFunds() + + conn.execute( + "UPDATE accounts SET balance = balance - %s WHERE id = %s", + (amount, from_id) + ) + conn.execute( + "UPDATE accounts SET balance = balance + %s WHERE id = %s", + (amount, to_id) + ) + +# Pattern 3: Compare-and-swap (optimistic locking) +def update_with_version(conn, item_id: int, new_data: dict, expected_version: int): + """Fail if version changed since we read it.""" + result = conn.execute( + """UPDATE items + SET data = %s, version = version + 1 + WHERE id = %s AND version = %s""", + (new_data, item_id, expected_version) + ) + if result.rowcount == 0: + raise ConcurrentModificationError("Item was modified by another request") +``` + +## Incorrect Pattern + +```python +# Wrong: check-then-act without atomicity +class BankAccount: + def withdraw(self, amount): + if self.balance >= amount: # Check + # Race window! Another thread can withdraw here + self.balance -= amount # Act + return True + return False + +# Wrong: file race condition +def safe_write(path, data): + if not os.path.exists(path): # Check + # Race window! File could be created here + with open(path, 'w') as f: # Act + f.write(data) + +# Wrong: double-checked locking (broken in many languages) +_instance = None +_lock = threading.Lock() + +def get_instance(): + if _instance is None: # First check without lock + with _lock: + if _instance is None: # Second check + _instance = ExpensiveObject() + return _instance +``` + +## File System Races + +```python +import os +import tempfile + +# Wrong: check then create +def create_file(path): + if os.path.exists(path): + raise FileExistsError() + with open(path, 'w') as f: # Race! + f.write("data") + +# Correct: atomic creation (fails if exists) +def create_file_safe(path): + fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY) + try: + os.write(fd, b"data") + finally: + os.close(fd) + +# Wrong: temp file with predictable name +def bad_temp(): + path = f"/tmp/myapp_{os.getpid()}.tmp" # Predictable! + with open(path, 'w') as f: + f.write(secret_data) + +# Correct: secure temp file +def good_temp(): + fd, path = tempfile.mkstemp() + try: + os.write(fd, secret_data.encode()) + finally: + os.close(fd) + os.unlink(path) +``` + +## Signup / Registration Races + +```python +# Wrong: check username then create +def register(username: str, password: str): + if User.query.filter_by(username=username).first(): + raise UsernameExists() + # Race window! Another request could register same username + user = User(username=username, password=hash(password)) + db.session.add(user) + db.session.commit() + +# Correct: use database constraint, handle exception +def register_safe(username: str, password: str): + user = User(username=username, password=hash(password)) + db.session.add(user) + try: + db.session.commit() # UNIQUE constraint enforced here + except IntegrityError: + db.session.rollback() + raise UsernameExists() +``` + +## Coupon / Discount Races + +```python +# Wrong: check-then-apply coupon +def apply_coupon(order_id: int, coupon_code: str): + coupon = Coupon.query.filter_by(code=coupon_code).first() + if coupon.uses_remaining <= 0: + raise CouponExhausted() + + # Race window! 100 requests could pass the check simultaneously + order = Order.query.get(order_id) + order.discount = coupon.discount + coupon.uses_remaining -= 1 + db.session.commit() + +# Correct: atomic decrement with row lock +def apply_coupon_safe(order_id: int, coupon_code: str): + with db.session.begin(): + result = db.session.execute( + """UPDATE coupons + SET uses_remaining = uses_remaining - 1 + WHERE code = :code AND uses_remaining > 0 + RETURNING discount""", + {"code": coupon_code} + ) + row = result.fetchone() + if not row: + raise CouponExhausted() + + db.session.execute( + "UPDATE orders SET discount = :discount WHERE id = :id", + {"discount": row.discount, "id": order_id} + ) +``` + +## Edge Cases + +- Rate limiters with race conditions allow bursts +- Session creation races can create duplicates +- Inventory/stock decrements need atomic operations +- Distributed systems need distributed locks (Redis, etcd) +- File permission checks before open (symlink attacks) +- Signal handlers can interrupt between check and use diff --git a/ssrf.md b/ssrf.md new file mode 100644 index 0000000..adb0835 --- /dev/null +++ b/ssrf.md @@ -0,0 +1,174 @@ +# Server-Side Request Forgery (SSRF) + +## Rule + +Never let user input control URLs for server-side requests. Validate and allowlist destinations. + +**Source:** [CWE-918: Server-Side Request Forgery](https://cwe.mitre.org/data/definitions/918.html) + +## Why It's Dangerous + +SSRF lets attackers: +- Access internal services (metadata APIs, databases, admin panels) +- Bypass firewalls (server is inside the network) +- Port scan internal infrastructure +- Read local files (`file://`) +- Exfiltrate data through DNS + +## Cloud Metadata Endpoints (Critical Targets) + +| Cloud | Metadata URL | +|-------|--------------| +| AWS | `http://169.254.169.254/latest/meta-data/` | +| GCP | `http://metadata.google.internal/` | +| Azure | `http://169.254.169.254/metadata/instance` | +| DigitalOcean | `http://169.254.169.254/metadata/v1/` | + +## Correct Pattern + +```python +from urllib.parse import urlparse +import ipaddress +import socket + +# Allowlist of permitted domains +ALLOWED_HOSTS = {"api.example.com", "cdn.example.com"} + +def is_safe_url(url: str) -> bool: + """Validate URL against SSRF attacks.""" + try: + parsed = urlparse(url) + + # Only allow HTTPS + if parsed.scheme != "https": + return False + + # Check against allowlist + if parsed.hostname not in ALLOWED_HOSTS: + return False + + # Resolve and check IP + ip = socket.gethostbyname(parsed.hostname) + ip_obj = ipaddress.ip_address(ip) + + # Block private/reserved ranges + if ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_reserved: + return False + + # Block link-local (metadata endpoints) + if ip_obj.is_link_local: + return False + + return True + except Exception: + return False + +def fetch_url(url: str) -> bytes: + """Safely fetch a URL after validation.""" + if not is_safe_url(url): + raise ValueError("URL not allowed") + + # Use timeout, disable redirects initially + response = requests.get(url, timeout=10, allow_redirects=False) + + # If redirect, validate destination too + if response.is_redirect: + redirect_url = response.headers.get("Location") + if not is_safe_url(redirect_url): + raise ValueError("Redirect to disallowed URL") + + return response.content +``` + +## Incorrect Pattern + +```python +import requests + +# Wrong: direct user input to URL +def fetch_user_url(url: str) -> bytes: + return requests.get(url).content + +# Wrong: URL in query parameter +@app.route("/proxy") +def proxy(): + url = request.args.get("url") + return requests.get(url).content + +# Wrong: blocklist instead of allowlist +BLOCKED = ["169.254.169.254", "localhost", "127.0.0.1"] +def is_safe(url): + return urlparse(url).hostname not in BLOCKED + # Bypassed by: http://2130706433 (decimal IP) + # Bypassed by: http://0x7f000001 (hex IP) + # Bypassed by: http://127.1 (short form) + # Bypassed by: DNS rebinding + +# Wrong: checking URL before resolution +def check_url(url): + parsed = urlparse(url) + if parsed.hostname == "internal.corp": # Attacker uses their DNS + return False + return True +``` + +## DNS Rebinding Attack + +```python +# Attack scenario: +# 1. Attacker controls evil.com DNS +# 2. First resolution: evil.com -> 1.2.3.4 (passes validation) +# 3. TTL expires during request processing +# 4. Second resolution: evil.com -> 169.254.169.254 (metadata!) + +# Defense: resolve once, pin IP for the request +def fetch_with_pinned_ip(url: str) -> bytes: + parsed = urlparse(url) + ip = socket.gethostbyname(parsed.hostname) + + if not is_safe_ip(ip): + raise ValueError("Resolved to unsafe IP") + + # Replace hostname with IP in request + # Include original Host header for virtual hosting + response = requests.get( + url.replace(parsed.hostname, ip), + headers={"Host": parsed.hostname}, + timeout=10 + ) + return response.content +``` + +## Webhook/Callback Validation + +```python +# Webhooks are high-risk SSRF vectors +class WebhookConfig: + def __init__(self, url: str): + if not is_safe_url(url): + raise ValueError("Invalid webhook URL") + + # Additional webhook-specific checks + parsed = urlparse(url) + if parsed.port and parsed.port not in (80, 443): + raise ValueError("Non-standard port not allowed") + + self.url = url + +# At delivery time, re-validate (URL could have been stored long ago) +def deliver_webhook(config: WebhookConfig, payload: dict): + if not is_safe_url(config.url): # Re-check! + log.warning("Webhook URL no longer safe", url=config.url) + return + + requests.post(config.url, json=payload, timeout=5) +``` + +## Edge Cases + +- URL shorteners can hide malicious destinations +- IPv6 addresses need separate validation +- Protocol smuggling (`gopher://`, `dict://`) +- Unicode/punycode domain tricks +- Partial URLs concatenated with base URL +- Stored URLs (webhooks) may become unsafe over time