Documentation
¶
Overview ¶
Package esutil provides helper utilities for the Go client for Elasticsearch.
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewJSONReader ¶
NewJSONReader encodes v into JSON and returns it as an io.Reader.
Types ¶
type BulkIndexer ¶
// BulkIndexer represents a parallel, asynchronous, efficient indexer for Elasticsearch.
//
// Items are queued with Add, the per-item outcome is delivered through the
// item's OnSuccess and OnFailure callbacks, and Close must be called once
// all items have been added.
type BulkIndexer interface {
// Add adds an item to the indexer. It returns an error when the item cannot be added.
// Use the OnSuccess and OnFailure callbacks to get the operation result for the item.
//
// You must call the Close() method after you're done adding items.
//
// It is safe for concurrent use. When it's called from goroutines,
// they must finish before the call to Close, e.g. using sync.WaitGroup.
Add(context.Context, BulkIndexerItem) error
// Close waits until all added items are flushed and closes the indexer.
Close(context.Context) error
// Stats returns indexer statistics.
Stats() BulkIndexerStats
}
BulkIndexer represents a parallel, asynchronous, efficient indexer for Elasticsearch.
func NewBulkIndexer ¶
func NewBulkIndexer(cfg BulkIndexerConfig) (BulkIndexer, error)
NewBulkIndexer creates a new bulk indexer.
Example ¶
log.SetFlags(0)

// Build the Elasticsearch client with retries enabled.
//
// The backoff is a simple incremental one: the delay scales linearly
// with the attempt number handed to the function, in 100ms steps.
backoff := func(attempt int) time.Duration {
	return time.Duration(attempt) * 100 * time.Millisecond
}
client, err := elasticsearch.NewClient(elasticsearch.Config{
	// Retry on gateway errors (502, 503, 504) and on 429 TooManyRequests.
	RetryOnStatus: []int{502, 503, 504, 429},
	RetryBackoff:  backoff,
	// Retry up to 5 attempts.
	MaxRetries: 5,
})
if err != nil {
	log.Fatalf("Error creating the client: %s", err)
}

// Build the bulk indexer on top of the client.
bulk, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
	Client:     client, // The Elasticsearch client
	Index:      "test", // The default index name
	NumWorkers: 4,      // The number of worker goroutines (default: number of CPUs)
	FlushBytes: 5e+6,   // The flush threshold in bytes (default: 5M)
})
if err != nil {
	log.Fatalf("Error creating the indexer: %s", err)
}

// reportSuccess is the optional per-item callback for a successful operation.
reportSuccess := func(
	ctx context.Context,
	item esutil.BulkIndexerItem,
	res esutil.BulkIndexerResponseItem,
) {
	fmt.Printf("[%d] %s test/%s", res.Status, res.Result, item.DocumentID)
}

// reportFailure is the optional per-item callback for a failed operation.
// When err is nil, the failure details come from the response item instead.
reportFailure := func(
	ctx context.Context,
	item esutil.BulkIndexerItem,
	res esutil.BulkIndexerResponseItem,
	err error,
) {
	if err == nil {
		log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
		return
	}
	log.Printf("ERROR: %s", err)
}

// Describe the item to be added to the indexer.
doc := esutil.BulkIndexerItem{
	// Action selects the operation to perform (index, create, delete, update).
	Action: "index",
	// DocumentID is optional.
	DocumentID: "1",
	// Body is an io.ReadSeeker carrying the payload.
	Body:      strings.NewReader(`{"title":"Test"}`),
	OnSuccess: reportSuccess,
	OnFailure: reportFailure,
}
if err := bulk.Add(context.Background(), doc); err != nil {
	log.Fatalf("Unexpected error: %s", err)
}

// Close the indexer; this waits until the added items are flushed.
err = bulk.Close(context.Background())
if err != nil {
	log.Fatalf("Unexpected error: %s", err)
}

// Report the indexer statistics. log.Fatalf terminates the program, so the
// success message is only reached when nothing failed.
report := bulk.Stats()
if report.NumFailed > 0 {
	log.Fatalf("Indexed [%d] documents with [%d] errors", report.NumFlushed, report.NumFailed)
}
log.Printf("Successfully indexed [%d] documents", report.NumFlushed)

