netstorage

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 27, 2022 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
const (
	UnknownMessage uint8 = iota

	SeriesKeysRequestMessage
	SeriesKeysResponseMessage

	SeriesExactCardinalityRequestMessage
	SeriesExactCardinalityResponseMessage

	SeriesCardinalityRequestMessage
	SeriesCardinalityResponseMessage

	ShowTagValuesRequestMessage
	ShowTagValuesResponseMessage

	ShowTagValuesCardinalityRequestMessage
	ShowTagValuesCardinalityResponseMessage

	GetShardSplitPointsRequestMessage
	GetShardSplitPointsResponseMessage

	DeleteRequestMessage
	DeleteResponseMessage

	CreateDataBaseRequestMessage
	CreateDatabaseResponseMessage
)
View Source
const (
	ShowMeasurementsStatement           = "ShowMeasurementsStatement"
	ShowTagKeysStatement                = "ShowTagKeysStatement"
	ShowTagValuesStatement              = "ShowTagValuesStatement"
	ShowSeriesCardinalityStatement      = "ShowSeriesCardinalityStatement"
	ShowMeasurementCardinalityStatement = "ShowMeasurementCardinalityStatement"
)
View Source
const (
	PackageTypeFast = byte(2)
)

Variables

View Source
var (
	ErrPartitionNotFound = errors.New("partition not found")
)

Functions

func ErrorPanic

func ErrorPanic(err error, log *zap.Logger)

func GetResponseMessageType

func GetResponseMessageType(typ uint8) uint8

func NewMessage

func NewMessage(typ uint8) codec.BinaryCodec

func RegisterNewEngineFun

func RegisterNewEngineFun(name string, fn NewEngineFun)

func RegisteredEngines

func RegisteredEngines() []string

Types

type BaseMessage

type BaseMessage struct {
	Typ  uint8
	Data codec.BinaryCodec
}

func (*BaseMessage) Marshal

func (bm *BaseMessage) Marshal(buf []byte) ([]byte, error)

func (*BaseMessage) Size

func (bm *BaseMessage) Size() int

type ColumnKeys

type ColumnKeys struct {
	Name string
	Keys []metaclient.FieldKey
}

type CreateDataBaseRequest

type CreateDataBaseRequest struct {
	internal2.CreateDataBaseRequest
}

func (*CreateDataBaseRequest) MarshalBinary

func (r *CreateDataBaseRequest) MarshalBinary() ([]byte, error)

func (*CreateDataBaseRequest) UnmarshalBinary

func (r *CreateDataBaseRequest) UnmarshalBinary(buf []byte) error

type CreateDataBaseResponse

type CreateDataBaseResponse struct {
	internal2.CreateDataBaseResponse
}

func (*CreateDataBaseResponse) Error

func (r *CreateDataBaseResponse) Error() error

func (*CreateDataBaseResponse) MarshalBinary

func (r *CreateDataBaseResponse) MarshalBinary() ([]byte, error)

func (*CreateDataBaseResponse) UnmarshalBinary

func (r *CreateDataBaseResponse) UnmarshalBinary(buf []byte) error

type DDLCallback

type DDLCallback struct {
	// contains filtered or unexported fields
}

func (*DDLCallback) GetCodec

func (c *DDLCallback) GetCodec() transport.Codec

func (*DDLCallback) GetResponse

func (c *DDLCallback) GetResponse() interface{}

func (*DDLCallback) Handle

func (c *DDLCallback) Handle(data interface{}) error

type DDLMessage

type DDLMessage struct {
	BaseMessage
}

func NewDDLMessage

func NewDDLMessage(typ uint8, data codec.BinaryCodec) *DDLMessage

func (*DDLMessage) Instance

func (m *DDLMessage) Instance() transport.Codec

func (*DDLMessage) Unmarshal

func (m *DDLMessage) Unmarshal(buf []byte) error

type DeleteRequest

type DeleteRequest struct {
	Database    string
	Rp          string
	Measurement string
	ShardIds    []uint64
	Type        DeleteType
	PtId        uint32
}

func (*DeleteRequest) MarshalBinary

func (ddr *DeleteRequest) MarshalBinary() ([]byte, error)

func (*DeleteRequest) UnmarshalBinary

func (ddr *DeleteRequest) UnmarshalBinary(data []byte) error

type DeleteResponse

type DeleteResponse struct {
	Err error
}

func (*DeleteResponse) MarshalBinary

func (dr *DeleteResponse) MarshalBinary() ([]byte, error)

func (*DeleteResponse) UnmarshalBinary

func (dr *DeleteResponse) UnmarshalBinary(data []byte) error

type DeleteType

type DeleteType int
const (
	DatabaseDelete DeleteType = iota
	RetentionPolicyDelete
	MeasurementDelete
)

type Engine

type Engine interface {
	Open(ptIds []uint32, durationInfos map[uint64]*meta.ShardDurationInfo) error
	Close() error
	ForceFlush()
	SetReadOnly(readonly bool)

	DeleteShard(db string, ptId uint32, shardID uint64) error
	DeleteIndex(db string, pt uint32, shardID uint64) error
	ExpiredShards() []*meta.ShardIdentifier
	ExpiredIndexes() []*meta.IndexIdentifier

	FetchShardsNeedChangeStore() ([]*meta.ShardIdentifier, []*meta.ShardIdentifier)
	ChangeShardTierToWarm(db string, ptId uint32, shardID uint64) error

	CreateShard(db, rp string, ptId uint32, shardID uint64, timeRangeInfo *meta.ShardTimeRangeInfo) error
	WriteRows(db, rp string, ptId uint32, shardID uint64, points []influx.Row, binaryRows []byte) error
	CreateDBPT(db string, pt uint32)

	GetShardSplitPoints(db string, ptId uint32, shardID uint64, idxes []int64) ([]string, error)

	DeleteDatabase(db string, ptId uint32) error

	DropRetentionPolicy(db string, rp string, ptId uint32) error

	DropMeasurement(db string, rp string, name string, shardIds []uint64) error

	SeriesKeys(db string, ptIDs []uint32, measurements [][]byte, condition influxql.Expr) ([]string, error)
	SeriesCardinality(db string, ptIDs []uint32, measurements [][]byte, condition influxql.Expr) ([]meta.MeasurementCardinalityInfo, error)
	SeriesExactCardinality(db string, ptIDs []uint32, measurements [][]byte, condition influxql.Expr) (map[string]uint64, error)

	TagValues(db string, ptId []uint32, tagKeys map[string][][]byte, condition influxql.Expr) (TablesTagSets, error)
	TagValuesCardinality(db string, ptIDs []uint32, tagKeys map[string][][]byte, condition influxql.Expr) (map[string]uint64, error)
	DropSeries(database string, sources []influxql.Source, ptId []uint32, condition influxql.Expr) (int, error)

	DbPTRef(db string, ptId uint32) error
	DbPTUnref(db string, ptId uint32)
	CreateLogicalPlan(ctx context.Context, db string, ptId uint32, shardID uint64, sources influxql.Sources, schema *executor.QuerySchema) (hybridqp.QueryNode, error)
	LogicalPlanCost(db string, ptId uint32, sources influxql.Sources, opt query.ProcessorOptions) (hybridqp.LogicalPlanCost, error)

	UpdateShardDurationInfo(info *meta.ShardDurationInfo) error

	SysCtrl(req *SysCtrlRequest) error
	Statistics(buffer []byte) ([]byte, error)
}

type EngineOptions

type EngineOptions struct {
	Version string
	// Limits the concurrent number of TSM files that can be loaded at once.
	OpenLimiter limiter.Fixed

	ImmTableMaxMemoryPercentage int

	// WriteColdDuration is the length of time at which the engine will snapshot the mutable
	WriteColdDuration time.Duration

	// ShardMutableSizeLimit is the maximum size a shard's cache can reach before it starts rejecting writes.
	ShardMutableSizeLimit int64

	// NodeMutableSizeLimit is the maximum size a node's cache can reach before it starts rejecting writes.
	NodeMutableSizeLimit int64

	// MaxWriteHangTime is the maximum time to hang for data write to store if node mem of mem is not enough
	MaxWriteHangTime time.Duration

	FullCompactColdDuration  time.Duration
	MaxConcurrentCompactions int
	MaxFullCompactions       int
	CompactThroughput        int64
	CompactThroughputBurst   int64
	CompactRecovery          bool
	SnapshotThroughput       int64
	SnapshotThroughputBurst  int64

	// WalSyncInterval is the interval of wal file sync
	WalEnabled        bool
	WalSyncInterval   time.Duration
	WalReplayParallel bool

	// Immutable config
	ReadCacheLimit   int
	CacheDataBlock   bool
	CacheMetaBlock   bool
	EnableMmapRead   bool
	CompactionMethod int // 0:auto, 1:stream, 2: non-stream
}

func NewEngineOptions

func NewEngineOptions() EngineOptions

type ExactCardinalityResponse

type ExactCardinalityResponse struct {
	internal2.ExactCardinalityResponse
}

func (*ExactCardinalityResponse) Error

func (r *ExactCardinalityResponse) Error() error

func (*ExactCardinalityResponse) MarshalBinary

func (r *ExactCardinalityResponse) MarshalBinary() ([]byte, error)

func (*ExactCardinalityResponse) UnmarshalBinary

func (r *ExactCardinalityResponse) UnmarshalBinary(buf []byte) error

type ExecuteStatementMessage

type ExecuteStatementMessage struct {
	StatementType string
	Result        []byte
	Filtered      []byte
}

type GetShardSplitPointsRequest

type GetShardSplitPointsRequest struct {
	internal2.GetShardSplitPointsRequest
}

func (*GetShardSplitPointsRequest) Error

func (r *GetShardSplitPointsRequest) Error() error

func (*GetShardSplitPointsRequest) MarshalBinary

func (r *GetShardSplitPointsRequest) MarshalBinary() ([]byte, error)

func (*GetShardSplitPointsRequest) UnmarshalBinary

func (r *GetShardSplitPointsRequest) UnmarshalBinary(buf []byte) error

type GetShardSplitPointsResponse

type GetShardSplitPointsResponse struct {
	internal2.GetShardSplitPointsResponse
}

func (*GetShardSplitPointsResponse) Error

func (*GetShardSplitPointsResponse) MarshalBinary

func (r *GetShardSplitPointsResponse) MarshalBinary() ([]byte, error)

func (*GetShardSplitPointsResponse) UnmarshalBinary

func (r *GetShardSplitPointsResponse) UnmarshalBinary(buf []byte) error

type NetStorage

type NetStorage struct {
	// contains filtered or unexported fields
}

func (*NetStorage) Close

func (s *NetStorage) Close() error

func (*NetStorage) DeleteDatabase

func (s *NetStorage) DeleteDatabase(node *meta2.DataNode, database string, pt uint32) error

func (*NetStorage) DeleteMeasurement

func (s *NetStorage) DeleteMeasurement(node *meta2.DataNode, db string, rp string, name string, shardIds []uint64) error

func (*NetStorage) DeleteRetentionPolicy

func (s *NetStorage) DeleteRetentionPolicy(node *meta2.DataNode, db string, rp string, pt uint32) error

func (*NetStorage) DropShard

func (s *NetStorage) DropShard(nodeID uint64, database, rpName string, dbPts []uint32, shardID uint64) error

func (*NetStorage) GetShardSplitPoints

func (s *NetStorage) GetShardSplitPoints(node *meta2.DataNode, database string, pt uint32,
	shardId uint64, idxes []int64) ([]string, error)

func (*NetStorage) HandleDeleteReq

func (s *NetStorage) HandleDeleteReq(node *meta2.DataNode, req *DeleteRequest) error

func (*NetStorage) Open

func (s *NetStorage) Open() error

func (*NetStorage) SendSysCtrlOnNode

func (s *NetStorage) SendSysCtrlOnNode(nodeID uint64, req SysCtrlRequest) (map[string]string, error)

func (*NetStorage) SeriesCardinality

func (s *NetStorage) SeriesCardinality(nodeID uint64, db string, dbPts []uint32, measurements []string, condition influxql.Expr) ([]meta2.MeasurementCardinalityInfo, error)

func (*NetStorage) SeriesExactCardinality

func (s *NetStorage) SeriesExactCardinality(nodeID uint64, db string, dbPts []uint32, measurements []string, condition influxql.Expr) (map[string]uint64, error)

func (*NetStorage) ShowSeries

func (s *NetStorage) ShowSeries(nodeID uint64, db string, ptIDs []uint32, measurements []string, condition influxql.Expr) ([]string, error)

func (*NetStorage) TagValues

func (s *NetStorage) TagValues(nodeID uint64, db string, ptIDs []uint32, tagKeys map[string]map[string]struct{}, cond influxql.Expr) (TablesTagSets, error)

func (*NetStorage) TagValuesCardinality

func (s *NetStorage) TagValuesCardinality(nodeID uint64, db string, ptIDs []uint32,
	tagKeys map[string]map[string]struct{}, cond influxql.Expr) (map[string]uint64, error)

func (*NetStorage) WriteRows

func (s *NetStorage) WriteRows(nodeID uint64, database string, rpName string, pt uint32, shard uint64, rows *[]influx.Row, timeout time.Duration) error

type NewEngineFun

type NewEngineFun func(dataPath, walPath string, options EngineOptions, ctx *metaclient.LoadCtx) (Engine, error)

func GetNewEngineFunction

func GetNewEngineFunction(entType string) NewEngineFun

type PartialWriteError

type PartialWriteError struct {
	Reason  error
	Dropped int

	// A sorted slice of series keys that were dropped.
	DroppedKeys [][]byte
}

PartialWriteError indicates a write request could only write a portion of the requested values.

func (PartialWriteError) Error

func (e PartialWriteError) Error() string

type Requester

type Requester struct {
	// contains filtered or unexported fields
}

func NewRequester

func NewRequester(msgTyp uint8, data codec.BinaryCodec, mc meta.MetaClient) *Requester

type SeriesCardinalityRequest

type SeriesCardinalityRequest struct {
	SeriesKeysRequest
}

type SeriesCardinalityResponse

type SeriesCardinalityResponse struct {
	meta.CardinalityResponse
}

type SeriesExactCardinalityRequest

type SeriesExactCardinalityRequest struct {
	SeriesKeysRequest
}

type SeriesExactCardinalityResponse

type SeriesExactCardinalityResponse struct {
	ExactCardinalityResponse
}

type SeriesKeysRequest

type SeriesKeysRequest struct {
	internal2.SeriesKeysRequest
}

func (*SeriesKeysRequest) MarshalBinary

func (r *SeriesKeysRequest) MarshalBinary() ([]byte, error)

func (*SeriesKeysRequest) UnmarshalBinary

func (r *SeriesKeysRequest) UnmarshalBinary(buf []byte) error

type SeriesKeysResponse

type SeriesKeysResponse struct {
	internal2.SeriesKeysResponse
}

func (*SeriesKeysResponse) Error

func (r *SeriesKeysResponse) Error() error

func (*SeriesKeysResponse) MarshalBinary

func (r *SeriesKeysResponse) MarshalBinary() ([]byte, error)

func (*SeriesKeysResponse) UnmarshalBinary

func (r *SeriesKeysResponse) UnmarshalBinary(buf []byte) error

type ShowTagValuesCardinalityRequest

type ShowTagValuesCardinalityRequest struct {
	ShowTagValuesRequest
}

type ShowTagValuesCardinalityResponse

type ShowTagValuesCardinalityResponse struct {
	ExactCardinalityResponse
}

type ShowTagValuesRequest

type ShowTagValuesRequest struct {
	internal2.ShowTagValuesRequest
}

func (*ShowTagValuesRequest) GetTagKeysBytes

func (r *ShowTagValuesRequest) GetTagKeysBytes() map[string][][]byte

func (*ShowTagValuesRequest) MarshalBinary

func (r *ShowTagValuesRequest) MarshalBinary() ([]byte, error)

func (*ShowTagValuesRequest) SetTagKeys

func (r *ShowTagValuesRequest) SetTagKeys(tagKeys map[string]map[string]struct{})

func (*ShowTagValuesRequest) UnmarshalBinary

func (r *ShowTagValuesRequest) UnmarshalBinary(buf []byte) error

type ShowTagValuesResponse

type ShowTagValuesResponse struct {
	internal2.ShowTagValuesResponse
}

func (*ShowTagValuesResponse) Error

func (r *ShowTagValuesResponse) Error() error

func (*ShowTagValuesResponse) GetTagValuesSlice

func (r *ShowTagValuesResponse) GetTagValuesSlice() TablesTagSets

func (*ShowTagValuesResponse) MarshalBinary

func (r *ShowTagValuesResponse) MarshalBinary() ([]byte, error)

func (*ShowTagValuesResponse) SetTagValuesSlice

func (r *ShowTagValuesResponse) SetTagValuesSlice(s TablesTagSets)

func (*ShowTagValuesResponse) UnmarshalBinary

func (r *ShowTagValuesResponse) UnmarshalBinary(buf []byte) error

type Storage

type Storage interface {
	Open() error
	Close() error

	WriteRows(nodeID uint64, database string, rp string, pt uint32, shard uint64, rows *[]influx.Row, timeout time.Duration) error
	DropShard(nodeID uint64, database, rpName string, dbPts []uint32, shardID uint64) error

	TagValues(nodeID uint64, db string, ptIDs []uint32, tagKeys map[string]map[string]struct{}, cond influxql.Expr) (TablesTagSets, error)
	TagValuesCardinality(nodeID uint64, db string, ptIDs []uint32, tagKeys map[string]map[string]struct{}, cond influxql.Expr) (map[string]uint64, error)

	ShowSeries(nodeID uint64, db string, ptId []uint32, measurements []string, condition influxql.Expr) ([]string, error)
	SeriesCardinality(nodeID uint64, db string, dbPts []uint32, measurements []string, condition influxql.Expr) ([]meta2.MeasurementCardinalityInfo, error)
	SeriesExactCardinality(nodeID uint64, db string, dbPts []uint32, measurements []string, condition influxql.Expr) (map[string]uint64, error)

	SendSysCtrlOnNode(nodID uint64, req SysCtrlRequest) (map[string]string, error)

	GetShardSplitPoints(node *meta2.DataNode, database string, pt uint32,
		shardId uint64, idxes []int64) ([]string, error)
	DeleteDatabase(node *meta2.DataNode, database string, pt uint32) error
	DeleteRetentionPolicy(node *meta2.DataNode, db string, rp string, pt uint32) error
	DeleteMeasurement(node *meta2.DataNode, db string, rp string, name string, shardIds []uint64) error
}

func NewNetStorage

func NewNetStorage(mcli meta.MetaClient) Storage

type SysCtrlCallback

type SysCtrlCallback struct {
	// contains filtered or unexported fields
}

func (*SysCtrlCallback) GetCodec

func (c *SysCtrlCallback) GetCodec() transport.Codec

func (*SysCtrlCallback) GetResponse

func (c *SysCtrlCallback) GetResponse() interface{}

func (*SysCtrlCallback) Handle

func (c *SysCtrlCallback) Handle(data interface{}) error

type SysCtrlRequest

type SysCtrlRequest struct {
	// contains filtered or unexported fields
}

func (*SysCtrlRequest) Get

func (s *SysCtrlRequest) Get(key string) (v string, ok bool)

func (*SysCtrlRequest) Instance

func (s *SysCtrlRequest) Instance() transport.Codec

func (*SysCtrlRequest) Marshal

func (s *SysCtrlRequest) Marshal(buf []byte) ([]byte, error)

func (*SysCtrlRequest) Mod

func (s *SysCtrlRequest) Mod() string

func (*SysCtrlRequest) Param

func (s *SysCtrlRequest) Param() map[string]string

func (*SysCtrlRequest) SetMod

func (s *SysCtrlRequest) SetMod(m string)

func (*SysCtrlRequest) SetParam

func (s *SysCtrlRequest) SetParam(m map[string]string)

func (*SysCtrlRequest) Size

func (s *SysCtrlRequest) Size() int

func (*SysCtrlRequest) Unmarshal

func (s *SysCtrlRequest) Unmarshal(buf []byte) error

type SysCtrlResponse

type SysCtrlResponse struct {
	// contains filtered or unexported fields
}

func (*SysCtrlResponse) Error

func (s *SysCtrlResponse) Error() error

func (*SysCtrlResponse) Instance

func (s *SysCtrlResponse) Instance() transport.Codec

func (*SysCtrlResponse) Marshal

func (s *SysCtrlResponse) Marshal(buf []byte) ([]byte, error)

func (*SysCtrlResponse) Result

func (s *SysCtrlResponse) Result() map[string]string

func (*SysCtrlResponse) SetErr

func (s *SysCtrlResponse) SetErr(err string)

func (*SysCtrlResponse) SetResult

func (s *SysCtrlResponse) SetResult(ret map[string]string)

func (*SysCtrlResponse) Size

func (s *SysCtrlResponse) Size() int

func (*SysCtrlResponse) Unmarshal

func (s *SysCtrlResponse) Unmarshal(buf []byte) error

type TableColumnKeys

type TableColumnKeys []ColumnKeys

func (TableColumnKeys) Len

func (a TableColumnKeys) Len() int

func (TableColumnKeys) Less

func (a TableColumnKeys) Less(i, j int) bool

func (TableColumnKeys) Swap

func (a TableColumnKeys) Swap(i, j int)

type TableTagKeys

type TableTagKeys []TagKeys

func (TableTagKeys) Len

func (a TableTagKeys) Len() int

func (TableTagKeys) Less

func (a TableTagKeys) Less(i, j int) bool

func (TableTagKeys) Swap

func (a TableTagKeys) Swap(i, j int)

type TableTagSets

type TableTagSets struct {
	Name   string
	Values TagSets
}

type TablesTagSets

type TablesTagSets []TableTagSets

func (TablesTagSets) Len

func (a TablesTagSets) Len() int

func (TablesTagSets) Less

func (a TablesTagSets) Less(i, j int) bool

func (TablesTagSets) Swap

func (a TablesTagSets) Swap(i, j int)

type TagKeys

type TagKeys struct {
	Name string
	Keys []string
}

type TagSet

type TagSet struct {
	Key, Value string
}

type TagSets

type TagSets []TagSet

func (TagSets) Len

func (a TagSets) Len() int

func (TagSets) Less

func (a TagSets) Less(i, j int) bool

func (TagSets) Swap

func (a TagSets) Swap(i, j int)

type WritePointsCallback

type WritePointsCallback struct {
	// contains filtered or unexported fields
}

func (*WritePointsCallback) Error

func (c *WritePointsCallback) Error() error

func (*WritePointsCallback) GetCodec

func (c *WritePointsCallback) GetCodec() transport.Codec

func (*WritePointsCallback) Handle

func (c *WritePointsCallback) Handle(data interface{}) error

type WritePointsRequest

type WritePointsRequest struct {
	// contains filtered or unexported fields
}

func NewWritePointsRequest

func NewWritePointsRequest(points []byte) *WritePointsRequest

func (*WritePointsRequest) Instance

func (r *WritePointsRequest) Instance() transport.Codec

func (*WritePointsRequest) Marshal

func (r *WritePointsRequest) Marshal(buf []byte) ([]byte, error)

func (*WritePointsRequest) Points

func (r *WritePointsRequest) Points() []byte

func (*WritePointsRequest) Size

func (r *WritePointsRequest) Size() int

func (*WritePointsRequest) Unmarshal

func (r *WritePointsRequest) Unmarshal(buf []byte) error

type WritePointsResponse

type WritePointsResponse struct {
	Code    uint8
	Message string
}

func NewWritePointsResponse

func NewWritePointsResponse(code uint8, message string) *WritePointsResponse

func (*WritePointsResponse) Instance

func (r *WritePointsResponse) Instance() transport.Codec

func (*WritePointsResponse) Marshal

func (r *WritePointsResponse) Marshal(buf []byte) ([]byte, error)

func (*WritePointsResponse) Size

func (r *WritePointsResponse) Size() int

func (*WritePointsResponse) Unmarshal

func (r *WritePointsResponse) Unmarshal(buf []byte) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL