Add supply-chain, deserialization, cryptography, error-handling patterns
Now covers all OWASP Top 10:2025 categories: - A03: supply-chain.md (SolarWinds, Bybit, npm worm examples) - A04: cryptography.md (algorithm recommendations, key management) - A08: deserialization.md (pickle, yaml, language-specific risks) - A10: error-handling.md (fail closed, error messages)
This commit is contained in:
@@ -6,36 +6,62 @@ Scannable patterns for security code review. Each file has:
|
||||
- **Incorrect Pattern** — common mistakes
|
||||
- **Edge Cases** — gotchas
|
||||
|
||||
Based on OWASP Top 10:2025 and recent security research.
|
||||
|
||||
## Patterns
|
||||
|
||||
### Fundamentals
|
||||
|
||||
| File | Topic |
|
||||
|------|-------|
|
||||
| [secure-defaults.md](secure-defaults.md) | Fail closed, deny by default, defense in depth |
|
||||
| [input-validation.md](input-validation.md) | Allowlist > blocklist, validate at boundaries |
|
||||
| [credential-handling.md](credential-handling.md) | No hardcoded secrets, environment/secret manager |
|
||||
| [audit-logging.md](audit-logging.md) | What to log, what not to log |
|
||||
| File | Topic | OWASP 2025 |
|
||||
|------|-------|------------|
|
||||
| [secure-defaults.md](secure-defaults.md) | Fail closed, deny by default, defense in depth | A06 |
|
||||
| [input-validation.md](input-validation.md) | Allowlist > blocklist, validate at boundaries | A03 |
|
||||
| [credential-handling.md](credential-handling.md) | No hardcoded secrets, environment/secret manager | — |
|
||||
| [audit-logging.md](audit-logging.md) | What to log, what not to log | A09 |
|
||||
| [error-handling.md](error-handling.md) | Fail closed, no sensitive info in errors | A10 |
|
||||
|
||||
### Identity
|
||||
|
||||
| File | Topic |
|
||||
|------|-------|
|
||||
| [authentication.md](authentication.md) | Passwords, tokens, MFA, brute force protection |
|
||||
| [authorization.md](authorization.md) | Permission checks, IDOR prevention, privilege escalation |
|
||||
| File | Topic | OWASP 2025 |
|
||||
|------|-------|------------|
|
||||
| [authentication.md](authentication.md) | Passwords, tokens, MFA, brute force protection | A07 |
|
||||
| [authorization.md](authorization.md) | Permission checks, IDOR prevention, privilege escalation | A01 |
|
||||
|
||||
### Attack Prevention
|
||||
|
||||
| File | Topic |
|
||||
|------|-------|
|
||||
| [injection-prevention.md](injection-prevention.md) | SQL, command, template, path traversal |
|
||||
| [dos-prevention.md](dos-prevention.md) | Rate limiting, resource bounds, algorithmic complexity |
|
||||
| [prompt-injection.md](prompt-injection.md) | LLM security, data/instruction separation |
|
||||
| File | Topic | OWASP 2025 |
|
||||
|------|-------|------------|
|
||||
| [injection-prevention.md](injection-prevention.md) | SQL, command, template, path traversal | A05 |
|
||||
| [dos-prevention.md](dos-prevention.md) | Rate limiting, resource bounds, algorithmic complexity | — |
|
||||
| [prompt-injection.md](prompt-injection.md) | LLM security, data/instruction separation | — |
|
||||
| [deserialization.md](deserialization.md) | Untrusted data deserialization, pickle, yaml | A08 |
|
||||
|
||||
### Infrastructure
|
||||
|
||||
| File | Topic | OWASP 2025 |
|
||||
|------|-------|------------|
|
||||
| [supply-chain.md](supply-chain.md) | SBOM, dependency scanning, signed packages | A03 |
|
||||
| [cryptography.md](cryptography.md) | Strong algorithms, key management, TLS | A04 |
|
||||
|
||||
## OWASP Top 10:2025 Coverage
|
||||
|
||||
| # | Category | Pattern |
|
||||
|---|----------|---------|
|
||||
| A01 | Broken Access Control | authorization.md |
|
||||
| A02 | Security Misconfiguration | secure-defaults.md |
|
||||
| A03 | Software Supply Chain Failures | supply-chain.md |
|
||||
| A04 | Cryptographic Failures | cryptography.md |
|
||||
| A05 | Injection | injection-prevention.md |
|
||||
| A06 | Insecure Design | secure-defaults.md |
|
||||
| A07 | Authentication Failures | authentication.md |
|
||||
| A08 | Software or Data Integrity Failures | deserialization.md |
|
||||
| A09 | Security Logging and Alerting Failures | audit-logging.md |
|
||||
| A10 | Mishandling of Exceptional Conditions | error-handling.md |
|
||||
|
||||
## Sources
|
||||
|
||||
- [OWASP Top 10:2025](https://owasp.org/Top10/2025/)
|
||||
- [OWASP Cheat Sheet Series](https://cheatsheetseries.owasp.org/)
|
||||
- [OWASP Top 10](https://owasp.org/Top10/)
|
||||
- [OWASP LLM Top 10](https://owasp.org/www-project-top-10-for-large-language-model-applications/)
|
||||
- [CWE (Common Weakness Enumeration)](https://cwe.mitre.org/)
|
||||
|
||||
|
||||
+140
@@ -0,0 +1,140 @@
|
||||
# Cryptographic Failures
|
||||
|
||||
## Rule
|
||||
|
||||
Use strong, modern algorithms. Never implement your own crypto. Manage keys securely.
|
||||
|
||||
**Source:** [OWASP Top 10 2025 - A04 Cryptographic Failures](https://owasp.org/Top10/2025/A04_2025-Cryptographic_Failures/)
|
||||
|
||||
## Algorithms to Use
|
||||
|
||||
| Purpose | Recommended | Avoid |
|
||||
|---------|-------------|-------|
|
||||
| Symmetric encryption | AES-256-GCM | DES, 3DES, RC4, ECB mode |
|
||||
| Hashing (general) | SHA-256, SHA-3 | MD5, SHA-1 |
|
||||
| Password hashing | bcrypt, Argon2, scrypt | SHA-*, MD5, plain hash |
|
||||
| Key exchange | ECDH, X25519 | RSA < 2048 bits |
|
||||
| Signatures | Ed25519, ECDSA | RSA < 2048 bits |
|
||||
| TLS | 1.2+ | SSL, TLS 1.0, 1.1 |
|
||||
|
||||
## Correct Pattern
|
||||
|
||||
```python
|
||||
from cryptography.fernet import Fernet
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
||||
import os
|
||||
import base64
|
||||
|
||||
# Generate a secure key
|
||||
def generate_key() -> bytes:
|
||||
return Fernet.generate_key()
|
||||
|
||||
# Encrypt data
|
||||
def encrypt(data: bytes, key: bytes) -> bytes:
|
||||
f = Fernet(key)
|
||||
return f.encrypt(data)
|
||||
|
||||
# Decrypt data
|
||||
def decrypt(ciphertext: bytes, key: bytes) -> bytes:
|
||||
f = Fernet(key)
|
||||
return f.decrypt(ciphertext)
|
||||
|
||||
# Derive key from password (for encryption, not storage)
|
||||
def derive_key(password: str, salt: bytes) -> bytes:
|
||||
kdf = PBKDF2HMAC(
|
||||
algorithm=hashes.SHA256(),
|
||||
length=32,
|
||||
salt=salt,
|
||||
iterations=600000, # OWASP 2023 recommendation
|
||||
)
|
||||
return base64.urlsafe_b64encode(kdf.derive(password.encode()))
|
||||
|
||||
# Generate secure random values
|
||||
def generate_token(length: int = 32) -> str:
|
||||
return base64.urlsafe_b64encode(os.urandom(length)).decode()
|
||||
```
|
||||
|
||||
## Incorrect Pattern
|
||||
|
||||
```python
|
||||
import hashlib
|
||||
import random
|
||||
|
||||
# Wrong: MD5 for anything security-related
|
||||
hash = hashlib.md5(data).hexdigest()
|
||||
|
||||
# Wrong: SHA-256 for passwords (no salt, too fast)
|
||||
password_hash = hashlib.sha256(password.encode()).hexdigest()
|
||||
|
||||
# Wrong: predictable random
|
||||
token = random.randint(0, 999999) # Not cryptographically secure
|
||||
|
||||
# Wrong: hardcoded key
|
||||
KEY = b"mysecretkey12345"
|
||||
|
||||
# Wrong: ECB mode (patterns visible in ciphertext)
|
||||
from Crypto.Cipher import AES
|
||||
cipher = AES.new(key, AES.MODE_ECB)
|
||||
|
||||
# Wrong: rolling your own crypto
|
||||
def my_encrypt(data, key):
|
||||
return bytes(a ^ b for a, b in zip(data, cycle(key)))
|
||||
```
|
||||
|
||||
## Key Management
|
||||
|
||||
```python
|
||||
import os
|
||||
|
||||
# Load keys from environment or secret manager
|
||||
def get_encryption_key() -> bytes:
|
||||
key = os.environ.get("ENCRYPTION_KEY")
|
||||
if not key:
|
||||
raise RuntimeError("ENCRYPTION_KEY not set")
|
||||
return base64.urlsafe_b64decode(key)
|
||||
|
||||
# Key rotation
|
||||
class KeyManager:
|
||||
def __init__(self):
|
||||
self.current_key_id = os.environ["CURRENT_KEY_ID"]
|
||||
self.keys = self._load_keys()
|
||||
|
||||
def encrypt(self, data: bytes) -> dict:
|
||||
key = self.keys[self.current_key_id]
|
||||
ciphertext = encrypt(data, key)
|
||||
return {"key_id": self.current_key_id, "data": ciphertext}
|
||||
|
||||
def decrypt(self, envelope: dict) -> bytes:
|
||||
key = self.keys[envelope["key_id"]]
|
||||
return decrypt(envelope["data"], key)
|
||||
```
|
||||
|
||||
## TLS Configuration
|
||||
|
||||
```python
|
||||
import ssl
|
||||
|
||||
# Correct: modern TLS settings
|
||||
def create_ssl_context() -> ssl.SSLContext:
|
||||
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||
context.minimum_version = ssl.TLSVersion.TLSv1_2
|
||||
context.verify_mode = ssl.CERT_REQUIRED
|
||||
context.check_hostname = True
|
||||
context.load_default_certs()
|
||||
return context
|
||||
|
||||
# Wrong: disabling verification
|
||||
context = ssl.create_default_context()
|
||||
context.check_hostname = False
|
||||
context.verify_mode = ssl.CERT_NONE # Never do this!
|
||||
```
|
||||
|
||||
## Edge Cases
|
||||
|
||||
- IV/nonce reuse breaks encryption security
|
||||
- Timing attacks on comparison operations
|
||||
- Side-channel attacks on key operations
|
||||
- Key material in swap/core dumps
|
||||
- Encrypted data without integrity (use AEAD)
|
||||
- Insufficient entropy at startup
|
||||
@@ -0,0 +1,151 @@
|
||||
# Insecure Deserialization
|
||||
|
||||
## Rule
|
||||
|
||||
Never deserialize untrusted data without validation. Prefer data-only formats.
|
||||
|
||||
**Source:** [OWASP Top 10 2025 - A08 Software or Data Integrity Failures](https://owasp.org/Top10/2025/A08_2025-Software_or_Data_Integrity_Failures/)
|
||||
|
||||
## Why It's Dangerous
|
||||
|
||||
Deserialization can:
|
||||
- Execute arbitrary code
|
||||
- Instantiate arbitrary objects
|
||||
- Bypass authentication
|
||||
- Cause denial of service
|
||||
|
||||
## Correct Pattern
|
||||
|
||||
```python
|
||||
import json
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
|
||||
# Prefer data-only formats (JSON, not pickle)
|
||||
def safe_deserialize(data: str) -> dict:
|
||||
"""Deserialize JSON (data-only, no code execution)."""
|
||||
return json.loads(data)
|
||||
|
||||
# Validate structure after deserialization
|
||||
@dataclass
|
||||
class UserInput:
|
||||
name: str
|
||||
email: str
|
||||
age: int
|
||||
|
||||
def parse_user_input(raw: str) -> UserInput:
|
||||
data = json.loads(raw)
|
||||
|
||||
# Validate required fields
|
||||
if not isinstance(data.get("name"), str):
|
||||
raise ValueError("Invalid name")
|
||||
if not isinstance(data.get("email"), str):
|
||||
raise ValueError("Invalid email")
|
||||
if not isinstance(data.get("age"), int):
|
||||
raise ValueError("Invalid age")
|
||||
|
||||
return UserInput(
|
||||
name=data["name"],
|
||||
email=data["email"],
|
||||
age=data["age"]
|
||||
)
|
||||
|
||||
# If you must use object serialization, allowlist classes
|
||||
ALLOWED_CLASSES = {"User", "Order", "Product"}
|
||||
|
||||
def safe_unpickle(data: bytes, allowed: set[str]) -> Any:
|
||||
"""Restricted unpickler that only allows specific classes."""
|
||||
import pickle
|
||||
import io
|
||||
|
||||
class RestrictedUnpickler(pickle.Unpickler):
|
||||
def find_class(self, module, name):
|
||||
if name not in allowed:
|
||||
raise pickle.UnpicklingError(f"Class {name} not allowed")
|
||||
return super().find_class(module, name)
|
||||
|
||||
return RestrictedUnpickler(io.BytesIO(data)).load()
|
||||
```
|
||||
|
||||
## Incorrect Pattern
|
||||
|
||||
```python
|
||||
import pickle
|
||||
import yaml
|
||||
|
||||
# Wrong: pickle from untrusted source
|
||||
def load_session(cookie_value: bytes):
|
||||
return pickle.loads(cookie_value) # RCE!
|
||||
|
||||
# Wrong: yaml.load (can execute code)
|
||||
def load_config(yaml_string: str):
|
||||
return yaml.load(yaml_string) # Should be yaml.safe_load
|
||||
|
||||
# Wrong: eval/exec on user data
|
||||
def parse_expression(expr: str):
|
||||
return eval(expr) # Arbitrary code execution
|
||||
|
||||
# Wrong: deserializing without validation
|
||||
def process_request(data: bytes):
|
||||
obj = pickle.loads(data)
|
||||
obj.execute() # No type checking!
|
||||
```
|
||||
|
||||
## Language-Specific Risks
|
||||
|
||||
| Language | Dangerous | Safe Alternative |
|
||||
|----------|-----------|------------------|
|
||||
| Python | `pickle.loads()` | JSON, restricted unpickler |
|
||||
| Java | `ObjectInputStream` | JSON, allowlisted classes |
|
||||
| PHP | `unserialize()` | `json_decode()` |
|
||||
| Ruby | `Marshal.load()` | JSON, YAML.safe_load |
|
||||
| JavaScript | `eval(JSON)` | `JSON.parse()` |
|
||||
| .NET | `BinaryFormatter` | `JsonSerializer` |
|
||||
|
||||
## YAML Specific
|
||||
|
||||
```python
|
||||
import yaml
|
||||
|
||||
# Wrong: yaml.load allows arbitrary Python objects
|
||||
data = yaml.load(untrusted_yaml) # Can execute code!
|
||||
# Attack: "!!python/object/apply:os.system ['rm -rf /']"
|
||||
|
||||
# Correct: yaml.safe_load only allows basic types
|
||||
data = yaml.safe_load(untrusted_yaml)
|
||||
```
|
||||
|
||||
## Signature Verification
|
||||
|
||||
If you must accept serialized objects:
|
||||
|
||||
```python
|
||||
import hmac
|
||||
import hashlib
|
||||
|
||||
SECRET_KEY = get_secret("serialization_key")
|
||||
|
||||
def sign_data(data: bytes) -> bytes:
|
||||
"""Sign serialized data."""
|
||||
signature = hmac.new(SECRET_KEY, data, hashlib.sha256).digest()
|
||||
return signature + data
|
||||
|
||||
def verify_and_load(signed_data: bytes) -> Any:
|
||||
"""Verify signature before deserializing."""
|
||||
signature = signed_data[:32]
|
||||
data = signed_data[32:]
|
||||
|
||||
expected = hmac.new(SECRET_KEY, data, hashlib.sha256).digest()
|
||||
if not hmac.compare_digest(signature, expected):
|
||||
raise SecurityError("Invalid signature")
|
||||
|
||||
return restricted_deserialize(data)
|
||||
```
|
||||
|
||||
## Edge Cases
|
||||
|
||||
- Base64-encoded serialized data in cookies
|
||||
- Serialized objects in database fields
|
||||
- Message queues with serialized payloads
|
||||
- Session data in Redis/Memcached
|
||||
- Java RMI (Remote Method Invocation)
|
||||
@@ -0,0 +1,182 @@
|
||||
# Error Handling
|
||||
|
||||
## Rule
|
||||
|
||||
Handle all errors explicitly. Fail closed. Never leak sensitive information in error messages.
|
||||
|
||||
**Source:** [OWASP Top 10 2025 - A10 Mishandling of Exceptional Conditions](https://owasp.org/Top10/2025/A10_2025-Mishandling_of_Exceptional_Conditions/)
|
||||
|
||||
## Fail Closed vs Fail Open
|
||||
|
||||
| Scenario | Fail Closed (Correct) | Fail Open (Wrong) |
|
||||
|----------|----------------------|-------------------|
|
||||
| Auth check errors | Deny access | Allow access |
|
||||
| Input validation errors | Reject request | Process anyway |
|
||||
| Transaction errors | Roll back | Partial commit |
|
||||
| Permission check timeout | Deny | Allow |
|
||||
|
||||
## Correct Pattern
|
||||
|
||||
```python
|
||||
import logging
|
||||
from contextlib import contextmanager
|
||||
|
||||
# Explicit error handling with fail-closed
|
||||
def check_permission(user_id: str, resource_id: str) -> bool:
|
||||
"""Return False on any error — fail closed."""
|
||||
try:
|
||||
permissions = fetch_permissions(user_id)
|
||||
return resource_id in permissions.allowed_resources
|
||||
except Exception as e:
|
||||
logging.exception("Permission check failed", extra={
|
||||
"user_id": user_id,
|
||||
"resource_id": resource_id
|
||||
})
|
||||
return False # Deny on error
|
||||
|
||||
# Transaction rollback on failure
|
||||
@contextmanager
|
||||
def transaction():
|
||||
"""Ensure complete rollback on any failure."""
|
||||
tx = begin_transaction()
|
||||
try:
|
||||
yield tx
|
||||
tx.commit()
|
||||
except Exception:
|
||||
tx.rollback()
|
||||
raise
|
||||
|
||||
def transfer_funds(from_acct: str, to_acct: str, amount: Decimal):
|
||||
with transaction() as tx:
|
||||
debit(tx, from_acct, amount)
|
||||
credit(tx, to_acct, amount)
|
||||
# If credit fails, debit is rolled back
|
||||
|
||||
# Generic error messages to users
|
||||
def handle_request(request):
|
||||
try:
|
||||
return process(request)
|
||||
except ValidationError as e:
|
||||
# Specific, safe error for user
|
||||
return {"error": str(e)}, 400
|
||||
except Exception as e:
|
||||
# Log details internally
|
||||
logging.exception("Unexpected error", extra={
|
||||
"request_id": request.id
|
||||
})
|
||||
# Generic message to user
|
||||
return {"error": "An unexpected error occurred"}, 500
|
||||
```
|
||||
|
||||
## Incorrect Pattern
|
||||
|
||||
```python
|
||||
# Wrong: fail open
|
||||
def check_access(user_id, resource):
|
||||
try:
|
||||
return has_permission(user_id, resource)
|
||||
except:
|
||||
return True # "If in doubt, let them in"
|
||||
|
||||
# Wrong: swallowing exceptions
|
||||
try:
|
||||
process_payment()
|
||||
except:
|
||||
pass # Silently fails, state unknown
|
||||
|
||||
# Wrong: leaking sensitive info
|
||||
except DatabaseError as e:
|
||||
return {"error": f"Database error: {e}"} # Exposes internals
|
||||
|
||||
# Wrong: stack trace to user
|
||||
except Exception as e:
|
||||
import traceback
|
||||
return {"error": traceback.format_exc()}
|
||||
|
||||
# Wrong: partial transaction
|
||||
def transfer(from_acct, to_acct, amount):
|
||||
debit(from_acct, amount)
|
||||
try:
|
||||
credit(to_acct, amount)
|
||||
except:
|
||||
pass # Debit happened but credit didn't!
|
||||
```
|
||||
|
||||
## Error Message Guidelines
|
||||
|
||||
| Internal Log | User-Facing Message |
|
||||
|--------------|---------------------|
|
||||
| `SQLException: column 'password' at line 5` | `An error occurred. Please try again.` |
|
||||
| `FileNotFoundError: /etc/shadow` | `Resource not found.` |
|
||||
| `ConnectionError: redis://prod-cache:6379` | `Service temporarily unavailable.` |
|
||||
| `KeyError: user['admin_token']` | `Invalid request.` |
|
||||
|
||||
## Global Exception Handler
|
||||
|
||||
```python
|
||||
from flask import Flask, jsonify
|
||||
import logging
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
@app.errorhandler(Exception)
|
||||
def handle_exception(e):
|
||||
"""Global handler — catch anything we missed."""
|
||||
# Log full details
|
||||
logging.exception("Unhandled exception")
|
||||
|
||||
# Return generic error to user
|
||||
if app.debug:
|
||||
# Only in dev — never in prod
|
||||
return {"error": str(e)}, 500
|
||||
else:
|
||||
return {"error": "Internal server error"}, 500
|
||||
|
||||
# Rate limit repeated errors (DOS prevention)
|
||||
class ErrorRateLimiter:
|
||||
def __init__(self, max_errors: int = 100, window: int = 60):
|
||||
self.max_errors = max_errors
|
||||
self.window = window
|
||||
self.errors = []
|
||||
|
||||
def record_error(self, error_type: str):
|
||||
now = time.time()
|
||||
self.errors = [t for t in self.errors if now - t < self.window]
|
||||
self.errors.append(now)
|
||||
|
||||
if len(self.errors) > self.max_errors:
|
||||
logging.warning(f"Error rate limit exceeded: {error_type}")
|
||||
# Could trigger alerting or blocking
|
||||
```
|
||||
|
||||
## Unchecked Return Values
|
||||
|
||||
```python
|
||||
# Wrong: ignoring return values
|
||||
def process_file(path):
|
||||
f = open(path) # Could fail
|
||||
data = f.read()
|
||||
f.close()
|
||||
return data
|
||||
|
||||
# Correct: handle all failure modes
|
||||
def process_file(path: str) -> str:
|
||||
try:
|
||||
with open(path) as f:
|
||||
return f.read()
|
||||
except FileNotFoundError:
|
||||
raise ValueError(f"File not found: {path}")
|
||||
except PermissionError:
|
||||
raise ValueError(f"Permission denied: {path}")
|
||||
except IOError as e:
|
||||
raise ValueError(f"IO error reading file: {e}")
|
||||
```
|
||||
|
||||
## Edge Cases
|
||||
|
||||
- Errors during error handling (recursive failure)
|
||||
- Resource leaks when exceptions occur
|
||||
- Timeout handling (treat as failure)
|
||||
- Async error handling (unhandled promise rejections)
|
||||
- Background job failures (need monitoring)
|
||||
- Partial failures in distributed systems
|
||||
+126
@@ -0,0 +1,126 @@
|
||||
# Supply Chain Security
|
||||
|
||||
## Rule
|
||||
|
||||
Verify integrity of all dependencies. Generate SBOMs. Monitor for vulnerabilities.
|
||||
|
||||
**Source:** [OWASP Top 10 2025 - A03 Software Supply Chain Failures](https://owasp.org/Top10/2025/A03_2025-Software_Supply_Chain_Failures/)
|
||||
|
||||
## Attack Examples
|
||||
|
||||
- **SolarWinds (2019)**: Compromised build system, 18,000 orgs affected
|
||||
- **Bybit (2025)**: Supply chain attack in wallet software, $1.5B theft
|
||||
- **Shai-Hulud (2025)**: Self-propagating npm worm, 500+ packages
|
||||
|
||||
## Correct Pattern
|
||||
|
||||
```python
|
||||
# Generate and maintain SBOM
|
||||
import subprocess
|
||||
import json
|
||||
import hashlib
|
||||
|
||||
def generate_sbom(project_path: str) -> dict:
|
||||
"""Generate Software Bill of Materials."""
|
||||
# Use CycloneDX or SPDX format
|
||||
result = subprocess.run(
|
||||
["cyclonedx-py", "poetry", "-o", "sbom.json"],
|
||||
cwd=project_path,
|
||||
capture_output=True
|
||||
)
|
||||
with open(f"{project_path}/sbom.json") as f:
|
||||
return json.load(f)
|
||||
|
||||
# Verify package integrity
|
||||
def verify_package(package_path: str, expected_hash: str) -> bool:
|
||||
"""Verify package hash before installation."""
|
||||
with open(package_path, "rb") as f:
|
||||
actual_hash = hashlib.sha256(f.read()).hexdigest()
|
||||
return actual_hash == expected_hash
|
||||
|
||||
# Pin dependencies with hashes
|
||||
# requirements.txt with hashes:
|
||||
# requests==2.28.0 --hash=sha256:abc123...
|
||||
|
||||
# Lock file example (poetry.lock, package-lock.json)
|
||||
def verify_lockfile_integrity(lockfile_path: str) -> bool:
|
||||
"""Ensure lockfile hasn't been tampered with."""
|
||||
# Compare against known-good version in version control
|
||||
...
|
||||
```
|
||||
|
||||
## Incorrect Pattern
|
||||
|
||||
```python
|
||||
# Wrong: no version pinning
|
||||
# requirements.txt
|
||||
# requests
|
||||
# flask
|
||||
|
||||
# Wrong: pulling from arbitrary sources
|
||||
pip install https://sketchy-site.com/package.tar.gz
|
||||
|
||||
# Wrong: no integrity verification
|
||||
def install_dependency(name):
|
||||
os.system(f"pip install {name}") # No hash check
|
||||
|
||||
# Wrong: auto-updating without verification
|
||||
def auto_update():
|
||||
os.system("pip install --upgrade -r requirements.txt")
|
||||
```
|
||||
|
||||
## Dependency Scanning
|
||||
|
||||
```python
|
||||
# Integrate vulnerability scanning in CI
|
||||
def scan_dependencies() -> list[dict]:
|
||||
"""Scan for known vulnerabilities."""
|
||||
# Use tools like:
|
||||
# - OWASP Dependency-Check
|
||||
# - Snyk
|
||||
# - GitHub Dependabot
|
||||
# - OSV (Open Source Vulnerabilities)
|
||||
|
||||
result = subprocess.run(
|
||||
["pip-audit", "--format=json"],
|
||||
capture_output=True
|
||||
)
|
||||
return json.loads(result.stdout)
|
||||
|
||||
def block_on_critical(vulnerabilities: list[dict]) -> bool:
|
||||
"""Fail CI on critical vulnerabilities."""
|
||||
critical = [v for v in vulnerabilities if v["severity"] == "CRITICAL"]
|
||||
if critical:
|
||||
raise SecurityError(f"Critical vulnerabilities found: {critical}")
|
||||
return True
|
||||
```
|
||||
|
||||
## CI/CD Hardening
|
||||
|
||||
```python
|
||||
# Verify CI/CD pipeline integrity
|
||||
PIPELINE_REQUIREMENTS = {
|
||||
"mfa_required": True,
|
||||
"branch_protection": True,
|
||||
"signed_commits": True,
|
||||
"code_review_required": True,
|
||||
"secrets_scanning": True,
|
||||
}
|
||||
|
||||
def audit_pipeline(config: dict) -> list[str]:
|
||||
"""Audit CI/CD configuration."""
|
||||
issues = []
|
||||
for requirement, expected in PIPELINE_REQUIREMENTS.items():
|
||||
if config.get(requirement) != expected:
|
||||
issues.append(f"Missing: {requirement}")
|
||||
return issues
|
||||
```
|
||||
|
||||
## Edge Cases
|
||||
|
||||
- Transitive dependencies (deps of deps) can be vulnerable
|
||||
- Typosquatting attacks (similar package names)
|
||||
- Dependency confusion (internal vs public package names)
|
||||
- Compromised maintainer accounts
|
||||
- Post-install scripts can execute arbitrary code
|
||||
- IDE extensions and dev tools are part of supply chain
|
||||
Reference in New Issue
Block a user