Documentation ¶
Index ¶
- type Config
- type MeasurementReader
- type Settings
- type Subscriber
- func (sb *Subscriber) ActiveSignalIndexCache() *transport.SignalIndexCache
- func (sb *Subscriber) AdjustedValue(measurement *transport.Measurement) float64
- func (sb *Subscriber) Close()
- func (sb *Subscriber) DefaultConnectionEstablishedReceiver()
- func (sb *Subscriber) DefaultConnectionTerminatedReceiver()
- func (sb *Subscriber) DefaultErrorMessageLogger(message string)
- func (sb *Subscriber) DefaultStatusMessageLogger(message string)
- func (sb *Subscriber) Dial(address string, config *Config) error
- func (sb *Subscriber) Disconnect()
- func (sb *Subscriber) ErrorMessage(message string)
- func (sb *Subscriber) IsConnected() bool
- func (sb *Subscriber) IsSubscribed() bool
- func (sb *Subscriber) LookupMetadata(signalID guid.Guid) *transport.MeasurementMetadata
- func (sb *Subscriber) Metadata(measurement *transport.Measurement) *transport.MeasurementMetadata
- func (sb *Subscriber) ReadMeasurements() *MeasurementReader
- func (sb *Subscriber) RequestMetadata()
- func (sb *Subscriber) SetConfigurationChangedReceiver(callback func())
- func (sb *Subscriber) SetConnectionEstablishedReceiver(callback func())
- func (sb *Subscriber) SetConnectionTerminatedReceiver(callback func())
- func (sb *Subscriber) SetDataStartTimeReceiver(callback func(startTime time.Time))
- func (sb *Subscriber) SetErrorMessageLogger(callback func(message string))
- func (sb *Subscriber) SetHistoricalReadCompleteReceiver(callback func())
- func (sb *Subscriber) SetMetadataReceiver(callback func(dataSet *data.DataSet))
- func (sb *Subscriber) SetNewBufferBlocksReceiver(callback func(bufferBlocks []transport.BufferBlock))
- func (sb *Subscriber) SetNewMeasurementsReceiver(callback func(measurements []transport.Measurement))
- func (sb *Subscriber) SetNotificationReceiver(callback func(notification string))
- func (sb *Subscriber) SetStatusMessageLogger(callback func(message string))
- func (sb *Subscriber) SetSubscriptionUpdatedReceiver(callback func(signalIndexCache *transport.SignalIndexCache))
- func (sb *Subscriber) StatusMessage(message string)
- func (sb *Subscriber) Subscribe(filterExpression string, settings *Settings)
- func (sb *Subscriber) SubscriberID() guid.Guid
- func (sb *Subscriber) TotalCommandChannelBytesReceived() uint64
- func (sb *Subscriber) TotalDataChannelBytesReceived() uint64
- func (sb *Subscriber) TotalMeasurementsReceived() uint64
- func (sb *Subscriber) Unsubscribe()
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶ added in v0.5.0
type Config struct {
// MaxRetries defines the maximum number of times to retry a connection.
// Set value to -1 to retry infinitely.
MaxRetries int32
// RetryInterval defines the base retry interval, in milliseconds. Retries will
// exponentially back-off starting from this interval.
RetryInterval int32
// MaxRetryInterval defines the maximum retry interval, in milliseconds.
MaxRetryInterval int32
// AutoReconnect defines the flag that determines if connections should be
// automatically reattempted.
AutoReconnect bool
// AutoRequestMetadata defines the flag that determines if metadata should be
// automatically requested upon successful connection. When true, metadata will
// be requested upon connection before subscription; otherwise, any metadata
// operations must be handled manually.
AutoRequestMetadata bool
// AutoSubscribe defines the flag that determines if subscription should be
// handled automatically upon successful connection. When AutoRequestMetadata
// is true and AutoSubscribe is true, subscription will occur after reception
// of metadata. When AutoRequestMetadata is false and AutoSubscribe is true,
// subscription will occur at successful connection. When AutoSubscribe is
// false, any subscribe operations must be handled manually.
AutoSubscribe bool
// CompressPayloadData determines whether payload data is compressed.
CompressPayloadData bool
// CompressMetadata determines whether the metadata transfer is compressed.
CompressMetadata bool
// CompressSignalIndexCache determines whether the signal index cache is compressed.
CompressSignalIndexCache bool
// MetadataFilters defines any filters to be applied to incoming metadata to reduce total
// received metadata. Filter expressions should be separated by semicolons.
MetadataFilters string
// Version defines the target STTP protocol version. This currently defaults to 2.
Version byte
// RfcGuidEncoding determines if Guid wire serialization should use RFC encoding.
// This defaults to true.
RfcGuidEncoding bool
}
Config defines the STTP connection parameters.
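The retry fields above can be read as a capped exponential back-off. The following is a minimal sketch of that reading, not the library's implementation: the helper names retryDelay and shouldRetry are hypothetical, and the doubling factor is an assumption, since the field comments only say that retries back off exponentially from RetryInterval up to MaxRetryInterval.

```go
package main

import "fmt"
import "time"

// retryDelay returns the wait before the given retry attempt (0-based).
// It starts at retryInterval and doubles per attempt (assumed factor),
// never exceeding maxRetryInterval. Both inputs are in milliseconds,
// matching the units documented for the Config fields.
func retryDelay(attempt int, retryInterval, maxRetryInterval int32) time.Duration {
	delay := int64(retryInterval)
	limit := int64(maxRetryInterval)
	for i := 0; i < attempt && delay < limit; i++ {
		delay *= 2
	}
	if delay > limit {
		delay = limit
	}
	return time.Duration(delay) * time.Millisecond
}

// shouldRetry reports whether another attempt is allowed.
// A maxRetries value of -1 means retry indefinitely.
func shouldRetry(attempt int, maxRetries int32) bool {
	return maxRetries == -1 || attempt < int(maxRetries)
}

func main() {
	// Hypothetical values: 1 second base interval, 30 second cap, 6 retries.
	for attempt := 0; shouldRetry(attempt, 6); attempt++ {
		fmt.Printf("attempt %d: wait %v\n", attempt, retryDelay(attempt, 1000, 30000))
	}
}
```

Capping the delay keeps a long outage from pushing the wait between attempts beyond MaxRetryInterval.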
type MeasurementReader ¶ added in v0.5.0
type MeasurementReader struct {
// contains filtered or unexported fields
}
MeasurementReader defines an STTP measurement reader.
func (*MeasurementReader) NextMeasurement ¶ added in v0.5.0
func (mr *MeasurementReader) NextMeasurement() *transport.Measurement
NextMeasurement blocks the calling goroutine until a new measurement arrives.
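A blocking read of this kind is commonly built on an unbuffered channel. The sketch below illustrates the pattern only; the measurement and reader types and the deliver method are hypothetical stand-ins, and nothing here is taken from the MeasurementReader internals.

```go
package main

import "fmt"

// measurement is a hypothetical stand-in for transport.Measurement.
type measurement struct {
	Value float64
}

// reader is a hypothetical stand-in for MeasurementReader.
type reader struct {
	current chan *measurement
}

func newReader() *reader {
	return &reader{current: make(chan *measurement)}
}

// next blocks the calling goroutine until a measurement is delivered.
func (r *reader) next() *measurement {
	return <-r.current
}

// deliver hands one measurement to a waiting reader, blocking until it is taken.
func (r *reader) deliver(m *measurement) {
	r.current <- m
}

func main() {
	r := newReader()

	// Producer goroutine simulating measurements arriving from a publisher.
	go func() {
		for i := 1; i <= 3; i++ {
			r.deliver(&measurement{Value: float64(i)})
		}
	}()

	for i := 0; i < 3; i++ {
		fmt.Println(r.next().Value)
	}
}
```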
type Settings ¶ added in v0.5.0
type Settings struct {
// Throttled determines if data will be published using down-sampling.
Throttled bool
// PublishInterval defines the down-sampling publish interval to use when Throttled is true.
PublishInterval float64
// UdpPort defines the desired UDP port to use for publication. Zero value
UdpPort uint16
// IncludeTime determines if time should be included in non-compressed, compact measurements.
IncludeTime bool
// UseMillisecondResolution determines if time should be restricted to milliseconds in non-compressed, compact measurements.
UseMillisecondResolution bool
// RequestNaNValueFilter requests that the publisher filter out, i.e., not send, any NaN values.
RequestNaNValueFilter bool
// StartTime defines the start time for a requested temporal data playback, i.e., a historical subscription.
// Simply by specifying a StartTime and StopTime, a subscription is considered a historical subscription.
// Note that the publisher may not support historical subscriptions, in which case the subscribe will fail.
StartTime string
// StopTime defines the stop time for a requested temporal data playback, i.e., a historical subscription.
// Simply by specifying a StartTime and StopTime, a subscription is considered a historical subscription.
// Note that the publisher may not support historical subscriptions, in which case the subscribe will fail.
StopTime string
// ConstraintParameters defines any custom constraint parameters for a requested temporal data playback. This can
// include parameters that may be needed to initiate, filter, or control historical data access.
ConstraintParameters string
// ProcessingInterval defines the initial playback speed, in milliseconds, for a requested temporal data playback.
// With the exception of the values of -1 and 0, this value specifies the desired processing interval for data, i.e.,
// basically a delay, or timer interval, over which to process data. A value of -1 means to use the default processing
// interval while a value of 0 means to process data as fast as possible.
ProcessingInterval int32
// ExtraConnectionStringParameters defines any extra custom connection string parameters that may be needed for a subscription.
ExtraConnectionStringParameters string
}
Settings defines the STTP subscription related settings.
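The temporal fields above imply two small rules: a subscription counts as historical when both StartTime and StopTime are given, and ProcessingInterval is interpreted specially for -1 and 0. The sketch below restates those rules in code. The playbackSettings type and its methods are hypothetical, the timestamp strings are placeholder values, and treating every negative interval like -1 is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"time"
)

// playbackSettings mirrors a few Settings fields for illustration only.
type playbackSettings struct {
	StartTime          string
	StopTime           string
	ProcessingInterval int32
}

// isHistorical reports whether both a start time and a stop time were specified.
func (s playbackSettings) isHistorical() bool {
	return s.StartTime != "" && s.StopTime != ""
}

// delay maps ProcessingInterval to a per-step processing delay:
// -1 selects defaultDelay (any other negative value is treated the same way
// here, by assumption), 0 means no delay, and positive values are milliseconds.
func (s playbackSettings) delay(defaultDelay time.Duration) time.Duration {
	switch {
	case s.ProcessingInterval < 0:
		return defaultDelay
	case s.ProcessingInterval == 0:
		return 0
	default:
		return time.Duration(s.ProcessingInterval) * time.Millisecond
	}
}

func main() {
	s := playbackSettings{
		StartTime:          "2024-01-01 00:00:00", // placeholder timestamp text
		StopTime:           "2024-01-01 00:05:00", // placeholder timestamp text
		ProcessingInterval: 0,
	}
	fmt.Println(s.isHistorical(), s.delay(33*time.Millisecond))
}
```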
func NewSettings ¶ added in v0.5.0
func NewSettings() *Settings
NewSettings creates a new Settings instance initialized with default values.
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber represents an STTP data subscriber.
func NewSubscriber ¶ added in v0.5.0
func NewSubscriber() *Subscriber
NewSubscriber creates a new Subscriber.
func (*Subscriber) ActiveSignalIndexCache ¶ added in v0.5.0
func (sb *Subscriber) ActiveSignalIndexCache() *transport.SignalIndexCache
ActiveSignalIndexCache gets the active signal index cache.
func (*Subscriber) AdjustedValue ¶ added in v0.5.0
func (sb *Subscriber) AdjustedValue(measurement *transport.Measurement) float64
AdjustedValue gets the Value of a Measurement with any linear adjustments applied from the measurement's Adder and Multiplier metadata, if found.
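A linear adjustment of this kind is usually computed as value times multiplier plus adder. The sketch below assumes that order of operations, and that a missing metadata record leaves the value unchanged. The metadata type and adjustedValue function are hypothetical stand-ins, not the library's types.

```go
package main

import "fmt"

// metadata is a hypothetical stand-in holding only the adjustment fields.
type metadata struct {
	Adder      float64
	Multiplier float64
}

// adjustedValue applies value*Multiplier + Adder when metadata is available
// and returns the raw value otherwise. The multiply-then-add order is an
// assumption of this sketch.
func adjustedValue(value float64, md *metadata) float64 {
	if md == nil {
		return value
	}
	return value*md.Multiplier + md.Adder
}

func main() {
	md := &metadata{Adder: 1.5, Multiplier: 2}
	fmt.Println(adjustedValue(10, md))
	fmt.Println(adjustedValue(10, nil))
}
```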
func (*Subscriber) Close ¶ added in v0.5.0
func (sb *Subscriber) Close()
Close cleanly shuts down a Subscriber that is no longer being used, e.g., during a normal application exit.
func (*Subscriber) DefaultConnectionEstablishedReceiver ¶ added in v0.5.0
func (sb *Subscriber) DefaultConnectionEstablishedReceiver()
DefaultConnectionEstablishedReceiver implements the default handler for the ConnectionEstablished callback. Default implementation simply writes connection feedback to statusMessage callback.
func (*Subscriber) DefaultConnectionTerminatedReceiver ¶ added in v0.5.0
func (sb *Subscriber) DefaultConnectionTerminatedReceiver()
DefaultConnectionTerminatedReceiver implements the default handler for the ConnectionTerminated callback. Default implementation simply writes connection terminated feedback to errorMessage callback.
func (*Subscriber) DefaultErrorMessageLogger ¶ added in v0.5.0
func (sb *Subscriber) DefaultErrorMessageLogger(message string)
DefaultErrorMessageLogger implements the default handler for the errorMessage callback. Default implementation synchronously writes output to stderr. Logging is recommended.
func (*Subscriber) DefaultStatusMessageLogger ¶ added in v0.5.0
func (sb *Subscriber) DefaultStatusMessageLogger(message string)
DefaultStatusMessageLogger implements the default handler for the statusMessage callback. Default implementation synchronously writes output to stdout. Logging is recommended.
func (*Subscriber) Dial ¶ added in v0.5.0
func (sb *Subscriber) Dial(address string, config *Config) error
Dial starts the client-based connection cycle to an STTP publisher. The config parameter controls connection-related settings; set it to nil to use default values. When the config defines AutoReconnect as true, the connection will automatically be retried when it drops. When the config defines AutoRequestMetadata as true, metadata will be requested upon successful connection. When the config defines both AutoRequestMetadata and AutoSubscribe as true, subscription will occur after reception of metadata. When the config defines AutoRequestMetadata as false and AutoSubscribe as true, subscription will occur at successful connection.
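The flag combinations described for Dial can be summarized as a small decision function. This is a sketch of the documented ordering only; postConnectSteps and its step labels are hypothetical and are not library identifiers.

```go
package main

import "fmt"

// postConnectSteps lists, in order, what happens after a successful
// connection for a given pair of Config flags. The step names are
// illustrative labels only.
func postConnectSteps(autoRequestMetadata, autoSubscribe bool) []string {
	var steps []string
	if autoRequestMetadata {
		steps = append(steps, "request metadata")
	}
	if autoSubscribe {
		if autoRequestMetadata {
			// Subscription waits until the metadata response has arrived.
			steps = append(steps, "subscribe after metadata is received")
		} else {
			steps = append(steps, "subscribe immediately")
		}
	}
	return steps
}

func main() {
	combos := [][2]bool{{true, true}, {true, false}, {false, true}, {false, false}}
	for _, flags := range combos {
		fmt.Printf("AutoRequestMetadata=%t AutoSubscribe=%t -> %q\n",
			flags[0], flags[1], postConnectSteps(flags[0], flags[1]))
	}
}
```

With both flags false, the function returns no steps, which corresponds to handling metadata requests and subscriptions manually.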
func (*Subscriber) Disconnect ¶ added in v0.5.0
func (sb *Subscriber) Disconnect()
Disconnect disconnects from an STTP publisher.
func (*Subscriber) ErrorMessage ¶
func (sb *Subscriber) ErrorMessage(message string)
ErrorMessage executes the defined error message logger callback.
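The relationship between a logger setter, a default handler, and the method that executes the callback can be sketched as follows. The messenger type and its methods are hypothetical, and the fallback to stderr when no callback is set is an assumption modeled on the documented default error logger.

```go
package main

import (
	"fmt"
	"os"
	"sync"
)

// messenger is a hypothetical stand-in for the logging part of a subscriber.
type messenger struct {
	mu          sync.RWMutex
	errorLogger func(message string)
}

// setErrorLogger replaces the error logging callback.
func (m *messenger) setErrorLogger(callback func(message string)) {
	m.mu.Lock()
	m.errorLogger = callback
	m.mu.Unlock()
}

// errorMessage executes the configured callback, falling back to stderr
// when none has been set (assumed default behavior for this sketch).
func (m *messenger) errorMessage(message string) {
	m.mu.RLock()
	logger := m.errorLogger
	m.mu.RUnlock()

	if logger == nil {
		fmt.Fprintln(os.Stderr, message)
		return
	}
	logger(message)
}

func main() {
	var m messenger
	m.errorMessage("written to stderr by the default handler")

	m.setErrorLogger(func(message string) { fmt.Println("custom:", message) })
	m.errorMessage("routed to the custom handler")
}
```

Copying the callback under the lock and invoking it afterwards avoids holding the lock while user code runs.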
func (*Subscriber) IsConnected ¶ added in v0.5.0
func (sb *Subscriber) IsConnected() bool
IsConnected determines if Subscriber is currently connected to a data publisher.
func (*Subscriber) IsSubscribed ¶ added in v0.5.0
func (sb *Subscriber) IsSubscribed() bool
IsSubscribed determines if Subscriber is currently subscribed to a data stream.
func (*Subscriber) LookupMetadata ¶ added in v0.5.0
func (sb *Subscriber) LookupMetadata(signalID guid.Guid) *transport.MeasurementMetadata
LookupMetadata gets the MeasurementMetadata for the specified signalID from the local registry. If the metadata does not exist, a new record is created and returned.
func (*Subscriber) Metadata ¶ added in v0.5.0
func (sb *Subscriber) Metadata(measurement *transport.Measurement) *transport.MeasurementMetadata
Metadata gets the measurement-level metadata associated with a measurement from the local registry. If the metadata does not exist, a new record is created and returned.
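The get-or-create behavior described for LookupMetadata and Metadata can be pictured as a map guarded by a mutex. The sketch below is illustrative only: the registry and record types are hypothetical, signal IDs are plain strings rather than guid.Guid values, and initializing Multiplier to 1 for a new record is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// record is a hypothetical stand-in for transport.MeasurementMetadata.
type record struct {
	SignalID   string
	Adder      float64
	Multiplier float64
}

// registry is a hypothetical local metadata registry keyed by signal ID.
type registry struct {
	mu      sync.Mutex
	records map[string]*record
}

func newRegistry() *registry {
	return &registry{records: make(map[string]*record)}
}

// lookup returns the record for signalID, creating and storing a new one
// when none exists, so callers never receive nil.
func (r *registry) lookup(signalID string) *record {
	r.mu.Lock()
	defer r.mu.Unlock()

	if rec, ok := r.records[signalID]; ok {
		return rec
	}
	rec := &record{SignalID: signalID, Multiplier: 1} // assumed neutral defaults
	r.records[signalID] = rec
	return rec
}

func main() {
	reg := newRegistry()
	first := reg.lookup("E4BBFE6A-35BD-4E5B-92C9-11FF913E7877")
	second := reg.lookup("E4BBFE6A-35BD-4E5B-92C9-11FF913E7877")
	fmt.Println(first == second, first.Multiplier)
}
```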
func (*Subscriber) ReadMeasurements ¶ added in v0.5.0
func (sb *Subscriber) ReadMeasurements() *MeasurementReader
ReadMeasurements sets up a new MeasurementReader to start reading measurements.
func (*Subscriber) RequestMetadata ¶ added in v0.5.0
func (sb *Subscriber) RequestMetadata()
RequestMetadata sends a request to the data publisher indicating that the Subscriber would like new metadata. Any defined MetadataFilters will be included in request.
func (*Subscriber) SetConfigurationChangedReceiver ¶ added in v0.5.0
func (sb *Subscriber) SetConfigurationChangedReceiver(callback func())
SetConfigurationChangedReceiver defines the callback that handles notifications that the data publisher configuration has changed.
func (*Subscriber) SetConnectionEstablishedReceiver ¶ added in v0.5.0
func (sb *Subscriber) SetConnectionEstablishedReceiver(callback func())
SetConnectionEstablishedReceiver defines the callback that handles notification that a connection has been established. Default implementation simply writes connection feedback to StatusMessage handler.
func (*Subscriber) SetConnectionTerminatedReceiver ¶ added in v0.5.0
func (sb *Subscriber) SetConnectionTerminatedReceiver(callback func())
SetConnectionTerminatedReceiver defines the callback that handles notification that a connection has been terminated. Default implementation simply writes connection terminated feedback to ErrorMessage handler.
func (*Subscriber) SetDataStartTimeReceiver ¶ added in v0.5.0
func (sb *Subscriber) SetDataStartTimeReceiver(callback func(startTime time.Time))
SetDataStartTimeReceiver defines the callback that handles notification of first received measurement.
func (*Subscriber) SetErrorMessageLogger ¶ added in v0.5.0
func (sb *Subscriber) SetErrorMessageLogger(callback func(message string))
SetErrorMessageLogger defines the callback that handles error message logging.
func (*Subscriber) SetHistoricalReadCompleteReceiver ¶ added in v0.5.0
func (sb *Subscriber) SetHistoricalReadCompleteReceiver(callback func())
SetHistoricalReadCompleteReceiver defines the callback that handles notification that temporal processing has completed, i.e., the end of a historical playback data stream has been reached.
func (*Subscriber) SetMetadataReceiver ¶ added in v0.5.0
func (sb *Subscriber) SetMetadataReceiver(callback func(dataSet *data.DataSet))
SetMetadataReceiver defines the callback that handles reception of the metadata response.
func (*Subscriber) SetNewBufferBlocksReceiver ¶ added in v0.5.0
func (sb *Subscriber) SetNewBufferBlocksReceiver(callback func(bufferBlocks []transport.BufferBlock))
SetNewBufferBlocksReceiver defines the callback that handles reception of new buffer blocks.
func (*Subscriber) SetNewMeasurementsReceiver ¶ added in v0.5.0
func (sb *Subscriber) SetNewMeasurementsReceiver(callback func(measurements []transport.Measurement))
SetNewMeasurementsReceiver defines the callback that handles reception of new measurements.
func (*Subscriber) SetNotificationReceiver ¶ added in v0.5.0
func (sb *Subscriber) SetNotificationReceiver(callback func(notification string))
SetNotificationReceiver defines the callback that handles reception of a notification.
func (*Subscriber) SetStatusMessageLogger ¶ added in v0.5.0
func (sb *Subscriber) SetStatusMessageLogger(callback func(message string))
SetStatusMessageLogger defines the callback that handles informational message logging.
func (*Subscriber) SetSubscriptionUpdatedReceiver ¶ added in v0.5.0
func (sb *Subscriber) SetSubscriptionUpdatedReceiver(callback func(signalIndexCache *transport.SignalIndexCache))
SetSubscriptionUpdatedReceiver defines the callback that handles notifications that a new SignalIndexCache has been received.
func (*Subscriber) StatusMessage ¶
func (sb *Subscriber) StatusMessage(message string)
StatusMessage executes the defined status message logger callback.
func (*Subscriber) Subscribe ¶ added in v0.5.0
func (sb *Subscriber) Subscribe(filterExpression string, settings *Settings)
Subscribe sets up a request indicating that the Subscriber would like to start receiving streaming data from a data publisher. If the subscriber is already connected, the updated filter expression and subscription settings will be requested immediately; otherwise, the settings will be used when the connection to the data publisher is established.
The filterExpression defines the desired measurements for a subscription. Examples include:
- Directly specified signal IDs (UUID values in string format): 38A47B0-F10B-4143-9A0A-0DBC4FFEF1E8; E4BBFE6A-35BD-4E5B-92C9-11FF913E7877
- Directly specified tag names: DOM_GPLAINS-BUS1:VH; TVA_SHELBY-BUS1:VH
- Directly specified identifiers in "measurement key" format: PPA:15; STAT:20
- A filter expression against a selection view: FILTER ActiveMeasurements WHERE Company='GPA' AND SignalType='FREQ'
The settings parameter controls subscription-related settings; set it to nil to use default values.
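As the examples above show, directly specified identifiers are separated by semicolons. A small helper for assembling such an expression might look like the sketch below. The joinFilter function is hypothetical and not part of the package; it only builds the string that would be passed as filterExpression.

```go
package main

import (
	"fmt"
	"strings"
)

// joinFilter builds a semicolon-separated filter expression from individual
// identifiers, trimming whitespace and skipping empty entries.
func joinFilter(identifiers ...string) string {
	parts := make([]string, 0, len(identifiers))
	for _, id := range identifiers {
		if id = strings.TrimSpace(id); id != "" {
			parts = append(parts, id)
		}
	}
	return strings.Join(parts, "; ")
}

func main() {
	// Identifiers taken from the examples in the Subscribe documentation.
	fmt.Println(joinFilter("PPA:15", "STAT:20"))
	fmt.Println(joinFilter("DOM_GPLAINS-BUS1:VH", "TVA_SHELBY-BUS1:VH"))
}
```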
func (*Subscriber) SubscriberID ¶ added in v0.5.0
func (sb *Subscriber) SubscriberID() guid.Guid
SubscriberID gets the subscriber ID as assigned by the data publisher upon receipt of the SignalIndexCache.
func (*Subscriber) TotalCommandChannelBytesReceived ¶ added in v0.5.0
func (sb *Subscriber) TotalCommandChannelBytesReceived() uint64
TotalCommandChannelBytesReceived gets the total number of bytes received via the command channel since last connection.
func (*Subscriber) TotalDataChannelBytesReceived ¶ added in v0.5.0
func (sb *Subscriber) TotalDataChannelBytesReceived() uint64
TotalDataChannelBytesReceived gets the total number of bytes received via the data channel since last connection.
func (*Subscriber) TotalMeasurementsReceived ¶ added in v0.5.0
func (sb *Subscriber) TotalMeasurementsReceived() uint64
TotalMeasurementsReceived gets the total number of measurements received since last subscription.
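Counters such as these are typically updated from a receive goroutine and read from elsewhere, so atomic operations are a natural fit. The sketch below shows one way to keep a running total that resets on a new subscription. The stats type and its methods are hypothetical, and resetting at subscription time is inferred from the phrase "since last subscription".

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// stats is a hypothetical holder for a measurement counter.
type stats struct {
	measurements uint64
}

// add records n newly received measurements.
func (s *stats) add(n int) {
	atomic.AddUint64(&s.measurements, uint64(n))
}

// total returns the number of measurements received since the last reset.
func (s *stats) total() uint64 {
	return atomic.LoadUint64(&s.measurements)
}

// reset clears the counter, e.g., when a new subscription starts.
func (s *stats) reset() {
	atomic.StoreUint64(&s.measurements, 0)
}

func main() {
	var s stats
	var wg sync.WaitGroup

	// Simulate four receivers each reporting batches of measurements.
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 100; j++ {
				s.add(10)
			}
		}()
	}
	wg.Wait()

	fmt.Println(s.total())
	s.reset()
	fmt.Println(s.total())
}
```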
func (*Subscriber) Unsubscribe ¶ added in v0.5.0
func (sb *Subscriber) Unsubscribe()
Unsubscribe sends a request to the data publisher indicating that the Subscriber would like to stop receiving streaming data.