process

package
v1.9.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 10, 2026 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func VerifyFunctionSpec

func VerifyFunctionSpec(funcSpec *core.FunctionSpec) error

Types

type AssignResult

// AssignResult is the result type returned (together with an error) by
// Controller.Assign and Controller.DistributedAssign.
//
// NOTE(review): the field notes below are inferred from names, types, and the
// HandleAssignProcess description; confirm against the implementation.
type AssignResult struct {
	Process       *core.Process // process carried by this result; presumably nil when none was assigned — TODO confirm
	IsPaused      bool          // presumably true when colony assignments are paused — TODO confirm
	ResumeChannel <-chan bool   // receive-only; presumably the resume channel a paused assignment waits on — TODO confirm
}

type Cluster

// Cluster is the value returned by EtcdServer.CurrentCluster. Its only
// method exposes the cluster's Leader.
type Cluster interface {
	// GetLeader returns a pointer to a Leader (Host and APIPort).
	// NOTE(review): whether this can be nil (e.g. no leader known) is not
	// visible here — confirm before dereferencing.
	GetLeader() *Leader
}

type Controller

// Controller is the interface returned by Server.ProcessController. It groups
// process creation/closing, assignment, pause/resume of colony assignments,
// and cluster-related accessors.
//
// NOTE(review): notes marked "presumably" are inferred from method names and
// signatures only; confirm against the implementation.
type Controller interface {
	// AddProcessToDB and AddProcess both take a process and return a process
	// or an error. NOTE(review): how the two differ is not visible here.
	AddProcessToDB(process *core.Process) (*core.Process, error)
	AddProcess(process *core.Process) (*core.Process, error)
	// CloseSuccessful presumably closes processID as successful on behalf of
	// executorID, with output as the process output — TODO confirm.
	CloseSuccessful(processID string, executorID string, output []interface{}) error
	// CloseFailed presumably closes processID as failed, with errs as the
	// reported error messages — TODO confirm.
	CloseFailed(processID string, errs []string) error
	// Assign takes an executor ID, a colony name, and cpu/memory values, and
	// returns an *AssignResult or an error. NOTE(review): units of cpu and
	// memory are not visible here.
	Assign(executorID string, colonyName string, cpu int64, memory int64) (*AssignResult, error)
	// DistributedAssign differs from Assign in signature: it takes the
	// executor itself rather than its ID, and an additional storage value.
	DistributedAssign(executor *core.Executor, colonyName string, cpu int64, memory int64, storage int64) (*AssignResult, error)
	// UnassignExecutor is keyed by a process ID (not an executor ID);
	// presumably it removes the executor assignment from that process.
	UnassignExecutor(processID string) error
	// PauseColonyAssignments, ResumeColonyAssignments, and
	// AreColonyAssignmentsPaused operate on the colony named colonyName;
	// presumably they toggle and query the paused state that
	// HandleAssignProcess waits on — TODO confirm.
	PauseColonyAssignments(colonyName string) error
	ResumeColonyAssignments(colonyName string) error
	AreColonyAssignmentsPaused(colonyName string) (bool, error)
	// GetEventHandler returns the controller's *EventHandler.
	GetEventHandler() *EventHandler
	// IsLeader presumably reports whether this node is the cluster leader;
	// the HandleAssignProcess description redirects non-leader nodes to the
	// leader when exclusive assignment is enabled.
	IsLeader() bool
	// GetEtcdServer returns an EtcdServer, from which the current Cluster
	// can be obtained.
	GetEtcdServer() EtcdServer
}

type EtcdServer

// EtcdServer is the value returned by Controller.GetEtcdServer. Its only
// method exposes the current Cluster.
type EtcdServer interface {
	// CurrentCluster returns a Cluster, which in turn provides GetLeader.
	CurrentCluster() Cluster
}

type EventHandler

// EventHandler is created with NewEventHandler from a
// backends.RealtimeEventHandler and exposes WaitForProcess. It is also the
// type returned by Controller.GetEventHandler.
type EventHandler struct {
	// contains filtered or unexported fields
}

func NewEventHandler added in v1.9.6

func NewEventHandler(handler backends.RealtimeEventHandler) *EventHandler

func (*EventHandler) WaitForProcess

func (e *EventHandler) WaitForProcess(executorType string, state int, processID string, location string, ctx context.Context) (*core.Process, error)

type Handlers

// Handlers is created with NewHandlers from a Server. It carries the
// Handle* request-handler methods of this package and RegisterHandlers,
// which registers with a *registry.HandlerRegistry.
type Handlers struct {
	// contains filtered or unexported fields
}

func NewHandlers

func NewHandlers(server Server) *Handlers

func (*Handlers) HandleAssignProcess

func (h *Handlers) HandleAssignProcess(c backends.Context, recoveredID string, payloadType string, jsonString string, originalRequest string)

HandleAssignProcess handles HTTP requests for process assignment to executors.

This function implements leader-based exclusive assignment with retry loop and timeout support. It handles four main scenarios:

  1. Non-leader node - redirects request to cluster leader for exclusive assignment
  2. Colony assignments are paused - waits for resume signal or timeout
  3. Process found - assigns and returns the process immediately
  4. No process available - waits for new processes to be submitted

Flow:

  • If exclusiveAssign is enabled and this node is not leader: redirect to leader
  • Validates request parameters and executor permissions
  • Enters retry loop that continues until timeout is reached
  • Each iteration calls controller.assign() to attempt process assignment
  • If assignments are paused: waits on resume channel or timeout
  • If no process found: waits for new process events via waitForProcess()
  • Returns assigned process or appropriate error (timeout, forbidden, etc.)

Parameters:

  • c: Backend HTTP context for the request (backends.Context, e.g. one backed by Gin)
  • recoveredID: Executor ID recovered from authentication
  • payloadType: Expected message type for validation
  • jsonString: Request body containing assignment parameters
  • originalRequest: Raw request for leader redirection in cluster mode

func (*Handlers) HandleCloseFailed

func (h *Handlers) HandleCloseFailed(c backends.Context, recoveredID string, payloadType string, jsonString string)

func (*Handlers) HandleCloseSuccessful

func (h *Handlers) HandleCloseSuccessful(c backends.Context, recoveredID string, payloadType string, jsonString string)

func (*Handlers) HandleGetPauseStatus

func (h *Handlers) HandleGetPauseStatus(c backends.Context, recoveredID string, payloadType string, jsonString string)

func (*Handlers) HandleGetProcess

func (h *Handlers) HandleGetProcess(c backends.Context, recoveredID string, payloadType string, jsonString string)

func (*Handlers) HandleGetProcessHist

func (h *Handlers) HandleGetProcessHist(c backends.Context, recoveredID string, payloadType string, jsonString string)

func (*Handlers) HandleGetProcesses

func (h *Handlers) HandleGetProcesses(c backends.Context, recoveredID string, payloadType string, jsonString string)

func (*Handlers) HandlePauseAssignments

func (h *Handlers) HandlePauseAssignments(c backends.Context, recoveredID string, payloadType string, jsonString string)

func (*Handlers) HandleRemoveAllProcesses

func (h *Handlers) HandleRemoveAllProcesses(c backends.Context, recoveredID string, payloadType string, jsonString string)

func (*Handlers) HandleRemoveProcess

func (h *Handlers) HandleRemoveProcess(c backends.Context, recoveredID string, payloadType string, jsonString string)

func (*Handlers) HandleResumeAssignments

func (h *Handlers) HandleResumeAssignments(c backends.Context, recoveredID string, payloadType string, jsonString string)

func (*Handlers) HandleSetOutput

func (h *Handlers) HandleSetOutput(c backends.Context, recoveredID string, payloadType string, jsonString string)

func (*Handlers) HandleSubmit

func (h *Handlers) HandleSubmit(c backends.Context, recoveredID string, payloadType string, jsonString string)

func (*Handlers) RegisterHandlers

func (h *Handlers) RegisterHandlers(handlerRegistry *registry.HandlerRegistry) error

RegisterHandlers implements the HandlerRegistrar interface.

type Leader

// Leader is the value returned by Cluster.GetLeader. The HandleAssignProcess
// description says non-leader nodes redirect requests to the cluster leader;
// presumably Host and APIPort give the address used for that redirect —
// TODO confirm.
type Leader struct {
	Host    string // leader host; NOTE(review): hostname vs. IP format not visible here
	APIPort int    // leader API port
}

type Server

// Server is the dependency passed to NewHandlers. It provides HTTP reply
// helpers, a validator, database accessors, the process Controller, and two
// configuration flags.
//
// NOTE(review): notes marked "presumably" are inferred from method names and
// signatures only; confirm against the implementation.
type Server interface {
	// HandleHTTPError takes the request context, an error, and an error code
	// and returns a bool. NOTE(review): the meaning of the bool is not visible
	// here; presumably it reports whether err was non-nil and handled.
	HandleHTTPError(c backends.Context, err error, errorCode int) bool
	// SendHTTPReply presumably writes jsonString as a reply of the given
	// payloadType on c — TODO confirm.
	SendHTTPReply(c backends.Context, payloadType string, jsonString string)
	// SendEmptyHTTPReply is the body-less counterpart: it takes only the
	// context and payloadType.
	SendEmptyHTTPReply(c backends.Context, payloadType string)
	// Validator returns the server's security.Validator.
	Validator() security.Validator
	// ExecutorDB, UserDB, ProcessDB, and BlueprintDB return the corresponding
	// database interfaces from the database package.
	ExecutorDB() database.ExecutorDatabase
	UserDB() database.UserDatabase
	ProcessDB() database.ProcessDatabase
	BlueprintDB() database.BlueprintDatabase
	// ProcessController returns the Controller declared in this package.
	ProcessController() Controller
	// ExclusiveAssign presumably reports whether exclusive (leader-only)
	// assignment is enabled, the condition named "exclusiveAssign" in the
	// HandleAssignProcess description — TODO confirm.
	ExclusiveAssign() bool
	// TLS presumably reports whether TLS is enabled for the server.
	TLS() bool
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL