Documentation
¶
Overview ¶
internal/codexappgateway/scheduler/agentserver_client.go
internal/codexappgateway/scheduler/broadcast.go
internal/codexappgateway/scheduler/dispatcher.go
internal/codexappgateway/scheduler/loop.go
internal/codexappgateway/scheduler/script.go
internal/codexappgateway/scheduler/spawn.go
Index ¶
- func RunPreScript(ctx context.Context, script string, env []string) (wake bool, data json.RawMessage, err error)
- type AgentserverClient
- type BroadcastReport
- type Broadcaster
- type ChannelRef
- type Config
- type Dispatcher
- type LeaseRequest
- type Loop
- type ResultRequest
- type SpawnInput
- type SpawnResult
- type Spawner
- type Task
- type WorkspaceTokenFetcher
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func RunPreScript ¶
func RunPreScript(ctx context.Context, script string, env []string) (wake bool, data json.RawMessage, err error)
RunPreScript runs a user-supplied bash script and parses its stdout as {wakeAgent: bool, data: any}. Mirrors nanoclaw's pre-task script protocol.
Types ¶
type AgentserverClient ¶
type AgentserverClient struct {
// contains filtered or unexported fields
}
func NewAgentserverClient ¶
func NewAgentserverClient(base, secret, pod string, pid int) *AgentserverClient
func (*AgentserverClient) LeaseDue ¶
func (c *AgentserverClient) LeaseDue(ctx context.Context, req LeaseRequest) ([]Task, error)
func (*AgentserverClient) ListChannels ¶
func (c *AgentserverClient) ListChannels(ctx context.Context, workspaceID string) ([]ChannelRef, error)
ListChannels fetches the IM channels for a workspace so the dispatcher can fan-out broadcast results. Called by Dispatcher.report.
func (*AgentserverClient) PostResult ¶
func (c *AgentserverClient) PostResult(ctx context.Context, req ResultRequest) error
type BroadcastReport ¶
type Broadcaster ¶
type Broadcaster struct {
// contains filtered or unexported fields
}
func NewBroadcaster ¶
func NewBroadcaster(base, secret string) *Broadcaster
func (*Broadcaster) Send ¶
func (b *Broadcaster) Send(ctx context.Context, workspaceID, text string, channels []ChannelRef) BroadcastReport
Send fan-outs `text` to every channel, accumulating per-channel errors without aborting the rest. Returns a report identifying which channels were attempted and which failed. `workspaceID` is informational/audit.
type ChannelRef ¶
type ChannelRef struct {
ID string `json:"id"` // workspace_im_channels.id
UserID string `json:"userId"` // the IM-side user to message
}
ChannelRef is the minimal info needed to send to one workspace_im_channels row via imbridge /api/internal/imbridge/send.
type Config ¶
type Config struct {
AgentserverBase string
InternalSecret string
ImbridgeBase string
ImbridgeSecret string
CodexBin string
PodID string
PID int
TickInterval time.Duration
LeaseSeconds int
Concurrency int
Tokens WorkspaceTokenFetcher // workspace Bearer fetcher; nil disables credential injection
ModelEnvKey string // env var name to set with the token (e.g. "CODEX_API_KEY")
}
Config holds the runtime parameters for the scheduler Loop.
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
Dispatcher implements the fire pipeline: script gate → spawn → report → broadcast.
func NewDispatcher ¶
func NewDispatcher(a agentClient, sp spawner, br broadcaster, tokens WorkspaceTokenFetcher, modelEnvKey string) *Dispatcher
NewDispatcher constructs a Dispatcher from its collaborators. tokens and modelEnvKey are optional: when tokens is nil, codexEnv skips credential injection (useful in tests).
type LeaseRequest ¶
type Loop ¶
type Loop struct {
// contains filtered or unexported fields
}
Loop polls agentserver-main for due tasks and dispatches them concurrently. Multi-replica safe: the lease uses FOR UPDATE SKIP LOCKED.
type ResultRequest ¶
type ResultRequest struct {
TaskID string `json:"taskId"`
RunID string `json:"runId"`
Status string `json:"status"`
ExitCode int `json:"exitCode"`
DurationMS int64 `json:"durationMs"`
Summary string `json:"summary"`
TranscriptURI string `json:"transcriptUri"`
CostUSD *float64 `json:"costUsd,omitempty"`
NumTurns *int `json:"numTurns,omitempty"`
BroadcastTo []string `json:"broadcastTo"`
BroadcastErrors json.RawMessage `json:"broadcastErrors"`
}
type SpawnResult ¶
type Spawner ¶
type Spawner struct {
// contains filtered or unexported fields
}
func NewSpawner ¶
func (*Spawner) Run ¶
func (s *Spawner) Run(ctx context.Context, in SpawnInput) (SpawnResult, error)
type Task ¶
type Task struct {
ID string `json:"id"`
WorkspaceID string `json:"workspaceId"`
SeriesID string `json:"seriesId"`
Prompt string `json:"prompt"`
Script *string `json:"script,omitempty"`
Timezone string `json:"timezone"`
Recurrence *string `json:"recurrence,omitempty"`
ProcessAfter string `json:"processAfter"`
TimeoutSeconds int `json:"timeoutSeconds"`
RunID string `json:"runId,omitempty"` // populated server-side
}
type WorkspaceTokenFetcher ¶
type WorkspaceTokenFetcher interface {
GetOrCreate(ctx context.Context, workspaceID string) (string, error)
}
WorkspaceTokenFetcher resolves a workspace's Bearer token (the same one the live-spawn path uses). Implementations are in the codexappgateway package; the scheduler holds only the interface to keep this package importable from tests without that dependency.