Documentation
¶
Index ¶
- Constants
- Variables
- func GenerateTaskID() string
- func MarshalTaskFile(tf *TaskFile) ([]byte, error)
- func ParseTime(s string) (time.Time, error)
- func TypeName(t uint32) string
- func WriteFrame(w io.Writer, f *Frame) error
- type Client
- type Frame
- func MarshalSubmitRequest(req *SubmitRequest) (*Frame, error)
- func MarshalSubmitResponse(resp *SubmitResponse) (*Frame, error)
- func MarshalTaskResult(result *TaskResult) (*Frame, error)
- func MarshalTaskResultMessage(msg *TaskResultMessage) (*Frame, error)
- func MarshalTaskStatusUpdate(update *TaskStatusUpdate) (*Frame, error)
- func ReadFrame(r io.Reader) (*Frame, error)
- type Handler
- type PoloScoreBreakdown
- type ResultSender
- type Server
- type SubmitRequest
- type SubmitResponse
- type TaskFile
- func (tf *TaskFile) CalculateTimeCpu()
- func (tf *TaskFile) CalculateTimeIdle()
- func (tf *TaskFile) CalculateTimeStaged()
- func (tf *TaskFile) IsExpiredForAccept() bool
- func (tf *TaskFile) IsExpiredInQueue() bool
- func (tf *TaskFile) PoloScoreReward() int
- func (tf *TaskFile) PoloScoreRewardDetailed() PoloScoreBreakdown
- func (tf *TaskFile) TimeSinceCreation() (time.Duration, error)
- func (tf *TaskFile) TimeSinceStaged() (time.Duration, error)
- type TaskResult
- type TaskResultMessage
- type TaskStatusUpdate
Constants ¶
const ( StatusAccepted = 200 StatusRejected = 400 )
Status codes for task submission responses.
const ( TaskStatusNew = "NEW" TaskStatusAccepted = "ACCEPTED" TaskStatusDeclined = "DECLINED" TaskStatusExecuting = "EXECUTING" TaskStatusCompleted = "COMPLETED" TaskStatusSucceeded = "SUCCEEDED" TaskStatusCancelled = "CANCELLED" TaskStatusExpired = "EXPIRED" )
Task statuses
const ( // TaskAcceptTimeout is the maximum time a task can stay in NEW status before being cancelled TaskAcceptTimeout = 1 * time.Minute // TaskQueueHeadTimeout is the maximum time a task can stay at the head of the queue before expiring TaskQueueHeadTimeout = 1 * time.Hour )
Task timeout constants
const ( TypeSubmit uint32 = 1 // Task submission request TypeResult uint32 = 2 // Task result response TypeStatusUpdate uint32 = 3 // Task status update (accept/decline/execute/complete) TypeSendResults uint32 = 4 // Send task results )
Frame types for task submission on port 1003.
Variables ¶
var AllowedResultExtensions = map[string]bool{ ".md": true, ".txt": true, ".rtf": true, ".docx": true, ".pdf": true, ".pptx": true, ".pth": true, ".pt": true, ".onnx": true, ".h5": true, ".pb": true, ".ckpt": true, ".safetensors": true, ".bin": true, ".csv": true, ".parquet": true, ".xlsx": true, ".xls": true, ".jpg": true, ".jpeg": true, ".png": true, ".svg": true, ".gif": true, ".webp": true, }
Allowed file extensions for results
var ForbiddenResultExtensions = map[string]bool{ ".go": true, ".py": true, ".js": true, ".ts": true, ".java": true, ".c": true, ".cpp": true, ".h": true, ".hpp": true, ".rs": true, ".rb": true, ".php": true, ".swift": true, ".kt": true, ".scala": true, ".sh": true, ".bash": true, ".zsh": true, ".ps1": true, ".bat": true, ".cmd": true, ".sql": true, ".r": true, ".R": true, ".lua": true, ".pl": true, ".pm": true, ".ex": true, ".exs": true, ".clj": true, ".hs": true, ".ml": true, ".fs": true, ".cs": true, ".vb": true, ".dart": true, }
Forbidden file extensions (source code)
Functions ¶
func GenerateTaskID ¶
func GenerateTaskID() string
GenerateTaskID generates a unique task ID using crypto/rand. Format: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx (UUID-like format)
func MarshalTaskFile ¶
MarshalTaskFile serializes a TaskFile to JSON bytes.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client connects to a remote task submission service on port 1003.
func (*Client) SendResults ¶
func (c *Client) SendResults(msg *TaskResultMessage) error
SendResults sends task results to the remote agent.
func (*Client) SendStatusUpdate ¶
SendStatusUpdate sends a task status update to the remote agent.
func (*Client) SubmitTask ¶
func (c *Client) SubmitTask(taskDescription string, targetAddr string) (*SubmitResponse, error)
SubmitTask sends a task submission request and waits for a response. The returned SubmitResponse carries the task_id assigned to this task.
type Frame ¶
Frame is a typed data unit exchanged for task submissions. Wire format: [4-byte type][4-byte length][JSON payload]
func MarshalSubmitRequest ¶
func MarshalSubmitRequest(req *SubmitRequest) (*Frame, error)
MarshalSubmitRequest creates a submit frame from a request.
func MarshalSubmitResponse ¶
func MarshalSubmitResponse(resp *SubmitResponse) (*Frame, error)
MarshalSubmitResponse creates a response frame.
func MarshalTaskResult ¶
func MarshalTaskResult(result *TaskResult) (*Frame, error)
MarshalTaskResult creates a result frame.
func MarshalTaskResultMessage ¶
func MarshalTaskResultMessage(msg *TaskResultMessage) (*Frame, error)
MarshalTaskResultMessage creates a send results frame.
func MarshalTaskStatusUpdate ¶
func MarshalTaskStatusUpdate(update *TaskStatusUpdate) (*Frame, error)
MarshalTaskStatusUpdate creates a status update frame.
type Handler ¶
type Handler func(conn net.Conn, req *SubmitRequest) bool
Handler is called for each incoming task submission request. It should return true to accept the task, false to reject it.
type PoloScoreBreakdown ¶
type PoloScoreBreakdown struct {
Base float64 `json:"base"`
CpuBonus float64 `json:"cpu_bonus"`
CpuMinutes float64 `json:"cpu_minutes"`
IdleFactor float64 `json:"idle_factor"`
StagedFactor float64 `json:"staged_factor"`
EfficiencyMultiplier float64 `json:"efficiency_multiplier"`
RawReward float64 `json:"raw_reward"`
FinalReward int `json:"final_reward"`
}
PoloScoreBreakdown contains the detailed breakdown of the polo score calculation.
type ResultSender ¶
type ResultSender func(result *TaskResult) error
ResultSender is a callback for sending task results back to the submitter.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server listens on port 1003 and dispatches incoming task submissions to a handler.
func (*Server) ListenAndServe ¶
ListenAndServe binds port 1003 and starts accepting connections.
type SubmitRequest ¶
type SubmitRequest struct {
TaskID string `json:"task_id"`
TaskDescription string `json:"task_description"`
FromAddr string `json:"from_addr"`
ToAddr string `json:"to_addr"`
}
SubmitRequest represents a task submission request.
func UnmarshalSubmitRequest ¶
func UnmarshalSubmitRequest(f *Frame) (*SubmitRequest, error)
UnmarshalSubmitRequest parses a submit frame into a request.
type SubmitResponse ¶
type SubmitResponse struct {
TaskID string `json:"task_id"`
Status int `json:"status"`
Message string `json:"message"`
}
SubmitResponse represents the response to a task submission.
func UnmarshalSubmitResponse ¶
func UnmarshalSubmitResponse(f *Frame) (*SubmitResponse, error)
UnmarshalSubmitResponse parses a response frame.
type TaskFile ¶
type TaskFile struct {
TaskID string `json:"task_id"`
TaskDescription string `json:"task_description"`
CreatedAt string `json:"created_at"`
Status string `json:"status"`
StatusJustification string `json:"status_justification"`
From string `json:"from"`
To string `json:"to"`
// Time metadata tracking
AcceptedAt string `json:"accepted_at,omitempty"` // When task was accepted/declined
StagedAt string `json:"staged_at,omitempty"` // When task became head of queue
ExecuteStartedAt string `json:"execute_started_at,omitempty"` // When pilotctl execute was called
CompletedAt string `json:"completed_at,omitempty"` // When results were sent
// Computed durations (in milliseconds for precision)
TimeIdleMs int64 `json:"time_idle_ms,omitempty"` // Time from creation to accept/decline
TimeStagedMs int64 `json:"time_staged_ms,omitempty"` // Time at head of queue before execute
TimeCpuMs int64 `json:"time_cpu_ms,omitempty"` // Time spent executing before sending results
}
TaskFile represents a task stored on disk.
func NewTaskFile ¶
NewTaskFile creates a new TaskFile with NEW status.
func UnmarshalTaskFile ¶
UnmarshalTaskFile deserializes JSON bytes to a TaskFile.
func (*TaskFile) CalculateTimeCpu ¶
func (tf *TaskFile) CalculateTimeCpu()
CalculateTimeCpu calculates and sets time_cpu_ms based on execute start and current time.
func (*TaskFile) CalculateTimeIdle ¶
func (tf *TaskFile) CalculateTimeIdle()
CalculateTimeIdle calculates and sets time_idle_ms based on creation and current time.
func (*TaskFile) CalculateTimeStaged ¶
func (tf *TaskFile) CalculateTimeStaged()
CalculateTimeStaged calculates and sets time_staged_ms based on staged time and current time.
func (*TaskFile) IsExpiredForAccept ¶
IsExpiredForAccept checks if the task has exceeded the accept timeout (1 minute).
func (*TaskFile) IsExpiredInQueue ¶
IsExpiredInQueue checks if the task has exceeded the queue head timeout (1 hour).
func (*TaskFile) PoloScoreReward ¶
PoloScoreReward calculates the polo score reward for a successfully completed task.
The formula uses logarithmic scaling for compute time and proportional penalties for slow acceptance and queue delays, creating a balanced reward system:
reward = (base + cpuBonus) * efficiencyMultiplier
Components:
- base = 1.0 (guaranteed minimum for completing any task)
- cpuBonus = log2(1 + cpu_minutes) (logarithmic scaling, no cap)
- 1 min → +1.0, 3 min → +2.0, 7 min → +3.0, 15 min → +4.0, 31 min → +5.0
- efficiencyMultiplier = 1.0 - idleFactor - stagedFactor
- idleFactor = min(time_idle / 60s, 0.3) (up to 30% penalty for slow accept)
- stagedFactor = min(time_staged / 600s, 0.3) (up to 30% penalty for queue delays)
The efficiency multiplier ranges from 0.4 to 1.0, rewarding responsive agents. The final reward is rounded to the nearest integer, with a minimum of 1.
Examples:
- Instant accept, instant execute, 1 min CPU → (1+1.0)*1.0 = 2
- Instant accept, instant execute, 10 min CPU → (1+3.46)*1.0 = 4
- 30s idle, 5 min staged, 10 min CPU → (1+3.46)*0.4 = 2 (both penalties hit the 0.3 cap)
- Instant accept, instant execute, 30 min CPU → (1+4.95)*1.0 = 6
func (*TaskFile) PoloScoreRewardDetailed ¶
func (tf *TaskFile) PoloScoreRewardDetailed() PoloScoreBreakdown
PoloScoreRewardDetailed calculates and returns the detailed polo score breakdown.
func (*TaskFile) TimeSinceCreation ¶
TimeSinceCreation returns the duration since the task was created.
type TaskResult ¶
type TaskResult struct {
TaskDescription string `json:"task_description"`
Status string `json:"status"` // "success" or "error"
Result interface{} `json:"result"` // can be string, object, etc.
Error string `json:"error,omitempty"`
Timestamp string `json:"timestamp"`
}
TaskResult represents the result of a completed task (legacy compatibility).
func UnmarshalTaskResult ¶
func UnmarshalTaskResult(f *Frame) (*TaskResult, error)
UnmarshalTaskResult parses a result frame.
type TaskResultMessage ¶
type TaskResultMessage struct {
TaskID string `json:"task_id"`
ResultType string `json:"result_type"` // "text" or "file"
ResultText string `json:"result_text,omitempty"`
Filename string `json:"filename,omitempty"`
FileData []byte `json:"file_data,omitempty"`
CompletedAt string `json:"completed_at"`
// Time metadata for polo score calculation
TimeIdleMs int64 `json:"time_idle_ms,omitempty"` // Time from creation to accept/decline
TimeStagedMs int64 `json:"time_staged_ms,omitempty"` // Time at head of queue before execute
TimeCpuMs int64 `json:"time_cpu_ms,omitempty"` // Time spent executing before sending results
}
TaskResultMessage represents task results being sent back.
func UnmarshalTaskResultMessage ¶
func UnmarshalTaskResultMessage(f *Frame) (*TaskResultMessage, error)
UnmarshalTaskResultMessage parses a send results frame.
type TaskStatusUpdate ¶
type TaskStatusUpdate struct {
TaskID string `json:"task_id"`
Status string `json:"status"`
Justification string `json:"justification"`
}
TaskStatusUpdate represents a status change message.
func UnmarshalTaskStatusUpdate ¶
func UnmarshalTaskStatusUpdate(f *Frame) (*TaskStatusUpdate, error)
UnmarshalTaskStatusUpdate parses a status update frame.