Documentation
Overview ¶
Package eventqueue provides an implementation of in-memory queue management and event processing.
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ErrQueueClosed = errors.New("queue is closed")
ErrQueueClosed indicates that the event queue has been closed.
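A sentinel error like this is usually matched with errors.Is, which also sees through wrapping. The sketch below is only an illustration: errQueueClosed is a local stand-in for eventqueue.ErrQueueClosed, and drain and fakeReader are hypothetical helpers. It also assumes that a read call is where the closed state surfaces, which this page does not state explicitly.

```go
package main

import (
	"errors"
	"fmt"
)

// errQueueClosed is a local stand-in for eventqueue.ErrQueueClosed so the
// sketch compiles without the SDK.
var errQueueClosed = errors.New("queue is closed")

// drain calls read until it fails. A closed queue is treated as a normal end
// of stream; any other error is returned to the caller.
func drain(read func() (string, error)) ([]string, error) {
	var events []string
	for {
		ev, err := read()
		if errors.Is(err, errQueueClosed) {
			return events, nil
		}
		if err != nil {
			return events, err
		}
		events = append(events, ev)
	}
}

// fakeReader is a hypothetical source that yields the given items and then
// reports a wrapped "closed" error.
func fakeReader(items ...string) func() (string, error) {
	i := 0
	return func() (string, error) {
		if i < len(items) {
			i++
			return items[i-1], nil
		}
		return "", fmt.Errorf("read failed: %w", errQueueClosed)
	}
}

func main() {
	events, err := drain(fakeReader("submitted", "working", "completed"))
	fmt.Println(events, err)
}
```

Comparing with == would miss the wrapped error returned by fakeReader, which is why errors.Is is used here.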
Functions ¶
This section is empty.
Types ¶
type Manager ¶
type Manager interface {
	// CreateReader creates a new event reader for the specified task.
	CreateReader(ctx context.Context, taskID a2a.TaskID) (Reader, error)
	// CreateWriter creates a new event writer for the specified task.
	CreateWriter(ctx context.Context, taskID a2a.TaskID) (Writer, error)
	// Destroy closes the event queue for the specified task and frees all associated resources.
	Destroy(ctx context.Context, taskID a2a.TaskID) error
}
Manager manages event queues for tasks.
func NewInMemoryManager ¶
func NewInMemoryManager(options ...MemManagerOption) Manager
NewInMemoryManager creates a new in-memory eventqueue manager. A message dispatcher goroutine is started when the first queue for a task ID is created. All queues returned for that task ID before Destroy() is called are attached to the same dispatcher goroutine. Each calling goroutine must use its own Queue. Destroy() stops the dispatcher goroutine and closes all the queues. If the queues are buffered, consumers are allowed to drain them.
Queue.Write() returns once the message has been put to all the open queues associated with the task. Queue.Read() blocks until a message is received through another queue or until the queue is closed. Queue.Read() will not receive a message sent using a Write() call on the same queue. Queue.Close() unregisters a queue from further broadcasts. Queue.Close() may partially drain the queue, so the behavior of Read() after Close() is undefined.
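The broadcast rules above can be pictured with a toy model. This is a sketch under simplifying assumptions, not the package's implementation: hub, queue, attach, write, read and close are hypothetical names, messages are plain strings, and delivery happens under a mutex rather than in a dispatcher goroutine.

```go
package main

import (
	"fmt"
	"sync"
)

// hub is a toy stand-in for the per-task dispatcher described above.
type hub struct {
	mu     sync.Mutex
	queues map[*queue]struct{}
}

// queue is one attachment to the hub; each goroutine would own its own.
type queue struct {
	h  *hub
	ch chan string
}

func newHub() *hub { return &hub{queues: make(map[*queue]struct{})} }

// attach registers a new queue with the given buffer size.
func (h *hub) attach(buffer int) *queue {
	q := &queue{h: h, ch: make(chan string, buffer)}
	h.mu.Lock()
	h.queues[q] = struct{}{}
	h.mu.Unlock()
	return q
}

// write delivers msg to every open queue except the sender's own and returns
// once all of them have accepted it. Holding the lock while sending keeps the
// sketch short; real code would have to cope with slow or closing consumers.
func (q *queue) write(msg string) {
	q.h.mu.Lock()
	defer q.h.mu.Unlock()
	for other := range q.h.queues {
		if other != q {
			other.ch <- msg
		}
	}
}

// read blocks until a message arrives; ok is false once the queue has been
// closed and its buffer is empty.
func (q *queue) read() (msg string, ok bool) {
	msg, ok = <-q.ch
	return msg, ok
}

// close unregisters the queue from further broadcasts.
func (q *queue) close() {
	q.h.mu.Lock()
	defer q.h.mu.Unlock()
	if _, open := q.h.queues[q]; open {
		delete(q.h.queues, q)
		close(q.ch)
	}
}

func main() {
	h := newHub()
	a, b := h.attach(4), h.attach(4)
	a.write("task started")
	msg, _ := b.read()
	fmt.Println("b received:", msg)
	fmt.Println("messages pending on a:", len(a.ch))
}
```

In this model a writer never sees its own message, and a queue that has been closed is skipped by later writes, matching the behavior described for Write(), Read() and Close().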
type MemManagerOption ¶ added in v0.3.3
type MemManagerOption func(*inMemoryManager)
MemManagerOption is a functional option for configuring an in-memory event manager.
func WithQueueBufferSize ¶ added in v0.3.3
func WithQueueBufferSize(size int) MemManagerOption
WithQueueBufferSize configures the size of the in-memory event queue buffer.
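MemManagerOption follows the functional options pattern. The sketch below reproduces the pattern with local, hypothetical types (memManager, option, withQueueBufferSize, newManager) so that it compiles on its own; the fields of the real inMemoryManager and its default buffer size are not shown on this page, so the default of 0 is an assumption made purely for illustration.

```go
package main

import "fmt"

// memManager is a hypothetical stand-in for the package's unexported
// inMemoryManager; its real fields are not documented here.
type memManager struct {
	bufferSize int
}

// option mirrors the shape of MemManagerOption: a function that mutates the
// manager while it is being constructed.
type option func(*memManager)

// withQueueBufferSize mirrors WithQueueBufferSize.
func withQueueBufferSize(size int) option {
	return func(m *memManager) { m.bufferSize = size }
}

// newManager applies the options in order, so a later option overrides an
// earlier one that sets the same field.
func newManager(opts ...option) *memManager {
	m := &memManager{bufferSize: 0} // assumed default, for illustration only
	for _, opt := range opts {
		opt(m)
	}
	return m
}

func main() {
	fmt.Println(newManager().bufferSize)
	fmt.Println(newManager(withQueueBufferSize(32)).bufferSize)
}
```

Going by the signatures listed on this page, the equivalent call against the real package would be eventqueue.NewInMemoryManager(eventqueue.WithQueueBufferSize(32)).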
type Message ¶
type Message struct {
	// Event is the event which was applied to the task store.
	Event a2a.Event
	// TaskVersion is the version of the task after the event was applied.
	TaskVersion taskstore.TaskVersion
	// Protocol is the version of the protocol that the emitting process is running.
	Protocol a2a.ProtocolVersion
}
Message represents the data broadcast to event subscribers through an event queue.
type Reader ¶
type Reader interface {
	// Read dequeues an event or blocks if the queue is empty.
	// TaskVersion is expected to be the same as was provided to [Writer.WriteVersioned].
	Read(ctx context.Context) (*Message, error)
	// Close shuts down a connection to the queue.
	Close() error
}
Reader defines the interface for reading events from a queue. The A2A server stack reads events written by [a2asrv.AgentExecutor].
type Writer ¶
type Writer interface {
	// Write enqueues an event or blocks if a bounded queue is full.
	//
	// Kept to maintain AgentExecutor API until a breaking SDK release.
	// Code other than AgentExecutor must use WriteVersioned.
	Write(ctx context.Context, msg *Message) error
	// Close shuts down a connection to the queue.
	Close() error
}
Writer defines the interface for writing events to a queue. [a2asrv.AgentExecutor] translates agent responses to Messages, Tasks or Task update events.
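The blocking behavior documented for Write and Read can be sketched with a channel-backed queue that honors context cancellation. Everything here is hypothetical and simplified: message stands in for Message, errQueueClosed for ErrQueueClosed, and boundedQueue combines the reader and writer roles in one type, which the real package keeps separate. Returning the context error from a blocked call is an assumption, not documented behavior.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// message is a simplified stand-in for eventqueue.Message.
type message struct{ event string }

// errQueueClosed is a local stand-in for eventqueue.ErrQueueClosed.
var errQueueClosed = errors.New("queue is closed")

// boundedQueue is a hypothetical bounded queue backed by a buffered channel.
type boundedQueue struct {
	ch        chan *message
	done      chan struct{}
	closeOnce sync.Once
}

func newBoundedQueue(size int) *boundedQueue {
	return &boundedQueue{ch: make(chan *message, size), done: make(chan struct{})}
}

// Write enqueues msg, blocking while the buffer is full until space frees up,
// ctx is done, or the queue is closed.
func (q *boundedQueue) Write(ctx context.Context, msg *message) error {
	select {
	case <-q.done:
		return errQueueClosed
	default:
	}
	select {
	case q.ch <- msg:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	case <-q.done:
		return errQueueClosed
	}
}

// Read dequeues a message, blocking while the queue is empty. Buffered
// messages are drained before a closed queue reports errQueueClosed.
func (q *boundedQueue) Read(ctx context.Context) (*message, error) {
	select {
	case msg := <-q.ch:
		return msg, nil
	default:
	}
	select {
	case msg := <-q.ch:
		return msg, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-q.done:
		return nil, errQueueClosed
	}
}

// Close marks the queue as closed; it is safe to call more than once.
func (q *boundedQueue) Close() error {
	q.closeOnce.Do(func() { close(q.done) })
	return nil
}

// demo fills a one-slot queue, lets a second write time out, then closes the
// queue and drains it.
func demo() (timedOut bool, drained string, closed bool) {
	q := newBoundedQueue(1)
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel()

	_ = q.Write(ctx, &message{event: "working"})      // fits in the buffer
	err := q.Write(ctx, &message{event: "completed"}) // buffer full: blocks until ctx expires
	timedOut = errors.Is(err, context.DeadlineExceeded)

	q.Close()
	bg := context.Background()
	if msg, err := q.Read(bg); err == nil {
		drained = msg.event
	}
	_, err = q.Read(bg)
	closed = errors.Is(err, errQueueClosed)
	return timedOut, drained, closed
}

func main() {
	fmt.Println(demo())
}
```

Passing a context with a deadline is what keeps a writer from blocking indefinitely when no reader is consuming from a full queue.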