5b9f30e663
High-priority patterns from completeness review: - ssrf.md: metadata endpoints, DNS rebinding, webhook validation - race-conditions.md: TOCTOU, atomic operations, file/db races - jwt-security.md: algorithm confusion, kid injection, refresh tokens Now 16 patterns covering comprehensive web application security.
167 lines
5.1 KiB
Markdown
167 lines
5.1 KiB
Markdown
# JWT Security
|
|
|
|
## Rule
|
|
|
|
Verify algorithm, signature, issuer, audience, and expiration. Never trust the header blindly.
|
|
|
|
**Source:** [RFC 7519: JSON Web Token](https://datatracker.ietf.org/doc/html/rfc7519)
|
|
|
|
## Common JWT Attacks
|
|
|
|
| Attack | Description | Defense |
|
|
|--------|-------------|---------|
|
|
| alg=none | Header specifies no signature | Reject `none` algorithm |
|
|
| Algorithm confusion | RS256 → HS256 with public key as secret | Allowlist algorithms |
|
|
| Weak secret | Brute-forceable HMAC secret | Min 256-bit random secret |
|
|
| Missing expiration | Token valid forever | Require `exp` claim |
|
|
| kid injection | Header `kid` used in SQL/file path | Sanitize `kid` value |
|
|
| JKU/X5U injection | Fetch attacker's keys | Ignore or allowlist URLs |
|
|
|
|
## Correct Pattern
|
|
|
|
```python
|
|
import jwt
|
|
from datetime import datetime, timedelta
|
|
|
|
# Configuration - fixed, not from token
|
|
ALGORITHM = "RS256" # Asymmetric preferred
|
|
PUBLIC_KEY = load_public_key("keys/public.pem")
|
|
PRIVATE_KEY = load_private_key("keys/private.pem")
|
|
ISSUER = "https://auth.example.com"
|
|
AUDIENCE = "https://api.example.com"
|
|
|
|
def create_token(user_id: str, roles: list[str]) -> str:
|
|
"""Create a JWT with proper claims."""
|
|
now = datetime.utcnow()
|
|
payload = {
|
|
"sub": user_id,
|
|
"roles": roles,
|
|
"iat": now,
|
|
"exp": now + timedelta(hours=1), # Short expiration
|
|
"iss": ISSUER,
|
|
"aud": AUDIENCE,
|
|
}
|
|
return jwt.encode(payload, PRIVATE_KEY, algorithm=ALGORITHM)
|
|
|
|
def verify_token(token: str) -> dict:
|
|
"""Verify JWT with strict validation."""
|
|
try:
|
|
payload = jwt.decode(
|
|
token,
|
|
PUBLIC_KEY,
|
|
algorithms=[ALGORITHM], # Allowlist, not from token!
|
|
issuer=ISSUER,
|
|
audience=AUDIENCE,
|
|
options={
|
|
"require": ["exp", "iat", "sub", "iss", "aud"],
|
|
"verify_exp": True,
|
|
"verify_iat": True,
|
|
"verify_iss": True,
|
|
"verify_aud": True,
|
|
}
|
|
)
|
|
return payload
|
|
except jwt.ExpiredSignatureError:
|
|
raise AuthError("Token expired")
|
|
except jwt.InvalidTokenError as e:
|
|
raise AuthError(f"Invalid token: {e}")
|
|
```
|
|
|
|
## Incorrect Pattern
|
|
|
|
```python
|
|
import jwt
|
|
|
|
# Wrong: algorithm from token header
|
|
def bad_verify(token: str) -> dict:
|
|
header = jwt.get_unverified_header(token)
|
|
alg = header["algorithm"] # Attacker controls this!
|
|
return jwt.decode(token, SECRET, algorithms=[alg])
|
|
|
|
# Wrong: no algorithm restriction
|
|
def bad_verify_2(token: str) -> dict:
|
|
return jwt.decode(token, SECRET) # Accepts any algorithm
|
|
|
|
# Wrong: weak secret
|
|
SECRET = "secret123" # Trivially brute-forced
|
|
|
|
# Wrong: no expiration check
|
|
def bad_verify_3(token: str) -> dict:
|
|
return jwt.decode(token, SECRET, options={"verify_exp": False})
|
|
|
|
# Wrong: kid used in file path
|
|
def get_key(token: str):
|
|
header = jwt.get_unverified_header(token)
|
|
kid = header["kid"]
|
|
# Path traversal! kid = "../../../etc/passwd"
|
|
return open(f"keys/{kid}.pem").read()
|
|
```
|
|
|
|
## Algorithm Confusion Attack
|
|
|
|
```python
|
|
# Attack scenario:
|
|
# 1. Server uses RS256 (asymmetric)
|
|
# 2. Attacker changes header to HS256 (symmetric)
|
|
# 3. Attacker signs with the PUBLIC key as HMAC secret
|
|
# 4. Vulnerable server verifies with public key
|
|
# 5. Signature matches! Token accepted
|
|
|
|
# Vulnerable code
|
|
def vulnerable_verify(token: str, public_key: str):
|
|
# If alg=HS256, this uses public_key as HMAC secret
|
|
return jwt.decode(token, public_key, algorithms=["RS256", "HS256"])
|
|
|
|
# Secure code - explicit algorithm
|
|
def secure_verify(token: str, public_key: str):
|
|
return jwt.decode(token, public_key, algorithms=["RS256"])
|
|
```
|
|
|
|
## Refresh Token Pattern
|
|
|
|
```python
|
|
from secrets import token_urlsafe
|
|
|
|
# Access token: short-lived JWT (15 min)
|
|
# Refresh token: long-lived opaque token in database
|
|
|
|
def issue_tokens(user_id: str) -> tuple[str, str]:
|
|
access_token = create_token(user_id, exp_minutes=15)
|
|
refresh_token = token_urlsafe(32) # Opaque, not JWT
|
|
|
|
# Store refresh token in database with metadata
|
|
RefreshToken.create(
|
|
token_hash=hash(refresh_token),
|
|
user_id=user_id,
|
|
expires_at=datetime.utcnow() + timedelta(days=30),
|
|
device_info=get_device_info()
|
|
)
|
|
|
|
return access_token, refresh_token
|
|
|
|
def refresh_access_token(refresh_token: str) -> str:
|
|
"""Exchange refresh token for new access token."""
|
|
stored = RefreshToken.query.filter_by(
|
|
token_hash=hash(refresh_token)
|
|
).first()
|
|
|
|
if not stored or stored.is_expired or stored.is_revoked:
|
|
raise AuthError("Invalid refresh token")
|
|
|
|
# Rotate refresh token (one-time use)
|
|
stored.revoke()
|
|
new_access, new_refresh = issue_tokens(stored.user_id)
|
|
|
|
return new_access, new_refresh
|
|
```
|
|
|
|
## Edge Cases
|
|
|
|
- JWTs in URLs leak to logs and referrer headers
|
|
- Token storage: `httpOnly` cookies vs localStorage (XSS risk)
|
|
- Clock skew between servers affects `exp`/`iat` validation
|
|
- Long-lived tokens: implement revocation list
|
|
- `nbf` (not before) should be validated
|
|
- Nested JWTs (JWE wrapping JWS) need careful handling
|
|
- Don't put sensitive data in JWT payload (base64 is not encryption)
|