Overview ¶
Package memory implements the Memory Manager for Orla's agentic serving layer. It owns the KV cache lifecycle across workflow stage transitions, making cache preserve/flush decisions based on workflow-level signals that are invisible to request-level serving systems.
Index ¶
- type BackendCacheEntry
- type CacheAction
- type CacheActionType
- type CacheController
- type DefaultManager
- func NewDefaultManager(cfg DefaultManagerConfig) *DefaultManager
- func (m *DefaultManager) ClearInflight(backend, requestID string)
- func (m *DefaultManager) DeregisterWorkflow(workflowID string)
- func (m *DefaultManager) OnMemoryPressure(_ context.Context, backend string, pressure float64) []CacheAction
- func (m *DefaultManager) OnTransition(ctx context.Context, signal StageTransition) CacheAction
- func (m *DefaultManager) RecordInflight(req InflightRequest)
- func (m *DefaultManager) RegisterCacheController(backend string, cc CacheController)
- func (m *DefaultManager) RegisterWorkflow(workflowID string)
- func (m *DefaultManager) StartPressureMonitor(ctx context.Context, backendsFn func() []string, interval time.Duration)
- type DefaultManagerConfig
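- type FlushAtBoundaryPolicy
- func NewFlushAtBoundaryPolicy() *FlushAtBoundaryPolicy
- func (p *FlushAtBoundaryPolicy) Evaluate(signal StageTransition, _ *Tracker) CacheAction
- type FlushUnderPressurePolicy
- func NewFlushUnderPressurePolicy(pressureThreshold float64) *FlushUnderPressurePolicy
- func (p *FlushUnderPressurePolicy) EvaluatePressure(backend string, currentPressure float64, tracker *Tracker) []CacheAction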
- type InflightRequest
- type Manager
- type MemoryStats
- type Policy
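- type PolicyChain
- func NewPolicyChain(policies ...Policy) *PolicyChain
- func (c *PolicyChain) Evaluate(signal StageTransition, tracker *Tracker) CacheAction
- type PreserveOnSmallIncrementPolicy
- func NewPreserveOnSmallIncrementPolicy(thresholdTokens int) *PreserveOnSmallIncrementPolicy
- func (p *PreserveOnSmallIncrementPolicy) Evaluate(signal StageTransition, _ *Tracker) CacheAction
- type SGLangCacheController
- func NewSGLangCacheController(baseURL string) *SGLangCacheController
- func (c *SGLangCacheController) FlushPrefix(ctx context.Context, _ string) error
- func (c *SGLangCacheController) MemoryUsage(ctx context.Context) (*MemoryStats, error)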
- type StageState
- type StageStatus
- type StageTransition
- type Tracker
- func (t *Tracker) ActiveWorkflowIDs() []string
- func (t *Tracker) ClearInflight(backend, requestID string)
- func (t *Tracker) DeregisterWorkflow(workflowID string)
- func (t *Tracker) GetWorkflow(workflowID string) *WorkflowState
- func (t *Tracker) InflightOnBackend(backend string) int
- func (t *Tracker) InflightWorkflowsOnBackend(backend string) map[string]struct{}
- func (t *Tracker) LastStageOnBackend(workflowID, backend string) *StageState
- func (t *Tracker) MarkBackendFlushed(workflowID, backend string)
- func (t *Tracker) OnStageComplete(signal StageTransition)
- func (t *Tracker) OnStageStart(signal StageTransition)
- func (t *Tracker) RecordInflight(req InflightRequest)
- func (t *Tracker) RegisterWorkflow(workflowID string)
- func (t *Tracker) WorkflowsWithPreservedCacheOnBackend(backend string) []string
- type TransitionType
- type WorkflowState
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BackendCacheEntry ¶
type BackendCacheEntry struct {
Backend string
Model string
Tokens int
Preserved bool
LastUpdate time.Time
}
BackendCacheEntry tracks a workflow's cache footprint on a specific backend.
type CacheAction ¶
type CacheAction struct {
Type CacheActionType
WorkflowID string
Backend string
Reason string
}
CacheAction is the Memory Manager's response to a stage transition signal.
type CacheActionType ¶
type CacheActionType string
CacheActionType is the action the Memory Manager instructs the backend executor to take.
const (
CacheActionPreserve CacheActionType = "preserve"
CacheActionFlush CacheActionType = "flush"
CacheActionNoop CacheActionType = "noop"
)
type CacheController ¶
type CacheController interface {
FlushPrefix(ctx context.Context, sessionID string) error
MemoryUsage(ctx context.Context) (*MemoryStats, error)
}
CacheController is an optional interface that LLM providers can implement to support explicit cache management. Providers without such support simply don't implement it; for them, the Memory Manager falls back to a soft flush, marking the cache entry as stale and letting the backend's LRU eviction reclaim it.
type DefaultManager ¶
type DefaultManager struct {
// contains filtered or unexported fields
}
DefaultManager is the default Memory Manager implementation. It composes the paper's three cache policies (preserve on small increment, flush at boundary, flush under pressure) with workflow state tracking and awareness of in-flight requests.
func NewDefaultManager ¶
func NewDefaultManager(cfg DefaultManagerConfig) *DefaultManager
NewDefaultManager creates a new DefaultManager with the standard policy chain.
func (*DefaultManager) ClearInflight ¶
func (m *DefaultManager) ClearInflight(backend, requestID string)
ClearInflight removes an in-flight request.
func (*DefaultManager) DeregisterWorkflow ¶
func (m *DefaultManager) DeregisterWorkflow(workflowID string)
DeregisterWorkflow removes a workflow from tracking.
func (*DefaultManager) OnMemoryPressure ¶
func (m *DefaultManager) OnMemoryPressure(_ context.Context, backend string, pressure float64) []CacheAction
OnMemoryPressure evaluates whether to flush idle workflow caches under pressure.
func (*DefaultManager) OnTransition ¶
func (m *DefaultManager) OnTransition(ctx context.Context, signal StageTransition) CacheAction
OnTransition processes a stage lifecycle signal and returns a cache action.
Resolution order:
- Stage-level explicit CachePolicy override ("preserve" or "flush")
- Policy chain (preserve on small increment -> flush at boundary)
- Noop (let the backend's default LRU handle it)
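A minimal sketch of that resolution order, assuming the override arrives as the raw CachePolicy string and that the policy chain can be modeled as a function returning a CacheActionType. `resolve` is a hypothetical helper, and treating an unrecognized override string like an empty one is an assumption of this sketch.

```go
package main

import "fmt"

type CacheActionType string

const (
	CacheActionPreserve CacheActionType = "preserve"
	CacheActionFlush    CacheActionType = "flush"
	CacheActionNoop     CacheActionType = "noop"
)

// resolve applies the documented resolution order. override plays the role of
// StageTransition.CachePolicy; chain stands in for the policy chain and is
// evaluated only when no explicit override applies.
func resolve(override string, chain func() CacheActionType) CacheActionType {
	switch override {
	case "preserve":
		return CacheActionPreserve
	case "flush":
		return CacheActionFlush
	}
	// Empty (or, in this sketch, unrecognized) overrides defer to the chain.
	// A Noop from the chain means: let the backend's default LRU decide.
	return chain()
}

func main() {
	chain := func() CacheActionType { return CacheActionFlush }
	fmt.Println(resolve("preserve", chain)) // preserve
	fmt.Println(resolve("", chain))         // flush
}
```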
func (*DefaultManager) RecordInflight ¶
func (m *DefaultManager) RecordInflight(req InflightRequest)
RecordInflight marks a request as in-flight.
func (*DefaultManager) RegisterCacheController ¶
func (m *DefaultManager) RegisterCacheController(backend string, cc CacheController)
RegisterCacheController associates a CacheController with a backend name.
func (*DefaultManager) RegisterWorkflow ¶
func (m *DefaultManager) RegisterWorkflow(workflowID string)
RegisterWorkflow initializes tracking for a new workflow execution.
func (*DefaultManager) StartPressureMonitor ¶
func (m *DefaultManager) StartPressureMonitor(ctx context.Context, backendsFn func() []string, interval time.Duration)
StartPressureMonitor launches a loop that periodically queries backends for memory pressure and triggers flush actions. It blocks until ctx is cancelled. backendsFn is called on each tick to discover the current set of backends, so dynamically registered backends are picked up automatically.
type DefaultManagerConfig ¶
type DefaultManagerConfig struct {
PreserveThreshold int
PressureThreshold float64
PressurePollMs int
}
DefaultManagerConfig configures the DefaultManager.
type FlushAtBoundaryPolicy ¶
type FlushAtBoundaryPolicy struct{}
FlushAtBoundaryPolicy flushes KV cache at workflow boundaries or when a stage transitions to a different backend/model.
func NewFlushAtBoundaryPolicy ¶
func NewFlushAtBoundaryPolicy() *FlushAtBoundaryPolicy
func (*FlushAtBoundaryPolicy) Evaluate ¶
func (p *FlushAtBoundaryPolicy) Evaluate(signal StageTransition, _ *Tracker) CacheAction
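A sketch of the boundary rule, with one loud assumption: the documented PrevBackend and PrevModel fields are empty both for a first stage and after a backend/model change, so on their own they cannot tell those two cases apart. The hypothetical `boundaryAction` helper therefore takes the previous stage explicitly as `prev`. Plain strings stand in for CacheAction, and `stage` is a hypothetical type.

```go
package main

import "fmt"

type TransitionType string

const (
	TransitionStageStart       TransitionType = "stage_start"
	TransitionStageComplete    TransitionType = "stage_complete"
	TransitionWorkflowComplete TransitionType = "workflow_complete"
)

// stage is a hypothetical record of where a stage ran.
type stage struct{ Backend, Model string }

// boundaryAction returns "flush" when the workflow has completed, or when the
// next stage runs on a different backend or model than prev. prev is nil for
// a workflow's first stage, which has no earlier cache to flush. "noop" means
// the policy abstains.
func boundaryAction(t TransitionType, prev *stage, next stage) string {
	if t == TransitionWorkflowComplete {
		return "flush"
	}
	if prev != nil && (prev.Backend != next.Backend || prev.Model != next.Model) {
		return "flush"
	}
	return "noop"
}

func main() {
	planner := stage{Backend: "sglang-a", Model: "llama-8b"}
	coder := stage{Backend: "sglang-b", Model: "qwen-32b"}
	fmt.Println(boundaryAction(TransitionStageStart, &planner, coder))    // flush
	fmt.Println(boundaryAction(TransitionStageStart, &planner, planner))  // noop
	fmt.Println(boundaryAction(TransitionWorkflowComplete, nil, stage{})) // flush
}
```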
type FlushUnderPressurePolicy ¶
type FlushUnderPressurePolicy struct {
PressureThreshold float64
}
FlushUnderPressurePolicy flushes cache for idle workflows when memory pressure is reported. It selects the oldest idle workflow with preserved cache on the pressured backend.
func NewFlushUnderPressurePolicy ¶
func NewFlushUnderPressurePolicy(pressureThreshold float64) *FlushUnderPressurePolicy
func (*FlushUnderPressurePolicy) EvaluatePressure ¶
func (p *FlushUnderPressurePolicy) EvaluatePressure(backend string, currentPressure float64, tracker *Tracker) []CacheAction
EvaluatePressure is called from the memory pressure monitor, not from the normal transition path. It returns flush actions for idle workflows.
type InflightRequest ¶
type InflightRequest struct {
RequestID string
WorkflowID string
StageID string
Backend string
Streaming bool
StartedAt time.Time
}
InflightRequest describes a request currently being processed by a backend worker.
type Manager ¶
type Manager interface {
OnTransition(ctx context.Context, signal StageTransition) CacheAction
OnMemoryPressure(ctx context.Context, backend string, pressure float64) []CacheAction
RegisterWorkflow(workflowID string)
DeregisterWorkflow(workflowID string)
RecordInflight(req InflightRequest)
ClearInflight(backend, requestID string)
}
Manager is the Memory Manager interface. It receives stage transition signals from the Workflow Executor and returns cache management actions.
type MemoryStats ¶
MemoryStats reports a backend's memory utilization.
type Policy ¶
type Policy interface {
Evaluate(signal StageTransition, tracker *Tracker) CacheAction
}
Policy evaluates a stage transition and returns a cache action. Policies are composable: the first policy in a chain that returns a non-Noop action wins.
type PolicyChain ¶
type PolicyChain struct {
// contains filtered or unexported fields
}
PolicyChain evaluates policies in order. The first non-Noop result wins.
func NewPolicyChain ¶
func NewPolicyChain(policies ...Policy) *PolicyChain
func (*PolicyChain) Evaluate ¶
func (c *PolicyChain) Evaluate(signal StageTransition, tracker *Tracker) CacheAction
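The first-non-Noop rule can be sketched as a slice of policies with an early return. The `Policy` interface here is trimmed (no StageTransition or *Tracker arguments) so the example stays self-contained; `Action`, `Chain`, and `PolicyFunc` are hypothetical names.

```go
package main

import "fmt"

type Action string

const Noop Action = "noop"

// Policy is a trimmed stand-in for the package's Policy interface; the real
// Evaluate also receives the StageTransition and *Tracker.
type Policy interface{ Evaluate() Action }

// PolicyFunc adapts a plain function to Policy.
type PolicyFunc func() Action

func (f PolicyFunc) Evaluate() Action { return f() }

// Chain evaluates policies in order and returns the first non-Noop action,
// or Noop when every policy abstains.
type Chain []Policy

func (c Chain) Evaluate() Action {
	for _, p := range c {
		if a := p.Evaluate(); a != Noop {
			return a
		}
	}
	return Noop
}

func main() {
	abstain := PolicyFunc(func() Action { return Noop })
	flush := PolicyFunc(func() Action { return "flush" })
	preserve := PolicyFunc(func() Action { return "preserve" })
	fmt.Println(Chain{abstain, flush, preserve}.Evaluate()) // flush
	fmt.Println(Chain{abstain}.Evaluate())                  // noop
}
```

Because evaluation stops at the first non-Noop result, order matters: in the chain DefaultManager documents, the small-increment preserve policy gets the first chance to act, ahead of flush at boundary.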
type PreserveOnSmallIncrementPolicy ¶
type PreserveOnSmallIncrementPolicy struct {
ThresholdTokens int
}
PreserveOnSmallIncrementPolicy preserves KV cache when a new stage on the same backend/model adds only a small number of tokens to the shared context.
func NewPreserveOnSmallIncrementPolicy ¶
func NewPreserveOnSmallIncrementPolicy(thresholdTokens int) *PreserveOnSmallIncrementPolicy
func (*PreserveOnSmallIncrementPolicy) Evaluate ¶
func (p *PreserveOnSmallIncrementPolicy) Evaluate(signal StageTransition, _ *Tracker) CacheAction
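A sketch of the small-increment rule, assuming the comparison is inclusive (DeltaTokens <= threshold) and that a non-empty PrevBackend equal to Backend, with PrevModel equal to Model, means "same backend/model". The stage-level PreserveThresholdTokens pointer overrides the configured default when non-nil. The trimmed `Transition` struct and the `preserveOnSmallIncrement` function are hypothetical.

```go
package main

import "fmt"

// Transition is a trimmed, hypothetical copy of the StageTransition fields
// this policy reads.
type Transition struct {
	Backend, Model          string
	PrevBackend, PrevModel  string
	DeltaTokens             int
	PreserveThresholdTokens *int // stage-level override; nil means use the default
}

// preserveOnSmallIncrement returns "preserve" when the stage continues on the
// same backend/model as the previous stage and adds at most threshold tokens;
// otherwise it returns "noop", leaving the decision to later policies.
func preserveOnSmallIncrement(t Transition, defaultThreshold int) string {
	threshold := defaultThreshold
	if t.PreserveThresholdTokens != nil {
		threshold = *t.PreserveThresholdTokens
	}
	sameTarget := t.PrevBackend != "" && t.PrevBackend == t.Backend && t.PrevModel == t.Model
	if sameTarget && t.DeltaTokens <= threshold {
		return "preserve"
	}
	return "noop"
}

func main() {
	tight := 100
	next := Transition{Backend: "sglang-a", Model: "llama", PrevBackend: "sglang-a", PrevModel: "llama", DeltaTokens: 300}
	fmt.Println(preserveOnSmallIncrement(next, 512)) // preserve
	next.PreserveThresholdTokens = &tight
	fmt.Println(preserveOnSmallIncrement(next, 512)) // noop
}
```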
type SGLangCacheController ¶
type SGLangCacheController struct {
// contains filtered or unexported fields
}
SGLangCacheController implements CacheController for SGLang backends. It uses SGLang's /flush_cache and /get_server_info HTTP endpoints.
func NewSGLangCacheController ¶
func NewSGLangCacheController(baseURL string) *SGLangCacheController
NewSGLangCacheController creates a CacheController for an SGLang backend. baseURL is the root URL of the SGLang server (e.g. "http://sglang:30000").
func (*SGLangCacheController) FlushPrefix ¶
func (c *SGLangCacheController) FlushPrefix(ctx context.Context, _ string) error
FlushPrefix triggers a global KV cache flush on the SGLang backend. SGLang's /flush_cache is a global operation, so the session ID argument (blank in the signature above) is ignored: SGLang does not support per-session eviction.
func (*SGLangCacheController) MemoryUsage ¶
func (c *SGLangCacheController) MemoryUsage(ctx context.Context) (*MemoryStats, error)
MemoryUsage queries the SGLang backend for KV cache utilization.
type StageState ¶
type StageState struct {
StageID string
Backend string
Model string
Tokens int
Status StageStatus
StartedAt time.Time
}
StageState records per-stage metadata needed for cache decisions.
type StageStatus ¶
type StageStatus string
StageStatus tracks the lifecycle of a single stage within the tracker.
const (
StageStatusActive StageStatus = "active"
StageStatusCompleted StageStatus = "completed"
)
type StageTransition ¶
type StageTransition struct {
WorkflowID string
StageID string
Backend string
Model string
TransitionType TransitionType
// PrevBackend and PrevModel describe the preceding stage on the same
// workflow, if any. Empty when this is the first stage or the previous
// stage used a different backend/model.
PrevBackend string
PrevModel string
// ContextTokens is the approximate total context size in tokens.
ContextTokens int
// DeltaTokens is the number of tokens added since the previous stage.
DeltaTokens int
// CachePolicy is the stage-level override ("preserve", "flush", or empty for auto).
CachePolicy string
// PreserveThresholdTokens is the stage-level override for the small-increment threshold.
PreserveThresholdTokens *int
}
StageTransition is a signal from the Workflow Executor to the Memory Manager describing a lifecycle event in the workflow DAG.
type Tracker ¶
type Tracker struct {
// contains filtered or unexported fields
}
Tracker maintains the state of all active workflows and in-flight requests. It is the Memory Manager's source of truth for making cache decisions.
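func (*Tracker) ActiveWorkflowIDs ¶
func (t *Tracker) ActiveWorkflowIDs() []string
ActiveWorkflowIDs returns the IDs of all active workflows.
func (*Tracker) ClearInflight ¶
func (t *Tracker) ClearInflight(backend, requestID string)
ClearInflight removes an in-flight request.
func (*Tracker) DeregisterWorkflow ¶
func (t *Tracker) DeregisterWorkflow(workflowID string)
DeregisterWorkflow removes a workflow from tracking.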
func (*Tracker) GetWorkflow ¶
func (t *Tracker) GetWorkflow(workflowID string) *WorkflowState
GetWorkflow returns a snapshot of a workflow's state, or nil if not found.
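Returning a snapshot typically means copying under a read lock, including nested maps and the values they point to, since copying only the top-level struct would still share them with the tracker. The types below are trimmed, hypothetical stand-ins: only ActiveStages is copied (CompletedStages and BackendUsage would be handled the same way), and the use of `sync.RWMutex` is an assumption about how Tracker synchronizes.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type StageState struct {
	StageID string
	Tokens  int
}

type WorkflowState struct {
	ID             string
	ActiveStages   map[string]*StageState
	LastActivityAt time.Time
}

type Tracker struct {
	mu        sync.RWMutex
	workflows map[string]*WorkflowState
}

// GetWorkflow copies the workflow under a read lock so callers can inspect
// the result without holding the lock and without seeing later mutations.
func (t *Tracker) GetWorkflow(id string) *WorkflowState {
	t.mu.RLock()
	defer t.mu.RUnlock()
	w, ok := t.workflows[id]
	if !ok {
		return nil
	}
	snap := &WorkflowState{
		ID:             w.ID,
		LastActivityAt: w.LastActivityAt,
		ActiveStages:   make(map[string]*StageState, len(w.ActiveStages)),
	}
	for k, s := range w.ActiveStages {
		c := *s // copy the pointed-to value, not just the pointer
		snap.ActiveStages[k] = &c
	}
	return snap
}

func main() {
	tr := &Tracker{workflows: map[string]*WorkflowState{
		"wf-1": {ID: "wf-1", ActiveStages: map[string]*StageState{"plan": {StageID: "plan", Tokens: 120}}},
	}}
	snap := tr.GetWorkflow("wf-1")
	snap.ActiveStages["plan"].Tokens = 999                         // mutates only the copy
	fmt.Println(tr.workflows["wf-1"].ActiveStages["plan"].Tokens) // 120
	fmt.Println(tr.GetWorkflow("missing") == nil)                  // true
}
```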
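func (*Tracker) InflightOnBackend ¶
func (t *Tracker) InflightOnBackend(backend string) int
InflightOnBackend returns the number of in-flight requests on a backend.
func (*Tracker) InflightWorkflowsOnBackend ¶
func (t *Tracker) InflightWorkflowsOnBackend(backend string) map[string]struct{}
InflightWorkflowsOnBackend returns the set of workflow IDs with in-flight requests on a given backend.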
func (*Tracker) LastStageOnBackend ¶
func (t *Tracker) LastStageOnBackend(workflowID, backend string) *StageState
LastStageOnBackend returns the most recently completed stage for a workflow on a given backend, or nil if none.
func (*Tracker) MarkBackendFlushed ¶
func (t *Tracker) MarkBackendFlushed(workflowID, backend string)
MarkBackendFlushed marks a workflow's cache on a backend as no longer preserved.
func (*Tracker) OnStageComplete ¶
func (t *Tracker) OnStageComplete(signal StageTransition)
OnStageComplete records that a stage has finished executing.
func (*Tracker) OnStageStart ¶
func (t *Tracker) OnStageStart(signal StageTransition)
OnStageStart records that a stage has begun executing.
func (*Tracker) RecordInflight ¶
func (t *Tracker) RecordInflight(req InflightRequest)
RecordInflight marks a request as in-flight on a backend.
func (*Tracker) RegisterWorkflow ¶
func (t *Tracker) RegisterWorkflow(workflowID string)
RegisterWorkflow initializes tracking for a new workflow execution. Idempotent: calling with an already-registered workflowID is a no-op.
func (*Tracker) WorkflowsWithPreservedCacheOnBackend ¶
func (t *Tracker) WorkflowsWithPreservedCacheOnBackend(backend string) []string
WorkflowsWithPreservedCacheOnBackend returns workflow IDs that have preserved cache entries on the given backend, excluding any currently in-flight.
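A compact sketch of the bookkeeping behind RegisterWorkflow's idempotence and the in-flight exclusion in WorkflowsWithPreservedCacheOnBackend. `miniTracker` and its methods are hypothetical and far simpler than Tracker; the rationale that in-flight workflows are skipped because a running request may still be using their cache is an assumption.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// miniTracker is a hypothetical, heavily trimmed tracker.
type miniTracker struct {
	mu        sync.Mutex
	preserved map[string]map[string]bool   // workflowID -> backend -> cache preserved
	inflight  map[string]map[string]string // backend -> requestID -> workflowID
}

func newMiniTracker() *miniTracker {
	return &miniTracker{
		preserved: map[string]map[string]bool{},
		inflight:  map[string]map[string]string{},
	}
}

// RegisterWorkflow is idempotent: re-registering keeps existing state.
func (t *miniTracker) RegisterWorkflow(id string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if _, ok := t.preserved[id]; !ok {
		t.preserved[id] = map[string]bool{}
	}
}

// Preserve marks a registered workflow's cache on backend as preserved.
func (t *miniTracker) Preserve(id, backend string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if m, ok := t.preserved[id]; ok {
		m[backend] = true
	}
}

func (t *miniTracker) RecordInflight(backend, requestID, workflowID string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.inflight[backend] == nil {
		t.inflight[backend] = map[string]string{}
	}
	t.inflight[backend][requestID] = workflowID
}

func (t *miniTracker) ClearInflight(backend, requestID string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.inflight[backend], requestID) // no-op if absent
}

// WorkflowsWithPreservedCacheOnBackend returns workflows with preserved cache
// on backend, minus those with a request in flight there, sorted by ID.
func (t *miniTracker) WorkflowsWithPreservedCacheOnBackend(backend string) []string {
	t.mu.Lock()
	defer t.mu.Unlock()
	busy := map[string]bool{}
	for _, wf := range t.inflight[backend] {
		busy[wf] = true
	}
	var ids []string
	for id, backends := range t.preserved {
		if backends[backend] && !busy[id] {
			ids = append(ids, id)
		}
	}
	sort.Strings(ids)
	return ids
}

func main() {
	tr := newMiniTracker()
	for _, id := range []string{"wf-a", "wf-b"} {
		tr.RegisterWorkflow(id)
		tr.Preserve(id, "gpu-0")
	}
	tr.RecordInflight("gpu-0", "req-1", "wf-b")
	fmt.Println(tr.WorkflowsWithPreservedCacheOnBackend("gpu-0")) // [wf-a]
	tr.ClearInflight("gpu-0", "req-1")
	fmt.Println(tr.WorkflowsWithPreservedCacheOnBackend("gpu-0")) // [wf-a wf-b]
}
```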
type TransitionType ¶
type TransitionType string
TransitionType classifies a stage or workflow lifecycle event.
const (
TransitionStageStart TransitionType = "stage_start"
TransitionStageComplete TransitionType = "stage_complete"
TransitionWorkflowComplete TransitionType = "workflow_complete"
)
type WorkflowState ¶
type WorkflowState struct {
ID string
ActiveStages map[string]*StageState // stageID -> state
CompletedStages map[string]*StageState // stageID -> state
BackendUsage map[string]*BackendCacheEntry // backend name -> cache entry
StartedAt time.Time
LastActivityAt time.Time
}
WorkflowState is the Memory Manager's view of a single active workflow.