Documentation
¶
Index ¶
- Constants
- func NewPrometheusProvider() (metric.MeterProvider, error)
- func SafeSendStatus(wctx api.BridgeWorkerContext, status *api.ActionResult, originalErr error) error
- type BridgeDispatcher
- func (d *BridgeDispatcher) Apply(ctx api.BridgeWorkerContext, payload api.BridgeWorkerPayload) error
- func (d *BridgeDispatcher) Destroy(ctx api.BridgeWorkerContext, payload api.BridgeWorkerPayload) error
- func (d *BridgeDispatcher) Finalize(ctx api.BridgeWorkerContext, payload api.BridgeWorkerPayload) error
- func (d *BridgeDispatcher) ID() api.BridgeWorkerID
- func (d *BridgeDispatcher) Import(ctx api.BridgeWorkerContext, payload api.BridgeWorkerPayload) error
- func (d *BridgeDispatcher) Info(opts api.InfoOptions) api.BridgeWorkerInfo
- func (d *BridgeDispatcher) Refresh(ctx api.BridgeWorkerContext, payload api.BridgeWorkerPayload) error
- func (d *BridgeDispatcher) RegisterWorker(toolchainType workerapi.ToolchainType, providerType api.ProviderType, ...)
- func (d *BridgeDispatcher) WatchForApply(wctx api.BridgeWorkerContext, payload api.BridgeWorkerPayload) error
- func (d *BridgeDispatcher) WatchForDestroy(wctx api.BridgeWorkerContext, payload api.BridgeWorkerPayload) error
- type QueueType
- type StatusWriter
- type UnitQueueManager
- func (u *UnitQueueManager) CancelOperation(operationID uuid.UUID) bool
- func (u *UnitQueueManager) ClearRunningOperation(unitID uuid.UUID, action api.ActionType, operationID uuid.UUID)
- func (u *UnitQueueManager) ErrorChannel() <-chan error
- func (u *UnitQueueManager) GetAndCancelRunningOperation(unitID uuid.UUID, action api.ActionType) (*runningOperation, bool)
- func (u *UnitQueueManager) GetRunningOperationByID(operationID uuid.UUID) *runningOperation
- func (u *UnitQueueManager) QueueBridgeEvent(ctx context.Context, event api.BridgeWorkerEventRequest, ...)
- func (u *UnitQueueManager) QueueFunctionEvent(ctx context.Context, event api.FunctionWorkerEventRequest, ...)
- func (u *UnitQueueManager) QueueStatusEvent(unitID string, result *api.ActionResult, sendFn func(*api.ActionResult) error)
- func (u *UnitQueueManager) RegisterCancelFunc(operationID uuid.UUID, cancel context.CancelFunc)
- func (u *UnitQueueManager) SetRunningOperation(unitID, spaceID, operationID uuid.UUID, action api.ActionType, ...)
- func (u *UnitQueueManager) Start(ctx context.Context)
- func (u *UnitQueueManager) Stop()
- func (u *UnitQueueManager) UnregisterCancelFunc(operationID uuid.UUID)
- type WatcherManager
- func (m *WatcherManager) CancelAll()
- func (m *WatcherManager) IsWatcherActive(unitID uuid.UUID, watchType api.ActionType) bool
- func (m *WatcherManager) Stats() (activeWatchers int, runningWorkers int, idleWorkers int, pendingTasks int)
- func (m *WatcherManager) StopAndWait()
- func (m *WatcherManager) SubmitWatcher(ctx context.Context, unitID uuid.UUID, watchType api.ActionType, ...)
- func (m *WatcherManager) SubmitWatcherAndWait(ctx context.Context, unitID uuid.UUID, watchType api.ActionType, ...)
- type Worker
- func (b *Worker) Start(ctx context.Context) error
- func (b *Worker) WaitForPendingOperations()
- func (b *Worker) WithBridgeWorker(bridgeWorker api.BridgeWorker) *Worker
- func (b *Worker) WithFunctionExecutor(functionExecutor executor.FunctionExecutor) *Worker
- func (b *Worker) WithMetricsMeter(meter metric.Meter) *Worker
- type WorkerFrontdoorClient
- type WorkerKey
Constants ¶
const ( ErrorTypeUnmarshalWorkerEvent = "failed_to_unmarshal_worker_event" ErrorTypeUnmarshalBridgeEvent = "failed_to_unmarshal_bridge_event" ErrorTypeUnmarshalFunctionEvent = "failed_to_unmarshal_function_event" ErrorTypeMemoryStatsFailure = "failed_to_fetch_memory_stats" ErrorTypeHeartbeatResponse = "failed_to_response_to_heartbeat" ErrorTypeBridgeCommand = "failed_to_execute_bridge_command" ErrorTypeFunctionCommand = "failed_to_execute_function_command" )
Variables ¶
This section is empty.
Functions ¶
func NewPrometheusProvider ¶
func NewPrometheusProvider() (metric.MeterProvider, error)
func SafeSendStatus ¶
func SafeSendStatus(wctx api.BridgeWorkerContext, status *api.ActionResult, originalErr error) error
SafeSendStatus wraps the SendStatus call, logs errors, and returns a wrapped error if applicable.
Types ¶
type BridgeDispatcher ¶
type BridgeDispatcher struct {
// contains filtered or unexported fields
}
BridgeDispatcher is a bridge worker that delegates operations to registered workers based on the toolchain and provider information in the request payload. It ensures operations on the same unit are processed sequentially.
func NewBridgeDispatcher ¶
func NewBridgeDispatcher() *BridgeDispatcher
NewBridgeDispatcher creates a new BridgeDispatcher instance with unit queue management.
func (*BridgeDispatcher) Apply ¶
func (d *BridgeDispatcher) Apply(ctx api.BridgeWorkerContext, payload api.BridgeWorkerPayload) error
Apply delegates the Apply operation to the appropriate worker and ensures operations on the same unit ID are processed sequentially
func (*BridgeDispatcher) Destroy ¶
func (d *BridgeDispatcher) Destroy(ctx api.BridgeWorkerContext, payload api.BridgeWorkerPayload) error
Destroy delegates the Destroy operation to the appropriate worker and ensures operations on the same unit ID are processed sequentially
func (*BridgeDispatcher) Finalize ¶
func (d *BridgeDispatcher) Finalize(ctx api.BridgeWorkerContext, payload api.BridgeWorkerPayload) error
Finalize delegates the Finalize operation to the appropriate worker and ensures operations on the same unit ID are processed sequentially
func (*BridgeDispatcher) ID ¶
func (d *BridgeDispatcher) ID() api.BridgeWorkerID
ID returns an empty BridgeWorkerID since the dispatcher aggregates multiple workers.
func (*BridgeDispatcher) Import ¶
func (d *BridgeDispatcher) Import(ctx api.BridgeWorkerContext, payload api.BridgeWorkerPayload) error
Import delegates the Import operation to the appropriate worker and ensures operations on the same unit ID are processed sequentially
func (*BridgeDispatcher) Info ¶
func (d *BridgeDispatcher) Info(opts api.InfoOptions) api.BridgeWorkerInfo
Info returns aggregated information about all registered workers
func (*BridgeDispatcher) Refresh ¶
func (d *BridgeDispatcher) Refresh(ctx api.BridgeWorkerContext, payload api.BridgeWorkerPayload) error
Refresh delegates the Refresh operation to the appropriate worker and ensures operations on the same unit ID are processed sequentially
func (*BridgeDispatcher) RegisterWorker ¶
func (d *BridgeDispatcher) RegisterWorker(toolchainType workerapi.ToolchainType, providerType api.ProviderType, worker api.BridgeWorker)
RegisterWorker registers a bridge worker for a specific toolchain and provider combination
func (*BridgeDispatcher) WatchForApply ¶
func (d *BridgeDispatcher) WatchForApply(wctx api.BridgeWorkerContext, payload api.BridgeWorkerPayload) error
func (*BridgeDispatcher) WatchForDestroy ¶
func (d *BridgeDispatcher) WatchForDestroy(wctx api.BridgeWorkerContext, payload api.BridgeWorkerPayload) error
type StatusWriter ¶
type StatusWriter struct {
// contains filtered or unexported fields
}
func NewStatusWriter ¶
func NewStatusWriter(wctx api.BridgeWorkerContext, action api.ActionType) *StatusWriter
func (*StatusWriter) Flush ¶
func (w *StatusWriter) Flush() error
type UnitQueueManager ¶
type UnitQueueManager struct {
// contains filtered or unexported fields
}
UnitQueueManager manages queues for different units to ensure serialized operations
func NewUnitQueueManager ¶
func NewUnitQueueManager() *UnitQueueManager
NewUnitQueueManager creates a new UnitQueueManager instance
func (*UnitQueueManager) CancelOperation ¶
func (u *UnitQueueManager) CancelOperation(operationID uuid.UUID) bool
CancelOperation cancels a running operation by its ID. Returns true if the operation was found and cancelled, false otherwise.
func (*UnitQueueManager) ClearRunningOperation ¶
func (u *UnitQueueManager) ClearRunningOperation(unitID uuid.UUID, action api.ActionType, operationID uuid.UUID)
ClearRunningOperation removes the running operation tracking for a unit/category. It only removes the entry if the operationID matches (this prevents removing a replacement operation).
func (*UnitQueueManager) ErrorChannel ¶
func (u *UnitQueueManager) ErrorChannel() <-chan error
ErrorChannel returns a read-only channel for receiving queue errors. Usage example:
go func() {
for err := range manager.ErrorChannel() {
log.Printf("Queue error: %v", err)
}
}()
func (*UnitQueueManager) GetAndCancelRunningOperation ¶
func (u *UnitQueueManager) GetAndCancelRunningOperation(unitID uuid.UUID, action api.ActionType) (*runningOperation, bool)
GetAndCancelRunningOperation checks for and cancels a running operation of the same type. Returns the cancelled operation info and true if one was found and cancelled. Returns nil and false if no conflicting operation was running.
func (*UnitQueueManager) GetRunningOperationByID ¶
func (u *UnitQueueManager) GetRunningOperationByID(operationID uuid.UUID) *runningOperation
GetRunningOperationByID looks up a running operation by its operation ID. Returns the running operation info if found, nil otherwise. This iterates over all running operations since they are keyed by unitID:category.
func (*UnitQueueManager) QueueBridgeEvent ¶
func (u *UnitQueueManager) QueueBridgeEvent(ctx context.Context, event api.BridgeWorkerEventRequest, handler func(api.BridgeWorkerEventRequest))
QueueBridgeEvent enqueues a bridge worker event to be processed serially for its unit
func (*UnitQueueManager) QueueFunctionEvent ¶
func (u *UnitQueueManager) QueueFunctionEvent(ctx context.Context, event api.FunctionWorkerEventRequest, handler func(api.FunctionWorkerEventRequest))
QueueFunctionEvent enqueues a function worker event to be processed serially for its unit
func (*UnitQueueManager) QueueStatusEvent ¶
func (u *UnitQueueManager) QueueStatusEvent( unitID string, result *api.ActionResult, sendFn func(*api.ActionResult) error, )
QueueStatusEvent queues a status update for async delivery with infinite retry. Returns immediately (non-blocking). Status is delivered in FIFO order per unit.
func (*UnitQueueManager) RegisterCancelFunc ¶
func (u *UnitQueueManager) RegisterCancelFunc(operationID uuid.UUID, cancel context.CancelFunc)
RegisterCancelFunc stores a cancel function for an operation. This allows the operation to be cancelled later via CancelOperation.
func (*UnitQueueManager) SetRunningOperation ¶
func (u *UnitQueueManager) SetRunningOperation(unitID, spaceID, operationID uuid.UUID, action api.ActionType, revisionNum int64)
SetRunningOperation registers an operation as currently running for a unit/category. This enables automatic override detection for same-type operations.
func (*UnitQueueManager) Start ¶
func (u *UnitQueueManager) Start(ctx context.Context)
Start initializes the queue manager
func (*UnitQueueManager) Stop ¶
func (u *UnitQueueManager) Stop()
Stop gracefully shuts down the queue manager and all its queues
func (*UnitQueueManager) UnregisterCancelFunc ¶
func (u *UnitQueueManager) UnregisterCancelFunc(operationID uuid.UUID)
UnregisterCancelFunc removes a cancel function when an operation completes. This should be called via defer to ensure cleanup even if the operation fails.
type WatcherManager ¶
type WatcherManager struct {
// contains filtered or unexported fields
}
WatcherManager combines watcher lifecycle management with worker pool execution. It ensures only one watcher is active per unit per watch type at any time, with newer watchers canceling and replacing older ones of the same type. This means Apply watchers only replace Apply watchers, and Destroy watchers only replace Destroy watchers. It also manages the goroutine pool for watchers.
func NewWatcherManager ¶
func NewWatcherManager(maxWorkers, maxQueueSize int) *WatcherManager
NewWatcherManager creates a new WatcherManager with the specified pool size
func (*WatcherManager) CancelAll ¶
func (m *WatcherManager) CancelAll()
CancelAll cancels all active watchers (useful for shutdown)
func (*WatcherManager) IsWatcherActive ¶
func (m *WatcherManager) IsWatcherActive(unitID uuid.UUID, watchType api.ActionType) bool
IsWatcherActive checks if a watcher of a specific type is currently active for a unit
func (*WatcherManager) Stats ¶
func (m *WatcherManager) Stats() (activeWatchers int, runningWorkers int, idleWorkers int, pendingTasks int)
Stats returns statistics about the watcher manager
func (*WatcherManager) StopAndWait ¶
func (m *WatcherManager) StopAndWait()
StopAndWait stops the worker pool and waits for all workers to finish
func (*WatcherManager) SubmitWatcher ¶
func (m *WatcherManager) SubmitWatcher(ctx context.Context, unitID uuid.UUID, watchType api.ActionType, task func(context.Context))
SubmitWatcher submits a watcher task for a unit, canceling any existing watcher. The task will be executed in the worker pool with proper lifecycle management. This function returns immediately (async). Use SubmitWatcherAndWait for sync behavior.
func (*WatcherManager) SubmitWatcherAndWait ¶
func (m *WatcherManager) SubmitWatcherAndWait(ctx context.Context, unitID uuid.UUID, watchType api.ActionType, task func(context.Context))
SubmitWatcherAndWait submits a watcher task and waits for it to complete. This keeps the caller blocked until the watcher finishes, which is important for maintaining running operation registration during override detection.
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
func (*Worker) WaitForPendingOperations ¶
func (b *Worker) WaitForPendingOperations()
WaitForPendingOperations waits for all in-flight operations to complete
func (*Worker) WithBridgeWorker ¶
func (b *Worker) WithBridgeWorker(bridgeWorker api.BridgeWorker) *Worker
func (*Worker) WithFunctionExecutor ¶
func (b *Worker) WithFunctionExecutor(functionExecutor executor.FunctionExecutor) *Worker
type WorkerFrontdoorClient ¶
type WorkerFrontdoorClient struct {
// contains filtered or unexported fields
}
func NewWorkerFrontdoorClient ¶
func NewWorkerFrontdoorClient(serverURL, serverPort, workerID, workerSecret string) *WorkerFrontdoorClient
NewWorkerFrontdoorClient creates a new WorkerFrontdoorClient
func (*WorkerFrontdoorClient) Close ¶
func (w *WorkerFrontdoorClient) Close() error
func (*WorkerFrontdoorClient) GetClient ¶
func (w *WorkerFrontdoorClient) GetClient() *goclientnew.ClientWithResponses
GetClient returns the authenticated client (thread-safe)
type WorkerKey ¶
type WorkerKey struct {
ToolchainType workerapi.ToolchainType
ProviderType api.ProviderType
}
WorkerKey represents a unique identifier for a registered bridge worker