esbulk

package module v0.7.32 · Published: Mar 3, 2026 · License: GPL-3.0 · Imports: 21 · Imported by: 5

README

Fast parallel command line bulk loading utility for elasticsearch. Data is read from a newline delimited JSON file or stdin and indexed into elasticsearch in bulk and in parallel. The shortest command would be:

$ esbulk -index my-index-name < file.ldj

Caveat: If indexing pressure on the bulk API is too high (dozens or hundreds of parallel workers, large batch sizes, depending on your setup), esbulk will halt and report an error:

$ esbulk -index my-index-name -w 100 file.ldj
2017/01/02 16:25:25 error during bulk operation, try less workers (lower -w value) or
                    increase thread_pool.bulk.queue_size in your nodes

Please note that, in such a case, some documents are indexed and some are not. Your index will be in an inconsistent state, since there is no transactional bracket around the indexing process.

However, using defaults (parallelism: number of cores) on a single node setup will just work. For larger clusters, increase the number of workers until you see full CPU utilization. After that, more workers won't buy any more speed.

Currently, esbulk is tested against elasticsearch versions 5, 6, 7 and 8 using testcontainers. Originally written for Leipzig University Library, project finc.

Installation

$ go install github.com/miku/esbulk/cmd/esbulk@latest

For deb or rpm packages, see: https://github.com/miku/esbulk/releases

Usage

$ esbulk -h
Usage of esbulk:
  -0    set the number of replicas to 0 during indexing
  -apikey string
        set the encoded ES api key (mutually exclusive with -u)
  -c string
        create index mappings, settings, aliases, https://is.gd/3zszeu
  -cpuprofile string
        write cpu profile to file
  -id string
        name of field to use as id field, by default ids are autogenerated
  -index string
        index name
  -k    skip insecure certificate verification
  -mapping string
        mapping string or filename to apply before indexing
  -memprofile string
        write heap profile to file
  -optype string
        optype (index - will replace existing data,
                create - will only create a new doc,
                update - create new or update existing data) (default "index")
  -p string
        pipeline to use to preprocess documents
  -purge
        purge any existing index before indexing
  -purge-pause duration
        pause after purge (default 1s)
  -r string
        Refresh interval after import (default "1s")
  -seed int
        seed for random server selection (default: current unix nano)
  -server value
        elasticsearch server, this works with https as well
  -size int
        bulk batch size (default 1000)
  -skipbroken
        skip broken json
  -timeout duration
        timeout for HTTP requests (default 30s)
  -type string
        elasticsearch doc type (deprecated since ES7)
  -u string
        http basic auth username:password, like curl -u
  -v    prints current program version
  -verbose
        output basic progress
  -w int
        number of workers to use (default 8)
  -z    unzip gz'd file on the fly

To index a JSON file that contains one document per line, just run:

$ esbulk -index example file.ldj

Where file.ldj is line delimited JSON, like:

{"name": "esbulk", "version": "0.2.4"}
{"name": "estab", "version": "0.1.3"}
...

By default, esbulk uses as many parallel workers as there are cores. To tweak the indexing process, adjust the -size and -w parameters.

You can index from gzipped files as well, using the -z flag:

$ esbulk -z -index example file.ldj.gz

Starting with 0.3.7, the preferred way to set a non-default server host and port is via -server, e.g.:

$ esbulk -server https://0.0.0.0:9201

This way, you can use https as well, which was not possible before. The -host and -port options were removed in esbulk 0.5.0.

Reusing IDs

Since version 0.3.8: If you want to reuse IDs from your documents in elasticsearch, you can specify the ID field via -id flag:

$ cat file.json
{"x": "doc-1", "db": "mysql"}
{"x": "doc-2", "db": "mongo"}

Here, we would like to reuse the ID from field x.

$ esbulk -id x -index throwaway -verbose file.json
...

$ curl -s http://localhost:9200/throwaway/_search | jq
{
  "took": 2,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  },
  "hits": {
    "total": 2,
    "max_score": 1,
    "hits": [
      {
        "_index": "throwaway",
        "_type": "default",
        "_id": "doc-2",
        "_score": 1,
        "_source": {
          "x": "doc-2",
          "db": "mongo"
        }
      },
      {
        "_index": "throwaway",
        "_type": "default",
        "_id": "doc-1",
        "_score": 1,
        "_source": {
          "x": "doc-1",
          "db": "mysql"
        }
      }
    ]
  }
}
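The idea behind -id can be sketched in a few lines of Go: pull a named top-level field out of each JSON line and use its value as the document ID. This is an illustration only, not esbulk's actual implementation; extractID is a hypothetical name.

```go
// Sketch: extract a top-level JSON field (as with -id x) for use as a
// document ID. Illustration only, not esbulk's real code.
package main

import (
	"encoding/json"
	"fmt"
)

// extractID returns the string form of the given top-level field.
func extractID(line, field string) (string, error) {
	var doc map[string]interface{}
	if err := json.Unmarshal([]byte(line), &doc); err != nil {
		return "", err
	}
	v, ok := doc[field]
	if !ok {
		return "", fmt.Errorf("missing id field: %s", field)
	}
	return fmt.Sprintf("%v", v), nil
}

func main() {
	for _, line := range []string{
		`{"x": "doc-1", "db": "mysql"}`,
		`{"x": "doc-2", "db": "mongo"}`,
	} {
		id, err := extractID(line, "x")
		if err != nil {
			panic(err)
		}
		fmt.Println(id)
	}
}
```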

Nested ID fields

Version 0.4.3 adds support for nested ID fields:

$ cat fixtures/pr-8-1.json
{"a": {"b": 1}}
{"a": {"b": 2}}
{"a": {"b": 3}}

$ esbulk -index throwaway -id a.b < fixtures/pr-8-1.json
...

Concatenated ID

Version 0.4.3 adds support for IDs that are the concatenation of multiple fields:

$ cat fixtures/pr-8-2.json
{"a": {"b": 1}, "c": "a"}
{"a": {"b": 2}, "c": "b"}
{"a": {"b": 3}, "c": "c"}

$ esbulk -index throwaway -id a.b,c < fixtures/pr-8-2.json
...

      {
        "_index": "xxx",
        "_type": "default",
        "_id": "1a",
        "_score": 1,
        "_source": {
          "a": {
            "b": 1
          },
          "c": "a"
        }
      },
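The dotted-path and comma-separated ID specs above can be sketched as follows: walk each dotted path into the decoded document, then join the resolved values. This is an illustration of the idea under assumed helper names (resolve, concatID), not esbulk's actual implementation.

```go
// Sketch: resolve a spec like "a.b,c" into a concatenated document ID.
// Illustration only, not esbulk's real code.
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// resolve walks a dotted path like "a.b" through a decoded JSON document.
func resolve(doc map[string]interface{}, path string) interface{} {
	var v interface{} = doc
	for _, key := range strings.Split(path, ".") {
		m, ok := v.(map[string]interface{})
		if !ok {
			return nil
		}
		v = m[key]
	}
	return v
}

// concatID joins the values of several comma-separated dotted paths.
func concatID(line, spec string) (string, error) {
	var doc map[string]interface{}
	if err := json.Unmarshal([]byte(line), &doc); err != nil {
		return "", err
	}
	var parts []string
	for _, path := range strings.Split(spec, ",") {
		parts = append(parts, fmt.Sprintf("%v", resolve(doc, path)))
	}
	return strings.Join(parts, ""), nil
}

func main() {
	id, err := concatID(`{"a": {"b": 1}, "c": "a"}`, "a.b,c")
	if err != nil {
		panic(err)
	}
	fmt.Println(id)
}
```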

Using X-Pack

Since 0.4.2: support for secured elasticsearch nodes:

$ esbulk -u elastic:changeme -index myindex file.ldj

A similar project has been started for solr, called solrbulk.

Measurements

$ csvlook -I measurements.csv
| es    | esbulk | docs      | avg_b | nodes | cores | total_heap_gb | t_s   | docs_per_s | repl |
|-------|--------|-----------|-------|-------|-------|---------------|-------|------------|------|
| 6.1.2 | 0.4.8  | 138000000 | 2000  | 1     | 32    |  64           |  6420 |  22100     | 1    |
| 6.1.2 | 0.4.8  | 138000000 | 2000  | 1     |  8    |  30           | 27360 |   5100     | 1    |
| 6.1.2 | 0.4.8  |   1000000 | 2000  | 1     |  4    |   1           |   300 |   3300     | 1    |
| 6.1.2 | 0.4.8  |  10000000 |   26  | 1     |  4    |   8           |   122 |  81000     | 1    |
| 6.1.2 | 0.4.8  |  10000000 |   26  | 1     | 32    |  64           |    32 | 307000     | 1    |
| 6.2.3 | 0.4.10 | 142944530 | 2000  | 2     | 64    | 128           | 26253 |   5444     | 1    |
| 6.2.3 | 0.4.10 | 142944530 | 2000  | 2     | 64    | 128           | 11113 |  12831     | 0    |
| 6.2.3 | 0.4.13 |  15000000 | 6000  | 2     | 64    | 128           |  2460 |   6400     | 0    |

Why not add a row?

Documentation

Overview

Package esbulk implements a few helpers for performant indexing operations for elasticsearch.

Index

Constants

This section is empty.

Variables

var (

	// Worker errors
	ErrWorkerCopyFailed = errors.New("worker failed to copy document batch")
	ErrWorkerBulkIndex  = errors.New("worker bulk index operation failed")
)
var (
	// Version of application.
	Version = "0.7.31"

	ErrIndexNameRequired = errors.New("index name required")
	ErrNoWorkers         = errors.New("no workers configured")
	ErrInvalidBatchSize  = errors.New("cannot use zero batch size")
)

Functions

func BulkIndex added in v0.3.0

func BulkIndex(ctx context.Context, docs []string, options Options) error

BulkIndex takes a set of documents as strings and indexes them into elasticsearch.

func CreateHTTPClient added in v0.7.29

func CreateHTTPClient(insecureSkipVerify bool, timeout time.Duration) *pester.Client

CreateHTTPClient creates a pester client with optional TLS configuration and timeout.

func CreateHTTPRequest added in v0.7.29

func CreateHTTPRequest(method, url string, body io.Reader, options Options) (*http.Request, error)

CreateHTTPRequest builds an HTTP request with authentication and headers. It handles basic authentication, content-type headers, and returns a ready-to-use request. Deprecated: Use CreateHTTPRequestWithContext instead.

func CreateHTTPRequestWithContext added in v0.7.29

func CreateHTTPRequestWithContext(ctx context.Context, method, url string, body io.Reader, options Options) (*http.Request, error)

CreateHTTPRequestWithContext builds an HTTP request with authentication and headers. It handles basic authentication, content-type headers, and returns a ready-to-use request.

func CreateIndex added in v0.6.0

func CreateIndex(options Options, body io.Reader) error

CreateIndex creates a new index.

func DeleteIndex added in v0.6.0

func DeleteIndex(options Options) error

DeleteIndex removes an index.

func FlushIndex added in v0.6.0

func FlushIndex(idx int, options Options) error

FlushIndex flushes index.

func GetSettings added in v0.6.0

func GetSettings(idx int, options Options) (map[string]interface{}, error)

GetSettings fetches the settings of the index.

func PutMapping added in v0.6.0

func PutMapping(options Options, body io.Reader) error

PutMapping applies a mapping from a reader.

func Worker added in v0.3.0

func Worker(ctx context.Context, id string, options Options, lines chan string, wg *sync.WaitGroup, errChan chan<- error) error

Worker will batch index documents that come in on the lines channel. Errors are sent to the provided error channel; the function always returns nil to satisfy the WaitGroup contract.

Types

type ArrayFlags added in v0.6.0

type ArrayFlags []string

ArrayFlags allows storing lists of flag values.

func (*ArrayFlags) Set added in v0.6.0

func (f *ArrayFlags) Set(value string) error

Set appends a value.

func (*ArrayFlags) String added in v0.6.0

func (f *ArrayFlags) String() string

String representation.
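ArrayFlags follows the standard flag.Value pattern, which is what lets -server be passed multiple times on the command line. A self-contained sketch of that pattern (mirroring the type above, not importing the package):

```go
// Sketch: the flag.Value pattern behind ArrayFlags, allowing a repeatable
// -server flag. Mirrors the documented type for illustration.
package main

import (
	"flag"
	"fmt"
	"strings"
)

// ArrayFlags collects repeated flag values.
type ArrayFlags []string

// String returns a readable representation of the collected values.
func (f *ArrayFlags) String() string { return strings.Join(*f, ", ") }

// Set appends a value; called once per occurrence of the flag.
func (f *ArrayFlags) Set(value string) error {
	*f = append(*f, value)
	return nil
}

func main() {
	var servers ArrayFlags
	fs := flag.NewFlagSet("esbulk", flag.ContinueOnError)
	fs.Var(&servers, "server", "elasticsearch server, repeatable")
	fs.Parse([]string{"-server", "http://localhost:9200", "-server", "http://localhost:9201"})
	fmt.Println(servers.String())
}
```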

type BulkResponse added in v0.6.0

type BulkResponse struct {
	Took      int    `json:"took"`
	HasErrors bool   `json:"errors"`
	Items     []Item `json:"items"`
}

BulkResponse is a response to a bulk request.

type Item added in v0.6.0

type Item struct {
	IndexAction struct {
		Index  string `json:"_index"`
		Type   string `json:"_type"`
		ID     string `json:"_id"`
		Status int    `json:"status"`
		Error  struct {
			Type      string `json:"type"`
			Reason    string `json:"reason"`
			IndexUUID string `json:"index_uuid"`
			Shard     string `json:"shard"`
			Index     string `json:"index"`
		} `json:"error"`
	} `json:"index"`
}

Item represents a bulk action.

type Options added in v0.3.0

type Options struct {
	Servers            []string
	Index              string
	OpType             string
	DocType            string
	BatchSize          int
	Verbose            bool
	IDField            string
	Scheme             string // http or https; deprecated, use: Servers.
	Username           string
	Password           string
	ApiKey             string
	Pipeline           string
	IncludeTypeName    bool // https://www.elastic.co/blog/moving-from-types-to-typeless-apis-in-elasticsearch-7-0
	InsecureSkipVerify bool
	// Timeout for HTTP requests (default: 30s)
	RequestTimeout time.Duration
}

Options represents bulk indexing options.

func (*Options) RandomServer added in v0.7.29

func (o *Options) RandomServer() string

RandomServer returns a random server from the Servers slice. Uses the global random generator seeded at program startup.

type Runner added in v0.7.0

type Runner struct {
	ApiKey             string
	BatchSize          int
	Config             string
	CpuProfile         string
	OpType             string
	DocType            string
	File               *os.File
	FileGzipped        bool
	IdentifierField    string
	IndexName          string
	Mapping            string
	MemProfile         string
	NumWorkers         int
	Password           string
	Pipeline           string
	Purge              bool
	PurgePause         time.Duration
	RefreshInterval    string
	Scheme             string
	Servers            []string
	Settings           string
	ShowVersion        bool
	SkipBroken         bool
	Username           string
	Verbose            bool
	InsecureSkipVerify bool
	ZeroReplica        bool
	// Request timeout for HTTP operations
	RequestTimeout time.Duration
	// contains filtered or unexported fields
}

Runner bundles various options. Factored out of a former main func and should be further split up (TODO).

func (*Runner) Run added in v0.7.0

func (r *Runner) Run() (err error)

Run starts indexing documents from file into a given index.

Directories

Path    Synopsis
cmd     esbulk command
