batch

package v1.7.1-test.batching

Warning: This package is not in the latest version of its module.

Published: Feb 14, 2026 License: Apache-2.0 Imports: 13 Imported by: 0

README

Batch Module - Design Guide

This module groups VM creation requests to reduce Azure API calls. Instead of creating VMs one-by-one, we batch requests with identical configurations into a single API call.

Why Batching?

When Karpenter needs to create many VMs quickly (e.g., burst of pending pods):

  • Without batching: N VMs = N API calls (slow, rate-limited, no placement optimization)
  • With batching: N VMs = 1 API call (faster, efficient, Azure can co-locate)

Think of it like a restaurant kitchen: you could cook each dish the moment an order arrives, but it's more efficient to group similar orders together — all steaks on the grill at once, all salads prepped together.

Architecture

                                                ┌─────────────────┐
   Request A (VM size: D4s) ──┐                 │                 │
                              │   ┌──────────┐  │   Coordinator   │
   Request B (VM size: D4s) ──┼──►│ Grouper  │──►│   (executes     │
                              │   │          │  │    batches)     │
   Request C (VM size: D8s) ──┘   └──────────┘  │                 │
                                       │        └─────────────────┘
                                       │
                                  Batch 1: A + B (same template)
                                  Batch 2: C (different template)

Components

  Component     Role
  ───────────   ──────────────────────────────────────────────────────────
  Grouper       Collects requests, groups by template hash, manages timing
  Coordinator   Executes batches against the Azure API, distributes results
  Types         Data structures (CreateRequest, PendingBatch, etc.)
  Context       Utilities for passing batch metadata through the call stack

Data Flow Through Types
┌─────────────────┐
│  CreateRequest  │  ← What a caller submits (one per VM needed)
│  (with channel) │
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│  PendingBatch   │  ← Groups requests with same template hash
│  (N requests)   │
└────────┬────────┘
         │
         ▼
┌──────────────────────┐
│  BatchPutMachine     │  ← HTTP header sent to Azure (JSON)
│  Header              │     Contains per-machine variations
└────────┬─────────────┘
         │
         ▼
┌─────────────────┐
│ CreateResponse  │  ← Sent back via request's channel
└─────────────────┘

How Requests Are Grouped

Requests are grouped by template hash - a hash of VM configuration fields that must be identical:

  Category     Fields
  ──────────   ─────────────────────────────────────────────
  Hardware     VMSize, GPUProfile
  OS           OSSKU, OSDiskSizeGB, OSDiskType, EnableFIPS
  Kubernetes   OrchestratorVersion, MaxPods, KubeletConfig
  Network      VNetSubnetID
  Scheduling   Priority (Spot vs Regular), Mode

Not included (per-machine variations): MachineName, Zones, Tags — these go in the BatchPutMachine HTTP header.

Why hash instead of comparing fields? It's faster and less error-prone. Two VMs with the same hash = same template = can be batched.

Timing Strategy

The Grouper uses a "wait for idle" strategy with three exit conditions:

Timeline:   |-----|-----|-----|-----|-----|-----|-----|
            0    100ms 200ms 300ms 400ms 500ms 600ms

Requests:   R1    R2                R3    R4

IdleTimer:  [====]                        [====]     FIRE! ← Execute
            reset reset                   reset

MaxTimer:   [=====================================]  (still running)

  1. Idle timeout: No new requests for idleTimeout → execute (burst ended)
  2. Max timeout: Waited maxTimeout total → execute (latency SLA)
  3. Batch full: Any batch reaches maxBatchSize → execute immediately
Tuning Tradeoffs

  Small timeouts       Large timeouts
  ─────────────────    ─────────────────
  ✓ Low latency        ✓ Better batching
  ✓ Fast scheduling    ✓ Fewer API calls
  ✗ More API calls     ✗ Higher latency

Typical production values:

  • IdleTimeout: 100-500ms (catch end of burst)
  • MaxTimeout: 1-5s (latency SLA guarantee)
  • MaxBatchSize: 10-50 (depends on Azure API limits)

Key Implementation Details

Grouper Main Loop
😴 Sleep... waiting for trigger
      │
      ▼
