hotplex

package
v0.4.0 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 20, 2026 License: MIT Imports: 17 Imported by: 0

Documentation

Overview

Package hotplex provides the core SDK for managing persistent, hot-multiplexed Large Language Model (LLM) CLI agent sessions (e.g. Claude Code).

It addresses the "cold start" latency of launching a fresh CLI process for every request by maintaining a pool of long-lived execution environments (sandboxes). A persistent marker system lets sessions be recovered seamlessly across process restarts and crashes.

Security is enforced via an integrated Web Application Firewall (WAF) that inspects commands before execution. HotPlex also provides real-time streaming events, comprehensive token usage tracking, and automated cost reporting.


Constants

const (
	// Maximum input length to log (prevents log flooding)
	MaxInputLogLength = 50
	// Maximum pattern match length to log
	MaxPatternLogLength = 100
	// Maximum command display length for UI
	MaxDisplayLength = 100
)

Constants for danger detector logging and display limits.

Variables

var (
	// ErrDangerBlocked indicates that a command or input was intercepted and blocked by the WAF (Detector).
	ErrDangerBlocked = errors.New("danger event blocked: input matched forbidden patterns")

	// ErrSessionTerminated indicates that the underlying CLI process was killed or unexpectedly exited.
	ErrSessionTerminated = errors.New("underlying session process terminated")

	// ErrContextCancelled indicates that the request execution was cancelled by the provided context.
	ErrContextCancelled = errors.New("execution context cancelled")

	// ErrInvalidConfig indicates that the provided configuration is invalid or missing required fields.
	ErrInvalidConfig = errors.New("invalid execution configuration")
)

Standard sentinel errors for the HotPlex SDK.

Functions

func SummarizeInput

func SummarizeInput(input map[string]any) string

SummarizeInput creates a human-readable summary of tool input. Uses rune-level truncation to avoid creating invalid UTF-8.

func TruncateString

func TruncateString(s string, maxLen int) string

TruncateString truncates a string to a maximum length for logging. Uses rune-level truncation to avoid creating invalid UTF-8.

Types

type AssistantMessage

type AssistantMessage struct {
	ID      string         `json:"id,omitempty"`      // Unique message ID
	Type    string         `json:"type,omitempty"`    // Typically "message"
	Role    string         `json:"role,omitempty"`    // Typically "assistant"
	Content []ContentBlock `json:"content,omitempty"` // Sequence of text and tool blocks
}

AssistantMessage represents the structured message emitted by the model.

type Callback

type Callback func(eventType string, data any) error

Callback is a function type for receiving streamed events. It is invoked with an event type and the event's payload.

func WrapSafe

func WrapSafe(logger *slog.Logger, cb Callback) Callback

WrapSafe wraps cb in a Callback that executes it safely, handling expected errors.

type Config

type Config struct {
	WorkDir          string // Absolute path to the isolated sandbox directory where CLI operations occur
	SessionID        string // Unique identifier used to route the request to a persistent process in the pool
	TaskSystemPrompt string // Instructions prepended to the user prompt for this specific turn
}

Config defines the execution context for a single HotPlex interaction cycle (turn).

type ContentBlock

type ContentBlock struct {
	Type      string         `json:"type"`                  // Block type: "text", "tool_use", "tool_result"
	Text      string         `json:"text,omitempty"`        // Raw text content (for type="text")
	Name      string         `json:"name,omitempty"`        // Tool name (for type="tool_use")
	ID        string         `json:"id,omitempty"`          // Block identifier
	ToolUseID string         `json:"tool_use_id,omitempty"` // References the original tool_use (for type="tool_result")
	Input     map[string]any `json:"input,omitempty"`       // Tool input JSON (for type="tool_use")
	Content   string         `json:"content,omitempty"`     // Result content (for type="tool_result")
	IsError   bool           `json:"is_error,omitempty"`    // Whether the tool result indicates a failure
}

ContentBlock represents an atomic unit of model output (text or tool call).

func (*ContentBlock) GetUnifiedToolID

func (b *ContentBlock) GetUnifiedToolID() string

GetUnifiedToolID returns a tool identifier suitable for matching calls with results.
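Judging from the ContentBlock field comments, a tool_use block carries its identifier in ID while a tool_result block points back to it through ToolUseID. The sketch below ASSUMES GetUnifiedToolID simply returns whichever of the two applies; unifiedToolID is a hypothetical name, and ContentBlock is trimmed to the fields it needs.

```go
package main

import "fmt"

// ContentBlock is trimmed to the fields this sketch needs.
type ContentBlock struct {
	Type      string
	ID        string
	ToolUseID string
}

// unifiedToolID returns ToolUseID for tool_result blocks and ID otherwise,
// so a call and its result produce the same key. This is an assumed reading
// of GetUnifiedToolID, not its confirmed implementation.
func unifiedToolID(b *ContentBlock) string {
	if b.Type == "tool_result" && b.ToolUseID != "" {
		return b.ToolUseID
	}
	return b.ID
}

func main() {
	call := &ContentBlock{Type: "tool_use", ID: "toolu_01"}
	result := &ContentBlock{Type: "tool_result", ToolUseID: "toolu_01"}

	// Index pending calls by unified ID, then look up each incoming result.
	pending := map[string]*ContentBlock{unifiedToolID(call): call}
	if c, ok := pending[unifiedToolID(result)]; ok {
		fmt.Println("matched result to call", c.ID)
	}
}
```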

type DangerBlockEvent

type DangerBlockEvent struct {
	Operation      string      `json:"operation"`             // The specific command line that triggered the block
	Reason         string      `json:"reason"`                // Description of the threat
	PatternMatched string      `json:"pattern_matched"`       // The specific signature that matched the input
	Level          DangerLevel `json:"level"`                 // Severity level
	Category       string      `json:"category"`              // Category classification
	BypassAllowed  bool        `json:"bypass_allowed"`        // Whether the user has administrative privileges to bypass this block
	Suggestions    []string    `json:"suggestions,omitempty"` // Safe alternatives to the blocked command
}

DangerBlockEvent contains detailed forensics after a dangerous operation is successfully intercepted.

type DangerLevel

type DangerLevel int

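DangerLevel classifies the severity of a detected, potentially harmful operation. Because the constants are declared with iota, lower values are more severe: DangerLevelCritical is 0.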

const (
	// DangerLevelCritical represents irreparable damage (e.g., recursive root deletion or disk wiping).
	DangerLevelCritical DangerLevel = iota
	// DangerLevelHigh represents significant damage potential (e.g., deleting user home or system config).
	DangerLevelHigh
	// DangerLevelModerate represents unintended side effects (e.g., resetting Git history).
	DangerLevelModerate
)

func (DangerLevel) String

func (d DangerLevel) String() string

String returns a string representation of the danger level.
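Since DangerLevelCritical is 0 and severity decreases as the value grows, a "this level or worse" check compares with <=, which is easy to get backwards. The sketch below reproduces the documented constant ordering. The String output strings and the atLeast helper are hypothetical, because the page does not list what String actually returns.

```go
package main

import "fmt"

type DangerLevel int

// Same ordering as hotplex: iota makes Critical the smallest value.
const (
	DangerLevelCritical DangerLevel = iota // 0
	DangerLevelHigh                        // 1
	DangerLevelModerate                    // 2
)

// String is a HYPOTHETICAL implementation; hotplex's exact strings are not
// documented here.
func (d DangerLevel) String() string {
	switch d {
	case DangerLevelCritical:
		return "critical"
	case DangerLevelHigh:
		return "high"
	case DangerLevelModerate:
		return "moderate"
	default:
		return fmt.Sprintf("DangerLevel(%d)", int(d))
	}
}

// atLeast reports whether d is at least as severe as threshold. Severity
// falls as the numeric value rises, hence "<=".
func atLeast(d, threshold DangerLevel) bool { return d <= threshold }

func main() {
	fmt.Println(DangerLevelHigh)                                // high
	fmt.Println(atLeast(DangerLevelCritical, DangerLevelHigh)) // true
	fmt.Println(atLeast(DangerLevelModerate, DangerLevelHigh)) // false
}
```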

type DangerPattern

type DangerPattern struct {
	Pattern     *regexp.Regexp // The compiled regex identifying the dangerous sequence
	Description string         // Human-readable explanation of why this pattern is blocked
	Level       DangerLevel    // Severity used for logging and alerting
	Category    string         // Functional category (e.g., "file_delete", "network", "system")
}

DangerPattern defines a signature for a dangerous operation, matched against input with a regular expression.

type Detector

type Detector struct {
	// contains filtered or unexported fields
}

Detector acts as a Web Application Firewall (WAF) for the local system. It inspects LLM-generated commands before they are dispatched to the host shell, enforcing strict security boundaries regardless of the model's own safety alignment.

func NewDetector

func NewDetector(logger *slog.Logger) *Detector

NewDetector creates a new danger detector with default patterns.

func (*Detector) CheckFileAccess

func (dd *Detector) CheckFileAccess(filePath string) bool

CheckFileAccess checks if file access is within allowed paths. Returns true if the access is safe (within allowed paths), false otherwise.

func (*Detector) CheckInput

func (dd *Detector) CheckInput(input string) *DangerBlockEvent

CheckInput checks if the input contains any dangerous operations. Returns a DangerBlockEvent if a dangerous operation is detected, nil otherwise.
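A regex-signature check of this kind can be sketched as a linear scan that returns an event for the first matching pattern and nil otherwise. The patterns below are illustrative only and are NOT hotplex's defaults. First-match-wins is also an assumption; the real Detector might prefer the most severe match.

```go
package main

import (
	"fmt"
	"regexp"
)

type DangerLevel int

const (
	DangerLevelCritical DangerLevel = iota
	DangerLevelHigh
	DangerLevelModerate
)

type DangerPattern struct {
	Pattern     *regexp.Regexp
	Description string
	Level       DangerLevel
	Category    string
}

// DangerBlockEvent is trimmed to a few of the documented fields.
type DangerBlockEvent struct {
	Operation      string
	Reason         string
	PatternMatched string
	Level          DangerLevel
	Category       string
}

// Illustrative signatures only. The first blocks recursive deletion of "/"
// itself, but not of a subdirectory such as /tmp/build.
var patterns = []DangerPattern{
	{regexp.MustCompile(`\brm\s+-\w*r\w*\s+/(\s|$)`), "recursive delete of /", DangerLevelCritical, "file_delete"},
	{regexp.MustCompile(`\bgit\s+reset\s+--hard\b`), "discards uncommitted work", DangerLevelModerate, "git"},
}

// checkInput returns an event for the first matching pattern, or nil if
// the input looks safe.
func checkInput(input string) *DangerBlockEvent {
	for _, p := range patterns {
		if m := p.Pattern.FindString(input); m != "" {
			return &DangerBlockEvent{
				Operation:      input,
				Reason:         p.Description,
				PatternMatched: m,
				Level:          p.Level,
				Category:       p.Category,
			}
		}
	}
	return nil
}

func main() {
	if ev := checkInput("sudo rm -rf / "); ev != nil {
		fmt.Printf("blocked (%s): %s\n", ev.Category, ev.Reason)
	}
	fmt.Println(checkInput("ls -la") == nil) // true
}
```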

func (*Detector) IsPathAllowed

func (dd *Detector) IsPathAllowed(path string) bool

IsPathAllowed checks if a path is in the allowlist. Both the input path and allowed paths should be cleaned first.
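The note about cleaning both sides matters because a plain string-prefix test is unsafe: "/srv/app-evil" has the prefix "/srv/app", and "/srv/app/../etc" only escapes once ".." is resolved. The sketch below ASSUMES "in the allowlist" means equal to, or nested under, an allowed root. isPathAllowed here is a hypothetical free function, and like filepath.Clean it does not resolve symlinks.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// isPathAllowed cleans both sides, then accepts the path if it equals an
// allowed root or lies beneath it. Appending the separator to the root stops
// sibling directories that merely share a name prefix from matching.
func isPathAllowed(path string, allowed []string) bool {
	p := filepath.Clean(path)
	sep := string(filepath.Separator)
	for _, root := range allowed {
		r := filepath.Clean(root)
		prefix := r
		if !strings.HasSuffix(prefix, sep) {
			prefix += sep // "/" already ends in a separator
		}
		if p == r || strings.HasPrefix(p, prefix) {
			return true
		}
	}
	return false
}

func main() {
	allowed := []string{"/srv/app/", "/tmp"}
	fmt.Println(isPathAllowed("/srv/app/src/main.go", allowed))   // true
	fmt.Println(isPathAllowed("/srv/app-evil/x", allowed))        // false
	fmt.Println(isPathAllowed("/srv/app/../etc/passwd", allowed)) // false
}
```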

func (*Detector) LoadCustomPatterns

func (dd *Detector) LoadCustomPatterns(filename string) error

LoadCustomPatterns loads custom danger patterns from a file. The file contains one pattern per line in the form "regex|description|level|category".
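A regex frequently contains "|" itself (alternation, or an escaped pipe), so naively splitting each line on "|" would break it. The parser below is a hypothetical sketch of the documented line format, not hotplex's loader. It ASSUMES that level is an integer, that blank lines and "#" comments are skipped, and that the last three fields are split off from the right. The cost of that last choice is that the description may not contain "|".

```go
package main

import (
	"bufio"
	"fmt"
	"regexp"
	"strconv"
	"strings"
)

type DangerPattern struct {
	Pattern     *regexp.Regexp
	Description string
	Level       int // stands in for DangerLevel
	Category    string
}

// parsePatterns reads "regex|description|level|category" lines.
func parsePatterns(src string) ([]DangerPattern, error) {
	var out []DangerPattern
	sc := bufio.NewScanner(strings.NewReader(src))
	for n := 1; sc.Scan(); n++ {
		line := strings.TrimSpace(sc.Text())
		if line == "" || strings.HasPrefix(line, "#") {
			continue // assumed: blank lines and comments are ignored
		}
		// Peel category, level and description off the right-hand end so
		// that any "|" left over belongs to the regex.
		fields := make([]string, 4)
		rest := line
		for i := 3; i > 0; i-- {
			j := strings.LastIndex(rest, "|")
			if j < 0 {
				return nil, fmt.Errorf("line %d: want 4 |-separated fields", n)
			}
			fields[i], rest = rest[j+1:], rest[:j]
		}
		fields[0] = rest
		re, err := regexp.Compile(fields[0])
		if err != nil {
			return nil, fmt.Errorf("line %d: %w", n, err)
		}
		level, err := strconv.Atoi(strings.TrimSpace(fields[2]))
		if err != nil {
			return nil, fmt.Errorf("line %d: bad level: %w", n, err)
		}
		out = append(out, DangerPattern{re, strings.TrimSpace(fields[1]), level, strings.TrimSpace(fields[3])})
	}
	return out, sc.Err()
}

func main() {
	src := "# custom rules\ncurl .*\\| *(sh|bash)|pipe remote script to shell|0|network\n"
	ps, err := parsePatterns(src)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(ps), ps[0].Category, ps[0].Pattern.MatchString("curl http://x | sh"))
}
```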

func (*Detector) SetAllowPaths

func (dd *Detector) SetAllowPaths(paths []string)

SetAllowPaths sets the list of allowed safe paths. Paths are cleaned to eliminate arbitrary trailing slashes or relative segments.

func (*Detector) SetBypassEnabled

func (dd *Detector) SetBypassEnabled(enabled bool)

SetBypassEnabled enables or disables bypass mode. When enabled, dangerous operations are NOT blocked (admin/Evolution mode only).

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine is the unified process integration layer for Hot-Multiplexing. Intended to run as a long-lived singleton, it provides a persistent execution engine that manages a pool of active Claude Code CLI processes, applies security rules (WAF), and routes traffic.

func (*Engine) Close

func (r *Engine) Close() error

Close terminates all active sessions managed by this engine and cleans up resources. It performs a graceful shutdown by cascading termination signals down to the SessionManager, which kills each session's entire process group (PGID) to prevent zombie processes.
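Killing the process group, rather than a single PID, is what reaches any children the CLI spawned. The child must first be started as a group leader, and the parent must then Wait on it so it is reaped instead of lingering as a zombie. The sketch below shows that Unix technique with the standard library only. It is Unix-only (syscall.Kill and Setpgid are unavailable on Windows), the helper names are hypothetical, and it sends SIGKILL for brevity, whereas a graceful shutdown would likely try SIGTERM first.

```go
package main

import (
	"fmt"
	"os/exec"
	"syscall"
)

// startInGroup starts the command as leader of a new process group, so the
// group ID equals the leader's PID and any children it spawns inherit it.
func startInGroup(name string, args ...string) (*exec.Cmd, error) {
	cmd := exec.Command(name, args...)
	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
	return cmd, cmd.Start()
}

// killGroup signals every process in the group (a negative pid targets the
// group), then reaps the leader with Wait so no zombie is left behind.
func killGroup(cmd *exec.Cmd) error {
	if err := syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL); err != nil {
		return err
	}
	_ = cmd.Wait() // reports "signal: killed"; we only need the reap
	return nil
}

func main() {
	// A shell that forks a background child stands in for the CLI.
	cmd, err := startInGroup("sh", "-c", "sleep 30 & sleep 30")
	if err != nil {
		panic(err)
	}
	fmt.Println("group killed:", killGroup(cmd) == nil)
}
```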

func (*Engine) Execute

func (r *Engine) Execute(ctx context.Context, cfg *Config, prompt string, callback Callback) error

Execute runs the Claude Code CLI with the given configuration and streams events back through callback.

func (*Engine) GetCLIVersion

func (r *Engine) GetCLIVersion() (string, error)

GetCLIVersion returns the Claude Code CLI version.

func (*Engine) GetDangerDetector

func (r *Engine) GetDangerDetector() *Detector

GetDangerDetector returns the danger detector instance.

func (*Engine) GetSessionStats

func (r *Engine) GetSessionStats() *SessionStats

GetSessionStats returns a copy of the current session stats.

func (*Engine) SetDangerAllowPaths

func (r *Engine) SetDangerAllowPaths(paths []string)

SetDangerAllowPaths sets the allowed safe paths for the danger detector.

func (*Engine) SetDangerBypassEnabled

func (r *Engine) SetDangerBypassEnabled(enabled bool)

SetDangerBypassEnabled enables or disables danger detection bypass. WARNING: Only use for Evolution mode (admin only).

func (*Engine) StopSession

func (r *Engine) StopSession(sessionID string, reason string) error

StopSession terminates a running session by session ID. It implements the session.stop operation from the spec.

func (*Engine) ValidateConfig

func (r *Engine) ValidateConfig(cfg *Config) error

ValidateConfig validates the Config.

type EngineOptions

type EngineOptions struct {
	Timeout     time.Duration // Maximum time to wait for a single execution turn to complete
	IdleTimeout time.Duration // Time after which an idle session is eligible for termination
	Logger      *slog.Logger  // Optional logger instance; defaults to slog.Default()

	// Namespace is used to generate isolated, deterministic UUID v5 Session IDs.
	// This ensures that the same Conversation ID creates an isolated but persistent sandbox,
	// preventing cross-application or cross-user session leaks.
	Namespace string

	// Foundational Security & Context (Engine-level boundaries)
	PermissionMode   string   // Controls CLI permissions (e.g., "bypass-permissions"). Defaults to strict mode.
	BaseSystemPrompt string   // Foundational instructions injected at CLI startup for all sessions.
	AllowedTools     []string // Explicit list of tools allowed (whitelist). If empty, all tools are allowed.
	DisallowedTools  []string // Explicit list of tools forbidden (blacklist).
}

EngineOptions defines the configuration parameters for initializing a new HotPlex Engine. It allows customization of timeouts, logging, and foundational security boundaries that apply to all sessions managed by this engine instance.
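A UUID v5 is derived deterministically (RFC 4122): hash the 16 namespace bytes followed by the name with SHA-1, keep the first 16 bytes, then set the version and variant bits. The same inputs therefore always give the same session UUID, and different namespaces give unrelated ones. The sketch below implements that with crypto/sha1. How hotplex turns the Namespace string into a namespace UUID is not documented, so sessionUUID ASSUMES a two-step derivation under the standard RFC 4122 URL namespace.

```go
package main

import (
	"crypto/sha1"
	"fmt"
)

// uuidV5 computes an RFC 4122 name-based (SHA-1, version 5) UUID.
func uuidV5(ns [16]byte, name string) [16]byte {
	h := sha1.New()
	h.Write(ns[:])
	h.Write([]byte(name))
	var u [16]byte
	copy(u[:], h.Sum(nil)) // SHA-1 yields 20 bytes; keep the first 16
	u[6] = (u[6] & 0x0f) | 0x50 // version 5
	u[8] = (u[8] & 0x3f) | 0x80 // RFC 4122 variant
	return u
}

func formatUUID(u [16]byte) string {
	return fmt.Sprintf("%x-%x-%x-%x-%x", u[0:4], u[4:6], u[6:8], u[8:10], u[10:16])
}

// nameSpaceURL is the RFC 4122 URL namespace 6ba7b811-9dad-11d1-80b4-00c04fd430c8.
var nameSpaceURL = [16]byte{0x6b, 0xa7, 0xb8, 0x11, 0x9d, 0xad, 0x11, 0xd1, 0x80, 0xb4, 0x00, 0xc0, 0x4f, 0xd4, 0x30, 0xc8}

// sessionUUID is a HYPOTHETICAL derivation: EngineOptions.Namespace becomes a
// namespace UUID, and the conversation ID is hashed inside it.
func sessionUUID(namespace, conversationID string) string {
	return formatUUID(uuidV5(uuidV5(nameSpaceURL, namespace), conversationID))
}

func main() {
	fmt.Println(sessionUUID("my-app", "conversation-42"))
}
```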

type EventMeta

type EventMeta struct {
	// Timing
	DurationMs      int64 `json:"duration_ms"`       // Event duration in milliseconds
	TotalDurationMs int64 `json:"total_duration_ms"` // Total elapsed time since start

	// Tool call info
	ToolName string `json:"tool_name"` // Tool name (e.g., "bash", "editor_write", "memo_search")
	ToolID   string `json:"tool_id"`   // Unique tool call ID
	Status   string `json:"status"`    // "running", "success", "error"
	ErrorMsg string `json:"error_msg"` // Error message if status=error

	// Token usage (when available)
	InputTokens      int32 `json:"input_tokens"`       // Input tokens
	OutputTokens     int32 `json:"output_tokens"`      // Output tokens
	CacheWriteTokens int32 `json:"cache_write_tokens"` // Cache write tokens
	CacheReadTokens  int32 `json:"cache_read_tokens"`  // Cache read tokens

	// Summaries for UI
	InputSummary  string `json:"input_summary"`  // Human-readable input summary
	OutputSummary string `json:"output_summary"` // Truncated output preview

	// File operations
	FilePath  string `json:"file_path"`  // Affected file path
	LineCount int32  `json:"line_count"` // Number of lines affected

	// Progress (for long-running operations)
	Progress    int32 `json:"progress"`     // Progress percentage (0-100)
	TotalSteps  int32 `json:"total_steps"`  // Total number of steps (for multi-stage operations)
	CurrentStep int32 `json:"current_step"` // Current step number
}

EventMeta contains detailed metadata for streaming events.

type EventWithMeta

type EventWithMeta struct {
	EventType string     // Event type (thinking, tool_use, tool_result, etc.)
	EventData string     // Event data content
	Meta      *EventMeta // Enhanced metadata (never nil when created via NewEventWithMeta)
}

EventWithMeta extends the basic event with metadata for observability. This type is used by executors (DirectExecutor, ReActExecutor, PlanningExecutor) and CCRunner to send detailed event metadata to the frontend.

func NewEventWithMeta

func NewEventWithMeta(eventType, eventData string, meta *EventMeta) *EventWithMeta

NewEventWithMeta creates a new EventWithMeta with guaranteed non-nil Meta. If meta is nil, an empty EventMeta{} is used instead. This prevents nil pointer dereferences when accessing Meta fields.
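The guarantee amounts to a nil-defaulting constructor. The minimal sketch below follows the behaviour described above, with local, trimmed copies of the types and a hypothetical lowercase name, and shows why consumers can then read Meta fields without a nil check.

```go
package main

import "fmt"

// Trimmed local copies of the documented types.
type EventMeta struct {
	ToolName string
	Status   string
}

type EventWithMeta struct {
	EventType string
	EventData string
	Meta      *EventMeta
}

// newEventWithMeta substitutes an empty EventMeta for a nil meta, so the
// returned event's Meta is never nil.
func newEventWithMeta(eventType, eventData string, meta *EventMeta) *EventWithMeta {
	if meta == nil {
		meta = &EventMeta{}
	}
	return &EventWithMeta{EventType: eventType, EventData: eventData, Meta: meta}
}

func main() {
	ev := newEventWithMeta("thinking", "planning next step", nil)
	fmt.Printf("%q\n", ev.Meta.Status) // "" instead of a nil-pointer panic
}
```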

type HotPlexClient

type HotPlexClient interface {
	// Execute runs a command or prompt within the HotPlex sandbox and streams events back via the Callback.
	// It uses Hot-Multiplexing to reuse existing processes if a matching SessionID is provided in the Config.
	Execute(ctx context.Context, cfg *Config, prompt string, callback Callback) error

	// Close gracefully terminates all managed sessions in the pool and releases all OS-level resources.
	// This includes sweeping process groups (PGID) to ensure no zombie processes remain.
	Close() error
}

HotPlexClient defines the public API for the HotPlex engine. It abstracts the underlying process management and provides a clean, session-aware interface for callers to interact with Claude Code CLI agents.

func NewEngine

func NewEngine(options EngineOptions) (HotPlexClient, error)

NewEngine creates a new HotPlex Engine instance.
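The calling pattern for the returned HotPlexClient is to bound each turn with a context, pass a Config whose SessionID stays stable across turns so the pool can hand back a warm process, collect events in the callback, and Close the client on shutdown. Because the module import path is not shown here, the sketch redeclares the interface locally and drives it with a hypothetical in-memory fakeClient. The event names it emits are placeholders, not the CLI's real event types.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Local mirrors of the documented types; real code would import hotplex.
type Callback func(eventType string, data any) error

type Config struct {
	WorkDir, SessionID, TaskSystemPrompt string
}

type HotPlexClient interface {
	Execute(ctx context.Context, cfg *Config, prompt string, callback Callback) error
	Close() error
}

// fakeClient is a HYPOTHETICAL stand-in for the client returned by
// NewEngine: it emits two placeholder events and honours cancellation.
type fakeClient struct{}

func (fakeClient) Execute(ctx context.Context, cfg *Config, prompt string, cb Callback) error {
	for _, ev := range []string{"thinking", "answer"} {
		if err := ctx.Err(); err != nil {
			return err
		}
		if err := cb(ev, prompt); err != nil {
			return err
		}
	}
	return nil
}

func (fakeClient) Close() error { return nil }

// runTurn runs one prompt with a timeout and collects the event types seen.
func runTurn(client HotPlexClient, prompt string) ([]string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	cfg := &Config{WorkDir: "/tmp/sandbox", SessionID: "conversation-42"}
	var events []string
	err := client.Execute(ctx, cfg, prompt, func(eventType string, data any) error {
		events = append(events, eventType)
		return nil
	})
	return events, err
}

func main() {
	var client HotPlexClient = fakeClient{}
	defer client.Close() // with the real engine this sweeps pooled process groups
	events, err := runTurn(client, "list files")
	fmt.Println(events, errors.Is(err, context.DeadlineExceeded)) // [thinking answer] false
}
```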

type Session

type Session struct {
	ID          string // Internal SDK identifier (provided by the user)
	CCSessionID string // The deterministic UUID (v5) passed to Claude CLI for persistent DB storage
	Config      Config // Snapshot of the configuration used to initialize the session

	CreatedAt  time.Time     // When the process was first spawned
	LastActive time.Time     // When the process was last used (used for LRU/Idle GC)
	Status     SessionStatus // Runtime state: starting, ready, busy, or dead
	// contains filtered or unexported fields
}

Session represents a persistent, long-lived process of the Claude Code CLI. It wraps the OS process, manages standard I/O pipes for real-time multiplexing, and tracks the session's readiness and lifecycle status.

func (*Session) GetStatus

func (s *Session) GetStatus() SessionStatus

GetStatus returns the current session status.

func (*Session) IsAlive

func (s *Session) IsAlive() bool

IsAlive checks if the process is still running.

func (*Session) SetCallback

func (s *Session) SetCallback(cb Callback)

SetCallback registers the callback to handle stream events for the current turn.

func (*Session) SetStatus

func (s *Session) SetStatus(status SessionStatus)

SetStatus updates the session status with proper locking.

func (*Session) Touch

func (s *Session) Touch()

Touch updates LastActive time.

func (*Session) WriteInput

func (s *Session) WriteInput(msg map[string]any) error

WriteInput writes a JSON message to the process's stdin. The session transitions to Busy during the write and back to Ready once it completes.

type SessionManager

type SessionManager interface {
	// GetOrCreateSession retrieves an active session or performs a Cold Start if none exists.
	GetOrCreateSession(ctx context.Context, sessionID string, cfg Config) (*Session, error)
	// GetSession performs a side-effect-free lookup of an active session.
	GetSession(sessionID string) (*Session, bool)
	// TerminateSession kills the OS process group and removes the session from the pool.
	TerminateSession(sessionID string) error
	// ListActiveSessions provides a snapshot of all tracked sessions.
	ListActiveSessions() []*Session
	// Shutdown performs a total cleanup of the pool and its background workers.
	Shutdown()
}

SessionManager defines the behavioral interface for managing a process pool.

type SessionPool

type SessionPool struct {
	// contains filtered or unexported fields
}

SessionPool implements the SessionManager as a thread-safe singleton. It orchestrates the lifecycle of multiple CLI processes, ensuring that idle processes are garbage collected to conserve system memory.

func NewSessionPool

func NewSessionPool(logger *slog.Logger, timeout time.Duration, opts EngineOptions, cliPath string) *SessionPool

NewSessionPool creates a new session manager.

func (*SessionPool) GetOrCreateSession

func (sm *SessionPool) GetOrCreateSession(ctx context.Context, sessionID string, cfg Config) (*Session, error)

GetOrCreateSession returns an existing session or starts a new one.

func (*SessionPool) GetSession

func (sm *SessionPool) GetSession(sessionID string) (*Session, bool)

GetSession retrieves an active session.

func (*SessionPool) ListActiveSessions

func (sm *SessionPool) ListActiveSessions() []*Session

ListActiveSessions returns all active sessions.

func (*SessionPool) Shutdown

func (sm *SessionPool) Shutdown()

Shutdown gracefully stops the session manager and all active sessions.

func (*SessionPool) TerminateSession

func (sm *SessionPool) TerminateSession(sessionID string) error

TerminateSession stops and removes a session.

type SessionStats

type SessionStats struct {
	SessionID            string          // Unique identifier for the session
	StartTime            time.Time       // Timestamp when the session started
	TotalDurationMs      int64           // Total duration of the session in milliseconds
	ThinkingDurationMs   int64           // Total time spent in thinking phase in milliseconds
	ToolDurationMs       int64           // Total time spent executing tools in milliseconds
	GenerationDurationMs int64           // Total time spent generating text in milliseconds
	InputTokens          int32           // Total input tokens consumed
	OutputTokens         int32           // Total output tokens generated
	CacheWriteTokens     int32           // Total tokens written to cache
	CacheReadTokens      int32           // Total tokens read from cache
	ToolCallCount        int32           // Number of tool calls made
	ToolsUsed            map[string]bool // Set of unique tool names used
	FilesModified        int32           // Number of files modified
	FilePaths            []string        // List of paths of files modified
	// contains filtered or unexported fields
}

SessionStats collects session-level statistics for Geek/Evolution modes.

func (*SessionStats) EndGeneration

func (s *SessionStats) EndGeneration()

EndGeneration marks the end of the generation phase and records its duration.

func (*SessionStats) EndThinking

func (s *SessionStats) EndThinking()

EndThinking marks the end of the thinking phase and records its duration.

func (*SessionStats) FinalizeDuration

func (s *SessionStats) FinalizeDuration() *SessionStats

FinalizeDuration finalizes any ongoing phase tracking and returns the final stats.

func (*SessionStats) RecordFileModification

func (s *SessionStats) RecordFileModification(filePath string)

RecordFileModification records that a file was modified. It deduplicates paths with an O(1) map lookup rather than an O(n) linear scan.
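Map-based deduplication pairs a set (for O(1) membership) with a slice (to keep first-seen order for FilePaths). The sketch below is a hypothetical, trimmed stand-in for the file-tracking part of SessionStats. It ASSUMES FilesModified counts unique paths.

```go
package main

import (
	"fmt"
	"sync"
)

// fileStats is a trimmed, HYPOTHETICAL stand-in for SessionStats' file
// tracking: seen gives O(1) membership, FilePaths keeps insertion order.
type fileStats struct {
	mu            sync.Mutex
	seen          map[string]struct{}
	FilePaths     []string
	FilesModified int32
}

func (s *fileStats) recordFileModification(path string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.seen == nil {
		s.seen = make(map[string]struct{})
	}
	if _, dup := s.seen[path]; dup {
		return // already recorded
	}
	s.seen[path] = struct{}{}
	s.FilePaths = append(s.FilePaths, path)
	s.FilesModified++
}

func main() {
	var s fileStats
	for _, p := range []string{"main.go", "go.mod", "main.go"} {
		s.recordFileModification(p)
	}
	fmt.Println(s.FilesModified, s.FilePaths) // 2 [main.go go.mod]
}
```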

func (*SessionStats) RecordTokens

func (s *SessionStats) RecordTokens(input, output, cacheWrite, cacheRead int32)

RecordTokens records token usage.

func (*SessionStats) RecordToolResult

func (s *SessionStats) RecordToolResult() (durationMs int64)

RecordToolResult records the end of a tool call.

func (*SessionStats) RecordToolUse

func (s *SessionStats) RecordToolUse(toolName, toolID string)

RecordToolUse records the start of a tool call.

func (*SessionStats) StartGeneration

func (s *SessionStats) StartGeneration()

StartGeneration marks the start of the generation phase.

func (*SessionStats) StartThinking

func (s *SessionStats) StartThinking()

StartThinking marks the start of the thinking phase.

func (*SessionStats) ToSummary

func (s *SessionStats) ToSummary() map[string]interface{}

ToSummary converts stats to a summary map for JSON serialization.

type SessionStatsData

type SessionStatsData struct {
	SessionID            string   `json:"session_id"`
	StartTime            int64    `json:"start_time"` // Unix timestamp
	EndTime              int64    `json:"end_time"`   // Unix timestamp
	TotalDurationMs      int64    `json:"total_duration_ms"`
	ThinkingDurationMs   int64    `json:"thinking_duration_ms"`
	ToolDurationMs       int64    `json:"tool_duration_ms"`
	GenerationDurationMs int64    `json:"generation_duration_ms"`
	InputTokens          int32    `json:"input_tokens"`
	OutputTokens         int32    `json:"output_tokens"`
	CacheWriteTokens     int32    `json:"cache_write_tokens"`
	CacheReadTokens      int32    `json:"cache_read_tokens"`
	TotalTokens          int32    `json:"total_tokens"`
	ToolCallCount        int32    `json:"tool_call_count"`
	ToolsUsed            []string `json:"tools_used"`
	FilesModified        int32    `json:"files_modified"`
	FilePaths            []string `json:"file_paths"`
	TotalCostUSD         float64  `json:"total_cost_usd"`
	ModelUsed            string   `json:"model_used"`
	IsError              bool     `json:"is_error"`
	ErrorMessage         string   `json:"error_message,omitempty"`
}

SessionStatsData represents the final session statistics sent to the frontend and stored in the database.

type SessionStatus

type SessionStatus string

SessionStatus defines the current state of a session.

const (
	SessionStatusStarting SessionStatus = "starting"
	SessionStatusReady    SessionStatus = "ready"
	SessionStatusBusy     SessionStatus = "busy"
	SessionStatusDead     SessionStatus = "dead"
)

type StreamMessage

type StreamMessage struct {
	Message      *AssistantMessage `json:"message,omitempty"`        // Details of an assistant-generated message
	Input        map[string]any    `json:"input,omitempty"`          // Arguments passed to a tool invocation
	Type         string            `json:"type"`                     // Event category: "assistant", "tool_use", "result", "thought", etc.
	Timestamp    string            `json:"timestamp,omitempty"`      // ISO8601 timestamp of the event
	SessionID    string            `json:"session_id,omitempty"`     // The persistent session identifier in the CLI's internal DB
	Role         string            `json:"role,omitempty"`           // Message role: "user", "assistant", or "system"
	Name         string            `json:"name,omitempty"`           // Name of the tool being used or the block type
	Output       string            `json:"output,omitempty"`         // Raw string output from a tool or assistant
	Status       string            `json:"status,omitempty"`         // Lifecycle status of the event (e.g., "running", "success")
	Error        string            `json:"error,omitempty"`          // Detailed error message if an operation fails
	Content      []ContentBlock    `json:"content,omitempty"`        // Hierarchical content blocks (text, tool_use, etc.)
	Duration     int               `json:"duration_ms,omitempty"`    // Execution time in milliseconds for "result" messages
	Subtype      string            `json:"subtype,omitempty"`        // Fine-grained type classification (e.g., for "result" events)
	IsError      bool              `json:"is_error,omitempty"`       // Flag indicating if the overall turn resulted in an error
	TotalCostUSD float64           `json:"total_cost_usd,omitempty"` // Cumulative cost of the turn in USD
	Usage        *UsageStats       `json:"usage,omitempty"`          // Precise token consumption for the turn
	Result       string            `json:"result,omitempty"`         // Final summarized result of the execution
}

StreamMessage represents a single event in the stream-json format emitted by the Claude Code CLI. This is the internal wire protocol used for communication between the CLI and the SDK.

func (*StreamMessage) GetContentBlocks

func (m *StreamMessage) GetContentBlocks() []ContentBlock

GetContentBlocks returns the primary content blocks of the message, handling nested assistant structures.

type UsageStats

type UsageStats struct {
	InputTokens           int32 `json:"input_tokens"`                          // Total tokens in the prompt (including system and context)
	OutputTokens          int32 `json:"output_tokens"`                         // Total tokens generated by the model
	CacheWriteInputTokens int32 `json:"cache_creation_input_tokens,omitempty"` // Tokens saved to the provider's prompt cache
	CacheReadInputTokens  int32 `json:"cache_read_input_tokens,omitempty"`     // Tokens retrieved from the provider's prompt cache
}

UsageStats represents the token consumption breakdown for a single execution turn.
