tasksubmit

package
v1.9.0-rc1
Published: Apr 24, 2026 License: AGPL-3.0 Imports: 11 Imported by: 0

Documentation

Constants

const (
	StatusAccepted = 200
	StatusRejected = 400
)

Status codes for task submission responses.

const (
	TaskStatusNew       = "NEW"
	TaskStatusAccepted  = "ACCEPTED"
	TaskStatusDeclined  = "DECLINED"
	TaskStatusExecuting = "EXECUTING"
	TaskStatusCompleted = "COMPLETED"
	TaskStatusSucceeded = "SUCCEEDED"
	TaskStatusCancelled = "CANCELLED"
	TaskStatusExpired   = "EXPIRED"
)

Task statuses

const (
	// TaskAcceptTimeout is the maximum time a task can stay in NEW status before being cancelled
	TaskAcceptTimeout = 1 * time.Minute
	// TaskQueueHeadTimeout is the maximum time a task can stay at the head of the queue before expiring
	TaskQueueHeadTimeout = 1 * time.Hour
	// TaskAcceptedStallTimeout bounds how long a submitter will wait for
	// a result message after the task has been accepted by the receiver.
	// If the receiver dies between accept and send-results, the submitter
	// times out and marks the task EXPIRED so it doesn't appear stuck.
	TaskAcceptedStallTimeout = 1 * time.Minute
)

Task timeout constants

const (
	TypeSubmit       uint32 = 1 // Task submission request
	TypeResult       uint32 = 2 // Task result response
	TypeStatusUpdate uint32 = 3 // Task status update (accept/decline/execute/complete)
	TypeSendResults  uint32 = 4 // Send task results
)

Frame types for task submission on port 1003.

const (
	MaxTaskDescription     = 16 * 1024        // 16 KiB — any reasonable prompt/description
	MaxTaskResultText      = 1 * 1024 * 1024  // 1 MiB — inline text results
	MaxTaskResultFilename  = 256              // filesystem-safe length cap
	MaxTaskResultFileBytes = 15 * 1024 * 1024 // ~15 MiB; frame cap is 16 MiB
	MaxTaskJustification   = 4 * 1024         // status/decline reasons
)

Submission-size bounds (issue #198). Without these, the receiver allocates whatever the sender sends, so a single hostile submission can exhaust the daemon's memory. The frame reader already caps the whole frame at 16 MiB; these are tighter semantic bounds, checked before the content is persisted.
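
As a rough illustration of how the per-field caps above might be enforced, the sketch below checks a result payload against three of the documented limits. The names `resultMessage` and `validateResult` are local stand-ins invented for this example, and the error wording is hypothetical; this is not the package's ValidateTaskResultMessage.

```go
package main

import (
	"errors"
	"fmt"
)

// Caps copied from the documented constants.
const (
	maxTaskResultText      = 1 * 1024 * 1024
	maxTaskResultFilename  = 256
	maxTaskResultFileBytes = 15 * 1024 * 1024
)

// resultMessage is a trimmed, hypothetical stand-in for TaskResultMessage.
type resultMessage struct {
	ResultText string
	Filename   string
	FileData   []byte
}

// validateResult returns an error for the first field that exceeds its cap.
// Cases are evaluated in order, so the nil check runs before any field access.
func validateResult(m *resultMessage) error {
	switch {
	case m == nil:
		return errors.New("nil result message")
	case len(m.ResultText) > maxTaskResultText:
		return fmt.Errorf("result text: %d bytes exceeds cap of %d", len(m.ResultText), maxTaskResultText)
	case len(m.Filename) > maxTaskResultFilename:
		return fmt.Errorf("filename: %d bytes exceeds cap of %d", len(m.Filename), maxTaskResultFilename)
	case len(m.FileData) > maxTaskResultFileBytes:
		return fmt.Errorf("file data: %d bytes exceeds cap of %d", len(m.FileData), maxTaskResultFileBytes)
	}
	return nil
}

func main() {
	small := &resultMessage{ResultText: "done"}
	large := &resultMessage{Filename: "out.bin", FileData: make([]byte, maxTaskResultFileBytes+1)}
	fmt.Println(validateResult(small))
	fmt.Println(validateResult(large))
}
```

Running the checks before anything is written to disk keeps an oversized field from ever being persisted, which matches the stated intent of these bounds.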

Variables

var AllowedResultExtensions = map[string]bool{

	".md": true, ".txt": true, ".rtf": true, ".docx": true, ".pdf": true, ".pptx": true,

	".pth": true, ".pt": true, ".onnx": true, ".h5": true, ".pb": true, ".ckpt": true,
	".safetensors": true, ".bin": true,

	".csv": true, ".parquet": true, ".xlsx": true, ".xls": true,

	".jpg": true, ".jpeg": true, ".png": true, ".svg": true, ".gif": true, ".webp": true,
}

Allowed file extensions for results

var ForbiddenResultExtensions = map[string]bool{
	".go": true, ".py": true, ".js": true, ".ts": true, ".java": true, ".c": true,
	".cpp": true, ".h": true, ".hpp": true, ".rs": true, ".rb": true, ".php": true,
	".swift": true, ".kt": true, ".scala": true, ".sh": true, ".bash": true, ".zsh": true,
	".ps1": true, ".bat": true, ".cmd": true, ".sql": true, ".r": true, ".R": true,
	".lua": true, ".pl": true, ".pm": true, ".ex": true, ".exs": true, ".clj": true,
	".hs": true, ".ml": true, ".fs": true, ".cs": true, ".vb": true, ".dart": true,
}

Forbidden file extensions (source code)
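
The documentation does not say how the two maps are combined. One plausible reading is a deny-then-allow check, sketched below with small local subsets of the maps. The function name `resultExtensionOK` is hypothetical, and lowercasing the extension before lookup is an assumption of this sketch (the presence of both ".r" and ".R" in the forbidden map hints that the package itself may compare case-sensitively).

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// Small local subsets of the documented maps, for illustration only.
var allowed = map[string]bool{".md": true, ".txt": true, ".pdf": true, ".csv": true, ".png": true}
var forbidden = map[string]bool{".go": true, ".py": true, ".sh": true, ".r": true}

// resultExtensionOK reports whether a result filename passes a
// deny-then-allow check. Files without an extension are rejected.
func resultExtensionOK(filename string) bool {
	ext := strings.ToLower(filepath.Ext(filename))
	if ext == "" || forbidden[ext] {
		return false
	}
	return allowed[ext]
}

func main() {
	for _, name := range []string{"report.pdf", "train.py", "notes.TXT", "archive.zip", "README"} {
		fmt.Printf("%-12s %v\n", name, resultExtensionOK(name))
	}
}
```

Under this reading, an extension that appears in neither map (such as ".zip") is rejected, because the allow list is the final gate.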

Functions

func GenerateTaskID

func GenerateTaskID() string

GenerateTaskID generates a unique task ID using crypto/rand. Format: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx (UUID-like format)
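
For readers who want to see what an ID of this shape looks like in practice, here is a minimal sketch that formats 16 bytes from crypto/rand as 8-4-4-4-12 hex groups. The name `newTaskID` is hypothetical. Whether the package sets RFC 4122 version or variant bits is not documented, so this sketch leaves the random bytes untouched.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newTaskID returns 16 random bytes rendered as
// xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx (36 characters).
func newTaskID() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	h := hex.EncodeToString(b[:]) // 32 hex characters
	return fmt.Sprintf("%s-%s-%s-%s-%s", h[0:8], h[8:12], h[12:16], h[16:20], h[20:32]), nil
}

func main() {
	id, err := newTaskID()
	if err != nil {
		panic(err)
	}
	fmt.Println(id)
}
```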

func MarshalTaskFile

func MarshalTaskFile(tf *TaskFile) ([]byte, error)

MarshalTaskFile serializes a TaskFile to JSON bytes.

func ParseTime

func ParseTime(s string) (time.Time, error)

ParseTime parses a time string in RFC3339 format.

func TypeName

func TypeName(t uint32) string

TypeName returns a human-readable name for a frame type.

func ValidateSubmitRequest

func ValidateSubmitRequest(req *SubmitRequest) error

ValidateSubmitRequest rejects submissions whose description exceeds the server-side cap. Called before persisting the task to disk.

func ValidateTaskResultMessage

func ValidateTaskResultMessage(m *TaskResultMessage) error

ValidateTaskResultMessage rejects result payloads that would blow past the per-field caps.

func WriteFrame

func WriteFrame(w io.Writer, f *Frame) error

WriteFrame writes a frame to a writer.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client connects to a remote task submission service on port 1003.

func Dial

func Dial(d *driver.Driver, addr protocol.Addr) (*Client, error)

Dial connects to a remote agent's task submission port.

func (*Client) Close

func (c *Client) Close() error

Close closes the connection.

func (*Client) SendResults

func (c *Client) SendResults(msg *TaskResultMessage) error

SendResults sends task results to the remote agent.

func (*Client) SendStatusUpdate

func (c *Client) SendStatusUpdate(taskID, status, justification string) error

SendStatusUpdate sends a task status update to the remote agent.

func (*Client) SubmitTask

func (c *Client) SubmitTask(taskDescription string, targetAddr string) (*SubmitResponse, error)

SubmitTask sends a task submission request and waits for the response. The returned SubmitResponse carries the task_id assigned to the task.

type Frame

type Frame struct {
	Type    uint32
	Payload []byte
}

Frame is a typed data unit exchanged for task submissions. Wire format: [4-byte type][4-byte length][JSON payload]

func MarshalSubmitRequest

func MarshalSubmitRequest(req *SubmitRequest) (*Frame, error)

MarshalSubmitRequest creates a submit frame from a request.

func MarshalSubmitResponse

func MarshalSubmitResponse(resp *SubmitResponse) (*Frame, error)

MarshalSubmitResponse creates a response frame.

func MarshalTaskResult

func MarshalTaskResult(result *TaskResult) (*Frame, error)

MarshalTaskResult creates a result frame.

func MarshalTaskResultMessage

func MarshalTaskResultMessage(msg *TaskResultMessage) (*Frame, error)

MarshalTaskResultMessage creates a send results frame.

func MarshalTaskStatusUpdate

func MarshalTaskStatusUpdate(update *TaskStatusUpdate) (*Frame, error)

MarshalTaskStatusUpdate creates a status update frame.

func ReadFrame

func ReadFrame(r io.Reader) (*Frame, error)

ReadFrame reads a frame from a reader.

type Handler

type Handler func(conn net.Conn, req *SubmitRequest) bool

Handler is called for each incoming task submission request. It should return true to accept the task, false to reject it.
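
To make the accept/reject contract concrete, the sketch below builds a handler that accepts only non-empty tasks from an allow list of senders. The types `submitRequest` and `handler` are local stand-ins for SubmitRequest and Handler, the `allowFrom` helper and its allow-list policy are hypothetical, and "agent-a" is a placeholder rather than a real address format.

```go
package main

import (
	"fmt"
	"net"
)

// submitRequest and handler are local stand-ins for the documented
// SubmitRequest and Handler types.
type submitRequest struct {
	TaskID          string
	TaskDescription string
	FromAddr        string
	ToAddr          string
}

type handler func(conn net.Conn, req *submitRequest) bool

// allowFrom builds a handler that returns true (accept) only for
// non-empty tasks whose sender is on the allow list.
func allowFrom(senders ...string) handler {
	ok := make(map[string]bool, len(senders))
	for _, s := range senders {
		ok[s] = true
	}
	return func(conn net.Conn, req *submitRequest) bool {
		return req != nil && req.TaskDescription != "" && ok[req.FromAddr]
	}
}

func main() {
	h := allowFrom("agent-a")
	// conn is unused by this policy, so nil is fine for a dry run.
	fmt.Println(h(nil, &submitRequest{TaskDescription: "summarize", FromAddr: "agent-a"}))
	fmt.Println(h(nil, &submitRequest{TaskDescription: "summarize", FromAddr: "agent-z"}))
}
```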

type PoloScoreBreakdown

type PoloScoreBreakdown struct {
	Base                 float64 `json:"base"`
	CpuBonus             float64 `json:"cpu_bonus"`
	CpuMinutes           float64 `json:"cpu_minutes"`
	IdleFactor           float64 `json:"idle_factor"`
	StagedFactor         float64 `json:"staged_factor"`
	EfficiencyMultiplier float64 `json:"efficiency_multiplier"`
	RawReward            float64 `json:"raw_reward"`
	FinalReward          int     `json:"final_reward"`
}

PoloScoreBreakdown contains the detailed breakdown of the polo score calculation.

type ResultSender

type ResultSender func(result *TaskResult) error

ResultSender is a callback for sending task results back to the submitter.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server listens on port 1003 and dispatches incoming task submissions to a handler.

func NewServer

func NewServer(d *driver.Driver, handler Handler) *Server

NewServer creates a task submission server.

func (*Server) ListenAndServe

func (s *Server) ListenAndServe() error

ListenAndServe binds port 1003 and starts accepting connections.

type SubmitRequest

type SubmitRequest struct {
	TaskID          string `json:"task_id"`
	TaskDescription string `json:"task_description"`
	FromAddr        string `json:"from_addr"`
	ToAddr          string `json:"to_addr"`
}

SubmitRequest represents a task submission request.

func UnmarshalSubmitRequest

func UnmarshalSubmitRequest(f *Frame) (*SubmitRequest, error)

UnmarshalSubmitRequest parses a submit frame into a request.

type SubmitResponse

type SubmitResponse struct {
	TaskID  string `json:"task_id"`
	Status  int    `json:"status"`
	Message string `json:"message"`
}

SubmitResponse represents the response to a task submission.

func UnmarshalSubmitResponse

func UnmarshalSubmitResponse(f *Frame) (*SubmitResponse, error)

UnmarshalSubmitResponse parses a response frame.

type TaskFile

type TaskFile struct {
	TaskID              string `json:"task_id"`
	TaskDescription     string `json:"task_description"`
	CreatedAt           string `json:"created_at"`
	Status              string `json:"status"`
	StatusJustification string `json:"status_justification"`
	From                string `json:"from"`
	To                  string `json:"to"`

	// Time metadata tracking
	AcceptedAt       string `json:"accepted_at,omitempty"`        // When task was accepted/declined
	StagedAt         string `json:"staged_at,omitempty"`          // When task became head of queue
	ExecuteStartedAt string `json:"execute_started_at,omitempty"` // When pilotctl execute was called
	CompletedAt      string `json:"completed_at,omitempty"`       // When results were sent

	// Computed durations (in milliseconds for precision)
	TimeIdleMs   int64 `json:"time_idle_ms,omitempty"`   // Time from creation to accept/decline
	TimeStagedMs int64 `json:"time_staged_ms,omitempty"` // Time at head of queue before execute
	TimeCpuMs    int64 `json:"time_cpu_ms,omitempty"`    // Time spent executing before sending results

	// Result payload as received by the submitter.
	ResultType string `json:"result_type,omitempty"` // "text" or "file"
	Results    string `json:"results,omitempty"`     // text result body (or filename if file)
}

TaskFile represents a task stored on disk.

func NewTaskFile

func NewTaskFile(taskID, taskDescription, fromAddr, toAddr string) *TaskFile

NewTaskFile creates a new TaskFile with NEW status.

func UnmarshalTaskFile

func UnmarshalTaskFile(data []byte) (*TaskFile, error)

UnmarshalTaskFile deserializes JSON bytes to a TaskFile.

func (*TaskFile) CalculateTimeCpu

func (tf *TaskFile) CalculateTimeCpu()

CalculateTimeCpu calculates and sets time_cpu_ms based on execute start and current time.

func (*TaskFile) CalculateTimeIdle

func (tf *TaskFile) CalculateTimeIdle()

CalculateTimeIdle calculates and sets time_idle_ms based on creation and current time.

func (*TaskFile) CalculateTimeStaged

func (tf *TaskFile) CalculateTimeStaged()

CalculateTimeStaged calculates and sets time_staged_ms based on staged time and current time.

func (*TaskFile) IsAcceptedStalled

func (tf *TaskFile) IsAcceptedStalled() bool

IsAcceptedStalled checks (on the submitter side) whether an ACCEPTED task has been waiting for results past TaskAcceptedStallTimeout. It bounds the task lifecycle when the receiver dies between accept and send-results; without this check, the submitter would hold the task in ACCEPTED forever.

func (*TaskFile) IsExpiredForAccept

func (tf *TaskFile) IsExpiredForAccept() bool

IsExpiredForAccept checks if the task has exceeded the accept timeout (1 minute).

func (*TaskFile) IsExpiredInQueue

func (tf *TaskFile) IsExpiredInQueue() bool

IsExpiredInQueue checks if the task has exceeded the queue head timeout (1 hour).

func (*TaskFile) PoloScoreReward

func (tf *TaskFile) PoloScoreReward() int

PoloScoreReward calculates the polo score reward for a successfully completed task.

The formula uses logarithmic scaling for compute time and proportional penalties for responsiveness, creating a balanced reward system:

reward = (base + cpuBonus) * efficiencyMultiplier

Components:

  • base = 1.0 (guaranteed minimum for completing any task)
  • cpuBonus = log2(1 + cpu_minutes) (logarithmic scaling, no cap)
      e.g. 1 min → +1.0, 3 min → +2.0, 7 min → +3.0, 15 min → +4.0, 31 min → +5.0
  • efficiencyMultiplier = 1.0 - idleFactor - stagedFactor
  • idleFactor = min(time_idle / 60s, 0.3) (up to 30% penalty for slow accept)
  • stagedFactor = min(time_staged / 600s, 0.3) (up to 30% penalty for queue delays)

The efficiency multiplier ranges from 0.4 to 1.0, rewarding responsive agents. The final reward is rounded to the nearest integer, with a minimum of 1.

Examples:

  • Instant accept, instant execute, 1 min CPU → (1+1.0)*1.0 = 2
  • Instant accept, instant execute, 10 min CPU → (1+3.46)*1.0 = 4.46 → 4
  • 30s idle, 5 min staged, 10 min CPU → (1+3.46)*0.4 = 1.78 → 2 (both penalties reach their 0.3 cap)
  • Instant accept, instant execute, 30 min CPU → (1+4.95)*1.0 = 5.95 → 6

func (*TaskFile) PoloScoreRewardDetailed

func (tf *TaskFile) PoloScoreRewardDetailed() PoloScoreBreakdown

PoloScoreRewardDetailed calculates and returns the detailed polo score breakdown.

func (*TaskFile) TimeSinceCreation

func (tf *TaskFile) TimeSinceCreation() (time.Duration, error)

TimeSinceCreation returns the duration since the task was created.

func (*TaskFile) TimeSinceStaged

func (tf *TaskFile) TimeSinceStaged() (time.Duration, error)

TimeSinceStaged returns the duration since the task was staged (became head of queue).

type TaskResult

type TaskResult struct {
	TaskDescription string      `json:"task_description"`
	Status          string      `json:"status"` // "success" or "error"
	Result          interface{} `json:"result"` // can be string, object, etc.
	Error           string      `json:"error,omitempty"`
	Timestamp       string      `json:"timestamp"`
}

TaskResult represents the result of a completed task (legacy compatibility).

func UnmarshalTaskResult

func UnmarshalTaskResult(f *Frame) (*TaskResult, error)

UnmarshalTaskResult parses a result frame.

type TaskResultMessage

type TaskResultMessage struct {
	TaskID      string `json:"task_id"`
	ResultType  string `json:"result_type"` // "text" or "file"
	ResultText  string `json:"result_text,omitempty"`
	Filename    string `json:"filename,omitempty"`
	FileData    []byte `json:"file_data,omitempty"`
	CompletedAt string `json:"completed_at"`

	// Time metadata for polo score calculation
	TimeIdleMs   int64 `json:"time_idle_ms,omitempty"`   // Time from creation to accept/decline
	TimeStagedMs int64 `json:"time_staged_ms,omitempty"` // Time at head of queue before execute
	TimeCpuMs    int64 `json:"time_cpu_ms,omitempty"`    // Time spent executing before sending results
}

TaskResultMessage represents task results being sent back.

func UnmarshalTaskResultMessage

func UnmarshalTaskResultMessage(f *Frame) (*TaskResultMessage, error)

UnmarshalTaskResultMessage parses a send results frame.

type TaskStatusUpdate

type TaskStatusUpdate struct {
	TaskID        string `json:"task_id"`
	Status        string `json:"status"`
	Justification string `json:"justification"`
}

TaskStatusUpdate represents a status change message.

func UnmarshalTaskStatusUpdate

func UnmarshalTaskStatusUpdate(f *Frame) (*TaskStatusUpdate, error)

UnmarshalTaskStatusUpdate parses a status update frame.
