controllers

package
v1.9.6 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 27, 2025 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AssignResult

// AssignResult contains the result of a process assignment attempt.
type AssignResult struct {
	// Process is the process associated with the assignment attempt.
	// NOTE(review): presumably nil when nothing was assigned or when
	// IsPaused is true — confirm against Assign/DistributedAssign.
	Process       *core.Process
	// IsPaused gates ResumeChannel (see below).
	// NOTE(review): presumably true when assignments for the colony are
	// currently paused — confirm.
	IsPaused      bool
	// ResumeChannel is a receive-only channel.
	// NOTE(review): presumably the channel produced by CreateResumeChannel,
	// signaled when assignments are resumed — confirm.
	ResumeChannel <-chan bool // Only set when IsPaused=true
}

AssignResult contains the result of a process assignment attempt.

type ColoniesController

// ColoniesController is the concrete controller type returned by
// CreateColoniesController. All of its fields are unexported; it is used
// through its methods.
// NOTE(review): its method set appears to cover the Controller interface —
// confirm that it is the intended implementation.
type ColoniesController struct {
	// contains filtered or unexported fields
}

func CreateColoniesController

func CreateColoniesController(db database.Database,
	thisNode cluster.Node,
	clusterConfig cluster.Config,
	etcdDataPath string,
	generatorPeriod int,
	cronPeriod int,
	retention bool,
	retentionPolicy int64,
	retentionPeriod int) *ColoniesController

func (*ColoniesController) AddAttribute

func (controller *ColoniesController) AddAttribute(attribute *core.Attribute) (*core.Attribute, error)

func (*ColoniesController) AddChild

func (controller *ColoniesController) AddChild(
	processGraphID string,
	parentProcessID string,
	childProcessID string,
	process *core.Process,
	executorID string,
	insert bool) (*core.Process, error)

func (*ColoniesController) AddColony

func (controller *ColoniesController) AddColony(colony *core.Colony) (*core.Colony, error)

func (*ColoniesController) AddCron

func (controller *ColoniesController) AddCron(cron *core.Cron) (*core.Cron, error)

func (*ColoniesController) AddExecutor

func (controller *ColoniesController) AddExecutor(executor *core.Executor, allowExecutorReregister bool) (*core.Executor, error)

func (*ColoniesController) AddFunction

func (controller *ColoniesController) AddFunction(function *core.Function) (*core.Function, error)

func (*ColoniesController) AddGenerator

func (controller *ColoniesController) AddGenerator(generator *core.Generator) (*core.Generator, error)

func (*ColoniesController) AddProcess

func (controller *ColoniesController) AddProcess(process *core.Process) (*core.Process, error)

func (*ColoniesController) AddProcessToDB

func (controller *ColoniesController) AddProcessToDB(process *core.Process) (*core.Process, error)

func (*ColoniesController) AreColonyAssignmentsPaused

func (controller *ColoniesController) AreColonyAssignmentsPaused(colonyName string) (bool, error)

func (*ColoniesController) Assign

func (controller *ColoniesController) Assign(executorID string, colonyName string, cpu int64, mem int64) (*AssignResult, error)

func (*ColoniesController) BlockingCmdQueueWorker

func (controller *ColoniesController) BlockingCmdQueueWorker()

func (*ColoniesController) CalcNextRun

func (controller *ColoniesController) CalcNextRun(cron *core.Cron) time.Time

func (*ColoniesController) CleanupWorker added in v1.9.3

func (controller *ColoniesController) CleanupWorker()

func (*ColoniesController) CloseFailed

func (controller *ColoniesController) CloseFailed(processID string, errs []string) error

func (*ColoniesController) CloseSuccessful

func (controller *ColoniesController) CloseSuccessful(processID string, executorID string, output []interface{}) error

func (*ColoniesController) CmdQueueWorker

func (controller *ColoniesController) CmdQueueWorker()

func (*ColoniesController) CreateProcessGraph

func (controller *ColoniesController) CreateProcessGraph(workflowSpec *core.WorkflowSpec, args []interface{}, kwargs map[string]interface{}, rootInput []interface{}, recoveredID string) (*core.ProcessGraph, error)

func (*ColoniesController) CreateResumeChannel

func (controller *ColoniesController) CreateResumeChannel(colonyName string) <-chan bool

CreateResumeChannel creates a channel that will be signaled when assignments are resumed for a colony.

func (*ColoniesController) CronTriggerLoop

func (controller *ColoniesController) CronTriggerLoop()

func (*ColoniesController) DistributedAssign added in v1.9.6

func (controller *ColoniesController) DistributedAssign(executor *core.Executor, colonyName string, cpu int64, memory int64, storage int64) (*AssignResult, error)

DistributedAssign performs atomic process assignment using database-level locking. This method bypasses the scheduler and blocking queue, enabling horizontal scaling of the assign operation across multiple server replicas. Uses SELECT FOR UPDATE SKIP LOCKED to handle concurrent access without race conditions.

func (*ColoniesController) FindFailedProcessGraphs

func (controller *ColoniesController) FindFailedProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)

func (*ColoniesController) FindProcessHistory

func (controller *ColoniesController) FindProcessHistory(colonyName string, executorID string, seconds int, state int) ([]*core.Process, error)

func (*ColoniesController) FindRunningProcessGraphs

func (controller *ColoniesController) FindRunningProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)

func (*ColoniesController) FindSuccessfulProcessGraphs

func (controller *ColoniesController) FindSuccessfulProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)

func (*ColoniesController) FindWaitingProcessGraphs

func (controller *ColoniesController) FindWaitingProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)

func (*ColoniesController) GeneratorTriggerLoop

func (controller *ColoniesController) GeneratorTriggerLoop()

func (*ColoniesController) GetAttribute

func (controller *ColoniesController) GetAttribute(attributeID string) (*core.Attribute, error)

func (*ColoniesController) GetChannelRouter added in v1.9.6

func (controller *ColoniesController) GetChannelRouter() *channel.Router

GetChannelRouter returns the channel router.

func (*ColoniesController) GetColonies

func (controller *ColoniesController) GetColonies() ([]*core.Colony, error)

func (*ColoniesController) GetColony

func (controller *ColoniesController) GetColony(colonyName string) (*core.Colony, error)

func (*ColoniesController) GetColonyStatistics

func (controller *ColoniesController) GetColonyStatistics(colonyName string) (*core.Statistics, error)

func (*ColoniesController) GetCron

func (controller *ColoniesController) GetCron(cronID string) (*core.Cron, error)

func (*ColoniesController) GetCronByName added in v1.9.6

func (controller *ColoniesController) GetCronByName(colonyName string, cronName string) (*core.Cron, error)

func (*ColoniesController) GetCronPeriod

func (controller *ColoniesController) GetCronPeriod() int

func (*ColoniesController) GetCrons

func (controller *ColoniesController) GetCrons(colonyName string, count int) ([]*core.Cron, error)

func (*ColoniesController) GetEtcdServer

func (controller *ColoniesController) GetEtcdServer() *cluster.EtcdServer

func (*ColoniesController) GetEventHandler

func (controller *ColoniesController) GetEventHandler() backends.RealtimeEventHandler

func (*ColoniesController) GetExecutor

func (controller *ColoniesController) GetExecutor(executorID string) (*core.Executor, error)

func (*ColoniesController) GetExecutorByColonyName

func (controller *ColoniesController) GetExecutorByColonyName(colonyName string) ([]*core.Executor, error)

func (*ColoniesController) GetFunctionByID

func (controller *ColoniesController) GetFunctionByID(functionID string) (*core.Function, error)

func (*ColoniesController) GetFunctionsByColonyName

func (controller *ColoniesController) GetFunctionsByColonyName(colonyName string) ([]*core.Function, error)

func (*ColoniesController) GetFunctionsByExecutorName

func (controller *ColoniesController) GetFunctionsByExecutorName(colonyName string, executorName string) ([]*core.Function, error)

func (*ColoniesController) GetGenerator

func (controller *ColoniesController) GetGenerator(generatorID string) (*core.Generator, error)

func (*ColoniesController) GetGeneratorPeriod

func (controller *ColoniesController) GetGeneratorPeriod() int

func (*ColoniesController) GetGenerators

func (controller *ColoniesController) GetGenerators(colonyName string, count int) ([]*core.Generator, error)

func (*ColoniesController) GetProcess

func (controller *ColoniesController) GetProcess(processID string) (*core.Process, error)

func (*ColoniesController) GetProcessGraphByID

func (controller *ColoniesController) GetProcessGraphByID(processGraphID string) (*core.ProcessGraph, error)

func (*ColoniesController) GetProcessGraphStorage

func (controller *ColoniesController) GetProcessGraphStorage() *processGraphStorageAdapter

GetProcessGraphStorage creates a storage adapter for ProcessGraph.

func (*ColoniesController) GetStatistics

func (controller *ColoniesController) GetStatistics() (*core.Statistics, error)

func (*ColoniesController) GetThisNode

func (controller *ColoniesController) GetThisNode() cluster.Node

func (*ColoniesController) HandleDefunctProcessgraph

func (controller *ColoniesController) HandleDefunctProcessgraph(processGraphID string, processID string, err error) error

func (*ColoniesController) IsLeader

func (controller *ColoniesController) IsLeader() bool

func (*ColoniesController) NotifyChildren

func (controller *ColoniesController) NotifyChildren(process *core.Process) error

func (*ColoniesController) PackGenerator

func (controller *ColoniesController) PackGenerator(generatorID string, colonyName, arg string) error

