Documentation
¶
Index ¶
- func DecodeLockName(ctx context.Context, name string) (*lockName, error)
- type GetSyncLimit
- type Key
- type Manager
- func (sm *Manager) CheckWorkflowExistence(ctx context.Context)
- func (sm *Manager) Initialize(ctx context.Context, wfs []wfv1.Workflow) ([]StaleHold, error)
- func (sm *Manager) Release(ctx context.Context, wf *wfv1.Workflow, nodeName string, ...)
- func (sm *Manager) ReleaseAll(ctx context.Context, wf *wfv1.Workflow) bool
- func (sm *Manager) TryAcquire(ctx context.Context, wf *wfv1.Workflow, nodeName string, ...) (bool, bool, string, string, error)
- type NextWorkflow
- type QueueFunc
- type StaleHold
- type SyncLevelType
- type Throttler
- type WorkflowExists
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
func NewLockManager ¶
func NewLockManager(ctx context.Context, kubectlConfig kubernetes.Interface, namespace string, config *config.SyncConfig, getSyncLimit GetSyncLimit, nextWorkflow NextWorkflow, workflowExists WorkflowExists) *Manager
func (*Manager) CheckWorkflowExistence ¶
func (*Manager) Initialize ¶
Initialize re-establishes, in the in-memory lock map, the holds that Running workflows record in their status.
It fails closed only when a holder is genuinely unrecoverable (see initFailureFatal): an undecodable lock name, or a database-backed hold with no database session. Those return an error the controller treats as fatal, because we can neither poison the lock nor prove the spec re-acquires it, so continuing risks a silent double-acquire.
Recoverable failures never crashloop: a lock that cannot be built (transient ConfigMap/DB read) or whose holder key is unresolvable is poisoned (lock-scoped, clears on restart). A database-backed hold that the database no longer records is returned as a StaleHold (at most one per workflow) for the controller to fail the workflow.
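The three outcomes described above can be summarised as a small classification table. The sketch below is illustrative only: `holdProblem`, `outcome`, and `classify` are hypothetical names that do not exist in this package, and treating an unrecognised problem as fatal is an assumption of the sketch, not documented behaviour.

```go
package main

import "fmt"

// holdProblem is a hypothetical enumeration of what can go wrong while
// re-establishing one recorded hold. It is not a type from this package.
type holdProblem int

const (
	undecodableLockName   holdProblem = iota // the lock name cannot be decoded
	dbHoldWithoutSession                     // database-backed hold, but no database session
	lockCannotBeBuilt                        // transient ConfigMap/DB read failure
	holderKeyUnresolvable                    // the holder key cannot be resolved
	dbHoldNotRecorded                        // the database no longer records the hold
)

// outcome is a hypothetical label for how Initialize is described as reacting.
type outcome string

const (
	fatal     outcome = "fatal error"
	poisoned  outcome = "lock poisoned"
	staleHold outcome = "StaleHold returned"
)

// classify maps each problem to the outcome named in the prose above.
func classify(p holdProblem) outcome {
	switch p {
	case undecodableLockName, dbHoldWithoutSession:
		return fatal
	case lockCannotBeBuilt, holderKeyUnresolvable:
		return poisoned
	case dbHoldNotRecorded:
		return staleHold
	default:
		// Assumption of this sketch: anything unrecognised is treated as fatal.
		return fatal
	}
}

func main() {
	for p := undecodableLockName; p <= dbHoldNotRecorded; p++ {
		fmt.Printf("problem %d -> %s\n", p, classify(p))
	}
}
```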
func (*Manager) ReleaseAll ¶
func (*Manager) TryAcquire ¶
func (sm *Manager) TryAcquire(ctx context.Context, wf *wfv1.Workflow, nodeName string, syncLockRef *wfv1.Synchronization) (bool, bool, string, string, error)
TryAcquire tries to acquire the lock from the semaphore. It returns, in order: whether the lock was acquired, whether the workflow status was updated, a waiting message if the lock is not available, the failed lock, and any error encountered.
type NextWorkflow ¶
type NextWorkflow func(string)
type StaleHold ¶ added in v4.0.6
StaleHold records a workflow whose recorded hold on a database-backed lock could not be verified against the database during Initialize. The database is the single source of truth for such locks, so the workflow is running on a hold the database no longer backs (e.g. it was expired while the controller was down and may since have been acquired by another holder). The controller fails these workflows; their teardown releases any locks they still hold.
type SyncLevelType ¶
type SyncLevelType int
const (
WorkflowLevel SyncLevelType = 1
TemplateLevel SyncLevelType = 2
ErrorLevel SyncLevelType = 3
)
type Throttler ¶
type Throttler interface {
Init(wfs []wfv1.Workflow) error
Add(key Key, priority int32, creationTime time.Time)
// Admit reports whether the item should be processed.
Admit(key Key) bool
// Remove notifies the throttler that item processing is no longer needed
Remove(key Key)
// UpdateParallelism updates the parallelism limit
UpdateParallelism(limit int)
// UpdateNamespaceParallelism updates the namespace parallelism
UpdateNamespaceParallelism(namespace string, limit int)
// ResetNamespaceParallelism sets the namespace parallelism to the default value
ResetNamespaceParallelism(namespace string)
}
Throttler allows the controller to limit the number of items it processes in parallel. Items are processed in priority order, and once processing of an item starts, other items (including higher-priority items) are kept pending until that processing is complete. Implementations should be idempotent.