Documentation
¶
Index ¶
- func VerifyFunctionSpec(funcSpec *core.FunctionSpec) error
- type AssignResult
- type Cluster
- type Controller
- type EtcdServer
- type EventHandler
- type Handlers
- func (h *Handlers) HandleAssignProcess(c backends.Context, recoveredID string, payloadType string, jsonString string, ...)
- func (h *Handlers) HandleCloseFailed(c backends.Context, recoveredID string, payloadType string, jsonString string)
- func (h *Handlers) HandleCloseSuccessful(c backends.Context, recoveredID string, payloadType string, jsonString string)
- func (h *Handlers) HandleGetPauseStatus(c backends.Context, recoveredID string, payloadType string, jsonString string)
- func (h *Handlers) HandleGetProcess(c backends.Context, recoveredID string, payloadType string, jsonString string)
- func (h *Handlers) HandleGetProcessHist(c backends.Context, recoveredID string, payloadType string, jsonString string)
- func (h *Handlers) HandleGetProcesses(c backends.Context, recoveredID string, payloadType string, jsonString string)
- func (h *Handlers) HandlePauseAssignments(c backends.Context, recoveredID string, payloadType string, jsonString string)
- func (h *Handlers) HandleRemoveAllProcesses(c backends.Context, recoveredID string, payloadType string, jsonString string)
- func (h *Handlers) HandleRemoveProcess(c backends.Context, recoveredID string, payloadType string, jsonString string)
- func (h *Handlers) HandleResumeAssignments(c backends.Context, recoveredID string, payloadType string, jsonString string)
- func (h *Handlers) HandleSetOutput(c backends.Context, recoveredID string, payloadType string, jsonString string)
- func (h *Handlers) HandleSubmit(c backends.Context, recoveredID string, payloadType string, jsonString string)
- func (h *Handlers) RegisterHandlers(handlerRegistry *registry.HandlerRegistry) error
- type Leader
- type Server
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func VerifyFunctionSpec ¶
func VerifyFunctionSpec(funcSpec *core.FunctionSpec) error
Types ¶
type AssignResult ¶
type Controller ¶
type Controller interface {
AddProcessToDB(process *core.Process) (*core.Process, error)
AddProcess(process *core.Process) (*core.Process, error)
GetProcess(processID string) (*core.Process, error)
GetExecutor(executorID string) (*core.Executor, error)
FindProcessHistory(colonyName string, executorID string, seconds int, state int) ([]*core.Process, error)
RemoveProcess(processID string) error
RemoveAllProcesses(colonyName string, state int) error
SetOutput(processID string, output []interface{}) error
CloseSuccessful(processID string, executorID string, output []interface{}) error
CloseFailed(processID string, errs []string) error
Assign(executorID string, colonyName string, cpu int64, memory int64) (*AssignResult, error)
UnassignExecutor(processID string) error
PauseColonyAssignments(colonyName string) error
ResumeColonyAssignments(colonyName string) error
AreColonyAssignmentsPaused(colonyName string) (bool, error)
GetEventHandler() *EventHandler
IsLeader() bool
GetEtcdServer() EtcdServer
}
type EtcdServer ¶
type EtcdServer interface {
CurrentCluster() Cluster
}
type EventHandler ¶
type EventHandler struct{}
type Handlers ¶
type Handlers struct {
// contains filtered or unexported fields
}
func NewHandlers ¶
func (*Handlers) HandleAssignProcess ¶
func (h *Handlers) HandleAssignProcess(c backends.Context, recoveredID string, payloadType string, jsonString string, originalRequest string)
handleAssignProcess handles HTTP requests for process assignment to executors.
This function implements leader-based exclusive assignment with retry loop and timeout support. It handles four main scenarios: 1. Non-leader node - redirects request to cluster leader for exclusive assignment 2. Colony assignments are paused - waits for resume signal or timeout 3. Process found - assigns and returns the process immediately 4. No process available - waits for new processes to be submitted
Flow:
- If exclusiveAssign is enabled and this node is not leader: redirect to leader
- Validates request parameters and executor permissions
- Enters retry loop that continues until timeout is reached
- Each iteration calls controller.assign() to attempt process assignment
- If assignments are paused: waits on resume channel or timeout
- If no process found: waits for new process events via waitForProcess()
- Returns assigned process or appropriate error (timeout, forbidden, etc.)
Parameters:
- c: Gin HTTP context for the request
- recoveredID: Executor ID recovered from authentication
- payloadType: Expected message type for validation
- jsonString: Request body containing assignment parameters
- originalRequest: Raw request for leader redirection in cluster mode
func (*Handlers) HandleCloseFailed ¶
func (*Handlers) HandleCloseSuccessful ¶
func (*Handlers) HandleGetPauseStatus ¶
func (*Handlers) HandleGetProcess ¶
func (*Handlers) HandleGetProcessHist ¶
func (*Handlers) HandleGetProcesses ¶
func (*Handlers) HandlePauseAssignments ¶
func (*Handlers) HandleRemoveAllProcesses ¶
func (*Handlers) HandleRemoveProcess ¶
func (*Handlers) HandleResumeAssignments ¶
func (*Handlers) HandleSetOutput ¶
func (*Handlers) HandleSubmit ¶
func (*Handlers) RegisterHandlers ¶
func (h *Handlers) RegisterHandlers(handlerRegistry *registry.HandlerRegistry) error
RegisterHandlers implements the HandlerRegistrar interface
type Server ¶
type Server interface {
HandleHTTPError(c backends.Context, err error, errorCode int) bool
SendHTTPReply(c backends.Context, payloadType string, jsonString string)
SendEmptyHTTPReply(c backends.Context, payloadType string)
Validator() security.Validator
ExecutorDB() database.ExecutorDatabase
UserDB() database.UserDatabase
ProcessDB() database.ProcessDatabase
BlueprintDB() database.BlueprintDatabase
ProcessController() Controller
ExclusiveAssign() bool
TLS() bool
}
Click to show internal directories.
Click to hide internal directories.