Documentation
¶
Index ¶
- Constants
- Variables
- type AccumulatedData
- type Accumulator
- func (a *Accumulator) Add(hash string) error
- func (a *Accumulator) GetName() string
- func (a *Accumulator) ProcessMapEntry(entry interface{}) bool
- func (d *Accumulator) SetTransport(transport Transport)
- func (d *Accumulator) Start()
- func (a *Accumulator) Stop()
- func (a *Accumulator) Store(item interface{}, ttl time.Duration) (string, error)
- func (a *Accumulator) StoreCustomHash(item interface{}, ttl time.Duration, hash string) error
- type Backend
- type DataProcessor
- type DataTransformerConf
- type DefaultTransportConfiguration
- type FlatOperation
- type Flattener
- type FlattenerPoint
- type HTTPTransport
- func (t *HTTPTransport) AccumulatedDataToDataChannelItem(point *AccumulatedData) (interface{}, error)
- func (t *HTTPTransport) AddJSONMapping(name string, p interface{}, variables ...string) error
- func (t *HTTPTransport) Close()
- func (t *HTTPTransport) ConfigureBackend(backend *Backend) error
- func (t *HTTPTransport) DataChannel(item interface{})
- func (t *HTTPTransport) DataChannelItemToAccumulatedData(configuration *DataTransformerConf, instance interface{}, calculateHash bool) (Hashable, error)
- func (t *HTTPTransport) DataChannelItemToFlattenerPoint(configuration *DataTransformerConf, instance interface{}, ...) (Hashable, error)
- func (t *HTTPTransport) FlattenerPointToDataChannelItem(point *FlattenerPoint) (interface{}, error)
- func (t *HTTPTransport) MatchType(tt transportType) bool
- func (t *HTTPTransport) PrintStackOnError() bool
- func (t *HTTPTransport) Serialize(item interface{}) (string, error)
- func (t *HTTPTransport) Start() error
- func (t *HTTPTransport) TransferData(dataList []interface{}) error
- type HTTPTransportConfig
- type Hashable
- type Manager
- func (m *Manager) FlattenHTTP(operation FlatOperation, name string, parameters ...interface{}) error
- func (m *Manager) FlattenOpenTSDB(operation FlatOperation, value float64, timestamp int64, metric string, ...) error
- func (m *Manager) GetTransport() Transport
- func (m *Manager) IncrementAccumulatedData(hash string) error
- func (m *Manager) SendHTTP(schemaName string, parameters ...interface{}) error
- func (m *Manager) SendOpenTSDB(value float64, timestamp int64, metric string, tags ...interface{}) error
- func (m *Manager) SerializeHTTP(schemaName string, parameters ...interface{}) (string, error)
- func (m *Manager) SerializeOpenTSDB(value float64, timestamp int64, metric string, tags ...interface{}) (string, error)
- func (m *Manager) Shutdown()
- func (m *Manager) Start() error
- func (m *Manager) StoreDataToAccumulateHTTP(ttl time.Duration, name string, parameters ...interface{}) (string, error)
- func (m *Manager) StoreDataToAccumulateOpenTSDB(ttl time.Duration, value float64, timestamp int64, metric string, ...) (string, error)
- func (m *Manager) StoreHashedDataToAccumulateHTTP(hash string, ttl time.Duration, name string, parameters ...interface{}) error
- func (m *Manager) StoreHashedDataToAccumulateOpenTSDB(hash string, ttl time.Duration, value float64, timestamp int64, metric string, ...) error
- type OpenTSDBTransport
- func (t *OpenTSDBTransport) AccumulatedDataToDataChannelItem(point *AccumulatedData) (interface{}, error)
- func (t *OpenTSDBTransport) Close()
- func (t *OpenTSDBTransport) ConfigureBackend(backend *Backend) error
- func (t *OpenTSDBTransport) DataChannel(item interface{})
- func (t *OpenTSDBTransport) DataChannelItemToAccumulatedData(configuration *DataTransformerConf, instance interface{}, calculateHash bool) (Hashable, error)
- func (t *OpenTSDBTransport) DataChannelItemToFlattenerPoint(configuration *DataTransformerConf, instance interface{}, ...) (Hashable, error)
- func (t *OpenTSDBTransport) FlattenerPointToDataChannelItem(point *FlattenerPoint) (interface{}, error)
- func (t *OpenTSDBTransport) MatchType(tt transportType) bool
- func (t *OpenTSDBTransport) PrintStackOnError() bool
- func (t *OpenTSDBTransport) Serialize(item interface{}) (string, error)
- func (t *OpenTSDBTransport) Start() error
- func (t *OpenTSDBTransport) TransferData(dataList []interface{}) error
- type OpenTSDBTransportConfig
- type Transport
Constants ¶
const ( // FlattenerName - the name FlattenerName string = "flattener" // Avg - aggregation Avg FlatOperation = 0 // Sum - aggregation Sum FlatOperation = 1 // Count - aggregation Count FlatOperation = 2 // Max - aggregation Max FlatOperation = 3 // Min - aggregation Min FlatOperation = 4 )
const ( // AccumulatorName - the name AccumulatorName string = "accumulator" )
Variables ¶
var ( // ErrNotStored - thrown when a hash was not stored ErrNotStored error = fmt.Errorf("hash is not stored") )
Functions ¶
This section is empty.
Types ¶
type AccumulatedData ¶ added in v1.1.0
type AccumulatedData struct {
// contains filtered or unexported fields
}
AccumulatedData - an accumulated point
func (*AccumulatedData) Execute ¶ added in v1.1.0
func (ad *AccumulatedData) Execute()
Execute - implements the Job interface
func (*AccumulatedData) GetHash ¶ added in v1.1.0
func (ad *AccumulatedData) GetHash() string
GetHash - returns the hash
type Accumulator ¶ added in v1.1.0
type Accumulator struct {
// contains filtered or unexported fields
}
Accumulator - the struct
func NewAccumulator ¶ added in v1.1.0
func NewAccumulator(configuration *DataTransformerConf) *Accumulator
NewAccumulator - creates a new instance
func (*Accumulator) Add ¶ added in v1.1.0
func (a *Accumulator) Add(hash string) error
Add - adds one more to the reference
func (*Accumulator) GetName ¶ added in v1.1.0
func (a *Accumulator) GetName() string
GetName - returns the processor's name
func (*Accumulator) ProcessMapEntry ¶ added in v1.1.0
func (a *Accumulator) ProcessMapEntry(entry interface{}) bool
ProcessMapEntry - sends the data to the transport
func (*Accumulator) SetTransport ¶ added in v1.1.0
func (d *Accumulator) SetTransport(transport Transport)
SetTransport - sets the transport
func (*Accumulator) Start ¶ added in v1.1.0
func (d *Accumulator) Start()
Start - starts the processor cycle
func (*Accumulator) Stop ¶ added in v1.1.0
func (a *Accumulator) Stop()
Stop - terminates the processing cycle
func (*Accumulator) Store ¶ added in v1.1.0
func (a *Accumulator) Store(item interface{}, ttl time.Duration) (string, error)
Store - stores a reference
func (*Accumulator) StoreCustomHash ¶ added in v1.3.0
func (a *Accumulator) StoreCustomHash(item interface{}, ttl time.Duration, hash string) error
StoreCustomHash - stores a custom reference
type DataProcessor ¶ added in v1.1.0
type DataProcessor interface {
// Start - starts the data processor
Start()
// Stop - stops the data processor
Stop()
// GetName - returns the processor's name
GetName() string
// SetTransport - sets the transport
SetTransport(transport Transport)
// ProcessMapEntry - process a map entry and return true to delete the entry
ProcessMapEntry(entry interface{}) (deleteAfter bool)
}
DataProcessor - an interface for data processors
type DataTransformerConf ¶ added in v1.1.0
type DataTransformerConf struct {
CycleDuration funks.Duration
HashingAlgorithm hashing.Algorithm
HashSize int
// contains filtered or unexported fields
}
DataTransformerConf - configuration shared by the data transformers (flattener and accumulator)
type DefaultTransportConfiguration ¶
type DefaultTransportConfiguration struct {
TransportBufferSize int
BatchSendInterval funks.Duration
RequestTimeout funks.Duration
SerializerBufferSize int
DebugInput bool
DebugOutput bool
TimeBetweenBatches funks.Duration
PrintStackOnError bool
}
DefaultTransportConfiguration - the default fields used by the transport configuration
func (*DefaultTransportConfiguration) Validate ¶
func (c *DefaultTransportConfiguration) Validate() error
Validate - validates the default items from the configuration
type Flattener ¶
type Flattener struct {
// contains filtered or unexported fields
}
Flattener - controls the timeline's point flattening
func NewFlattener ¶
func NewFlattener(configuration *DataTransformerConf) *Flattener
NewFlattener - creates a new flattener
func (*Flattener) Add ¶
func (f *Flattener) Add(point *FlattenerPoint) error
Add - adds a new entry to the flattening process
func (*Flattener) ProcessMapEntry ¶ added in v1.1.0
ProcessMapEntry - processes the values from an entry
func (*Flattener) SetTransport ¶ added in v1.1.0
func (d *Flattener) SetTransport(transport Transport)
SetTransport - sets the transport
type FlattenerPoint ¶
type FlattenerPoint struct {
// contains filtered or unexported fields
}
FlattenerPoint - a flattener's point containing the value
func (*FlattenerPoint) GetHash ¶ added in v1.1.0
func (fp *FlattenerPoint) GetHash() string
GetHash - returns the hash
type HTTPTransport ¶
type HTTPTransport struct {
// contains filtered or unexported fields
}
HTTPTransport - implements the HTTP transport
func NewHTTPTransport ¶
func NewHTTPTransport(configuration *HTTPTransportConfig) (*HTTPTransport, error)
NewHTTPTransport - creates a new HTTP event manager
func (*HTTPTransport) AccumulatedDataToDataChannelItem ¶ added in v1.1.0
func (t *HTTPTransport) AccumulatedDataToDataChannelItem(point *AccumulatedData) (interface{}, error)
AccumulatedDataToDataChannelItem - converts the accumulated data to the data channel item
func (*HTTPTransport) AddJSONMapping ¶
func (t *HTTPTransport) AddJSONMapping(name string, p interface{}, variables ...string) error
AddJSONMapping - overrides the default generic property mappings
func (*HTTPTransport) ConfigureBackend ¶
func (t *HTTPTransport) ConfigureBackend(backend *Backend) error
ConfigureBackend - configures the backend
func (*HTTPTransport) DataChannel ¶
func (t *HTTPTransport) DataChannel(item interface{})
DataChannel - sends a new point
func (*HTTPTransport) DataChannelItemToAccumulatedData ¶ added in v1.1.0
func (t *HTTPTransport) DataChannelItemToAccumulatedData(configuration *DataTransformerConf, instance interface{}, calculateHash bool) (Hashable, error)
DataChannelItemToAccumulatedData - converts the data channel item to the accumulated data
func (*HTTPTransport) DataChannelItemToFlattenerPoint ¶ added in v1.1.0
func (t *HTTPTransport) DataChannelItemToFlattenerPoint(configuration *DataTransformerConf, instance interface{}, operation FlatOperation) (Hashable, error)
DataChannelItemToFlattenerPoint - converts the data channel item to the flattened point one
func (*HTTPTransport) FlattenerPointToDataChannelItem ¶ added in v1.1.0
func (t *HTTPTransport) FlattenerPointToDataChannelItem(point *FlattenerPoint) (interface{}, error)
FlattenerPointToDataChannelItem - converts the flattened point to the data channel one
func (*HTTPTransport) MatchType ¶
func (t *HTTPTransport) MatchType(tt transportType) bool
MatchType - checks if this transport implementation matches the given type
func (*HTTPTransport) PrintStackOnError ¶ added in v1.6.1
func (t *HTTPTransport) PrintStackOnError() bool
PrintStackOnError - enables the stack print to the log
func (*HTTPTransport) Serialize ¶
func (t *HTTPTransport) Serialize(item interface{}) (string, error)
Serialize - renders the text using the configured serializer
func (*HTTPTransport) TransferData ¶
func (t *HTTPTransport) TransferData(dataList []interface{}) error
TransferData - transfers the data to the backend through this transport
type HTTPTransportConfig ¶
type HTTPTransportConfig struct {
DefaultTransportConfiguration
ServiceEndpoint string
Method string
ExpectedResponseStatus int
TimestampProperty string
ValueProperty string
}
HTTPTransportConfig - has all HTTP event manager configurations
type Hashable ¶ added in v1.1.0
type Hashable interface {
// GetHash - returns this instance's hash
GetHash() string
}
Hashable - an interface for types that expose a hash
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager - the parent of all event managers
func NewManager ¶
func NewManager(transport Transport, flattener, accumulator DataProcessor, backend *Backend) (*Manager, error)
NewManager - creates a timeline manager
func (*Manager) FlattenHTTP ¶
func (m *Manager) FlattenHTTP(operation FlatOperation, name string, parameters ...interface{}) error
FlattenHTTP - flatten a point
func (*Manager) FlattenOpenTSDB ¶
func (m *Manager) FlattenOpenTSDB(operation FlatOperation, value float64, timestamp int64, metric string, tags ...interface{}) error
FlattenOpenTSDB - flatten a point
func (*Manager) GetTransport ¶
func (m *Manager) GetTransport() Transport
GetTransport - returns the configured transport
func (*Manager) IncrementAccumulatedData ¶ added in v1.1.0
func (m *Manager) IncrementAccumulatedData(hash string) error
IncrementAccumulatedData - increments the accumulated data identified by the hash
func (*Manager) SendOpenTSDB ¶
func (m *Manager) SendOpenTSDB(value float64, timestamp int64, metric string, tags ...interface{}) error
SendOpenTSDB - sends new data using the openTSDB transport
func (*Manager) SerializeHTTP ¶
func (m *Manager) SerializeHTTP(schemaName string, parameters ...interface{}) (string, error)
SerializeHTTP - serializes a point using the json serializer
func (*Manager) SerializeOpenTSDB ¶
func (m *Manager) SerializeOpenTSDB(value float64, timestamp int64, metric string, tags ...interface{}) (string, error)
SerializeOpenTSDB - serializes a point using the opentsdb serializer
func (*Manager) StoreDataToAccumulateHTTP ¶ added in v1.1.0
func (m *Manager) StoreDataToAccumulateHTTP(ttl time.Duration, name string, parameters ...interface{}) (string, error)
StoreDataToAccumulateHTTP - stores data to accumulate
func (*Manager) StoreDataToAccumulateOpenTSDB ¶ added in v1.1.0
func (m *Manager) StoreDataToAccumulateOpenTSDB(ttl time.Duration, value float64, timestamp int64, metric string, tags ...interface{}) (string, error)
StoreDataToAccumulateOpenTSDB - stores data to accumulate
func (*Manager) StoreHashedDataToAccumulateHTTP ¶ added in v1.3.0
func (m *Manager) StoreHashedDataToAccumulateHTTP(hash string, ttl time.Duration, name string, parameters ...interface{}) error
StoreHashedDataToAccumulateHTTP - stores data with a custom hash to accumulate
func (*Manager) StoreHashedDataToAccumulateOpenTSDB ¶ added in v1.3.0
func (m *Manager) StoreHashedDataToAccumulateOpenTSDB(hash string, ttl time.Duration, value float64, timestamp int64, metric string, tags ...interface{}) error
StoreHashedDataToAccumulateOpenTSDB - stores data with a custom hash to accumulate
type OpenTSDBTransport ¶
type OpenTSDBTransport struct {
// contains filtered or unexported fields
}
OpenTSDBTransport - implements the openTSDB transport
func NewOpenTSDBTransport ¶
func NewOpenTSDBTransport(configuration *OpenTSDBTransportConfig) (*OpenTSDBTransport, error)
NewOpenTSDBTransport - creates a new openTSDB event manager
func (*OpenTSDBTransport) AccumulatedDataToDataChannelItem ¶ added in v1.1.0
func (t *OpenTSDBTransport) AccumulatedDataToDataChannelItem(point *AccumulatedData) (interface{}, error)
AccumulatedDataToDataChannelItem - converts the accumulated data to the data channel item
func (*OpenTSDBTransport) ConfigureBackend ¶
func (t *OpenTSDBTransport) ConfigureBackend(backend *Backend) error
ConfigureBackend - configures the backend
func (*OpenTSDBTransport) DataChannel ¶
func (t *OpenTSDBTransport) DataChannel(item interface{})
DataChannel - sends a new point
func (*OpenTSDBTransport) DataChannelItemToAccumulatedData ¶ added in v1.1.0
func (t *OpenTSDBTransport) DataChannelItemToAccumulatedData(configuration *DataTransformerConf, instance interface{}, calculateHash bool) (Hashable, error)
DataChannelItemToAccumulatedData - converts the data channel item to the accumulated data
func (*OpenTSDBTransport) DataChannelItemToFlattenerPoint ¶ added in v1.1.0
func (t *OpenTSDBTransport) DataChannelItemToFlattenerPoint(configuration *DataTransformerConf, instance interface{}, operation FlatOperation) (Hashable, error)
DataChannelItemToFlattenerPoint - converts the data channel item to the flattened point one
func (*OpenTSDBTransport) FlattenerPointToDataChannelItem ¶ added in v1.1.0
func (t *OpenTSDBTransport) FlattenerPointToDataChannelItem(point *FlattenerPoint) (interface{}, error)
FlattenerPointToDataChannelItem - converts the flattened point to the data channel one
func (*OpenTSDBTransport) MatchType ¶
func (t *OpenTSDBTransport) MatchType(tt transportType) bool
MatchType - checks if this transport implementation matches the given type
func (*OpenTSDBTransport) PrintStackOnError ¶ added in v1.6.1
func (t *OpenTSDBTransport) PrintStackOnError() bool
PrintStackOnError - enables the stack print to the log
func (*OpenTSDBTransport) Serialize ¶
func (t *OpenTSDBTransport) Serialize(item interface{}) (string, error)
Serialize - renders the text using the configured serializer
func (*OpenTSDBTransport) Start ¶
func (t *OpenTSDBTransport) Start() error
Start - starts this transport
func (*OpenTSDBTransport) TransferData ¶
func (t *OpenTSDBTransport) TransferData(dataList []interface{}) error
TransferData - transfers the data to the backend through this transport
type OpenTSDBTransportConfig ¶
type OpenTSDBTransportConfig struct {
DefaultTransportConfiguration
ReadBufferSize int
MaxReadTimeout funks.Duration
ReconnectionTimeout funks.Duration
MaxReconnectionRetries int
DisconnectAfterWrites bool
}
OpenTSDBTransportConfig - has all openTSDB event manager configurations
type Transport ¶
type Transport interface {
// DataChannel - sends a new point
DataChannel(item interface{})
// ConfigureBackend - configures the backend
ConfigureBackend(backend *Backend) error
// TransferData - transfers the data using this specific implementation
TransferData(dataList []interface{}) error
// Start - starts this transport
Start() error
// Close - closes this transport
Close()
// MatchType - checks if this transport implementation matches the given type
MatchType(tt transportType) bool
// Serialize - renders the text using the configured serializer
Serialize(item interface{}) (string, error)
// DataChannelItemToFlattenerPoint - converts the data channel item to the flattened point
DataChannelItemToFlattenerPoint(configuration *DataTransformerConf, item interface{}, operation FlatOperation) (Hashable, error)
// FlattenerPointToDataChannelItem - converts the flattened point to the data channel item
FlattenerPointToDataChannelItem(item *FlattenerPoint) (interface{}, error)
// DataChannelItemToAccumulatedData - converts the data channel item to the accumulated data
DataChannelItemToAccumulatedData(configuration *DataTransformerConf, item interface{}, calculateHash bool) (Hashable, error)
// AccumulatedDataToDataChannelItem - converts the accumulated data to the data channel item
AccumulatedDataToDataChannelItem(item *AccumulatedData) (interface{}, error)
// PrintStackOnError - enables the stack print to the log
PrintStackOnError() bool
}
Transport - the implementation type to send an event