pipeline

package
v0.9.1
Published: Jan 12, 2026 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ErrorDetail

type ErrorDetail struct {
	File      string    `json:"file"`
	Error     string    `json:"error"`
	Timestamp time.Time `json:"timestamp"`
}

ErrorDetail contains information about a processing error

type FileInfo

type FileInfo struct {
	Path    string
	Size    int64
	ModTime time.Time
	Type    FileType
	Hash    string // SHA256 for deduplication
}

FileInfo represents a discovered file

type FileScanner

type FileScanner struct {
	// contains filtered or unexported fields
}

FileScanner handles file discovery and filtering

func NewFileScanner

func NewFileScanner(root, pattern string, exclude []string, recursive bool) *FileScanner

NewFileScanner creates a new file scanner

func (*FileScanner) Scan

func (s *FileScanner) Scan(ctx context.Context) ([]FileInfo, error)

Scan discovers files matching the criteria
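
A minimal usage sketch for the scanner. The module import path and the exclusion pattern below are assumptions; substitute the real values for your module.

package main

import (
	"context"
	"fmt"
	"log"

	"example.com/yourmodule/pipeline" // assumed import path
)

func main() {
	// Scan ./docs recursively for PDFs, excluding anything under tmp/.
	scanner := pipeline.NewFileScanner("./docs", "**/*.pdf", []string{"tmp/**"}, true)

	files, err := scanner.Scan(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	for _, f := range files {
		fmt.Printf("%s (%d bytes, type=%s)\n", f.Path, f.Size, f.Type)
	}
}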

type FileType

type FileType string

FileType represents the type of file

const (
	FileTypePDF     FileType = "pdf"
	FileTypeTXT     FileType = "txt"
	FileTypeMD      FileType = "md"
	FileTypeJSON    FileType = "json"
	FileTypeYAML    FileType = "yaml"
	FileTypeImage   FileType = "image"
	FileTypeUnknown FileType = "unknown"
)
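
The constants can be compared or switched on directly. The helper below is hypothetical (it is not part of this package) and only illustrates one plausible mapping from file extensions to FileType values.

package main

import (
	"path/filepath"
	"strings"

	"example.com/yourmodule/pipeline" // assumed import path
)

// fileTypeFromPath is a hypothetical helper that maps a file extension
// onto the exported FileType constants.
func fileTypeFromPath(path string) pipeline.FileType {
	switch strings.ToLower(strings.TrimPrefix(filepath.Ext(path), ".")) {
	case "pdf":
		return pipeline.FileTypePDF
	case "txt":
		return pipeline.FileTypeTXT
	case "md":
		return pipeline.FileTypeMD
	case "json":
		return pipeline.FileTypeJSON
	case "yaml", "yml":
		return pipeline.FileTypeYAML
	case "png", "jpg", "jpeg", "gif":
		return pipeline.FileTypeImage
	default:
		return pipeline.FileTypeUnknown
	}
}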

type IngestOptions

type IngestOptions struct {
	// Source configuration
	Source    string   // File or directory path
	Glob      string   // Glob pattern (e.g., "**/*.pdf")
	Exclude   []string // Exclusion patterns
	Recursive bool     // Recursive directory scan (default: true)

	// Target configuration
	Collection string // Target collection name
	VDBType    string // Vector database type

	// Processing configuration
	BatchSize int               // Documents per batch (default: 100)
	Workers   int               // Concurrent workers (default: 4)
	Metadata  map[string]string // Additional metadata for all docs

	// Operation modes
	DryRun    bool   // Preview without executing
	Resume    bool   // Resume from previous state
	StateFile string // State file path for resume

	// Output configuration
	OutputFormat string // json, yaml, table
	ReportFile   string // Report output file
	Quiet        bool   // Suppress progress output
}

IngestOptions configures the ingestion pipeline
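
A sketch of building an options value. The collection name, exclusion pattern, and VDBType string are placeholders; the valid VDBType values depend on which vector database backends the surrounding project supports.

package main

import "example.com/yourmodule/pipeline" // assumed import path

func newIngestOptions() *pipeline.IngestOptions {
	return &pipeline.IngestOptions{
		// Source configuration
		Source:    "./docs",
		Glob:      "**/*.md",
		Exclude:   []string{"drafts/**"},
		Recursive: true,

		// Target configuration
		Collection: "knowledge-base",
		VDBType:    "milvus", // placeholder; depends on supported backends

		// Processing configuration
		BatchSize: 100,
		Workers:   4,
		Metadata:  map[string]string{"source": "docs"},

		// Operation modes
		Resume:    true,
		StateFile: ".ingest-state.json",

		// Output configuration
		OutputFormat: "table",
	}
}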

type IngestReport

type IngestReport struct {
	Status    string    `json:"status"` // success, partial, failure
	StartTime time.Time `json:"start_time"`
	EndTime   time.Time `json:"end_time"`
	Duration  float64   `json:"duration_seconds"`

	// File statistics
	FilesScanned   int `json:"files_scanned"`
	FilesProcessed int `json:"files_processed"`
	FilesSkipped   int `json:"files_skipped"`
	FilesFailed    int `json:"files_failed"`

	// Document statistics
	DocumentsCreated int `json:"documents_created"`
	DocumentsFailed  int `json:"documents_failed"`

	// Performance metrics
	ThroughputFiles float64 `json:"throughput_files_per_sec"`
	ThroughputDocs  float64 `json:"throughput_docs_per_sec"`

	// Error details
	Errors []ErrorDetail `json:"errors,omitempty"`

	// Configuration
	Collection string `json:"collection"`
	VDBType    string `json:"vdb_type"`
	BatchSize  int    `json:"batch_size"`
	Workers    int    `json:"workers"`
}

IngestReport is the final summary of an ingestion run
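
Because every field carries a JSON tag, the report can be persisted with the standard library. A minimal sketch, assuming the import path below:

package main

import (
	"encoding/json"
	"os"

	"example.com/yourmodule/pipeline" // assumed import path
)

// writeReport serializes the final report using its JSON tags.
func writeReport(report *pipeline.IngestReport, path string) error {
	data, err := json.MarshalIndent(report, "", "  ")
	if err != nil {
		return err
	}
	return os.WriteFile(path, data, 0o644)
}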

type IngestState

type IngestState struct {
	ProcessedFiles map[string]bool `json:"processed_files"`
	LastUpdated    time.Time       `json:"last_updated"`
}

IngestState tracks processing state for resume capability
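
The pipeline presumably reads and writes this state itself when Resume and StateFile are set; the sketch below only shows how an external tool could load the same JSON shape.

package main

import (
	"encoding/json"
	"errors"
	"os"

	"example.com/yourmodule/pipeline" // assumed import path
)

// loadState reads a previously written state file, returning an empty
// state when none exists yet.
func loadState(path string) (*pipeline.IngestState, error) {
	data, err := os.ReadFile(path)
	if errors.Is(err, os.ErrNotExist) {
		return &pipeline.IngestState{ProcessedFiles: map[string]bool{}}, nil
	}
	if err != nil {
		return nil, err
	}
	var state pipeline.IngestState
	if err := json.Unmarshal(data, &state); err != nil {
		return nil, err
	}
	return &state, nil
}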

type ProcessingResult

type ProcessingResult struct {
	File          FileInfo
	Documents     []*vectordb.Document
	DocumentCount int
	Error         error
	Duration      time.Duration
}

ProcessingResult represents the result of processing a file

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor handles document processing and ingestion

func NewProcessor

func NewProcessor(vdbClient vectordb.VectorDBClient, llmClient *llm.OpenAIClient, options *IngestOptions, progress *ProgressTracker) *Processor

NewProcessor creates a new document processor

func (*Processor) ProcessFiles

func (p *Processor) ProcessFiles(ctx context.Context, files []FileInfo) (*IngestReport, error)

ProcessFiles processes files concurrently and creates documents in batches
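
A sketch of wiring the scanner, progress tracker, and processor together. It assumes you already hold a vectordb.VectorDBClient and an *llm.OpenAIClient from the sibling packages (their construction is not shown here), and the import paths are placeholders.

package main

import (
	"context"

	"example.com/yourmodule/internal/llm"      // assumed import path
	"example.com/yourmodule/internal/vectordb" // assumed import path
	"example.com/yourmodule/pipeline"          // assumed import path
)

// ingest scans the configured source and processes every discovered file.
func ingest(ctx context.Context, vdb vectordb.VectorDBClient, llmClient *llm.OpenAIClient, opts *pipeline.IngestOptions) (*pipeline.IngestReport, error) {
	scanner := pipeline.NewFileScanner(opts.Source, opts.Glob, opts.Exclude, opts.Recursive)
	files, err := scanner.Scan(ctx)
	if err != nil {
		return nil, err
	}

	progress := pipeline.NewProgressTracker(true, opts.Quiet)
	processor := pipeline.NewProcessor(vdb, llmClient, opts, progress)
	return processor.ProcessFiles(ctx, files)
}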

type ProgressTracker

type ProgressTracker struct {
	// contains filtered or unexported fields
}

ProgressTracker handles progress reporting for pipeline operations

func NewProgressTracker

func NewProgressTracker(enabled bool, quiet bool) *ProgressTracker

NewProgressTracker creates a new progress tracker

func (*ProgressTracker) FinishBatching

func (p *ProgressTracker) FinishBatching()

FinishBatching completes batch creation

func (*ProgressTracker) FinishProcessing

func (p *ProgressTracker) FinishProcessing()

FinishProcessing completes the progress bar

func (*ProgressTracker) FinishScanning

func (p *ProgressTracker) FinishScanning(count int)

FinishScanning completes scanning progress

func (*ProgressTracker) GetWriter

func (p *ProgressTracker) GetWriter() io.Writer

GetWriter returns a writer for the progress output

func (*ProgressTracker) ShowError

func (p *ProgressTracker) ShowError(msg string)

ShowError displays an error message

func (*ProgressTracker) ShowInfo

func (p *ProgressTracker) ShowInfo(msg string)

ShowInfo displays an info message

func (*ProgressTracker) ShowWarning

func (p *ProgressTracker) ShowWarning(msg string)

ShowWarning displays a warning message

func (*ProgressTracker) StartBatching

func (p *ProgressTracker) StartBatching(totalDocs int, batchSize int)

StartBatching shows batch creation progress

func (*ProgressTracker) StartProcessing

func (p *ProgressTracker) StartProcessing(total int, batchSize int, workers int)

StartProcessing creates a progress bar for file processing

func (*ProgressTracker) StartScanning

func (p *ProgressTracker) StartScanning()

StartScanning shows scanning progress

func (*ProgressTracker) UpdateBatch

func (p *ProgressTracker) UpdateBatch(batchNum int, totalBatches int, docsInBatch int)

UpdateBatch shows progress for batch operations

func (*ProgressTracker) UpdateProgress

func (p *ProgressTracker) UpdateProgress(delta int)

UpdateProgress increments the progress bar
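
During a normal run the Processor presumably drives the tracker itself; the sketch below only illustrates a plausible call order for the lifecycle methods when driving it directly.

package main

import "example.com/yourmodule/pipeline" // assumed import path

// trackManually shows one plausible ordering of the tracker's lifecycle calls.
func trackManually(files []pipeline.FileInfo) {
	progress := pipeline.NewProgressTracker(true, false) // enabled, not quiet

	progress.StartScanning()
	// ... file discovery happens here ...
	progress.FinishScanning(len(files))

	progress.StartProcessing(len(files), 100, 4) // total, batch size, workers
	for range files {
		// ... process one file ...
		progress.UpdateProgress(1)
	}
	progress.FinishProcessing()

	progress.ShowInfo("ingestion complete")
}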