🔔 Trigger received! A request arrived
      │
      ▼
⏰ waitForIdle() - collect more requests that might be coming
      │
      ▼
🚀 executeBatches() - dispatch all batches to Coordinator
      │
      └──► repeat forever (until context cancelled)

The loop includes panic recovery — if something goes catastrophically wrong, it logs and restarts rather than dying silently.

Trigger Channel (buffer size 1)
trigger chan struct{} // buffered, size 1

This is a Go pattern for "coalescing" notifications:

  • If channel is empty → send succeeds, signal delivered
  • If channel already has a signal → send is skipped (non-blocking)

Result: 100 rapid enqueues produce at most 1 wakeup. We don't need N signals for N requests — one is enough to wake up the processor.

Timer Reset Pattern

Go timers are tricky to reset safely. You must:

  1. Stop the timer
  2. Drain the channel if Stop() returns false (timer already fired)
  3. Then reset
if !idleTimer.Stop() {
    <-idleTimer.C  // drain to prevent race
}
idleTimer.Reset(g.idleTimeout)

This prevents a race where the old timer value is still in the channel.

Atomic Swap in executeBatches
g.mu.Lock()
batches := g.batches                       // grab current
g.batches = make(map[string]*PendingBatch) // replace with empty
g.mu.Unlock()
// Now process batches without holding lock

This is like having two order pads: while the kitchen works on orders from pad A, waiters write new orders on pad B. New requests immediately accumulate in the fresh map — no contention between enqueueing and execution.

Response Channel Pattern
func EnqueueCreate(req *CreateRequest) chan *CreateResponse

Returns a channel instead of blocking. Caller can:

  • Continue other work while waiting
  • Set up timeout handling
  • Handle cancellation gracefully
caller                          Grouper                      Coordinator
  │                                │                              │
  │── EnqueueCreate() ────────────►│                              │
  │◄── responseChan ───────────────│                              │
  │                                │                              │
  │ (caller can do other work)     │── (batches more requests) ──►│
  │                                │                              │
  │                                │── ExecuteBatch() ───────────►│
  │                                │                              │
  │◄─────────────────── response sent to channel ─────────────────│
BatchPutMachine Header

Azure batch creation uses a custom HTTP header. The API body contains the shared template; the header contains per-machine variations:

{
  "vmSkus": { "value": [] },
  "batchMachines": [
    { "machineName": "m-abc", "zones": ["1"], "tags": {"team": "platform"} },
    { "machineName": "m-def", "zones": ["2"], "tags": {"team": "platform"} }
  ]
}

This allows one API call to create multiple machines with slight variations (different names, zones, tags) while sharing the heavy configuration.

Flow Diagram

Caller
  │
  ├─► EnqueueCreate(req)
  │     │
  │     ├─► computeTemplateHash(template)
  │     ├─► Add to batches[hash]
  │     ├─► trigger <- signal
  │     └─► return req.responseChan
  │
  │   (caller waits on channel)
  │
  │                         Grouper.run() loop
  │                              │
  │                         <─── trigger
  │                              │
  │                         waitForIdle()
  │                              │ (collects more requests)
  │                              │
  │                         executeBatches()
  │                              │
  │                              ▼
  │                         Coordinator.ExecuteBatch()
  │                              │
  │                              ├─► buildBatchHeader()
  │                              ├─► Azure API call (with header)
  │                              ├─► parseFrontendErrors()
  │                              └─► Send to each req.responseChan
  │                                        │
  └──────────────────────────────────────◄─┘
        (caller receives response)

Context Utilities

Go's context.Context is like a backpack that travels with a request through the call stack. Instead of passing batch info as parameters through 10 layers of functions, we attach it to the context:

┌─────────────┐
│   Caller    │  ctx = context.Background()
└──────┬──────┘
       │
       ▼
┌──────────────────┐
│   Grouper        │  // Receives and batches requests
└──────┬───────────┘
       │
       ▼
┌──────────────────┐
│   Coordinator    │  ctx = WithBatchMetadata(ctx, &BatchMetadata{...})
└──────┬───────────┘
       │
       ▼
┌──────────────────┐
│  Azure Client    │  // Can call FromContext(ctx) to get batch info
└──────┬───────────┘
       │
       ▼
┌──────────────────┐
│   HTTP Layer     │  // Also has access to batch metadata if needed
└──────────────────┘
Usage
// After batch execution, mark the context
ctx = batch.WithBatchMetadata(ctx, &BatchMetadata{
    IsBatched:   true,
    MachineName: "m-abc",
    BatchID:     "uuid-123",
})

// Downstream code can check
if meta := batch.FromContext(ctx); meta != nil {
    log.Info("batched", "batchID", meta.BatchID)
}

// Skip batching for special cases (retries, urgent requests, testing)
ctx = batch.WithSkipBatching(ctx)
if batch.ShouldSkipBatching(ctx) {
    // Use direct API call instead
}

Azure SDK Notes

  • Poller pattern: Azure VM creation is a long-running operation (LRO). The API returns immediately with a Poller, which you use to check status and get the final result.
  • Pointer types: Azure SDK uses pointers everywhere for optional fields. The extractZones and extractTags helpers convert these to concrete values for JSON serialization.

Documentation

Overview

Context utilities for passing batch metadata through the call stack. Allows downstream code to check if a request was batched and access batch info.

Coordinator executes batches against Azure. It transforms a PendingBatch into a single API call using a custom "BatchPutMachine" HTTP header that lists all machines. This turns N API calls into 1, improving throughput and allowing Azure to optimize placement.

Package batch groups VM creation requests to reduce Azure API calls.

Instead of creating VMs one-by-one, the Grouper collects requests with identical configurations and sends them as a single batched API call. This improves throughput, reduces rate limiting, and lets Azure optimize placement.

Flow:

Requests ──► Grouper (groups by template hash) ──► Coordinator (executes batch)
                  │
             Batch 1: VMs with same config
             Batch 2: VMs with different config

Data types for the batch system.

Flow: CreateRequest → PendingBatch → BatchPutMachineHeader → CreateResponse

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ShouldSkipBatching

func ShouldSkipBatching(ctx context.Context) bool

ShouldSkipBatching checks if context is marked to bypass batching.

func WithBatchMetadata

func WithBatchMetadata(ctx context.Context, meta *BatchMetadata) context.Context

WithBatchMetadata attaches BatchMetadata to a context.

func WithFakeBatchEntries

func WithFakeBatchEntries(ctx context.Context, entries []MachineEntry) context.Context

WithFakeBatchEntries attaches per-machine entries to a context so the fake/test API client can see which machines are being created in this batch. This has NO production significance — the real Azure API reads per-machine data from the BatchPutMachine HTTP header, not from context.

Why this exists: policy.WithHTTPHeader stores the header using an unexported context key (internal/shared.CtxWithHTTPHeaderKey{}) and the SDK provides no public getter. Our fake implements the Go interface directly (no HTTP pipeline), so the header never materializes into an http.Request the fake could inspect. This context key mirrors the same []MachineEntry data so that in-process fakes can access it.

func WithSkipBatching

func WithSkipBatching(ctx context.Context) context.Context

WithSkipBatching marks a context to bypass batching (e.g., for retries).

Types

type BatchMetadata

type BatchMetadata struct {
	IsBatched   bool
	MachineName string
	BatchID     string
}

BatchMetadata is attached to context after batch execution.

func FromContext

func FromContext(ctx context.Context) *BatchMetadata

FromContext retrieves BatchMetadata if present, nil otherwise.

type BatchPutMachineHeader

type BatchPutMachineHeader struct {
	VMSkus        VMSkus         `json:"vmSkus"`
	BatchMachines []MachineEntry `json:"batchMachines"`
}

BatchPutMachineHeader is the JSON structure sent via HTTP header to Azure. It lists per-machine variations (name, zones, tags) while the API body contains the shared template.

type BatchingMachinesClient

type BatchingMachinesClient struct {
	// contains filtered or unexported fields
}

func NewBatchingMachinesClient

func NewBatchingMachinesClient(
	realClient azclient.AKSMachinesAPI,
	grouper *Grouper,
	resourceGroup string,
	clusterName string,
	poolName string,
) *BatchingMachinesClient

func (*BatchingMachinesClient) BeginCreateOrUpdate

func (c *BatchingMachinesClient) BeginCreateOrUpdate(
	ctx context.Context,
	resourceGroupName string,
	resourceName string,
	agentPoolName string,
	aksMachineName string,
	parameters armcontainerservice.Machine,
	options *armcontainerservice.MachinesClientBeginCreateOrUpdateOptions,
) (*runtime.Poller[armcontainerservice.MachinesClientCreateOrUpdateResponse], error)

func (*BatchingMachinesClient) Get

func (c *BatchingMachinesClient) Get(
	ctx context.Context,
	resourceGroupName string,
	resourceName string,
	agentPoolName string,
	aksMachineName string,
	options *armcontainerservice.MachinesClientGetOptions,
) (armcontainerservice.MachinesClientGetResponse, error)

func (*BatchingMachinesClient) NewListPager

func (c *BatchingMachinesClient) NewListPager(
	resourceGroupName string,
	resourceName string,
	agentPoolName string,
	options *armcontainerservice.MachinesClientListOptions,
) *runtime.Pager[armcontainerservice.MachinesClientListResponse]

type Coordinator

type Coordinator struct {
	// contains filtered or unexported fields
}

func NewCoordinator

func NewCoordinator(
	realClient azclient.AKSMachinesAPI,
	resourceGroup string,
	clusterName string,
	poolName string,
) *Coordinator

func (*Coordinator) ExecuteBatch

func (c *Coordinator) ExecuteBatch(batch *PendingBatch)

ExecuteBatch sends a batch to Azure as one API call, then distributes results (success or per-machine errors) back to each request's channel.

type CreateRequest

type CreateRequest struct {
	// contains filtered or unexported fields
}

CreateRequest is a single VM creation request. The caller waits on responseChan.

type CreateResponse

type CreateResponse struct {
	Poller  *runtime.Poller[armcontainerservice.MachinesClientCreateOrUpdateResponse]
	Err     error
	BatchID string // For log correlation
}

CreateResponse is sent back via the request's channel after batch execution. Contains either a Poller (success) or Err (failure).

type Grouper

type Grouper struct {
	// contains filtered or unexported fields
}

Grouper collects VM creation requests and groups them by template for batch execution.

It runs a background loop that waits for requests, gives them time to accumulate, then dispatches batches to the Coordinator. Requests with identical VM configs (same size, OS, network, etc.) land in the same batch.

func NewGrouper

func NewGrouper(ctx context.Context, opts Options) *Grouper

NewGrouper creates a Grouper. Call SetCoordinator() then Start() to begin processing.

func (*Grouper) EnqueueCreate

func (g *Grouper) EnqueueCreate(req *CreateRequest) chan *CreateResponse

EnqueueCreate adds a request to the appropriate batch and returns a channel for the response. The caller should wait on the returned channel.

func (*Grouper) IsEnabled

func (g *Grouper) IsEnabled() bool

func (*Grouper) SetCoordinator

func (g *Grouper) SetCoordinator(coordinator *Coordinator)

func (*Grouper) Start

func (g *Grouper) Start()

Start launches the background processing loop.

type MachineEntry

type MachineEntry struct {
	MachineName string            `json:"machineName"`
	Zones       []string          `json:"zones"`
	Tags        map[string]string `json:"tags"`
}

func FakeBatchEntriesFromContext

func FakeBatchEntriesFromContext(ctx context.Context) []MachineEntry

FakeBatchEntriesFromContext retrieves per-machine batch entries if present. Only used by fakes/tests — see WithFakeBatchEntries.

type Options

type Options struct {
	IdleTimeout  time.Duration
	MaxTimeout   time.Duration
	MaxBatchSize int
}

Options tunes the batching behavior.

Small timeouts = lower latency, more API calls
Large timeouts = better batching, higher latency

type PendingBatch

type PendingBatch struct {
	// contains filtered or unexported fields
}

PendingBatch groups requests with the same template hash.

type VMSkus

type VMSkus struct {
	Value []interface{} `json:"value"`
}
