esutil

package
v8.8.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 8, 2023 License: Apache-2.0 Imports: 13 Imported by: 160

Documentation

Overview

Package esutil provides helper utilities to the Go client for Elasticsearch.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewJSONReader

func NewJSONReader(v interface{}) io.Reader

NewJSONReader encodes v into JSON and returns it as an io.Reader.

Types

type BulkIndexer

type BulkIndexer interface {
	// Add adds an item to the indexer. It returns an error when the item cannot be added.
	// Use the OnSuccess and OnFailure callbacks to get the operation result for the item.
	//
	// You must call the Close() method after you're done adding items.
	//
	// It is safe for concurrent use. When it's called from goroutines,
	// they must finish before the call to Close, eg. using sync.WaitGroup.
	Add(context.Context, BulkIndexerItem) error

	// Close waits until all added items are flushed and closes the indexer.
	Close(context.Context) error

	// Stats returns indexer statistics.
	Stats() BulkIndexerStats
}

BulkIndexer represents a parallel, asynchronous, efficient indexer for Elasticsearch.

func NewBulkIndexer

func NewBulkIndexer(cfg BulkIndexerConfig) (BulkIndexer, error)

NewBulkIndexer creates a new bulk indexer.

Example
log.SetFlags(0)

// Create the Elasticsearch client
//
es, err := elasticsearch.NewClient(elasticsearch.Config{
	// Retry on 429 TooManyRequests statuses
	//
	RetryOnStatus: []int{502, 503, 504, 429},

	// A simple incremental backoff function
	//
	RetryBackoff: func(i int) time.Duration { return time.Duration(i) * 100 * time.Millisecond },

	// Retry up to 5 attempts
	//
	MaxRetries: 5,
})
if err != nil {
	log.Fatalf("Error creating the client: %s", err)
}

// Create the indexer
//
indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
	Client:     es,     // The Elasticsearch client
	Index:      "test", // The default index name
	NumWorkers: 4,      // The number of worker goroutines (default: number of CPUs)
	FlushBytes: 5e+6,   // The flush threshold in bytes (default: 5M)
})
if err != nil {
	log.Fatalf("Error creating the indexer: %s", err)
}

// Add an item to the indexer
//
err = indexer.Add(
	context.Background(),
	esutil.BulkIndexerItem{
		// Action field configures the operation to perform (index, create, delete, update)
		Action: "index",

		// DocumentID is the optional document ID
		DocumentID: "1",

		// Body is an `io.Reader` with the payload
		Body: strings.NewReader(`{"title":"Test"}`),

		// OnSuccess is the optional callback for each successful operation
		OnSuccess: func(
			ctx context.Context,
			item esutil.BulkIndexerItem,
			res esutil.BulkIndexerResponseItem,
		) {
			fmt.Printf("[%d] %s test/%s", res.Status, res.Result, item.DocumentID)
		},

		// OnFailure is the optional callback for each failed operation
		OnFailure: func(
			ctx context.Context,
			item esutil.BulkIndexerItem,
			res esutil.BulkIndexerResponseItem, err error,
		) {
			if err != nil {
				log.Printf("ERROR: %s", err)
			} else {
				log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
			}
		},
	},
)
if err != nil {
	log.Fatalf("Unexpected error: %s", err)
}

// Close the indexer channel and flush remaining items
//
if err := indexer.Close(context.Background()); err != nil {
	log.Fatalf("Unexpected error: %s", err)
}

// Report the indexer statistics
//
stats := indexer.Stats()
if stats.NumFailed > 0 {
	log.Fatalf("Indexed [%d] documents with [%d] errors", stats.NumFlushed, stats.NumFailed)
} else {
	log.Printf("Successfully indexed [%d] documents", stats.NumFlushed)
}

// For optimal performance, consider using a third-party package for JSON decoding and HTTP transport.
//
// For more information, examples and benchmarks, see:
//
// --> https://github.com/elastic/go-elasticsearch/tree/master/_examples/bulk

type BulkIndexerConfig

type BulkIndexerConfig struct {
	NumWorkers    int           // The number of workers. Defaults to runtime.NumCPU().
	FlushBytes    int           // The flush threshold in bytes. Defaults to 5MB.
	FlushInterval time.Duration // The flush threshold as duration. Defaults to 30sec.

	Client      *elasticsearch.Client   // The Elasticsearch client.
	Decoder     BulkResponseJSONDecoder // A custom JSON decoder.
	DebugLogger BulkIndexerDebugLogger  // An optional logger for debugging.

	OnError      func(context.Context, error)          // Called for indexer errors.
	OnFlushStart func(context.Context) context.Context // Called when the flush starts.
	OnFlushEnd   func(context.Context)                 // Called when the flush ends.

	// Parameters of the Bulk API.
	Index               string
	ErrorTrace          bool
	FilterPath          []string
	Header              http.Header
	Human               bool
	Pipeline            string
	Pretty              bool
	Refresh             string
	Routing             string
	Source              []string
	SourceExcludes      []string
	SourceIncludes      []string
	Timeout             time.Duration
	WaitForActiveShards string
}

BulkIndexerConfig represents configuration of the indexer.

type BulkIndexerDebugLogger

type BulkIndexerDebugLogger interface {
	Printf(string, ...interface{})
}

BulkIndexerDebugLogger defines the interface for a debugging logger.

type BulkIndexerItem

type BulkIndexerItem struct {
	Index           string
	Action          string
	DocumentID      string
	Routing         string
	Version         *int64
	VersionType     string
	Body            io.ReadSeeker
	RetryOnConflict *int

	OnSuccess func(context.Context, BulkIndexerItem, BulkIndexerResponseItem)        // Per item
	OnFailure func(context.Context, BulkIndexerItem, BulkIndexerResponseItem, error) // Per item
	// contains filtered or unexported fields
}

BulkIndexerItem represents an indexer item.

type BulkIndexerResponse

type BulkIndexerResponse struct {
	Took      int                                  `json:"took"`
	HasErrors bool                                 `json:"errors"`
	Items     []map[string]BulkIndexerResponseItem `json:"items,omitempty"`
}

BulkIndexerResponse represents the Elasticsearch response.

type BulkIndexerResponseItem

type BulkIndexerResponseItem struct {
	Index      string `json:"_index"`
	DocumentID string `json:"_id"`
	Version    int64  `json:"_version"`
	Result     string `json:"result"`
	Status     int    `json:"status"`
	SeqNo      int64  `json:"_seq_no"`
	PrimTerm   int64  `json:"_primary_term"`

	Shards struct {
		Total      int `json:"total"`
		Successful int `json:"successful"`
		Failed     int `json:"failed"`
	} `json:"_shards"`

	Error struct {
		Type   string `json:"type"`
		Reason string `json:"reason"`
		Cause  struct {
			Type   string `json:"type"`
			Reason string `json:"reason"`
		} `json:"caused_by"`
	} `json:"error,omitempty"`
}

BulkIndexerResponseItem represents the Elasticsearch response item.

type BulkIndexerStats

type BulkIndexerStats struct {
	NumAdded    uint64
	NumFlushed  uint64
	NumFailed   uint64
	NumIndexed  uint64
	NumCreated  uint64
	NumUpdated  uint64
	NumDeleted  uint64
	NumRequests uint64
}

BulkIndexerStats represents the indexer statistics.

type BulkResponseJSONDecoder

type BulkResponseJSONDecoder interface {
	UnmarshalFromReader(io.Reader, *BulkIndexerResponse) error
}

BulkResponseJSONDecoder defines the interface for custom JSON decoders.

type JSONEncoder

type JSONEncoder interface {
	EncodeJSON(io.Writer) error
}

JSONEncoder defines the interface for custom JSON encoders.

type JSONReader

type JSONReader struct {
	// contains filtered or unexported fields
}

JSONReader represents a reader which takes an interface value, encodes it into JSON, and wraps it in an io.Reader.

func (*JSONReader) Read

func (r *JSONReader) Read(p []byte) (int, error)

Read implements the io.Reader interface.

func (*JSONReader) WriteTo

func (r *JSONReader) WriteTo(w io.Writer) (int64, error)

WriteTo implements the io.WriterTo interface.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL