streaming

package
v0.3.4 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 23, 2025 License: MIT Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrMasterConnectionNotAvailable = errors.New("master connection not available")
	ErrLoginFailed                  = errors.New("LOGIN failed")
)
View Source
var ErrInvalidPrefixedRequestID = errors.New("invalid prefixed request ID format")

ErrInvalidPrefixedRequestID is returned when a prefixed request ID has an invalid format.

View Source
var ErrNoMetadata = errors.New("no metadata available")

Functions

func CreateMetadataFunc

func CreateMetadataFunc(ctx context.Context, schwabClient schwabapi.ProviderClient) func() (*Metadata, error)

CreateMetadataFunc creates a metadata refresh function.

func IsPrefixedRequestID

func IsPrefixedRequestID(requestID string) bool

IsPrefixedRequestID checks if a request ID has the expected client prefix format.

func PrefixRequestID

func PrefixRequestID(clientID, requestID string) string

PrefixRequestID adds a client ID prefix to a request ID to prevent collisions and enable proper routing of responses. Format: "client_<uuid>_<original_request_id>".

func UnprefixRequestID

func UnprefixRequestID(prefixedID string) (string, string, error)

UnprefixRequestID extracts the client ID and original request ID from a prefixed request ID. Returns an error if the format is invalid.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client represents a connected WebSocket client.

type ClientInfo

type ClientInfo struct {
	ClientID string
	Scopes   []string
}

ClientInfo represents authenticated client information.

type ClientMap

type ClientMap struct {
	// contains filtered or unexported fields
}

ClientMap provides a thread-safe map for managing WebSocket clients.

func NewClientMap

func NewClientMap() *ClientMap

NewClientMap creates a new ClientMap.

func (*ClientMap) Count

func (cm *ClientMap) Count() int

Count returns the number of clients in the map.

func (*ClientMap) Delete

func (cm *ClientMap) Delete(id string)

Delete removes a client from the map.

func (*ClientMap) Load

func (cm *ClientMap) Load(id string) (*Client, bool)

Load retrieves a client from the map.

func (*ClientMap) Range

func (cm *ClientMap) Range(function func(id string, client *Client) bool)

Range calls function for each client in the map.

func (*ClientMap) Store

func (cm *ClientMap) Store(id string, client *Client)

Store stores a client in the map.

type DataItem

type DataItem struct {
	Service   string          `json:"service"`
	Timestamp int64           `json:"timestamp"`
	Command   string          `json:"command"`
	Content   json.RawMessage `json:"content"`
}

DataItem represents streaming data.

type Metadata

type Metadata struct {
	CorrelID    string    `json:"correl_id"`
	CustomerID  string    `json:"customer_id"`
	Channel     string    `json:"channel"`
	FunctionID  string    `json:"function_id"`
	WSEndpoint  string    `json:"ws_endpoint"`
	ExtractedAt time.Time `json:"extracted_at"`

	TTL time.Duration
}

Metadata represents streaming service metadata.

type MetadataManager

type MetadataManager struct {
	// contains filtered or unexported fields
}

MetadataManager manages streaming metadata with caching.

func NewMetadataManager

func NewMetadataManager(refreshFunc func() (*Metadata, error)) *MetadataManager

NewMetadataManager creates a metadata manager.

func (*MetadataManager) GetMetadata

func (m *MetadataManager) GetMetadata() (*Metadata, error)

GetMetadata returns current metadata, refreshing if needed.

type NotifyItem

type NotifyItem struct {
	Service   string          `json:"service,omitempty"`
	Timestamp int64           `json:"timestamp,omitempty"`
	Content   json.RawMessage `json:"content,omitempty"`
	Heartbeat string          `json:"heartbeat,omitempty"`
}

NotifyItem represents a notification.

type Proxy

type Proxy struct {
	// contains filtered or unexported fields
}

Proxy manages WebSocket connections between clients and Schwab.

func NewProxy

func NewProxy(
	tokenManager auth.TokenServicer,
	authServer *auth.Server,
	metadataFunc func() (*Metadata, error),
) *Proxy

NewProxy creates a new streaming proxy.

func (*Proxy) GetClientCount

func (sp *Proxy) GetClientCount() int

GetClientCount returns the number of connected clients.

func (*Proxy) GetConnectionState

func (sp *Proxy) GetConnectionState() string

GetConnectionState returns the current master connection state.

func (*Proxy) GetLastHeartbeat

func (sp *Proxy) GetLastHeartbeat() time.Time

GetLastHeartbeat returns the last heartbeat time (not implemented in the simplified version).

func (*Proxy) HandleWebSocket

func (sp *Proxy) HandleWebSocket(writer http.ResponseWriter, req *http.Request)

HandleWebSocket handles incoming WebSocket connections from clients.

func (*Proxy) Shutdown

func (sp *Proxy) Shutdown(ctx context.Context) error

Shutdown gracefully shuts down the streaming proxy.

func (*Proxy) Start

func (sp *Proxy) Start(ctx context.Context) error

Start begins the streaming proxy operations.

type Request

type Request struct {
	Service    string         `json:"service"`
	Command    string         `json:"command"`
	RequestID  string         `json:"requestid"`
	Parameters map[string]any `json:"parameters,omitempty"`

	//nolint:tagliatelle // Required by Schwab API
	SchwabClientCustomerID string `json:"SchwabClientCustomerId,omitempty"`

	//nolint:tagliatelle // Required by Schwab API
	SchwabClientCorrelID string `json:"SchwabClientCorrelId,omitempty"`
}

Request represents a streaming API request.

type RequestBatch

type RequestBatch struct {
	Requests []Request `json:"requests"`
}

RequestBatch wraps multiple requests.

type Response

type Response struct {
	Response []ResponseItem `json:"response,omitempty"`
	Data     []DataItem     `json:"data,omitempty"`
	Notify   []NotifyItem   `json:"notify,omitempty"`
}

Response represents a streaming API response.

type ResponseContent

type ResponseContent struct {
	Code int    `json:"code"`
	Msg  string `json:"msg,omitempty"`
}

ResponseContent represents response content.

type ResponseItem

type ResponseItem struct {
	Service   string          `json:"service"`
	RequestID string          `json:"requestid"`
	Command   string          `json:"command"`
	Timestamp int64           `json:"timestamp"`
	Content   ResponseContent `json:"content"`

	//nolint:tagliatelle // Required by Schwab API
	SchwabClientCorrelID string `json:"SchwabClientCorrelId,omitempty"`
}

ResponseItem represents a single response.

type StreamerInfo

type StreamerInfo struct {
	//nolint:tagliatelle // Schwab API response structure
	StreamerSocketURL string `json:"streamerSocketUrl"`

	//nolint:tagliatelle // Schwab API response structure
	SchwabClientCustomerID string `json:"schwabClientCustomerId"`

	//nolint:tagliatelle // Schwab API response structure
	SchwabClientCorrelID string `json:"schwabClientCorrelId"`

	//nolint:tagliatelle // Schwab API response structure
	SchwabClientChannel string `json:"schwabClientChannel"`

	//nolint:tagliatelle // Schwab API response structure
	SchwabClientFunctionID string `json:"schwabClientFunctionId"`
}

StreamerInfo contains streaming configuration.

type UserPreferencesResponse

type UserPreferencesResponse struct {
	//nolint:tagliatelle // Schwab API response structure
	StreamerInfo []StreamerInfo `json:"streamerInfo"`

	// Extras stores any additional fields returned by the Schwab API so they
	// are preserved when the response is re-encoded and forwarded.
	Extras map[string]json.RawMessage `json:"-"`
}

UserPreferencesResponse represents the user preferences API response.

func (UserPreferencesResponse) MarshalJSON added in v0.3.1

func (u UserPreferencesResponse) MarshalJSON() ([]byte, error)

MarshalJSON writes streamerInfo along with any preserved extra fields.

func (*UserPreferencesResponse) UnmarshalJSON added in v0.3.1

func (u *UserPreferencesResponse) UnmarshalJSON(data []byte) error

UnmarshalJSON captures the streamerInfo plus any other fields in the payload.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL