Documentation
¶
Index ¶
- Constants
- Variables
- func New(c *config.Config) (pipeline.Filter, error)
- func NewBulkRequestResort(c *config.Config) (pipeline.Filter, error)
- func NewBulkReshuffle(c *config.Config) (pipeline.Filter, error)
- func NewBulkResponseValidate(c *config.Config) (pipeline.Filter, error)
- func NewElasticsearchBulkRequestMutateFilter(c *config.Config) (pipeline.Filter, error)
- func NewElasticsearchRequestReshuffleFilter(c *config.Config) (pipeline.Filter, error)
- func NewHashModFilter(c *config.Config) (pipeline.Filter, error)
- func ParseURLMeta(pathStr string) (valid bool, urlLevelIndex, urlLevelType, id string)
- func SortDocumentsByTime(docs []elastic.VersionInfo)
- func SortDocumentsByVersion(docs []elastic.VersionInfo)
- type AutoGenerateDocID
- type BulkRequestResort
- type BulkReshuffle
- type BulkReshuffleConfig
- type BulkResponseProcess
- type CommitConfig
- type Config
- type DocumentBuffer
- type ElasticsearchBulkRequestMutate
- type ElasticsearchRequestReshuffle
- type HashModFilter
- type Level
- type Queue
- type Sorter
Constants ¶
View Source
const ClusterLevel = "cluster"
View Source
const IndexLevel = "index"
View Source
const NodeLevel = "node"
View Source
const ShardLevel = "shard"
Variables ¶
View Source
var JSON_CONTENT_TYPE = "application/json"
Functions ¶
func NewBulkResponseValidate ¶
func ParseURLMeta ¶
func SortDocumentsByTime ¶
func SortDocumentsByTime(docs []elastic.VersionInfo)
func SortDocumentsByVersion ¶
func SortDocumentsByVersion(docs []elastic.VersionInfo)
SortDocumentsByVersion 按照文档版本进行排序
Types ¶
type AutoGenerateDocID ¶
type AutoGenerateDocID struct {
Prefix string `config:"prefix" `
}
func (*AutoGenerateDocID) Filter ¶
func (filter *AutoGenerateDocID) Filter(ctx *fasthttp.RequestCtx)
func (*AutoGenerateDocID) Name ¶
func (filter *AutoGenerateDocID) Name() string
type BulkRequestResort ¶
type BulkRequestResort struct {
BatchSizeInDocs int `config:"batch_size_in_docs"` //batch size for each bulk request
BatchSizeInMB int `config:"batch_size_in_mb"`
MinBufferSize int `config:"min_buffer_size"`
MaxBufferSize int `config:"max_buffer_size"`
MinDocPaddingForOneDocs int `config:"min_doc_versions_for_one_doc"`
IdleTimeoutInSeconds string `config:"idle_timeout_in_seconds"`
TagOnComplete string `config:"tag_on_complete"` //add tag to parent context when all documents are processed
Elasticsearch string `config:"elasticsearch"`
PartitionSize int `config:"partition_size"`
OutputQueue Queue `config:"output_queue"`
CommitConfig CommitConfig `config:"commit_offset"`
// contains filtered or unexported fields
}
func (*BulkRequestResort) Filter ¶
func (filter *BulkRequestResort) Filter(ctx *fasthttp.RequestCtx)
func (*BulkRequestResort) Name ¶
func (filter *BulkRequestResort) Name() string
func (*BulkRequestResort) NewDocumentBuffer ¶
func (filter *BulkRequestResort) NewDocumentBuffer(partitionID int, queueName string, minBufferSize, maxBufferSize int, idleTimeout time.Duration, minDocPaddingSize int) *DocumentBuffer
NewDocumentBuffer 创建一个新的文档缓冲区
type BulkReshuffle ¶
type BulkReshuffle struct {
// contains filtered or unexported fields
}
func (*BulkReshuffle) Filter ¶
func (this *BulkReshuffle) Filter(ctx *fasthttp.RequestCtx)
func (*BulkReshuffle) Name ¶
func (this *BulkReshuffle) Name() string
type BulkReshuffleConfig ¶
type BulkReshuffleConfig struct {
TagsOnSuccess []string `config:"tag_on_success"`
Elasticsearch string `config:"elasticsearch"`
QueuePrefix string `config:"queue_name_prefix"`
Level string `config:"level"` //cluster/node(will,change)/index/shard/partition
PartitionSize int `config:"partition_size"`
FixNullID bool `config:"fix_null_id"`
ContinueAfterReshuffle bool `config:"continue_after_reshuffle"`
IndexStatsAnalysis bool `config:"index_stats_analysis"`
ActionStatsAnalysis bool `config:"action_stats_analysis"`
ContinueMetadataNotFound bool `config:"continue_metadata_missing"`
ValidateRequest bool `config:"validate_request"`
//split all lines into memory rather than scan
ValidEachLine bool `config:"validate_each_line"`
ValidMetadata bool `config:"validate_metadata"`
ValidPayload bool `config:"validate_payload"`
StickToNode bool `config:"stick_to_node"`
EnabledShards []int `config:"shards"`
BufferPoolEnabled bool `config:"bytes_buffer_enabled"`
MaxBufferCount uint32 `config:"max_buffer_items"`
MaxBufferSize uint32 `config:"max_buffer_size"`
}
type BulkResponseProcess ¶
type BulkResponseProcess struct {
// contains filtered or unexported fields
}
func (*BulkResponseProcess) Filter ¶
func (this *BulkResponseProcess) Filter(ctx *fasthttp.RequestCtx)
func (*BulkResponseProcess) Name ¶
func (this *BulkResponseProcess) Name() string
type CommitConfig ¶
type Config ¶
type Config struct {
StatsOnly bool `config:"stats_only"`
SuccessQueue string `config:"success_queue"`
InvalidQueue string `config:"invalid_queue"`
FailureQueue string `config:"failure_queue"`
SuccessFlow string `config:"success_flow"`
InvalidFlow string `config:"invalid_flow"`
FailureFlow string `config:"failure_flow"`
MessageTruncateSize int `config:"message_truncate_size"`
ShowBulkErrorMessage bool `config:"show_bulk_error_message"`
PartialFailureRetry bool `config:"partial_failure_retry"` //是否主动重试,只有部分失败的请求,避免大量没有意义的 409
PartialFailureMaxRetryTimes int `config:"partial_failure_max_retry_times"` //是否主动重试,只有部分失败的请求,避免大量没有意义的 409
PartialFailureRetryDelayLatencyInMs int `config:"partial_failure_retry_latency_in_ms"` //是否主动重试,只有部分失败的请求,避免大量没有意义的 409
ContinueOnAllError bool `config:"continue_on_all_error"` //没有拿到响应,整个请求都失败是否继续处理后续 flow
ContinueOnAnyError bool `config:"continue_on_any_error"` //拿到响应,出现任意请求失败是否都继续 flow 还是结束处理
ContinueOnSuccess bool `config:"continue_on_success"` //所有请求都成功
TagsOnAllSuccess []string `config:"tag_on_all_success"` //所有请求都成功,没有失败
TagsOnNone2xx []string `config:"tag_on_none_2xx"` //整个 bulk 请求非 200 或者 201 返回
//bulk requests
TagsOnAllInvalid []string `config:"tag_on_all_invalid"` //所有请求都是非法请求的情况
TagsOnAllFailure []string `config:"tag_on_all_failure"` //所有失败的请求都是失败请求的情况
TagsOnAnyError []string `config:"tag_on_any_error"` //请求里面包含任意失败或者非法请求的情况
TagsOnPartialSuccess []string `config:"tag_on_partial_success"` //包含部分成功的情况
TagsOnPartialFailure []string `config:"tag_on_partial_failure"` //包含部分失败的情况,可以重试
TagsOnPartialInvalid []string `config:"tag_on_partial_invalid"` //包含部分非法请求的情况,无需重试的请求
RetryFlow string `config:"retry_flow"`
RetryRules elastic.RetryRules `config:"retry_rules"`
BulkResponseParseConfig elastic.BulkResponseParseConfig `config:"response_handle"`
}
type DocumentBuffer ¶
DocumentBuffer 表示文档的缓冲区
func (*DocumentBuffer) GetDocuments ¶
func (b *DocumentBuffer) GetDocuments(count int) (int, []elastic.VersionInfo)
GetDocuments 返回最旧的文档通道,最多读取指定数量的文档
type ElasticsearchBulkRequestMutate ¶
type ElasticsearchBulkRequestMutate struct {
DefaultIndex string `config:"default_index"`
DefaultType string `config:"default_type"`
FixNilType bool `config:"fix_null_type"`
FixNilID bool `config:"fix_null_id"`
Pipeline string `config:"pipeline"`
RemoveTypeMeta bool `config:"remove_type"`
RemovePipeline bool `config:"remove_pipeline"`
AddTimestampToID bool `config:"generate_enhanced_id"`
IndexNameRename map[string]string `config:"index_rename"`
TypeNameRename map[string]string `config:"type_rename"`
}
func (*ElasticsearchBulkRequestMutate) Filter ¶
func (this *ElasticsearchBulkRequestMutate) Filter(ctx *fasthttp.RequestCtx)
func (*ElasticsearchBulkRequestMutate) Name ¶
func (filter *ElasticsearchBulkRequestMutate) Name() string
type ElasticsearchRequestReshuffle ¶
type ElasticsearchRequestReshuffle struct {
Elasticsearch string `config:"elasticsearch"`
TagsOnSuccess []string `config:"tag_on_success"`
SkipBulk bool `config:"skip_bulk"`
PartitionSize int `config:"partition_size"`
QueuePrefix string `config:"queue_name_prefix"`
HashFactor string `config:"hash_factor"`
ContinueAfterReshuffle bool `config:"continue_after_reshuffle"`
// contains filtered or unexported fields
}
func (*ElasticsearchRequestReshuffle) Filter ¶
func (this *ElasticsearchRequestReshuffle) Filter(ctx *fasthttp.RequestCtx)
func (*ElasticsearchRequestReshuffle) Name ¶
func (filter *ElasticsearchRequestReshuffle) Name() string
type HashModFilter ¶
type HashModFilter struct {
Source string `config:"source" `
TargetContextKey string `config:"target_context_name" `
PartitionSize int `config:"mod"`
AddToRequestHeader bool `config:"add_to_request_header" type:"bool" default_value:"true"`
AddToResponseHeader bool `config:"add_to_response_header" type:"bool" default_value:"true"`
// contains filtered or unexported fields
}
func (*HashModFilter) Filter ¶
func (filter *HashModFilter) Filter(ctx *fasthttp.RequestCtx)
func (*HashModFilter) Name ¶
func (filter *HashModFilter) Name() string
Source Files
¶
Click to show internal directories.
Click to hide internal directories.