agent

package
v0.28.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 14, 2026 License: AGPL-3.0, AGPL-3.0-or-later Imports: 29 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrChatTimeout = errors.New("chat timeout exceeded")

ErrChatTimeout is returned when the main agent chat exceeds its wall-clock timeout.

Functions

func BuildSessionKey

func BuildSessionKey(agentID, platform, externalUserID, channelContext string) string

BuildSessionKey constructs a session key from agent, platform, user, and context. Format: {agentID}:{platform}:{externalUserID}:{channelContext}

func BuildUserSessionKey

func BuildUserSessionKey(agentID string, authUserID int64, channelContext string) string

BuildUserSessionKey constructs a session key for a linked auth user. Linked users share a single session across all channels (for the same agent and channel context), so the key omits the platform-specific external ID. Format: {agentID}:user:{authUserID}:{channelContext}

func ChannelFromContext

func ChannelFromContext(ctx context.Context) (string, bool)

ChannelFromContext returns the current chat channel when present.

func ExcludedToolsFromContext

func ExcludedToolsFromContext(ctx context.Context) []string

ExcludedToolsFromContext returns the per-run excluded tool names when present.

func MessageText

func MessageText(message MessageContent) string

MessageText extracts and joins all text from a message.

func SaveAsset

func SaveAsset(assetsDir, fileName string, data []byte) (string, error)

SaveAsset writes data to assetsDir with a timestamp-prefixed filename to avoid collisions, returning the absolute path of the saved file.

func SetupSystemWorkspace

func SetupSystemWorkspace(agentID, basePath string) (string, error)

SetupSystemWorkspace creates the shared system workspace for agent jobs that run without a user context (e.g. builtin scheduled jobs). Returns the path basePath/workspaces/{agentID}/system/.

func SetupUserWorkspace

func SetupUserWorkspace(agentID, basePath string, userID int64) (string, error)

SetupUserWorkspace ensures per-user directories exist within an agent workspace. Creates:

  • basePath/workspaces/{agentID}/users/{userID}/.agents/skills/
  • basePath/workspaces/{agentID}/users/{userID}/data/
  • basePath/workspaces/{agentID}/users/{userID}/assets/

Returns the absolute path to the user's workspace directory (basePath/workspaces/{agentID}/users/{userID}/).

func SetupWorkspace

func SetupWorkspace(agentID, basePath string) (string, error)

SetupWorkspace ensures the per-agent workspace directory exists. Creates: basePath/workspaces/{agentID}/.agents/skills/ Returns the absolute path to the agent's workspace directory.

func SystemOverrideFromContext

func SystemOverrideFromContext(ctx context.Context) (string, bool)

SystemOverrideFromContext returns the per-run system prompt override when present.

func UserAssetsDir

func UserAssetsDir(userRoot string) string

UserAssetsDir returns the per-user assets directory within a user root. Uploaded files from all channels are stored here.

func UserSkillsDir

func UserSkillsDir(userWorkspace string) string

UserSkillsDir returns the per-user skills directory path within a user workspace.

func WithChannel

func WithChannel(ctx context.Context, channel string) context.Context

WithChannel returns a child context that carries the current chat channel.

func WithExcludedTools

func WithExcludedTools(ctx context.Context, names ...string) context.Context

WithExcludedTools returns a child context that hides the named tools for a single run.

func WithSystemOverride

func WithSystemOverride(ctx context.Context, system string) context.Context

WithSystemOverride returns a child context that carries a per-run system prompt override.

Types

type BuiltinToolsFactory

type BuiltinToolsFactory func(snap *config.Snapshot) []tools.Tool

BuiltinToolsFactory creates agent-specific builtin tools given a snapshot. It allows callers to inject always-on tools that depend on per-agent configuration (for example, notifications).

type ChatOption

type ChatOption func(*chatOptions)

ChatOption configures a single Chat call.

func WithModel

func WithModel(model string) ChatOption

WithModel overrides the model for this Chat call. If the session already has a runner with a different model, the runner is replaced.

type CompactionConfig

type CompactionConfig struct {
	// MaxTokens triggers compaction when the estimated token count exceeds this.
	// 0 (or omitted) uses the default of 80000. Negative values disable
	// automatic compaction. Manual /compact still works.
	MaxTokens int `yaml:"max_tokens"`
	// KeepTail is the number of recent message entries to preserve verbatim
	// after compaction. Default: 20.
	KeepTail int `yaml:"keep_tail"`
}

CompactionConfig controls automatic session compaction.

func (CompactionConfig) WithDefaults

func (c CompactionConfig) WithDefaults() CompactionConfig

WithDefaults returns a copy with zero-value fields replaced by defaults. MaxTokens 0 -> 80000; negative values are preserved (meaning disabled).

type Event

type Event struct {
	Text    string
	Image   *ImageEvent
	File    *FileEvent
	ToolUse *ToolUseEvent
	Store   ai.Message // if non-nil, Pool appends to session history
	Err     error
}

Event is the consumer-facing stream event. Channels read these from the stream returned by Pool.Chat().

type FileEvent

type FileEvent struct {
	Path string // absolute path on disk
	Name string // display filename (with extension)
}

FileEvent carries a local file path to be sent to the channel.

type ImageEvent

type ImageEvent struct {
	Data     string // base64 encoded
	MimeType string // e.g. "image/jpeg"
}

ImageEvent carries a base64-encoded image to be sent to the channel.

type MessageContent

type MessageContent = any

MessageContent is the type for user messages passed through the runner pipeline. It is either string (text-only) or []ai.ContentBlock (multimodal, e.g. text + images).

type NewRunnerFunc

type NewRunnerFunc func(ctx context.Context, params RunnerParams) (Runner, error)

NewRunnerFunc creates a new Runner instance with the given params.

func NewRunnerFactory

func NewRunnerFactory(cfg RunnerFactoryConfig) (NewRunnerFunc, error)

NewRunnerFactory creates a NewRunnerFunc for a given config snapshot. The returned factory creates runners scoped to one agent's provider, model, workspace, and system prompt. Memory provider, user ID, and agent ID are injected per-session from RunnerParams. Runner execution is always user-scoped, so per-user workspace directories are created for every runner instance.

Hooks are not part of the factory — they are injected via RunnerParams.HooksFn by the Pool, keeping hook lifecycle fully decoupled from model/provider config.

type PluginHooksBuilder

type PluginHooksBuilder func(ctx context.Context) []hooks.HookPlugin

PluginHooksBuilder creates hook plugins from enabled plugin state. Called at startup and on hot-reload when a hook plugin is toggled.

type PluginToolsBuilder

type PluginToolsBuilder func(ctx context.Context, build plugintools.BuildContext) []tools.Tool

PluginToolsBuilder creates tools from enabled plugin state. Called per runner so tool builders receive the active sandbox host.

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool manages a set of sessions, each with its own history and It is the only type channels interact with.

func NewPool

func NewPool(factory NewRunnerFunc, mem memory.Provider, opts ...PoolOption) *Pool

NewPool creates a new Pool with the given runner factory and memory provider. The memory provider is required — it is the sole persistence layer for sessions.

func (*Pool) ActiveSession

func (p *Pool) ActiveSession(channel string) (SessionInfo, bool)

ActiveSession returns the most recent non-archived session for a channel.

func (*Pool) AgentID

func (p *Pool) AgentID() string

AgentID returns the agent ID this pool belongs to.

func (*Pool) ArchiveSession

func (p *Pool) ArchiveSession(sessionID string) error

ArchiveSession marks a session as archived, closes its runner, but keeps history on disk. The session is removed from the in-memory map; its metadata persists in the index.

func (*Pool) Chat

func (p *Pool) Chat(ctx context.Context, sessionID string, message MessageContent, opts ...ChatOption) <-chan Event

Chat sends a message in a session and streams back events. Internally: gets/creates runner, passes history, collects events, appends to session log, streams to caller.

func (*Pool) Close

func (p *Pool) Close() error

Close shuts down all sessions and runners.

func (*Pool) CompactSession

func (p *Pool) CompactSession(ctx context.Context, sessionID string) (string, error)

CompactSession delegates compaction to the memory engine and returns the summary text on success.

func (*Pool) CreateSession

func (p *Pool) CreateSession(channel string, userID ...int64) (SessionInfo, error)

CreateSession creates a new session with a generated ID and persists its metadata.

func (*Pool) GetSession

func (p *Pool) GetSession(sessionID string) (SessionInfo, error)

GetSession returns metadata for a session.

func (*Pool) History

func (p *Pool) History(sessionID string) []ai.Message

History returns the message history for a session, loading from the memory provider. Returns nil if the session has no history or the provider does not support it.

func (*Pool) ListSessions

func (p *Pool) ListSessions(includeArchived bool) ([]SessionInfo, error)

ListSessions returns metadata for all sessions.

func (*Pool) NeedsCompaction

func (p *Pool) NeedsCompaction(sessionID string) bool

NeedsCompaction reports whether a session's estimated token count exceeds the compaction threshold. Returns false if compaction is disabled or the memory provider does not support compaction.

func (*Pool) ResetRunners

func (p *Pool) ResetRunners() error

ResetRunners closes all live session runners but keeps session metadata and history. The next chat on each session will recreate a runner from the current factory.

func (*Pool) ResetRunnersForUser

func (p *Pool) ResetRunnersForUser(userID int64) error

ResetRunnersForUser closes live runners belonging to a specific user. Other sessions are not affected. The next chat for that user recreates a

func (*Pool) ResolveSession

func (p *Pool) ResolveSession(channel string, userID ...int64) (SessionInfo, error)

ResolveSession returns the active session for a channel, creating one if needed. The check-and-create is atomic to prevent duplicate sessions under concurrent access. An optional userID associates the session with a user (stored in conversations table).

func (*Pool) RotateSession

func (p *Pool) RotateSession(channel string, userID ...int64) (SessionInfo, error)

RotateSession archives the active session for a channel (if any) and creates a new one.

func (*Pool) SetDefaultModel

func (p *Pool) SetDefaultModel(model string)

SetDefaultModel updates the default model used for new runners. Call this alongside SetFactory when the user switches models at runtime.

func (*Pool) SetFactory

func (p *Pool) SetFactory(factory NewRunnerFunc)

SetFactory replaces the runner factory used for new runners. Existing runners are not affected until their session is reset.

func (*Pool) SetFastModel

func (p *Pool) SetFastModel(model string)

SetFastModel updates the fast model used for compaction and utility work.

func (*Pool) SetHooks

func (p *Pool) SetHooks(fn func() []hooks.HookPlugin)

SetHooks updates the hook getter used when creating new runners. Changing hooks never requires rebuilding the factory or resetting sessions — new runners created after this call will use the updated hooks.

func (*Pool) StartReaper

func (p *Pool) StartReaper(ctx context.Context)

StartReaper runs a background goroutine that periodically checks for idle or dead runners. It returns when ctx is cancelled.

type PoolManager

type PoolManager struct {
	// contains filtered or unexported fields
}

PoolManager manages a map of agent ID to Pool. It reads enabled agents from the config Store and creates one Pool per agent.

func NewPoolManager

func NewPoolManager(store config.Store, mem memory.Provider, opts ...PoolManagerOption) *PoolManager

NewPoolManager creates a new PoolManager.

func (*PoolManager) AddBuiltinTool

func (pm *PoolManager) AddBuiltinTool(ctx context.Context, tool tools.Tool) error

AddBuiltinTool appends a tool to the shared builtin list and rebuilds all pool factories so subsequent runners see the new tool.

func (*PoolManager) Close

func (pm *PoolManager) Close() error

Close shuts down all pools and hook plugins.

func (*PoolManager) DefaultPool

func (pm *PoolManager) DefaultPool() *Pool

DefaultPool returns the first pool found in the map, or nil if empty. Useful for backward compatibility with code expecting a single pool.

func (*PoolManager) Get

func (pm *PoolManager) Get(agentID string) *Pool

Get returns the Pool for the given agent ID, or nil if not found.

func (*PoolManager) HookPlugins

func (pm *PoolManager) HookPlugins() []hooks.HookPlugin

HookPlugins returns a snapshot copy of the current enabled hook plugins. Returns a copy so callers cannot mutate or alias the internal slice.

func (*PoolManager) InvalidateUser

func (pm *PoolManager) InvalidateUser(userID int64) error

InvalidateUser closes all live runners for userID across all pools. Satisfies the credentials.RunnerInvalidator interface.

func (*PoolManager) ReloadPluginHooks

func (pm *PoolManager) ReloadPluginHooks(ctx context.Context) error

ReloadPluginHooks rebuilds the hook plugin set from current plugin state and propagates it to every pool. No factory rebuild is needed — hooks live on the Pool and are injected via RunnerParams at runner-creation time.

func (*PoolManager) ReloadPluginProviders

func (pm *PoolManager) ReloadPluginProviders(ctx context.Context) error

ReloadPluginProviders rebuilds the runner factory for every pool so new sessions pick up changed provider credentials or enabled state. Provider creds are resolved from the Snapshot at factory-build time, so a simple factory rebuild is sufficient.

func (*PoolManager) ReloadPluginTools

func (pm *PoolManager) ReloadPluginTools(ctx context.Context) error

ReloadPluginTools updates the runner factory for every pool. Plugin tools are built per runner so builders can receive the active sandbox host.

func (*PoolManager) SetOAuthRegistry

func (pm *PoolManager) SetOAuthRegistry(r *oauth.ProviderRegistry)

SetOAuthRegistry wires the provider registry into the pool manager. When a vault env loader is later set, the constructed TokenManager also receives the registry so runners can resolve oauth.* session env sources.

func (*PoolManager) SetTokenService

func (pm *PoolManager) SetTokenService(ctx context.Context, ts *auth.TokenService)

SetTokenService sets the token service and rebuilds all pool factories so new runners ensure STELLA_TOKEN.

func (*PoolManager) SetVaultEnvLoader

func (pm *PoolManager) SetVaultEnvLoader(ctx context.Context, v sandbox.VaultEnvLoader)

SetVaultEnvLoader sets the vault env loader and rebuilds all pool factories so existing pools pick up the loader. Must be called after StartAll. If vs also satisfies oauth.VaultStore, a TokenManager is constructed and wired into the pool manager so runners can inject runtime OAuth tokens.

func (*PoolManager) StartAll

func (pm *PoolManager) StartAll(ctx context.Context) error

StartAll reads enabled agents from the store, creates a Pool per agent with per-agent runner factory, and starts reapers.

func (*PoolManager) SyncAgent

func (pm *PoolManager) SyncAgent(ctx context.Context, agentID string) error

SyncAgent reloads one agent's pool configuration immediately. If the agent was deleted or disabled, its pool is closed and removed. If it exists and is enabled, an existing pool is rebuilt and its live session runners are reset so subsequent requests use the latest snapshot.

type PoolManagerOption

type PoolManagerOption func(*PoolManager)

PoolManagerOption configures a PoolManager.

func WithBeforeRunBuilderPM

func WithBeforeRunBuilderPM(b BeforeRunBuilder) PoolManagerOption

func WithBuiltinTools

func WithBuiltinTools(tools []tools.Tool) PoolManagerOption

WithBuiltinTools sets the always-on builtin tools available to all agents.

func WithBuiltinToolsFactory

func WithBuiltinToolsFactory(f BuiltinToolsFactory) PoolManagerOption

WithBuiltinToolsFactory sets the function that creates per-agent builtin tools.

func WithCompactionPM

func WithCompactionPM(cfg CompactionConfig) PoolManagerOption

WithCompactionPM sets the compaction config for all pools.

func WithIdleTimeoutPM

func WithIdleTimeoutPM(d time.Duration) PoolManagerOption

WithIdleTimeoutPM sets the idle timeout for all pools.

func WithPluginHooksBuilder

func WithPluginHooksBuilder(b PluginHooksBuilder) PoolManagerOption

WithPluginHooksBuilder sets the function that builds hooks from plugin state.

func WithPluginToolsBuilder

func WithPluginToolsBuilder(b PluginToolsBuilder) PoolManagerOption

WithPluginToolsBuilder sets the function that builds tools from plugin state.

func WithPromptSectionsBuilder

func WithPromptSectionsBuilder(b prompt.SectionsBuilder) PoolManagerOption

func WithPromptToolsBuilder

func WithPromptToolsBuilder(b prompt.ToolsBuilder) PoolManagerOption

func WithProviderStreamBuilder added in v0.28.0

func WithProviderStreamBuilder(b ProviderStreamBuilder) PoolManagerOption

func WithSkillStore

func WithSkillStore(s pkgplugins.SkillStore) PoolManagerOption

WithSkillStore sets the skill store for runner factories.

func WithTokenManager

func WithTokenManager(tm *oauth.TokenManager) PoolManagerOption

WithTokenManager sets the OAuth token manager for runtime token injection.

func WithTokenService

func WithTokenService(ts *auth.TokenService) PoolManagerOption

WithTokenService sets the auth token service for STELLA_TOKEN lifecycle.

func WithToolLifecyclePM

func WithToolLifecyclePM(tl *coreagent.ToolLifecycle) PoolManagerOption

func WithVaultEnvLoader

func WithVaultEnvLoader(v sandbox.VaultEnvLoader) PoolManagerOption

WithVaultEnvLoader sets the vault env loader for sandbox secret injection.

type PoolOption

type PoolOption func(*Pool)

PoolOption configures a Pool.

func WithAgentID

func WithAgentID(id string) PoolOption

WithAgentID sets the agent ID this pool belongs to.

func WithBeforeRunBuilder

func WithBeforeRunBuilder(b BeforeRunBuilder) PoolOption

WithBeforeRunBuilder sets the per-run lifecycle hook builder used by the pool.

func WithCompaction

func WithCompaction(cfg CompactionConfig) PoolOption

WithCompaction sets the compaction configuration.

func WithDefaultModel

func WithDefaultModel(model string) PoolOption

WithDefaultModel sets the default model ID for new runners.

func WithFastModel

func WithFastModel(model string) PoolOption

WithFastModel sets the model ID used for compaction and other fast tasks.

func WithIdleTimeout

func WithIdleTimeout(d time.Duration) PoolOption

WithIdleTimeout sets the idle timeout for reaping runners.

func WithSnapshotPromptBuilder

func WithSnapshotPromptBuilder(fn func(ctx context.Context, userID int64, agentID string, snap memory.SessionSnapshot) string) PoolOption

WithSnapshotPromptBuilder sets the function used to build per-turn system prompts from a frozen snapshot version. When set, the pool calls this on every Chat to produce a stable system prompt for the session's snapshot version.

type ProviderStreamBuilder added in v0.28.0

type ProviderStreamBuilder func(api, apiKey, baseURL string) (providers.StreamFunc, error)

type Runner

type Runner interface {
	Chat(ctx context.Context, history []ai.Message, message MessageContent) <-chan Event
	Alive() bool
	Busy() bool
	LastActivity() time.Time
	SystemPrompt() string
	Close() error
}

Runner runs prompts against an AI backend and exposes lifecycle methods.

type RunnerFactoryConfig added in v0.28.0

type RunnerFactoryConfig struct {
	Snap                     *config.Snapshot
	BuiltinTools             []tools.Tool
	PluginToolsBuilder       PluginToolsBuilder
	ProviderStreamBuilder    ProviderStreamBuilder
	PromptToolsBuilder       prompt.ToolsBuilder
	PromptSectionsBuilder    prompt.SectionsBuilder
	SessionPluginViewBuilder SessionPluginViewBuilder
	ToolLifecycle            *coreagent.ToolLifecycle
	SkillStore               pkgplugins.SkillStore
	SandboxBackendFn         func(ctx context.Context) string
	VaultEnvLoader           sandbox.VaultEnvLoader
	TokenService             *auth.TokenService
	TokenManager             *oauth.TokenManager
}

RunnerFactoryConfig holds all dependencies needed to create a NewRunnerFunc.

type RunnerParams

type RunnerParams struct {
	Model   string                    // model ID (empty = use default)
	Memory  any                       // memory.Provider — typed as any to avoid circular imports
	UserID  int64                     // auth user ID for user-scoped runner creation
	AgentID string                    // agent ID for profile loading
	HooksFn func() []hooks.HookPlugin // resolved at runner-creation time; nil = no hooks
}

RunnerParams holds parameters for creating a new Runner instance.

type Session

type Session struct {
	Info   SessionInfo
	Runner Runner
	Model  string // model ID the current runner was created with
}

Session holds the state of a single conversation: metadata and the currently assigned Message persistence is handled by the memory engine exclusively.

type SessionInfo

type SessionInfo = memory.SessionInfo

SessionInfo is an alias for memory.SessionInfo.

type SessionPluginViewBuilder

type SessionPluginViewBuilder func(ctx context.Context) (pkgplugins.SessionPluginView, error)

type ToolUseEvent

type ToolUseEvent struct {
	Tool   string // tool name, e.g. "bash", "read"
	Status string // "running", "done", "error"
	Input  string // short summary of the tool input
	Detail string // error detail or result summary (for "error" status)
}

ToolUseEvent describes a tool invocation in progress or completed.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL