Documentation
¶
Index ¶
- type AssignResult
- type ColoniesController
- func (controller *ColoniesController) AddAttribute(attribute *core.Attribute) (*core.Attribute, error)
- func (controller *ColoniesController) AddChild(processGraphID string, parentProcessID string, childProcessID string, ...) (*core.Process, error)
- func (controller *ColoniesController) AddColony(colony *core.Colony) (*core.Colony, error)
- func (controller *ColoniesController) AddCron(cron *core.Cron) (*core.Cron, error)
- func (controller *ColoniesController) AddExecutor(executor *core.Executor, allowExecutorReregister bool) (*core.Executor, error)
- func (controller *ColoniesController) AddFunction(function *core.Function) (*core.Function, error)
- func (controller *ColoniesController) AddGenerator(generator *core.Generator) (*core.Generator, error)
- func (controller *ColoniesController) AddProcess(process *core.Process) (*core.Process, error)
- func (controller *ColoniesController) AddProcessToDB(process *core.Process) (*core.Process, error)
- func (controller *ColoniesController) AreColonyAssignmentsPaused(colonyName string) (bool, error)
- func (controller *ColoniesController) Assign(executorID string, colonyName string, cpu int64, mem int64) (*AssignResult, error)
- func (controller *ColoniesController) BlockingCmdQueueWorker()
- func (controller *ColoniesController) CalcNextRun(cron *core.Cron) time.Time
- func (controller *ColoniesController) CleanupWorker()
- func (controller *ColoniesController) CloseFailed(processID string, errs []string) error
- func (controller *ColoniesController) CloseSuccessful(processID string, executorID string, output []interface{}) error
- func (controller *ColoniesController) CmdQueueWorker()
- func (controller *ColoniesController) CreateProcessGraph(workflowSpec *core.WorkflowSpec, args []interface{}, ...) (*core.ProcessGraph, error)
- func (controller *ColoniesController) CreateResumeChannel(colonyName string) <-chan bool
- func (controller *ColoniesController) CronTriggerLoop()
- func (controller *ColoniesController) DistributedAssign(executor *core.Executor, colonyName string, cpu int64, memory int64, ...) (*AssignResult, error)
- func (controller *ColoniesController) FindFailedProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
- func (controller *ColoniesController) FindProcessHistory(colonyName string, executorID string, seconds int, state int) ([]*core.Process, error)
- func (controller *ColoniesController) FindRunningProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
- func (controller *ColoniesController) FindSuccessfulProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
- func (controller *ColoniesController) FindWaitingProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
- func (controller *ColoniesController) GeneratorTriggerLoop()
- func (controller *ColoniesController) GetAttribute(attributeID string) (*core.Attribute, error)
- func (controller *ColoniesController) GetChannelRouter() *channel.Router
- func (controller *ColoniesController) GetColonies() ([]*core.Colony, error)
- func (controller *ColoniesController) GetColony(colonyName string) (*core.Colony, error)
- func (controller *ColoniesController) GetColonyStatistics(colonyName string) (*core.Statistics, error)
- func (controller *ColoniesController) GetCron(cronID string) (*core.Cron, error)
- func (controller *ColoniesController) GetCronByName(colonyName string, cronName string) (*core.Cron, error)
- func (controller *ColoniesController) GetCronPeriod() int
- func (controller *ColoniesController) GetCrons(colonyName string, count int) ([]*core.Cron, error)
- func (controller *ColoniesController) GetEtcdServer() *cluster.EtcdServer
- func (controller *ColoniesController) GetEventHandler() backends.RealtimeEventHandler
- func (controller *ColoniesController) GetExecutor(executorID string) (*core.Executor, error)
- func (controller *ColoniesController) GetExecutorByColonyName(colonyName string) ([]*core.Executor, error)
- func (controller *ColoniesController) GetFunctionByID(functionID string) (*core.Function, error)
- func (controller *ColoniesController) GetFunctionsByColonyName(colonyName string) ([]*core.Function, error)
- func (controller *ColoniesController) GetFunctionsByExecutorName(colonyName string, executorName string) ([]*core.Function, error)
- func (controller *ColoniesController) GetGenerator(generatorID string) (*core.Generator, error)
- func (controller *ColoniesController) GetGeneratorPeriod() int
- func (controller *ColoniesController) GetGenerators(colonyName string, count int) ([]*core.Generator, error)
- func (controller *ColoniesController) GetProcess(processID string) (*core.Process, error)
- func (controller *ColoniesController) GetProcessGraphByID(processGraphID string) (*core.ProcessGraph, error)
- func (controller *ColoniesController) GetProcessGraphStorage() *processGraphStorageAdapter
- func (controller *ColoniesController) GetStatistics() (*core.Statistics, error)
- func (controller *ColoniesController) GetThisNode() cluster.Node
- func (controller *ColoniesController) HandleDefunctProcessgraph(processGraphID string, processID string, err error) error
- func (controller *ColoniesController) IsLeader() bool
- func (controller *ColoniesController) NotifyChildren(process *core.Process) error
- func (controller *ColoniesController) PackGenerator(generatorID string, colonyName, arg string) error
- func (controller *ColoniesController) PauseColonyAssignments(colonyName string) error
- func (controller *ColoniesController) RemoveAllProcessGraphs(colonyName string, state int) error
- func (controller *ColoniesController) RemoveAllProcesses(colonyName string, state int) error
- func (controller *ColoniesController) RemoveColony(colonyName string) error
- func (controller *ColoniesController) RemoveCron(cronID string) error
- func (controller *ColoniesController) RemoveFunction(functionID string) error
- func (controller *ColoniesController) RemoveGenerator(generatorID string) error
- func (controller *ColoniesController) RemoveProcess(processID string) error
- func (controller *ColoniesController) RemoveProcessGraph(processID string) error
- func (controller *ColoniesController) ResetDatabase() error
- func (controller *ColoniesController) ResetProcess(processID string) error
- func (controller *ColoniesController) ResolveGenerator(colonyName string, generatorName string) (*core.Generator, error)
- func (controller *ColoniesController) ResumeColonyAssignments(colonyName string) error
- func (controller *ColoniesController) RetentionWorker()
- func (controller *ColoniesController) RunCron(cronID string) (*core.Cron, error)
- func (controller *ColoniesController) SetOutput(processID string, output []interface{}) error
- func (controller *ColoniesController) StartCron(cron *core.Cron)
- func (controller *ColoniesController) Stop()
- func (controller *ColoniesController) SubmitWorkflow(generator *core.Generator, counter int, recoveredID string)
- func (controller *ColoniesController) SubmitWorkflowSpec(workflowSpec *core.WorkflowSpec, recoveredID string) (*core.ProcessGraph, error)
- func (controller *ColoniesController) SubscribeProcess(executorID string, subscription *backends.RealtimeSubscription) error
- func (controller *ColoniesController) SubscribeProcesses(executorID string, subscription *backends.RealtimeSubscription) error
- func (controller *ColoniesController) TimeoutLoop()
- func (controller *ColoniesController) TriggerCrons()
- func (controller *ColoniesController) TriggerGenerators()
- func (controller *ColoniesController) TryBecomeLeader() bool
- func (controller *ColoniesController) UnassignExecutor(processID string) error
- func (controller *ColoniesController) UpdateProcessGraph(graph *core.ProcessGraph) error
- func (controller *ColoniesController) WakeupPausedAssignments(colonyName string)
- type Controller
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AssignResult ¶
type AssignResult struct {
Process *core.Process
IsPaused bool
ResumeChannel <-chan bool // Only set when IsPaused=true
}
AssignResult contains the result of a process assignment attempt
type ColoniesController ¶
type ColoniesController struct {
// contains filtered or unexported fields
}
func (*ColoniesController) AddAttribute ¶
func (*ColoniesController) AddExecutor ¶
func (*ColoniesController) AddFunction ¶
func (*ColoniesController) AddGenerator ¶
func (*ColoniesController) AddProcess ¶
func (*ColoniesController) AddProcessToDB ¶
func (*ColoniesController) AreColonyAssignmentsPaused ¶
func (controller *ColoniesController) AreColonyAssignmentsPaused(colonyName string) (bool, error)
func (*ColoniesController) Assign ¶
func (controller *ColoniesController) Assign(executorID string, colonyName string, cpu int64, mem int64) (*AssignResult, error)
func (*ColoniesController) BlockingCmdQueueWorker ¶
func (controller *ColoniesController) BlockingCmdQueueWorker()
func (*ColoniesController) CalcNextRun ¶
func (controller *ColoniesController) CalcNextRun(cron *core.Cron) time.Time
func (*ColoniesController) CleanupWorker ¶ added in v1.9.3
func (controller *ColoniesController) CleanupWorker()
func (*ColoniesController) CloseFailed ¶
func (controller *ColoniesController) CloseFailed(processID string, errs []string) error
func (*ColoniesController) CloseSuccessful ¶
func (controller *ColoniesController) CloseSuccessful(processID string, executorID string, output []interface{}) error
func (*ColoniesController) CmdQueueWorker ¶
func (controller *ColoniesController) CmdQueueWorker()
func (*ColoniesController) CreateProcessGraph ¶
func (controller *ColoniesController) CreateProcessGraph(workflowSpec *core.WorkflowSpec, args []interface{}, kwargs map[string]interface{}, rootInput []interface{}, recoveredID string) (*core.ProcessGraph, error)
func (*ColoniesController) CreateResumeChannel ¶
func (controller *ColoniesController) CreateResumeChannel(colonyName string) <-chan bool
createResumeChannel creates a channel that will be signaled when assignments are resumed for a colony
func (*ColoniesController) CronTriggerLoop ¶
func (controller *ColoniesController) CronTriggerLoop()
func (*ColoniesController) DistributedAssign ¶ added in v1.9.6
func (controller *ColoniesController) DistributedAssign(executor *core.Executor, colonyName string, cpu int64, memory int64, storage int64) (*AssignResult, error)
DistributedAssign performs atomic process assignment using database-level locking. This method bypasses the scheduler and blocking queue, enabling horizontal scaling of the assign operation across multiple server replicas. Uses SELECT FOR UPDATE SKIP LOCKED to handle concurrent access without race conditions.
func (*ColoniesController) FindFailedProcessGraphs ¶
func (controller *ColoniesController) FindFailedProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
func (*ColoniesController) FindProcessHistory ¶
func (*ColoniesController) FindRunningProcessGraphs ¶
func (controller *ColoniesController) FindRunningProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
func (*ColoniesController) FindSuccessfulProcessGraphs ¶
func (controller *ColoniesController) FindSuccessfulProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
func (*ColoniesController) FindWaitingProcessGraphs ¶
func (controller *ColoniesController) FindWaitingProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
func (*ColoniesController) GeneratorTriggerLoop ¶
func (controller *ColoniesController) GeneratorTriggerLoop()
func (*ColoniesController) GetAttribute ¶
func (controller *ColoniesController) GetAttribute(attributeID string) (*core.Attribute, error)
func (*ColoniesController) GetChannelRouter ¶ added in v1.9.6
func (controller *ColoniesController) GetChannelRouter() *channel.Router
GetChannelRouter returns the channel router
func (*ColoniesController) GetColonies ¶
func (controller *ColoniesController) GetColonies() ([]*core.Colony, error)
func (*ColoniesController) GetColony ¶
func (controller *ColoniesController) GetColony(colonyName string) (*core.Colony, error)
func (*ColoniesController) GetColonyStatistics ¶
func (controller *ColoniesController) GetColonyStatistics(colonyName string) (*core.Statistics, error)
func (*ColoniesController) GetCron ¶
func (controller *ColoniesController) GetCron(cronID string) (*core.Cron, error)
func (*ColoniesController) GetCronByName ¶ added in v1.9.6
func (*ColoniesController) GetCronPeriod ¶
func (controller *ColoniesController) GetCronPeriod() int
func (*ColoniesController) GetEtcdServer ¶
func (controller *ColoniesController) GetEtcdServer() *cluster.EtcdServer
func (*ColoniesController) GetEventHandler ¶
func (controller *ColoniesController) GetEventHandler() backends.RealtimeEventHandler
func (*ColoniesController) GetExecutor ¶
func (controller *ColoniesController) GetExecutor(executorID string) (*core.Executor, error)
func (*ColoniesController) GetExecutorByColonyName ¶
func (controller *ColoniesController) GetExecutorByColonyName(colonyName string) ([]*core.Executor, error)
func (*ColoniesController) GetFunctionByID ¶
func (controller *ColoniesController) GetFunctionByID(functionID string) (*core.Function, error)
func (*ColoniesController) GetFunctionsByColonyName ¶
func (controller *ColoniesController) GetFunctionsByColonyName(colonyName string) ([]*core.Function, error)
func (*ColoniesController) GetFunctionsByExecutorName ¶
func (*ColoniesController) GetGenerator ¶
func (controller *ColoniesController) GetGenerator(generatorID string) (*core.Generator, error)
func (*ColoniesController) GetGeneratorPeriod ¶
func (controller *ColoniesController) GetGeneratorPeriod() int
func (*ColoniesController) GetGenerators ¶
func (*ColoniesController) GetProcess ¶
func (controller *ColoniesController) GetProcess(processID string) (*core.Process, error)
func (*ColoniesController) GetProcessGraphByID ¶
func (controller *ColoniesController) GetProcessGraphByID(processGraphID string) (*core.ProcessGraph, error)
func (*ColoniesController) GetProcessGraphStorage ¶
func (controller *ColoniesController) GetProcessGraphStorage() *processGraphStorageAdapter
getProcessGraphStorage creates a storage adapter for ProcessGraph
func (*ColoniesController) GetStatistics ¶
func (controller *ColoniesController) GetStatistics() (*core.Statistics, error)
func (*ColoniesController) GetThisNode ¶
func (controller *ColoniesController) GetThisNode() cluster.Node
func (*ColoniesController) HandleDefunctProcessgraph ¶
func (controller *ColoniesController) HandleDefunctProcessgraph(processGraphID string, processID string, err error) error
func (*ColoniesController) IsLeader ¶
func (controller *ColoniesController) IsLeader() bool
func (*ColoniesController) NotifyChildren ¶
func (controller *ColoniesController) NotifyChildren(process *core.Process) error
func (*ColoniesController) PackGenerator ¶
func (controller *ColoniesController) PackGenerator(generatorID string, colonyName, arg string) error
func (*ColoniesController) PauseColonyAssignments ¶
func (controller *ColoniesController) PauseColonyAssignments(colonyName string) error
func (*ColoniesController) RemoveAllProcessGraphs ¶
func (controller *ColoniesController) RemoveAllProcessGraphs(colonyName string, state int) error
func (*ColoniesController) RemoveAllProcesses ¶
func (controller *ColoniesController) RemoveAllProcesses(colonyName string, state int) error
func (*ColoniesController) RemoveColony ¶
func (controller *ColoniesController) RemoveColony(colonyName string) error
func (*ColoniesController) RemoveCron ¶
func (controller *ColoniesController) RemoveCron(cronID string) error
func (*ColoniesController) RemoveFunction ¶
func (controller *ColoniesController) RemoveFunction(functionID string) error
func (*ColoniesController) RemoveGenerator ¶
func (controller *ColoniesController) RemoveGenerator(generatorID string) error
func (*ColoniesController) RemoveProcess ¶
func (controller *ColoniesController) RemoveProcess(processID string) error
func (*ColoniesController) RemoveProcessGraph ¶
func (controller *ColoniesController) RemoveProcessGraph(processID string) error
func (*ColoniesController) ResetDatabase ¶
func (controller *ColoniesController) ResetDatabase() error
func (*ColoniesController) ResetProcess ¶
func (controller *ColoniesController) ResetProcess(processID string) error
func (*ColoniesController) ResolveGenerator ¶
func (*ColoniesController) ResumeColonyAssignments ¶
func (controller *ColoniesController) ResumeColonyAssignments(colonyName string) error
func (*ColoniesController) RetentionWorker ¶
func (controller *ColoniesController) RetentionWorker()
func (*ColoniesController) RunCron ¶
func (controller *ColoniesController) RunCron(cronID string) (*core.Cron, error)
func (*ColoniesController) SetOutput ¶
func (controller *ColoniesController) SetOutput(processID string, output []interface{}) error
func (*ColoniesController) StartCron ¶
func (controller *ColoniesController) StartCron(cron *core.Cron)
func (*ColoniesController) Stop ¶
func (controller *ColoniesController) Stop()
func (*ColoniesController) SubmitWorkflow ¶
func (controller *ColoniesController) SubmitWorkflow(generator *core.Generator, counter int, recoveredID string)
func (*ColoniesController) SubmitWorkflowSpec ¶
func (controller *ColoniesController) SubmitWorkflowSpec(workflowSpec *core.WorkflowSpec, recoveredID string) (*core.ProcessGraph, error)
func (*ColoniesController) SubscribeProcess ¶
func (controller *ColoniesController) SubscribeProcess(executorID string, subscription *backends.RealtimeSubscription) error
func (*ColoniesController) SubscribeProcesses ¶
func (controller *ColoniesController) SubscribeProcesses(executorID string, subscription *backends.RealtimeSubscription) error
func (*ColoniesController) TimeoutLoop ¶
func (controller *ColoniesController) TimeoutLoop()
func (*ColoniesController) TriggerCrons ¶
func (controller *ColoniesController) TriggerCrons()
func (*ColoniesController) TriggerGenerators ¶
func (controller *ColoniesController) TriggerGenerators()
func (*ColoniesController) TryBecomeLeader ¶
func (controller *ColoniesController) TryBecomeLeader() bool
func (*ColoniesController) UnassignExecutor ¶
func (controller *ColoniesController) UnassignExecutor(processID string) error
func (*ColoniesController) UpdateProcessGraph ¶
func (controller *ColoniesController) UpdateProcessGraph(graph *core.ProcessGraph) error
func (*ColoniesController) WakeupPausedAssignments ¶
func (controller *ColoniesController) WakeupPausedAssignments(colonyName string)
wakeupPausedAssignments signals all waiting executors that assignments have been resumed
type Controller ¶
type Controller interface {
GetCronPeriod() int
GetGeneratorPeriod() int
GetEtcdServer() *cluster.EtcdServer
GetEventHandler() backends.RealtimeEventHandler
GetThisNode() cluster.Node
SubscribeProcesses(executorID string, subscription *backends.RealtimeSubscription) error
SubscribeProcess(executorID string, subscription *backends.RealtimeSubscription) error
GetColonies() ([]*core.Colony, error)
GetColony(colonyName string) (*core.Colony, error)
AddColony(colony *core.Colony) (*core.Colony, error)
RemoveColony(colonyName string) error
AddExecutor(executor *core.Executor, allowExecutorReregister bool) (*core.Executor, error)
GetExecutor(executorID string) (*core.Executor, error)
GetExecutorByColonyName(colonyName string) ([]*core.Executor, error)
AddProcessToDB(process *core.Process) (*core.Process, error)
AddProcess(process *core.Process) (*core.Process, error)
AddChild(processGraphID string, parentProcessID string, childProcessID string, process *core.Process, executorID string, insert bool) (*core.Process, error)
GetProcess(processID string) (*core.Process, error)
FindProcessHistory(colonyName string, executorID string, seconds int, state int) ([]*core.Process, error)
UpdateProcessGraph(graph *core.ProcessGraph) error
CreateProcessGraph(workflowSpec *core.WorkflowSpec, args []interface{}, kwargs map[string]interface{}, rootInput []interface{}, recoveredID string) (*core.ProcessGraph, error)
SubmitWorkflowSpec(workflowSpec *core.WorkflowSpec, recoveredID string) (*core.ProcessGraph, error)
GetProcessGraphByID(processGraphID string) (*core.ProcessGraph, error)
FindWaitingProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
FindRunningProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
FindSuccessfulProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
FindFailedProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
RemoveProcess(processID string) error
RemoveAllProcesses(colonyName string, state int) error
RemoveProcessGraph(processID string) error
RemoveAllProcessGraphs(colonyName string, state int) error
SetOutput(processID string, output []interface{}) error
CloseSuccessful(processID string, executorID string, output []interface{}) error
NotifyChildren(process *core.Process) error
CloseFailed(processID string, errs []string) error
HandleDefunctProcessgraph(processGraphID string, processID string, err error) error
Assign(executorID string, colonyName string, cpu int64, memory int64) (*AssignResult, error)
DistributedAssign(executor *core.Executor, colonyName string, cpu int64, memory int64, storage int64) (*AssignResult, error)
UnassignExecutor(processID string) error
ResetProcess(processID string) error
GetColonyStatistics(colonyName string) (*core.Statistics, error)
GetStatistics() (*core.Statistics, error)
AddAttribute(attribute *core.Attribute) (*core.Attribute, error)
GetAttribute(attributeID string) (*core.Attribute, error)
AddFunction(function *core.Function) (*core.Function, error)
GetFunctionsByExecutorName(colonyName string, executorName string) ([]*core.Function, error)
GetFunctionsByColonyName(colonyName string) ([]*core.Function, error)
GetFunctionByID(functionID string) (*core.Function, error)
RemoveFunction(functionID string) error
AddGenerator(generator *core.Generator) (*core.Generator, error)
GetGenerator(generatorID string) (*core.Generator, error)
ResolveGenerator(colonyName string, generatorName string) (*core.Generator, error)
GetGenerators(colonyName string, count int) ([]*core.Generator, error)
PackGenerator(generatorID string, colonyName, arg string) error
GeneratorTriggerLoop()
TriggerGenerators()
SubmitWorkflow(generator *core.Generator, counter int, recoveredID string) // TODO: change name, there is also a submitWorkflowSpec()
AddCron(cron *core.Cron) (*core.Cron, error)
RemoveGenerator(generatorID string) error
GetCron(cronID string) (*core.Cron, error)
GetCrons(colonyName string, count int) ([]*core.Cron, error)
GetCronByName(colonyName string, cronName string) (*core.Cron, error)
RunCron(cronID string) (*core.Cron, error)
RemoveCron(cronID string) error
CalcNextRun(cron *core.Cron) time.Time
StartCron(cron *core.Cron)
TriggerCrons()
CronTriggerLoop()
ResetDatabase() error
PauseColonyAssignments(colonyName string) error
ResumeColonyAssignments(colonyName string) error
AreColonyAssignmentsPaused(colonyName string) (bool, error)
Stop()
IsLeader() bool
TryBecomeLeader() bool
TimeoutLoop()
BlockingCmdQueueWorker()
RetentionWorker()
CmdQueueWorker()
GetChannelRouter() *channel.Router
}