package chat
Published: Feb 21, 2026 License: MIT Imports: 16 Imported by: 0

Documentation

Overview

Package chat implements Koopa's main conversational agent.

Agent is a stateless, LLM-powered agent that provides conversational interactions with tool calling and knowledge base integration. It uses the Google Genkit framework for LLM inference and tool orchestration.

Architecture

The Agent follows a stateless design pattern with dependency injection:

InvocationContext (input)
     |
     v
Agent.ExecuteStream() or Agent.Execute()
     |
     +-- Load session history from SessionStore
     |
     +-- Build message list (history + user input)
     |
     +-- Call Genkit Generate with:
     |    - LLM model
     |    - Tool references (cached at initialization)
     |    - Message history
     |    - Optional: StreamCallback for real-time output
     |
     +-- Save updated history to SessionStore
     |
     v
Response (final text + tool requests)

Configuration

Agent requires configuration via the Config struct at construction time:

type Config struct {
    Genkit       *genkit.Genkit
    SessionStore *session.Store
    Logger       *slog.Logger
    Tools        []ai.Tool

    // Configuration values
    ModelName string  // e.g., "googleai/gemini-2.5-flash"
    MaxTurns  int     // Maximum agentic loop turns
    Language  string  // Response language preference

    // Resilience configuration
    RetryConfig          RetryConfig
    CircuitBreakerConfig CircuitBreakerConfig
    RateLimiter          *rate.Limiter

    // Token management
    TokenBudget TokenBudget
}

Required fields are validated during construction.

Streaming Support

Agent supports both streaming and non-streaming execution modes:

  • Execute(): Non-streaming, returns complete response
  • ExecuteStream(): Streaming with optional callback for real-time output

For streaming, provide a StreamCallback function:

type StreamCallback func(ctx context.Context, chunk *ai.ModelResponseChunk) error

The callback is invoked for each chunk of the response, enabling real-time display (typewriter effect) in CLI or SSE streaming in HTTP APIs.

Flow (Genkit Integration)

The package provides a Genkit Flow for HTTP and observability:

  • NewFlow(): Returns singleton streaming Flow (idempotent, safe to call multiple times)
  • Flow supports both Run() and Stream() methods
  • Stream() enables Server-Sent Events (SSE) for real-time responses

Example Flow usage:

// Create Flow (idempotent — first call initializes, subsequent calls return cached)
chatFlow := chat.NewFlow(g, chatAgent)

// Non-streaming
output, err := chatFlow.Run(ctx, chat.Input{Query: "Hello", SessionID: "..."})

// Streaming (for SSE)
for streamValue, err := range chatFlow.Stream(ctx, input) {
    if err != nil {
        return err
    }
    if streamValue.Done {
        // Final output in streamValue.Output
    } else {
        // Partial chunk in streamValue.Stream.Text
    }
}

Tool Registration

Tools are registered from toolsets during initialization:

  1. For each toolset, get its available tools via Tools() method
  2. Convert ExecutableTools to Genkit format using genkit.DefineTool
  3. Cache tool references for reuse across invocations
  4. Validate that all tools were registered successfully

Tool references are cached to avoid re-registering on every Execute call.

Session Management

Chat manages conversation history through the SessionStore:

  • History: retrieves previous messages for a session
  • AppendMessages: persists new messages incrementally (preferred)

History save failures are logged but don't fail the request.

Example Usage

// Create agent with required configuration
agent, err := chat.New(chat.Config{
    Genkit:       g,
    SessionStore: sessionStore,
    Logger:       slog.Default(),
    Tools:        tools,
    ModelName:    "googleai/gemini-2.5-flash",
    MaxTurns:     10,
    Language:     "auto",
})
if err != nil {
    return err
}

// Non-streaming execution
resp, err := agent.Execute(ctx, sessionID, "What is the weather?")

// Streaming execution with callback
resp, err := agent.ExecuteStream(ctx, sessionID, "What is the weather?",
    func(ctx context.Context, chunk *ai.ModelResponseChunk) error {
        fmt.Print(chunk.Text()) // Real-time output
        return nil
    })

Error Handling

The package uses sentinel errors for categorization:

  • ErrInvalidSession: Invalid session ID format
  • ErrExecutionFailed: LLM or tool execution failed

Empty responses are handled with a fallback message to improve UX.

Testing

Agent is designed for testability:

  • Dependencies are concrete types with clear interfaces
  • Stateless design eliminates test ordering issues
  • Config struct allows partial configuration for unit tests

Thread Safety

Agent is safe for concurrent use. The underlying dependencies (SessionStore, Genkit) must also be thread-safe.

Index

Constants

const (
	// Name is the unique identifier for the Chat agent.
	Name = "chat"

	// Description describes the Chat agent's capabilities.
	Description = "A general purpose chat agent that can help with various tasks using tools and knowledge base."

	// KoopaPromptName is the name of the Dotprompt file for the Chat agent.
	// This corresponds to prompts/koopa.prompt.
	// NOTE: The LLM model is configured in the Dotprompt file, not via Config.
	KoopaPromptName = "koopa"
)

Agent name and description constants

const FlowName = "koopa/chat"

FlowName is the registered name of the Chat Flow in Genkit.

Variables

var (
	// ErrInvalidSession indicates the session ID is invalid or malformed.
	ErrInvalidSession = errors.New("invalid session")

	// ErrExecutionFailed indicates agent execution failed.
	ErrExecutionFailed = errors.New("execution failed")
)

Sentinel errors for agent operations.

var ErrCircuitOpen = errors.New("circuit breaker is open")

ErrCircuitOpen is returned when the circuit is open.

Functions

func ResetFlowForTesting

func ResetFlowForTesting()

ResetFlowForTesting resets the Flow singleton for testing. This allows tests to initialize with different configurations. WARNING: Only use in tests. Not safe for concurrent use.

Types

type Agent

type Agent struct {
	// contains filtered or unexported fields
}

Agent is Koopa's main conversational agent. It provides LLM-powered conversations with tool calling and knowledge base integration.

Agent is stateless and uses dependency injection. Required parameters are provided via Config struct.

All configuration values are captured immutably at construction time to ensure thread-safe concurrent access.

func New

func New(cfg Config) (*Agent, error)

New creates a new Agent with required configuration.

RAG context is provided by knowledge tools (search_documents, search_history, search_system_knowledge) which the LLM calls when it determines context is needed.

NOTE: The LLM model is configured in prompts/koopa.prompt, not via Config.

Example:

agent, err := chat.New(chat.Config{
    Genkit:       g,
    SessionStore: sessionStore,
    Logger:       logger,
    Tools:        tools,  // Pre-registered via RegisterXxxTools()
    MaxTurns:     cfg.MaxTurns,
    Language:     cfg.Language,
})

func (*Agent) DefineFlow

func (a *Agent) DefineFlow(g *genkit.Genkit) *Flow

DefineFlow defines the Genkit Streaming Flow for Chat Agent. Supports both streaming (via callback) and non-streaming modes.

IMPORTANT: Use NewFlow() instead of calling DefineFlow() directly. DefineFlow registers a global Flow; calling it twice causes panic.

Each Agent has its own dedicated Flow, responsible for:

  1. Observability (Genkit DevUI tracing)
  2. Type safety (Input/Output schema)
  3. HTTP endpoint exposure via genkit.Handler()
  4. Streaming support for real-time output

Design: Flow is a lightweight wrapper; the core logic lives in Agent.ExecuteStream().

Error handling:

  • Errors are returned as sentinel errors from the agent package
  • Genkit tracing correctly shows error spans
  • HTTP handlers can use errors.Is() to map error types to HTTP status codes

func (*Agent) Execute

func (a *Agent) Execute(ctx context.Context, sessionID uuid.UUID, input string) (*Response, error)

Execute runs the chat agent with the given input (non-streaming). This is a convenience wrapper around ExecuteStream with nil callback.

func (*Agent) ExecuteStream

func (a *Agent) ExecuteStream(ctx context.Context, sessionID uuid.UUID, input string, callback StreamCallback) (*Response, error)

ExecuteStream runs the chat agent with optional streaming output. If callback is non-nil, it is called for each chunk of the response as it's generated. If callback is nil, the response is generated without streaming (equivalent to Execute). The final response is always returned after generation completes.

func (*Agent) GenerateTitle

func (a *Agent) GenerateTitle(ctx context.Context, userMessage string) string

GenerateTitle generates a concise session title from the user's first message. Uses AI generation with fallback to simple truncation. Returns empty string on failure (best-effort).

type CircuitBreaker

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

CircuitBreaker implements the circuit breaker pattern.

func NewCircuitBreaker

func NewCircuitBreaker(cfg CircuitBreakerConfig) *CircuitBreaker

NewCircuitBreaker creates a new circuit breaker.

func (*CircuitBreaker) Allow

func (cb *CircuitBreaker) Allow() error

Allow checks if a request should be allowed. Uses exclusive lock to safely handle Open -> HalfOpen transition.

func (*CircuitBreaker) Failure

func (cb *CircuitBreaker) Failure()

Failure records a failed call.

func (*CircuitBreaker) Reset

func (cb *CircuitBreaker) Reset()

Reset resets the circuit breaker to closed state. This is primarily useful for testing.

func (*CircuitBreaker) State

func (cb *CircuitBreaker) State() CircuitState

State returns the current circuit state.

func (*CircuitBreaker) Success

func (cb *CircuitBreaker) Success()

Success records a successful call.

type CircuitBreakerConfig

type CircuitBreakerConfig struct {
	FailureThreshold int           // Failures before opening (default: 5)
	SuccessThreshold int           // Successes to close from half-open (default: 2)
	Timeout          time.Duration // Time before trying half-open (default: 30s)
}

CircuitBreakerConfig configures the circuit breaker.

func DefaultCircuitBreakerConfig

func DefaultCircuitBreakerConfig() CircuitBreakerConfig

DefaultCircuitBreakerConfig returns sensible defaults.

type CircuitState

type CircuitState int

CircuitState represents the state of the circuit breaker.

const (
	// CircuitClosed is the normal operation state.
	CircuitClosed CircuitState = iota
	// CircuitOpen rejects all requests.
	CircuitOpen
	// CircuitHalfOpen allows test requests to check recovery.
	CircuitHalfOpen
)

func (CircuitState) String

func (s CircuitState) String() string

String returns the string representation of the circuit state.

type Config

type Config struct {
	Genkit       *genkit.Genkit
	SessionStore *session.Store
	Logger       *slog.Logger
	Tools        []ai.Tool // Pre-registered tools from RegisterXxxTools()

	// Configuration values
	ModelName string // Provider-qualified model name (e.g., "googleai/gemini-2.5-flash", "ollama/llama3.3")
	MaxTurns  int    // Maximum agentic loop turns
	Language  string // Response language preference

	// Resilience configuration
	RetryConfig          RetryConfig          // LLM retry settings (zero-value uses defaults)
	CircuitBreakerConfig CircuitBreakerConfig // Circuit breaker settings (zero-value uses defaults)
	RateLimiter          *rate.Limiter        // Optional: proactive rate limiting (nil = use default)

	// Token management
	TokenBudget TokenBudget // Token budget for context window (zero-value uses defaults)

	// Memory (optional)
	MemoryStore *memory.Store // User memory store (nil = memory disabled)

	// Background lifecycle (required when MemoryStore is set).
	// BackgroundCtx outlives individual requests — used for async extraction.
	// WG tracks background goroutines for graceful shutdown.
	BackgroundCtx context.Context //nolint:containedctx // App lifecycle context, not a request context
	WG            *sync.WaitGroup
}

Config contains all required parameters for Chat agent.

type Flow

type Flow = core.Flow[Input, Output, StreamChunk]

Flow is the type alias for Chat Agent's Genkit Streaming Flow. Exported for use in api package with genkit.Handler().

func NewFlow

func NewFlow(g *genkit.Genkit, agent *Agent) *Flow

NewFlow returns the Chat Flow singleton, initializing it on first call. Subsequent calls return the existing Flow (parameters are ignored). This is safe because genkit.DefineStreamingFlow panics on re-registration.

type Input

type Input struct {
	Query     string `json:"query"`
	SessionID string `json:"sessionId"` // Required field: session ID
}

Input defines the request payload for the chat agent flow.

type Output

type Output struct {
	Response  string `json:"response"`
	SessionID string `json:"sessionId"`
}

Output defines the response payload from the chat agent flow.

type Response

type Response struct {
	FinalText    string            // Model's final text output
	ToolRequests []*ai.ToolRequest // Tool requests made during execution
}

Response represents the complete result of an agent execution.

type RetryConfig

type RetryConfig struct {
	MaxRetries      int           // Maximum number of retry attempts
	InitialInterval time.Duration // Initial backoff interval
	MaxInterval     time.Duration // Maximum backoff interval
}

RetryConfig configures the retry behavior for LLM calls.

func DefaultRetryConfig

func DefaultRetryConfig() RetryConfig

DefaultRetryConfig returns sensible defaults for LLM API calls.

type StreamCallback

type StreamCallback func(ctx context.Context, chunk *ai.ModelResponseChunk) error

StreamCallback is called for each chunk of streaming response. The chunk contains partial content that can be immediately displayed to the user. Return an error to abort the stream.

type StreamChunk

type StreamChunk struct {
	Text string `json:"text"` // Partial text chunk
}

StreamChunk is the streaming output type for Chat Flow. Each chunk contains partial text that can be immediately displayed to the user.

type TokenBudget

type TokenBudget struct {
	MaxHistoryTokens int // Maximum tokens for conversation history
	MaxMemoryTokens  int // Maximum tokens for user memory injection
}

TokenBudget manages context window limits.

func DefaultTokenBudget

func DefaultTokenBudget() TokenBudget

DefaultTokenBudget returns defaults for modern large-context models. 32K tokens ≈ 64 conversation turns — balances context retention with cost.
