broker

package
v0.61.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 19, 2026 License: MIT Imports: 8 Imported by: 0

Documentation

Overview

Package broker is a thin REST→ws codex v2 JSON-RPC adapter inside CXG. It owns no business logic: it converts a single /api/turns REST call into a turn lifecycle on a loopback ws to a codex app-server subprocess, returning the resulting codex Turn object verbatim.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

Conn is one loopback ws to a codex app-server subprocess. Safe for concurrent Turn() / StartThread() calls — internally serializes writes and demuxes responses + turn/completed notifications.

func Dial

func Dial(ctx context.Context, wsURL string) (*Conn, error)

Dial opens a fresh ws, performs the codex initialize / initialized handshake, and starts the reader goroutine. Caller must Close().

func (*Conn) Close

func (c *Conn) Close()

Close shuts down the ws. Safe to call multiple times.

func (*Conn) StartThread

func (c *Conn) StartThread(ctx context.Context) (string, error)

StartThread issues thread/start with empty params and returns the new thread id. Other ThreadStartResponse fields are discarded — CXG only owns the loopback, agentserver tracks per-conversation state.

func (*Conn) Turn

func (c *Conn) Turn(ctx context.Context, threadID string, callerParams json.RawMessage, timeout time.Duration) (json.RawMessage, error)

Turn sends turn/start and blocks until the matching turn/completed notification arrives or timeout elapses. Returns the raw codex Turn JSON for verbatim REST passthrough.

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool caches one *Conn per workspace id. Connections idle for longer than idleTTL are reaped and closed. Safe for concurrent use.

func NewPool

func NewPool(resolver WSURLResolver, idleTTL time.Duration) *Pool

NewPool starts a background reaper goroutine. Caller must Close().

func (*Pool) Close

func (p *Pool) Close()

Close stops the reaper and closes all live connections. It is safe to call Close more than once; subsequent calls are no-ops.

func (*Pool) Get

func (p *Pool) Get(ctx context.Context, workspaceID string) (*Conn, error)

Get returns a live *Conn for workspaceID, dialing if necessary. Concurrent Get calls for the same workspace share one Conn.

func (*Pool) Touch

func (p *Pool) Touch(workspaceID string)

Touch bumps lastUsedAt for workspaceID, extending the idle-reap deadline. Call after long-running operations (Turn, StartThread) so the 5-minute reaper does not kill a connection that is still in use.

type TimeoutError

type TimeoutError struct {
	ThreadID, TurnID string
}

TimeoutError is returned when timeoutMs elapses without turn/completed.

func (*TimeoutError) Error

func (e *TimeoutError) Error() string

type TurnRPCError

type TurnRPCError struct {
	Code    int
	Message string
	Data    json.RawMessage
}

TurnRPCError is returned by Turn when codex returns a JSON-RPC error in response to turn/start (rare; usually means malformed request).

func (*TurnRPCError) Error

func (e *TurnRPCError) Error() string

type WSURLResolver

type WSURLResolver func(ctx context.Context, workspaceID string) (wsURL string, err error)

WSURLResolver is the per-workspace loopback ws URL provider. In production this calls into supervisor.EnsureSubprocess to spawn / reuse the codex subprocess; tests inject a fixed URL.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL