Documentation
¶
Overview ¶
Package edge provides mechanisms for message passing along edges. Several composable interfaces are defined to aid in implementing a node which consumes messages from an edge.
Index ¶
- Variables
- func Forward(outs []StatsEdge, msg Message) error
- func NewDeleteGroupMessage(info GroupInfo) *deleteGroupMessage
- type BarrierMessage
- type BatchBuffer
- type BatchPointMessage
- type BatchPointMessages
- type BeginBatchMessage
- type BufferedBatchMessage
- type BufferedBatchMessageDecoder
- type BufferedReceiver
- type Consumer
- type DeleteGroupMessage
- type DimensionGetter
- type DimensionSetter
- type Edge
- type EndBatchMessage
- type FieldGetter
- type FieldSetter
- type FieldsTagsTimeGetter
- type FieldsTagsTimeGetterMessage
- type FieldsTagsTimeSetter
- type ForwardBufferedReceiver
- type ForwardReceiver
- type GroupIDGetter
- type GroupInfo
- type GroupInfoer
- type GroupStats
- type GroupedConsumer
- type GroupedReceiver
- type Message
- type MessageType
- type MultiReceiver
- type NameGetter
- type NameSetter
- type PointMessage
- type PointMeta
- type Receiver
- type StatsEdge
- type TagGetter
- type TagSetter
- type TimeGetter
- type TimeSetter
Constants ¶
This section is empty.
Variables ¶
var ErrAborted = errors.New("edge aborted")
ErrAborted is returned from the Edge interface when operations are performed on the edge after it has been aborted.
Functions ¶
func NewDeleteGroupMessage ¶ added in v1.5.0
func NewDeleteGroupMessage(info GroupInfo) *deleteGroupMessage
Types ¶
type BarrierMessage ¶
type BarrierMessage interface {
Message
ShallowCopy() BarrierMessage
GroupInfoer
NameGetter
DimensionGetter
TagGetter
TimeGetter
}
BarrierMessage indicates that no data older than the barrier time will arrive.
func NewBarrierMessage ¶
func NewBarrierMessage(group GroupInfo, time time.Time) BarrierMessage
type BatchBuffer ¶
type BatchBuffer struct {
// contains filtered or unexported fields
}
BatchBuffer buffers batch messages into a BufferedBatchMessage.
func (*BatchBuffer) BatchPoint ¶
func (r *BatchBuffer) BatchPoint(bp BatchPointMessage) error
func (*BatchBuffer) BeginBatch ¶
func (r *BatchBuffer) BeginBatch(begin BeginBatchMessage) error
func (*BatchBuffer) BufferedBatchMessage ¶
func (r *BatchBuffer) BufferedBatchMessage(end EndBatchMessage) BufferedBatchMessage
type BatchPointMessage ¶
type BatchPointMessage interface {
Message
ShallowCopy() BatchPointMessage
FieldsTagsTimeSetter
}
BatchPointMessage is a single point in a batch of data.
func BatchPointFromPoint ¶
func BatchPointFromPoint(p PointMessage) BatchPointMessage
func NewBatchPointMessage ¶
type BatchPointMessages ¶
type BatchPointMessages []BatchPointMessage
func (BatchPointMessages) Len ¶
func (l BatchPointMessages) Len() int
func (BatchPointMessages) Swap ¶
func (l BatchPointMessages) Swap(i int, j int)
type BeginBatchMessage ¶
type BeginBatchMessage interface {
Message
ShallowCopy() BeginBatchMessage
NameSetter
GroupInfoer
TagSetter
DimensionSetter
SetTagsAndDimensions(models.Tags, models.Dimensions)
// Time is the maximum time of any point in the batch
TimeSetter
// SizeHint provides a hint about the size of the batch to come.
// If non-zero expect a batch with SizeHint points,
// otherwise an unknown number of points are coming.
SizeHint() int
SetSizeHint(int)
}
BeginBatchMessage marks the beginning of a batch of points. Once a BeginBatchMessage is received all subsequent message will be BatchPointMessages until an EndBatchMessage is received.
func NewBeginBatchMessage ¶
type BufferedBatchMessage ¶
type BufferedBatchMessage interface {
Message
ShallowCopy() BufferedBatchMessage
Begin() BeginBatchMessage
SetBegin(BeginBatchMessage)
// Expose common read interfaces of begin and point messages.
PointMeta
Points() []BatchPointMessage
SetPoints([]BatchPointMessage)
End() EndBatchMessage
SetEnd(EndBatchMessage)
ToResult() models.Result
ToRow() *models.Row
}
BufferedBatchMessage is a message containing all data for a single batch.
func NewBufferedBatchMessage ¶
func NewBufferedBatchMessage( begin BeginBatchMessage, points []BatchPointMessage, end EndBatchMessage, ) BufferedBatchMessage
func ResultToBufferedBatches ¶
func ResultToBufferedBatches(res influxdb.Result, groupByName bool) ([]BufferedBatchMessage, error)
type BufferedBatchMessageDecoder ¶
type BufferedBatchMessageDecoder interface {
Decode() (BufferedBatchMessage, error)
More() bool
}
func NewBufferedBatchMessageDecoder ¶
func NewBufferedBatchMessageDecoder(r io.Reader) BufferedBatchMessageDecoder
type BufferedReceiver ¶
type BufferedReceiver interface {
Receiver
// BufferedBatch processes an entire buffered batch.
// Do not modify the batch or the slice of Points as it is shared.
BufferedBatch(batch BufferedBatchMessage) error
}
type Consumer ¶
type Consumer interface {
// Consume reads messages off an edge until the edge is closed or aborted.
// An error is returned if either the edge or receiver errors.
Consume() error
}
Consumer reads messages off an edge and passes them to a receiver.
func NewConsumerWithReceiver ¶
NewConsumerWithReceiver creates a new consumer for the edge e and receiver r.
func NewMultiConsumer ¶
func NewMultiConsumer(ins []Edge, r MultiReceiver) Consumer
func NewMultiConsumerWithStats ¶
func NewMultiConsumerWithStats(ins []StatsEdge, r MultiReceiver) Consumer
type DeleteGroupMessage ¶
type DeleteGroupMessage interface {
Message
GroupInfoer
}
type DimensionGetter ¶
type DimensionGetter interface {
Dimensions() models.Dimensions
}
type DimensionSetter ¶
type DimensionSetter interface {
DimensionGetter
SetDimensions(models.Dimensions)
}
type Edge ¶
type Edge interface {
// Collect instructs the edge to accept a new message.
Collect(Message) error
// Emit blocks until a message is available and returns it or returns false if the edge has been closed or aborted.
Emit() (Message, bool)
// Close stops the edge, all messages currently buffered will be processed.
// Future calls to Collect will panic.
Close() error
// Abort immediately stops the edge and all currently buffered messages are dropped.
// Future calls to Collect return the error ErrAborted.
Abort()
// Type indicates whether the edge will emit stream or batch data.
Type() pipeline.EdgeType
}
Edge represents the connection between two nodes that communicate via messages. Edge communication is unidirectional and asynchronous. Edges are safe for concurrent use.
type EndBatchMessage ¶
type EndBatchMessage interface {
Message
ShallowCopy() EndBatchMessage
}
EndBatchMessage indicates that all points for a batch have arrived.
func NewEndBatchMessage ¶
func NewEndBatchMessage() EndBatchMessage
type FieldGetter ¶
type FieldSetter ¶
type FieldSetter interface {
FieldGetter
SetFields(models.Fields)
}
type FieldsTagsTimeGetter ¶
type FieldsTagsTimeGetter interface {
FieldGetter
TagGetter
TimeGetter
}
type FieldsTagsTimeGetterMessage ¶
type FieldsTagsTimeGetterMessage interface {
Message
FieldsTagsTimeGetter
}
type FieldsTagsTimeSetter ¶
type FieldsTagsTimeSetter interface {
FieldSetter
TagSetter
TimeSetter
}
type ForwardBufferedReceiver ¶
type ForwardBufferedReceiver interface {
ForwardReceiver
BufferedBatch(batch BufferedBatchMessage) (Message, error)
}
ForwardBufferedReceiver handles messages as they arrive and can return a message to be forwarded to output edges. If a returned messages is nil, no message is forwarded.
type ForwardReceiver ¶
type ForwardReceiver interface {
BeginBatch(begin BeginBatchMessage) (Message, error)
BatchPoint(bp BatchPointMessage) (Message, error)
EndBatch(end EndBatchMessage) (Message, error)
Point(p PointMessage) (Message, error)
Barrier(b BarrierMessage) (Message, error)
DeleteGroup(d DeleteGroupMessage) (Message, error)
// Done is called once the receiver will no longer receive any messages.
Done()
}
ForwardReceiver handles messages as they arrive and can return a message to be forwarded to output edges. If a returned messages is nil, no message is forwarded.
func NewTimedForwardReceiver ¶
func NewTimedForwardReceiver(t timer.Timer, r ForwardReceiver) ForwardReceiver
NewTimedForwardReceiver creates a forward receiver which times the time spent in r.
type GroupIDGetter ¶
type GroupInfoer ¶
type GroupInfoer interface {
GroupIDGetter
GroupInfo() GroupInfo
}
type GroupStats ¶
GroupStats represents the statistics for a specific group.
type GroupedConsumer ¶
type GroupedConsumer interface {
Consumer
// CardinalityVar is an exported var that indicates the current number of groups being managed.
CardinalityVar() expvar.IntVar
}
GroupedConsumer reads messages off an edge and passes them by group to receivers created from a grouped receiver.
func NewGroupedConsumer ¶
func NewGroupedConsumer(e Edge, r GroupedReceiver) GroupedConsumer
NewGroupedConsumer creates a new grouped consumer for edge e and grouped receiver r.
type GroupedReceiver ¶
type GroupedReceiver interface {
// NewGroup signals that a new group has been discovered in the data.
// Information on the group and the message that first triggered its creation are provided.
NewGroup(group GroupInfo, first PointMeta) (Receiver, error)
}
GroupedReceiver creates and deletes receivers as groups are created and deleted.
type Message ¶
type Message interface {
// Type returns the type of the message.
Type() MessageType
}
Message represents data to be passed along an edge. Messages can be shared across many contexts.
All messages implement their own ShallowCopy method. All ShallowCopy methods create a copy of the message but does not deeply copy any reference types.
Never mutate a reference type returned from a message without first directly copying the reference type.
type MessageType ¶
type MessageType int
const ( BeginBatch MessageType = iota BatchPoint EndBatch BufferedBatch Point Barrier DeleteGroup )
func (MessageType) String ¶
func (m MessageType) String() string
type MultiReceiver ¶
type MultiReceiver interface {
BufferedBatch(src int, batch BufferedBatchMessage) error
Point(src int, p PointMessage) error
Barrier(src int, b BarrierMessage) error
Delete(src int, d DeleteGroupMessage) error
Finish() error
}
type NameGetter ¶
type NameGetter interface {
Name() string
}
type NameSetter ¶
type NameSetter interface {
NameGetter
SetName(string)
}
type PointMessage ¶
type PointMessage interface {
Message
ShallowCopy() PointMessage
NameSetter
Database() string
SetDatabase(string)
RetentionPolicy() string
SetRetentionPolicy(string)
GroupInfoer
DimensionSetter
SetTagsAndDimensions(models.Tags, models.Dimensions)
FieldsTagsTimeSetter
Bytes(precision string) []byte
ToResult() models.Result
ToRow() *models.Row
}
PointMessage is a single point.
func NewPointMessage ¶
func NewPointMessage( name, database, retentionPolicy string, dimensions models.Dimensions, fields models.Fields, tags models.Tags, time time.Time) PointMessage
type PointMeta ¶
type PointMeta interface {
NameGetter
GroupInfoer
DimensionGetter
TagGetter
TimeGetter
}
PointMeta is the common read interfaces of point and batch messages.
type Receiver ¶
type Receiver interface {
BeginBatch(begin BeginBatchMessage) error
BatchPoint(bp BatchPointMessage) error
EndBatch(end EndBatchMessage) error
Point(p PointMessage) error
Barrier(b BarrierMessage) error
DeleteGroup(d DeleteGroupMessage) error
// Done is called once the receiver will no longer receive any messages.
Done()
}
Receiver handles messages as they arrive via a consumer.
func NewReceiverFromForwardReceiver ¶
func NewReceiverFromForwardReceiver(outs []Edge, r ForwardReceiver) Receiver
NewReceiverFromForwardReceiver creates a new receiver from the provided list of edges and forward receiver.
func NewReceiverFromForwardReceiverWithStats ¶
func NewReceiverFromForwardReceiverWithStats(outs []StatsEdge, r ForwardReceiver) Receiver
NewReceiverFromForwardReceiverWithStats creates a new receiver from the provided list of stats edges and forward receiver.
type StatsEdge ¶
type StatsEdge interface {
Edge
// Collected returns the number of messages collected by this edge.
Collected() int64
// Emitted returns the number of messages emitted by this edge.
Emitted() int64
// CollectedVar is an exported var the represents the number of messages collected by this edge.
CollectedVar() expvar.IntVar
// EmittedVar is an exported var the represents the number of messages emitted by this edge.
EmittedVar() expvar.IntVar
// ReadGroupStats allows for the reading of the current statistics by group.
ReadGroupStats(func(*GroupStats))
}
StatsEdge is an edge that tracks various statistics about message passing through the edge.
func NewStatsEdge ¶
NewStatsEdge creates an edge that tracks statistics about the message passing through the edge.
type TimeGetter ¶
type TimeSetter ¶
type TimeSetter interface {
TimeGetter
SetTime(time.Time)
}