acp

package
v0.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 28, 2026 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// AcpSpawnAcceptedNote is the note for accepted oneshot spawns.
	AcpSpawnAcceptedNote = "initial ACP task queued in isolated session; follow-ups continue in the bound thread."

	// AcpSpawnSessionAcceptedNote is the note for accepted persistent session spawns.
	AcpSpawnSessionAcceptedNote = "thread-bound ACP session stays active after this task; continue in-thread for follow-ups."
)

Variables

This section is empty.

Functions

func CheckAcpAgentAuthorization

func CheckAcpAgentAuthorization(cfg *config.Config, agentID string) error

CheckAcpAgentAuthorization checks if an agent is authorized for ACP operations.

func FormatAcpPolicyError

func FormatAcpPolicyError(code, message string) string

FormatAcpPolicyError formats an ACP policy error message.

func GetGlobalThreadBindingService

func GetGlobalThreadBindingService() *channels.ThreadBindingService

GetGlobalThreadBindingService returns the global thread binding service.

func HasLegacyAcpIdentityProjection

func HasLegacyAcpIdentityProjection(meta *SessionAcpMeta) bool

HasLegacyAcpIdentityProjection checks if metadata has legacy ACP identity projection.

func IdentityEquals

func IdentityEquals(a, b *SessionIdentity) bool

IdentityEquals checks if two session identities are equal.

func InferRuntimeOptionPatchFromConfigOption

func InferRuntimeOptionPatchFromConfigOption(key, value string) map[string]any

InferRuntimeOptionPatchFromConfigOption infers a runtime option patch from a config option.

func IsAcpAgentAuthorized

func IsAcpAgentAuthorized(cfg *config.Config, agentID string) bool

IsAcpAgentAuthorized checks if an agent is authorized for ACP operations.

func IsAcpEnabledByPolicy

func IsAcpEnabledByPolicy(cfg *config.Config) bool

IsAcpEnabledByPolicy checks if ACP is enabled by policy.

func IsSessionIdentityPending

func IsSessionIdentityPending(identity *SessionIdentity) bool

IsSessionIdentityPending checks if a session identity is pending.

func NormalizeText

func NormalizeText(s string) string

NormalizeText normalizes text values.

func ResolveAcpAgentPolicyError

func ResolveAcpAgentPolicyError(cfg *config.Config, agentID string) error

ResolveAcpAgentPolicyError checks if an agent is authorized to use ACP. Returns an error if the agent is not authorized.

func ResolveAcpBackend

func ResolveAcpBackend(cfg *config.Config, requestedBackend string) string

ResolveAcpBackend resolves the ACP backend to use.

func ResolveAcpDefaultAgent

func ResolveAcpDefaultAgent(cfg *config.Config) string

ResolveAcpDefaultAgent resolves the default agent ID for ACP sessions.

func ResolveAcpIdleTimeoutMs

func ResolveAcpIdleTimeoutMs(cfg *config.Config) int

ResolveAcpIdleTimeoutMs resolves the idle timeout for ACP sessions.

func ResolveAcpMaxConcurrentSessions

func ResolveAcpMaxConcurrentSessions(cfg *config.Config) int

ResolveAcpMaxConcurrentSessions resolves the maximum concurrent sessions limit.

func ResolveRuntimeHandleIdentifiersFromIdentity

func ResolveRuntimeHandleIdentifiersFromIdentity(identity *SessionIdentity) map[string]string

ResolveRuntimeHandleIdentifiersFromIdentity extracts runtime handle identifiers from identity.

func ResolveRuntimeOptionsFromMeta

func ResolveRuntimeOptionsFromMeta(meta map[string]any) map[string]any

ResolveRuntimeOptionsFromMeta extracts runtime options from metadata.

func RuntimeOptionsEqual

func RuntimeOptionsEqual(a, b map[string]any) bool

RuntimeOptionsEqual checks if two runtime options are equal.

func SetGlobalManager

func SetGlobalManager(manager *Manager)

SetGlobalManager sets the global ACP manager instance. This should be called once during application initialization.

func SetGlobalThreadBindingService

func SetGlobalThreadBindingService(service *channels.ThreadBindingService)

SetGlobalThreadBindingService sets the global thread binding service.

func ValidateRuntimeConfigOptionInput

func ValidateRuntimeConfigOptionInput(key, value string) (normalizedKey, normalizedValue string)

ValidateRuntimeConfigOptionInput validates a config option input.

func ValidateRuntimeModeInput

func ValidateRuntimeModeInput(mode string) string

ValidateRuntimeModeInput validates a runtime mode input.

Types

type AcpSessionRouter

type AcpSessionRouter struct {
	// contains filtered or unexported fields
}

AcpSessionRouter implements the channels.AcpSessionRouter interface. This provides thread-bound session routing functionality for the ACP manager.

func NewAcpSessionRouter

func NewAcpSessionRouter(manager *Manager) *AcpSessionRouter

NewAcpSessionRouter creates a new ACP session router.

func (*AcpSessionRouter) IsACPThreadBinding

func (r *AcpSessionRouter) IsACPThreadBinding(channel, accountID, conversationID string) bool

IsACPThreadBinding checks if a conversation has an active ACP thread binding.

func (*AcpSessionRouter) RouteToAcpSession

func (r *AcpSessionRouter) RouteToAcpSession(channel, accountID, conversationID string) string

RouteToAcpSession checks if there's a thread-bound ACP session for this conversation and returns the ACP session key if found.

func (*AcpSessionRouter) SetThreadBindingService

func (r *AcpSessionRouter) SetThreadBindingService(service *channels.ThreadBindingService)

SetThreadBindingService sets the thread binding service.

type AcpSessionStatus

type AcpSessionStatus struct {
	SessionKey     string                         `json:"session_key"`
	Backend        string                         `json:"backend"`
	Agent          string                         `json:"agent"`
	Identity       *SessionIdentity               `json:"identity,omitempty"`
	State          string                         `json:"state"`
	Mode           runtime.AcpRuntimeSessionMode  `json:"mode"`
	RuntimeOptions map[string]any                 `json:"runtime_options"`
	Capabilities   runtime.AcpRuntimeCapabilities `json:"capabilities"`
	RuntimeStatus  *runtime.AcpRuntimeStatus      `json:"runtime_status,omitempty"`
	LastActivityAt int64                          `json:"last_activity_at"`
	LastError      string                         `json:"last_error,omitempty"`
}

AcpSessionStatus represents the status of an ACP session.

type ActiveTurnState

type ActiveTurnState struct {
	// contains filtered or unexported fields
}

ActiveTurnState tracks an active turn execution.

type ActorQueue

type ActorQueue struct {
	// contains filtered or unexported fields
}

ActorQueue serializes operations per session.

func NewActorQueue

func NewActorQueue() *ActorQueue

NewActorQueue creates a new actor queue.

func (*ActorQueue) GetTailMapForTesting

func (q *ActorQueue) GetTailMapForTesting() map[string]*chan struct{}

GetTailMapForTesting returns the queues map for testing.

func (*ActorQueue) GetTotalPendingCount

func (q *ActorQueue) GetTotalPendingCount() int

GetTotalPendingCount returns the total number of pending operations.

func (*ActorQueue) Run

func (q *ActorQueue) Run(sessionKey string, fn func() error) error

Run executes a function with session-level serialization.

type CachedRuntimeState

type CachedRuntimeState struct {
	// contains filtered or unexported fields
}

CachedRuntimeState represents a cached runtime state.

type CancelSessionInput

type CancelSessionInput struct {
	Cfg        *config.Config
	SessionKey string
	Reason     string
}

CancelSession cancels an active turn in an ACP session.

type CloseSessionInput

type CloseSessionInput struct {
	Cfg               *config.Config
	SessionKey        string
	Reason            string
	RequireAcpSession bool
	ClearMeta         bool
}

CloseSession closes an ACP session.

func (*CloseSessionInput) AllowBackendUnavailable

func (input *CloseSessionInput) AllowBackendUnavailable() bool

AllowBackendUnavailable checks if backend unavailable should be allowed.

type CloseSessionResult

type CloseSessionResult struct {
	RuntimeClosed bool
	RuntimeNotice string
	MetaCleared   bool
}

type GetSessionStatusInput

type GetSessionStatusInput struct {
	Cfg        *config.Config
	SessionKey string
}

GetSessionStatus returns the status of an ACP session.

type IdleCandidate

type IdleCandidate struct {
	SessionKey    string
	LastTouchedAt time.Time
	Handle        *runtime.AcpRuntimeHandle
}

IdleCandidate represents a candidate for idle eviction.

type InitializeSessionInput

type InitializeSessionInput struct {
	Cfg        *config.Config
	SessionKey string
	Agent      string
	Mode       runtime.AcpRuntimeSessionMode
	Cwd        string
	BackendID  string
}

InitializeSession initializes a new ACP session.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager manages ACP session lifecycle.

func GetGlobalManager

func GetGlobalManager() *Manager

GetGlobalManager returns the global ACP manager instance. If no global manager exists, returns nil.

func GetOrCreateGlobalManager

func GetOrCreateGlobalManager(cfg *config.Config) *Manager

GetOrCreateGlobalManager gets the existing global manager or creates a new one.

func NewManager

func NewManager(cfg *config.Config) *Manager

NewManager creates a new ACP session manager.

func (*Manager) CancelSession

func (m *Manager) CancelSession(ctx context.Context, input CancelSessionInput) error

func (*Manager) CloseSession

func (m *Manager) CloseSession(ctx context.Context, input CloseSessionInput) (*CloseSessionResult, error)

func (*Manager) GetErrorCounts

func (m *Manager) GetErrorCounts() map[string]int

GetErrorCounts returns a copy of error counts by code.

func (*Manager) GetObservabilitySnapshot

func (m *Manager) GetObservabilitySnapshot() ManagerObservabilitySnapshot

GetObservabilitySnapshot returns observability data.

func (*Manager) GetSessionStatus

func (m *Manager) GetSessionStatus(ctx context.Context, input GetSessionStatusInput) (*AcpSessionStatus, error)

func (*Manager) InitializeSession

func (*Manager) ReconcilePendingSessionIdentities

func (m *Manager) ReconcilePendingSessionIdentities(ctx context.Context) StartupIdentityReconcileResult

ReconcilePendingSessionIdentities reconciles pending session identities on startup.

func (*Manager) RecordError

func (m *Manager) RecordError(code string)

RecordError records an error by code for observability.

func (*Manager) ResolveSession

func (m *Manager) ResolveSession(sessionKey string) SessionResolution

ResolveSession resolves a session to determine if it has ACP capabilities.

func (*Manager) RunTrackedTurn

func (m *Manager) RunTrackedTurn(ctx context.Context, input RunTrackedTurnInput) (*RunTrackedTurnResult, error)

RunTrackedTurn runs a turn with proper tracking for cancellation.

func (*Manager) SetSessionConfigOption

func (m *Manager) SetSessionConfigOption(ctx context.Context, input SetSessionConfigOptionInput) (map[string]any, error)

func (*Manager) SetSessionRuntimeMode

func (m *Manager) SetSessionRuntimeMode(ctx context.Context, input SetSessionRuntimeModeInput) (map[string]any, error)

type ManagerObservabilitySnapshot

type ManagerObservabilitySnapshot struct {
	RuntimeCache RuntimeCacheSnapshot `json:"runtime_cache"`
	Turns        TurnsSnapshot        `json:"turns"`
	ErrorsByCode map[string]int       `json:"errors_by_code"`
}

ManagerObservabilitySnapshot represents observability data.

type RunTrackedTurnInput

type RunTrackedTurnInput struct {
	Cfg        *config.Config
	SessionKey string
	Text       string
	Mode       runtime.AcpRuntimePromptMode
	RequestID  string
}

RunTrackedTurnInput contains parameters for running a tracked turn.

type RunTrackedTurnResult

type RunTrackedTurnResult struct {
	EventChan <-chan runtime.AcpRuntimeEvent
	RequestID string
}

RunTrackedTurnResult contains the result of running a tracked turn.

type RuntimeCache

type RuntimeCache struct {
	// contains filtered or unexported fields
}

RuntimeCache caches active runtime handles.

func NewRuntimeCache

func NewRuntimeCache() *RuntimeCache

NewRuntimeCache creates a new runtime cache.

func (*RuntimeCache) Clear

func (c *RuntimeCache) Clear(sessionKey string)

Clear removes a state from the cache.

func (*RuntimeCache) CollectIdleCandidates

func (c *RuntimeCache) CollectIdleCandidates(maxIdle time.Duration, now time.Time) []IdleCandidate

CollectIdleCandidates collects sessions that have been idle longer than maxIdleMs.

func (*RuntimeCache) Get

func (c *RuntimeCache) Get(sessionKey string) *CachedRuntimeState

Get retrieves a cached runtime state and updates lastTouchedAt.

func (*RuntimeCache) GetLastTouchedAt

func (c *RuntimeCache) GetLastTouchedAt(sessionKey string) time.Time

GetLastTouchedAt returns the last touched time for a session.

func (*RuntimeCache) GetSnapshot

func (c *RuntimeCache) GetSnapshot() RuntimeCacheSnapshot

GetSnapshot returns a snapshot of the cache statistics.

func (*RuntimeCache) Has

func (c *RuntimeCache) Has(sessionKey string) bool

Has checks if a session key is in the cache.

func (*RuntimeCache) IncrementEvicted

func (c *RuntimeCache) IncrementEvicted()

IncrementEvicted increments the evicted counter.

func (*RuntimeCache) Peek

func (c *RuntimeCache) Peek(sessionKey string) *CachedRuntimeState

Peek retrieves a cached state without updating lastTouchedAt.

func (*RuntimeCache) Set

func (c *RuntimeCache) Set(sessionKey string, state *CachedRuntimeState)

Set stores a runtime state in the cache.

func (*RuntimeCache) Size

func (c *RuntimeCache) Size() int

Size returns the number of cached states.

type RuntimeCacheSnapshot

type RuntimeCacheSnapshot struct {
	ActiveSessions int    `json:"active_sessions"`
	IdleTtlMs      int64  `json:"idle_ttl_ms"`
	EvictedTotal   int    `json:"evicted_total"`
	LastEvictedAt  *int64 `json:"last_evicted_at,omitempty"`
}

RuntimeCacheSnapshot represents runtime cache statistics.

type SessionAcpMeta

type SessionAcpMeta struct {
	Backend            string                        `json:"backend"`
	Agent              string                        `json:"agent"`
	RuntimeSessionName string                        `json:"runtime_session_name"`
	Identity           *SessionIdentity              `json:"identity,omitempty"`
	Mode               runtime.AcpRuntimeSessionMode `json:"mode"`
	RuntimeOptions     map[string]any                `json:"runtime_options,omitempty"`
	Cwd                string                        `json:"cwd"`
	State              string                        `json:"state"` // "idle", "running", "error"
	LastError          string                        `json:"last_error,omitempty"`
	LastActivityAt     int64                         `json:"last_activity_at"`
}

SessionAcpMeta contains ACP session metadata.

type SessionIdentity

type SessionIdentity struct {
	State            string `json:"state"`  // "pending" or "resolved"
	Source           string `json:"source"` // "ensure" or "status"
	LastUpdatedAt    int64  `json:"last_updated_at"`
	BackendSessionID string `json:"backend_session_id,omitempty"`
	AgentSessionID   string `json:"agent_session_id,omitempty"`
}

SessionIdentity tracks the identity of an ACP session from the runtime.

func CreateIdentityFromEnsure

func CreateIdentityFromEnsure(handle runtime.AcpRuntimeHandle, now int64) *SessionIdentity

CreateIdentityFromEnsure creates a session identity from an ensure session response.

func MergeSessionIdentity

func MergeSessionIdentity(current *SessionIdentity, incoming *SessionIdentity, now int64) *SessionIdentity

MergeSessionIdentity merges an incoming identity with the current identity.

func ResolveSessionIdentityFromMeta

func ResolveSessionIdentityFromMeta(meta *SessionAcpMeta) *SessionIdentity

ResolveSessionIdentityFromMeta extracts session identity from metadata.

func UpdateIdentityFromStatus

func UpdateIdentityFromStatus(identity *SessionIdentity, status *runtime.AcpRuntimeStatus) *SessionIdentity

UpdateIdentityFromStatus updates identity from a status response.

type SessionResolution

type SessionResolution struct {
	Kind       string // "none", "ready", "stale"
	SessionKey string
	Meta       *SessionAcpMeta
	Error      error
}

SessionResolution represents the result of resolving a session.

type SessionRuntimeOptions

type SessionRuntimeOptions struct {
	RuntimeMode string `json:"runtime_mode"`
	Cwd         string `json:"cwd"`
}

SessionRuntimeOptions represents runtime options for an ACP session.

func MergeRuntimeOptions

func MergeRuntimeOptions(current, patch SessionRuntimeOptions) SessionRuntimeOptions

MergeRuntimeOptions merges a patch onto current options.

func NormalizeRuntimeOptions

func NormalizeRuntimeOptions(options SessionRuntimeOptions) SessionRuntimeOptions

NormalizeRuntimeOptions normalizes runtime options.

func ValidateRuntimeOptionPatch

func ValidateRuntimeOptionPatch(patch map[string]any) (SessionRuntimeOptions, error)

ValidateRuntimeOptionPatch validates a runtime option patch.

type SetSessionConfigOptionInput

type SetSessionConfigOptionInput struct {
	Cfg        *config.Config
	SessionKey string
	Key        string
	Value      string
}

SetSessionConfigOption sets a config option on an ACP session.

type SetSessionRuntimeModeInput

type SetSessionRuntimeModeInput struct {
	Cfg         *config.Config
	SessionKey  string
	RuntimeMode string
}

SetSessionRuntimeMode sets the runtime mode for an ACP session.

type SpawnAcpContext

type SpawnAcpContext struct {
	AgentSessionKey string // Parent session key
	AgentChannel    string // Channel type
	AgentAccountID  string // Channel account ID
	AgentTo         string // Target recipient
	AgentThreadID   string // Thread ID
}

SpawnAcpContext contains context for spawning an ACP session.

type SpawnAcpMode

type SpawnAcpMode string

SpawnAcpMode represents the spawn mode.

const (
	// SpawnModeRun is oneshot mode - session closes after task completion.
	SpawnModeRun SpawnAcpMode = "run"

	// SpawnModeSession is persistent mode - session stays active.
	SpawnModeSession SpawnAcpMode = "session"
)

type SpawnAcpParams

type SpawnAcpParams struct {
	Task    string       // The task to execute
	Label   string       // Optional label for the session
	AgentID string       // Target agent ID
	Cwd     string       // Working directory
	Mode    SpawnAcpMode // "run" or "session"
	Thread  bool         // Whether to bind to a thread
}

SpawnAcpParams contains parameters for spawning an ACP session.

type SpawnAcpResult

type SpawnAcpResult struct {
	Status          string       // "accepted", "forbidden", "error"
	ChildSessionKey string       // The session key of the spawned ACP session
	RunID           string       // The run ID
	Mode            SpawnAcpMode // The spawn mode
	Note            string       // Optional note
	Error           string       // Error message if status is "error"
}

SpawnAcpResult represents the result of spawning an ACP session.

func SpawnAcpDirect

func SpawnAcpDirect(ctx context.Context, cfg *config.Config, params SpawnAcpParams, spawnCtx SpawnAcpContext) (*SpawnAcpResult, error)

SpawnAcpDirect spawns a new ACP session.

type StartupIdentityReconcileResult

type StartupIdentityReconcileResult struct {
	Checked  int
	Resolved int
	Failed   int
}

StartupIdentityReconcileResult represents the result of startup identity reconciliation.

type StartupReconciler

type StartupReconciler struct {
	// contains filtered or unexported fields
}

StartupReconciler handles startup identity reconciliation for ACP sessions.

func NewStartupReconciler

func NewStartupReconciler(manager *Manager, cfg *config.Config) *StartupReconciler

NewStartupReconciler creates a new startup reconciler.

func (*StartupReconciler) GetReconciliationStats

func (r *StartupReconciler) GetReconciliationStats() map[string]interface{}

GetReconciliationStats returns statistics about the reconciliation process.

func (*StartupReconciler) ReconcilePendingIdentities

func (r *StartupReconciler) ReconcilePendingIdentities(ctx context.Context) StartupIdentityReconcileResult

ReconcilePendingIdentities reconciles pending session identities on startup.

func (*StartupReconciler) ReconcileSessionIdentity

func (r *StartupReconciler) ReconcileSessionIdentity(ctx context.Context, sessionKey string) error

ReconcileSessionIdentity reconciles a single session's identity.

type ThreadBindingPolicy

type ThreadBindingPolicy struct {
	Channel       string // Channel type
	AccountID     string // Account identifier
	Kind          string // "acp" or "subagent"
	Enabled       bool   // Whether thread bindings are enabled
	SpawnEnabled  bool   // Whether spawning new bindings is allowed
	IdleTimeoutMs int    // Idle timeout in milliseconds
	MaxAgeMs      int    // Maximum age in milliseconds
}

ThreadBindingPolicy represents thread binding policy for a channel.

func ResolveThreadBindingSpawnPolicy

func ResolveThreadBindingSpawnPolicy(cfg *config.Config, channel, accountID, kind string) ThreadBindingPolicy

ResolveThreadBindingSpawnPolicy resolves the spawn policy for thread bindings.

type TurnLatencyStats

type TurnLatencyStats struct {
	// contains filtered or unexported fields
}

TurnLatencyStats tracks turn execution statistics.

func (*TurnLatencyStats) GetAverageLatency

func (s *TurnLatencyStats) GetAverageLatency() int64

GetAverageLatency returns the average latency in milliseconds.

func (*TurnLatencyStats) GetStats

func (s *TurnLatencyStats) GetStats() (completed, failed int, totalMs, maxMs int64)

GetStats returns the current statistics.

func (*TurnLatencyStats) RecordCompletion

func (s *TurnLatencyStats) RecordCompletion(startedAt time.Time, err error)

RecordCompletion records a completed turn.

type TurnsSnapshot

type TurnsSnapshot struct {
	Active           int   `json:"active"`
	QueueDepth       int   `json:"queue_depth"`
	Completed        int   `json:"completed"`
	Failed           int   `json:"failed"`
	AverageLatencyMs int64 `json:"average_latency_ms"`
	MaxLatencyMs     int64 `json:"max_latency_ms"`
}

TurnsSnapshot represents turn execution statistics.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL