Documentation ¶
Index ¶
- Variables
- func ByteSlicesToStrings(bs [][]byte) []string
- func ConsensusVersionMajor(raw string) string
- func ConsensusVersionMinor(raw string) string
- func ConsensusVersionPatch(raw string) string
- func FormatDecimal(v int64, scale int) string
- func NewNullableFixedStr(size int) *proto.ColNullable[[]byte]
- func NormalizeConsensusVersion(raw string) string
- func NormalizeIPToIPv6(raw string) proto.IPv6
- func NormalizeIPToIPv6Nullable(raw string) proto.Nullable[proto.IPv6]
- func PadToFixed(b []byte, size int) []byte
- func ParseConsensusVersion(raw string) (normalized, major, minor, patch string)
- func ParseIPv4(s string) proto.IPv4
- func ParseIPv6(s string) proto.IPv6
- func ParseUInt128(s string) (proto.UInt128, error)
- func ParseUInt256(s string) (proto.UInt256, error)
- func ParseUUID(s string) uuid.UUID
- func RecordError(err error)
- func Register(route Route) error
- func ScaleDecimal(s string, scale int) (int64, error)
- func SeaHash64(s string) uint64
- func SeaHashInt64(s string) int64
- func StringsToBytes(ss []string) [][]byte
- func UInt128ToString(v proto.UInt128) string
- func UInt256ToString(v proto.UInt256) string
- func Uint8SliceToIntSlice(b []uint8) []int
- func UnsupportedEvents() map[xatu.Event_Name]string
- func UnsupportedReason(event xatu.Event_Name) (string, bool)
- type ColumnarBatch
- type EventPredicate
- type Route
- type SafeColFixedStr
- type Snapshotter
- type StaticRouteOption
- type TableName
- type TypedColInput
Constants ¶
This section is empty.
Variables ¶
var ErrInvalidEvent = errors.New("invalid event")
ErrInvalidEvent is returned by FlattenTo when an event's payload is fundamentally invalid (e.g. nil protobuf message). Events flagged with this error are permanently unflattenable and should be dropped rather than retried.
Functions ¶
func ByteSlicesToStrings ¶
func ByteSlicesToStrings(bs [][]byte) []string
ByteSlicesToStrings converts [][]byte to []string by converting each element with a plain string conversion. Used by generated Snapshot code for Array(FixedString(N)) columns where the stored bytes are already UTF-8 strings (e.g. hex hashes with "0x" prefix).
func ConsensusVersionMajor ¶
func ConsensusVersionMajor(raw string) string
ConsensusVersionMajor extracts the major component from a raw consensus version string.
func ConsensusVersionMinor ¶
func ConsensusVersionMinor(raw string) string
ConsensusVersionMinor extracts the minor component from a raw consensus version string.
func ConsensusVersionPatch ¶
func ConsensusVersionPatch(raw string) string
ConsensusVersionPatch extracts the patch component from a raw consensus version string.
func FormatDecimal ¶
func FormatDecimal(v int64, scale int) string
FormatDecimal formats a scaled Decimal64 value back to its string representation with the given number of decimal places. For example, FormatDecimal(272800, 3) returns "272.800".
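The formatting rule can be illustrated with a small standalone function. This is a sketch under assumptions, not the package's implementation: `formatDecimal` is a hypothetical name, and the handling of negative values and of a non-positive scale is a guess at reasonable behavior.

```go
package main

import (
	"strconv"
	"strings"
)

// formatDecimal renders v, an integer scaled by 10^scale, as a decimal
// string with exactly scale fractional digits.
func formatDecimal(v int64, scale int) string {
	// Assumption: a non-positive scale means "no fractional part".
	if scale <= 0 {
		return strconv.FormatInt(v, 10)
	}
	neg := v < 0
	// Work on the magnitude as uint64 so the minimum int64 does not overflow.
	u := uint64(v)
	if neg {
		u = -u
	}
	digits := strconv.FormatUint(u, 10)
	// Left-pad with zeros so at least one digit precedes the decimal point.
	if len(digits) <= scale {
		digits = strings.Repeat("0", scale-len(digits)+1) + digits
	}
	split := len(digits) - scale
	out := digits[:split] + "." + digits[split:]
	if neg {
		out = "-" + out
	}
	return out
}
```

Under these assumptions, `formatDecimal(272800, 3)` gives "272.800" and `formatDecimal(5, 3)` gives "0.005".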
func NewNullableFixedStr ¶
func NewNullableFixedStr(size int) *proto.ColNullable[[]byte]
NewNullableFixedStr creates a ColNullable[[]byte] backed by a SafeColFixedStr of the given size. Used by generated columnar batch code for Nullable(FixedString(N)) columns.
func NormalizeConsensusVersion ¶
func NormalizeConsensusVersion(raw string) string
NormalizeConsensusVersion strips the implementation prefix from a consensus version string (e.g. "Lighthouse/v4.5.0" -> "v4.5.0").
func NormalizeIPToIPv6 ¶
func NormalizeIPToIPv6(raw string) proto.IPv6
NormalizeIPToIPv6 parses an IP string and returns proto.IPv6. IPv4 inputs are mapped to IPv6 (::ffff:x.x.x.x) in a single parse without an intermediate string representation.
func NormalizeIPToIPv6Nullable ¶
func NormalizeIPToIPv6Nullable(raw string) proto.Nullable[proto.IPv6]
NormalizeIPToIPv6Nullable parses an IP string and returns a Nullable IPv6. Returns NULL when the input is empty or unparseable (matching Vector behavior), rather than writing the zero address "::".
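The IPv4-to-IPv6 mapping and the NULL-on-failure rule can be sketched with the standard `net/netip` package. This is an illustration rather than the package's code: `toIPv6` is a hypothetical helper, the 16-byte array models proto.IPv6, and the boolean result models the Nullable "set" flag.

```go
package main

import "net/netip"

// toIPv6 parses raw and returns its 16-byte form. ok is false for empty or
// unparseable input, which corresponds to writing NULL instead of "::".
func toIPv6(raw string) (addr [16]byte, ok bool) {
	parsed, err := netip.ParseAddr(raw)
	if err != nil {
		return addr, false
	}
	// As16 returns IPv4 addresses in IPv4-mapped form (::ffff:a.b.c.d),
	// so no intermediate string is needed.
	return parsed.As16(), true
}
```

For "192.0.2.1" the result ends in the bytes ff ff c0 00 02 01, and an empty string reports `ok == false`.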
func PadToFixed ¶
func PadToFixed(b []byte, size int) []byte
PadToFixed pads or truncates b to exactly size bytes.
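One way to get pad-or-truncate behavior is to copy into a fresh zero-filled slice. The sketch below assumes zero-byte padding on the right and always allocates a new slice; `padToFixed` is a hypothetical name and the real function may differ on both points.

```go
package main

// padToFixed returns a slice of exactly size bytes holding a prefix of b.
// Shorter inputs are right-padded with zero bytes (an assumption), longer
// inputs are truncated.
func padToFixed(b []byte, size int) []byte {
	if size < 0 {
		size = 0 // guard: make would panic on a negative length
	}
	out := make([]byte, size) // zero-filled by make
	copy(out, b)              // copies min(len(out), len(b)) bytes
	return out
}
```

Because `copy` stops at the shorter of the two slices, the same two lines handle both the padding case and the truncation case.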
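func ParseConsensusVersion ¶
func ParseConsensusVersion(raw string) (normalized, major, minor, patch string)
ParseConsensusVersion normalizes a raw consensus version string and returns the normalized version together with its major, minor, and patch components in a single pass. This avoids repeated SplitN calls when every component is needed.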
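The single-pass idea can be sketched as follows. The exact parsing rules are not stated in this documentation, so the sketch makes two labeled assumptions: the implementation prefix is everything up to the first "/", and the components are dot-separated with the leading "v" removed. `parseVersion` is a hypothetical name.

```go
package main

import "strings"

// parseVersion returns the normalized version and its components from one
// split, instead of re-splitting the string for each component.
func parseVersion(raw string) (normalized, major, minor, patch string) {
	normalized = raw
	// Assumption: the implementation prefix ends at the first "/".
	if i := strings.Index(raw, "/"); i >= 0 {
		normalized = raw[i+1:]
	}
	// Assumption: components are dot-separated and exclude the leading "v".
	parts := strings.SplitN(strings.TrimPrefix(normalized, "v"), ".", 3)
	major = parts[0]
	if len(parts) > 1 {
		minor = parts[1]
	}
	if len(parts) > 2 {
		patch = parts[2]
	}
	return normalized, major, minor, patch
}
```

Under these assumptions, "Lighthouse/v4.5.0" yields the normalized value "v4.5.0" and the components "4", "5", and "0".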
func ParseIPv4 ¶
func ParseIPv4(s string) proto.IPv4
ParseIPv4 converts a string IPv4 address to proto.IPv4. Returns zero value on parse failure.
func ParseIPv6 ¶
func ParseIPv6(s string) proto.IPv6
ParseIPv6 converts a string IP address to proto.IPv6. Returns zero value on parse failure.
func ParseUInt128 ¶
func ParseUInt128(s string) (proto.UInt128, error)
ParseUInt128 converts a decimal string to proto.UInt128. Returns an error on parse failure.
func ParseUInt256 ¶
func ParseUInt256(s string) (proto.UInt256, error)
ParseUInt256 converts a decimal or hex string to proto.UInt256. Returns an error on parse failure.
func ParseUUID ¶
func ParseUUID(s string) uuid.UUID
ParseUUID converts a string UUID to uuid.UUID. Returns zero value on parse failure.
func RecordError ¶
func RecordError(err error)
RecordError stores an error encountered during init-time route registration. Accumulated errors are surfaced by All().
func Register ¶
func Register(route Route) error
Register adds a route to the global catalog, returning an error on invalid or duplicate registrations.
func ScaleDecimal ¶
func ScaleDecimal(s string, scale int) (int64, error)
ScaleDecimal converts a string decimal value to a scaled int64 for ch-go Decimal columns. For example, "1.618" with scale 3 becomes 1618. Returns an error on parse failure.
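The scaling step can be done without floating point by right-padding the fractional digits and parsing the result as an integer. This is a sketch, not the package's code: `scaleDecimal` is a hypothetical name, and rejecting inputs with more fractional digits than the scale is an assumption (the real function might truncate or round instead).

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// scaleDecimal converts a decimal string to an integer scaled by 10^scale.
func scaleDecimal(s string, scale int) (int64, error) {
	if scale < 0 {
		return 0, fmt.Errorf("negative scale %d", scale)
	}
	neg := strings.HasPrefix(s, "-")
	intPart, frac, _ := strings.Cut(strings.TrimPrefix(s, "-"), ".")
	if intPart == "" && frac == "" {
		return 0, fmt.Errorf("parse decimal %q: no digits", s)
	}
	// Assumption: excess precision is an error rather than being truncated.
	if len(frac) > scale {
		return 0, fmt.Errorf("parse decimal %q: more than %d fractional digits", s, scale)
	}
	// Right-pad the fraction so "1.6" with scale 3 becomes the digits "1600".
	frac += strings.Repeat("0", scale-len(frac))
	// ParseUint rejects stray signs and non-digits; 63 bits keeps it in int64 range.
	u, err := strconv.ParseUint(intPart+frac, 10, 63)
	if err != nil {
		return 0, fmt.Errorf("parse decimal %q: %w", s, err)
	}
	v := int64(u)
	if neg {
		v = -v
	}
	return v, nil
}
```

With these rules, "1.618" at scale 3 becomes 1618 and "272.8" at scale 3 becomes 272800.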
func SeaHash64 ¶
func SeaHash64(s string) uint64
SeaHash64 returns a SeaHash digest for the provided string.
It mirrors the reference algorithm used by Vector's VRL `seahash(...)` function so hash-derived keys match existing Vector output.
func SeaHashInt64 ¶
func SeaHashInt64(s string) int64
SeaHashInt64 returns SeaHash output cast to int64 (two's complement), matching how Int64 ClickHouse columns represent hashes.
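The two's complement cast keeps all 64 bits and only changes how they are interpreted. The sketch below shows just that cast on an arbitrary digest value; it does not implement SeaHash, and `toInt64` is a hypothetical helper.

```go
package main

// toInt64 reinterprets a 64-bit digest as a signed integer. Go's integer
// conversion preserves the bit pattern, so digests with the top bit set
// become negative numbers and convert back to the same uint64.
func toInt64(digest uint64) int64 {
	return int64(digest)
}
```

For example, a digest of 0xFFFFFFFFFFFFFFFF is stored as -1, and converting the stored value back with `uint64(...)` recovers the original digest.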
func StringsToBytes ¶
func StringsToBytes(ss []string) [][]byte
StringsToBytes converts a string slice to a byte-slice slice. Used by generated columnar batch code for Array(FixedString(N)) columns.
func UInt128ToString ¶
func UInt128ToString(v proto.UInt128) string
UInt128ToString converts a proto.UInt128 to its decimal string representation.
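The decimal round trip for 128-bit values can be sketched with `math/big`. This is an illustration only: the local `uint128` type assumes the Low/High uint64 layout of ch-go's proto.UInt128, and `parseUint128` and `uint128ToString` are hypothetical names that may not match how the package does the conversion.

```go
package main

import (
	"fmt"
	"math/big"
)

// uint128 models a 128-bit unsigned value as two 64-bit halves.
type uint128 struct{ Low, High uint64 }

// parseUint128 parses a decimal string, rejecting negative values and
// values that need more than 128 bits.
func parseUint128(s string) (uint128, error) {
	n, ok := new(big.Int).SetString(s, 10)
	if !ok || n.Sign() < 0 || n.BitLen() > 128 {
		return uint128{}, fmt.Errorf("invalid UInt128 %q", s)
	}
	mask := new(big.Int).SetUint64(^uint64(0))
	low := new(big.Int).And(n, mask).Uint64()
	high := new(big.Int).Rsh(n, 64).Uint64()
	return uint128{Low: low, High: high}, nil
}

// uint128ToString rebuilds High<<64 | Low and prints it in base 10.
func uint128ToString(v uint128) string {
	n := new(big.Int).SetUint64(v.High)
	n.Lsh(n, 64)
	n.Or(n, new(big.Int).SetUint64(v.Low))
	return n.String()
}
```

In this sketch the string "18446744073709551616" (2^64) parses to `{Low: 0, High: 1}` and formats back to the same string.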
func UInt256ToString ¶
func UInt256ToString(v proto.UInt256) string
UInt256ToString converts a proto.UInt256 to its decimal string representation.
func Uint8SliceToIntSlice ¶
func Uint8SliceToIntSlice(b []uint8) []int
Uint8SliceToIntSlice converts []uint8 (which JSON-marshals as base64) to []int for correct JSON array serialization.
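The base64 behavior comes from `encoding/json`, which encodes []byte (and therefore []uint8) as a base64 string. The sketch below contrasts the two encodings; `uint8SliceToIntSlice` and `marshal` are local stand-ins written for this example.

```go
package main

import "encoding/json"

// uint8SliceToIntSlice widens each element so the slice is no longer a
// []byte and encoding/json emits a JSON array of numbers.
func uint8SliceToIntSlice(b []uint8) []int {
	out := make([]int, len(b))
	for i, v := range b {
		out[i] = int(v)
	}
	return out
}

// marshal returns the JSON encoding of v as a string, or "" on error.
func marshal(v any) string {
	data, err := json.Marshal(v)
	if err != nil {
		return ""
	}
	return string(data)
}
```

Marshaling `[]uint8{1, 2, 3}` directly produces the string `"AQID"`, while marshaling the converted slice produces `[1,2,3]`.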
func UnsupportedEvents ¶
func UnsupportedEvents() map[xatu.Event_Name]string
UnsupportedEvents returns a copy of the map of intentionally unsupported events, keyed by event name, with the documented reason for each.
func UnsupportedReason ¶
func UnsupportedReason(event xatu.Event_Name) (string, bool)
UnsupportedReason returns the documented reason when an event is intentionally unsupported by consumoor flatteners.
Types ¶
type ColumnarBatch ¶
type ColumnarBatch interface {
// FlattenTo flattens one event into the batch's proto columns.
FlattenTo(event *xatu.DecoratedEvent) error
// Input returns the proto.Input for the accumulated batch.
Input() proto.Input
// Reset clears all accumulated data.
Reset()
// Rows returns the number of rows appended so far.
Rows() int
}
ColumnarBatch accumulates events directly into typed ch-go proto columns. Generated per-table Batch structs implement this interface.
type EventPredicate ¶
type EventPredicate func(event *xatu.DecoratedEvent) bool
EventPredicate determines whether a route should process a specific event.
type Route ¶
type Route interface {
// EventNames returns the event names this route handles.
EventNames() []xatu.Event_Name
// TableName returns the target ClickHouse table name.
TableName() string
// ShouldProcess returns whether this event should be processed by
// this route. Used for conditional routing where the same event
// name routes to different tables based on additional_data fields.
ShouldProcess(event *xatu.DecoratedEvent) bool
// NewBatch returns a ColumnarBatch for typed, zero-reflection
// insertion.
NewBatch() ColumnarBatch
}
Route converts a DecoratedEvent into flat ClickHouse rows for a specific target table. Each implementation handles one or more event names and produces rows for exactly one ClickHouse table.
func All ¶
All returns all registered routes in deterministic table-name order. It returns an error if any route registrations failed during init.
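The interplay of Register, RecordError, and All can be sketched as a small catalog. This is a simplified model and not the package's code: the `route` and `catalog` types are hypothetical, duplicates are assumed to be detected by table name, and the sketch is not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// route is a stand-in for the Route interface; only the table name matters here.
type route struct{ table string }

// catalog holds registered routes plus errors recorded during registration.
type catalog struct {
	routes map[string]route
	errs   []error
}

func newCatalog() *catalog { return &catalog{routes: map[string]route{}} }

// register rejects invalid and duplicate routes.
// Assumption: a duplicate is a second route for the same table name.
func (c *catalog) register(r route) error {
	if r.table == "" {
		return errors.New("route has empty table name")
	}
	if _, dup := c.routes[r.table]; dup {
		return fmt.Errorf("duplicate route for table %q", r.table)
	}
	c.routes[r.table] = r
	return nil
}

// recordError stores a non-nil registration error for later reporting.
func (c *catalog) recordError(err error) {
	if err != nil {
		c.errs = append(c.errs, err)
	}
}

// all returns routes sorted by table name, or the accumulated errors.
func (c *catalog) all() ([]route, error) {
	if err := errors.Join(c.errs...); err != nil {
		return nil, err
	}
	out := make([]route, 0, len(c.routes))
	for _, r := range c.routes {
		out = append(out, r)
	}
	sort.Slice(out, func(i, j int) bool { return out[i].table < out[j].table })
	return out, nil
}
```

In this model an init function would call `c.recordError(c.register(r))`, so a bad registration does not panic at init time and is reported once by `all`.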
func NewStaticRoute ¶
func NewStaticRoute( table TableName, events []xatu.Event_Name, batchFactory func() ColumnarBatch, opts ...StaticRouteOption, ) (Route, error)
NewStaticRoute creates a route from explicit event names, table name, and a ColumnarBatch factory.
type SafeColFixedStr ¶
type SafeColFixedStr struct {
proto.ColFixedStr
}
SafeColFixedStr wraps proto.ColFixedStr to pad or truncate values on Append, avoiding panics from ch-go on short byte slices.
func (*SafeColFixedStr) Append ¶
func (c *SafeColFixedStr) Append(b []byte)
Append pads or truncates b to the column's fixed size before appending.
type Snapshotter ¶
type Snapshotter interface {
ColumnarBatch
// Snapshot reads back accumulated columnar data as row maps.
Snapshot() []map[string]any
}
Snapshotter extends ColumnarBatch with test-only read-back for assertions.
type StaticRouteOption ¶
type StaticRouteOption func(*staticRoute)
StaticRouteOption configures optional route behavior.
func WithStaticRoutePredicate ¶
func WithStaticRoutePredicate(predicate EventPredicate) StaticRouteOption
WithStaticRoutePredicate sets conditional route processing behavior.
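StaticRouteOption follows the functional options pattern: each option is a function that mutates the route under construction. The sketch below uses simplified stand-in types (`event`, `predicate`, a string table name) and omits the batch factory; treating a route without a predicate as accepting every event is an assumption.

```go
package main

import "errors"

// event is a stand-in for *xatu.DecoratedEvent.
type event struct {
	name       string
	additional map[string]string
}

// predicate mirrors the shape of EventPredicate.
type predicate func(e *event) bool

type staticRoute struct {
	table     string
	events    []string
	predicate predicate
}

// option mirrors the shape of StaticRouteOption.
type option func(*staticRoute)

// withPredicate returns an option that installs a conditional filter.
func withPredicate(p predicate) option {
	return func(r *staticRoute) { r.predicate = p }
}

// newStaticRoute validates the required arguments, then applies options in order.
func newStaticRoute(table string, events []string, opts ...option) (*staticRoute, error) {
	if table == "" || len(events) == 0 {
		return nil, errors.New("table and at least one event are required")
	}
	r := &staticRoute{table: table, events: events}
	for _, opt := range opts {
		opt(r)
	}
	return r, nil
}

// shouldProcess applies the predicate when one is set.
// Assumption: a route without a predicate accepts every event.
func (r *staticRoute) shouldProcess(e *event) bool {
	return r.predicate == nil || r.predicate(e)
}
```

Two routes for the same event name can then target different tables by passing predicates that inspect different additional_data fields.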
type TableName ¶
type TableName string
TableName is a typed ClickHouse target table identifier for flatteners.
type TypedColInput ¶
type TypedColInput struct {
proto.ColInput
CHType proto.ColumnType
}
TypedColInput wraps a proto.ColInput to override Type() with a full type string that includes parameters (e.g. "Decimal(10, 3)" instead of bare "Decimal64").
func (*TypedColInput) Type ¶
func (c *TypedColInput) Type() proto.ColumnType
Type returns the full ClickHouse type string, including any parameters.
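The override relies on Go struct embedding: a method declared on the outer type shadows the promoted method of the embedded value. The sketch below uses stand-in types (`columnType`, a one-method `colInput` interface, `decimal64Col`); the real proto.ColInput interface has more methods than shown here.

```go
package main

// columnType stands in for proto.ColumnType.
type columnType string

// colInput is a reduced stand-in for proto.ColInput with only Type.
type colInput interface {
	Type() columnType
}

// decimal64Col stands in for a column that reports a bare type name.
type decimal64Col struct{}

func (decimal64Col) Type() columnType { return "Decimal64" }

// typedColInput embeds a column and carries the full type string.
type typedColInput struct {
	colInput
	chType columnType
}

// Type shadows the embedded column's Type method and returns the
// parameterized type instead.
func (c *typedColInput) Type() columnType { return c.chType }
```

Any other methods of the embedded column are still promoted unchanged, so only the reported type differs.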
Directories ¶
| Path | Synopsis |
|---|---|
| cmd/chrowgen (command) | |
| cmd | |
| chgo-rowgen (command) | |
| generate (command) | Command generate spins up a ClickHouse container via testcontainers, applies all migrations, and regenerates every .gen.go file using chgo-rowgen. |
| | Package testfixture provides shared test helpers for per-table snapshot tests across the flattener domain packages. |