scheduling

package
v0.42.0-alpha.12 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 8, 2024 License: MIT Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func IsTimedout

func IsTimedout(qi *QueueItemWithOrder) bool

Types

type ExhaustedRateLimitCache

type ExhaustedRateLimitCache struct {
	// contains filtered or unexported fields
}

ExhaustedRateLimitCache is a cache of rate limits to their next refill time, which avoids querying queues where we know we're already rate-limited.

func NewExhaustedRateLimitCache

func NewExhaustedRateLimitCache(maxCacheDuration time.Duration) *ExhaustedRateLimitCache

NewExhaustedRateLimitCache creates a new ExhaustedRateLimitCache.

func (*ExhaustedRateLimitCache) IsExhausted

func (rlc *ExhaustedRateLimitCache) IsExhausted(tenantId, queue string) bool

Get returns true if the rate limit is not exhausted, false otherwise.

func (*ExhaustedRateLimitCache) Set

func (rlc *ExhaustedRateLimitCache) Set(tenantId, queue string, exhaustedRateLimitRefillTimes []time.Time)

type QueueItemWithOrder

type QueueItemWithOrder struct {
	*dbsqlc.QueueItem

	Order int
}

type RateLimit

type RateLimit struct {
	// contains filtered or unexported fields
}

func NewRateLimit

func NewRateLimit(key string, rl *dbsqlc.ListRateLimitsForTenantRow) *RateLimit

func (*RateLimit) AddStepRunId

func (rl *RateLimit) AddStepRunId(stepRunId string, units int32) bool

func (*RateLimit) Key

func (rl *RateLimit) Key() string

func (*RateLimit) NextRefill

func (rl *RateLimit) NextRefill() time.Time

func (*RateLimit) Rollback

func (rl *RateLimit) Rollback(stepRunId string)

func (*RateLimit) UnitsConsumed

func (rl *RateLimit) UnitsConsumed() int32

type SchedulePlan

type SchedulePlan struct {
	StepRunIds             []pgtype.UUID
	StepRunTimeouts        []string
	SlotIds                []pgtype.UUID
	WorkerIds              []pgtype.UUID
	UnassignedStepRunIds   []pgtype.UUID
	QueuedStepRuns         []repository.QueuedStepRun
	TimedOutStepRuns       []pgtype.UUID
	RateLimitedStepRuns    []pgtype.UUID
	RateLimitedQueues      map[string][]time.Time
	QueuedItems            []int64
	ShouldContinue         bool
	MinQueuedIds           map[string]int64
	RateLimitUnitsConsumed map[string]int32
}

func GeneratePlan

func GeneratePlan(
	slots []*dbsqlc.ListSemaphoreSlotsToAssignRow,
	uniqueActionsArr []string,
	queueItems []*QueueItemWithOrder,
	stepRateUnits map[string]map[string]int32,
	currRateLimits map[string]*dbsqlc.ListRateLimitsForTenantRow,
	workerLabels map[string][]*dbsqlc.GetWorkerLabelsRow,
	stepDesiredLabels map[string][]*dbsqlc.GetDesiredLabelsRow,
) (SchedulePlan, error)

func (*SchedulePlan) AssignQiToSlot

func (plan *SchedulePlan) AssignQiToSlot(qi *QueueItemWithOrder, slot *dbsqlc.ListSemaphoreSlotsToAssignRow)

func (*SchedulePlan) HandleNoSlots

func (plan *SchedulePlan) HandleNoSlots(qi *QueueItemWithOrder)

func (*SchedulePlan) HandleRateLimited

func (plan *SchedulePlan) HandleRateLimited(qi *QueueItemWithOrder)

func (*SchedulePlan) HandleTimedOut

func (plan *SchedulePlan) HandleTimedOut(qi *QueueItemWithOrder)

func (*SchedulePlan) HandleUnassigned

func (plan *SchedulePlan) HandleUnassigned(qi *QueueItemWithOrder)

func (*SchedulePlan) UpdateMinQueuedIds

func (sp *SchedulePlan) UpdateMinQueuedIds(qi *QueueItemWithOrder) []repository.QueuedStepRun

type WorkerState

type WorkerState struct {
	// contains filtered or unexported fields
}

func NewWorkerState

func NewWorkerState(workerId string, labels []*dbsqlc.GetWorkerLabelsRow) *WorkerState

func (*WorkerState) AddSlot

func (*WorkerState) AssignSlot

func (*WorkerState) CanAssign

func (w *WorkerState) CanAssign(qi *QueueItemWithOrder, desiredLabels []*dbsqlc.GetDesiredLabelsRow) bool

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL