route

package
v1.12.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 14, 2026 License: GPL-3.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrInvalidEvent = errors.New("invalid event")

ErrInvalidEvent is returned by FlattenTo when an event's payload is fundamentally invalid (e.g. nil protobuf message). Events flagged with this error are permanently unflattenable and should be dropped rather than retried.

Functions

func ByteSlicesToStrings

func ByteSlicesToStrings(bs [][]byte) []string

ByteSlicesToStrings converts [][]byte to []string via simple type cast. Used by generated Snapshot code for Array(FixedString(N)) columns where the stored bytes are already UTF-8 strings (e.g. hex hashes with "0x" prefix).

func ConsensusVersionMajor

func ConsensusVersionMajor(raw string) string

ConsensusVersionMajor extracts the major component from a raw consensus version string.

func ConsensusVersionMinor

func ConsensusVersionMinor(raw string) string

ConsensusVersionMinor extracts the minor component from a raw consensus version string.

func ConsensusVersionPatch

func ConsensusVersionPatch(raw string) string

ConsensusVersionPatch extracts the patch component from a raw consensus version string.

func FormatDecimal

func FormatDecimal(v int64, scale int) string

FormatDecimal formats a scaled Decimal64 value back to its string representation with the given number of decimal places. For example, FormatDecimal(272800, 3) returns "272.800".

func NewNullableFixedStr

func NewNullableFixedStr(size int) *proto.ColNullable[[]byte]

NewNullableFixedStr creates a ColNullable[[]byte] backed by a SafeColFixedStr of the given size. Used by generated columnar batch code for Nullable(FixedString(N)) columns.

func NormalizeConsensusVersion

func NormalizeConsensusVersion(raw string) string

NormalizeConsensusVersion strips the implementation prefix from a consensus version string (e.g. "Lighthouse/v4.5.0" -> "v4.5.0").

func NormalizeIPToIPv6

func NormalizeIPToIPv6(raw string) proto.IPv6

NormalizeIPToIPv6 parses an IP string and returns proto.IPv6. IPv4 inputs are mapped to IPv6 (::ffff:x.x.x.x) in a single parse without an intermediate string representation.

func NormalizeIPToIPv6Nullable

func NormalizeIPToIPv6Nullable(raw string) proto.Nullable[proto.IPv6]

NormalizeIPToIPv6Nullable parses an IP string and returns a Nullable IPv6. Returns NULL when the input is empty or unparseable (matching Vector behavior), rather than writing the zero address "::".

func PadToFixed

func PadToFixed(b []byte, size int) []byte

PadToFixed pads or truncates b to exactly size bytes.

func ParseConsensusVersion

func ParseConsensusVersion(raw string) (normalized, major, minor, patch string)

ParseConsensusVersion normalizes a raw consensus version string and splits it into its four components in a single pass. This avoids repeated SplitN calls when each component is needed.

func ParseIPv4

func ParseIPv4(s string) proto.IPv4

ParseIPv4 converts a string IPv4 address to proto.IPv4. Returns zero value on parse failure.

func ParseIPv6

func ParseIPv6(s string) proto.IPv6

ParseIPv6 converts a string IP address to proto.IPv6. Returns zero value on parse failure.

func ParseUInt128

func ParseUInt128(s string) (proto.UInt128, error)

ParseUInt128 converts a decimal string to proto.UInt128. Returns an error on parse failure.

func ParseUInt256

func ParseUInt256(s string) (proto.UInt256, error)

ParseUInt256 converts a decimal or hex string to proto.UInt256. Returns an error on parse failure.

func ParseUUID

func ParseUUID(s string) uuid.UUID

ParseUUID converts a string UUID to uuid.UUID. Returns zero value on parse failure.

func RecordError

func RecordError(err error)

RecordError stores an error encountered during init-time route registration. Accumulated errors are surfaced by All().

func Register

func Register(route Route) error

Register adds a route to the global catalog, returning an error on invalid or duplicate registrations.

func ScaleDecimal

func ScaleDecimal(s string, scale int) (int64, error)

ScaleDecimal converts a string decimal value to a scaled int64 for ch-go Decimal columns. For example, "1.618" with scale 3 becomes 1618. Returns an error on parse failure.

func SeaHash64

func SeaHash64(s string) uint64

SeaHash64 returns a SeaHash digest for the provided string.

It mirrors the reference algorithm used by Vector's VRL `seahash(...)` function so hash-derived keys match existing Vector output.

func SeaHashInt64

func SeaHashInt64(s string) int64

SeaHashInt64 returns SeaHash output cast to int64 (two's complement), matching how Int64 ClickHouse columns represent hashes.

func StringsToBytes

func StringsToBytes(ss []string) [][]byte

StringsToBytes converts a string slice to a byte-slice slice. Used by generated columnar batch code for Array(FixedString(N)) columns.

func UInt128ToString

func UInt128ToString(v proto.UInt128) string

UInt128ToString converts a proto.UInt128 to its decimal string representation.

func UInt256ToString

func UInt256ToString(v proto.UInt256) string

UInt256ToString converts a proto.UInt256 to its decimal string representation.

func Uint8SliceToIntSlice

func Uint8SliceToIntSlice(b []uint8) []int

Uint8SliceToIntSlice converts []uint8 (which JSON-marshals as base64) to []int for correct JSON array serialization.

func UnsupportedEvents

func UnsupportedEvents() map[xatu.Event_Name]string

UnsupportedEvents returns a copy of intentionally unsupported event reasons.

func UnsupportedReason

func UnsupportedReason(event xatu.Event_Name) (string, bool)

UnsupportedReason returns the documented reason when an event is intentionally unsupported by consumoor flatteners.

Types

type ColumnarBatch

type ColumnarBatch interface {
	// FlattenTo flattens one event into the batch's proto columns.
	FlattenTo(event *xatu.DecoratedEvent) error
	// Input returns the proto.Input for the accumulated batch.
	Input() proto.Input
	// Reset clears all accumulated data.
	Reset()
	// Rows returns the number of rows appended so far.
	Rows() int
}

ColumnarBatch accumulates events directly into typed ch-go proto columns. Generated per-table Batch structs implement this interface.

type EventPredicate

type EventPredicate func(event *xatu.DecoratedEvent) bool

EventPredicate determines whether a route should process a specific event.

type Route

type Route interface {
	// EventNames returns the event names this route handles.
	EventNames() []xatu.Event_Name

	// TableName returns the target ClickHouse table name.
	TableName() string

	// ShouldProcess returns whether this event should be processed by
	// this route. Used for conditional routing where the same event
	// name routes to different tables based on additional_data fields.
	ShouldProcess(event *xatu.DecoratedEvent) bool

	// NewBatch returns a ColumnarBatch for typed, zero-reflection
	// insertion.
	NewBatch() ColumnarBatch
}

Route converts a DecoratedEvent into flat ClickHouse rows for a specific target table. Each implementation handles one or more event names and produces rows for exactly one ClickHouse table.

func All

func All() ([]Route, error)

All returns all registered routes in deterministic table-name order. It returns an error if any route registrations failed during init.

func NewStaticRoute

func NewStaticRoute(
	table TableName,
	events []xatu.Event_Name,
	batchFactory func() ColumnarBatch,
	opts ...StaticRouteOption,
) (Route, error)

NewStaticRoute creates a route from explicit event names, table name, and a ColumnarBatch factory.

type SafeColFixedStr

type SafeColFixedStr struct {
	proto.ColFixedStr
}

SafeColFixedStr wraps proto.ColFixedStr to pad or truncate values on Append, avoiding panics from ch-go on short byte slices.

func (*SafeColFixedStr) Append

func (c *SafeColFixedStr) Append(b []byte)

Append pads or truncates b to the column's fixed size before appending.

type Snapshotter

type Snapshotter interface {
	ColumnarBatch
	// Snapshot reads back accumulated columnar data as row maps.
	Snapshot() []map[string]any
}

Snapshotter extends ColumnarBatch with test-only read-back for assertions.

type StaticRouteOption

type StaticRouteOption func(*staticRoute)

StaticRouteOption configures optional route behavior.

func WithStaticRoutePredicate

func WithStaticRoutePredicate(predicate EventPredicate) StaticRouteOption

WithStaticRoutePredicate sets conditional route processing behavior.

type TableName

type TableName string

TableName is a typed ClickHouse target table identifier for flatteners.

func (TableName) String

func (t TableName) String() string

type TypedColInput

type TypedColInput struct {
	proto.ColInput
	CHType proto.ColumnType
}

TypedColInput wraps a proto.ColInput to override Type() with a full type string that includes parameters (e.g. "Decimal(10, 3)" instead of bare "Decimal64").

func (*TypedColInput) Reset

func (c *TypedColInput) Reset()

Reset delegates to the inner column.

func (*TypedColInput) Type

func (c *TypedColInput) Type() proto.ColumnType

Type returns the full CH type string.

Directories

Path Synopsis
cmd/chrowgen command
cmd
chgo-rowgen command
generate command
Command generate spins up a ClickHouse container via testcontainers, applies all migrations, and regenerates every .gen.go file using chgo-rowgen.
Command generate spins up a ClickHouse container via testcontainers, applies all migrations, and regenerates every .gen.go file using chgo-rowgen.
Package testfixture provides shared test helpers for per-table snapshot tests across the flattener domain packages.
Package testfixture provides shared test helpers for per-table snapshot tests across the flattener domain packages.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL