v1

package
v0.78.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 3, 2026 License: MIT Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
// MAX_RATE_LIMIT_UPDATE_FREQUENCY is a 500ms duration.
// NOTE(review): judging by the name it bounds how often rate limits are
// refreshed; the code that reads it is not shown here — confirm.
const MAX_RATE_LIMIT_UPDATE_FREQUENCY = 500 * time.Millisecond // avoid boundary conditions on 1 second polls

Variables

View Source
// ErrNoOptimisticSlots is a package-level error value carrying the message
// "no optimistic slots for scheduling". Callers can match it with errors.Is.
// NOTE(review): presumably returned by the optimistic scheduling entry points
// when no slot is available — confirm against the call sites.
var ErrNoOptimisticSlots = fmt.Errorf("no optimistic slots for scheduling")
View Source
// ErrTenantNotFound is a package-level error value carrying the message
// "tenant not found in pool". Callers can match it with errors.Is.
// NOTE(review): presumably returned when a tenant ID has no entry in the
// SchedulingPool — confirm against the call sites.
var ErrTenantNotFound = fmt.Errorf("tenant not found in pool")

Functions

This section is empty.

Types

type AssignedItemWithTask added in v0.78.0

// AssignedItemWithTask bundles an assigned item together with a task and its
// payload. Slices of this type are the map values returned by
// SchedulingPool.RunOptimisticScheduling and
// SchedulingPool.RunOptimisticSchedulingFromEvents.
type AssignedItemWithTask struct {
	// AssignedItem is the assignment record; its type is declared in package v1.
	AssignedItem *v1.AssignedItem
	// Task is the task with its payload.
	// NOTE(review): presumably the task that AssignedItem refers to — confirm.
	Task         *v1.V1TaskWithPayload
}

type ConcurrencyManager

type ConcurrencyManager struct {
	// contains filtered or unexported fields
}

func (*ConcurrencyManager) Cleanup

func (c *ConcurrencyManager) Cleanup()

type ConcurrencyResults

// ConcurrencyResults wraps a v1.RunConcurrencyResult with a tenant ID.
// Pointers to this type are the element type of the channel returned by
// SchedulingPool.GetConcurrencyResultsCh.
type ConcurrencyResults struct {
	// Embedded, so the fields and methods of v1.RunConcurrencyResult are
	// promoted onto ConcurrencyResults.
	*v1.RunConcurrencyResult

	// TenantId is the tenant ID attached to the result.
	// NOTE(review): presumably the tenant the concurrency run was for — confirm.
	TenantId uuid.UUID
}

type Extensions

type Extensions struct {
	// contains filtered or unexported fields
}

func (*Extensions) Add

func (e *Extensions) Add(ext SchedulerExtension)

func (*Extensions) Cleanup

func (e *Extensions) Cleanup() error

func (*Extensions) PostAssign

func (e *Extensions) PostAssign(tenantId uuid.UUID, input *PostAssignInput)

func (*Extensions) ReportSnapshot

func (e *Extensions) ReportSnapshot(tenantId uuid.UUID, input *SnapshotInput)

func (*Extensions) SetTenants

func (e *Extensions) SetTenants(tenants []*sqlcv1.Tenant)

type LeaseManager

type LeaseManager struct {
	// contains filtered or unexported fields
}

LeaseManager is responsible for leases on multiple queues and multiplexing queue results to callers. It is still tenant-scoped.

type PostAssignInput

// PostAssignInput is the argument passed to SchedulerExtension.PostAssign.
type PostAssignInput struct {
	// HasUnassignedStepRuns is a flag supplied to the extension.
	// NOTE(review): by its name, true when some step runs were left without an
	// assignment — confirm against the code that populates it.
	HasUnassignedStepRuns bool
}

type PrometheusExtension added in v0.72.2

type PrometheusExtension struct {
	// contains filtered or unexported fields
}

func NewPrometheusExtension added in v0.72.2

func NewPrometheusExtension() *PrometheusExtension

func (*PrometheusExtension) Cleanup added in v0.72.2

func (p *PrometheusExtension) Cleanup() error

func (*PrometheusExtension) PostAssign added in v0.72.2

func (p *PrometheusExtension) PostAssign(tenantId uuid.UUID, input *PostAssignInput)

func (*PrometheusExtension) ReportSnapshot added in v0.72.2

func (p *PrometheusExtension) ReportSnapshot(tenantId uuid.UUID, input *SnapshotInput)

func (*PrometheusExtension) SetTenants added in v0.72.2

func (p *PrometheusExtension) SetTenants(tenants []*sqlcv1.Tenant)

type QueueResults

// QueueResults groups queue items by outcome, together with a tenant ID.
// Pointers to this type are the element type of the channel returned by
// SchedulingPool.GetResultsCh.
// NOTE(review): the per-field meanings below are read from the field names;
// the code that fills them is not shown here — confirm.
type QueueResults struct {
	// TenantId is the tenant ID attached to the results.
	TenantId uuid.UUID
	// Assigned holds assignment records (presumably items that were scheduled).
	Assigned []*v1.AssignedItem

	// Unassigned holds queue items (presumably ones that could not be placed).
	Unassigned         []*sqlcv1.V1QueueItem
	// SchedulingTimedOut holds queue items (presumably ones whose scheduling
	// timed out).
	SchedulingTimedOut []*sqlcv1.V1QueueItem
	// RateLimited holds rate limit results (presumably for items held back by a
	// rate limit).
	RateLimited        []*v1.RateLimitResult
}

type Queuer

type Queuer struct {
	// contains filtered or unexported fields
}

func (*Queuer) Cleanup

func (q *Queuer) Cleanup()

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler is responsible for scheduling steps to workers as efficiently as possible. This is tenant-scoped, so each tenant will have its own scheduler.

