engine

package
v0.36.0
Warning

This package is not in the latest version of its module.

Published: Mar 27, 2026 License: MIT Imports: 19 Imported by: 0

README

Engine Package (internal/engine)

Core session management and process pool implementation for HotPlex.

Overview

This package implements the Hot-Multiplexing pattern, maintaining persistent CLI agent processes that can be reused across multiple execution turns. This eliminates the cold-start latency of spawning heavy Node.js processes for each request.

Key Components

Component       Description
SessionPool     Thread-safe process pool with idle GC
Session         Individual CLI process wrapper with full-duplex I/O
SessionManager  Interface for process lifecycle management

Architecture

SessionPool
    ├── sessions map[string]*Session  (active sessions)
    ├── mu sync.RWMutex               (thread-safe access)
    ├── markerStore                   (session persistence)
    └── cleanupLoop()                 (idle session GC)
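
The layout above can be illustrated with a small, self-contained sketch. It is not the package's implementation: the `entry` and `pool` types, `getOrCreate`, and `sweep` are hypothetical stand-ins that only show how a map guarded by a sync.RWMutex supports reuse across turns and idle garbage collection.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// entry is a hypothetical stand-in for a pooled session.
type entry struct {
	id         string
	lastActive time.Time
}

// pool mirrors the diagram: a session map guarded by a sync.RWMutex.
type pool struct {
	mu       sync.RWMutex
	sessions map[string]*entry
	idle     time.Duration
}

func newPool(idle time.Duration) *pool {
	return &pool{sessions: make(map[string]*entry), idle: idle}
}

// get is a read-only lookup, so a read lock is enough.
func (p *pool) get(id string) (*entry, bool) {
	p.mu.RLock()
	defer p.mu.RUnlock()
	e, ok := p.sessions[id]
	return e, ok
}

// getOrCreate returns the entry for id and reports whether it was newly created.
func (p *pool) getOrCreate(id string, now time.Time) (*entry, bool) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if e, ok := p.sessions[id]; ok {
		e.lastActive = now // reuse the warm session
		return e, false
	}
	e := &entry{id: id, lastActive: now}
	p.sessions[id] = e
	return e, true
}

// sweep removes entries idle for longer than p.idle and returns how many were removed.
func (p *pool) sweep(now time.Time) int {
	p.mu.Lock()
	defer p.mu.Unlock()
	removed := 0
	for id, e := range p.sessions {
		if now.Sub(e.lastActive) > p.idle {
			delete(p.sessions, id)
			removed++
		}
	}
	return removed
}

// demo creates a session, reuses it one minute later, then sweeps two hours later.
func demo() (created, reused bool, removed int) {
	p := newPool(30 * time.Minute)
	start := time.Now()
	_, created = p.getOrCreate("chat-1", start)
	_, createdAgain := p.getOrCreate("chat-1", start.Add(time.Minute))
	removed = p.sweep(start.Add(2 * time.Hour))
	return created, !createdAgain, removed
}

func main() {
	fmt.Println(demo())
}
```

Passing the clock value in as a parameter keeps the sketch deterministic; a real cleanup loop would more likely run on a ticker.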

Usage

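import "github.com/hrygo/hotplex/internal/engine"

// Build a starter for the CLI transport (SessionStarter, added in v0.36.0)
starter := engine.NewCLISessionStarter(
    cliPath,         // CLI binary path
    provider,        // CLI provider
    markerStore,     // persistence.SessionMarkerStore
    logger,
    opts,            // engine options
)

// Create session pool
pool := engine.NewSessionPool(
    logger,
    30*time.Minute,  // idle timeout
    opts,            // engine options
    starter,         // session starter (CLI or HTTP)
    provider,        // CLI provider
)

// Get or create session
// SessionConfig contains: WorkDir, TaskInstructions, BaseSystemPrompt, IdleTimeout, Namespace
session, created, err := pool.GetOrCreateSession(ctx, sessionID, cfg, prompt)

// Send input for the next turn; events arrive through the registered callback
session.SetCallback(callback)
err = session.WriteInput(msg) // msg is a map[string]any

// Shutdown pool (graceful)
pool.Shutdown()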

Design Principles

  • PGID Isolation: Each session runs in its own process group for clean termination
  • Idle GC: Inactive sessions are garbage collected after timeout (default: 30m)
  • Deterministic ID: Supports mapping platform-specific IDs to unique session markers
  • Thread Safety: All operations are protected by sync.RWMutex
  • Graceful Shutdown: Respects context cancellation and cleans up all processes in the pool
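
The PGID isolation principle can be sketched with the standard library alone. This is a minimal, Unix-only illustration and not the package's code: `killGroup` is a hypothetical helper, and it assumes a `sleep` binary is available on PATH as the child process.

```go
package main

import (
	"fmt"
	"os/exec"
	"syscall"
)

// killGroup starts a child in its own process group, kills the whole group,
// and reports whether the child was terminated by a signal.
func killGroup() (bool, error) {
	cmd := exec.Command("sleep", "60") // assumption: sleep exists on PATH
	// Setpgid places the child in a new process group.
	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
	if err := cmd.Start(); err != nil {
		return false, err
	}
	// With Setpgid and no explicit Pgid, the child's PGID equals its PID.
	pgid := cmd.Process.Pid
	// A negative PID targets every process in the group, including grandchildren.
	if err := syscall.Kill(-pgid, syscall.SIGKILL); err != nil {
		return false, err
	}
	_ = cmd.Wait() // reap the child; the error reflects the kill signal
	ws, ok := cmd.ProcessState.Sys().(syscall.WaitStatus)
	return ok && ws.Signaled(), nil
}

func main() {
	killed, err := killGroup()
	fmt.Println(killed, err)
}
```

Signaling the group rather than the single PID matters when the CLI spawns helpers of its own, since those would otherwise outlive the session.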

Files

File        Purpose
pool.go     SessionPool implementation
session.go  Session lifecycle and I/O handling
types.go    Type definitions and interfaces

Documentation

Constants

const (
	ScannerInitialBufSize = 4 * 1024   // 4 KB - start small
	ScannerMaxBufSize     = 512 * 1024 // 512 KB - sufficient for most CLI outputs
)

Scanner buffer sizes for CLI output parsing.
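
As a rough illustration of how such limits are typically applied, the sketch below passes the two sizes to bufio.Scanner.Buffer. The lowercase constants, `countLines`, `lineOfSize`, and `tooLong` are hypothetical names for this example; how the package itself wires the constants is not shown in this documentation.

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"strings"
)

const (
	scannerInitialBufSize = 4 * 1024   // same value as ScannerInitialBufSize
	scannerMaxBufSize     = 512 * 1024 // same value as ScannerMaxBufSize
)

// countLines scans input line by line, starting with a small buffer
// that bufio.Scanner may grow up to scannerMaxBufSize.
func countLines(input string) (int, error) {
	sc := bufio.NewScanner(strings.NewReader(input))
	sc.Buffer(make([]byte, 0, scannerInitialBufSize), scannerMaxBufSize)
	n := 0
	for sc.Scan() {
		n++
	}
	return n, sc.Err()
}

// lineOfSize builds a single newline-terminated line of n payload bytes.
func lineOfSize(n int) string {
	return strings.Repeat("x", n) + "\n"
}

// tooLong reports whether err means a line exceeded the maximum buffer size.
func tooLong(err error) bool {
	return errors.Is(err, bufio.ErrTooLong)
}

func main() {
	n, err := countLines(lineOfSize(100 * 1024))
	fmt.Println(n, err)

	_, err = countLines(lineOfSize(600 * 1024))
	fmt.Println(tooLong(err))
}
```

A line longer than the maximum stops the scan with bufio.ErrTooLong, so callers that expect very large single-line JSON events need either a larger limit or a different reader.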

const (
	DefaultReadyTimeout = 10 * time.Second // Maximum time to wait for session to be ready
)

Session lifecycle constants.

Variables

var SessionLogDir = filepath.Join(os.Getenv("HOME"), ".hotplex", "logs")

SessionLogDir is the directory for session log files. Defaults to ~/.hotplex/logs/

Functions

func ReadStderrFromHTTPSession added in v0.36.0

func ReadStderrFromHTTPSession(_ *slog.Logger)

ReadStderrFromHTTPSession is a no-op for HTTP sessions (no stderr).

Types

type CLISessionIO added in v0.36.0

type CLISessionIO struct {
	// contains filtered or unexported fields
}

CLISessionIO wraps os pipe handles for CLI subprocess transport.

func NewCLISessionIO added in v0.36.0

func NewCLISessionIO(
	cmd *exec.Cmd,
	stdin io.WriteCloser,
	stdout, stderr io.ReadCloser,
	cancel func(),
	logger *slog.Logger,
) *CLISessionIO

NewCLISessionIO creates a new CLISessionIO from pipe handles.

func (*CLISessionIO) Close added in v0.36.0

func (c *CLISessionIO) Close() error

Close implements SessionIO. It closes all pipes and cancels the subprocess context.

func (*CLISessionIO) IsAlive added in v0.36.0

func (c *CLISessionIO) IsAlive() bool

IsAlive implements SessionIO. Returns true if the CLI process is still running.

func (*CLISessionIO) IsCLI added in v0.36.0

func (c *CLISessionIO) IsCLI() bool

IsCLI implements SessionIO. Returns true for CLI transport.

func (*CLISessionIO) Logger added in v0.36.0

func (c *CLISessionIO) Logger() *slog.Logger

Logger implements SessionIO.

func (*CLISessionIO) Stderr added in v0.36.0

func (c *CLISessionIO) Stderr() io.ReadCloser

Stderr returns the stderr reader for the Session's read goroutines.

func (*CLISessionIO) Stdout added in v0.36.0

func (c *CLISessionIO) Stdout() io.ReadCloser

Stdout returns the stdout reader for the Session's read goroutines.

func (*CLISessionIO) WriteInput added in v0.36.0

func (c *CLISessionIO) WriteInput(msg map[string]any) error

WriteInput implements SessionIO.

type CLISessionStarter added in v0.36.0

type CLISessionStarter struct {
	// contains filtered or unexported fields
}

CLISessionStarter spawns CLI subprocess sessions.

func NewCLISessionStarter added in v0.36.0

func NewCLISessionStarter(
	cliPath string,
	provider provider.Provider,
	markerStore persistence.SessionMarkerStore,
	logger *slog.Logger,
	opts EngineOptions,
) *CLISessionStarter

NewCLISessionStarter creates a CLISessionStarter from pool dependencies.

func (*CLISessionStarter) StartSession added in v0.36.0

func (s *CLISessionStarter) StartSession(
	ctx context.Context,
	sessionID string,
	cfg SessionConfig,
	prompt string,
	cb Callback,
) (*Session, error)

StartSession starts a CLI subprocess session.

func (*CLISessionStarter) TransportType added in v0.36.0

func (s *CLISessionStarter) TransportType() string

TransportType implements SessionStarter.

type Callback

type Callback func(eventType string, data any) error

Callback handles streaming events from the CLI. Events are dispatched as they occur, allowing real-time UI updates.
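
A callback with this signature might look like the following sketch. The `recorder` type is hypothetical, and the event type names "thinking" and "answer" are placeholders chosen for illustration; this documentation does not list the event types the engine emits.

```go
package main

import (
	"fmt"
	"sync"
)

// Callback matches the signature documented above.
type Callback func(eventType string, data any) error

// recorder collects events; a mutex guards the slice because callbacks
// may be invoked from a reader goroutine.
type recorder struct {
	mu     sync.Mutex
	events []string
}

// callback returns a Callback that appends each event to the recorder.
func (r *recorder) callback() Callback {
	return func(eventType string, data any) error {
		r.mu.Lock()
		defer r.mu.Unlock()
		r.events = append(r.events, fmt.Sprintf("%s:%v", eventType, data))
		return nil
	}
}

// snapshot returns a copy of the recorded events.
func (r *recorder) snapshot() []string {
	r.mu.Lock()
	defer r.mu.Unlock()
	return append([]string(nil), r.events...)
}

func main() {
	rec := &recorder{}
	cb := rec.callback()
	_ = cb("thinking", "step 1") // hypothetical event type
	_ = cb("answer", "done")     // hypothetical event type
	fmt.Println(rec.snapshot())
}
```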

type EngineOptions

type EngineOptions struct {
	Timeout     time.Duration // Maximum time to wait for a single execution turn to complete
	IdleTimeout time.Duration // Time after which an idle session is eligible for termination
	Logger      *slog.Logger  // Optional logger instance; defaults to slog.Default()

	// Namespace is used to generate isolated, deterministic UUID v5 Session IDs.
	// This ensures that the same Conversation ID creates an isolated but persistent sandbox,
	// preventing cross-application or cross-user session leaks.
	Namespace string

	// Foundational Security & Context (Engine-level boundaries)
	PermissionMode             string   // Controls CLI permissions (e.g., "bypassPermissions", "acceptEdits", "default"). Defaults to strict mode.
	DangerouslySkipPermissions bool     // Bypasses all permission checks. Equivalent to --dangerously-skip-permissions.
	BaseSystemPrompt           string   // Foundational instructions injected at CLI startup for all sessions.
	AllowedTools               []string // Explicit list of tools allowed (whitelist). If empty, all tools are allowed.
	DisallowedTools            []string // Explicit list of tools forbidden (blacklist).

	// AdminToken is the secret required to toggle security bypass mode.
	// If empty, bypass will be disabled for security.
	AdminToken string

	// Provider is the AI CLI provider (e.g., Claude Code, OpenCode).
	// If nil, defaults to ClaudeCodeProvider.
	Provider provider.Provider
}

EngineOptions defines the configuration parameters for initializing a new Engine. It allows customization of timeouts, logging, and foundational security boundaries that apply to all sessions managed by this engine instance.
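
The Namespace field refers to deterministic UUID v5 session IDs. The sketch below shows one way such IDs can be derived using only the standard library. The two-step derivation in `sessionID` (namespace string hashed under the RFC 4122 DNS namespace, then the conversation ID hashed under the result) is an assumption made for illustration, not the package's documented algorithm.

```go
package main

import (
	"crypto/sha1"
	"fmt"
)

// dnsNamespace is the RFC 4122 DNS namespace UUID
// (6ba7b810-9dad-11d1-80b4-00c04fd430c8).
var dnsNamespace = [16]byte{
	0x6b, 0xa7, 0xb8, 0x10, 0x9d, 0xad, 0x11, 0xd1,
	0x80, 0xb4, 0x00, 0xc0, 0x4f, 0xd4, 0x30, 0xc8,
}

// uuidV5 computes a version 5 (SHA-1, name-based) UUID.
func uuidV5(ns [16]byte, name string) [16]byte {
	h := sha1.New()
	h.Write(ns[:])
	h.Write([]byte(name))
	var u [16]byte
	copy(u[:], h.Sum(nil))       // keep the first 16 of 20 bytes
	u[6] = (u[6] & 0x0f) | 0x50 // version 5
	u[8] = (u[8] & 0x3f) | 0x80 // RFC 4122 variant
	return u
}

// formatUUID renders the canonical 8-4-4-4-12 hex form.
func formatUUID(u [16]byte) string {
	return fmt.Sprintf("%x-%x-%x-%x-%x", u[0:4], u[4:6], u[6:8], u[8:10], u[10:16])
}

// sessionID is a hypothetical derivation: the same (namespace, conversation)
// pair always yields the same ID, and different namespaces yield different IDs.
func sessionID(namespace, conversationID string) string {
	ns := uuidV5(dnsNamespace, namespace)
	return formatUUID(uuidV5(ns, conversationID))
}

func main() {
	fmt.Println(sessionID("app-a", "conv-1"))
	fmt.Println(sessionID("app-a", "conv-1") == sessionID("app-b", "conv-1"))
}
```

Because the ID is a pure function of its inputs, a restarted process can recompute it and resume the same persisted session without storing a lookup table.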

type HTTPSessionIO added in v0.36.0

type HTTPSessionIO struct {
	// contains filtered or unexported fields
}

HTTPSessionIO sends prompts via HTTP POST and receives events via SSE.

func NewHTTPSessionIO added in v0.36.0

func NewHTTPSessionIO(transport provider.Transport, sessionID string, cancelFn func(), logger *slog.Logger) *HTTPSessionIO

NewHTTPSessionIO creates a new HTTPSessionIO wrapping a Transport and session ID. The callback must be set via SetCallback before StartReading is called.

func (*HTTPSessionIO) Close added in v0.36.0

func (h *HTTPSessionIO) Close() error

Close implements SessionIO. It cancels the SSE context and deletes the server session.

func (*HTTPSessionIO) IsAlive added in v0.36.0

func (h *HTTPSessionIO) IsAlive() bool

IsAlive implements SessionIO. Returns true if the HTTP session is not closed.

func (*HTTPSessionIO) IsCLI added in v0.36.0

func (h *HTTPSessionIO) IsCLI() bool

IsCLI implements SessionIO. Returns false for HTTP transport.

func (*HTTPSessionIO) Logger added in v0.36.0

func (h *HTTPSessionIO) Logger() *slog.Logger

Logger implements SessionIO.

func (*HTTPSessionIO) SetCallback added in v0.36.0

func (h *HTTPSessionIO) SetCallback(cb SessionCallback)

SetCallback sets the session callback for event dispatch.

func (*HTTPSessionIO) StartReading added in v0.36.0

func (h *HTTPSessionIO) StartReading()

StartReading starts the SSE event dispatch loop. It reads from the subscribed events channel and dispatches "raw_line" and "runner_exit" callbacks. Call via panicx.SafeGo.
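
The dispatch loop described above can be pictured as follows. This is a simplified sketch: the events channel carries plain strings here, and emitting "runner_exit" when the channel closes is an assumption. Only the two event names come from the description.

```go
package main

import "fmt"

// SessionCallback matches the signature documented in this package.
type SessionCallback func(eventType string, data any) error

// dispatch forwards each event as "raw_line" and emits "runner_exit"
// once the channel is closed (assumed trigger for this sketch).
func dispatch(events <-chan string, cb SessionCallback) {
	for line := range events {
		// Callback errors are ignored here; real code would likely log them.
		_ = cb("raw_line", line)
	}
	_ = cb("runner_exit", nil)
}

// demo feeds two events through dispatch and returns the event types seen.
func demo() []string {
	events := make(chan string, 2)
	events <- `{"type":"text"}`
	events <- `{"type":"done"}`
	close(events)

	var seen []string
	dispatch(events, func(eventType string, _ any) error {
		seen = append(seen, eventType)
		return nil
	})
	return seen
}

func main() {
	fmt.Println(demo())
}
```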

func (*HTTPSessionIO) WriteInput added in v0.36.0

func (h *HTTPSessionIO) WriteInput(msg map[string]any) error

WriteInput implements SessionIO. It sends the message via HTTP POST to the opencode serve session.

type HTTPSessionStarter added in v0.36.0

type HTTPSessionStarter struct {
	// contains filtered or unexported fields
}

HTTPSessionStarter connects to opencode serve via HTTP/SSE.

func NewHTTPSessionStarter added in v0.36.0

func NewHTTPSessionStarter(transport provider.Transport, provider provider.Provider, logger *slog.Logger, opts EngineOptions) *HTTPSessionStarter

NewHTTPSessionStarter creates an HTTPSessionStarter wrapping a Transport.

func (*HTTPSessionStarter) StartSession added in v0.36.0

func (s *HTTPSessionStarter) StartSession(
	ctx context.Context,
	sessionID string,
	cfg SessionConfig,
	prompt string,
	cb Callback,
) (*Session, error)

StartSession creates a new HTTP/SSE session.

func (*HTTPSessionStarter) TransportType added in v0.36.0

func (s *HTTPSessionStarter) TransportType() string

TransportType implements SessionStarter.

type Session

type Session struct {
	ID                string        // Internal SDK identifier (provided by the user)
	ProviderSessionID string        // The deterministic UUID (v5) passed to CLI for persistent DB storage
	Config            SessionConfig // Snapshot of the configuration used to initialize the session
	TaskInstructions  string        // Persistent instructions for the session

	CreatedAt  time.Time
	LastActive time.Time
	Status     SessionStatus

	IsResuming bool // True if session was resumed from persistent marker
	// contains filtered or unexported fields
}

Session represents a persistent, hot-multiplexed instance of an AI CLI agent. It manages the underlying OS process group, handles streaming I/O via full-duplex pipes, and tracks the operational readiness and lifecycle status of the agent sandbox.

func NewTestSession

func NewTestSession(id string, status SessionStatus) *Session

NewTestSession creates a Session for testing purposes. This should only be used in test code.

func (*Session) GetCallback

func (s *Session) GetCallback() Callback

GetCallback returns the current callback.

func (*Session) GetExt added in v0.8.1

func (s *Session) GetExt() any

GetExt retrieves the external state attached to the session.

func (*Session) GetLastActive added in v0.9.2

func (s *Session) GetLastActive() time.Time

GetLastActive returns the last active time with proper locking.

func (*Session) GetLogPath added in v0.23.4

func (s *Session) GetLogPath() string

GetLogPath returns the path to the session log file.

func (*Session) GetStatus

func (s *Session) GetStatus() SessionStatus

GetStatus returns the current session status.

func (*Session) GetStatusChange

func (s *Session) GetStatusChange() <-chan SessionStatus

GetStatusChange returns the status change channel for waiting on status updates.
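
A caller might combine this channel with a timeout such as DefaultReadyTimeout. The sketch below uses local copies of the status type and constants; `waitReady` and the sentinel errors are hypothetical, and whether the real channel is ever closed is not stated here, so the sketch handles both cases.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Local copies of the documented status type and values.
type SessionStatus string

const (
	SessionStatusStarting SessionStatus = "starting"
	SessionStatusReady    SessionStatus = "ready"
	SessionStatusDead     SessionStatus = "dead"
)

var (
	errTimeout = errors.New("timed out waiting for session to become ready")
	errDead    = errors.New("session died before becoming ready")
	errClosed  = errors.New("status channel closed")
)

// waitReady blocks until the channel reports ready, reports dead,
// is closed, or the timeout elapses.
func waitReady(ch <-chan SessionStatus, timeout time.Duration) error {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	for {
		select {
		case st, ok := <-ch:
			if !ok {
				return errClosed
			}
			switch st {
			case SessionStatusReady:
				return nil
			case SessionStatusDead:
				return errDead
			}
			// Any other status (e.g. starting): keep waiting.
		case <-timer.C:
			return errTimeout
		}
	}
}

func main() {
	ch := make(chan SessionStatus, 2)
	ch <- SessionStatusStarting
	ch <- SessionStatusReady
	fmt.Println(waitReady(ch, 10*time.Second))
}
```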

func (*Session) IsAlive

func (s *Session) IsAlive() bool

IsAlive checks if the process is still running.

func (*Session) OpenLogFile added in v0.23.4

func (s *Session) OpenLogFile() error

OpenLogFile opens a log file for this session in SessionLogDir.

func (*Session) ReadStderr

func (s *Session) ReadStderr()

ReadStderr asynchronously reads CLI stderr to prevent buffer deadlocks. For CLI sessions, this reads from the subprocess stderr pipe. For HTTP sessions, this is a no-op.

func (*Session) ReadStdout

func (s *Session) ReadStdout()

ReadStdout asynchronously reads CLI stdout, parses JSON, and dispatches callbacks. For CLI sessions, this reads from the subprocess stdout pipe. For HTTP sessions, this is a no-op (events are dispatched via HTTPSessionIO goroutines).

func (*Session) SetCallback

func (s *Session) SetCallback(cb Callback)

SetCallback registers the callback to handle stream events for the current turn.

func (*Session) SetExt added in v0.8.1

func (s *Session) SetExt(data any)

SetExt attaches external state to the session.

func (*Session) SetStatus

func (s *Session) SetStatus(status SessionStatus)

SetStatus updates the session status with proper locking.

func (*Session) Touch

func (s *Session) Touch()

Touch updates LastActive time.

func (*Session) Wait added in v0.35.2

func (s *Session) Wait() error

Wait reaps the process exactly once using sync.Once. Both the synchronous cleanup path (cleanupSessionLocked) and the async SafeGo goroutine (startSession) call this method; sync.Once ensures only the first call executes cmd.Wait(), and subsequent calls return immediately without contending for the exec.Cmd internal mutex or duplicating the reap.
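
The reap-once pattern can be shown in isolation. In this sketch the `reaper` type and its `wait` field are hypothetical stand-ins for the session and cmd.Wait(), and caching the first result for later callers is an assumption; the text above only says that later calls return immediately.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// reaper wraps a wait function so that it runs at most once.
type reaper struct {
	once sync.Once
	err  error
	wait func() error // stand-in for cmd.Wait()
}

// Wait runs the underlying wait on the first call only. Later calls
// return the cached result without invoking wait again.
func (r *reaper) Wait() error {
	r.once.Do(func() { r.err = r.wait() })
	return r.err
}

// demo calls Wait from ten goroutines and reports how often the
// underlying wait function actually ran.
func demo() (calls int, err error) {
	r := &reaper{}
	r.wait = func() error {
		calls++ // only ever executed inside once.Do
		return errors.New("exit status 1")
	}
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = r.Wait()
		}()
	}
	wg.Wait()
	return calls, r.Wait()
}

func main() {
	fmt.Println(demo())
}
```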

func (*Session) WriteInput

func (s *Session) WriteInput(msg map[string]any) error

WriteInput injects a JSON message via the SessionIO transport.

type SessionCallback added in v0.36.0

type SessionCallback func(eventType string, data any) error

SessionCallback handles streaming events from an active session. This is the concrete callback signature used by HTTPSessionIO goroutines. Events are dispatched as they occur, allowing real-time UI updates.

type SessionConfig

type SessionConfig struct {
	WorkDir          string        // Absolute path to the isolated sandbox directory
	TaskInstructions string        // Persistent instructions for the session
	BaseSystemPrompt string        // Session-level system prompt override (takes precedence over Engine-level)
	IdleTimeout      time.Duration // Per-session idle timeout; 0 means use pool default
	Namespace        string        // Per-session namespace override; empty means use pool default
}

SessionConfig contains the minimal configuration needed for session management. This is a subset of the root Config to avoid circular dependencies.

type SessionIO added in v0.36.0

type SessionIO interface {
	// WriteInput serializes msg as JSON and delivers it to the agent session.
	WriteInput(msg map[string]any) error

	// Close releases all resources held by the SessionIO.
	// Close is idempotent and goroutine-safe.
	Close() error

	// Logger returns the session logger.
	Logger() *slog.Logger

	// IsCLI returns true if this is a CLI subprocess transport.
	// Used by cleanupSessionLocked to determine cleanup strategy.
	IsCLI() bool

	// IsAlive returns true if the transport is still active.
	// For CLI: checks if the process is still running.
	// For HTTP: checks if the session connection is still open.
	IsAlive() bool
}

SessionIO abstracts the I/O transport between HotPlex and the AI agent backend. Two implementations:

  • CLISessionIO: stdin/stdout/stderr pipes to a subprocess
  • HTTPSessionIO: HTTP client to opencode serve

Session holds a concrete SessionIO and calls its methods without nil checks.
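
For tests, an in-memory implementation of this interface can be handy. The sketch below copies the interface locally and adds a hypothetical `memIO` type; rejecting writes after Close is an assumption of the sketch, while the idempotent Close follows the interface comment.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"sync"
)

// SessionIO is a local copy of the documented interface.
type SessionIO interface {
	WriteInput(msg map[string]any) error
	Close() error
	Logger() *slog.Logger
	IsCLI() bool
	IsAlive() bool
}

// memIO is a hypothetical in-memory transport that records what was sent.
type memIO struct {
	mu     sync.Mutex
	closed bool
	sent   []string
}

// Compile-time check that memIO satisfies SessionIO.
var _ SessionIO = (*memIO)(nil)

// WriteInput serializes msg as JSON and records it.
func (m *memIO) WriteInput(msg map[string]any) error {
	b, err := json.Marshal(msg)
	if err != nil {
		return err
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.closed {
		return errors.New("memIO: write on closed transport")
	}
	m.sent = append(m.sent, string(b))
	return nil
}

// Close is idempotent and goroutine-safe.
func (m *memIO) Close() error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.closed = true
	return nil
}

func (m *memIO) Logger() *slog.Logger { return slog.Default() }
func (m *memIO) IsCLI() bool          { return false }

func (m *memIO) IsAlive() bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	return !m.closed
}

// sentCount returns how many messages were recorded.
func (m *memIO) sentCount() int {
	m.mu.Lock()
	defer m.mu.Unlock()
	return len(m.sent)
}

func main() {
	var io SessionIO = &memIO{}
	fmt.Println(io.WriteInput(map[string]any{"type": "user"}), io.IsAlive())
	fmt.Println(io.Close(), io.Close(), io.IsAlive())
}
```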

type SessionManager

type SessionManager interface {
	// GetOrCreateSession retrieves an active session or performs a Cold Start if none exists.
	// Returns the session, a boolean indicating if it was a Cold Start, and an error if any.
	GetOrCreateSession(ctx context.Context, sessionID string, cfg SessionConfig, prompt string) (*Session, bool, error)
	// GetSession performs a non-side-effect lookup of an active session.
	GetSession(sessionID string) (*Session, bool)
	// TerminateSession kills the OS process group and removes the session from the pool.
	TerminateSession(sessionID string) error
	// ListActiveSessions provides a snapshot of all tracked sessions.
	ListActiveSessions() []*Session
	// Shutdown performs a total cleanup of the pool and its background workers.
	Shutdown()
}

SessionManager defines the behavioral interface for managing a process pool.

type SessionPool

type SessionPool struct {
	// contains filtered or unexported fields
}

func NewSessionPool

func NewSessionPool(logger *slog.Logger, timeout time.Duration, opts EngineOptions, starter SessionStarter, prv provider.Provider) *SessionPool

NewSessionPool creates a new session manager with default file-based marker storage.

func (*SessionPool) CleanupSessionFiles added in v0.17.0

func (sm *SessionPool) CleanupSessionFiles(providerSessionID string, workDir string) error

CleanupSessionFiles proxies the cleanup call to the underlying provider.

func (*SessionPool) DeleteMarker added in v0.17.0

func (sm *SessionPool) DeleteMarker(providerSessionID string) error

DeleteMarker removes the HotPlex session marker file, preventing future resumption.

func (*SessionPool) GetOrCreateSession

func (sm *SessionPool) GetOrCreateSession(ctx context.Context, sessionID string, cfg SessionConfig, prompt string) (*Session, bool, error)

GetOrCreateSession returns an existing session or starts a new one.

func (*SessionPool) GetSession

func (sm *SessionPool) GetSession(sessionID string) (*Session, bool)

GetSession retrieves an active session.

func (*SessionPool) ListActiveSessions

func (sm *SessionPool) ListActiveSessions() []*Session

ListActiveSessions returns all active sessions.

func (*SessionPool) SetStreamStore added in v0.35.4

func (sm *SessionPool) SetStreamStore(saver StreamDataSaver)

SetStreamStore sets the stream data saver for protecting uncommitted stream data before session termination. This is typically called by chatapps adapters that enable message storage with streaming.

func (*SessionPool) Shutdown

func (sm *SessionPool) Shutdown()

Shutdown gracefully stops the session manager and all active sessions.

func (*SessionPool) TerminateSession

func (sm *SessionPool) TerminateSession(sessionID string) error

TerminateSession stops and removes a session.

type SessionStarter added in v0.36.0

type SessionStarter interface {
	// StartSession starts a new session and returns it fully initialized.
	// The callback is invoked for each raw event from the session.
	StartSession(ctx context.Context, sessionID string, cfg SessionConfig, prompt string, cb Callback) (*Session, error)

	// TransportType returns the transport type identifier ("cli" or "http").
	TransportType() string
}

SessionStarter creates and manages agent sessions. Two implementations:

  • CLISessionStarter: spawns a local CLI subprocess
  • HTTPSessionStarter: connects to an HTTP/SSE server

type SessionStatus

type SessionStatus string

SessionStatus defines the current state of a session.

const (
	SessionStatusStarting SessionStatus = "starting"
	SessionStatusReady    SessionStatus = "ready"
	SessionStatusBusy     SessionStatus = "busy"
	SessionStatusDead     SessionStatus = "dead"
)

type StreamDataSaver added in v0.35.4

type StreamDataSaver interface {
	// GetBuffer retrieves the stream buffer for a session.
	// Returns nil if no buffer exists.
	GetBuffer(sessionID string) any

	// SaveIncompleteStream saves uncommitted stream data before termination.
	// This is a synchronous operation to ensure data is persisted before
	// the session is destroyed.
	SaveIncompleteStream(ctx context.Context, sessionID string, buffer any) error
}

StreamDataSaver protects uncommitted stream data before session termination. Implemented by chatapps/base.StreamMessageStore to flush incomplete buffers.
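
How a pool might use this interface before destroying a session can be sketched as below. The `memSaver` type and `flushBeforeTerminate` helper are hypothetical, and string buffers are an assumption; the real buffer type is left as `any` by the interface.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// StreamDataSaver is a local copy of the documented interface.
type StreamDataSaver interface {
	GetBuffer(sessionID string) any
	SaveIncompleteStream(ctx context.Context, sessionID string, buffer any) error
}

// memSaver is a hypothetical in-memory saver holding string buffers.
type memSaver struct {
	mu      sync.Mutex
	buffers map[string]string // uncommitted stream data per session
	saved   map[string]string // data persisted before termination
}

// GetBuffer returns the buffer for sessionID, or nil if none exists.
func (m *memSaver) GetBuffer(sessionID string) any {
	m.mu.Lock()
	defer m.mu.Unlock()
	b, ok := m.buffers[sessionID]
	if !ok {
		return nil
	}
	return b
}

// SaveIncompleteStream persists the buffer synchronously and clears it.
func (m *memSaver) SaveIncompleteStream(ctx context.Context, sessionID string, buffer any) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	s, ok := buffer.(string)
	if !ok {
		return fmt.Errorf("unexpected buffer type %T", buffer)
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	m.saved[sessionID] = s
	delete(m.buffers, sessionID)
	return nil
}

// flushBeforeTerminate saves any pending buffer; it is a no-op when no
// saver is configured or no buffer exists.
func flushBeforeTerminate(ctx context.Context, saver StreamDataSaver, sessionID string) error {
	if saver == nil {
		return nil
	}
	buf := saver.GetBuffer(sessionID)
	if buf == nil {
		return nil
	}
	return saver.SaveIncompleteStream(ctx, sessionID, buf)
}

// demo flushes one pending buffer and returns what was persisted.
func demo() (string, error) {
	s := &memSaver{
		buffers: map[string]string{"chat-1": "partial answer"},
		saved:   map[string]string{},
	}
	if err := flushBeforeTerminate(context.Background(), s, "chat-1"); err != nil {
		return "", err
	}
	return s.saved["chat-1"], nil
}

func main() {
	fmt.Println(demo())
}
```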