// For optimal performance, consider using a third-party package for JSON decoding and HTTP transport.
//
// For more information, examples and benchmarks, see:
//
// --> https://github.com/elastic/go-elasticsearch/tree/master/_examples/bulk
type BulkIndexerConfig ¶
// BulkIndexerConfig represents configuration of the indexer.
//
// The first group of fields configures the indexer itself; the fields that
// follow the "Parameters of the Bulk API" marker are parameters of the Bulk API.
type BulkIndexerConfig struct {
NumWorkers int // The number of worker goroutines. Defaults to runtime.NumCPU().
FlushBytes int // The flush threshold in bytes. Defaults to 5MB.
FlushInterval time.Duration // The flush threshold as a duration. Defaults to 30 seconds.
Client *elasticsearch.Client // The Elasticsearch client.
Decoder BulkResponseJSONDecoder // A custom JSON decoder.
DebugLogger BulkIndexerDebugLogger // An optional logger for debugging.
OnError func(context.Context, error) // Called for indexer errors.
// NOTE(review): OnFlushStart returns a context.Context; presumably that
// context is the one used for the remainder of the flush — confirm.
OnFlushStart func(context.Context) context.Context // Called when the flush starts.
OnFlushEnd func(context.Context) // Called when the flush ends.
// Parameters of the Bulk API.
Index string // The default index name.
ErrorTrace bool
FilterPath []string
Header http.Header
Human bool
Pipeline string
Pretty bool
Refresh string
Routing string
RequireAlias bool
Source []string
SourceExcludes []string
SourceIncludes []string
Timeout time.Duration
WaitForActiveShards string
}
BulkIndexerConfig represents configuration of the indexer.
type BulkIndexerDebugLogger ¶
// BulkIndexerDebugLogger defines the interface for a debugging logger.
type BulkIndexerDebugLogger interface {
// Printf takes a format string followed by its arguments.
// NOTE(review): presumably fmt-style formatting — confirm.
Printf(string, ...interface{})
}
BulkIndexerDebugLogger defines the interface for a debugging logger.
type BulkIndexerItem ¶
// BulkIndexerItem represents an indexer item.
type BulkIndexerItem struct {
// NOTE(review): presumably overrides BulkIndexerConfig.Index for this item — confirm.
Index string
// Action configures the operation to perform (index, create, delete, update).
Action string
// DocumentID is the optional document ID.
DocumentID string
Routing string
RequireAlias bool
// NOTE(review): the pointer-typed fields below presumably treat nil as
// "not set" — confirm.
Version *int64
VersionType string
// Body carries the payload of the item.
Body io.ReadSeeker
RetryOnConflict *int
IfSeqNo *int64
IfPrimaryTerm *int64
// OnSuccess is the optional callback for each successful operation.
OnSuccess func(context.Context, BulkIndexerItem, BulkIndexerResponseItem) // Per item
// OnFailure is the optional callback for each failed operation.
OnFailure func(context.Context, BulkIndexerItem, BulkIndexerResponseItem, error) // Per item
// contains filtered or unexported fields
}
BulkIndexerItem represents an indexer item.
type BulkIndexerResponse ¶
// BulkIndexerResponse represents the Elasticsearch response.
type BulkIndexerResponse struct {
// Took is decoded from the "took" JSON field.
Took int `json:"took"`
// HasErrors is decoded from the "errors" JSON field.
HasErrors bool `json:"errors"`
// Items is decoded from the "items" JSON field; each element is a map
// from a string key to the response item.
// NOTE(review): the key is presumably the action name — confirm.
Items []map[string]BulkIndexerResponseItem `json:"items,omitempty"`
}
BulkIndexerResponse represents the Elasticsearch response.
type BulkIndexerResponseItem ¶
// BulkIndexerResponseItem represents the Elasticsearch response item.
//
// Each field is decoded from the JSON key named in its struct tag.
type BulkIndexerResponseItem struct {
Index string `json:"_index"`
DocumentID string `json:"_id"`
Version int64 `json:"_version"`
Result string `json:"result"`
Status int `json:"status"`
SeqNo int64 `json:"_seq_no"`
PrimTerm int64 `json:"_primary_term"`
// Shards holds the counters decoded from the "_shards" object.
Shards struct {
Total int `json:"total"`
Successful int `json:"successful"`
Failed int `json:"failed"`
} `json:"_shards"`
// Error holds the type and reason decoded from the "error" object.
Error struct {
Type string `json:"type"`
Reason string `json:"reason"`
// Cause is decoded from the nested "caused_by" object.
Cause struct {
Type string `json:"type"`
Reason string `json:"reason"`
} `json:"caused_by"`
} `json:"error,omitempty"`
}
BulkIndexerResponseItem represents the Elasticsearch response item.
type BulkIndexerStats ¶
// BulkIndexerStats represents the indexer statistics.
//
// The package example reports NumFlushed as the number of indexed documents
// and NumFailed as the number of errors.
// NOTE(review): the remaining counters presumably count items per outcome
// and the number of requests sent, as their names suggest — confirm.
type BulkIndexerStats struct {
NumAdded uint64
NumFlushed uint64
NumFailed uint64
NumIndexed uint64
NumCreated uint64
NumUpdated uint64
NumDeleted uint64
NumRequests uint64
}
BulkIndexerStats represents the indexer statistics.
type BulkResponseJSONDecoder ¶
// BulkResponseJSONDecoder defines the interface for custom JSON decoders.
type BulkResponseJSONDecoder interface {
// UnmarshalFromReader takes an io.Reader and a *BulkIndexerResponse and
// returns an error.
// NOTE(review): presumably decodes the reader's JSON into the response — confirm.
UnmarshalFromReader(io.Reader, *BulkIndexerResponse) error
}
BulkResponseJSONDecoder defines the interface for custom JSON decoders.
type JSONEncoder ¶
JSONEncoder defines the interface for custom JSON encoders.
type JSONReader ¶
// JSONReader represents a reader which takes an interface value, encodes it
// into JSON, and wraps it in an io.Reader.
type JSONReader struct {
// contains filtered or unexported fields
}
JSONReader represents a reader which takes an interface value, encodes it into JSON, and wraps it in an io.Reader.