process

package
v1.9.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 26, 2025 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func VerifyFunctionSpec

func VerifyFunctionSpec(funcSpec *core.FunctionSpec) error

Types

type AssignResult

type AssignResult struct {
	Process       *core.Process
	IsPaused      bool
	ResumeChannel <-chan bool
}

type Cluster

type Cluster interface {
	GetLeader() *Leader
}

type Controller

type Controller interface {
	AddProcessToDB(process *core.Process) (*core.Process, error)
	AddProcess(process *core.Process) (*core.Process, error)
	GetProcess(processID string) (*core.Process, error)
	GetExecutor(executorID string) (*core.Executor, error)
	FindProcessHistory(colonyName string, executorID string, seconds int, state int) ([]*core.Process, error)
	RemoveProcess(processID string) error
	RemoveAllProcesses(colonyName string, state int) error
	SetOutput(processID string, output []interface{}) error
	CloseSuccessful(processID string, executorID string, output []interface{}) error
	CloseFailed(processID string, errs []string) error
	Assign(executorID string, colonyName string, cpu int64, memory int64) (*AssignResult, error)
	UnassignExecutor(processID string) error
	PauseColonyAssignments(colonyName string) error
	ResumeColonyAssignments(colonyName string) error
	AreColonyAssignmentsPaused(colonyName string) (bool, error)
	GetEventHandler() *EventHandler
	IsLeader() bool
	GetEtcdServer() EtcdServer
}

type EtcdServer

type EtcdServer interface {
	CurrentCluster() Cluster
}

type EventHandler

type EventHandler struct{}

func (*EventHandler) WaitForProcess

func (e *EventHandler) WaitForProcess(executorType string, state int, processID string, ctx context.Context) (*core.Process, error)

type Handlers

type Handlers struct {
	// contains filtered or unexported fields
}

func NewHandlers

func NewHandlers(server Server) *Handlers

func (*Handlers) HandleAssignProcess

func (h *Handlers) HandleAssignProcess(c backends.Context, recoveredID string, payloadType string, jsonString string, originalRequest string)

handleAssignProcess handles HTTP requests for process assignment to executors.

This function implements leader-based exclusive assignment with retry loop and timeout support. It handles four main scenarios: 1. Non-leader node - redirects request to cluster leader for exclusive assignment 2. Colony assignments are paused - waits for resume signal or timeout 3. Process found - assigns and returns the process immediately 4. No process available - waits for new processes to be submitted

Flow:

  • If exclusiveAssign is enabled and this node is not leader: redirect to leader
  • Validates request parameters and executor permissions
  • Enters retry loop that continues until timeout is reached
  • Each iteration calls controller.assign() to attempt process assignment
  • If assignments are paused: waits on resume channel or timeout
  • If no process found: waits for new process events via waitForProcess()
  • Returns assigned process or appropriate error (timeout, forbidden, etc.)

Parameters:

  • c: Gin HTTP context for the request
  • recoveredID: Executor ID recovered from authentication
  • payloadType: Expected message type for validation
  • jsonString: Request body containing assignment parameters
  • originalRequest: Raw request for leader redirection in cluster mode

func (*Handlers) HandleCloseFailed

func (h *Handlers) HandleCloseFailed(c backends.Context, recoveredID string, payloadType string, jsonString string)

func (*Handlers) HandleCloseSuccessful

func (h *Handlers) HandleCloseSuccessful(c backends.Context, recoveredID string, payloadType string, jsonString string)

func (*Handlers) HandleGetPauseStatus

func (h *Handlers) HandleGetPauseStatus(c backends.Context, recoveredID string, payloadType string, jsonString string)

func (*Handlers) HandleGetProcess

func (h *Handlers) HandleGetProcess(c backends.Context, recoveredID string, payloadType string, jsonString string)

func (*Handlers) HandleGetProcessHist

func (h *Handlers) HandleGetProcessHist(c backends.Context, recoveredID string, payloadType string, jsonString string)

func (*Handlers) HandleGetProcesses

func (h *Handlers) HandleGetProcesses(c backends.Context, recoveredID string, payloadType string, jsonString string)

func (*Handlers) HandlePauseAssignments

func (h *Handlers) HandlePauseAssignments(c backends.Context, recoveredID string, payloadType string, jsonString string)

func (*Handlers) HandleRemoveAllProcesses

func (h *Handlers) HandleRemoveAllProcesses(c backends.Context, recoveredID string, payloadType string, jsonString string)

func (*Handlers) HandleRemoveProcess

func (h *Handlers) HandleRemoveProcess(c backends.Context, recoveredID string, payloadType string, jsonString string)

func (*Handlers) HandleResumeAssignments

func (h *Handlers) HandleResumeAssignments(c backends.Context, recoveredID string, payloadType string, jsonString string)

func (*Handlers) HandleSetOutput

func (h *Handlers) HandleSetOutput(c backends.Context, recoveredID string, payloadType string, jsonString string)

func (*Handlers) HandleSubmit

func (h *Handlers) HandleSubmit(c backends.Context, recoveredID string, payloadType string, jsonString string)

func (*Handlers) RegisterHandlers

func (h *Handlers) RegisterHandlers(handlerRegistry *registry.HandlerRegistry) error

RegisterHandlers implements the HandlerRegistrar interface

type Leader

type Leader struct {
	Host    string
	APIPort int
}

type Server

type Server interface {
	HandleHTTPError(c backends.Context, err error, errorCode int) bool
	SendHTTPReply(c backends.Context, payloadType string, jsonString string)
	SendEmptyHTTPReply(c backends.Context, payloadType string)
	Validator() security.Validator
	ExecutorDB() database.ExecutorDatabase
	UserDB() database.UserDatabase
	ProcessDB() database.ProcessDatabase
	ProcessController() Controller
	ExclusiveAssign() bool
	TLS() bool
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL