session

package
v1.3.14

This package is not in the latest version of its module.
Published: Mar 14, 2026 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Overview

Package session provides session abstractions for managing conversations. Sessions wrap pipelines and provide convenient APIs for text and duplex streaming interactions.

Constants

const DefaultDrainTimeout = 30 * time.Second

DefaultDrainTimeout is the maximum time to wait for pipeline completion during Drain.

Variables

This section is empty.

Functions

This section is empty.

Types

type AsyncToolCheckResult added in v1.3.14

type AsyncToolCheckResult struct {
	// ShouldWait is true if the tool requires human approval before execution.
	ShouldWait bool
	// PendingInfo provides context when ShouldWait is true.
	PendingInfo *tools.PendingToolInfo
	// Handled is true when the checker executed the handler directly
	// (i.e., the tool was an async tool but the check passed, so it ran immediately).
	Handled bool
	// HandlerResult contains the JSON-encoded result when Handled is true.
	HandlerResult json.RawMessage
	// HandlerError is set when Handled is true but execution failed.
	HandlerError error
}

AsyncToolCheckResult describes the outcome of an HITL check on a tool call.

type AsyncToolChecker added in v1.3.14

type AsyncToolChecker func(callID, name string, args map[string]any) *AsyncToolCheckResult

AsyncToolChecker is called before executing a tool to determine if it requires human approval. Returns nil if the tool is not an async tool (falls through to normal registry execution).
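A checker has three possible outcomes: nil (not an async tool), ShouldWait (approval required), or Handled (the check passed and the handler ran immediately). The sketch below shows all three using local stand-ins so that it compiles on its own. The pendingToolInfo type and its Reason field, the issue_refund tool, and the refundLimit threshold are hypothetical and not part of this package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// pendingToolInfo is a hypothetical stand-in for tools.PendingToolInfo.
type pendingToolInfo struct {
	Reason string
}

// asyncToolCheckResult mirrors the documented fields, using the stand-in above.
type asyncToolCheckResult struct {
	ShouldWait    bool
	PendingInfo   *pendingToolInfo
	Handled       bool
	HandlerResult json.RawMessage
	HandlerError  error
}

// refundLimit is an illustrative approval threshold.
const refundLimit = 100.0

// checkTool has the same parameter list as AsyncToolChecker.
func checkTool(callID, name string, args map[string]any) *asyncToolCheckResult {
	if name != "issue_refund" {
		return nil // not an async tool: fall through to normal registry execution
	}
	amount, _ := args["amount"].(float64)
	if amount > refundLimit {
		// Large refunds wait for a human decision.
		reason := fmt.Sprintf("call %s: refund of %.2f needs approval", callID, amount)
		return &asyncToolCheckResult{ShouldWait: true, PendingInfo: &pendingToolInfo{Reason: reason}}
	}
	// Small refunds pass the check, so the handler runs immediately.
	result, err := json.Marshal(map[string]any{"refunded": amount})
	return &asyncToolCheckResult{Handled: true, HandlerResult: result, HandlerError: err}
}

func main() {
	for _, amount := range []float64{25, 500} {
		res := checkTool("call-1", "issue_refund", map[string]any{"amount": amount})
		fmt.Println(res.ShouldWait, res.Handled)
	}
	fmt.Println(checkTool("call-2", "get_weather", nil) == nil)
}
```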

type BaseSession

type BaseSession interface {
	// Identity
	ID() string

	// Variable management for template substitution
	Variables() map[string]string
	SetVar(name, value string)
	GetVar(name string) (string, bool)

	// State management - encapsulates StateStore access
	Messages(ctx context.Context) ([]types.Message, error)
	Clear(ctx context.Context) error
}

BaseSession represents core session capabilities shared by all session types.
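As an illustration of the variable-management methods, here is a minimal in-memory store with the same method shapes. It is a sketch, not the package's implementation: varStore is a hypothetical type, and whether the real Variables() returns a copy or the live map is not stated on this page.

```go
package main

import (
	"fmt"
	"sync"
)

// varStore is a hypothetical in-memory holder for template variables.
type varStore struct {
	mu   sync.RWMutex
	vars map[string]string
}

// newVarStore copies the initial variables so later caller edits do not leak in.
func newVarStore(initial map[string]string) *varStore {
	s := &varStore{vars: make(map[string]string, len(initial))}
	for k, v := range initial {
		s.vars[k] = v
	}
	return s
}

// SetVar stores or overwrites a variable.
func (s *varStore) SetVar(name, value string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.vars[name] = value
}

// GetVar reports the value and whether the variable exists.
func (s *varStore) GetVar(name string) (string, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.vars[name]
	return v, ok
}

// Variables returns a snapshot copy (an assumption of this sketch).
func (s *varStore) Variables() map[string]string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out := make(map[string]string, len(s.vars))
	for k, v := range s.vars {
		out[k] = v
	}
	return out
}

func main() {
	s := newVarStore(map[string]string{"user_name": "Ada"})
	s.SetVar("locale", "en-GB")
	v, ok := s.GetVar("locale")
	fmt.Println(v, ok)
	_, ok = s.GetVar("missing")
	fmt.Println(ok)
	fmt.Println(len(s.Variables()))
}
```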

type DuplexSession

type DuplexSession interface {
	BaseSession

	// SendChunk sends a chunk to the session (populate MediaDelta for media, Content for text).
	// This method is thread-safe and can be called from multiple goroutines.
	SendChunk(ctx context.Context, chunk *providers.StreamChunk) error

	// SendText is a convenience method for sending text directly.
	SendText(ctx context.Context, text string) error

	// SendFrame sends an image frame to the session for realtime video scenarios.
	// This is a convenience method that wraps SendChunk with proper image formatting.
	SendFrame(ctx context.Context, frame *ImageFrame) error

	// SendVideoChunk sends a video chunk to the session for encoded video streaming.
	// This is a convenience method that wraps SendChunk with proper video formatting.
	SendVideoChunk(ctx context.Context, chunk *VideoChunk) error

	// Response returns a receive-only channel for streaming responses.
	// The channel emits StreamChunks containing LLM responses (text, media, tool calls, etc).
	Response() <-chan providers.StreamChunk

	// Close ends the streaming session and releases resources.
	Close() error

	// Drain gracefully stops the session: sends EndOfStream to the pipeline,
	// waits for the provider to finish processing (up to the given timeout),
	// then closes. Returns nil if the pipeline completes within the timeout.
	Drain(ctx context.Context) error

	// Done returns a channel that's closed when the session ends.
	Done() <-chan struct{}

	// Error returns any error that occurred during the session.
	Error() error

	// SubmitToolResults sends resolved/rejected tool results back into the
	// duplex pipeline so they flow to the provider via ToolResponseSupport.
	// Used after HITL approval or client tool fulfillment.
	SubmitToolResults(ctx context.Context, responses []providers.ToolResponse) error

	// ForkSession creates a new session that is a fork of this one.
	// The new session will have an independent copy of the conversation state.
	ForkSession(
		ctx context.Context,
		forkID string,
		pipelineBuilder PipelineBuilder,
	) (DuplexSession, error)
}

DuplexSession manages bidirectional streaming conversations. Uses providers.StreamChunk for BOTH input and output for API symmetry.

func NewDuplexSession

func NewDuplexSession(ctx context.Context, cfg *DuplexSessionConfig) (DuplexSession, error)

NewDuplexSession creates a bidirectional session from a config. PipelineBuilder and Provider are required.

If Config is provided (ASM mode):

  • Passes streaming provider + base config to PipelineBuilder
  • DuplexProviderStage creates session lazily using system_prompt from element metadata
  • Single long-running pipeline execution

If Config is nil (VAD mode):

  • No streaming provider or config
  • Calls PipelineBuilder with nil streaming provider
  • VAD middleware triggers multiple pipeline executions

type DuplexSessionConfig

type DuplexSessionConfig struct {
	ConversationID   string
	UserID           string
	StateStore       statestore.Store                // StateStore for conversation history
	PipelineBuilder  PipelineBuilder                 // Function to build pipeline (required, typically a closure from SDK)
	Provider         providers.Provider              // Provider for LLM calls (required)
	Config           *providers.StreamingInputConfig // For ASM mode: base streaming config. For VAD mode: nil
	ToolRegistry     *tools.Registry                 // Optional: for executing tool calls
	AsyncToolChecker AsyncToolChecker                // Optional: HITL gate for tool calls
	Metadata         map[string]interface{}
	Variables        map[string]string // Initial variables for template substitution
}

DuplexSessionConfig configures a DuplexSession.

PipelineBuilder and Provider are required. PipelineBuilder is typically a closure created in SDK that captures configuration.

Two modes based on Config field:

ASM Mode (Config provided):

  • Pipeline is the single source of truth
  • DuplexProviderStage creates the session lazily using system_prompt from element metadata
  • PipelineBuilder receives streaming provider + base config (NOT a pre-created session)
  • Single long-running pipeline execution for continuous streaming

VAD Mode (Config nil):

  • No provider session created
  • Calls PipelineBuilder with nil streaming provider
  • Builder creates pipeline with VAD middleware and provider middleware for one-shot calls
  • Multiple pipeline executions, one per detected turn

StateStore should match what's configured in the Pipeline middleware.
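The required-field rule and the mode selection above reduce to a small decision function. The sketch below is illustrative only: duplexConfig models presence flags rather than the real field types, sessionMode is a hypothetical helper, and the error messages are invented for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// streamingInputConfig is a hypothetical stand-in for providers.StreamingInputConfig.
type streamingInputConfig struct{}

// duplexConfig models only the fields that decide validity and mode.
type duplexConfig struct {
	HasPipelineBuilder bool
	HasProvider        bool
	Config             *streamingInputConfig // non-nil selects ASM mode
}

// sessionMode validates the required fields and reports which mode applies.
func sessionMode(cfg *duplexConfig) (string, error) {
	switch {
	case cfg == nil:
		return "", errors.New("config is nil")
	case !cfg.HasPipelineBuilder:
		return "", errors.New("pipeline builder is required")
	case !cfg.HasProvider:
		return "", errors.New("provider is required")
	case cfg.Config != nil:
		return "ASM", nil // single long-running pipeline execution
	default:
		return "VAD", nil // one pipeline execution per detected turn
	}
}

func main() {
	asm, _ := sessionMode(&duplexConfig{HasPipelineBuilder: true, HasProvider: true, Config: &streamingInputConfig{}})
	vad, _ := sessionMode(&duplexConfig{HasPipelineBuilder: true, HasProvider: true})
	_, err := sessionMode(&duplexConfig{HasProvider: true})
	fmt.Println(asm, vad, err)
}
```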

type ImageFrame added in v1.1.8

type ImageFrame struct {
	// Data is the raw image data (JPEG, PNG, etc.)
	Data []byte

	// MIMEType is the MIME type of the image (e.g., "image/jpeg")
	MIMEType string

	// Width is the image width in pixels (optional, for metadata)
	Width int

	// Height is the image height in pixels (optional, for metadata)
	Height int

	// FrameNum is the sequence number for ordering frames
	FrameNum int64

	// Timestamp is when the frame was captured
	Timestamp time.Time
}

ImageFrame represents an image frame for streaming. Use this with DuplexSession.SendFrame() for realtime video scenarios.

type PipelineBuilder

type PipelineBuilder func(
	ctx context.Context,
	provider providers.Provider,
	streamProvider providers.StreamInputSupport,
	streamConfig *providers.StreamingInputConfig,
	conversationID string,
	store statestore.Store,
) (*stage.StreamPipeline, error)

PipelineBuilder creates a StreamPipeline for a DuplexSession. This is typically a closure created in SDK that captures configuration.

For ASM mode: streamProvider will be non-nil, builder passes it to DuplexProviderStage, which creates the session lazily using system_prompt from element metadata.

For VAD mode: streamProvider will be nil, builder creates pipeline with VAD/TTS stages.

type UnarySession

type UnarySession interface {
	BaseSession

	// Execution methods
	Execute(ctx context.Context, role, content string) (*pipeline.ExecutionResult, error)
	ExecuteWithMessage(ctx context.Context, message types.Message) (*pipeline.ExecutionResult, error)
	ExecuteStream(ctx context.Context, role, content string) (<-chan providers.StreamChunk, error)
	ExecuteStreamWithMessage(ctx context.Context, message types.Message) (<-chan providers.StreamChunk, error)

	// ResumeWithToolResults injects tool result messages into the session
	// history and re-executes the pipeline so the LLM can continue.
	ResumeWithToolResults(ctx context.Context, toolResults []types.Message) (*pipeline.ExecutionResult, error)

	// ResumeStreamWithToolResults is the streaming equivalent of ResumeWithToolResults.
	// It injects tool result messages and returns a streaming channel.
	ResumeStreamWithToolResults(ctx context.Context, toolResults []types.Message) (<-chan providers.StreamChunk, error)

	// ForkSession creates a new session that is a fork of this one.
	// The new session will have an independent copy of the conversation state.
	ForkSession(ctx context.Context, forkID string, pipeline *stage.StreamPipeline) (UnarySession, error)
}

UnarySession manages unary (request/response) conversations with multimodal support.
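ForkSession promises the fork an independent copy of the conversation state. The sketch below shows what that independence means in practice: the message slice and variable map are copied, so changes on one side never appear on the other. The message and sessionState types are hypothetical stand-ins, and copying variables on fork is an assumption of this sketch rather than documented behavior.

```go
package main

import "fmt"

// message is a hypothetical stand-in for types.Message.
type message struct {
	Role    string
	Content string
}

// sessionState is a hypothetical holder for the state a fork would copy.
type sessionState struct {
	ID        string
	Messages  []message
	Variables map[string]string
}

// fork returns a new state with its own backing slice and map.
func (s *sessionState) fork(forkID string) *sessionState {
	msgs := make([]message, len(s.Messages))
	copy(msgs, s.Messages) // new backing array, so appends do not alias
	vars := make(map[string]string, len(s.Variables))
	for k, v := range s.Variables {
		vars[k] = v
	}
	return &sessionState{ID: forkID, Messages: msgs, Variables: vars}
}

func main() {
	parent := &sessionState{
		ID:        "conv-1",
		Messages:  []message{{Role: "user", Content: "hello"}},
		Variables: map[string]string{"locale": "en"},
	}
	child := parent.fork("conv-1-fork")
	child.Messages = append(child.Messages, message{Role: "user", Content: "what if?"})
	child.Variables["locale"] = "fr"

	fmt.Println(len(parent.Messages), parent.Variables["locale"])
	fmt.Println(len(child.Messages), child.Variables["locale"])
}
```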

func NewUnarySession

func NewUnarySession(cfg UnarySessionConfig) (UnarySession, error)

NewUnarySession creates a new unary session.

type UnarySessionConfig

type UnarySessionConfig struct {
	ConversationID string
	UserID         string
	StateStore     statestore.Store // Must match Pipeline's StateStore stages
	Pipeline       *stage.StreamPipeline
	Metadata       map[string]interface{}
	Variables      map[string]string // Initial variables for template substitution
}

UnarySessionConfig configures a UnarySession. StateStore should match what's configured in the Pipeline stages.

type VideoChunk added in v1.1.8

type VideoChunk struct {
	// Data is the encoded video data (H.264, VP8, etc.)
	Data []byte

	// MIMEType is the MIME type of the video (e.g., "video/h264")
	MIMEType string

	// Width is the video width in pixels (optional, for metadata)
	Width int

	// Height is the video height in pixels (optional, for metadata)
	Height int

	// ChunkIndex is the sequence number for ordering chunks
	ChunkIndex int

	// IsKeyFrame indicates if this chunk contains a keyframe
	IsKeyFrame bool

	// Timestamp is when the chunk was created/captured
	Timestamp time.Time

	// Duration is the duration of this video chunk
	Duration time.Duration
}

VideoChunk represents a video chunk for streaming. Use this with DuplexSession.SendVideoChunk() for encoded video segments.
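One common use of IsKeyFrame in encoded video streaming is to discard leading chunks until the first keyframe, since a decoder generally cannot start from a delta frame. The sketch below shows that general technique; this page does not say the package requires it. videoChunk is a local stand-in for a subset of the documented fields and dropUntilKeyFrame is a hypothetical helper.

```go
package main

import "fmt"

// videoChunk is a local stand-in for a subset of the documented VideoChunk fields.
type videoChunk struct {
	Data       []byte
	ChunkIndex int
	IsKeyFrame bool
}

// dropUntilKeyFrame returns the chunks starting at the first keyframe,
// or nil if no keyframe is present.
func dropUntilKeyFrame(chunks []videoChunk) []videoChunk {
	for i, c := range chunks {
		if c.IsKeyFrame {
			return chunks[i:]
		}
	}
	return nil
}

func main() {
	chunks := []videoChunk{
		{ChunkIndex: 0},
		{ChunkIndex: 1},
		{ChunkIndex: 2, IsKeyFrame: true},
		{ChunkIndex: 3},
	}
	for _, c := range dropUntilKeyFrame(chunks) {
		fmt.Println(c.ChunkIndex, c.IsKeyFrame)
	}
}
```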
