bulk

package
v0.63.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 15, 2025 License: Apache-2.0 Imports: 25 Imported by: 0

README

Bulk proxy

The following diagram describes the high-level architecture of the structures used to handle bulk requests in seq-proxy.

classDiagram
  direction TB
    class Ingestor {
        Pool[processor]
        ...
        ProcessBulk([]byte)
    }
%%    note for Ingestor "handles bulk logic, manages a pool of processors"

    class processor {
        -decoder d
        -indexer i
        -mapping m

        +process([]byte) 
        +requestAndCleanup() []byte
    }


%%    note for processor "accumulates meta and docs from a single bulk, returns a request ready to be sent to the store"


    class decoder {
        <<interface>>
        set(k string, v []byte) 
        get(k string) []byte
        render([]byte) []byte
        decode([]byte, indexer, mapping)
    }

    processor .. decoder: calls index() when processing input
    processor .. indexer: calls getTokens() when extracting indexed data
    decoder .. indexer: uses on decode() call
    BulkProxy .. processor

    class indexer {
        +index(k string, v string) 
        +getTokens() []string

        -tokenizers
    }

    class insane {
        set(k string, v []byte) 
        get(k string) []byte
        render([]byte) []byte
        decode([]byte, indexer, mapping)
    }

%%    note for decoder "defines the interface that must be implemented for libraries/formats used to decode incoming bulk requests."

%%    note for indexer "indexes input values, knows the mapping, uses specific tokenizers depending on the key value, keeps a list of extracted tokens"

    insane..|>decoder : Implements

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrTooManyInflightBulks = errors.New("too many inflight bulks, dropping")

Functions

This section is empty.

Types

type Ingestor

type Ingestor struct {
	// contains filtered or unexported fields
}

func NewIngestor

func NewIngestor(c IngestorConfig, client StorageClient) *Ingestor

func (*Ingestor) ProcessDocuments

func (i *Ingestor) ProcessDocuments(ctx context.Context, requestTime time.Time, readNext func() ([]byte, error)) (int, error)

func (*Ingestor) Stop

func (i *Ingestor) Stop()

type IngestorConfig

type IngestorConfig struct {
	HotStores   *stores.Stores
	WriteStores *stores.Stores

	BulkCircuit circuitbreaker.Config

	MaxInflightBulks       int
	AllowedTimeDrift       time.Duration
	FutureAllowedTimeDrift time.Duration

	MappingProvider MappingProvider

	MaxTokenSize         int
	CaseSensitive        bool
	PartialFieldIndexing bool

	DocsZSTDCompressLevel  int
	MetasZSTDCompressLevel int

	MaxDocumentSize int
}

type MappingProvider

type MappingProvider interface {
	GetMapping() seq.Mapping
	GetRawMapping() *seq.RawMapping
}

type SeqDBClient

type SeqDBClient struct {
	// contains filtered or unexported fields
}

func NewSeqDBClient

func NewSeqDBClient(hots, colds *stores.Stores, bulkCircuit circuitbreaker.Config, clients map[string]storeapi.StoreApiClient) *SeqDBClient

func (*SeqDBClient) StoreDocuments

func (i *SeqDBClient) StoreDocuments(ctx context.Context, count int, docs, metas []byte) error

type StorageClient

type StorageClient interface {
	StoreDocuments(ctx context.Context, count int, docs, metas []byte) error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL