ch

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 26, 2026 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Overview

Package ch is used to interact with ClickHouse servers.

Index

Constants

View Source
const (
	// IntervalGroup is the column alias for the interval group.
	IntervalGroup  = "group_timestamp"
	// AggNumberCol is the string "agg_number"; presumably the column alias
	// for numeric aggregation values — TODO confirm against the query code.
	AggNumberCol   = "agg_number"
	// AggStringCol is the string "agg_string"; presumably the column alias
	// for string aggregation values — TODO confirm against the query code.
	AggStringCol   = "agg_string"
	// AggLocationCol is the string "agg_location"; presumably the column
	// alias for location aggregation values — TODO confirm against the
	// query code.
	AggLocationCol = "agg_location"
)
View Source
const (

	// TimeoutErrCode is the error code returned by ClickHouse when a query
	// is interrupted due to exceeding the max_execution_time.
	TimeoutErrCode = int32(159)
)

Variables

View Source
var (
	// GetSegmentsLatency is a histogram of GetSegments (segment detection)
	// latency in seconds, labeled by "mechanism". It uses the default
	// Prometheus buckets.
	GetSegmentsLatency = promauto.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:    "telemetry_ch_get_segments_latency_seconds",
			Help:    "Latency of GetSegments (segment detection) in seconds",
			Buckets: prometheus.DefBuckets,
		},
		[]string{"mechanism"},
	)

	// GetAggregatedSignalsForRangesLatency is a histogram of
	// GetAggregatedSignalsForRanges latency in seconds (batch signal
	// aggregation for segment summaries). It has no labels and uses the
	// default Prometheus buckets.
	GetAggregatedSignalsForRangesLatency = promauto.NewHistogram(
		prometheus.HistogramOpts{
			Name:    "telemetry_ch_get_aggregated_signals_for_ranges_latency_seconds",
			Help:    "Latency of GetAggregatedSignalsForRanges in seconds",
			Buckets: prometheus.DefBuckets,
		},
	)

	// GetEventCountsForRangesLatency is a histogram of
	// GetEventCountsForRanges latency in seconds (batch event counts for
	// segment summaries). It has no labels and uses the default Prometheus
	// buckets.
	GetEventCountsForRangesLatency = promauto.NewHistogram(
		prometheus.HistogramOpts{
			Name:    "telemetry_ch_get_event_counts_for_ranges_latency_seconds",
			Help:    "Latency of GetEventCountsForRanges in seconds",
			Buckets: prometheus.DefBuckets,
		},
	)
)
View Source
// SourceTranslations maps a short source name (for example "tesla") to the
// list of identifiers it stands for. Each entry holds a
// "dimo/integration/..." string, a 0x-prefixed hex string, or both.
//
// NOTE(review): presumably used to expand a source name in a filter into
// every stored source value — confirm against the query code.
var SourceTranslations = map[string][]string{
	"macaron":  {"dimo/integration/2ULfuC8U9dOqRshZBAi0lMM1Rrx", "0x4c674ddE8189aEF6e3b58F5a36d7438b2b1f6Bc2"},
	"tesla":    {"dimo/integration/26A5Dk3vvvQutjSyF0Jka2DP5lg", "0xc4035Fecb1cc906130423EF05f9C20977F643722"},
	"autopi":   {"dimo/integration/27qftVRWQYpVDcO5DltO5Ojbjxk", "0x5e31bBc786D7bEd95216383787deA1ab0f1c1897"},
	"smartcar": {"dimo/integration/22N2xaPOq2WW2gAHBHd0Ikn4Zob", "0xcd445F4c6bDAD32b68a2939b912150Fe3C88803E"},
	"ruptela":  {"0xF26421509Efe92861a587482100c6d728aBf1CD0"},
	"compass":  {"0x55BF1c27d468314Ea119CF74979E2b59F962295c"},
	"motorq":   {"0x5879B43D88Fa93CE8072d6612cBc8dE93E98CE5d"},
}

Functions

This section is empty.

Types

type ActiveWindow added in v0.1.28

// ActiveWindow represents a time window with sufficient signal activity.
// Used by frequency and changepoint detectors.
type ActiveWindow struct {
	// WindowStart is the start of the window.
	WindowStart         time.Time
	// WindowEnd is the end of the window.
	WindowEnd           time.Time
	// SignalCount is presumably the number of signal rows observed in the
	// window — TODO confirm.
	SignalCount         uint64
	// DistinctSignalCount is presumably the number of distinct signal names
	// observed in the window — TODO confirm.
	DistinctSignalCount uint64
}

ActiveWindow represents a time window with sufficient signal activity. Used by frequency and changepoint detectors.

type AggSignal added in v0.1.19

// AggSignal is a single aggregated value, as returned by
// Service.GetAggregatedSignals.
type AggSignal struct {
	// SignalType describes the type of values in the aggregation:
	// float, string, or location.
	SignalType FieldType
	// SignalIndex is an identifier for the aggregation within its
	// SignalType. For all types this is simply an index into the
	// corresponding argument array.
	//
	// We could get away with a single number, since we know how many
	// arguments of each type there are, but it appears to us that this
	// would make adding new types riskier.
	SignalIndex uint16
	// Timestamp is the timestamp for the bucket, the leftmost point.
	Timestamp time.Time
	// ValueNumber is the value for this row if it is of float or
	// approximate location type.
	ValueNumber float64
	// ValueString is the value for this row if it is of string type.
	ValueString   string
	// ValueLocation is the value for this row if it is of location type.
	ValueLocation vss.Location
}

type AggSignalForRange added in v0.2.0

// AggSignalForRange is AggSignal with segment index (from
// GetAggregatedSignalsForRanges).
type AggSignalForRange struct {
	// SegIndex is the segment index; presumably the position of the
	// matching TimeRange in the ranges argument — TODO confirm.
	SegIndex      int
	// SignalType, SignalIndex and the Value* fields presumably carry the
	// same meaning as the AggSignal fields of the same names — TODO confirm.
	SignalType    FieldType
	SignalIndex   uint16
	ValueNumber   float64
	ValueString   string
	ValueLocation vss.Location
}

AggSignalForRange is AggSignal with segment index (from GetAggregatedSignalsForRanges).

type AliasHandleMapper added in v0.1.19

type AliasHandleMapper struct {
	// contains filtered or unexported fields
}

func NewAliasHandleMapper added in v0.1.19

func NewAliasHandleMapper() *AliasHandleMapper

func (*AliasHandleMapper) Add added in v0.1.19

func (m *AliasHandleMapper) Add(alias, handle string)

func (*AliasHandleMapper) Alias added in v0.1.19

func (m *AliasHandleMapper) Alias(handle string) string

func (*AliasHandleMapper) Handle added in v0.1.19

func (m *AliasHandleMapper) Handle(alias string) string

type ChangePointDetector added in v0.1.28

type ChangePointDetector struct {
	// contains filtered or unexported fields
}

ChangePointDetector detects segments using CUSUM (Cumulative Sum) change point detection. CUSUM monitors cumulative deviation from an expected baseline to detect regime changes. When a vehicle becomes active, signal frequency increases significantly.

func NewChangePointDetector added in v0.1.28

func NewChangePointDetector(conn clickhouse.Conn) *ChangePointDetector

NewChangePointDetector creates a new ChangePointDetector with the given connection.

func (*ChangePointDetector) DetectSegments added in v0.1.28

func (d *ChangePointDetector) DetectSegments(
	ctx context.Context,
	tokenID uint32,
	from, to time.Time,
	config *model.SegmentConfig,
) ([]*model.Segment, error)

DetectSegments implements CUSUM-based change point detection.

func (*ChangePointDetector) GetMechanismName added in v0.1.28

func (d *ChangePointDetector) GetMechanismName() string

GetMechanismName returns the name of this detection mechanism.

type EventCount added in v0.2.0

// EventCount is the count of events by name in a time range.
type EventCount struct {
	// Name is the event name.
	Name  string
	// Count is the number of events with this name in the range.
	Count int
}

EventCount is the count of events by name in a time range.

type EventCountForRange added in v0.2.0

// EventCountForRange is event count by name for one segment index (from
// GetEventCountsForRanges).
type EventCountForRange struct {
	// SegIndex is the segment index; presumably the position of the
	// matching TimeRange in the ranges argument — TODO confirm.
	SegIndex int
	// Name is the event name.
	Name     string
	// Count is the number of events with this name for the segment.
	Count    int
}

EventCountForRange is event count by name for one segment index (from GetEventCountsForRanges).

type EventSummary added in v0.2.0

// EventSummary is the per-event summary for a vehicle (all time): name,
// count, first/last seen.
type EventSummary struct {
	// Name is the event name.
	Name      string
	// Count is the number of events with this name.
	Count     uint64
	// FirstSeen is when this event was first seen.
	FirstSeen time.Time
	// LastSeen is when this event was last seen.
	LastSeen  time.Time
}

EventSummary is the per-event summary for a vehicle (all time): name, count, first/last seen.

type FieldType added in v0.1.19

// FieldType indicates the type of values in the aggregation.
type FieldType uint8

FieldType indicates the type of values in the aggregation.

// Values of FieldType.
const (
	// FloatType is the type for rows with numeric values that are in
	// the VSS spec.
	FloatType FieldType = 1
	// StringType is the type for rows with string values.
	StringType FieldType = 2
	// LocType is the type for rows with location values.
	LocType FieldType = 3
)

func (*FieldType) Scan added in v0.1.19

func (t *FieldType) Scan(value any) error

type FrequencyDetector added in v0.1.28

type FrequencyDetector struct {
	// contains filtered or unexported fields
}

FrequencyDetector detects segments using frequency analysis of signal updates. Analyzes signal update patterns to identify vehicle activity periods.

func NewFrequencyDetector added in v0.1.28

func NewFrequencyDetector(conn clickhouse.Conn) *FrequencyDetector

NewFrequencyDetector creates a new FrequencyDetector with the given connection.

func (*FrequencyDetector) DetectSegments added in v0.1.28

func (d *FrequencyDetector) DetectSegments(
	ctx context.Context,
	tokenID uint32,
	from, to time.Time,
	config *model.SegmentConfig,
) ([]*model.Segment, error)

DetectSegments implements frequency-based segment detection

func (*FrequencyDetector) GetMechanismName added in v0.1.28

func (d *FrequencyDetector) GetMechanismName() string

GetMechanismName returns the name of this detection mechanism.

type IdlingDetector added in v0.2.0

type IdlingDetector struct {
	// contains filtered or unexported fields
}

IdlingDetector detects segments where engine RPM remains in idle range. Processes RPM samples in-memory for exact segment boundaries (no window discretization). Note: Detection is RPM-only. Callers (e.g. repository) filter out segments with speed > 0.

func NewIdlingDetector added in v0.2.0

func NewIdlingDetector(conn clickhouse.Conn) *IdlingDetector

NewIdlingDetector creates a new IdlingDetector with the given connection.

func (*IdlingDetector) DetectSegments added in v0.2.0

func (d *IdlingDetector) DetectSegments(
	ctx context.Context,
	tokenID uint32,
	from, to time.Time,
	config *model.SegmentConfig,
) ([]*model.Segment, error)

DetectSegments fetches RPM samples (1 CH query) and finds contiguous runs of idle RPM in-memory.

func (*IdlingDetector) GetMechanismName added in v0.2.0

func (d *IdlingDetector) GetMechanismName() string

GetMechanismName returns the name of this detection mechanism.

type IgnitionDetector added in v0.1.28

type IgnitionDetector struct {
	// contains filtered or unexported fields
}

IgnitionDetector detects segments using ignition state transitions.

func NewIgnitionDetector added in v0.1.28

func NewIgnitionDetector(conn clickhouse.Conn) *IgnitionDetector

NewIgnitionDetector creates a new IgnitionDetector with the given connection.

func (*IgnitionDetector) DetectSegments added in v0.1.28

func (d *IgnitionDetector) DetectSegments(
	ctx context.Context,
	tokenID uint32,
	from, to time.Time,
	config *model.SegmentConfig,
) (_ []*model.Segment, retErr error)

DetectSegments implements ignition-based segment detection.

func (*IgnitionDetector) GetMechanismName added in v0.1.28

func (d *IgnitionDetector) GetMechanismName() string

GetMechanismName returns the name of this detection mechanism.

type RechargeDetector added in v0.2.0

type RechargeDetector struct {
	// contains filtered or unexported fields
}

RechargeDetector detects recharge segments by finding trough-to-peak rises in the SoC curve.

func NewRechargeDetector added in v0.2.0

func NewRechargeDetector(conn clickhouse.Conn) *RechargeDetector

NewRechargeDetector creates a new RechargeDetector with the given connection.

func (*RechargeDetector) DetectSegments added in v0.2.0

func (d *RechargeDetector) DetectSegments(
	ctx context.Context,
	tokenID uint32,
	from, to time.Time,
	config *model.SegmentConfig,
) ([]*model.Segment, error)

DetectSegments finds periods where state of charge rises (trough to peak), filters by odometer, and merges nearby sessions.

func (*RechargeDetector) GetMechanismName added in v0.2.0

func (d *RechargeDetector) GetMechanismName() string

GetMechanismName returns the name of this detection mechanism.

type RefuelDetector added in v0.2.0

type RefuelDetector struct {
	// contains filtered or unexported fields
}

RefuelDetector detects refuel segments by finding large fuel rises and emitting segments from the last low reading (trough) to the first stable high reading (peak).

func NewRefuelDetector added in v0.2.0

func NewRefuelDetector(conn clickhouse.Conn) *RefuelDetector

NewRefuelDetector creates a new RefuelDetector with the given connection.

func (*RefuelDetector) DetectSegments added in v0.2.0

func (d *RefuelDetector) DetectSegments(
	ctx context.Context,
	tokenID uint32,
	from, to time.Time,
	config *model.SegmentConfig,
) ([]*model.Segment, error)

DetectSegments finds 5-min windows with >30% fuel rise, then for each rise emits a segment from the trough (last low sample before the jump) to the peak (first stable high after). 1 CH query (fuel only).

func (*RefuelDetector) GetMechanismName added in v0.2.0

func (d *RefuelDetector) GetMechanismName() string

GetMechanismName returns the name of this detection mechanism.

type SegmentDetector added in v0.1.28

// SegmentDetector defines the interface for different segment detection
// mechanisms.
type SegmentDetector interface {
	// DetectSegments identifies vehicle usage segments using
	// mechanism-specific logic.
	DetectSegments(
		ctx context.Context,
		tokenID uint32,
		from, to time.Time,
		config *model.SegmentConfig,
	) ([]*model.Segment, error)

	// GetMechanismName returns the name of this detection mechanism.
	GetMechanismName() string
}

SegmentDetector defines the interface for different segment detection mechanisms.

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service is a ClickHouse service that interacts with the ClickHouse database.

func NewService

func NewService(settings config.Settings) (*Service, error)

NewService creates a new ClickHouse service.

func (*Service) GetAggregatedSignals

func (s *Service) GetAggregatedSignals(ctx context.Context, aggArgs *model.AggregatedSignalArgs) ([]*AggSignal, error)

GetAggregatedSignals returns a slice of aggregated signals based on the provided arguments from the ClickHouse database. The signals are sorted by timestamp in ascending order. The timestamp on each signal is for the start of the interval.

func (*Service) GetAggregatedSignalsForRanges added in v0.2.0

func (s *Service) GetAggregatedSignalsForRanges(ctx context.Context, tokenID uint32, ranges []TimeRange, globalFrom, globalTo time.Time, floatArgs []model.FloatSignalArgs, locationArgs []model.LocationSignalArgs) ([]*AggSignalForRange, error)

GetAggregatedSignalsForRanges returns aggregated signals for multiple time ranges (one per segment) in one query. Only FloatArgs and LocationArgs are used; StringArgs and ApproxLocArgs are ignored.

func (*Service) GetAvailableSignals added in v0.0.21

func (s *Service) GetAvailableSignals(ctx context.Context, tokenId uint32, filter *model.SignalFilter) ([]string, error)

GetAvailableSignals returns a slice of available signals from the ClickHouse database. If no signals are available, a nil slice is returned.

func (*Service) GetEventCounts added in v0.2.0

func (s *Service) GetEventCounts(ctx context.Context, subject string, from, to time.Time, eventNames []string) ([]*EventCount, error)

GetEventCounts returns event counts by name in the given time range. If eventNames is nil or empty, counts for all event names in the range are returned; otherwise only the requested names are returned. Requested names that are missing from the result are given a count of 0 by the caller.

func (*Service) GetEventCountsForRanges added in v0.2.0

func (s *Service) GetEventCountsForRanges(ctx context.Context, subject string, ranges []TimeRange, eventNames []string) ([]*EventCountForRange, error)

GetEventCountsForRanges returns event counts by name per segment index for multiple time ranges in one query. If eventNames is nil or empty, counts for all event names are returned; otherwise only the requested names are returned. Requested names that are missing from the result are given a count of 0 at the call site.

func (*Service) GetEventSummaries added in v0.2.0

func (s *Service) GetEventSummaries(ctx context.Context, subject string) ([]*EventSummary, error)

GetEventSummaries returns per-event summaries (name, count, first/last seen) for a subject (vehicle), all time.

func (*Service) GetEvents added in v0.1.19

func (s *Service) GetEvents(ctx context.Context, subject string, from, to time.Time, filter *model.EventFilter) ([]*vss.Event, error)

func (*Service) GetLatestSignals

func (s *Service) GetLatestSignals(ctx context.Context, latestArgs *model.LatestSignalsArgs) ([]*vss.Signal, error)

GetLatestSignals returns the latest signals based on the provided arguments from the ClickHouse database.

func (*Service) GetSegments added in v0.1.28

func (s *Service) GetSegments(
	ctx context.Context,
	tokenID uint32,
	from, to time.Time,
	mechanism model.DetectionMechanism,
	config *model.SegmentConfig,
) ([]*model.Segment, error)

GetSegments returns segments detected using the specified mechanism.

func (*Service) GetSignalSummaries added in v0.1.22

func (s *Service) GetSignalSummaries(ctx context.Context, tokenId uint32, filter *model.SignalFilter) ([]*model.SignalDataSummary, error)

type StateChange added in v0.1.28

// StateChange represents a single state change from the
// signal_state_changes table.
type StateChange struct {
	// Timestamp is presumably the time at which the change occurred —
	// TODO confirm.
	Timestamp time.Time
	// State is presumably the value after the change — TODO confirm.
	State     float64
	// PrevState is presumably the value before the change — TODO confirm.
	PrevState float64
}

StateChange represents a single state change from the signal_state_changes table.

type TimeRange added in v0.2.0

// TimeRange is a [From, To) interval for batch event count queries.
type TimeRange struct {
	// From is the inclusive start of the interval and To is its exclusive
	// end.
	From, To time.Time
}

TimeRange is a [From, To) interval for batch event count queries.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL