async

package
v0.1.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 16, 2026 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultMaxReinforcementsPerOperation = 10
	DefaultMinIntervalBetweenMs          = 30000
	DefaultPercentSignalThreshold        = 5
	DefaultMaxPollFailures               = 3
)

Variables

This section is empty.

Functions

func ExtractOperationID

func ExtractOperationID(raw string, path string) (string, error)

Types

type CancelConfig

type CancelConfig struct {
	Tool           string `json:"tool" yaml:"tool"`
	OperationIDArg string `json:"operationIdArg" yaml:"operationIdArg"`
}

type Config

type Config struct {
	Run    RunConfig     `json:"run" yaml:"run"`
	Status StatusConfig  `json:"status" yaml:"status"`
	Cancel *CancelConfig `json:"cancel,omitempty" yaml:"cancel,omitempty"`
	// Instruction is per-operation guidance for non-terminal state, surfaced in
	// the centralized batch reinforcement template.
	Instruction string `json:"instruction,omitempty" yaml:"instruction,omitempty"`
	// TerminalInstruction is per-operation guidance once the operation reaches a
	// terminal state, surfaced in the centralized batch reinforcement template.
	TerminalInstruction           string `json:"terminalInstruction,omitempty" yaml:"terminalInstruction,omitempty"`
	WaitForResponse               bool   `json:"waitForResponse,omitempty" yaml:"waitForResponse,omitempty"`
	MaxReinforcementsPerOperation int    `json:"maxReinforcementsPerOperation,omitempty" yaml:"maxReinforcementsPerOperation,omitempty"`
	MinIntervalBetweenMs          int    `json:"minIntervalBetweenMs,omitempty" yaml:"minIntervalBetweenMs,omitempty"`
	TimeoutMs                     int    `json:"timeoutMs,omitempty" yaml:"timeoutMs,omitempty"`
	PollIntervalMs                int    `json:"pollIntervalMs,omitempty" yaml:"pollIntervalMs,omitempty"`
}

type Extracted

type Extracted struct {
	Status  string
	Message string
	Percent *int
	KeyData json.RawMessage
	Error   string
}

func ExtractPayload

func ExtractPayload(raw string, selector Selector) (*Extracted, error)

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

func NewManager

func NewManager() *Manager

func (*Manager) ActiveOps

func (m *Manager) ActiveOps(_ context.Context, convID, turnID string) []*OperationRecord

func (*Manager) ActiveWaitOps

func (m *Manager) ActiveWaitOps(ctx context.Context, convID, turnID string) []*OperationRecord

func (*Manager) CancelTurnPollers

func (m *Manager) CancelTurnPollers(_ context.Context, convID, turnID string)

CancelTurnPollers cancels every autonomous poller that belongs to the given conversation/turn. It is safe to call when no pollers are running (no-op). The service layer calls this when a turn is explicitly canceled so pollers do not outlive the turn they belong to.

func (*Manager) ConsumeChanged

func (m *Manager) ConsumeChanged(convID, turnID string) []*OperationRecord

func (*Manager) FindActiveByRequest

func (m *Manager) FindActiveByRequest(_ context.Context, convID, turnID, toolName, requestArgsDigest string) (*OperationRecord, bool)

func (*Manager) FinishPoller

func (m *Manager) FinishPoller(_ context.Context, id string)

func (*Manager) Get

func (m *Manager) Get(_ context.Context, id string) (*OperationRecord, bool)

func (*Manager) HasActiveWaitOps

func (m *Manager) HasActiveWaitOps(ctx context.Context, convID, turnID string) bool

func (*Manager) OperationsForTurn

func (m *Manager) OperationsForTurn(_ context.Context, convID, turnID string) []*OperationRecord

func (*Manager) RecordPollFailure

func (m *Manager) RecordPollFailure(_ context.Context, id, errMsg string, transient bool) (*OperationRecord, bool)

func (*Manager) Register

func (m *Manager) Register(_ context.Context, input RegisterInput) *OperationRecord

func (*Manager) ResetPollFailures

func (m *Manager) ResetPollFailures(_ context.Context, id string) (*OperationRecord, bool)

func (*Manager) StorePollerCancel

func (m *Manager) StorePollerCancel(_ context.Context, id string, cancel context.CancelFunc)

StorePollerCancel registers a cancel function for the given operation so that CancelTurnPollers can reach it. Called immediately after TryStartPoller succeeds.

func (*Manager) TerminalFailure

func (m *Manager) TerminalFailure(_ context.Context, convID, turnID string) (*OperationRecord, bool)

func (*Manager) TryRecordReinforcement

func (m *Manager) TryRecordReinforcement(_ context.Context, id string) (*OperationRecord, bool)

func (*Manager) TryStartPoller

func (m *Manager) TryStartPoller(_ context.Context, id string) bool

func (*Manager) Update

func (m *Manager) Update(_ context.Context, input UpdateInput) (*OperationRecord, bool)

func (*Manager) WaitForChange

func (m *Manager) WaitForChange(ctx context.Context, convID, turnID string) error

func (*Manager) WaitForNextPoll

func (m *Manager) WaitForNextPoll(ctx context.Context, convID, turnID string) error

type OperationRecord

type OperationRecord struct {
	ID                  string
	ParentConvID        string
	ParentTurnID        string
	ToolCallID          string
	ToolMessageID       string
	ToolName            string
	StatusToolName      string
	StatusArgs          map[string]interface{}
	CancelToolName      string
	RequestArgsDigest   string
	RequestArgs         map[string]interface{}
	WaitForResponse     bool
	State               State
	Status              string
	Message             string
	Percent             *int
	LastSignaledPercent *int
	KeyData             json.RawMessage
	Error               string
	CreatedAt           time.Time
	UpdatedAt           time.Time
	TimeoutAt           *time.Time
	TimeoutMs           int
	PollIntervalMs      int
	PollFailures        int
	// Instruction is per-operation guidance for non-terminal state, surfaced in
	// the centralized batch reinforcement template.
	Instruction string
	// TerminalInstruction is per-operation guidance once the operation reaches a
	// terminal state, surfaced in the centralized batch reinforcement template.
	TerminalInstruction           string
	MaxReinforcementsPerOperation int
	MinIntervalBetweenMs          int
	ReinforcementCount            int
	LastReinforcementAt           *time.Time
	// contains filtered or unexported fields
}

func (*OperationRecord) Terminal

func (r *OperationRecord) Terminal() bool

type RegisterInput

type RegisterInput struct {
	ID                            string
	ParentConvID                  string
	ParentTurnID                  string
	ToolCallID                    string
	ToolMessageID                 string
	ToolName                      string
	StatusToolName                string
	StatusArgs                    map[string]interface{}
	CancelToolName                string
	RequestArgsDigest             string
	RequestArgs                   map[string]interface{}
	WaitForResponse               bool
	Status                        string
	Message                       string
	Percent                       *int
	KeyData                       json.RawMessage
	Error                         string
	TimeoutMs                     int
	PollIntervalMs                int
	Instruction                   string
	TerminalInstruction           string
	MaxReinforcementsPerOperation int
	MinIntervalBetweenMs          int
}

type RunConfig

type RunConfig struct {
	Tool            string                 `json:"tool" yaml:"tool"`
	OperationIDPath string                 `json:"operationIdPath" yaml:"operationIdPath"`
	ExtraArgs       map[string]interface{} `json:"extraArgs,omitempty" yaml:"extraArgs,omitempty"`
	Selector        *Selector              `json:"selector,omitempty" yaml:"selector,omitempty"`
}

type Selector

type Selector struct {
	StatusPath       string   `json:"statusPath" yaml:"statusPath"`
	MessagePath      string   `json:"messagePath,omitempty" yaml:"messagePath,omitempty"`
	DataPath         string   `json:"dataPath,omitempty" yaml:"dataPath,omitempty"`
	ProgressPath     string   `json:"progressPath,omitempty" yaml:"progressPath,omitempty"`
	ErrorPath        string   `json:"errorPath,omitempty" yaml:"errorPath,omitempty"`
	TerminalStatuses []string `json:"terminalStatuses,omitempty" yaml:"terminalStatuses,omitempty"`
}

type State

type State string
const (
	StateStarted   State = "started"
	StateRunning   State = "running"
	StateWaiting   State = "waiting"
	StateCompleted State = "completed"
	StateFailed    State = "failed"
	StateCanceled  State = "canceled"
)

func DeriveState

func DeriveState(status, errMsg string, requested State) State

type StatusConfig

type StatusConfig struct {
	Tool           string                 `json:"tool" yaml:"tool"`
	OperationIDArg string                 `json:"operationIdArg" yaml:"operationIdArg"`
	ReuseRunArgs   bool                   `json:"reuseRunArgs,omitempty" yaml:"reuseRunArgs,omitempty"`
	ExtraArgs      map[string]interface{} `json:"extraArgs,omitempty" yaml:"extraArgs,omitempty"`
	Selector       Selector               `json:"selector" yaml:"selector"`
}

type UpdateInput

type UpdateInput struct {
	ID      string
	Status  string
	Message string
	Percent *int
	KeyData json.RawMessage
	Error   string
	State   State
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL