Documentation
Index
- type Subscriber
- type SubscriberBase
- func NewSubscriberBase(subscriber Subscriber) SubscriberBase
- func (sb *SubscriberBase) ActiveSignalIndexCache() *transport.SignalIndexCache
- func (sb *SubscriberBase) AdjustedValue(measurement *transport.Measurement) float64
- func (sb *SubscriberBase) ConfigurationChanged()
- func (sb *SubscriberBase) Connect()
- func (sb *SubscriberBase) ConnectionEstablished()
- func (sb *SubscriberBase) ConnectionTerminated()
- func (sb *SubscriberBase) DataStartTime(startTime time.Time)
- func (sb *SubscriberBase) Disconnect()
- func (sb *SubscriberBase) Dispose()
- func (sb *SubscriberBase) ErrorMessage(message string)
- func (sb *SubscriberBase) HistoricalReadComplete()
- func (sb *SubscriberBase) IsConnected() bool
- func (sb *SubscriberBase) IsSubscribed() bool
- func (sb *SubscriberBase) LookupMetadata(signalID guid.Guid) *transport.MeasurementMetadata
- func (sb *SubscriberBase) Metadata(measurement *transport.Measurement) *transport.MeasurementMetadata
- func (sb *SubscriberBase) ReceivedMetadata(metadata []byte)
- func (sb *SubscriberBase) ReceivedNewBufferBlocks(bufferBlocks []transport.BufferBlock)
- func (sb *SubscriberBase) ReceivedNewMeasurements(measurements []transport.Measurement)
- func (sb *SubscriberBase) ReceivedNotification(notification string)
- func (sb *SubscriberBase) RequestMetadata()
- func (sb *SubscriberBase) StatusMessage(message string)
- func (sb *SubscriberBase) Subscribe()
- func (sb *SubscriberBase) SubscriberID() guid.Guid
- func (sb *SubscriberBase) Subscription() *transport.SubscriptionInfo
- func (sb *SubscriberBase) SubscriptionUpdated(signalIndexCache *transport.SignalIndexCache)
- func (sb *SubscriberBase) TotalCommandChannelBytesReceived() uint64
- func (sb *SubscriberBase) TotalDataChannelBytesReceived() uint64
- func (sb *SubscriberBase) TotalMeasurementsReceived() uint64
- func (sb *SubscriberBase) Unsubscribe()
Types
type Subscriber
type Subscriber interface {
// StatusMessage handles informational message logging.
StatusMessage(message string)
// ErrorMessage handles error message logging.
ErrorMessage(message string)
// ReceivedMetadata handles reception of the metadata response.
ReceivedMetadata(metadata []byte)
// SubscriptionUpdated handles notifications that a new SignalIndexCache has been received.
SubscriptionUpdated(signalIndexCache *transport.SignalIndexCache)
// DataStartTime handles notifications of the first received measurement. This can be useful in
// cases where SubscriptionInfo.IncludeTime has been set to false.
DataStartTime(startTime time.Time)
// ConfigurationChanged handles notifications that the publisher configuration has changed.
ConfigurationChanged()
// ReceivedNewMeasurements handles reception of new measurements.
ReceivedNewMeasurements(measurements []transport.Measurement)
// ReceivedNewBufferBlocks handles reception of new buffer blocks.
ReceivedNewBufferBlocks(bufferBlocks []transport.BufferBlock)
// ReceivedNotification handles reception of a notification.
ReceivedNotification(notification string)
// HistoricalReadComplete handles notification that temporal processing has completed,
// i.e., the end of a historical playback data stream has been reached.
HistoricalReadComplete()
// ConnectionEstablished handles notification that a connection has been established.
ConnectionEstablished()
// ConnectionTerminated handles notification that a connection has been terminated.
ConnectionTerminated()
}
Subscriber defines the primary functionality of an STTP data subscription. Struct implementations of this interface that embed SubscriberBase as a composite field inherit a default implementation of every required interface method, so an implementation only needs to define the handlers it actually uses.
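The embedding pattern described here can be sketched without the STTP packages. In the example below, `handler`, `handlerBase`, and `mySubscriber` are hypothetical stand-ins for Subscriber, SubscriberBase, and a user implementation; they are not part of this package's API and only illustrate how Go method promotion supplies the defaults.

```go
package main

import "fmt"

// handler is a hypothetical, trimmed-down stand-in for the Subscriber interface.
type handler interface {
	StatusMessage(message string)
	ErrorMessage(message string)
}

// handlerBase is a hypothetical stand-in for SubscriberBase: it supplies a
// default implementation of every interface method.
type handlerBase struct{}

func (handlerBase) StatusMessage(message string) { fmt.Println(message) }
func (handlerBase) ErrorMessage(message string)  { fmt.Println("ERROR: " + message) }

// mySubscriber embeds handlerBase, so it satisfies handler while overriding
// only the method it cares about.
type mySubscriber struct {
	handlerBase
	status []string
}

// StatusMessage shadows the embedded default and records messages instead.
func (s *mySubscriber) StatusMessage(message string) {
	s.status = append(s.status, message)
}

func main() {
	sub := &mySubscriber{}
	var h handler = sub // compiles because the embedded defaults fill the gaps
	h.StatusMessage("connected")
	h.ErrorMessage("connection lost") // falls through to the embedded default
	fmt.Println(len(sub.status))
}
```

Only `StatusMessage` is overridden here; `ErrorMessage` is promoted from the embedded struct unchanged.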
type SubscriberBase
type SubscriberBase struct {
// Hostname is the DataPublisher DNS name or IP.
Hostname string
// Port is the TCP/IP listening port of the DataPublisher.
Port uint16
// MaxRetries defines the maximum number of times to retry a connection.
// Set value to -1 to retry infinitely.
MaxRetries int32
// RetryInterval defines the base retry interval, in milliseconds. Retries will
// exponentially back-off starting from this interval.
RetryInterval int32
// MaxRetryInterval defines the maximum retry interval, in milliseconds.
MaxRetryInterval int32
// AutoReconnect defines the flag that determines if connections should be
// automatically reattempted.
AutoReconnect bool
// AutoRequestMetadata defines the flag that determines if metadata should be
// automatically requested upon successful connection. When true, metadata will
// be requested upon connection before subscription; otherwise, any metadata
// operations must be handled manually.
AutoRequestMetadata bool
// AutoSubscribe defines the flag that determines if subscription should be
// handled automatically upon successful connection. When AutoRequestMetadata
// is true and AutoSubscribe is true, subscription will occur after reception
// of metadata. When AutoRequestMetadata is false and AutoSubscribe is true,
// subscription will occur at successful connection. When AutoSubscribe is
// false, any subscribe operations must be handled manually.
AutoSubscribe bool
// CompressPayloadData determines whether payload data is compressed.
CompressPayloadData bool
// CompressMetadata determines whether the metadata transfer is compressed.
CompressMetadata bool
// CompressSignalIndexCache determines whether the signal index cache is compressed.
CompressSignalIndexCache bool
// MetadataFilters defines any filters to be applied to incoming metadata to reduce total
// received metadata. Each filter expression should be separated by semi-colons.
MetadataFilters string
// Version defines the target STTP protocol version. This currently defaults to 2.
Version byte
// contains filtered or unexported fields
}
SubscriberBase provides the default functionality for a Subscriber implementation.
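The MaxRetries, RetryInterval, and MaxRetryInterval fields together describe a capped exponential back-off. The following is a minimal sketch of such a schedule, assuming the delay doubles on each attempt; `retryDelay`, `shouldRetry`, and the doubling factor are illustrative assumptions and may not match the schedule the library actually uses.

```go
package main

import "fmt"
import "time"

// retryDelay returns the wait before the given retry attempt (0-based),
// doubling from baseMs and capping at maxMs. The doubling factor is an
// assumption made for illustration.
func retryDelay(attempt int, baseMs, maxMs int32) time.Duration {
	delay := int64(baseMs)
	for i := 0; i < attempt && delay < int64(maxMs); i++ {
		delay *= 2
	}
	if delay > int64(maxMs) {
		delay = int64(maxMs)
	}
	return time.Duration(delay) * time.Millisecond
}

// shouldRetry reports whether another attempt is allowed. A maxRetries of -1
// means retry without limit, matching the MaxRetries field comment.
func shouldRetry(attempt int, maxRetries int32) bool {
	return maxRetries == -1 || attempt < int(maxRetries)
}

func main() {
	// Base interval of 1000 ms, capped at 8000 ms, at most 5 retries.
	for attempt := 0; shouldRetry(attempt, 5); attempt++ {
		fmt.Println(attempt, retryDelay(attempt, 1000, 8000))
	}
}
```

With these inputs the delay grows from one second to the eight-second cap and then stays there.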
func NewSubscriberBase
func NewSubscriberBase(subscriber Subscriber) SubscriberBase
NewSubscriberBase creates a new SubscriberBase with the specified Subscriber.
func (*SubscriberBase) ActiveSignalIndexCache
func (sb *SubscriberBase) ActiveSignalIndexCache() *transport.SignalIndexCache
ActiveSignalIndexCache gets the active signal index cache.
func (*SubscriberBase) AdjustedValue (added in v0.3.0)
func (sb *SubscriberBase) AdjustedValue(measurement *transport.Measurement) float64
AdjustedValue gets the Value of a Measurement with any linear adjustments applied from the measurement's Adder and Multiplier metadata, if found.
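A linear adjustment of this kind can be sketched as follows. The `adjustment` type and `adjustedValue` function are hypothetical stand-ins, and the order of operations (multiply, then add) is an assumption rather than something this page states.

```go
package main

import "fmt"

// adjustment is a hypothetical stand-in for the Adder and Multiplier
// values carried in measurement metadata.
type adjustment struct {
	Adder      float64
	Multiplier float64
}

// adjustedValue applies the linear adjustment value*Multiplier + Adder.
// When no metadata is found (nil), the raw value is returned unchanged.
func adjustedValue(value float64, adj *adjustment) float64 {
	if adj == nil {
		return value
	}
	return value*adj.Multiplier + adj.Adder
}

func main() {
	adj := &adjustment{Adder: 0.5, Multiplier: 2}
	fmt.Println(adjustedValue(60, adj)) // 120.5
	fmt.Println(adjustedValue(60, nil)) // 60
}
```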
func (*SubscriberBase) ConfigurationChanged
func (sb *SubscriberBase) ConfigurationChanged()
ConfigurationChanged implements the default handler for notifications that the data publisher configuration has changed.
func (*SubscriberBase) Connect
func (sb *SubscriberBase) Connect()
Connect starts the connection cycle to an STTP publisher. When AutoReconnect is true, the connection will automatically be retried when the connection drops. If AutoRequestMetadata is true, then upon successful connection, metadata will be requested. When AutoRequestMetadata is true and AutoSubscribe is true, subscription will occur after reception of metadata. When AutoRequestMetadata is false and AutoSubscribe is true, subscription will occur at successful connection.
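The ordering rules for AutoRequestMetadata and AutoSubscribe can be summarized as a small decision function. This is only a sketch of the sequencing described above; `connectSteps` and its step labels are hypothetical and are not library identifiers.

```go
package main

import "fmt"

// connectSteps lists, in order, the automatic actions that follow a
// successful connection for a given pair of flags. The step names are
// illustrative labels only.
func connectSteps(autoRequestMetadata, autoSubscribe bool) []string {
	steps := []string{"connect"}
	if autoRequestMetadata {
		// Metadata is requested and received before any subscription.
		steps = append(steps, "request metadata", "receive metadata")
	}
	if autoSubscribe {
		steps = append(steps, "subscribe")
	}
	return steps
}

func main() {
	combos := [][2]bool{{true, true}, {false, true}, {true, false}, {false, false}}
	for _, flags := range combos {
		fmt.Println(flags, connectSteps(flags[0], flags[1]))
	}
}
```

When both flags are false, everything after the connection itself must be driven manually, for example through RequestMetadata and Subscribe.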
func (*SubscriberBase) ConnectionEstablished
func (sb *SubscriberBase) ConnectionEstablished()
ConnectionEstablished implements the default handler for notification that a connection has been established. Default implementation simply writes connection feedback to StatusMessage handler.
func (*SubscriberBase) ConnectionTerminated
func (sb *SubscriberBase) ConnectionTerminated()
ConnectionTerminated implements the default handler for notification that a connection has been terminated. Default implementation simply writes connection terminated feedback to ErrorMessage handler.
func (*SubscriberBase) DataStartTime
func (sb *SubscriberBase) DataStartTime(startTime time.Time)
DataStartTime implements the default handler for notifications of the first received measurement.
func (*SubscriberBase) Disconnect
func (sb *SubscriberBase) Disconnect()
Disconnect disconnects from an STTP publisher.
func (*SubscriberBase) Dispose
func (sb *SubscriberBase) Dispose()
Dispose cleanly shuts down a DataSubscriber that is no longer being used, e.g., during a normal application exit.
func (*SubscriberBase) ErrorMessage
func (sb *SubscriberBase) ErrorMessage(message string)
ErrorMessage implements the default handler for error message logging. The default implementation synchronously writes output to stderr. Logging is recommended.
func (*SubscriberBase) HistoricalReadComplete
func (sb *SubscriberBase) HistoricalReadComplete()
HistoricalReadComplete implements the default handler for notification that temporal processing has completed, i.e., the end of a historical playback data stream has been reached.
func (*SubscriberBase) IsConnected (added in v0.2.0)
func (sb *SubscriberBase) IsConnected() bool
IsConnected determines if Subscriber is currently connected to a data publisher.
func (*SubscriberBase) IsSubscribed (added in v0.2.0)
func (sb *SubscriberBase) IsSubscribed() bool
IsSubscribed determines if Subscriber is currently subscribed to a data stream.
func (*SubscriberBase) LookupMetadata (added in v0.3.0)
func (sb *SubscriberBase) LookupMetadata(signalID guid.Guid) *transport.MeasurementMetadata
LookupMetadata gets the MeasurementMetadata for the specified signalID from the local registry. If the metadata does not exist, a new record is created and returned.
func (*SubscriberBase) Metadata (added in v0.3.0)
func (sb *SubscriberBase) Metadata(measurement *transport.Measurement) *transport.MeasurementMetadata
Metadata gets the MeasurementMetadata associated with a measurement from the local registry. If the metadata does not exist, a new record is created and returned.
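Both LookupMetadata and Metadata are documented as get-or-create lookups against a local registry. A minimal sketch of that pattern is shown below; `record`, `registry`, and the default field values are hypothetical, and the real registry may be keyed and synchronized differently.

```go
package main

import (
	"fmt"
	"sync"
)

// record is a hypothetical stand-in for MeasurementMetadata.
type record struct {
	SignalID   string
	Multiplier float64
}

// registry is a hypothetical local metadata registry keyed by signal ID.
type registry struct {
	mu      sync.Mutex
	records map[string]*record
}

// lookup returns the record for signalID, creating and storing a new one
// when none exists, so callers never receive nil.
func (r *registry) lookup(signalID string) *record {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.records == nil {
		r.records = make(map[string]*record)
	}
	rec, ok := r.records[signalID]
	if !ok {
		// Assumed neutral default for a newly created record.
		rec = &record{SignalID: signalID, Multiplier: 1}
		r.records[signalID] = rec
	}
	return rec
}

func main() {
	var reg registry
	first := reg.lookup("signal-a")
	second := reg.lookup("signal-a")
	fmt.Println(first == second, len(reg.records)) // true 1
}
```

One consequence of this design is that a non-nil result does not by itself show that the publisher supplied metadata for the signal.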
func (*SubscriberBase) ReceivedMetadata
func (sb *SubscriberBase) ReceivedMetadata(metadata []byte)
ReceivedMetadata implements the default handler for reception of the metadata response.
func (*SubscriberBase) ReceivedNewBufferBlocks
func (sb *SubscriberBase) ReceivedNewBufferBlocks(bufferBlocks []transport.BufferBlock)
ReceivedNewBufferBlocks implements the default handler for reception of new buffer blocks.
func (*SubscriberBase) ReceivedNewMeasurements
func (sb *SubscriberBase) ReceivedNewMeasurements(measurements []transport.Measurement)
ReceivedNewMeasurements implements the default handler for reception of new measurements.
func (*SubscriberBase) ReceivedNotification
func (sb *SubscriberBase) ReceivedNotification(notification string)
ReceivedNotification implements the default handler for reception of a notification.
func (*SubscriberBase) RequestMetadata (added in v0.2.0)
func (sb *SubscriberBase) RequestMetadata()
RequestMetadata sends a request to the data publisher indicating that the Subscriber would like new metadata. Any defined MetadataFilters will be included in the request.
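Because MetadataFilters is a single string of semicolon-separated expressions, it can be convenient to build it from individual filters. The helper below is a hypothetical sketch, not part of this package, and the filter expressions in `main` are illustrative placeholders whose syntax is assumed.

```go
package main

import (
	"fmt"
	"strings"
)

// joinFilters combines individual filter expressions into one
// semicolon-separated string, skipping blank entries.
func joinFilters(filters ...string) string {
	kept := make([]string, 0, len(filters))
	for _, f := range filters {
		if f = strings.TrimSpace(f); f != "" {
			kept = append(kept, f)
		}
	}
	return strings.Join(kept, ";")
}

func main() {
	// Placeholder expressions; table and column names are assumptions.
	fmt.Println(joinFilters(
		"FILTER MeasurementDetail WHERE SignalAcronym <> 'STAT'",
		"",
		"FILTER DeviceDetail WHERE Enabled <> 0",
	))
}
```

The resulting string would be assigned to the MetadataFilters field before metadata is requested.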
func (*SubscriberBase) StatusMessage
func (sb *SubscriberBase) StatusMessage(message string)
StatusMessage implements the default handler for informational message logging. The default implementation synchronously writes output to stdout. Logging is recommended.
func (*SubscriberBase) Subscribe (added in v0.2.0)
func (sb *SubscriberBase) Subscribe()
Subscribe sends a request to the data publisher indicating that the Subscriber would like to start receiving streaming data. Subscribe parameters are controlled by the SubscriptionInfo fields available through the Subscription receiver method.
func (*SubscriberBase) SubscriberID
func (sb *SubscriberBase) SubscriberID() guid.Guid
SubscriberID gets the subscriber ID as assigned by the data publisher upon receipt of the SignalIndexCache.
func (*SubscriberBase) Subscription
func (sb *SubscriberBase) Subscription() *transport.SubscriptionInfo
Subscription gets subscription related settings for Subscriber.
func (*SubscriberBase) SubscriptionUpdated
func (sb *SubscriberBase) SubscriptionUpdated(signalIndexCache *transport.SignalIndexCache)
SubscriptionUpdated implements the default handler for notifications that a new SignalIndexCache has been received.
func (*SubscriberBase) TotalCommandChannelBytesReceived
func (sb *SubscriberBase) TotalCommandChannelBytesReceived() uint64
TotalCommandChannelBytesReceived gets the total number of bytes received via the command channel since last connection.
func (*SubscriberBase) TotalDataChannelBytesReceived
func (sb *SubscriberBase) TotalDataChannelBytesReceived() uint64
TotalDataChannelBytesReceived gets the total number of bytes received via the data channel since last connection.
func (*SubscriberBase) TotalMeasurementsReceived
func (sb *SubscriberBase) TotalMeasurementsReceived() uint64
TotalMeasurementsReceived gets the total number of measurements received since last subscription.
func (*SubscriberBase) Unsubscribe (added in v0.2.0)
func (sb *SubscriberBase) Unsubscribe()
Unsubscribe sends a request to the data publisher indicating that the Subscriber would like to stop receiving streaming data.