 Documentation
      ¶
      Documentation
      ¶
    
    
  
    
  
    Overview ¶
Package esutil provides helper utilities for the Go client for Elasticsearch.
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewJSONReader ¶
NewJSONReader encodes v into JSON and returns it as an io.Reader.
Types ¶
type BulkIndexer ¶
// BulkIndexer is the interface returned by NewBulkIndexer. Items are handed
// over with Add, and Close must be called once all items have been added.
type BulkIndexer interface {
	// Add adds an item to the indexer. It returns an error when the item cannot be added.
	// Use the OnSuccess and OnFailure callbacks to get the operation result for the item.
	//
	// You must call the Close() method after you're done adding items.
	//
	// It is safe for concurrent use. When it's called from goroutines,
	// they must finish before the call to Close, e.g. by using sync.WaitGroup.
	Add(context.Context, BulkIndexerItem) error
	// Close waits until all added items are flushed and closes the indexer.
	Close(context.Context) error
	// Stats returns indexer statistics.
	Stats() BulkIndexerStats
}
    BulkIndexer represents a parallel, asynchronous, efficient indexer for Elasticsearch.
func NewBulkIndexer ¶
func NewBulkIndexer(cfg BulkIndexerConfig) (BulkIndexer, error)
NewBulkIndexer creates a new bulk indexer.
Example ¶
log.SetFlags(0)

// Create the Elasticsearch client
//
es, err := elasticsearch.NewClient(elasticsearch.Config{
	// Retry on 429 TooManyRequests statuses, in addition to 502, 503 and 504
	//
	RetryOnStatus: []int{502, 503, 504, 429},
	// A simple incremental backoff function: attempt i waits i * 100ms
	//
	RetryBackoff: func(i int) time.Duration { return time.Duration(i) * 100 * time.Millisecond },
	// Retry up to 5 attempts
	//
	MaxRetries: 5,
})
if err != nil {
	log.Fatalf("Error creating the client: %s", err)
}

// Create the indexer
//
indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
	Client:     es,     // The Elasticsearch client
	Index:      "test", // The default index name
	NumWorkers: 4,      // The number of worker goroutines (default: number of CPUs)
	FlushBytes: 5e+6,   // The flush threshold in bytes (default: 5MB)
})
if err != nil {
	log.Fatalf("Error creating the indexer: %s", err)
}

// Add an item to the indexer
//
err = indexer.Add(
	context.Background(),
	esutil.BulkIndexerItem{
		// Action field configures the operation to perform (index, create, delete, update)
		Action: "index",
		// DocumentID is the optional document ID
		DocumentID: "1",
		// Body is an `io.ReadSeeker` with the payload
		Body: strings.NewReader(`{"title":"Test"}`),
		// OnSuccess is the optional callback for each successful operation
		OnSuccess: func(
			ctx context.Context,
			item esutil.BulkIndexerItem,
			res esutil.BulkIndexerResponseItem,
		) {
			fmt.Printf("[%d] %s test/%s", res.Status, res.Result, item.DocumentID)
		},
		// OnFailure is the optional callback for each failed operation
		OnFailure: func(
			ctx context.Context,
			item esutil.BulkIndexerItem,
			res esutil.BulkIndexerResponseItem, err error,
		) {
			// A non-nil err is reported as-is; otherwise the error details
			// carried by the response item are reported.
			if err != nil {
				log.Printf("ERROR: %s", err)
			} else {
				log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
			}
		},
	},
)
if err != nil {
	log.Fatalf("Unexpected error: %s", err)
}

// Close the indexer channel and flush remaining items
//
if err := indexer.Close(context.Background()); err != nil {
	log.Fatalf("Unexpected error: %s", err)
}

// Report the indexer statistics
//
stats := indexer.Stats()
if stats.NumFailed > 0 {
	// log.Fatalf exits the program, so the success message below is only
	// reached when nothing failed.
	log.Fatalf("Indexed [%d] documents with [%d] errors", stats.NumFlushed, stats.NumFailed)
}
log.Printf("Successfully indexed [%d] documents", stats.NumFlushed)

// For optimal performance, consider using a third-party package for JSON decoding and HTTP transport.
//
// For more information, examples and benchmarks, see:
//
// --> https://github.com/elastic/go-elasticsearch/tree/master/_examples/bulk
type BulkIndexerConfig ¶
// BulkIndexerConfig holds the settings passed to NewBulkIndexer: worker and
// flush tuning, the client and optional hooks, and Bulk API parameters.
type BulkIndexerConfig struct {
	NumWorkers    int           // The number of workers. Defaults to runtime.NumCPU().
	FlushBytes    int           // The flush threshold in bytes. Defaults to 5MB.
	FlushInterval time.Duration // The flush threshold as duration. Defaults to 30 seconds.
	Client      *elasticsearch.Client   // The Elasticsearch client.
	Decoder     BulkResponseJSONDecoder // A custom JSON decoder.
	DebugLogger BulkIndexerDebugLogger  // An optional logger for debugging.
	OnError      func(context.Context, error)          // Called for indexer errors.
	OnFlushStart func(context.Context) context.Context // Called when the flush starts.
	OnFlushEnd   func(context.Context)                 // Called when the flush ends.
	// Parameters of the Bulk API.
	// Index is the default index name.
	Index               string
	ErrorTrace          bool
	FilterPath          []string
	Header              http.Header
	Human               bool
	Pipeline            string
	Pretty              bool
	Refresh             string
	Routing             string
	Source              []string
	SourceExcludes      []string
	SourceIncludes      []string
	Timeout             time.Duration
	WaitForActiveShards string
}
    BulkIndexerConfig represents configuration of the indexer.
type BulkIndexerDebugLogger ¶
// BulkIndexerDebugLogger is the logger type accepted by
// BulkIndexerConfig.DebugLogger. A *log.Logger from the standard library
// satisfies this interface.
type BulkIndexerDebugLogger interface {
	// Printf takes a string followed by a variadic list of values.
	// NOTE(review): presumably fmt.Printf-style formatting — confirm.
	Printf(string, ...interface{})
}
    BulkIndexerDebugLogger defines the interface for a debugging logger.
type BulkIndexerItem ¶
// BulkIndexerItem describes a single operation handed to BulkIndexer.Add.
type BulkIndexerItem struct {
	// NOTE(review): presumably overrides BulkIndexerConfig.Index for this item — confirm.
	Index           string
	// Action configures the operation to perform (index, create, delete, update).
	Action          string
	// DocumentID is the optional document ID.
	DocumentID      string
	Routing         string
	// Version is pointer-typed, so nil is distinguishable from zero.
	// NOTE(review): presumably nil means "not set" — confirm.
	Version         *int64
	VersionType     string
	// Body carries the payload of the operation.
	Body            io.ReadSeeker
	// RetryOnConflict is pointer-typed, so nil is distinguishable from zero.
	// NOTE(review): presumably nil means "not set" — confirm.
	RetryOnConflict *int
	// OnSuccess and OnFailure are optional callbacks for each successful or
	// failed operation. OnFailure additionally receives an error.
	OnSuccess func(context.Context, BulkIndexerItem, BulkIndexerResponseItem)        // Per item
	OnFailure func(context.Context, BulkIndexerItem, BulkIndexerResponseItem, error) // Per item
	// contains filtered or unexported fields
}
    BulkIndexerItem represents an indexer item.
type BulkIndexerResponse ¶
// BulkIndexerResponse is the type a BulkResponseJSONDecoder decodes into.
// Its fields map to the JSON keys "took", "errors" and "items".
type BulkIndexerResponse struct {
	Took      int                                  `json:"took"`
	HasErrors bool                                 `json:"errors"`
	// Items holds one string-keyed map of response items per entry.
	// NOTE(review): the map key is presumably the action name — confirm.
	Items     []map[string]BulkIndexerResponseItem `json:"items,omitempty"`
}
    BulkIndexerResponse represents the Elasticsearch response.
type BulkIndexerResponseItem ¶
// BulkIndexerResponseItem is the per-item result passed to the OnSuccess and
// OnFailure callbacks of BulkIndexerItem. Each field maps to the JSON key
// named in its struct tag.
type BulkIndexerResponseItem struct {
	Index      string `json:"_index"`
	DocumentID string `json:"_id"`
	Version    int64  `json:"_version"`
	Result     string `json:"result"`
	Status     int    `json:"status"`
	SeqNo      int64  `json:"_seq_no"`
	PrimTerm   int64  `json:"_primary_term"`
	// Shards maps to the "_shards" object with its total, successful and
	// failed counts.
	Shards struct {
		Total      int `json:"total"`
		Successful int `json:"successful"`
		Failed     int `json:"failed"`
	} `json:"_shards"`
	// Error maps to the "error" object: a type, a reason, and a nested
	// "caused_by" object with its own type and reason.
	// NOTE(review): with encoding/json, omitempty has no effect on a
	// struct-typed field, so this tag option only matters for other codecs.
	Error struct {
		Type   string `json:"type"`
		Reason string `json:"reason"`
		Cause  struct {
			Type   string `json:"type"`
			Reason string `json:"reason"`
		} `json:"caused_by"`
	} `json:"error,omitempty"`
}
    BulkIndexerResponseItem represents the Elasticsearch response item.
type BulkIndexerStats ¶
// BulkIndexerStats is the set of counters returned by BulkIndexer.Stats.
// All counters are unsigned 64-bit integers.
type BulkIndexerStats struct {
	NumAdded    uint64
	// The package example reports NumFlushed as the number of indexed
	// documents and NumFailed as the number of errors.
	NumFlushed  uint64
	NumFailed   uint64
	// NOTE(review): the four counters below presumably break results down by
	// operation type — confirm.
	NumIndexed  uint64
	NumCreated  uint64
	NumUpdated  uint64
	NumDeleted  uint64
	NumRequests uint64
}
    BulkIndexerStats represents the indexer statistics.
type BulkResponseJSONDecoder ¶
// BulkResponseJSONDecoder is the decoder type accepted by
// BulkIndexerConfig.Decoder.
type BulkResponseJSONDecoder interface {
	// UnmarshalFromReader takes an io.Reader and a *BulkIndexerResponse and
	// returns an error.
	// NOTE(review): presumably decodes the reader's JSON into the response — confirm.
	UnmarshalFromReader(io.Reader, *BulkIndexerResponse) error
}
    BulkResponseJSONDecoder defines the interface for custom JSON decoders.
type JSONEncoder ¶
JSONEncoder defines the interface for custom JSON encoders.
type JSONReader ¶
// JSONReader has no exported fields; its state is opaque to callers.
// NOTE(review): presumably constructed via NewJSONReader — confirm.
type JSONReader struct {
	// contains filtered or unexported fields
}
    JSONReader represents a reader which takes an interface value, encodes it into JSON, and wraps it in an io.Reader.