memory

package
v1.2.10 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 23, 2026 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package memory implements the Memory Manager for Orla's agentic serving layer. It owns the KV cache lifecycle across workflow stage transitions, making cache preserve/flush decisions based on workflow-level signals that are invisible to request-level serving systems.

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BackendCacheEntry

type BackendCacheEntry struct {
	Backend    string
	Model      string
	Tokens     int
	Preserved  bool
	LastUpdate time.Time
}

BackendCacheEntry tracks a workflow's cache footprint on a specific backend.

type CacheAction

type CacheAction struct {
	Type       CacheActionType
	WorkflowID string
	Backend    string
	Reason     string
}

CacheAction is the Memory Manager's response to a stage transition signal.

type CacheActionType

type CacheActionType string

CacheActionType is the action the Memory Manager instructs the backend executor to take.

const (
	CacheActionPreserve CacheActionType = "preserve"
	CacheActionFlush    CacheActionType = "flush"
	CacheActionNoop     CacheActionType = "noop"
)

type CacheController

type CacheController interface {
	FlushPrefix(ctx context.Context, sessionID string) error
	MemoryUsage(ctx context.Context) (*MemoryStats, error)
}

CacheController is an optional interface that LLM providers can implement to support explicit cache management. Providers that don't support it simply don't implement the interface, and the Memory Manager falls back to soft-flush (mark as stale and let LRU reclaim).

type DefaultManager

type DefaultManager struct {
	// contains filtered or unexported fields
}

DefaultManager is the default Memory Manager implementation. It composes the three paper policies (preserve on small increment, flush at boundary, flush under pressure) with workflow state tracking and in-flight awareness.

func NewDefaultManager

func NewDefaultManager(cfg DefaultManagerConfig) *DefaultManager

NewDefaultManager creates a new DefaultManager with the standard policy chain.

func (*DefaultManager) ClearInflight

func (m *DefaultManager) ClearInflight(backend, requestID string)

ClearInflight removes an in-flight request.

func (*DefaultManager) DeregisterWorkflow

func (m *DefaultManager) DeregisterWorkflow(workflowID string)

DeregisterWorkflow removes a workflow from tracking.

func (*DefaultManager) OnMemoryPressure

func (m *DefaultManager) OnMemoryPressure(_ context.Context, backend string, pressure float64) []CacheAction

OnMemoryPressure evaluates whether to flush idle workflow caches under pressure.

func (*DefaultManager) OnTransition

func (m *DefaultManager) OnTransition(ctx context.Context, signal StageTransition) CacheAction

OnTransition processes a stage lifecycle signal and returns a cache action.

Resolution order:

  1. Stage-level explicit CachePolicy override ("preserve" or "flush")
  2. Policy chain (preserve on small increment -> flush at boundary)
  3. Noop (let the backend's default LRU handle it)
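The resolution order above can be sketched as a small function. This is an illustrative sketch under assumptions, not the body of OnTransition: `resolve` and its `chain` callback are hypothetical, and the reason strings are invented for the example.

```go
package main

import "fmt"

type CacheActionType string

const (
	CacheActionPreserve CacheActionType = "preserve"
	CacheActionFlush    CacheActionType = "flush"
	CacheActionNoop     CacheActionType = "noop"
)

// resolve (hypothetical) applies the three-step order: explicit stage
// override first, then the policy chain, then Noop.
// override stands in for StageTransition.CachePolicy; chain stands in for
// evaluating the policy chain and is only called when no override is set.
func resolve(override string, chain func() CacheActionType) (CacheActionType, string) {
	switch override {
	case "preserve":
		return CacheActionPreserve, "stage override"
	case "flush":
		return CacheActionFlush, "stage override"
	}
	if t := chain(); t != CacheActionNoop {
		return t, "policy chain"
	}
	return CacheActionNoop, "backend default LRU"
}

func main() {
	chainFlush := func() CacheActionType { return CacheActionFlush }
	chainNoop := func() CacheActionType { return CacheActionNoop }

	t, why := resolve("preserve", chainFlush)
	fmt.Println(t, "/", why) // preserve / stage override
	t, why = resolve("", chainFlush)
	fmt.Println(t, "/", why) // flush / policy chain
	t, why = resolve("", chainNoop)
	fmt.Println(t, "/", why) // noop / backend default LRU
}
```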

func (*DefaultManager) RecordInflight

func (m *DefaultManager) RecordInflight(req InflightRequest)

RecordInflight marks a request as in-flight.

func (*DefaultManager) RegisterCacheController

func (m *DefaultManager) RegisterCacheController(backend string, cc CacheController)

RegisterCacheController associates a CacheController with a backend name.

func (*DefaultManager) RegisterWorkflow

func (m *DefaultManager) RegisterWorkflow(workflowID string)

RegisterWorkflow initializes tracking for a new workflow execution.

func (*DefaultManager) StartPressureMonitor

func (m *DefaultManager) StartPressureMonitor(ctx context.Context, backendsFn func() []string, interval time.Duration)

StartPressureMonitor launches a loop that periodically queries backends for memory pressure and triggers flush actions. It blocks until ctx is cancelled. backendsFn is called on each tick to discover the current set of backends, so dynamically registered backends are picked up automatically.

type DefaultManagerConfig

type DefaultManagerConfig struct {
	PreserveThreshold int
	PressureThreshold float64
	PressurePollMs    int
}

DefaultManagerConfig configures the DefaultManager.

type FlushAtBoundaryPolicy

type FlushAtBoundaryPolicy struct{}

FlushAtBoundaryPolicy flushes KV cache at workflow boundaries or when a stage transitions to a different backend/model.

func NewFlushAtBoundaryPolicy

func NewFlushAtBoundaryPolicy() *FlushAtBoundaryPolicy

func (*FlushAtBoundaryPolicy) Evaluate

func (p *FlushAtBoundaryPolicy) Evaluate(signal StageTransition, _ *Tracker) CacheAction

type FlushUnderPressurePolicy

type FlushUnderPressurePolicy struct {
	PressureThreshold float64
}

FlushUnderPressurePolicy flushes cache for idle workflows when memory pressure is reported. It selects the oldest idle workflow with preserved cache on the pressured backend.

func NewFlushUnderPressurePolicy

func NewFlushUnderPressurePolicy(pressureThreshold float64) *FlushUnderPressurePolicy

func (*FlushUnderPressurePolicy) EvaluatePressure

func (p *FlushUnderPressurePolicy) EvaluatePressure(backend string, currentPressure float64, tracker *Tracker) []CacheAction

EvaluatePressure is called from the memory pressure monitor, not from the normal transition path. It returns flush actions for idle workflows.

type InflightRequest

type InflightRequest struct {
	RequestID  string
	WorkflowID string
	StageID    string
	Backend    string
	Streaming  bool
	StartedAt  time.Time
}

InflightRequest describes a request currently being processed by a backend worker.

type Manager

type Manager interface {
	OnTransition(ctx context.Context, signal StageTransition) CacheAction
	OnMemoryPressure(ctx context.Context, backend string, pressure float64) []CacheAction
	RegisterWorkflow(workflowID string)
	DeregisterWorkflow(workflowID string)
	RecordInflight(req InflightRequest)
	ClearInflight(backend, requestID string)
}

Manager is the Memory Manager interface. It receives stage transition signals from the Workflow Executor and returns cache management actions.

type MemoryStats

type MemoryStats struct {
	UsedBytes  int64
	TotalBytes int64
	Pressure   float64 // 0.0 to 1.0
}

MemoryStats reports a backend's memory utilization.

type Policy

type Policy interface {
	Evaluate(signal StageTransition, tracker *Tracker) CacheAction
}

Policy evaluates a stage transition and returns a cache action. Policies are composable: the first policy in a chain that returns a non-Noop action wins.

type PolicyChain

type PolicyChain struct {
	// contains filtered or unexported fields
}

PolicyChain evaluates policies in order. The first non-Noop result wins.
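The "first non-Noop result wins" rule can be sketched with simplified stand-ins. Everything below is local to the example: `signal` is a trimmed, hypothetical substitute for StageTransition, `policy` omits the *Tracker argument, and `preserveSmall` and `flushAtEnd` are toy policies rather than the package's own.

```go
package main

import "fmt"

type CacheActionType string

const (
	CacheActionPreserve CacheActionType = "preserve"
	CacheActionFlush    CacheActionType = "flush"
	CacheActionNoop     CacheActionType = "noop"
)

type CacheAction struct {
	Type   CacheActionType
	Reason string
}

// signal is a trimmed, hypothetical stand-in for StageTransition.
type signal struct {
	DeltaTokens      int
	WorkflowComplete bool
}

// policy is a simplified stand-in for Policy (no tracker argument).
type policy interface {
	Evaluate(s signal) CacheAction
}

type preserveSmall struct{ threshold int }

func (p preserveSmall) Evaluate(s signal) CacheAction {
	if !s.WorkflowComplete && s.DeltaTokens <= p.threshold {
		return CacheAction{Type: CacheActionPreserve, Reason: "small increment"}
	}
	return CacheAction{Type: CacheActionNoop}
}

type flushAtEnd struct{}

func (flushAtEnd) Evaluate(s signal) CacheAction {
	if s.WorkflowComplete {
		return CacheAction{Type: CacheActionFlush, Reason: "workflow complete"}
	}
	return CacheAction{Type: CacheActionNoop}
}

// evaluateChain returns the first non-Noop action, or Noop if none applies.
func evaluateChain(s signal, policies ...policy) CacheAction {
	for _, p := range policies {
		if a := p.Evaluate(s); a.Type != CacheActionNoop {
			return a
		}
	}
	return CacheAction{Type: CacheActionNoop, Reason: "no policy matched"}
}

func main() {
	chain := []policy{preserveSmall{threshold: 256}, flushAtEnd{}}
	fmt.Println(evaluateChain(signal{DeltaTokens: 100}, chain...).Type)                         // preserve
	fmt.Println(evaluateChain(signal{DeltaTokens: 100, WorkflowComplete: true}, chain...).Type) // flush
	fmt.Println(evaluateChain(signal{DeltaTokens: 9000}, chain...).Type)                        // noop
}
```

Order matters in such a chain: a policy placed earlier shadows any later policy that would also return a non-Noop action for the same signal.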

func NewPolicyChain

func NewPolicyChain(policies ...Policy) *PolicyChain

func (*PolicyChain) Evaluate

func (c *PolicyChain) Evaluate(signal StageTransition, tracker *Tracker) CacheAction

type PreserveOnSmallIncrementPolicy

type PreserveOnSmallIncrementPolicy struct {
	ThresholdTokens int
}

PreserveOnSmallIncrementPolicy preserves KV cache when a new stage on the same backend/model adds only a small number of tokens to the shared context.
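A small-increment check of this kind might look like the sketch below. It is not the package's Evaluate method: `transition` is a trimmed, hypothetical stand-in for StageTransition, `shouldPreserve` is a hypothetical helper, and treating the threshold as inclusive is an assumption. The per-stage override follows the PreserveThresholdTokens field documented on StageTransition.

```go
package main

import "fmt"

// transition is a trimmed, hypothetical stand-in for StageTransition.
type transition struct {
	Backend, Model          string
	PrevBackend, PrevModel  string
	DeltaTokens             int
	PreserveThresholdTokens *int // optional stage-level override
}

// shouldPreserve (hypothetical) reports whether the cache is worth keeping:
// the stage must stay on the same backend and model, and the number of
// added tokens must not exceed the effective threshold.
func shouldPreserve(sig transition, defaultThreshold int) bool {
	if sig.PrevBackend == "" || sig.PrevBackend != sig.Backend || sig.PrevModel != sig.Model {
		return false
	}
	threshold := defaultThreshold
	if sig.PreserveThresholdTokens != nil {
		threshold = *sig.PreserveThresholdTokens
	}
	return sig.DeltaTokens <= threshold
}

func main() {
	sig := transition{
		Backend: "sglang-a", Model: "m1",
		PrevBackend: "sglang-a", PrevModel: "m1",
		DeltaTokens: 120,
	}
	fmt.Println(shouldPreserve(sig, 256)) // true

	strict := 64
	sig.PreserveThresholdTokens = &strict
	fmt.Println(shouldPreserve(sig, 256)) // false
}
```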

func NewPreserveOnSmallIncrementPolicy

func NewPreserveOnSmallIncrementPolicy(thresholdTokens int) *PreserveOnSmallIncrementPolicy

func (*PreserveOnSmallIncrementPolicy) Evaluate

type SGLangCacheController

type SGLangCacheController struct {
	// contains filtered or unexported fields
}

SGLangCacheController implements CacheController for SGLang backends. It uses SGLang's /flush_cache and /get_server_info HTTP endpoints.

func NewSGLangCacheController

func NewSGLangCacheController(baseURL string) *SGLangCacheController

NewSGLangCacheController creates a CacheController for an SGLang backend. baseURL is the root URL of the SGLang server (e.g. "http://sglang:30000").

func (*SGLangCacheController) FlushPrefix

func (c *SGLangCacheController) FlushPrefix(ctx context.Context, _ string) error

FlushPrefix triggers a global KV cache flush on the SGLang backend. SGLang's /flush_cache is a global operation, so the sessionID parameter is not used for scoping (the signature discards it): SGLang does not support per-session eviction.

func (*SGLangCacheController) MemoryUsage

func (c *SGLangCacheController) MemoryUsage(ctx context.Context) (*MemoryStats, error)

MemoryUsage queries the SGLang backend for KV cache utilization.

type StageState

type StageState struct {
	StageID   string
	Backend   string
	Model     string
	Tokens    int
	Status    StageStatus
	StartedAt time.Time
}

StageState records per-stage metadata needed for cache decisions.

type StageStatus

type StageStatus string

StageStatus tracks the lifecycle of a single stage within the tracker.

const (
	StageStatusActive    StageStatus = "active"
	StageStatusCompleted StageStatus = "completed"
)

type StageTransition

type StageTransition struct {
	WorkflowID     string
	StageID        string
	Backend        string
	Model          string
	TransitionType TransitionType

	// PrevBackend and PrevModel describe the preceding stage on the same
	// workflow, if any. Empty when this is the first stage or the previous
	// stage used a different backend/model.
	PrevBackend string
	PrevModel   string

	// ContextTokens is the approximate total context size in tokens.
	ContextTokens int
	// DeltaTokens is the number of tokens added since the previous stage.
	DeltaTokens int

	// CachePolicy is the stage-level override ("preserve", "flush", or empty for auto).
	CachePolicy string
	// PreserveThresholdTokens is the stage-level override for the small-increment threshold.
	PreserveThresholdTokens *int
}

StageTransition is a signal from the Workflow Executor to the Memory Manager describing a lifecycle event in the workflow DAG.

type Tracker

type Tracker struct {
	// contains filtered or unexported fields
}

Tracker maintains the state of all active workflows and in-flight requests. It is the Memory Manager's source of truth for making cache decisions.

func NewTracker

func NewTracker() *Tracker

NewTracker creates a new workflow state tracker.

func (*Tracker) ActiveWorkflowIDs

func (t *Tracker) ActiveWorkflowIDs() []string

ActiveWorkflowIDs returns the IDs of all active workflows.

func (*Tracker) ClearInflight

func (t *Tracker) ClearInflight(backend, requestID string)

ClearInflight removes an in-flight request.

func (*Tracker) DeregisterWorkflow

func (t *Tracker) DeregisterWorkflow(workflowID string)

DeregisterWorkflow removes a workflow from tracking.

func (*Tracker) GetWorkflow

func (t *Tracker) GetWorkflow(workflowID string) *WorkflowState

GetWorkflow returns a snapshot of a workflow's state, or nil if not found.

func (*Tracker) InflightOnBackend

func (t *Tracker) InflightOnBackend(backend string) int

InflightOnBackend returns the number of in-flight requests on a backend.

func (*Tracker) InflightWorkflowsOnBackend

func (t *Tracker) InflightWorkflowsOnBackend(backend string) map[string]struct{}

InflightWorkflowsOnBackend returns the set of workflow IDs with in-flight requests on a given backend.

func (*Tracker) LastStageOnBackend

func (t *Tracker) LastStageOnBackend(workflowID, backend string) *StageState

LastStageOnBackend returns the most recently completed stage for a workflow on a given backend, or nil if none.

func (*Tracker) MarkBackendFlushed

func (t *Tracker) MarkBackendFlushed(workflowID, backend string)

MarkBackendFlushed marks a workflow's cache on a backend as no longer preserved.

func (*Tracker) OnStageComplete

func (t *Tracker) OnStageComplete(signal StageTransition)

OnStageComplete records that a stage has finished executing.

func (*Tracker) OnStageStart

func (t *Tracker) OnStageStart(signal StageTransition)

OnStageStart records that a stage has begun executing.

func (*Tracker) RecordInflight

func (t *Tracker) RecordInflight(req InflightRequest)

RecordInflight marks a request as in-flight on a backend.

func (*Tracker) RegisterWorkflow

func (t *Tracker) RegisterWorkflow(workflowID string)

RegisterWorkflow initializes tracking for a new workflow execution. Idempotent: calling with an already-registered workflowID is a no-op.

func (*Tracker) WorkflowsWithPreservedCacheOnBackend

func (t *Tracker) WorkflowsWithPreservedCacheOnBackend(backend string) []string

WorkflowsWithPreservedCacheOnBackend returns workflow IDs that have preserved cache entries on the given backend, excluding any currently in-flight.

type TransitionType

type TransitionType string

TransitionType classifies a stage or workflow lifecycle event.

const (
	TransitionStageStart       TransitionType = "stage_start"
	TransitionStageComplete    TransitionType = "stage_complete"
	TransitionWorkflowComplete TransitionType = "workflow_complete"
)

type WorkflowState

type WorkflowState struct {
	ID              string
	ActiveStages    map[string]*StageState        // stageID -> state
	CompletedStages map[string]*StageState        // stageID -> state
	BackendUsage    map[string]*BackendCacheEntry // backend name -> cache entry
	StartedAt       time.Time
	LastActivityAt  time.Time
}

WorkflowState is the Memory Manager's view of a single active workflow.
