ccbroker

package
v0.40.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 2, 2026 License: MIT Imports: 29 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrJWTExpired   = errors.New("jwt expired")
	ErrJWTMalformed = errors.New("jwt malformed")
	ErrJWTSignature = errors.New("jwt signature invalid")
)

Functions

func EpochFromContext

func EpochFromContext(ctx context.Context) int

EpochFromContext extracts the epoch set by workerAuthMiddleware.

func IssueWorkerJWT

func IssueWorkerJWT(secret []byte, claims WorkerJWTClaims) (string, error)

IssueWorkerJWT creates an HMAC-SHA256 JWT with the given claims. If claims.Exp is zero, it defaults to now + 24 hours.

func SessionIDFromContext

func SessionIDFromContext(ctx context.Context) string

SessionIDFromContext extracts the session ID set by workerAuthMiddleware.

func WorkspaceIDFromContext

func WorkspaceIDFromContext(ctx context.Context) string

WorkspaceIDFromContext extracts the workspace ID set by workerAuthMiddleware.

Types

type BoundedUUIDSet

type BoundedUUIDSet struct {
	// contains filtered or unexported fields
}

BoundedUUIDSet is a fixed-capacity set with FIFO eviction. Used for echo/replay deduplication, matching CC's BoundedUUIDSet (capacity 2000). Thread-safe.

func NewBoundedUUIDSet

func NewBoundedUUIDSet(capacity int) *BoundedUUIDSet

NewBoundedUUIDSet creates a new bounded set with the given capacity.

func (*BoundedUUIDSet) Add

func (s *BoundedUUIDSet) Add(uuid string) bool

Add inserts a UUID. Returns false if already present (duplicate).

type BridgeResponse

type BridgeResponse struct {
	WorkerJWT   string `json:"worker_jwt"`
	APIBaseURL  string `json:"api_base_url"`
	ExpiresIn   int    `json:"expires_in"`
	WorkerEpoch int    `json:"worker_epoch"`
}

type CCWorker

type CCWorker struct {
	ID          string
	Process     *exec.Cmd
	SessionID   string
	WorkspaceID string
	Status      string // "running" | "done" | "failed"
	StartedAt   time.Time
	// contains filtered or unexported fields
}

CCWorker represents a running Claude Code worker process.

type Config

type Config struct {
	Port                string
	DatabaseURL         string
	JWTSecret           []byte
	LogLevel            slog.Level
	ExecutorRegistryURL string
	AgentserverURL      string
	OpenVikingURL       string
	OpenVikingAPIKey    string
	IMBridgeURL         string
	IMBridgeSecret      string
}

func LoadConfigFromEnv

func LoadConfigFromEnv() (Config, error)

type DedupRegistry

type DedupRegistry struct {
	// contains filtered or unexported fields
}

DedupRegistry maintains per-session deduplication sets.

func NewDedupRegistry

func NewDedupRegistry() *DedupRegistry

NewDedupRegistry creates a new dedup registry.

func (*DedupRegistry) GetOrCreate

func (r *DedupRegistry) GetOrCreate(sessionID string) *BoundedUUIDSet

GetOrCreate returns the dedup set for a session, creating one if needed.

type EventBatchItem

type EventBatchItem struct {
	Payload   json.RawMessage `json:"payload"`
	Ephemeral bool            `json:"ephemeral"`
}

type EventBatchRequest

type EventBatchRequest struct {
	WorkerEpoch int              `json:"worker_epoch"`
	Events      []EventBatchItem `json:"events"`
}

type EventInput

type EventInput struct {
	EventID   string
	Payload   json.RawMessage
	Ephemeral bool
}

type HeartbeatRequest

type HeartbeatRequest struct {
	WorkerEpoch int `json:"worker_epoch"`
}

type InsertedEvent

type InsertedEvent struct {
	SeqNum  int64
	EventID string
}

type InternalEventBatchItem

type InternalEventBatchItem struct {
	Payload      json.RawMessage `json:"payload"`
	IsCompaction bool            `json:"is_compaction"`
	AgentID      string          `json:"agent_id,omitempty"`
}

