agui

package
v0.1.7
Published: Mar 10, 2026 License: Apache-2.0 Imports: 38 Imported by: 0

Documentation

Overview

Package agui provides a Messenger adapter for the AG-UI SSE server.

Unlike other adapters that connect to external platforms, this adapter runs in-process and IS the AG-UI HTTP/SSE server. Connect() returns the http.Handler that serves browser SSE connections; the caller mounts it on its own HTTP server. Disconnect() cleans up active threads and releases the adapter's resources.

Thread Registration Model

The adapter uses a thread-registration model rather than full message routing. When handleRun receives a user message, it calls InjectMessage to register the thread's event channel in the adapter. This allows other subsystems (HITL, send_message tool) to route responses back to the SSE client via the adapter's Send method. The chat handler still runs directly in handleRun — messages are NOT routed through the messenger receive loop.

The adapter exposes three methods beyond the Messenger interface:

  • InjectMessage: registers a thread and its event channel
  • CompleteThread: cleans up thread state when a run ends
  • ThreadDone: returns a channel that's closed when the thread completes
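The registration model above can be sketched with a mutex-guarded map. This is a minimal illustration, not the adapter's actual code: the registry and thread types and the register, complete, done and send methods are hypothetical stand-ins for InjectMessage, CompleteThread, ThreadDone and Send, and returning an already-closed channel for an unknown thread is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// thread is a hypothetical record for one registered AG-UI thread.
type thread struct {
	events chan<- interface{} // drained by the SSE handler
	done   chan struct{}      // closed when the thread completes
}

// registry is an illustrative stand-in for the adapter's thread map.
type registry struct {
	mu      sync.Mutex
	threads map[string]*thread
}

func newRegistry() *registry {
	return &registry{threads: make(map[string]*thread)}
}

// register stores the event channel, closing any older entry for the same ID.
func (r *registry) register(id string, events chan<- interface{}) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if old, ok := r.threads[id]; ok {
		close(old.done)
	}
	r.threads[id] = &thread{events: events, done: make(chan struct{})}
}

// complete removes the thread; calling it again for the same ID is a no-op.
func (r *registry) complete(id string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if t, ok := r.threads[id]; ok {
		close(t.done)
		delete(r.threads, id)
	}
}

// done returns the thread's done channel. Assumption: an unknown thread
// yields an already-closed channel.
func (r *registry) done(id string) <-chan struct{} {
	r.mu.Lock()
	defer r.mu.Unlock()
	if t, ok := r.threads[id]; ok {
		return t.done
	}
	closed := make(chan struct{})
	close(closed)
	return closed
}

// send delivers msg to the thread, or fails once the thread has completed.
func (r *registry) send(id string, msg interface{}) error {
	r.mu.Lock()
	t, ok := r.threads[id]
	r.mu.Unlock()
	if !ok {
		return errors.New("thread not registered")
	}
	select {
	case t.events <- msg:
		return nil
	case <-t.done:
		return errors.New("thread completed")
	}
}

func main() {
	r := newRegistry()
	events := make(chan interface{}, 1)
	r.register("t1", events)
	err := r.send("t1", "hello")
	fmt.Println(err, <-events)
	r.complete("t1")
	r.complete("t1") // repeated completion is harmless
	fmt.Println(r.send("t1", "late"))
}
```

Holding the lock only for the map lookup keeps a slow SSE reader from blocking registration of other threads.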

The file dataurl.go extracts embedded data-URL files from AG-UI chat messages sent by the browser client, saves them to a temp directory, and returns the clean message text plus messenger.Attachment structs with LocalPath set, so the multimodal pipeline can process them the same way it processes WhatsApp media.

Constants

const DefaultAppName = "genie"

DefaultAppName is used when Config.AppName is empty.

Functions

func AttachmentsFromContext added in v0.1.7

func AttachmentsFromContext(ctx context.Context) []messenger.Attachment

AttachmentsFromContext returns any attachments stored in the context. Returns nil if none are present.

func ExtractDataURLFiles added in v0.1.7

func ExtractDataURLFiles(message string) (cleanMessage string, tempDir string, attachments []messenger.Attachment)

ExtractDataURLFiles scans the message text for embedded data-URL file blocks, decodes them, saves them to a new secure temporary directory, and returns:

  • cleanMessage: the message text with file blocks removed
  • tempDir: the generated secure temporary directory containing files (or empty string if none were extracted)
  • attachments: slice of Attachment structs with LocalPath set

If there are no data URLs, the original message, an empty string, and nil are returned.
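The decode-and-save step can be approximated with the standard library alone. The sketch below is not the package's implementation: decodeDataURL and saveToTempDir are hypothetical helpers, only the base64 form of a data URL is handled, and how the real function locates file blocks inside the message text is not shown in these docs.

```go
package main

import (
	"encoding/base64"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

// decodeDataURL splits "data:<mime>;base64,<payload>" into MIME type and bytes.
// Only the base64 form is handled in this sketch.
func decodeDataURL(u string) (mime string, data []byte, err error) {
	rest, ok := strings.CutPrefix(u, "data:")
	if !ok {
		return "", nil, errors.New("not a data URL")
	}
	meta, payload, ok := strings.Cut(rest, ",")
	if !ok {
		return "", nil, errors.New("missing comma separator")
	}
	mime, ok = strings.CutSuffix(meta, ";base64")
	if !ok {
		return "", nil, errors.New("only base64 data URLs are supported here")
	}
	data, err = base64.StdEncoding.DecodeString(payload)
	return mime, data, err
}

// saveToTempDir writes data into a fresh directory from os.MkdirTemp
// (created with mode 0700 on Unix) and returns the directory and file path.
func saveToTempDir(name string, data []byte) (dir, path string, err error) {
	dir, err = os.MkdirTemp("", "agui-upload-*")
	if err != nil {
		return "", "", err
	}
	// filepath.Base drops directory components from a client-supplied name.
	path = filepath.Join(dir, filepath.Base(name))
	if err = os.WriteFile(path, data, 0o600); err != nil {
		os.RemoveAll(dir)
		return "", "", err
	}
	return dir, path, nil
}

func main() {
	mime, data, err := decodeDataURL("data:text/plain;base64,aGVsbG8=")
	if err != nil {
		panic(err)
	}
	dir, path, err := saveToTempDir("note.txt", data)
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir) // the caller owns cleanup of the temp directory
	fmt.Println(mime, path)
}
```

Returning the directory separately from the file paths, as ExtractDataURLFiles does with tempDir, lets the caller remove everything with a single os.RemoveAll once the run is over.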

func GetAguiPasswordFromKeyring

func GetAguiPasswordFromKeyring(ctx context.Context) ([]byte, error)

GetAguiPasswordFromKeyring returns the AG-UI password from the keyring. It returns (nil, nil) if no password is set and (nil, err) if the lookup fails. Exported so it can be unit-tested with KeyringSet/KeyringDelete without going through the middleware.

func MapEvent

func MapEvent(event interface{}, threadID, runID string) ([]byte, string, error)

MapEvent converts an internal TUI event to an AG-UI wire-format event. The threadID and runID are passed through from the original RunAgentInput. Returns the serialized JSON bytes and the AG-UI event type string.

func NewRunner

func NewRunner(chatFunc ChatFunc) runner.Runner

NewRunner creates a runner.Runner that wraps the Genie chat pipeline.

This adapter bridges the framework's structured runner interface to Genie's existing codeOwner.Chat() pipeline. The runner:

  • Accepts a model.Message with user text
  • Calls the chatFunc which emits *event.Event on an internal channel
  • Forwards events to the returned channel

The result is a framework-compatible runner that unlocks features like run registration, cancellation, and session-scoped execution while reusing the existing chat pipeline.

func WithAttachments added in v0.1.7

func WithAttachments(ctx context.Context, atts []messenger.Attachment) context.Context

WithAttachments returns a new context that carries the given attachments.

Types

type BackgroundWorker

type BackgroundWorker struct {
	// contains filtered or unexported fields
}

BackgroundWorker handles background agent execution.

func NewBackgroundWorker

func NewBackgroundWorker(handler Expert, maxConcurrent int) *BackgroundWorker

NewBackgroundWorker creates a worker with a concurrency limit.

func (*BackgroundWorker) HandleEvent

func (w *BackgroundWorker) HandleEvent(ctx context.Context, req aguitypes.EventRequest) (string, error)

HandleEvent processes an event by spawning a background agent run. It returns the runID, or an error if the worker pool is full.

func (*BackgroundWorker) WaitForCompletion

func (w *BackgroundWorker) WaitForCompletion()

WaitForCompletion blocks until all active background tasks are finished.
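One common way to get a concurrency limit that rejects instead of queueing is a buffered channel used as a semaphore plus a sync.WaitGroup. The sketch below illustrates that idea only; the pool type, errPoolFull and the submit and wait methods are hypothetical, and BackgroundWorker's real internals are not shown in these docs.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errPoolFull = errors.New("worker pool is full")

// pool is a hypothetical stand-in for BackgroundWorker's internals.
type pool struct {
	slots chan struct{} // buffered; capacity is the concurrency limit
	wg    sync.WaitGroup
}

func newPool(maxConcurrent int) *pool {
	return &pool{slots: make(chan struct{}, maxConcurrent)}
}

// submit starts task in a goroutine, or fails at once when every slot is taken.
func (p *pool) submit(task func()) error {
	select {
	case p.slots <- struct{}{}:
	default:
		return errPoolFull
	}
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()            // runs second: signals completion
		defer func() { <-p.slots }() // runs first: frees the slot
		task()
	}()
	return nil
}

// wait blocks until every accepted task has finished.
func (p *pool) wait() { p.wg.Wait() }

func main() {
	p := newPool(1)
	release := make(chan struct{})
	fmt.Println(p.submit(func() { <-release })) // takes the only slot
	fmt.Println(p.submit(func() {}))            // rejected while the slot is busy
	close(release)
	p.wait()
}
```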

type CapabilitiesStance

type CapabilitiesStance struct {
	ToolNames     []string `json:"tool_names"`
	AlwaysAllowed []string `json:"always_allowed"`
	DeniedTools   []string `json:"denied_tools"`
}

CapabilitiesStance exposes the effective AI stance for the UI and community: which tools are available, which skip approval, and which are denied. When set, the server exposes GET /api/v1/capabilities so clients can show "what Genie can do" and approval policy without exposing secrets.
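The struct tags determine the JSON shape a client would see. The sketch below copies the struct from this page and marshals it with encoding/json; the tool names are made up, and whether the server returns exactly this encoding from GET /api/v1/capabilities is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// CapabilitiesStance is copied from the package documentation.
type CapabilitiesStance struct {
	ToolNames     []string `json:"tool_names"`
	AlwaysAllowed []string `json:"always_allowed"`
	DeniedTools   []string `json:"denied_tools"`
}

// encodeStance returns the JSON encoding of s.
func encodeStance(s CapabilitiesStance) (string, error) {
	b, err := json.Marshal(s)
	return string(b), err
}

func main() {
	body, err := encodeStance(CapabilitiesStance{
		ToolNames:     []string{"read_file", "run_shell"}, // hypothetical tool names
		AlwaysAllowed: []string{"read_file"},
		DeniedTools:   []string{"delete_repo"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(body)
	// prints {"tool_names":["read_file","run_shell"],"always_allowed":["read_file"],"denied_tools":["delete_repo"]}
}
```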

type ChatFunc

type ChatFunc func(ctx context.Context, message string, eventChan chan<- interface{}) error

ChatFunc is the function signature for the chat handler pipeline. It takes a context, message text, and a channel to emit events on. This matches the shape returned by Application.buildChatHandler().

type ChatRequest

type ChatRequest struct {
	ThreadID    string
	RunID       string
	Message     string
	Attachments []messenger.Attachment
	EventChan   chan<- interface{}
}

ChatRequest bundles the inputs for a single AG-UI chat invocation.

type Config

type Config struct {
	// AppName is used as the session.Key AppName for thread tracking.
	// Defaults to "genie" if empty.
	AppName string `yaml:"app_name" toml:"app_name"`
}

Config holds AGUI adapter settings.

type ContextItem

type ContextItem = aguisdk.Context

ContextItem represents a context item passed by the client.

type EventAdapter

type EventAdapter struct {
	// contains filtered or unexported fields
}

EventAdapter converts trpc-agent-go events to TUI messages. It bridges the agent's event stream and the TUI's message system; without it, the TUI could not interpret or display agent events.

func NewEventAdapter

func NewEventAdapter(agentName string) *EventAdapter

NewEventAdapter creates a new event adapter for the given agent name.

func (*EventAdapter) ConvertEvent

func (a *EventAdapter) ConvertEvent(evt *event.Event) []interface{}

ConvertEvent converts a trpc-agent-go event to one or more TUI messages. It translates agent events into TUI-specific message types so the TUI never receives raw events that it cannot handle.

This method handles both standard trpc-agent-go events and custom event types used by the granter workflow.

func (*EventAdapter) FlushMessage

func (a *EventAdapter) FlushMessage() []interface{}

FlushMessage emits a TEXT_MESSAGE_END for any open message. Call this when the event stream ends to close the lifecycle.

func (*EventAdapter) Start

func (a *EventAdapter) Start(
	ctx context.Context,
	inputChan <-chan interface{},
	outputChan chan<- interface{},
)

Start starts the event adapter. It listens to the input channel for events and sends converted TUI messages to the output channel. This method blocks until the input channel is closed.

type Expert

type Expert interface {
	Handle(ctx context.Context, req ChatRequest)
	Resume(ctx context.Context) string
	InjectFeedback(ctx context.Context, threadID, message string) error
}

Expert is the one who knows how to handle a chat request.

func NewChatHandler

func NewChatHandler(
	resumeFunc func(ctx context.Context) string,
	chatFunc func(ctx context.Context, message string, agentsMessage chan<- interface{}) error,
	injectFunc func(ctx context.Context, message string) error,
) Expert

NewChatHandler creates a ChatHandler that bridges the AG-UI server to the existing codeOwner.Chat() pipeline.

The expert.Do() method converts raw *event.Event objects to TUI messages (AgentStreamChunkMsg, TextMessageStartMsg, etc.) via its own EventAdapter before writing them to the event channel. The ReAcTree runs multiple stages, each producing the same text content — causing duplicated output.

This function inserts a dedup filter between the agent and the SSE stream: after the first TEXT_MESSAGE_END, all subsequent text message events are suppressed while tool calls and stage progress events pass through.
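The dedup rule can be pictured as a small channel filter. This is a sketch under assumptions: the event struct and the dedupText function are hypothetical, and identifying text events by the "TEXT_MESSAGE_" type prefix is a simplification of whatever message types the real handler inspects.

```go
package main

import (
	"fmt"
	"strings"
)

// event is a hypothetical, simplified event carrying only a type name.
type event struct {
	Type string // e.g. "TEXT_MESSAGE_START", "TOOL_CALL_START"
}

// dedupText forwards events from in to out, dropping every text message
// event that arrives after the first TEXT_MESSAGE_END. It closes out when
// in is closed.
func dedupText(in <-chan event, out chan<- event) {
	defer close(out)
	textDone := false
	for ev := range in {
		if textDone && strings.HasPrefix(ev.Type, "TEXT_MESSAGE_") {
			continue // a later stage repeating the same text
		}
		if ev.Type == "TEXT_MESSAGE_END" {
			textDone = true
		}
		out <- ev
	}
}

func main() {
	types := []string{
		"TEXT_MESSAGE_START", "TEXT_MESSAGE_CONTENT", "TEXT_MESSAGE_END",
		"TOOL_CALL_START", // still passes through
		"TEXT_MESSAGE_START", "TEXT_MESSAGE_END", // suppressed duplicates
	}
	in := make(chan event, len(types))
	out := make(chan event, len(types))
	for _, t := range types {
		in <- event{Type: t}
	}
	close(in)
	dedupText(in, out)
	for ev := range out {
		fmt.Println(ev.Type)
	}
}
```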

type InjectFeedbackRequest added in v0.1.6

type InjectFeedbackRequest struct {
	ThreadID string `json:"threadId"`
	Message  string `json:"message"`
}

InjectFeedbackRequest is the payload for the /api/v1/inject endpoint.
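Note that the JSON key is camelCase threadId. A short decoding sketch, with the struct copied from this page; parseInject and the sample values are illustrative only.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// InjectFeedbackRequest is copied from the package documentation.
type InjectFeedbackRequest struct {
	ThreadID string `json:"threadId"`
	Message  string `json:"message"`
}

// parseInject decodes a request body into the struct.
func parseInject(body []byte) (InjectFeedbackRequest, error) {
	var req InjectFeedbackRequest
	err := json.Unmarshal(body, &req)
	return req, err
}

func main() {
	// Sample values are made up for illustration.
	req, err := parseInject([]byte(`{"threadId":"thread-42","message":"use the staging cluster"}`))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", req)
}
```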

type Message

type Message = aguisdk.Message

Message represents an AG-UI protocol message. Re-exported from the official SDK — Content is `any` (string or []InputContent for multimodal). Use msg.ContentString() to extract text content.

type Messenger

type Messenger struct {
	// contains filtered or unexported fields
}

Messenger implements the messenger.Messenger interface for the AG-UI SSE server. Unlike other messenger adapters that connect to external platforms, this IS the server: Connect() returns the http.Handler for the AG-UI HTTP/SSE endpoints that browsers connect to.

The adapter uses a thread-registration model: InjectMessage registers a thread's event channel so that Send() can route responses back to the SSE stream. The incoming Receive channel is unused in normal operation because chat handling runs directly in handleRun, not through the messenger receive loop.

func New

func New(cfg Config, opts ...messenger.Option) *Messenger

New creates a new AGUI Messenger with the given config and options. An optional session.Service can be provided for structured thread tracking. If none is provided, an in-memory session service is created automatically.

func (*Messenger) ActiveThreadCount

func (m *Messenger) ActiveThreadCount() int

ActiveThreadCount returns the number of currently registered threads. Useful for monitoring and testing.

func (*Messenger) CompleteThread

func (m *Messenger) CompleteThread(threadID string)

CompleteThread removes the thread from the active threads map. Called by the AG-UI server when a chat run finishes to clean up state. Safe to call multiple times for the same threadID.

func (*Messenger) ConfigureServer

func (m *Messenger) ConfigureServer(cfg ServerConfig)

ConfigureServer injects the dependencies needed to run the AG-UI HTTP server. This must be called before Connect(). The AGUI messenger owns the handler: Connect() builds and returns it, and Disconnect() cleans up the threads it serves.

func (*Messenger) Connect

func (m *Messenger) Connect(ctx context.Context) (http.Handler, error)

Connect initializes the AG-UI messenger adapter and returns an http.Handler for the AG-UI SSE server. Must be called after ConfigureServer.

The adapter DOES NOT start its own http.Server. The caller mounts the returned handler on a shared HTTP mux.
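Mounting the returned handler on a shared mux might look like the sketch below. aguiHandler is a stand-in for the value Connect() returns, and the /metrics route is a hypothetical neighbor owned by the caller; only net/http and httptest are used.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// aguiHandler is a stand-in for the handler that Connect() returns.
func aguiHandler() http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "agui:"+r.URL.Path)
	})
}

// newSharedMux mounts the AG-UI handler next to an unrelated caller route.
func newSharedMux(agui http.Handler) *http.ServeMux {
	mux := http.NewServeMux()
	mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "metrics")
	})
	mux.Handle("/", agui) // every other path goes to the AG-UI handler
	return mux
}

// get performs an in-memory request against h and returns the response body.
func get(h http.Handler, path string) string {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Body.String()
}

func main() {
	mux := newSharedMux(aguiHandler())
	fmt.Println(get(mux, "/metrics"))
	fmt.Println(get(mux, "/api/v1/capabilities"))
}
```

Because the caller owns the http.Server, it also owns shutdown, which matches the note on Disconnect.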

func (*Messenger) ConnectionInfo

func (m *Messenger) ConnectionInfo() string

ConnectionInfo returns connection instructions for the AG-UI adapter.

func (*Messenger) Disconnect

func (m *Messenger) Disconnect(ctx context.Context) error

Disconnect cleans up all active threads and releases resources. The caller is responsible for stopping the HTTP server that was serving the handler returned from Connect.

func (*Messenger) FormatApproval

FormatApproval returns the request unchanged — AG-UI SSE does not support rich formatting (the web UI handles approval rendering client-side).

func (*Messenger) FormatClarification

FormatClarification returns the request unchanged — AG-UI SSE handles clarification rendering client-side.

func (*Messenger) InjectMessage

func (m *Messenger) InjectMessage(threadID, runID, text string, eventChan chan<- interface{}, sender interface{}) error

InjectMessage registers a thread's event channel with the adapter so that Send() can route responses back to the SSE stream. This is the bridge between the AG-UI server's handleRun and the messenger subsystem.

The method does NOT push messages into the Receive channel — the chat handler runs directly in handleRun. Thread registration is purely for enabling subsystems (HITL, send_message tool) to reach the SSE client.

If the threadID already has an active thread registered, the old thread is cleaned up first (its done channel is closed) before the new one is registered. This prevents resource leaks from rapid same-thread reuse.

Parameters:

  • threadID: the AG-UI thread identifier
  • runID: the AG-UI run identifier (used for correlation)
  • text: the user's message content (logged for debugging)
  • eventChan: the channel for writing SSE events back to the client
  • sender: optional *SenderInfo (passed as interface{} to satisfy MessengerBridge); nil defaults to "agui-user"

func (*Messenger) Platform

func (m *Messenger) Platform() messenger.Platform

Platform returns the AGUI platform identifier.

func (*Messenger) Receive

func (m *Messenger) Receive(_ context.Context) (<-chan messenger.IncomingMessage, error)

Receive returns a read-only channel that delivers incoming messages.

Note: In the current thread-registration model, this channel is not actively consumed. It exists to satisfy the Messenger interface and for future use if full message routing through the messenger pipeline is desired.

func (*Messenger) Send

Send delivers a message to the AG-UI SSE stream for the given thread. It looks up the active thread's event channel and writes the text content. The event channel reader (handleRun) is responsible for converting this into proper SSE events.

Send is safe to call from any goroutine. If the thread has already completed (client disconnect, run finished), Send returns an error without blocking.

func (*Messenger) SessionService

func (m *Messenger) SessionService() session.Service

SessionService returns the underlying session.Service for direct access (e.g. session history queries, state management). May be nil if not connected.

func (*Messenger) SetHandlerWrapper

func (m *Messenger) SetHandlerWrapper(fn func(http.Handler) http.Handler)

SetHandlerWrapper configures a function that wraps the HTTP handler before the server starts serving. This is used by the guild worker to add agent routing middleware around the core AG-UI handler. Safe to call before or after ConfigureServer.
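A wrapper of the accepted shape, func(http.Handler) http.Handler, is ordinary net/http middleware. The sketch below is illustrative: withAgentHeader, the X-Agent header and the core handler are hypothetical, and the real guild-worker routing logic is not shown in these docs.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// withAgentHeader returns a wrapper of the shape SetHandlerWrapper accepts.
// The X-Agent header is purely illustrative.
func withAgentHeader(agent string) func(http.Handler) http.Handler {
	return func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			w.Header().Set("X-Agent", agent)
			next.ServeHTTP(w, r) // hand off to the wrapped AG-UI handler
		})
	}
}

// core stands in for the unwrapped AG-UI handler.
func core() http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "ok")
	})
}

// probe sends one in-memory GET request and returns the X-Agent header and body.
func probe(h http.Handler) (header, body string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/", nil))
	return rec.Header().Get("X-Agent"), rec.Body.String()
}

func main() {
	wrap := withAgentHeader("qa_agent")
	fmt.Println(probe(wrap(core())))
}
```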

func (*Messenger) ThreadDone

func (m *Messenger) ThreadDone(threadID string) <-chan struct{}

ThreadDone returns a channel that is closed when the given thread completes. This allows callers to detect when a thread's SSE connection has ended.

func (*Messenger) UpdateMessage

func (m *Messenger) UpdateMessage(_ context.Context, _ messenger.UpdateRequest) error

UpdateMessage is a no-op for AG-UI — the AG-UI SSE adapter handles approval/clarification lifecycle client-side and does not need server-side message editing. Returns nil to satisfy the Messenger interface.

type MessengerBridge

type MessengerBridge interface {
	// InjectMessage registers the thread's event channel with the adapter.
	// This is purely for thread registration — the chat handler still runs
	// directly in handleRun. The sender parameter is optional (nil = default).
	InjectMessage(threadID, runID, text string, eventChan chan<- interface{}, sender interface{}) error
	// CompleteThread removes the thread from the active threads map.
	CompleteThread(threadID string)
}

MessengerBridge is an optional interface that allows the AG-UI server to register active threads with the AGUI messenger adapter. When set, each handleRun call registers the thread's event channel so that subsystems (HITL, send_message tool) can route responses back to the SSE client.

This interface exists here (rather than depending on messenger/agui directly) to avoid a circular import between pkg/agui and pkg/messenger.

type RunAgentInput

type RunAgentInput = aguisdk.RunAgentInput

RunAgentInput is the request body for the AG-UI run endpoint. This is the official AG-UI SDK type, providing richer fields (multimodal content, State, ParentRunID) and built-in snake_case JSON compatibility. See https://docs.ag-ui.com/concepts/architecture#running-agents

type SSEWriter

type SSEWriter struct {
	// contains filtered or unexported fields
}

SSEWriter wraps an http.ResponseWriter to write Server-Sent Events. Each event is formatted as:

event: <type>\ndata: <json>\n\n

The writer flushes after every event to ensure real-time streaming.

func NewSSEWriter

func NewSSEWriter(w http.ResponseWriter) (*SSEWriter, error)

NewSSEWriter creates an SSEWriter and sets the required SSE headers. Returns an error if the ResponseWriter does not support http.Flusher.

func (*SSEWriter) WriteComment

func (s *SSEWriter) WriteComment(comment string) error

WriteComment writes an SSE comment (used for keep-alive pings). Format: : <comment>\n\n

func (*SSEWriter) WriteEvent

func (s *SSEWriter) WriteEvent(eventType string, data []byte) error

WriteEvent writes a single SSE event with the given type and JSON data. Format: event: <eventType>\ndata: <data>\n\n
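The wire format described above is simple enough to sketch with fmt.Fprintf and http.Flusher. This is an approximation, not the package's code: the lowercase sseWriter type is hypothetical, and the exact header set is an assumption (Content-Type: text/event-stream is required by the SSE format, the other two are conventional).

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// sseWriter is a hypothetical, minimal version of an SSE writer.
type sseWriter struct {
	w http.ResponseWriter
	f http.Flusher
}

// newSSEWriter sets streaming headers and fails if w cannot flush.
func newSSEWriter(w http.ResponseWriter) (*sseWriter, error) {
	f, ok := w.(http.Flusher)
	if !ok {
		return nil, errors.New("ResponseWriter does not implement http.Flusher")
	}
	h := w.Header()
	h.Set("Content-Type", "text/event-stream")
	h.Set("Cache-Control", "no-cache")   // assumption: conventional for SSE
	h.Set("Connection", "keep-alive")    // assumption: conventional for SSE
	return &sseWriter{w: w, f: f}, nil
}

// writeEvent emits "event: <type>\ndata: <json>\n\n" and flushes.
func (s *sseWriter) writeEvent(eventType string, data []byte) error {
	if _, err := fmt.Fprintf(s.w, "event: %s\ndata: %s\n\n", eventType, data); err != nil {
		return err
	}
	s.f.Flush()
	return nil
}

// writeComment emits ": <comment>\n\n", which clients ignore (keep-alive).
func (s *sseWriter) writeComment(comment string) error {
	if _, err := fmt.Fprintf(s.w, ": %s\n\n", comment); err != nil {
		return err
	}
	s.f.Flush()
	return nil
}

// demo writes one comment and one event into an in-memory recorder.
func demo() (string, error) {
	rec := httptest.NewRecorder() // ResponseRecorder implements http.Flusher
	sse, err := newSSEWriter(rec)
	if err != nil {
		return "", err
	}
	if err := sse.writeComment("ping"); err != nil {
		return "", err
	}
	if err := sse.writeEvent("RUN_STARTED", []byte(`{"threadId":"t1"}`)); err != nil {
		return "", err
	}
	return rec.Body.String(), nil
}

func main() {
	body, err := demo()
	fmt.Printf("%q %v\n", body, err)
}
```

This sketch assumes data contains no raw newline; a multi-line payload would need one data: line per line of payload.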

type SenderInfo

type SenderInfo struct {
	ID          string
	Username    string
	DisplayName string
}

SenderInfo carries optional identity information for the web user. When provided to InjectMessage, it replaces the default "agui-user" sender identity, enabling sender allowlists and accurate attribution.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is the AG-UI HTTP server that exposes genie as an SSE endpoint.

func NewServer

func NewServer(
	c messenger.AGUIConfig,
	handler Expert,
	approvalStore hitl.ApprovalStore,
	clarifyStore clarify.Store,
	bgWorker *BackgroundWorker,
	capabilities *CapabilitiesStance,
	approveList *toolwrap.ApproveList,
	agentName string,
	workers ...aguitypes.BGWorker,
) *Server

NewServer creates a new AG-UI HTTP server from the given configuration. The bgWorker is created by the caller so it can also be shared with the cron scheduler dispatcher, keeping dependency wiring in one place. If capabilities is non-nil, GET /api/v1/capabilities will serve the AI stance. If approveList is non-nil, the /approve endpoint accepts allowForMins and allowWhenArgsContain to add temporary auto-approve rules.

func (*Server) Handler

func (s *Server) Handler() http.Handler

Handler returns the chi router with AG-UI endpoints.

func (*Server) Runner

func (s *Server) Runner() trunner.Runner

Runner returns the configured framework runner for direct programmatic use. Returns nil if not set. This is NOT used internally by handleRun (which uses the Expert handler), but is available for external callers that want the structured Run(ctx, userID, sessionID, msg) interface.

func (*Server) SetHandlerWrapper

func (s *Server) SetHandlerWrapper(fn func(http.Handler) http.Handler)

SetHandlerWrapper configures a function that wraps the HTTP handler before the server starts serving. This is used by the guild worker to add agent routing middleware around the core AG-UI handler.

func (*Server) SetMessengerBridge

func (s *Server) SetMessengerBridge(bridge MessengerBridge)

SetMessengerBridge configures the optional messenger adapter bridge. When set, handleRun registers each thread's event channel with the messenger adapter (via InjectMessage) so subsystems like HITL and send_message can route to the SSE client.

func (*Server) SetRunner

func (s *Server) SetRunner(r trunner.Runner)

SetRunner configures the framework runner for structured chat execution. The runner wraps the same chat pipeline as the Expert and provides features like run registration, cancellation, and session tracking.

Note: The runner is exposed as an external API via Runner() for direct callers (e.g. background workers, programmatic invocations). The handleRun SSE endpoint continues to use the Expert handler for its dedup and event translation pipeline.

func (*Server) Start

func (s *Server) Start(ctx context.Context) error

Start starts the HTTP server and blocks until the context is cancelled. The server binds to 127.0.0.1 only so it is not accessible from other machines on the network.

type ServerConfig

type ServerConfig struct {
	AGUIConfig    messenger.AGUIConfig
	ChatHandler   Expert
	ApprovalStore hitl.ApprovalStore
	ClarifyStore  clarify.Store
	BGWorker      *BackgroundWorker
	Workers       []aguitypes.BGWorker

	// ChatFunc is the raw chat function used to create a framework Runner.
	// If non-nil, a Runner is wired automatically.
	ChatFunc ChatFunc

	// Capabilities, when set, is served at GET /api/v1/capabilities so the
	// frontend and docs can show the current tool policy (AI stance).
	Capabilities *CapabilitiesStance

	// ApproveList is the in-memory temporary allowlist for "approve for X mins".
	// When set, the /approve endpoint accepts allowForMins and allowWhenArgsContain.
	ApproveList *toolwrap.ApproveList

	// AgentName is the configured agent name (e.g. "qa_agent", "my-bot").
	// Exposed via /health so the chat UI can display it. Defaults to "Genie".
	AgentName string
}

ServerConfig holds the dependencies needed to run the AG-UI HTTP server. These are injected via ConfigureServer before Connect is called.

type ToolDefinition

type ToolDefinition = aguisdk.Tool

ToolDefinition represents a tool definition passed by the client.
