server

package
Version: v0.0.1
Published: Feb 11, 2026 License: MIT Imports: 12 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type EventBus

type EventBus struct {
	// contains filtered or unexported fields
}

EventBus provides run-scoped pub/sub for SSE streaming. External apps can use it directly or replace it with their own implementation.

func NewEventBus

func NewEventBus() *EventBus

NewEventBus creates a new event bus.

func (*EventBus) CloseRun

func (b *EventBus) CloseRun(runID string)

CloseRun closes all subscriber channels for a run.

func (*EventBus) HasSubscribers

func (b *EventBus) HasSubscribers(runID string) bool

HasSubscribers reports whether any subscriber is listening for the given run.

func (*EventBus) Publish

func (b *EventBus) Publish(runID string, evt agent.ReactEvent)

Publish sends an event to all subscribers of a run.

func (*EventBus) Subscribe

func (b *EventBus) Subscribe(runID string, bufSize int) chan agent.ReactEvent

Subscribe returns a channel that receives events for the given run.

func (*EventBus) Unsubscribe

func (b *EventBus) Unsubscribe(runID string, ch chan agent.ReactEvent)

Unsubscribe removes a subscriber.
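The method set above follows a common run-scoped fan-out pattern. The following is a minimal sketch of that pattern, not this package's implementation: `miniBus` is a hypothetical stand-in, events are plain strings instead of agent.ReactEvent, and dropping events when a subscriber's buffer is full is an assumption of the sketch (the documentation does not say what Publish does in that case).

```go
package main

import (
	"fmt"
	"sync"
)

// miniBus is a hypothetical stand-in for EventBus. Events are plain strings
// here instead of agent.ReactEvent.
type miniBus struct {
	mu   sync.Mutex
	subs map[string][]chan string // run ID -> subscriber channels
}

func newMiniBus() *miniBus {
	return &miniBus{subs: make(map[string][]chan string)}
}

// Subscribe registers a buffered channel for one run and returns it.
func (b *miniBus) Subscribe(runID string, bufSize int) chan string {
	ch := make(chan string, bufSize)
	b.mu.Lock()
	b.subs[runID] = append(b.subs[runID], ch)
	b.mu.Unlock()
	return ch
}

// Publish delivers evt to every subscriber of runID. A subscriber whose
// buffer is full misses the event, so the publisher never blocks
// (an assumption of this sketch).
func (b *miniBus) Publish(runID, evt string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, ch := range b.subs[runID] {
		select {
		case ch <- evt:
		default:
		}
	}
}

// HasSubscribers reports whether runID has at least one subscriber.
func (b *miniBus) HasSubscribers(runID string) bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	return len(b.subs[runID]) > 0
}

// CloseRun closes and forgets all subscriber channels of runID.
func (b *miniBus) CloseRun(runID string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, ch := range b.subs[runID] {
		close(ch)
	}
	delete(b.subs, runID)
}

func main() {
	bus := newMiniBus()
	ch := bus.Subscribe("run-1", 4)
	bus.Publish("run-1", "step")
	bus.Publish("run-2", "ignored") // no subscribers for run-2
	bus.CloseRun("run-1")
	for evt := range ch { // drains buffered events, then ends on close
		fmt.Println(evt)
	}
}
```

Closing the channels in CloseRun lets a consumer use a plain `for range` loop and exit when the run ends, which fits an SSE handler that should return once the stream is complete.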

type Handlers

type Handlers struct {
	// contains filtered or unexported fields
}

Handlers holds the HTTP handlers for the Agent Protocol endpoints. All handlers depend only on agent.App and agent.Store, with no internal/ dependencies.

func (*Handlers) CancelRun

func (h *Handlers) CancelRun(w http.ResponseWriter, r *http.Request)

POST /runs/{run_id}/cancel

func (*Handlers) CreateRunOnThread

func (h *Handlers) CreateRunOnThread(w http.ResponseWriter, r *http.Request)

POST /threads/{thread_id}/runs

func (*Handlers) CreateRunOnThreadStream

func (h *Handlers) CreateRunOnThreadStream(w http.ResponseWriter, r *http.Request)

POST /threads/{thread_id}/runs/stream

func (*Handlers) CreateRunOnThreadWait

func (h *Handlers) CreateRunOnThreadWait(w http.ResponseWriter, r *http.Request)

POST /threads/{thread_id}/runs/wait

func (*Handlers) CreateStateless

func (h *Handlers) CreateStateless(w http.ResponseWriter, r *http.Request)

POST /runs: stateless run in the background

func (*Handlers) CreateStatelessStream

func (h *Handlers) CreateStatelessStream(w http.ResponseWriter, r *http.Request)

POST /runs/stream: stateless streaming run (a thread is created automatically)

func (*Handlers) CreateStatelessWait

func (h *Handlers) CreateStatelessWait(w http.ResponseWriter, r *http.Request)

POST /runs/wait: stateless run that waits for completion

func (*Handlers) CreateThread

func (h *Handlers) CreateThread(w http.ResponseWriter, r *http.Request)

POST /threads

func (*Handlers) DeleteThread

func (h *Handlers) DeleteThread(w http.ResponseWriter, r *http.Request)

DELETE /threads/{thread_id}

func (*Handlers) GetRun

func (h *Handlers) GetRun(w http.ResponseWriter, r *http.Request)

GET /runs/{run_id}

func (*Handlers) GetThread

func (h *Handlers) GetThread(w http.ResponseWriter, r *http.Request)

GET /threads/{thread_id}

func (*Handlers) Health

func (h *Handlers) Health(w http.ResponseWriter, r *http.Request)

func (*Handlers) SearchThreads

func (h *Handlers) SearchThreads(w http.ResponseWriter, r *http.Request)

POST /threads/search

func (*Handlers) StreamRun

func (h *Handlers) StreamRun(w http.ResponseWriter, r *http.Request)

GET /runs/{run_id}/stream: reconnect to the SSE stream of a run

func (*Handlers) ThreadHistory

func (h *Handlers) ThreadHistory(w http.ResponseWriter, r *http.Request)

GET /threads/{thread_id}/history

func (*Handlers) WaitRun

func (h *Handlers) WaitRun(w http.ResponseWriter, r *http.Request)

GET /runs/{run_id}/wait

type Runner

type Runner struct {
	// contains filtered or unexported fields
}

Runner executes agent tasks in background goroutines, decoupled from HTTP. Features:

  • Semaphore-based concurrency limit (configurable)
  • WaitGroup for graceful shutdown
  • Panic recovery in all goroutines
  • Configurable run timeout
  • Cancel individual runs or all on shutdown
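The features listed above combine several standard Go idioms. The sketch below shows one way they can fit together; `miniRunner`, `errBusy`, and `demo` are hypothetical names, and failing fast when no slot is free is an assumption rather than documented behavior of Runner.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

var errBusy = errors.New("runner at capacity")

// miniRunner is a hypothetical stand-in for Runner.
type miniRunner struct {
	sem     chan struct{}  // counting semaphore; capacity = max concurrent runs
	wg      sync.WaitGroup // tracks in-flight tasks for graceful shutdown
	timeout time.Duration  // per-run timeout
}

// newMiniRunner needs maxConcurrent >= 1. Unlike the documented RunnerConfig,
// this sketch does not treat 0 as "unlimited".
func newMiniRunner(maxConcurrent int, timeout time.Duration) *miniRunner {
	return &miniRunner{sem: make(chan struct{}, maxConcurrent), timeout: timeout}
}

// start launches task in a background goroutine, or fails fast when no
// semaphore slot is free.
func (r *miniRunner) start(task func(ctx context.Context)) error {
	select {
	case r.sem <- struct{}{}: // acquire a slot
	default:
		return errBusy
	}
	r.wg.Add(1)
	go func() {
		defer r.wg.Done()
		defer func() { <-r.sem }() // release the slot
		defer func() {
			if p := recover(); p != nil {
				fmt.Println("recovered:", p)
			}
		}()
		ctx, cancel := context.WithTimeout(context.Background(), r.timeout)
		defer cancel()
		task(ctx)
	}()
	return nil
}

// wait blocks until every started task has returned.
func (r *miniRunner) wait() { r.wg.Wait() }

// demo returns the error from an over-capacity start and whether the slot
// was usable again after a task panicked.
func demo() (overCapacity error, slotFreed bool) {
	r := newMiniRunner(1, time.Second)
	release := make(chan struct{})
	_ = r.start(func(ctx context.Context) { <-release })
	overCapacity = r.start(func(ctx context.Context) {})
	close(release)
	r.wait()
	slotFreed = r.start(func(ctx context.Context) { panic("boom") }) == nil
	r.wait()
	return overCapacity, slotFreed
}

func main() {
	fmt.Println(demo())
}
```

The deferred calls run in reverse order, so the panic is recovered and the slot released before the WaitGroup is decremented. Anything waiting on the WaitGroup therefore sees a consistent semaphore.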

func NewRunner

func NewRunner(app *agent.App, bus *EventBus, cfgs ...RunnerConfig) *Runner

NewRunner creates a runner from an App, EventBus, and optional config.
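The variadic `cfgs ...RunnerConfig` parameter is a common way to make a config argument optional. A minimal sketch of how such a parameter can be resolved follows. `resolveConfig` and `runnerConfig` are hypothetical, the default values are copied from the RunnerConfig field comments, and the handling of zero values when a config is passed is an assumption, since the documentation does not spell it out.

```go
package main

import (
	"fmt"
	"time"
)

// runnerConfig mirrors the documented RunnerConfig fields.
type runnerConfig struct {
	MaxConcurrent   int
	RunTimeout      time.Duration
	ShutdownTimeout time.Duration
}

// resolveConfig is a hypothetical helper: with no argument it returns the
// documented defaults; otherwise it takes the first config and fills in
// unset timeouts.
func resolveConfig(cfgs ...runnerConfig) runnerConfig {
	cfg := runnerConfig{
		MaxConcurrent:   100,
		RunTimeout:      5 * time.Minute,
		ShutdownTimeout: 30 * time.Second,
	}
	if len(cfgs) == 0 {
		return cfg
	}
	in := cfgs[0]
	// Assumption: an explicit 0 is kept because the field comment documents
	// 0 as "unlimited".
	cfg.MaxConcurrent = in.MaxConcurrent
	if in.RunTimeout > 0 {
		cfg.RunTimeout = in.RunTimeout
	}
	if in.ShutdownTimeout > 0 {
		cfg.ShutdownTimeout = in.ShutdownTimeout
	}
	return cfg
}

func main() {
	fmt.Printf("%+v\n", resolveConfig())
	fmt.Printf("%+v\n", resolveConfig(runnerConfig{MaxConcurrent: 8}))
}
```

With this shape, a call such as `NewRunner(app, bus)` keeps working when new config fields are added, and callers that need tuning pass a single struct.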

func (*Runner) ActiveRuns

func (r *Runner) ActiveRuns() int

ActiveRuns returns the number of currently executing runs.

func (*Runner) Bus

func (r *Runner) Bus() *EventBus

Bus returns the event bus.

func (*Runner) CancelRun

func (r *Runner) CancelRun(runID string) bool

CancelRun cancels a single running task.
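Cancelling one run by ID is usually built on a registry of context cancel functions. The sketch below illustrates the idea under stated assumptions: `cancelRegistry` is hypothetical, and interpreting the bool result as "a run with this ID was found" is a guess, because the documentation does not define the return value.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// cancelRegistry is a hypothetical structure that keeps one cancel function
// per active run.
type cancelRegistry struct {
	mu      sync.Mutex
	cancels map[string]context.CancelFunc
}

func newCancelRegistry() *cancelRegistry {
	return &cancelRegistry{cancels: make(map[string]context.CancelFunc)}
}

// track creates a cancellable context for runID and remembers how to cancel it.
func (c *cancelRegistry) track(runID string) context.Context {
	ctx, cancel := context.WithCancel(context.Background())
	c.mu.Lock()
	c.cancels[runID] = cancel
	c.mu.Unlock()
	return ctx
}

// cancel stops runID if it is tracked and reports whether it was found.
func (c *cancelRegistry) cancel(runID string) bool {
	c.mu.Lock()
	fn, ok := c.cancels[runID]
	delete(c.cancels, runID)
	c.mu.Unlock()
	if ok {
		fn()
	}
	return ok
}

// cancelAll stops every tracked run and returns how many were cancelled.
func (c *cancelRegistry) cancelAll() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	n := len(c.cancels)
	for id, fn := range c.cancels {
		fn()
		delete(c.cancels, id)
	}
	return n
}

func main() {
	reg := newCancelRegistry()
	ctx := reg.track("run-1")
	fmt.Println(reg.cancel("run-1"), ctx.Err()) // true context canceled
	fmt.Println(reg.cancel("run-1"))            // false: already removed
}
```

The same registry supports the "all on shutdown" case from the feature list through `cancelAll`.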

func (*Runner) Config

func (r *Runner) Config() RunnerConfig

Config returns the runner configuration.

func (*Runner) Shutdown

func (r *Runner) Shutdown(ctx context.Context) error

Shutdown cancels all running tasks and waits for them to finish. Safe to call multiple times.

func (*Runner) StartRun

func (r *Runner) StartRun(ctx context.Context, threadID, content string, metadata map[string]interface{}) (*protocol.Run, error)

StartRun creates a Run record and executes App.SendWithEvents in a background goroutine. It returns the Run immediately; the task keeps running regardless of what the caller does afterwards. It returns an error if the runner is shutting down or the semaphore cannot be acquired.

type RunnerConfig

type RunnerConfig struct {
	// MaxConcurrent is the maximum number of runs executing simultaneously.
	// 0 = unlimited (not recommended for production). Default: 100.
	MaxConcurrent int

	// RunTimeout is the max duration for a single run. Default: 5 minutes.
	RunTimeout time.Duration

	// ShutdownTimeout is how long Shutdown() waits for active runs. Default: 30s.
	ShutdownTimeout time.Duration
}

RunnerConfig configures the task runner.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is the Agent Protocol HTTP server. It wraps an agent.App with a Runner, EventBus, and HTTP handlers.

func New

func New(cfg ServerConfig) (*Server, error)

New creates a new Agent Protocol server. Call Start() to begin listening, or use Router() to mount the routes on your own server.

func (*Server) Bus

func (s *Server) Bus() *EventBus

Bus returns the event bus for direct SSE subscription.

func (*Server) Handlers

func (s *Server) Handlers() *Handlers

Handlers returns the HTTP handlers so that individual routes can be overridden.

func (*Server) Router

func (s *Server) Router() http.Handler

Router returns the http.Handler for mounting on your own server. Use this if you don't want to call Start() but want to serve the routes yourself.
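Because Router returns a plain http.Handler, it can be mounted with standard library tools. The sketch below uses a stand-in handler in place of the value returned by Router(); `agentRoutes`, `newAppMux`, the `/agent` prefix, and the `/version` route are all hypothetical and only illustrate the mounting step.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// agentRoutes stands in for the http.Handler returned by Server.Router().
func agentRoutes() http.Handler {
	mux := http.NewServeMux()
	mux.HandleFunc("/threads", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "threads")
	})
	return mux
}

// newAppMux mounts the agent routes under /agent next to an app-specific route.
func newAppMux(agent http.Handler) *http.ServeMux {
	mux := http.NewServeMux()
	// StripPrefix removes "/agent" so the mounted handler sees "/threads".
	mux.Handle("/agent/", http.StripPrefix("/agent", agent))
	mux.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "v1")
	})
	return mux
}

// get performs an in-memory GET and returns the status code and body.
func get(h http.Handler, path string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	mux := newAppMux(agentRoutes())
	fmt.Println(get(mux, "/agent/threads")) // 200 threads
	fmt.Println(get(mux, "/version"))       // 200 v1
}
```

In a real program the mux would be passed to `http.ListenAndServe` or an `http.Server`; the in-memory recorder is used here only to keep the example self-contained.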

func (*Server) Runner

func (s *Server) Runner() *Runner

Runner returns the task runner for direct access.

func (*Server) Shutdown

func (s *Server) Shutdown(ctx context.Context) error

Shutdown gracefully shuts down the server. It first stops accepting new HTTP requests, then cancels all active runs and waits for them to finish.
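The two-phase order described above can be expressed with the standard library alone. This is a sketch of the ordering, not the package's code: `shutdownInOrder` and `demo` are hypothetical, and a single shared cancel function stands in for whatever the Runner uses internally.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"sync"
	"time"
)

// shutdownInOrder first stops the HTTP server, then cancels background runs
// and waits for them until ctx expires.
func shutdownInOrder(ctx context.Context, srv *http.Server, cancelRuns context.CancelFunc, runs *sync.WaitGroup) error {
	// Phase 1: stop accepting requests and let in-flight ones finish.
	if err := srv.Shutdown(ctx); err != nil {
		return err
	}
	// Phase 2: signal every run to stop, then wait with a deadline.
	cancelRuns()
	done := make(chan struct{})
	go func() {
		runs.Wait()
		close(done)
	}()
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demo shuts down a never-started server with one background run that exits
// on cancellation. It reports whether the run stopped, plus any error.
func demo() (bool, error) {
	srv := &http.Server{} // never started, so Shutdown has nothing to wait for
	runCtx, cancelRuns := context.WithCancel(context.Background())
	var runs sync.WaitGroup
	stopped := false
	runs.Add(1)
	go func() {
		defer runs.Done()
		<-runCtx.Done()
		stopped = true
	}()
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	err := shutdownInOrder(ctx, srv, cancelRuns, &runs)
	return stopped, err
}

func main() {
	fmt.Println(demo())
}
```

Stopping the listener first means no new run can be started through HTTP while existing runs are being cancelled.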

func (*Server) Start

func (s *Server) Start() error

Start begins listening on the configured port.

type ServerConfig

type ServerConfig struct {
	// App is the agent application (required).
	App *agent.App

	// Port for the HTTP server (default: "8080").
	Port string

	// Router lets you bring your own chi.Router.
	// If nil, a default router with standard middleware is created.
	// Use this to mount Agent Protocol routes alongside your own routes.
	Router chi.Router

	// Middleware adds additional HTTP middleware to the default router.
	// Ignored if Router is provided (add middleware to your own router).
	Middleware []func(http.Handler) http.Handler

	// SSEFormatter customizes how ReactEvents are formatted for SSE.
	// Default: event type as SSE event name, JSON-marshaled event as data.
	// Return eventType and data bytes. Return empty eventType to skip the event.
	SSEFormatter func(evt agent.ReactEvent) (eventType string, data []byte)

	// OnBeforeRun is called before a run starts. Return error to reject.
	OnBeforeRun func(ctx context.Context, threadID, input string) error

	// OnAfterRun is called after a run completes (success or error).
	OnAfterRun func(ctx context.Context, threadID string, result *agent.ReactResult, err error)

	// CORSOrigin sets the Access-Control-Allow-Origin header (default: "*").
	CORSOrigin string

	// ReadTimeout for the HTTP server (default: 30s).
	ReadTimeout time.Duration

	// WriteTimeout for the HTTP server (default: 300s).
	WriteTimeout time.Duration

	// Runner configures the background task runner (concurrency, timeouts).
	Runner RunnerConfig
}

ServerConfig configures the Agent Protocol HTTP server. All fields except App are optional; unset fields fall back to sensible defaults.
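The SSEFormatter hook returns an event name and a payload, and an empty name skips the event. The sketch below shows a formatter of that shape and how its result maps onto an SSE frame. The `event` struct is a hypothetical stand-in because the fields of agent.ReactEvent are not shown here, and the "debug" event type is invented for illustration.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// event is a hypothetical stand-in for agent.ReactEvent.
type event struct {
	Type    string `json:"type"`
	Content string `json:"content"`
}

// formatEvent has the same shape as SSEFormatter: it returns the SSE event
// name and the data bytes, or an empty name to skip the event.
func formatEvent(evt event) (eventType string, data []byte) {
	if evt.Type == "debug" { // hypothetical event type that clients should not see
		return "", nil
	}
	data, err := json.Marshal(evt)
	if err != nil {
		return "", nil
	}
	return evt.Type, data
}

// writeFrame renders one SSE frame: an "event:" line, a "data:" line, and a
// blank line that terminates the frame. Skipped events write nothing.
func writeFrame(buf *bytes.Buffer, eventType string, data []byte) {
	if eventType == "" {
		return
	}
	fmt.Fprintf(buf, "event: %s\ndata: %s\n\n", eventType, data)
}

// frame formats a single event into its wire representation.
func frame(evt event) string {
	var buf bytes.Buffer
	eventType, data := formatEvent(evt)
	writeFrame(&buf, eventType, data)
	return buf.String()
}

func main() {
	fmt.Print(frame(event{Type: "token", Content: "hi"}))
	fmt.Print(frame(event{Type: "debug", Content: "hidden"})) // prints nothing
}
```

json.Marshal produces compact output without raw newlines, so the payload fits on a single `data:` line. A formatter that emits multi-line data would need one `data:` line per line of payload.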
