Documentation ¶
Index ¶
- func VerifyFunctionSpec(funcSpec *core.FunctionSpec) error
- type AssignResult
- type Cluster
- type Controller
- type EtcdServer
- type EventHandler
- type Handlers
- func (h *Handlers) HandleAssignProcess(c backends.Context, recoveredID string, payloadType string, jsonString string, ...)
- func (h *Handlers) HandleCloseFailed(c backends.Context, recoveredID string, payloadType string, jsonString string)
- func (h *Handlers) HandleCloseSuccessful(c backends.Context, recoveredID string, payloadType string, jsonString string)
- func (h *Handlers) HandleGetPauseStatus(c backends.Context, recoveredID string, payloadType string, jsonString string)
- func (h *Handlers) HandleGetProcess(c backends.Context, recoveredID string, payloadType string, jsonString string)
- func (h *Handlers) HandleGetProcessHist(c backends.Context, recoveredID string, payloadType string, jsonString string)
- func (h *Handlers) HandleGetProcesses(c backends.Context, recoveredID string, payloadType string, jsonString string)
- func (h *Handlers) HandlePauseAssignments(c backends.Context, recoveredID string, payloadType string, jsonString string)
- func (h *Handlers) HandleRemoveAllProcesses(c backends.Context, recoveredID string, payloadType string, jsonString string)
- func (h *Handlers) HandleRemoveProcess(c backends.Context, recoveredID string, payloadType string, jsonString string)
- func (h *Handlers) HandleResumeAssignments(c backends.Context, recoveredID string, payloadType string, jsonString string)
- func (h *Handlers) HandleSetOutput(c backends.Context, recoveredID string, payloadType string, jsonString string)
- func (h *Handlers) HandleSubmit(c backends.Context, recoveredID string, payloadType string, jsonString string)
- func (h *Handlers) RegisterHandlers(handlerRegistry *registry.HandlerRegistry) error
- type Leader
- type Server
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func VerifyFunctionSpec ¶
func VerifyFunctionSpec(funcSpec *core.FunctionSpec) error
Types ¶
type AssignResult ¶
type Controller ¶
type Controller interface {
AddProcessToDB(process *core.Process) (*core.Process, error)
AddProcess(process *core.Process) (*core.Process, error)
GetProcess(processID string) (*core.Process, error)
GetExecutor(executorID string) (*core.Executor, error)
FindProcessHistory(colonyName string, executorID string, seconds int, state int) ([]*core.Process, error)
RemoveProcess(processID string) error
RemoveAllProcesses(colonyName string, state int) error
SetOutput(processID string, output []interface{}) error
CloseSuccessful(processID string, executorID string, output []interface{}) error
CloseFailed(processID string, errs []string) error
Assign(executorID string, colonyName string, cpu int64, memory int64) (*AssignResult, error)
UnassignExecutor(processID string) error
PauseColonyAssignments(colonyName string) error
ResumeColonyAssignments(colonyName string) error
AreColonyAssignmentsPaused(colonyName string) (bool, error)
GetEventHandler() *EventHandler
IsLeader() bool
GetEtcdServer() EtcdServer
}
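Callers that need only part of Controller can depend on a narrower interface, which keeps test doubles small. The sketch below copies the three pause-related method signatures from Controller; `assignmentPauser`, `memoryPauser`, `newMemoryPauser`, and `withPaused` are hypothetical names introduced for illustration and are not part of this package.

```go
package main

import (
	"fmt"
	"sync"
)

// assignmentPauser is a hypothetical, narrowed view of Controller.
// The three method signatures are copied from the Controller interface.
type assignmentPauser interface {
	PauseColonyAssignments(colonyName string) error
	ResumeColonyAssignments(colonyName string) error
	AreColonyAssignmentsPaused(colonyName string) (bool, error)
}

// memoryPauser is a hypothetical in-memory stand-in, useful in tests.
type memoryPauser struct {
	mu     sync.Mutex
	paused map[string]bool
}

func newMemoryPauser() *memoryPauser {
	return &memoryPauser{paused: make(map[string]bool)}
}

func (m *memoryPauser) PauseColonyAssignments(colonyName string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.paused[colonyName] = true
	return nil
}

func (m *memoryPauser) ResumeColonyAssignments(colonyName string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.paused, colonyName)
	return nil
}

func (m *memoryPauser) AreColonyAssignmentsPaused(colonyName string) (bool, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.paused[colonyName], nil
}

// withPaused pauses assignments for colonyName, runs fn, and resumes
// afterwards even if fn fails. The first error encountered is returned.
func withPaused(p assignmentPauser, colonyName string, fn func() error) (err error) {
	if perr := p.PauseColonyAssignments(colonyName); perr != nil {
		return perr
	}
	defer func() {
		if rerr := p.ResumeColonyAssignments(colonyName); err == nil {
			err = rerr
		}
	}()
	return fn()
}

func main() {
	p := newMemoryPauser()
	err := withPaused(p, "dev", func() error {
		paused, _ := p.AreColonyAssignmentsPaused("dev")
		fmt.Println("paused during maintenance:", paused)
		return nil
	})
	paused, _ := p.AreColonyAssignmentsPaused("dev")
	fmt.Println("paused afterwards:", paused, "error:", err)
}
```

Any real Controller value satisfies `assignmentPauser` as well, since Go interfaces are satisfied implicitly.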
type EtcdServer ¶
type EtcdServer interface {
CurrentCluster() Cluster
}
type EventHandler ¶
type EventHandler struct{}
type Handlers ¶
type Handlers struct {
// contains filtered or unexported fields
}
func NewHandlers ¶
func (*Handlers) HandleAssignProcess ¶
func (h *Handlers) HandleAssignProcess(c backends.Context, recoveredID string, payloadType string, jsonString string, originalRequest string)
HandleAssignProcess handles HTTP requests for process assignment to executors.
This function implements leader-based exclusive assignment with retry loop and timeout support. It handles four main scenarios:
1. Non-leader node - redirects request to cluster leader for exclusive assignment
2. Colony assignments are paused - waits for resume signal or timeout
3. Process found - assigns and returns the process immediately
4. No process available - waits for new processes to be submitted
Flow:
- If exclusiveAssign is enabled and this node is not leader: redirect to leader
- Validates request parameters and executor permissions
- Enters retry loop that continues until timeout is reached
- Each iteration calls controller.assign() to attempt process assignment
- If assignments are paused: waits on resume channel or timeout
- If no process found: waits for new process events via waitForProcess()
- Returns assigned process or appropriate error (timeout, forbidden, etc.)
Parameters:
- c: Gin HTTP context for the request
- recoveredID: Executor ID recovered from authentication
- payloadType: Expected message type for validation
- jsonString: Request body containing assignment parameters
- originalRequest: Raw request for leader redirection in cluster mode
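The retry loop described in the flow above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the package's implementation: the `assigner` interface uses a simplified `Assign` signature, and the sentinel errors, the `resume` and `submitted` channels, and the `scripted` test double are all hypothetical. Leader redirection and request validation are omitted.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Hypothetical sentinel errors; the real package may signal these states differently.
var (
	errPaused    = errors.New("assignments paused")
	errNoProcess = errors.New("no process available")
	errTimeout   = errors.New("assign timed out")
)

// assigner is a hypothetical, simplified view of a controller.
type assigner interface {
	Assign(executorID, colonyName string) (string, error)
}

// assignWithRetry calls Assign until it yields a process ID, fails with an
// unexpected error, or ctx expires. When paused it waits on resume; when no
// process is available it waits on submitted.
func assignWithRetry(ctx context.Context, a assigner, executorID, colonyName string,
	resume, submitted <-chan struct{}) (string, error) {
	for {
		processID, err := a.Assign(executorID, colonyName)
		var wake <-chan struct{}
		switch {
		case err == nil:
			return processID, nil
		case errors.Is(err, errPaused):
			wake = resume
		case errors.Is(err, errNoProcess):
			wake = submitted
		default:
			return "", err
		}
		select {
		case <-wake: // state changed, try again
		case <-ctx.Done():
			return "", errTimeout
		}
	}
}

// scripted is a test double that fails with the listed errors, then succeeds.
type scripted struct {
	steps []error
	calls int
}

func (s *scripted) Assign(executorID, colonyName string) (string, error) {
	i := s.calls
	s.calls++
	if i < len(s.steps) {
		return "", s.steps[i]
	}
	return "process-1", nil
}

// runDemo walks through paused -> no process -> assigned, with signals pre-queued.
func runDemo() (string, int, error) {
	resume := make(chan struct{}, 1)
	submitted := make(chan struct{}, 1)
	resume <- struct{}{}
	submitted <- struct{}{}
	s := &scripted{steps: []error{errPaused, errNoProcess}}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	id, err := assignWithRetry(ctx, s, "executor-1", "dev", resume, submitted)
	return id, s.calls, err
}

// runTimeoutDemo never receives a submit signal, so the context deadline wins.
func runTimeoutDemo() error {
	s := &scripted{steps: []error{errNoProcess}}
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel()
	_, err := assignWithRetry(ctx, s, "executor-1", "dev", make(chan struct{}), make(chan struct{}))
	return err
}

func main() {
	id, calls, err := runDemo()
	fmt.Println(id, calls, err)
	fmt.Println(runTimeoutDemo())
}
```

Selecting on a wake-up channel and `ctx.Done()` in the same `select` bounds the total wait by the request timeout, however many iterations the loop runs.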
func (*Handlers) HandleCloseFailed ¶
func (*Handlers) HandleCloseSuccessful ¶
func (*Handlers) HandleGetPauseStatus ¶
func (*Handlers) HandleGetProcess ¶
func (*Handlers) HandleGetProcessHist ¶
func (*Handlers) HandleGetProcesses ¶
func (*Handlers) HandlePauseAssignments ¶
func (*Handlers) HandleRemoveAllProcesses ¶
func (*Handlers) HandleRemoveProcess ¶
func (*Handlers) HandleResumeAssignments ¶
func (*Handlers) HandleSetOutput ¶
func (*Handlers) HandleSubmit ¶
func (*Handlers) RegisterHandlers ¶
func (h *Handlers) RegisterHandlers(handlerRegistry *registry.HandlerRegistry) error
RegisterHandlers implements the HandlerRegistrar interface.
type Server ¶
type Server interface {
HandleHTTPError(c backends.Context, err error, errorCode int) bool
SendHTTPReply(c backends.Context, payloadType string, jsonString string)
SendEmptyHTTPReply(c backends.Context, payloadType string)
Validator() security.Validator
ExecutorDB() database.ExecutorDatabase
UserDB() database.UserDatabase
ProcessDB() database.ProcessDatabase
ProcessController() Controller
ExclusiveAssign() bool
TLS() bool
}