scheduler

package
v0.64.19 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 23, 2026 License: MIT Imports: 15 Imported by: 0

Documentation

Overview

internal/codexappgateway/scheduler/agentserver_client.go

internal/codexappgateway/scheduler/broadcast.go

internal/codexappgateway/scheduler/dispatcher.go

internal/codexappgateway/scheduler/loop.go

internal/codexappgateway/scheduler/script.go

internal/codexappgateway/scheduler/spawn.go

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func RunPreScript

func RunPreScript(ctx context.Context, script string, env []string) (wake bool, data json.RawMessage, err error)

RunPreScript runs a user-supplied bash script and parses its stdout as {wakeAgent: bool, data: any}. Mirrors nanoclaw's pre-task script protocol.

Types

type AgentserverClient

type AgentserverClient struct {
	// contains filtered or unexported fields
}

func NewAgentserverClient

func NewAgentserverClient(base, secret, pod string, pid int) *AgentserverClient

func (*AgentserverClient) LeaseDue

func (c *AgentserverClient) LeaseDue(ctx context.Context, req LeaseRequest) ([]Task, error)

func (*AgentserverClient) ListChannels

func (c *AgentserverClient) ListChannels(ctx context.Context, workspaceID string) ([]ChannelRef, error)

ListChannels fetches the IM channels for a workspace so the dispatcher can fan-out broadcast results. Called by Dispatcher.report.

func (*AgentserverClient) PostResult

func (c *AgentserverClient) PostResult(ctx context.Context, req ResultRequest) error

type BroadcastReport

type BroadcastReport struct {
	To     []string          // channel ids attempted
	Errors map[string]string // channel id → error text
}

type Broadcaster

type Broadcaster struct {
	// contains filtered or unexported fields
}

func NewBroadcaster

func NewBroadcaster(base, secret string) *Broadcaster

func (*Broadcaster) Send

func (b *Broadcaster) Send(ctx context.Context, workspaceID, text string, channels []ChannelRef) BroadcastReport

Send fan-outs `text` to every channel, accumulating per-channel errors without aborting the rest. Returns a report identifying which channels were attempted and which failed. `workspaceID` is informational/audit.

type ChannelRef

type ChannelRef struct {
	ID     string `json:"id"`     // workspace_im_channels.id
	UserID string `json:"userId"` // the IM-side user to message
}

ChannelRef is the minimal info needed to send to one workspace_im_channels row via imbridge /api/internal/imbridge/send.

type Config

type Config struct {
	AgentserverBase string
	InternalSecret  string
	ImbridgeBase    string
	ImbridgeSecret  string
	CodexBin        string
	PodID           string
	PID             int
	TickInterval    time.Duration
	LeaseSeconds    int
	Concurrency     int
	Tokens          WorkspaceTokenFetcher // workspace Bearer fetcher; nil disables credential injection
	ModelEnvKey     string                // env var name to set with the token (e.g. "CODEX_API_KEY")
}

Config holds the runtime parameters for the scheduler Loop.

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher implements the fire pipeline: script gate → spawn → report → broadcast.

func NewDispatcher

func NewDispatcher(a agentClient, sp spawner, br broadcaster, tokens WorkspaceTokenFetcher, modelEnvKey string) *Dispatcher

NewDispatcher constructs a Dispatcher from its collaborators. tokens and modelEnvKey are optional: when tokens is nil, codexEnv skips credential injection (useful in tests).

func (*Dispatcher) Fire

func (d *Dispatcher) Fire(ctx context.Context, t Task) error

Fire executes the full scheduled-task fire pipeline for a single Task.

type LeaseRequest

type LeaseRequest struct {
	Limit        int    `json:"limit"`
	LeaseSeconds int    `json:"leaseSeconds"`
	Owner        string `json:"owner"`
}

type Loop

type Loop struct {
	// contains filtered or unexported fields
}

Loop polls agentserver-main for due tasks and dispatches them concurrently. Multi-replica safe: the lease uses FOR UPDATE SKIP LOCKED.

func New

func New(cfg Config, logger *slog.Logger) *Loop

New constructs a Loop, wiring together AgentserverClient, Spawner, and Broadcaster from cfg. Defaults are applied for zero-valued numeric fields.

func (*Loop) Run

func (l *Loop) Run(ctx context.Context)

Run blocks until ctx is cancelled, ticking the lease+dispatch loop each TickInterval. Each due task is dispatched in its own goroutine, bounded by Concurrency.

type ResultRequest

type ResultRequest struct {
	TaskID          string          `json:"taskId"`
	RunID           string          `json:"runId"`
	Status          string          `json:"status"`
	ExitCode        int             `json:"exitCode"`
	DurationMS      int64           `json:"durationMs"`
	Summary         string          `json:"summary"`
	TranscriptURI   string          `json:"transcriptUri"`
	CostUSD         *float64        `json:"costUsd,omitempty"`
	NumTurns        *int            `json:"numTurns,omitempty"`
	BroadcastTo     []string        `json:"broadcastTo"`
	BroadcastErrors json.RawMessage `json:"broadcastErrors"`
}

type SpawnInput

type SpawnInput struct {
	Prompt  string
	Env     []string
	Timeout time.Duration
}

type SpawnResult

type SpawnResult struct {
	ExitCode   int
	Transcript string // full captured stdout (truncated at 256 KiB)
	Summary    string // last non-empty line, truncated to 4 KiB
	CostUSD    *float64
	NumTurns   *int
	TimedOut   bool
	DurationMS int64
}

type Spawner

type Spawner struct {
	// contains filtered or unexported fields
}

func NewSpawner

func NewSpawner(bin string, extraEnv []string) *Spawner

func (*Spawner) Run

func (s *Spawner) Run(ctx context.Context, in SpawnInput) (SpawnResult, error)

type Task

type Task struct {
	ID             string  `json:"id"`
	WorkspaceID    string  `json:"workspaceId"`
	SeriesID       string  `json:"seriesId"`
	Prompt         string  `json:"prompt"`
	Script         *string `json:"script,omitempty"`
	Timezone       string  `json:"timezone"`
	Recurrence     *string `json:"recurrence,omitempty"`
	ProcessAfter   string  `json:"processAfter"`
	TimeoutSeconds int     `json:"timeoutSeconds"`
	RunID          string  `json:"runId,omitempty"` // populated server-side
}

type WorkspaceTokenFetcher

type WorkspaceTokenFetcher interface {
	GetOrCreate(ctx context.Context, workspaceID string) (string, error)
}

WorkspaceTokenFetcher resolves a workspace's Bearer token (the same one the live-spawn path uses). Implementations are in the codexappgateway package; the scheduler holds only the interface to keep this package importable from tests without that dependency.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL