sync

package
v4.0.6
Published: Jun 10, 2026 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func DecodeLockName

func DecodeLockName(ctx context.Context, name string) (*lockName, error)

Types

type GetSyncLimit

type GetSyncLimit func(context.Context, string) (int, error)

type Key

type Key = string

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

func NewLockManager

func NewLockManager(ctx context.Context, kubectlConfig kubernetes.Interface, namespace string, config *config.SyncConfig, getSyncLimit GetSyncLimit, nextWorkflow NextWorkflow, workflowExists WorkflowExists) *Manager

func (*Manager) CheckWorkflowExistence

func (sm *Manager) CheckWorkflowExistence(ctx context.Context)

func (*Manager) Initialize

func (sm *Manager) Initialize(ctx context.Context, wfs []wfv1.Workflow) ([]StaleHold, error)

Initialize re-establishes, in the in-memory lock map, the holds that Running workflows record in their status.

It fails closed only when a holder is genuinely unrecoverable (see initFailureFatal): an undecodable lock name, or a database-backed hold with no database session. Those return an error the controller treats as fatal, because we can neither poison the lock nor prove the spec re-acquires it, so continuing risks a silent double-acquire.

Recoverable failures never crashloop: a lock that cannot be built (transient ConfigMap/DB read) or whose holder key is unresolvable is poisoned (lock-scoped, clears on restart). A database-backed hold that the database no longer records is returned as a StaleHold (at most one per workflow) for the controller to fail the workflow.
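The three outcomes described above (fatal, poisoned, stale) can be sketched as a small classifier. This is only an illustration of the documented rules, not the package's code: the `hold`, `outcome`, and `staleHold` types, the `classify` and `initialize` helpers, and the order in which the conditions are checked are all assumptions made for the example.

```go
package main

import "fmt"

// outcome is a hypothetical label for what restoring one recorded hold can produce.
type outcome int

const (
	restored outcome = iota // hold re-established in the in-memory lock map
	poisoned                // recoverable: the lock is poisoned until restart
	stale                   // the database no longer records the hold
	fatal                   // unrecoverable: the controller must not continue
)

// hold is a hypothetical, simplified view of one hold found in a workflow's status.
type hold struct {
	workflow      string
	decodable     bool // the lock name could be decoded
	dbBacked      bool // the lock is database-backed
	hasDBSession  bool // a database session is available
	buildable     bool // the lock could be built (ConfigMap/DB read succeeded)
	keyResolvable bool // the holder key could be resolved
	inDatabase    bool // the database still records this hold
}

// classify applies the documented rules; the precedence between them is assumed.
func classify(h hold) outcome {
	switch {
	case !h.decodable:
		return fatal
	case h.dbBacked && !h.hasDBSession:
		return fatal
	case !h.buildable || !h.keyResolvable:
		return poisoned
	case h.dbBacked && !h.inDatabase:
		return stale
	default:
		return restored
	}
}

// staleHold is a stand-in for the package's StaleHold type.
type staleHold struct {
	workflow string
	reason   string
}

// initialize returns at most one stale hold per workflow, or an error on the first fatal hold.
func initialize(holds []hold) ([]staleHold, error) {
	var stales []staleHold
	seen := map[string]bool{}
	for _, h := range holds {
		switch classify(h) {
		case fatal:
			return nil, fmt.Errorf("unrecoverable hold for workflow %q", h.workflow)
		case stale:
			if !seen[h.workflow] {
				seen[h.workflow] = true
				stales = append(stales, staleHold{h.workflow, "hold not recorded in database"})
			}
		}
	}
	return stales, nil
}

func main() {
	holds := []hold{
		{workflow: "wf-a", decodable: true, buildable: true, keyResolvable: true},
		{workflow: "wf-b", decodable: true, dbBacked: true, hasDBSession: true, buildable: true, keyResolvable: true},
	}
	stales, err := initialize(holds)
	fmt.Println(stales, err)
}
```

A caller following this shape would abort startup on a non-nil error and fail each workflow named in the returned slice.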

func (*Manager) Release

func (sm *Manager) Release(ctx context.Context, wf *wfv1.Workflow, nodeName string, syncRef *wfv1.Synchronization)

func (*Manager) ReleaseAll

func (sm *Manager) ReleaseAll(ctx context.Context, wf *wfv1.Workflow) bool

func (*Manager) TryAcquire

func (sm *Manager) TryAcquire(ctx context.Context, wf *wfv1.Workflow, nodeName string, syncLockRef *wfv1.Synchronization) (bool, bool, string, string, error)

TryAcquire tries to acquire the lock from the semaphore. It returns, in order: whether the lock was acquired, whether the workflow's status was updated, a waiting message if the lock is not available, the lock that could not be acquired, and any error encountered.
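Because TryAcquire returns five positional values, it can help to see them named in one place. The sketch below does not call the real method; `describeAcquire` and `errDemo` are hypothetical names, and whether the failed-lock value is populated alongside an error is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// errDemo is a made-up error used only to exercise the error branch.
var errDemo = errors.New("configmap not found")

// describeAcquire summarizes the five values documented for TryAcquire:
// acquired, workflow-status-updated, waiting message, failed lock, error.
func describeAcquire(acquired, wfUpdated bool, msg, failedLock string, err error) string {
	switch {
	case err != nil:
		return fmt.Sprintf("error on lock %q: %v", failedLock, err)
	case acquired:
		return fmt.Sprintf("acquired (workflow status updated: %t)", wfUpdated)
	default:
		return fmt.Sprintf("waiting on %q: %s (workflow status updated: %t)", failedLock, msg, wfUpdated)
	}
}

func main() {
	fmt.Println(describeAcquire(true, true, "", "", nil))
	fmt.Println(describeAcquire(false, true, "lock is held by another workflow", "ns/Mutex/demo", nil))
	fmt.Println(describeAcquire(false, false, "", "ns/Mutex/demo", errDemo))
}
```

Checking the error first, then the acquired flag, keeps the waiting message and failed lock from being read when they may be empty.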

type NextWorkflow

type NextWorkflow func(string)

type QueueFunc

type QueueFunc func(Key)

type StaleHold added in v4.0.6

type StaleHold struct {
	WF     *wfv1.Workflow
	Reason string
}

StaleHold records a workflow whose recorded hold on a database-backed lock could not be verified against the database during Initialize. The database is the single source of truth for such locks, so the workflow is running on a hold the database no longer backs (e.g. it was expired while the controller was down and may since have been acquired by another holder). The controller fails these workflows; their teardown releases any locks they still hold.

type SyncLevelType

type SyncLevelType int
const (
	WorkflowLevel SyncLevelType = 1
	TemplateLevel SyncLevelType = 2
	ErrorLevel    SyncLevelType = 3
)

type Throttler

type Throttler interface {
	Init(wfs []wfv1.Workflow) error
	Add(key Key, priority int32, creationTime time.Time)
	// Admit reports whether the item should be processed.
	Admit(key Key) bool
	// Remove notifies the throttler that item processing is no longer needed.
	Remove(key Key)
	// UpdateParallelism updates the global parallelism limit.
	UpdateParallelism(limit int)
	// UpdateNamespaceParallelism updates the namespace parallelism
	UpdateNamespaceParallelism(namespace string, limit int)
	// ResetNamespaceParallelism sets the namespace parallelism to the default value
	ResetNamespaceParallelism(namespace string)
}

Throttler allows the controller to limit the number of items it is processing in parallel. Items are processed in priority order, and once processing of an item starts, other items (including higher-priority items) are kept pending until that processing is complete. Implementations should be idempotent.
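The admission behavior described above can be illustrated with a toy, single-goroutine throttler. It covers only Add, Admit, and Remove, and it is not the package's implementation: `toyThrottler`, `newToyThrottler`, and `demo` are hypothetical names, and breaking priority ties by creation time is an assumption suggested by the Add signature.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

type item struct {
	key      string
	priority int32
	created  time.Time
}

// toyThrottler is a hypothetical illustration; it is not safe for concurrent use.
type toyThrottler struct {
	limit   int
	pending []item          // sorted: higher priority first, then older first
	running map[string]bool // keys whose processing has started
}

func newToyThrottler(limit int) *toyThrottler {
	return &toyThrottler{limit: limit, running: map[string]bool{}}
}

// Add registers a key as pending; adding a known key again is a no-op.
func (t *toyThrottler) Add(key string, priority int32, created time.Time) {
	if t.running[key] {
		return
	}
	for _, it := range t.pending {
		if it.key == key {
			return
		}
	}
	t.pending = append(t.pending, item{key, priority, created})
	sort.SliceStable(t.pending, func(i, j int) bool {
		a, b := t.pending[i], t.pending[j]
		if a.priority != b.priority {
			return a.priority > b.priority
		}
		return a.created.Before(b.created)
	})
}

// Admit reports whether key may be processed now. A limit of zero admits everything.
func (t *toyThrottler) Admit(key string) bool {
	if t.limit == 0 || t.running[key] {
		return true
	}
	// Promote the best pending items while capacity remains.
	for len(t.pending) > 0 && len(t.running) < t.limit {
		next := t.pending[0]
		t.pending = t.pending[1:]
		t.running[next.key] = true
	}
	return t.running[key]
}

// Remove frees the slot held by key, or drops it from the pending list.
func (t *toyThrottler) Remove(key string) {
	delete(t.running, key)
	for i, it := range t.pending {
		if it.key == key {
			t.pending = append(t.pending[:i], t.pending[i+1:]...)
			return
		}
	}
}

// demo walks through one slot being contended by three items.
func demo() []bool {
	th := newToyThrottler(1)
	now := time.Now()
	th.Add("ns/low", 1, now)
	th.Add("ns/high", 10, now.Add(time.Second))
	first := th.Admit("ns/low")   // "ns/high" outranks it and takes the only slot
	second := th.Admit("ns/high") // holds the slot
	th.Add("ns/urgent", 100, now.Add(2*time.Second))
	third := th.Admit("ns/urgent") // kept pending: "ns/high" is already processing
	th.Remove("ns/high")
	fourth := th.Admit("ns/urgent") // slot is free again
	return []bool{first, second, third, fourth}
}

func main() {
	fmt.Println(demo())
}
```

The third step is the point of the example: a later, higher-priority item does not preempt an item whose processing has already started.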

func NewMultiThrottler

func NewMultiThrottler(parallelism int, namespaceParallelismLimit int, queue QueueFunc) Throttler

NewMultiThrottler creates a new multi throttler for throttling both namespace and global parallelism. A parallelism value of zero disables throttling.
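A minimal sketch of how a global limit and a per-namespace limit might combine follows. It is not the multi throttler itself: `twoLevelCounter` and `namespaceOf` are hypothetical, keys are assumed to have the form `namespace/name`, priority ordering is left out, and treating zero as disabling each limit independently is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// twoLevelCounter is a hypothetical illustration of a global plus per-namespace limit.
type twoLevelCounter struct {
	globalLimit int
	nsLimit     int
	running     map[string]bool // keys currently admitted
}

// namespaceOf assumes keys look like "namespace/name".
func namespaceOf(key string) string {
	ns, _, _ := strings.Cut(key, "/")
	return ns
}

// admit lets key in only if both limits have room; a zero limit is not enforced.
func (c *twoLevelCounter) admit(key string) bool {
	if c.running[key] {
		return true // already admitted: repeated calls are idempotent
	}
	total, inNS := 0, 0
	for k := range c.running {
		total++
		if namespaceOf(k) == namespaceOf(key) {
			inNS++
		}
	}
	if c.globalLimit > 0 && total >= c.globalLimit {
		return false
	}
	if c.nsLimit > 0 && inNS >= c.nsLimit {
		return false
	}
	c.running[key] = true
	return true
}

// remove frees the slot held by key.
func (c *twoLevelCounter) remove(key string) { delete(c.running, key) }

func main() {
	c := &twoLevelCounter{globalLimit: 3, nsLimit: 1, running: map[string]bool{}}
	for _, k := range []string{"a/wf1", "a/wf2", "b/wf1", "c/wf1", "d/wf1"} {
		fmt.Println(k, c.admit(k))
	}
}
```

In this sketch the namespace limit rejects `a/wf2` and the global limit rejects `d/wf1`; either check alone would let both through.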

type WorkflowExists

type WorkflowExists func(string) bool
