Documentation ¶
Index ¶
- Variables
- type BatchIndexer
- type Document
- type DocumentError
- type ErrSchemaAlreadyExists
- type ErrSchemaNotFound
- type ErrSchemaUpdateOutOfOrder
- type ErrTypeInvalid
- type IndexerConfig
- type Mapper
- type Option
- type Severity
- type Store
- type StoreOption
- type StoreRetrier
- func (s *StoreRetrier) ApplySchemaChange(ctx context.Context, logEntry *schemalog.LogEntry) error
- func (s *StoreRetrier) DeleteSchema(ctx context.Context, schemaName string) error
- func (s *StoreRetrier) DeleteTableDocuments(ctx context.Context, schemaName string, tableIDs []string) error
- func (s *StoreRetrier) GetMapper() Mapper
- func (s *StoreRetrier) SendDocuments(ctx context.Context, docs []Document) ([]DocumentError, error)
- type StoreRetryConfig
Constants ¶
This section is empty.
Variables ¶
var (
	ErrRetriable    = errors.New("retriable error")
	ErrInvalidQuery = errors.New("invalid query")
)
Functions ¶
This section is empty.
Types ¶
type BatchIndexer ¶
type BatchIndexer struct {
// contains filtered or unexported fields
}
BatchIndexer is the environment for ingesting WAL logical replication events into a search store using the pgstream flow.
func NewBatchIndexer ¶
func NewBatchIndexer(ctx context.Context, config IndexerConfig, store Store, lsnParser replication.LSNParser, opts ...Option) (*BatchIndexer, error)
NewBatchIndexer returns a processor of wal events that indexes data into the search store provided on input.
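As a usage illustration, here is a minimal sketch of wiring up a BatchIndexer with the constructor above; the IndexerConfig value, Store implementation, LSN parser and checkpoint callback are assumed to be supplied by the caller, and newSearchIndexer is a hypothetical helper, not part of this package.

func newSearchIndexer(ctx context.Context, cfg IndexerConfig, store Store, parser replication.LSNParser, cp checkpointer.Checkpoint) (*BatchIndexer, error) {
	// Pass the checkpoint callback via WithCheckpoint so the indexer can
	// commit event positions once their documents have been sent.
	return NewBatchIndexer(ctx, cfg, store, parser, WithCheckpoint(cp))
}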
func (*BatchIndexer) Close ¶
func (i *BatchIndexer) Close() error
func (*BatchIndexer) Name ¶
func (i *BatchIndexer) Name() string
func (*BatchIndexer) ProcessWALEvent ¶
ProcessWALEvent is responsible for sending the wal event to the search store and committing the event position. It can be called concurrently.
type DocumentError ¶
type ErrSchemaAlreadyExists ¶
type ErrSchemaAlreadyExists struct {
SchemaName string
}
func (ErrSchemaAlreadyExists) Error ¶
func (e ErrSchemaAlreadyExists) Error() string
type ErrSchemaNotFound ¶
type ErrSchemaNotFound struct {
SchemaName string
}
func (ErrSchemaNotFound) Error ¶
func (e ErrSchemaNotFound) Error() string
type ErrSchemaUpdateOutOfOrder ¶
type ErrSchemaUpdateOutOfOrder struct {
SchemaName string
SchemaID string
NewVersion int
CurrentVersion int
CurrentCreatedAt time.Time
NewCreatedAt time.Time
}
func (ErrSchemaUpdateOutOfOrder) Error ¶
func (e ErrSchemaUpdateOutOfOrder) Error() string
type ErrTypeInvalid ¶
type ErrTypeInvalid struct {
Input string
}
func (ErrTypeInvalid) Error ¶
func (e ErrTypeInvalid) Error() string
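For illustration, a hedged sketch of distinguishing the error types above with errors.As after a schema operation fails; applySchema is a hypothetical caller, and treating an already-existing schema or an out-of-order update as a no-op is an example policy, not behaviour prescribed by the package.

func applySchema(ctx context.Context, store Store, logEntry *schemalog.LogEntry) error {
	err := store.ApplySchemaChange(ctx, logEntry)
	if err == nil {
		return nil
	}
	var exists ErrSchemaAlreadyExists
	var outOfOrder ErrSchemaUpdateOutOfOrder
	switch {
	case errors.As(err, &exists):
		// Example policy: the schema is already in the store, skip it.
		return nil
	case errors.As(err, &outOfOrder):
		// Example policy: an older schema version arrived late, skip it.
		return nil
	default:
		return err
	}
}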
type IndexerConfig ¶
type Option ¶
type Option func(*BatchIndexer)
func WithCheckpoint ¶
func WithCheckpoint(c checkpointer.Checkpoint) Option
func WithLogger ¶
type Store ¶
type Store interface {
GetMapper() Mapper
// schema operations
ApplySchemaChange(ctx context.Context, logEntry *schemalog.LogEntry) error
DeleteSchema(ctx context.Context, schemaName string) error
// data operations
DeleteTableDocuments(ctx context.Context, schemaName string, tableIDs []string) error
SendDocuments(ctx context.Context, docs []Document) ([]DocumentError, error)
}
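As a starting point for tests or a custom backend, a minimal no-op implementation of the Store interface; noopStore is a hypothetical name and the nil returns are placeholders.

type noopStore struct{}

func (noopStore) GetMapper() Mapper { return nil }

func (noopStore) ApplySchemaChange(ctx context.Context, logEntry *schemalog.LogEntry) error {
	return nil
}

func (noopStore) DeleteSchema(ctx context.Context, schemaName string) error {
	return nil
}

func (noopStore) DeleteTableDocuments(ctx context.Context, schemaName string, tableIDs []string) error {
	return nil
}

func (noopStore) SendDocuments(ctx context.Context, docs []Document) ([]DocumentError, error) {
	return nil, nil
}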
type StoreOption ¶
type StoreOption func(*StoreRetrier)
func WithStoreLogger ¶
func WithStoreLogger(logger loglib.Logger) StoreOption
type StoreRetrier ¶
type StoreRetrier struct {
// contains filtered or unexported fields
}
StoreRetrier applies a retry strategy to failed search store operations.
func NewStoreRetrier ¶
func NewStoreRetrier(s Store, cfg StoreRetryConfig, opts ...StoreOption) *StoreRetrier
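A minimal sketch of wrapping a concrete Store with the retrier; whether the zero value of StoreRetryConfig falls back to package defaults is an assumption not confirmed by this excerpt, and newRetryingStore is a hypothetical helper.

func newRetryingStore(base Store, logger loglib.Logger) Store {
	// *StoreRetrier implements Store, so it can be used as a drop-in wrapper.
	return NewStoreRetrier(base, StoreRetryConfig{}, WithStoreLogger(logger))
}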
func (*StoreRetrier) ApplySchemaChange ¶
func (s *StoreRetrier) ApplySchemaChange(ctx context.Context, logEntry *schemalog.LogEntry) error
func (*StoreRetrier) DeleteSchema ¶
func (s *StoreRetrier) DeleteSchema(ctx context.Context, schemaName string) error
func (*StoreRetrier) DeleteTableDocuments ¶
func (s *StoreRetrier) DeleteTableDocuments(ctx context.Context, schemaName string, tableIDs []string) error
func (*StoreRetrier) GetMapper ¶
func (s *StoreRetrier) GetMapper() Mapper
func (*StoreRetrier) SendDocuments ¶
func (s *StoreRetrier) SendDocuments(ctx context.Context, docs []Document) ([]DocumentError, error)
SendDocuments will go over failed documents, identifying any with retriable errors and retrying them with the configured backoff policy.
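A hedged sketch of sending a batch through the retrier and surfacing documents that still failed after retries; indexBatch is a hypothetical caller, the DocumentError fields are not shown in this excerpt, and returning an error with a count is illustrative only.

func indexBatch(ctx context.Context, retrier *StoreRetrier, docs []Document) error {
	failed, err := retrier.SendDocuments(ctx, docs)
	if err != nil {
		return err
	}
	if len(failed) > 0 {
		// Inspect the returned DocumentError values to decide whether to
		// drop, log or dead-letter the remaining documents.
		return fmt.Errorf("%d documents failed after retries", len(failed))
	}
	return nil
}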
type StoreRetryConfig ¶
Source Files ¶