type InternalEventBatchRequest

type InternalEventBatchRequest struct {
	WorkerEpoch int                      `json:"worker_epoch"`
	Events      []InternalEventBatchItem `json:"events"`
}

type InternalEventInput

type InternalEventInput struct {
	EventType    string
	Payload      json.RawMessage
	IsCompaction bool
	AgentID      string
}

type JSONRPCRequest

type JSONRPCRequest struct {
	JSONRPC string           `json:"jsonrpc"`
	ID      *json.RawMessage `json:"id,omitempty"`
	Method  string           `json:"method"`
	Params  json.RawMessage  `json:"params,omitempty"`
}

type JSONRPCResponse

type JSONRPCResponse struct {
	JSONRPC string           `json:"jsonrpc"`
	ID      *json.RawMessage `json:"id,omitempty"`
	Result  interface{}      `json:"result,omitempty"`
	Error   *RPCError        `json:"error,omitempty"`
}

type MCPContentBlock

type MCPContentBlock struct {
	Type string `json:"type"`
	Text string `json:"text,omitempty"`
}

type MCPServer

type MCPServer struct {
	// contains filtered or unexported fields
}

MCPServer implements http.Handler and speaks JSON-RPC 2.0 / MCP.

func NewMCPServer

func NewMCPServer(router *ToolRouter, logger *slog.Logger) *MCPServer

func (*MCPServer) ServeHTTP

func (s *MCPServer) ServeHTTP(w http.ResponseWriter, r *http.Request)

type MCPToolCallParams

type MCPToolCallParams struct {
	Name      string                 `json:"name"`
	Arguments map[string]interface{} `json:"arguments,omitempty"`
}

type MCPToolDef

type MCPToolDef struct {
	Name        string      `json:"name"`
	Description string      `json:"description,omitempty"`
	InputSchema interface{} `json:"inputSchema"`
}

type MCPToolResult

type MCPToolResult struct {
	Content []MCPContentBlock `json:"content"`
	IsError bool              `json:"isError,omitempty"`
}

type ProcessTurnRequest

type ProcessTurnRequest struct {
	SessionID   string `json:"session_id"`
	WorkspaceID string `json:"workspace_id"`
	UserMessage string `json:"user_message"`
	IMChannelID string `json:"im_channel_id,omitempty"`
	IMUserID    string `json:"im_user_id,omitempty"`
}

ProcessTurnRequest is the external API request body for POST /api/turns.

IMChannelID and IMUserID are optional. When set (for turns originated by an IM inbound) the cc-broker ToolRouter can route send_* MCP tool calls back through imbridge to the originating IM channel / user.

type RPCError

type RPCError struct {
	Code    int    `json:"code"`
	Message string `json:"message"`
}

type SSEBroker

type SSEBroker struct {
	// contains filtered or unexported fields
}

SSEBroker fans out events to per-session subscribers.

func NewSSEBroker

func NewSSEBroker() *SSEBroker

NewSSEBroker creates a new SSE broker.

func (*SSEBroker) Publish

func (b *SSEBroker) Publish(sessionID string, event *StreamClientEvent)

Publish sends an event to all subscribers of a session. If a subscriber's channel is full, it is closed (force reconnect).

func (*SSEBroker) Subscribe

func (b *SSEBroker) Subscribe(sessionID string) *SSESubscriber

Subscribe creates a new subscriber for a session.

func (*SSEBroker) Unsubscribe

func (b *SSEBroker) Unsubscribe(sessionID string, sub *SSESubscriber)

Unsubscribe removes a subscriber.

type SSESubscriber

type SSESubscriber struct {
	Ch chan *StreamClientEvent
	// contains filtered or unexported fields
}

SSESubscriber receives events for a single session.

func (*SSESubscriber) Close

func (s *SSESubscriber) Close()

Close marks the subscriber as done.

func (*SSESubscriber) Done

func (s *SSESubscriber) Done() <-chan struct{}

Done returns a channel closed when the subscriber is removed.

type Server

type Server struct {
	// contains filtered or unexported fields
}

func NewServer

