timeline

package module
v1.3.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 13, 2020 License: MIT Imports: 17 Imported by: 1

README

timeline

A library for sending data points to an OpenTSDB backend.

Documentation

Index

Constants

View Source
const (
	// FlattenerName - the flattener's name
	FlattenerName string = "flattener"
)

// The FlatOperation values, numbered sequentially starting at zero.
const (
	// Avg - average aggregation (0)
	Avg FlatOperation = iota

	// Sum - sum aggregation (1)
	Sum

	// Count - count aggregation (2)
	Count

	// Max - maximum aggregation (3)
	Max

	// Min - minimum aggregation (4)
	Min
)
View Source
// AccumulatorName - the accumulator's name
const AccumulatorName string = "accumulator"

Variables

View Source
// ErrNotStored - the error reported when a hash was not stored
var ErrNotStored = fmt.Errorf("hash is not stored")

Functions

This section is empty.

Types

type AccumulatedData added in v1.1.0

type AccumulatedData struct {
	// contains filtered or unexported fields
}

AccumulatedData - an accumulated point

func (*AccumulatedData) Execute added in v1.1.0

func (ad *AccumulatedData) Execute()

Execute - implements the Job interface

func (*AccumulatedData) GetHash added in v1.1.0

func (ad *AccumulatedData) GetHash() string

GetHash - returns the hash

type Accumulator added in v1.1.0

type Accumulator struct {
	// contains filtered or unexported fields
}

Accumulator - the struct

func NewAccumulator added in v1.1.0

func NewAccumulator(configuration *DataTransformerConf) *Accumulator

NewAccumulator - creates a new instance

func (*Accumulator) Add added in v1.1.0

func (a *Accumulator) Add(hash string) error

Add - adds one more to the reference

func (*Accumulator) GetName added in v1.1.0

func (a *Accumulator) GetName() string

GetName - returns the processor's name

func (*Accumulator) ProcessMapEntry added in v1.1.0

func (a *Accumulator) ProcessMapEntry(entry interface{}) bool

ProcessMapEntry - sends the data to the transport

func (*Accumulator) SetTransport added in v1.1.0

func (d *Accumulator) SetTransport(transport Transport)

SetTransport - sets the transport

func (*Accumulator) Start added in v1.1.0

func (d *Accumulator) Start()

Start - starts the processor cycle

func (*Accumulator) Stop added in v1.1.0

func (a *Accumulator) Stop()

Stop - terminates the processing cycle

func (*Accumulator) Store added in v1.1.0

func (a *Accumulator) Store(item interface{}, ttl time.Duration) (string, error)

Store - stores a reference

func (*Accumulator) StoreCustomHash added in v1.3.0

func (a *Accumulator) StoreCustomHash(item interface{}, ttl time.Duration, hash string) error

StoreCustomHash - stores a custom reference

type Backend

type Backend struct {
	Host string // Host - the backend's host (presumably a hostname or IP address - TODO confirm)
	Port int    // Port - the backend's port
}

Backend - the destination OpenTSDB backend

type DataProcessor added in v1.1.0

type DataProcessor interface {

	// Start - starts the data processor
	Start()

	// Stop - stops the data processor
	Stop()

	// GetName - returns the processor's name
	GetName() string

	// SetTransport - sets the transport
	SetTransport(transport Transport)

	// ProcessMapEntry - processes a map entry; returns true when the entry must be deleted afterwards
	ProcessMapEntry(entry interface{}) (deleteAfter bool)
}

DataProcessor - an interface for data processors

type DataTransformerConf added in v1.1.0

type DataTransformerConf struct {
	CycleDuration    time.Duration     // NOTE(review): presumably the interval of the processor cycle - confirm
	HashingAlgorithm hashing.Algorithm // NOTE(review): presumably the algorithm used to compute entry hashes - confirm
	HashSize         int               // NOTE(review): presumably the size of the generated hash; unit not shown here - confirm
	// contains filtered or unexported fields
}

DataTransformerConf - the configuration accepted by both NewFlattener and NewAccumulator

type DefaultTransportConfiguration

type DefaultTransportConfiguration struct {
	TransportBufferSize  int           // NOTE(review): presumably the capacity of the transport's data buffer - confirm
	BatchSendInterval    time.Duration // NOTE(review): presumably the interval between batch sends - confirm
	RequestTimeout       time.Duration // NOTE(review): presumably the timeout applied to each request - confirm
	SerializerBufferSize int           // NOTE(review): presumably the buffer size used by the serializer - confirm
}

DefaultTransportConfiguration - the default fields used by the transport configuration

func (*DefaultTransportConfiguration) Validate

func (c *DefaultTransportConfiguration) Validate() error

Validate - validates the default items from the configuration

type FlatOperation

// FlatOperation - the aggregation type; the declared values are Avg, Sum, Count, Max and Min
type FlatOperation uint8

FlatOperation - the type of the aggregation used

type Flattener

type Flattener struct {
	// contains filtered or unexported fields
}

Flattener - controls the timeline's point flattening

func NewFlattener

func NewFlattener(configuration *DataTransformerConf) *Flattener

NewFlattener - creates a new flattener

func (*Flattener) Add

func (f *Flattener) Add(point *FlattenerPoint) error

Add - adds a new entry to the flattening process

func (*Flattener) GetName added in v1.1.0

func (f *Flattener) GetName() string

GetName - returns the processor's name

func (*Flattener) ProcessMapEntry added in v1.1.0

func (f *Flattener) ProcessMapEntry(entry interface{}) bool

ProcessMapEntry - process the values from an entry

func (*Flattener) SetTransport added in v1.1.0

func (d *Flattener) SetTransport(transport Transport)

SetTransport - sets the transport

func (*Flattener) Start

func (d *Flattener) Start()

Start - starts the processor cycle

func (*Flattener) Stop added in v1.1.0

func (d *Flattener) Stop()

Stop - terminates the processing cycle

type FlattenerPoint

type FlattenerPoint struct {
	// contains filtered or unexported fields
}

FlattenerPoint - a flattener's point containing the value

func (*FlattenerPoint) GetHash added in v1.1.0

func (fp *FlattenerPoint) GetHash() string

GetHash - returns the hash

type HTTPTransport

type HTTPTransport struct {
	// contains filtered or unexported fields
}

HTTPTransport - implements the HTTP transport

func NewHTTPTransport

func NewHTTPTransport(configuration *HTTPTransportConfig) (*HTTPTransport, error)

NewHTTPTransport - creates a new HTTP event manager

func (*HTTPTransport) AccumulatedDataToDataChannelItem added in v1.1.0

func (t *HTTPTransport) AccumulatedDataToDataChannelItem(point *AccumulatedData) (interface{}, error)

AccumulatedDataToDataChannelItem - converts the accumulated data to the data channel item

func (*HTTPTransport) AddJSONMapping

func (t *HTTPTransport) AddJSONMapping(name string, p interface{}, variables ...string) error

AddJSONMapping - overrides the default generic property mappings

func (*HTTPTransport) Close

func (t *HTTPTransport) Close()

Close - closes this transport

func (*HTTPTransport) ConfigureBackend

func (t *HTTPTransport) ConfigureBackend(backend *Backend) error

ConfigureBackend - configures the backend

func (*HTTPTransport) DataChannel

func (t *HTTPTransport) DataChannel() chan<- interface{}

DataChannel - returns the send-only channel used to send new points

func (*HTTPTransport) DataChannelItemToAccumulatedData added in v1.1.0

func (t *HTTPTransport) DataChannelItemToAccumulatedData(configuration *DataTransformerConf, instance interface{}, calculateHash bool) (Hashable, error)

DataChannelItemToAccumulatedData - converts the data channel item to the accumulated data

func (*HTTPTransport) DataChannelItemToFlattenerPoint added in v1.1.0

func (t *HTTPTransport) DataChannelItemToFlattenerPoint(configuration *DataTransformerConf, instance interface{}, operation FlatOperation) (Hashable, error)

DataChannelItemToFlattenerPoint - converts the data channel item to a flattener point

func (*HTTPTransport) FlattenerPointToDataChannelItem added in v1.1.0

func (t *HTTPTransport) FlattenerPointToDataChannelItem(point *FlattenerPoint) (interface{}, error)

FlattenerPointToDataChannelItem - converts the flattener point to a data channel item

func (*HTTPTransport) MatchType

func (t *HTTPTransport) MatchType(tt transportType) bool

MatchType - checks if this transport implementation matches the given type

func (*HTTPTransport) Serialize

func (t *HTTPTransport) Serialize(item interface{}) (string, error)

Serialize - renders the text using the configured serializer

func (*HTTPTransport) Start

func (t *HTTPTransport) Start() error

Start - starts this transport

func (*HTTPTransport) TransferData

func (t *HTTPTransport) TransferData(dataList []interface{}) error

TransferData - transfers the data to the backend through this transport

type HTTPTransportConfig

type HTTPTransportConfig struct {
	// the default fields used by the transport configuration
	DefaultTransportConfiguration
	ServiceEndpoint        string // NOTE(review): presumably the endpoint the requests are sent to - confirm
	Method                 string // NOTE(review): presumably the HTTP method used by the requests - confirm
	ExpectedResponseStatus int    // NOTE(review): presumably the HTTP status code treated as success - confirm
	TimestampProperty      string // NOTE(review): presumably the name of the property carrying the timestamp - confirm
	ValueProperty          string // NOTE(review): presumably the name of the property carrying the value - confirm
}

HTTPTransportConfig - has all HTTP event manager configurations

type Hashable added in v1.1.0

type Hashable interface {

	// GetHash - returns this instance's hash
	GetHash() string
}

Hashable - an interface for types that expose a hash

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager - the parent of all event managers

func NewManager

func NewManager(transport Transport, flattener, accumulator DataProcessor, backend *Backend) (*Manager, error)

NewManager - creates a timeline manager

func (*Manager) FlattenHTTP

func (m *Manager) FlattenHTTP(operation FlatOperation, name string, parameters ...interface{}) error

FlattenHTTP - flattens a point

func (*Manager) FlattenOpenTSDB

func (m *Manager) FlattenOpenTSDB(operation FlatOperation, value float64, timestamp int64, metric string, tags ...interface{}) error

FlattenOpenTSDB - flattens a point

func (*Manager) GetTransport

func (m *Manager) GetTransport() Transport

GetTransport - returns the configured transport

func (*Manager) IncrementAccumulatedData added in v1.1.0

func (m *Manager) IncrementAccumulatedData(hash string) error

IncrementAccumulatedData - increments the accumulated data identified by the given hash

func (*Manager) SendHTTP

func (m *Manager) SendHTTP(schemaName string, parameters ...interface{}) error

SendHTTP - sends new data using the HTTP transport

func (*Manager) SendOpenTSDB

func (m *Manager) SendOpenTSDB(value float64, timestamp int64, metric string, tags ...interface{}) error

SendOpenTSDB - sends new data using the OpenTSDB transport

func (*Manager) SerializeHTTP

func (m *Manager) SerializeHTTP(schemaName string, parameters ...interface{}) (string, error)

SerializeHTTP - serializes a point using the json serializer

func (*Manager) SerializeOpenTSDB

func (m *Manager) SerializeOpenTSDB(value float64, timestamp int64, metric string, tags ...interface{}) (string, error)

SerializeOpenTSDB - serializes a point using the opentsdb serializer

func (*Manager) Shutdown

func (m *Manager) Shutdown()

Shutdown - shuts down the transport

func (*Manager) Start

func (m *Manager) Start() error

Start - starts the manager

func (*Manager) StoreDataToAccumulateHTTP added in v1.1.0

func (m *Manager) StoreDataToAccumulateHTTP(ttl time.Duration, name string, parameters ...interface{}) (string, error)

StoreDataToAccumulateHTTP - stores data to accumulate

func (*Manager) StoreDataToAccumulateOpenTSDB added in v1.1.0

func (m *Manager) StoreDataToAccumulateOpenTSDB(ttl time.Duration, value float64, timestamp int64, metric string, tags ...interface{}) (string, error)

StoreDataToAccumulateOpenTSDB - stores data to accumulate

func (*Manager) StoreHashedDataToAccumulateHTTP added in v1.3.0

func (m *Manager) StoreHashedDataToAccumulateHTTP(hash string, ttl time.Duration, name string, parameters ...interface{}) error

StoreHashedDataToAccumulateHTTP - stores data with a custom hash to accumulate

func (*Manager) StoreHashedDataToAccumulateOpenTSDB added in v1.3.0

func (m *Manager) StoreHashedDataToAccumulateOpenTSDB(hash string, ttl time.Duration, value float64, timestamp int64, metric string, tags ...interface{}) error

StoreHashedDataToAccumulateOpenTSDB - stores data with a custom hash to accumulate

type NumberPoint

type NumberPoint struct {
	// the embedded base point
	Point
	Value float64 `json:"value"` // Value - the numeric value, serialized as "value" in JSON
}

NumberPoint - a point with number type value

type OpenTSDBTransport

type OpenTSDBTransport struct {
	// contains filtered or unexported fields
}

OpenTSDBTransport - implements the openTSDB transport

func NewOpenTSDBTransport

func NewOpenTSDBTransport(configuration *OpenTSDBTransportConfig) (*OpenTSDBTransport, error)

NewOpenTSDBTransport - creates a new openTSDB event manager

func (*OpenTSDBTransport) AccumulatedDataToDataChannelItem added in v1.1.0

func (t *OpenTSDBTransport) AccumulatedDataToDataChannelItem(point *AccumulatedData) (interface{}, error)

AccumulatedDataToDataChannelItem - converts the accumulated data to the data channel item

func (*OpenTSDBTransport) Close

func (t *OpenTSDBTransport) Close()

Close - closes this transport

func (*OpenTSDBTransport) ConfigureBackend

func (t *OpenTSDBTransport) ConfigureBackend(backend *Backend) error

ConfigureBackend - configures the backend

func (*OpenTSDBTransport) DataChannel

func (t *OpenTSDBTransport) DataChannel() chan<- interface{}

DataChannel - returns the send-only channel used to send new points

func (*OpenTSDBTransport) DataChannelItemToAccumulatedData added in v1.1.0

func (t *OpenTSDBTransport) DataChannelItemToAccumulatedData(configuration *DataTransformerConf, instance interface{}, calculateHash bool) (Hashable, error)

DataChannelItemToAccumulatedData - converts the data channel item to the accumulated data

func (*OpenTSDBTransport) DataChannelItemToFlattenerPoint added in v1.1.0

func (t *OpenTSDBTransport) DataChannelItemToFlattenerPoint(configuration *DataTransformerConf, instance interface{}, operation FlatOperation) (Hashable, error)

DataChannelItemToFlattenerPoint - converts the data channel item to a flattener point

func (*OpenTSDBTransport) FlattenerPointToDataChannelItem added in v1.1.0

func (t *OpenTSDBTransport) FlattenerPointToDataChannelItem(point *FlattenerPoint) (interface{}, error)

FlattenerPointToDataChannelItem - converts the flattener point to a data channel item

func (*OpenTSDBTransport) MatchType

func (t *OpenTSDBTransport) MatchType(tt transportType) bool

MatchType - checks if this transport implementation matches the given type

func (*OpenTSDBTransport) Serialize

func (t *OpenTSDBTransport) Serialize(item interface{}) (string, error)

Serialize - renders the text using the configured serializer

func (*OpenTSDBTransport) Start

func (t *OpenTSDBTransport) Start() error

Start - starts this transport

func (*OpenTSDBTransport) TransferData

func (t *OpenTSDBTransport) TransferData(dataList []interface{}) error

TransferData - transfers the data to the backend through this transport

type OpenTSDBTransportConfig

type OpenTSDBTransportConfig struct {
	// the default fields used by the transport configuration
	DefaultTransportConfiguration
	MaxReadTimeout      time.Duration // NOTE(review): presumably the maximum time allowed for a read from the connection - confirm
	ReconnectionTimeout time.Duration // NOTE(review): presumably the wait time applied when reconnecting - confirm
}

OpenTSDBTransportConfig - has all openTSDB event manager configurations

type Point

type Point struct {
	Metric    string            `json:"metric"`    // Metric - the metric name, serialized as "metric" in JSON
	Tags      map[string]string `json:"tags"`      // Tags - the tag name/value pairs, serialized as "tags" in JSON
	Timestamp int64             `json:"timestamp"` // Timestamp - serialized as "timestamp" in JSON; NOTE(review): unit not shown here - confirm
}

Point - the base point

type TextPoint

type TextPoint struct {
	// the embedded base point
	Point
	Text string `json:"text"` // Text - the text value, serialized as "text" in JSON
}

TextPoint - a point with text type value

type Transport

type Transport interface {

	// DataChannel - returns the send-only channel used to send new points
	DataChannel() chan<- interface{}

	// ConfigureBackend - configures the backend
	ConfigureBackend(backend *Backend) error

	// TransferData - transfers the data using this specific implementation
	TransferData(dataList []interface{}) error

	// Start - starts this transport
	Start() error

	// Close - closes this transport
	Close()

	// MatchType - checks if this transport implementation matches the given type
	MatchType(tt transportType) bool

	// Serialize - renders the text using the configured serializer
	Serialize(item interface{}) (string, error)

	// DataChannelItemToFlattenerPoint - converts the data channel item to the flattened point
	DataChannelItemToFlattenerPoint(configuration *DataTransformerConf, item interface{}, operation FlatOperation) (Hashable, error)

	// FlattenerPointToDataChannelItem - converts the flattened point to the data channel item
	FlattenerPointToDataChannelItem(item *FlattenerPoint) (interface{}, error)

	// DataChannelItemToAccumulatedData - converts the data channel item to the accumulated data
	DataChannelItemToAccumulatedData(configuration *DataTransformerConf, item interface{}, calculateHash bool) (Hashable, error)

	// AccumulatedDataToDataChannelItem - converts the accumulated data to the data channel item
	AccumulatedDataToDataChannelItem(item *AccumulatedData) (interface{}, error)
}

Transport - the implementation type to send an event

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL