Documentation
¶
Index ¶
- Constants
- func NewPrometheusProvider() (metric.MeterProvider, error)
- func SafeSendStatus(wctx api.BridgeWorkerContext, status *api.ActionResult, originalErr error) error
- type QueueType
- type StatusWriter
- type UnitQueueManager
- func (u *UnitQueueManager) CancelOperation(operationID uuid.UUID) bool
- func (u *UnitQueueManager) ClearRunningOperation(unitID uuid.UUID, action api.ActionType, operationID uuid.UUID)
- func (u *UnitQueueManager) ErrorChannel() <-chan error
- func (u *UnitQueueManager) GetAndCancelRunningOperation(unitID uuid.UUID, action api.ActionType) (*runningOperation, bool)
- func (u *UnitQueueManager) GetRunningOperationByID(operationID uuid.UUID) *runningOperation
- func (u *UnitQueueManager) QueueBridgeEvent(ctx context.Context, event api.BridgeWorkerEventRequest, ...)
- func (u *UnitQueueManager) QueueFunctionEvent(ctx context.Context, event api.FunctionWorkerEventRequest, ...)
- func (u *UnitQueueManager) QueueStatusEvent(unitID string, result *api.ActionResult, sendFn func(*api.ActionResult) error)
- func (u *UnitQueueManager) RegisterCancelFunc(operationID uuid.UUID, cancel context.CancelFunc)
- func (u *UnitQueueManager) SetRunningOperation(unitID, spaceID, operationID uuid.UUID, action api.ActionType, ...)
- func (u *UnitQueueManager) Start(ctx context.Context)
- func (u *UnitQueueManager) Stop()
- func (u *UnitQueueManager) UnregisterCancelFunc(operationID uuid.UUID)
- type WatcherManager
- func (m *WatcherManager) CancelAll()
- func (m *WatcherManager) IsWatcherActive(unitID uuid.UUID, watchType api.ActionType) bool
- func (m *WatcherManager) Stats() (activeWatchers int, runningWorkers int, idleWorkers int, pendingTasks int)
- func (m *WatcherManager) StopAndWait()
- func (m *WatcherManager) SubmitWatcher(ctx context.Context, unitID uuid.UUID, watchType api.ActionType, ...)
- func (m *WatcherManager) SubmitWatcherAndWait(ctx context.Context, unitID uuid.UUID, watchType api.ActionType, ...)
- type Worker
- func (b *Worker) Start(ctx context.Context) error
- func (b *Worker) WaitForPendingOperations()
- func (b *Worker) WithBridgeWorker(bridgeWorker api.BridgeWorker) *Worker
- func (b *Worker) WithFunctionWorker(functionWorker api.FunctionWorker) *Worker
- func (b *Worker) WithMetricsMeter(meter metric.Meter) *Worker
- type WorkerFrontdoorClient
Constants ¶
const ( ErrorTypeUnmarshalWorkerEvent = "failed_to_unmarshal_worker_event" ErrorTypeUnmarshalBridgeEvent = "failed_to_unmarshal_bridge_event" ErrorTypeUnmarshalFunctionEvent = "failed_to_unmarshal_function_event" ErrorTypeMemoryStatsFailure = "failed_to_fetch_memory_stats" ErrorTypeHeartbeatResponse = "failed_to_response_to_heartbeat" ErrorTypeBridgeCommand = "failed_to_execute_bridge_command" ErrorTypeFunctionCommand = "failed_to_execute_function_command" )
Variables ¶
This section is empty.
Functions ¶
func NewPrometheusProvider ¶
func NewPrometheusProvider() (metric.MeterProvider, error)
func SafeSendStatus ¶
func SafeSendStatus(wctx api.BridgeWorkerContext, status *api.ActionResult, originalErr error) error
SafeSendStatus wraps the SendStatus call, logs errors, and returns a wrapped error if applicable.
Types ¶
type StatusWriter ¶
type StatusWriter struct {
// contains filtered or unexported fields
}
func NewStatusWriter ¶
func NewStatusWriter(wctx api.BridgeWorkerContext, action api.ActionType) *StatusWriter
func (*StatusWriter) Flush ¶
func (w *StatusWriter) Flush() error
type UnitQueueManager ¶
type UnitQueueManager struct {
// contains filtered or unexported fields
}
UnitQueueManager manages queues for different units to ensure serialized operations
func NewUnitQueueManager ¶
func NewUnitQueueManager() *UnitQueueManager
NewUnitQueueManager creates a new UnitQueueManager instance
func (*UnitQueueManager) CancelOperation ¶
func (u *UnitQueueManager) CancelOperation(operationID uuid.UUID) bool
CancelOperation cancels a running operation by its ID Returns true if operation was found and cancelled, false otherwise
func (*UnitQueueManager) ClearRunningOperation ¶
func (u *UnitQueueManager) ClearRunningOperation(unitID uuid.UUID, action api.ActionType, operationID uuid.UUID)
ClearRunningOperation removes the running operation tracking for a unit/category Only removes if the operationID matches (prevents removing a replacement operation)
func (*UnitQueueManager) ErrorChannel ¶
func (u *UnitQueueManager) ErrorChannel() <-chan error
ErrorChannel returns a read-only channel for receiving queue errors Usage example:
go func() {
for err := range manager.ErrorChannel() {
log.Printf("Queue error: %v", err)
}
}()
func (*UnitQueueManager) GetAndCancelRunningOperation ¶
func (u *UnitQueueManager) GetAndCancelRunningOperation(unitID uuid.UUID, action api.ActionType) (*runningOperation, bool)
GetAndCancelRunningOperation checks for and cancels a running same-type operation Returns the cancelled operation info and true if one was found and cancelled Returns nil and false if no conflicting operation was running
func (*UnitQueueManager) GetRunningOperationByID ¶
func (u *UnitQueueManager) GetRunningOperationByID(operationID uuid.UUID) *runningOperation
GetRunningOperationByID looks up a running operation by its operation ID Returns the running operation info if found, nil otherwise This iterates over all running operations since they are keyed by unitID:category
func (*UnitQueueManager) QueueBridgeEvent ¶
func (u *UnitQueueManager) QueueBridgeEvent(ctx context.Context, event api.BridgeWorkerEventRequest, handler func(api.BridgeWorkerEventRequest))
QueueBridgeEvent enqueues a bridge worker event to be processed serially for its unit
func (*UnitQueueManager) QueueFunctionEvent ¶
func (u *UnitQueueManager) QueueFunctionEvent(ctx context.Context, event api.FunctionWorkerEventRequest, handler func(api.FunctionWorkerEventRequest))
QueueFunctionEvent enqueues a function worker event to be processed serially for its unit
func (*UnitQueueManager) QueueStatusEvent ¶
func (u *UnitQueueManager) QueueStatusEvent( unitID string, result *api.ActionResult, sendFn func(*api.ActionResult) error, )
QueueStatusEvent queues a status update for async delivery with infinite retry. Returns immediately (non-blocking). Status is delivered in FIFO order per unit.
func (*UnitQueueManager) RegisterCancelFunc ¶
func (u *UnitQueueManager) RegisterCancelFunc(operationID uuid.UUID, cancel context.CancelFunc)
RegisterCancelFunc stores a cancel function for an operation This allows the operation to be canceled later via CancelOperation
func (*UnitQueueManager) SetRunningOperation ¶
func (u *UnitQueueManager) SetRunningOperation(unitID, spaceID, operationID uuid.UUID, action api.ActionType, revisionNum int64)
SetRunningOperation registers an operation as currently running for a unit/category This enables automatic override detection for same-type operations
func (*UnitQueueManager) Start ¶
func (u *UnitQueueManager) Start(ctx context.Context)
Start initializes the queue manager
func (*UnitQueueManager) Stop ¶
func (u *UnitQueueManager) Stop()
Stop gracefully shuts down the queue manager and all its queues
func (*UnitQueueManager) UnregisterCancelFunc ¶
func (u *UnitQueueManager) UnregisterCancelFunc(operationID uuid.UUID)
UnregisterCancelFunc removes a cancel function when operation completes This should be called via defer to ensure cleanup even if operation fails
type WatcherManager ¶
type WatcherManager struct {
// contains filtered or unexported fields
}
WatcherManager combines watcher lifecycle management with worker pool execution. It ensures only one watcher is active per unit per watch type at any time, with newer watchers canceling and replacing older ones of the same type. This means Apply watchers only replace Apply watchers, and Destroy watchers only replace Destroy watchers. It also manages the goroutine pool for watchers.
func NewWatcherManager ¶
func NewWatcherManager(maxWorkers, maxQueueSize int) *WatcherManager
NewWatcherManager creates a new WatcherManager with the specified pool size
func (*WatcherManager) CancelAll ¶
func (m *WatcherManager) CancelAll()
CancelAll cancels all active watchers (useful for shutdown)
func (*WatcherManager) IsWatcherActive ¶
func (m *WatcherManager) IsWatcherActive(unitID uuid.UUID, watchType api.ActionType) bool
IsWatcherActive checks if a watcher of a specific type is currently active for a unit
func (*WatcherManager) Stats ¶
func (m *WatcherManager) Stats() (activeWatchers int, runningWorkers int, idleWorkers int, pendingTasks int)
Stats returns statistics about the watcher manager
func (*WatcherManager) StopAndWait ¶
func (m *WatcherManager) StopAndWait()
StopAndWait stops the worker pool and waits for all workers to finish
func (*WatcherManager) SubmitWatcher ¶
func (m *WatcherManager) SubmitWatcher(ctx context.Context, unitID uuid.UUID, watchType api.ActionType, task func(context.Context))
SubmitWatcher submits a watcher task for a unit, canceling any existing watcher. The task will be executed in the worker pool with proper lifecycle management. This function returns immediately (async). Use SubmitWatcherAndWait for sync behavior.
func (*WatcherManager) SubmitWatcherAndWait ¶
func (m *WatcherManager) SubmitWatcherAndWait(ctx context.Context, unitID uuid.UUID, watchType api.ActionType, task func(context.Context))
SubmitWatcherAndWait submits a watcher task and waits for it to complete. This keeps the caller blocked until the watcher finishes, which is important for maintaining running operation registration during override detection.
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
func (*Worker) WaitForPendingOperations ¶
func (b *Worker) WaitForPendingOperations()
WaitForPendingOperations waits for all in-flight operations to complete
func (*Worker) WithBridgeWorker ¶
func (b *Worker) WithBridgeWorker(bridgeWorker api.BridgeWorker) *Worker
func (*Worker) WithFunctionWorker ¶
func (b *Worker) WithFunctionWorker(functionWorker api.FunctionWorker) *Worker
type WorkerFrontdoorClient ¶ added in v0.1.1
type WorkerFrontdoorClient struct {
// contains filtered or unexported fields
}
func NewWorkerFrontdoorClient ¶ added in v0.1.1
func NewWorkerFrontdoorClient(serverURL, serverPort, workerID, workerSecret string) *WorkerFrontdoorClient
NewWorkerFrontdoorClient creates a new WorkerFrontdoorClient
func (*WorkerFrontdoorClient) Close ¶ added in v0.1.1
func (w *WorkerFrontdoorClient) Close() error
func (*WorkerFrontdoorClient) GetClient ¶ added in v0.1.1
func (w *WorkerFrontdoorClient) GetClient() *goclientnew.ClientWithResponses
getClient returns the authenticated client (thread-safe)