runner

package
v0.17.0-alpha1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 19, 2025 License: Apache-2.0 Imports: 38 Imported by: 0

Documentation

Index

Constants

View Source
// Numeric user and group ID constants.
// NOTE(review): BaseUID and MaxUID look like the bounds of the range that
// AllocateUID draws from, and 65534 is the conventional nobody/nogroup ID on
// Linux — confirm both against the implementation, which is not visible here.
const (
	BaseUID    = 9000
	MaxUID     = 20000
	NoGroupGID = 65534
	NoBodyUID  = 65534
)
View Source
// ID and name of the default runner.
// NOTE(review): presumably the single runner used outside procedure mode —
// confirm against the Manager implementation.
const (
	DefaultRunnerID   = 0
	DefaultRunnerName = "default"
)

Variables

View Source
// Sentinel errors defined by this package. They are plain errors.New values,
// so callers should compare against them with errors.Is.
var (
	ErrNoCapacity          = errors.New("no runner capacity available")
	ErrPredictionNotFound  = errors.New("prediction not found")
	ErrRunnerNotFound      = errors.New("runner not found")
	ErrNoEmptySlot         = errors.New("no empty slot available")
	ErrInvalidRunnerStatus = errors.New("invalid runner status for new prediction")
	// ErrAsyncPrediction is a sentinel error indicating that a prediction is
	// being served asynchronously. It is not surfaced outside of runner.
	ErrAsyncPrediction = errors.New("async prediction")
)
View Source
// Base64Regex matches a single-line data URL of the form
// "data:<anything>;base64,<payload>" and captures the payload in the named
// group "base64". Because "." does not match a newline, multi-line payloads
// do not match.
//
// ErrSchemaNotAvailable is the error ProcessInputPaths returns when the
// OpenAPI schema is not available.
var (
	Base64Regex           = regexp.MustCompile(`^data:.*;base64,(?P<base64>.*)$`)
	ErrSchemaNotAvailable = errors.New("OpenAPI schema not available for input processing")
)
View Source
// Regular expressions, a format string, and a sentinel error.
var (
	// LogRegex matches a line of the form "[pid=<pid>] <msg>". The named group
	// "pid" captures everything up to the closing bracket and "msg" captures
	// the rest of the line.
	LogRegex = regexp.MustCompile(`^\[pid=(?P<pid>[^]]+)] (?P<msg>.*)$`)

	// ResponseRegex matches names of the form "response-<pid>-<epoch>.json".
	// The named group "pid" captures non-whitespace characters and "epoch"
	// captures decimal digits. The dot is escaped so that only a literal
	// ".json" suffix matches; an unescaped "." would accept any character.
	ResponseRegex = regexp.MustCompile(`^response-(?P<pid>\S+)-(?P<epoch>\d+)\.json$`)

	// CancelFmt is a format string that takes a single %s argument.
	// NOTE(review): presumably the argument is a prediction ID and the result
	// names a cancel marker — confirm against the callers.
	CancelFmt = "cancel-%s"

	// ErrNoCommand is the sentinel error for a missing command.
	ErrNoCommand = errors.New("no command available")
)

Functions

func AllocateUID

func AllocateUID() (int, error)

AllocateUID allocates a unique UID for process isolation

func Base64ToInput

func Base64ToInput(s string, paths *[]string) (string, error)

Base64ToInput converts base64 data URLs to temporary files

func OutputToBase64

func OutputToBase64(s string, paths *[]string) (string, error)

OutputToBase64 converts file paths to base64 data URLs

func PredictionID

func PredictionID() (string, error)

func PrepareProcedureSourceURL

func PrepareProcedureSourceURL(srcURL, runnerID string) (string, error)

func ProcessInputPaths

func ProcessInputPaths(input any, doc *openapi3.T, paths *[]string, fn func(string, *[]string) (string, error)) (any, error)

ProcessInputPaths processes the input paths and discards the now unused paths from the input. Note that we return the input, but the expectation is that input will be mutated in-place. This function returns ErrSchemaNotAvailable if the OpenAPI schema is not available. It is up to the caller to decide how to handle this error (e.g. log a warning and proceed without path processing).

func URLToInput

func URLToInput(s string, paths *[]string) (string, error)

URLToInput downloads HTTP URLs to temporary files

Types

type Build

// Build holds three boolean settings decoded from the YAML keys "gpu",
// "fast", and "cog_runtime".
type Build struct {
	GPU        bool `yaml:"gpu"`
	Fast       bool `yaml:"fast"`
	CogRuntime bool `yaml:"cog_runtime"`
}

type CogConcurrency

// CogConcurrency holds the integer setting decoded from the YAML key "max".
type CogConcurrency struct {
	Max int `yaml:"max"`
}

type CogYaml

// CogYaml is the decoded form of the YAML document with the top-level keys
// "build", "concurrency", and "predict". ReadCogYaml returns a pointer to it.
type CogYaml struct {
	Build       Build          `yaml:"build"`
	Concurrency CogConcurrency `yaml:"concurrency"`
	Predict     string         `yaml:"predict"`
}

func ReadCogYaml

func ReadCogYaml(dir string) (*CogYaml, error)

func (*CogYaml) PredictModuleAndPredictor

func (y *CogYaml) PredictModuleAndPredictor() (string, string, error)

type Concurrency

// Concurrency carries a maximum and a current count and is the value returned
// by Manager.Concurrency. Each field is omitted from the JSON encoding when
// it is zero.
type Concurrency struct {
	Max     int `json:"max,omitempty"`
	Current int `json:"current,omitempty"`
}

type LogsSlice

// LogsSlice is a []string that marshals to/from a newline-joined string in JSON.
type LogsSlice []string

LogsSlice is a []string that marshals to/from a newline-joined string in JSON

func (LogsSlice) MarshalJSON

func (l LogsSlice) MarshalJSON() ([]byte, error)

MarshalJSON implements custom JSON marshaling to convert logs from []string to string

func (LogsSlice) String

func (l LogsSlice) String() string

func (*LogsSlice) UnmarshalJSON

func (l *LogsSlice) UnmarshalJSON(data []byte) error

UnmarshalJSON implements custom JSON unmarshaling to convert logs from string to []string

type Manager

// Manager manages the lifecycle and capacity of prediction runners.
// Create one with NewManager.
type Manager struct {
	// contains filtered or unexported fields
}

Manager manages the lifecycle and capacity of prediction runners

func NewManager

func NewManager(ctx context.Context, cfg config.Config, logger *logging.Logger) *Manager

NewManager creates a new runner manager with channel-based capacity control

func (*Manager) AvailableCapacity

func (m *Manager) AvailableCapacity() int

AvailableCapacity returns the number of available capacity slots

func (*Manager) CancelPrediction

func (m *Manager) CancelPrediction(predictionID string) error

func (*Manager) Capacity

func (m *Manager) Capacity() int

Capacity returns the number of available capacity slots

func (*Manager) Concurrency

func (m *Manager) Concurrency() Concurrency

Concurrency returns semaphore-based concurrency info

func (*Manager) ExitCode

func (m *Manager) ExitCode() int

ExitCode returns exit code for non-procedure mode

func (*Manager) ForceKillAll

func (m *Manager) ForceKillAll()

ForceKillAll immediately force-kills all runners and waits briefly for cleanup

func (*Manager) HandleRunnerIPC

func (m *Manager) HandleRunnerIPC(runnerName, status string) error

func (*Manager) IsStopped

func (m *Manager) IsStopped() bool

IsStopped returns whether the manager has been stopped

func (*Manager) PredictAsync

func (m *Manager) PredictAsync(ctx context.Context, req PredictionRequest) (*PredictionResponse, error)

PredictAsync executes an async prediction request - returns immediately, sends webhook when complete

func (*Manager) PredictSync

func (m *Manager) PredictSync(req PredictionRequest) (*PredictionResponse, error)

PredictSync executes a sync prediction request - blocks until complete

func (*Manager) Runners

func (m *Manager) Runners() []*Runner

Runners returns a list of all active runners

func (*Manager) Schema

func (m *Manager) Schema() (string, bool)

Schema returns the appropriate schema - procedure schema for procedure mode, runner schema for non-procedure mode

func (*Manager) SetupResult

func (m *Manager) SetupResult() SetupResult

SetupResult returns setup result for health checks

func (*Manager) Start

func (m *Manager) Start(ctx context.Context) error

Start initializes the manager

func (*Manager) Status

func (m *Manager) Status() string

Status returns the overall system status

func (*Manager) Stop

func (m *Manager) Stop() error

Stop gracefully shuts down all runners

type MetricsPayload

// MetricsPayload is a JSON payload with a source, a type, and a free-form
// data map. Every field is omitted from the encoding when it is empty.
type MetricsPayload struct {
	Source string         `json:"source,omitempty"`
	Type   string         `json:"type,omitempty"`
	Data   map[string]any `json:"data,omitempty"`
}

type PendingPrediction

// PendingPrediction is an opaque type; all of its fields are unexported.
// NOTE(review): presumably tracks a prediction that has been accepted but has
// not yet completed — confirm against the implementation.
type PendingPrediction struct {
	// contains filtered or unexported fields
}

type PredictionRequest

// PredictionRequest is the argument accepted by Manager.PredictSync and
// Manager.PredictAsync. Webhook, WebhookEventsFilter, and OutputFilePrefix
// are omitted from the JSON encoding when empty; the remaining JSON-visible
// fields are always encoded. ProcedureSourceURL is never encoded.
type PredictionRequest struct {
	Input               any             `json:"input"`
	ID                  string          `json:"id"`
	CreatedAt           string          `json:"created_at"`
	StartedAt           string          `json:"started_at"`
	Webhook             string          `json:"webhook,omitempty"`
	WebhookEventsFilter []webhook.Event `json:"webhook_events_filter,omitempty"`
	OutputFilePrefix    string          `json:"output_file_prefix,omitempty"`
	Context             map[string]any  `json:"context"`

	ProcedureSourceURL string `json:"-"` // this is not sent to the python code, used internally
}

type PredictionResponse

// PredictionResponse is the result returned by Manager.PredictSync and
// Manager.PredictAsync. ID and Input are always present in the JSON encoding;
// every other field is omitted when empty. Logs uses LogsSlice, so it is
// encoded as a single newline-joined string. The timestamps are carried as
// strings.
type PredictionResponse struct {
	ID      string           `json:"id"`
	Status  PredictionStatus `json:"status,omitempty"`
	Input   any              `json:"input"`
	Output  any              `json:"output,omitempty"`
	Error   string           `json:"error,omitempty"`
	Logs    LogsSlice        `json:"logs,omitempty"`
	Metrics map[string]any   `json:"metrics,omitempty"`

	CreatedAt   string `json:"created_at,omitempty"`
	StartedAt   string `json:"started_at,omitempty"`
	CompletedAt string `json:"completed_at,omitempty"`
}

type PredictionStatus

// PredictionStatus is the string-valued status of a prediction, as carried in
// PredictionResponse.Status.
type PredictionStatus string

// The PredictionStatus values defined by this package.
// NOTE(review): which of these IsCompleted treats as completed is not visible
// here — check its implementation before relying on it.
const (
	PredictionStarting   PredictionStatus = "starting"
	PredictionProcessing PredictionStatus = "processing"
	PredictionSucceeded  PredictionStatus = "succeeded"
	PredictionCanceled   PredictionStatus = "canceled"
	PredictionFailed     PredictionStatus = "failed"
)

func (PredictionStatus) IsCompleted

func (s PredictionStatus) IsCompleted() bool

type Runner

// Runner is an opaque runner instance; all of its fields are unexported.
// Create one with NewRunner, which takes an *exec.Cmd among its arguments.
type Runner struct {
	// contains filtered or unexported fields
}

func NewRunner

func NewRunner(ctx context.Context, ctxCancel context.CancelFunc, runnerCtx RunnerContext, command *exec.Cmd, maxConcurrency int, cfg config.Config, logger *logging.Logger) (*Runner, error)

NewRunner creates a new runner instance with the given context

func (*Runner) Cancel

func (r *Runner) Cancel(pid string) error

func (*Runner) Config

func (r *Runner) Config(ctx context.Context) error

func (*Runner) ForceKill

func (r *Runner) ForceKill()

func (*Runner) GracefulShutdown

func (r *Runner) GracefulShutdown()

func (*Runner) HandleIPC

func (r *Runner) HandleIPC(status string) error

func (*Runner) Idle

func (r *Runner) Idle() bool

func (*Runner) Start

func (r *Runner) Start(ctx context.Context) error

func (*Runner) Stop

func (r *Runner) Stop() error

func (*Runner) String

func (r *Runner) String() string

func (*Runner) WaitForStop

func (r *Runner) WaitForStop()

type RunnerContext

// RunnerContext contains everything a runner needs to operate.
type RunnerContext struct {
	// contains filtered or unexported fields
}

RunnerContext contains everything a runner needs to operate

func (*RunnerContext) Cleanup

func (rc *RunnerContext) Cleanup() error

type RunnerID

// RunnerID is a unique identifier for a runner instance.
// Format: 8-character base32 string (no leading zeros).
// Example: "k7m3n8p2", "b9q4x2w1".
type RunnerID string

RunnerID is a unique identifier for a runner instance. Format: 8-character base32 string (no leading zeros). Examples: "k7m3n8p2", "b9q4x2w1".

func GenerateRunnerID

func GenerateRunnerID() RunnerID

GenerateRunnerID generates a new random runner ID

func (RunnerID) String

func (r RunnerID) String() string

type SetupResult

// SetupResult is the value returned by Manager.SetupResult for health checks.
// Status is always present in the JSON encoding; Logs and Schema are omitted
// when empty.
type SetupResult struct {
	Status SetupStatus `json:"status"`
	Logs   string      `json:"logs,omitempty"`
	Schema string      `json:"schema,omitempty"`
}

type SetupStatus

// SetupStatus is the string-valued outcome carried in SetupResult.Status.
type SetupStatus string

// The SetupStatus values defined by this package.
const (
	SetupSucceeded SetupStatus = "succeeded"
	SetupFailed    SetupStatus = "failed"
)

type Status

// Status is an integer-valued status. Convert to and from text with
// Status.String and StatusFromString.
type Status int

// The Status values. The group starts at iota - 1, so StatusInvalid is -1,
// StatusStarting is 0 (the zero value of Status), and the rest count up to
// StatusDefunct at 4.
const (
	StatusInvalid Status = iota - 1 // -1 is invalid status
	StatusStarting
	StatusSetupFailed
	StatusReady
	StatusBusy
	StatusDefunct
)

func StatusFromString

func StatusFromString(statusStr string) (Status, error)

func (Status) String

func (s Status) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL