ccbroker

package
v0.41.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 3, 2026 License: MIT Imports: 23 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

// Config groups the settings accepted by NewServer. LoadConfigFromEnv
// returns a Config (presumably read from environment variables, per its
// name — TODO confirm which variables map to which fields).
type Config struct {
	Port                string
	// DatabaseURL is presumably the value handed to NewStore — TODO confirm.
	DatabaseURL         string
	LogLevel            slog.Level
	ExecutorRegistryURL string
	AgentserverURL      string
	// The S3* fields presumably describe an S3-compatible object store
	// (endpoint, region, bucket, credentials, TLS and path-style addressing)
	// — NOTE(review): usage is not visible in this listing; confirm.
	S3Endpoint          string
	S3Region            string
	S3Bucket            string
	S3AccessKeyID       string
	S3SecretAccessKey   string
	S3UseSSL            bool
	S3PathStyle         bool
	// IMBridgeURL and IMBridgeSecret presumably configure access to the
	// imbridge service mentioned in the ProcessTurnRequest docs — TODO confirm.
	IMBridgeURL         string
	IMBridgeSecret      string
}

func LoadConfigFromEnv

func LoadConfigFromEnv() (Config, error)

type EventInput

// EventInput is one element of the batch passed to Store.InsertEvents.
type EventInput struct {
	// EventID identifies the event; presumably the key InsertEvents uses to
	// skip duplicates — TODO confirm.
	EventID   string
	// Payload is the event body kept as raw, undecoded JSON.
	Payload   json.RawMessage
	Ephemeral bool
}

type InsertedEvent

// InsertedEvent is an element of the slice returned by Store.InsertEvents,
// which reports only the events that were actually inserted.
type InsertedEvent struct {
	// SeqNum is the sequence number reported for the inserted event.
	SeqNum  int64
	EventID string
}

type ProcessTurnRequest

// ProcessTurnRequest is the external API request body for POST /api/turns.
type ProcessTurnRequest struct {
	SessionID   string `json:"session_id"`
	WorkspaceID string `json:"workspace_id"`
	UserMessage string `json:"user_message"`
	// IMChannelID and IMUserID are optional and omitted from the JSON when
	// empty. They are set for turns that come from an IM channel, so tool
	// calls can be routed back to the originating channel / user.
	IMChannelID string `json:"im_channel_id,omitempty"`
	IMUserID    string `json:"im_user_id,omitempty"`
}

ProcessTurnRequest is the external API request body for POST /api/turns.

IMChannelID and IMUserID are optional. When set (for turns that originate from an inbound IM message), the cc-broker ToolRouter can route send_* MCP tool calls back through imbridge to the originating IM channel / user.

type SSEBroker

// SSEBroker fans out events to per-session subscribers. Create one with
// NewSSEBroker; subscribers are added with Subscribe, removed with
// Unsubscribe, and fed with Publish.
type SSEBroker struct {
	// contains filtered or unexported fields
}

SSEBroker fans out events to per-session subscribers.

func NewSSEBroker

func NewSSEBroker() *SSEBroker

NewSSEBroker creates a new SSE broker.

func (*SSEBroker) Publish

func (b *SSEBroker) Publish(sessionID string, event *StreamClientEvent)

Publish sends an event to all subscribers of a session. If a subscriber's channel is full, it is closed to force the client to reconnect.

func (*SSEBroker) Subscribe

func (b *SSEBroker) Subscribe(sessionID string) *SSESubscriber

Subscribe creates a new subscriber for a session.

func (*SSEBroker) Unsubscribe

func (b *SSEBroker) Unsubscribe(sessionID string, sub *SSESubscriber)

Unsubscribe removes a subscriber.

type SSESubscriber

// SSESubscriber receives events for a single session. Values are returned
// by SSEBroker.Subscribe.
type SSESubscriber struct {
	// Ch carries *StreamClientEvent values; presumably this is where
	// SSEBroker.Publish delivers events for the subscriber — TODO confirm.
	Ch chan *StreamClientEvent
	// contains filtered or unexported fields
}

SSESubscriber receives events for a single session.

func (*SSESubscriber) Close

func (s *SSESubscriber) Close()

Close marks the subscriber as done.

func (*SSESubscriber) Done

func (s *SSESubscriber) Done() <-chan struct{}

Done returns a channel closed when the subscriber is removed.

type Server

// Server is built by NewServer from a Config and a *Store, and exposes its
// HTTP handler through Routes.
type Server struct {
	// contains filtered or unexported fields
}

func NewServer

func NewServer(cfg Config, store *Store) (*Server, error)

func (*Server) Routes

func (s *Server) Routes() http.Handler

type Session

// Session is the record type returned by Store.GetSession. Its fields match
// the arguments of Store.CreateSession (id, workspaceID, title, source,
// externalID) plus status, epoch and creation time.
type Session struct {
	ID          string    `json:"id"`
	WorkspaceID string    `json:"workspace_id"`
	Title       string    `json:"title,omitempty"`
	Status      string    `json:"status"`
	// Epoch is presumably the counter read by Store.GetSessionEpoch and
	// incremented by Store.BumpSessionEpoch — TODO confirm.
	Epoch       int       `json:"epoch"`
	// ExternalID is a pointer, so a nil value is left out of the JSON output.
	ExternalID  *string   `json:"external_id,omitempty"`
	Source      string    `json:"source,omitempty"`
	CreatedAt   time.Time `json:"created_at"`
}

type SessionEvent

// SessionEvent describes a single event belonging to a session.
// NOTE(review): no function visible in this listing produces or consumes
// this type; presumably it mirrors a stored event row — confirm.
type SessionEvent struct {
	ID        int64           `json:"id"`
	SessionID string          `json:"session_id"`
	EventID   string          `json:"event_id"`
	EventType string          `json:"event_type"`
	Source    string          `json:"source"`
	Epoch     int             `json:"epoch"`
	// Payload is the event body kept as raw, undecoded JSON.
	Payload   json.RawMessage `json:"payload"`
	Ephemeral bool            `json:"ephemeral"`
	CreatedAt time.Time       `json:"created_at"`
}

type Store

// Store provides database access for the cc-broker. It embeds *sql.DB, so
// the methods of sql.DB are promoted onto Store alongside the
// session/event methods declared on it.
type Store struct {
	*sql.DB
}

Store provides database access for the cc-broker.

func NewStore

func NewStore(databaseURL string) (*Store, error)

NewStore opens a database connection and runs migrations.

func (*Store) BumpSessionEpoch

func (s *Store) BumpSessionEpoch(ctx context.Context, id string) (int, error)

BumpSessionEpoch atomically increments the epoch and returns the new value.

func (*Store) Close

func (s *Store) Close() error

Close closes the underlying database connection.

func (*Store) CreateSession

func (s *Store) CreateSession(ctx context.Context, id, workspaceID, title, source string, externalID *string) error

CreateSession inserts a new session.

func (*Store) GetSession

func (s *Store) GetSession(ctx context.Context, id string) (*Session, error)

GetSession retrieves a session by ID. Returns nil if not found.

func (*Store) GetSessionEpoch

func (s *Store) GetSessionEpoch(ctx context.Context, sessionID string) (int, error)

GetSessionEpoch returns the current epoch for a session.

func (*Store) InsertEvents

func (s *Store) InsertEvents(ctx context.Context, sessionID string, epoch int, events []EventInput) ([]InsertedEvent, error)

InsertEvents inserts a batch of events, skipping duplicates. Returns only the successfully inserted events with their sequence numbers.

type StreamClientEvent

// StreamClientEvent is the event value passed to SSEBroker.Publish and
// carried on SSESubscriber.Ch.
type StreamClientEvent struct {
	EventID     string          `json:"event_id"`
	SequenceNum int64           `json:"sequence_num"`
	EventType   string          `json:"event_type"`
	Source      string          `json:"source"`
	// Payload is the event body kept as raw, undecoded JSON.
	Payload     json.RawMessage `json:"payload"`
	// CreatedAt is a string here (not time.Time as in SessionEvent); the
	// timestamp format is not shown in this listing — TODO confirm.
	CreatedAt   string          `json:"created_at"`
}

type TurnLock

// TurnLock is created by NewTurnLock and exposes Acquire and Release, both
// keyed by session ID. Presumably it serializes turn processing per session
// — TODO confirm blocking semantics against the implementation.
type TurnLock struct {
	// contains filtered or unexported fields
}

func NewTurnLock

func NewTurnLock() *TurnLock

func (*TurnLock) Acquire

func (t *TurnLock) Acquire(sessionID string)

func (*TurnLock) Release

func (t *TurnLock) Release(sessionID string)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL