6fb1fcdebd
Two citations were out of bounds against kubernetes/kubernetes HEAD: - delta_fifo.go:797-801 → 797-800 (DeletedFinalStateUnknown struct ends at line 800; file has 800 lines total, off-by-one) - kube_features.go:2798-2811 → 2771-2784 (init() function moved up ~27 lines; file now has 2784 lines, old citation was past EOF) Audited all 30 citations (28 OK, 2 fixed, 0 flagged for review).
970 lines
34 KiB
Markdown
970 lines
34 KiB
Markdown
# Kubernetes-Specific Patterns
|
||
|
||
## 1. Controller / Reconciler Pattern
|
||
|
||
**Source:** `pkg/controller/deployment/deployment_controller.go` (lines 65–530)
|
||
|
||
### What it does
|
||
The controller pattern is the central design pattern of Kubernetes. Every controller watches a set of resources, maintains a work queue, and reconciles desired state with actual state through a sync loop.
|
||
|
||
### Why
|
||
Distributed systems can't guarantee that a single API call will bring the world to desired state. The controller pattern provides eventual consistency by continuously reconciling — it handles missed events, partial failures, and concurrent modifications.
|
||
|
||
### Structure
|
||
|
||
```go
|
||
// pkg/controller/deployment/deployment_controller.go:65-95
|
||
type DeploymentController struct {
|
||
rsControl controller.RSControlInterface
|
||
client clientset.Interface
|
||
|
||
// Testability: sync handler is injectable
|
||
syncHandler func(ctx context.Context, dKey string) error
|
||
enqueueDeployment func(deployment *apps.Deployment)
|
||
|
||
// Listers: read from local cache, not API server
|
||
dLister appslisters.DeploymentLister
|
||
rsLister appslisters.ReplicaSetLister
|
||
podLister corelisters.PodLister
|
||
|
||
// Synced funcs: gate processing until caches are warm
|
||
dListerSynced cache.InformerSynced
|
||
rsListerSynced cache.InformerSynced
|
||
|
||
// Work queue: rate-limited, deduplicating
|
||
queue workqueue.TypedRateLimitingInterface[string]
|
||
}
|
||
```
|
||
|
||
### The Canonical Worker Loop
|
||
|
||
```go
|
||
// pkg/controller/deployment/deployment_controller.go:481-515
|
||
func (dc *DeploymentController) worker(ctx context.Context) {
|
||
for dc.processNextWorkItem(ctx) {
|
||
}
|
||
}
|
||
|
||
func (dc *DeploymentController) processNextWorkItem(ctx context.Context) bool {
|
||
key, quit := dc.queue.Get()
|
||
if quit {
|
||
return false
|
||
}
|
||
defer dc.queue.Done(key)
|
||
|
||
err := dc.syncHandler(ctx, key)
|
||
dc.handleErr(ctx, err, key)
|
||
return true
|
||
}
|
||
|
||
func (dc *DeploymentController) handleErr(ctx context.Context, err error, key string) {
|
||
if err == nil || errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
|
||
dc.queue.Forget(key) // Success: clear rate limiter
|
||
return
|
||
}
|
||
if dc.queue.NumRequeues(key) < maxRetries {
|
||
dc.queue.AddRateLimited(key) // Retry with backoff
|
||
return
|
||
}
|
||
utilruntime.HandleError(err)
|
||
dc.queue.Forget(key) // Give up after maxRetries
|
||
}
|
||
```
|
||
|
||
### When to Use
|
||
|
||
**Triggers:**
|
||
- You're building a system that must maintain desired state over time (not just react to events once)
|
||
- External state can change outside your control (user edits, crashes, network partitions)
|
||
- You need automatic recovery from partial failures without human intervention
|
||
|
||
**Example — before:**
|
||
```go
|
||
// Event-driven: reacts once and hopes nothing changes
|
||
func handlePodCreated(pod Pod) {
|
||
assignToNode(pod)
|
||
// What if the node dies 5 seconds later? Nobody re-assigns.
|
||
}
|
||
```
|
||
|
||
**Example — after:**
|
||
```go
|
||
// Controller pattern: continuously reconciles desired vs actual
|
||
func (c *Scheduler) Reconcile(ctx context.Context, key string) error {
|
||
pod, err := c.podLister.Get(key)
|
||
if err != nil { return err }
|
||
|
||
if pod.Spec.NodeName == "" {
|
||
node := c.selectBestNode(pod)
|
||
return c.assignPodToNode(ctx, pod, node)
|
||
}
|
||
// Already assigned — verify node is still healthy
|
||
if !c.nodeIsReady(pod.Spec.NodeName) {
|
||
return c.reassignPod(ctx, pod)
|
||
}
|
||
return nil // desired state matches actual state
|
||
}
|
||
```
|
||
|
||
### When NOT to Use
|
||
|
||
**Don't use this when:**
|
||
- The operation is truly one-shot with no ongoing state to maintain (e.g., a CLI tool, a migration script)
|
||
- You're building a simple request/response service where each request is independent
|
||
- The "desired state" changes so frequently that reconciliation would always be stale (use streaming/event-driven instead)
|
||
|
||
**Over-application example:**
|
||
```go
|
||
// Over-engineered: a controller for sending one-time notifications
|
||
type NotificationController struct {
|
||
queue workqueue.TypedRateLimitingInterface[string]
|
||
}
|
||
|
||
func (c *NotificationController) Reconcile(ctx context.Context, key string) error {
|
||
notification, _ := c.lister.Get(key)
|
||
if notification.Status.Sent {
|
||
return nil // already sent
|
||
}
|
||
err := c.sendEmail(notification)
|
||
if err == nil {
|
||
notification.Status.Sent = true
|
||
c.client.Update(ctx, notification)
|
||
}
|
||
return err
|
||
}
|
||
```
|
||
|
||
**Better alternative:**
|
||
```go
|
||
// Simple job processor: one-time work doesn't need continuous reconciliation
|
||
func processNotification(ctx context.Context, notification Notification) error {
|
||
return sendEmail(notification)
|
||
}
|
||
```
|
||
|
||
**Why:** The controller pattern adds substantial complexity (informers, caches, workqueue, leader election). If your state doesn't drift — if once you do the thing, it stays done — a simple worker or job queue is far more appropriate.
|
||
|
||
### Key Properties
|
||
1. **Level-triggered, not edge-triggered** — the sync loop reads current state, not diffs
|
||
2. **Idempotent** — running sync twice produces the same result
|
||
3. **Key-based deduplication** — the workqueue coalesces multiple events for the same object
|
||
4. **Bounded retries** — exponential backoff with a max retry count (the per-item delay starts at 5ms and doubles on each retry, so the 15th and final retry waits ~82s)
|
||
|
||
---
|
||
|
||
## 2. Informer + Cache + Workqueue Combo
|
||
|
||
**Source:** `staging/src/k8s.io/client-go/tools/cache/shared_informer.go` (lines 144–283), `staging/src/k8s.io/client-go/informers/factory.go` (lines 57–250)
|
||
|
||
### What it does
|
||
The Informer provides a local read cache of API server state, backed by a List+Watch connection. The SharedInformerFactory ensures only one informer per resource type exists per factory (and therefore per process, when all controllers share a single factory), preventing duplicate watches.
|
||
|
||
### Why
|
||
- **Reduces API server load**: controllers read from local cache (Listers) instead of hitting the API
|
||
- **Reduces latency**: events are delivered via callbacks, no polling
|
||
- **Memory efficiency**: shared informers prevent N controllers from opening N watches
|
||
|
||
### Architecture
|
||
|
||
```
|
||
API Server
|
||
│
|
||
├── List (initial sync)
|
||
│
|
||
└── Watch (streaming updates)
|
||
│
|
||
▼
|
||
SharedIndexInformer
|
||
├── local Store (thread-safe cache)
|
||
├── Indexer (secondary indexes)
|
||
└── Event Handlers → [Controller1, Controller2, ...]
|
||
│
|
||
▼
|
||
WorkQueue
|
||
│
|
||
▼
|
||
worker goroutines
|
||
```
|
||
|
||
### SharedInformerFactory Pattern
|
||
|
||
```go
|
||
// staging/src/k8s.io/client-go/informers/factory.go:57-77
|
||
type sharedInformerFactory struct {
|
||
client kubernetes.Interface
|
||
lock sync.Mutex
|
||
informers map[reflect.Type]cache.SharedIndexInformer
|
||
startedInformers map[reflect.Type]bool
|
||
wg sync.WaitGroup
|
||
shuttingDown bool
|
||
}
|
||
```
|
||
|
||
### Registration via Event Handlers
|
||
|
||
```go
|
||
// pkg/controller/deployment/deployment_controller.go:117-146
|
||
dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||
AddFunc: func(obj interface{}) { dc.addDeployment(logger, obj) },
|
||
UpdateFunc: func(oldObj, newObj interface{}) { dc.updateDeployment(logger, oldObj, newObj) },
|
||
DeleteFunc: func(obj interface{}) { dc.deleteDeployment(logger, obj) },
|
||
})
|
||
```
|
||
|
||
### Cache Sync Gate
|
||
|
||
```go
|
||
// pkg/controller/deployment/deployment_controller.go:189
|
||
if !cache.WaitForNamedCacheSyncWithContext(ctx, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
|
||
return
|
||
}
|
||
```
|
||
|
||
### When to Use
|
||
|
||
**Triggers:**
|
||
- Multiple components in the same process need to read the same Kubernetes resources
|
||
- You're building a controller that reacts to state changes but needs fast local reads
|
||
- API server load is a concern (you have many controllers or high-frequency reads)
|
||
|
||
**Example — before:**
|
||
```go
|
||
// Every reconcile calls the API server directly — O(controllers × syncs/sec) load
|
||
func (c *Controller) Reconcile(ctx context.Context, key string) error {
|
||
pod, err := c.client.CoreV1().Pods(ns).Get(ctx, name, metav1.GetOptions{})
|
||
if err != nil { return err }
|
||
|
||
nodes, err := c.client.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
|
||
if err != nil { return err }
|
||
// ... reconcile
|
||
}
|
||
```
|
||
|
||
**Example — after:**
|
||
```go
|
||
// Reads from local cache — zero API server load for reads
|
||
func (c *Controller) Reconcile(ctx context.Context, key string) error {
|
||
pod, err := c.podLister.Pods(ns).Get(name) // local cache
|
||
if err != nil { return err }
|
||
|
||
nodes, err := c.nodeLister.List(labels.Everything()) // local cache
|
||
if err != nil { return err }
|
||
// ... reconcile (only writes hit the API server)
|
||
}
|
||
```
|
||
|
||
### When NOT to Use
|
||
|
||
**Don't use this when:**
|
||
- You're building a short-lived CLI tool or one-shot script (informers need time to sync)
|
||
- You need strongly consistent reads (informer cache is eventually consistent — may lag by seconds)
|
||
- You only access a resource once or twice (the overhead of List+Watch isn't justified)
|
||
|
||
**Over-application example:**
|
||
```go
|
||
// One-shot migration script using informers — overkill
|
||
func main() {
|
||
factory := informers.NewSharedInformerFactory(client, 0)
|
||
podInformer := factory.Core().V1().Pods()
|
||
factory.Start(ctx.Done())
|
||
factory.WaitForCacheSync(ctx.Done())
|
||
|
||
pods, _ := podInformer.Lister().List(labels.Everything())
|
||
for _, pod := range pods {
|
||
migratePod(pod)
|
||
}
|
||
}
|
||
```
|
||
|
||
**Better alternative:**
|
||
```go
|
||
// Just list directly — you only need the data once
|
||
func main() {
|
||
pods, _ := client.CoreV1().Pods("").List(ctx, metav1.ListOptions{})
|
||
for _, pod := range pods.Items {
|
||
migratePod(&pod)
|
||
}
|
||
}
|
||
```
|
||
|
||
**Why:** Informers are designed for long-running processes that continuously react to changes. For one-shot reads, a direct List call is simpler, faster to start, and uses less memory.
|
||
|
||
---
|
||
|
||
## 3. Workqueue: Typed Rate-Limiting Queue
|
||
|
||
**Source:** `staging/src/k8s.io/client-go/util/workqueue/queue.go` (lines 33–370), `rate_limiting_queue.go`
|
||
|
||
### What it does
|
||
A concurrent-safe work queue with three critical properties:
|
||
1. **Deduplication** (dirty set) — an item added several times before a worker picks it up is processed only once
|
||
2. **Re-entrancy** (processing set) — if an item is added while being processed, it's re-queued after Done()
|
||
3. **Rate limiting** — exponential backoff on failures
|
||
|
||
### Why
|
||
In a controller, multiple events may fire for the same object in rapid succession. Without deduplication, you'd process stale intermediate states. The dirty/processing set design ensures you always process the latest state while never losing notifications.
|
||
|
||
### When to Use
|
||
|
||
**Triggers:**
|
||
- Multiple event sources (informer callbacks) trigger work on the same object rapidly
|
||
- You need to deduplicate: 5 events for the same pod should result in 1 sync, not 5
|
||
- Failed processing should retry with exponential backoff, not flood the system
|
||
|
||
**Example — before:**
|
||
```go
|
||
// Raw channel: no deduplication, no backoff
|
||
events := make(chan string, 100)
|
||
|
||
// Producer fires rapid updates:
|
||
events <- "pod-abc" // event 1
|
||
events <- "pod-abc" // event 2 (duplicate!)
|
||
events <- "pod-abc" // event 3 (duplicate!)
|
||
|
||
// Consumer processes all 3 — wasteful
|
||
for key := range events {
|
||
reconcile(key) // called 3 times for the same stale state
|
||
}
|
||
```
|
||
|
||
**Example — after:**
|
||
```go
|
||
queue := workqueue.NewTypedRateLimitingQueue[string](
|
||
workqueue.DefaultTypedControllerRateLimiter[string](),
|
||
)
|
||
|
||
// Producer fires rapid updates — queue deduplicates:
|
||
queue.Add("pod-abc") // queued
|
||
queue.Add("pod-abc") // already dirty — no-op
|
||
queue.Add("pod-abc") // already dirty — no-op
|
||
|
||
// Consumer processes once with latest state:
|
||
key, _ := queue.Get()
|
||
err := reconcile(key) // called once
|
||
if err != nil {
|
||
queue.AddRateLimited(key) // retry with backoff
|
||
} else {
|
||
queue.Forget(key) // clear backoff counter
|
||
}
|
||
queue.Done(key)
|
||
```
|
||
|
||
### When NOT to Use
|
||
|
||
**Don't use this when:**
|
||
- Order of processing matters (the workqueue doesn't guarantee FIFO for rate-limited items)
|
||
- Each event carries unique payload data that must be processed individually (workqueue only stores keys, not event data)
|
||
- You have a single producer and single consumer with no contention (a plain channel suffices)
|
||
|
||
**Over-application example:**
|
||
```go
|
||
// Using workqueue for ordered log processing — wrong tool
|
||
queue := workqueue.NewTypedRateLimitingQueue[string](limiter)
|
||
|
||
// Each log line is unique and order matters
|
||
queue.Add("log-line-1")
|
||
queue.Add("log-line-2")
|
||
queue.Add("log-line-3")
|
||
// Problem: if "log-line-1" fails and gets rate-limited,
|
||
// "log-line-2" processes first → out of order
|
||
```
|
||
|
||
**Better alternative:**
|
||
```go
|
||
// Ordered processing needs a simple buffered channel or sequential queue
|
||
logs := make(chan LogEntry, 1000)
|
||
for entry := range logs {
|
||
if err := processLog(entry); err != nil {
|
||
retryWithBackoff(entry) // handle retry inline, preserving order
|
||
}
|
||
}
|
||
```
|
||
|
||
**Why:** The workqueue's strength is deduplication and rate-limited retry for key-based reconciliation. If your items are unique (not deduplicate-able) or ordering matters, use a channel or ordered queue instead.
|
||
|
||
### The Dirty/Processing Dance
|
||
|
||
```go
|
||
// staging/src/k8s.io/client-go/util/workqueue/queue.go:227-252
|
||
func (q *Typed[T]) Add(item T) {
|
||
q.cond.L.Lock()
|
||
defer q.cond.L.Unlock()
|
||
if q.shuttingDown { return }
|
||
if q.dirty.Has(item) {
|
||
if !q.processing.Has(item) {
|
||
q.queue.Touch(item) // Allow priority changes
|
||
}
|
||
return // Already marked for processing
|
||
}
|
||
q.dirty.Insert(item)
|
||
if q.processing.Has(item) {
|
||
return // Being processed, will re-queue on Done()
|
||
}
|
||
q.queue.Push(item)
|
||
q.cond.Signal()
|
||
}
|
||
|
||
func (q *Typed[T]) Done(item T) {
|
||
q.cond.L.Lock()
|
||
defer q.cond.L.Unlock()
|
||
q.processing.Delete(item)
|
||
if q.dirty.Has(item) {
|
||
q.queue.Push(item) // Was modified during processing
|
||
q.cond.Signal()
|
||
}
|
||
}
|
||
```
|
||
|
||
### Rate-Limited Requeue
|
||
|
||
```go
|
||
// staging/src/k8s.io/client-go/util/workqueue/rate_limiting_queue.go:120-122
|
||
func (q *rateLimitingType[T]) AddRateLimited(item T) {
|
||
q.TypedDelayingInterface.AddAfter(item, q.rateLimiter.When(item))
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
## 4. Tombstone Pattern (DeletedFinalStateUnknown)
|
||
|
||
**Source:** `staging/src/k8s.io/client-go/tools/cache/delta_fifo.go` (lines 797–800)
|
||
|
||
### What it does
|
||
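When a watch disconnects and reconnects, some delete events may be missed. When the informer relists, the DeltaFIFO notices cached objects that are absent from the fresh list and synthesizes a `DeletedFinalStateUnknown` ("tombstone") for each one, containing the last known state of the object. Because the real final state was never observed, that embedded object may be stale.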
|
||
|
||
### Why
|
||
Without this, controllers would never learn about deletions that happened during disconnects.
|
||
|
||
```go
|
||
// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go:797-800
|
||
type DeletedFinalStateUnknown struct {
|
||
Key string
|
||
Obj interface{}
|
||
}
|
||
```
|
||
|
||
### How controllers handle it
|
||
|
||
```go
|
||
// pkg/controller/deployment/deployment_controller.go:210-224
|
||
func (dc *DeploymentController) deleteDeployment(logger klog.Logger, obj interface{}) {
|
||
d, ok := obj.(*apps.Deployment)
|
||
if !ok {
|
||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||
if !ok {
|
||
utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
|
||
return
|
||
}
|
||
d, ok = tombstone.Obj.(*apps.Deployment)
|
||
if !ok {
|
||
utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Deployment %#v", obj))
|
||
return
|
||
}
|
||
}
|
||
dc.enqueueDeployment(d)
|
||
}
|
||
```
|
||
|
||
### When to Use
|
||
|
||
**Triggers:**
|
||
- You're writing a delete event handler for a Kubernetes informer
|
||
- Your system uses List+Watch and must handle watch disconnection gracefully
|
||
- You need to process all deletions, including those that occurred during downtime
|
||
|
||
**Example — before:**
|
||
```go
|
||
// Naive delete handler — breaks silently during watch reconnection
|
||
func (c *Controller) onDelete(obj interface{}) {
|
||
pod := obj.(*v1.Pod) // PANIC: obj might be DeletedFinalStateUnknown
|
||
c.cleanup(pod)
|
||
}
|
||
```
|
||
|
||
**Example — after:**
|
||
```go
|
||
// Tombstone-aware delete handler — handles all cases
|
||
func (c *Controller) onDelete(obj interface{}) {
|
||
pod, ok := obj.(*v1.Pod)
|
||
if !ok {
|
||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||
if !ok {
|
||
runtime.HandleError(fmt.Errorf("unexpected object type: %T", obj))
|
||
return
|
||
}
|
||
pod, ok = tombstone.Obj.(*v1.Pod)
|
||
if !ok {
|
||
runtime.HandleError(fmt.Errorf("tombstone contained non-Pod: %T", tombstone.Obj))
|
||
return
|
||
}
|
||
}
|
||
c.cleanup(pod)
|
||
}
|
||
```
|
||
|
||
### When NOT to Use
|
||
|
||
**Don't use this when:**
|
||
- You're not using informers (direct API calls return concrete types, never tombstones)
|
||
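- Your delete handler only enqueues a key for reconciliation and derives that key with `cache.DeletionHandlingMetaNamespaceKeyFunc`, which already unwraps tombstones (the reconciler will discover the deletion via a NotFound error from the lister, so a manual tombstone type assertion in the handler is unnecessary)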
|
||
- You're building a non-Kubernetes system (this is specific to client-go's watch semantics)
|
||
|
||
**Over-application example:**
|
||
```go
|
||
// Tombstone handling in a reconciler that already handles "not found"
|
||
func (c *Controller) Reconcile(ctx context.Context, key string) error {
|
||
obj, err := c.lister.Get(key)
|
||
if errors.IsNotFound(err) {
|
||
// Object was deleted — clean up
|
||
return c.handleDeletion(key)
|
||
}
|
||
// No need for tombstone logic here — the lister never returns tombstones
|
||
}
|
||
```
|
||
|
||
**Better alternative:** Tombstone handling belongs in the `DeleteFunc` event handler callback — the only callback that can receive a `DeletedFinalStateUnknown` — not in the reconcile loop. The reconciler discovers deletions through "not found" errors from the lister.
|
||
|
||
**Why:** Tombstones are an artifact of the event delivery mechanism. If your architecture already handles "object doesn't exist" as a valid reconciliation state, you don't need to explicitly handle tombstones everywhere.
|
||
|
||
---
|
||
|
||
## 5. Controller Expectations Pattern
|
||
|
||
**Source:** `pkg/controller/controller_utils.go` (lines 130–315)
|
||
|
||
### What it does
|
||
Expectations track pending creates/deletes to prevent controllers from taking action on stale cache state. A controller won't sync until its expectations are satisfied or expired.
|
||
|
||
### Why
|
||
Between a controller issuing a create and the informer cache reflecting that new object, there's a window where the controller might create duplicates. Expectations close this gap.
|
||
|
||
```go
|
||
// pkg/controller/controller_utils.go:153-173
|
||
type ControllerExpectationsInterface interface {
|
||
GetExpectations(controllerKey string) (*ControlleeExpectations, bool, error)
|
||
SatisfiedExpectations(logger klog.Logger, controllerKey string) bool
|
||
DeleteExpectations(logger klog.Logger, controllerKey string)
|
||
SetExpectations(logger klog.Logger, controllerKey string, add, del int) error
|
||
ExpectCreations(logger klog.Logger, controllerKey string, adds int) error
|
||
ExpectDeletions(logger klog.Logger, controllerKey string, dels int) error
|
||
CreationObserved(logger klog.Logger, controllerKey string)
|
||
DeletionObserved(logger klog.Logger, controllerKey string)
|
||
}
|
||
```
|
||
|
||
### When to Use
|
||
|
||
**Triggers:**
|
||
- Your controller creates or deletes child resources and uses informer cache to count them
|
||
- You observe duplicate creates/deletes during rapid reconciliation
|
||
- There's a visible lag between your write and the cache reflecting the new state
|
||
|
||
**Example — before:**
|
||
```go
|
||
// Without expectations: reconcile loop creates duplicates
|
||
func (c *RSController) Reconcile(ctx context.Context, key string) error {
|
||
rs, _ := c.rsLister.Get(key)
|
||
pods, _ := c.podLister.Pods(rs.Namespace).List(selector)
|
||
|
||
diff := int(*rs.Spec.Replicas) - len(pods)
|
||
if diff > 0 {
|
||
// Cache hasn't caught up from last reconcile → creates AGAIN
|
||
for i := 0; i < diff; i++ {
|
||
c.client.CoreV1().Pods(rs.Namespace).Create(ctx, newPod(), ...)
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
```
|
||
|
||
**Example — after:**
|
||
```go
|
||
// With expectations: skip reconcile until cache catches up
|
||
func (c *RSController) Reconcile(ctx context.Context, key string) error {
|
||
if !c.expectations.SatisfiedExpectations(logger, key) {
|
||
return nil // still waiting for previous creates to appear in cache
|
||
}
|
||
|
||
rs, _ := c.rsLister.Get(key)
|
||
pods, _ := c.podLister.Pods(rs.Namespace).List(selector)
|
||
|
||
diff := int(*rs.Spec.Replicas) - len(pods)
|
||
if diff > 0 {
|
||
c.expectations.ExpectCreations(logger, key, diff)
|
||
for i := 0; i < diff; i++ {
|
||
_, err := c.client.CoreV1().Pods(rs.Namespace).Create(ctx, newPod(), ...)
|
||
if err != nil {
|
||
c.expectations.CreationObserved(logger, key) // decrement on failure
|
||
}
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
```
|
||
|
||
### When NOT to Use
|
||
|
||
**Don't use this when:**
|
||
- Your controller only updates existing resources (no creates/deletes of children)
|
||
- You use server-side apply or optimistic concurrency (conflicts are resolved by the API server)
|
||
- Your reconciliation is idempotent even with stale cache (e.g., "ensure this ConfigMap has these contents" — creating it twice returns AlreadyExists)
|
||
|
||
**Over-application example:**
|
||
```go
|
||
// Expectations for a controller that only patches status — unnecessary
|
||
func (c *StatusController) Reconcile(ctx context.Context, key string) error {
|
||
if !c.expectations.SatisfiedExpectations(logger, key) {
|
||
return nil // This gate adds nothing — we're only patching, not creating
|
||
}
|
||
obj, _ := c.lister.Get(key)
|
||
return c.client.Status().Patch(ctx, obj, patch)
|
||
}
|
||
```
|
||
|
||
**Better alternative:** Just patch directly. Status updates via patch are idempotent and don't create duplicate resources. Expectations only matter when create/delete timing could cause over-provisioning.
|
||
|
||
**Why:** Expectations add bookkeeping complexity. They solve a specific problem: cache lag causing duplicate creates/deletes. If your controller doesn't create or delete child resources, the problem doesn't exist.
|
||
|
||
---
|
||
|
||
## 6. OwnerReference / Controller Ref Manager Pattern
|
||
|
||
**Source:** `pkg/controller/controller_ref_manager.go` (lines 37–80)
|
||
|
||
### What it does
|
||
Implements garbage collection ownership through OwnerReferences. The ControllerRefManager handles adopting orphaned resources and releasing resources that no longer match.
|
||
|
||
### Why
|
||
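Multiple controllers may create children. The ownership model ensures at most one controller owns each child at any time (a child with no controller reference is an orphan that a matching controller may adopt), enabling garbage collection and preventing conflicts.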
|
||
|
||
```go
|
||
// pkg/controller/controller_ref_manager.go:37-50
|
||
type BaseControllerRefManager struct {
|
||
Controller metav1.Object
|
||
Selector labels.Selector
|
||
canAdoptErr error
|
||
canAdoptOnce sync.Once // Lazy, one-shot adoption check
|
||
CanAdoptFunc func(ctx context.Context) error
|
||
}
|
||
|
||
// The claim logic: adopt if matching and orphaned, release if owned but not matching
|
||
func (m *BaseControllerRefManager) ClaimObject(ctx context.Context, obj metav1.Object,
|
||
match func(metav1.Object) bool,
|
||
adopt, release func(context.Context, metav1.Object) error) (bool, error) {
|
||
controllerRef := metav1.GetControllerOfNoCopy(obj)
|
||
if controllerRef != nil {
|
||
if controllerRef.UID != m.Controller.GetUID() {
|
||
return false, nil // Owned by someone else
|
||
}
|
||
if match(obj) {
|
||
return true, nil // Already ours and matches
|
||
}
|
||
// Ours but no longer matches → release
|
||
}
|
||
// Orphan → adopt if matches
|
||
}
|
||
```
|
||
|
||
### When to Use
|
||
|
||
**Triggers:**
|
||
- Your controller creates child resources that should be garbage-collected when the parent is deleted
|
||
- Multiple controllers might "compete" for ownership of the same resources (e.g., label selector changes)
|
||
- You need to handle adoption: a resource's owner was deleted but the resource still exists
|
||
|
||
**Example — before:**
|
||
```go
|
||
// Manual cleanup — fragile, misses edge cases
|
||
func (c *Controller) deleteParent(ctx context.Context, parent *MyResource) error {
|
||
children, _ := c.childLister.List(labelsForParent(parent))
|
||
for _, child := range children {
|
||
c.client.Delete(ctx, child.Name, metav1.DeleteOptions{})
|
||
}
|
||
return c.client.Delete(ctx, parent.Name, metav1.DeleteOptions{})
|
||
// What if we crash between deleting children and parent?
|
||
// What if labels change and we miss some children?
|
||
}
|
||
```
|
||
|
||
**Example — after:**
|
||
```go
|
||
// OwnerReference + garbage collector handles cleanup automatically
|
||
func (c *Controller) createChild(ctx context.Context, parent *MyResource) error {
|
||
child := &v1.Pod{
|
||
ObjectMeta: metav1.ObjectMeta{
|
||
OwnerReferences: []metav1.OwnerReference{
|
||
*metav1.NewControllerRef(parent, myGVK),
|
||
},
|
||
},
|
||
}
|
||
_, err := c.client.CoreV1().Pods(parent.Namespace).Create(ctx, child, metav1.CreateOptions{})
|
||
return err
|
||
// When parent is deleted, GC automatically deletes all children
|
||
}
|
||
```
|
||
|
||
### When NOT to Use
|
||
|
||
**Don't use this when:**
|
||
- Child resources should outlive their parent (e.g., PersistentVolumes that persist after the PVC is deleted)
|
||
- Resources are truly independent — no parent/child lifecycle relationship exists
|
||
- You're working outside Kubernetes (OwnerReferences are a Kubernetes API concept)
|
||
|
||
**Over-application example:**
|
||
```go
|
||
// Setting owner reference on a shared ConfigMap used by many controllers
|
||
func (c *Controller) ensureSharedConfig(ctx context.Context, parent *MyResource) error {
|
||
cm := &v1.ConfigMap{
|
||
ObjectMeta: metav1.ObjectMeta{
|
||
Name: "shared-config",
|
||
OwnerReferences: []metav1.OwnerReference{
|
||
*metav1.NewControllerRef(parent, myGVK), // WRONG: shared resource
|
||
},
|
||
},
|
||
}
|
||
// Problem: when this parent is deleted, the shared ConfigMap is garbage-collected,
|
||
// breaking all other controllers that depend on it
|
||
}
|
||
```
|
||
|
||
**Better alternative:** Shared resources should not have owner references. Use finalizers on the parent to perform cleanup only of resources that are truly owned, or use labels + a dedicated cleanup controller.
|
||
|
||
**Why:** OwnerReferences create a hard lifecycle coupling: parent deletion cascades to children. This is exactly right for "this ReplicaSet owns these Pods" but catastrophic for shared resources.
|
||
|
||
---
|
||
|
||
## 7. Leader Election Pattern
|
||
|
||
**Source:** `staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go` (lines 116–230)
|
||
|
||
### What it does
|
||
Provides distributed mutex semantics using a Kubernetes resource (Lease) as the lock. Only one instance of a controller runs actively; others are hot standbys.
|
||
|
||
### When to Use
|
||
|
||
**Triggers:**
|
||
- You're running multiple replicas of a controller for high availability
|
||
- Only ONE instance should actively reconcile at a time (to avoid conflicts)
|
||
- You need automatic failover: if the leader dies, another replica takes over within seconds
|
||
|
||
**Example — before:**
|
||
```go
|
||
// All replicas reconcile simultaneously → write conflicts, duplicate work
|
||
func main() {
|
||
ctrl := NewController()
|
||
ctrl.Run(ctx) // every replica does this — chaos
|
||
}
|
||
```
|
||
|
||
**Example — after:**
|
||
```go
|
||
func main() {
|
||
ctrl := NewController()
|
||
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
|
||
Lock: resourceLock,
|
||
LeaseDuration: 15 * time.Second,
|
||
RenewDeadline: 10 * time.Second,
|
||
RetryPeriod: 2 * time.Second,
|
||
Callbacks: leaderelection.LeaderCallbacks{
|
||
OnStartedLeading: func(ctx context.Context) {
|
||
ctrl.Run(ctx) // only the leader reconciles
|
||
},
|
||
OnStoppedLeading: func() {
|
||
log.Fatal("lost leadership") // restart to re-enter election
|
||
},
|
||
},
|
||
})
|
||
}
|
||
```
|
||
|
||
### When NOT to Use
|
||
|
||
**Don't use this when:**
|
||
- Your workload can be safely sharded across replicas (e.g., each replica handles a different namespace)
|
||
- The work is read-only or idempotent at the individual item level (multiple readers are fine)
|
||
- You only ever run one replica (single-instance deployment — leader election is pointless overhead)
|
||
|
||
**Over-application example:**
|
||
```go
|
||
// Leader election for a metrics collector that only reads
|
||
func main() {
|
||
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
|
||
Lock: resourceLock,
|
||
Callbacks: leaderelection.LeaderCallbacks{
|
||
OnStartedLeading: func(ctx context.Context) {
|
||
collectMetrics(ctx) // reads metrics from nodes — safe to run on all replicas
|
||
},
|
||
},
|
||
})
|
||
// Problem: 2 of 3 replicas sit idle, collecting no metrics
|
||
}
|
||
```
|
||
|
||
**Better alternative:**
|
||
```go
|
||
// All replicas collect metrics (sharded by node or running in parallel)
|
||
func main() {
|
||
nodes := getAssignedNodes() // shard assignment via consistent hashing
|
||
for _, node := range nodes {
|
||
go collectMetricsForNode(ctx, node)
|
||
}
|
||
}
|
||
```
|
||
|
||
**Why:** Leader election serializes all work to one instance. This is correct for writes that would conflict, but wasteful for reads or shardable work. Use it only when concurrent execution would cause correctness problems.
|
||
|
||
### Why
|
||
The kube-controller-manager runs multiple replicas for HA, but only one replica should reconcile at a time to avoid conflicting writes.
|
||
|
||
```go
|
||
// staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go:116-163
|
||
type LeaderElectionConfig struct {
|
||
Lock rl.Interface
|
||
LeaseDuration time.Duration // Default 15s — how long a lease is valid
|
||
RenewDeadline time.Duration // Default 10s — how long leader retries renewal
|
||
RetryPeriod time.Duration // Default 2s — how often candidates check
|
||
Callbacks LeaderCallbacks
|
||
ReleaseOnCancel bool
|
||
}
|
||
|
||
type LeaderCallbacks struct {
|
||
OnStartedLeading func(context.Context)
|
||
OnStoppedLeading func()
|
||
OnNewLeader func(identity string)
|
||
}
|
||
|
||
// staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go:211-226
|
||
func (le *LeaderElector) Run(ctx context.Context) {
|
||
defer runtime.HandleCrashWithContext(ctx)
|
||
defer le.config.Callbacks.OnStoppedLeading()
|
||
if !le.acquire(ctx) {
|
||
return
|
||
}
|
||
ctx, cancel := context.WithCancel(ctx)
|
||
defer cancel()
|
||
go le.config.Callbacks.OnStartedLeading(ctx)
|
||
le.renew(ctx)
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
## 8. Feature Gates Pattern
|
||
|
||
**Source:** `pkg/features/kube_features.go` (lines 34–2784), `staging/src/k8s.io/client-go/features/features.go` (lines 34–80)
|
||
|
||
### What it does
|
||
A global registry of boolean flags that control feature rollout. Features normally progress through the Alpha → Beta → GA lifecycle stages; a gate can also be marked Deprecated when the feature is being withdrawn.
|
||
|
||
### Why
|
||
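Kubernetes runs in thousands of clusters across many versions. Features must be safe to enable or disable per component (via the `--feature-gates` flag, read at component startup), and their defaults must evolve predictably across versions. Feature gates provide: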
|
||
- Progressive rollout (alpha off by default, beta on, GA locked)
|
||
- Per-version semantics (a feature may become beta in v1.28)
|
||
- Testing isolation (a test can override a gate for its own duration without affecting other tests)
|
||
|
||
```go
|
||
// staging/src/k8s.io/client-go/features/features.go:50-70
|
||
type Feature string
|
||
|
||
type FeatureSpec struct {
|
||
Default bool
|
||
LockToDefault bool
|
||
PreRelease prerelease
|
||
Version *version.Version
|
||
}
|
||
|
||
type Gates interface {
|
||
Enabled(key Feature) bool
|
||
}
|
||
|
||
// pkg/features/kube_features.go:50-58 (example feature definition)
|
||
const (
|
||
AllowDNSOnlyNodeCSR featuregate.Feature = "AllowDNSOnlyNodeCSR"
|
||
// ... 2700+ lines of feature definitions
|
||
)
|
||
```
|
||
|
||
### Registration at init()
|
||
|
||
```go
|
||
// pkg/features/kube_features.go:2771-2784
|
||
func init() {
|
||
ca := &clientAdapter{utilfeature.DefaultMutableFeatureGate}
|
||
runtime.Must(clientfeatures.AddVersionedFeaturesToExistingFeatureGates(ca))
|
||
clientfeatures.ReplaceFeatureGates(ca)
|
||
runtime.Must(utilfeature.DefaultMutableFeatureGate.AddVersioned(defaultVersionedKubernetesFeatureGates))
|
||
}
|
||
```
|
||
|
||
### When to Use
|
||
|
||
**Triggers:**
|
||
- You ship a binary to many environments (clusters, customers) that need different feature sets
|
||
- New features are risky and must be progressively rolled out (alpha → beta → GA)
|
||
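- You need to disable a feature without rebuilding or shipping a new binary (kill switch) — in Kubernetes this means restarting the component with a different `--feature-gates` value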
|
||
- Multiple versions of your software coexist, and features must have version-aware behavior
|
||
|
||
**Example — before:**
|
||
```go
|
||
// Compile-time flags or environment variables — no lifecycle, no versioning
|
||
func reconcile(ctx context.Context, obj *MyResource) error {
|
||
if os.Getenv("ENABLE_NEW_SCHEDULING") == "true" {
|
||
return newSchedulingLogic(ctx, obj)
|
||
}
|
||
return oldSchedulingLogic(ctx, obj)
|
||
}
|
||
// Problem: no structured lifecycle. How do you deprecate this? When is it safe to remove?
|
||
```
|
||
|
||
**Example — after:**
|
||
```go
|
||
const NewSchedulingAlgorithm featuregate.Feature = "NewSchedulingAlgorithm"
|
||
|
||
// Registered with lifecycle metadata
|
||
var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
|
||
NewSchedulingAlgorithm: {Default: false, PreRelease: featuregate.Alpha}, // v1.28
|
||
// Later: {Default: true, PreRelease: featuregate.Beta} // v1.30
|
||
// Later: {Default: true, LockToDefault: true, PreRelease: featuregate.GA} // v1.32
|
||
}
|
||
|
||
func reconcile(ctx context.Context, obj *MyResource) error {
|
||
if featuregate.DefaultFeatureGate.Enabled(NewSchedulingAlgorithm) {
|
||
return newSchedulingLogic(ctx, obj)
|
||
}
|
||
return oldSchedulingLogic(ctx, obj)
|
||
}
|
||
```
|
||
|
||
### When NOT to Use
|
||
|
||
**Don't use this when:**
|
||
- You have a simple application with one deployment target (just use config or env vars)
|
||
- The "feature" is actually user-facing configuration (use a config field on the resource, not a gate)
|
||
- You can safely always enable the feature (no risk, no need for a kill switch)
|
||
|
||
**Over-application example:**
|
||
```go
|
||
// Feature gate for a trivial log format change — overkill
|
||
const JSONLogging featuregate.Feature = "JSONLogging"
|
||
|
||
func setupLogger() {
|
||
if featuregate.DefaultFeatureGate.Enabled(JSONLogging) {
|
||
log.SetFormatter(JSONFormatter{})
|
||
} else {
|
||
log.SetFormatter(TextFormatter{})
|
||
}
|
||
}
|
||
// This never needs alpha/beta/GA lifecycle. It's just config.
|
||
```
|
||
|
||
**Better alternative:**
|
||
```go
|
||
// Simple configuration flag
|
||
type Config struct {
|
||
LogFormat string `json:"logFormat"` // "json" or "text"
|
||
}
|
||
```
|
||
|
||
**Why:** Feature gates add cognitive overhead (registry, lifecycle stages, version tracking). They're justified for behavioral changes that carry risk and need graduated rollout. For simple configuration choices with no risk dimension, a config field is clearer and simpler.
|