Documentation
¶
Overview ¶
Package clickhouse implements an output sink that writes DecoratedEvents directly to ClickHouse using the shared writer + router stack from pkg/clickhouse.
Unlike the other sinks (xatu, kafka, http, stdout), this sink does NOT use processor.BatchItemProcessor. Each call to HandleNewDecoratedEvents flushes the entire input slice as one columnar INSERT per affected table. This preserves cannon's per-epoch atomicity: one deriver callback delivers one full epoch, which maps to one CH INSERT per table covering exactly that epoch. The deriver only advances its coordinator checkpoint after this call returns nil — so checkpoint progress is gated on CH ack, not on a queued-for-batching ack.
As a consequence, the output.Config.ShippingMethod field is ignored for this sink type. A non-Sync setting logs a warning at construction.
Index ¶
- Constants
- type Config
- type Sink
- func (s *Sink) HandleNewDecoratedEvent(ctx context.Context, event *xatu.DecoratedEvent) error
- func (s *Sink) HandleNewDecoratedEvents(ctx context.Context, events []*xatu.DecoratedEvent) error
- func (s *Sink) Name() string
- func (s *Sink) Start(ctx context.Context) error
- func (s *Sink) Stop(ctx context.Context) error
- func (s *Sink) Type() string
Constants ¶
const SinkType = "clickhouse"
SinkType identifies this sink in output.Config.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
chwriter.Config `yaml:",inline"`
// MetricsSubsystem is the Prometheus subsystem (under namespace "xatu")
// for this sink's CH writer/router metrics. The sink itself is
// binary-agnostic, so there is no built-in default — the binary that
// instantiates this sink is responsible for setting a stable subsystem
// name (cannon's wiring fills in "cannon", consumoor uses "consumoor",
// etc.). When empty, falls back to "clickhouse" inside the metrics
// layer; that fallback exists for safety only and shouldn't be relied
// on by production binaries.
MetricsSubsystem string `yaml:"metricsSubsystem"`
// RestrictToTablePrefixes, when non-empty, filters the route catalog
// to only routes whose target table name begins with any of the listed
// prefixes. This shrinks the set of tables the writer registers and
// validates at startup. The field is here for binaries whose event set
// is narrower than the full route catalog (cannon emits only
// canonical_*, sentries emit beacon_api_* / libp2p_*, etc.). The
// binary's wiring is the right place to set this — user YAML should
// not have to know the prefix taxonomy. Empty means use the full
// catalog.
RestrictToTablePrefixes []string `yaml:"restrictToTablePrefixes"`
}
Config configures the clickhouse output sink. The Writer fields are inlined so YAML keys land directly under `config:` (matching the layout used by the other sinks).
type Sink ¶
type Sink struct {
// contains filtered or unexported fields
}
Sink writes DecoratedEvents straight to ClickHouse, bypassing the xatu server / Kafka path.
func New ¶
func New( name string, config *Config, log observability.ContextualLogger, filterConfig *xatu.EventFilterConfig, shippingMethod processor.ShippingMethod, ) (*Sink, error)
New constructs a clickhouse sink. shippingMethod is accepted for interface uniformity but ignored — see the package-level godoc.
func (*Sink) HandleNewDecoratedEvent ¶
HandleNewDecoratedEvent flushes a single event.
func (*Sink) HandleNewDecoratedEvents ¶
HandleNewDecoratedEvents routes the entire input slice and flushes one columnar INSERT per affected table. Returns the first table-flush error encountered (joined) so the caller's checkpoint does not advance on partial failure.