Documentation
Constants ¶
const (
DefaultRecomputePreviousN = 2
DefaultRecomputeNoOlderThan = 10 * time.Minute
DefaultComputeRunsPerInterval = 10
DefaultComputeNoMoreThan = 2 * time.Minute
)
const (
// When planning a select statement, passing zero tells it not to chunk results. This only applies to raw queries.
NoChunkingSize = 0
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
// Enables logging in the CQ service to display when CQs are processed and how many points are written.
LogEnabled bool `toml:"log-enabled"`
// If this flag is set to false, both the brokers and data nodes should ignore any CQ processing.
Enabled bool `toml:"enabled"`
// When continuous queries are run, we automatically recompute previous intervals
// in case lagged data came in. Set to zero if you never have lagged data. We do
// it this way because invalidating previously computed intervals would be insanely hard
// and expensive.
RecomputePreviousN int `toml:"recompute-previous-n"`
// The RecomputePreviousN setting provides guidance for how far back to recompute; the RecomputeNoOlderThan
// setting sets a ceiling on how far back in time it will go. For example, if RecomputePreviousN is 2
// and this is set to 10m, then we'd only recompute the previous two intervals for any
// CQs that have a group by time <= 5m. For all others, we'd only recompute the previous window.
RecomputeNoOlderThan toml.Duration `toml:"recompute-no-older-than"`
// ComputeRunsPerInterval will determine how many times the current and previous N intervals
// will be computed. The group by time will be divided by this and it will get computed this many times:
// group by time seconds / runs per interval
// This will give partial results for current group by intervals and will determine how long it will
// be until lagged data is recomputed. For example, if this number is 10 and the group by time is 10m, it
// will be a minute past the previous 10m bucket of time before lagged data is picked up
ComputeRunsPerInterval int `toml:"compute-runs-per-interval"`
// ComputeNoMoreThan, paired with ComputeRunsPerInterval, sets a ceiling on how many times smaller
// group by times will be computed. For example, if you have ComputeRunsPerInterval set to 10 and this
// setting set to 1m, then a group by time(1m) will only get computed once per interval (and once per PreviousN).
// If you have a group by time(5m) then you'll get five computes per interval. Any group by time window larger
// than 10m will get computed 10 times for each interval.
ComputeNoMoreThan toml.Duration `toml:"compute-no-more-than"`
}
Config represents a configuration for the continuous query service.
type ContinuousQuerier ¶
type ContinuousQuerier interface {
// Run executes the named query in the named database. Blank database or name matches all.
Run(database, name string, t time.Time) error
}
ContinuousQuerier represents a service that executes continuous queries.
type ContinuousQuery ¶
type ContinuousQuery struct {
Database string
Info *meta.ContinuousQueryInfo
LastRun time.Time
// contains filtered or unexported fields
}
ContinuousQuery is a local wrapper / helper around continuous queries.
func NewContinuousQuery ¶
func NewContinuousQuery(database string, cqi *meta.ContinuousQueryInfo) (*ContinuousQuery, error)
NewContinuousQuery returns a ContinuousQuery object with a parsed influxql.CreateContinuousQueryStatement.
type RunRequest ¶ added in v0.9.4
type RunRequest struct {
// Now tells the CQ service what the current time is.
Now time.Time
// CQs tells the CQ service which queries to run.
// If nil, all queries will be run.
CQs []string
}
RunRequest is a request to run one or more CQs.
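A consumer of RunRequest values might interpret the CQs field as sketched below. RunRequest is redeclared locally so the example compiles on its own; selectCQs, the channel wiring, and the query names are hypothetical and only illustrate the documented "nil means all" rule, not how the service itself handles requests.

```go
package main

import (
	"fmt"
	"time"
)

// RunRequest is a local copy of the documented struct, repeated here only
// so the example is self-contained.
type RunRequest struct {
	Now time.Time
	CQs []string
}

// selectCQs is a hypothetical helper: a nil CQs slice selects every known
// query, otherwise only the known queries named in the request are selected.
func selectCQs(req *RunRequest, known []string) []string {
	if req.CQs == nil {
		return known
	}
	wanted := make(map[string]bool, len(req.CQs))
	for _, name := range req.CQs {
		wanted[name] = true
	}
	var out []string
	for _, name := range known {
		if wanted[name] {
			out = append(out, name)
		}
	}
	return out
}

func main() {
	known := []string{"cq_mean", "cq_max"} // made-up query names
	runCh := make(chan *RunRequest)
	done := make(chan struct{})

	// Stand-in for a service loop that reads requests from a channel.
	go func() {
		defer close(done)
		for req := range runCh {
			fmt.Println(req.Now.Format(time.RFC3339), selectCQs(req, known))
		}
	}()

	now := time.Date(2015, 9, 1, 12, 0, 0, 0, time.UTC)
	runCh <- &RunRequest{Now: now}                          // nil CQs: run everything
	runCh <- &RunRequest{Now: now, CQs: []string{"cq_max"}} // run one query
	close(runCh)
	<-done
}
```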
type Service ¶
type Service struct {
MetaStore metaStore
QueryExecutor queryExecutor
PointsWriter pointsWriter
Config *Config
RunInterval time.Duration
// RunCh can be used by clients to signal service to run CQs.
RunCh chan *RunRequest
Logger *log.Logger
// contains filtered or unexported fields
}
Service manages continuous query execution.
func (*Service) ExecuteContinuousQuery ¶
func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.ContinuousQueryInfo, now time.Time) error
ExecuteContinuousQuery executes a single CQ.