Documentation
¶
Index ¶
- Variables
- func EpochFromContext(ctx context.Context) int
- func IssueWorkerJWT(secret []byte, claims WorkerJWTClaims) (string, error)
- func SessionIDFromContext(ctx context.Context) string
- func WorkspaceIDFromContext(ctx context.Context) string
- type BoundedUUIDSet
- type BridgeResponse
- type CCWorker
- type Config
- type DedupRegistry
- type EventBatchItem
- type EventBatchRequest
- type EventInput
- type HeartbeatRequest
- type InsertedEvent
- type InternalEventBatchItem
- type InternalEventBatchRequest
- type InternalEventInput
- type JSONRPCRequest
- type JSONRPCResponse
- type MCPContentBlock
- type MCPServer
- type MCPToolCallParams
- type MCPToolDef
- type MCPToolResult
- type ProcessTurnRequest
- type RPCError
- type SSEBroker
- type SSESubscriber
- type Server
- func (s *Server) CleanupWorker(ctx context.Context, worker *CCWorker)
- func (s *Server) CreateMCPServer(sessionID, workspaceID, workspaceDir, imChannelID, imUserID string) (*MCPServer, int, func(), error)
- func (s *Server) Routes() http.Handler
- func (s *Server) SpawnWorker(ctx context.Context, sessionID, workspaceID, imChannelID, imUserID string) (*CCWorker, error)
- type Session
- type SessionEvent
- type Store
- func (s *Store) BumpSessionEpoch(ctx context.Context, id string) (int, error)
- func (s *Store) Close() error
- func (s *Store) CreateSession(ctx context.Context, id, workspaceID, title, source string, externalID *string) error
- func (s *Store) GetEventsSince(ctx context.Context, sessionID string, sinceSeqNum int64, limit int) ([]SessionEvent, error)
- func (s *Store) GetInternalEventsSince(ctx context.Context, sessionID string, sinceID int64, limit int) ([]SessionEvent, error)
- func (s *Store) GetSession(ctx context.Context, id string) (*Session, error)
- func (s *Store) GetSessionEpoch(ctx context.Context, sessionID string) (int, error)
- func (s *Store) InsertEvents(ctx context.Context, sessionID string, epoch int, events []EventInput) ([]InsertedEvent, error)
- func (s *Store) InsertInternalEvents(ctx context.Context, sessionID string, events []InternalEventInput) error
- func (s *Store) UpdateWorkerHeartbeat(ctx context.Context, sessionID string, epoch int) error
- func (s *Store) UpdateWorkerState(ctx context.Context, sessionID string, epoch int, state string, ...) error
- func (s *Store) UpsertWorker(ctx context.Context, sessionID string, epoch int) error
- type StreamClientEvent
- type ToolRouter
- type ToolRouterConfig
- type TurnLock
- type VikingClient
- type Worker
- type WorkerJWTClaims
- type WorkerStateRequest
Constants ¶
This section is empty.
Variables ¶
Functions ¶
func EpochFromContext ¶
EpochFromContext extracts the epoch set by workerAuthMiddleware.
func IssueWorkerJWT ¶
func IssueWorkerJWT(secret []byte, claims WorkerJWTClaims) (string, error)
IssueWorkerJWT creates an HMAC-SHA256 JWT with the given claims. If claims.Exp is zero, it defaults to now + 24 hours.
func SessionIDFromContext ¶
SessionIDFromContext extracts the session ID set by workerAuthMiddleware.
func WorkspaceIDFromContext ¶
WorkspaceIDFromContext extracts the workspace ID set by workerAuthMiddleware.
Types ¶
type BoundedUUIDSet ¶
type BoundedUUIDSet struct {
// contains filtered or unexported fields
}
BoundedUUIDSet is a fixed-capacity set with FIFO eviction. Used for echo/replay deduplication, matching CC's BoundedUUIDSet (capacity 2000). Thread-safe.
func NewBoundedUUIDSet ¶
func NewBoundedUUIDSet(capacity int) *BoundedUUIDSet
NewBoundedUUIDSet creates a new bounded set with the given capacity.
func (*BoundedUUIDSet) Add ¶
func (s *BoundedUUIDSet) Add(uuid string) bool
Add inserts a UUID. Returns false if already present (duplicate).
type BridgeResponse ¶
type CCWorker ¶
type CCWorker struct {
ID string
Process *exec.Cmd
SessionID string
WorkspaceID string
Status string // "running" | "done" | "failed"
StartedAt time.Time
// contains filtered or unexported fields
}
CCWorker represents a running Claude Code worker process.
type Config ¶
type Config struct {
Port string
DatabaseURL string
JWTSecret []byte
LogLevel slog.Level
ExecutorRegistryURL string
AgentserverURL string
OpenVikingURL string
OpenVikingAPIKey string
IMBridgeURL string
IMBridgeSecret string
}
func LoadConfigFromEnv ¶
type DedupRegistry ¶
type DedupRegistry struct {
// contains filtered or unexported fields
}
DedupRegistry maintains per-session deduplication sets.
func NewDedupRegistry ¶
func NewDedupRegistry() *DedupRegistry
NewDedupRegistry creates a new dedup registry.
func (*DedupRegistry) GetOrCreate ¶
func (r *DedupRegistry) GetOrCreate(sessionID string) *BoundedUUIDSet
GetOrCreate returns the dedup set for a session, creating one if needed.
type EventBatchItem ¶
type EventBatchItem struct {
Payload json.RawMessage `json:"payload"`
Ephemeral bool `json:"ephemeral"`
}
type EventBatchRequest ¶
type EventBatchRequest struct {
WorkerEpoch int `json:"worker_epoch"`
Events []EventBatchItem `json:"events"`
}
type EventInput ¶
type EventInput struct {
EventID string
Payload json.RawMessage
Ephemeral bool
}
type HeartbeatRequest ¶
type HeartbeatRequest struct {
WorkerEpoch int `json:"worker_epoch"`
}
type InsertedEvent ¶
type InternalEventBatchItem ¶
type InternalEventBatchItem struct {
Payload json.RawMessage `json:"payload"`
IsCompaction bool `json:"is_compaction"`
AgentID string `json:"agent_id,omitempty"`
}
type InternalEventBatchRequest ¶
type InternalEventBatchRequest struct {
WorkerEpoch int `json:"worker_epoch"`
Events []InternalEventBatchItem `json:"events"`
}
type InternalEventInput ¶
type InternalEventInput struct {
EventType string
Payload json.RawMessage
IsCompaction bool
AgentID string
}
type JSONRPCRequest ¶
type JSONRPCRequest struct {
JSONRPC string `json:"jsonrpc"`
ID *json.RawMessage `json:"id,omitempty"`
Method string `json:"method"`
Params json.RawMessage `json:"params,omitempty"`
}
type JSONRPCResponse ¶
type JSONRPCResponse struct {
JSONRPC string `json:"jsonrpc"`
ID *json.RawMessage `json:"id,omitempty"`
Result interface{} `json:"result,omitempty"`
Error *RPCError `json:"error,omitempty"`
}
type MCPContentBlock ¶
type MCPServer ¶
type MCPServer struct {
// contains filtered or unexported fields
}
MCPServer implements http.Handler and speaks JSON-RPC 2.0 / MCP.
func NewMCPServer ¶
func NewMCPServer(router *ToolRouter, logger *slog.Logger) *MCPServer
type MCPToolCallParams ¶
type MCPToolDef ¶
type MCPToolResult ¶
type MCPToolResult struct {
Content []MCPContentBlock `json:"content"`
IsError bool `json:"isError,omitempty"`
}
type ProcessTurnRequest ¶
type ProcessTurnRequest struct {
SessionID string `json:"session_id"`
WorkspaceID string `json:"workspace_id"`
UserMessage string `json:"user_message"`
IMChannelID string `json:"im_channel_id,omitempty"`
IMUserID string `json:"im_user_id,omitempty"`
}
ProcessTurnRequest is the external API request body for POST /api/turns.
IMChannelID and IMUserID are optional. When set (for turns originated by an IM inbound) the cc-broker ToolRouter can route send_* MCP tool calls back through imbridge to the originating IM channel / user.
type SSEBroker ¶
type SSEBroker struct {
// contains filtered or unexported fields
}
SSEBroker fans out events to per-session subscribers.
func (*SSEBroker) Publish ¶
func (b *SSEBroker) Publish(sessionID string, event *StreamClientEvent)
Publish sends an event to all subscribers of a session. If a subscriber's channel is full, it is closed (force reconnect).
func (*SSEBroker) Subscribe ¶
func (b *SSEBroker) Subscribe(sessionID string) *SSESubscriber
Subscribe creates a new subscriber for a session.
func (*SSEBroker) Unsubscribe ¶
func (b *SSEBroker) Unsubscribe(sessionID string, sub *SSESubscriber)
Unsubscribe removes a subscriber.
type SSESubscriber ¶
type SSESubscriber struct {
Ch chan *StreamClientEvent
// contains filtered or unexported fields
}
SSESubscriber receives events for a single session.
func (*SSESubscriber) Done ¶
func (s *SSESubscriber) Done() <-chan struct{}
Done returns a channel closed when the subscriber is removed.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
func (*Server) CleanupWorker ¶
CleanupWorker diffs the claude-config directory against the initial snapshot, uploads changed/new files back to OpenViking, then tears down the MCP server and removes the temporary directory.
func (*Server) CreateMCPServer ¶
func (s *Server) CreateMCPServer(sessionID, workspaceID, workspaceDir, imChannelID, imUserID string) (*MCPServer, int, func(), error)
CreateMCPServer creates a per-worker MCP server and returns (server, port, closer, error). The caller should call closer() to stop the MCP server when done.
func (*Server) SpawnWorker ¶
func (s *Server) SpawnWorker(ctx context.Context, sessionID, workspaceID, imChannelID, imUserID string) (*CCWorker, error)
SpawnWorker sets up a temporary workspace, downloads files from OpenViking, starts an MCP server, and launches a Claude Code worker process. imChannelID and imUserID are optional — populated when the turn was originated by an IM inbound so the MCP server can route send_* tool calls back through imbridge.
type Session ¶
type Session struct {
ID string `json:"id"`
WorkspaceID string `json:"workspace_id"`
Title string `json:"title,omitempty"`
Status string `json:"status"`
Epoch int `json:"epoch"`
ExternalID *string `json:"external_id,omitempty"`
Source string `json:"source,omitempty"`
CreatedAt time.Time `json:"created_at"`
}
type SessionEvent ¶
type SessionEvent struct {
ID int64 `json:"id"`
SessionID string `json:"session_id"`
EventID string `json:"event_id"`
EventType string `json:"event_type"`
Source string `json:"source"`
Epoch int `json:"epoch"`
Payload json.RawMessage `json:"payload"`
Ephemeral bool `json:"ephemeral"`
CreatedAt time.Time `json:"created_at"`
}
type Store ¶
Store provides database access for the cc-broker.
func (*Store) BumpSessionEpoch ¶
BumpSessionEpoch atomically increments the epoch and returns the new value.
func (*Store) CreateSession ¶
func (s *Store) CreateSession(ctx context.Context, id, workspaceID, title, source string, externalID *string) error
CreateSession inserts a new session.
func (*Store) GetEventsSince ¶
func (s *Store) GetEventsSince(ctx context.Context, sessionID string, sinceSeqNum int64, limit int) ([]SessionEvent, error)
GetEventsSince returns events with sequence number > sinceSeqNum.
func (*Store) GetInternalEventsSince ¶
func (s *Store) GetInternalEventsSince(ctx context.Context, sessionID string, sinceID int64, limit int) ([]SessionEvent, error)
GetInternalEventsSince returns internal events with id > sinceID.
func (*Store) GetSession ¶
GetSession retrieves a session by ID. Returns nil if not found.
func (*Store) GetSessionEpoch ¶
GetSessionEpoch returns the current epoch for a session.
func (*Store) InsertEvents ¶
func (s *Store) InsertEvents(ctx context.Context, sessionID string, epoch int, events []EventInput) ([]InsertedEvent, error)
InsertEvents inserts a batch of events, skipping duplicates. Returns only the successfully inserted events with their sequence numbers.
func (*Store) InsertInternalEvents ¶
func (s *Store) InsertInternalEvents(ctx context.Context, sessionID string, events []InternalEventInput) error
InsertInternalEvents inserts a batch of internal events.
func (*Store) UpdateWorkerHeartbeat ¶
UpdateWorkerHeartbeat updates the heartbeat timestamp for a worker.
type StreamClientEvent ¶
type ToolRouter ¶
type ToolRouter struct {
// contains filtered or unexported fields
}
ToolRouter dispatches MCP tool calls to the appropriate backend: executor-registry, local workspace, IM, or scheduler.
func NewToolRouter ¶
func NewToolRouter(cfg ToolRouterConfig, logger *slog.Logger) *ToolRouter
NewToolRouter creates a ToolRouter with the given configuration.
func (*ToolRouter) Route ¶
func (r *ToolRouter) Route(ctx context.Context, toolName string, args map[string]interface{}) (*MCPToolResult, error)
Route dispatches a tool call to the appropriate handler based on tool name.
type ToolRouterConfig ¶
type ToolRouterConfig struct {
ExecutorRegistryURL string
AgentserverURL string
IMBridgeURL string
IMBridgeSecret string
WorkspaceDir string
SessionID string
WorkspaceID string
IMChannelID string
IMUserID string
}
ToolRouterConfig holds the configuration for creating a ToolRouter.
type TurnLock ¶
type TurnLock struct {
// contains filtered or unexported fields
}
func NewTurnLock ¶
func NewTurnLock() *TurnLock
type VikingClient ¶
type VikingClient struct {
// contains filtered or unexported fields
}
func NewVikingClient ¶
func NewVikingClient(baseURL, apiKey string) *VikingClient
func (*VikingClient) CreateFile ¶
CreateFile creates a new file in OpenViking using temp_upload + add_resource.
func (*VikingClient) DownloadTree ¶
func (v *VikingClient) DownloadTree(ctx context.Context, vikingURI, localDir string) error
DownloadTree recursively downloads files from a Viking URI to a local directory.
func (*VikingClient) UploadFile ¶
func (v *VikingClient) UploadFile(ctx context.Context, vikingURI, content string) error
UploadFile writes content to an existing file in OpenViking.
type WorkerJWTClaims ¶
type WorkerJWTClaims struct {
SessionID string `json:"sid"`
WorkspaceID string `json:"wid"`
Epoch int `json:"epoch"`
Exp int64 `json:"exp"`
}
func ValidateWorkerJWT ¶
func ValidateWorkerJWT(secret []byte, token string) (*WorkerJWTClaims, error)
ValidateWorkerJWT validates an HMAC-SHA256 JWT and returns the claims.
type WorkerStateRequest ¶
type WorkerStateRequest struct {
WorkerStatus string `json:"worker_status"`
WorkerEpoch int `json:"worker_epoch"`
ExternalMetadata json.RawMessage `json:"external_metadata,omitempty"`
RequiresActionDetails json.RawMessage `json:"requires_action_details,omitempty"`
}