Documentation
¶
Index ¶
- type AssignResult
- type ColoniesController
- func (controller *ColoniesController) AddAttribute(attribute *core.Attribute) (*core.Attribute, error)
- func (controller *ColoniesController) AddChild(processGraphID string, parentProcessID string, childProcessID string, ...) (*core.Process, error)
- func (controller *ColoniesController) AddColony(colony *core.Colony) (*core.Colony, error)
- func (controller *ColoniesController) AddCron(cron *core.Cron) (*core.Cron, error)
- func (controller *ColoniesController) AddExecutor(executor *core.Executor, allowExecutorReregister bool) (*core.Executor, error)
- func (controller *ColoniesController) AddFunction(function *core.Function) (*core.Function, error)
- func (controller *ColoniesController) AddGenerator(generator *core.Generator) (*core.Generator, error)
- func (controller *ColoniesController) AddProcess(process *core.Process) (*core.Process, error)
- func (controller *ColoniesController) AddProcessToDB(process *core.Process) (*core.Process, error)
- func (controller *ColoniesController) AreColonyAssignmentsPaused(colonyName string) (bool, error)
- func (controller *ColoniesController) Assign(executorID string, colonyName string, cpu int64, mem int64) (*AssignResult, error)
- func (controller *ColoniesController) BlockingCmdQueueWorker()
- func (controller *ColoniesController) CalcNextRun(cron *core.Cron) time.Time
- func (controller *ColoniesController) CloseFailed(processID string, errs []string) error
- func (controller *ColoniesController) CloseSuccessful(processID string, executorID string, output []interface{}) error
- func (controller *ColoniesController) CmdQueueWorker()
- func (controller *ColoniesController) CreateProcessGraph(workflowSpec *core.WorkflowSpec, args []interface{}, ...) (*core.ProcessGraph, error)
- func (controller *ColoniesController) CreateResumeChannel(colonyName string) <-chan bool
- func (controller *ColoniesController) CronTriggerLoop()
- func (controller *ColoniesController) FindFailedProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
- func (controller *ColoniesController) FindProcessHistory(colonyName string, executorID string, seconds int, state int) ([]*core.Process, error)
- func (controller *ColoniesController) FindRunningProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
- func (controller *ColoniesController) FindSuccessfulProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
- func (controller *ColoniesController) FindWaitingProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
- func (controller *ColoniesController) GeneratorTriggerLoop()
- func (controller *ColoniesController) GetAttribute(attributeID string) (*core.Attribute, error)
- func (controller *ColoniesController) GetColonies() ([]*core.Colony, error)
- func (controller *ColoniesController) GetColony(colonyName string) (*core.Colony, error)
- func (controller *ColoniesController) GetColonyStatistics(colonyName string) (*core.Statistics, error)
- func (controller *ColoniesController) GetCron(cronID string) (*core.Cron, error)
- func (controller *ColoniesController) GetCronPeriod() int
- func (controller *ColoniesController) GetCrons(colonyName string, count int) ([]*core.Cron, error)
- func (controller *ColoniesController) GetEtcdServer() *cluster.EtcdServer
- func (controller *ColoniesController) GetEventHandler() backends.RealtimeEventHandler
- func (controller *ColoniesController) GetExecutor(executorID string) (*core.Executor, error)
- func (controller *ColoniesController) GetExecutorByColonyName(colonyName string) ([]*core.Executor, error)
- func (controller *ColoniesController) GetFunctionByID(functionID string) (*core.Function, error)
- func (controller *ColoniesController) GetFunctionsByColonyName(colonyName string) ([]*core.Function, error)
- func (controller *ColoniesController) GetFunctionsByExecutorName(colonyName string, executorName string) ([]*core.Function, error)
- func (controller *ColoniesController) GetGenerator(generatorID string) (*core.Generator, error)
- func (controller *ColoniesController) GetGeneratorPeriod() int
- func (controller *ColoniesController) GetGenerators(colonyName string, count int) ([]*core.Generator, error)
- func (controller *ColoniesController) GetProcess(processID string) (*core.Process, error)
- func (controller *ColoniesController) GetProcessGraphByID(processGraphID string) (*core.ProcessGraph, error)
- func (controller *ColoniesController) GetProcessGraphStorage() *processGraphStorageAdapter
- func (controller *ColoniesController) GetStatistics() (*core.Statistics, error)
- func (controller *ColoniesController) GetThisNode() cluster.Node
- func (controller *ColoniesController) HandleDefunctProcessgraph(processGraphID string, processID string, err error) error
- func (controller *ColoniesController) IsLeader() bool
- func (controller *ColoniesController) NotifyChildren(process *core.Process) error
- func (controller *ColoniesController) PackGenerator(generatorID string, colonyName, arg string) error
- func (controller *ColoniesController) PauseColonyAssignments(colonyName string) error
- func (controller *ColoniesController) RemoveAllProcessGraphs(colonyName string, state int) error
- func (controller *ColoniesController) RemoveAllProcesses(colonyName string, state int) error
- func (controller *ColoniesController) RemoveColony(colonyName string) error
- func (controller *ColoniesController) RemoveCron(cronID string) error
- func (controller *ColoniesController) RemoveFunction(functionID string) error
- func (controller *ColoniesController) RemoveGenerator(generatorID string) error
- func (controller *ColoniesController) RemoveProcess(processID string) error
- func (controller *ColoniesController) RemoveProcessGraph(processID string) error
- func (controller *ColoniesController) ResetDatabase() error
- func (controller *ColoniesController) ResetProcess(processID string) error
- func (controller *ColoniesController) ResolveGenerator(colonyName string, generatorName string) (*core.Generator, error)
- func (controller *ColoniesController) ResumeColonyAssignments(colonyName string) error
- func (controller *ColoniesController) RetentionWorker()
- func (controller *ColoniesController) RunCron(cronID string) (*core.Cron, error)
- func (controller *ColoniesController) SetOutput(processID string, output []interface{}) error
- func (controller *ColoniesController) StartCron(cron *core.Cron)
- func (controller *ColoniesController) Stop()
- func (controller *ColoniesController) SubmitWorkflow(generator *core.Generator, counter int, recoveredID string)
- func (controller *ColoniesController) SubmitWorkflowSpec(workflowSpec *core.WorkflowSpec, recoveredID string) (*core.ProcessGraph, error)
- func (controller *ColoniesController) SubscribeProcess(executorID string, subscription *backends.RealtimeSubscription) error
- func (controller *ColoniesController) SubscribeProcesses(executorID string, subscription *backends.RealtimeSubscription) error
- func (controller *ColoniesController) TimeoutLoop()
- func (controller *ColoniesController) TriggerCrons()
- func (controller *ColoniesController) TriggerGenerators()
- func (controller *ColoniesController) TryBecomeLeader() bool
- func (controller *ColoniesController) UnassignExecutor(processID string) error
- func (controller *ColoniesController) UpdateProcessGraph(graph *core.ProcessGraph) error
- func (controller *ColoniesController) WakeupPausedAssignments(colonyName string)
- type Controller
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AssignResult ¶
type AssignResult struct {
Process *core.Process
IsPaused bool
ResumeChannel <-chan bool // Only set when IsPaused=true
}
AssignResult contains the result of a process assignment attempt.
type ColoniesController ¶
type ColoniesController struct {
// contains filtered or unexported fields
}
func (*ColoniesController) AddAttribute ¶
func (*ColoniesController) AddExecutor ¶
func (*ColoniesController) AddFunction ¶
func (*ColoniesController) AddGenerator ¶
func (*ColoniesController) AddProcess ¶
func (*ColoniesController) AddProcessToDB ¶
func (*ColoniesController) AreColonyAssignmentsPaused ¶
func (controller *ColoniesController) AreColonyAssignmentsPaused(colonyName string) (bool, error)
func (*ColoniesController) Assign ¶
func (controller *ColoniesController) Assign(executorID string, colonyName string, cpu int64, mem int64) (*AssignResult, error)
func (*ColoniesController) BlockingCmdQueueWorker ¶
func (controller *ColoniesController) BlockingCmdQueueWorker()
func (*ColoniesController) CalcNextRun ¶
func (controller *ColoniesController) CalcNextRun(cron *core.Cron) time.Time
func (*ColoniesController) CloseFailed ¶
func (controller *ColoniesController) CloseFailed(processID string, errs []string) error
func (*ColoniesController) CloseSuccessful ¶
func (controller *ColoniesController) CloseSuccessful(processID string, executorID string, output []interface{}) error
func (*ColoniesController) CmdQueueWorker ¶
func (controller *ColoniesController) CmdQueueWorker()
func (*ColoniesController) CreateProcessGraph ¶
func (controller *ColoniesController) CreateProcessGraph(workflowSpec *core.WorkflowSpec, args []interface{}, kwargs map[string]interface{}, rootInput []interface{}, recoveredID string) (*core.ProcessGraph, error)
func (*ColoniesController) CreateResumeChannel ¶
func (controller *ColoniesController) CreateResumeChannel(colonyName string) <-chan bool
CreateResumeChannel creates a channel that will be signaled when assignments are resumed for a colony.
func (*ColoniesController) CronTriggerLoop ¶
func (controller *ColoniesController) CronTriggerLoop()
func (*ColoniesController) FindFailedProcessGraphs ¶
func (controller *ColoniesController) FindFailedProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
func (*ColoniesController) FindProcessHistory ¶
func (*ColoniesController) FindRunningProcessGraphs ¶
func (controller *ColoniesController) FindRunningProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
func (*ColoniesController) FindSuccessfulProcessGraphs ¶
func (controller *ColoniesController) FindSuccessfulProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
func (*ColoniesController) FindWaitingProcessGraphs ¶
func (controller *ColoniesController) FindWaitingProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
func (*ColoniesController) GeneratorTriggerLoop ¶
func (controller *ColoniesController) GeneratorTriggerLoop()
func (*ColoniesController) GetAttribute ¶
func (controller *ColoniesController) GetAttribute(attributeID string) (*core.Attribute, error)
func (*ColoniesController) GetColonies ¶
func (controller *ColoniesController) GetColonies() ([]*core.Colony, error)
func (*ColoniesController) GetColony ¶
func (controller *ColoniesController) GetColony(colonyName string) (*core.Colony, error)
func (*ColoniesController) GetColonyStatistics ¶
func (controller *ColoniesController) GetColonyStatistics(colonyName string) (*core.Statistics, error)
func (*ColoniesController) GetCron ¶
func (controller *ColoniesController) GetCron(cronID string) (*core.Cron, error)
func (*ColoniesController) GetCronPeriod ¶
func (controller *ColoniesController) GetCronPeriod() int
func (*ColoniesController) GetEtcdServer ¶
func (controller *ColoniesController) GetEtcdServer() *cluster.EtcdServer
func (*ColoniesController) GetEventHandler ¶
func (controller *ColoniesController) GetEventHandler() backends.RealtimeEventHandler
func (*ColoniesController) GetExecutor ¶
func (controller *ColoniesController) GetExecutor(executorID string) (*core.Executor, error)
func (*ColoniesController) GetExecutorByColonyName ¶
func (controller *ColoniesController) GetExecutorByColonyName(colonyName string) ([]*core.Executor, error)
func (*ColoniesController) GetFunctionByID ¶
func (controller *ColoniesController) GetFunctionByID(functionID string) (*core.Function, error)
func (*ColoniesController) GetFunctionsByColonyName ¶
func (controller *ColoniesController) GetFunctionsByColonyName(colonyName string) ([]*core.Function, error)
func (*ColoniesController) GetFunctionsByExecutorName ¶
func (*ColoniesController) GetGenerator ¶
func (controller *ColoniesController) GetGenerator(generatorID string) (*core.Generator, error)
func (*ColoniesController) GetGeneratorPeriod ¶
func (controller *ColoniesController) GetGeneratorPeriod() int
func (*ColoniesController) GetGenerators ¶
func (*ColoniesController) GetProcess ¶
func (controller *ColoniesController) GetProcess(processID string) (*core.Process, error)
func (*ColoniesController) GetProcessGraphByID ¶
func (controller *ColoniesController) GetProcessGraphByID(processGraphID string) (*core.ProcessGraph, error)
func (*ColoniesController) GetProcessGraphStorage ¶
func (controller *ColoniesController) GetProcessGraphStorage() *processGraphStorageAdapter
GetProcessGraphStorage creates a storage adapter for ProcessGraph.
func (*ColoniesController) GetStatistics ¶
func (controller *ColoniesController) GetStatistics() (*core.Statistics, error)
func (*ColoniesController) GetThisNode ¶
func (controller *ColoniesController) GetThisNode() cluster.Node
func (*ColoniesController) HandleDefunctProcessgraph ¶
func (controller *ColoniesController) HandleDefunctProcessgraph(processGraphID string, processID string, err error) error
func (*ColoniesController) IsLeader ¶
func (controller *ColoniesController) IsLeader() bool
func (*ColoniesController) NotifyChildren ¶
func (controller *ColoniesController) NotifyChildren(process *core.Process) error
func (*ColoniesController) PackGenerator ¶
func (controller *ColoniesController) PackGenerator(generatorID string, colonyName, arg string) error
func (*ColoniesController) PauseColonyAssignments ¶
func (controller *ColoniesController) PauseColonyAssignments(colonyName string) error
func (*ColoniesController) RemoveAllProcessGraphs ¶
func (controller *ColoniesController) RemoveAllProcessGraphs(colonyName string, state int) error
func (*ColoniesController) RemoveAllProcesses ¶
func (controller *ColoniesController) RemoveAllProcesses(colonyName string, state int) error
func (*ColoniesController) RemoveColony ¶
func (controller *ColoniesController) RemoveColony(colonyName string) error
func (*ColoniesController) RemoveCron ¶
func (controller *ColoniesController) RemoveCron(cronID string) error
func (*ColoniesController) RemoveFunction ¶
func (controller *ColoniesController) RemoveFunction(functionID string) error
func (*ColoniesController) RemoveGenerator ¶
func (controller *ColoniesController) RemoveGenerator(generatorID string) error
func (*ColoniesController) RemoveProcess ¶
func (controller *ColoniesController) RemoveProcess(processID string) error
func (*ColoniesController) RemoveProcessGraph ¶
func (controller *ColoniesController) RemoveProcessGraph(processID string) error
func (*ColoniesController) ResetDatabase ¶
func (controller *ColoniesController) ResetDatabase() error
func (*ColoniesController) ResetProcess ¶
func (controller *ColoniesController) ResetProcess(processID string) error
func (*ColoniesController) ResolveGenerator ¶
func (*ColoniesController) ResumeColonyAssignments ¶
func (controller *ColoniesController) ResumeColonyAssignments(colonyName string) error
func (*ColoniesController) RetentionWorker ¶
func (controller *ColoniesController) RetentionWorker()
func (*ColoniesController) RunCron ¶
func (controller *ColoniesController) RunCron(cronID string) (*core.Cron, error)
func (*ColoniesController) SetOutput ¶
func (controller *ColoniesController) SetOutput(processID string, output []interface{}) error
func (*ColoniesController) StartCron ¶
func (controller *ColoniesController) StartCron(cron *core.Cron)
func (*ColoniesController) Stop ¶
func (controller *ColoniesController) Stop()
func (*ColoniesController) SubmitWorkflow ¶
func (controller *ColoniesController) SubmitWorkflow(generator *core.Generator, counter int, recoveredID string)
func (*ColoniesController) SubmitWorkflowSpec ¶
func (controller *ColoniesController) SubmitWorkflowSpec(workflowSpec *core.WorkflowSpec, recoveredID string) (*core.ProcessGraph, error)
func (*ColoniesController) SubscribeProcess ¶
func (controller *ColoniesController) SubscribeProcess(executorID string, subscription *backends.RealtimeSubscription) error
func (*ColoniesController) SubscribeProcesses ¶
func (controller *ColoniesController) SubscribeProcesses(executorID string, subscription *backends.RealtimeSubscription) error
func (*ColoniesController) TimeoutLoop ¶
func (controller *ColoniesController) TimeoutLoop()
func (*ColoniesController) TriggerCrons ¶
func (controller *ColoniesController) TriggerCrons()
func (*ColoniesController) TriggerGenerators ¶
func (controller *ColoniesController) TriggerGenerators()
func (*ColoniesController) TryBecomeLeader ¶
func (controller *ColoniesController) TryBecomeLeader() bool
func (*ColoniesController) UnassignExecutor ¶
func (controller *ColoniesController) UnassignExecutor(processID string) error
func (*ColoniesController) UpdateProcessGraph ¶
func (controller *ColoniesController) UpdateProcessGraph(graph *core.ProcessGraph) error
func (*ColoniesController) WakeupPausedAssignments ¶
func (controller *ColoniesController) WakeupPausedAssignments(colonyName string)
WakeupPausedAssignments signals all waiting executors that assignments have been resumed.
type Controller ¶
type Controller interface {
GetCronPeriod() int
GetGeneratorPeriod() int
GetEtcdServer() *cluster.EtcdServer
GetEventHandler() backends.RealtimeEventHandler
GetThisNode() cluster.Node
SubscribeProcesses(executorID string, subscription *backends.RealtimeSubscription) error
SubscribeProcess(executorID string, subscription *backends.RealtimeSubscription) error
GetColonies() ([]*core.Colony, error)
GetColony(colonyName string) (*core.Colony, error)
AddColony(colony *core.Colony) (*core.Colony, error)
RemoveColony(colonyName string) error
AddExecutor(executor *core.Executor, allowExecutorReregister bool) (*core.Executor, error)
GetExecutor(executorID string) (*core.Executor, error)
GetExecutorByColonyName(colonyName string) ([]*core.Executor, error)
AddProcessToDB(process *core.Process) (*core.Process, error)
AddProcess(process *core.Process) (*core.Process, error)
AddChild(processGraphID string, parentProcessID string, childProcessID string, process *core.Process, executorID string, insert bool) (*core.Process, error)
GetProcess(processID string) (*core.Process, error)
FindProcessHistory(colonyName string, executorID string, seconds int, state int) ([]*core.Process, error)
UpdateProcessGraph(graph *core.ProcessGraph) error
CreateProcessGraph(workflowSpec *core.WorkflowSpec, args []interface{}, kwargs map[string]interface{}, rootInput []interface{}, recoveredID string) (*core.ProcessGraph, error)
SubmitWorkflowSpec(workflowSpec *core.WorkflowSpec, recoveredID string) (*core.ProcessGraph, error)
GetProcessGraphByID(processGraphID string) (*core.ProcessGraph, error)
FindWaitingProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
FindRunningProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
FindSuccessfulProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
FindFailedProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
RemoveProcess(processID string) error
RemoveAllProcesses(colonyName string, state int) error
RemoveProcessGraph(processID string) error
RemoveAllProcessGraphs(colonyName string, state int) error
SetOutput(processID string, output []interface{}) error
CloseSuccessful(processID string, executorID string, output []interface{}) error
NotifyChildren(process *core.Process) error
CloseFailed(processID string, errs []string) error
HandleDefunctProcessgraph(processGraphID string, processID string, err error) error
Assign(executorID string, colonyName string, cpu int64, memory int64) (*AssignResult, error)
UnassignExecutor(processID string) error
ResetProcess(processID string) error
GetColonyStatistics(colonyName string) (*core.Statistics, error)
GetStatistics() (*core.Statistics, error)
AddAttribute(attribute *core.Attribute) (*core.Attribute, error)
GetAttribute(attributeID string) (*core.Attribute, error)
AddFunction(function *core.Function) (*core.Function, error)
GetFunctionsByExecutorName(colonyName string, executorName string) ([]*core.Function, error)
GetFunctionsByColonyName(colonyName string) ([]*core.Function, error)
GetFunctionByID(functionID string) (*core.Function, error)
RemoveFunction(functionID string) error
AddGenerator(generator *core.Generator) (*core.Generator, error)
GetGenerator(generatorID string) (*core.Generator, error)
ResolveGenerator(colonyName string, generatorName string) (*core.Generator, error)
GetGenerators(colonyName string, count int) ([]*core.Generator, error)
PackGenerator(generatorID string, colonyName, arg string) error
GeneratorTriggerLoop()
TriggerGenerators()
SubmitWorkflow(generator *core.Generator, counter int, recoveredID string) // TODO: change name, there is also a SubmitWorkflowSpec()
AddCron(cron *core.Cron) (*core.Cron, error)
RemoveGenerator(generatorID string) error
GetCron(cronID string) (*core.Cron, error)
GetCrons(colonyName string, count int) ([]*core.Cron, error)
RunCron(cronID string) (*core.Cron, error)
RemoveCron(cronID string) error
CalcNextRun(cron *core.Cron) time.Time
StartCron(cron *core.Cron)
TriggerCrons()
CronTriggerLoop()
ResetDatabase() error
PauseColonyAssignments(colonyName string) error
ResumeColonyAssignments(colonyName string) error
AreColonyAssignmentsPaused(colonyName string) (bool, error)
Stop()
IsLeader() bool
TryBecomeLeader() bool
TimeoutLoop()
BlockingCmdQueueWorker()
RetentionWorker()
CmdQueueWorker()
}
Click to show internal directories.
Click to hide internal directories.