Documentation
¶
Index ¶
- func NewClickHouseModule(name string, config map[string]any) (sdk.ModuleInstance, error)
- func NewDruidCompactStep(name string, _ map[string]any) (sdk.StepInstance, error)
- func NewDruidDatasourceStep(name string, _ map[string]any) (sdk.StepInstance, error)
- func NewDruidIngestStep(name string, _ map[string]any) (sdk.StepInstance, error)
- func NewDruidModule(name string, config map[string]any) (sdk.ModuleInstance, error)
- func NewDruidQueryStep(name string, _ map[string]any) (sdk.StepInstance, error)
- func NewInfluxModule(name string, config map[string]any) (sdk.ModuleInstance, error)
- func NewQuestDBModule(name string, config map[string]any) (sdk.ModuleInstance, error)
- func NewTSArchiveStep(name string, _ map[string]any) (sdk.StepInstance, error)
- func NewTSClickHouseViewStep(name string, _ map[string]any) (sdk.StepInstance, error)
- func NewTSContinuousQueryStep(name string, _ map[string]any) (sdk.StepInstance, error)
- func NewTSDownsampleStep(name string, _ map[string]any) (sdk.StepInstance, error)
- func NewTSQueryStep(name string, _ map[string]any) (sdk.StepInstance, error)
- func NewTSRetentionStep(name string, _ map[string]any) (sdk.StepInstance, error)
- func NewTSTierStatusStep(name string, _ map[string]any) (sdk.StepInstance, error)
- func NewTSWriteBatchStep(name string, _ map[string]any) (sdk.StepInstance, error)
- func NewTSWriteStep(name string, _ map[string]any) (sdk.StepInstance, error)
- func NewTimescaleModule(name string, config map[string]any) (sdk.ModuleInstance, error)
- func Register(name string, w TimeSeriesWriter) error
- func Unregister(name string)
- type ArchiveResult
- type ClickHouseConfig
- type ClickHouseModule
- func (m *ClickHouseModule) Archive(ctx context.Context, olderThan time.Duration, dest, format string, ...) (*ArchiveResult, error)
- func (m *ClickHouseModule) Conn() driver.Conn
- func (m *ClickHouseModule) Init() error
- func (m *ClickHouseModule) Query(ctx context.Context, query string, args ...any) ([]map[string]any, error)
- func (m *ClickHouseModule) Start(ctx context.Context) error
- func (m *ClickHouseModule) Status(ctx context.Context, measurement string) (*TierStatus, error)
- func (m *ClickHouseModule) Stop(_ context.Context) error
- func (m *ClickHouseModule) WriteBatch(ctx context.Context, points []Point) error
- func (m *ClickHouseModule) WritePoint(ctx context.Context, measurement string, tags map[string]string, ...) error
- type CompactionConfig
- type CompactionStatus
- type DatasourceInfo
- type DruidClient
- type DruidConfig
- type DruidModule
- func (m *DruidModule) Client() DruidClient
- func (m *DruidModule) Config() DruidConfig
- func (m *DruidModule) Init() error
- func (m *DruidModule) Query(ctx context.Context, query string, _ ...any) ([]map[string]any, error)
- func (m *DruidModule) Start(ctx context.Context) error
- func (m *DruidModule) Stop(_ context.Context) error
- func (m *DruidModule) WriteBatch(ctx context.Context, points []Point) error
- func (m *DruidModule) WritePoint(ctx context.Context, measurement string, tags map[string]string, ...) error
- type DruidStatus
- type HypertableConfig
- type InfluxConfig
- type InfluxModule
- func (m *InfluxModule) Archive(ctx context.Context, olderThan time.Duration, dest, format string, ...) (*ArchiveResult, error)
- func (m *InfluxModule) BucketsAPI() api.BucketsAPI
- func (m *InfluxModule) Config() InfluxConfig
- func (m *InfluxModule) Init() error
- func (m *InfluxModule) Query(ctx context.Context, query string, _ ...any) ([]map[string]any, error)
- func (m *InfluxModule) QueryAPI() api.QueryAPI
- func (m *InfluxModule) Start(ctx context.Context) error
- func (m *InfluxModule) Status(ctx context.Context, measurement string) (*TierStatus, error)
- func (m *InfluxModule) Stop(_ context.Context) error
- func (m *InfluxModule) UpdateBucketRetention(ctx context.Context, bucketName string, durationSeconds int64) error
- func (m *InfluxModule) WriteAPI() api.WriteAPI
- func (m *InfluxModule) WriteBatch(ctx context.Context, points []Point) error
- func (m *InfluxModule) WritePoint(ctx context.Context, measurement string, tags map[string]string, ...) error
- type Point
- type QueryResult
- type QuestDBConfig
- type QuestDBModule
- func (m *QuestDBModule) Config() QuestDBConfig
- func (m *QuestDBModule) Init() error
- func (m *QuestDBModule) Query(ctx context.Context, query string, args ...any) ([]map[string]any, error)
- func (m *QuestDBModule) Start(ctx context.Context) error
- func (m *QuestDBModule) Stop(ctx context.Context) error
- func (m *QuestDBModule) WriteBatch(ctx context.Context, points []Point) error
- func (m *QuestDBModule) WritePoint(ctx context.Context, measurement string, tags map[string]string, ...) error
- type SenderFactory
- type SupervisorStatus
- type TierManager
- type TierStatus
- type TimeSeriesWriter
- type TimescaleConfig
- type TimescaleModule
- func (m *TimescaleModule) CreateHypertable(ctx context.Context, table, timeColumn, chunkInterval string) error
- func (m *TimescaleModule) DB() *sql.DB
- func (m *TimescaleModule) Init() error
- func (m *TimescaleModule) Query(ctx context.Context, query string, args ...any) ([]map[string]any, error)
- func (m *TimescaleModule) Start(ctx context.Context) error
- func (m *TimescaleModule) Stop(_ context.Context) error
- func (m *TimescaleModule) WriteBatch(ctx context.Context, points []Point) error
- func (m *TimescaleModule) WritePoint(ctx context.Context, measurement string, tags map[string]string, ...) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewClickHouseModule ¶
NewClickHouseModule creates a new timeseries.clickhouse module instance.
func NewDruidCompactStep ¶
NewDruidCompactStep creates a new step.ts_druid_compact instance.
func NewDruidDatasourceStep ¶
NewDruidDatasourceStep creates a new step.ts_druid_datasource instance.
func NewDruidIngestStep ¶
NewDruidIngestStep creates a new step.ts_druid_ingest instance.
func NewDruidModule ¶
NewDruidModule creates a new timeseries.druid module instance.
func NewDruidQueryStep ¶
NewDruidQueryStep creates a new step.ts_druid_query instance.
func NewInfluxModule ¶
NewInfluxModule creates a new timeseries.influxdb module instance.
func NewQuestDBModule ¶
NewQuestDBModule creates a new timeseries.questdb module instance.
func NewTSArchiveStep ¶
NewTSArchiveStep creates a new step.ts_archive instance.
func NewTSClickHouseViewStep ¶
NewTSClickHouseViewStep creates a new step.ts_clickhouse_view instance.
func NewTSContinuousQueryStep ¶
NewTSContinuousQueryStep creates a new step.ts_continuous_query instance.
func NewTSDownsampleStep ¶
NewTSDownsampleStep creates a new step.ts_downsample instance.
func NewTSQueryStep ¶
NewTSQueryStep creates a new step.ts_query instance.
func NewTSRetentionStep ¶
NewTSRetentionStep creates a new step.ts_retention instance.
func NewTSTierStatusStep ¶
NewTSTierStatusStep creates a new step.ts_tier_status instance.
func NewTSWriteBatchStep ¶
NewTSWriteBatchStep creates a new step.ts_write_batch instance.
func NewTSWriteStep ¶
NewTSWriteStep creates a new step.ts_write instance.
func NewTimescaleModule ¶
NewTimescaleModule creates a new timeseries.timescaledb module instance.
func Register ¶
func Register(name string, w TimeSeriesWriter) error
Register adds a named TimeSeriesWriter to the global registry. Returns an error if the name is already registered.
func Unregister ¶
func Unregister(name string)
Unregister removes a named TimeSeriesWriter from the global registry.
Types ¶
type ArchiveResult ¶
type ArchiveResult struct {
RowsArchived int64 `json:"rowsArchived"`
BytesWritten int64 `json:"bytesWritten"`
Destination string `json:"destination"`
DeletedFromHot bool `json:"deletedFromHot"`
}
ArchiveResult holds the result of an archive operation.
type ClickHouseConfig ¶
type ClickHouseConfig struct {
Endpoints []string `json:"endpoints" yaml:"endpoints"`
Database string `json:"database" yaml:"database"`
Username string `json:"username" yaml:"username"`
Password string `json:"password" yaml:"password"`
MaxOpenConns int `json:"maxOpenConns" yaml:"maxOpenConns"`
MaxIdleConns int `json:"maxIdleConns" yaml:"maxIdleConns"`
Compression string `json:"compression" yaml:"compression"`
Secure bool `json:"secure" yaml:"secure"`
DialTimeout time.Duration `json:"dialTimeout" yaml:"dialTimeout"`
}
ClickHouseConfig holds configuration for the timeseries.clickhouse module.
type ClickHouseModule ¶
type ClickHouseModule struct {
// contains filtered or unexported fields
}
ClickHouseModule is the timeseries.clickhouse module.
func (*ClickHouseModule) Archive ¶
func (m *ClickHouseModule) Archive(ctx context.Context, olderThan time.Duration, dest, format string, deleteAfter bool) (*ArchiveResult, error)
Archive implements TierManager for ClickHouseModule.
func (*ClickHouseModule) Conn ¶
func (m *ClickHouseModule) Conn() driver.Conn
Conn returns the underlying ClickHouse connection.
func (*ClickHouseModule) Init ¶
func (m *ClickHouseModule) Init() error
Init validates the module configuration.
func (*ClickHouseModule) Query ¶
func (m *ClickHouseModule) Query(ctx context.Context, query string, args ...any) ([]map[string]any, error)
Query implements TimeSeriesWriter by executing a SQL query and scanning results.
func (*ClickHouseModule) Start ¶
func (m *ClickHouseModule) Start(ctx context.Context) error
Start opens the ClickHouse connection, pings the server, and registers.
func (*ClickHouseModule) Status ¶
func (m *ClickHouseModule) Status(ctx context.Context, measurement string) (*TierStatus, error)
Status implements TierManager for ClickHouseModule.
func (*ClickHouseModule) Stop ¶
func (m *ClickHouseModule) Stop(_ context.Context) error
Stop closes the connection and deregisters.
func (*ClickHouseModule) WriteBatch ¶
func (m *ClickHouseModule) WriteBatch(ctx context.Context, points []Point) error
WriteBatch implements TimeSeriesWriter using ClickHouse's native batch protocol.
type CompactionConfig ¶
type CompactionConfig struct {
TargetCompactionSizeBytes int64 `json:"targetCompactionSizeBytes,omitempty" yaml:"targetCompactionSizeBytes,omitempty"`
SkipOffsetFromLatest string `json:"skipOffsetFromLatest,omitempty" yaml:"skipOffsetFromLatest,omitempty"`
}
CompactionConfig holds parameters for a Druid compaction task.
type CompactionStatus ¶
CompactionStatus holds the compaction state for a datasource.
type DatasourceInfo ¶
type DatasourceInfo struct {
Name string `json:"name"`
Properties map[string]any `json:"properties,omitempty"`
}
DatasourceInfo contains metadata about a Druid datasource.
type DruidClient ¶
type DruidClient interface {
GetStatus(ctx context.Context) (*DruidStatus, error)
SQLQuery(ctx context.Context, query string, params []any) (*QueryResult, error)
NativeQuery(ctx context.Context, query map[string]any) (*QueryResult, error)
SubmitSupervisor(ctx context.Context, spec map[string]any) (*SupervisorStatus, error)
GetSupervisorStatus(ctx context.Context, id string) (*SupervisorStatus, error)
SuspendSupervisor(ctx context.Context, id string) error
ResumeSupervisor(ctx context.Context, id string) error
TerminateSupervisor(ctx context.Context, id string) error
ListDatasources(ctx context.Context) ([]string, error)
GetDatasource(ctx context.Context, name string) (*DatasourceInfo, error)
DisableDatasource(ctx context.Context, name string) error
SubmitCompaction(ctx context.Context, datasource string, config CompactionConfig) error
GetCompactionStatus(ctx context.Context, datasource string) (*CompactionStatus, error)
}
DruidClient is the interface for Druid Router API operations.
func NewDruidClient ¶
func NewDruidClient(routerURL, username, password string, timeout time.Duration) DruidClient
NewDruidClient creates a Druid HTTP client.
type DruidConfig ¶
type DruidConfig struct {
RouterURL string `json:"routerUrl" yaml:"routerUrl"`
Username string `json:"username" yaml:"username"`
Password string `json:"password" yaml:"password"`
HTTPTimeout time.Duration `json:"httpTimeout" yaml:"httpTimeout"`
}
DruidConfig holds configuration for the timeseries.druid module.
type DruidModule ¶
type DruidModule struct {
// contains filtered or unexported fields
}
DruidModule implements the timeseries.druid module.
func (*DruidModule) Client ¶
func (m *DruidModule) Client() DruidClient
Client returns the underlying DruidClient.
func (*DruidModule) Config ¶
func (m *DruidModule) Config() DruidConfig
Config returns the module configuration.
func (*DruidModule) Init ¶
func (m *DruidModule) Init() error
Init validates the module configuration.
func (*DruidModule) Start ¶
func (m *DruidModule) Start(ctx context.Context) error
Start pings Druid and registers the module.
func (*DruidModule) Stop ¶
func (m *DruidModule) Stop(_ context.Context) error
Stop deregisters the module (stateless — no connection to close).
func (*DruidModule) WriteBatch ¶
func (m *DruidModule) WriteBatch(ctx context.Context, points []Point) error
WriteBatch implements TimeSeriesWriter by submitting a native index_parallel task.
type DruidStatus ¶
DruidStatus represents Druid's status response.
type HypertableConfig ¶
type HypertableConfig struct {
Table string `json:"table" yaml:"table"`
TimeColumn string `json:"timeColumn" yaml:"timeColumn"`
ChunkInterval string `json:"chunkInterval" yaml:"chunkInterval"`
}
HypertableConfig describes a TimescaleDB hypertable to create on Start.
type InfluxConfig ¶
type InfluxConfig struct {
URL string `json:"url" yaml:"url"`
Token string `json:"token" yaml:"token"`
Org string `json:"org" yaml:"org"`
Bucket string `json:"bucket" yaml:"bucket"`
BatchSize uint `json:"batchSize" yaml:"batchSize"`
FlushInterval time.Duration `json:"flushInterval" yaml:"flushInterval"`
Precision string `json:"precision" yaml:"precision"`
}
InfluxConfig holds configuration for the timeseries.influxdb module.
type InfluxModule ¶
type InfluxModule struct {
// contains filtered or unexported fields
}
InfluxModule is the timeseries.influxdb module.
func (*InfluxModule) Archive ¶
func (m *InfluxModule) Archive(ctx context.Context, olderThan time.Duration, dest, format string, deleteAfter bool) (*ArchiveResult, error)
Archive implements TierManager for InfluxModule.
func (*InfluxModule) BucketsAPI ¶
func (m *InfluxModule) BucketsAPI() api.BucketsAPI
BucketsAPI returns the buckets management API.
func (*InfluxModule) Config ¶
func (m *InfluxModule) Config() InfluxConfig
Config returns the module configuration.
func (*InfluxModule) Init ¶
func (m *InfluxModule) Init() error
Init validates the module configuration.
func (*InfluxModule) QueryAPI ¶
func (m *InfluxModule) QueryAPI() api.QueryAPI
QueryAPI returns the query API for direct use.
func (*InfluxModule) Start ¶
func (m *InfluxModule) Start(ctx context.Context) error
Start creates the InfluxDB client, pings the server, and registers the module.
func (*InfluxModule) Status ¶
func (m *InfluxModule) Status(ctx context.Context, measurement string) (*TierStatus, error)
Status implements TierManager for InfluxModule.
func (*InfluxModule) Stop ¶
func (m *InfluxModule) Stop(_ context.Context) error
Stop closes the InfluxDB client (flushes pending writes) and deregisters.
func (*InfluxModule) UpdateBucketRetention ¶
func (m *InfluxModule) UpdateBucketRetention(ctx context.Context, bucketName string, durationSeconds int64) error
UpdateBucketRetention updates the retention rules for a bucket.
func (*InfluxModule) WriteAPI ¶
func (m *InfluxModule) WriteAPI() api.WriteAPI
WriteAPI returns the non-blocking write API for direct use.
func (*InfluxModule) WriteBatch ¶
func (m *InfluxModule) WriteBatch(ctx context.Context, points []Point) error
WriteBatch implements TimeSeriesWriter.
type Point ¶
type Point struct {
Measurement string `json:"measurement" yaml:"measurement"`
Tags map[string]string `json:"tags" yaml:"tags"`
Fields map[string]any `json:"fields" yaml:"fields"`
Timestamp time.Time `json:"timestamp" yaml:"timestamp"`
}
Point represents a single time-series data point.
type QueryResult ¶
QueryResult holds rows returned from a Druid query.
type QuestDBConfig ¶
type QuestDBConfig struct {
ILPEndpoint string `json:"ilpEndpoint" yaml:"ilpEndpoint"`
HTTPEndpoint string `json:"httpEndpoint" yaml:"httpEndpoint"`
AuthToken string `json:"authToken" yaml:"authToken"`
TLSEnabled bool `json:"tlsEnabled" yaml:"tlsEnabled"`
AutoFlush bool `json:"autoFlush" yaml:"autoFlush"`
FlushInterval time.Duration `json:"flushInterval" yaml:"flushInterval"`
}
QuestDBConfig holds configuration for the timeseries.questdb module.
type QuestDBModule ¶
type QuestDBModule struct {
// contains filtered or unexported fields
}
QuestDBModule implements the timeseries.questdb module.
func (*QuestDBModule) Config ¶
func (m *QuestDBModule) Config() QuestDBConfig
Config returns the module configuration.
func (*QuestDBModule) Init ¶
func (m *QuestDBModule) Init() error
Init validates the module configuration.
func (*QuestDBModule) Query ¶
func (m *QuestDBModule) Query(ctx context.Context, query string, args ...any) ([]map[string]any, error)
Query implements TimeSeriesWriter using the HTTP /exec endpoint.
func (*QuestDBModule) Start ¶
func (m *QuestDBModule) Start(ctx context.Context) error
Start verifies HTTP health and initializes the ILP sender, then registers in the global registry.
func (*QuestDBModule) Stop ¶
func (m *QuestDBModule) Stop(ctx context.Context) error
Stop flushes pending messages, closes the ILP sender, and deregisters.
func (*QuestDBModule) WriteBatch ¶
func (m *QuestDBModule) WriteBatch(ctx context.Context, points []Point) error
WriteBatch implements TimeSeriesWriter.
type SenderFactory ¶
type SenderFactory func(ctx context.Context, cfg QuestDBConfig) (qdb.LineSender, error)
SenderFactory creates a qdb.LineSender from config. Injectable for tests.
type SupervisorStatus ¶
type SupervisorStatus struct {
ID string `json:"id"`
State string `json:"state"`
Healthy bool `json:"healthy"`
}
SupervisorStatus represents a Druid supervisor's state.
type TierManager ¶
type TierManager interface {
Archive(ctx context.Context, olderThan time.Duration, dest, format string, deleteAfter bool) (*ArchiveResult, error)
Status(ctx context.Context, measurement string) (*TierStatus, error)
}
TierManager is implemented by time-series modules that support hot/cold tiering.
type TierStatus ¶
type TierStatus struct {
HotRows int64 `json:"hotRows"`
HotOldestTimestamp *time.Time `json:"hotOldestTimestamp,omitempty"`
ColdFiles int `json:"coldFiles"`
ColdOldestTimestamp *time.Time `json:"coldOldestTimestamp,omitempty"`
TotalBytes int64 `json:"totalBytes"`
}
TierStatus holds the hot vs cold data summary.
type TimeSeriesWriter ¶
type TimeSeriesWriter interface {
WritePoint(ctx context.Context, measurement string, tags map[string]string, fields map[string]any, timestamp time.Time) error
WriteBatch(ctx context.Context, points []Point) error
Query(ctx context.Context, query string, args ...any) ([]map[string]any, error)
}
TimeSeriesWriter is the common interface implemented by all time-series modules.
func Lookup ¶
func Lookup(name string) (TimeSeriesWriter, error)
Lookup finds a TimeSeriesWriter by module name.
type TimescaleConfig ¶
type TimescaleConfig struct {
Connection string `json:"connection" yaml:"connection"`
MaxOpenConns int `json:"maxOpenConns" yaml:"maxOpenConns"`
MaxIdleConns int `json:"maxIdleConns" yaml:"maxIdleConns"`
Hypertables []HypertableConfig `json:"hypertables" yaml:"hypertables"`
}
TimescaleConfig holds configuration for the timeseries.timescaledb module.
type TimescaleModule ¶
type TimescaleModule struct {
// contains filtered or unexported fields
}
TimescaleModule is the timeseries.timescaledb module.
func NewTimescaleModuleFromDB ¶
func NewTimescaleModuleFromDB(name string, db *sql.DB) *TimescaleModule
NewTimescaleModuleFromDB creates a TimescaleModule backed by an existing *sql.DB. Intended for integration tests where a real database is not available.
func (*TimescaleModule) CreateHypertable ¶
func (m *TimescaleModule) CreateHypertable(ctx context.Context, table, timeColumn, chunkInterval string) error
CreateHypertable creates a TimescaleDB hypertable if it doesn't already exist.
func (*TimescaleModule) DB ¶
func (m *TimescaleModule) DB() *sql.DB
DB returns the underlying *sql.DB.
func (*TimescaleModule) Init ¶
func (m *TimescaleModule) Init() error
Init validates the module configuration.
func (*TimescaleModule) Query ¶
func (m *TimescaleModule) Query(ctx context.Context, query string, args ...any) ([]map[string]any, error)
Query implements TimeSeriesWriter using standard SQL.
func (*TimescaleModule) Start ¶
func (m *TimescaleModule) Start(ctx context.Context) error
Start opens the database connection, verifies the TimescaleDB extension, and creates configured hypertables.
func (*TimescaleModule) Stop ¶
func (m *TimescaleModule) Stop(_ context.Context) error
Stop closes the database connection and deregisters.
func (*TimescaleModule) WriteBatch ¶
func (m *TimescaleModule) WriteBatch(ctx context.Context, points []Point) error
WriteBatch implements TimeSeriesWriter.