timeseries

package
v0.3.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 29, 2026 License: MIT Imports: 30 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewClickHouseModule

func NewClickHouseModule(name string, config map[string]any) (sdk.ModuleInstance, error)

NewClickHouseModule creates a new timeseries.clickhouse module instance.

func NewDruidCompactStep

func NewDruidCompactStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewDruidCompactStep creates a new step.ts_druid_compact instance.

func NewDruidDatasourceStep

func NewDruidDatasourceStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewDruidDatasourceStep creates a new step.ts_druid_datasource instance.

func NewDruidIngestStep

func NewDruidIngestStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewDruidIngestStep creates a new step.ts_druid_ingest instance.

func NewDruidModule

func NewDruidModule(name string, config map[string]any) (sdk.ModuleInstance, error)

NewDruidModule creates a new timeseries.druid module instance.

func NewDruidQueryStep

func NewDruidQueryStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewDruidQueryStep creates a new step.ts_druid_query instance.

func NewInfluxModule

func NewInfluxModule(name string, config map[string]any) (sdk.ModuleInstance, error)

NewInfluxModule creates a new timeseries.influxdb module instance.

func NewQuestDBModule

func NewQuestDBModule(name string, config map[string]any) (sdk.ModuleInstance, error)

NewQuestDBModule creates a new timeseries.questdb module instance.

func NewTSArchiveStep

func NewTSArchiveStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewTSArchiveStep creates a new step.ts_archive instance.

func NewTSClickHouseViewStep

func NewTSClickHouseViewStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewTSClickHouseViewStep creates a new step.ts_clickhouse_view instance.

func NewTSContinuousQueryStep

func NewTSContinuousQueryStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewTSContinuousQueryStep creates a new step.ts_continuous_query instance.

func NewTSDownsampleStep

func NewTSDownsampleStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewTSDownsampleStep creates a new step.ts_downsample instance.

func NewTSQueryStep

func NewTSQueryStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewTSQueryStep creates a new step.ts_query instance.

func NewTSRetentionStep

func NewTSRetentionStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewTSRetentionStep creates a new step.ts_retention instance.

func NewTSTierStatusStep

func NewTSTierStatusStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewTSTierStatusStep creates a new step.ts_tier_status instance.

func NewTSWriteBatchStep

func NewTSWriteBatchStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewTSWriteBatchStep creates a new step.ts_write_batch instance.

func NewTSWriteStep

func NewTSWriteStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewTSWriteStep creates a new step.ts_write instance.

func NewTimescaleModule

func NewTimescaleModule(name string, config map[string]any) (sdk.ModuleInstance, error)

NewTimescaleModule creates a new timeseries.timescaledb module instance.

func Register

func Register(name string, w TimeSeriesWriter) error

Register adds a named TimeSeriesWriter to the global registry. Returns an error if the name is already registered.

func Unregister

func Unregister(name string)

Unregister removes a named TimeSeriesWriter from the global registry.

Types

type ArchiveResult

type ArchiveResult struct {
	RowsArchived   int64  `json:"rowsArchived"`
	BytesWritten   int64  `json:"bytesWritten"`
	Destination    string `json:"destination"`
	DeletedFromHot bool   `json:"deletedFromHot"`
}

ArchiveResult holds the result of an archive operation.

type ClickHouseConfig

type ClickHouseConfig struct {
	Endpoints    []string      `json:"endpoints"    yaml:"endpoints"`
	Database     string        `json:"database"     yaml:"database"`
	Username     string        `json:"username"     yaml:"username"`
	Password     string        `json:"password"     yaml:"password"`
	MaxOpenConns int           `json:"maxOpenConns" yaml:"maxOpenConns"`
	MaxIdleConns int           `json:"maxIdleConns" yaml:"maxIdleConns"`
	Compression  string        `json:"compression"  yaml:"compression"`
	Secure       bool          `json:"secure"       yaml:"secure"`
	DialTimeout  time.Duration `json:"dialTimeout"  yaml:"dialTimeout"`
}

ClickHouseConfig holds configuration for the timeseries.clickhouse module.

type ClickHouseModule

type ClickHouseModule struct {
	// contains filtered or unexported fields
}

ClickHouseModule is the timeseries.clickhouse module.

func (*ClickHouseModule) Archive

func (m *ClickHouseModule) Archive(ctx context.Context, olderThan time.Duration, dest, format string, deleteAfter bool) (*ArchiveResult, error)

Archive implements TierManager for ClickHouseModule.

func (*ClickHouseModule) Conn

func (m *ClickHouseModule) Conn() driver.Conn

Conn returns the underlying ClickHouse connection.

func (*ClickHouseModule) Init

func (m *ClickHouseModule) Init() error

Init validates the module configuration.

func (*ClickHouseModule) Query

func (m *ClickHouseModule) Query(ctx context.Context, query string, args ...any) ([]map[string]any, error)

Query implements TimeSeriesWriter by executing a SQL query and scanning results.

func (*ClickHouseModule) Start

func (m *ClickHouseModule) Start(ctx context.Context) error

Start opens the ClickHouse connection, pings the server, and registers the module.

func (*ClickHouseModule) Status

func (m *ClickHouseModule) Status(ctx context.Context, measurement string) (*TierStatus, error)

Status implements TierManager for ClickHouseModule.

func (*ClickHouseModule) Stop

func (m *ClickHouseModule) Stop(_ context.Context) error

Stop closes the connection and deregisters.

func (*ClickHouseModule) WriteBatch

func (m *ClickHouseModule) WriteBatch(ctx context.Context, points []Point) error

WriteBatch implements TimeSeriesWriter using ClickHouse's native batch protocol.

func (*ClickHouseModule) WritePoint

func (m *ClickHouseModule) WritePoint(ctx context.Context, measurement string, tags map[string]string, fields map[string]any, timestamp time.Time) error

WritePoint implements TimeSeriesWriter with a single INSERT.

type CompactionConfig

type CompactionConfig struct {
	TargetCompactionSizeBytes int64  `json:"targetCompactionSizeBytes,omitempty" yaml:"targetCompactionSizeBytes,omitempty"`
	SkipOffsetFromLatest      string `json:"skipOffsetFromLatest,omitempty"      yaml:"skipOffsetFromLatest,omitempty"`
}

CompactionConfig holds parameters for a Druid compaction task.

type CompactionStatus

type CompactionStatus struct {
	DataSource string `json:"dataSource"`
	State      string `json:"state"`
}

CompactionStatus holds the compaction state for a datasource.

type DatasourceInfo

type DatasourceInfo struct {
	Name       string         `json:"name"`
	Properties map[string]any `json:"properties,omitempty"`
}

DatasourceInfo contains metadata about a Druid datasource.

type DruidClient

type DruidClient interface {
	GetStatus(ctx context.Context) (*DruidStatus, error)
	SQLQuery(ctx context.Context, query string, params []any) (*QueryResult, error)
	NativeQuery(ctx context.Context, query map[string]any) (*QueryResult, error)
	SubmitSupervisor(ctx context.Context, spec map[string]any) (*SupervisorStatus, error)
	GetSupervisorStatus(ctx context.Context, id string) (*SupervisorStatus, error)
	SuspendSupervisor(ctx context.Context, id string) error
	ResumeSupervisor(ctx context.Context, id string) error
	TerminateSupervisor(ctx context.Context, id string) error
	ListDatasources(ctx context.Context) ([]string, error)
	GetDatasource(ctx context.Context, name string) (*DatasourceInfo, error)
	DisableDatasource(ctx context.Context, name string) error
	SubmitCompaction(ctx context.Context, datasource string, config CompactionConfig) error
	GetCompactionStatus(ctx context.Context, datasource string) (*CompactionStatus, error)
}

DruidClient is the interface for Druid Router API operations.

func NewDruidClient

func NewDruidClient(routerURL, username, password string, timeout time.Duration) DruidClient

