daemon

package
v0.11.2
Published: May 26, 2026 License: MIT Imports: 45 Imported by: 0

Documentation

Constants

const ProtoVersion = 1

ProtoVersion is the current protocol version. Increment this when making breaking proto changes (removing/renaming fields or RPCs). Minor additions such as new fields or new RPCs do not require a bump.

Variables

var CheckpointPath = func() string {
	return filepath.Join(DataDir(), "checkpoint.json")
}

CheckpointPath returns the path to the checkpoint file. It is a variable so tests can override it.

Functions

func ClassifyStep

func ClassifyStep(stepID string) stepComplexity

ClassifyStep returns the complexity tier for a step based on its type/name.

func CleanupPID

func CleanupPID()

func CleanupSocket

func CleanupSocket()

func Compress

func Compress(ctx context.Context, messages []provider.Message, preserveCount int, prov provider.Provider) ([]provider.Message, string, error)

Compress summarizes older messages using a fast provider call and returns the compressed history (summary message + preserved recent messages) plus the summary text.

preserveCount controls how many of the most recent messages are kept verbatim. If no provider is given, a simple concatenation summary is used.
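
The role of preserveCount can be illustrated with a minimal sketch of the provider-less fallback. It assumes that the fallback simply joins the older messages into one summary message; the message type, the "system" role, and the "role: content" line format are illustrative assumptions, not the package's actual behavior.

```go
package main

import (
	"fmt"
	"strings"
)

// message is a hypothetical stand-in for provider.Message.
type message struct {
	Role    string
	Content string
}

// compressFallback keeps the last preserveCount messages verbatim and folds
// everything older into a single summary message built by concatenation.
func compressFallback(msgs []message, preserveCount int) ([]message, string) {
	if preserveCount < 0 {
		preserveCount = 0
	}
	if len(msgs) <= preserveCount {
		return msgs, "" // nothing old enough to summarize
	}
	cut := len(msgs) - preserveCount
	parts := make([]string, 0, cut)
	for _, m := range msgs[:cut] {
		parts = append(parts, m.Role+": "+m.Content)
	}
	summary := strings.Join(parts, "\n")
	// Assumption: the summary is carried as a "system" message.
	out := append([]message{{Role: "system", Content: summary}}, msgs[cut:]...)
	return out, summary
}

func main() {
	history := []message{
		{Role: "user", Content: "list the files"},
		{Role: "assistant", Content: "main.go, go.mod"},
		{Role: "user", Content: "now run the tests"},
	}
	out, summary := compressFallback(history, 1)
	fmt.Println(len(out), "messages after compression")
	fmt.Println(summary)
}
```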

func DBPath

func DBPath() string

func DataDir

func DataDir() string

func EnsureDataDir

func EnsureDataDir() error

func IsRunning

func IsRunning() bool

IsRunning checks if a daemon process is alive.

func ModelForStep

func ModelForStep(stepID string, routing config.ModelRouting) string

ModelForStep returns the model to use for a step based on routing config. Falls back to an empty string (caller should use default) when not configured.
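
On the caller side, the empty-string contract might be handled as in the sketch below. The routing type is a hypothetical stand-in for config.ModelRouting (whose fields are not shown on this page), modelForStep is a stub that only honors the documented contract, and the model names are made up.

```go
package main

import "fmt"

// routing is a hypothetical stand-in for config.ModelRouting.
type routing map[string]string

// modelForStep is a stub with the documented contract: "" means "not configured".
func modelForStep(stepID string, r routing) string {
	return r[stepID]
}

// resolveModel applies the caller-side fallback to a default model.
func resolveModel(stepID string, r routing, defaultModel string) string {
	if m := modelForStep(stepID, r); m != "" {
		return m
	}
	return defaultModel
}

func main() {
	r := routing{"review": "large-model"}
	fmt.Println(resolveModel("review", r, "default-model"))
	fmt.Println(resolveModel("format", r, "default-model"))
}
```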

func PIDPath

func PIDPath() string

func ReadPID

func ReadPID() (int, error)

func ReloadDaemon

func ReloadDaemon(newBinaryPath string) error

ReloadDaemon performs a full graceful reload:

  1. Sends SIGUSR1 to the daemon (checkpoint + graceful stop).
  2. Waits for the old process to exit.
  3. Starts the new binary as a background daemon.

newBinaryPath should be the path to the updated binary (typically os.Executable() from the new CLI process).

func SaveCheckpoint

func SaveCheckpoint(cp *Checkpoint) error

SaveCheckpoint writes cp to the checkpoint file (~/.ratchet/checkpoint.json).

func SendNotification

func SendNotification(title, body string)

SendNotification sends an OS-native notification. Non-blocking; errors are logged.

func SocketPath

func SocketPath() string

func Start

func Start(ctx context.Context, debug bool) error

Start runs the daemon in the foreground. It creates the Unix socket, starts the gRPC server, and blocks until a signal is received.

func StartBackground

func StartBackground(debug bool) error

StartBackground forks the current process as a background daemon.

func Status

func Status() (string, error)

Status returns daemon health info.

func Stop

func Stop() error

Stop sends SIGTERM to the running daemon.

func TriggerReload

func TriggerReload() error

TriggerReload sends SIGUSR1 to the running daemon, which causes it to checkpoint and exit gracefully. The caller is responsible for restarting.

func WritePID

func WritePID() error

Types

type ActorManager

type ActorManager struct {
	// contains filtered or unexported fields
}

ActorManager manages the goakt actor system used by the daemon.

func NewActorManager

func NewActorManager(ctx context.Context, db *sql.DB) (*ActorManager, error)

NewActorManager creates and starts an actor system with SQLite-backed state. The provided context is stored and propagated to the actor system and rehydration.

func (*ActorManager) Close

func (am *ActorManager) Close(ctx context.Context) error

Close stops the actor system.

func (*ActorManager) SpawnApproval

func (am *ActorManager) SpawnApproval(ctx context.Context, requestID string, gate *ApprovalGate, timeout time.Duration) (*actor.PID, error)

SpawnApproval spawns an ApprovalActor for the given requestID and returns its PID. Callers use actor.Ask to send an ApprovalRequest and receive an ApprovalResponse. gate and timeout are wired through to the actor; pass nil gate to auto-deny.

func (*ActorManager) SpawnSession

func (am *ActorManager) SpawnSession(ctx context.Context, sessionID, workingDir string) (*actor.PID, error)

SpawnSession spawns a persistent SessionActor for the given session. If one already exists the existing PID is returned.

type ApprovalActor

type ApprovalActor struct {
	// contains filtered or unexported fields
}

ApprovalActor blocks (via actor.Ask) until the TUI user responds to a permission prompt or the timeout elapses.

func (*ApprovalActor) PostStop

func (a *ApprovalActor) PostStop(ctx *actor.Context) error

func (*ApprovalActor) PreStart

func (a *ApprovalActor) PreStart(ctx *actor.Context) error

func (*ApprovalActor) Receive

func (a *ApprovalActor) Receive(ctx *actor.ReceiveContext)

type ApprovalGate

type ApprovalGate struct {
	// contains filtered or unexported fields
}

ApprovalGate manages pending approval requests, allowing the TUI to resolve them asynchronously. It implements executor.Approver so it can be wired into executor.Execute() to gate tool calls that require user approval.

func NewApprovalGate

func NewApprovalGate() *ApprovalGate

NewApprovalGate returns an initialised ApprovalGate.

func (*ApprovalGate) PendingCount

func (g *ApprovalGate) PendingCount() int

PendingCount returns the number of unresolved approval requests.

func (*ApprovalGate) Request

func (g *ApprovalGate) Request(requestID string) <-chan ApprovalResponse

Request registers a pending approval and returns a channel that will receive exactly one ApprovalResponse when Resolve is called.

func (*ApprovalGate) Resolve

func (g *ApprovalGate) Resolve(requestID string, approved bool, reason string) bool

Resolve delivers a resolution to a pending approval request. Returns true if the request was found and resolved, false if it was unknown or already resolved.
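
The Request/Resolve contract (one response per request, false for unknown or already-resolved IDs) can be sketched with a mutex-guarded map of buffered channels. This is an illustration under that assumption, not the package's implementation; gate and approvalResponse are local stand-ins.

```go
package main

import (
	"fmt"
	"sync"
)

// approvalResponse mirrors the documented ApprovalResponse fields.
type approvalResponse struct {
	Approved bool
	Reason   string
}

// gate is a hypothetical stand-in for ApprovalGate.
type gate struct {
	mu      sync.Mutex
	pending map[string]chan approvalResponse
}

func newGate() *gate {
	return &gate{pending: make(map[string]chan approvalResponse)}
}

// request registers id and returns a channel that receives exactly one response.
func (g *gate) request(id string) <-chan approvalResponse {
	ch := make(chan approvalResponse, 1) // buffered so resolve never blocks
	g.mu.Lock()
	g.pending[id] = ch
	g.mu.Unlock()
	return ch
}

// resolve delivers the response; it reports false for unknown or resolved IDs.
func (g *gate) resolve(id string, approved bool, reason string) bool {
	g.mu.Lock()
	ch, ok := g.pending[id]
	delete(g.pending, id) // removing the entry makes a second resolve a no-op
	g.mu.Unlock()
	if !ok {
		return false
	}
	ch <- approvalResponse{approved, reason}
	return true
}

// pendingCount returns the number of unresolved requests.
func (g *gate) pendingCount() int {
	g.mu.Lock()
	defer g.mu.Unlock()
	return len(g.pending)
}

func main() {
	g := newGate()
	ch := g.request("req-1")
	fmt.Println("pending:", g.pendingCount())
	fmt.Println("resolved:", g.resolve("req-1", true, "looks safe"))
	fmt.Println("response:", <-ch)
	fmt.Println("resolved again:", g.resolve("req-1", false, "late"))
}
```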

func (*ApprovalGate) WaitForResolution

func (g *ApprovalGate) WaitForResolution(ctx context.Context, approvalID string, timeout time.Duration) (*executor.ApprovalRecord, error)

WaitForResolution implements executor.Approver. It registers a request, then blocks until the TUI resolves it, the timeout elapses, or ctx is cancelled.

type ApprovalRequest

type ApprovalRequest struct {
	ToolName string
	Input    string
}

ApprovalRequest is sent to an ApprovalActor to request user approval.

type ApprovalResponse

type ApprovalResponse struct {
	Approved bool
	Reason   string
}

ApprovalResponse is the reply from an ApprovalActor.

type AutorespondConfig

type AutorespondConfig struct {
	Rules []AutorespondRule `yaml:"rules"`
}

AutorespondConfig is the top-level config from .ratchet/autorespond.yaml.

type AutorespondRule

type AutorespondRule struct {
	Match   string `yaml:"match"`
	Action  string `yaml:"action"`  // "approve", "reply", "queue"
	Message string `yaml:"message"` // used when action is "reply"
}

AutorespondRule defines a pattern-matching rule.

type Autoresponder

type Autoresponder struct {
	// contains filtered or unexported fields
}

Autoresponder evaluates incoming human requests against rules.

func LoadAutoresponder

func LoadAutoresponder(projectDir string) *Autoresponder

LoadAutoresponder reads .ratchet/autorespond.yaml if it exists. Returns nil if the file does not exist or cannot be parsed.

func NewAutoresponder

func NewAutoresponder(rules []AutorespondRule) *Autoresponder

NewAutoresponder compiles rules into an evaluator.

func (*Autoresponder) Match

func (ar *Autoresponder) Match(question string) (string, string)

Match evaluates the question against rules and returns (action, message). Returns ("queue", "") if no rule matches.
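
A rough sketch of rule evaluation follows. It assumes that each Match value is a regular expression, that the first matching rule wins, and that rules with invalid patterns are skipped; none of these details are confirmed by this page. Only the ("queue", "") default is documented.

```go
package main

import (
	"fmt"
	"regexp"
)

// rule mirrors the documented AutorespondRule fields.
type rule struct {
	Match, Action, Message string
}

type compiledRule struct {
	re   *regexp.Regexp
	rule rule
}

// responder is a hypothetical stand-in for Autoresponder.
type responder struct {
	rules []compiledRule
}

// newResponder compiles each pattern once; invalid patterns are skipped (assumption).
func newResponder(rules []rule) *responder {
	r := &responder{}
	for _, ru := range rules {
		re, err := regexp.Compile(ru.Match)
		if err != nil {
			continue
		}
		r.rules = append(r.rules, compiledRule{re, ru})
	}
	return r
}

// match returns the first matching rule's action and message, else ("queue", "").
func (r *responder) match(question string) (string, string) {
	for _, c := range r.rules {
		if c.re.MatchString(question) {
			return c.rule.Action, c.rule.Message
		}
	}
	return "queue", ""
}

func main() {
	r := newResponder([]rule{
		{Match: `(?i)run (the )?tests`, Action: "approve"},
		{Match: `(?i)which branch`, Action: "reply", Message: "use main"},
	})
	fmt.Println(r.match("May I run the tests?"))
	fmt.Println(r.match("Which branch should I target?"))
	fmt.Println(r.match("Should I delete the database?"))
}
```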

type Checkpoint

type Checkpoint struct {
	Version   string               `json:"version"`
	Sessions  []SessionCheckpoint  `json:"sessions"`
	CronJobs  []CronCheckpoint     `json:"cron_jobs"`
	Providers []ProviderCheckpoint `json:"providers"`
}

Checkpoint captures daemon state for graceful reload. The actual message history lives in SQLite; the checkpoint records only what needs active resumption so the new daemon knows what to restart.
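
To show what the struct tags imply for the on-disk format, the sketch below round-trips a reduced local copy of the checkpoint through encoding/json. The checkpoint and sessionCheckpoint types copy the documented fields and tags (CronJobs and Providers are omitted for brevity); encode, decode, and the sample values are illustrative and not part of this package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// sessionCheckpoint copies the documented SessionCheckpoint fields and tags.
type sessionCheckpoint struct {
	ID         string `json:"id"`
	Name       string `json:"name"`
	WorkingDir string `json:"working_dir"`
	Provider   string `json:"provider"`
	Model      string `json:"model"`
	Status     string `json:"status"`
}

// checkpoint is a reduced copy of the documented Checkpoint struct.
type checkpoint struct {
	Version  string              `json:"version"`
	Sessions []sessionCheckpoint `json:"sessions"`
}

// encode renders a checkpoint as indented JSON.
func encode(cp checkpoint) ([]byte, error) {
	return json.MarshalIndent(cp, "", "  ")
}

// decode parses JSON back into a checkpoint.
func decode(data []byte) (checkpoint, error) {
	var cp checkpoint
	err := json.Unmarshal(data, &cp)
	return cp, err
}

func main() {
	cp := checkpoint{
		Version: "example",
		Sessions: []sessionCheckpoint{
			{ID: "s1", Name: "demo", WorkingDir: "/work", Status: "active"},
		},
	}
	data, err := encode(cp)
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(string(data))
}
```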

func ExportCheckpoint

func ExportCheckpoint(svc *Service) (*Checkpoint, error)

ExportCheckpoint reads active state from svc and returns a Checkpoint.

func LoadCheckpoint

func LoadCheckpoint() (*Checkpoint, error)

LoadCheckpoint reads the checkpoint file. Returns an error if the file does not exist or cannot be parsed.

type CronCheckpoint

type CronCheckpoint struct {
	ID        string `json:"id"`
	SessionID string `json:"session_id"`
	Schedule  string `json:"schedule"`
	Command   string `json:"command"`
	Status    string `json:"status"`
}

CronCheckpoint records a cron job that should be resumed after reload.

type CronJob

type CronJob struct {
	ID        string
	SessionID string
	Schedule  string // duration ("5m") or cron expr ("*/10 * * * *")
	Command   string
	Status    string // active, paused, stopped
	LastRun   string
	NextRun   string
	RunCount  int32
}

CronJob represents a scheduled recurring command.
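
Since Schedule accepts either a duration or a cron expression, a scheduler has to tell the two apart. One plausible approach, shown here as an assumption rather than the package's actual logic, is to try time.ParseDuration first and then fall back to a five-field check; scheduleKind and its return values are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// scheduleKind classifies a schedule string as "interval", "cron", or "invalid".
func scheduleKind(s string) string {
	// A positive Go duration such as "5m" is treated as a fixed interval.
	if d, err := time.ParseDuration(s); err == nil && d > 0 {
		return "interval"
	}
	// Assumption: a cron expression has exactly five space-separated fields.
	// This does not validate the fields themselves.
	if len(strings.Fields(s)) == 5 {
		return "cron"
	}
	return "invalid"
}

func main() {
	for _, s := range []string{"5m", "*/10 * * * *", "soon"} {
		fmt.Printf("%q is %s\n", s, scheduleKind(s))
	}
}
```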

type CronJobProvider

type CronJobProvider struct {
	// contains filtered or unexported fields
}

CronJobProvider wraps CronScheduler to expose cron jobs.

func NewCronJobProvider

func NewCronJobProvider(cs *CronScheduler) *CronJobProvider

func (*CronJobProvider) ActiveJobs

func (p *CronJobProvider) ActiveJobs() []*pb.Job

func (*CronJobProvider) KillJob

func (p *CronJobProvider) KillJob(id string) error

func (*CronJobProvider) PauseJob

func (p *CronJobProvider) PauseJob(id string) error

func (*CronJobProvider) ResumeJob

func (p *CronJobProvider) ResumeJob(id string) error

type CronScheduler

type CronScheduler struct {
	// contains filtered or unexported fields
}

CronScheduler manages cron jobs with SQLite persistence.

func NewCronScheduler

func NewCronScheduler(db *sql.DB, onTick func(sessionID, command string)) *CronScheduler

NewCronScheduler creates a scheduler. onTick is called each time a job fires.

func (*CronScheduler) Create

func (cs *CronScheduler) Create(ctx context.Context, sessionID, schedule, command string) (CronJob, error)

Create adds a new cron job and starts it immediately.

func (*CronScheduler) List

func (cs *CronScheduler) List(ctx context.Context) ([]CronJob, error)

List returns all cron jobs from the database.

func (*CronScheduler) Pause

func (cs *CronScheduler) Pause(ctx context.Context, jobID string) error

Pause suspends a job without removing it.

func (*CronScheduler) Resume

func (cs *CronScheduler) Resume(ctx context.Context, jobID string) error

Resume restarts a paused job.

func (*CronScheduler) Start

func (cs *CronScheduler) Start(ctx context.Context) error

Start reloads persisted active jobs and begins running them. The context is stored so Resume can propagate it to restarted goroutines.

func (*CronScheduler) Stop

func (cs *CronScheduler) Stop(ctx context.Context, jobID string) error

Stop permanently stops a job.

type EngineContext

type EngineContext struct {
	DB               *sql.DB
	ProviderRegistry *ratchetplugin.ProviderRegistry
	ToolRegistry     *ratchetplugin.ToolRegistry
	MemoryStore      *ratchetplugin.MemoryStore
	SecretGuard      *ratchetplugin.SecretGuard
	SecretsProvider  secrets.Provider
	MCPDiscoverer    *mcp.Discoverer
	ModelRouting     config.ModelRouting
	Actors           *ActorManager
	Hooks            *hooks.HookConfig
	Debug            bool // enable request/response debug logging to ~/.ratchet/debug.log
	// Plugin-contributed capabilities
	PluginSkills   []skills.Skill
	PluginAgents   []agent.AgentDefinition
	PluginCommands []plugins.Command
	PluginDaemons  []*plugins.DaemonTool // stopped on Close()
}

EngineContext holds the daemon's runtime services.

func NewEngineContext

func NewEngineContext(ctx context.Context, dbPath string) (*EngineContext, error)

func (*EngineContext) Close

func (ec *EngineContext) Close()

type FleetJobProvider

type FleetJobProvider struct {
	// contains filtered or unexported fields
}

FleetJobProvider wraps FleetManager to expose fleet worker jobs.

func NewFleetJobProvider

func NewFleetJobProvider(fm *FleetManager) *FleetJobProvider

func (*FleetJobProvider) ActiveJobs

func (p *FleetJobProvider) ActiveJobs() []*pb.Job

func (*FleetJobProvider) KillJob

func (p *FleetJobProvider) KillJob(id string) error

func (*FleetJobProvider) PauseJob

func (p *FleetJobProvider) PauseJob(id string) error

func (*FleetJobProvider) ResumeJob

func (p *FleetJobProvider) ResumeJob(id string) error

type FleetManager

type FleetManager struct {
	// contains filtered or unexported fields
}

FleetManager manages fleet instances.

func NewFleetManager

func NewFleetManager(routing config.ModelRouting, engine *EngineContext, hks *hooks.HookConfig) *FleetManager

NewFleetManager returns an initialized FleetManager with optional model routing config.

func (*FleetManager) FindWorker

func (fm *FleetManager) FindWorker(workerID string) *pb.Agent

FindWorker looks up a worker by ID across all fleets.

func (*FleetManager) GetStatus

func (fm *FleetManager) GetStatus(fleetID string) (*pb.FleetStatus, error)

GetStatus returns the current FleetStatus for the given fleet ID.

func (*FleetManager) KillWorker

func (fm *FleetManager) KillWorker(fleetID, workerID string) error

KillWorker cancels a specific worker within a fleet.

func (*FleetManager) ListAllWorkers

func (fm *FleetManager) ListAllWorkers() []*pb.Agent

ListAllWorkers aggregates workers from all active fleet instances as Agent protos.

func (*FleetManager) StartFleet

func (fm *FleetManager) StartFleet(ctx context.Context, req *pb.StartFleetReq, steps []string, eventCh chan<- *pb.FleetStatus) string

StartFleet spawns worker goroutines for each step in the plan (up to maxWorkers). It sends FleetStatus updates on eventCh until all workers finish.

type HumanGate

type HumanGate struct {
	// contains filtered or unexported fields
}

HumanGate queues human requests and blocks agents until answered.

func NewHumanGate

func NewHumanGate() *HumanGate

NewHumanGate returns an initialized HumanGate.

func (*HumanGate) Pending

func (hg *HumanGate) Pending(teamID string) []HumanRequestEntry

Pending returns all pending human requests, optionally filtered by team.

func (*HumanGate) Request

func (hg *HumanGate) Request(teamID, fromAgent, question string) string

Request enqueues a human request and returns its ID. The calling agent should then call Wait(ctx, id) to block until a response arrives.

func (*HumanGate) Respond

func (hg *HumanGate) Respond(reqID, content string) error

Respond provides a human response to a pending request.

func (*HumanGate) Wait

func (hg *HumanGate) Wait(ctx context.Context, reqID string) (string, error)

Wait blocks until the human responds or the context is cancelled. On success, the request is removed from the pending set.

type HumanRequestEntry

type HumanRequestEntry struct {
	ID        string
	TeamID    string
	FromAgent string
	Question  string
	Timestamp time.Time
	// contains filtered or unexported fields
}

HumanRequestEntry represents a pending question from an agent.

type JobProvider

type JobProvider interface {
	ActiveJobs() []*pb.Job
	PauseJob(id string) error
	ResumeJob(id string) error
	KillJob(id string) error
}

JobProvider is implemented by each manager that owns trackable jobs.

type JobRegistry

type JobRegistry struct {
	// contains filtered or unexported fields
}

JobRegistry aggregates jobs from all registered providers.

func NewJobRegistry

func NewJobRegistry() *JobRegistry

NewJobRegistry returns an empty registry. Register providers after creation.

func (*JobRegistry) KillJob

func (jr *JobRegistry) KillJob(id string) error

KillJob routes to the correct provider by job type prefix.

func (*JobRegistry) ListJobs

func (jr *JobRegistry) ListJobs() []*pb.Job

ListJobs returns jobs aggregated from all providers.

func (*JobRegistry) PauseJob

func (jr *JobRegistry) PauseJob(id string) error

PauseJob routes to the correct provider by job type prefix (e.g. "session:id").

func (*JobRegistry) Register

func (jr *JobRegistry) Register(jobType string, p JobProvider)

Register adds a provider under the given type key (e.g. "session", "cron").

func (*JobRegistry) ResumeJob

func (jr *JobRegistry) ResumeJob(id string) error

ResumeJob routes to the correct provider by job type prefix.
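
Prefix routing of IDs such as "session:id" can be sketched as below. The sketch assumes that the part before the first colon selects the provider and that the provider receives the remainder; whether the real registry strips the prefix is not stated on this page. The pauser interface is a one-method subset of JobProvider, and registry and logProvider are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// pauser is a one-method subset of the documented JobProvider interface.
type pauser interface {
	PauseJob(id string) error
}

// registry is a hypothetical stand-in for JobRegistry.
type registry struct {
	providers map[string]pauser
}

func newRegistry() *registry {
	return &registry{providers: make(map[string]pauser)}
}

func (r *registry) register(jobType string, p pauser) {
	r.providers[jobType] = p
}

// pauseJob splits "type:rest" and forwards rest to the provider for type.
func (r *registry) pauseJob(id string) error {
	jobType, rest, ok := strings.Cut(id, ":")
	if !ok {
		return fmt.Errorf("job id %q has no type prefix", id)
	}
	p, found := r.providers[jobType]
	if !found {
		return fmt.Errorf("no provider registered for job type %q", jobType)
	}
	return p.PauseJob(rest)
}

// logProvider records the IDs it was asked to pause.
type logProvider struct {
	paused []string
}

func (l *logProvider) PauseJob(id string) error {
	l.paused = append(l.paused, id)
	return nil
}

func main() {
	sessions := &logProvider{}
	r := newRegistry()
	r.register("session", sessions)
	fmt.Println(r.pauseJob("session:abc123"), sessions.paused)
	fmt.Println(r.pauseJob("cron:42"))
}
```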

type PlanManager

type PlanManager struct {
	// contains filtered or unexported fields
}

PlanManager holds all plans keyed by plan ID.

func NewPlanManager

func NewPlanManager(hks *hooks.HookConfig) *PlanManager

func (*PlanManager) Approve

func (pm *PlanManager) Approve(planID string, skipSteps []string) error

Approve marks a plan as approved, optionally skipping specific step IDs. Returns an error if the plan is not found or not in proposed status.

func (*PlanManager) Create

func (pm *PlanManager) Create(sessionID, goal string, steps []*pb.PlanStep) *pb.Plan

Create stores a new plan and returns it.

func (*PlanManager) ForSession

func (pm *PlanManager) ForSession(sessionID string) []*pb.Plan

ForSession returns all plans for the given session ID.

func (*PlanManager) Get

func (pm *PlanManager) Get(planID string) *pb.Plan

Get returns a plan by ID, or nil if not found.

func (*PlanManager) Reject

func (pm *PlanManager) Reject(planID, feedback string) error

Reject marks a plan as rejected with optional feedback. Returns an error if the plan is not found or is already in a terminal state (approved, executing, completed, or rejected).

func (*PlanManager) UpdateStep

func (pm *PlanManager) UpdateStep(planID, stepID, stepStatus, errMsg string) error

UpdateStep updates the status (and optional error) of a step within a plan. If all non-skipped steps are completed or failed, the plan is marked completed.
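
The completion rule can be written as a small predicate. The status strings used here ("skipped", "completed", "failed", and so on) are assumptions for illustration; the actual values come from the pb package and are not listed on this page.

```go
package main

import "fmt"

// planCompleted reports whether every non-skipped step is completed or failed.
// Note that a plan with no steps, or only skipped steps, also counts as done
// under this rule.
func planCompleted(stepStatuses []string) bool {
	for _, s := range stepStatuses {
		switch s {
		case "skipped", "completed", "failed":
			// terminal or ignored; keep checking the remaining steps
		default:
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(planCompleted([]string{"completed", "skipped", "failed"}))
	fmt.Println(planCompleted([]string{"completed", "running"}))
}
```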

type Project

type Project struct {
	ID         string
	Name       string
	ConfigPath string
	Cwd        string   // directory where the project was started
	WorkDir    string   // working directory for agents (defaults to Cwd)
	Paths      []string // whitelisted directories for tool/agent interaction (empty = unrestricted under WorkDir)
	Status     string   // active, paused, killed, completed
	TeamIDs    []string
	CreatedAt  time.Time
}

Project is a registered multi-team project.

type ProjectRegistry

type ProjectRegistry struct {
	// contains filtered or unexported fields
}

ProjectRegistry manages active projects in memory.

func NewProjectRegistry

func NewProjectRegistry() *ProjectRegistry

NewProjectRegistry returns an initialized ProjectRegistry.

func (*ProjectRegistry) AddTeam

func (pr *ProjectRegistry) AddTeam(projectID, teamID string)

AddTeam associates a team ID with the project.

func (*ProjectRegistry) Get

func (pr *ProjectRegistry) Get(id string) (*Project, error)

Get returns a project by ID.

func (*ProjectRegistry) GetByName

func (pr *ProjectRegistry) GetByName(name string) (*Project, error)

GetByName returns a project by name.

func (*ProjectRegistry) List

func (pr *ProjectRegistry) List() []Project

List returns all projects.

func (*ProjectRegistry) Register

func (pr *ProjectRegistry) Register(name, configPath string, opts *RegisterOpts) (*Project, error)

Register creates a new project entry. Cwd is auto-captured from the current working directory if not provided in opts.

func (*ProjectRegistry) RemoveTeam

func (pr *ProjectRegistry) RemoveTeam(projectID, teamID string)

RemoveTeam disassociates a team from the project.

func (*ProjectRegistry) SetStatus

func (pr *ProjectRegistry) SetStatus(id, status string) error

SetStatus updates the project status.

type ProviderCheckpoint

type ProviderCheckpoint struct {
	Alias string `json:"alias"`
	Type  string `json:"type"`
	Model string `json:"model"`
}

ProviderCheckpoint records a configured provider alias.

type RegisterOpts

type RegisterOpts struct {
	Cwd     string
	WorkDir string
	Paths   []string
}

RegisterOpts holds optional fields for project registration.

type Service

type Service struct {
	pb.UnimplementedRatchetDaemonServer
	// contains filtered or unexported fields
}

func NewService

func NewService(ctx context.Context) (*Service, error)

func (*Service) AddProvider

func (s *Service) AddProvider(ctx context.Context, req *pb.AddProviderReq) (*pb.Provider, error)

func (*Service) ApprovePlan

func (s *Service) ApprovePlan(req *pb.ApprovePlanReq, stream pb.RatchetDaemon_ApprovePlanServer) error

ApprovePlan implements the ApprovePlan RPC.

func (*Service) AttachSession

func (s *Service) AttachSession(req *pb.AttachReq, stream pb.RatchetDaemon_AttachSessionServer) error

func (*Service) AttachTeam

func (s *Service) AttachTeam(req *pb.AttachTeamReq, stream pb.RatchetDaemon_AttachTeamServer) error

func (*Service) CheckVersion

func (s *Service) CheckVersion(ctx context.Context, req *pb.VersionCheckReq) (*pb.VersionCheckResp, error)

CheckVersion compares CLI version/proto against the running daemon and returns compatibility information.

func (*Service) ClaimTask

func (s *Service) ClaimTask(_ context.Context, req *pb.TaskClaimReq) (*pb.TaskInfo, error)

func (*Service) CreateCron

func (s *Service) CreateCron(ctx context.Context, req *pb.CreateCronReq) (*pb.CronJob, error)

func (*Service) CreateSession

func (s *Service) CreateSession(ctx context.Context, req *pb.CreateSessionReq) (*pb.Session, error)

func (*Service) CreateTask

func (s *Service) CreateTask(_ context.Context, req *pb.TaskCreateReq) (*pb.TaskInfo, error)

func (*Service) DetachSession

func (s *Service) DetachSession(ctx context.Context, req *pb.DetachReq) (*pb.Empty, error)

func (*Service) DirectMessage

func (s *Service) DirectMessage(ctx context.Context, req *pb.DirectMessageReq) (*pb.Empty, error)

func (*Service) GetAgentStatus

func (s *Service) GetAgentStatus(ctx context.Context, req *pb.AgentStatusReq) (*pb.Agent, error)

func (*Service) GetFleetStatus

func (s *Service) GetFleetStatus(ctx context.Context, req *pb.FleetStatusReq) (*pb.FleetStatus, error)

GetFleetStatus returns the current status of a fleet.

func (*Service) GetProjectStatus

func (s *Service) GetProjectStatus(ctx context.Context, req *pb.ProjectReq) (*pb.ProjectStatus, error)

func (*Service) GetTask

func (s *Service) GetTask(_ context.Context, req *pb.TaskReq) (*pb.TaskInfo, error)

func (*Service) GetTeamStatus

func (s *Service) GetTeamStatus(ctx context.Context, req *pb.TeamStatusReq) (*pb.TeamStatus, error)

func (*Service) Health

func (s *Service) Health(ctx context.Context, _ *pb.Empty) (*pb.HealthResponse, error)

func (*Service) KillFleetWorker

func (s *Service) KillFleetWorker(ctx context.Context, req *pb.KillFleetWorkerReq) (*pb.Empty, error)

KillFleetWorker cancels a specific worker within a fleet.

func (*Service) KillJob

func (s *Service) KillJob(ctx context.Context, req *pb.JobReq) (*pb.Empty, error)

func (*Service) KillProject

func (s *Service) KillProject(ctx context.Context, req *pb.ProjectReq) (*pb.Empty, error)

func (*Service) KillSession

func (s *Service) KillSession(ctx context.Context, req *pb.KillReq) (*pb.Empty, error)

func (*Service) KillTeam

func (s *Service) KillTeam(ctx context.Context, req *pb.KillTeamReq) (*pb.Empty, error)

func (*Service) ListAgents

func (s *Service) ListAgents(ctx context.Context, _ *pb.Empty) (*pb.AgentList, error)

func (*Service) ListCrons

func (s *Service) ListCrons(ctx context.Context, _ *pb.Empty) (*pb.CronJobList, error)

func (*Service) ListJobs

func (s *Service) ListJobs(ctx context.Context, _ *pb.Empty) (*pb.JobList, error)

func (*Service) ListPendingHuman

func (s *Service) ListPendingHuman(ctx context.Context, req *pb.PendingHumanReq) (*pb.PendingHumanList, error)

func (*Service) ListProjects

func (s *Service) ListProjects(ctx context.Context, _ *pb.Empty) (*pb.ProjectList, error)

func (*Service) ListProviders

func (s *Service) ListProviders(ctx context.Context, _ *pb.Empty) (*pb.ProviderList, error)

func (*Service) ListSessions

func (s *Service) ListSessions(ctx context.Context, _ *pb.Empty) (*pb.SessionList, error)

func (*Service) ListTasks

func (s *Service) ListTasks(_ context.Context, req *pb.TaskListReq) (*pb.TaskList, error)

func (*Service) ListTeams

func (s *Service) ListTeams(ctx context.Context, req *pb.ListTeamsReq) (*pb.TeamList, error)

func (*Service) MeshStream

func (s *Service) MeshStream(stream pb.RatchetDaemon_MeshStreamServer) error

MeshStream handles bidirectional mesh event exchange with a remote daemon node.

Protocol: The first message from the client should be a node_registered event carrying the nodeID from RegisterMeshNode. If missing, a new ID is generated.

All outgoing sends are serialized through a single channel to avoid concurrent stream.Send calls. Both AgentMessages and BlackboardSync events are forwarded.
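
The single-writer pattern described above can be sketched without gRPC: many producers push into one channel, and exactly one goroutine drains it and calls Send. The sender interface is a hypothetical stand-in for the Send half of a stream; recorder and fanIn exist only for this illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// sender is a hypothetical stand-in for the Send half of a gRPC stream.
type sender interface {
	Send(msg string) error
}

// recorder is deliberately not goroutine-safe, like a stream's Send.
type recorder struct {
	msgs []string
}

func (r *recorder) Send(msg string) error {
	r.msgs = append(r.msgs, msg)
	return nil
}

// fanIn lets several producers emit events while a single writer goroutine
// performs every Send call.
func fanIn(s sender, producers, perProducer int) error {
	out := make(chan string)
	done := make(chan error, 1)
	go func() {
		var err error
		for msg := range out {
			if err == nil {
				err = s.Send(msg) // the only place Send is called
			}
			// After an error, keep draining so producers never block.
		}
		done <- err
	}()

	var wg sync.WaitGroup
	for p := 0; p < producers; p++ {
		wg.Add(1)
		go func(p int) {
			defer wg.Done()
			for i := 0; i < perProducer; i++ {
				out <- fmt.Sprintf("producer-%d event-%d", p, i)
			}
		}(p)
	}
	wg.Wait()
	close(out)
	return <-done
}

func main() {
	rec := &recorder{}
	err := fanIn(rec, 3, 4)
	fmt.Println(len(rec.msgs), err)
}
```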

func (*Service) PauseCron

func (s *Service) PauseCron(ctx context.Context, req *pb.CronJobReq) (*pb.Empty, error)

func (*Service) PauseJob

func (s *Service) PauseJob(ctx context.Context, req *pb.JobReq) (*pb.Empty, error)

func (*Service) PauseProject

func (s *Service) PauseProject(ctx context.Context, req *pb.ProjectReq) (*pb.Empty, error)

func (*Service) RegisterMeshNode

func (s *Service) RegisterMeshNode(ctx context.Context, req *pb.RegisterNodeReq) (*pb.RegisterNodeResp, error)

RegisterMeshNode registers a remote node in the service mesh and returns its generated ID. The returned ID should be used by the client when opening a MeshStream (sent as the first MeshEvent.node_registered message) so both sides agree on the node identity.

func (*Service) RejectPlan

func (s *Service) RejectPlan(ctx context.Context, req *pb.RejectPlanReq) (*pb.Empty, error)

RejectPlan implements the RejectPlan RPC.

func (*Service) RemoveProvider

func (s *Service) RemoveProvider(ctx context.Context, req *pb.RemoveProviderReq) (*pb.Empty, error)

func (*Service) RenameTeam

func (s *Service) RenameTeam(ctx context.Context, req *pb.TeamRenameReq) (*pb.Empty, error)

func (*Service) RequestReload

func (s *Service) RequestReload(req *pb.ReloadReq, stream pb.RatchetDaemon_RequestReloadServer) error

RequestReload checkpoints state and initiates a graceful daemon restart. It streams status events back to the caller.

func (*Service) RespondToHuman

func (s *Service) RespondToHuman(ctx context.Context, req *pb.HumanResponse) (*pb.Empty, error)

func (*Service) RespondToPermission

func (s *Service) RespondToPermission(ctx context.Context, req *pb.PermissionResponse) (*pb.Empty, error)

func (*Service) ResumeCron

func (s *Service) ResumeCron(ctx context.Context, req *pb.CronJobReq) (*pb.Empty, error)

func (*Service) ResumeJob

func (s *Service) ResumeJob(ctx context.Context, req *pb.JobReq) (*pb.Empty, error)

func (*Service) ResumeProject

func (s *Service) ResumeProject(ctx context.Context, req *pb.ProjectReq) (*pb.Empty, error)

func (*Service) SendMessage

func (s *Service) SendMessage(req *pb.SendMessageReq, stream pb.RatchetDaemon_SendMessageServer) error

func (*Service) SendMessageChan

func (s *Service) SendMessageChan(ctx context.Context, sessionID, content string) (<-chan *pb.ChatEvent, error)

SendMessageChan sends a message to a session and returns ChatEvents on a channel. This is used by the ACP bridge to avoid implementing the full gRPC stream interface.

func (*Service) SetDefaultProvider

func (s *Service) SetDefaultProvider(ctx context.Context, req *pb.SetDefaultProviderReq) (*pb.Empty, error)

func (*Service) SetShutdownFunc

func (s *Service) SetShutdownFunc(fn func())

SetShutdownFunc injects the cancel function that shuts down the daemon. Called by daemon main after NewService returns.

func (*Service) Shutdown

func (s *Service) Shutdown(ctx context.Context, _ *pb.Empty) (*pb.Empty, error)

func (*Service) StartFleet

func (s *Service) StartFleet(req *pb.StartFleetReq, stream pb.RatchetDaemon_StartFleetServer) error

StartFleet starts a fleet of workers for plan execution and streams status events.

func (*Service) StartProject

func (s *Service) StartProject(ctx context.Context, req *pb.StartProjectReq) (*pb.ProjectStatus, error)

func (*Service) StartTeam

func (s *Service) StartTeam(req *pb.StartTeamReq, stream pb.RatchetDaemon_StartTeamServer) error

func (*Service) SteerTeam

func (s *Service) SteerTeam(ctx context.Context, req *pb.SteerTeamReq) (*pb.Empty, error)

func (*Service) StopCron

func (s *Service) StopCron(ctx context.Context, req *pb.CronJobReq) (*pb.Empty, error)

func (*Service) TeamAddAgent

func (s *Service) TeamAddAgent(ctx context.Context, req *pb.TeamAddAgentReq) (*pb.Empty, error)

func (*Service) TeamRemoveAgent

func (s *Service) TeamRemoveAgent(ctx context.Context, req *pb.TeamRemoveAgentReq) (*pb.Empty, error)

func (*Service) TestProvider

func (s *Service) TestProvider(ctx context.Context, req *pb.TestProviderReq) (*pb.TestProviderResult, error)

func (*Service) UpdateProviderModel

func (s *Service) UpdateProviderModel(ctx context.Context, req *pb.UpdateProviderModelReq) (*pb.Empty, error)

func (*Service) UpdateTask

func (s *Service) UpdateTask(_ context.Context, req *pb.TaskUpdateReq) (*pb.TaskInfo, error)

type SessionActor

type SessionActor struct {
	// contains filtered or unexported fields
}

SessionActor maintains per-session state (working dir, active permissions) and persists messages to SQLite for rehydration across daemon restarts.

func (*SessionActor) PostStop

func (a *SessionActor) PostStop(ctx *actor.Context) error

func (*SessionActor) PreStart

func (a *SessionActor) PreStart(ctx *actor.Context) error

func (*SessionActor) Receive

func (a *SessionActor) Receive(ctx *actor.ReceiveContext)

type SessionBroadcaster

type SessionBroadcaster struct {
	// contains filtered or unexported fields
}

SessionBroadcaster fans out ChatEvents to multiple subscribers per session.

func NewSessionBroadcaster

func NewSessionBroadcaster() *SessionBroadcaster

func (*SessionBroadcaster) Publish

func (b *SessionBroadcaster) Publish(sessionID string, event *pb.ChatEvent)

Publish sends an event to all current subscribers for sessionID. Sends are non-blocking: slow subscribers are skipped without blocking the caller.

func (*SessionBroadcaster) Subscribe

func (b *SessionBroadcaster) Subscribe(sessionID string) (<-chan *pb.ChatEvent, string)

Subscribe returns a channel that receives events for the given sessionID plus a unique subscription ID that must be passed to Unsubscribe.

func (*SessionBroadcaster) Unsubscribe

func (b *SessionBroadcaster) Unsubscribe(sessionID, subID string)

Unsubscribe removes the subscription and closes the channel.
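
A minimal sketch of the non-blocking fan-out follows, assuming buffered per-subscriber channels and a select with a default case that drops events for full subscribers. Events are plain strings here instead of *pb.ChatEvent, and the buffer size and ID format are illustrative.

```go
package main

import (
	"fmt"
	"sync"
)

// broadcaster is a hypothetical stand-in for SessionBroadcaster.
type broadcaster struct {
	mu   sync.Mutex
	buf  int
	next int
	subs map[string]map[string]chan string
}

func newBroadcaster(buf int) *broadcaster {
	return &broadcaster{buf: buf, subs: make(map[string]map[string]chan string)}
}

// subscribe returns an event channel and the ID needed to unsubscribe.
func (b *broadcaster) subscribe(sessionID string) (<-chan string, string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.next++
	subID := fmt.Sprintf("sub-%d", b.next)
	if b.subs[sessionID] == nil {
		b.subs[sessionID] = make(map[string]chan string)
	}
	ch := make(chan string, b.buf)
	b.subs[sessionID][subID] = ch
	return ch, subID
}

// publish never blocks: a subscriber whose buffer is full misses the event.
func (b *broadcaster) publish(sessionID, event string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, ch := range b.subs[sessionID] {
		select {
		case ch <- event:
		default: // slow subscriber; skip it
		}
	}
}

// unsubscribe removes the subscription and closes its channel. Holding the
// same lock as publish prevents a send on a closed channel.
func (b *broadcaster) unsubscribe(sessionID, subID string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if ch, ok := b.subs[sessionID][subID]; ok {
		delete(b.subs[sessionID], subID)
		close(ch)
	}
}

func main() {
	b := newBroadcaster(1)
	ch, id := b.subscribe("s1")
	b.publish("s1", "first")
	b.publish("s1", "second") // buffer already full, so this one is skipped
	fmt.Println(<-ch)
	b.unsubscribe("s1", id)
	_, open := <-ch
	fmt.Println("channel open:", open)
}
```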

type SessionCheckpoint

type SessionCheckpoint struct {
	ID         string `json:"id"`
	Name       string `json:"name"`
	WorkingDir string `json:"working_dir"`
	Provider   string `json:"provider"`
	Model      string `json:"model"`
	Status     string `json:"status"`
}

SessionCheckpoint records an active session to resume after reload.

type SessionInfo

type SessionInfo struct {
	ID         string
	Name       string
	Status     string // active, background, completed
	WorkingDir string
	Provider   string
	Model      string
	CreatedAt  time.Time
	Agents     int
}

type SessionJobProvider

type SessionJobProvider struct {
	// contains filtered or unexported fields
}

SessionJobProvider wraps SessionManager to expose session jobs.

func NewSessionJobProvider

func NewSessionJobProvider(sm *SessionManager) *SessionJobProvider

func (*SessionJobProvider) ActiveJobs

func (p *SessionJobProvider) ActiveJobs() []*pb.Job

func (*SessionJobProvider) KillJob

func (p *SessionJobProvider) KillJob(id string) error

func (*SessionJobProvider) PauseJob

func (p *SessionJobProvider) PauseJob(id string) error

func (*SessionJobProvider) ResumeJob

func (p *SessionJobProvider) ResumeJob(id string) error

type SessionManager

type SessionManager struct {
	// contains filtered or unexported fields
}

func NewSessionManager

func NewSessionManager(db *sql.DB) *SessionManager

func (*SessionManager) CleanupStale

func (sm *SessionManager) CleanupStale(ctx context.Context, maxAge time.Duration) (int64, error)

CleanupStale marks sessions older than maxAge as completed. Called on daemon startup to prevent indefinite accumulation.

func (*SessionManager) Create

func (sm *SessionManager) Create(ctx context.Context, workingDir, provider, model, initialPrompt string) (*SessionInfo, error)

func (*SessionManager) Get

func (sm *SessionManager) Get(ctx context.Context, id string) (*SessionInfo, error)

func (*SessionManager) Kill

func (sm *SessionManager) Kill(ctx context.Context, id string) error

func (*SessionManager) List

func (sm *SessionManager) List(ctx context.Context) ([]SessionInfo, error)

func (*SessionManager) Publish

func (sm *SessionManager) Publish(sessionID string, event any)

Publish sends an event to all subscribers of a session.

func (*SessionManager) SessionsInDir

func (sm *SessionManager) SessionsInDir(dir string) []string

SessionsInDir returns the IDs of sessions whose working directory is dir.

func (*SessionManager) SetBackground

func (sm *SessionManager) SetBackground(ctx context.Context, id string) error

func (*SessionManager) Subscribe

func (sm *SessionManager) Subscribe(sessionID string) chan any

Subscribe returns a channel for receiving session events.

type SessionMessage

type SessionMessage struct {
	Role    string
	Content string
}

SessionMessage is delivered to a SessionActor to record a chat message.

type TeamJobProvider

type TeamJobProvider struct {
	// contains filtered or unexported fields
}

TeamJobProvider wraps TeamManager to expose team agent jobs.

func NewTeamJobProvider

func NewTeamJobProvider(tm *TeamManager) *TeamJobProvider

func (*TeamJobProvider) ActiveJobs

func (p *TeamJobProvider) ActiveJobs() []*pb.Job

func (*TeamJobProvider) KillJob

func (p *TeamJobProvider) KillJob(id string) error

func (*TeamJobProvider) PauseJob

func (p *TeamJobProvider) PauseJob(id string) error

func (*TeamJobProvider) ResumeJob

func (p *TeamJobProvider) ResumeJob(id string) error

type TeamManager

type TeamManager struct {
	// contains filtered or unexported fields
}

TeamManager manages team instances.

func NewTeamManager

func NewTeamManager(engine *EngineContext, hks *hooks.HookConfig) *TeamManager

NewTeamManager returns an initialized TeamManager.

func (*TeamManager) AddAgent

func (tm *TeamManager) AddAgent(teamID, agentSpec string) error

AddAgent dynamically adds an agent to a running team's mesh. This is a stub; full wiring requires access to the team's BB/Router.

func (*TeamManager) AttachTeam

func (tm *TeamManager) AttachTeam(teamID, mode string) (string, <-chan *pb.TeamActivityEvent, error)

AttachTeam registers an observer for a team and returns the event channel.

func (*TeamManager) DetachTeam

func (tm *TeamManager) DetachTeam(teamID, observerID string)

DetachTeam removes an observer by cancelling its context.

func (*TeamManager) FindAgent

func (tm *TeamManager) FindAgent(agentID string) *pb.Agent

FindAgent looks up an agent by ID across all teams.

func (*TeamManager) GetStatus

func (tm *TeamManager) GetStatus(teamID string) (*pb.TeamStatus, error)

GetStatus returns the current TeamStatus for a given team ID or name.

func (*TeamManager) KillAgent

func (tm *TeamManager) KillAgent(teamID string) error

KillAgent cancels the team that owns the given agent (team-level cancel).

func (*TeamManager) ListAllAgents

func (tm *TeamManager) ListAllAgents() []*pb.Agent

ListAllAgents aggregates agents from all active team instances.

func (*TeamManager) ListTeams

func (tm *TeamManager) ListTeams(projectID string) []*pb.TeamStatus

ListTeams returns the status of all teams. The projectID parameter is intended as an optional project filter but is unused in Phase 2.

func (*TeamManager) RemoveAgent

func (tm *TeamManager) RemoveAgent(teamID, agentName string) error

RemoveAgent dynamically removes an agent from a running team.

func (*TeamManager) Rename

func (tm *TeamManager) Rename(teamID, newName string) error

Rename assigns a user-friendly name to a team.

func (*TeamManager) StartMeshTeam

func (tm *TeamManager) StartMeshTeam(
	ctx context.Context,
	task string,
	configs []mesh.NodeConfig,
	providerFactory func(mesh.NodeConfig) provider.Provider,
) (string, <-chan *pb.TeamEvent)

StartMeshTeam creates a team via the mesh orchestrator, converts mesh Events to pb.TeamEvents, and returns a channel of events.

func (*TeamManager) StartTeam

func (tm *TeamManager) StartTeam(ctx context.Context, req *pb.StartTeamReq) (string, <-chan *pb.TeamEvent)

StartTeam creates a team, spawns default agents, and returns the team ID. Events are sent on the returned channel; it is closed when the team finishes. If req.TeamConfigName is set, the team is launched via the mesh orchestrator.
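
Example (sketch)

Since the event channel is closed when the team finishes, a caller can consume it with a range loop. The sketch below demonstrates that pattern with hypothetical stand-ins: teamEvent replaces *pb.TeamEvent, and startTeam replaces TeamManager.StartTeam with a goroutine that emits a fixed list of messages. Only the "read until closed" shape is taken from the description above.

```go
package main

import "fmt"

// teamEvent is a hypothetical stand-in for *pb.TeamEvent.
type teamEvent struct{ Msg string }

// startTeam is a hypothetical stand-in for TeamManager.StartTeam. It returns
// a team ID and a channel that is closed once all events have been sent.
func startTeam(msgs []string) (string, <-chan *teamEvent) {
	ch := make(chan *teamEvent)
	go func() {
		defer close(ch) // closing signals that the team has finished
		for _, m := range msgs {
			ch <- &teamEvent{Msg: m}
		}
	}()
	return "team-1", ch
}

// drain collects events until the channel is closed.
func drain(ch <-chan *teamEvent) []string {
	var out []string
	for ev := range ch {
		out = append(out, ev.Msg)
	}
	return out
}

func main() {
	id, events := startTeam([]string{"spawned", "working", "done"})
	fmt.Println(id, drain(events)) // team-1 [spawned working done]
}
```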

type TokenTracker

type TokenTracker struct {
	// contains filtered or unexported fields
}

TokenTracker tracks input/output token usage per session.

func NewTokenTracker

func NewTokenTracker() *TokenTracker

func (*TokenTracker) AddTokens

func (t *TokenTracker) AddTokens(sessionID string, input, output int)

AddTokens updates the running token count for a session.

func (*TokenTracker) Reset

func (t *TokenTracker) Reset(sessionID string)

Reset clears the token count for a session. It is intended to be called after the session history has been compressed.

func (*TokenTracker) ShouldCompress

func (t *TokenTracker) ShouldCompress(sessionID string, threshold float64, modelLimit int) bool

ShouldCompress reports whether the session's token total exceeds threshold × modelLimit, where threshold is a fraction of the model's limit (for example, 0.8).

func (*TokenTracker) Total

func (t *TokenTracker) Total(sessionID string) int

Total returns the combined input+output token count for a session.

type WorkerCostEntry

type WorkerCostEntry struct {
	WorkerName string
	StepID     string
	Model      string
	Complexity string
}

WorkerCostEntry holds the model assignment for a single fleet worker step.

func FleetCostBreakdown

func FleetCostBreakdown(steps []string, routing config.ModelRouting) []WorkerCostEntry

FleetCostBreakdown returns per-worker model assignments for a set of step IDs. This is the basis for estimating per-worker cost when routing to different models.
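
Example (sketch)

A per-worker breakdown of this kind can be sketched as a classify-then-route loop. Everything beyond the WorkerCostEntry fields is an assumption: the classify rule (a "plan" prefix means complex), the tier names, the map used in place of config.ModelRouting, and the worker-N naming are hypothetical and chosen only to make the example self-contained. An empty Model stands for "not configured, caller should use its default", matching the ModelForStep fallback described earlier.

```go
package main

import (
	"fmt"
	"strings"
)

// WorkerCostEntry mirrors the struct documented above.
type WorkerCostEntry struct {
	WorkerName string
	StepID     string
	Model      string
	Complexity string
}

// classify is a hypothetical stand-in for ClassifyStep.
func classify(stepID string) string {
	if strings.HasPrefix(stepID, "plan") {
		return "complex"
	}
	return "simple"
}

// breakdown is a hypothetical stand-in for FleetCostBreakdown. The routing
// map (tier -> model) replaces config.ModelRouting for illustration.
func breakdown(steps []string, routing map[string]string) []WorkerCostEntry {
	entries := make([]WorkerCostEntry, 0, len(steps))
	for i, step := range steps {
		tier := classify(step)
		entries = append(entries, WorkerCostEntry{
			WorkerName: fmt.Sprintf("worker-%d", i+1), // naming is an assumption
			StepID:     step,
			Model:      routing[tier], // empty when the tier is not configured
			Complexity: tier,
		})
	}
	return entries
}

func main() {
	routing := map[string]string{"complex": "large-model"}
	for _, e := range breakdown([]string{"plan-api", "lint"}, routing) {
		fmt.Printf("%s %s %q %s\n", e.WorkerName, e.StepID, e.Model, e.Complexity)
	}
	// worker-1 plan-api "large-model" complex
	// worker-2 lint "" simple
}
```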
