Documentation
¶
Overview ¶
Package syncmgr implements the WSProxy sync manager that maintains persistent WebSocket connections to every Worker cluster and pushes API key, SandboxTemplate, and ClusterConfig updates.
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ( // WSSyncConnectionsActive tracks the number of currently active sync // WebSocket connections (one per Worker cluster). WSSyncConnectionsActive = prometheus.NewGauge(prometheus.GaugeOpts{ Name: "agentbox_wsproxy_sync_connections_active", Help: "Number of active WebSocket sync connections to Worker clusters.", }) // WSSyncReconnectsTotal counts the total number of successful (re)connections // to Worker clusters, partitioned by cluster ID. WSSyncReconnectsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "agentbox_wsproxy_sync_reconnects_total", Help: "Total number of sync WebSocket (re)connections to Worker clusters.", }, []string{"cluster"}) // WSSyncDisconnectsTotal counts the total number of disconnections, // partitioned by cluster ID. WSSyncDisconnectsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "agentbox_wsproxy_sync_disconnects_total", Help: "Total number of sync WebSocket disconnections from Worker clusters.", }, []string{"cluster"}) // WSSyncFramesTotal counts sync protocol frames sent/received, // partitioned by cluster and direction (tx/rx). WSSyncFramesTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "agentbox_wsproxy_sync_frames_total", Help: "Total number of sync protocol frames sent (tx) or received (rx).", }, []string{"cluster", "direction"}) // WSSyncPingFailuresTotal counts Ping write failures (connection likely dead). WSSyncPingFailuresTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "agentbox_wsproxy_sync_ping_failures_total", Help: "Total number of WebSocket Ping failures to Worker clusters.", }, []string{"cluster"}) )
Functions ¶
func MetricsHandler ¶
MetricsHandler returns an http.Handler for the /metrics endpoint.
Types ¶
type Deps ¶
// Deps groups the optional dependencies that are injected into a SyncManager.
type Deps struct {
	// KeyStore provides the API key storage operations used by syncmgr.
	KeyStore KeyStore

	// TemplateClient backs the websocket sync path: raw K8s ops + snapshot.
	TemplateClient client.Client

	// TemplateService backs the internal HTTP API: business logic + rendered
	// responses.
	TemplateService service.SandboxTemplateService

	// MaxPerUser is presumably the per-user API key limit — TODO confirm
	// against the code that reads it.
	MaxPerUser int
}
Deps bundles all optional dependencies injected into SyncManager.
type ImageDataset ¶
// ImageDataset mirrors the TypeScript type in components/images/data.ts.
// The JSON tags below fix the wire names of every field.
type ImageDataset struct {
	// ID is encoded as "id".
	ID string `json:"id"`

	// Name is encoded as "name".
	Name string `json:"name"`

	// Description is encoded as "description".
	Description string `json:"description"`

	// ImageCount is encoded as "imageCount".
	ImageCount int `json:"imageCount"`

	// Category is encoded as "category".
	Category string `json:"category"`

	// Source is encoded as "source".
	Source string `json:"source"`

	// HuggingFaceURL is encoded as "huggingFaceUrl".
	HuggingFaceURL string `json:"huggingFaceUrl"`

	// Tags is encoded as "tags".
	Tags []string `json:"tags"`

	// ClusterDocs is encoded as "clusterDocs"; presumably keyed by cluster
	// ID — TODO confirm against the TypeScript definition.
	ClusterDocs map[string]string `json:"clusterDocs"`
}
ImageDataset mirrors the TypeScript type in components/images/data.ts.
type KeyStore ¶
// KeyStore is the subset of apikey.SecretKeyStore methods used by syncmgr.
type KeyStore interface {
	// Lookups.

	// List returns key metadata records (presumably every stored key —
	// confirm against the implementation).
	List(ctx context.Context) ([]apikey.KeyMetadata, error)
	// ListByTeamAndUser returns key metadata records selected by team and user.
	ListByTeamAndUser(ctx context.Context, team, user string) ([]apikey.KeyMetadata, error)
	// Get returns a pointer to the metadata of the key identified by keyID.
	Get(ctx context.Context, keyID string) (*apikey.KeyMetadata, error)
	// CountUserKeys reports a key count for user within namespace.
	CountUserKeys(ctx context.Context, namespace, user string) (int, error)

	// Mutations.

	// Create stores a key described by meta and returns its raw token and
	// key ID.
	Create(ctx context.Context, meta apikey.KeyMetadata) (rawToken, keyID string, err error)
	// CreateFromHash stores a key described by meta from an already computed
	// token hash and hash prefix; no raw token is returned.
	CreateFromHash(ctx context.Context, meta apikey.KeyMetadata, tokenHash, hashPrefix string) error
	// Delete removes the key identified by keyID.
	Delete(ctx context.Context, keyID string) error
}
KeyStore is the subset of apikey.SecretKeyStore methods used by syncmgr.
type SyncManager ¶
type SyncManager struct {
// contains filtered or unexported fields
}
SyncManager maintains persistent WebSocket connections to every configured Worker cluster and pushes API key, SandboxTemplate, and ClusterConfig updates.
func New ¶
func New(clusters *cluster.Store, syncToken, managerToken string, deps Deps) *SyncManager
New creates a new SyncManager. clusters is the shared cluster store loaded from clusters.yaml. syncToken is sent in the AGENTBOX-SYNC-TOKEN header to authenticate with Workers. managerToken gates the internal HTTP API.
func (*SyncManager) BroadcastClusterConfig ¶
func (m *SyncManager) BroadcastClusterConfig()
BroadcastClusterConfig serialises the full cluster config and broadcasts it as a cluster_config_sync frame to every connected Worker. Call this after reloading the Manager's cluster config file so that Workers pick up changes (new Gateway fields, host-alias updates, ...) without restarting. It is a no-op when the snapshot is empty, so an empty snapshot never clears the Workers' existing config.
func (*SyncManager) ClusterIDs ¶
func (m *SyncManager) ClusterIDs() []string
ClusterIDs returns a snapshot of known cluster IDs (for logging / status).
func (*SyncManager) ExportClusterConfigSnapshot ¶
func (m *SyncManager) ExportClusterConfigSnapshot() json.RawMessage
ExportClusterConfigSnapshot is a test helper that returns the current serialised snapshot as a json.RawMessage. It is exported so that tests in the _test package can verify the encoding without a live WebSocket.
func (*SyncManager) InternalAPIHandler ¶
func (m *SyncManager) InternalAPIHandler() http.Handler
InternalAPIHandler returns the HTTP handler for the internal management API (:9004).
func (*SyncManager) Run ¶
func (m *SyncManager) Run(ctx context.Context)
Run starts the sync manager loop: immediately dials all known clusters and then re-dials every 30 s to pick up newly added clusters.