Documentation
¶
Index ¶
- Constants
- Variables
- func ReadStderrFromHTTPSession(_ *slog.Logger)
- type CLISessionIO
- func (c *CLISessionIO) Close() error
- func (c *CLISessionIO) IsAlive() bool
- func (c *CLISessionIO) IsCLI() bool
- func (c *CLISessionIO) Logger() *slog.Logger
- func (c *CLISessionIO) Stderr() io.ReadCloser
- func (c *CLISessionIO) Stdout() io.ReadCloser
- func (c *CLISessionIO) WriteInput(msg map[string]any) error
- type CLISessionStarter
- type Callback
- type EngineOptions
- type HTTPSessionIO
- func (h *HTTPSessionIO) Close() error
- func (h *HTTPSessionIO) IsAlive() bool
- func (h *HTTPSessionIO) IsCLI() bool
- func (h *HTTPSessionIO) Logger() *slog.Logger
- func (h *HTTPSessionIO) SetCallback(cb SessionCallback)
- func (h *HTTPSessionIO) StartReading()
- func (h *HTTPSessionIO) WriteInput(msg map[string]any) error
- type HTTPSessionStarter
- type Session
- func (s *Session) GetCallback() Callback
- func (s *Session) GetExt() any
- func (s *Session) GetLastActive() time.Time
- func (s *Session) GetLogPath() string
- func (s *Session) GetStatus() SessionStatus
- func (s *Session) GetStatusChange() <-chan SessionStatus
- func (s *Session) IsAlive() bool
- func (s *Session) OpenLogFile() error
- func (s *Session) ReadStderr()
- func (s *Session) ReadStdout()
- func (s *Session) SetCallback(cb Callback)
- func (s *Session) SetExt(data any)
- func (s *Session) SetStatus(status SessionStatus)
- func (s *Session) Touch()
- func (s *Session) Wait() error
- func (s *Session) WriteInput(msg map[string]any) error
- type SessionCallback
- type SessionConfig
- type SessionIO
- type SessionManager
- type SessionPool
- func (sm *SessionPool) CleanupSessionFiles(providerSessionID string, workDir string) error
- func (sm *SessionPool) DeleteMarker(providerSessionID string) error
- func (sm *SessionPool) GetOrCreateSession(ctx context.Context, sessionID string, cfg SessionConfig, prompt string) (*Session, bool, error)
- func (sm *SessionPool) GetSession(sessionID string) (*Session, bool)
- func (sm *SessionPool) ListActiveSessions() []*Session
- func (sm *SessionPool) SetStreamStore(saver StreamDataSaver)
- func (sm *SessionPool) Shutdown()
- func (sm *SessionPool) TerminateSession(sessionID string) error
- type SessionStarter
- type SessionStatus
- type StreamDataSaver
Constants ¶
const ( ScannerInitialBufSize = 4 * 1024 // 4 KB - start small ScannerMaxBufSize = 512 * 1024 // 512 KB - sufficient for most CLI outputs )
Scanner buffer sizes for CLI output parsing.
const (
DefaultReadyTimeout = 10 * time.Second // Maximum time to wait for session to be ready
)
Session lifecycle constants.
Variables ¶
var SessionLogDir = filepath.Join(os.Getenv("HOME"), ".hotplex", "logs")
SessionLogDir is the directory for session log files. Defaults to ~/.hotplex/logs/
Functions ¶
func ReadStderrFromHTTPSession ¶ added in v0.36.0
ReadStderrFromHTTPSession is a no-op for HTTP sessions (no stderr).
Types ¶
type CLISessionIO ¶ added in v0.36.0
type CLISessionIO struct {
// contains filtered or unexported fields
}
CLISessionIO wraps os pipe handles for CLI subprocess transport.
func NewCLISessionIO ¶ added in v0.36.0
func NewCLISessionIO( cmd *exec.Cmd, stdin io.WriteCloser, stdout, stderr io.ReadCloser, cancel func(), logger *slog.Logger, ) *CLISessionIO
NewCLISessionIO creates a new CLISessionIO from pipe handles.
func (*CLISessionIO) Close ¶ added in v0.36.0
func (c *CLISessionIO) Close() error
Close implements SessionIO. It closes all pipes and cancels the subprocess context.
func (*CLISessionIO) IsAlive ¶ added in v0.36.0
func (c *CLISessionIO) IsAlive() bool
IsAlive implements SessionIO. Returns true if the CLI process is still running.
func (*CLISessionIO) IsCLI ¶ added in v0.36.0
func (c *CLISessionIO) IsCLI() bool
IsCLI implements SessionIO. Returns true for CLI transport.
func (*CLISessionIO) Logger ¶ added in v0.36.0
func (c *CLISessionIO) Logger() *slog.Logger
Logger implements SessionIO.
func (*CLISessionIO) Stderr ¶ added in v0.36.0
func (c *CLISessionIO) Stderr() io.ReadCloser
Stderr returns the stderr reader for the Session's read goroutines.
func (*CLISessionIO) Stdout ¶ added in v0.36.0
func (c *CLISessionIO) Stdout() io.ReadCloser
Stdout returns the stdout reader for the Session's read goroutines.
func (*CLISessionIO) WriteInput ¶ added in v0.36.0
func (c *CLISessionIO) WriteInput(msg map[string]any) error
WriteInput implements SessionIO.
type CLISessionStarter ¶ added in v0.36.0
type CLISessionStarter struct {
// contains filtered or unexported fields
}
CLISessionStarter spawns CLI subprocess sessions.
func NewCLISessionStarter ¶ added in v0.36.0
func NewCLISessionStarter( cliPath string, provider provider.Provider, markerStore persistence.SessionMarkerStore, logger *slog.Logger, opts EngineOptions, ) *CLISessionStarter
NewCLISessionStarter creates a CLISessionStarter from pool dependencies.
func (*CLISessionStarter) StartSession ¶ added in v0.36.0
func (s *CLISessionStarter) StartSession( ctx context.Context, sessionID string, cfg SessionConfig, prompt string, cb Callback, ) (*Session, error)
StartSession starts a CLI subprocess session.
func (*CLISessionStarter) TransportType ¶ added in v0.36.0
func (s *CLISessionStarter) TransportType() string
TransportType implements SessionStarter.
type Callback ¶
Callback handles streaming events from the CLI. Events are dispatched as they occur, allowing real-time UI updates.
type EngineOptions ¶
type EngineOptions struct {
Timeout time.Duration // Maximum time to wait for a single execution turn to complete
IdleTimeout time.Duration // Time after which an idle session is eligible for termination
Logger *slog.Logger // Optional logger instance; defaults to slog.Default()
// Namespace is used to generate isolated, deterministic UUID v5 Session IDs.
// This ensures that the same Conversation ID creates an isolated but persistent sandbox,
// preventing cross-application or cross-user session leaks.
Namespace string
// Foundational Security & Context (Engine-level boundaries)
PermissionMode string // Controls CLI permissions (e.g., "bypassPermissions", "acceptEdits", "default"). Defaults to strict mode.
DangerouslySkipPermissions bool // Bypasses all permission checks. Equivalent to --dangerously-skip-permissions.
BaseSystemPrompt string // Foundational instructions injected at CLI startup for all sessions.
AllowedTools []string // Explicit list of tools allowed (whitelist). If empty, all tools are allowed.
DisallowedTools []string // Explicit list of tools forbidden (blacklist).
// AdminToken is the secret required to toggle security bypass mode.
// If empty, bypass will be disabled for security.
AdminToken string
// Provider is the AI CLI provider (e.g., Claude Code, OpenCode).
// If nil, defaults to ClaudeCodeProvider.
Provider provider.Provider
}
EngineOptions defines the configuration parameters for initializing a new Engine. It allows customization of timeouts, logging, and foundational security boundaries that apply to all sessions managed by this engine instance.
type HTTPSessionIO ¶ added in v0.36.0
type HTTPSessionIO struct {
// contains filtered or unexported fields
}
HTTPSessionIO sends prompts via HTTP POST and receives events via SSE.
func NewHTTPSessionIO ¶ added in v0.36.0
func NewHTTPSessionIO(transport provider.Transport, sessionID string, cancelFn func(), logger *slog.Logger) *HTTPSessionIO
NewHTTPSessionIO creates a new HTTPSessionIO wrapping a Transport and session ID. The callback must be set via SetCallback before StartReading is called.
func (*HTTPSessionIO) Close ¶ added in v0.36.0
func (h *HTTPSessionIO) Close() error
Close implements SessionIO. It cancels the SSE context and deletes the server session.
func (*HTTPSessionIO) IsAlive ¶ added in v0.36.0
func (h *HTTPSessionIO) IsAlive() bool
IsAlive implements SessionIO. Returns true if the HTTP session is not closed.
func (*HTTPSessionIO) IsCLI ¶ added in v0.36.0
func (h *HTTPSessionIO) IsCLI() bool
IsCLI implements SessionIO. Returns false for HTTP transport.
func (*HTTPSessionIO) Logger ¶ added in v0.36.0
func (h *HTTPSessionIO) Logger() *slog.Logger
Logger implements SessionIO.
func (*HTTPSessionIO) SetCallback ¶ added in v0.36.0
func (h *HTTPSessionIO) SetCallback(cb SessionCallback)
SetCallback sets the session callback for event dispatch.
func (*HTTPSessionIO) StartReading ¶ added in v0.36.0
func (h *HTTPSessionIO) StartReading()
StartReading starts the SSE event dispatch loop. It reads from the subscribed events channel and dispatches "raw_line" and "runner_exit" callbacks. Call via panicx.SafeGo.
func (*HTTPSessionIO) WriteInput ¶ added in v0.36.0
func (h *HTTPSessionIO) WriteInput(msg map[string]any) error
WriteInput implements SessionIO. It sends the message via HTTP POST to the opencode serve session.
type HTTPSessionStarter ¶ added in v0.36.0
type HTTPSessionStarter struct {
// contains filtered or unexported fields
}
HTTPSessionStarter connects to opencode serve via HTTP/SSE.
func NewHTTPSessionStarter ¶ added in v0.36.0
func NewHTTPSessionStarter(transport provider.Transport, provider provider.Provider, logger *slog.Logger, opts EngineOptions) *HTTPSessionStarter
NewHTTPSessionStarter creates an HTTPSessionStarter wrapping a Transport.
func (*HTTPSessionStarter) StartSession ¶ added in v0.36.0
func (s *HTTPSessionStarter) StartSession( ctx context.Context, sessionID string, cfg SessionConfig, prompt string, cb Callback, ) (*Session, error)
StartSession creates a new HTTP/SSE session.
func (*HTTPSessionStarter) TransportType ¶ added in v0.36.0
func (s *HTTPSessionStarter) TransportType() string
TransportType implements SessionStarter.
type Session ¶
type Session struct {
ID string // Internal SDK identifier (provided by the user)
ProviderSessionID string // The deterministic UUID (v5) passed to CLI for persistent DB storage
Config SessionConfig // Snapshot of the configuration used to initialize the session
TaskInstructions string // Persistent instructions for the session
CreatedAt time.Time
LastActive time.Time
Status SessionStatus
IsResuming bool // True if session was resumed from persistent marker
// contains filtered or unexported fields
}
Session represents a persistent, hot-multiplexed instance of an AI CLI agent. It manages the underlying OS process group, handles streaming I/O via full-duplex pipes, and tracks the operational readiness and lifecycle status of the agent sandbox.
func NewTestSession ¶
func NewTestSession(id string, status SessionStatus) *Session
NewTestSession creates a Session for testing purposes. This should only be used in test code.
func (*Session) GetCallback ¶
GetCallback returns the current callback.
func (*Session) GetExt ¶ added in v0.8.1
GetExt retrieves the external state attached to the session.
func (*Session) GetLastActive ¶ added in v0.9.2
GetLastActive returns the last active time with proper locking.
func (*Session) GetLogPath ¶ added in v0.23.4
GetLogPath returns the path to the session log file.
func (*Session) GetStatus ¶
func (s *Session) GetStatus() SessionStatus
GetStatus returns the current session status.
func (*Session) GetStatusChange ¶
func (s *Session) GetStatusChange() <-chan SessionStatus
GetStatusChange returns the status change channel for waiting on status updates.
func (*Session) OpenLogFile ¶ added in v0.23.4
OpenLogFile opens a log file for this session in SessionLogDir.
func (*Session) ReadStderr ¶
func (s *Session) ReadStderr()
ReadStderr asynchronously reads CLI stderr to prevent buffer deadlocks. For CLI sessions, this reads from the subprocess stderr pipe. For HTTP sessions, this is a no-op.
func (*Session) ReadStdout ¶
func (s *Session) ReadStdout()
ReadStdout asynchronously reads CLI stdout, parses JSON, and dispatches callbacks. For CLI sessions, this reads from the subprocess stdout pipe. For HTTP sessions, this is a no-op (events are dispatched via HTTPSessionIO goroutines).
func (*Session) SetCallback ¶
SetCallback registers the callback to handle stream events for the current turn.
func (*Session) SetStatus ¶
func (s *Session) SetStatus(status SessionStatus)
SetStatus updates the session status with proper locking.
func (*Session) Wait ¶ added in v0.35.2
Wait reaps the process exactly once using sync.Once. Both the synchronous cleanup path (cleanupSessionLocked) and the async SafeGo goroutine (startSession) call this method; sync.Once ensures only the first call executes cmd.Wait(), and subsequent calls return immediately without contending for the exec.Cmd internal mutex or duplicating the reap.
type SessionCallback ¶ added in v0.36.0
SessionCallback handles streaming events from an active session. This is the concrete callback signature used by HTTPSessionIO goroutines. Events are dispatched as they occur, allowing real-time UI updates.
type SessionConfig ¶
type SessionConfig struct {
WorkDir string // Absolute path to the isolated sandbox directory
TaskInstructions string // Persistent instructions for the session
BaseSystemPrompt string // Session-level system prompt override (takes precedence over Engine-level)
IdleTimeout time.Duration // Per-session idle timeout; 0 means use pool default
Namespace string // Per-session namespace override; empty means use pool default
}
SessionConfig contains the minimal configuration needed for session management. This is a subset of the root Config to avoid circular dependencies.
type SessionIO ¶ added in v0.36.0
type SessionIO interface {
// WriteInput serializes msg as JSON and delivers it to the agent session.
WriteInput(msg map[string]any) error
// Close releases all resources held by the SessionIO.
// Close is idempotent and goroutine-safe.
Close() error
// Logger returns the session logger.
Logger() *slog.Logger
// IsCLI returns true if this is a CLI subprocess transport.
// Used by cleanupSessionLocked to determine cleanup strategy.
IsCLI() bool
// IsAlive returns true if the transport is still active.
// For CLI: checks if the process is still running.
// For HTTP: checks if the session connection is still open.
IsAlive() bool
}
SessionIO abstracts the I/O transport between HotPlex and the AI agent backend. Two implementations:
- CLISessionIO: stdin/stdout/stderr pipes to a subprocess
- HTTPSessionIO: HTTP client to opencode serve
Session holds a concrete SessionIO and calls its methods without nil checks.
type SessionManager ¶
type SessionManager interface {
// GetOrCreateSession retrieves an active session or performs a Cold Start if none exists.
// Returns the session, a boolean indicating if it was a Cold Start, and an error if any.
GetOrCreateSession(ctx context.Context, sessionID string, cfg SessionConfig, prompt string) (*Session, bool, error)
// GetSession performs a non-side-effect lookup of an active session.
GetSession(sessionID string) (*Session, bool)
// TerminateSession kills the OS process group and removes the session from the pool.
TerminateSession(sessionID string) error
// ListActiveSessions provides a snapshot of all tracked sessions.
ListActiveSessions() []*Session
// Shutdown performing a total cleanup of the pool and its background workers.
Shutdown()
}
SessionManager defines the behavioral interface for managing a process pool.
type SessionPool ¶
type SessionPool struct {
// contains filtered or unexported fields
}
func NewSessionPool ¶
func NewSessionPool(logger *slog.Logger, timeout time.Duration, opts EngineOptions, starter SessionStarter, prv provider.Provider) *SessionPool
NewSessionPool creates a new session manager with default file-based marker storage.
func (*SessionPool) CleanupSessionFiles ¶ added in v0.17.0
func (sm *SessionPool) CleanupSessionFiles(providerSessionID string, workDir string) error
CleanupSessionFiles proxies the cleanup call to the underlying provider.
func (*SessionPool) DeleteMarker ¶ added in v0.17.0
func (sm *SessionPool) DeleteMarker(providerSessionID string) error
DeleteMarker removes the HotPlex session marker file, preventing future resumption.
func (*SessionPool) GetOrCreateSession ¶
func (sm *SessionPool) GetOrCreateSession(ctx context.Context, sessionID string, cfg SessionConfig, prompt string) (*Session, bool, error)
GetOrCreateSession returns an existing session or starts a new one.
func (*SessionPool) GetSession ¶
func (sm *SessionPool) GetSession(sessionID string) (*Session, bool)
GetSession retrieves an active session.
func (*SessionPool) ListActiveSessions ¶
func (sm *SessionPool) ListActiveSessions() []*Session
ListActiveSessions returns all active sessions.
func (*SessionPool) SetStreamStore ¶ added in v0.35.4
func (sm *SessionPool) SetStreamStore(saver StreamDataSaver)
SetStreamStore sets the stream data saver for protecting uncommitted stream data before session termination. This is typically called by chatapps adapters that enable message storage with streaming.
func (*SessionPool) Shutdown ¶
func (sm *SessionPool) Shutdown()
Shutdown gracefully stops the session manager and all active sessions.
func (*SessionPool) TerminateSession ¶
func (sm *SessionPool) TerminateSession(sessionID string) error
TerminateSession stops and removes a session.
type SessionStarter ¶ added in v0.36.0
type SessionStarter interface {
// StartSession starts a new session and returns it fully initialized.
// The callback is invoked for each raw event from the session.
StartSession(ctx context.Context, sessionID string, cfg SessionConfig, prompt string, cb Callback) (*Session, error)
// TransportType returns the transport type identifier ("cli" or "http").
TransportType() string
}
SessionStarter creates and manages agent sessions. Two implementations:
- CLISessionStarter: spawns a local CLI subprocess
- HTTPSessionStarter: connects to an HTTP/SSE server
type SessionStatus ¶
type SessionStatus string
SessionStatus defines the current state of a session.
const ( SessionStatusStarting SessionStatus = "starting" SessionStatusReady SessionStatus = "ready" SessionStatusBusy SessionStatus = "busy" SessionStatusDead SessionStatus = "dead" )
type StreamDataSaver ¶ added in v0.35.4
type StreamDataSaver interface {
// GetBuffer retrieves the stream buffer for a session.
// Returns nil if no buffer exists.
GetBuffer(sessionID string) any
// SaveIncompleteStream saves uncommitted stream data before termination.
// This is a synchronous operation to ensure data is persisted before
// the session is destroyed.
SaveIncompleteStream(ctx context.Context, sessionID string, buffer any) error
}
StreamDataSaver protects uncommitted stream data before session termination. Implemented by chatapps/base.StreamMessageStore to flush incomplete buffers.