5b9f30e663
High-priority patterns from completeness review: - ssrf.md: metadata endpoints, DNS rebinding, webhook validation - race-conditions.md: TOCTOU, atomic operations, file/db races - jwt-security.md: algorithm confusion, kid injection, refresh tokens Now 16 patterns covering comprehensive web application security.
175 lines
4.9 KiB
Markdown
175 lines
4.9 KiB
Markdown
# Server-Side Request Forgery (SSRF)
|
|
|
|
## Rule
|
|
|
|
Never let user input control URLs for server-side requests. Validate and allowlist destinations.
|
|
|
|
**Source:** [CWE-918: Server-Side Request Forgery](https://cwe.mitre.org/data/definitions/918.html)
|
|
|
|
## Why It's Dangerous
|
|
|
|
SSRF lets attackers:
|
|
- Access internal services (metadata APIs, databases, admin panels)
|
|
- Bypass firewalls (server is inside the network)
|
|
- Port scan internal infrastructure
|
|
- Read local files (`file://`)
|
|
- Exfiltrate data through DNS
|
|
|
|
## Cloud Metadata Endpoints (Critical Targets)
|
|
|
|
| Cloud | Metadata URL |
|
|
|-------|--------------|
|
|
| AWS | `http://169.254.169.254/latest/meta-data/` |
|
|
| GCP | `http://metadata.google.internal/` |
|
|
| Azure | `http://169.254.169.254/metadata/instance` |
|
|
| DigitalOcean | `http://169.254.169.254/metadata/v1/` |
|
|
|
|
## Correct Pattern
|
|
|
|
```python
|
|
from urllib.parse import urlparse
|
|
import ipaddress
|
|
import socket
|
|
|
|
# Allowlist of permitted domains
|
|
ALLOWED_HOSTS = {"api.example.com", "cdn.example.com"}
|
|
|
|
def is_safe_url(url: str) -> bool:
|
|
"""Validate URL against SSRF attacks."""
|
|
try:
|
|
parsed = urlparse(url)
|
|
|
|
# Only allow HTTPS
|
|
if parsed.scheme != "https":
|
|
return False
|
|
|
|
# Check against allowlist
|
|
if parsed.hostname not in ALLOWED_HOSTS:
|
|
return False
|
|
|
|
# Resolve and check IP
|
|
ip = socket.gethostbyname(parsed.hostname)
|
|
ip_obj = ipaddress.ip_address(ip)
|
|
|
|
# Block private/reserved ranges
|
|
if ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_reserved:
|
|
return False
|
|
|
|
# Block link-local (metadata endpoints)
|
|
if ip_obj.is_link_local:
|
|
return False
|
|
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
def fetch_url(url: str) -> bytes:
|
|
"""Safely fetch a URL after validation."""
|
|
if not is_safe_url(url):
|
|
raise ValueError("URL not allowed")
|
|
|
|
# Use timeout, disable redirects initially
|
|
response = requests.get(url, timeout=10, allow_redirects=False)
|
|
|
|
# If redirect, validate destination too
|
|
if response.is_redirect:
|
|
redirect_url = response.headers.get("Location")
|
|
if not is_safe_url(redirect_url):
|
|
raise ValueError("Redirect to disallowed URL")
|
|
|
|
return response.content
|
|
```
|
|
|
|
## Incorrect Pattern
|
|
|
|
```python
|
|
import requests
|
|
|
|
# Wrong: direct user input to URL
|
|
def fetch_user_url(url: str) -> bytes:
|
|
return requests.get(url).content
|
|
|
|
# Wrong: URL in query parameter
|
|
@app.route("/proxy")
|
|
def proxy():
|
|
url = request.args.get("url")
|
|
return requests.get(url).content
|
|
|
|
# Wrong: blocklist instead of allowlist
|
|
BLOCKED = ["169.254.169.254", "localhost", "127.0.0.1"]
|
|
def is_safe(url):
|
|
return urlparse(url).hostname not in BLOCKED
|
|
# Bypassed by: http://2130706433 (decimal IP)
|
|
# Bypassed by: http://0x7f000001 (hex IP)
|
|
# Bypassed by: http://127.1 (short form)
|
|
# Bypassed by: DNS rebinding
|
|
|
|
# Wrong: checking URL before resolution
|
|
def check_url(url):
|
|
parsed = urlparse(url)
|
|
if parsed.hostname == "internal.corp": # Attacker uses their DNS
|
|
return False
|
|
return True
|
|
```
|
|
|
|
## DNS Rebinding Attack
|
|
|
|
```python
|
|
# Attack scenario:
|
|
# 1. Attacker controls evil.com DNS
|
|
# 2. First resolution: evil.com -> 1.2.3.4 (passes validation)
|
|
# 3. TTL expires during request processing
|
|
# 4. Second resolution: evil.com -> 169.254.169.254 (metadata!)
|
|
|
|
# Defense: resolve once, pin IP for the request
|
|
def fetch_with_pinned_ip(url: str) -> bytes:
|
|
parsed = urlparse(url)
|
|
ip = socket.gethostbyname(parsed.hostname)
|
|
|
|
if not is_safe_ip(ip):
|
|
raise ValueError("Resolved to unsafe IP")
|
|
|
|
# Replace hostname with IP in request
|
|
# Include original Host header for virtual hosting
|
|
response = requests.get(
|
|
url.replace(parsed.hostname, ip),
|
|
headers={"Host": parsed.hostname},
|
|
timeout=10
|
|
)
|
|
return response.content
|
|
```
|
|
|
|
## Webhook/Callback Validation
|
|
|
|
```python
|
|
# Webhooks are high-risk SSRF vectors
|
|
class WebhookConfig:
|
|
def __init__(self, url: str):
|
|
if not is_safe_url(url):
|
|
raise ValueError("Invalid webhook URL")
|
|
|
|
# Additional webhook-specific checks
|
|
parsed = urlparse(url)
|
|
if parsed.port and parsed.port not in (80, 443):
|
|
raise ValueError("Non-standard port not allowed")
|
|
|
|
self.url = url
|
|
|
|
# At delivery time, re-validate (URL could have been stored long ago)
|
|
def deliver_webhook(config: WebhookConfig, payload: dict):
|
|
if not is_safe_url(config.url): # Re-check!
|
|
log.warning("Webhook URL no longer safe", url=config.url)
|
|
return
|
|
|
|
requests.post(config.url, json=payload, timeout=5)
|
|
```
|
|
|
|
## Edge Cases
|
|
|
|
- URL shorteners can hide malicious destinations
|
|
- IPv6 addresses need separate validation
|
|
- Protocol smuggling (`gopher://`, `dict://`)
|
|
- Unicode/punycode domain tricks
|
|
- Partial URLs concatenated with base URL
|
|
- Stored URLs (webhooks) may become unsafe over time
|