sql

package
v1.13.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 15, 2026 License: Apache-2.0 Imports: 15 Imported by: 0

README

Sql Processor Plugin

The sql processor plugin performs SQL queries using incoming events. This plugin is based on the jmoiron/sqlx package.

An event label may be used as a table name using table_placeholder parameter.

If the query returns rows, they will be added to the event.

[!TIP]
This plugin may write its own metrics

TLS usage

Drivers use plugin TLS configuration.

Configuration

[[processors]]
  [processors.sql]
    # if true, plugin client writes its own metrics
    enable_metrics = false

    # SQL driver, must be one of: "pgx", "mysql", "sqlserver", "oracle", "clickhouse"
    driver = "pgx"

    # data source name (DSN) in selected driver format
    dsn = "postgres://localhost:5432/postgres"

    # authentication credentials
    # if both are not empty, they take precedence over the ones provided in DSN
    username = ""
    password = ""

    # database connection params - https://pkg.go.dev/database/sql#DB.SetConnMaxIdleTime
    conns_max_idle_time = "10m"
    conns_max_life_time = "10m"
    conns_max_open = 2
    conns_max_idle = 1

    # queries execution timeout
    query_timeout = "10s"

    # a placeholder in the query that will be replaced by the value of the configured label;
    # this may be useful if the target table is partitioned
    table_placeholder = ":table_name"

    # label whose value will be used as a table name, if configured
    table_label = "table_name"

    # maximum number of attempts to execute query
    # before event will be marked as failed
    retry_attempts = 0 # zero for endless attempts

    # interval between retries to (re-)execute query
    retry_after = "5s"

    ## TLS configuration
    # if true, TLS client will be used
    tls_enable = false
    # trusted root certificates for server
    tls_ca_file = "/etc/neptunus/ca.pem"
    # used for TLS client certificate authentication
    tls_key_file = "/etc/neptunus/key.pem"
    tls_cert_file = "/etc/neptunus/cert.pem"
    # minimum TLS version, not limited by default
    tls_min_version = "TLS12"
    # send the specified TLS server name via SNI
    tls_server_name = "example.svc.local"
    # use TLS but skip chain & host verification
    tls_insecure_skip_verify = false

    # "query placeholders <- event fields" mapping
    [processors.sql.columns]
      expedition_type = "type"
      expedition_region = "region"
      expedition_owner = "owner"

    # "event fields <- query result columns" mapping
    # please note that each field type is always a slice
    [processors.sql.fields]
      uid = "expedition_uid"
      owner = "expedition_owner"

    # query that will be executed for each event
    [processors.sql.on_event]
      query = '''
UPDATE :table_name SET
    EXPEDITION_TYPE = :expedition_type
    , EXPEDITION_REGION = :expedition_region
    WHERE EXPEDITION_OWNER = :expedition_owner
RETURNING EXPEDITION_OWNER, EXPEDITION_UID;
'''

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Sql

// Sql is the sql processor plugin: it performs an SQL query using incoming
// events and, if the query returns rows, adds them to the event.
// Exported fields are populated from the plugin configuration through their
// mapstructure tags.
type Sql struct {
	// Embedded base processor; the "-" tag excludes it from configuration mapping.
	*core.BaseProcessor `mapstructure:"-"`
	// EnableMetrics: if true, the plugin client writes its own metrics.
	EnableMetrics       bool          `mapstructure:"enable_metrics"`
	// Driver is the SQL driver name; documented values are
	// "pgx", "mysql", "sqlserver", "oracle", "clickhouse".
	Driver              string        `mapstructure:"driver"`
	// Dsn is the data source name in the selected driver's format.
	Dsn                 string        `mapstructure:"dsn"`
	// Username and Password are authentication credentials; if both are
	// not empty, they take precedence over the ones provided in the DSN.
	Username            string        `mapstructure:"username"`
	Password            string        `mapstructure:"password"`
	// Database connection params; the README points to database/sql
	// DB.SetConnMaxIdleTime for these settings.
	// NOTE(review): presumably each maps to the matching *sql.DB setter — confirm in Init.
	ConnsMaxIdleTime    time.Duration `mapstructure:"conns_max_idle_time"`
	ConnsMaxLifetime    time.Duration `mapstructure:"conns_max_life_time"`
	ConnsMaxOpen        int           `mapstructure:"conns_max_open"`
	ConnsMaxIdle        int           `mapstructure:"conns_max_idle"`
	// QueryTimeout is the query execution timeout.
	QueryTimeout        time.Duration `mapstructure:"query_timeout"`

	// TablePlaceholder is a placeholder in the query that is replaced by the
	// value of the configured label (useful if the target table is partitioned).
	TablePlaceholder string            `mapstructure:"table_placeholder"`
	// TableLabel is the event label whose value is used as a table name, if configured.
	TableLabel       string            `mapstructure:"table_label"`
	// OnEvent describes the query that is executed for an event.
	OnEvent          csql.QueryInfo    `mapstructure:"on_event"`
	// Columns is the "query placeholders <- event fields" mapping.
	Columns          map[string]string `mapstructure:"columns"`
	// Fields is the "event fields <- query result columns" mapping;
	// each resulting field type is always a slice.
	Fields           map[string]string `mapstructure:"fields"`

	// Embedded TLS client configuration, squashed into the plugin's own
	// config section; drivers use this plugin TLS configuration.
	*tls.TLSClientConfig `mapstructure:",squash"`
	// Embedded retryer, squashed into the plugin's own config section.
	// NOTE(review): presumably supplies retry_attempts / retry_after — confirm in retryer package.
	*retryer.Retryer     `mapstructure:",squash"`
	// contains filtered or unexported fields
}

func (*Sql) Close

func (p *Sql) Close() error

func (*Sql) Init

func (p *Sql) Init() error

func (*Sql) Run

func (p *Sql) Run()

func (*Sql) SetId

func (p *Sql) SetId(id uint64)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL