Documentation ¶
Index ¶
- type Config
- type MeasurementReader
- type Settings
- type Subscriber
- func (sb *Subscriber) ActiveSignalIndexCache() *transport.SignalIndexCache
- func (sb *Subscriber) AdjustedValue(measurement *transport.Measurement) float64
- func (sb *Subscriber) Close()
- func (sb *Subscriber) ConnectionID() string
- func (sb *Subscriber) DefaultConnectionEstablishedReceiver()
- func (sb *Subscriber) DefaultConnectionTerminatedReceiver()
- func (sb *Subscriber) DefaultErrorMessageLogger(message string)
- func (sb *Subscriber) DefaultStatusMessageLogger(message string)
- func (sb *Subscriber) Dial(address string, config *Config) error
- func (sb *Subscriber) Disconnect()
- func (sb *Subscriber) ErrorMessage(message string)
- func (sb *Subscriber) IsConnected() bool
- func (sb *Subscriber) IsListening() bool
- func (sb *Subscriber) IsSubscribed() bool
- func (sb *Subscriber) IsValidated() bool
- func (sb *Subscriber) Listen(address string, config *Config) error
- func (sb *Subscriber) LookupMetadata(signalID guid.Guid) *transport.MeasurementMetadata
- func (sb *Subscriber) Metadata(measurement *transport.Measurement) *transport.MeasurementMetadata
- func (sb *Subscriber) ReadMeasurements() *MeasurementReader
- func (sb *Subscriber) RequestMetadata()
- func (sb *Subscriber) SetConfigurationChangedReceiver(callback func())
- func (sb *Subscriber) SetConnectionEstablishedReceiver(callback func())
- func (sb *Subscriber) SetConnectionTerminatedReceiver(callback func())
- func (sb *Subscriber) SetDataStartTimeReceiver(callback func(startTime time.Time))
- func (sb *Subscriber) SetDeveloperLogger(l *slog.Logger)
- func (sb *Subscriber) SetErrorMessageLogger(callback func(message string))
- func (sb *Subscriber) SetHistoricalReadCompleteReceiver(callback func())
- func (sb *Subscriber) SetMetadataReceiver(callback func(dataSet *data.DataSet))
- func (sb *Subscriber) SetNewBufferBlocksReceiver(callback func(bufferBlocks []transport.BufferBlock))
- func (sb *Subscriber) SetNewMeasurementsReceiver(callback func(measurements []transport.Measurement))
- func (sb *Subscriber) SetNotificationReceiver(callback func(notification string))
- func (sb *Subscriber) SetStatusMessageLogger(callback func(message string))
- func (sb *Subscriber) SetSubscriptionUpdatedReceiver(callback func(signalIndexCache *transport.SignalIndexCache))
- func (sb *Subscriber) StatusMessage(message string)
- func (sb *Subscriber) Subscribe(filterExpression string, settings *Settings)
- func (sb *Subscriber) SubscriberID() guid.Guid
- func (sb *Subscriber) TotalCommandChannelBytesReceived() uint64
- func (sb *Subscriber) TotalDataChannelBytesReceived() uint64
- func (sb *Subscriber) TotalMeasurementsReceived() uint64
- func (sb *Subscriber) Unsubscribe()
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
// MaxRetries defines the maximum number of times to retry a connection.
// Set value to -1 to retry infinitely.
// Note: setting ignored for listening connections.
MaxRetries int32
// RetryInterval defines the base retry interval, in milliseconds. Retries will
// exponentially back-off starting from this interval.
// Note: setting ignored for listening connections.
RetryInterval int32
// MaxRetryInterval defines the maximum retry interval, in milliseconds.
// Note: setting ignored for listening connections.
MaxRetryInterval int32
// AutoReconnect defines the flag that determines if connections should be
// automatically reattempted.
// Note: setting ignored for listening connections.
AutoReconnect bool
// AutoRequestMetadata defines the flag that determines if metadata should be
// automatically requested upon successful connection. When true, metadata will
// be requested upon connection before subscription; otherwise, any metadata
// operations must be handled manually.
AutoRequestMetadata bool
// AutoSubscribe defines the flag that determines if subscription should be
// handled automatically upon successful connection. When AutoRequestMetadata
// is true and AutoSubscribe is true, subscription will occur after reception
// of metadata. When AutoRequestMetadata is false and AutoSubscribe is true,
// subscription will occur at successful connection. When AutoSubscribe is
// false, any subscribe operations must be handled manually.
AutoSubscribe bool
// CompressPayloadData determines whether payload data is compressed.
CompressPayloadData bool
// CompressMetadata determines whether the metadata transfer is compressed.
CompressMetadata bool
// CompressSignalIndexCache determines whether the signal index cache is compressed.
CompressSignalIndexCache bool
// MetadataFilters defines any filters to be applied to incoming metadata to reduce total
// received metadata. Each filter expression should be separated by a semicolon.
MetadataFilters string
// Version defines the target STTP protocol version. This currently defaults to 2.
Version byte
// RfcGuidEncoding determines if Guid wire serialization should use RFC encoding.
// This defaults to true.
RfcGuidEncoding bool
}
Config defines the STTP connection related configuration parameters.
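The retry fields above describe an exponential back-off that starts at RetryInterval and is capped at MaxRetryInterval. The following is a minimal sketch of how such a schedule could be computed. The `retryDelay` helper and the doubling factor are assumptions made for illustration; the library's actual back-off calculation may differ.

```go
package main

import (
	"fmt"
	"time"
)

// retryDelay is a hypothetical helper, not part of the sttp package.
// It returns the delay before the given retry attempt (0-based), doubling
// from baseMs on each attempt and capping the result at maxMs.
// The doubling factor is an assumption; the documentation only states that
// retries back off exponentially from the base interval.
func retryDelay(attempt int, baseMs, maxMs int32) time.Duration {
	delay := int64(baseMs)
	for i := 0; i < attempt && delay < int64(maxMs); i++ {
		delay *= 2
	}
	if delay > int64(maxMs) {
		delay = int64(maxMs)
	}
	return time.Duration(delay) * time.Millisecond
}

func main() {
	// Example values only: 1000 ms base interval, 10000 ms maximum interval.
	for attempt := 0; attempt < 6; attempt++ {
		fmt.Println("attempt", attempt, "waits", retryDelay(attempt, 1000, 10000))
	}
}
```

Under these assumptions the delay grows until it reaches the cap and then stays there, which is the behavior the MaxRetryInterval comment implies.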
type MeasurementReader ¶
type MeasurementReader struct {
// contains filtered or unexported fields
}
MeasurementReader defines an STTP measurement reader.
func (*MeasurementReader) Close ¶
func (mr *MeasurementReader) Close()
Close closes the measurement reader channel.
func (*MeasurementReader) NextMeasurement ¶
func (mr *MeasurementReader) NextMeasurement(ctx context.Context) (*transport.Measurement, bool)
NextMeasurement blocks the calling goroutine until a new measurement arrives or the provided context is done. It returns the measurement and a completed flag: the flag is false when a measurement was received; when the context is done, the flag is true and the measurement is nil.
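The return contract described above can be illustrated with a small stand-alone sketch. The `measurement` and `reader` types below are hypothetical stand-ins for transport.Measurement and MeasurementReader, and the channel-based implementation is an assumption; only the shape of the return values follows the documentation.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// measurement is a hypothetical stand-in for transport.Measurement.
type measurement struct {
	Value float64
}

// reader is a hypothetical stand-in for MeasurementReader, backed by a channel.
type reader struct {
	ch chan *measurement
}

// next mirrors the documented contract: it returns (measurement, false) when
// a measurement arrives, or (nil, true) when the context is done first.
func (r *reader) next(ctx context.Context) (*measurement, bool) {
	select {
	case m := <-r.ch:
		return m, false
	case <-ctx.Done():
		return nil, true
	}
}

// readAll collects values until the timeout expires, showing the typical
// read loop: keep reading while the completed flag is false.
func readAll(r *reader, timeout time.Duration) []float64 {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	var values []float64
	for {
		m, done := r.next(ctx)
		if done {
			return values
		}
		values = append(values, m.Value)
	}
}

func main() {
	r := &reader{ch: make(chan *measurement, 2)}
	r.ch <- &measurement{Value: 59.98}
	r.ch <- &measurement{Value: 60.01}

	fmt.Println(readAll(r, 50*time.Millisecond))
}
```

With the real API, the same loop shape would presumably call NextMeasurement on the reader returned by ReadMeasurements and stop when the completed flag is true.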
type Settings ¶
type Settings struct {
// Throttled determines if data will be published using down-sampling.
Throttled bool
// PublishInterval defines the down-sampling publish interval, in seconds, to use when Throttled is true.
PublishInterval float64
// UdpPort defines the desired UDP port to use for publication. Zero value means do not receive data on UDP, i.e.,
// data will be delivered to the STTP client via TCP.
UdpPort uint16
// IncludeTime determines if time should be included in non-compressed, compact measurements.
IncludeTime bool
// EnableTimeReasonabilityCheck determines if publisher should perform time reasonability checks.
// When enabled LagTime and LeadTime will be used to determine if a measurement timestamp is reasonable.
EnableTimeReasonabilityCheck bool
// LagTime defines the allowed past time deviation tolerance in seconds (can be sub-second).
// Value is used to determine if a measurement timestamp is reasonable.
// Only applicable when EnableTimeReasonabilityCheck is true.
LagTime float64
// LeadTime defines the allowed future time deviation tolerance in seconds (can be sub-second).
// Value is used to determine if a measurement timestamp is reasonable.
// Only applicable when EnableTimeReasonabilityCheck is true.
LeadTime float64
// UseLocalClockAsRealTime determines if publisher should use local clock as real time. If false,
// the timestamp of the latest measurement will be used as real-time.
// Only applicable when EnableTimeReasonabilityCheck is true.
UseLocalClockAsRealTime bool
// UseMillisecondResolution determines if time should be restricted to milliseconds in non-compressed, compact measurements.
UseMillisecondResolution bool
// RequestNaNValueFilter requests that the publisher filter, i.e., does not send, any NaN values.
RequestNaNValueFilter bool
// StartTime defines the start time for a requested temporal data playback, i.e., a historical subscription.
// Simply by specifying a StartTime and StopTime, a subscription is considered a historical subscription.
// Note that the publisher may not support historical subscriptions, in which case the subscribe will fail.
StartTime string
// StopTime defines the stop time for a requested temporal data playback, i.e., a historical subscription.
// Simply by specifying a StartTime and StopTime, a subscription is considered a historical subscription.
// Note that the publisher may not support historical subscriptions, in which case the subscribe will fail.
StopTime string
// ConstraintParameters defines any custom constraint parameters for a requested temporal data playback. This can
// include parameters that may be needed to initiate, filter, or control historical data access.
ConstraintParameters string
// ProcessingInterval defines the initial playback speed, in milliseconds, for a requested temporal data playback.
// With the exception of the values of -1 and 0, this value specifies the desired processing interval for data, i.e.,
// basically a delay, or timer interval, over which to process data. A value of -1 means to use the default processing
// interval while a value of 0 means to process data as fast as possible.
ProcessingInterval int32
// ExtraConnectionStringParameters defines any extra custom connection string parameters that may be needed for a subscription.
ExtraConnectionStringParameters string
}
Settings defines the STTP subscription related settings.
Settings exists as a simplified implementation of the SubscriptionInfo found in the transport package. Internally, the Subscriber type maps Settings values to a SubscriptionInfo instance for use with a DataSubscriber.
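The LagTime and LeadTime fields define a tolerance window around real time. The check itself is performed by the publisher, so the following is only a sketch of the arithmetic involved. The helper names and the inclusive boundaries are assumptions, not the publisher's actual logic.

```go
package main

import (
	"fmt"
	"time"
)

// withinTolerance reports whether a timestamp deviation from real time
// (negative means in the past, positive means in the future) falls inside
// the window defined by lagTime and leadTime, both given in seconds.
// Inclusive boundaries are an assumption made for this sketch.
func withinTolerance(deviation time.Duration, lagTime, leadTime float64) bool {
	lag := time.Duration(lagTime * float64(time.Second))
	lead := time.Duration(leadTime * float64(time.Second))
	return deviation >= -lag && deviation <= lead
}

// isReasonable is a hypothetical helper that compares a measurement
// timestamp against a reference real-time value.
func isReasonable(timestamp, realTime time.Time, lagTime, leadTime float64) bool {
	return withinTolerance(timestamp.Sub(realTime), lagTime, leadTime)
}

func main() {
	realTime := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	lagTime, leadTime := 5.0, 0.5 // example tolerances, in seconds

	// Three seconds old, six seconds old, and one second in the future.
	fmt.Println(isReasonable(realTime.Add(-3*time.Second), realTime, lagTime, leadTime))
	fmt.Println(isReasonable(realTime.Add(-6*time.Second), realTime, lagTime, leadTime))
	fmt.Println(isReasonable(realTime.Add(time.Second), realTime, lagTime, leadTime))
}
```

When UseLocalClockAsRealTime is false, the reference value would be the timestamp of the latest measurement rather than the local clock, as the field comment describes.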
func NewSettings ¶
func NewSettings() *Settings
NewSettings creates a new Settings instance initialized with default values.
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber represents an STTP data subscriber.
The Subscriber exists as a simplified implementation of the DataSubscriber found in the transport package. The Subscriber is intended to simplify common uses of STTP data reception and maintains an internal instance of the DataSubscriber for subscription-based functionality.
func (*Subscriber) ActiveSignalIndexCache ¶
func (sb *Subscriber) ActiveSignalIndexCache() *transport.SignalIndexCache
ActiveSignalIndexCache gets the active signal index cache.
func (*Subscriber) AdjustedValue ¶
func (sb *Subscriber) AdjustedValue(measurement *transport.Measurement) float64
AdjustedValue gets the Value of a Measurement with any linear adjustments applied from the measurement's Adder and Multiplier metadata, if found.
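A linear adjustment of this kind is usually computed as value × Multiplier + Adder. The sketch below shows that arithmetic with a hypothetical `metadata` stand-in type. The order of operations and the fallback to the raw value when no metadata is found are assumptions based on the description above.

```go
package main

import "fmt"

// metadata is a hypothetical stand-in holding only the Adder and Multiplier
// fields mentioned in the description of AdjustedValue.
type metadata struct {
	Adder      float64
	Multiplier float64
}

// adjustedValue applies raw*Multiplier + Adder when metadata is available.
// When no metadata is found (nil), the raw value is returned unchanged.
// Both behaviors are assumptions made for this sketch.
func adjustedValue(raw float64, md *metadata) float64 {
	if md == nil {
		return raw
	}
	return raw*md.Multiplier + md.Adder
}

func main() {
	md := &metadata{Adder: 1, Multiplier: 2}
	fmt.Println(adjustedValue(10, md))  // adjustment applied
	fmt.Println(adjustedValue(10, nil)) // no metadata, raw value returned
}
```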
func (*Subscriber) Close ¶
func (sb *Subscriber) Close()
Close cleanly shuts down a Subscriber that is no longer being used, e.g., during a normal application exit.
func (*Subscriber) ConnectionID ¶
func (sb *Subscriber) ConnectionID() string
ConnectionID returns the IP address and DNS host name, if resolvable, of the current STTP connection.
func (*Subscriber) DefaultConnectionEstablishedReceiver ¶
func (sb *Subscriber) DefaultConnectionEstablishedReceiver()
DefaultConnectionEstablishedReceiver implements the default handler for the ConnectionEstablished callback. Default implementation simply writes connection feedback to statusMessage callback.
func (*Subscriber) DefaultConnectionTerminatedReceiver ¶
func (sb *Subscriber) DefaultConnectionTerminatedReceiver()
DefaultConnectionTerminatedReceiver implements the default handler for the ConnectionTerminated callback. Default implementation simply writes connection terminated feedback to errorMessage callback.
func (*Subscriber) DefaultErrorMessageLogger ¶
func (sb *Subscriber) DefaultErrorMessageLogger(message string)
DefaultErrorMessageLogger implements the default handler for the errorMessage callback. Default implementation synchronously writes output to stderr. Logging is recommended.
func (*Subscriber) DefaultStatusMessageLogger ¶
func (sb *Subscriber) DefaultStatusMessageLogger(message string)
DefaultStatusMessageLogger implements the default handler for the statusMessage callback. Default implementation synchronously writes output to stdout. Logging is recommended.
func (*Subscriber) Dial ¶
func (sb *Subscriber) Dial(address string, config *Config) error
Dial starts the client-based connection cycle to an STTP publisher. The config parameter controls connection-related settings; set it to nil to use default values. When the config defines AutoReconnect as true, the connection is automatically retried when it drops. When the config defines AutoRequestMetadata as true, metadata is requested upon successful connection. When the config defines both AutoRequestMetadata and AutoSubscribe as true, subscription occurs after the metadata is received. When the config defines AutoRequestMetadata as false and AutoSubscribe as true, subscription occurs upon successful connection.
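The interaction between AutoRequestMetadata and AutoSubscribe can be summarized as an ordered list of steps. The `connectionSteps` function below is a hypothetical illustration of that ordering and does not call the library; the step names are descriptive labels only.

```go
package main

import "fmt"

// connectionSteps is a hypothetical helper that lists, in order, what happens
// automatically after a connection attempt for a given pair of Config flags.
// Anything not listed must be handled manually by the caller.
func connectionSteps(autoRequestMetadata, autoSubscribe bool) []string {
	steps := []string{"connect"}
	if autoRequestMetadata {
		// Metadata is requested before any automatic subscription.
		steps = append(steps, "request metadata", "receive metadata")
	}
	if autoSubscribe {
		steps = append(steps, "subscribe")
	}
	return steps
}

func main() {
	fmt.Println(connectionSteps(true, true))   // metadata first, then subscribe
	fmt.Println(connectionSteps(false, true))  // subscribe right after connecting
	fmt.Println(connectionSteps(true, false))  // metadata only
	fmt.Println(connectionSteps(false, false)) // everything else is manual
}
```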
func (*Subscriber) Disconnect ¶
func (sb *Subscriber) Disconnect()
Disconnect disconnects from an STTP publisher.
func (*Subscriber) ErrorMessage ¶
func (sb *Subscriber) ErrorMessage(message string)
ErrorMessage executes the defined error message logger callback.
func (*Subscriber) IsConnected ¶
func (sb *Subscriber) IsConnected() bool
IsConnected determines if Subscriber is currently connected to a data publisher. When Subscriber is listening for connections, this method will only return true once a data publisher successfully connects to the listening socket.
func (*Subscriber) IsListening ¶
func (sb *Subscriber) IsListening() bool
IsListening determines if Subscriber is currently listening for connections from a data publisher, i.e., if Subscriber is in reverse connection mode.
func (*Subscriber) IsSubscribed ¶
func (sb *Subscriber) IsSubscribed() bool
IsSubscribed determines if Subscriber is currently subscribed to a data stream.
func (*Subscriber) IsValidated ¶
func (sb *Subscriber) IsValidated() bool
IsValidated determines if Subscriber connection has been validated as an STTP connection. This method will return false until a valid response has been received from the data publisher.
func (*Subscriber) Listen ¶
func (sb *Subscriber) Listen(address string, config *Config) error
Listen establishes a listening socket for an incoming STTP publisher connection, also known as a reverse connection; see https://sttp.info/reverse-connections/. The config parameter controls connection-related settings; set it to nil to use default values. When the config defines AutoRequestMetadata as true, metadata is requested upon successful connection. When the config defines both AutoRequestMetadata and AutoSubscribe as true, subscription occurs after the metadata is received. When the config defines AutoRequestMetadata as false and AutoSubscribe as true, subscription occurs upon successful connection.
func (*Subscriber) LookupMetadata ¶
func (sb *Subscriber) LookupMetadata(signalID guid.Guid) *transport.MeasurementMetadata
LookupMetadata gets the MeasurementMetadata for the specified signalID from the local registry. If the metadata does not exist, a new record is created and returned.
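The "create if missing" behavior described above is a get-or-create lookup. The following is a minimal sketch of that pattern using hypothetical `record` and `registry` types; the string key (standing in for guid.Guid), the mutex, and the default Multiplier of 1.0 are assumptions and may not match the library's internal registry.

```go
package main

import (
	"fmt"
	"sync"
)

// record is a hypothetical stand-in for transport.MeasurementMetadata.
type record struct {
	SignalID   string
	Multiplier float64
}

// registry is a hypothetical local metadata registry keyed by signal ID.
type registry struct {
	mu      sync.Mutex
	records map[string]*record
}

func newRegistry() *registry {
	return &registry{records: make(map[string]*record)}
}

// lookup returns the record for signalID, creating and storing a new one
// when none exists, so callers always receive a non-nil record.
func (r *registry) lookup(signalID string) *record {
	r.mu.Lock()
	defer r.mu.Unlock()

	if rec, ok := r.records[signalID]; ok {
		return rec
	}
	rec := &record{SignalID: signalID, Multiplier: 1.0} // assumed default
	r.records[signalID] = rec
	return rec
}

func main() {
	reg := newRegistry()
	first := reg.lookup("E4BBFE6A-35BD-4E5B-92C9-11FF913E7877")
	second := reg.lookup("E4BBFE6A-35BD-4E5B-92C9-11FF913E7877")
	fmt.Println(first == second, len(reg.records))
}
```

One consequence of this pattern is that a lookup never signals "not found"; a caller that needs to distinguish known from unknown signals would have to inspect the returned record's contents.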
func (*Subscriber) Metadata ¶
func (sb *Subscriber) Metadata(measurement *transport.Measurement) *transport.MeasurementMetadata
Metadata gets the measurement-level metadata associated with a measurement from the local registry. If the metadata does not exist, a new record is created and returned.
func (*Subscriber) ReadMeasurements ¶
func (sb *Subscriber) ReadMeasurements() *MeasurementReader
ReadMeasurements sets up a new MeasurementReader to start reading measurements.
func (*Subscriber) RequestMetadata ¶
func (sb *Subscriber) RequestMetadata()
RequestMetadata sends a request to the data publisher indicating that the Subscriber would like new metadata. Any defined MetadataFilters will be included in the request.
func (*Subscriber) SetConfigurationChangedReceiver ¶
func (sb *Subscriber) SetConfigurationChangedReceiver(callback func())
SetConfigurationChangedReceiver defines the callback that handles notifications that the data publisher configuration has changed. Assignment will take effect immediately, even while subscription is active.
func (*Subscriber) SetConnectionEstablishedReceiver ¶
func (sb *Subscriber) SetConnectionEstablishedReceiver(callback func())
SetConnectionEstablishedReceiver defines the callback that handles notification that a connection has been established. Default implementation simply writes connection feedback to StatusMessage handler. Assignment will take effect immediately, even while subscription is active.
func (*Subscriber) SetConnectionTerminatedReceiver ¶
func (sb *Subscriber) SetConnectionTerminatedReceiver(callback func())
SetConnectionTerminatedReceiver defines the callback that handles notification that a connection has been terminated. Default implementation simply writes connection terminated feedback to ErrorMessage handler. Assignment will take effect immediately, even while subscription is active.
func (*Subscriber) SetDataStartTimeReceiver ¶
func (sb *Subscriber) SetDataStartTimeReceiver(callback func(startTime time.Time))
SetDataStartTimeReceiver defines the callback that handles notification of first received measurement. Assignment will take effect immediately, even while subscription is active.
func (*Subscriber) SetDeveloperLogger ¶
func (sb *Subscriber) SetDeveloperLogger(l *slog.Logger)
SetDeveloperLogger enables low-level logging for developers using this library. It is not intended for user-level diagnostics.
func (*Subscriber) SetErrorMessageLogger ¶
func (sb *Subscriber) SetErrorMessageLogger(callback func(message string))
SetErrorMessageLogger defines the callback that handles error message logging. Assignment will take effect immediately, even while subscription is active.
func (*Subscriber) SetHistoricalReadCompleteReceiver ¶
func (sb *Subscriber) SetHistoricalReadCompleteReceiver(callback func())
SetHistoricalReadCompleteReceiver defines the callback that handles notification that temporal processing has completed, i.e., the end of a historical playback data stream has been reached. Assignment will take effect immediately, even while subscription is active.
func (*Subscriber) SetMetadataReceiver ¶
func (sb *Subscriber) SetMetadataReceiver(callback func(dataSet *data.DataSet))
SetMetadataReceiver defines the callback that handles reception of the metadata response. Assignment will take effect immediately, even while subscription is active.
func (*Subscriber) SetNewBufferBlocksReceiver ¶
func (sb *Subscriber) SetNewBufferBlocksReceiver(callback func(bufferBlocks []transport.BufferBlock))
SetNewBufferBlocksReceiver defines the callback that handles reception of new buffer blocks. Assignment will take effect immediately, even while subscription is active.
func (*Subscriber) SetNewMeasurementsReceiver ¶
func (sb *Subscriber) SetNewMeasurementsReceiver(callback func(measurements []transport.Measurement))
SetNewMeasurementsReceiver defines the callback that handles reception of new measurements. Assignment will take effect immediately, even while subscription is active.
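The receiver setters in this section all state that assignment takes effect immediately, even while a subscription is active. One common way to make such a swap safe is to guard the callback with a lock and copy it before invoking it. The `dispatcher` type below is a hypothetical sketch of that pattern and is not the library's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// dispatcher is a hypothetical type holding a replaceable callback.
type dispatcher struct {
	mu       sync.RWMutex
	receiver func(values []float64)
}

// setReceiver replaces the callback; the next dispatch uses the new one.
func (d *dispatcher) setReceiver(callback func(values []float64)) {
	d.mu.Lock()
	d.receiver = callback
	d.mu.Unlock()
}

// dispatch copies the current callback under a read lock and invokes it
// outside the lock, so a callback may itself call setReceiver safely.
func (d *dispatcher) dispatch(values []float64) {
	d.mu.RLock()
	callback := d.receiver
	d.mu.RUnlock()

	if callback != nil {
		callback(values)
	}
}

func main() {
	d := &dispatcher{}

	d.setReceiver(func(values []float64) { fmt.Println("first handler:", len(values)) })
	d.dispatch([]float64{59.98, 60.01})

	// Swapping the handler mid-stream affects the very next dispatch.
	d.setReceiver(func(values []float64) { fmt.Println("second handler:", len(values)) })
	d.dispatch([]float64{60.02})
}
```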
func (*Subscriber) SetNotificationReceiver ¶
func (sb *Subscriber) SetNotificationReceiver(callback func(notification string))
SetNotificationReceiver defines the callback that handles reception of a notification. Assignment will take effect immediately, even while subscription is active.
func (*Subscriber) SetStatusMessageLogger ¶
func (sb *Subscriber) SetStatusMessageLogger(callback func(message string))
SetStatusMessageLogger defines the callback that handles informational message logging. Assignment will take effect immediately, even while subscription is active.
func (*Subscriber) SetSubscriptionUpdatedReceiver ¶
func (sb *Subscriber) SetSubscriptionUpdatedReceiver(callback func(signalIndexCache *transport.SignalIndexCache))
SetSubscriptionUpdatedReceiver defines the callback that handles notifications that a new SignalIndexCache has been received. Assignment will take effect immediately, even while subscription is active.
func (*Subscriber) StatusMessage ¶
func (sb *Subscriber) StatusMessage(message string)
StatusMessage executes the defined status message logger callback.
func (*Subscriber) Subscribe ¶
func (sb *Subscriber) Subscribe(filterExpression string, settings *Settings)
Subscribe sets up a request indicating that the Subscriber would like to start receiving streaming data from a data publisher. If the subscriber is already connected, the updated filter expression and subscription settings will be requested immediately; otherwise, the settings will be used when the connection to the data publisher is established.
The filterExpression defines the desired measurements for a subscription. Examples include:
- Directly specified signal IDs (UUID values in string format): 38A47B0-F10B-4143-9A0A-0DBC4FFEF1E8; E4BBFE6A-35BD-4E5B-92C9-11FF913E7877
- Directly specified tag names: DOM_GPLAINS-BUS1:VH; TVA_SHELBY-BUS1:VH
- Directly specified identifiers in "measurement key" format: PPA:15; STAT:20
- A filter expression against a selection view: FILTER ActiveMeasurements WHERE Company='GPA' AND SignalType='FREQ'
The settings parameter controls subscription-related settings; set it to nil to use default values.
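The identifier-based examples above share one shape: individual identifiers joined by semicolons. A small helper can build such a string from a list. `joinIdentifiers` is a hypothetical convenience function, not part of the sttp package, and the "; " separator simply follows the spacing used in the examples.

```go
package main

import (
	"fmt"
	"strings"
)

// joinIdentifiers is a hypothetical helper that combines signal IDs, tag
// names, or measurement keys into a single filter expression string,
// separated by semicolons as in the documented examples.
func joinIdentifiers(identifiers ...string) string {
	return strings.Join(identifiers, "; ")
}

func main() {
	// Measurement keys and tag names taken from the examples above.
	fmt.Println(joinIdentifiers("PPA:15", "STAT:20"))
	fmt.Println(joinIdentifiers("DOM_GPLAINS-BUS1:VH", "TVA_SHELBY-BUS1:VH"))
}
```

The resulting string would be passed as the filterExpression argument of Subscribe, with nil settings when the defaults are acceptable.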
func (*Subscriber) SubscriberID ¶
func (sb *Subscriber) SubscriberID() guid.Guid
SubscriberID gets the subscriber ID as assigned by the data publisher upon receipt of the SignalIndexCache.
func (*Subscriber) TotalCommandChannelBytesReceived ¶
func (sb *Subscriber) TotalCommandChannelBytesReceived() uint64
TotalCommandChannelBytesReceived gets the total number of bytes received via the command channel since last connection.
func (*Subscriber) TotalDataChannelBytesReceived ¶
func (sb *Subscriber) TotalDataChannelBytesReceived() uint64
TotalDataChannelBytesReceived gets the total number of bytes received via the data channel since last connection.
func (*Subscriber) TotalMeasurementsReceived ¶
func (sb *Subscriber) TotalMeasurementsReceived() uint64
TotalMeasurementsReceived gets the total number of measurements received since last subscription.
func (*Subscriber) Unsubscribe ¶
func (sb *Subscriber) Unsubscribe()
Unsubscribe sends a request to the data publisher indicating that the Subscriber would like to stop receiving streaming data.