clickhouse

package
v1.15.0
Published: May 21, 2026 License: GPL-3.0 Imports: 15 Imported by: 0

Documentation

Overview

Package clickhouse implements an output sink that writes DecoratedEvents directly to ClickHouse using the shared writer + router stack from pkg/clickhouse.

Unlike the other sinks (xatu, kafka, http, stdout), this sink does NOT use processor.BatchItemProcessor. Each call to HandleNewDecoratedEvents flushes the entire input slice as one columnar INSERT per affected table. This preserves cannon's per-epoch atomicity: one deriver callback delivers one full epoch, which maps to one ClickHouse INSERT per table covering exactly that epoch. The deriver advances its coordinator checkpoint only after this call returns nil, so checkpoint progress is gated on the ClickHouse acknowledgement rather than on an event merely being queued for batching.

As a consequence, the output.Config.ShippingMethod field is ignored for this sink type. A non-Sync setting logs a warning at construction.

Constants

const SinkType = "clickhouse"

SinkType identifies this sink in output.Config.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	chwriter.Config `yaml:",inline"`

	// MetricsSubsystem is the Prometheus subsystem (under namespace "xatu")
	// for this sink's CH writer/router metrics. The sink itself is
	// binary-agnostic, so there is no built-in default — the binary that
	// instantiates this sink is responsible for setting a stable subsystem
	// name (cannon's wiring fills in "cannon", consumoor uses "consumoor",
	// etc.). When empty, falls back to "clickhouse" inside the metrics
	// layer; that fallback exists for safety only and shouldn't be relied
	// on by production binaries.
	MetricsSubsystem string `yaml:"metricsSubsystem"`

	// RestrictToTablePrefixes, when non-empty, filters the route catalog
	// to only routes whose target table name begins with any of the listed
	// prefixes. This shrinks the set of tables the writer registers and
	// validates at startup. The field is here for binaries whose event set
	// is narrower than the full route catalog (cannon emits only
	// canonical_*, sentries emit beacon_api_* / libp2p_*, etc.). The
	// binary's wiring is the right place to set this — user YAML should
	// not have to know the prefix taxonomy. Empty means use the full
	// catalog.
	RestrictToTablePrefixes []string `yaml:"restrictToTablePrefixes"`
}

Config configures the clickhouse output sink. The embedded chwriter.Config fields are inlined so YAML keys land directly under `config:` (matching the layout used by the other sinks).
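The two field comments above describe a prefix filter and a subsystem fallback. A rough sketch of that logic, assuming the route catalog can be reduced to a list of table names, might look like this. The helpers filterTables and subsystemOrDefault and the table names are hypothetical and are not part of this package's API.

```go
package main

import (
	"fmt"
	"strings"
)

// filterTables keeps only table names that begin with one of the prefixes.
// An empty prefix list means "use the full catalog".
func filterTables(tables, prefixes []string) []string {
	if len(prefixes) == 0 {
		return tables
	}
	kept := make([]string, 0, len(tables))
	for _, table := range tables {
		for _, prefix := range prefixes {
			if strings.HasPrefix(table, prefix) {
				kept = append(kept, table)
				break // one matching prefix is enough
			}
		}
	}
	return kept
}

// subsystemOrDefault applies the documented "clickhouse" fallback
// when MetricsSubsystem is left empty.
func subsystemOrDefault(subsystem string) string {
	if subsystem == "" {
		return "clickhouse"
	}
	return subsystem
}

func main() {
	// Illustrative table names only.
	catalog := []string{"canonical_example", "beacon_api_example", "libp2p_example"}

	fmt.Println(filterTables(catalog, []string{"canonical_"}))
	fmt.Println(filterTables(catalog, nil))
	fmt.Println(subsystemOrDefault(""))
	fmt.Println(subsystemOrDefault("cannon"))
}
```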

func (*Config) Validate

func (c *Config) Validate() error

Validate checks the sink configuration for errors.

type Sink

type Sink struct {
	// contains filtered or unexported fields
}

Sink writes DecoratedEvents straight to ClickHouse, bypassing the xatu server / Kafka path.

func New

func New(
	name string,
	config *Config,
	log observability.ContextualLogger,
	filterConfig *xatu.EventFilterConfig,
	shippingMethod processor.ShippingMethod,
) (*Sink, error)

New constructs a clickhouse sink. shippingMethod is accepted for interface uniformity but ignored; see the package overview.

func (*Sink) HandleNewDecoratedEvent

func (s *Sink) HandleNewDecoratedEvent(ctx context.Context, event *xatu.DecoratedEvent) error

HandleNewDecoratedEvent flushes a single event.

func (*Sink) HandleNewDecoratedEvents

func (s *Sink) HandleNewDecoratedEvents(ctx context.Context, events []*xatu.DecoratedEvent) error

HandleNewDecoratedEvents routes the entire input slice and flushes one columnar INSERT per affected table. Returns the first table-flush error encountered (joined) so the caller's checkpoint does not advance on partial failure.

func (*Sink) Name

func (s *Sink) Name() string

Name returns the sink instance's user-supplied name.

func (*Sink) Start

func (s *Sink) Start(ctx context.Context) error

Start dials the underlying ClickHouse pool and validates table presence.

func (*Sink) Stop

func (s *Sink) Stop(ctx context.Context) error

Stop closes the ClickHouse pool.

func (*Sink) Type

func (s *Sink) Type() string

Type returns the sink type identifier.
