Documentation
¶
Index ¶
- Constants
- Variables
- type CLISessionIO
- func (c *CLISessionIO) Close() error
- func (c *CLISessionIO) IsAlive() bool
- func (c *CLISessionIO) IsCLI() bool
- func (c *CLISessionIO) Logger() *slog.Logger
- func (c *CLISessionIO) Stderr() io.ReadCloser
- func (c *CLISessionIO) Stdout() io.ReadCloser
- func (c *CLISessionIO) WriteInput(msg map[string]any) error
- type CLISessionStarter
- type Callback
- type EngineOptions
- type HTTPSessionIO
- func (h *HTTPSessionIO) Close() error
- func (h *HTTPSessionIO) IsAlive() bool
- func (h *HTTPSessionIO) IsCLI() bool
- func (h *HTTPSessionIO) Logger() *slog.Logger
- func (h *HTTPSessionIO) SetCallback(cb SessionCallback)
- func (h *HTTPSessionIO) StartReading()
- func (h *HTTPSessionIO) WriteInput(msg map[string]any) error
- type HTTPSessionStarter
- type Session
- func (s *Session) GetCallback() Callback
- func (s *Session) GetExt() any
- func (s *Session) GetLastActive() time.Time
- func (s *Session) GetLogPath() string
- func (s *Session) GetStatus() SessionStatus
- func (s *Session) GetStatusChange() <-chan SessionStatus
- func (s *Session) IsAlive() bool
- func (s *Session) OpenLogFile() error
- func (s *Session) ReadStderr()
- func (s *Session) ReadStdout()
- func (s *Session) SetCallback(cb Callback)
- func (s *Session) SetExt(data any)
- func (s *Session) SetIOCallback(cb Callback)
- func (s *Session) SetStatus(status SessionStatus)
- func (s *Session) Touch()
- func (s *Session) Wait() error
- func (s *Session) WriteInput(msg map[string]any) error
- type SessionCallback
- type SessionConfig
- type SessionIO
- type SessionManager
- type SessionPool
- func (sm *SessionPool) CleanupSessionFiles(providerSessionID string, workDir string) error
- func (sm *SessionPool) DeleteMarker(providerSessionID string) error
- func (sm *SessionPool) GetOrCreateSession(ctx context.Context, sessionID string, cfg SessionConfig, prompt string) (*Session, bool, error)
- func (sm *SessionPool) GetSession(sessionID string) (*Session, bool)
- func (sm *SessionPool) ListActiveSessions() []*Session
- func (sm *SessionPool) SetStreamStore(saver StreamDataSaver)
- func (sm *SessionPool) Shutdown()
- func (sm *SessionPool) TerminateSession(sessionID string) error
- type SessionStarter
- type SessionStatus
- type StreamDataSaver
Constants ¶
const (
ScannerInitialBufSize = 4 * 1024 // 4 KB - start small
ScannerMaxBufSize = 512 * 1024 // 512 KB - sufficient for most CLI outputs
)
Scanner buffer sizes for CLI output parsing.
const (
DefaultReadyTimeout = 10 * time.Second // Maximum time to wait for session to be ready
)
Session lifecycle constants.
Variables ¶
var SessionLogDir = filepath.Join(os.Getenv("HOME"), ".hotplex", "logs")
SessionLogDir is the directory for session log files. Defaults to ~/.hotplex/logs/
Functions ¶
This section is empty.
Types ¶
type CLISessionIO ¶ added in v0.36.0
type CLISessionIO struct {
// contains filtered or unexported fields
}
CLISessionIO wraps os pipe handles for CLI subprocess transport.
func NewCLISessionIO ¶ added in v0.36.0
func NewCLISessionIO( cmd *exec.Cmd, stdin io.WriteCloser, stdout, stderr io.ReadCloser, cancel func(), logger *slog.Logger, ) *CLISessionIO
NewCLISessionIO creates a new CLISessionIO from pipe handles.
func (*CLISessionIO) Close ¶ added in v0.36.0
func (c *CLISessionIO) Close() error
Close implements SessionIO. It closes all pipes and cancels the subprocess context.
func (*CLISessionIO) IsAlive ¶ added in v0.36.0
func (c *CLISessionIO) IsAlive() bool
IsAlive implements SessionIO. Returns true if the CLI process is still running.
func (*CLISessionIO) IsCLI ¶ added in v0.36.0
func (c *CLISessionIO) IsCLI() bool
IsCLI implements SessionIO. Returns true for CLI transport.
func (*CLISessionIO) Logger ¶ added in v0.36.0
func (c *CLISessionIO) Logger() *slog.Logger
Logger implements SessionIO.
func (*CLISessionIO) Stderr ¶ added in v0.36.0
func (c *CLISessionIO) Stderr() io.ReadCloser
Stderr returns the stderr reader for the Session's read goroutines.
func (*CLISessionIO) Stdout ¶ added in v0.36.0
func (c *CLISessionIO) Stdout() io.ReadCloser
Stdout returns the stdout reader for the Session's read goroutines.
func (*CLISessionIO) WriteInput ¶ added in v0.36.0
func (c *CLISessionIO) WriteInput(msg map[string]any) error
WriteInput implements SessionIO.
type CLISessionStarter ¶ added in v0.36.0
type CLISessionStarter struct {
// contains filtered or unexported fields
}
CLISessionStarter spawns CLI subprocess sessions.
func NewCLISessionStarter ¶ added in v0.36.0
func NewCLISessionStarter( cliPath string, provider provider.Provider, markerStore persistence.SessionMarkerStore, logger *slog.Logger, opts EngineOptions, ) *CLISessionStarter
NewCLISessionStarter creates a CLISessionStarter from pool dependencies.
func (*CLISessionStarter) CleanupOnError ¶ added in v0.36.1
func (s *CLISessionStarter) CleanupOnError()
CleanupOnError is a no-op for CLISessionStarter. CLI sessions are cleaned up via Session.close() + sys.KillProcessGroup in cleanupSessionLocked.
func (*CLISessionStarter) StartSession ¶ added in v0.36.0
func (s *CLISessionStarter) StartSession( ctx context.Context, sessionID string, cfg SessionConfig, prompt string, cb Callback, ) (*Session, error)
StartSession starts a CLI subprocess session.
func (*CLISessionStarter) TransportType ¶ added in v0.36.0
func (s *CLISessionStarter) TransportType() string
TransportType implements SessionStarter.
type Callback ¶
Callback handles streaming events from the CLI. Events are dispatched as they occur, allowing real-time UI updates.
type EngineOptions ¶
type EngineOptions struct {
Timeout time.Duration // Maximum time to wait for a single execution turn to complete
IdleTimeout time.Duration // Time after which an idle session is eligible for termination
Logger *slog.Logger // Optional logger instance; defaults to slog.Default()
// Namespace is used to generate isolated, deterministic UUID v5 Session IDs.
// This ensures that the same Conversation ID creates an isolated but persistent sandbox,
// preventing cross-application or cross-user session leaks.
Namespace string
// Foundational Security & Context (Engine-level boundaries)
PermissionMode string // Controls CLI permissions (e.g., "bypassPermissions", "acceptEdits", "default"). Defaults to strict mode.
DangerouslySkipPermissions bool // Bypasses all permission checks. Equivalent to --dangerously-skip-permissions.
BaseSystemPrompt string // Foundational instructions injected at CLI startup for all sessions.
AllowedTools []string // Explicit list of tools allowed (whitelist). If empty, all tools are allowed.
DisallowedTools []string // Explicit list of tools forbidden (blacklist).
// AdminToken is the secret required to toggle security bypass mode.
// If empty, bypass will be disabled for security.
AdminToken string
// Provider is the AI CLI provider (e.g., Claude Code, OpenCode).
// If nil, defaults to ClaudeCodeProvider.
Provider provider.Provider
}
EngineOptions defines the configuration parameters for initializing a new Engine. It allows customization of timeouts, logging, and foundational security boundaries that apply to all sessions managed by this engine instance.
type HTTPSessionIO ¶ added in v0.36.0
type HTTPSessionIO struct {
// contains filtered or unexported fields
}
HTTPSessionIO sends prompts via HTTP POST and receives events via SSE.
func NewHTTPSessionIO ¶ added in v0.36.0
func NewHTTPSessionIO(transport provider.Transport, sessionID string, cancelFn func(), logger *slog.Logger) *HTTPSessionIO
NewHTTPSessionIO creates a new HTTPSessionIO wrapping a Transport and session ID. The callback must be set via SetCallback before StartReading is called.
func (*HTTPSessionIO) Close ¶ added in v0.36.0
func (h *HTTPSessionIO) Close() error
Close implements SessionIO. It cancels the SSE context and deletes the server session.
func (*HTTPSessionIO) IsAlive ¶ added in v0.36.0
func (h *HTTPSessionIO) IsAlive() bool
IsAlive implements SessionIO. Returns true if the HTTP session is not closed.
func (*HTTPSessionIO) IsCLI ¶ added in v0.36.0
func (h *HTTPSessionIO) IsCLI() bool
IsCLI implements SessionIO. Returns false for HTTP transport.
func (*HTTPSessionIO) Logger ¶ added in v0.36.0
func (h *HTTPSessionIO) Logger() *slog.Logger
Logger implements SessionIO.
func (*HTTPSessionIO) SetCallback ¶ added in v0.36.0
func (h *HTTPSessionIO) SetCallback(cb SessionCallback)
SetCallback sets the session callback for event dispatch. It also closes the startReadingGate to unblock the SSE reader goroutine. Safe to call multiple times: the panic raised by closing the already-closed gate channel is caught by a deferred recover().
func (*HTTPSessionIO) StartReading ¶ added in v0.36.0
func (h *HTTPSessionIO) StartReading()
StartReading starts the SSE event dispatch loop. It blocks on startReadingGate until SetCallback is called (in runner.go), ensuring the SSE reader never processes events without a valid callback. Call via panicx.SafeGo.
func (*HTTPSessionIO) WriteInput ¶ added in v0.36.0
func (h *HTTPSessionIO) WriteInput(msg map[string]any) error
WriteInput implements SessionIO. It sends the message via HTTP POST to the opencode serve session. Holds h.mu through the entire Send to prevent Close() from racing in the gap between the closed-check and the network call.
type HTTPSessionStarter ¶ added in v0.36.0
type HTTPSessionStarter struct {
// contains filtered or unexported fields
}
HTTPSessionStarter connects to opencode serve via HTTP/SSE.
func NewHTTPSessionStarter ¶ added in v0.36.0
func NewHTTPSessionStarter(transport provider.Transport, provider provider.Provider, markerStore persistence.SessionMarkerStore, logger *slog.Logger, opts EngineOptions) *HTTPSessionStarter
NewHTTPSessionStarter creates an HTTPSessionStarter wrapping a Transport. The markerStore is used to detect session resumption across daemon restarts, mirroring the CLI session recovery mechanism.
func (*HTTPSessionStarter) CleanupOnError ¶ added in v0.36.1
func (s *HTTPSessionStarter) CleanupOnError()
CleanupOnError cleans up any partially-initialized HTTPSessionIO when StartSession returns an error. This is necessary because HTTPSessionStarter starts the StartReading goroutine before returning; if StartSession then returns an error, that goroutine's 30s timeout would otherwise be the only automatic cleanup. Safe to call multiple times (idempotent).
func (*HTTPSessionStarter) StartSession ¶ added in v0.36.0
func (s *HTTPSessionStarter) StartSession( ctx context.Context, sessionID string, cfg SessionConfig, prompt string, cb Callback, ) (*Session, error)
StartSession creates a new HTTP/SSE session.
func (*HTTPSessionStarter) TransportType ¶ added in v0.36.0
func (s *HTTPSessionStarter) TransportType() string
TransportType implements SessionStarter.
type Session ¶
type Session struct {
ID string // Internal SDK identifier (provided by the user)
ProviderSessionID string // The deterministic UUID (v5) passed to CLI for persistent DB storage
Config SessionConfig // Snapshot of the configuration used to initialize the session
TaskInstructions string // Persistent instructions for the session
CreatedAt time.Time
LastActive time.Time
Status SessionStatus
IsResuming bool // True if session was resumed from persistent marker
FirstMessageOnSession bool // True until the first BuildInputMessage is sent (for HTTP hot-multiplexing gate)
// contains filtered or unexported fields
}
Session represents a persistent, hot-multiplexed instance of an AI CLI agent. It manages the underlying OS process group, handles streaming I/O via full-duplex pipes, and tracks the operational readiness and lifecycle status of the agent sandbox.
func NewTestSession ¶
func NewTestSession(id string, status SessionStatus) *Session
NewTestSession creates a Session for testing purposes. This should only be used in test code.
func (*Session) GetCallback ¶
GetCallback returns the current callback.
func (*Session) GetExt ¶ added in v0.8.1
GetExt retrieves the external state attached to the session.
func (*Session) GetLastActive ¶ added in v0.9.2
GetLastActive returns the last active time with proper locking.
func (*Session) GetLogPath ¶ added in v0.23.4
GetLogPath returns the path to the session log file.
func (*Session) GetStatus ¶
func (s *Session) GetStatus() SessionStatus
GetStatus returns the current session status.
func (*Session) GetStatusChange ¶
func (s *Session) GetStatusChange() <-chan SessionStatus
GetStatusChange returns the status change channel for waiting on status updates.
func (*Session) OpenLogFile ¶ added in v0.23.4
OpenLogFile opens a log file for this session in SessionLogDir.
func (*Session) ReadStderr ¶
func (s *Session) ReadStderr()
ReadStderr asynchronously reads CLI stderr to prevent buffer deadlocks. For CLI sessions, this reads from the subprocess stderr pipe. For HTTP sessions, this is a no-op.
func (*Session) ReadStdout ¶
func (s *Session) ReadStdout()
ReadStdout asynchronously reads CLI stdout, parses JSON, and dispatches callbacks. For CLI sessions, this reads from the subprocess stdout pipe. For HTTP sessions, this is a no-op (events are dispatched via HTTPSessionIO goroutines).
func (*Session) SetCallback ¶
SetCallback registers the callback to handle stream events for the current turn.
func (*Session) SetIOCallback ¶ added in v0.36.1
SetIOCallback propagates the callback to the underlying HTTP I/O layer. This is required for HTTP sessions where HTTPSessionIO.StartReading() is blocked by a gate until SetCallback is called. Without this, StartReading() times out after 30 seconds and all SSE events are silently dropped.
func (*Session) SetStatus ¶
func (s *Session) SetStatus(status SessionStatus)
SetStatus updates the session status with proper locking.
func (*Session) Wait ¶ added in v0.35.2
Wait reaps the process exactly once using sync.Once. Both the synchronous cleanup path (cleanupSessionLocked) and the async SafeGo goroutine (startSession) call this method; sync.Once ensures only the first call executes cmd.Wait(), and subsequent calls return immediately without contending for the exec.Cmd internal mutex or duplicating the reap.
type SessionCallback ¶ added in v0.36.0
SessionCallback handles streaming events from an active session. This is the concrete callback signature used by HTTPSessionIO goroutines. Events are dispatched as they occur, allowing real-time UI updates.
type SessionConfig ¶
type SessionConfig struct {
WorkDir string // Absolute path to the isolated sandbox directory
TaskInstructions string // Persistent instructions for the session
BaseSystemPrompt string // Session-level system prompt override (takes precedence over Engine-level)
IdleTimeout time.Duration // Per-session idle timeout; 0 means use pool default
Namespace string // Per-session namespace override; empty means use pool default
}
SessionConfig contains the minimal configuration needed for session management. This is a subset of the root Config to avoid circular dependencies.
type SessionIO ¶ added in v0.36.0
type SessionIO interface {
// WriteInput serializes msg as JSON and delivers it to the agent session.
WriteInput(msg map[string]any) error
// Close releases all resources held by the SessionIO.
// Close is idempotent and goroutine-safe.
Close() error
// Logger returns the session logger.
Logger() *slog.Logger
// IsCLI returns true if this is a CLI subprocess transport.
// Used by cleanupSessionLocked to determine cleanup strategy.
IsCLI() bool
// IsAlive returns true if the transport is still active.
// For CLI: checks if the process is still running.
// For HTTP: checks if the session connection is still open.
IsAlive() bool
}
SessionIO abstracts the I/O transport between HotPlex and the AI agent backend. Two implementations:
- CLISessionIO: stdin/stdout/stderr pipes to a subprocess
- HTTPSessionIO: HTTP client to opencode serve
Session holds a concrete SessionIO and calls its methods without nil checks.
type SessionManager ¶
type SessionManager interface {
// GetOrCreateSession retrieves an active session or performs a Cold Start if none exists.
// Returns the session, a boolean indicating if it was a Cold Start, and an error if any.
GetOrCreateSession(ctx context.Context, sessionID string, cfg SessionConfig, prompt string) (*Session, bool, error)
// GetSession performs a non-side-effect lookup of an active session.
GetSession(sessionID string) (*Session, bool)
// TerminateSession kills the OS process group and removes the session from the pool.
TerminateSession(sessionID string) error
// ListActiveSessions provides a snapshot of all tracked sessions.
ListActiveSessions() []*Session
// Shutdown performs a total cleanup of the pool and its background workers.
Shutdown()
}
SessionManager defines the behavioral interface for managing a process pool.
type SessionPool ¶
type SessionPool struct {
// contains filtered or unexported fields
}
func NewSessionPool ¶
func NewSessionPool(logger *slog.Logger, timeout time.Duration, opts EngineOptions, starter SessionStarter, prv provider.Provider) *SessionPool
NewSessionPool creates a new session manager with default file-based marker storage.
func (*SessionPool) CleanupSessionFiles ¶ added in v0.17.0
func (sm *SessionPool) CleanupSessionFiles(providerSessionID string, workDir string) error
CleanupSessionFiles proxies the cleanup call to the underlying provider.
func (*SessionPool) DeleteMarker ¶ added in v0.17.0
func (sm *SessionPool) DeleteMarker(providerSessionID string) error
DeleteMarker removes the HotPlex session marker file, preventing future resumption.
func (*SessionPool) GetOrCreateSession ¶
func (sm *SessionPool) GetOrCreateSession(ctx context.Context, sessionID string, cfg SessionConfig, prompt string) (*Session, bool, error)
GetOrCreateSession returns an existing session or starts a new one.
func (*SessionPool) GetSession ¶
func (sm *SessionPool) GetSession(sessionID string) (*Session, bool)
GetSession retrieves an active session.
func (*SessionPool) ListActiveSessions ¶
func (sm *SessionPool) ListActiveSessions() []*Session
ListActiveSessions returns all active sessions.
func (*SessionPool) SetStreamStore ¶ added in v0.35.4
func (sm *SessionPool) SetStreamStore(saver StreamDataSaver)
SetStreamStore sets the stream data saver for protecting uncommitted stream data before session termination. This is typically called by chatapps adapters that enable message storage with streaming.
func (*SessionPool) Shutdown ¶
func (sm *SessionPool) Shutdown()
Shutdown gracefully stops the session manager and all active sessions.
func (*SessionPool) TerminateSession ¶
func (sm *SessionPool) TerminateSession(sessionID string) error
TerminateSession stops and removes a session.
type SessionStarter ¶ added in v0.36.0
type SessionStarter interface {
// StartSession starts a new session and returns it fully initialized.
// The callback is invoked for each raw event from the session.
StartSession(ctx context.Context, sessionID string, cfg SessionConfig, prompt string, cb Callback) (*Session, error)
// TransportType returns the transport type identifier ("cli" or "http").
TransportType() string
// CleanupOnError cleans up any partially-initialized session resources
// when StartSession returns an error. This is needed because HTTPSessionStarter
// starts the StartReading goroutine before returning; if StartSession returns
// an error, the goroutine's 30s timeout is the only cleanup unless this is called.
// For CLISessionStarter this is a no-op.
CleanupOnError()
}
SessionStarter creates and manages agent sessions. Two implementations:
- CLISessionStarter: spawns a local CLI subprocess
- HTTPSessionStarter: connects to an HTTP/SSE server
type SessionStatus ¶
type SessionStatus string
SessionStatus defines the current state of a session.
const (
SessionStatusStarting SessionStatus = "starting"
SessionStatusReady SessionStatus = "ready"
SessionStatusBusy SessionStatus = "busy"
SessionStatusDead SessionStatus = "dead"
)
type StreamDataSaver ¶ added in v0.35.4
type StreamDataSaver interface {
// GetBuffer retrieves the stream buffer for a session.
// Returns nil if no buffer exists.
GetBuffer(sessionID string) any
// SaveIncompleteStream saves uncommitted stream data before termination.
// This is a synchronous operation to ensure data is persisted before
// the session is destroyed.
SaveIncompleteStream(ctx context.Context, sessionID string, buffer any) error
}
StreamDataSaver protects uncommitted stream data before session termination. Implemented by chatapps/base.StreamMessageStore to flush incomplete buffers.