Documentation
Overview
Package syncmgr implements the WSProxy sync manager that maintains persistent WebSocket connections to every Worker cluster and pushes API key, SandboxTemplate, and ClusterConfig updates.
Index
- Variables
- func MetricsHandler() http.Handler
- type Deps
- type ImageDataset
- type KeyStore
- type SyncManager
- func New(clusters *cluster.Store, syncToken, managerToken string, deps Deps) *SyncManager
- func (m *SyncManager) BroadcastClusterConfig()
- func (m *SyncManager) BroadcastKeyDelete(secretName string)
- func (m *SyncManager) BroadcastKeyMeta(meta apikey.KeyMetadata)
- func (m *SyncManager) BroadcastTemplateDelete(name string)
- func (m *SyncManager) BroadcastTemplateUpsert(raw []byte)
- func (m *SyncManager) ClusterIDs() []string
- func (m *SyncManager) ExportClusterConfigSnapshot() json.RawMessage
- func (m *SyncManager) GetDeps() Deps
- func (m *SyncManager) LoadCatalog(ctx context.Context) ([]ImageDataset, error)
- func (m *SyncManager) RegisterLegacyRoutes(rg *gin.RouterGroup)
- func (m *SyncManager) Run(ctx context.Context)
- func (m *SyncManager) SaveCatalog(ctx context.Context, datasets []ImageDataset) error
Constants
This section is empty.
Variables
var (
	// WSSyncConnectionsActive tracks the number of currently active sync
	// WebSocket connections (one per Worker cluster).
	WSSyncConnectionsActive = prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "agentbox_wsproxy_sync_connections_active",
		Help: "Number of active WebSocket sync connections to Worker clusters.",
	})

	// WSSyncReconnectsTotal counts the total number of successful (re)connections
	// to Worker clusters, partitioned by cluster ID.
	WSSyncReconnectsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "agentbox_wsproxy_sync_reconnects_total",
		Help: "Total number of sync WebSocket (re)connections to Worker clusters.",
	}, []string{"cluster"})

	// WSSyncDisconnectsTotal counts the total number of disconnections,
	// partitioned by cluster ID.
	WSSyncDisconnectsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "agentbox_wsproxy_sync_disconnects_total",
		Help: "Total number of sync WebSocket disconnections from Worker clusters.",
	}, []string{"cluster"})

	// WSSyncEventsTotal counts proto events emitted to Worker streams,
	// partitioned by cluster and kind (key_upsert / key_delete /
	// template_upsert / template_delete / cluster_config).
	WSSyncEventsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "agentbox_wsproxy_sync_events_total",
		Help: "Total number of proto sync events emitted to Worker streams.",
	}, []string{"cluster", "kind"})

	// WSSyncEventsDroppedTotal counts events that could not be enqueued onto
	// a cluster's broadcast channel because the buffer was full. A non-zero
	// value indicates the Worker is slow to consume; investigate.
	WSSyncEventsDroppedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "agentbox_wsproxy_sync_events_dropped_total",
		Help: "Total number of sync events dropped because the per-cluster broadcast buffer was full.",
	}, []string{"cluster", "kind"})

	// WSSyncPingFailuresTotal counts Ping write failures (connection likely dead).
	WSSyncPingFailuresTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "agentbox_wsproxy_sync_ping_failures_total",
		Help: "Total number of WebSocket Ping failures to Worker clusters.",
	}, []string{"cluster"})
)
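The dropped-events counter implies a non-blocking hand-off from the broadcaster to each cluster's buffered channel. The sketch below shows this with the standard library only, assuming one buffered channel per cluster. Several names are hypothetical: `event`, `labelKey`, `broadcaster`, and the plain int maps that stand in for the Prometheus CounterVecs. For simplicity, events are counted as emitted when they are enqueued, whereas `WSSyncEventsTotal` counts events actually emitted to Worker streams.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a hypothetical stand-in for a proto sync event; kind reuses the
// label values listed for WSSyncEventsTotal (e.g. "key_upsert").
type event struct {
	kind    string
	payload []byte
}

// labelKey identifies one (cluster, kind) series, mirroring the label
// dimensions of WSSyncEventsTotal and WSSyncEventsDroppedTotal.
type labelKey struct{ cluster, kind string }

// broadcaster keeps one buffered channel per Worker cluster plus plain
// counters standing in for the Prometheus CounterVecs.
type broadcaster struct {
	mu      sync.Mutex
	queues  map[string]chan event
	emitted map[labelKey]int
	dropped map[labelKey]int
}

func newBroadcaster(clusters []string, buf int) *broadcaster {
	b := &broadcaster{
		queues:  make(map[string]chan event),
		emitted: make(map[labelKey]int),
		dropped: make(map[labelKey]int),
	}
	for _, id := range clusters {
		b.queues[id] = make(chan event, buf)
	}
	return b
}

// broadcast offers ev to every cluster's channel without blocking. A full
// buffer means that Worker is not keeping up, so its copy is dropped and
// counted instead of stalling delivery to the other clusters.
func (b *broadcaster) broadcast(ev event) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for id, q := range b.queues {
		k := labelKey{cluster: id, kind: ev.kind}
		select {
		case q <- ev:
			b.emitted[k]++
		default:
			b.dropped[k]++
		}
	}
}

func main() {
	b := newBroadcaster([]string{"cluster-a", "cluster-b"}, 1)
	b.broadcast(event{kind: "key_upsert"})
	<-b.queues["cluster-a"] // cluster-a's Worker consumes; cluster-b's never does
	b.broadcast(event{kind: "key_upsert"})
	fmt.Println(b.emitted[labelKey{"cluster-a", "key_upsert"}]) // 2
	fmt.Println(b.dropped[labelKey{"cluster-b", "key_upsert"}]) // 1
}
```

The `select` with a `default` branch keeps one slow Worker from blocking broadcasts to the others. The cost is that the slow Worker loses its copy of the event, and `WSSyncEventsDroppedTotal` exists to make exactly that trade-off visible.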
Functions
func MetricsHandler
func MetricsHandler() http.Handler
MetricsHandler returns an http.Handler for the /metrics endpoint.
Types
type Deps
type Deps struct {
KeyStore KeyStore
AdminKeyMgr *apikey.AdminKeyManager // validates the shared admin key; nil = dev mode
IAMService service.IAMService // namespace resolution for API key auth; may be nil
TemplateClient client.Client // websocket sync path: raw K8s ops + snapshot
TemplateService service.SandboxTemplateService // internal HTTP API: business logic + rendered responses
MaxPerUser int
JWTSecret string // HS256 secret shared with the BFF; enables Bearer JWT auth on internal API
}
Deps bundles all optional dependencies injected into SyncManager.
type ImageDataset
type ImageDataset struct {
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
ImageCount int `json:"imageCount"`
Category string `json:"category"`
Source string `json:"source"`
HuggingFaceURL string `json:"huggingFaceUrl"`
Tags []string `json:"tags"`
ClusterDocs map[string]string `json:"clusterDocs"`
}
ImageDataset mirrors the TypeScript type in components/images/data.ts.
type KeyStore
type KeyStore interface {
List(ctx context.Context) ([]apikey.KeyMetadata, error)
ListByTeamAndUser(ctx context.Context, team, user string) ([]apikey.KeyMetadata, error)
CountUserKeys(ctx context.Context, namespace, user string) (int, error)
Create(ctx context.Context, meta apikey.KeyMetadata) (rawToken, keyID string, err error)
CreateFromHash(ctx context.Context, meta apikey.KeyMetadata, tokenHash, hashPrefix string) error
Validate(ctx context.Context, rawToken string) (*apikey.KeyMetadata, error)
Get(ctx context.Context, keyID string) (*apikey.KeyMetadata, error)
Delete(ctx context.Context, keyID string) error
}
KeyStore is the subset of apikey.SecretKeyStore methods used by syncmgr.
type SyncManager
type SyncManager struct {
// contains filtered or unexported fields
}
SyncManager maintains persistent WebSocket connections to every configured Worker cluster and exposes the three sync gRPC services on each one via yamux multiplexing.
func New
func New(clusters *cluster.Store, syncToken, managerToken string, deps Deps) *SyncManager
New creates a new SyncManager.
func (*SyncManager) BroadcastClusterConfig
func (m *SyncManager) BroadcastClusterConfig()
BroadcastClusterConfig serialises the full cluster config snapshot and pushes it to every connected Worker via the per-cluster ClusterConfig stream. Suppresses the log line when the snapshot is identical to the last broadcast (steady-state ticks shouldn't spam the log).
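The log suppression can be sketched as a byte-level comparison against the previously broadcast snapshot. The helper names (`snapshotLogger`, `changed`) are hypothetical, and this is not the package's actual code. The sketch assumes the snapshot is produced by encoding/json, which yields identical bytes for equal Go values because it sorts map keys.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"sync"
)

// snapshotLogger remembers the last broadcast payload so repeated identical
// snapshots (steady-state ticks) can skip the log line.
type snapshotLogger struct {
	mu   sync.Mutex
	seen bool
	last []byte
}

// changed reports whether snap differs from the previously seen snapshot and
// records snap as the new baseline.
func (s *snapshotLogger) changed(snap []byte) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.seen && bytes.Equal(s.last, snap) {
		return false
	}
	s.seen = true
	s.last = append(s.last[:0], snap...) // keep a private copy of the bytes
	return true
}

func main() {
	var s snapshotLogger
	for _, cfg := range []map[string]string{
		{"cluster-a": "v1"},
		{"cluster-a": "v1"}, // identical tick: no log line
		{"cluster-a": "v2"},
	} {
		snap, err := json.Marshal(cfg)
		if err != nil {
			panic(err)
		}
		// The broadcast itself would happen on every tick; only logging is gated.
		if s.changed(snap) {
			fmt.Printf("broadcast cluster config: %s\n", snap)
		}
	}
}
```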
func (*SyncManager) BroadcastKeyDelete (added in v0.0.3)
func (m *SyncManager) BroadcastKeyDelete(secretName string)
BroadcastKeyDelete broadcasts an APIKey delete event by secret name.
func (*SyncManager) BroadcastKeyMeta (added in v0.0.3)
func (m *SyncManager) BroadcastKeyMeta(meta apikey.KeyMetadata)
BroadcastKeyMeta broadcasts an APIKey upsert event derived from an apikey.KeyMetadata. It is used by the dashboard-facing /v1/api-keys handler in the handlers subpackage so that a key created on the Hub side reaches every Worker.
func (*SyncManager) BroadcastTemplateDelete (added in v0.0.3)
func (m *SyncManager) BroadcastTemplateDelete(name string)
BroadcastTemplateDelete broadcasts a SandboxTemplate delete event by name.
func (*SyncManager) BroadcastTemplateUpsert (added in v0.0.3)
func (m *SyncManager) BroadcastTemplateUpsert(raw []byte)
BroadcastTemplateUpsert broadcasts a SandboxTemplate upsert event carrying the full JSON-encoded template body.
func (*SyncManager) ClusterIDs
func (m *SyncManager) ClusterIDs() []string
ClusterIDs returns a snapshot of known cluster IDs (for logging / status).
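A snapshot accessor typically copies the IDs while holding a read lock, so the returned slice stays valid after clusters connect or disconnect. The sketch below assumes the manager keeps its sessions in a mutex-guarded map. The `registry` type and its fields are hypothetical, and sorting is added here only to give log output a stable order.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// registry is a hypothetical stand-in for the manager's internal
// cluster-ID -> session map.
type registry struct {
	mu    sync.RWMutex
	conns map[string]struct{}
}

// ClusterIDs copies the keys under a read lock, so callers get an independent
// slice that later connects or disconnects cannot mutate.
func (r *registry) ClusterIDs() []string {
	r.mu.RLock()
	ids := make([]string, 0, len(r.conns))
	for id := range r.conns {
		ids = append(ids, id)
	}
	r.mu.RUnlock()
	sort.Strings(ids) // stable order for log lines; map iteration order is random
	return ids
}

func main() {
	r := &registry{conns: map[string]struct{}{"cluster-b": {}, "cluster-a": {}}}
	ids := r.ClusterIDs()
	r.mu.Lock()
	r.conns["cluster-c"] = struct{}{} // a cluster connects after the snapshot
	r.mu.Unlock()
	fmt.Println(ids, len(r.ClusterIDs())) // [cluster-a cluster-b] 3
}
```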
func (*SyncManager) ExportClusterConfigSnapshot
func (m *SyncManager) ExportClusterConfigSnapshot() json.RawMessage
ExportClusterConfigSnapshot is a test helper that returns the current snapshot as a json.RawMessage. It is exported so unit tests can verify the snapshot composition without standing up a WebSocket session.
func (*SyncManager) GetDeps (added in v0.0.2)
func (m *SyncManager) GetDeps() Deps
GetDeps returns the Deps struct for read-only access by the handlers subpackage.
func (*SyncManager) LoadCatalog (added in v0.0.2)
func (m *SyncManager) LoadCatalog(ctx context.Context) ([]ImageDataset, error)
LoadCatalog reads the images catalog from the master cluster's ConfigMap.
func (*SyncManager) RegisterLegacyRoutes (added in v0.0.2)
func (m *SyncManager) RegisterLegacyRoutes(rg *gin.RouterGroup)
RegisterLegacyRoutes registers the legacy /internal/* HTTP handlers onto the provided Gin RouterGroup.
func (*SyncManager) Run
func (m *SyncManager) Run(ctx context.Context)
Run starts the sync manager loop: dial all known clusters and rescan every 30 s for newly added ones.
func (*SyncManager) SaveCatalog (added in v0.0.2)
func (m *SyncManager) SaveCatalog(ctx context.Context, datasets []ImageDataset) error
SaveCatalog writes the images catalog to the master cluster's ConfigMap.
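LoadCatalog and SaveCatalog store the catalog in a ConfigMap, whose `Data` field is a `map[string]string`. The sketch below covers only the encode/decode step, using a plain map in place of the ConfigMap and no Kubernetes client. Several pieces are assumptions: the data key `catalog.json`, the helper names, the trimmed three-field `ImageDataset`, and treating a missing key as an empty catalog.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ImageDataset is trimmed to three fields for brevity; the full struct is
// declared above.
type ImageDataset struct {
	ID   string   `json:"id"`
	Name string   `json:"name"`
	Tags []string `json:"tags"`
}

// catalogKey is a hypothetical ConfigMap data key; the real key is not documented.
const catalogKey = "catalog.json"

// saveCatalog encodes datasets as a JSON array into a ConfigMap-style string map.
func saveCatalog(data map[string]string, datasets []ImageDataset) error {
	b, err := json.Marshal(datasets)
	if err != nil {
		return err
	}
	data[catalogKey] = string(b)
	return nil
}

// loadCatalog decodes the array back; a missing key yields an empty catalog.
func loadCatalog(data map[string]string) ([]ImageDataset, error) {
	raw, ok := data[catalogKey]
	if !ok {
		return nil, nil
	}
	var out []ImageDataset
	if err := json.Unmarshal([]byte(raw), &out); err != nil {
		return nil, fmt.Errorf("decode %s: %w", catalogKey, err)
	}
	return out, nil
}

func main() {
	data := map[string]string{} // stand-in for ConfigMap.Data
	if err := saveCatalog(data, []ImageDataset{{ID: "d1", Name: "Demo", Tags: []string{"vision"}}}); err != nil {
		panic(err)
	}
	fmt.Println(data[catalogKey]) // [{"id":"d1","name":"Demo","tags":["vision"]}]
	got, err := loadCatalog(data)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(got), got[0].Name) // 1 Demo
}
```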