NewDruidClient creates a Druid HTTP client.

type DruidConfig

type DruidConfig struct {
	RouterURL   string        `json:"routerUrl"    yaml:"routerUrl"`
	Username    string        `json:"username"     yaml:"username"`
	Password    string        `json:"password"     yaml:"password"`
	HTTPTimeout time.Duration `json:"httpTimeout"  yaml:"httpTimeout"`
}

DruidConfig holds configuration for the timeseries.druid module.

type DruidModule

type DruidModule struct {
	// contains filtered or unexported fields
}

DruidModule implements the timeseries.druid module.

func (*DruidModule) Client

func (m *DruidModule) Client() DruidClient

Client returns the underlying DruidClient.

func (*DruidModule) Config

func (m *DruidModule) Config() DruidConfig

Config returns the module configuration.

func (*DruidModule) Init

func (m *DruidModule) Init() error

Init validates the module configuration.

func (*DruidModule) Query

func (m *DruidModule) Query(ctx context.Context, query string, _ ...any) ([]map[string]any, error)

Query implements TimeSeriesWriter using the Druid SQL endpoint.

func (*DruidModule) Start

func (m *DruidModule) Start(ctx context.Context) error

Start pings Druid and registers the module.

func (*DruidModule) Stop

func (m *DruidModule) Stop(_ context.Context) error

Stop deregisters the module (stateless — no connection to close).

func (*DruidModule) WriteBatch

func (m *DruidModule) WriteBatch(ctx context.Context, points []Point) error

WriteBatch implements TimeSeriesWriter by submitting a native index_parallel task.

func (*DruidModule) WritePoint

func (m *DruidModule) WritePoint(ctx context.Context, measurement string, tags map[string]string, fields map[string]any, ts time.Time) error

WritePoint implements TimeSeriesWriter by submitting a single-row inline batch task.

type DruidStatus

type DruidStatus struct {
	Version string `json:"version"`
	Loading bool   `json:"loading"`
}

DruidStatus represents Druid's status response.

type HypertableConfig

type HypertableConfig struct {
	Table         string `json:"table"         yaml:"table"`
	TimeColumn    string `json:"timeColumn"    yaml:"timeColumn"`
	ChunkInterval string `json:"chunkInterval" yaml:"chunkInterval"`
}

HypertableConfig describes a TimescaleDB hypertable to create on Start.

type InfluxConfig

type InfluxConfig struct {
	URL           string        `json:"url"           yaml:"url"`
	Token         string        `json:"token"         yaml:"token"`
	Org           string        `json:"org"           yaml:"org"`
	Bucket        string        `json:"bucket"        yaml:"bucket"`
	BatchSize     uint          `json:"batchSize"     yaml:"batchSize"`
	FlushInterval time.Duration `json:"flushInterval" yaml:"flushInterval"`
	Precision     string        `json:"precision"     yaml:"precision"`
}

InfluxConfig holds configuration for the timeseries.influxdb module.

type InfluxModule

type InfluxModule struct {
	// contains filtered or unexported fields
}

InfluxModule is the timeseries.influxdb module.

func (*InfluxModule) Archive

func (m *InfluxModule) Archive(ctx context.Context, olderThan time.Duration, dest, format string, deleteAfter bool) (*ArchiveResult, error)

Archive implements TierManager for InfluxModule.

func (*InfluxModule) BucketsAPI

func (m *InfluxModule) BucketsAPI() api.BucketsAPI

BucketsAPI returns the buckets management API.

func (*InfluxModule) Config

func (m *InfluxModule) Config() InfluxConfig

Config returns the module configuration.

func (*InfluxModule) Init

func (m *InfluxModule) Init() error

Init validates the module configuration.

func (*InfluxModule) Query

func (m *InfluxModule) Query(ctx context.Context, query string, _ ...any) ([]map[string]any, error)

Query implements TimeSeriesWriter using the Flux query language.

func (*InfluxModule) QueryAPI

func (m *InfluxModule) QueryAPI() api.QueryAPI

QueryAPI returns the query API for direct use.

func (*InfluxModule) Start

func (m *InfluxModule) Start(ctx context.Context) error

Start creates the InfluxDB client, pings the server, and registers the module.

func (*InfluxModule) Status

func (m *InfluxModule) Status(ctx context.Context, measurement string) (*TierStatus, error)

Status implements TierManager for InfluxModule.

func (*InfluxModule) Stop

func (m *InfluxModule) Stop(_ context.Context) error

Stop closes the InfluxDB client (flushes pending writes) and deregisters.

func (*InfluxModule) UpdateBucketRetention

func (m *InfluxModule) UpdateBucketRetention(ctx context.Context, bucketName string, durationSeconds int64) error

UpdateBucketRetention updates the retention rules for a bucket.

func (*InfluxModule) WriteAPI

func (m *InfluxModule) WriteAPI() api.WriteAPI

WriteAPI returns the non-blocking write API for direct use.

func (*InfluxModule) WriteBatch

func (m *InfluxModule) WriteBatch(ctx context.Context, points []Point) error

WriteBatch implements TimeSeriesWriter.

func (*InfluxModule) WritePoint

func (m *InfluxModule) WritePoint(ctx context.Context, measurement string, tags map[string]string, fields map[string]any, timestamp time.Time) error

WritePoint implements TimeSeriesWriter.

type Point

type Point struct {
	Measurement string            `json:"measurement" yaml:"measurement"`
	Tags        map[string]string `json:"tags"        yaml:"tags"`
	Fields      map[string]any    `json:"fields"      yaml:"fields"`
	Timestamp   time.Time         `json:"timestamp"   yaml:"timestamp"`
}

Point represents a single time-series data point.

type QueryResult

type QueryResult struct {
	Rows []map[string]any
}

QueryResult holds rows returned from a Druid query.

type QuestDBConfig

type QuestDBConfig struct {
	ILPEndpoint   string        `json:"ilpEndpoint"   yaml:"ilpEndpoint"`
	HTTPEndpoint  string        `json:"httpEndpoint"  yaml:"httpEndpoint"`
	AuthToken     string        `json:"authToken"     yaml:"authToken"`
	TLSEnabled    bool          `json:"tlsEnabled"    yaml:"tlsEnabled"`
	AutoFlush     bool          `json:"autoFlush"     yaml:"autoFlush"`
	FlushInterval time.Duration `json:"flushInterval" yaml:"flushInterval"`
}

QuestDBConfig holds configuration for the timeseries.questdb module.

type QuestDBModule

type QuestDBModule struct {
	// contains filtered or unexported fields
}

QuestDBModule implements the timeseries.questdb module.

func (*QuestDBModule) Config

func (m *QuestDBModule) Config() QuestDBConfig

Config returns the module configuration.

func (*QuestDBModule) Init

func (m *QuestDBModule) Init() error

Init validates the module configuration.

func (*QuestDBModule) Query

func (m *QuestDBModule) Query(ctx context.Context, query string, args ...any) ([]map[string]any, error)

Query implements TimeSeriesWriter using the HTTP /exec endpoint.

func (*QuestDBModule) Start

func (m *QuestDBModule) Start(ctx context.Context) error

Start verifies HTTP health and initializes the ILP sender, then registers in the global registry.

func (*QuestDBModule) Stop

func (m *QuestDBModule) Stop(ctx context.Context) error

Stop flushes pending messages, closes the ILP sender, and deregisters.

func (*QuestDBModule) WriteBatch

func (m *QuestDBModule) WriteBatch(ctx context.Context, points []Point) error

WriteBatch implements TimeSeriesWriter.

func (*QuestDBModule) WritePoint

func (m *QuestDBModule) WritePoint(ctx context.Context, measurement string, tags map[string]string, fields map[string]any, ts time.Time) error

WritePoint implements TimeSeriesWriter using the ILP protocol.

type SenderFactory

type SenderFactory func(ctx context.Context, cfg QuestDBConfig) (qdb.LineSender, error)

SenderFactory creates a qdb.LineSender from config. Injectable for tests.

type SupervisorStatus

type SupervisorStatus struct {
	ID      string `json:"id"`
	State   string `json:"state"`
	Healthy bool   `json:"healthy"`
}

SupervisorStatus represents a Druid supervisor's state.

type TierManager

type TierManager interface {
	Archive(ctx context.Context, olderThan time.Duration, dest, format string, deleteAfter bool) (*ArchiveResult, error)
	Status(ctx context.Context, measurement string) (*TierStatus, error)
}

TierManager is implemented by time-series modules that support hot/cold tiering.

type TierStatus

type TierStatus struct {
	HotRows             int64      `json:"hotRows"`
	HotOldestTimestamp  *time.Time `json:"hotOldestTimestamp,omitempty"`
	ColdFiles           int        `json:"coldFiles"`
	ColdOldestTimestamp *time.Time `json:"coldOldestTimestamp,omitempty"`
	TotalBytes          int64      `json:"totalBytes"`
}

TierStatus holds the hot vs cold data summary.

type TimeSeriesWriter

type TimeSeriesWriter interface {
	WritePoint(ctx context.Context, measurement string, tags map[string]string, fields map[string]any, timestamp time.Time) error
	WriteBatch(ctx context.Context, points []Point) error
	Query(ctx context.Context, query string, args ...any) ([]map[string]any, error)
}

TimeSeriesWriter is the common interface implemented by all time-series modules.

func Lookup

func Lookup(name string) (TimeSeriesWriter, error)

Lookup finds a TimeSeriesWriter by module name.

type TimescaleConfig

type TimescaleConfig struct {
	Connection   string             `json:"connection"    yaml:"connection"`
	MaxOpenConns int                `json:"maxOpenConns"  yaml:"maxOpenConns"`
	MaxIdleConns int                `json:"maxIdleConns"  yaml:"maxIdleConns"`
	Hypertables  []HypertableConfig `json:"hypertables"   yaml:"hypertables"`
}

TimescaleConfig holds configuration for the timeseries.timescaledb module.

type TimescaleModule

type TimescaleModule struct {
	// contains filtered or unexported fields
}

TimescaleModule is the timeseries.timescaledb module.

func NewTimescaleModuleFromDB

func NewTimescaleModuleFromDB(name string, db *sql.DB) *TimescaleModule

NewTimescaleModuleFromDB creates a TimescaleModule backed by an existing *sql.DB. Intended for integration tests where a real database is not available.

func (*TimescaleModule) CreateHypertable

func (m *TimescaleModule) CreateHypertable(ctx context.Context, table, timeColumn, chunkInterval string) error

CreateHypertable creates a TimescaleDB hypertable if it doesn't already exist.

func (*TimescaleModule) DB

func (m *TimescaleModule) DB() *sql.DB

DB returns the underlying *sql.DB.

func (*TimescaleModule) Init

func (m *TimescaleModule) Init() error

Init validates the module configuration.

func (*TimescaleModule) Query

func (m *TimescaleModule) Query(ctx context.Context, query string, args ...any) ([]map[string]any, error)

Query implements TimeSeriesWriter using standard SQL.

func (*TimescaleModule) Start

func (m *TimescaleModule) Start(ctx context.Context) error

Start opens the database connection, verifies the TimescaleDB extension, and creates configured hypertables.

func (*TimescaleModule) Stop

func (m *TimescaleModule) Stop(_ context.Context) error

Stop closes the database connection and deregisters.

func (*TimescaleModule) WriteBatch

func (m *TimescaleModule) WriteBatch(ctx context.Context, points []Point) error

WriteBatch implements TimeSeriesWriter.

func (*TimescaleModule) WritePoint

func (m *TimescaleModule) WritePoint(ctx context.Context, measurement string, tags map[string]string, fields map[string]any, timestamp time.Time) error

WritePoint implements TimeSeriesWriter. Inserts a single row into the measurement table using a JSONB schema.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL