Documentation
¶
Index ¶
- type EventBus
- func (b *EventBus) CloseRun(runID string)
- func (b *EventBus) HasSubscribers(runID string) bool
- func (b *EventBus) Publish(runID string, evt agent.ReactEvent)
- func (b *EventBus) Subscribe(runID string, bufSize int) chan agent.ReactEvent
- func (b *EventBus) Unsubscribe(runID string, ch chan agent.ReactEvent)
- type Handlers
- func (h *Handlers) CancelRun(w http.ResponseWriter, r *http.Request)
- func (h *Handlers) CreateRunOnThread(w http.ResponseWriter, r *http.Request)
- func (h *Handlers) CreateRunOnThreadStream(w http.ResponseWriter, r *http.Request)
- func (h *Handlers) CreateRunOnThreadWait(w http.ResponseWriter, r *http.Request)
- func (h *Handlers) CreateStateless(w http.ResponseWriter, r *http.Request)
- func (h *Handlers) CreateStatelessStream(w http.ResponseWriter, r *http.Request)
- func (h *Handlers) CreateStatelessWait(w http.ResponseWriter, r *http.Request)
- func (h *Handlers) CreateThread(w http.ResponseWriter, r *http.Request)
- func (h *Handlers) DeleteThread(w http.ResponseWriter, r *http.Request)
- func (h *Handlers) GetRun(w http.ResponseWriter, r *http.Request)
- func (h *Handlers) GetThread(w http.ResponseWriter, r *http.Request)
- func (h *Handlers) Health(w http.ResponseWriter, r *http.Request)
- func (h *Handlers) SearchThreads(w http.ResponseWriter, r *http.Request)
- func (h *Handlers) StreamRun(w http.ResponseWriter, r *http.Request)
- func (h *Handlers) ThreadHistory(w http.ResponseWriter, r *http.Request)
- func (h *Handlers) WaitRun(w http.ResponseWriter, r *http.Request)
- type Runner
- func (r *Runner) ActiveRuns() int
- func (r *Runner) Bus() *EventBus
- func (r *Runner) CancelRun(runID string) bool
- func (r *Runner) Config() RunnerConfig
- func (r *Runner) Shutdown(ctx context.Context) error
- func (r *Runner) StartRun(ctx context.Context, threadID, content string, metadata map[string]interface{}) (*protocol.Run, error)
- type RunnerConfig
- type Server
- type ServerConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type EventBus ¶
type EventBus struct {
// contains filtered or unexported fields
}
EventBus provides run-scoped pub/sub for SSE streaming. External apps can use it directly or replace it with their own implementation.
func (*EventBus) HasSubscribers ¶
func (b *EventBus) HasSubscribers(runID string) bool
HasSubscribers returns true if any subscriber is listening for events on the given run.
func (*EventBus) Publish ¶
func (b *EventBus) Publish(runID string, evt agent.ReactEvent)
Publish sends an event to all subscribers of a run.
func (*EventBus) Subscribe ¶
func (b *EventBus) Subscribe(runID string, bufSize int) chan agent.ReactEvent
Subscribe returns a channel that receives events for the given run.
func (*EventBus) Unsubscribe ¶
func (b *EventBus) Unsubscribe(runID string, ch chan agent.ReactEvent)
Unsubscribe removes the subscriber channel ch from the given run.
type Handlers ¶
type Handlers struct {
// contains filtered or unexported fields
}
Handlers holds the HTTP handlers for Agent Protocol endpoints. All handlers use agent.App and agent.Store — no internal/ dependencies.
func (*Handlers) CancelRun ¶
func (h *Handlers) CancelRun(w http.ResponseWriter, r *http.Request)
POST /runs/{run_id}/cancel
func (*Handlers) CreateRunOnThread ¶
func (h *Handlers) CreateRunOnThread(w http.ResponseWriter, r *http.Request)
POST /threads/{thread_id}/runs
func (*Handlers) CreateRunOnThreadStream ¶
func (h *Handlers) CreateRunOnThreadStream(w http.ResponseWriter, r *http.Request)
POST /threads/{thread_id}/runs/stream
func (*Handlers) CreateRunOnThreadWait ¶
func (h *Handlers) CreateRunOnThreadWait(w http.ResponseWriter, r *http.Request)
POST /threads/{thread_id}/runs/wait
func (*Handlers) CreateStateless ¶
func (h *Handlers) CreateStateless(w http.ResponseWriter, r *http.Request)
POST /runs — stateless background
func (*Handlers) CreateStatelessStream ¶
func (h *Handlers) CreateStatelessStream(w http.ResponseWriter, r *http.Request)
POST /runs/stream — stateless (auto-create thread)
func (*Handlers) CreateStatelessWait ¶
func (h *Handlers) CreateStatelessWait(w http.ResponseWriter, r *http.Request)
POST /runs/wait — stateless wait
func (*Handlers) CreateThread ¶
func (h *Handlers) CreateThread(w http.ResponseWriter, r *http.Request)
POST /threads
func (*Handlers) DeleteThread ¶
func (h *Handlers) DeleteThread(w http.ResponseWriter, r *http.Request)
DELETE /threads/{thread_id}
func (*Handlers) GetRun ¶
func (h *Handlers) GetRun(w http.ResponseWriter, r *http.Request)
GET /runs/{run_id}
func (*Handlers) GetThread ¶
func (h *Handlers) GetThread(w http.ResponseWriter, r *http.Request)
GET /threads/{thread_id}
func (*Handlers) SearchThreads ¶
func (h *Handlers) SearchThreads(w http.ResponseWriter, r *http.Request)
POST /threads/search
func (*Handlers) StreamRun ¶
func (h *Handlers) StreamRun(w http.ResponseWriter, r *http.Request)
GET /runs/{run_id}/stream — reconnect to SSE
func (*Handlers) ThreadHistory ¶
func (h *Handlers) ThreadHistory(w http.ResponseWriter, r *http.Request)
GET /threads/{thread_id}/history
type Runner ¶
type Runner struct {
// contains filtered or unexported fields
}
Runner executes agent tasks in background goroutines, decoupled from HTTP. Features:
- Semaphore-based concurrency limit (configurable)
- WaitGroup for graceful shutdown
- Panic recovery in all goroutines
- Configurable run timeout
- Cancel individual runs or all on shutdown
func NewRunner ¶
func NewRunner(app *agent.App, bus *EventBus, cfgs ...RunnerConfig) *Runner
NewRunner creates a runner from an App, EventBus, and optional config.
func (*Runner) ActiveRuns ¶
func (r *Runner) ActiveRuns() int
ActiveRuns returns the number of currently executing runs.
func (*Runner) Config ¶
func (r *Runner) Config() RunnerConfig
Config returns the runner configuration.
func (*Runner) Shutdown ¶
func (r *Runner) Shutdown(ctx context.Context) error
Shutdown cancels all running tasks and waits for them to finish. It is safe to call multiple times.
func (*Runner) StartRun ¶
func (r *Runner) StartRun(ctx context.Context, threadID, content string, metadata map[string]interface{}) (*protocol.Run, error)
StartRun creates a Run record and executes App.SendWithEvents in a background goroutine. It returns the Run immediately; the task keeps running independently of the caller. It returns an error if the runner is shutting down or the semaphore can't be acquired.
type RunnerConfig ¶
type RunnerConfig struct {
// MaxConcurrent is the maximum number of runs executing simultaneously.
// 0 = unlimited (not recommended for production). Default: 100.
MaxConcurrent int
// RunTimeout is the max duration for a single run. Default: 5 minutes.
RunTimeout time.Duration
// ShutdownTimeout is how long Shutdown() waits for active runs. Default: 30s.
ShutdownTimeout time.Duration
}
RunnerConfig configures the task runner.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is the Agent Protocol HTTP server. It wraps an agent.App with a Runner, EventBus, and HTTP handlers.
func New ¶
func New(cfg ServerConfig) (*Server, error)
New creates a new Agent Protocol server. Call Start() to begin listening, or use Router() to mount on your own server.
func (*Server) Router ¶
Router returns the http.Handler for mounting on your own server. Use this when you want to serve the routes yourself instead of calling Start().
type ServerConfig ¶
type ServerConfig struct {
// App is the agent application (required).
App *agent.App
// Port for the HTTP server (default: "8080").
Port string
// Router lets you bring your own chi.Router.
// If nil, a default router with standard middleware is created.
// Use this to mount Agent Protocol routes alongside your own routes.
Router chi.Router
// Middleware adds additional HTTP middleware to the default router.
// Ignored if Router is provided (add middleware to your own router).
Middleware []func(http.Handler) http.Handler
// SSEFormatter customizes how ReactEvents are formatted for SSE.
// Default: event type as SSE event name, JSON-marshaled event as data.
// Return eventType and data bytes. Return empty eventType to skip the event.
SSEFormatter func(evt agent.ReactEvent) (eventType string, data []byte)
// OnBeforeRun is called before a run starts. Return error to reject.
OnBeforeRun func(ctx context.Context, threadID, input string) error
// OnAfterRun is called after a run completes (success or error).
OnAfterRun func(ctx context.Context, threadID string, result *agent.ReactResult, err error)
// CORSOrigin sets the Access-Control-Allow-Origin header (default: "*").
CORSOrigin string
// ReadTimeout for the HTTP server (default: 30s).
ReadTimeout time.Duration
// WriteTimeout for the HTTP server (default: 300s).
WriteTimeout time.Duration
// Runner configures the background task runner (concurrency, timeouts).
Runner RunnerConfig
}
ServerConfig configures the Agent Protocol HTTP server. All fields except App are optional — sensible defaults are used for any that are left unset.