Documentation
¶
Index ¶
- Constants
- Variables
- func IsDisconnectionError(err error) bool
- func SetupGlobalReporter(r Reporter)
- type Connection
- type ConnectionSetup
- type FundingData
- type KlineData
- type Match
- type Matcher
- type PingHandler
- type Reporter
- type Response
- type UnhandledMessageWarning
- type Websocket
- func (w *Websocket) AddSubscription(c *subscription.Subscription) error
- func (w *Websocket) AddSuccessfulSubscriptions(channels ...subscription.Subscription)
- func (w *Websocket) CanUseAuthenticatedEndpoints() bool
- func (w *Websocket) CanUseAuthenticatedWebsocketForWrapper() bool
- func (w *Websocket) Connect() error
- func (w *Websocket) Disable() error
- func (w *Websocket) Enable() error
- func (w *Websocket) FlushChannels() error
- func (w *Websocket) GetChannelDifference(genSubs []subscription.Subscription) (sub, unsub []subscription.Subscription)
- func (w *Websocket) GetName() string
- func (w *Websocket) GetProxyAddress() string
- func (w *Websocket) GetSubscription(key any) *subscription.Subscription
- func (w *Websocket) GetSubscriptions() []subscription.Subscription
- func (w *Websocket) GetWebsocketURL() string
- func (w *Websocket) IsConnected() bool
- func (w *Websocket) IsConnecting() bool
- func (w *Websocket) IsConnectionMonitorRunning() bool
- func (w *Websocket) IsDataMonitorRunning() bool
- func (w *Websocket) IsEnabled() bool
- func (w *Websocket) IsInitialised() bool
- func (w *Websocket) IsTrafficMonitorRunning() bool
- func (w *Websocket) RemoveSubscriptions(channels ...subscription.Subscription)
- func (w *Websocket) ResubscribeToChannel(subscribedChannel *subscription.Subscription) error
- func (w *Websocket) SetCanUseAuthenticatedEndpoints(b bool)
- func (w *Websocket) SetProxyAddress(proxyAddr string) error
- func (w *Websocket) SetSubscriptionState(c *subscription.Subscription, state subscription.State) error
- func (w *Websocket) SetWebsocketURL(url string, auth, reconnect bool) error
- func (w *Websocket) Setup(s *WebsocketSetup) error
- func (w *Websocket) SetupNewConnection(c ConnectionSetup) error
- func (w *Websocket) Shutdown() error
- func (w *Websocket) SubscribeToChannels(channels []subscription.Subscription) error
- func (w *Websocket) UnsubscribeChannels(channels []subscription.Subscription) error
- type WebsocketConnection
- func (w *WebsocketConnection) Dial(dialer *websocket.Dialer, headers http.Header) error
- func (w *WebsocketConnection) GenerateMessageID(highPrec bool) int64
- func (w *WebsocketConnection) GetURL() string
- func (w *WebsocketConnection) IsConnected() bool
- func (w *WebsocketConnection) ReadMessage() Response
- func (w *WebsocketConnection) SendJSONMessage(data interface{}) error
- func (w *WebsocketConnection) SendMessageReturnResponse(signature, request interface{}) ([]byte, error)
- func (w *WebsocketConnection) SendRawMessage(messageType int, message []byte) error
- func (w *WebsocketConnection) SetProxy(proxy string)
- func (w *WebsocketConnection) SetURL(url string)
- func (w *WebsocketConnection) SetupPingHandler(handler PingHandler)
- func (w *WebsocketConnection) Shutdown() error
- type WebsocketPositionUpdated
- type WebsocketSetup
Constants ¶
// Websocket functionality list and state consts.
//
// WebsocketNotAuthenticatedUsingRest is a format string with a single %v verb
// and a trailing newline, stating that the websocket is not authenticated and
// REST is being used.
// Ping and Pong hold the literal strings "ping" and "pong".
// UnhandledMessage is a message fragment with a leading " - " and a trailing
// space.
// NOTE(review): presumably UnhandledMessage is prefixed with an exchange name
// and followed by the raw payload - confirm against the callers.
const ( WebsocketNotAuthenticatedUsingRest = "%v - Websocket not authenticated, using REST\n" Ping = "ping" Pong = "pong" UnhandledMessage = " - Unhandled websocket message: " )
Websocket functionality list and state consts
Variables ¶
// Public websocket errors.
//
// These are sentinel values: compare against them with errors.Is rather than
// by matching the message text. Messages are lowercase and carry no trailing
// punctuation or whitespace so they compose cleanly when wrapped.
var (
	ErrWebsocketNotEnabled      = errors.New("websocket not enabled")
	ErrSubscriptionNotFound     = errors.New("subscription not found")
	ErrSubscribedAlready        = errors.New("duplicate subscription")
	ErrSubscriptionFailure      = errors.New("subscription failure")
	ErrSubscriptionNotSupported = errors.New("subscription channel not supported")
	ErrUnsubscribeFailure       = errors.New("unsubscribe failure")
	ErrChannelInStateAlready    = errors.New("channel already in state")
	ErrAlreadyDisabled          = errors.New("websocket already disabled")
	ErrNotConnected             = errors.New("websocket is not connected")
)
Public websocket errors
Functions ¶
func IsDisconnectionError ¶
IsDisconnectionError determines whether the error sent over the ReadMessageErrors channel is a disconnection error.
func SetupGlobalReporter ¶
func SetupGlobalReporter(r Reporter)
SetupGlobalReporter sets a reporter interface to be used for all exchange requests
Types ¶
type Connection ¶
// Connection defines a streaming services connection.
// The per-method notes below restate what this package documents for the
// WebsocketConnection methods of the same name.
type Connection interface {
// Dial takes a websocket dialer and HTTP headers.
// NOTE(review): presumably establishes the connection - confirm against the implementation.
Dial(*websocket.Dialer, http.Header) error
// ReadMessage reads a message and returns it as a Response.
ReadMessage() Response
// SendJSONMessage sends a JSON encoded message over the connection.
SendJSONMessage(interface{}) error
// SetupPingHandler configures automatic ping or pong messages from the
// supplied PingHandler settings.
SetupPingHandler(PingHandler)
// GenerateMessageID creates a message ID.
// NOTE(review): the exact effect of highPrecision is not shown here - confirm.
GenerateMessageID(highPrecision bool) int64
// SendMessageReturnResponse sends a message to the connection and waits for
// the response, returned as raw bytes.
SendMessageReturnResponse(signature interface{}, request interface{}) ([]byte, error)
// SendRawMessage sends a message over the connection without JSON encoding it.
SendRawMessage(messageType int, message []byte) error
// SetURL sets the connection URL.
SetURL(string)
// SetProxy sets the connection proxy.
SetProxy(string)
// GetURL returns the connection URL.
GetURL() string
// Shutdown shuts down and closes the connection.
Shutdown() error
}
Connection defines a streaming services connection
type ConnectionSetup ¶
// ConnectionSetup defines variables for an individual stream connection.
// It is the argument accepted by (*Websocket).SetupNewConnection.
type ConnectionSetup struct {
// NOTE(review): ResponseCheckTimeout and ResponseMaxLimit are durations that
// presumably bound how often and how long a response is waited for - confirm.
ResponseCheckTimeout time.Duration
ResponseMaxLimit time.Duration
// NOTE(review): the unit of RateLimit is not shown here - confirm.
RateLimit int64
// URL is the connection URL.
URL string
// Authenticated presumably selects the authenticated connection rather than
// the standard one - confirm against SetupNewConnection.
Authenticated bool
// ConnectionLevelReporter is a Reporter scoped to this connection.
ConnectionLevelReporter Reporter
}
ConnectionSetup defines variables for an individual stream connection
type FundingData ¶
// FundingData defines funding data.
// NOTE(review): units for Amount, Rate and Period are not specified in this
// declaration - confirm with the producers of this type.
type FundingData struct {
Timestamp time.Time
CurrencyPair currency.Pair
AssetType asset.Item
// Exchange is the exchange name as a string.
Exchange string
Amount float64
Rate float64
Period int64
Side order.Side
}
FundingData defines funding data
type KlineData ¶
// KlineData defines kline feed.
type KlineData struct {
// NOTE(review): how Timestamp differs from StartTime/CloseTime is not shown
// here - confirm with the producers of this type.
Timestamp time.Time
Pair currency.Pair
AssetType asset.Item
// Exchange is the exchange name as a string.
Exchange string
StartTime time.Time
CloseTime time.Time
// Interval is carried as a string; its format is not specified here.
Interval string
OpenPrice float64
ClosePrice float64
HighPrice float64
LowPrice float64
Volume float64
}
KlineData defines kline feed
type Match ¶
// Match is a distributed subtype that handles the matching of requests and
// responses in a timely manner, reducing the need to differentiate between
// connections. Stream systems fan in all incoming payloads to one routine for
// processing.
type Match struct {
// contains filtered or unexported fields
}
Match is a distributed subtype that handles the matching of requests and responses in a timely manner, reducing the need to differentiate between connections. Stream systems fan in all incoming payloads to one routine for processing.
func (*Match) IncomingWithData ¶
IncomingWithData matches an incoming payload with a pending request and takes in the returned payload so that it can be processed outside of the stream processing routine. It returns true if a handler was found.
type Matcher ¶
// Matcher defines a payload matching return mechanism.
type Matcher struct {
// C is a channel of raw byte payloads.
// NOTE(review): presumably delivers the matched response payload - confirm.
C chan []byte
// contains filtered or unexported fields
}
Matcher defines a payload matching return mechanism
type PingHandler ¶
// PingHandler is a container for ping handler settings. It is the argument
// accepted by SetupPingHandler.
type PingHandler struct {
// NOTE(review): the meaning of the Websocket flag is not shown here - confirm.
Websocket bool
// UseGorillaHandler presumably delegates ping handling to the underlying
// gorilla websocket library - confirm against SetupPingHandler.
UseGorillaHandler bool
// MessageType and Message presumably describe the message that is sent as
// the ping - confirm.
MessageType int
Message []byte
// Delay presumably sets the interval between pings - confirm.
Delay time.Duration
}
PingHandler is a container for ping handler settings
type Reporter ¶
Reporter interface groups observability functionality over Websocket request latency.
type UnhandledMessageWarning ¶
// UnhandledMessageWarning defines a container for unhandled message warnings.
type UnhandledMessageWarning struct {
// Message is the warning text.
Message string
}
UnhandledMessageWarning defines a container for unhandled message warnings
type Websocket ¶
// Websocket defines a return type for websocket connections via the interface
// wrapper for routine processing.
type Websocket struct {
// Subscribe and Unsubscribe are channels carrying batches of subscriptions.
// NOTE(review): their consumers are not visible here - confirm.
Subscribe chan []subscription.Subscription
Unsubscribe chan []subscription.Subscription
// Subscriber function for package defined websocket subscriber
// functionality
Subscriber func([]subscription.Subscription) error
// Unsubscriber function for package defined websocket unsubscriber
// functionality
Unsubscriber func([]subscription.Subscription) error
// GenerateSubs function for package defined websocket generate
// subscriptions functionality
GenerateSubs func() ([]subscription.Subscription, error)
// DataHandler and ToRoutine are untyped channels.
// NOTE(review): presumably they carry processed stream data to consumers - confirm.
DataHandler chan interface{}
ToRoutine chan interface{}
// Match pairs requests with their responses
Match *Match
// ShutdownC synchronises shutdown events across routines
ShutdownC chan struct{}
// Wg is a shared wait group.
// NOTE(review): presumably tracks the routines stopped via ShutdownC - confirm.
Wg *sync.WaitGroup
// Orderbook is a local buffer of orderbooks
Orderbook buffer.Orderbook
// Trade is a notifier of occurring trades
Trade trade.Trade
// Fills is a notifier of occurring fills
Fills fill.Fills
// TrafficAlert monitors if there is a halt in traffic throughput
TrafficAlert chan struct{}
// ReadMessageErrors receives all errors from ws.ReadMessage() so that they
// can be checked for a disconnection
ReadMessageErrors chan error
// Standard stream connection
Conn Connection
// Authenticated stream connection
AuthConn Connection
// Latency reporter
ExchangeLevelReporter Reporter
// MaxSubscriptionsPerConnection defines the maximum number of
// subscriptions per connection that is allowed by the exchange.
MaxSubscriptionsPerConnection int
// contains filtered or unexported fields
}
Websocket defines a return type for websocket connections via the interface wrapper for routine processing
func (*Websocket) AddSubscription ¶
func (w *Websocket) AddSubscription(c *subscription.Subscription) error
AddSubscription adds a subscription to the subscription lists. Unlike AddSuccessfulSubscriptions, this method will return an error if the subscription already exists.
func (*Websocket) AddSuccessfulSubscriptions ¶
func (w *Websocket) AddSuccessfulSubscriptions(channels ...subscription.Subscription)
AddSuccessfulSubscriptions adds subscriptions that have been successfully subscribed to the subscription lists
func (*Websocket) CanUseAuthenticatedEndpoints ¶
CanUseAuthenticatedEndpoints gets canUseAuthenticatedEndpoints val in a thread safe manner
func (*Websocket) CanUseAuthenticatedWebsocketForWrapper ¶
CanUseAuthenticatedWebsocketForWrapper handles a common check to verify whether a wrapper can use an authenticated websocket endpoint
func (*Websocket) Connect ¶
Connect initiates a websocket connection by using a package defined connection function
func (*Websocket) Disable ¶
Disable disables the exchange websocket protocol. Note that connectionMonitor will be responsible for shutting down the websocket after disabling.
func (*Websocket) FlushChannels ¶
FlushChannels flushes channel subscriptions when there is a pair/asset change
func (*Websocket) GetChannelDifference ¶
func (w *Websocket) GetChannelDifference(genSubs []subscription.Subscription) (sub, unsub []subscription.Subscription)
GetChannelDifference finds the difference between the subscribed channels and the new subscription list when pairs are disabled or enabled.
func (*Websocket) GetProxyAddress ¶
GetProxyAddress returns the current websocket proxy
func (*Websocket) GetSubscription ¶
func (w *Websocket) GetSubscription(key any) *subscription.Subscription
GetSubscription returns a pointer to a copy of the subscription at the key provided. It returns nil if no subscription is at that key or the key is nil.
func (*Websocket) GetSubscriptions ¶
func (w *Websocket) GetSubscriptions() []subscription.Subscription
GetSubscriptions returns a new slice of the subscriptions
func (*Websocket) GetWebsocketURL ¶
GetWebsocketURL returns the running websocket URL
func (*Websocket) IsConnected ¶
IsConnected returns whether the websocket is connected
func (*Websocket) IsConnecting ¶
IsConnecting returns whether the websocket is connecting
func (*Websocket) IsConnectionMonitorRunning ¶
IsConnectionMonitorRunning returns status of connection monitor
func (*Websocket) IsDataMonitorRunning ¶
IsDataMonitorRunning returns status of data monitor
func (*Websocket) IsInitialised ¶
IsInitialised returns whether the websocket has been Setup() already
func (*Websocket) IsTrafficMonitorRunning ¶
IsTrafficMonitorRunning returns status of the traffic monitor
func (*Websocket) RemoveSubscriptions ¶
func (w *Websocket) RemoveSubscriptions(channels ...subscription.Subscription)
RemoveSubscriptions removes subscriptions from the subscription list
func (*Websocket) ResubscribeToChannel ¶
func (w *Websocket) ResubscribeToChannel(subscribedChannel *subscription.Subscription) error
ResubscribeToChannel resubscribes to channel
func (*Websocket) SetCanUseAuthenticatedEndpoints ¶
SetCanUseAuthenticatedEndpoints sets canUseAuthenticatedEndpoints val in a thread safe manner
func (*Websocket) SetProxyAddress ¶
SetProxyAddress sets websocket proxy address
func (*Websocket) SetSubscriptionState ¶
func (w *Websocket) SetSubscriptionState(c *subscription.Subscription, state subscription.State) error
SetSubscriptionState sets the state of an existing subscription. It returns an error if the subscription is not found, or if the new state is already set.
func (*Websocket) SetWebsocketURL ¶
SetWebsocketURL sets websocket URL and can refresh underlying connections
func (*Websocket) Setup ¶
func (w *Websocket) Setup(s *WebsocketSetup) error
Setup sets main variables for websocket connection
func (*Websocket) SetupNewConnection ¶
func (w *Websocket) SetupNewConnection(c ConnectionSetup) error
SetupNewConnection sets up an auth or unauth streaming connection
func (*Websocket) Shutdown ¶
Shutdown attempts to shut down a websocket connection and associated routines by using a package defined shutdown function
func (*Websocket) SubscribeToChannels ¶
func (w *Websocket) SubscribeToChannels(channels []subscription.Subscription) error
SubscribeToChannels appends supplied channels to channelsToSubscribe
func (*Websocket) UnsubscribeChannels ¶
func (w *Websocket) UnsubscribeChannels(channels []subscription.Subscription) error
UnsubscribeChannels unsubscribes from a websocket channel
type WebsocketConnection ¶
// WebsocketConnection contains all the data needed to send a message to a WS
// connection.
type WebsocketConnection struct {
// Verbose is a boolean flag.
// NOTE(review): presumably enables verbose logging - confirm.
Verbose bool
// NOTE(review): the unit of RateLimit is not shown here - confirm.
RateLimit int64
ExchangeName string
// URL is the connection URL (see SetURL and GetURL).
URL string
// ProxyURL is the connection proxy (see SetProxy).
ProxyURL string
Wg *sync.WaitGroup
// Connection is the underlying websocket connection
Connection *websocket.Conn
ShutdownC chan struct{}
// Match pairs requests with their responses
Match *Match
// NOTE(review): presumably the maximum time to wait for a response in
// SendMessageReturnResponse - confirm.
ResponseMaxLimit time.Duration
Traffic chan struct{}
// Reporter is the latency reporter used by this connection
Reporter Reporter
// contains filtered or unexported fields
}
WebsocketConnection contains all the data needed to send a message to a WS connection
func (*WebsocketConnection) GenerateMessageID ¶
func (w *WebsocketConnection) GenerateMessageID(highPrec bool) int64
GenerateMessageID creates a random message ID
func (*WebsocketConnection) GetURL ¶
func (w *WebsocketConnection) GetURL() string
GetURL returns the connection URL
func (*WebsocketConnection) IsConnected ¶
func (w *WebsocketConnection) IsConnected() bool
IsConnected exposes websocket connection status
func (*WebsocketConnection) ReadMessage ¶
func (w *WebsocketConnection) ReadMessage() Response
ReadMessage reads messages, can handle text, gzip and binary
func (*WebsocketConnection) SendJSONMessage ¶
func (w *WebsocketConnection) SendJSONMessage(data interface{}) error
SendJSONMessage sends a JSON encoded message over the connection
func (*WebsocketConnection) SendMessageReturnResponse ¶
func (w *WebsocketConnection) SendMessageReturnResponse(signature, request interface{}) ([]byte, error)
SendMessageReturnResponse will send a WS message to the connection and wait for response
func (*WebsocketConnection) SendRawMessage ¶
func (w *WebsocketConnection) SendRawMessage(messageType int, message []byte) error
SendRawMessage sends a message over the connection without JSON encoding it
func (*WebsocketConnection) SetProxy ¶
func (w *WebsocketConnection) SetProxy(proxy string)
SetProxy sets connection proxy
func (*WebsocketConnection) SetURL ¶
func (w *WebsocketConnection) SetURL(url string)
SetURL sets connection URL
func (*WebsocketConnection) SetupPingHandler ¶
func (w *WebsocketConnection) SetupPingHandler(handler PingHandler)
SetupPingHandler will automatically send ping or pong messages based on the supplied PingHandler configuration
func (*WebsocketConnection) Shutdown ¶
func (w *WebsocketConnection) Shutdown() error
Shutdown shuts down and closes specific connection
type WebsocketPositionUpdated ¶
// WebsocketPositionUpdated reflects a change in orders/contracts on an
// exchange.
type WebsocketPositionUpdated struct {
Timestamp time.Time
Pair currency.Pair
AssetType asset.Item
// Exchange is the exchange name as a string.
Exchange string
}
WebsocketPositionUpdated reflects a change in orders/contracts on an exchange
type WebsocketSetup ¶
// WebsocketSetup defines variables for setting up a websocket connection.
// It is the argument accepted by (*Websocket).Setup.
type WebsocketSetup struct {
ExchangeConfig *config.Exchange
// NOTE(review): presumably DefaultURL is the exchange's stock endpoint while
// RunningURL and RunningURLAuth are the endpoints currently in use for the
// standard and authenticated connections - confirm against Setup.
DefaultURL string
RunningURL string
RunningURLAuth string
// Connector, Subscriber, Unsubscriber and GenerateSubscriptions are the
// exchange supplied functions for connecting, subscribing, unsubscribing
// and generating subscriptions.
Connector func() error
Subscriber func([]subscription.Subscription) error
Unsubscriber func([]subscription.Subscription) error
GenerateSubscriptions func() ([]subscription.Subscription, error)
Features *protocol.Features
// Local orderbook buffer config values
OrderbookBufferConfig buffer.Config
// NOTE(review): presumably enables the trade feed - confirm.
TradeFeed bool
// Fill data config values
FillsFeed bool
// MaxWebsocketSubscriptionsPerConnection defines the maximum number of
// subscriptions per connection that is allowed by the exchange.
MaxWebsocketSubscriptionsPerConnection int
}
WebsocketSetup defines variables for setting up a websocket connection