metadatabatcher

package
v2.31.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 3, 2026 License: AGPL-3.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
const (

	// Channel to publish batch metadata updates to, each update contains a list of all Agent IDs that have an update in
	// the most recent batch
	MetadataBatchPubsubChannel = "workspace_agent_metadata_batch"
)
View Source
const (
	// uuidBase64Size is the size of a base64-encoded UUID without padding (22 characters).
	UUIDBase64Size = 22
)

Variables

This section is empty.

Functions

func EncodeAgentID

func EncodeAgentID(agentID uuid.UUID, dst []byte) error

func EncodeAgentIDChunks

func EncodeAgentIDChunks(agentIDs []uuid.UUID) ([][]byte, error)

EncodeAgentIDChunks encodes agent IDs into chunks that fit within the PostgreSQL NOTIFY 8KB payload size limit. Each UUID is base64-encoded (without padding) and concatenated into a single byte slice per chunk.

Types

type Batcher

type Batcher struct {

	// Metrics collects Prometheus metrics for the batcher.
	Metrics Metrics
	// contains filtered or unexported fields
}

Batcher holds a buffer of agent metadata updates and periodically flushes them to the database and pubsub. This reduces database write frequency and pubsub publish rate.

func NewBatcher

func NewBatcher(ctx context.Context, reg prometheus.Registerer, store database.Store, ps pubsub.Pubsub, opts ...Option) (*Batcher, error)

NewBatcher creates a new Batcher and starts it. Here ctx controls the lifetime of the batcher, canceling it will result in the Batcher exiting it's processing routine (run).

func (*Batcher) Add

func (b *Batcher) Add(agentID uuid.UUID, keys []string, values []string, errors []string, collectedAt []time.Time) error

Add adds metadata updates for an agent to the batcher by writing to a buffered channel. If the channel is full, updates are dropped. Updates to the same metadata key for the same agent are deduplicated in the batch, keeping only the value with the most recent collectedAt timestamp.

func (*Batcher) Close

func (b *Batcher) Close()

type Metrics

type Metrics struct {
	BatchUtilization prometheus.Histogram
	FlushDuration    *prometheus.HistogramVec
	BatchSize        prometheus.Histogram
	BatchesTotal     *prometheus.CounterVec
	DroppedKeysTotal prometheus.Counter
	MetadataTotal    prometheus.Counter
	PublishErrors    prometheus.Counter
}

func NewMetrics

func NewMetrics() Metrics

func (Metrics) Collectors

func (m Metrics) Collectors() []prometheus.Collector

type Option

type Option func(b *Batcher)

Option is a functional option for configuring a Batcher.

func WithBatchSize

func WithBatchSize(size int) Option

func WithClock

func WithClock(clock quartz.Clock) Option

func WithInterval

func WithInterval(d time.Duration) Option

func WithLogger

func WithLogger(log slog.Logger) Option

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL