Documentation
Overview ¶
Package eventqueue provides an in-memory implementation of event queue management and event processing.
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ErrQueueClosed = errors.New("queue is closed")
ErrQueueClosed indicates that the event queue has been closed.
Functions ¶
This section is empty.
Types ¶
type Manager ¶
type Manager interface {
// GetOrCreate returns an existing queue if one exists, or creates a new one.
GetOrCreate(ctx context.Context, taskID a2a.TaskID) (Queue, error)
// Get returns an existing queue if one exists.
Get(ctx context.Context, taskId a2a.TaskID) (Queue, bool)
// Destroy closes the queue for the specified task and frees all associated resources.
Destroy(ctx context.Context, taskID a2a.TaskID) error
}
Manager manages event queues on a per-task basis. It provides lifecycle management for task-specific event queues, enabling multiple clients to attach to the same task's event stream.
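The lifecycle described above can be sketched with a small, self-contained registry. This is not the library's implementation: `taskID`, `queue`, `registry`, and `errNoQueue` are hypothetical stand-ins, context parameters are omitted, and returning an error when destroying an unknown task is an assumption made for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// taskID and queue are hypothetical stand-ins for a2a.TaskID and Queue.
type taskID string

type queue struct{ closed bool }

// errNoQueue is an assumed error for destroying an unknown task.
var errNoQueue = errors.New("no queue for task")

// registry is an illustrative per-task queue registry.
type registry struct {
	mu     sync.Mutex
	queues map[taskID]*queue
}

func newRegistry() *registry {
	return &registry{queues: make(map[taskID]*queue)}
}

// getOrCreate returns the queue registered for id, creating it on first use.
func (r *registry) getOrCreate(id taskID) *queue {
	r.mu.Lock()
	defer r.mu.Unlock()
	q, ok := r.queues[id]
	if !ok {
		q = &queue{}
		r.queues[id] = q
	}
	return q
}

// get looks up the queue for id without creating one.
func (r *registry) get(id taskID) (*queue, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	q, ok := r.queues[id]
	return q, ok
}

// destroy marks the queue for id as closed and removes it from the registry.
func (r *registry) destroy(id taskID) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	q, ok := r.queues[id]
	if !ok {
		return errNoQueue
	}
	q.closed = true
	delete(r.queues, id)
	return nil
}

func main() {
	r := newRegistry()
	first := r.getOrCreate("task-1")
	second := r.getOrCreate("task-1")
	fmt.Println("same queue:", first == second)

	if err := r.destroy("task-1"); err != nil {
		fmt.Println("destroy failed:", err)
	}
	_, ok := r.get("task-1")
	fmt.Println("exists after destroy:", ok)
}
```

In this sketch both calls to `getOrCreate` return the same value. The in-memory manager documented below instead hands out queues that share one dispatcher goroutine per task.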
func NewInMemoryManager ¶
func NewInMemoryManager(options ...MemManagerOption) Manager
NewInMemoryManager creates a new in-memory eventqueue manager. A message dispatcher goroutine is started when the first queue for a task ID is created. All queues returned for that task ID before Destroy() is called are attached to the same dispatcher goroutine. Each goroutine that reads or writes events must use its own Queue. Destroy() stops the dispatcher goroutine and closes all the queues. If the queues are buffered, consumers are allowed to drain them.
Queue.Write() returns once the message has been put to all open queues associated with the task. Queue.Read() blocks until a message is received through another queue or until the queue is closed. Queue.Read() will not receive a message sent by a Write() call on the same queue. Queue.Close() unregisters a queue from further broadcasts. Queue.Close() may partially drain the queue, so Read() behavior after Close() is undefined.
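The fan-out rule (a write reaches every other open queue for the task, never the writer's own queue) can be illustrated with a minimal sketch. The `hub` and `sub` types are hypothetical, events are plain strings, and delivery happens synchronously under a mutex rather than through a dispatcher goroutine, so this models only the visible behavior, not the library's internals.

```go
package main

import (
	"fmt"
	"sync"
)

// hub tracks the open subscribers for one task (hypothetical type).
type hub struct {
	mu   sync.Mutex
	subs map[*sub]struct{}
}

// sub plays the role of one Queue attached to the task (hypothetical type).
type sub struct {
	h  *hub
	ch chan string
}

func newHub() *hub { return &hub{subs: make(map[*sub]struct{})} }

// attach registers a new subscriber with the given buffer size.
func (h *hub) attach(buf int) *sub {
	s := &sub{h: h, ch: make(chan string, buf)}
	h.mu.Lock()
	h.subs[s] = struct{}{}
	h.mu.Unlock()
	return s
}

// write delivers msg to every other open subscriber and returns once all
// of them have accepted it. It blocks while any receiving buffer is full.
func (s *sub) write(msg string) {
	s.h.mu.Lock()
	defer s.h.mu.Unlock()
	for other := range s.h.subs {
		if other != s {
			other.ch <- msg
		}
	}
}

// close unregisters s so that later writes skip it.
func (s *sub) close() {
	s.h.mu.Lock()
	delete(s.h.subs, s)
	s.h.mu.Unlock()
}

func main() {
	h := newHub()
	a, b, c := h.attach(1), h.attach(1), h.attach(1)

	a.write("hello")
	fmt.Println(<-b.ch, <-c.ch, len(a.ch)) // prints "hello hello 0"

	c.close()
	a.write("again")
	fmt.Println(<-b.ch, len(c.ch)) // prints "again 0"
}
```

Because `write` blocks on a full buffer, a slow consumer in this sketch stalls the writer, which matches the documented behavior that Write returns only after every open queue has received the message.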
type MemManagerOption ¶ added in v0.3.3
type MemManagerOption func(*inMemoryManager)
func WithQueueBufferSize ¶ added in v0.3.3
func WithQueueBufferSize(size int) MemManagerOption
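The signature above follows the functional options pattern. The sketch below shows how such an option is typically defined and applied. The `config` type, the lowercase names, and the default of zero (unbuffered) are assumptions for illustration, since the page does not document the library's actual default or internals.

```go
package main

import "fmt"

// config is a hypothetical stand-in for the unexported inMemoryManager.
type config struct{ bufferSize int }

// option mirrors the shape of MemManagerOption: a function that mutates
// the value being constructed.
type option func(*config)

// withBufferSize is an illustrative counterpart of WithQueueBufferSize.
func withBufferSize(size int) option {
	return func(c *config) { c.bufferSize = size }
}

// newConfig starts from an assumed default and applies options in order,
// so a later option overrides an earlier one.
func newConfig(opts ...option) *config {
	c := &config{bufferSize: 0} // assumed default: unbuffered
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	fmt.Println(newConfig().bufferSize)                   // prints 0
	fmt.Println(newConfig(withBufferSize(32)).bufferSize) // prints 32
}
```

With the real package the equivalent call would presumably look like `NewInMemoryManager(WithQueueBufferSize(32))`, based on the two signatures shown on this page.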
type Queue ¶
Queue defines the interface for publishing and consuming events generated during agent execution.
type Reader ¶
type Reader interface {
// Read dequeues an event or blocks if the queue is empty.
// TaskVersion is expected to be the same as was provided to [Writer.WriteVersioned].
Read(ctx context.Context) (a2a.Event, a2a.TaskVersion, error)
}
Reader defines the interface for reading events from a queue. The A2A server stack reads events written by [a2asrv.AgentExecutor].
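A consumer typically calls Read in a loop until the queue reports that it is closed. The sketch below uses a simplified, hypothetical `reader` interface (string events, int versions) and a fake `sliceReader`. It assumes that Read surfaces a sentinel error equivalent to ErrQueueClosed once the queue is closed, which this page implies but does not state explicitly.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errQueueClosed stands in for eventqueue.ErrQueueClosed.
var errQueueClosed = errors.New("queue is closed")

// reader is a simplified, hypothetical form of Reader.
type reader interface {
	Read(ctx context.Context) (string, int, error)
}

// sliceReader serves pre-recorded events, then reports the queue as closed.
type sliceReader struct {
	events []string
	next   int
}

func (r *sliceReader) Read(ctx context.Context) (string, int, error) {
	if err := ctx.Err(); err != nil {
		return "", 0, err
	}
	if r.next >= len(r.events) {
		return "", 0, errQueueClosed
	}
	ev := r.events[r.next]
	r.next++
	return ev, r.next, nil // the version here is just the 1-based position
}

// drain reads until the queue closes and returns the events it saw.
// A closed queue is treated as normal termination, not as a failure.
func drain(ctx context.Context, r reader) ([]string, error) {
	var out []string
	for {
		ev, _, err := r.Read(ctx)
		if errors.Is(err, errQueueClosed) {
			return out, nil
		}
		if err != nil {
			return out, err
		}
		out = append(out, ev)
	}
}

// runDrain wires the fake reader to drain for demonstration purposes.
func runDrain() ([]string, error) {
	r := &sliceReader{events: []string{"working", "completed"}}
	return drain(context.Background(), r)
}

func main() {
	events, err := runDrain()
	fmt.Println(events, err) // prints "[working completed] <nil>"
}
```

Comparing with `errors.Is` rather than `==` keeps the loop correct even if an implementation wraps the sentinel error.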
type Writer ¶
type Writer interface {
// Write enqueues an event or blocks if a bounded queue is full.
//
// Kept to maintain AgentExecutor API until a breaking SDK release.
// Code other than AgentExecutor must use WriteVersioned.
Write(ctx context.Context, event a2a.Event) error
// WriteVersioned enqueues an event with information about which version the task was moved
// to after the event was applied. Blocks if a bounded queue is full.
WriteVersioned(ctx context.Context, event a2a.Event, version a2a.TaskVersion) error
}
Writer defines the interface for writing events to a queue. [a2asrv.AgentExecutor] translates agent responses to Messages, Tasks or Task update events.
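The pairing between WriteVersioned and Read, and the blocking behavior of a bounded queue, can be sketched with a buffered channel. Everything here is illustrative: `boundedQueue`, `entry`, and `versionMissing` are hypothetical names, events are strings, versions are ints, and treating an unversioned Write as "version missing" is an assumption rather than documented behavior.

```go
package main

import (
	"context"
	"fmt"
)

// entry pairs an event with the task version it produced.
type entry struct {
	event   string
	version int
}

// versionMissing is a hypothetical marker for events written without a version.
const versionMissing = 0

// boundedQueue is an illustrative bounded queue backed by a buffered channel.
type boundedQueue struct{ ch chan entry }

func newBoundedQueue(size int) *boundedQueue {
	return &boundedQueue{ch: make(chan entry, size)}
}

// writeVersioned blocks while the buffer is full, or until ctx is done.
func (q *boundedQueue) writeVersioned(ctx context.Context, event string, version int) error {
	select {
	case q.ch <- entry{event, version}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// write is the legacy path: it enqueues the event without version information.
func (q *boundedQueue) write(ctx context.Context, event string) error {
	return q.writeVersioned(ctx, event, versionMissing)
}

// read blocks while the queue is empty, or until ctx is done. It returns the
// same version that was passed to writeVersioned for that event.
func (q *boundedQueue) read(ctx context.Context) (string, int, error) {
	select {
	case e := <-q.ch:
		return e.event, e.version, nil
	case <-ctx.Done():
		return "", 0, ctx.Err()
	}
}

// demo fills a queue of size 1, shows that a second write cannot proceed,
// and reads the first entry back with its version.
func demo() (string, int, error) {
	ctx := context.Background()
	q := newBoundedQueue(1)
	if err := q.writeVersioned(ctx, "status-update", 3); err != nil {
		return "", 0, err
	}
	// The buffer is full, so this write can only finish via cancellation.
	cancelled, cancel := context.WithCancel(ctx)
	cancel()
	fullErr := q.write(cancelled, "dropped")

	event, version, _ := q.read(ctx)
	return event, version, fullErr
}

func main() {
	event, version, fullErr := demo()
	fmt.Println(event, version, fullErr) // prints "status-update 3 context canceled"
}
```

Passing a context to every call is what lets a caller bound how long it is willing to wait on a full or empty queue.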