syncmgr

package
v0.0.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 10, 2026 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation

Overview

Package syncmgr implements the WSProxy sync manager that maintains persistent WebSocket connections to every Worker cluster and pushes API key, SandboxTemplate, and ClusterConfig updates.

Index

Constants

This section is empty.

Variables

View Source
var (
	// WSSyncConnectionsActive reports how many sync WebSocket connections are
	// open right now; each Worker cluster contributes one.
	WSSyncConnectionsActive = prometheus.NewGauge(prometheus.GaugeOpts{
		Help: "Number of active WebSocket sync connections to Worker clusters.",
		Name: "agentbox_wsproxy_sync_connections_active",
	})

	// WSSyncReconnectsTotal is bumped on every successful connection to a
	// Worker cluster (the initial connection as well as later reconnections),
	// labelled by cluster ID.
	WSSyncReconnectsTotal = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Help: "Total number of sync WebSocket (re)connections to Worker clusters.",
			Name: "agentbox_wsproxy_sync_reconnects_total",
		},
		[]string{"cluster"},
	)

	// WSSyncDisconnectsTotal tallies disconnections of sync connections,
	// labelled by cluster ID.
	WSSyncDisconnectsTotal = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Help: "Total number of sync WebSocket disconnections from Worker clusters.",
			Name: "agentbox_wsproxy_sync_disconnects_total",
		},
		[]string{"cluster"},
	)

	// WSSyncFramesTotal tallies sync protocol frames, labelled by cluster and
	// by direction: "tx" for frames sent, "rx" for frames received.
	WSSyncFramesTotal = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Help: "Total number of sync protocol frames sent (tx) or received (rx).",
			Name: "agentbox_wsproxy_sync_frames_total",
		},
		[]string{"cluster", "direction"},
	)

	// WSSyncPingFailuresTotal tallies failed Ping writes per cluster; such a
	// failure most likely means the connection is dead.
	WSSyncPingFailuresTotal = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Help: "Total number of WebSocket Ping failures to Worker clusters.",
			Name: "agentbox_wsproxy_sync_ping_failures_total",
		},
		[]string{"cluster"},
	)
)

Functions

func MetricsHandler

func MetricsHandler() http.Handler

MetricsHandler returns an http.Handler for the /metrics endpoint.

Types

type Deps

type Deps struct {
	KeyStore        KeyStore                       // API key backend: the subset of apikey.SecretKeyStore that syncmgr calls
	TemplateClient  client.Client                  // websocket sync path: raw K8s ops + snapshot
	TemplateService service.SandboxTemplateService // internal HTTP API: business logic + rendered responses
	MaxPerUser      int                            // presumably the per-user API key cap (cf. KeyStore.CountUserKeys); TODO confirm, incl. meaning of 0
}

Deps bundles the optional dependencies and settings (such as MaxPerUser) that are injected into SyncManager.

type ImageDataset

type ImageDataset struct {
	// Keep field names and JSON tags in step with the TypeScript type in
	// components/images/data.ts, which this struct mirrors.
	//
	// NOTE(review): no field uses omitempty, so a nil Tags slice or a nil
	// ClusterDocs map encodes as JSON null rather than [] / {} — confirm the
	// TypeScript consumer tolerates null.
	ID             string            `json:"id"`
	Name           string            `json:"name"`
	Description    string            `json:"description"`
	ImageCount     int               `json:"imageCount"`
	Category       string            `json:"category"`
	Source         string            `json:"source"`
	HuggingFaceURL string            `json:"huggingFaceUrl"`
	Tags           []string          `json:"tags"`
	ClusterDocs    map[string]string `json:"clusterDocs"` // key/value meaning not visible here; presumably cluster ID -> doc text or URL — TODO confirm
}

ImageDataset mirrors the TypeScript type in components/images/data.ts.

type KeyStore

type KeyStore interface {
	// Get fetches the metadata of the key identified by keyID.
	Get(ctx context.Context, keyID string) (*apikey.KeyMetadata, error)
	// List returns the metadata of every stored key.
	List(ctx context.Context) ([]apikey.KeyMetadata, error)
	// ListByTeamAndUser returns the metadata of the keys belonging to the
	// given team and user.
	ListByTeamAndUser(ctx context.Context, team, user string) ([]apikey.KeyMetadata, error)
	// CountUserKeys returns how many keys user has in namespace.
	CountUserKeys(ctx context.Context, namespace, user string) (int, error)

	// Create stores a new key described by meta and hands back its raw token
	// together with the new key's ID.
	Create(ctx context.Context, meta apikey.KeyMetadata) (rawToken, keyID string, err error)
	// CreateFromHash stores meta under a caller-supplied tokenHash and
	// hashPrefix; unlike Create it yields no raw token.
	CreateFromHash(ctx context.Context, meta apikey.KeyMetadata, tokenHash, hashPrefix string) error
	// Delete removes the key identified by keyID.
	Delete(ctx context.Context, keyID string) error
}

KeyStore is the subset of apikey.SecretKeyStore methods used by syncmgr.

type SyncManager

type SyncManager struct {
	// contains filtered or unexported fields
	// (construct with New; the zero value is not documented as usable)
}

SyncManager maintains persistent WebSocket connections to every configured Worker cluster and pushes API key, SandboxTemplate, and ClusterConfig updates.

func New

func New(clusters *cluster.Store, syncToken, managerToken string, deps Deps) *SyncManager

New creates a new SyncManager. clusters is the shared cluster store loaded from clusters.yaml. syncToken is sent in the AGENTBOX-SYNC-TOKEN header to authenticate with Workers. managerToken gates the internal HTTP API.

func (*SyncManager) BroadcastClusterConfig

func (m *SyncManager) BroadcastClusterConfig()

BroadcastClusterConfig serialises the full cluster config and broadcasts it as a cluster_config_sync frame to every connected Worker. Call it after reloading the Manager's cluster config file so that Workers pick up changes (new Gateway fields, host-alias updates, and so on) without restarting. It is a no-op when the snapshot is empty, so an empty snapshot never clears a Worker's existing config.

func (*SyncManager) ClusterIDs

func (m *SyncManager) ClusterIDs() []string

ClusterIDs returns a snapshot of known cluster IDs (for logging / status).

func (*SyncManager) ExportClusterConfigSnapshot

func (m *SyncManager) ExportClusterConfigSnapshot() json.RawMessage

ExportClusterConfigSnapshot is a test helper that returns the current serialised snapshot as a json.RawMessage. It is exported so that tests in the external _test package can verify the encoding without a live WebSocket.

func (*SyncManager) InternalAPIHandler

func (m *SyncManager) InternalAPIHandler() http.Handler

InternalAPIHandler returns the HTTP handler for the internal management API (:9004).

func (*SyncManager) Run

func (m *SyncManager) Run(ctx context.Context)

Run starts the sync manager loop: it immediately dials all known clusters and then re-dials every 30 s to pick up newly added clusters.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL