Documentation ¶
Index ¶
- Constants
- Variables
- func GenerateTaskID() string
- func MarshalTaskFile(tf *TaskFile) ([]byte, error)
- func ParseTime(s string) (time.Time, error)
- func TypeName(t uint32) string
- func ValidateSubmitRequest(req *SubmitRequest) error
- func ValidateTaskResultMessage(m *TaskResultMessage) error
- func WriteFrame(w io.Writer, f *Frame) error
- type Client
- type Frame
- func MarshalSubmitRequest(req *SubmitRequest) (*Frame, error)
- func MarshalSubmitResponse(resp *SubmitResponse) (*Frame, error)
- func MarshalTaskResult(result *TaskResult) (*Frame, error)
- func MarshalTaskResultMessage(msg *TaskResultMessage) (*Frame, error)
- func MarshalTaskStatusUpdate(update *TaskStatusUpdate) (*Frame, error)
- func ReadFrame(r io.Reader) (*Frame, error)
- type Handler
- type PoloScoreBreakdown
- type ResultSender
- type Server
- type SubmitRequest
- type SubmitResponse
- type TaskFile
- func (tf *TaskFile) CalculateTimeCpu()
- func (tf *TaskFile) CalculateTimeIdle()
- func (tf *TaskFile) CalculateTimeStaged()
- func (tf *TaskFile) IsAcceptedStalled() bool
- func (tf *TaskFile) IsExpiredForAccept() bool
- func (tf *TaskFile) IsExpiredInQueue() bool
- func (tf *TaskFile) PoloScoreReward() int
- func (tf *TaskFile) PoloScoreRewardDetailed() PoloScoreBreakdown
- func (tf *TaskFile) TimeSinceCreation() (time.Duration, error)
- func (tf *TaskFile) TimeSinceStaged() (time.Duration, error)
- type TaskResult
- type TaskResultMessage
- type TaskStatusUpdate
Constants ¶
const (
	StatusAccepted = 200
	StatusRejected = 400
)
Status codes for task submission responses.
const (
	TaskStatusNew       = "NEW"
	TaskStatusAccepted  = "ACCEPTED"
	TaskStatusDeclined  = "DECLINED"
	TaskStatusExecuting = "EXECUTING"
	TaskStatusCompleted = "COMPLETED"
	TaskStatusSucceeded = "SUCCEEDED"
	TaskStatusCancelled = "CANCELLED"
	TaskStatusExpired   = "EXPIRED"
)
Task statuses
const (
	// TaskAcceptTimeout is the maximum time a task can stay in NEW status before being cancelled
	TaskAcceptTimeout = 1 * time.Minute
	// TaskQueueHeadTimeout is the maximum time a task can stay at the head of the queue before expiring
	TaskQueueHeadTimeout = 1 * time.Hour
	// TaskAcceptedStallTimeout bounds how long a submitter will wait for
	// a result message after the task has been accepted by the receiver.
	// If the receiver dies between accept and send-results, the submitter
	// times out and marks the task EXPIRED so it doesn't appear stuck.
	TaskAcceptedStallTimeout = 1 * time.Minute
)
Task timeout constants
const (
	TypeSubmit       uint32 = 1 // Task submission request
	TypeResult       uint32 = 2 // Task result response
	TypeStatusUpdate uint32 = 3 // Task status update (accept/decline/execute/complete)
	TypeSendResults  uint32 = 4 // Send task results
)
Frame types for task submission on port 1003.
const (
	MaxTaskDescription     = 16 * 1024        // 16 KiB; any reasonable prompt/description
	MaxTaskResultText      = 1 * 1024 * 1024  // 1 MiB; inline text results
	MaxTaskResultFilename  = 256              // filesystem-safe length cap
	MaxTaskResultFileBytes = 15 * 1024 * 1024 // ~15 MiB; frame cap is 16 MiB
	MaxTaskJustification   = 4 * 1024         // status/decline reasons
)
Submission-size bounds. Issue #198: without these the receiver allocates whatever the sender sends, so a single hostile submit can OOM the daemon. The frame reader already caps the whole frame at 16 MiB; these are tighter semantic bounds checked before the content is persisted.
Variables ¶
var AllowedResultExtensions = map[string]bool{
	".md": true, ".txt": true, ".rtf": true, ".docx": true, ".pdf": true, ".pptx": true,
	".pth": true, ".pt": true, ".onnx": true, ".h5": true, ".pb": true, ".ckpt": true,
	".safetensors": true, ".bin": true,
	".csv": true, ".parquet": true, ".xlsx": true, ".xls": true,
	".jpg": true, ".jpeg": true, ".png": true, ".svg": true, ".gif": true, ".webp": true,
}
Allowed file extensions for results
var ForbiddenResultExtensions = map[string]bool{
	".go": true, ".py": true, ".js": true, ".ts": true, ".java": true, ".c": true,
	".cpp": true, ".h": true, ".hpp": true, ".rs": true, ".rb": true, ".php": true,
	".swift": true, ".kt": true, ".scala": true, ".sh": true, ".bash": true, ".zsh": true,
	".ps1": true, ".bat": true, ".cmd": true, ".sql": true, ".r": true, ".R": true,
	".lua": true, ".pl": true, ".pm": true, ".ex": true, ".exs": true, ".clj": true,
	".hs": true, ".ml": true, ".fs": true, ".cs": true, ".vb": true, ".dart": true,
}
Forbidden file extensions (source code)
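The two maps above could be combined into a single filename check. The sketch below is only an illustration: it assumes that a forbidden extension always wins and that anything outside the allowed list is rejected, neither of which is confirmed by this page. resultExtensionOK is a hypothetical helper, and the maps are abbreviated copies rather than the package variables.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// Abbreviated copies of the package maps, for illustration only.
var allowed = map[string]bool{".md": true, ".txt": true, ".pdf": true, ".csv": true, ".png": true}
var forbidden = map[string]bool{".go": true, ".py": true, ".sh": true, ".r": true, ".R": true}

// resultExtensionOK is a hypothetical helper (not part of the package).
// Assumption: forbidden extensions take precedence, and an extension must
// also appear in the allowed map. Matching is case-insensitive here.
func resultExtensionOK(filename string) bool {
	ext := filepath.Ext(filename)
	if forbidden[ext] || forbidden[strings.ToLower(ext)] {
		return false
	}
	return allowed[strings.ToLower(ext)]
}

func main() {
	for _, name := range []string{"report.pdf", "run.sh", "notes.TXT", "archive.zip"} {
		fmt.Printf("%-12s ok=%v\n", name, resultExtensionOK(name))
	}
}
```

Whether the real check lowercases the extension is unknown; the presence of both ".r" and ".R" in the forbidden map suggests it may compare the raw extension.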
Functions ¶
func GenerateTaskID ¶
func GenerateTaskID() string
GenerateTaskID generates a unique task ID using crypto/rand. Format: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx (UUID-like format)
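One way to produce an ID of this shape from crypto/rand is sketched below. This is not the package's implementation: newTaskID is a hypothetical name, and unlike GenerateTaskID it returns an error rather than a bare string.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newTaskID is a hypothetical stand-in for GenerateTaskID. It reads 16
// random bytes and formats them as 8-4-4-4-12 hex groups.
func newTaskID() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	h := hex.EncodeToString(b[:]) // 32 hex characters
	return h[0:8] + "-" + h[8:12] + "-" + h[12:16] + "-" + h[16:20] + "-" + h[20:32], nil
}

func main() {
	id, err := newTaskID()
	if err != nil {
		panic(err)
	}
	fmt.Println(id)
}
```

The result is UUID-like in layout only; the sketch does not set the version or variant bits that a real UUIDv4 would carry.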
func MarshalTaskFile ¶
func MarshalTaskFile(tf *TaskFile) ([]byte, error)
MarshalTaskFile serializes a TaskFile to JSON bytes.
func ValidateSubmitRequest ¶
func ValidateSubmitRequest(req *SubmitRequest) error
ValidateSubmitRequest rejects submissions whose description exceeds the server-side cap. Called before persisting the task to disk.
func ValidateTaskResultMessage ¶
func ValidateTaskResultMessage(m *TaskResultMessage) error
ValidateTaskResultMessage rejects result payloads that would blow past the per-field caps.
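A per-field cap check of this kind might look like the sketch below. The limits mirror the constants listed earlier, but resultMessage and validateResult are simplified, hypothetical stand-ins, and the real function may check more fields or report errors differently.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented caps.
const (
	maxResultText      = 1 * 1024 * 1024
	maxResultFilename  = 256
	maxResultFileBytes = 15 * 1024 * 1024
)

// resultMessage is a trimmed-down, hypothetical mirror of TaskResultMessage.
type resultMessage struct {
	ResultText string
	Filename   string
	FileData   []byte
}

// validateResult rejects a message as soon as one field exceeds its cap,
// so oversized content is refused before it is persisted.
func validateResult(m *resultMessage) error {
	if m == nil {
		return errors.New("nil result message")
	}
	if len(m.ResultText) > maxResultText {
		return fmt.Errorf("result_text is %d bytes, cap is %d", len(m.ResultText), maxResultText)
	}
	if len(m.Filename) > maxResultFilename {
		return fmt.Errorf("filename is %d bytes, cap is %d", len(m.Filename), maxResultFilename)
	}
	if len(m.FileData) > maxResultFileBytes {
		return fmt.Errorf("file_data is %d bytes, cap is %d", len(m.FileData), maxResultFileBytes)
	}
	return nil
}

func main() {
	ok := &resultMessage{ResultText: "done"}
	tooLong := &resultMessage{Filename: string(make([]byte, 300))}
	fmt.Println(validateResult(ok))
	fmt.Println(validateResult(tooLong))
}
```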
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client connects to a remote task submission service on port 1003.
func (*Client) SendResults ¶
func (c *Client) SendResults(msg *TaskResultMessage) error
SendResults sends task results to the remote agent.
func (*Client) SendStatusUpdate ¶
SendStatusUpdate sends a task status update to the remote agent.
func (*Client) SubmitTask ¶
func (c *Client) SubmitTask(taskDescription string, targetAddr string) (*SubmitResponse, error)
SubmitTask sends a task submission request and waits for a response. The returned SubmitResponse carries the task_id assigned to this task.
type Frame ¶
Frame is a typed data unit exchanged for task submissions. Wire format: [4-byte type][4-byte length][JSON payload]
func MarshalSubmitRequest ¶
func MarshalSubmitRequest(req *SubmitRequest) (*Frame, error)
MarshalSubmitRequest creates a submit frame from a request.
func MarshalSubmitResponse ¶
func MarshalSubmitResponse(resp *SubmitResponse) (*Frame, error)
MarshalSubmitResponse creates a response frame.
func MarshalTaskResult ¶
func MarshalTaskResult(result *TaskResult) (*Frame, error)
MarshalTaskResult creates a result frame.
func MarshalTaskResultMessage ¶
func MarshalTaskResultMessage(msg *TaskResultMessage) (*Frame, error)
MarshalTaskResultMessage creates a send results frame.
func MarshalTaskStatusUpdate ¶
func MarshalTaskStatusUpdate(update *TaskStatusUpdate) (*Frame, error)
MarshalTaskStatusUpdate creates a status update frame.
type Handler ¶
type Handler func(conn net.Conn, req *SubmitRequest) bool
Handler is called for each incoming task submission request. It should return true to accept the task, false to reject it.
type PoloScoreBreakdown ¶
type PoloScoreBreakdown struct {
Base float64 `json:"base"`
CpuBonus float64 `json:"cpu_bonus"`
CpuMinutes float64 `json:"cpu_minutes"`
IdleFactor float64 `json:"idle_factor"`
StagedFactor float64 `json:"staged_factor"`
EfficiencyMultiplier float64 `json:"efficiency_multiplier"`
RawReward float64 `json:"raw_reward"`
FinalReward int `json:"final_reward"`
}
PoloScoreBreakdown contains the detailed breakdown of the polo score calculation.
type ResultSender ¶
type ResultSender func(result *TaskResult) error
ResultSender is a callback for sending task results back to the submitter.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server listens on port 1003 and dispatches incoming task submissions to a handler.
func (*Server) ListenAndServe ¶
ListenAndServe binds port 1003 and starts accepting connections.
type SubmitRequest ¶
type SubmitRequest struct {
TaskID string `json:"task_id"`
TaskDescription string `json:"task_description"`
FromAddr string `json:"from_addr"`
ToAddr string `json:"to_addr"`
}
SubmitRequest represents a task submission request.
func UnmarshalSubmitRequest ¶
func UnmarshalSubmitRequest(f *Frame) (*SubmitRequest, error)
UnmarshalSubmitRequest parses a submit frame into a request.
type SubmitResponse ¶
type SubmitResponse struct {
TaskID string `json:"task_id"`
Status int `json:"status"`
Message string `json:"message"`
}
SubmitResponse represents the response to a task submission.
func UnmarshalSubmitResponse ¶
func UnmarshalSubmitResponse(f *Frame) (*SubmitResponse, error)
UnmarshalSubmitResponse parses a response frame.
type TaskFile ¶
type TaskFile struct {
TaskID string `json:"task_id"`
TaskDescription string `json:"task_description"`
CreatedAt string `json:"created_at"`
Status string `json:"status"`
StatusJustification string `json:"status_justification"`
From string `json:"from"`
To string `json:"to"`
// Time metadata tracking
AcceptedAt string `json:"accepted_at,omitempty"` // When task was accepted/declined
StagedAt string `json:"staged_at,omitempty"` // When task became head of queue
ExecuteStartedAt string `json:"execute_started_at,omitempty"` // When pilotctl execute was called
CompletedAt string `json:"completed_at,omitempty"` // When results were sent
// Computed durations (in milliseconds for precision)
TimeIdleMs int64 `json:"time_idle_ms,omitempty"` // Time from creation to accept/decline
TimeStagedMs int64 `json:"time_staged_ms,omitempty"` // Time at head of queue before execute
TimeCpuMs int64 `json:"time_cpu_ms,omitempty"` // Time spent executing before sending results
// Result payload as received by the submitter.
ResultType string `json:"result_type,omitempty"` // "text" or "file"
Results string `json:"results,omitempty"` // text result body (or filename if file)
}
TaskFile represents a task stored on disk.
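The omitempty tags mean that a freshly created task serializes without the timing and result fields. The sketch below shows this with a cut-down, hypothetical copy of the struct; encode is an illustrative helper, not MarshalTaskFile.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// taskFile is a reduced, hypothetical mirror of TaskFile with the same tags.
type taskFile struct {
	TaskID     string `json:"task_id"`
	Status     string `json:"status"`
	AcceptedAt string `json:"accepted_at,omitempty"`
	TimeIdleMs int64  `json:"time_idle_ms,omitempty"`
}

// encode marshals a task to its JSON text.
func encode(tf taskFile) (string, error) {
	b, err := json.Marshal(tf)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	fresh, err := encode(taskFile{TaskID: "t-1", Status: "NEW"})
	if err != nil {
		panic(err)
	}
	fmt.Println(fresh) // accepted_at and time_idle_ms are omitted

	accepted, err := encode(taskFile{
		TaskID: "t-1", Status: "ACCEPTED",
		AcceptedAt: "2024-01-01T00:00:05Z", TimeIdleMs: 5000,
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(accepted)
}
```

One consequence of omitempty on the int64 fields is that a measured duration of exactly 0 ms is indistinguishable from "not yet computed" in the stored JSON.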
func NewTaskFile ¶
NewTaskFile creates a new TaskFile with NEW status.
func UnmarshalTaskFile ¶
UnmarshalTaskFile deserializes JSON bytes to a TaskFile.
func (*TaskFile) CalculateTimeCpu ¶
func (tf *TaskFile) CalculateTimeCpu()
CalculateTimeCpu calculates and sets time_cpu_ms based on execute start and current time.
func (*TaskFile) CalculateTimeIdle ¶
func (tf *TaskFile) CalculateTimeIdle()
CalculateTimeIdle calculates and sets time_idle_ms based on creation and current time.
func (*TaskFile) CalculateTimeStaged ¶
func (tf *TaskFile) CalculateTimeStaged()
CalculateTimeStaged calculates and sets time_staged_ms based on staged time and current time.
func (*TaskFile) IsAcceptedStalled ¶
func (tf *TaskFile) IsAcceptedStalled() bool
IsAcceptedStalled checks (on the submitter side) whether an ACCEPTED task has been waiting for results past TaskAcceptedStallTimeout. It bounds the task lifecycle when the receiver dies between accept and send-results; without it, the submitter would hold the task in ACCEPTED forever.
func (*TaskFile) IsExpiredForAccept ¶
func (tf *TaskFile) IsExpiredForAccept() bool
IsExpiredForAccept checks if the task has exceeded the accept timeout (1 minute).
func (*TaskFile) IsExpiredInQueue ¶
func (tf *TaskFile) IsExpiredInQueue() bool
IsExpiredInQueue checks if the task has exceeded the queue head timeout (1 hour).
func (*TaskFile) PoloScoreReward ¶
func (tf *TaskFile) PoloScoreReward() int
PoloScoreReward calculates the polo score reward for a successfully completed task.
The formula uses logarithmic scaling for compute time and proportional penalties for responsiveness, creating a balanced reward system:
reward = (base + cpuBonus) * efficiencyMultiplier
Components:
- base = 1.0 (guaranteed minimum for completing any task)
- cpuBonus = log2(1 + cpu_minutes) (logarithmic scaling, no cap)
  - 1 min → +1.0, 3 min → +2.0, 7 min → +3.0, 15 min → +4.0, 31 min → +5.0
- efficiencyMultiplier = 1.0 - idleFactor - stagedFactor
  - idleFactor = min(time_idle / 60s, 0.3) (up to 30% penalty for slow accept)
  - stagedFactor = min(time_staged / 600s, 0.3) (up to 30% penalty for queue delays)
The efficiency multiplier ranges from 0.4 to 1.0, rewarding responsive agents. Final reward is rounded to nearest integer with minimum of 1.
Examples:
- Instant accept, instant execute, 1 min CPU → (1+1.0)*1.0 = 2
- Instant accept, instant execute, 10 min CPU → (1+3.46)*1.0 = 4
- 30s idle, 5 min staged, 10 min CPU → (1+3.46)*0.4 = 2 (both penalties reach the 0.3 cap under the formula above)
- Instant accept, instant execute, 30 min CPU → (1+4.95)*1.0 = 6
func (*TaskFile) PoloScoreRewardDetailed ¶
func (tf *TaskFile) PoloScoreRewardDetailed() PoloScoreBreakdown
PoloScoreRewardDetailed calculates and returns the detailed polo score breakdown.
func (*TaskFile) TimeSinceCreation ¶
func (tf *TaskFile) TimeSinceCreation() (time.Duration, error)
TimeSinceCreation returns the duration since the task was created.
type TaskResult ¶
type TaskResult struct {
TaskDescription string `json:"task_description"`
Status string `json:"status"` // "success" or "error"
Result interface{} `json:"result"` // can be string, object, etc.
Error string `json:"error,omitempty"`
Timestamp string `json:"timestamp"`
}
TaskResult represents the result of a completed task (legacy compatibility).
func UnmarshalTaskResult ¶
func UnmarshalTaskResult(f *Frame) (*TaskResult, error)
UnmarshalTaskResult parses a result frame.
type TaskResultMessage ¶
type TaskResultMessage struct {
TaskID string `json:"task_id"`
ResultType string `json:"result_type"` // "text" or "file"
ResultText string `json:"result_text,omitempty"`
Filename string `json:"filename,omitempty"`
FileData []byte `json:"file_data,omitempty"`
CompletedAt string `json:"completed_at"`
// Time metadata for polo score calculation
TimeIdleMs int64 `json:"time_idle_ms,omitempty"` // Time from creation to accept/decline
TimeStagedMs int64 `json:"time_staged_ms,omitempty"` // Time at head of queue before execute
TimeCpuMs int64 `json:"time_cpu_ms,omitempty"` // Time spent executing before sending results
}
TaskResultMessage represents task results being sent back.
func UnmarshalTaskResultMessage ¶
func UnmarshalTaskResultMessage(f *Frame) (*TaskResultMessage, error)
UnmarshalTaskResultMessage parses a send results frame.
type TaskStatusUpdate ¶
type TaskStatusUpdate struct {
TaskID string `json:"task_id"`
Status string `json:"status"`
Justification string `json:"justification"`
}
TaskStatusUpdate represents a status change message.
func UnmarshalTaskStatusUpdate ¶
func UnmarshalTaskStatusUpdate(f *Frame) (*TaskStatusUpdate, error)
UnmarshalTaskStatusUpdate parses a status update frame.