async

package
v0.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 23, 2026 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

View Source
const EventJobType = "event"
View Source
const MemoryReconcileJobType = "memory.reconcile"
View Source
const ResumeContinueJobType = "resume.continue"
View Source
const RunJobType = "run"

Variables

View Source
var (
	ErrJobNotFound = errors.New("async: job not found")
	ErrStaleLease  = errors.New("async: stale job lease")
	ErrInvalidJob  = errors.New("async: invalid job")
)

Functions

This section is empty.

Types

type EventPayload added in v0.1.2

type EventPayload struct {
	Type        string             `json:"type"`
	RunID       string             `json:"run_id,omitempty"`
	Payload     json.RawMessage    `json:"payload,omitempty"`
	MaxAttempts int                `json:"max_attempts,omitempty"`
	Metadata    map[string]string  `json:"metadata,omitempty"`
	Principal   identity.Principal `json:"principal,omitempty"`
}

func (EventPayload) Event added in v0.1.2

func (payload EventPayload) Event() eventrouter.Event

type Handler

type Handler interface {
	HandleJob(ctx context.Context, job Job) error
}

type HandlerFunc

type HandlerFunc func(ctx context.Context, job Job) error

func (HandlerFunc) HandleJob

func (fn HandlerFunc) HandleJob(ctx context.Context, job Job) error

type Job

type Job struct {
	ID             string          `json:"id"`
	Type           string          `json:"type"`
	RunID          string          `json:"run_id,omitempty"`
	Payload        json.RawMessage `json:"payload,omitempty"`
	State          JobState        `json:"state"`
	Attempts       int             `json:"attempts"`
	MaxAttempts    int             `json:"max_attempts"`
	LastError      string          `json:"last_error,omitempty"`
	CreatedAt      time.Time       `json:"created_at"`
	UpdatedAt      time.Time       `json:"updated_at"`
	AvailableAt    time.Time       `json:"available_at"`
	LeaseWorkerID  string          `json:"lease_worker_id,omitempty"`
	LeaseExpiresAt time.Time       `json:"lease_expires_at,omitempty"`
}

func CloneJob

func CloneJob(job Job) Job

func (Job) Validate

func (job Job) Validate() error

type JobAdmin added in v0.1.4

type JobAdmin interface {
	ListJobs(ctx context.Context, filter JobFilter) ([]Job, error)
	Requeue(ctx context.Context, jobID string) error
}

JobAdmin extends Queue with dead-letter inspection and requeue support.

type JobFilter added in v0.1.4

type JobFilter struct {
	State JobState
	Limit int
}

type JobState

type JobState string
const (
	JobQueued     JobState = "queued"
	JobRunning    JobState = "running"
	JobCompleted  JobState = "completed"
	JobFailed     JobState = "failed"
	JobCancelled  JobState = "cancelled"
	JobDeadLetter JobState = "dead_letter"
)

type Lease

type Lease struct {
	JobID     string
	WorkerID  string
	Attempt   int
	ExpiresAt time.Time
}

func (Lease) Validate

func (lease Lease) Validate() error

type LeaseRenewer

type LeaseRenewer interface {
	Renew(ctx context.Context, lease Lease, ttl time.Duration) (Lease, bool, error)
}

type MemoryReconcilePayload added in v0.1.9

type MemoryReconcilePayload struct {
	MemoryName string             `json:"memory_name"`
	Agent      string             `json:"agent"`
	RunID      string             `json:"run_id,omitempty"`
	Principal  identity.Principal `json:"principal,omitempty"`
}

func (MemoryReconcilePayload) MarshalJSONBytes added in v0.1.9

func (payload MemoryReconcilePayload) MarshalJSONBytes() (json.RawMessage, error)

type Queue

type Queue interface {
	Enqueue(ctx context.Context, job Job) (Job, error)
	Lease(ctx context.Context, workerID string, ttl time.Duration) (Lease, bool, error)
	Load(ctx context.Context, jobID string) (Job, error)
	Complete(ctx context.Context, lease Lease) error
	Fail(ctx context.Context, lease Lease, cause error) error
	Cancel(ctx context.Context, jobID string) error
}

type QueueMetrics added in v0.1.4

type QueueMetrics struct {
	Queued     int
	Running    int
	DeadLetter int
}

QueueMetrics summarizes queue depth by state.

func CollectQueueMetrics added in v0.1.4

func CollectQueueMetrics(ctx context.Context, queue Queue) (QueueMetrics, error)

CollectQueueMetrics counts jobs by state when the queue implements JobAdmin.

type ResumeContinuePayload added in v0.1.2

type ResumeContinuePayload struct {
	Token       string             `json:"token"`
	Decision    core.Decision      `json:"decision"`
	Amendment   json.RawMessage    `json:"amendment,omitempty"`
	RunID       string             `json:"run_id,omitempty"`
	MaxAttempts int                `json:"max_attempts,omitempty"`
	Metadata    map[string]string  `json:"metadata,omitempty"`
	Principal   identity.Principal `json:"principal,omitempty"`
}

type RunPayload

type RunPayload struct {
	RunID       string             `json:"run_id,omitempty"`
	ScenarioID  string             `json:"scenario_id,omitempty"`
	Scenario    json.RawMessage    `json:"scenario,omitempty"`
	Agent       string             `json:"agent,omitempty"`
	Prompt      string             `json:"prompt,omitempty"`
	Context     json.RawMessage    `json:"context,omitempty"`
	MaxAttempts int                `json:"max_attempts,omitempty"`
	Metadata    map[string]string  `json:"metadata,omitempty"`
	Principal   identity.Principal `json:"principal,omitempty"`
}

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker(queue Queue, handler Handler, config WorkerConfig) (*Worker, error)

func (*Worker) Run

func (worker *Worker) Run(ctx context.Context) error

type WorkerConfig

type WorkerConfig struct {
	WorkerID      string
	Concurrency   int
	LeaseTTL      time.Duration
	RenewInterval time.Duration
	JobTimeout    time.Duration
	PollInterval  time.Duration
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL