workers

package
v0.0.4-beta Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 27, 2025 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BaseWorker

type BaseWorker struct {
	// contains filtered or unexported fields
}

func NewBaseWorker

func NewBaseWorker(name string, nc *nats.Conn, js nats.JetStreamContext, stream, consumer, subject string) *BaseWorker

func (*BaseWorker) HealthCheck

func (w *BaseWorker) HealthCheck() error

func (*BaseWorker) Name

func (w *BaseWorker) Name() string

func (*BaseWorker) Stop

func (w *BaseWorker) Stop(ctx context.Context) error

type CommandWorker

type CommandWorker struct {
	*BaseWorker
}

func NewCommandWorker

func NewCommandWorker(nc *nats.Conn, js nats.JetStreamContext) *CommandWorker

func (*CommandWorker) Start

func (w *CommandWorker) Start(ctx context.Context) error

type EntityRegistry

type EntityRegistry struct {
	// contains filtered or unexported fields
}

EntityRegistry maintains an in-memory set of registered entity IDs. This prevents excessive database reads for telemetry validation.

func NewEntityRegistry

func NewEntityRegistry(db *sql.DB) (*EntityRegistry, error)

NewEntityRegistry creates a new entity registry and loads existing entities from the database.

func (*EntityRegistry) Count

func (r *EntityRegistry) Count() int

Count returns the number of registered entities

func (*EntityRegistry) GetAll

func (r *EntityRegistry) GetAll() []string

GetAll returns all registered entity IDs

func (*EntityRegistry) InitializeKVStoreFromDB

func (r *EntityRegistry) InitializeKVStoreFromDB(kv nats.KeyValue) error

InitializeKVStoreFromDB ensures all entities in the database have a corresponding KV entry. This is called on boot to populate the KV store with initial entity states.

func (*EntityRegistry) IsRegistered

func (r *EntityRegistry) IsRegistered(entityID string) bool

IsRegistered checks whether the given entity ID is in the registry.

func (*EntityRegistry) LoadFromDB

func (r *EntityRegistry) LoadFromDB() error

LoadFromDB loads all entity IDs from the database into memory

func (*EntityRegistry) Register

func (r *EntityRegistry) Register(entityID string)

Register adds an entity_id to the registry

func (*EntityRegistry) Unregister

func (r *EntityRegistry) Unregister(entityID string)

Unregister removes an entity_id from the registry

type EntityWorker

type EntityWorker struct {
	*BaseWorker
}

func NewEntityWorker

func NewEntityWorker(nc *nats.Conn, js nats.JetStreamContext) *EntityWorker

func (*EntityWorker) Start

func (w *EntityWorker) Start(ctx context.Context) error

type EventWorker

type EventWorker struct {
	*BaseWorker
	// contains filtered or unexported fields
}

func NewEventWorker

func NewEventWorker(nc *nats.Conn, js nats.JetStreamContext, db *sql.DB, registry *EntityRegistry) *EventWorker

func (*EventWorker) Start

func (w *EventWorker) Start(ctx context.Context) error

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

func NewManager

func NewManager(natsClient *embeddednats.EmbeddedNATS, db *sql.DB) (*Manager, error)

NewManager creates a worker manager with database and KV store access

func (*Manager) Start

func (m *Manager) Start() error

func (*Manager) Stop

func (m *Manager) Stop(ctx context.Context) error

type TelemetryWorker

type TelemetryWorker struct {
	*BaseWorker
	// contains filtered or unexported fields
}

TelemetryWorker processes telemetry messages and maintains global entity state

func NewTelemetryWorker

func NewTelemetryWorker(nc *nats.Conn, js nats.JetStreamContext, db *sql.DB, kv nats.KeyValue, registry *EntityRegistry) *TelemetryWorker

NewTelemetryWorker creates a new telemetry worker with database and KV store access

func (*TelemetryWorker) Start

func (w *TelemetryWorker) Start(ctx context.Context) error

type VideoWorker added in v0.0.5

type VideoWorker struct {
	*BaseWorker
	// contains filtered or unexported fields
}

VideoWorker processes video frame messages from vision2constellation agents

func NewVideoWorker added in v0.0.5

func NewVideoWorker(nc *nats.Conn, js nats.JetStreamContext, registry *EntityRegistry) *VideoWorker

NewVideoWorker creates a new video frame worker

func (*VideoWorker) Start added in v0.0.5

func (w *VideoWorker) Start(ctx context.Context) error

type Worker

type Worker interface {
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
	Name() string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL