Files
Rodin 5b9f30e663 Add SSRF, race conditions, JWT security patterns
High-priority patterns from completeness review:
- ssrf.md: metadata endpoints, DNS rebinding, webhook validation
- race-conditions.md: TOCTOU, atomic operations, file/db races
- jwt-security.md: algorithm confusion, kid injection, refresh tokens

Now 16 patterns covering comprehensive web application security.
2026-05-10 23:17:54 -07:00

4.9 KiB

Server-Side Request Forgery (SSRF)

Rule

Never let user input control URLs for server-side requests. Validate and allowlist destinations.

Source: CWE-918: Server-Side Request Forgery

Why It's Dangerous

SSRF lets attackers:

  • Access internal services (metadata APIs, databases, admin panels)
  • Bypass firewalls (server is inside the network)
  • Port scan internal infrastructure
  • Read local files (file://)
  • Exfiltrate data through DNS

Cloud Metadata Endpoints (Critical Targets)

Cloud Metadata URL
AWS http://169.254.169.254/latest/meta-data/
GCP http://metadata.google.internal/
Azure http://169.254.169.254/metadata/instance
DigitalOcean http://169.254.169.254/metadata/v1/

Correct Pattern

from urllib.parse import urlparse
import ipaddress
import socket

# Allowlist of permitted domains
ALLOWED_HOSTS = {"api.example.com", "cdn.example.com"}

def is_safe_url(url: str) -> bool:
    """Validate URL against SSRF attacks."""
    try:
        parsed = urlparse(url)
        
        # Only allow HTTPS
        if parsed.scheme != "https":
            return False
        
        # Check against allowlist
        if parsed.hostname not in ALLOWED_HOSTS:
            return False
        
        # Resolve and check IP
        ip = socket.gethostbyname(parsed.hostname)
        ip_obj = ipaddress.ip_address(ip)
        
        # Block private/reserved ranges
        if ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_reserved:
            return False
        
        # Block link-local (metadata endpoints)
        if ip_obj.is_link_local:
            return False
        
        return True
    except Exception:
        return False

def fetch_url(url: str) -> bytes:
    """Safely fetch a URL after validation."""
    if not is_safe_url(url):
        raise ValueError("URL not allowed")
    
    # Use timeout, disable redirects initially
    response = requests.get(url, timeout=10, allow_redirects=False)
    
    # If redirect, validate destination too
    if response.is_redirect:
        redirect_url = response.headers.get("Location")
        if not is_safe_url(redirect_url):
            raise ValueError("Redirect to disallowed URL")
    
    return response.content

Incorrect Pattern

import requests

# Wrong: direct user input to URL
def fetch_user_url(url: str) -> bytes:
    return requests.get(url).content

# Wrong: URL in query parameter
@app.route("/proxy")
def proxy():
    url = request.args.get("url")
    return requests.get(url).content

# Wrong: blocklist instead of allowlist
BLOCKED = ["169.254.169.254", "localhost", "127.0.0.1"]
def is_safe(url):
    return urlparse(url).hostname not in BLOCKED
    # Bypassed by: http://2130706433 (decimal IP)
    # Bypassed by: http://0x7f000001 (hex IP)
    # Bypassed by: http://127.1 (short form)
    # Bypassed by: DNS rebinding

# Wrong: checking URL before resolution
def check_url(url):
    parsed = urlparse(url)
    if parsed.hostname == "internal.corp":  # Attacker uses their DNS
        return False
    return True

DNS Rebinding Attack

# Attack scenario:
# 1. Attacker controls evil.com DNS
# 2. First resolution: evil.com -> 1.2.3.4 (passes validation)
# 3. TTL expires during request processing
# 4. Second resolution: evil.com -> 169.254.169.254 (metadata!)

# Defense: resolve once, pin IP for the request
def fetch_with_pinned_ip(url: str) -> bytes:
    parsed = urlparse(url)
    ip = socket.gethostbyname(parsed.hostname)
    
    if not is_safe_ip(ip):
        raise ValueError("Resolved to unsafe IP")
    
    # Replace hostname with IP in request
    # Include original Host header for virtual hosting
    response = requests.get(
        url.replace(parsed.hostname, ip),
        headers={"Host": parsed.hostname},
        timeout=10
    )
    return response.content

Webhook/Callback Validation

# Webhooks are high-risk SSRF vectors
class WebhookConfig:
    def __init__(self, url: str):
        if not is_safe_url(url):
            raise ValueError("Invalid webhook URL")
        
        # Additional webhook-specific checks
        parsed = urlparse(url)
        if parsed.port and parsed.port not in (80, 443):
            raise ValueError("Non-standard port not allowed")
        
        self.url = url

# At delivery time, re-validate (URL could have been stored long ago)
def deliver_webhook(config: WebhookConfig, payload: dict):
    if not is_safe_url(config.url):  # Re-check!
        log.warning("Webhook URL no longer safe", url=config.url)
        return
    
    requests.post(config.url, json=payload, timeout=5)

Edge Cases

  • URL shorteners can hide malicious destinations
  • IPv6 addresses need separate validation
  • Protocol smuggling (gopher://, dict://)
  • Unicode/punycode domain tricks
  • Partial URLs concatenated with base URL
  • Stored URLs (webhooks) may become unsafe over time