streaming

package
v1.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 2, 2026 License: MIT Imports: 6 Imported by: 0

Documentation

Overview

Package streaming provides memory-efficient processing for large inputs (>500K tokens).

Index

Constants

View Source
const (
	// DefaultChunkSize is the default size for streaming chunks (~4K tokens).
	DefaultChunkSize = 16 * 1024 // 16KB

	// MaxMemoryBuffer is the maximum memory buffer size.
	MaxMemoryBuffer = 100 * 1024 * 1024 // 100MB
)

Variables

This section is empty.

Functions

func EstimateTokens

func EstimateTokens(content string) int

EstimateTokens estimates the token count for content.

func ProcessLargeContent

func ProcessLargeContent(content string, processor ChunkProcessor) (string, error)

ProcessLargeContent processes large content in chunks.

func ShouldStream

func ShouldStream(content string) bool

ShouldStream determines if content should use streaming.

Types

type ArenaAllocator

type ArenaAllocator struct {
	// contains filtered or unexported fields
}

ArenaAllocator provides arena-based memory allocation.

func NewArena

func NewArena(size int) *ArenaAllocator

NewArena creates a new arena.

func (*ArenaAllocator) Alloc

func (a *ArenaAllocator) Alloc(size int) []byte

Alloc allocates memory from the arena.

func (*ArenaAllocator) Reset

func (a *ArenaAllocator) Reset()

Reset resets the arena.

func (*ArenaAllocator) Used

func (a *ArenaAllocator) Used() int

Used returns used memory.

type BackpressureController

type BackpressureController struct {
	// contains filtered or unexported fields
}

func NewBackpressureController

func NewBackpressureController(maxPending int) *BackpressureController

func (*BackpressureController) IsBlocked

func (b *BackpressureController) IsBlocked() bool

func (*BackpressureController) Release

func (b *BackpressureController) Release()

func (*BackpressureController) TryAcquire

func (b *BackpressureController) TryAcquire() bool

type Chunk

type Chunk struct {
	Data       []byte
	Offset     int64
	IsLast     bool
	TokenCount int
}

Chunk represents a chunk of content for processing.

type ChunkProcessor

type ChunkProcessor interface {
	Process(chunk []byte) ([]byte, int, error)
	Name() string
}

ChunkProcessor processes individual chunks.

type FilterProcessor

type FilterProcessor struct {
	// contains filtered or unexported fields
}

FilterProcessor adapts filter.Engine to ChunkProcessor.

func (*FilterProcessor) Name

func (p *FilterProcessor) Name() string

Name implements ChunkProcessor.

func (*FilterProcessor) Process

func (p *FilterProcessor) Process(chunk []byte) ([]byte, int, error)

Process implements ChunkProcessor.

type MemoryMappedFile

type MemoryMappedFile struct {
	// contains filtered or unexported fields
}

MemoryMappedFile provides memory-mapped file access.

func MapFile

func MapFile(path string) (*MemoryMappedFile, error)

MapFile maps a file into memory.

func (*MemoryMappedFile) Data

func (m *MemoryMappedFile) Data() []byte

Data returns the mapped data.

func (*MemoryMappedFile) Size

func (m *MemoryMappedFile) Size() int64

Size returns the file size.

func (*MemoryMappedFile) Unmap

func (m *MemoryMappedFile) Unmap() error

Unmap unmaps the file.

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline processes content in streaming fashion.

func NewPipeline

func NewPipeline(processor ChunkProcessor) *Pipeline

NewPipeline creates a new streaming pipeline.

func (*Pipeline) Process

func (p *Pipeline) Process(reader io.Reader, writer io.Writer) error

ProcessReader processes from io.Reader.

func (*Pipeline) ProcessReader

func (p *Pipeline) ProcessReader(reader io.Reader, writer io.Writer) (Stats, error)

ProcessReader processes content from a reader.

func (*Pipeline) SetChunkSize

func (p *Pipeline) SetChunkSize(size int)

SetChunkSize sets the chunk size.

func (*Pipeline) SetMaxMemory

func (p *Pipeline) SetMaxMemory(max int64)

SetMaxMemory sets the maximum memory.

func (*Pipeline) SetWorkerCount

func (p *Pipeline) SetWorkerCount(count int)

SetWorkerCount sets the number of workers.

type Result

type Result struct {
	Data       []byte
	Offset     int64
	Saved      int
	TokenCount int
	Error      error
}

Result represents processed chunk output.

type Stats

type Stats struct {
	TotalChunks int64
	TotalSaved  int64
	TotalOutput int64
	PeakMemory  int64
}

Stats provides streaming statistics.

type StreamingMetrics

type StreamingMetrics struct {
	TotalChunks    int     `json:"total_chunks"`
	ProcessedBytes int     `json:"processed_bytes"`
	DurationMs     float64 `json:"duration_ms"`
	ThroughputMBs  float64 `json:"throughput_mbs"`
}

type StreamingProcessor

type StreamingProcessor struct {
	// contains filtered or unexported fields
}

func NewStreamingProcessor

func NewStreamingProcessor() *StreamingProcessor

func (*StreamingProcessor) Process

func (sp *StreamingProcessor) Process(input string, processor func(string) string) string

func (*StreamingProcessor) ProcessWithMetrics

func (sp *StreamingProcessor) ProcessWithMetrics(input string, processor func(string) string) (string, *StreamingMetrics)

func (*StreamingProcessor) SetAdaptive

func (sp *StreamingProcessor) SetAdaptive(adaptive bool)

func (*StreamingProcessor) SetChunkSize

func (sp *StreamingProcessor) SetChunkSize(size int)

func (*StreamingProcessor) SetThreshold

func (sp *StreamingProcessor) SetThreshold(threshold int)

func (*StreamingProcessor) ShouldStream

func (sp *StreamingProcessor) ShouldStream(input string) bool

type StreamingWriter

type StreamingWriter struct {
	// contains filtered or unexported fields
}

StreamingWriter writes large output efficiently.

func NewStreamingWriter

func NewStreamingWriter(w io.Writer, bufferSize int) *StreamingWriter

NewStreamingWriter creates a streaming writer.

func (*StreamingWriter) Close

func (s *StreamingWriter) Close() error

Close closes the writer.

func (*StreamingWriter) Flush

func (s *StreamingWriter) Flush() error

Flush flushes the buffer.

func (*StreamingWriter) Write

func (s *StreamingWriter) Write(p []byte) (n int, err error)

Write implements io.Writer.

func (*StreamingWriter) Written

func (s *StreamingWriter) Written() int64

Written returns total bytes written.

type ZeroCopyReader

type ZeroCopyReader struct {
	// contains filtered or unexported fields
}

ZeroCopyReader provides zero-copy reading.

func NewZeroCopyReader

func NewZeroCopyReader(data []byte) *ZeroCopyReader

NewZeroCopyReader creates a zero-copy reader.

func (*ZeroCopyReader) Read

func (z *ZeroCopyReader) Read(p []byte) (n int, err error)

Read implements io.Reader.

func (*ZeroCopyReader) ReadAt

func (z *ZeroCopyReader) ReadAt(p []byte, off int64) (n int, err error)

ReadAt implements io.ReaderAt.

func (*ZeroCopyReader) Reset

func (z *ZeroCopyReader) Reset()

Reset resets the reader.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL