indexer

package
v1.1.8
Published: Apr 6, 2026 License: MIT Imports: 25 Imported by: 0

README

Multimodal Indexing Pipeline

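1. Architecture Overview

Built on the strongly typed, programmable pipeline framework provided by github.com/DotNetAge/gochat/pkg/pipeline, this module aims to build a flexible, highly cohesive, single-branch multimodal indexing pipeline. The pipeline's core goal is to automate the ingestion of structured and unstructured files and to enable multimodal search across text and images.

The complete end-to-end flow is: read file (File/Stream) -> multi-format parsing and chunking (Parser & Chunker) -> multimodal vector generation (Embedding) -> multi-store persistence (VectorStore / DocumentStore / GraphStore)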

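2. Core Challenge and Strategy: Projection into a Shared Vector Space

The difficulty in mixed multimodal search (documents and images) lies in projecting text semantics and visual features into the same vector space (latent space). For a text query to match relevant images, or an image query to match relevant text, we need to:

  1. Choose a multimodal alignment model. Building on the @gochat/pkg/embedding library, we will extend support to multimodal models such as CLIP (Contrastive Language-Image Pretraining) and Chinese-CLIP (e.g. OFA-Sys/chinese-clip-vit-base-patch16). Because these models are pretrained with contrastive learning, their text and image output vectors are already aligned in a space of the same dimensionality (for example, 512 or 768 dimensions).

  2. Separate parsing from unified mapping.

    • Parser layer: when reading a file, a document parser (e.g. the PDF or Docx parser) must split the source document into "text blocks" and "image entities" and preserve their neighboring context in the original (context anchors).
    • Embedding layer
      • If the current Chunk is text, call the model's Text Encoder (text branch) to generate a dense vector.
      • If the current Chunk is an image (Base64 or a file reference), call the model's Vision Encoder (vision branch) to generate a dense vector.
    • Because both outputs live in the same vector space, they can be inserted into the same Collection/Index of a single VectorStore.
  3. Linked storage strategy. To support text-image cross-search and graph-based question answering, data is routed to multiple stores:

    • VectorStore: stores all generated multimodal vectors in one place and serves global cosine similarity retrieval.
    • DocumentStore: stores the raw data (text and image content, metadata, and owning-document information), providing high-fidelity original context for downstream LLM generation.
    • GraphStore: builds the entities and the relationships between them extracted from text Chunks, supporting graph-based reasoning and discovery.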
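
The strategy above can be sketched in a few lines. This is a minimal, self-contained illustration and not the gochat API: `Chunk`, `Encoder`, `embed`, and `search` are hypothetical names, and the hand-picked toy vectors merely stand in for the text and vision branches of a CLIP-style model. It shows the two ideas the section relies on: dispatching by modality, and ranking text and image vectors together by cosine similarity in one index.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// Chunk is a hypothetical unit of content tagged with its modality.
type Chunk struct {
	ID       string
	Modality string // "text" or "image"
	Payload  string // text content, or an image reference
}

// Encoder is a hypothetical stand-in for one branch of an aligned model.
type Encoder func(payload string) []float32

// embed routes a chunk to the encoder that matches its modality.
func embed(c Chunk, text, vision Encoder) ([]float32, error) {
	switch c.Modality {
	case "text":
		return text(c.Payload), nil
	case "image":
		return vision(c.Payload), nil
	default:
		return nil, fmt.Errorf("unsupported modality %q", c.Modality)
	}
}

// cosine returns the cosine similarity of two equal-length vectors.
func cosine(a, b []float32) float64 {
	var dot, na, nb float64
	for i := range a {
		dot += float64(a[i]) * float64(b[i])
		na += float64(a[i]) * float64(a[i])
		nb += float64(b[i]) * float64(b[i])
	}
	if na == 0 || nb == 0 {
		return 0
	}
	return dot / (math.Sqrt(na) * math.Sqrt(nb))
}

// search ranks every stored vector, regardless of modality, against the query.
func search(query []float32, index map[string][]float32) []string {
	ids := make([]string, 0, len(index))
	for id := range index {
		ids = append(ids, id)
	}
	sort.Slice(ids, func(i, j int) bool {
		return cosine(query, index[ids[i]]) > cosine(query, index[ids[j]])
	})
	return ids
}

func main() {
	// Hand-picked toy vectors stand in for real encoder outputs.
	text := func(p string) []float32 {
		if p == "a photo of a cat" {
			return []float32{1, 0}
		}
		return []float32{0, 1}
	}
	vision := func(string) []float32 { return []float32{0.9, 0.1} }

	index := map[string][]float32{}
	for _, c := range []Chunk{
		{ID: "t1", Modality: "text", Payload: "a photo of a car"},
		{ID: "i1", Modality: "image", Payload: "cat.png"},
	} {
		v, err := embed(c, text, vision)
		if err != nil {
			panic(err)
		}
		index[c.ID] = v
	}
	query := text("a photo of a cat")
	fmt.Println(search(query, index)) // prints [i1 t1]
}
```

Here a text query retrieves the image chunk first, which is only possible because both encoders write into the same space. With a real model, the vectors would typically also be L2-normalized before storage.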

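3. Pipeline Steps Design

We rearrange and extend the existing @pkg/steps/indexing steps:

  1. DiscoverStep (discovery and loading)
    • Collects the target file's metadata (path, size, timestamps, etc.).
  2. MultimodalParseStep (multi-format parsing)
    • Uses the existing stepinx.Multi(parsers...) step: given the set of parsers for the formats the system supports (CSV, DOCX, HTML, Image, Markdown, PDF, PPT, Text, XML, YAML, etc.), it routes each file to the matching Parser by file extension, reusing the existing parsing capabilities.
    • Each Parser separates text paragraphs and embedded images from the file and passes downstream a stream of core.Document values tagged with type="text" or type="image".
  3. SemanticChunkStep (semantic and multimodal chunking)
    • Applies sliding-window or semantic chunking to long text.
    • Wraps images with metadata (optionally adding an Image Caption generated by a VLM).
  4. MultimodalEmbedStep (multimodal vector generation)
    • Depends on the gochat/pkg/embedding library, which runs locally and can download models automatically.
    • Calls the text or vision encoder according to the Chunk's modality tag, producing Vectors of a uniform dimension.
  5. EntityExtractStep (graph entity extraction)
    • Extracts entities and relationships from text and attaches metadata, ready to be written to the GraphStore.
  6. MultiStoreStep (multi-store routing and persistence)
    • Stores the original Chunk content in the DocumentStore.
    • Writes the generated vectors to the VectorStore.
    • Writes the extracted graph nodes/edges to the GraphStore.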
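
The extension-based routing described for the parse step can be sketched as below. This is an assumption about how such routing might work, not the implementation of stepinx.Multi: the `Parser` interface, `namedParser`, and `Router` are hypothetical and deliberately simpler than core.Parser.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// Parser is a hypothetical, simplified parser interface for this sketch.
type Parser interface {
	Extensions() []string // e.g. ".md", ".pdf"
	Parse(path string) (string, error)
}

// namedParser is a dummy parser that only reports which parser handled a path.
type namedParser struct {
	name string
	exts []string
}

func (p namedParser) Extensions() []string { return p.exts }

func (p namedParser) Parse(path string) (string, error) {
	return p.name + " parsed " + path, nil
}

// Router maps lower-cased file extensions to parsers.
type Router map[string]Parser

// NewRouter registers each parser under every extension it declares.
// If two parsers declare the same extension, the later one wins.
func NewRouter(parsers ...Parser) Router {
	r := Router{}
	for _, p := range parsers {
		for _, ext := range p.Extensions() {
			r[strings.ToLower(ext)] = p
		}
	}
	return r
}

// Parse picks a parser by the file's extension, ignoring case.
func (r Router) Parse(path string) (string, error) {
	ext := strings.ToLower(filepath.Ext(path))
	p, ok := r[ext]
	if !ok {
		return "", fmt.Errorf("no parser registered for extension %q", ext)
	}
	return p.Parse(path)
}

func main() {
	r := NewRouter(
		namedParser{"markdown", []string{".md"}},
		namedParser{"pdf", []string{".pdf"}},
	)
	out, err := r.Parse("docs/Guide.MD")
	fmt.Println(out, err)

	_, err = r.Parse("archive.zip")
	fmt.Println(err)
}
```

Returning an error for unknown extensions is one possible policy; a pipeline could equally skip such files or fall back to a plain-text parser.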

4. Strongly Typed Context State

The pipeline passes the context *IndexingState through strongly typed generics (Go 1.18+):

type IndexingState struct {
    // Basic input
    FilePath string
    Metadata core.Metadata
    
    // Multimodal document streams
    Documents <-chan *core.Document
    Chunks    <-chan *core.Chunk // Chunk.Metadata contains "modality": "text" | "image"
    
    // Outputs
    Vectors   []*core.Vector
    Entities  []*core.Entity
    
    // Execution statistics
    TotalChunks   int
    TotalImages   int
    TotalEntities int
}

5. Pipeline Assembly Example (Pipeline Builder)

In pkg/indexer/builder.go or a similar assembly file, build the complete single-branch multimodal indexing service:

func BuildMultimodalIndexPipeline(
    parsers []core.Parser,                      // the full set of supported parsers
    chunker core.Chunker, 
    embedder embedding.MultimodalProvider,      // unified, aligned provider for text and images
    entityExtractor core.EntityExtractor,
    vectorStore core.VectorStore,
    docStore core.DocumentStore,
    graphStore core.GraphStore,
    logger logging.Logger,                      // used by the entity extraction step below
) *pipeline.Pipeline[*IndexingState] {
    
    p := pipeline.New[*IndexingState]()
    
    // Attach the programmable steps in order
    p.AddSteps(
        stepinx.Discover(),
        stepinx.Multi(parsers...),                  // ★ routes each file to the right parser by file extension
        stepinx.Chunk(chunker),                     // semantic chunking
        stepinx.MultimodalEmbed(embedder),          // ★ core: projection into the shared vector space
        stepinx.Entities(entityExtractor, logger),  // extract the graph of entities and relationships
        stepinx.Store(vectorStore, docStore, graphStore), // persist to all stores
    )
    
    return p
}

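6. Future Work

  1. Embedding layer extension: in line with the current bge and sentence-bert options, add support in gochat/pkg/embedding for multimodal vision models of comparable size (such as CLIP), with automatic download.
  2. Multimodal graph linking: in addition to writing to the VectorStore, explore whether to register recognized "image Chunks" as Nodes in the GraphStore, forming a network of text-image associations that spans document boundaries.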

Documentation

Overview

Package indexer provides high-level indexers for building RAG pipelines.

This package offers pre-configured indexer implementations:

  • DefaultNativeIndexer: Lightweight, local-first indexer for prototyping
  • DefaultAdvancedIndexer: High-performance indexer for production use
  • DefaultGraphIndexer: Knowledge graph-enhanced indexer
  • NewVectorIndexer: Custom vector-based indexer
  • NewMultimodalGraphIndexer: Multimodal and graph-capable indexer

Types

type Config added in v1.1.2

type Config struct {
	Concurrency bool
	Workers     int
}

Config defines the configuration for the indexer. It controls concurrency and worker pool settings for parallel document processing.

type Indexer

type Indexer interface {
	indexing.Indexer
	Init() error
	Start() error
	VectorStore() core.VectorStore
	DocStore() core.DocStore
	GraphStore() core.GraphStore
	Embedder() embedding.Provider
	Chunker() core.SemanticChunker
}

Indexer is the unified interface for document indexing. It provides methods for processing files and directories into vector/graph stores.

func DefaultAdvancedIndexer added in v1.1.3

func DefaultAdvancedIndexer(opts ...IndexerOption) (Indexer, error)

DefaultAdvancedIndexer creates a high-performance Indexer preset for production use. It features increased worker concurrency and optimized defaults for enterprise workloads.

func DefaultGraphIndexer added in v1.1.3

func DefaultGraphIndexer(opts ...IndexerOption) (Indexer, error)

DefaultGraphIndexer creates a Knowledge-Graph enabled Indexer preset. It integrates graph-based entity relationship extraction for complex query understanding.

func DefaultIndexer added in v1.1.2

func DefaultIndexer(opts ...IndexerOption) (Indexer, error)

DefaultIndexer is an alias for DefaultNativeIndexer, provided for backward compatibility.

func DefaultNativeIndexer added in v1.1.3

func DefaultNativeIndexer(opts ...IndexerOption) (Indexer, error)

DefaultNativeIndexer creates a lightweight, local-first Indexer. It uses the default TokenChunker together with local SQLite and GoVector stores, which makes it suitable for quick prototyping and testing.

func NewMultimodalGraphIndexer

func NewMultimodalGraphIndexer(
	parsers []core.Parser,
	chunker core.SemanticChunker,
	embedder embedding.MultimodalProvider,
	entityExtractor core.EntityExtractor,
	vectorStore core.VectorStore,
	docStore core.DocStore,
	graphStore core.GraphStore,
	logger logging.Logger,
	metrics core.Metrics,
	opts ...IndexerOption,
) (Indexer, error)

NewMultimodalGraphIndexer creates an advanced multimodal and graph pipeline. It supports both text and image inputs, with knowledge graph extraction capabilities.

Parameters:

  • parsers: list of document parsers
  • chunker: semantic chunker for splitting documents
  • embedder: multimodal embedding provider
  • entityExtractor: entity extractor for graph construction
  • vectorStore: vector storage backend
  • docStore: document metadata storage
  • graphStore: knowledge graph storage
  • logger: logging service
  • metrics: observability metrics service

func NewVectorIndexer

func NewVectorIndexer(
	parsers []core.Parser,
	chunker core.SemanticChunker,
	embedder embedding.Provider,
	vectorStore core.VectorStore,
	docStore core.DocStore,
	logger logging.Logger,
	metrics core.Metrics,
	opts ...IndexerOption,
) Indexer

NewVectorIndexer creates a simple text-vector pipeline for basic RAG setups.

Parameters:

  • parsers: list of document parsers
  • chunker: semantic chunker for splitting documents
  • embedder: embedding provider for vectorization
  • vectorStore: vector storage backend
  • docStore: document metadata storage
  • logger: logging service
  • metrics: observability metrics service

type IndexerOption added in v1.1.2

type IndexerOption func(*defaultIndexer)

IndexerOption defines a function to configure the indexer.
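
IndexerOption follows Go's functional options pattern. The self-contained sketch below shows the pattern in isolation: `indexerSettings` is a hypothetical stand-in for the unexported defaultIndexer, the default values are assumptions, and the three option constructors only borrow the names of their documented counterparts.

```go
package main

import "fmt"

// indexerSettings is a hypothetical stand-in for the unexported defaultIndexer.
type indexerSettings struct {
	name        string
	workers     int
	concurrency bool
}

// Option has the same shape as IndexerOption: a function that mutates the target.
type Option func(*indexerSettings)

// WithName sets the instance name.
func WithName(name string) Option {
	return func(s *indexerSettings) { s.name = name }
}

// WithWorkers sets the worker count.
func WithWorkers(n int) Option {
	return func(s *indexerSettings) { s.workers = n }
}

// WithConcurrency toggles concurrent processing.
func WithConcurrency(on bool) Option {
	return func(s *indexerSettings) { s.concurrency = on }
}

// newSettings starts from assumed defaults and applies options in order,
// so a later option overrides an earlier one that touches the same field.
func newSettings(opts ...Option) *indexerSettings {
	s := &indexerSettings{name: "default", workers: 1}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

func main() {
	s := newSettings(WithName("docs"), WithWorkers(4), WithWorkers(8), WithConcurrency(true))
	fmt.Printf("%+v\n", *s) // prints {name:docs workers:8 concurrency:true}
}
```

The pattern lets constructors such as DefaultNativeIndexer keep a stable signature while new options are added over time.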

func ClearParsers added in v1.1.3

func ClearParsers() IndexerOption

ClearParsers clears the current parser registry.

func WithAllParsers added in v1.1.2

func WithAllParsers() IndexerOption

WithAllParsers enables all available builtin parsers using the global factory registry.

func WithBGE added in v1.1.3

func WithBGE(modelPath string) IndexerOption

func WithBert added in v1.1.3

func WithBert(modelPath string) IndexerOption

func WithBoltDoc added in v1.1.3

func WithBoltDoc(path string) IndexerOption

func WithCharacterChunker added in v1.1.3

func WithCharacterChunker(size, overlap int) IndexerOption

WithCharacterChunker sets a simple character-based chunker.
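
The size and overlap parameters can be understood through a small sliding-window sketch. `chunkByChars` is a hypothetical helper, not the library's chunker, and it assumes both parameters are counted in characters (runes) and that overlap must be smaller than size.

```go
package main

import "fmt"

// chunkByChars splits text into windows of at most size runes, where
// consecutive windows share overlap runes.
func chunkByChars(text string, size, overlap int) ([]string, error) {
	if size <= 0 || overlap < 0 || overlap >= size {
		return nil, fmt.Errorf("need size > 0 and 0 <= overlap < size, got size=%d overlap=%d", size, overlap)
	}
	runes := []rune(text) // work on runes so multi-byte characters are never split
	var chunks []string
	step := size - overlap
	for start := 0; start < len(runes); start += step {
		end := start + size
		if end > len(runes) {
			end = len(runes)
		}
		chunks = append(chunks, string(runes[start:end]))
		if end == len(runes) {
			break // the last window already reached the end of the text
		}
	}
	return chunks, nil
}

func main() {
	chunks, err := chunkByChars("abcdefghij", 4, 1)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%q\n", chunks) // prints ["abcd" "defg" "ghij"]
}
```

Each window starts `size - overlap` characters after the previous one, so the overlap repeats the tail of one chunk at the head of the next and preserves context across chunk boundaries.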

func WithChunker added in v1.1.2

func WithChunker(chunker core.SemanticChunker) IndexerOption

WithChunker sets the semantic chunker.

func WithClip added in v1.1.3

func WithClip(modelPath string) IndexerOption

func WithConcurrency added in v1.1.2

func WithConcurrency(enabled bool) IndexerOption

WithConcurrency enables or disables concurrent indexing.

func WithConsoleLogger added in v1.1.3

func WithConsoleLogger() IndexerOption

WithConsoleLogger configures the indexer to output logs to standard output.

func WithDefaultGoVector added in v1.1.3

func WithDefaultGoVector() IndexerOption

WithDefaultGoVector configures the indexer with an out-of-the-box local GoVector store.

func WithDefaultSQLiteDoc added in v1.1.3

func WithDefaultSQLiteDoc() IndexerOption

WithDefaultSQLiteDoc configures the indexer with an out-of-the-box local SQLite document store.

func WithDefaultSemanticChunker added in v1.1.3

func WithDefaultSemanticChunker() IndexerOption

WithDefaultSemanticChunker configures the indexer with an out-of-the-box Semantic chunker.

func WithDocStore added in v1.1.3

func WithDocStore(s core.DocStore) IndexerOption

WithDocStore sets a custom document store.

func WithEmbedding added in v1.1.2

func WithEmbedding(embedder embedding.Provider) IndexerOption

WithEmbedding sets the embedding provider explicitly.

func WithExtractor added in v1.1.2

func WithExtractor(extractor core.EntityExtractor) IndexerOption

WithExtractor sets the entity extractor.

func WithFileLogger added in v1.1.3

func WithFileLogger(path string) IndexerOption

WithFileLogger configures the indexer to output logs to a specific file.

func WithGoVector added in v1.1.3

func WithGoVector(collection string, path string, dimension int) IndexerOption

func WithGraph added in v1.1.2

func WithGraph(graphStore core.GraphStore) IndexerOption

WithGraph sets the graph store explicitly.

func WithLogger added in v1.1.2

func WithLogger(logger logging.Logger) IndexerOption

WithLogger sets the logger.

func WithMetrics added in v1.1.2

func WithMetrics(metrics core.Metrics) IndexerOption

WithMetrics sets the metrics recorder.

func WithMilvus added in v1.1.3

func WithMilvus(collection string, addr string, dimension int) IndexerOption

func WithName added in v1.1.4

func WithName(name string) IndexerOption

WithName sets a unique name for the indexer instance, used for resource isolation.

func WithNeoGraph added in v1.1.3

func WithNeoGraph(uri, username, password, dbName string) IndexerOption

func WithOpenAI added in v1.1.3

func WithOpenAI(apiKey string, model string) IndexerOption

func WithOpenTelemetryTracer added in v1.1.3

func WithOpenTelemetryTracer(ctx context.Context, endpoint string, serviceName string) IndexerOption

WithOpenTelemetryTracer configures the indexer to send distributed traces to an OTel exporter. endpoint is the gRPC endpoint of the collector (e.g., "localhost:4317").

func WithParsers added in v1.1.2

func WithParsers(parsers ...core.Parser) IndexerOption

WithParsers adds custom parsers to the registry.

func WithPinecone added in v1.1.3

func WithPinecone(indexName string, apiKey string, dimension int) IndexerOption

func WithPrometheusMetrics added in v1.1.3

func WithPrometheusMetrics(addr string) IndexerOption

WithPrometheusMetrics configures the indexer to collect and expose metrics via Prometheus. It will start an HTTP server on the given address (e.g., ":8080") to serve the /metrics endpoint.

func WithQdrant added in v1.1.3

func WithQdrant(collection string, host string, port int, dimension int) IndexerOption

func WithSQLDoc added in v1.1.3

func WithSQLDoc(path string) IndexerOption

func WithStore added in v1.1.2

func WithStore(vectorStore core.VectorStore, docStore core.DocStore) IndexerOption

WithStore sets the vector and document stores explicitly.

func WithTokenChunker added in v1.1.3

func WithTokenChunker(size, overlap int, model string) IndexerOption

WithTokenChunker sets an accurate token-based chunker.

func WithTriplesExtractor added in v1.1.6

func WithTriplesExtractor(extractor core.TriplesExtractor) IndexerOption

WithTriplesExtractor sets the triples extractor for GraphRAG. This enables automatic knowledge graph construction from documents. When set, IndexDocuments will extract entities and relationships from text and store them in GraphStore with document_id for cascade delete support.
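
The role of document_id in cascade deletes can be illustrated with a toy in-memory graph. This sketch does not reflect the real GraphStore or TriplesExtractor interfaces: `Triple`, `memGraph`, and its methods are hypothetical.

```go
package main

import "fmt"

// Triple is a hypothetical (subject, predicate, object) fact tagged with
// the document it was extracted from.
type Triple struct {
	Subject, Predicate, Object string
	DocumentID                 string
}

// memGraph is a toy in-memory graph store used only for this sketch.
type memGraph struct {
	triples []Triple
}

// Add stores triples and tags each with the source document ID.
func (g *memGraph) Add(docID string, ts ...Triple) {
	for _, t := range ts {
		t.DocumentID = docID
		g.triples = append(g.triples, t)
	}
}

// DeleteDocument removes every triple extracted from docID and reports
// how many were removed.
func (g *memGraph) DeleteDocument(docID string) int {
	kept := g.triples[:0] // filter in place, reusing the backing array
	removed := 0
	for _, t := range g.triples {
		if t.DocumentID == docID {
			removed++
			continue
		}
		kept = append(kept, t)
	}
	g.triples = kept
	return removed
}

func main() {
	g := &memGraph{}
	g.Add("doc-1", Triple{Subject: "CLIP", Predicate: "is_a", Object: "model"})
	g.Add("doc-2", Triple{Subject: "Milvus", Predicate: "is_a", Object: "vector store"})
	fmt.Println(g.DeleteDocument("doc-1"), len(g.triples)) // prints 1 1
}
```

Tagging each edge with its source document means that removing or re-indexing a document can clean up exactly the graph facts it contributed, without touching facts from other documents.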

func WithVectorStore added in v1.1.3

func WithVectorStore(s core.VectorStore) IndexerOption

WithVectorStore sets a custom vector store.

func WithWatchDir added in v1.1.2

func WithWatchDir(dirs ...string) IndexerOption

WithWatchDir adds directories to watch for changes.

func WithWeaviate added in v1.1.3

func WithWeaviate(collection string, addr string, apiKey string, dimension int) IndexerOption

func WithWorkers added in v1.1.2

func WithWorkers(workers int) IndexerOption

WithWorkers sets the number of workers for concurrent indexing.

func WithZapLogger added in v1.1.3

func WithZapLogger(path string, maxSizeMB, maxDays, maxBackups int, console bool) IndexerOption

WithZapLogger configures the indexer to use a production-grade Zap logger with log rotation.
