syncmgr

package
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 20, 2026 License: Apache-2.0 Imports: 32 Imported by: 0

Documentation

Overview

Package syncmgr implements the WSProxy sync manager that maintains persistent WebSocket connections to every Worker cluster and pushes API key, SandboxTemplate, and ClusterConfig updates.

Index

Constants

This section is empty.

Variables

View Source
var (
	// WSSyncConnectionsActive tracks the number of currently active sync
	// WebSocket connections (one per Worker cluster).
	WSSyncConnectionsActive = prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "agentbox_wsproxy_sync_connections_active",
		Help: "Number of active WebSocket sync connections to Worker clusters.",
	})

	// WSSyncReconnectsTotal counts the total number of successful (re)connections
	// to Worker clusters, partitioned by cluster ID.
	WSSyncReconnectsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "agentbox_wsproxy_sync_reconnects_total",
		Help: "Total number of sync WebSocket (re)connections to Worker clusters.",
	}, []string{"cluster"})

	// WSSyncDisconnectsTotal counts the total number of disconnections,
	// partitioned by cluster ID.
	WSSyncDisconnectsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "agentbox_wsproxy_sync_disconnects_total",
		Help: "Total number of sync WebSocket disconnections from Worker clusters.",
	}, []string{"cluster"})

	// WSSyncEventsTotal counts proto events emitted to Worker streams,
	// partitioned by cluster and kind (key_upsert / key_delete /
	// template_upsert / template_delete / cluster_config).
	WSSyncEventsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "agentbox_wsproxy_sync_events_total",
		Help: "Total number of proto sync events emitted to Worker streams.",
	}, []string{"cluster", "kind"})

	// WSSyncEventsDroppedTotal counts events that could not be enqueued onto
	// a cluster's broadcast channel because the buffer was full. A non-zero
	// value indicates the Worker is slow to consume; investigate.
	WSSyncEventsDroppedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "agentbox_wsproxy_sync_events_dropped_total",
		Help: "Total number of sync events dropped because the per-cluster broadcast buffer was full.",
	}, []string{"cluster", "kind"})

	// WSSyncPingFailuresTotal counts Ping write failures (connection likely dead).
	WSSyncPingFailuresTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "agentbox_wsproxy_sync_ping_failures_total",
		Help: "Total number of WebSocket Ping failures to Worker clusters.",
	}, []string{"cluster"})
)

Functions

func MetricsHandler

func MetricsHandler() http.Handler

MetricsHandler returns an http.Handler for the /metrics endpoint.

Types

type Deps

type Deps struct {
	KeyStore        KeyStore
	AdminKeyMgr     *apikey.AdminKeyManager        // validates the shared admin key; nil = dev mode
	IAMService      service.IAMService             // namespace resolution for API key auth; may be nil
	TemplateClient  client.Client                  // websocket sync path: raw K8s ops + snapshot
	TemplateService service.SandboxTemplateService // internal HTTP API: business logic + rendered responses
	MaxPerUser      int
	JWTSecret       string // HS256 secret shared with the BFF; enables Bearer JWT auth on internal API
}

Deps bundles all optional dependencies injected into SyncManager.

type ImageDataset

type ImageDataset struct {
	ID             string            `json:"id"`
	Name           string            `json:"name"`
	Description    string            `json:"description"`
	ImageCount     int               `json:"imageCount"`
	Category       string            `json:"category"`
	Source         string            `json:"source"`
	HuggingFaceURL string            `json:"huggingFaceUrl"`
	Tags           []string          `json:"tags"`
	ClusterDocs    map[string]string `json:"clusterDocs"`
}

ImageDataset mirrors the TypeScript type in components/images/data.ts.

type KeyStore

type KeyStore interface {
	List(ctx context.Context) ([]apikey.KeyMetadata, error)
	ListByTeamAndUser(ctx context.Context, team, user string) ([]apikey.KeyMetadata, error)
	CountUserKeys(ctx context.Context, namespace, user string) (int, error)
	Create(ctx context.Context, meta apikey.KeyMetadata) (rawToken, keyID string, err error)
	CreateFromHash(ctx context.Context, meta apikey.KeyMetadata, tokenHash, hashPrefix string) error
	Validate(ctx context.Context, rawToken string) (*apikey.KeyMetadata, error)
	Get(ctx context.Context, keyID string) (*apikey.KeyMetadata, error)
	Delete(ctx context.Context, keyID string) error
}

KeyStore is the subset of apikey.SecretKeyStore methods used by syncmgr.

type SyncManager

type SyncManager struct {
	// contains filtered or unexported fields
}

SyncManager maintains persistent WebSocket connections to every configured Worker cluster and exposes the three sync gRPC services on each one via yamux multiplexing.

func New

func New(clusters *cluster.Store, syncToken, managerToken string, deps Deps) *SyncManager

New creates a new SyncManager.

func (*SyncManager) BroadcastClusterConfig

func (m *SyncManager) BroadcastClusterConfig()

BroadcastClusterConfig serialises the full cluster config snapshot and pushes it to every connected Worker via the per-cluster ClusterConfig stream. Suppresses the log line when the snapshot is identical to the last broadcast (steady-state ticks shouldn't spam the log).

func (*SyncManager) BroadcastKeyDelete added in v0.0.3

func (m *SyncManager) BroadcastKeyDelete(secretName string)

BroadcastKeyDelete broadcasts an APIKey delete event by secret name.

func (*SyncManager) BroadcastKeyMeta added in v0.0.3

func (m *SyncManager) BroadcastKeyMeta(meta apikey.KeyMetadata)

BroadcastKeyMeta broadcasts an APIKey upsert event derived from an apikey.KeyMetadata. Used by the dashboard-facing /v1/api-keys handler in the handlers subpackage so a Hub-side create lands in every Worker.

func (*SyncManager) BroadcastTemplateDelete added in v0.0.3

func (m *SyncManager) BroadcastTemplateDelete(name string)

BroadcastTemplateDelete broadcasts a SandboxTemplate delete event by name.

func (*SyncManager) BroadcastTemplateUpsert added in v0.0.3

func (m *SyncManager) BroadcastTemplateUpsert(raw []byte)

BroadcastTemplateUpsert broadcasts a SandboxTemplate upsert event carrying the full JSON-encoded template body.

func (*SyncManager) ClusterIDs

func (m *SyncManager) ClusterIDs() []string

ClusterIDs returns a snapshot of known cluster IDs (for logging / status).

func (*SyncManager) ExportClusterConfigSnapshot

func (m *SyncManager) ExportClusterConfigSnapshot() json.RawMessage

ExportClusterConfigSnapshot is a test helper that returns the current snapshot as a json.RawMessage. Exported so unit tests can verify the snapshot composition without standing up a WebSocket session.

func (*SyncManager) GetDeps added in v0.0.2

func (m *SyncManager) GetDeps() Deps

GetDeps returns the Deps struct for read-only access by the handlers subpackage.

func (*SyncManager) LoadCatalog added in v0.0.2

func (m *SyncManager) LoadCatalog(ctx context.Context) ([]ImageDataset, error)

LoadCatalog reads the images catalog from the master cluster's ConfigMap.

func (*SyncManager) RegisterLegacyRoutes added in v0.0.2

func (m *SyncManager) RegisterLegacyRoutes(rg *gin.RouterGroup)

RegisterLegacyRoutes registers the legacy /internal/* HTTP handlers onto the provided Gin RouterGroup.

func (*SyncManager) Run

func (m *SyncManager) Run(ctx context.Context)

Run starts the sync manager loop: dial all known clusters and rescan every 30 s for newly added ones.

func (*SyncManager) SaveCatalog added in v0.0.2

func (m *SyncManager) SaveCatalog(ctx context.Context, datasets []ImageDataset) error

SaveCatalog writes the images catalog to the master cluster's ConfigMap.

Directories

Path Synopsis
Package handlers implements the wsproxygen.StrictServerInterface for the internal management API (:9004).
Package handlers implements the wsproxygen.StrictServerInterface for the internal management API (:9004).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL