lib

package
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 18, 2026 License: MIT Imports: 35 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ErrorTypeUnmarshalWorkerEvent   = "failed_to_unmarshal_worker_event"
	ErrorTypeUnmarshalBridgeEvent   = "failed_to_unmarshal_bridge_event"
	ErrorTypeUnmarshalFunctionEvent = "failed_to_unmarshal_function_event"
	ErrorTypeMemoryStatsFailure     = "failed_to_fetch_memory_stats"
	ErrorTypeHeartbeatResponse      = "failed_to_response_to_heartbeat"
	ErrorTypeBridgeCommand          = "failed_to_execute_bridge_command"
	ErrorTypeFunctionCommand        = "failed_to_execute_function_command"
)

Variables

This section is empty.

Functions

func NewPrometheusProvider

func NewPrometheusProvider() (metric.MeterProvider, error)

func SafeSendStatus

func SafeSendStatus(wctx api.BridgeWorkerContext, status *api.ActionResult, originalErr error) error

SafeSendStatus wraps the SendStatus call, logs errors, and returns a wrapped error if applicable.

Types

type QueueType

type QueueType string

QueueType represents the type of event queue

const (
	BridgeQueueType   QueueType = "bridge"
	FunctionQueueType QueueType = "function"
)

type StatusWriter

type StatusWriter struct {
	// contains filtered or unexported fields
}

func NewStatusWriter

func NewStatusWriter(wctx api.BridgeWorkerContext, action api.ActionType) *StatusWriter

func (*StatusWriter) Flush

func (w *StatusWriter) Flush() error

func (*StatusWriter) Write

func (w *StatusWriter) Write(p []byte) (n int, err error)

type UnitQueueManager

type UnitQueueManager struct {
	// contains filtered or unexported fields
}

UnitQueueManager manages queues for different units to ensure serialized operations

func NewUnitQueueManager

func NewUnitQueueManager() *UnitQueueManager

NewUnitQueueManager creates a new UnitQueueManager instance

func (*UnitQueueManager) CancelOperation

func (u *UnitQueueManager) CancelOperation(operationID uuid.UUID) bool

CancelOperation cancels a running operation by its ID Returns true if operation was found and cancelled, false otherwise

func (*UnitQueueManager) ClearRunningOperation

func (u *UnitQueueManager) ClearRunningOperation(unitID uuid.UUID, action api.ActionType, operationID uuid.UUID)

ClearRunningOperation removes the running operation tracking for a unit/category Only removes if the operationID matches (prevents removing a replacement operation)

func (*UnitQueueManager) ErrorChannel

func (u *UnitQueueManager) ErrorChannel() <-chan error

ErrorChannel returns a read-only channel for receiving queue errors Usage example:

go func() {
  for err := range manager.ErrorChannel() {
    log.Printf("Queue error: %v", err)
  }
}()

func (*UnitQueueManager) GetAndCancelRunningOperation

func (u *UnitQueueManager) GetAndCancelRunningOperation(unitID uuid.UUID, action api.ActionType) (*runningOperation, bool)

GetAndCancelRunningOperation checks for and cancels a running same-type operation Returns the cancelled operation info and true if one was found and cancelled Returns nil and false if no conflicting operation was running

func (*UnitQueueManager) GetRunningOperationByID

func (u *UnitQueueManager) GetRunningOperationByID(operationID uuid.UUID) *runningOperation

GetRunningOperationByID looks up a running operation by its operation ID Returns the running operation info if found, nil otherwise This iterates over all running operations since they are keyed by unitID:category

func (*UnitQueueManager) QueueBridgeEvent

func (u *UnitQueueManager) QueueBridgeEvent(ctx context.Context, event api.BridgeWorkerEventRequest, handler func(api.BridgeWorkerEventRequest))

QueueBridgeEvent enqueues a bridge worker event to be processed serially for its unit

func (*UnitQueueManager) QueueFunctionEvent

func (u *UnitQueueManager) QueueFunctionEvent(ctx context.Context, event api.FunctionWorkerEventRequest, handler func(api.FunctionWorkerEventRequest))

QueueFunctionEvent enqueues a function worker event to be processed serially for its unit

func (*UnitQueueManager) QueueStatusEvent

func (u *UnitQueueManager) QueueStatusEvent(
	unitID string,
	result *api.ActionResult,
	sendFn func(*api.ActionResult) error,
)

QueueStatusEvent queues a status update for async delivery with infinite retry. Returns immediately (non-blocking). Status is delivered in FIFO order per unit.

func (*UnitQueueManager) RegisterCancelFunc

func (u *UnitQueueManager) RegisterCancelFunc(operationID uuid.UUID, cancel context.CancelFunc)

RegisterCancelFunc stores a cancel function for an operation This allows the operation to be canceled later via CancelOperation

func (*UnitQueueManager) SetRunningOperation

func (u *UnitQueueManager) SetRunningOperation(unitID, spaceID, operationID uuid.UUID, action api.ActionType, revisionNum int64)

SetRunningOperation registers an operation as currently running for a unit/category This enables automatic override detection for same-type operations

func (*UnitQueueManager) Start

func (u *UnitQueueManager) Start(ctx context.Context)

Start initializes the queue manager

func (*UnitQueueManager) Stop

func (u *UnitQueueManager) Stop()

Stop gracefully shuts down the queue manager and all its queues

func (*UnitQueueManager) UnregisterCancelFunc

func (u *UnitQueueManager) UnregisterCancelFunc(operationID uuid.UUID)

UnregisterCancelFunc removes a cancel function when operation completes This should be called via defer to ensure cleanup even if operation fails

type WatcherManager

type WatcherManager struct {
	// contains filtered or unexported fields
}

WatcherManager combines watcher lifecycle management with worker pool execution. It ensures only one watcher is active per unit per watch type at any time, with newer watchers canceling and replacing older ones of the same type. This means Apply watchers only replace Apply watchers, and Destroy watchers only replace Destroy watchers. It also manages the goroutine pool for watchers.

func NewWatcherManager

func NewWatcherManager(maxWorkers, maxQueueSize int) *WatcherManager

NewWatcherManager creates a new WatcherManager with the specified pool size

func (*WatcherManager) CancelAll

func (m *WatcherManager) CancelAll()

CancelAll cancels all active watchers (useful for shutdown)

func (*WatcherManager) IsWatcherActive

func (m *WatcherManager) IsWatcherActive(unitID uuid.UUID, watchType api.ActionType) bool

IsWatcherActive checks if a watcher of a specific type is currently active for a unit

func (*WatcherManager) Stats

func (m *WatcherManager) Stats() (activeWatchers int, runningWorkers int, idleWorkers int, pendingTasks int)

Stats returns statistics about the watcher manager

func (*WatcherManager) StopAndWait

func (m *WatcherManager) StopAndWait()

StopAndWait stops the worker pool and waits for all workers to finish

func (*WatcherManager) SubmitWatcher

func (m *WatcherManager) SubmitWatcher(ctx context.Context, unitID uuid.UUID, watchType api.ActionType, task func(context.Context))

SubmitWatcher submits a watcher task for a unit, canceling any existing watcher. The task will be executed in the worker pool with proper lifecycle management. This function returns immediately (async). Use SubmitWatcherAndWait for sync behavior.

func (*WatcherManager) SubmitWatcherAndWait

func (m *WatcherManager) SubmitWatcherAndWait(ctx context.Context, unitID uuid.UUID, watchType api.ActionType, task func(context.Context))

SubmitWatcherAndWait submits a watcher task and waits for it to complete. This keeps the caller blocked until the watcher finishes, which is important for maintaining running operation registration during override detection.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

func New

func New(url, id, secret string) *Worker

func (*Worker) Start

func (b *Worker) Start(ctx context.Context) error

func (*Worker) WaitForPendingOperations

func (b *Worker) WaitForPendingOperations()

WaitForPendingOperations waits for all in-flight operations to complete

func (*Worker) WithBridgeWorker

func (b *Worker) WithBridgeWorker(bridgeWorker api.BridgeWorker) *Worker

func (*Worker) WithFunctionWorker

func (b *Worker) WithFunctionWorker(functionWorker api.FunctionWorker) *Worker

func (*Worker) WithMetricsMeter

func (b *Worker) WithMetricsMeter(meter metric.Meter) *Worker

type WorkerFrontdoorClient added in v0.1.1

type WorkerFrontdoorClient struct {
	// contains filtered or unexported fields
}

func NewWorkerFrontdoorClient added in v0.1.1

func NewWorkerFrontdoorClient(serverURL, serverPort, workerID, workerSecret string) *WorkerFrontdoorClient

NewWorkerFrontdoorClient creates a new WorkerFrontdoorClient

func (*WorkerFrontdoorClient) Close added in v0.1.1

func (w *WorkerFrontdoorClient) Close() error

func (*WorkerFrontdoorClient) GetClient added in v0.1.1

getClient returns the authenticated client (thread-safe)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL