tasksubmit

package
v1.7.1
Published: Apr 9, 2026 License: AGPL-3.0 Imports: 11 Imported by: 0

Documentation

Constants

const (
	StatusAccepted = 200
	StatusRejected = 400
)

Status codes for task submission responses.

const (
	TaskStatusNew       = "NEW"
	TaskStatusAccepted  = "ACCEPTED"
	TaskStatusDeclined  = "DECLINED"
	TaskStatusExecuting = "EXECUTING"
	TaskStatusCompleted = "COMPLETED"
	TaskStatusSucceeded = "SUCCEEDED"
	TaskStatusCancelled = "CANCELLED"
	TaskStatusExpired   = "EXPIRED"
)

Task statuses

const (
	// TaskAcceptTimeout is the maximum time a task can stay in NEW status before being cancelled
	TaskAcceptTimeout = 1 * time.Minute
	// TaskQueueHeadTimeout is the maximum time a task can stay at the head of the queue before expiring
	TaskQueueHeadTimeout = 1 * time.Hour
)

Task timeout constants

const (
	TypeSubmit       uint32 = 1 // Task submission request
	TypeResult       uint32 = 2 // Task result response
	TypeStatusUpdate uint32 = 3 // Task status update (accept/decline/execute/complete)
	TypeSendResults  uint32 = 4 // Send task results
)

Frame types for task submission on port 1003.

Variables

var AllowedResultExtensions = map[string]bool{

	".md": true, ".txt": true, ".rtf": true, ".docx": true, ".pdf": true, ".pptx": true,

	".pth": true, ".pt": true, ".onnx": true, ".h5": true, ".pb": true, ".ckpt": true,
	".safetensors": true, ".bin": true,

	".csv": true, ".parquet": true, ".xlsx": true, ".xls": true,

	".jpg": true, ".jpeg": true, ".png": true, ".svg": true, ".gif": true, ".webp": true,
}

AllowedResultExtensions lists the file extensions permitted for task result files.

var ForbiddenResultExtensions = map[string]bool{
	".go": true, ".py": true, ".js": true, ".ts": true, ".java": true, ".c": true,
	".cpp": true, ".h": true, ".hpp": true, ".rs": true, ".rb": true, ".php": true,
	".swift": true, ".kt": true, ".scala": true, ".sh": true, ".bash": true, ".zsh": true,
	".ps1": true, ".bat": true, ".cmd": true, ".sql": true, ".r": true, ".R": true,
	".lua": true, ".pl": true, ".pm": true, ".ex": true, ".exs": true, ".clj": true,
	".hs": true, ".ml": true, ".fs": true, ".cs": true, ".vb": true, ".dart": true,
}

ForbiddenResultExtensions lists source-code file extensions that are not permitted for task result files.
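
As a rough illustration of how the two maps might be consulted, the sketch below checks a filename's extension against a deny list and then an allow list. The maps are short excerpts of the documented ones, resultFileOK is a hypothetical helper, and the deny-first ordering and case-sensitive lookup are assumptions; the package's actual validation logic is not shown in this documentation.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// Short excerpts of the documented maps, copied here so the sketch runs on its own.
var allowed = map[string]bool{".md": true, ".csv": true, ".png": true, ".onnx": true}
var forbidden = map[string]bool{".go": true, ".py": true, ".sh": true, ".r": true, ".R": true}

// resultFileOK is a hypothetical helper that reports whether a filename could
// be sent as a result. Checking the deny list first and then requiring an
// allow-list hit is an assumption, not documented behaviour.
func resultFileOK(name string) bool {
	ext := filepath.Ext(name) // includes the leading dot, e.g. ".csv"
	if forbidden[ext] {
		return false
	}
	return allowed[ext]
}

func main() {
	for _, name := range []string{"report.md", "train.py", "archive.zip"} {
		fmt.Printf("%s -> %v\n", name, resultFileOK(name))
	}
}
```

In this sketch an extension that appears in neither map (such as .zip) is rejected, because only allow-list hits pass.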

Functions

func GenerateTaskID

func GenerateTaskID() string

GenerateTaskID generates a random task ID using crypto/rand, formatted like a UUID: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.
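
One way such an ID could be produced is sketched below: 16 random bytes from crypto/rand, hex-encoded and split into 8-4-4-4-12 groups. newTaskID is a hypothetical stand-in, not the package's implementation; unlike GenerateTaskID it returns an error, and whether the real function sets UUID version or variant bits is not documented.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newTaskID is a hypothetical stand-in for GenerateTaskID: it reads 16 random
// bytes from crypto/rand and formats them as 8-4-4-4-12 hex groups.
func newTaskID() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	h := hex.EncodeToString(b[:]) // 32 hex characters
	return h[0:8] + "-" + h[8:12] + "-" + h[12:16] + "-" + h[16:20] + "-" + h[20:32], nil
}

func main() {
	id, err := newTaskID()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(id)
}
```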

func MarshalTaskFile

func MarshalTaskFile(tf *TaskFile) ([]byte, error)

MarshalTaskFile serializes a TaskFile to JSON bytes.

func ParseTime

func ParseTime(s string) (time.Time, error)

ParseTime parses a time string in RFC3339 format.

func TypeName

func TypeName(t uint32) string

TypeName returns a human-readable name for a frame type.

func WriteFrame

func WriteFrame(w io.Writer, f *Frame) error

WriteFrame writes a frame to a writer.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client connects to a remote task submission service on port 1003.

func Dial

func Dial(d *driver.Driver, addr protocol.Addr) (*Client, error)

Dial connects to a remote agent's task submission port.

func (*Client) Close

func (c *Client) Close() error

Close closes the connection.

func (*Client) SendResults

func (c *Client) SendResults(msg *TaskResultMessage) error

SendResults sends task results to the remote agent.

func (*Client) SendStatusUpdate

func (c *Client) SendStatusUpdate(taskID, status, justification string) error

SendStatusUpdate sends a task status update to the remote agent.

func (*Client) SubmitTask

func (c *Client) SubmitTask(taskDescription string, targetAddr string) (*SubmitResponse, error)

SubmitTask sends a task submission request and waits for the response. The returned SubmitResponse carries the task_id assigned to the task.

type Frame

type Frame struct {
	Type    uint32
	Payload []byte
}

Frame is a typed data unit exchanged for task submissions. Wire format: [4-byte type][4-byte length][JSON payload]
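
The wire format above can be sketched as a pair of read and write helpers. This is an illustration only: the local frame type mirrors the documented struct, big-endian byte order is an assumption (the documentation does not state the byte order), and the 1 MiB payload limit is a hypothetical guard that the real ReadFrame may or may not apply.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// frame mirrors the documented Frame struct for illustration only.
type frame struct {
	Type    uint32
	Payload []byte
}

// writeFrame emits [4-byte type][4-byte length][payload].
// Big-endian byte order is an assumption.
func writeFrame(w io.Writer, f *frame) error {
	var hdr [8]byte
	binary.BigEndian.PutUint32(hdr[0:4], f.Type)
	binary.BigEndian.PutUint32(hdr[4:8], uint32(len(f.Payload)))
	if _, err := w.Write(hdr[:]); err != nil {
		return err
	}
	_, err := w.Write(f.Payload)
	return err
}

// readFrame reads one frame in the layout produced by writeFrame.
func readFrame(r io.Reader) (*frame, error) {
	var hdr [8]byte
	if _, err := io.ReadFull(r, hdr[:]); err != nil {
		return nil, err
	}
	n := binary.BigEndian.Uint32(hdr[4:8])
	const maxPayload = 1 << 20 // hypothetical 1 MiB guard against bogus lengths
	if n > maxPayload {
		return nil, fmt.Errorf("payload too large: %d bytes", n)
	}
	payload := make([]byte, n)
	if _, err := io.ReadFull(r, payload); err != nil {
		return nil, err
	}
	return &frame{Type: binary.BigEndian.Uint32(hdr[0:4]), Payload: payload}, nil
}

// roundTrip writes f to an in-memory buffer and reads it back.
func roundTrip(f *frame) (*frame, error) {
	var buf bytes.Buffer
	if err := writeFrame(&buf, f); err != nil {
		return nil, err
	}
	return readFrame(&buf)
}

func main() {
	out, err := roundTrip(&frame{Type: 1, Payload: []byte(`{"task_id":"demo"}`)})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(out.Type, string(out.Payload))
}
```

Using io.ReadFull rather than a single Read matters on stream connections, where one Read call may return fewer bytes than the header or payload needs.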

func MarshalSubmitRequest

func MarshalSubmitRequest(req *SubmitRequest) (*Frame, error)

MarshalSubmitRequest creates a submit frame from a request.

func MarshalSubmitResponse

func MarshalSubmitResponse(resp *SubmitResponse) (*Frame, error)

MarshalSubmitResponse creates a response frame.

func MarshalTaskResult

func MarshalTaskResult(result *TaskResult) (*Frame, error)

MarshalTaskResult creates a result frame.

func MarshalTaskResultMessage

func MarshalTaskResultMessage(msg *TaskResultMessage) (*Frame, error)

MarshalTaskResultMessage creates a send results frame.

func MarshalTaskStatusUpdate

func MarshalTaskStatusUpdate(update *TaskStatusUpdate) (*Frame, error)

MarshalTaskStatusUpdate creates a status update frame.

func ReadFrame

func ReadFrame(r io.Reader) (*Frame, error)

ReadFrame reads a frame from a reader.

type Handler

type Handler func(conn net.Conn, req *SubmitRequest) bool

Handler is called for each incoming task submission request. It should return true to accept the task, false to reject it.
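
A handler with this signature might look like the sketch below, which accepts any request with a non-blank description. The submitRequest and handler types are local copies so that the example compiles on its own, and acceptNonEmpty is a hypothetical policy chosen purely for illustration.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// submitRequest is a local copy of the documented SubmitRequest fields.
type submitRequest struct {
	TaskID          string `json:"task_id"`
	TaskDescription string `json:"task_description"`
	FromAddr        string `json:"from_addr"`
	ToAddr          string `json:"to_addr"`
}

// handler mirrors the documented Handler signature with the local request type.
type handler func(conn net.Conn, req *submitRequest) bool

// acceptNonEmpty is a hypothetical policy: accept any request whose
// description is not blank, reject everything else.
func acceptNonEmpty(_ net.Conn, req *submitRequest) bool {
	return req != nil && strings.TrimSpace(req.TaskDescription) != ""
}

func main() {
	var h handler = acceptNonEmpty
	fmt.Println(h(nil, &submitRequest{TaskDescription: "summarize report"}))
	fmt.Println(h(nil, &submitRequest{TaskDescription: "   "}))
}
```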

type PoloScoreBreakdown

type PoloScoreBreakdown struct {
	Base                 float64 `json:"base"`
	CpuBonus             float64 `json:"cpu_bonus"`
	CpuMinutes           float64 `json:"cpu_minutes"`
	IdleFactor           float64 `json:"idle_factor"`
	StagedFactor         float64 `json:"staged_factor"`
	EfficiencyMultiplier float64 `json:"efficiency_multiplier"`
	RawReward            float64 `json:"raw_reward"`
	FinalReward          int     `json:"final_reward"`
}

PoloScoreBreakdown contains the detailed breakdown of the polo score calculation.

type ResultSender

type ResultSender func(result *TaskResult) error

ResultSender is a callback for sending task results back to the submitter.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server listens on port 1003 and dispatches incoming task submissions to a handler.

func NewServer

func NewServer(d *driver.Driver, handler Handler) *Server

NewServer creates a task submission server.

func (*Server) ListenAndServe

func (s *Server) ListenAndServe() error

ListenAndServe binds port 1003 and starts accepting connections.

type SubmitRequest

type SubmitRequest struct {
	TaskID          string `json:"task_id"`
	TaskDescription string `json:"task_description"`
	FromAddr        string `json:"from_addr"`
	ToAddr          string `json:"to_addr"`
}

SubmitRequest represents a task submission request.

func UnmarshalSubmitRequest

func UnmarshalSubmitRequest(f *Frame) (*SubmitRequest, error)

UnmarshalSubmitRequest parses a submit frame into a request.
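
Since frames carry JSON payloads, the marshal and unmarshal pair for submit requests can be pictured roughly as follows. The types are local copies, marshalSubmit and unmarshalSubmit are hypothetical stand-ins, the address strings are placeholders, and the frame-type check on decode is an assumption about what a careful decoder would do.

```go
package main

import (
	"encoding/json"
	"fmt"
)

const typeSubmit uint32 = 1 // mirrors TypeSubmit

// submitRequest and frame are local copies of the documented types.
type submitRequest struct {
	TaskID          string `json:"task_id"`
	TaskDescription string `json:"task_description"`
	FromAddr        string `json:"from_addr"`
	ToAddr          string `json:"to_addr"`
}

type frame struct {
	Type    uint32
	Payload []byte
}

// marshalSubmit JSON-encodes the request into a submit-typed frame.
func marshalSubmit(req *submitRequest) (*frame, error) {
	payload, err := json.Marshal(req)
	if err != nil {
		return nil, err
	}
	return &frame{Type: typeSubmit, Payload: payload}, nil
}

// unmarshalSubmit decodes a submit frame. Rejecting other frame types is an assumption.
func unmarshalSubmit(f *frame) (*submitRequest, error) {
	if f.Type != typeSubmit {
		return nil, fmt.Errorf("unexpected frame type %d", f.Type)
	}
	var req submitRequest
	if err := json.Unmarshal(f.Payload, &req); err != nil {
		return nil, err
	}
	return &req, nil
}

func main() {
	// "addr-a" and "addr-b" are placeholders, not real address syntax.
	f, err := marshalSubmit(&submitRequest{TaskID: "demo", TaskDescription: "summarize report", FromAddr: "addr-a", ToAddr: "addr-b"})
	if err != nil {
		fmt.Println("marshal error:", err)
		return
	}
	req, err := unmarshalSubmit(f)
	if err != nil {
		fmt.Println("unmarshal error:", err)
		return
	}
	fmt.Println(req.TaskID, req.TaskDescription)
}
```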

type SubmitResponse

type SubmitResponse struct {
	TaskID  string `json:"task_id"`
	Status  int    `json:"status"`
	Message string `json:"message"`
}

SubmitResponse represents the response to a task submission.

func UnmarshalSubmitResponse

func UnmarshalSubmitResponse(f *Frame) (*SubmitResponse, error)

UnmarshalSubmitResponse parses a response frame.

type TaskFile

type TaskFile struct {
	TaskID              string `json:"task_id"`
	TaskDescription     string `json:"task_description"`
	CreatedAt           string `json:"created_at"`
	Status              string `json:"status"`
	StatusJustification string `json:"status_justification"`
	From                string `json:"from"`
	To                  string `json:"to"`

	// Time metadata tracking
	AcceptedAt       string `json:"accepted_at,omitempty"`        // When task was accepted/declined
	StagedAt         string `json:"staged_at,omitempty"`          // When task became head of queue
	ExecuteStartedAt string `json:"execute_started_at,omitempty"` // When pilotctl execute was called
	CompletedAt      string `json:"completed_at,omitempty"`       // When results were sent

	// Computed durations (in milliseconds for precision)
	TimeIdleMs   int64 `json:"time_idle_ms,omitempty"`   // Time from creation to accept/decline
	TimeStagedMs int64 `json:"time_staged_ms,omitempty"` // Time at head of queue before execute
	TimeCpuMs    int64 `json:"time_cpu_ms,omitempty"`    // Time spent executing before sending results
}

TaskFile represents a task stored on disk.

func NewTaskFile

func NewTaskFile(taskID, taskDescription, fromAddr, toAddr string) *TaskFile

NewTaskFile creates a new TaskFile with NEW status.

func UnmarshalTaskFile

func UnmarshalTaskFile(data []byte) (*TaskFile, error)

UnmarshalTaskFile deserializes JSON bytes to a TaskFile.
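
The omitempty tags on TaskFile mean that unset timestamps and zero durations do not appear in the stored JSON. The sketch below shows this with a trimmed local copy of the struct; marshalTask, unmarshalTask, and hasKey are hypothetical helpers built on encoding/json, not the package's functions.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// taskFile is a trimmed local copy of the documented TaskFile.
type taskFile struct {
	TaskID     string `json:"task_id"`
	Status     string `json:"status"`
	CreatedAt  string `json:"created_at"`
	AcceptedAt string `json:"accepted_at,omitempty"`
	TimeIdleMs int64  `json:"time_idle_ms,omitempty"`
}

// marshalTask and unmarshalTask are hypothetical stand-ins for
// MarshalTaskFile and UnmarshalTaskFile.
func marshalTask(tf *taskFile) ([]byte, error) { return json.Marshal(tf) }

func unmarshalTask(data []byte) (*taskFile, error) {
	var tf taskFile
	if err := json.Unmarshal(data, &tf); err != nil {
		return nil, err
	}
	return &tf, nil
}

// hasKey reports whether the JSON object in data contains key.
func hasKey(data []byte, key string) bool {
	var m map[string]json.RawMessage
	if err := json.Unmarshal(data, &m); err != nil {
		return false
	}
	_, ok := m[key]
	return ok
}

func main() {
	data, err := marshalTask(&taskFile{TaskID: "demo", Status: "NEW", CreatedAt: "2026-01-01T00:00:00Z"})
	if err != nil {
		fmt.Println("marshal error:", err)
		return
	}
	fmt.Println(string(data))
	fmt.Println(hasKey(data, "accepted_at")) // false: unset omitempty fields are left out

	back, err := unmarshalTask(data)
	if err != nil {
		fmt.Println("unmarshal error:", err)
		return
	}
	fmt.Println(back.Status)
}
```

One consequence worth keeping in mind: with omitempty, a duration of exactly 0 ms is indistinguishable from a duration that was never computed.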

func (*TaskFile) CalculateTimeCpu

func (tf *TaskFile) CalculateTimeCpu()

CalculateTimeCpu calculates and sets time_cpu_ms based on execute start and current time.

func (*TaskFile) CalculateTimeIdle

func (tf *TaskFile) CalculateTimeIdle()

CalculateTimeIdle calculates and sets time_idle_ms based on creation and current time.

func (*TaskFile) CalculateTimeStaged

func (tf *TaskFile) CalculateTimeStaged()

CalculateTimeStaged calculates and sets time_staged_ms based on staged time and current time.
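
The three Calculate methods share one pattern: take a stored RFC3339 timestamp, subtract it from the current time, and record the difference in milliseconds. A minimal sketch of that pattern follows; elapsedMs and mustParse are hypothetical helpers, and the current time is passed in explicitly so that the result is reproducible, whereas the real methods use the current time at the call.

```go
package main

import (
	"fmt"
	"time"
)

// mustParse parses an RFC3339 timestamp and panics on malformed input (demo helper).
func mustParse(s string) time.Time {
	t, err := time.Parse(time.RFC3339, s)
	if err != nil {
		panic(err)
	}
	return t
}

// elapsedMs returns the whole milliseconds between an RFC3339 start time and now.
func elapsedMs(start string, now time.Time) (int64, error) {
	t, err := time.Parse(time.RFC3339, start)
	if err != nil {
		return 0, err
	}
	return now.Sub(t).Milliseconds(), nil
}

func main() {
	now := mustParse("2026-01-01T00:00:45Z")
	idle, err := elapsedMs("2026-01-01T00:00:00Z", now)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(idle) // 45000
}
```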

func (*TaskFile) IsExpiredForAccept

func (tf *TaskFile) IsExpiredForAccept() bool

IsExpiredForAccept checks if the task has exceeded the accept timeout (1 minute).

func (*TaskFile) IsExpiredInQueue

func (tf *TaskFile) IsExpiredInQueue() bool

IsExpiredInQueue checks if the task has exceeded the queue head timeout (1 hour).
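
Both expiry checks compare an elapsed time against one of the timeout constants. The sketch below assumes the accept check measures from created_at and the queue check from staged_at, which matches the field comments but is not confirmed by this documentation. olderThan and mustParse are hypothetical helpers, and treating an unparsable timestamp as not expired and using a strict greater-than comparison are both assumptions.

```go
package main

import (
	"fmt"
	"time"
)

const (
	acceptTimeout    = 1 * time.Minute // mirrors TaskAcceptTimeout
	queueHeadTimeout = 1 * time.Hour   // mirrors TaskQueueHeadTimeout
)

// mustParse parses an RFC3339 timestamp and panics on malformed input (demo helper).
func mustParse(s string) time.Time {
	t, err := time.Parse(time.RFC3339, s)
	if err != nil {
		panic(err)
	}
	return t
}

// olderThan reports whether the RFC3339 timestamp ts lies more than limit
// before now. An unparsable timestamp counts as not expired (an assumption).
func olderThan(ts string, limit time.Duration, now time.Time) bool {
	t, err := time.Parse(time.RFC3339, ts)
	if err != nil {
		return false
	}
	return now.Sub(t) > limit
}

func main() {
	now := mustParse("2026-01-01T12:00:00Z")
	createdAt := "2026-01-01T11:58:30Z" // 90 seconds before now
	stagedAt := "2026-01-01T11:30:00Z"  // 30 minutes before now
	fmt.Println(olderThan(createdAt, acceptTimeout, now))   // true
	fmt.Println(olderThan(stagedAt, queueHeadTimeout, now)) // false
}
```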

func (*TaskFile) PoloScoreReward

func (tf *TaskFile) PoloScoreReward() int

PoloScoreReward calculates the polo score reward for a successfully completed task.

The formula uses logarithmic scaling for compute time and proportional penalties for responsiveness, creating a balanced reward system:

reward = (base + cpuBonus) * efficiencyMultiplier

Components:

  • base = 1.0 (guaranteed minimum for completing any task)
  • cpuBonus = log2(1 + cpu_minutes) (logarithmic scaling, no cap)
      • 1 min → +1.0, 3 min → +2.0, 7 min → +3.0, 15 min → +4.0, 31 min → +5.0
  • efficiencyMultiplier = 1.0 - idleFactor - stagedFactor
  • idleFactor = min(time_idle / 60s, 0.3) (up to 30% penalty for slow accept)
  • stagedFactor = min(time_staged / 600s, 0.3) (up to 30% penalty for queue delays)

The efficiency multiplier ranges from 0.4 to 1.0, rewarding responsive agents. The final reward is rounded to the nearest integer, with a minimum of 1.

Examples:

  • Instant accept, instant execute, 1 min CPU → (1+1.0)*1.0 = 2
  • Instant accept, instant execute, 10 min CPU → (1+3.46)*1.0 = 4
  • 30s idle, 5 min staged, 10 min CPU → (1+3.46)*0.4 = 2 (both penalties reach the 0.3 cap)
  • Instant accept, instant execute, 30 min CPU → (1+4.95)*1.0 = 6
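
The formula can be checked with a short sketch that applies the documented components to durations in milliseconds. poloReward is a hypothetical re-implementation written from the description above, not the package's code; math.Round (half away from zero) stands in for "rounded to nearest integer", which is an assumption about the rounding mode.

```go
package main

import (
	"fmt"
	"math"
)

// poloReward applies the documented formula to durations given in milliseconds.
// It is a hypothetical re-implementation based on the prose description.
func poloReward(idleMs, stagedMs, cpuMs int64) int {
	cpuMinutes := float64(cpuMs) / 60000.0
	cpuBonus := math.Log2(1 + cpuMinutes)

	// Each penalty is capped at 0.3, so the multiplier stays within [0.4, 1.0].
	idleFactor := math.Min(float64(idleMs)/60000.0, 0.3)      // time_idle / 60s
	stagedFactor := math.Min(float64(stagedMs)/600000.0, 0.3) // time_staged / 600s
	multiplier := 1.0 - idleFactor - stagedFactor

	reward := int(math.Round((1.0 + cpuBonus) * multiplier))
	if reward < 1 {
		reward = 1 // documented minimum
	}
	return reward
}

func main() {
	fmt.Println(poloReward(0, 0, 60000))          // 1 min CPU: 2
	fmt.Println(poloReward(0, 0, 600000))         // 10 min CPU: 4
	fmt.Println(poloReward(30000, 300000, 600000)) // 30s idle, 5 min staged, 10 min CPU: 2
	fmt.Println(poloReward(0, 0, 1800000))        // 30 min CPU: 6
}
```

Under the formula as stated, 30 s of idle time and 5 min of staged time both reach the 0.3 cap, so the multiplier in the third case is 0.4 and the raw reward of about 1.78 rounds to 2.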

func (*TaskFile) PoloScoreRewardDetailed

func (tf *TaskFile) PoloScoreRewardDetailed() PoloScoreBreakdown

PoloScoreRewardDetailed calculates and returns the detailed polo score breakdown.

func (*TaskFile) TimeSinceCreation

func (tf *TaskFile) TimeSinceCreation() (time.Duration, error)

TimeSinceCreation returns the duration since the task was created.

func (*TaskFile) TimeSinceStaged

func (tf *TaskFile) TimeSinceStaged() (time.Duration, error)

TimeSinceStaged returns the duration since the task was staged (became head of queue).

type TaskResult

type TaskResult struct {
	TaskDescription string      `json:"task_description"`
	Status          string      `json:"status"` // "success" or "error"
	Result          interface{} `json:"result"` // can be string, object, etc.
	Error           string      `json:"error,omitempty"`
	Timestamp       string      `json:"timestamp"`
}

TaskResult represents the result of a completed task (legacy compatibility).

func UnmarshalTaskResult

func UnmarshalTaskResult(f *Frame) (*TaskResult, error)

UnmarshalTaskResult parses a result frame.

type TaskResultMessage

type TaskResultMessage struct {
	TaskID      string `json:"task_id"`
	ResultType  string `json:"result_type"` // "text" or "file"
	ResultText  string `json:"result_text,omitempty"`
	Filename    string `json:"filename,omitempty"`
	FileData    []byte `json:"file_data,omitempty"`
	CompletedAt string `json:"completed_at"`

	// Time metadata for polo score calculation
	TimeIdleMs   int64 `json:"time_idle_ms,omitempty"`   // Time from creation to accept/decline
	TimeStagedMs int64 `json:"time_staged_ms,omitempty"` // Time at head of queue before execute
	TimeCpuMs    int64 `json:"time_cpu_ms,omitempty"`    // Time spent executing before sending results
}

TaskResultMessage represents task results being sent back.

func UnmarshalTaskResultMessage

func UnmarshalTaskResultMessage(f *Frame) (*TaskResultMessage, error)

UnmarshalTaskResultMessage parses a send results frame.

type TaskStatusUpdate

type TaskStatusUpdate struct {
	TaskID        string `json:"task_id"`
	Status        string `json:"status"`
	Justification string `json:"justification"`
}

TaskStatusUpdate represents a status change message.

func UnmarshalTaskStatusUpdate

func UnmarshalTaskStatusUpdate(f *Frame) (*TaskStatusUpdate, error)

UnmarshalTaskStatusUpdate parses a status update frame.
