Documentation
¶
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var (
	ErrNoDataPoints = errors.New("no data points found") // data does not exist
	ErrNoRowsData   = errors.New("no rows given")        // rows are empty
	ErrUnknown      = "UNKNOWN"
)
Functions ¶
func NewMemoryPartition ¶
func NewMemoryPartition(partitionDuration time.Duration, precision TimestampPrecision) partition
Types ¶
type Option ¶
type Option func(*Storage)
func WithLogger ¶
Defaults to a logger implementation that does nothing.
func WithPartitionDuration ¶
Defaults to 5 minutes.
func WithTimestampPrecision ¶
func WithTimestampPrecision(precision TimestampPrecision) Option
Defaults to Nanoseconds
type Storage ¶
type Storage struct {
// contains filtered or unexported fields
}
Storage is an in-memory data tree, used only for the data code list. As the diagram shows, reads consult every memory partition, while writes go to the newest one.

  │                 │
 Read              Write
  │                 │
  │                 V
  │      ┌───────────────────┐ max: 1615010800
  ├─────>   Memory Partition
  │      └───────────────────┘ min: 1615007201
  │
  │      ┌───────────────────┐ max: 1615007200
  ├─────>   Memory Partition
  │      └───────────────────┘ min: 1615003601
  │
  │      ┌───────────────────┐ max: 1615003600
  └─────>   Memory Partition
         └───────────────────┘ min: 1615000000
func (*Storage) InsertRows ¶
Example ¶
stg, err := NewStorage(
WithTimestampPrecision(Seconds),
WithWriteTimeout(15*time.Second),
WithRetention(1*time.Hour),
WithLogger(logging),
)
if err != nil {
panic(err)
}
defer func() {
if err := stg.Close(); err != nil {
panic(err)
}
}()
err = stg.InsertRows([]Row{
Row{
Name: "metric1",
DataPoint: DataPoint{Timestamp: 1600000000, Value: 0.1},
},
})
if err != nil {
panic(err)
}
points, err := stg.Select("metric1", nil, 1600000000, 1600000001)
if err != nil {
panic(err)
}
for _, p := range points {
fmt.Printf("timestamp: %v, value: %v\n", p.Timestamp, p.Value)
}
Output: timestamp: 1600000000, value: 0.1
Example (Concurrent) ¶
stg, err := NewStorage(
WithTimestampPrecision(Seconds),
)
if err != nil {
panic(err)
}
defer stg.Close()
// First insert in order to ensure min timestamp
if err := stg.InsertRows([]Row{
{Name: "metric1", DataPoint: DataPoint{Timestamp: 1600000000}},
}); err != nil {
panic(err)
}
var wg sync.WaitGroup
for i := int64(1600000001); i < 1600000100; i++ {
wg.Add(1)
go func(timestamp int64) {
if err := stg.InsertRows([]Row{
{Name: "metric1", DataPoint: DataPoint{Timestamp: timestamp}},
}); err != nil {
panic(err)
}
wg.Done()
}(i)
}
wg.Wait()
points, err := stg.Select("metric1", nil, 1600000000, 1600000100)
if err != nil {
panic(err)
}
for _, p := range points {
fmt.Printf("timestamp: %v, value: %v\n", p.Timestamp, p.Value)
}
func (*Storage) Select ¶
Example (From_memory) ¶
tmpDir, err := os.MkdirTemp("", "storage-example")
if err != nil {
panic(err)
}
defer os.RemoveAll(tmpDir)
stg, err := NewStorage(
WithPartitionDuration(2*time.Hour),
WithTimestampPrecision(Seconds),
WithWriteTimeout(15*time.Second),
WithRetention(1*time.Hour),
WithLogger(logging),
)
if err != nil {
panic(err)
}
defer func() {
if err := stg.Close(); err != nil {
panic(err)
}
}()
// Ingest data points of metric1
for timestamp := int64(1600000000); timestamp < 1600000050; timestamp++ {
err := stg.InsertRows([]Row{
{Name: "metric1", DataPoint: DataPoint{Timestamp: timestamp, Value: 0.1}},
})
if err != nil {
panic(err)
}
}
// Ingest data points of metric2
for timestamp := int64(1600000050); timestamp < 1600000100; timestamp++ {
err := stg.InsertRows([]Row{
{Name: "metric2", DataPoint: DataPoint{Timestamp: timestamp, Value: 0.2}},
})
if err != nil {
panic(err)
}
}
points, err := stg.Select("metric1", nil, 1600000000, 1600000050)
if errors.Is(err, ErrNoDataPoints) {
return
}
if err != nil {
panic(err)
}
fmt.Println("Data points of metric1:")
for _, p := range points {
fmt.Printf("Timestamp: %v, Value: %v\n", p.Timestamp, p.Value)
}
points2, err := stg.Select("metric2", nil, 1600000050, 1600000100)
if errors.Is(err, ErrNoDataPoints) {
return
}
if err != nil {
panic(err)
}
fmt.Println("Data points of metric2:")
for _, p := range points2 {
fmt.Printf("Timestamp: %v, Value: %v\n", p.Timestamp, p.Value)
}
Output: Data points of metric1: Timestamp: 1600000000, Value: 0.1 Timestamp: 1600000001, Value: 0.1 Timestamp: 1600000002, Value: 0.1 Timestamp: 1600000003, Value: 0.1 Timestamp: 1600000004, Value: 0.1 Timestamp: 1600000005, Value: 0.1 Timestamp: 1600000006, Value: 0.1 Timestamp: 1600000007, Value: 0.1 Timestamp: 1600000008, Value: 0.1 Timestamp: 1600000009, Value: 0.1 Timestamp: 1600000010, Value: 0.1 Timestamp: 1600000011, Value: 0.1 Timestamp: 1600000012, Value: 0.1 Timestamp: 1600000013, Value: 0.1 Timestamp: 1600000014, Value: 0.1 Timestamp: 1600000015, Value: 0.1 Timestamp: 1600000016, Value: 0.1 Timestamp: 1600000017, Value: 0.1 Timestamp: 1600000018, Value: 0.1 Timestamp: 1600000019, Value: 0.1 Timestamp: 1600000020, Value: 0.1 Timestamp: 1600000021, Value: 0.1 Timestamp: 1600000022, Value: 0.1 Timestamp: 1600000023, Value: 0.1 Timestamp: 1600000024, Value: 0.1 Timestamp: 1600000025, Value: 0.1 Timestamp: 1600000026, Value: 0.1 Timestamp: 1600000027, Value: 0.1 Timestamp: 1600000028, Value: 0.1 Timestamp: 1600000029, Value: 0.1 Timestamp: 1600000030, Value: 0.1 Timestamp: 1600000031, Value: 0.1 Timestamp: 1600000032, Value: 0.1 Timestamp: 1600000033, Value: 0.1 Timestamp: 1600000034, Value: 0.1 Timestamp: 1600000035, Value: 0.1 Timestamp: 1600000036, Value: 0.1 Timestamp: 1600000037, Value: 0.1 Timestamp: 1600000038, Value: 0.1 Timestamp: 1600000039, Value: 0.1 Timestamp: 1600000040, Value: 0.1 Timestamp: 1600000041, Value: 0.1 Timestamp: 1600000042, Value: 0.1 Timestamp: 1600000043, Value: 0.1 Timestamp: 1600000044, Value: 0.1 Timestamp: 1600000045, Value: 0.1 Timestamp: 1600000046, Value: 0.1 Timestamp: 1600000047, Value: 0.1 Timestamp: 1600000048, Value: 0.1 Timestamp: 1600000049, Value: 0.1 Data points of metric2: Timestamp: 1600000050, Value: 0.2 Timestamp: 1600000051, Value: 0.2 Timestamp: 1600000052, Value: 0.2 Timestamp: 1600000053, Value: 0.2 Timestamp: 1600000054, Value: 0.2 Timestamp: 1600000055, Value: 0.2 Timestamp: 1600000056, Value: 0.2 
Timestamp: 1600000057, Value: 0.2 Timestamp: 1600000058, Value: 0.2 Timestamp: 1600000059, Value: 0.2 Timestamp: 1600000060, Value: 0.2 Timestamp: 1600000061, Value: 0.2 Timestamp: 1600000062, Value: 0.2 Timestamp: 1600000063, Value: 0.2 Timestamp: 1600000064, Value: 0.2 Timestamp: 1600000065, Value: 0.2 Timestamp: 1600000066, Value: 0.2 Timestamp: 1600000067, Value: 0.2 Timestamp: 1600000068, Value: 0.2 Timestamp: 1600000069, Value: 0.2 Timestamp: 1600000070, Value: 0.2 Timestamp: 1600000071, Value: 0.2 Timestamp: 1600000072, Value: 0.2 Timestamp: 1600000073, Value: 0.2 Timestamp: 1600000074, Value: 0.2 Timestamp: 1600000075, Value: 0.2 Timestamp: 1600000076, Value: 0.2 Timestamp: 1600000077, Value: 0.2 Timestamp: 1600000078, Value: 0.2 Timestamp: 1600000079, Value: 0.2 Timestamp: 1600000080, Value: 0.2 Timestamp: 1600000081, Value: 0.2 Timestamp: 1600000082, Value: 0.2 Timestamp: 1600000083, Value: 0.2 Timestamp: 1600000084, Value: 0.2 Timestamp: 1600000085, Value: 0.2 Timestamp: 1600000086, Value: 0.2 Timestamp: 1600000087, Value: 0.2 Timestamp: 1600000088, Value: 0.2 Timestamp: 1600000089, Value: 0.2 Timestamp: 1600000090, Value: 0.2 Timestamp: 1600000091, Value: 0.2 Timestamp: 1600000092, Value: 0.2 Timestamp: 1600000093, Value: 0.2 Timestamp: 1600000094, Value: 0.2 Timestamp: 1600000095, Value: 0.2 Timestamp: 1600000096, Value: 0.2 Timestamp: 1600000097, Value: 0.2 Timestamp: 1600000098, Value: 0.2 Timestamp: 1600000099, Value: 0.2
Example (From_memory_out_of_order) ¶
Out-of-order data points that have not yet been flushed are held in the buffer and do not appear in Select results.
stg, err := NewStorage(
WithTimestampPrecision(Seconds),
)
if err != nil {
panic(err)
}
defer func() {
if err := stg.Close(); err != nil {
panic(err)
}
}()
err = stg.InsertRows([]Row{
{Name: "metric1", DataPoint: DataPoint{Timestamp: 1600000000, Value: 0.1}},
{Name: "metric1", DataPoint: DataPoint{Timestamp: 1600000002, Value: 0.1}},
{Name: "metric1", DataPoint: DataPoint{Timestamp: 1600000001, Value: 0.1}},
{Name: "metric1", DataPoint: DataPoint{Timestamp: 1600000003, Value: 0.1}},
})
if err != nil {
panic(err)
}
points, err := stg.Select("metric1", nil, 1600000000, 1600000003)
if err != nil {
panic(err)
}
for _, p := range points {
fmt.Printf("Timestamp: %v, Value: %v\n", p.Timestamp, p.Value)
}
// Out-of-order data points are ignored because they will get merged when flushing.
Output: Timestamp: 1600000000, Value: 0.1 Timestamp: 1600000002, Value: 0.1 Timestamp: 1600000003, Value: 0.1
type StorageInterface ¶
type StorageInterface interface {
Reader
// The precision of timestamps is nanoseconds by default. It can be changed using WithTimestampPrecision.
InsertRows(rows []Row) error
// Close gracefully shuts down by flushing any unwritten data to the underlying disk partition.
Close() error
}
func NewStorage ¶
func NewStorage(opts ...Option) (StorageInterface, error)
Example (WithPartitionDuration) ¶
stg, err := NewStorage(
WithPartitionDuration(30*time.Minute),
WithTimestampPrecision(Seconds),
WithWriteTimeout(15*time.Second),
WithRetention(1*time.Hour),
WithLogger(logging),
)
if err != nil {
panic(err)
}
defer stg.Close()
type TimestampPrecision ¶
type TimestampPrecision int
const (
	Nanoseconds TimestampPrecision = iota
	Microseconds
	Milliseconds
	Seconds
)
func (TimestampPrecision) String ¶
func (p TimestampPrecision) String() string
Click to show internal directories.
Click to hide internal directories.