func (*ColoniesController) PauseColonyAssignments

func (controller *ColoniesController) PauseColonyAssignments(colonyName string) error

func (*ColoniesController) RemoveAllProcessGraphs

func (controller *ColoniesController) RemoveAllProcessGraphs(colonyName string, state int) error

func (*ColoniesController) RemoveAllProcesses

func (controller *ColoniesController) RemoveAllProcesses(colonyName string, state int) error

func (*ColoniesController) RemoveColony

func (controller *ColoniesController) RemoveColony(colonyName string) error

func (*ColoniesController) RemoveCron

func (controller *ColoniesController) RemoveCron(cronID string) error

func (*ColoniesController) RemoveFunction

func (controller *ColoniesController) RemoveFunction(functionID string) error

func (*ColoniesController) RemoveGenerator

func (controller *ColoniesController) RemoveGenerator(generatorID string) error

func (*ColoniesController) RemoveProcess

func (controller *ColoniesController) RemoveProcess(processID string) error

func (*ColoniesController) RemoveProcessGraph

func (controller *ColoniesController) RemoveProcessGraph(processID string) error

func (*ColoniesController) ResetDatabase

func (controller *ColoniesController) ResetDatabase() error

func (*ColoniesController) ResetProcess

func (controller *ColoniesController) ResetProcess(processID string) error

func (*ColoniesController) ResolveGenerator

func (controller *ColoniesController) ResolveGenerator(colonyName string, generatorName string) (*core.Generator, error)

func (*ColoniesController) ResumeColonyAssignments

func (controller *ColoniesController) ResumeColonyAssignments(colonyName string) error

func (*ColoniesController) RetentionWorker

func (controller *ColoniesController) RetentionWorker()

func (*ColoniesController) RunCron

func (controller *ColoniesController) RunCron(cronID string) (*core.Cron, error)

func (*ColoniesController) SetOutput

func (controller *ColoniesController) SetOutput(processID string, output []interface{}) error

func (*ColoniesController) StartCron

func (controller *ColoniesController) StartCron(cron *core.Cron)

func (*ColoniesController) Stop

func (controller *ColoniesController) Stop()

func (*ColoniesController) SubmitWorkflow

func (controller *ColoniesController) SubmitWorkflow(generator *core.Generator, counter int, recoveredID string)

func (*ColoniesController) SubmitWorkflowSpec

func (controller *ColoniesController) SubmitWorkflowSpec(workflowSpec *core.WorkflowSpec, recoveredID string) (*core.ProcessGraph, error)

func (*ColoniesController) SubscribeProcess

func (controller *ColoniesController) SubscribeProcess(executorID string, subscription *backends.RealtimeSubscription) error

func (*ColoniesController) SubscribeProcesses

func (controller *ColoniesController) SubscribeProcesses(executorID string, subscription *backends.RealtimeSubscription) error

func (*ColoniesController) TimeoutLoop

func (controller *ColoniesController) TimeoutLoop()

func (*ColoniesController) TriggerCrons

func (controller *ColoniesController) TriggerCrons()

func (*ColoniesController) TriggerGenerators

func (controller *ColoniesController) TriggerGenerators()

func (*ColoniesController) TryBecomeLeader

func (controller *ColoniesController) TryBecomeLeader() bool

func (*ColoniesController) UnassignExecutor

func (controller *ColoniesController) UnassignExecutor(processID string) error

func (*ColoniesController) UpdateProcessGraph

func (controller *ColoniesController) UpdateProcessGraph(graph *core.ProcessGraph) error

func (*ColoniesController) WakeupPausedAssignments

func (controller *ColoniesController) WakeupPausedAssignments(colonyName string)

WakeupPausedAssignments signals all waiting executors that assignments have been resumed.

type Controller

// Controller declares the operations exposed by the colonies controller:
// management of colonies, executors, processes, process graphs, attributes,
// functions, generators and crons, process assignment and assignment
// pausing, leadership queries, and the background loops and workers.
// NOTE(review): ColoniesController's method set appears to cover every
// method declared here — confirm it is the intended implementation.
type Controller interface {
	// Configuration, cluster and realtime-event accessors.
	GetCronPeriod() int
	GetGeneratorPeriod() int
	GetEtcdServer() *cluster.EtcdServer
	GetEventHandler() backends.RealtimeEventHandler
	GetThisNode() cluster.Node

	// Realtime subscriptions.
	SubscribeProcesses(executorID string, subscription *backends.RealtimeSubscription) error
	SubscribeProcess(executorID string, subscription *backends.RealtimeSubscription) error

	// Colonies.
	GetColonies() ([]*core.Colony, error)
	GetColony(colonyName string) (*core.Colony, error)
	AddColony(colony *core.Colony) (*core.Colony, error)
	RemoveColony(colonyName string) error

	// Executors.
	AddExecutor(executor *core.Executor, allowExecutorReregister bool) (*core.Executor, error)
	GetExecutor(executorID string) (*core.Executor, error)
	GetExecutorByColonyName(colonyName string) ([]*core.Executor, error)

	// Processes and process graphs.
	AddProcessToDB(process *core.Process) (*core.Process, error)
	AddProcess(process *core.Process) (*core.Process, error)
	AddChild(processGraphID string, parentProcessID string, childProcessID string, process *core.Process, executorID string, insert bool) (*core.Process, error)
	GetProcess(processID string) (*core.Process, error)
	FindProcessHistory(colonyName string, executorID string, seconds int, state int) ([]*core.Process, error)
	UpdateProcessGraph(graph *core.ProcessGraph) error
	CreateProcessGraph(workflowSpec *core.WorkflowSpec, args []interface{}, kwargs map[string]interface{}, rootInput []interface{}, recoveredID string) (*core.ProcessGraph, error)
	SubmitWorkflowSpec(workflowSpec *core.WorkflowSpec, recoveredID string) (*core.ProcessGraph, error)
	GetProcessGraphByID(processGraphID string) (*core.ProcessGraph, error)
	FindWaitingProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
	FindRunningProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
	FindSuccessfulProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
	FindFailedProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
	RemoveProcess(processID string) error
	RemoveAllProcesses(colonyName string, state int) error
	RemoveProcessGraph(processID string) error
	RemoveAllProcessGraphs(colonyName string, state int) error
	SetOutput(processID string, output []interface{}) error
	CloseSuccessful(processID string, executorID string, output []interface{}) error
	NotifyChildren(process *core.Process) error
	CloseFailed(processID string, errs []string) error
	HandleDefunctProcessgraph(processGraphID string, processID string, err error) error

	// Assignment. Both methods return an *AssignResult. Per the package
	// documentation, DistributedAssign performs atomic process assignment
	// using database-level locking and bypasses the scheduler and blocking
	// queue.
	Assign(executorID string, colonyName string, cpu int64, memory int64) (*AssignResult, error)
	DistributedAssign(executor *core.Executor, colonyName string, cpu int64, memory int64, storage int64) (*AssignResult, error)
	UnassignExecutor(processID string) error
	ResetProcess(processID string) error

	// Statistics.
	GetColonyStatistics(colonyName string) (*core.Statistics, error)
	GetStatistics() (*core.Statistics, error)

	// Attributes.
	AddAttribute(attribute *core.Attribute) (*core.Attribute, error)
	GetAttribute(attributeID string) (*core.Attribute, error)

	// Functions.
	AddFunction(function *core.Function) (*core.Function, error)
	GetFunctionsByExecutorName(colonyName string, executorName string) ([]*core.Function, error)
	GetFunctionsByColonyName(colonyName string) ([]*core.Function, error)
	GetFunctionByID(functionID string) (*core.Function, error)
	RemoveFunction(functionID string) error

	// Generators and crons. Note that RemoveGenerator is declared between
	// the cron methods below rather than next to the other generator methods.
	AddGenerator(generator *core.Generator) (*core.Generator, error)
	GetGenerator(generatorID string) (*core.Generator, error)
	ResolveGenerator(colonyName string, generatorName string) (*core.Generator, error)
	GetGenerators(colonyName string, count int) ([]*core.Generator, error)
	PackGenerator(generatorID string, colonyName, arg string) error
	GeneratorTriggerLoop()
	TriggerGenerators()
	SubmitWorkflow(generator *core.Generator, counter int, recoveredID string) // TODO: change name, there is also a submitWorkflowSpec()
	AddCron(cron *core.Cron) (*core.Cron, error)
	RemoveGenerator(generatorID string) error
	GetCron(cronID string) (*core.Cron, error)
	GetCrons(colonyName string, count int) ([]*core.Cron, error)
	GetCronByName(colonyName string, cronName string) (*core.Cron, error)
	RunCron(cronID string) (*core.Cron, error)
	RemoveCron(cronID string) error
	CalcNextRun(cron *core.Cron) time.Time
	StartCron(cron *core.Cron)
	TriggerCrons()
	CronTriggerLoop()

	// Database reset.
	ResetDatabase() error

	// Pausing and resuming of assignments per colony.
	PauseColonyAssignments(colonyName string) error
	ResumeColonyAssignments(colonyName string) error
	AreColonyAssignmentsPaused(colonyName string) (bool, error)

	// Lifecycle, leadership and background loops/workers.
	Stop()
	IsLeader() bool
	TryBecomeLeader() bool
	TimeoutLoop()
	BlockingCmdQueueWorker()
	RetentionWorker()
	CmdQueueWorker()

	// GetChannelRouter returns the channel router.
	GetChannelRouter() *channel.Router
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL