5b9f30e663
High-priority patterns from completeness review: - ssrf.md: metadata endpoints, DNS rebinding, webhook validation - race-conditions.md: TOCTOU, atomic operations, file/db races - jwt-security.md: algorithm confusion, kid injection, refresh tokens Now 16 patterns covering comprehensive web application security.
4.9 KiB
4.9 KiB
Server-Side Request Forgery (SSRF)
Rule
Never let user input control URLs for server-side requests. Validate and allowlist destinations.
Source: CWE-918: Server-Side Request Forgery
Why It's Dangerous
SSRF lets attackers:
- Access internal services (metadata APIs, databases, admin panels)
- Bypass firewalls (server is inside the network)
- Port scan internal infrastructure
- Read local files (
file://) - Exfiltrate data through DNS
Cloud Metadata Endpoints (Critical Targets)
| Cloud | Metadata URL |
|---|---|
| AWS | http://169.254.169.254/latest/meta-data/ |
| GCP | http://metadata.google.internal/ |
| Azure | http://169.254.169.254/metadata/instance |
| DigitalOcean | http://169.254.169.254/metadata/v1/ |
Correct Pattern
from urllib.parse import urlparse
import ipaddress
import socket
# Allowlist of permitted domains
ALLOWED_HOSTS = {"api.example.com", "cdn.example.com"}
def is_safe_url(url: str) -> bool:
"""Validate URL against SSRF attacks."""
try:
parsed = urlparse(url)
# Only allow HTTPS
if parsed.scheme != "https":
return False
# Check against allowlist
if parsed.hostname not in ALLOWED_HOSTS:
return False
# Resolve and check IP
ip = socket.gethostbyname(parsed.hostname)
ip_obj = ipaddress.ip_address(ip)
# Block private/reserved ranges
if ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_reserved:
return False
# Block link-local (metadata endpoints)
if ip_obj.is_link_local:
return False
return True
except Exception:
return False
def fetch_url(url: str) -> bytes:
"""Safely fetch a URL after validation."""
if not is_safe_url(url):
raise ValueError("URL not allowed")
# Use timeout, disable redirects initially
response = requests.get(url, timeout=10, allow_redirects=False)
# If redirect, validate destination too
if response.is_redirect:
redirect_url = response.headers.get("Location")
if not is_safe_url(redirect_url):
raise ValueError("Redirect to disallowed URL")
return response.content
Incorrect Pattern
import requests
# Wrong: direct user input to URL
def fetch_user_url(url: str) -> bytes:
return requests.get(url).content
# Wrong: URL in query parameter
@app.route("/proxy")
def proxy():
url = request.args.get("url")
return requests.get(url).content
# Wrong: blocklist instead of allowlist
BLOCKED = ["169.254.169.254", "localhost", "127.0.0.1"]
def is_safe(url):
return urlparse(url).hostname not in BLOCKED
# Bypassed by: http://2130706433 (decimal IP)
# Bypassed by: http://0x7f000001 (hex IP)
# Bypassed by: http://127.1 (short form)
# Bypassed by: DNS rebinding
# Wrong: checking URL before resolution
def check_url(url):
parsed = urlparse(url)
if parsed.hostname == "internal.corp": # Attacker uses their DNS
return False
return True
DNS Rebinding Attack
# Attack scenario:
# 1. Attacker controls evil.com DNS
# 2. First resolution: evil.com -> 1.2.3.4 (passes validation)
# 3. TTL expires during request processing
# 4. Second resolution: evil.com -> 169.254.169.254 (metadata!)
# Defense: resolve once, pin IP for the request
def fetch_with_pinned_ip(url: str) -> bytes:
parsed = urlparse(url)
ip = socket.gethostbyname(parsed.hostname)
if not is_safe_ip(ip):
raise ValueError("Resolved to unsafe IP")
# Replace hostname with IP in request
# Include original Host header for virtual hosting
response = requests.get(
url.replace(parsed.hostname, ip),
headers={"Host": parsed.hostname},
timeout=10
)
return response.content
Webhook/Callback Validation
# Webhooks are high-risk SSRF vectors
class WebhookConfig:
def __init__(self, url: str):
if not is_safe_url(url):
raise ValueError("Invalid webhook URL")
# Additional webhook-specific checks
parsed = urlparse(url)
if parsed.port and parsed.port not in (80, 443):
raise ValueError("Non-standard port not allowed")
self.url = url
# At delivery time, re-validate (URL could have been stored long ago)
def deliver_webhook(config: WebhookConfig, payload: dict):
if not is_safe_url(config.url): # Re-check!
log.warning("Webhook URL no longer safe", url=config.url)
return
requests.post(config.url, json=payload, timeout=5)
Edge Cases
- URL shorteners can hide malicious destinations
- IPv6 addresses need separate validation
- Protocol smuggling (
gopher://,dict://) - Unicode/punycode domain tricks
- Partial URLs concatenated with base URL
- Stored URLs (webhooks) may become unsafe over time