Documentation
¶
Index ¶
- Constants
- Variables
- func AllocateUID() (int, error)
- func Base64ToInput(s string, paths *[]string) (string, error)
- func OutputToBase64(s string, paths *[]string) (string, error)
- func PredictionID() (string, error)
- func PrepareProcedureSourceURL(srcURL, runnerID string) (string, error)
- func ProcessInputPaths(input any, doc *openapi3.T, paths *[]string, ...) (any, error)
- func URLToInput(s string, paths *[]string) (string, error)
- type Build
- type CogConcurrency
- type CogYaml
- type Concurrency
- type LogsSlice
- type Manager
- func (m *Manager) AvailableCapacity() int
- func (m *Manager) CancelPrediction(predictionID string) error
- func (m *Manager) Capacity() int
- func (m *Manager) Concurrency() Concurrency
- func (m *Manager) ExitCode() int
- func (m *Manager) ForceKillAll()
- func (m *Manager) HandleRunnerIPC(runnerName, status string) error
- func (m *Manager) IsStopped() bool
- func (m *Manager) PredictAsync(ctx context.Context, req PredictionRequest) (*PredictionResponse, error)
- func (m *Manager) PredictSync(req PredictionRequest) (*PredictionResponse, error)
- func (m *Manager) Runners() []*Runner
- func (m *Manager) Schema() (string, bool)
- func (m *Manager) SetupResult() SetupResult
- func (m *Manager) Start(ctx context.Context) error
- func (m *Manager) Status() string
- func (m *Manager) Stop() error
- type MetricsPayload
- type PendingPrediction
- type PredictionRequest
- type PredictionResponse
- type PredictionStatus
- type Runner
- func (r *Runner) Cancel(pid string) error
- func (r *Runner) Config(ctx context.Context) error
- func (r *Runner) ForceKill()
- func (r *Runner) GracefulShutdown()
- func (r *Runner) HandleIPC(status string) error
- func (r *Runner) Idle() bool
- func (r *Runner) Start(ctx context.Context) error
- func (r *Runner) Stop() error
- func (r *Runner) String() string
- func (r *Runner) WaitForStop()
- type RunnerContext
- type RunnerID
- type SetupResult
- type SetupStatus
- type Status
Constants ¶
const ( BaseUID = 9000 MaxUID = 20000 NoGroupGID = 65534 NoBodyUID = 65534 )
const ( DefaultRunnerID = 0 DefaultRunnerName = "default" )
Variables ¶
var ( ErrNoCapacity = errors.New("no runner capacity available") ErrPredictionNotFound = errors.New("prediction not found") ErrRunnerNotFound = errors.New("runner not found") ErrNoEmptySlot = errors.New("no empty slot available") ErrInvalidRunnerStatus = errors.New("invalid runner status for new prediction") // ErrAsyncPrediction is a sentinel error used to indicate that a prediction is being served asynchronously, it is not surfaced outside of runner ErrAsyncPrediction = errors.New("async prediction") )
var ( Base64Regex = regexp.MustCompile(`^data:.*;base64,(?P<base64>.*)$`) ErrSchemaNotAvailable = errors.New("OpenAPI schema not available for input processing") )
var ( LogRegex = regexp.MustCompile(`^\[pid=(?P<pid>[^]]+)] (?P<msg>.*)$`) ResponseRegex = regexp.MustCompile(`^response-(?P<pid>\S+)-(?P<epoch>\d+).json$`) CancelFmt = "cancel-%s" ErrNoCommand = errors.New("no command available") )
Functions ¶
func AllocateUID ¶
AllocateUID allocates a unique UID for process isolation
func Base64ToInput ¶
Base64ToInput converts base64 data URLs to temporary files
func OutputToBase64 ¶
OutputToBase64 converts file paths to base64 data URLs
func PredictionID ¶
func ProcessInputPaths ¶
func ProcessInputPaths(input any, doc *openapi3.T, paths *[]string, fn func(string, *[]string) (string, error)) (any, error)
ProcessInputPaths processes the input paths and discards the now unused paths from the input. Note that we return the input, but the expectation is that input will be mutated in-place. This function returns ErrSchemaNotAvailable if the OpenAPI schema is not available. It is up to the caller to decide how to handle this error (e.g. log a warning and proceed without path processing).
Types ¶
type CogConcurrency ¶
type CogConcurrency struct {
Max int `yaml:"max"`
}
type CogYaml ¶
type CogYaml struct {
Build Build `yaml:"build"`
Concurrency CogConcurrency `yaml:"concurrency"`
Predict string `yaml:"predict"`
}
func ReadCogYaml ¶
type Concurrency ¶
type LogsSlice ¶
type LogsSlice []string
LogsSlice is a []string that marshals to/from a newline-joined string in JSON
func (LogsSlice) MarshalJSON ¶
MarshalJSON implements custom JSON marshaling to convert logs from []string to string
func (*LogsSlice) UnmarshalJSON ¶
UnmarshalJSON implements custom JSON unmarshaling to convert logs from string to []string
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager manages the lifecycle and capacity of prediction runners
func NewManager ¶
NewManager creates a new runner manager with channel-based capacity control
func (*Manager) AvailableCapacity ¶
AvailableCapacity returns the number of available capacity slots
func (*Manager) CancelPrediction ¶
func (*Manager) Concurrency ¶
func (m *Manager) Concurrency() Concurrency
Concurrency returns semaphore-based concurrency info
func (*Manager) ForceKillAll ¶
func (m *Manager) ForceKillAll()
ForceKillAll immediately force-kills all runners and waits briefly for cleanup
func (*Manager) HandleRunnerIPC ¶
func (*Manager) PredictAsync ¶
func (m *Manager) PredictAsync(ctx context.Context, req PredictionRequest) (*PredictionResponse, error)
PredictAsync executes an async prediction request - returns immediately, sends webhook when complete
func (*Manager) PredictSync ¶
func (m *Manager) PredictSync(req PredictionRequest) (*PredictionResponse, error)
PredictSync executes a sync prediction request - blocks until complete
func (*Manager) Schema ¶
Schema returns the appropriate schema - procedure schema for procedure mode, runner schema for non-procedure mode
func (*Manager) SetupResult ¶
func (m *Manager) SetupResult() SetupResult
SetupResult returns setup result for health checks
type MetricsPayload ¶
type PendingPrediction ¶
type PendingPrediction struct {
// contains filtered or unexported fields
}
type PredictionRequest ¶
type PredictionRequest struct {
Input any `json:"input"`
ID string `json:"id"`
CreatedAt string `json:"created_at"`
StartedAt string `json:"started_at"`
Webhook string `json:"webhook,omitempty"`
WebhookEventsFilter []webhook.Event `json:"webhook_events_filter,omitempty"`
OutputFilePrefix string `json:"output_file_prefix,omitempty"`
Context map[string]any `json:"context"`
ProcedureSourceURL string `json:"-"` // this is not sent to the python code, used internally
}
type PredictionResponse ¶
type PredictionResponse struct {
ID string `json:"id"`
Status PredictionStatus `json:"status,omitempty"`
Input any `json:"input"`
Output any `json:"output,omitempty"`
Error string `json:"error,omitempty"`
Logs LogsSlice `json:"logs,omitempty"`
Metrics map[string]any `json:"metrics,omitempty"`
CreatedAt string `json:"created_at,omitempty"`
StartedAt string `json:"started_at,omitempty"`
CompletedAt string `json:"completed_at,omitempty"`
}
type PredictionStatus ¶
type PredictionStatus string
const ( PredictionStarting PredictionStatus = "starting" PredictionProcessing PredictionStatus = "processing" PredictionSucceeded PredictionStatus = "succeeded" PredictionCanceled PredictionStatus = "canceled" PredictionFailed PredictionStatus = "failed" )
func (PredictionStatus) IsCompleted ¶
func (s PredictionStatus) IsCompleted() bool
type Runner ¶
type Runner struct {
// contains filtered or unexported fields
}
func NewRunner ¶
func NewRunner(ctx context.Context, ctxCancel context.CancelFunc, runnerCtx RunnerContext, command *exec.Cmd, maxConcurrency int, cfg config.Config, logger *logging.Logger) (*Runner, error)
NewRunner creates a new runner instance with the given context
func (*Runner) GracefulShutdown ¶
func (r *Runner) GracefulShutdown()
func (*Runner) WaitForStop ¶
func (r *Runner) WaitForStop()
type RunnerContext ¶
type RunnerContext struct {
// contains filtered or unexported fields
}
RunnerContext contains everything a runner needs to operate
func (*RunnerContext) Cleanup ¶
func (rc *RunnerContext) Cleanup() error
type RunnerID ¶
type RunnerID string
RunnerID is a unique identifier for a runner instance. Format: 8-character base32 string (no leading zeros). Examples: "k7m3n8p2", "b9q4x2w1".
func GenerateRunnerID ¶
func GenerateRunnerID() RunnerID
GenerateRunnerID generates a new random runner ID
type SetupResult ¶
type SetupResult struct {
Status SetupStatus `json:"status"`
Logs string `json:"logs,omitempty"`
Schema string `json:"schema,omitempty"`
}
type SetupStatus ¶
type SetupStatus string
const ( SetupSucceeded SetupStatus = "succeeded" SetupFailed SetupStatus = "failed" )