func NewServer(cfg Config, store *Store) *Server

func (*Server) CleanupWorker

func (s *Server) CleanupWorker(ctx context.Context, worker *CCWorker)

CleanupWorker diffs the claude-config directory against the initial snapshot, uploads changed/new files back to OpenViking, then tears down the MCP server and removes the temporary directory.

func (*Server) CreateMCPServer

func (s *Server) CreateMCPServer(sessionID, workspaceID, workspaceDir, imChannelID, imUserID string) (*MCPServer, int, func(), error)

CreateMCPServer creates a per-worker MCP server and returns (server, port, closer, error). The caller should call closer() to stop the MCP server when done.

func (*Server) Routes

func (s *Server) Routes() http.Handler

func (*Server) SpawnWorker

func (s *Server) SpawnWorker(ctx context.Context, sessionID, workspaceID, imChannelID, imUserID string) (*CCWorker, error)

SpawnWorker sets up a temporary workspace, downloads files from OpenViking, starts an MCP server, and launches a Claude Code worker process. imChannelID and imUserID are optional — populated when the turn was originated by an IM inbound so the MCP server can route send_* tool calls back through imbridge.

type Session

type Session struct {
	ID          string    `json:"id"`
	WorkspaceID string    `json:"workspace_id"`
	Title       string    `json:"title,omitempty"`
	Status      string    `json:"status"`
	Epoch       int       `json:"epoch"`
	ExternalID  *string   `json:"external_id,omitempty"`
	Source      string    `json:"source,omitempty"`
	CreatedAt   time.Time `json:"created_at"`
}

type SessionEvent

type SessionEvent struct {
	ID        int64           `json:"id"`
	SessionID string          `json:"session_id"`
	EventID   string          `json:"event_id"`
	EventType string          `json:"event_type"`
	Source    string          `json:"source"`
	Epoch     int             `json:"epoch"`
	Payload   json.RawMessage `json:"payload"`
	Ephemeral bool            `json:"ephemeral"`
	CreatedAt time.Time       `json:"created_at"`
}

type Store

type Store struct {
	*sql.DB
}

Store provides database access for the cc-broker.

func NewStore

func NewStore(databaseURL string) (*Store, error)

NewStore opens a database connection and runs migrations.

func (*Store) BumpSessionEpoch

func (s *Store) BumpSessionEpoch(ctx context.Context, id string) (int, error)

BumpSessionEpoch atomically increments the epoch and returns the new value.

func (*Store) Close

func (s *Store) Close() error

Close closes the underlying database connection.

func (*Store) CreateSession

func (s *Store) CreateSession(ctx context.Context, id, workspaceID, title, source string, externalID *string) error

CreateSession inserts a new session.

func (*Store) GetEventsSince

func (s *Store) GetEventsSince(ctx context.Context, sessionID string, sinceSeqNum int64, limit int) ([]SessionEvent, error)

GetEventsSince returns events with sequence number > sinceSeqNum.

func (*Store) GetInternalEventsSince

func (s *Store) GetInternalEventsSince(ctx context.Context, sessionID string, sinceID int64, limit int) ([]SessionEvent, error)

GetInternalEventsSince returns internal events with id > sinceID.

func (*Store) GetSession

func (s *Store) GetSession(ctx context.Context, id string) (*Session, error)

GetSession retrieves a session by ID. Returns nil if not found.

func (*Store) GetSessionEpoch

func (s *Store) GetSessionEpoch(ctx context.Context, sessionID string) (int, error)

GetSessionEpoch returns the current epoch for a session.

func (*Store) InsertEvents

func (s *Store) InsertEvents(ctx context.Context, sessionID string, epoch int, events []EventInput) ([]InsertedEvent, error)

InsertEvents inserts a batch of events, skipping duplicates. Returns only the successfully inserted events with their sequence numbers.

func (*Store) InsertInternalEvents

func (s *Store) InsertInternalEvents(ctx context.Context, sessionID string, events []InternalEventInput) error

InsertInternalEvents inserts a batch of internal events.

func (*Store) UpdateWorkerHeartbeat

func (s *Store) UpdateWorkerHeartbeat(ctx context.Context, sessionID string, epoch int) error

UpdateWorkerHeartbeat updates the heartbeat timestamp for a worker.

func (*Store) UpdateWorkerState

func (s *Store) UpdateWorkerState(ctx context.Context, sessionID string, epoch int, state string, metadata, actionDetails json.RawMessage) error

UpdateWorkerState updates the state and metadata for a worker.

func (*Store) UpsertWorker

func (s *Store) UpsertWorker(ctx context.Context, sessionID string, epoch int) error

UpsertWorker inserts or updates a worker registration.

type StreamClientEvent

type StreamClientEvent struct {
	EventID     string          `json:"event_id"`
	SequenceNum int64           `json:"sequence_num"`
	EventType   string          `json:"event_type"`
	Source      string          `json:"source"`
	Payload     json.RawMessage `json:"payload"`
	CreatedAt   string          `json:"created_at"`
}

type ToolRouter

type ToolRouter struct {
	// contains filtered or unexported fields
}

ToolRouter dispatches MCP tool calls to the appropriate backend: executor-registry, local workspace, IM, or scheduler.

func NewToolRouter

func NewToolRouter(cfg ToolRouterConfig, logger *slog.Logger) *ToolRouter

NewToolRouter creates a ToolRouter with the given configuration.

func (*ToolRouter) Route

func (r *ToolRouter) Route(ctx context.Context, toolName string, args map[string]interface{}) (*MCPToolResult, error)

Route dispatches a tool call to the appropriate handler based on tool name.

type ToolRouterConfig

type ToolRouterConfig struct {
	ExecutorRegistryURL string
	AgentserverURL      string
	IMBridgeURL         string
	IMBridgeSecret      string
	WorkspaceDir        string
	SessionID           string
	WorkspaceID         string
	IMChannelID         string
	IMUserID            string
}

ToolRouterConfig holds the configuration for creating a ToolRouter.

type TurnLock

type TurnLock struct {
	// contains filtered or unexported fields
}

func NewTurnLock

func NewTurnLock() *TurnLock

func (*TurnLock) Acquire

func (t *TurnLock) Acquire(sessionID string)

func (*TurnLock) Release

func (t *TurnLock) Release(sessionID string)

type VikingClient

type VikingClient struct {
	// contains filtered or unexported fields
}

func NewVikingClient

func NewVikingClient(baseURL, apiKey string) *VikingClient

func (*VikingClient) CreateFile

func (v *VikingClient) CreateFile(ctx context.Context, vikingURI string, content []byte) error

CreateFile creates a new file in OpenViking using temp_upload + add_resource.

func (*VikingClient) DownloadTree

func (v *VikingClient) DownloadTree(ctx context.Context, vikingURI, localDir string) error

DownloadTree recursively downloads files from a Viking URI to a local directory.

func (*VikingClient) UploadFile

func (v *VikingClient) UploadFile(ctx context.Context, vikingURI, content string) error

UploadFile writes content to an existing file in OpenViking.

type Worker

type Worker struct {
	SessionID             string
	Epoch                 int
	State                 string
	ExternalMetadata      json.RawMessage
	RequiresActionDetails json.RawMessage
	LastHeartbeatAt       *time.Time
	RegisteredAt          time.Time
}

type WorkerJWTClaims

type WorkerJWTClaims struct {
	SessionID   string `json:"sid"`
	WorkspaceID string `json:"wid"`
	Epoch       int    `json:"epoch"`
	Exp         int64  `json:"exp"`
}

func ValidateWorkerJWT

func ValidateWorkerJWT(secret []byte, token string) (*WorkerJWTClaims, error)

ValidateWorkerJWT validates an HMAC-SHA256 JWT and returns the claims.

type WorkerStateRequest

type WorkerStateRequest struct {
	WorkerStatus          string          `json:"worker_status"`
	WorkerEpoch           int             `json:"worker_epoch"`
	ExternalMetadata      json.RawMessage `json:"external_metadata,omitempty"`
	RequiresActionDetails json.RawMessage `json:"requires_action_details,omitempty"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL