Documentation
Index ¶
- Constants
- Variables
- func ChunksToMatrix(ctx context.Context, chunks []Chunk, from, through model.Time) (model.Matrix, error)
- func ExpectTables(ctx context.Context, client TableClient, expected []TableDesc) error
- type AutoScalingConfig
- type Bucket
- type BucketClient
- type CardinalityExceededError
- type Chunk
- type CompositeStore
- func (c *CompositeStore) AddPeriod(storeCfg StoreConfig, cfg PeriodConfig, index IndexClient, chunks ObjectClient, ...) error
- func (c CompositeStore) Get(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]Chunk, error)
- func (c CompositeStore) GetChunkRefs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([][]Chunk, []*Fetcher, error)
- func (c CompositeStore) LabelValuesForMetricName(ctx context.Context, from, through model.Time, metricName string, ...) ([]string, error)
- func (c CompositeStore) Put(ctx context.Context, chunks []Chunk) error
- func (c CompositeStore) PutOne(ctx context.Context, from, through model.Time, chunk Chunk) error
- func (c CompositeStore) Stop()
- type DayTime
- type DecodeContext
- type Fetcher
- type IndexClient
- type IndexEntry
- type IndexQuery
- type LegacySchemaConfig
- type MockStorage
- func (m *MockStorage) BatchWrite(ctx context.Context, batch WriteBatch) error
- func (m *MockStorage) CreateTable(_ context.Context, desc TableDesc) error
- func (m *MockStorage) DeleteTable(_ context.Context, name string) error
- func (m *MockStorage) DescribeTable(_ context.Context, name string) (desc TableDesc, isActive bool, err error)
- func (m *MockStorage) GetChunks(ctx context.Context, chunkSet []Chunk) ([]Chunk, error)
- func (m *MockStorage) ListTables(_ context.Context) ([]string, error)
- func (m *MockStorage) NewWriteBatch() WriteBatch
- func (m *MockStorage) PutChunks(_ context.Context, chunks []Chunk) error
- func (m *MockStorage) QueryPages(ctx context.Context, queries []IndexQuery, ...) error
- func (*MockStorage) Stop()
- func (m *MockStorage) UpdateTable(_ context.Context, _, desc TableDesc) error
- type ObjectAndIndexClient
- type ObjectClient
- type PeriodConfig
- type PeriodicTableConfig
- type ProvisionConfig
- type ReadBatch
- type ReadBatchIterator
- type Schema
- type SchemaConfig
- type Store
- type StoreConfig
- type TableClient
- type TableDesc
- type TableManager
- type TableManagerConfig
- type Tags
- type WriteBatch
Constants ¶
const (
	ErrInvalidChecksum = errs.Error("invalid chunk checksum")
	ErrWrongMetadata   = errs.Error("wrong chunk metadata")
	ErrMetadataLength  = errs.Error("chunk metadata wrong length")
	ErrDataLength      = errs.Error("chunk data wrong length")
)
Errors that decode can return
Variables ¶
var BenchmarkLabels = labels.Labels{
	{Name: model.MetricNameLabel, Value: "container_cpu_usage_seconds_total"},
	{Name: "beta_kubernetes_io_arch", Value: "amd64"},
	{Name: "beta_kubernetes_io_instance_type", Value: "c3.somesize"},
	{Name: "beta_kubernetes_io_os", Value: "linux"},
	{Name: "container_name", Value: "some-name"},
	{Name: "cpu", Value: "cpu01"},
	{Name: "failure_domain_beta_kubernetes_io_region", Value: "somewhere-1"},
	{Name: "failure_domain_beta_kubernetes_io_zone", Value: "somewhere-1b"},
	{Name: "id", Value: "/kubepods/burstable/pod6e91c467-e4c5-11e7-ace3-0a97ed59c75e/a3c8498918bd6866349fed5a6f8c643b77c91836427fb6327913276ebc6bde28"},
	{Name: "image", Value: "registry/organisation/name@sha256:dca3d877a80008b45d71d7edc4fd2e44c0c8c8e7102ba5cbabec63a374d1d506"},
	{Name: "instance", Value: "ip-111-11-1-11.ec2.internal"},
	{Name: "job", Value: "kubernetes-cadvisor"},
	{Name: "kubernetes_io_hostname", Value: "ip-111-11-1-11"},
	{Name: "monitor", Value: "prod"},
	{Name: "name", Value: "k8s_some-name_some-other-name-5j8s8_kube-system_6e91c467-e4c5-11e7-ace3-0a97ed59c75e_0"},
	{Name: "namespace", Value: "kube-system"},
	{Name: "pod_name", Value: "some-other-name-5j8s8"},
}
BenchmarkLabels is a real example from Kubernetes' embedded cAdvisor metrics, lightly obfuscated
var (
	// ErrNotSupported when a schema doesn't support that particular lookup.
	ErrNotSupported = errors.New("not supported")
)
Functions ¶
func ChunksToMatrix ¶
func ChunksToMatrix(ctx context.Context, chunks []Chunk, from, through model.Time) (model.Matrix, error)
ChunksToMatrix converts a set of chunks to a model.Matrix.
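A minimal usage sketch of ChunksToMatrix, assuming the import path github.com/cortexproject/cortex/pkg/chunk, an already-constructed Store, and an illustrative "up" metric name:

import (
	"context"

	"github.com/cortexproject/cortex/pkg/chunk"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/pkg/labels"
)

// queryAsMatrix queries a Store for one metric name and converts the
// resulting chunks into a model.Matrix for the same time range.
func queryAsMatrix(ctx context.Context, store chunk.Store, from, through model.Time) (model.Matrix, error) {
	matcher, err := labels.NewMatcher(labels.MatchEqual, string(model.MetricNameLabel), "up")
	if err != nil {
		return nil, err
	}
	chunks, err := store.Get(ctx, from, through, matcher)
	if err != nil {
		return nil, err
	}
	return chunk.ChunksToMatrix(ctx, chunks, from, through)
}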
func ExpectTables ¶
func ExpectTables(ctx context.Context, client TableClient, expected []TableDesc) error
ExpectTables compares existing tables to an expected set of tables. Exposed for testing.
Types ¶
type AutoScalingConfig ¶
type AutoScalingConfig struct {
Enabled bool `yaml:"enabled,omitempty"`
RoleARN string `yaml:"role_arn,omitempty"`
MinCapacity int64 `yaml:"min_capacity,omitempty"`
MaxCapacity int64 `yaml:"max_capacity,omitempty"`
OutCooldown int64 `yaml:"out_cooldown,omitempty"`
InCooldown int64 `yaml:"in_cooldown,omitempty"`
TargetValue float64 `yaml:"target,omitempty"`
}
AutoScalingConfig for DynamoDB tables.
func (*AutoScalingConfig) RegisterFlags ¶
func (cfg *AutoScalingConfig) RegisterFlags(argPrefix string, f *flag.FlagSet)
RegisterFlags adds the flags required to config this to the given FlagSet.
type Bucket ¶
type Bucket struct {
// contains filtered or unexported fields
}
Bucket describes a range of time with a tableName and hashKey
type BucketClient ¶
BucketClient is used to enforce retention on chunk buckets.
type CardinalityExceededError ¶
CardinalityExceededError is returned when the user reads a row that is too large.
func (CardinalityExceededError) Error ¶
func (e CardinalityExceededError) Error() string
type Chunk ¶
type Chunk struct {
// These two fields will be missing from older chunks (as will the hash).
// On fetch we will initialise these fields from the DynamoDB key.
Fingerprint model.Fingerprint `json:"fingerprint"`
UserID string `json:"userID"`
// These fields will be in all chunks, including old ones.
From model.Time `json:"from"`
Through model.Time `json:"through"`
Metric labels.Labels `json:"metric"`
// The hash is not written to the external storage either. We use
// crc32, Castagnoli table. See http://www.evanjones.ca/crc32c.html.
// For old chunks, ChecksumSet will be false.
ChecksumSet bool `json:"-"`
Checksum uint32 `json:"-"`
// We never use Delta encoding (the zero value), so if this entry is
// missing, we default to DoubleDelta.
Encoding prom_chunk.Encoding `json:"encoding"`
Data prom_chunk.Chunk `json:"-"`
// contains filtered or unexported fields
}
Chunk contains encoded timeseries data
func NewChunk ¶
func NewChunk(userID string, fp model.Fingerprint, metric labels.Labels, c prom_chunk.Chunk, from, through model.Time) Chunk
NewChunk creates a new chunk
func ParseExternalKey ¶
ParseExternalKey is used to construct a partially-populated chunk from the key in DynamoDB. This chunk can then be used to calculate the key needed to fetch the Chunk data from Memcache/S3, and then fully populate the chunk with decode().
Pre-checksums, the keys written to DynamoDB looked like `<fingerprint>:<start time>:<end time>` (aka the ID), and the key for memcache and S3 was `<user id>/<fingerprint>:<start time>:<end time>`. Fingerprints and times were written in base-10.
Post-checksums, external keys are the same across DynamoDB, Memcache and S3. Numbers are hex encoded. Keys look like: `<user id>/<fingerprint>:<start time>:<end time>:<checksum>`.
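A sketch of the post-checksum key layout described above; the hex formatting follows that description, but in practice keys come from Chunk.ExternalKey rather than being formatted by hand.

import "fmt"

// externalKeyWithChecksum formats the layout
// <user id>/<fingerprint>:<start time>:<end time>:<checksum>
// with the numeric parts hex encoded, purely for illustration.
func externalKeyWithChecksum(userID string, fingerprint uint64, from, through int64, checksum uint32) string {
	return fmt.Sprintf("%s/%x:%x:%x:%x", userID, fingerprint, from, through, checksum)
}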
func (*Chunk) Decode ¶
func (c *Chunk) Decode(decodeContext *DecodeContext, input []byte) error
Decode the chunk from the given buffer, and confirm the chunk is the one we expected.
func (*Chunk) ExternalKey ¶
ExternalKey returns the key you can use to fetch this chunk from external storage. For newer chunks, this key includes a checksum.
type CompositeStore ¶
type CompositeStore struct {
// contains filtered or unexported fields
}
CompositeStore is a Store which delegates to various stores depending on when they were activated.
func NewCompositeStore ¶
func NewCompositeStore() CompositeStore
NewCompositeStore creates a new Store which delegates to different stores depending on time.
func (*CompositeStore) AddPeriod ¶
func (c *CompositeStore) AddPeriod(storeCfg StoreConfig, cfg PeriodConfig, index IndexClient, chunks ObjectClient, limits *validation.Overrides) error
AddPeriod adds the configuration for a period of time to the CompositeStore
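A minimal sketch of wiring up a CompositeStore with a single period; the validation import path and the pre-built index/object clients are assumptions here.

import (
	"github.com/cortexproject/cortex/pkg/chunk"
	"github.com/cortexproject/cortex/pkg/util/validation"
)

// newSingleStore builds a CompositeStore covering one configured period.
func newSingleStore(storeCfg chunk.StoreConfig, periodCfg chunk.PeriodConfig,
	index chunk.IndexClient, objects chunk.ObjectClient, limits *validation.Overrides) (chunk.Store, error) {

	store := chunk.NewCompositeStore()
	// Each AddPeriod call activates a schema/table configuration from periodCfg.From onwards;
	// call it once per entry in the schema config.
	if err := store.AddPeriod(storeCfg, periodCfg, index, objects, limits); err != nil {
		return nil, err
	}
	return store, nil
}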
func (CompositeStore) GetChunkRefs ¶
func (c CompositeStore) GetChunkRefs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([][]Chunk, []*Fetcher, error)
func (CompositeStore) LabelValuesForMetricName ¶
func (c CompositeStore) LabelValuesForMetricName(ctx context.Context, from, through model.Time, metricName string, labelName string) ([]string, error)
LabelValuesForMetricName retrieves all label values for a single label name and metric name.
type DayTime ¶
DayTime is a model.Time that holds day-aligned values, and marshals to/from YAML in YYYY-MM-DD format.
func (DayTime) MarshalYAML ¶
MarshalYAML implements yaml.Marshaller.
func (*DayTime) UnmarshalYAML ¶
UnmarshalYAML implements yaml.Unmarshaller.
type DecodeContext ¶
type DecodeContext struct {
// contains filtered or unexported fields
}
DecodeContext holds data that can be re-used between decodes of different chunks
func NewDecodeContext ¶
func NewDecodeContext() *DecodeContext
NewDecodeContext creates a new, blank DecodeContext.
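A sketch of the intended reuse: one DecodeContext shared across many decodes. The chunks are assumed to be partially populated already (for example via ParseExternalKey) and the payloads to be the raw bytes read from object storage.

import "github.com/cortexproject/cortex/pkg/chunk"

// decodeAll decodes raw payloads into their corresponding chunks, sharing a
// single DecodeContext so its scratch data is re-used between decodes.
func decodeAll(chunks []chunk.Chunk, payloads [][]byte) error {
	decodeCtx := chunk.NewDecodeContext()
	for i := range chunks {
		if err := chunks[i].Decode(decodeCtx, payloads[i]); err != nil {
			return err
		}
	}
	return nil
}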
type Fetcher ¶
type Fetcher struct {
// contains filtered or unexported fields
}
Fetcher deals with fetching chunk contents from the cache/store, and writing back any misses to the cache. Also responsible for decoding chunks from the cache, in parallel.
func NewChunkFetcher ¶
func NewChunkFetcher(cfg cache.Config, storage ObjectClient) (*Fetcher, error)
NewChunkFetcher makes a new ChunkFetcher.
func (*Fetcher) FetchChunks ¶
FetchChunks fetches a set of chunks from cache and store.
type IndexClient ¶
type IndexClient interface {
Stop()
// For the write path.
NewWriteBatch() WriteBatch
BatchWrite(context.Context, WriteBatch) error
// For the read path.
QueryPages(ctx context.Context, queries []IndexQuery, callback func(IndexQuery, ReadBatch) (shouldContinue bool)) error
}
IndexClient is a client for the storage of the index (e.g. DynamoDB or Bigtable).
type IndexEntry ¶
type IndexEntry struct {
TableName string
HashValue string
// For writes, RangeValue will always be set.
RangeValue []byte
// New for v6 schema, label value is not written as part of the range key.
Value []byte
}
IndexEntry describes an entry in the chunk index
type IndexQuery ¶
type IndexQuery struct {
TableName string
HashValue string
// One of RangeValuePrefix or RangeValueStart might be set:
// - If RangeValuePrefix is not nil, must read all keys with that prefix.
// - If RangeValueStart is not nil, must read all keys from there onwards.
// - If neither is set, must read all keys for that row.
RangeValuePrefix []byte
RangeValueStart []byte
// Filters for querying
ValueEqual []byte
// If the result of this lookup is immutable or not (for caching).
Immutable bool
}
IndexQuery describes a query for entries
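A read-path sketch tying IndexQuery and IndexClient.QueryPages together. The table and hash values are illustrative; real queries are normally produced by a Schema implementation rather than written by hand.

import (
	"context"

	"github.com/cortexproject/cortex/pkg/chunk"
)

// countPages issues one whole-row query and counts how many result pages come back.
func countPages(ctx context.Context, index chunk.IndexClient) (int, error) {
	queries := []chunk.IndexQuery{{
		TableName: "index_2554",           // illustrative table name
		HashValue: "userid:d18500:metric", // illustrative hash key
		// Neither RangeValuePrefix nor RangeValueStart is set, so the whole row is read.
	}}

	pages := 0
	err := index.QueryPages(ctx, queries, func(_ chunk.IndexQuery, batch chunk.ReadBatch) bool {
		// Each callback delivers one page; batch.Iterator() would walk its entries.
		pages++
		return true // return false to stop paging early
	})
	return pages, err
}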
type LegacySchemaConfig ¶
type LegacySchemaConfig struct {
StorageClient string // aws, gcp, etc.
// After midnight on this day, we start bucketing indexes by day instead of by
// hour. Only the day matters, not the time within the day.
DailyBucketsFrom flagext.DayValue
Base64ValuesFrom flagext.DayValue
V4SchemaFrom flagext.DayValue
V5SchemaFrom flagext.DayValue
V6SchemaFrom flagext.DayValue
V9SchemaFrom flagext.DayValue
BigtableColumnKeyFrom flagext.DayValue
// Config for the index & chunk tables.
OriginalTableName string
UsePeriodicTables bool
IndexTablesFrom flagext.DayValue
IndexTables PeriodicTableConfig
ChunkTablesFrom flagext.DayValue
ChunkTables PeriodicTableConfig
}
LegacySchemaConfig lets you configure schema via command-line flags
func (*LegacySchemaConfig) RegisterFlags ¶
func (cfg *LegacySchemaConfig) RegisterFlags(f *flag.FlagSet)
RegisterFlags adds the flags required to config this to the given FlagSet.
type MockStorage ¶
type MockStorage struct {
// contains filtered or unexported fields
}
MockStorage is a fake in-memory StorageClient.
func (*MockStorage) BatchWrite ¶
func (m *MockStorage) BatchWrite(ctx context.Context, batch WriteBatch) error
BatchWrite implements StorageClient.
func (*MockStorage) CreateTable ¶
func (m *MockStorage) CreateTable(_ context.Context, desc TableDesc) error
CreateTable implements StorageClient.
func (*MockStorage) DeleteTable ¶
func (m *MockStorage) DeleteTable(_ context.Context, name string) error
DeleteTable implements StorageClient.
func (*MockStorage) DescribeTable ¶
func (m *MockStorage) DescribeTable(_ context.Context, name string) (desc TableDesc, isActive bool, err error)
DescribeTable implements StorageClient.
func (*MockStorage) ListTables ¶
func (m *MockStorage) ListTables(_ context.Context) ([]string, error)
ListTables implements StorageClient.
func (*MockStorage) NewWriteBatch ¶
func (m *MockStorage) NewWriteBatch() WriteBatch
NewWriteBatch implements StorageClient.
func (*MockStorage) PutChunks ¶
func (m *MockStorage) PutChunks(_ context.Context, chunks []Chunk) error
PutChunks implements StorageClient.
func (*MockStorage) QueryPages ¶
func (m *MockStorage) QueryPages(ctx context.Context, queries []IndexQuery, callback func(IndexQuery, ReadBatch) (shouldContinue bool)) error
QueryPages implements StorageClient.
func (*MockStorage) UpdateTable ¶
func (m *MockStorage) UpdateTable(_ context.Context, _, desc TableDesc) error
UpdateTable implements StorageClient.
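A test sketch using MockStorage together with ExpectTables. NewMockStorage is an assumed constructor; only the MockStorage methods themselves are documented above.

import (
	"context"
	"testing"

	"github.com/cortexproject/cortex/pkg/chunk"
)

func TestMockTables(t *testing.T) {
	ctx := context.Background()

	// Assumption: the package exposes a NewMockStorage constructor for the in-memory client.
	client := chunk.NewMockStorage()

	desc := chunk.TableDesc{Name: "test_index_0"}
	if err := client.CreateTable(ctx, desc); err != nil {
		t.Fatal(err)
	}

	// MockStorage implements the TableClient methods, so it can be checked with ExpectTables.
	if err := chunk.ExpectTables(ctx, client, []chunk.TableDesc{desc}); err != nil {
		t.Fatal(err)
	}
}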
type ObjectAndIndexClient ¶
type ObjectAndIndexClient interface {
PutChunkAndIndex(ctx context.Context, c Chunk, index WriteBatch) error
}
ObjectAndIndexClient allows optimisations where the same client handles both the chunk objects and the index.
type ObjectClient ¶
type ObjectClient interface {
Stop()
PutChunks(ctx context.Context, chunks []Chunk) error
GetChunks(ctx context.Context, chunks []Chunk) ([]Chunk, error)
}
ObjectClient is for storing and retrieving chunks.
type PeriodConfig ¶
type PeriodConfig struct {
From DayTime `yaml:"from"` // used when working with config
IndexType string `yaml:"store"` // type of index client to use.
ObjectType string `yaml:"object_store"` // type of object client to use; if omitted, defaults to store.
Schema string `yaml:"schema"`
IndexTables PeriodicTableConfig `yaml:"index"`
ChunkTables PeriodicTableConfig `yaml:"chunks,omitempty"`
RowShards uint32 `yaml:"row_shards"`
}
PeriodConfig defines the schema and tables to use for a period of time
type PeriodicTableConfig ¶
type PeriodicTableConfig struct {
Prefix string `yaml:"prefix"`
Period time.Duration `yaml:"period,omitempty"`
Tags Tags `yaml:"tags,omitempty"`
}
PeriodicTableConfig is configuration for a set of time-sharded tables.
func (*PeriodicTableConfig) RegisterFlags ¶
func (cfg *PeriodicTableConfig) RegisterFlags(argPrefix, tablePrefix string, f *flag.FlagSet)
RegisterFlags adds the flags required to config this to the given FlagSet.
type ProvisionConfig ¶
type ProvisionConfig struct {
ProvisionedThroughputOnDemandMode bool `yaml:"provisioned_throughput_on_demand_mode"`
ProvisionedWriteThroughput int64 `yaml:"provisioned_write_throughput"`
ProvisionedReadThroughput int64 `yaml:"provisioned_read_throughput"`
InactiveThroughputOnDemandMode bool `yaml:"inactive_throughput_on_demand_mode"`
InactiveWriteThroughput int64 `yaml:"inactive_write_throughput"`
InactiveReadThroughput int64 `yaml:"inactive_read_throughput"`
WriteScale AutoScalingConfig `yaml:"write_scale"`
InactiveWriteScale AutoScalingConfig `yaml:"inactive_write_scale"`
InactiveWriteScaleLastN int64 `yaml:"inactive_write_scale_lastn"`
ReadScale AutoScalingConfig `yaml:"read_scale"`
InactiveReadScale AutoScalingConfig `yaml:"inactive_read_scale"`
InactiveReadScaleLastN int64 `yaml:"inactive_read_scale_lastn"`
}
ProvisionConfig holds config for provisioning capacity (on DynamoDB)
func (*ProvisionConfig) RegisterFlags ¶
func (cfg *ProvisionConfig) RegisterFlags(argPrefix string, f *flag.FlagSet)
RegisterFlags adds the flags required to config this to the given FlagSet.
type ReadBatch ¶
type ReadBatch interface {
Iterator() ReadBatchIterator
}
ReadBatch represents the results of a QueryPages.
type ReadBatchIterator ¶
ReadBatchIterator is an iterator over a ReadBatch.
type Schema ¶
type Schema interface {
// When doing a write, use this method to return the list of entries you should write to.
GetWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error)
// Should only be used with the seriesStore. TODO: Make seriesStore implement a different interface altogether.
// returns cache key string and []IndexEntry per bucket, matched in order
GetCacheKeysAndLabelWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]string, [][]IndexEntry, error)
GetChunkWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error)
// When doing a read, use these methods to return the list of entries you should query
GetReadQueriesForMetric(from, through model.Time, userID string, metricName string) ([]IndexQuery, error)
GetReadQueriesForMetricLabel(from, through model.Time, userID string, metricName string, labelName string) ([]IndexQuery, error)
GetReadQueriesForMetricLabelValue(from, through model.Time, userID string, metricName string, labelName string, labelValue string) ([]IndexQuery, error)
// If the query resulted in series IDs, use this method to find chunks.
GetChunksForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error)
}
Schema interface defines methods to calculate the hash and range keys needed to write or read chunks from the external index.
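A write-path sketch: given a Schema (construction not shown on this page) and a Chunk, compute the index entries that would be written for it. Using the chunk's external key as the chunk ID is an assumption here.

import (
	"github.com/cortexproject/cortex/pkg/chunk"
	"github.com/prometheus/prometheus/pkg/labels"
)

// writeEntriesFor returns the index entries a schema would generate for one chunk.
func writeEntriesFor(s chunk.Schema, c chunk.Chunk) ([]chunk.IndexEntry, error) {
	metricName := c.Metric.Get(labels.MetricName)
	// The chunk's external key doubles as the chunk ID stored in the index (assumption).
	return s.GetWriteEntries(c.From, c.Through, c.UserID, metricName, c.Metric, c.ExternalKey())
}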
type SchemaConfig ¶
type SchemaConfig struct {
Configs []PeriodConfig `yaml:"configs"`
// contains filtered or unexported fields
}
SchemaConfig contains the config for our chunk index schemas
func DefaultSchemaConfig ¶
func DefaultSchemaConfig(store, schema string, from model.Time) SchemaConfig
DefaultSchemaConfig creates a simple schema config for testing
func (SchemaConfig) ChunkTableFor ¶
func (cfg SchemaConfig) ChunkTableFor(t model.Time) (string, error)
ChunkTableFor calculates the chunk table shard for a given point in time.
func (*SchemaConfig) ForEachAfter ¶
func (cfg *SchemaConfig) ForEachAfter(t model.Time, f func(config *PeriodConfig))
ForEachAfter will call f() on every entry after t, splitting entries if necessary so there is an entry starting at t
func (*SchemaConfig) Load ¶
func (cfg *SchemaConfig) Load() error
Load the yaml file, or build the config from legacy command-line flags
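A sketch of the YAML shape Load consumes, unmarshalled inline here for illustration; the dates, store names, prefixes and periods are placeholders.

import (
	"github.com/cortexproject/cortex/pkg/chunk"
	yaml "gopkg.in/yaml.v2"
)

// exampleSchemaYAML mirrors the SchemaConfig/PeriodConfig fields documented above.
const exampleSchemaYAML = `
configs:
  - from: 2019-01-01
    store: aws
    object_store: s3
    schema: v9
    index:
      prefix: index_
      period: 168h
    chunks:
      prefix: chunk_
      period: 168h
`

func parseExampleSchema() (chunk.SchemaConfig, error) {
	var cfg chunk.SchemaConfig
	err := yaml.Unmarshal([]byte(exampleSchemaYAML), &cfg)
	return cfg, err
}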
func (SchemaConfig) PrintYaml ¶
func (cfg SchemaConfig) PrintYaml()
PrintYaml dumps the yaml to stdout, to aid in migration
func (*SchemaConfig) RegisterFlags ¶
func (cfg *SchemaConfig) RegisterFlags(f *flag.FlagSet)
RegisterFlags adds the flags required to config this to the given FlagSet.
type Store ¶
type Store interface {
Put(ctx context.Context, chunks []Chunk) error
PutOne(ctx context.Context, from, through model.Time, chunk Chunk) error
Get(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]Chunk, error)
// GetChunkRefs returns the un-loaded chunks and the fetchers to be used to load them. You can load each slice of chunks ([]Chunk),
// using the corresponding Fetcher (fetchers[i].FetchChunks(ctx, chunks[i], ...)).
GetChunkRefs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([][]Chunk, []*Fetcher, error)
LabelValuesForMetricName(ctx context.Context, from, through model.Time, metricName string, labelName string) ([]string, error)
Stop()
}
Store for chunks.
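A sketch of the two-step read path described by the interface comment: resolve chunk references first, then load each slice with its matching Fetcher. The FetchChunks arguments are elided in the comment above, so the loading call is left as a comment rather than spelled out.

import (
	"context"

	"github.com/cortexproject/cortex/pkg/chunk"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/pkg/labels"
)

// countChunkRefs resolves chunk references without loading any chunk data.
func countChunkRefs(ctx context.Context, store chunk.Store, from, through model.Time, matchers ...*labels.Matcher) (int, error) {
	refs, fetchers, err := store.GetChunkRefs(ctx, from, through, matchers...)
	if err != nil {
		return 0, err
	}

	total := 0
	for i := range refs {
		total += len(refs[i])
		// To load the data, pair each slice with its fetcher:
		//   fetchers[i].FetchChunks(ctx, refs[i], ...)
		// (the remaining arguments are elided above).
		_ = fetchers[i]
	}
	return total, nil
}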
type StoreConfig ¶
type StoreConfig struct {
ChunkCacheConfig cache.Config `yaml:"chunk_cache_config,omitempty"`
WriteDedupeCacheConfig cache.Config `yaml:"write_dedupe_cache_config,omitempty"`
MinChunkAge time.Duration `yaml:"min_chunk_age,omitempty"`
CacheLookupsOlderThan time.Duration `yaml:"cache_lookups_older_than,omitempty"`
// Limits query start time to be greater than now() - MaxLookBackPeriod, if set.
MaxLookBackPeriod time.Duration `yaml:"max_look_back_period"`
}
StoreConfig specifies config for a ChunkStore
func (*StoreConfig) RegisterFlags ¶
func (cfg *StoreConfig) RegisterFlags(f *flag.FlagSet)
RegisterFlags adds the flags required to config this to the given FlagSet
type TableClient ¶
type TableClient interface {
ListTables(ctx context.Context) ([]string, error)
CreateTable(ctx context.Context, desc TableDesc) error
DeleteTable(ctx context.Context, name string) error
DescribeTable(ctx context.Context, name string) (desc TableDesc, isActive bool, err error)
UpdateTable(ctx context.Context, current, expected TableDesc) error
}
TableClient is a client for telling Dynamo what to do with tables.
type TableDesc ¶
type TableDesc struct {
Name string
UseOnDemandIOMode bool
ProvisionedRead int64
ProvisionedWrite int64
Tags Tags
WriteScale AutoScalingConfig
ReadScale AutoScalingConfig
}
TableDesc describes a table.
type TableManager ¶
type TableManager struct {
// contains filtered or unexported fields
}
TableManager creates and manages the provisioned throughput on DynamoDB tables
func NewTableManager ¶
func NewTableManager(cfg TableManagerConfig, schemaCfg SchemaConfig, maxChunkAge time.Duration, tableClient TableClient, objectClient BucketClient) (*TableManager, error)
NewTableManager makes a new TableManager
func (*TableManager) SyncTables ¶
func (m *TableManager) SyncTables(ctx context.Context) error
SyncTables will calculate the tables expected to exist, create those that do not and update those that need it. It is exposed for testing.
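A sketch of driving a TableManager by hand for a single pass. It assumes a TableClient implementation is available (MockStorage works in tests) and that passing a nil BucketClient is acceptable when retention deletions are disabled; the nil behaviour is an assumption not stated on this page.

import (
	"context"
	"time"

	"github.com/cortexproject/cortex/pkg/chunk"
)

// syncOnce builds a TableManager and performs one synchronisation pass.
func syncOnce(ctx context.Context, tmCfg chunk.TableManagerConfig, schemaCfg chunk.SchemaConfig, tables chunk.TableClient) error {
	const maxChunkAge = 12 * time.Hour // illustrative value

	// nil BucketClient: assumed to be tolerated only while RetentionDeletesEnabled is false.
	tm, err := chunk.NewTableManager(tmCfg, schemaCfg, maxChunkAge, tables, nil)
	if err != nil {
		return err
	}
	return tm.SyncTables(ctx)
}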
type TableManagerConfig ¶
type TableManagerConfig struct {
// Master 'off-switch' for table capacity updates, e.g. when troubleshooting
ThroughputUpdatesDisabled bool `yaml:"throughput_updates_disabled"`
// Master 'on-switch' for table retention deletions
RetentionDeletesEnabled bool `yaml:"retention_deletes_enabled"`
// How far back tables will be kept before they are deleted
RetentionPeriod time.Duration `yaml:"retention_period"`
// Period with which the table manager will poll for tables.
DynamoDBPollInterval time.Duration `yaml:"dynamodb_poll_interval"`
// How far in advance of their first use tables will be created.
CreationGracePeriod time.Duration `yaml:"creation_grace_period"`
IndexTables ProvisionConfig `yaml:"index_tables_provisioning"`
ChunkTables ProvisionConfig `yaml:"chunk_tables_provisioning"`
}
TableManagerConfig holds config for a TableManager
func (*TableManagerConfig) RegisterFlags ¶
func (cfg *TableManagerConfig) RegisterFlags(f *flag.FlagSet)
RegisterFlags adds the flags required to config this to the given FlagSet.
type Tags ¶
Tags is a string-string map that implements flag.Value.
func (*Tags) UnmarshalYAML ¶
UnmarshalYAML implements yaml.Unmarshaler.
type WriteBatch ¶
WriteBatch represents a batch of writes.