type SchedulerExtension

// SchedulerExtension is the interface accepted by Extensions.Add.
// *PrometheusExtension and *Extensions both declare this method set.
// NOTE(review): when and how often each hook is invoked is not visible here;
// confirm against the scheduler code before relying on any ordering.
type SchedulerExtension interface {
	// SetTenants receives a list of tenants.
	SetTenants(tenants []*sqlcv1.Tenant)
	// ReportSnapshot receives a SnapshotInput together with a tenant ID.
	ReportSnapshot(tenantId uuid.UUID, input *SnapshotInput)
	// PostAssign receives a PostAssignInput together with a tenant ID.
	PostAssign(tenantId uuid.UUID, input *PostAssignInput)
	// Cleanup returns an error.
	// NOTE(review): presumably releases the extension's resources — confirm.
	Cleanup() error
}

type SchedulingPool

type SchedulingPool struct {
	Extensions *Extensions
	// contains filtered or unexported fields
}

SchedulingPool is responsible for managing a pool of tenantManagers.

func NewSchedulingPool

func NewSchedulingPool(
	repo v1.SchedulerRepository,
	l *zerolog.Logger,
	singleQueueLimit int,
	schedulerConcurrencyRateLimit int,
	schedulerConcurrencyPollingMinInterval time.Duration,
	schedulerConcurrencyPollingMaxInterval time.Duration,
	optimisticSchedulingEnabled bool,
	optimisticSlots int,
) (*SchedulingPool, func() error, error)

func (*SchedulingPool) GetConcurrencyResultsCh

func (p *SchedulingPool) GetConcurrencyResultsCh() chan *ConcurrencyResults

func (*SchedulingPool) GetResultsCh

func (p *SchedulingPool) GetResultsCh() chan *QueueResults

func (*SchedulingPool) NotifyConcurrency

func (p *SchedulingPool) NotifyConcurrency(ctx context.Context, tenantId uuid.UUID, strategyIds []int64)

func (*SchedulingPool) NotifyQueues

func (p *SchedulingPool) NotifyQueues(ctx context.Context, tenantId uuid.UUID, queueNames []string)

func (*SchedulingPool) Replenish

func (p *SchedulingPool) Replenish(ctx context.Context, tenantId uuid.UUID)

func (*SchedulingPool) RunOptimisticScheduling added in v0.78.0

func (p *SchedulingPool) RunOptimisticScheduling(ctx context.Context, tenantId uuid.UUID, opts []*v1.WorkflowNameTriggerOpts, localWorkerIds map[uuid.UUID]struct{}) (map[uuid.UUID][]*AssignedItemWithTask, []*v1.V1TaskWithPayload, []*v1.DAGWithData, error)

func (*SchedulingPool) RunOptimisticSchedulingFromEvents added in v0.78.0

func (p *SchedulingPool) RunOptimisticSchedulingFromEvents(ctx context.Context, tenantId uuid.UUID, opts []v1.EventTriggerOpts, localWorkerIds map[uuid.UUID]struct{}) (map[uuid.UUID][]*AssignedItemWithTask, *v1.TriggerFromEventsResult, error)

func (*SchedulingPool) SetTenants

func (p *SchedulingPool) SetTenants(tenants []*sqlcv1.Tenant)

type SlotCp

// SlotCp records a worker ID and a used flag for a single slot.
// NOTE(review): "Cp" presumably stands for "copy", i.e. a snapshot copy of
// internal slot state — confirm.
type SlotCp struct {
	// WorkerId is the worker ID associated with the slot.
	WorkerId uuid.UUID
	// Used is the slot's used flag.
	Used     bool
}

type SlotUtilization

// SlotUtilization holds a pair of slot counts. It is the value type of
// SnapshotInput.WorkerSlotUtilization.
type SlotUtilization struct {
	// UtilizedSlots is the count of slots in use (meaning read from the name).
	UtilizedSlots    int
	// NonUtilizedSlots is the count of slots not in use (meaning read from the
	// name).
	NonUtilizedSlots int
}

type SnapshotInput

// SnapshotInput is the argument passed to SchedulerExtension.ReportSnapshot.
// NOTE(review): both maps are keyed by uuid.UUID; presumably the key is the
// worker ID in each case — confirm against the code that builds the snapshot.
type SnapshotInput struct {
	// Workers maps a UUID to a WorkerCp.
	Workers               map[uuid.UUID]*WorkerCp
	// WorkerSlotUtilization maps a UUID to that entry's slot counts.
	WorkerSlotUtilization map[uuid.UUID]*SlotUtilization
}

type WorkerCp

// WorkerCp describes a worker by ID, run limit, labels and name. It is the
// value type of SnapshotInput.Workers.
// NOTE(review): "Cp" presumably stands for "copy", i.e. a snapshot copy of
// internal worker state — confirm.
type WorkerCp struct {
	// WorkerId is the worker's ID.
	WorkerId uuid.UUID
	// MaxRuns is an integer limit.
	// NOTE(review): presumably the worker's maximum concurrent runs — confirm.
	MaxRuns  int
	// Labels holds the worker's label rows as returned by the sqlcv1 layer.
	Labels   []*sqlcv1.ListManyWorkerLabelsRow
	// Name is the worker's name.
	Name     string
}

type WorkerPromLabels added in v0.72.2

// WorkerPromLabels holds an ID and a name.
// NOTE(review): presumably the worker identity used as Prometheus label values
// by PrometheusExtension (both were added in v0.72.2) — confirm.
type WorkerPromLabels struct {
	// ID is the UUID half of the pair.
	ID   uuid.UUID
	// Name is the name half of the pair.
	Name string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL