chain

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 3, 2026 License: MIT Imports: 6 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CollectStream

func CollectStream(ctx context.Context, reader protocol.StreamReader) (*protocol.ChatResponse, error)

CollectStream collects all chunks into a response.

func FollowUpRequest

func FollowUpRequest(input protocol.ChatRequest, result *protocol.ChatResponse) protocol.ChatRequest

func TransformStream

func TransformStream(reader protocol.StreamReader, transformer StreamTransformer) (protocol.StreamReader, error)

TransformStream applies a transformer to each chunk.

Types

type Builder

type Builder struct {
	// contains filtered or unexported fields
}

Builder provides a fluent API for building chains.

func NewBuilder

func NewBuilder(name string) *Builder

NewBuilder creates a new Builder.

func (*Builder) AddModel

func (b *Builder) AddModel(name string, model protocol.ChatModel) *Builder

AddModel adds a ChatModel node to the chain.

func (*Builder) AddNode

func (b *Builder) AddNode(node Node) *Builder

AddNode adds a custom node to the chain.

func (*Builder) AddProcessor

func (b *Builder) AddProcessor(name string, fn ProcessorFunc) *Builder

AddProcessor adds a processor node to the chain.

func (*Builder) Build

func (b *Builder) Build() *NodeChain

Build creates the chain.

type GenericChain

type GenericChain[I, O any] struct {
	// contains filtered or unexported fields
}

GenericChain represents a sequence of operations with type-safe input and output.

func NewGenericChain

func NewGenericChain[I, O any](name string) *GenericChain[I, O]

NewGenericChain creates a new generic chain.

func (*GenericChain[I, O]) AppendLambda

func (c *GenericChain[I, O]) AppendLambda(name string, fn func(context.Context, any) (any, error)) *GenericChain[I, O]

AppendLambda adds a transformation lambda.

func (*GenericChain[I, O]) AppendModel

func (c *GenericChain[I, O]) AppendModel(name string, model protocol.ChatModel) *GenericChain[I, O]

AppendModel adds a chat model step.

func (*GenericChain[I, O]) AppendPrompt

func (c *GenericChain[I, O]) AppendPrompt(name string, template PromptTemplate) *GenericChain[I, O]

AppendPrompt adds a prompt template step.

func (*GenericChain[I, O]) Compile

func (c *GenericChain[I, O]) Compile(ctx context.Context) error

Compile validates the chain.

func (*GenericChain[I, O]) Invoke

func (c *GenericChain[I, O]) Invoke(ctx context.Context, input I) (O, error)

Invoke executes the chain.

type GenericStep

type GenericStep interface {
	// contains filtered or unexported methods
}

GenericStep is a single step in a generic chain.

type ModelNode

type ModelNode struct {
	// contains filtered or unexported fields
}

ModelNode wraps a ChatModel as a Node.

func NewModelNode

func NewModelNode(name string, model protocol.ChatModel) *ModelNode

NewModelNode creates a new ModelNode.

func (*ModelNode) Collect

Collect reads all chunks from a stream into a response.

func (*ModelNode) Invoke

Invoke calls the model synchronously.

func (*ModelNode) Name

func (n *ModelNode) Name() string

Name returns the node's name.

func (*ModelNode) Stream

Stream calls the model with streaming.

func (*ModelNode) Transform

Transform applies a pass-through transformation to each chunk.

type Node

type Node interface {
	Runnable
	Name() string
}

Node represents a single node in a processing chain.

type NodeChain

type NodeChain struct {
	// contains filtered or unexported fields
}

NodeChain represents a sequence of nodes that can be chained together. This is the legacy API, prefer using NewChain for new code.

func New

func New(name string, nodes ...Node) *NodeChain

New is an alias for NewNodeChain for backwards compatibility. Deprecated: Use NewNodeChain for new code.

func NewNodeChain

func NewNodeChain(name string, nodes ...Node) *NodeChain

NewNodeChain creates a new node chain with the given nodes.

func (*NodeChain) Collect

Collect reads all chunks from a stream into a response.

func (*NodeChain) CollectBatch added in v1.1.0

func (c *NodeChain) CollectBatch(ctx context.Context, readers []protocol.StreamReader) ([]*protocol.ChatResponse, error)

CollectBatch collects multiple streams into responses (流进批出) Each stream is collected into a complete response

func (*NodeChain) Invoke

Invoke executes all nodes in sequence synchronously.

func (*NodeChain) InvokeBatch added in v1.1.0

func (c *NodeChain) InvokeBatch(ctx context.Context, inputs []protocol.ChatRequest) ([]*protocol.ChatResponse, error)

InvokeBatch executes the chain on multiple requests synchronously (批进批出) Returns responses in the same order as inputs

func (*NodeChain) Name

func (c *NodeChain) Name() string

Name returns the chain's name.

func (*NodeChain) Stream

Stream executes all nodes in sequence with streaming on the final node.

func (*NodeChain) StreamBatch added in v1.1.0

func (c *NodeChain) StreamBatch(ctx context.Context, inputs []protocol.ChatRequest) (protocol.StreamReader, error)

StreamBatch executes the chain on multiple requests with streaming (批进流出) Returns a stream of responses

func (*NodeChain) Transform

Transform applies a transformation to each chunk in a stream.

func (*NodeChain) TransformBatch added in v1.1.0

func (c *NodeChain) TransformBatch(ctx context.Context, readers []protocol.StreamReader) ([]protocol.StreamReader, error)

TransformBatch applies transformations to multiple streams (流进流出) Returns transformed streams

type ProcessingStream

type ProcessingStream struct {
	// contains filtered or unexported fields
}

ProcessingStream wraps a stream and applies a processor to each chunk.

func NewProcessingStream

func NewProcessingStream(upstream protocol.StreamReader, processor StreamProcessor) *ProcessingStream

NewProcessingStream creates a new ProcessingStream.

func (*ProcessingStream) Close

func (s *ProcessingStream) Close() error

Close closes the stream.

func (*ProcessingStream) Metrics

func (s *ProcessingStream) Metrics() metrics.CallMetrics

Metrics returns the upstream metrics.

func (*ProcessingStream) Recv

Recv receives and processes the next chunk.

type ProcessorFunc

type ProcessorFunc func(ctx context.Context, resp *protocol.ChatResponse) (*protocol.ChatResponse, error)

ProcessorFunc processes a ChatResponse.

type ProcessorNode

type ProcessorNode struct {
	// contains filtered or unexported fields
}

ProcessorNode wraps a processor function as a Node.

func NewProcessorNode

func NewProcessorNode(name string, fn ProcessorFunc) *ProcessorNode

NewProcessorNode creates a new ProcessorNode.

func (*ProcessorNode) Collect

Collect reads all chunks from a stream and applies the processor.

func (*ProcessorNode) Invoke

Invoke executes the processor function. Note: ProcessorNode should not be used as the first node in a chain. Use ProcessResult instead when you have a previous response to process.

func (*ProcessorNode) Name

func (n *ProcessorNode) Name() string

Name returns the node's name.

func (*ProcessorNode) ProcessResult

func (n *ProcessorNode) ProcessResult(ctx context.Context, input *protocol.ChatResponse) (*protocol.ChatResponse, error)

ProcessResult processes an existing response (for use in chains).

func (*ProcessorNode) Stream

Stream is not supported for processor nodes.

func (*ProcessorNode) Transform

Transform is not supported for processor nodes.

type PromptTemplate

type PromptTemplate interface {
	Render(data map[string]any) ([]protocol.Message, error)
}

PromptTemplate defines a template that renders messages.

type Runnable

Runnable represents a composable unit that can be invoked or streamed.

type SimplePrompt

type SimplePrompt struct {
	// contains filtered or unexported fields
}

SimplePrompt creates a prompt template from system and user templates.

func NewSimplePrompt

func NewSimplePrompt(system, user string) *SimplePrompt

NewSimplePrompt creates a simple prompt with system and user messages.

func (*SimplePrompt) Render

func (p *SimplePrompt) Render(data map[string]any) ([]protocol.Message, error)

type StreamProcessor

type StreamProcessor interface {
	ProcessChunk(ctx context.Context, chunk *protocol.ChatStreamChunk) (*protocol.ChatStreamChunk, error)
}

StreamProcessor processes chunks from a stream.

type StreamProcessorFunc

type StreamProcessorFunc func(ctx context.Context, chunk *protocol.ChatStreamChunk) (*protocol.ChatStreamChunk, error)

StreamProcessorFunc is a function that processes stream chunks.

func (StreamProcessorFunc) ProcessChunk

ProcessChunk implements StreamProcessor.

type StreamProcessorNode

type StreamProcessorNode struct {
	// contains filtered or unexported fields
}

StreamProcessorNode wraps a stream processor as a Node.

func NewStreamProcessorNode

func NewStreamProcessorNode(name string, processor StreamProcessor) *StreamProcessorNode

NewStreamProcessorNode creates a new StreamProcessorNode.

func (*StreamProcessorNode) Collect

Collect is not supported for stream processor nodes.

func (*StreamProcessorNode) Invoke

Invoke is not supported for stream processor nodes.

func (*StreamProcessorNode) Name

func (n *StreamProcessorNode) Name() string

Name returns the node's name.

func (*StreamProcessorNode) Stream

Stream is not supported without an upstream stream.

func (*StreamProcessorNode) Transform

Transform applies the stream processor to each chunk.

type StreamTransformer

type StreamTransformer interface {
	Transform(ctx context.Context, chunk *protocol.ChatStreamChunk) (*protocol.ChatStreamChunk, error)
}

StreamTransformer transforms chunks.

type StreamTransformerFunc

type StreamTransformerFunc func(ctx context.Context, chunk *protocol.ChatStreamChunk) (*protocol.ChatStreamChunk, error)

StreamTransformerFunc is a function adapter for StreamTransformer.

func (StreamTransformerFunc) Transform

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL