Documentation ¶
Index ¶
- func CanApplyLastNResultsOptimization(start, end int64) bool
- func EstimatedJSONRowLen(fields []Field) int
- func MarshalDeleteTasksToJSON(tasks []*DeleteTask) []byte
- func MarshalFieldsToJSON(dst []byte, fields []Field) []byte
- func MarshalFieldsToLogfmt(dst []byte, fields []Field) []byte
- func MarshalTenantIDsToJSON(tenantIDs []TenantID) []byte
- func PutFields(f *Fields)
- func PutInsertRow(r *InsertRow)
- func PutJSONParser(p *JSONParser)
- func PutLogRows(lr *LogRows)
- func PutStreamTags(st *StreamTags)
- func PutSyslogParser(p *SyslogParser)
- func RenameField(fields []Field, oldNames []string, newName string)
- func TryParseTimestampRFC3339Nano(s string) (int64, bool)
- type BlockColumn
- type DataBlock
- func (db *DataBlock) GetColumnByName(name string) *BlockColumn
- func (db *DataBlock) GetTimestamps(dst []int64) ([]int64, bool)
- func (db *DataBlock) Marshal(dst []byte) []byte
- func (db *DataBlock) Reset()
- func (db *DataBlock) RowsCount() int
- func (db *DataBlock) UnmarshalInplace(src []byte, valuesBuf []string) ([]byte, []string, error)
- type DatadbStats
- type DeleteTask
- type Field
- type Fields
- type Filter
- type IndexdbStats
- type InsertRow
- type JSONParser
- type LogRows
- func (lr *LogRows) ForEachRow(callback func(streamHash uint64, r *InsertRow))
- func (lr *LogRows) GetRowString(idx int) string
- func (lr *LogRows) MustAdd(tenantID TenantID, timestamp int64, fields []Field, streamFieldsLen int)
- func (lr *LogRows) MustAddInsertRow(r *InsertRow)
- func (lr *LogRows) NeedFlush() bool
- func (lr *LogRows) Reset()
- func (lr *LogRows) ResetKeepSettings()
- func (lr *LogRows) RowsCount() int
- type NetQueryRunner
- type PartitionStats
- type Query
- func (q *Query) AddCountByTimePipe(step, off int64, fields []string)
- func (q *Query) AddExtraFilters(extraFilters *Filter)
- func (q *Query) AddFacetsPipe(limit, maxValuesPerField, maxValueLen int, keepConstFields bool)
- func (q *Query) AddPipeOffsetLimit(offset, limit uint64)
- func (q *Query) AddPipeSortByTimeDesc()
- func (q *Query) AddTimeFilter(start, end int64)
- func (q *Query) CanLiveTail() bool
- func (q *Query) CanReturnLastNResults() bool
- func (q *Query) Clone(timestamp int64) *Query
- func (q *Query) CloneWithTimeFilter(timestamp, start, end int64) *Query
- func (q *Query) DropAllPipes()
- func (q *Query) GetConcurrency() int
- func (q *Query) GetFilterTimeRange() (int64, int64)
- func (q *Query) GetLastNResultsQuery() (qOpt *Query, offset uint64, limit uint64)
- func (q *Query) GetParallelReaders(defaultParallelReaders int) int
- func (q *Query) GetStatsLabels() ([]string, error)
- func (q *Query) GetStatsLabelsAddGroupingByTime(step int64) ([]string, error)
- func (q *Query) GetTimestamp() int64
- func (q *Query) HasGlobalTimeFilter() bool
- func (q *Query) String() string
- type QueryContext
- type QueryStats
- type RunNetQueryFunc
- type Storage
- func (s *Storage) DebugFlush()
- func (s *Storage) DeleteActiveTasks(_ context.Context) ([]*DeleteTask, error)
- func (s *Storage) DeleteRunTask(_ context.Context, taskID string, timestamp int64, tenantIDs []TenantID, ...) error
- func (s *Storage) DeleteStopTask(ctx context.Context, taskID string) error
- func (s *Storage) EnableLogNewStreams(seconds int)
- func (s *Storage) GetFieldNames(qctx *QueryContext) ([]ValueWithHits, error)
- func (s *Storage) GetFieldValues(qctx *QueryContext, fieldName string, limit uint64) ([]ValueWithHits, error)
- func (s *Storage) GetStreamFieldNames(qctx *QueryContext) ([]ValueWithHits, error)
- func (s *Storage) GetStreamFieldValues(qctx *QueryContext, fieldName string, limit uint64) ([]ValueWithHits, error)
- func (s *Storage) GetStreamIDs(qctx *QueryContext, limit uint64) ([]ValueWithHits, error)
- func (s *Storage) GetStreams(qctx *QueryContext, limit uint64) ([]ValueWithHits, error)
- func (s *Storage) GetTenantIDs(ctx context.Context, start, end int64) ([]TenantID, error)
- func (s *Storage) IsReadOnly() bool
- func (s *Storage) MustAddRows(lr *LogRows)
- func (s *Storage) MustClose()
- func (s *Storage) MustForceMerge(partitionNamePrefix string)
- func (s *Storage) PartitionAttach(name string) error
- func (s *Storage) PartitionDetach(name string) error
- func (s *Storage) PartitionList() []string
- func (s *Storage) PartitionSnapshotCreate(name string) (string, error)
- func (s *Storage) PartitionSnapshotList() []string
- func (s *Storage) RunQuery(qctx *QueryContext, writeBlock WriteDataBlockFunc) error
- func (s *Storage) UpdateStats(ss *StorageStats)
- type StorageConfig
- type StorageStats
- type StreamFilter
- type StreamTags
- func (st *StreamTags) Add(name, value string)
- func (st *StreamTags) Len() int
- func (st *StreamTags) Less(i, j int) bool
- func (st *StreamTags) MarshalCanonical(dst []byte) []byte
- func (st *StreamTags) Reset()
- func (st *StreamTags) String() string
- func (st *StreamTags) Swap(i, j int)
- func (st *StreamTags) UnmarshalCanonical(src []byte) ([]byte, error)
- type SyslogParser
- type TenantID
- type TimeFormatter
- type ValueWithHits
- type WriteDataBlockFunc
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CanApplyLastNResultsOptimization ¶ added in v1.34.0
CanApplyLastNResultsOptimization returns true if it makes sense to apply the 'last N' optimization for a query on the time range [start, end].
func EstimatedJSONRowLen ¶
EstimatedJSONRowLen returns an approximate length of the log entry with the given fields if represented as JSON.
The calculation logic must stay in sync with block.uncompressedSizeBytes() in block.go. If you change logic here, update block.uncompressedSizeBytes() accordingly and vice versa.
func MarshalDeleteTasksToJSON ¶ added in v1.38.0
func MarshalDeleteTasksToJSON(tasks []*DeleteTask) []byte
MarshalDeleteTasksToJSON marshals tasks into a JSON array and returns the result
func MarshalFieldsToJSON ¶
MarshalFieldsToJSON appends JSON-marshaled fields to dst and returns the result.
func MarshalFieldsToLogfmt ¶
MarshalFieldsToLogfmt appends logfmt-marshaled fields to dst and returns the result.
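Both marshal helpers append to dst and return the possibly reallocated result, so a single buffer can be reused across calls. A minimal sketch (the logstorage import path is assumed here and in the examples below; adjust it to the module that hosts this package):

package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" // import path assumed
)

func main() {
	fields := []logstorage.Field{
		{Name: "_msg", Value: "connection refused"},
		{Name: "level", Value: "error"},
	}

	var buf []byte

	// Append the JSON representation of fields to buf.
	buf = logstorage.MarshalFieldsToJSON(buf[:0], fields)
	fmt.Printf("json: %s\n", buf)

	// Reuse the same buffer for the logfmt representation.
	buf = logstorage.MarshalFieldsToLogfmt(buf[:0], fields)
	fmt.Printf("logfmt: %s\n", buf)
}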
func MarshalTenantIDsToJSON ¶ added in v1.38.0
MarshalTenantIDsToJSON returns JSON representation of the given tenantIDs
func PutFields ¶ added in v1.38.0
func PutFields(f *Fields)
PutFields returns f to the pool.
f cannot be used after returning to the pool. Use GetFields() for obtaining an empty Fields from the pool.
func PutInsertRow ¶
func PutInsertRow(r *InsertRow)
PutInsertRow returns r to the pool, so it could be reused via GetInsertRow.
func PutJSONParser ¶
func PutJSONParser(p *JSONParser)
PutJSONParser returns the parser to the pool.
The parser cannot be used after returning to the pool.
func PutSyslogParser ¶
func PutSyslogParser(p *SyslogParser)
PutSyslogParser returns the syslog parser p to the pool.
p cannot be used after returning to the pool.
func RenameField ¶
RenameField renames to newName the first non-empty field in fields whose name is listed in oldNames.
func TryParseTimestampRFC3339Nano ¶
TryParseTimestampRFC3339Nano parses s as RFC3339 with optional nanoseconds part and timezone offset and returns unix timestamp in nanoseconds.
If s doesn't contain a timezone offset, then the local timezone is used.
The returned timestamp can be negative if s refers to a time before the year 1970.
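A short sketch of converting the returned Unix nanoseconds into a time.Time (import path assumed as above):

package main

import (
	"fmt"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" // import path assumed
)

func main() {
	nsecs, ok := logstorage.TryParseTimestampRFC3339Nano("2024-05-01T10:20:30.123456789Z")
	if !ok {
		fmt.Println("not a valid RFC3339 timestamp")
		return
	}
	// nsecs holds Unix nanoseconds; time.Unix converts it back to time.Time.
	fmt.Println(time.Unix(0, nsecs).UTC())
}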
Types ¶
type BlockColumn ¶
type BlockColumn struct {
// Name is the column name
Name string
// Values contains the column values
Values []string
}
BlockColumn is a single column of a block of data
type DataBlock ¶
type DataBlock struct {
// Columns represents columns in the data block.
Columns []BlockColumn
}
DataBlock is a single block of data
func (*DataBlock) GetColumnByName ¶ added in v1.31.0
func (db *DataBlock) GetColumnByName(name string) *BlockColumn
GetColumnByName returns column with the given name from db.
nil is returned if there is no such column.
func (*DataBlock) GetTimestamps ¶
GetTimestamps appends _time column values from db to dst and returns the result.
It returns false if db doesn't have _time column or this column has invalid timestamps.
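In practice DataBlock values arrive through a WriteDataBlockFunc callback; the sketch below builds one in place only to show how columns are looked up (import path assumed as above):

package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" // import path assumed
)

func main() {
	db := &logstorage.DataBlock{
		Columns: []logstorage.BlockColumn{
			{Name: "level", Values: []string{"info", "error", "info"}},
			{Name: "_msg", Values: []string{"started", "failed", "stopped"}},
		},
	}
	fmt.Println("rows:", db.RowsCount())

	// GetColumnByName returns nil if the column is missing.
	if c := db.GetColumnByName("level"); c != nil {
		fmt.Println("level values:", c.Values)
	}
}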
type DatadbStats ¶
type DatadbStats struct {
// InmemoryMergesCount is the number of inmemory merges performed in the given datadb.
InmemoryMergesCount uint64
// ActiveInmemoryMerges is the number of currently active inmemory merges performed by the given datadb.
ActiveInmemoryMerges uint64
// InmemoryRowsMerged is the number of rows merged to inmemory parts.
InmemoryRowsMerged uint64
// SmallMergesCount is the number of small file merges performed in the given datadb.
SmallMergesCount uint64
// ActiveSmallMerges is the number of currently active small file merges performed by the given datadb.
ActiveSmallMerges uint64
// SmallRowsMerged is the number of rows merged to small parts.
SmallRowsMerged uint64
// BigMergesCount is the number of big file merges performed in the given datadb.
BigMergesCount uint64
// ActiveBigMerges is the number of currently active big file merges performed by the given datadb.
ActiveBigMerges uint64
// BigRowsMerged is the number of rows merged to big parts.
BigRowsMerged uint64
// PendingRows is the number of rows, which weren't flushed to searchable part yet.
PendingRows uint64
// InmemoryRowsCount is the number of rows, which weren't flushed to disk yet.
InmemoryRowsCount uint64
// SmallPartRowsCount is the number of rows stored on disk in small parts.
SmallPartRowsCount uint64
// BigPartRowsCount is the number of rows stored on disk in big parts.
BigPartRowsCount uint64
// InmemoryParts is the number of in-memory parts, which weren't flushed to disk yet.
InmemoryParts uint64
// SmallParts is the number of file-based small parts stored on disk.
SmallParts uint64
// BigParts is the number of file-based big parts stored on disk.
BigParts uint64
// InmemoryBlocks is the number of in-memory blocks, which weren't flushed to disk yet.
InmemoryBlocks uint64
// SmallPartBlocks is the number of file-based small blocks stored on disk.
SmallPartBlocks uint64
// BigPartBlocks is the number of file-based big blocks stored on disk.
BigPartBlocks uint64
// CompressedInmemorySize is the size of compressed data stored in memory.
CompressedInmemorySize uint64
// CompressedSmallPartSize is the size of compressed small parts data stored on disk.
CompressedSmallPartSize uint64
// CompressedBigPartSize is the size of compressed big data stored on disk.
CompressedBigPartSize uint64
// UncompressedInmemorySize is the size of uncompressed data stored in memory.
UncompressedInmemorySize uint64
// UncompressedSmallPartSize is the size of uncompressed small data stored on disk.
UncompressedSmallPartSize uint64
// UncompressedBigPartSize is the size of uncompressed big data stored on disk.
UncompressedBigPartSize uint64
}
DatadbStats contains various stats for datadb.
func (*DatadbStats) RowsCount ¶
func (s *DatadbStats) RowsCount() uint64
RowsCount returns the number of rows stored in datadb.
type DeleteTask ¶ added in v1.38.0
type DeleteTask struct {
// TaskID is the id of the task
TaskID string `json:"task_id"`
// TenantIDs are tenant ids for the task
TenantIDs []TenantID `json:"tenant_ids"`
// Filter is the filter used for logs' deletion; Logs matching the given filter are deleted
Filter string `json:"filter"`
// StartTime is the time when the task has been created
StartTime time.Time `json:"start_time"`
// contains filtered or unexported fields
}
DeleteTask describes a task for logs' deletion.
func UnmarshalDeleteTasksFromJSON ¶ added in v1.38.0
func UnmarshalDeleteTasksFromJSON(data []byte) ([]*DeleteTask, error)
UnmarshalDeleteTasksFromJSON unmarshals DeleteTask slice from JSON array at data
func (*DeleteTask) String ¶ added in v1.38.0
func (dt *DeleteTask) String() string
String returns string representation for the dt
type Field ¶
type Field struct {
// Name is the name of the field
Name string
// Value is the value of the field
Value string
}
Field is a single field for the log entry.
func SkipLeadingFieldsWithoutValues ¶
SkipLeadingFieldsWithoutValues skips leading fields without values.
type Fields ¶ added in v1.38.0
type Fields struct {
// Fields is a slice of fields
Fields []Field
}
Fields holds a slice of Field items
func GetFields ¶ added in v1.38.0
func GetFields() *Fields
GetFields returns an empty Fields from the pool.
Pass the returned Fields to PutFields() when it is no longer needed.
func (*Fields) ClearUpToCapacity ¶ added in v1.41.0
func (f *Fields) ClearUpToCapacity()
ClearUpToCapacity clears f.Fields up to its capacity.
This function is useful for making sure f.Fields do not reference underlying byte slices, so those buffers can be freed by the Go GC.
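A minimal sketch of the Fields pool lifecycle, assuming the import path from the earlier examples; whether ClearUpToCapacity is needed before PutFields depends on how long the referenced buffers must stay alive:

package main

import (
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" // import path assumed
)

func main() {
	f := logstorage.GetFields()

	// Populate the reusable slice.
	f.Fields = append(f.Fields[:0], logstorage.Field{Name: "level", Value: "info"})

	// ... use f.Fields ...

	// Drop references to the underlying byte slices before pooling,
	// so the referenced buffers can be collected by the GC.
	f.ClearUpToCapacity()
	logstorage.PutFields(f)
}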
type Filter ¶
type Filter struct {
// contains filtered or unexported fields
}
Filter represents LogsQL filter
See https://docs.victoriametrics.com/victorialogs/logsql/#filters
func ParseFilter ¶
ParseFilter parses LogsQL filter
See https://docs.victoriametrics.com/victorialogs/logsql/#filters
type IndexdbStats ¶
type IndexdbStats struct {
// StreamsCreatedTotal is the number of log streams created since the indexdb initialization.
StreamsCreatedTotal uint64
// IndexdbSizeBytes is the size of data in indexdb.
IndexdbSizeBytes uint64
// IndexdbItemsCount is the number of items in indexdb.
IndexdbItemsCount uint64
// IndexdbBlocksCount is the number of blocks in indexdb.
IndexdbBlocksCount uint64
// IndexdbPartsCount is the number of parts in indexdb.
IndexdbPartsCount uint64
// IndexdbPendingItems is the number of pending items in indexdb before they are merged into a part.
IndexdbPendingItems uint64
// IndexdbActiveFileMerges is the number of currently active file merges in indexdb.
IndexdbActiveFileMerges uint64
// IndexdbActiveInmemoryMerges is the number of currently active in-memory merges in indexdb.
IndexdbActiveInmemoryMerges uint64
// IndexdbFileMergesCount is the number of file merges performed in indexdb.
IndexdbFileMergesCount uint64
// IndexdbInmemoryMergesCount is the number of in-memory merges performed in indexdb.
IndexdbInmemoryMergesCount uint64
// IndexdbFileItemsMerged is the number of items merged into file parts in indexdb.
IndexdbFileItemsMerged uint64
// IndexdbInmemoryItemsMerged is the number of items merged into in-memory parts in indexdb.
IndexdbInmemoryItemsMerged uint64
}
IndexdbStats contains indexdb stats
type InsertRow ¶
type InsertRow struct {
TenantID TenantID
StreamTagsCanonical string
Timestamp int64
Fields []Field
}
InsertRow represents a row to insert into VictoriaLogs via native protocol.
func GetInsertRow ¶
func GetInsertRow() *InsertRow
GetInsertRow returns InsertRow from a pool.
Pass the returned row to PutInsertRow when it is no longer needed, so it could be reused.
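A minimal sketch of preparing an InsertRow from the pool (import path assumed as above); the canonical stream tags are produced with StreamTags.MarshalCanonical, which is documented further below:

package main

import (
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" // import path assumed
)

func main() {
	// Build the canonical stream tags representation first.
	var st logstorage.StreamTags
	st.Add("job", "nginx")
	st.Add("instance", "host-1")
	canonical := st.MarshalCanonical(nil)

	r := logstorage.GetInsertRow()
	r.TenantID = logstorage.TenantID{AccountID: 0, ProjectID: 0}
	r.StreamTagsCanonical = string(canonical)
	r.Timestamp = time.Now().UnixNano()
	r.Fields = append(r.Fields[:0],
		logstorage.Field{Name: "_msg", Value: "request completed"},
		logstorage.Field{Name: "status", Value: "200"},
	)

	// ... pass r to LogRows.MustAddInsertRow ...

	logstorage.PutInsertRow(r)
}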
type JSONParser ¶
type JSONParser struct {
// Fields contains the parsed JSON line after Parse() call
//
// The Fields are valid until the next call to ParseLogMessage()
// or until the parser is returned to the pool with PutJSONParser() call.
Fields []Field
// contains filtered or unexported fields
}
JSONParser parses a single JSON log message into Fields.
See https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model
Use GetJSONParser() for obtaining the parser.
func GetJSONParser ¶
func GetJSONParser() *JSONParser
GetJSONParser returns JSONParser ready to parse JSON lines.
Return the parser to the pool when it is no longer needed by calling PutJSONParser().
func (*JSONParser) ParseLogMessage ¶
func (p *JSONParser) ParseLogMessage(msg []byte) error
ParseLogMessage parses the given JSON log message msg into p.Fields.
The p.Fields remains valid until the next call to ParseLogMessage() or PutJSONParser().
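A minimal parsing sketch (import path assumed as above):

package main

import (
	"fmt"
	"log"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" // import path assumed
)

func main() {
	p := logstorage.GetJSONParser()
	defer logstorage.PutJSONParser(p)

	if err := p.ParseLogMessage([]byte(`{"level":"error","msg":"disk full"}`)); err != nil {
		log.Fatalf("cannot parse JSON log line: %s", err)
	}

	// p.Fields is valid only until the next ParseLogMessage or PutJSONParser call,
	// so copy names and values if they must outlive the parser.
	for _, f := range p.Fields {
		fmt.Printf("%s=%q\n", f.Name, f.Value)
	}
}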
type LogRows ¶
type LogRows struct {
// contains filtered or unexported fields
}
LogRows holds a set of rows needed for Storage.MustAddRows
LogRows must be obtained via GetLogRows()
func GetLogRows ¶
func GetLogRows(streamFields, ignoreFields, decolorizeFields []string, extraFields []Field, defaultMsgValue string) *LogRows
GetLogRows returns LogRows from the pool for the given streamFields.
streamFields is a set of fields, which must be associated with the stream.
ignoreFields is a set of fields, which must be ignored during data ingestion. ignoreFields entries may end with '*'; such entries match any field name starting with the prefix before the '*'.
decolorizeFields is a set of fields, which must be cleared from ANSI color escape sequences. decolorizeFields entries may end with '*'; such entries match any field name starting with the prefix before the '*'.
extraFields is a set of fields, which must be added to all the logs passed to MustAdd().
defaultMsgValue is the default value to store in non-existing or empty _msg.
Return it to the pool with PutLogRows() when it is no longer needed.
func (*LogRows) ForEachRow ¶
ForEachRow calls callback for every row stored in the lr.
func (*LogRows) GetRowString ¶
GetRowString returns string representation of the row with the given idx.
func (*LogRows) MustAdd ¶
MustAdd adds a log entry with the given args to lr.
If streamFieldsLen >= 0, then the given number of initial fields are used as log stream fields instead of the pre-configured stream fields from GetLogRows().
It is OK to modify the args after returning from the function, since lr copies all the args to internal data.
Log entries are dropped with a warning message in the following cases:
- if there are too many log fields
- if log field names are too long
- if the total length of the log entry is too long
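A minimal sketch of filling LogRows (import path assumed as above); passing a negative streamFieldsLen keeps the stream fields configured in GetLogRows, per the MustAdd description above:

package main

import (
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" // import path assumed
)

func main() {
	// Stream identity is built from the "host" and "app" fields; nothing is
	// ignored or decolorized, and an empty _msg defaults to "missing _msg".
	lr := logstorage.GetLogRows([]string{"host", "app"}, nil, nil, nil, "missing _msg")
	defer logstorage.PutLogRows(lr)

	fields := []logstorage.Field{
		{Name: "host", Value: "host-1"},
		{Name: "app", Value: "nginx"},
		{Name: "_msg", Value: "GET /index.html 200"},
	}

	// fields may be reused after the call, since lr copies all the args.
	lr.MustAdd(logstorage.TenantID{}, time.Now().UnixNano(), fields, -1)

	if lr.NeedFlush() {
		// flush lr to the storage via Storage.MustAddRows and reset it
	}
}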
func (*LogRows) MustAddInsertRow ¶
MustAddInsertRow adds r to lr.
func (*LogRows) NeedFlush ¶
NeedFlush returns true if lr contains too much data, so it must be flushed to the storage.
func (*LogRows) Reset ¶
func (lr *LogRows) Reset()
Reset resets lr with all its settings.
Call ResetKeepSettings() for resetting lr without resetting its settings.
func (*LogRows) ResetKeepSettings ¶
func (lr *LogRows) ResetKeepSettings()
ResetKeepSettings resets rows stored in lr, while keeping its settings passed to GetLogRows().
type NetQueryRunner ¶
type NetQueryRunner struct {
// contains filtered or unexported fields
}
NetQueryRunner is a runner for distributed query.
func NewNetQueryRunner ¶
func NewNetQueryRunner(qctx *QueryContext, runNetQuery RunNetQueryFunc, writeNetBlock WriteDataBlockFunc) (*NetQueryRunner, error)
NewNetQueryRunner creates a new NetQueryRunner for the given qctx.
runNetQuery is used for running distributed query. qctx results are sent to writeNetBlock.
func (*NetQueryRunner) Run ¶
func (nqr *NetQueryRunner) Run(ctx context.Context, concurrency int, netSearch func(stopCh <-chan struct{}, q *Query, writeBlock WriteDataBlockFunc) error) error
Run runs the nqr query.
The concurrency limits the number of concurrent goroutines, which process the query results at the local host.
netSearch must execute the given query q at remote storage nodes and pass results to writeBlock.
type PartitionStats ¶
type PartitionStats struct {
DatadbStats
IndexdbStats
}
PartitionStats contains stats for the partition.
type Query ¶
type Query struct {
// contains filtered or unexported fields
}
Query represents LogsQL query.
func ParseQueryAtTimestamp ¶
ParseQueryAtTimestamp parses s in the context of the given timestamp.
E.g. _time:duration filters are adjusted according to the provided timestamp as _time:[timestamp-duration, timestamp].
func ParseStatsQuery ¶
ParseStatsQuery parses LogsQL query s at the given timestamp with the needed stats query checks.
func (*Query) AddCountByTimePipe ¶
AddCountByTimePipe adds '| stats by (_time:step offset off, field1, ..., fieldN) count() hits' to the end of q.
func (*Query) AddExtraFilters ¶
AddExtraFilters adds extraFilters to q
func (*Query) AddFacetsPipe ¶
AddFacetsPipe adds `facets <limit> max_values_per_field <maxValuesPerField> max_value_len <maxValueLen> <keepConstFields>` to the end of q.
func (*Query) AddPipeOffsetLimit ¶ added in v1.28.0
AddPipeOffsetLimit adds `| offset <offset> | limit <limit>` pipes to q.
See https://docs.victoriametrics.com/victorialogs/logsql/#offset-pipe and https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe
func (*Query) AddPipeSortByTimeDesc ¶ added in v1.27.0
func (q *Query) AddPipeSortByTimeDesc()
AddPipeSortByTimeDesc adds `| sort (_time) desc` pipe to q.
func (*Query) AddTimeFilter ¶
AddTimeFilter adds global filter _time:[start ... end] to q.
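A sketch of tightening a parsed query; the ParseQueryAtTimestamp signature is assumed to be ParseQueryAtTimestamp(s string, timestamp int64) (*Query, error), so verify it against the package source (import path assumed as above):

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" // import path assumed
)

func main() {
	now := time.Now().UnixNano()

	// Signature assumed: ParseQueryAtTimestamp(s string, timestamp int64) (*Query, error).
	q, err := logstorage.ParseQueryAtTimestamp(`error`, now)
	if err != nil {
		log.Fatalf("cannot parse query: %s", err)
	}

	// Restrict the query to the last hour, sort by _time desc and keep the first 100 matches.
	q.AddTimeFilter(now-int64(time.Hour), now)
	q.AddPipeSortByTimeDesc()
	q.AddPipeOffsetLimit(0, 100)

	fmt.Println(q.String())
}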
func (*Query) CanLiveTail ¶
CanLiveTail returns true if q can be used in live tailing
func (*Query) CanReturnLastNResults ¶
CanReturnLastNResults returns true if time range filter at q can be adjusted for returning the last N results with the biggest _time values.
func (*Query) CloneWithTimeFilter ¶
CloneWithTimeFilter clones q at the given timestamp and adds _time:[start, end] filter to the cloned q.
func (*Query) DropAllPipes ¶
func (q *Query) DropAllPipes()
DropAllPipes drops all the pipes from q.
func (*Query) GetConcurrency ¶
GetConcurrency returns concurrency for the q.
See https://docs.victoriametrics.com/victorialogs/logsql/#query-options
func (*Query) GetFilterTimeRange ¶
GetFilterTimeRange returns filter time range for the given q.
func (*Query) GetLastNResultsQuery ¶ added in v1.27.0
GetLastNResultsQuery() returns a query for optimized querying of the last <limit> results with the biggest _time values with an optional <offset>.
The returned query is nil if q cannot be used for optimized querying of the last N results.
func (*Query) GetParallelReaders ¶ added in v1.34.0
GetParallelReaders returns the number of parallel readers to use for executing the given query.
func (*Query) GetStatsLabels ¶ added in v1.40.0
GetStatsLabels returns stats labels from q for the /select/logsql/stats_query endpoint.
The remaining fields are considered metrics.
func (*Query) GetStatsLabelsAddGroupingByTime ¶ added in v1.40.0
GetStatsLabelsAddGroupingByTime returns stats labels from q for the /select/logsql/stats_query and /select/logsql/stats_query_range endpoints.
If step > 0, then _time:step is added to the last `stats by (...)` pipe at q.
func (*Query) GetTimestamp ¶
GetTimestamp returns timestamp context for the given q, which was passed to ParseQueryAtTimestamp().
func (*Query) HasGlobalTimeFilter ¶
HasGlobalTimeFilter returns true when query contains a global time filter.
type QueryContext ¶ added in v1.31.0
type QueryContext struct {
// Context is the context for executing the Query.
Context context.Context
// QueryStats is query stats, which is updated after Query execution.
QueryStats *QueryStats
// TenantIDs is the list of tenant ids to Query.
TenantIDs []TenantID
// Query is the query to execute.
Query *Query
// AllowPartialResponse indicates whether to allow partial response. This flag is used only in cluster setup when vlselect queries vlstorage nodes.
AllowPartialResponse bool
// HiddenFieldsFilters is an optional list of field filters, which must be hidden during query execution.
//
// The list may contain full field names and field prefixes ending with *.
// Prefixes match all the fields starting with the given prefix.
HiddenFieldsFilters []string
// contains filtered or unexported fields
}
QueryContext is used for executing the query passed to NewQueryContext()
func NewQueryContext ¶ added in v1.31.0
func NewQueryContext(ctx context.Context, qs *QueryStats, tenantIDs []TenantID, q *Query, allowPartialResponse bool, hiddenFieldsFilters []string) *QueryContext
NewQueryContext returns new context for the given query.
func (*QueryContext) QueryDurationNsecs ¶ added in v1.31.0
func (qctx *QueryContext) QueryDurationNsecs() int64
QueryDurationNsecs returns the duration in nanoseconds since the NewQueryContext call.
func (*QueryContext) WithContext ¶ added in v1.31.0
func (qctx *QueryContext) WithContext(ctx context.Context) *QueryContext
WithContext returns new QueryContext with the given ctx, while preserving other fields from qctx.
func (*QueryContext) WithContextAndQuery ¶ added in v1.31.0
func (qctx *QueryContext) WithContextAndQuery(ctx context.Context, q *Query) *QueryContext
WithContextAndQuery returns new QueryContext with the given ctx and q, while preserving other fields from qctx.
func (*QueryContext) WithQuery ¶ added in v1.31.0
func (qctx *QueryContext) WithQuery(q *Query) *QueryContext
WithQuery returns new QueryContext with the given q, while preserving other fields from qctx.
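A minimal sketch of assembling a QueryContext for an already-parsed query (import path assumed as above; the nil query is used only to keep the example self-contained):

package main

import (
	"context"
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" // import path assumed
)

// buildQueryContext assembles a QueryContext for the default tenant (0:0)
// with partial responses disabled and no hidden field filters.
func buildQueryContext(ctx context.Context, qs *logstorage.QueryStats, q *logstorage.Query) *logstorage.QueryContext {
	tenantIDs := []logstorage.TenantID{{AccountID: 0, ProjectID: 0}}
	return logstorage.NewQueryContext(ctx, qs, tenantIDs, q, false, nil)
}

func main() {
	var qs logstorage.QueryStats
	// In real code q comes from ParseQueryAtTimestamp (see the earlier sketch).
	qctx := buildQueryContext(context.Background(), &qs, nil)
	fmt.Println("elapsed nanoseconds:", qctx.QueryDurationNsecs())
}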
type QueryStats ¶ added in v1.31.0
type QueryStats struct {
// BytesReadColumnsHeaders is the total number of columns header bytes read from disk during the search.
BytesReadColumnsHeaders uint64
// BytesReadColumnsHeaderIndexes is the total number of columns header index bytes read from disk during the search.
BytesReadColumnsHeaderIndexes uint64
// BytesReadBloomFilters is the total number of bloom filter bytes read from disk during the search.
BytesReadBloomFilters uint64
// BytesReadValues is the total number of values bytes read from disk during the search.
BytesReadValues uint64
// BytesReadTimestamps is the total number of timestamps bytes read from disk during the search.
BytesReadTimestamps uint64
// BytesReadBlockHeaders is the total number of headers bytes read from disk during the search.
BytesReadBlockHeaders uint64
// BlocksProcessed is the number of data blocks processed during query execution.
BlocksProcessed uint64
// RowsProcessed is the number of log rows processed during query execution.
RowsProcessed uint64
// RowsFound is the number of rows found by the query.
RowsFound uint64
// ValuesRead is the number of log field values read during query execution.
ValuesRead uint64
// TimestampsRead is the number of timestamps read during query execution.
TimestampsRead uint64
// BytesProcessedUncompressedValues is the total number of uncompressed values bytes processed during the search.
BytesProcessedUncompressedValues uint64
}
QueryStats contains various query execution stats.
func (*QueryStats) CreateDataBlock ¶ added in v1.31.0
func (qs *QueryStats) CreateDataBlock(queryDurationNsecs int64) *DataBlock
CreateDataBlock creates a DataBlock from qs.
func (*QueryStats) GetBytesReadTotal ¶ added in v1.31.0
func (qs *QueryStats) GetBytesReadTotal() uint64
GetBytesReadTotal returns the total number of bytes read, which is tracked by qs.
func (*QueryStats) UpdateAtomic ¶ added in v1.31.0
func (qs *QueryStats) UpdateAtomic(src *QueryStats)
UpdateAtomic adds src to qs in an atomic manner.
func (*QueryStats) UpdateFromDataBlock ¶ added in v1.31.0
func (qs *QueryStats) UpdateFromDataBlock(db *DataBlock) error
UpdateFromDataBlock adds query stats from db to qs.
type RunNetQueryFunc ¶
type RunNetQueryFunc func(qctx *QueryContext, writeBlock WriteDataBlockFunc) error
RunNetQueryFunc must run qctx and pass the query results to writeBlock.
type Storage ¶
type Storage struct {
// contains filtered or unexported fields
}
Storage is the storage for log entries.
func MustOpenStorage ¶
func MustOpenStorage(path string, cfg *StorageConfig) *Storage
MustOpenStorage opens Storage at the given path.
MustClose must be called on the returned Storage when it is no longer needed.
func (*Storage) DebugFlush ¶
func (s *Storage) DebugFlush()
DebugFlush flushes all the buffered rows, so they become visible for search.
This function is for debugging and testing purposes only, since it is slow.
func (*Storage) DeleteActiveTasks ¶ added in v1.38.0
func (s *Storage) DeleteActiveTasks(_ context.Context) ([]*DeleteTask, error)
DeleteActiveTasks returns currently running active delete tasks, which were started via DeleteRunTask().
func (*Storage) DeleteRunTask ¶ added in v1.38.0
func (s *Storage) DeleteRunTask(_ context.Context, taskID string, timestamp int64, tenantIDs []TenantID, f *Filter) error
DeleteRunTask starts deletion of logs according to the given filter f for the given tenantIDs.
The taskID must contain a unique id of the task. It is used for tracking the task in the list returned by DeleteActiveTasks(). The timestamp must contain the timestamp in seconds when the task was started.
func (*Storage) DeleteStopTask ¶ added in v1.38.0
DeleteStopTask stops the delete task with the given taskID.
It waits until the task is stopped before returning. If there is no task with the given taskID, then the function returns immediately.
func (*Storage) EnableLogNewStreams ¶ added in v1.37.0
EnableLogNewStreams enables logging of newly ingested streams for the given number of seconds.
func (*Storage) GetFieldNames ¶
func (s *Storage) GetFieldNames(qctx *QueryContext) ([]ValueWithHits, error)
GetFieldNames returns field names for the given qctx.
func (*Storage) GetFieldValues ¶
func (s *Storage) GetFieldValues(qctx *QueryContext, fieldName string, limit uint64) ([]ValueWithHits, error)
GetFieldValues returns unique values with the number of hits for the given fieldName returned by qctx.
If limit > 0, then up to limit unique values are returned.
func (*Storage) GetStreamFieldNames ¶
func (s *Storage) GetStreamFieldNames(qctx *QueryContext) ([]ValueWithHits, error)
GetStreamFieldNames returns stream field names for the given qctx.
func (*Storage) GetStreamFieldValues ¶
func (s *Storage) GetStreamFieldValues(qctx *QueryContext, fieldName string, limit uint64) ([]ValueWithHits, error)
GetStreamFieldValues returns stream field values for the given fieldName and the given qctx.
If limit > 0, then up to limit unique values are returned.
func (*Storage) GetStreamIDs ¶
func (s *Storage) GetStreamIDs(qctx *QueryContext, limit uint64) ([]ValueWithHits, error)
GetStreamIDs returns stream_id field values from qctx results.
If limit > 0, then up to limit unique streams are returned.
func (*Storage) GetStreams ¶
func (s *Storage) GetStreams(qctx *QueryContext, limit uint64) ([]ValueWithHits, error)
GetStreams returns streams from qctx results.
If limit > 0, then up to limit unique streams are returned.
func (*Storage) GetTenantIDs ¶ added in v1.36.0
GetTenantIDs returns tenantIDs for the given start and end.
func (*Storage) IsReadOnly ¶
IsReadOnly returns true if s is in read-only mode.
func (*Storage) MustAddRows ¶
MustAddRows adds lr to s.
It is recommended to check whether s is in read-only mode by calling IsReadOnly() before calling MustAddRows.
The added rows become visible for search after a short delay. Call DebugFlush if the added rows must be queried immediately (for example, in tests).
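A minimal ingestion sketch tying Storage and LogRows together; the import path and data directory are illustrative:

package main

import (
	"log"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" // import path assumed
)

func main() {
	s := logstorage.MustOpenStorage("victoria-logs-data", &logstorage.StorageConfig{
		Retention: 30 * 24 * time.Hour,
	})
	defer s.MustClose()

	if s.IsReadOnly() {
		log.Println("storage is in read-only mode; skipping ingestion")
		return
	}

	lr := logstorage.GetLogRows([]string{"host"}, nil, nil, nil, "")
	defer logstorage.PutLogRows(lr)

	lr.MustAdd(logstorage.TenantID{}, time.Now().UnixNano(), []logstorage.Field{
		{Name: "host", Value: "host-1"},
		{Name: "_msg", Value: "hello from logstorage"},
	}, -1)

	s.MustAddRows(lr)

	// Make the freshly added rows searchable immediately (tests and debugging only).
	s.DebugFlush()
}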
func (*Storage) MustClose ¶
func (s *Storage) MustClose()
MustClose closes s.
It is expected that nobody uses the storage at the close time.
func (*Storage) MustForceMerge ¶
MustForceMerge force-merges parts in s partitions with names starting from the given partitionNamePrefix.
Partitions are merged sequentially in order to reduce load on the system.
func (*Storage) PartitionAttach ¶ added in v1.27.0
PartitionAttach attaches the partition with the given name to s.
The name must have the YYYYMMDD format.
The attached partition can be detached via PartitionDetach() call.
func (*Storage) PartitionDetach ¶ added in v1.27.0
PartitionDetach detaches the partition with the given name from s.
The name must have the YYYYMMDD format.
The detached partition can be attached again via PartitionAttach() call.
func (*Storage) PartitionList ¶ added in v1.31.0
PartitionList returns the list of the names for the currently attached partitions.
Every partition name has YYYYMMDD format.
func (*Storage) PartitionSnapshotCreate ¶ added in v1.31.0
PartitionSnapshotCreate creates a snapshot for the partition with the given name.
The name must have the YYYYMMDD format.
The function returns an absolute path to the created snapshot on success.
func (*Storage) PartitionSnapshotList ¶ added in v1.31.0
PartitionSnapshotList returns a list of absolute paths to all the snapshots across active partitions.
func (*Storage) RunQuery ¶
func (s *Storage) RunQuery(qctx *QueryContext, writeBlock WriteDataBlockFunc) error
RunQuery runs the given qctx and calls writeBlock for results.
func (*Storage) UpdateStats ¶
func (s *Storage) UpdateStats(ss *StorageStats)
UpdateStats updates ss for the given s.
type StorageConfig ¶
type StorageConfig struct {
// Retention is the retention for the ingested data.
//
// Older data is automatically deleted.
Retention time.Duration
// DefaultParallelReaders is the default number of parallel readers to use per each query execution.
//
// Higher value can help improving query performance on storage with high disk read latency such as S3.
DefaultParallelReaders int
// MaxDiskSpaceUsageBytes is an optional maximum disk space logs can use.
//
// The oldest per-day partitions are automatically dropped if the total disk space usage exceeds this limit.
MaxDiskSpaceUsageBytes int64
// MaxDiskUsagePercent is an optional threshold in percentage (1-100) for disk usage of the filesystem holding the storage path.
// When the current disk usage exceeds this percentage, the oldest per-day partitions are automatically dropped.
MaxDiskUsagePercent int
// FlushInterval is the interval for flushing the in-memory data to disk at the Storage.
FlushInterval time.Duration
// FutureRetention is the allowed retention from the current time to future for the ingested data.
//
// Log entries with timestamps bigger than now+FutureRetention are ignored.
FutureRetention time.Duration
// MaxBackfillAge is the maximum allowed age for the backfilled logs.
//
// Log entries with timestamps older than now-MaxBackfillAge are ignored.
MaxBackfillAge time.Duration
// MinFreeDiskSpaceBytes is the minimum free disk space at storage path after which the storage stops accepting new data
// and enters read-only mode.
MinFreeDiskSpaceBytes int64
// LogNewStreams indicates whether to log newly created log streams.
//
// This can be useful for debugging of high cardinality issues.
// https://docs.victoriametrics.com/victorialogs/keyconcepts/#high-cardinality
LogNewStreams bool
// LogIngestedRows indicates whether to log the ingested log entries.
//
// This can be useful for debugging of data ingestion.
LogIngestedRows bool
}
StorageConfig is the config for the Storage.
type StorageStats ¶
type StorageStats struct {
// RowsDroppedTooBigTimestamp is the number of rows dropped during data ingestion because their timestamp is bigger than the maximum allowed.
RowsDroppedTooBigTimestamp uint64
// RowsDroppedTooSmallTimestamp is the number of rows dropped during data ingestion because their timestamp is smaller than the minimum allowed.
RowsDroppedTooSmallTimestamp uint64
// PartitionsCount is the number of partitions in the storage.
PartitionsCount uint64
// MaxDiskSpaceUsageBytes is the maximum disk space logs can use.
MaxDiskSpaceUsageBytes int64
// IsReadOnly indicates whether the storage is read-only.
IsReadOnly bool
// PartitionStats contains partition stats.
PartitionStats
// MinTimestamp is the minimum event timestamp across the entire storage (in nanoseconds).
// It is set to math.MinInt64 if there is no data.
MinTimestamp int64
// MaxTimestamp is the maximum event timestamp across the entire storage (in nanoseconds).
// It is set to math.MaxInt64 if there is no data.
MaxTimestamp int64
}
StorageStats represents stats for the storage. It may be obtained by calling Storage.UpdateStats().
type StreamFilter ¶
type StreamFilter struct {
// contains filtered or unexported fields
}
StreamFilter is a filter for streams, e.g. `_stream:{...}`
func (*StreamFilter) String ¶
func (sf *StreamFilter) String() string
type StreamTags ¶
type StreamTags struct {
// contains filtered or unexported fields
}
StreamTags contains stream tags.
func (*StreamTags) Add ¶
func (st *StreamTags) Add(name, value string)
Add adds (name:value) tag to st.
func (*StreamTags) Less ¶
func (st *StreamTags) Less(i, j int) bool
Less returns true if tag i is smaller than tag j.
func (*StreamTags) MarshalCanonical ¶
func (st *StreamTags) MarshalCanonical(dst []byte) []byte
MarshalCanonical appends the canonical representation of st to dst and returns the result.
func (*StreamTags) String ¶
func (st *StreamTags) String() string
String returns string representation of st.
func (*StreamTags) UnmarshalCanonical ¶
func (st *StreamTags) UnmarshalCanonical(src []byte) ([]byte, error)
UnmarshalCanonical unmarshals st from src marshaled with MarshalCanonical.
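A minimal round-trip sketch for the canonical representation (import path assumed as above):

package main

import (
	"fmt"
	"log"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" // import path assumed
)

func main() {
	var st logstorage.StreamTags
	st.Add("job", "nginx")
	st.Add("instance", "host-1")

	// The canonical form can be stored in InsertRow.StreamTagsCanonical.
	canonical := st.MarshalCanonical(nil)

	var st2 logstorage.StreamTags
	if _, err := st2.UnmarshalCanonical(canonical); err != nil {
		log.Fatalf("cannot unmarshal canonical stream tags: %s", err)
	}
	fmt.Println(st2.String())
}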
type SyslogParser ¶
type SyslogParser struct {
// Fields contains parsed fields after Parse call.
Fields []Field
// contains filtered or unexported fields
}
SyslogParser is parser for syslog messages.
It understands the following syslog formats:
- https://datatracker.ietf.org/doc/html/rfc5424
- https://datatracker.ietf.org/doc/html/rfc3164
It extracts into Fields the syslog message fields listed at https://docs.victoriametrics.com/victorialogs/logsql/#unpack_syslog-pipe
func GetSyslogParser ¶
func GetSyslogParser(currentYear int, timezone *time.Location) *SyslogParser
GetSyslogParser returns syslog parser from the pool.
currentYear must contain the current year. It is used for properly setting the timestamp field for the rfc3164 format, which doesn't contain the year.
The timezone is used for setting the desired timezone for the rfc3164 format.
Return the parser to the pool by calling PutSyslogParser when it is no longer needed.
func (*SyslogParser) AddField ¶ added in v1.27.0
func (p *SyslogParser) AddField(name, value string)
AddField adds name=value log field to p.Fields.
func (*SyslogParser) AddMessageField ¶ added in v1.35.0
func (p *SyslogParser) AddMessageField(s string)
func (*SyslogParser) Parse ¶
func (p *SyslogParser) Parse(s string)
Parse parses syslog message from s into p.Fields.
p.Fields is valid until s is modified or p state is changed.
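A minimal parsing sketch for an RFC3164 line (import path assumed as above):

package main

import (
	"fmt"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" // import path assumed
)

func main() {
	// RFC3164 lines carry no year, so it is taken from the currentYear argument.
	p := logstorage.GetSyslogParser(time.Now().Year(), time.UTC)
	defer logstorage.PutSyslogParser(p)

	p.Parse("<34>Oct 11 22:14:15 myhost su: 'su root' failed on /dev/pts/8")

	// p.Fields is valid until the parser state changes, so copy values if needed.
	for _, f := range p.Fields {
		fmt.Printf("%s=%q\n", f.Name, f.Value)
	}
}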
type TenantID ¶
type TenantID struct {
// AccountID is the id of the account for the log stream.
AccountID uint32 `json:"account_id"`
// ProjectID is the id of the project for the log stream.
ProjectID uint32 `json:"project_id"`
}
TenantID is an id of a tenant for log streams.
Each log stream is associated with a single TenantID.
func GetTenantIDFromRequest ¶
GetTenantIDFromRequest returns tenantID from r.
func ParseTenantID ¶
ParseTenantID returns tenantID from s.
s is expected in the form of accountID:projectID. If s is empty, then zero tenantID is returned.
func UnmarshalTenantIDsFromJSON ¶ added in v1.38.0
UnmarshalTenantIDsFromJSON unmarshals tenantIDs from JSON array at src.
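A small sketch of marshaling tenant ids to JSON (import path assumed as above):

package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" // import path assumed
)

func main() {
	tenantIDs := []logstorage.TenantID{
		{AccountID: 1, ProjectID: 2},
		{AccountID: 3, ProjectID: 4},
	}

	// The JSON output uses the documented account_id/project_id field names.
	data := logstorage.MarshalTenantIDsToJSON(tenantIDs)
	fmt.Printf("%s\n", data)
}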
type TimeFormatter ¶
type TimeFormatter int64
TimeFormatter implements fmt.Stringer for a timestamp in nanoseconds.
func (*TimeFormatter) String ¶
func (tf *TimeFormatter) String() string
String returns human-readable representation for tf.
type ValueWithHits ¶
ValueWithHits contains value and hits.
func MergeValuesWithHits ¶
func MergeValuesWithHits(a [][]ValueWithHits, limit uint64, resetHitsOnLimitExceeded bool) []ValueWithHits
MergeValuesWithHits merges entries from a and applies the given limit to the number of returned entries.
If resetHitsOnLimitExceeded is set to true and the number of merged entries exceeds the given limit, then hits are zeroed in the returned response.
func (*ValueWithHits) Marshal ¶
func (vh *ValueWithHits) Marshal(dst []byte) []byte
Marshal appends marshaled vh to dst and returns the result
func (*ValueWithHits) UnmarshalInplace ¶
func (vh *ValueWithHits) UnmarshalInplace(src []byte) ([]byte, error)
UnmarshalInplace unmarshals vh from src and returns the remaining tail.
vh is valid until src is changed.
type WriteDataBlockFunc ¶
WriteDataBlockFunc must process the db.
WriteDataBlockFunc cannot hold references to db or any of its fields after the function returns. If you need BlockColumn names or values after the function returns, copy them using strings.Clone.
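A sketch of the copying pattern this requires: the helper below clones the values of a single column with strings.Clone so they stay valid after the callback returns (import path assumed as above; the DataBlock is built in place for illustration):

package main

import (
	"fmt"
	"strings"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" // import path assumed
)

// copyColumnValues clones the values of the column with the given name from db,
// so the result remains valid after the WriteDataBlockFunc callback returns.
func copyColumnValues(db *logstorage.DataBlock, columnName string) []string {
	c := db.GetColumnByName(columnName)
	if c == nil {
		return nil
	}
	values := make([]string, len(c.Values))
	for i, v := range c.Values {
		values[i] = strings.Clone(v)
	}
	return values
}

func main() {
	db := &logstorage.DataBlock{
		Columns: []logstorage.BlockColumn{
			{Name: "_msg", Values: []string{"a", "b"}},
		},
	}
	fmt.Println(copyColumnValues(db, "_msg"))
}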
Source Files ¶
- arena.go
- bitmap.go
- block.go
- block_data.go
- block_header.go
- block_result.go
- block_search.go
- block_stream_merger.go
- block_stream_reader.go
- block_stream_writer.go
- bloomfilter.go
- cache.go
- chunked_allocator.go
- color_sequence.go
- column_names.go
- consts.go
- datadb.go
- delete_task.go
- encoding.go
- filenames.go
- filter.go
- filter_and.go
- filter_any_case_phrase.go
- filter_any_case_prefix.go
- filter_contains_all.go
- filter_contains_any.go
- filter_contains_common_case.go
- filter_day_range.go
- filter_eq_field.go
- filter_equals_common_case.go
- filter_exact.go
- filter_exact_prefix.go
- filter_in.go
- filter_ipv4_range.go
- filter_le_field.go
- filter_len_range.go
- filter_noop.go
- filter_not.go
- filter_or.go
- filter_pattern_match.go
- filter_phrase.go
- filter_prefix.go
- filter_range.go
- filter_regexp.go
- filter_sequence.go
- filter_stream.go
- filter_stream_id.go
- filter_string_range.go
- filter_substring.go
- filter_time.go
- filter_value_type.go
- filter_week_range.go
- hash128.go
- hash_tokenizer.go
- hits_map.go
- if_filter.go
- in_values.go
- index_block_header.go
- indexdb.go
- inmemory_part.go
- json_parser.go
- log_rows.go
- logfmt_parser.go
- net_query_runner.go
- parser.go
- part.go
- part_header.go
- partition.go
- pattern.go
- pattern_matcher.go
- pipe.go
- pipe_block_stats.go
- pipe_blocks_count.go
- pipe_collapse_nums.go
- pipe_copy.go
- pipe_decolorize.go
- pipe_delete.go
- pipe_drop_empty_fields.go
- pipe_extract.go
- pipe_extract_regexp.go
- pipe_facets.go
- pipe_field_names.go
- pipe_field_values.go
- pipe_field_values_local.go
- pipe_fields.go
- pipe_filter.go
- pipe_first.go
- pipe_format.go
- pipe_generate_sequence.go
- pipe_hash.go
- pipe_join.go
- pipe_json_array_len.go
- pipe_last.go
- pipe_len.go
- pipe_limit.go
- pipe_math.go
- pipe_offset.go
- pipe_pack.go
- pipe_pack_json.go
- pipe_pack_logfmt.go
- pipe_query_stats.go
- pipe_query_stats_local.go
- pipe_rename.go
- pipe_replace.go
- pipe_replace_regexp.go
- pipe_running_stats.go
- pipe_sample.go
- pipe_set_stream_fields.go
- pipe_sort.go
- pipe_sort_topk.go
- pipe_split.go
- pipe_stats.go
- pipe_stream_context.go
- pipe_time_add.go
- pipe_top.go
- pipe_total_stats.go
- pipe_union.go
- pipe_uniq.go
- pipe_uniq_local.go
- pipe_unpack.go
- pipe_unpack_json.go
- pipe_unpack_logfmt.go
- pipe_unpack_syslog.go
- pipe_unpack_words.go
- pipe_unroll.go
- pipe_update.go
- query_stats.go
- rows.go
- running_stats_count.go
- running_stats_max.go
- running_stats_min.go
- running_stats_sum.go
- stats_avg.go
- stats_count.go
- stats_count_empty.go
- stats_count_uniq.go
- stats_count_uniq_hash.go
- stats_histogram.go
- stats_json_values.go
- stats_json_values_sorted.go
- stats_json_values_topk.go
- stats_max.go
- stats_median.go
- stats_min.go
- stats_quantile.go
- stats_rate.go
- stats_rate_sum.go
- stats_row_any.go
- stats_row_max.go
- stats_row_min.go
- stats_sum.go
- stats_sum_len.go
- stats_uniq_values.go
- stats_values.go
- storage.go
- storage_search.go
- stream_filter.go
- stream_id.go
- stream_tags.go
- stringbucket.go
- syslog_parser.go
- tenant_id.go
- tokenizer.go
- u128.go
- values_encoder.go