Documentation
¶
Index ¶
- func New(c *config.Config) (pipeline.Processor, error)
- type BulkIndexingProcessor
- func (processor *BulkIndexingProcessor) HandleQueueConfig(v *queue.QueueConfig, parentContext *pipeline.Context)
- func (processor *BulkIndexingProcessor) Name() string
- func (processor *BulkIndexingProcessor) NewBulkWorker(parentContext *pipeline.Context, qConfig *queue.QueueConfig, ...)
- func (processor *BulkIndexingProcessor) NewSlicedBulkWorker(ctx *pipeline.Context, key, workerID string, sliceID, maxSlices int, ...)
- func (processor *BulkIndexingProcessor) Process(c *pipeline.Context) error
- func (processor *BulkIndexingProcessor) Release() error
- type Config
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type BulkIndexingProcessor ¶
处理 bulk 格式的数据索引。 (Handles indexing of data in bulk format.)
func (*BulkIndexingProcessor) HandleQueueConfig ¶
func (processor *BulkIndexingProcessor) HandleQueueConfig(v *queue.QueueConfig, parentContext *pipeline.Context)
func (*BulkIndexingProcessor) Name ¶
func (processor *BulkIndexingProcessor) Name() string
func (*BulkIndexingProcessor) NewBulkWorker ¶
func (processor *BulkIndexingProcessor) NewBulkWorker(parentContext *pipeline.Context, qConfig *queue.QueueConfig, preferedHost string)
func (*BulkIndexingProcessor) NewSlicedBulkWorker ¶
func (processor *BulkIndexingProcessor) NewSlicedBulkWorker(ctx *pipeline.Context, key, workerID string, sliceID, maxSlices int, tag string, bulkSizeInByte int, qConfig *queue.QueueConfig, host string)
func (*BulkIndexingProcessor) Process ¶
func (processor *BulkIndexingProcessor) Process(c *pipeline.Context) error
func (*BulkIndexingProcessor) Release ¶
func (processor *BulkIndexingProcessor) Release() error
type Config ¶
// Config lists the exported settings of the bulk indexing processor. Every
// field carries a `config` struct tag naming its configuration key; several
// keys differ from the Go field name, and those cases are called out below.
// NOTE(review): units and exact semantics are not visible in this listing —
// anything stated about them here is an assumption to confirm in the code.
type Config struct {
// Slicing options, keyed "num_of_slices", "slices" and "document_level_slicing".
NumOfSlices int `config:"num_of_slices"`
Slices []int `config:"slices"`
DocumentLevelSlicing bool `config:"document_level_slicing"`
// Field name is singular ("Second") while the key is plural: "idle_timeout_in_seconds".
IdleTimeoutInSecond int `config:"idle_timeout_in_seconds"`
// Key says "node" ("max_connection_per_node") although the field says "Host".
MaxConnectionPerHost int `config:"max_connection_per_node"`
// Key is "queues", not "queue_labels".
QueueLabels map[string]interface{} `config:"queues,omitempty"`
Selector queue.QueueSelector `config:"queue_selector"`
Consumer queue.ConsumerConfig `config:"consumer"`
// Key is "max_worker_size".
MaxWorkers int `config:"max_worker_size"`
DetectActiveQueue bool `config:"detect_active_queue"`
VerboseBulkResult bool `config:"verbose_bulk_result"`
// Key "detect_interval" carries no unit; the field name suggests
// milliseconds — TODO confirm.
DetectIntervalInMs int `config:"detect_interval"`
// Key is "valid_request" (not "validate_request").
ValidateRequest bool `config:"valid_request"`
SkipEmptyQueue bool `config:"skip_empty_queue"`
// Key is "skip_info_missing".
SkipOnMissingInfo bool `config:"skip_info_missing"`
LogBulkError bool `config:"log_bulk_error"`
BulkConfig elastic.BulkProcessorConfig `config:"bulk"`
// Two Elasticsearch-related settings: a string under "elasticsearch" and a
// pointer to a full config struct under "elasticsearch_config".
// NOTE(review): presumably the string names a registered cluster and the
// struct is an inline alternative — verify how the processor resolves them.
Elasticsearch string `config:"elasticsearch,omitempty"`
ElasticsearchConfig *elastic.ElasticsearchConfig `config:"elasticsearch_config"`
WaitingAfter []string `config:"waiting_after"`
// Key "retry_delay_interval" carries no unit; the field name suggests
// milliseconds — TODO confirm.
RetryDelayIntervalInMs int `config:"retry_delay_interval"`
// contains filtered or unexported fields
}
Click to show internal directories.
Click to hide internal directories.