controllers

package
v1.9.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 22, 2025 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AssignResult

type AssignResult struct {
	Process       *core.Process
	IsPaused      bool
	ResumeChannel <-chan bool // Only set when IsPaused=true
}

AssignResult contains the result of a process assignment attempt

type ColoniesController

type ColoniesController struct {
	// contains filtered or unexported fields
}

func CreateColoniesController

func CreateColoniesController(db database.Database,
	thisNode cluster.Node,
	clusterConfig cluster.Config,
	etcdDataPath string,
	generatorPeriod int,
	cronPeriod int,
	retention bool,
	retentionPolicy int64,
	retentionPeriod int) *ColoniesController

func (*ColoniesController) AddAttribute

func (controller *ColoniesController) AddAttribute(attribute *core.Attribute) (*core.Attribute, error)

func (*ColoniesController) AddChild

func (controller *ColoniesController) AddChild(
	processGraphID string,
	parentProcessID string,
	childProcessID string,
	process *core.Process,
	executorID string,
	insert bool) (*core.Process, error)

func (*ColoniesController) AddColony

func (controller *ColoniesController) AddColony(colony *core.Colony) (*core.Colony, error)

func (*ColoniesController) AddCron

func (controller *ColoniesController) AddCron(cron *core.Cron) (*core.Cron, error)

func (*ColoniesController) AddExecutor

func (controller *ColoniesController) AddExecutor(executor *core.Executor, allowExecutorReregister bool) (*core.Executor, error)

func (*ColoniesController) AddFunction

func (controller *ColoniesController) AddFunction(function *core.Function) (*core.Function, error)

func (*ColoniesController) AddGenerator

func (controller *ColoniesController) AddGenerator(generator *core.Generator) (*core.Generator, error)

func (*ColoniesController) AddProcess

func (controller *ColoniesController) AddProcess(process *core.Process) (*core.Process, error)

func (*ColoniesController) AddProcessToDB

func (controller *ColoniesController) AddProcessToDB(process *core.Process) (*core.Process, error)

func (*ColoniesController) AreColonyAssignmentsPaused

func (controller *ColoniesController) AreColonyAssignmentsPaused(colonyName string) (bool, error)

func (*ColoniesController) Assign

func (controller *ColoniesController) Assign(executorID string, colonyName string, cpu int64, mem int64) (*AssignResult, error)

func (*ColoniesController) BlockingCmdQueueWorker

func (controller *ColoniesController) BlockingCmdQueueWorker()

func (*ColoniesController) CalcNextRun

func (controller *ColoniesController) CalcNextRun(cron *core.Cron) time.Time

func (*ColoniesController) CleanupWorker added in v1.9.3

func (controller *ColoniesController) CleanupWorker()

func (*ColoniesController) CloseFailed

func (controller *ColoniesController) CloseFailed(processID string, errs []string) error

func (*ColoniesController) CloseSuccessful

func (controller *ColoniesController) CloseSuccessful(processID string, executorID string, output []interface{}) error

func (*ColoniesController) CmdQueueWorker

func (controller *ColoniesController) CmdQueueWorker()

func (*ColoniesController) CreateProcessGraph

func (controller *ColoniesController) CreateProcessGraph(workflowSpec *core.WorkflowSpec, args []interface{}, kwargs map[string]interface{}, rootInput []interface{}, recoveredID string) (*core.ProcessGraph, error)

func (*ColoniesController) CreateResumeChannel

func (controller *ColoniesController) CreateResumeChannel(colonyName string) <-chan bool

createResumeChannel creates a channel that will be signaled when assignments are resumed for a colony

func (*ColoniesController) CronTriggerLoop

func (controller *ColoniesController) CronTriggerLoop()

func (*ColoniesController) FindFailedProcessGraphs

func (controller *ColoniesController) FindFailedProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)

func (*ColoniesController) FindProcessHistory

func (controller *ColoniesController) FindProcessHistory(colonyName string, executorID string, seconds int, state int) ([]*core.Process, error)

func (*ColoniesController) FindRunningProcessGraphs

func (controller *ColoniesController) FindRunningProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)

func (*ColoniesController) FindSuccessfulProcessGraphs

func (controller *ColoniesController) FindSuccessfulProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)

func (*ColoniesController) FindWaitingProcessGraphs

func (controller *ColoniesController) FindWaitingProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)

func (*ColoniesController) GeneratorTriggerLoop

func (controller *ColoniesController) GeneratorTriggerLoop()

func (*ColoniesController) GetAttribute

func (controller *ColoniesController) GetAttribute(attributeID string) (*core.Attribute, error)

func (*ColoniesController) GetColonies

func (controller *ColoniesController) GetColonies() ([]*core.Colony, error)

func (*ColoniesController) GetColony

func (controller *ColoniesController) GetColony(colonyName string) (*core.Colony, error)

func (*ColoniesController) GetColonyStatistics

func (controller *ColoniesController) GetColonyStatistics(colonyName string) (*core.Statistics, error)

func (*ColoniesController) GetCron

func (controller *ColoniesController) GetCron(cronID string) (*core.Cron, error)

func (*ColoniesController) GetCronPeriod

func (controller *ColoniesController) GetCronPeriod() int

func (*ColoniesController) GetCrons

func (controller *ColoniesController) GetCrons(colonyName string, count int) ([]*core.Cron, error)

func (*ColoniesController) GetEtcdServer

func (controller *ColoniesController) GetEtcdServer() *cluster.EtcdServer

func (*ColoniesController) GetEventHandler

func (controller *ColoniesController) GetEventHandler() backends.RealtimeEventHandler

func (*ColoniesController) GetExecutor

func (controller *ColoniesController) GetExecutor(executorID string) (*core.Executor, error)

func (*ColoniesController) GetExecutorByColonyName

func (controller *ColoniesController) GetExecutorByColonyName(colonyName string) ([]*core.Executor, error)

func (*ColoniesController) GetFunctionByID

func (controller *ColoniesController) GetFunctionByID(functionID string) (*core.Function, error)

func (*ColoniesController) GetFunctionsByColonyName

func (controller *ColoniesController) GetFunctionsByColonyName(colonyName string) ([]*core.Function, error)

func (*ColoniesController) GetFunctionsByExecutorName

func (controller *ColoniesController) GetFunctionsByExecutorName(colonyName string, executorName string) ([]*core.Function, error)

func (*ColoniesController) GetGenerator

func (controller *ColoniesController) GetGenerator(generatorID string) (*core.Generator, error)

func (*ColoniesController) GetGeneratorPeriod

func (controller *ColoniesController) GetGeneratorPeriod() int

func (*ColoniesController) GetGenerators

func (controller *ColoniesController) GetGenerators(colonyName string, count int) ([]*core.Generator, error)

func (*ColoniesController) GetProcess

func (controller *ColoniesController) GetProcess(processID string) (*core.Process, error)

func (*ColoniesController) GetProcessGraphByID

func (controller *ColoniesController) GetProcessGraphByID(processGraphID string) (*core.ProcessGraph, error)

func (*ColoniesController) GetProcessGraphStorage

func (controller *ColoniesController) GetProcessGraphStorage() *processGraphStorageAdapter

getProcessGraphStorage creates a storage adapter for ProcessGraph

func (*ColoniesController) GetStatistics

func (controller *ColoniesController) GetStatistics() (*core.Statistics, error)

func (*ColoniesController) GetThisNode

func (controller *ColoniesController) GetThisNode() cluster.Node

func (*ColoniesController) HandleDefunctProcessgraph

func (controller *ColoniesController) HandleDefunctProcessgraph(processGraphID string, processID string, err error) error

func (*ColoniesController) IsLeader

func (controller *ColoniesController) IsLeader() bool

func (*ColoniesController) NotifyChildren

func (controller *ColoniesController) NotifyChildren(process *core.Process) error

func (*ColoniesController) PackGenerator

func (controller *ColoniesController) PackGenerator(generatorID string, colonyName, arg string) error

func (*ColoniesController) PauseColonyAssignments

func (controller *ColoniesController) PauseColonyAssignments(colonyName string) error

func (*ColoniesController) RemoveAllProcessGraphs

func (controller *ColoniesController) RemoveAllProcessGraphs(colonyName string, state int) error

func (*ColoniesController) RemoveAllProcesses

func (controller *ColoniesController) RemoveAllProcesses(colonyName string, state int) error

func (*ColoniesController) RemoveColony

func (controller *ColoniesController) RemoveColony(colonyName string) error

func (*ColoniesController) RemoveCron

func (controller *ColoniesController) RemoveCron(cronID string) error

func (*ColoniesController) RemoveFunction

func (controller *ColoniesController) RemoveFunction(functionID string) error

func (*ColoniesController) RemoveGenerator

func (controller *ColoniesController) RemoveGenerator(generatorID string) error

func (*ColoniesController) RemoveProcess

func (controller *ColoniesController) RemoveProcess(processID string) error

func (*ColoniesController) RemoveProcessGraph

func (controller *ColoniesController) RemoveProcessGraph(processID string) error

func (*ColoniesController) ResetDatabase

func (controller *ColoniesController) ResetDatabase() error

func (*ColoniesController) ResetProcess

func (controller *ColoniesController) ResetProcess(processID string) error

func (*ColoniesController) ResolveGenerator

func (controller *ColoniesController) ResolveGenerator(colonyName string, generatorName string) (*core.Generator, error)

func (*ColoniesController) ResumeColonyAssignments

func (controller *ColoniesController) ResumeColonyAssignments(colonyName string) error

func (*ColoniesController) RetentionWorker

func (controller *ColoniesController) RetentionWorker()

func (*ColoniesController) RunCron

func (controller *ColoniesController) RunCron(cronID string) (*core.Cron, error)

func (*ColoniesController) SetOutput

func (controller *ColoniesController) SetOutput(processID string, output []interface{}) error

func (*ColoniesController) StartCron

func (controller *ColoniesController) StartCron(cron *core.Cron)

func (*ColoniesController) Stop

func (controller *ColoniesController) Stop()

func (*ColoniesController) SubmitWorkflow

func (controller *ColoniesController) SubmitWorkflow(generator *core.Generator, counter int, recoveredID string)

func (*ColoniesController) SubmitWorkflowSpec

func (controller *ColoniesController) SubmitWorkflowSpec(workflowSpec *core.WorkflowSpec, recoveredID string) (*core.ProcessGraph, error)

func (*ColoniesController) SubscribeProcess

func (controller *ColoniesController) SubscribeProcess(executorID string, subscription *backends.RealtimeSubscription) error

func (*ColoniesController) SubscribeProcesses

func (controller *ColoniesController) SubscribeProcesses(executorID string, subscription *backends.RealtimeSubscription) error

func (*ColoniesController) TimeoutLoop

func (controller *ColoniesController) TimeoutLoop()

func (*ColoniesController) TriggerCrons

func (controller *ColoniesController) TriggerCrons()

func (*ColoniesController) TriggerGenerators

func (controller *ColoniesController) TriggerGenerators()

func (*ColoniesController) TryBecomeLeader

func (controller *ColoniesController) TryBecomeLeader() bool

func (*ColoniesController) UnassignExecutor

func (controller *ColoniesController) UnassignExecutor(processID string) error

func (*ColoniesController) UpdateProcessGraph

func (controller *ColoniesController) UpdateProcessGraph(graph *core.ProcessGraph) error

func (*ColoniesController) WakeupPausedAssignments

func (controller *ColoniesController) WakeupPausedAssignments(colonyName string)

wakeupPausedAssignments signals all waiting executors that assignments have been resumed

type Controller

type Controller interface {
	GetCronPeriod() int
	GetGeneratorPeriod() int
	GetEtcdServer() *cluster.EtcdServer
	GetEventHandler() backends.RealtimeEventHandler
	GetThisNode() cluster.Node
	SubscribeProcesses(executorID string, subscription *backends.RealtimeSubscription) error
	SubscribeProcess(executorID string, subscription *backends.RealtimeSubscription) error
	GetColonies() ([]*core.Colony, error)
	GetColony(colonyName string) (*core.Colony, error)
	AddColony(colony *core.Colony) (*core.Colony, error)
	RemoveColony(colonyName string) error
	AddExecutor(executor *core.Executor, allowExecutorReregister bool) (*core.Executor, error)
	GetExecutor(executorID string) (*core.Executor, error)
	GetExecutorByColonyName(colonyName string) ([]*core.Executor, error)
	AddProcessToDB(process *core.Process) (*core.Process, error)
	AddProcess(process *core.Process) (*core.Process, error)
	AddChild(processGraphID string, parentProcessID string, childProcessID string, process *core.Process, executorID string, insert bool) (*core.Process, error)
	GetProcess(processID string) (*core.Process, error)
	FindProcessHistory(colonyName string, executorID string, seconds int, state int) ([]*core.Process, error)
	UpdateProcessGraph(graph *core.ProcessGraph) error
	CreateProcessGraph(workflowSpec *core.WorkflowSpec, args []interface{}, kwargs map[string]interface{}, rootInput []interface{}, recoveredID string) (*core.ProcessGraph, error)
	SubmitWorkflowSpec(workflowSpec *core.WorkflowSpec, recoveredID string) (*core.ProcessGraph, error)
	GetProcessGraphByID(processGraphID string) (*core.ProcessGraph, error)
	FindWaitingProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
	FindRunningProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
	FindSuccessfulProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
	FindFailedProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
	RemoveProcess(processID string) error
	RemoveAllProcesses(colonyName string, state int) error
	RemoveProcessGraph(processID string) error
	RemoveAllProcessGraphs(colonyName string, state int) error
	SetOutput(processID string, output []interface{}) error
	CloseSuccessful(processID string, executorID string, output []interface{}) error
	NotifyChildren(process *core.Process) error
	CloseFailed(processID string, errs []string) error
	HandleDefunctProcessgraph(processGraphID string, processID string, err error) error
	Assign(executorID string, colonyName string, cpu int64, memory int64) (*AssignResult, error)
	UnassignExecutor(processID string) error
	ResetProcess(processID string) error
	GetColonyStatistics(colonyName string) (*core.Statistics, error)
	GetStatistics() (*core.Statistics, error)
	AddAttribute(attribute *core.Attribute) (*core.Attribute, error)
	GetAttribute(attributeID string) (*core.Attribute, error)
	AddFunction(function *core.Function) (*core.Function, error)
	GetFunctionsByExecutorName(colonyName string, executorName string) ([]*core.Function, error)
	GetFunctionsByColonyName(colonyName string) ([]*core.Function, error)
	GetFunctionByID(functionID string) (*core.Function, error)
	RemoveFunction(functionID string) error
	AddGenerator(generator *core.Generator) (*core.Generator, error)
	GetGenerator(generatorID string) (*core.Generator, error)
	ResolveGenerator(colonyName string, generatorName string) (*core.Generator, error)
	GetGenerators(colonyName string, count int) ([]*core.Generator, error)
	PackGenerator(generatorID string, colonyName, arg string) error
	GeneratorTriggerLoop()
	TriggerGenerators()
	SubmitWorkflow(generator *core.Generator, counter int, recoveredID string) // TODO: change name, there is also a submitWorkflowSpec()
	AddCron(cron *core.Cron) (*core.Cron, error)
	RemoveGenerator(generatorID string) error
	GetCron(cronID string) (*core.Cron, error)
	GetCrons(colonyName string, count int) ([]*core.Cron, error)
	RunCron(cronID string) (*core.Cron, error)
	RemoveCron(cronID string) error
	CalcNextRun(cron *core.Cron) time.Time
	StartCron(cron *core.Cron)
	TriggerCrons()
	CronTriggerLoop()
	ResetDatabase() error
	PauseColonyAssignments(colonyName string) error
	ResumeColonyAssignments(colonyName string) error
	AreColonyAssignmentsPaused(colonyName string) (bool, error)
	Stop()
	IsLeader() bool
	TryBecomeLeader() bool
	TimeoutLoop()
	BlockingCmdQueueWorker()
	RetentionWorker()
	CmdQueueWorker()
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL