# Insecure Deserialization

## Rule

Never deserialize untrusted data without validation. Prefer data-only formats.

**Source:** [OWASP Top 10 2025 - A08 Software or Data Integrity Failures](https://owasp.org/Top10/2025/A08_2025-Software_or_Data_Integrity_Failures/)

## Why It's Dangerous

Deserializing untrusted data with an object-capable format can:

- Execute arbitrary code
- Instantiate arbitrary objects
- Bypass authentication
- Cause denial of service

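To make the first two bullets concrete, here is a minimal sketch of how a pickle payload chooses what runs at load time. The class name `Innocuous` is made up for illustration, and `eval` on a constant expression stands in for whatever callable a real attacker would pick.

```python
import pickle


class Innocuous:
    """Looks like plain data, but controls how it is rebuilt on load."""

    def __reduce__(self):
        # pickle stores (callable, args) and calls callable(*args) during load.
        # eval on a constant is a harmless stand-in for os.system and friends.
        return (eval, ("6 * 7",))


payload = pickle.dumps(Innocuous())

# Loading runs the embedded call; no Innocuous instance is ever created.
result = pickle.loads(payload)
print(result)  # 42
```

The receiver never has to call a method on the result: the call happens inside `pickle.loads` itself, which is why validating the object afterwards is too late.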
## Correct Pattern

```python
import io
import json
import pickle
from dataclasses import dataclass
from typing import Any

# Prefer data-only formats (JSON, not pickle)
def safe_deserialize(data: str) -> dict:
    """Deserialize JSON (data-only, no code execution)."""
    return json.loads(data)

# Validate structure after deserialization
@dataclass
class UserInput:
    name: str
    email: str
    age: int

def parse_user_input(raw: str) -> UserInput:
    data = json.loads(raw)
    if not isinstance(data, dict):
        raise ValueError("Expected a JSON object")

    # Validate required fields
    if not isinstance(data.get("name"), str):
        raise ValueError("Invalid name")
    if not isinstance(data.get("email"), str):
        raise ValueError("Invalid email")
    # bool is a subclass of int, so reject it explicitly
    age = data.get("age")
    if not isinstance(age, int) or isinstance(age, bool):
        raise ValueError("Invalid age")

    return UserInput(
        name=data["name"],
        email=data["email"],
        age=data["age"],
    )

# If you must use object serialization, allowlist classes.
# Entries are (module, class name) pairs; "myapp.models" is a placeholder module.
ALLOWED_CLASSES = {
    ("myapp.models", "User"),
    ("myapp.models", "Order"),
    ("myapp.models", "Product"),
}

def safe_unpickle(data: bytes, allowed: set[tuple[str, str]]) -> Any:
    """Restricted unpickler that only allows specific classes."""

    class RestrictedUnpickler(pickle.Unpickler):
        def find_class(self, module, name):
            # Check the module too: a bare class name can exist in many modules
            if (module, name) not in allowed:
                raise pickle.UnpicklingError(f"Class {module}.{name} not allowed")
            return super().find_class(module, name)

    return RestrictedUnpickler(io.BytesIO(data)).load()
```

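As a usage sketch of the allowlisting idea, the following self-contained example checks both module and class name and shows a payload being rejected. The names `AllowlistUnpickler`, `restricted_loads`, and `Malicious` are illustrative, and `fractions.Fraction` is only a convenient stand-in for an application class.

```python
import io
import pickle
from fractions import Fraction

# Illustrative allowlist: (module, class name) pairs the application expects.
ALLOWED = {("fractions", "Fraction")}


class AllowlistUnpickler(pickle.Unpickler):
    def find_class(self, module, name):
        # Called for every global the pickle stream asks to import.
        if (module, name) not in ALLOWED:
            raise pickle.UnpicklingError(f"{module}.{name} is not allowed")
        return super().find_class(module, name)


def restricted_loads(data: bytes):
    return AllowlistUnpickler(io.BytesIO(data)).load()


class Malicious:
    def __reduce__(self):
        # Asks the loader to import and call builtins.eval.
        return (eval, ("6 * 7",))


ok = restricted_loads(pickle.dumps(Fraction(1, 3)))

try:
    restricted_loads(pickle.dumps(Malicious()))
    blocked = False
except pickle.UnpicklingError:
    blocked = True

print(ok, blocked)
```

An allowlist narrows the attack surface but does not make pickle a safe format: every allowed class still runs its own reconstruction logic, so keep the list as short as possible.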
## Incorrect Pattern

```python
import pickle
import yaml

# Wrong: pickle from untrusted source
def load_session(cookie_value: bytes):
    return pickle.loads(cookie_value)  # RCE!

# Wrong: yaml.load with an unsafe loader (can execute code)
def load_config(yaml_string: str):
    return yaml.load(yaml_string, Loader=yaml.Loader)  # Should be yaml.safe_load

# Wrong: eval/exec on user data
def parse_expression(expr: str):
    return eval(expr)  # Arbitrary code execution

# Wrong: deserializing without validation
def process_request(data: bytes):
    obj = pickle.loads(data)
    obj.execute()  # No type checking!
```

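For the `eval` case, one possible replacement when the input is expected to be a plain literal is `ast.literal_eval` from the standard library. This is a sketch, and the wrapper name `parse_literal` is made up for illustration.

```python
import ast


def parse_literal(expr: str):
    """Parse a Python literal (numbers, strings, lists, dicts, ...) without running code."""
    try:
        return ast.literal_eval(expr)
    except (ValueError, SyntaxError) as exc:
        raise ValueError("Input is not a plain literal") from exc


parsed = parse_literal("[1, 2, {'a': 3}]")

try:
    parse_literal("__import__('os').system('id')")
    rejected = False
except ValueError:
    rejected = True

print(parsed, rejected)
```

`ast.literal_eval` refuses function calls and attribute access, but very large or deeply nested input can still exhaust memory or the recursion limit, so cap the input size before parsing.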
## Language-Specific Risks

| Language | Dangerous | Safe Alternative |
|----------|-----------|------------------|
| Python | `pickle.loads()` | JSON, restricted unpickler |
| Java | `ObjectInputStream` | JSON, allowlisted classes |
| PHP | `unserialize()` | `json_decode()` |
| Ruby | `Marshal.load()` | JSON, YAML.safe_load |
| JavaScript | `eval(JSON)` | `JSON.parse()` |
| .NET | `BinaryFormatter` | `JsonSerializer` |

## YAML Specific

```python
import yaml

# Wrong: yaml.load with an unsafe loader allows arbitrary Python objects
# (PyYAML 6 requires the Loader argument to be passed explicitly)
data = yaml.load(untrusted_yaml, Loader=yaml.Loader)  # Can execute code!
# Attack: "!!python/object/apply:os.system ['rm -rf /']"

# Correct: yaml.safe_load only allows basic types
data = yaml.safe_load(untrusted_yaml)
```

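## Signature Verification

If you must accept serialized objects, sign them when you produce them and verify the signature before deserializing:

```python
import hashlib
import hmac
from typing import Any

# get_secret is an application-specific helper; it must return bytes
SECRET_KEY = get_secret("serialization_key")
SIGNATURE_LENGTH = hashlib.sha256().digest_size  # 32 bytes

class SecurityError(Exception):
    """Raised when signed data fails verification."""

def sign_data(data: bytes) -> bytes:
    """Sign serialized data."""
    signature = hmac.new(SECRET_KEY, data, hashlib.sha256).digest()
    return signature + data

def verify_and_load(signed_data: bytes) -> Any:
    """Verify signature before deserializing."""
    if len(signed_data) < SIGNATURE_LENGTH:
        raise SecurityError("Invalid signature")
    signature = signed_data[:SIGNATURE_LENGTH]
    data = signed_data[SIGNATURE_LENGTH:]

    expected = hmac.new(SECRET_KEY, data, hashlib.sha256).digest()
    if not hmac.compare_digest(signature, expected):
        raise SecurityError("Invalid signature")

    # restricted_deserialize: an allowlisting loader, e.g. a restricted unpickler
    return restricted_deserialize(data)
```
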
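A round trip shows what the signature does and does not buy. This sketch assumes a hard-coded demo key and a JSON payload; `DEMO_KEY`, `sign`, and `verify` are illustrative names, and a real key would come from a secret store.

```python
import hashlib
import hmac
import json

DEMO_KEY = b"demo-key-do-not-use-in-production"  # assumption: illustration only
TAG_LEN = hashlib.sha256().digest_size  # 32 bytes


def sign(data: bytes, key: bytes = DEMO_KEY) -> bytes:
    """Prefix the payload with its HMAC-SHA256 tag."""
    return hmac.new(key, data, hashlib.sha256).digest() + data


def verify(signed: bytes, key: bytes = DEMO_KEY) -> bytes:
    """Return the payload if the tag matches, else raise ValueError."""
    if len(signed) < TAG_LEN:
        raise ValueError("Signed payload too short")
    tag, data = signed[:TAG_LEN], signed[TAG_LEN:]
    expected = hmac.new(key, data, hashlib.sha256).digest()
    if not hmac.compare_digest(tag, expected):
        raise ValueError("Invalid signature")
    return data


token = sign(json.dumps({"user": "alice", "role": "viewer"}).encode())
session = json.loads(verify(token))

# Flipping the role without the key invalidates the tag.
tampered = token.replace(b"viewer", b"admin ")
try:
    verify(tampered)
    tamper_detected = False
except ValueError:
    tamper_detected = True

print(session, tamper_detected)
```

A valid signature only proves that the bytes were produced by a holder of the key. It does not make the format safe, so a data-only payload or an allowlisting loader is still worth keeping behind the check.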
## Edge Cases

- Base64-encoded serialized data in cookies
- Serialized objects in database fields
- Message queues with serialized payloads
- Session data in Redis/Memcached
- Java RMI (Remote Method Invocation)
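
When auditing the places listed above, a quick heuristic can flag values that look like native serialized objects. The sketch below is an assumption-laden helper, not a complete detector: `sniff_serialized` is a made-up name, and it only recognizes the Java object stream header (`AC ED 00 05`) and the pickle protocol 2+ header, raw or Base64-encoded.

```python
import base64
import binascii
import pickle
from typing import Optional

JAVA_STREAM_MAGIC = b"\xac\xed\x00\x05"  # header written by Java ObjectOutputStream


def sniff_serialized(value: bytes) -> Optional[str]:
    """Heuristically label a blob as a native serialized object, else None."""
    candidates = [value]
    try:
        # Cookies and queue messages often wrap the blob in Base64.
        candidates.append(base64.b64decode(value, validate=True))
    except (binascii.Error, ValueError):
        pass

    for blob in candidates:
        if blob.startswith(JAVA_STREAM_MAGIC):
            return "java-serialized"
        # Pickle protocol 2+ starts with the PROTO opcode (0x80) and a version byte.
        if len(blob) >= 2 and blob[0] == 0x80 and 2 <= blob[1] <= pickle.HIGHEST_PROTOCOL:
            return "python-pickle"
    return None


cookie = base64.b64encode(pickle.dumps({"user": "alice"}))
print(sniff_serialized(cookie))                # python-pickle
print(sniff_serialized(b'{"user": "alice"}'))  # None
```

A match is a prompt to review how the value is loaded, not proof of a vulnerability, and a miss proves nothing: older pickle protocols and other formats have no such header.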