Documentation
¶
Overview ¶
RPC PubSub
Index ¶
- Constants
- Variables
- func GenerateRpcCommandDeclMap() map[string]*RpcMethodDecl
- func GetIsCanceledFromContext(ctx context.Context) bool
- func GetRpcSourceFromContext(ctx context.Context) string
- func InitBroker()
- func MakeConnectionRouteId(connId string) string
- func MakeControllerRouteId(blockId string) string
- func MakeFeBlockRouteId(blockId string) string
- func MakeMethodMapForImpl(impl any, declMap map[string]*RpcMethodDecl) map[string]reflect.Method
- func MakeProcRouteId(procId string) string
- func MakeTabRouteId(tabId string) string
- type AbstractRpcClient
- type BrokerSubscription
- type BrokerType
- func (b *BrokerType) GetClient() Client
- func (b *BrokerType) Publish(event EventType)
- func (b *BrokerType) ReadEventHistory(eventType string, scope string, maxItems int) []*EventType
- func (b *BrokerType) SetClient(client Client)
- func (b *BrokerType) Subscribe(subRouteId string, sub SubscriptionRequest)
- func (b *BrokerType) Unsubscribe(subRouteId string, eventName string)
- func (b *BrokerType) UnsubscribeAll(subRouteId string)
- type Client
- type CommandHandlerFnType
- type EventListener
- type EventType
- type ExpMap
- type RpcClient
- func (w *RpcClient) GetAuthToken() string
- func (w *RpcClient) IsServerDone() bool
- func (w *RpcClient) RecvRpcMessage() ([]byte, bool)
- func (w *RpcClient) SendCommand(command string, data any, opts *RpcOpts) error
- func (w *RpcClient) SendComplexRequest(command string, data any, opts *RpcOpts) (rtnHandler *RpcRequestHandler, rtnErr error)
- func (w *RpcClient) SendRpcMessage(msg []byte)
- func (w *RpcClient) SendRpcRequest(command string, data any, opts *RpcOpts) (any, error)
- func (w *RpcClient) SetAuthToken(token string)
- func (w *RpcClient) SetServerImpl(serverImpl RpcServerImpl)
- type RpcMessage
- type RpcMethodDecl
- type RpcOpts
- type RpcRequestHandler
- type RpcResponseHandler
- func (handler *RpcResponseHandler) Context() context.Context
- func (handler *RpcResponseHandler) Finalize()
- func (handler *RpcResponseHandler) GetCommand() string
- func (handler *RpcResponseHandler) GetCommandRawData() any
- func (handler *RpcResponseHandler) GetSource() string
- func (handler *RpcResponseHandler) IsCanceled() bool
- func (handler *RpcResponseHandler) IsDone() bool
- func (handler *RpcResponseHandler) NeedsResponse() bool
- func (handler *RpcResponseHandler) SendMessage(msg string)
- func (handler *RpcResponseHandler) SendResponse(data any, done bool) error
- func (handler *RpcResponseHandler) SendResponseError(err error)
- type RpcServerImpl
- type SubscriptionRequest
- type WshRouter
- func (router *WshRouter) GetRouteKeys() []string
- func (router *WshRouter) GetRpc(routeId string) AbstractRpcClient
- func (router *WshRouter) GetUpstreamClient() AbstractRpcClient
- func (router *WshRouter) InjectMessage(msgBytes []byte, fromRouteId string)
- func (router *WshRouter) RegisterRoute(routeId string, rpc AbstractRpcClient, shouldAnnounce bool)
- func (router *WshRouter) RunSimpleRawCommand(ctx context.Context, msg RpcMessage, fromRouteId string) (*RpcMessage, error)
- func (router *WshRouter) SendEvent(routeId string, event EventType)
- func (router *WshRouter) SetUpstreamClient(rpc AbstractRpcClient)
- func (router *WshRouter) UnregisterRoute(routeId string)
- func (router *WshRouter) WaitForRegister(ctx context.Context, routeId string) error
- type WshRpcProxy
Constants ¶
View Source
const ( RpcType_Call = "call" // single response (regular rpc) RpcType_ResponseStream = "responsestream" // stream of responses (streaming rpc) RpcType_StreamingRequest = "streamingrequest" // streaming request RpcType_Complex = "complex" // streaming request/response )
View Source
const ( DefaultRoute = "outrigsrv" BareClientRoute = "outrigsrv-client" UpstreamRoute = "upstream" SysRoute = "sys" // this route doesn't exist, just a placeholder for system messages ElectronRoute = "electron" RoutePrefix_Conn = "conn:" RoutePrefix_Controller = "controller:" RoutePrefix_Proc = "proc:" RoutePrefix_Tab = "tab:" RoutePrefix_FeBlock = "feblock:" )
View Source
const CtxDoneChSize = 10
View Source
const DefaultInputChSize = 32
View Source
const DefaultMessageChSize = 32
View Source
const DefaultOutputChSize = 32
View Source
const DefaultTimeoutMs = 5000
View Source
const MaxPersist = 4096
View Source
const ReMakeArrThreshold = 10 * 1024
View Source
const RespChSize = 32
Variables ¶
View Source
var Broker = &BrokerType{ Lock: &sync.Mutex{}, SubMap: make(map[string]*BrokerSubscription), PersistMap: make(map[persistKey]*persistEventWrap), }
View Source
var WshCommandDeclMap = GenerateRpcCommandDeclMap()
Functions ¶
func GenerateRpcCommandDeclMap ¶
func GenerateRpcCommandDeclMap() map[string]*RpcMethodDecl
func GetRpcSourceFromContext ¶
func InitBroker ¶
func InitBroker()
func MakeConnectionRouteId ¶
func MakeControllerRouteId ¶
func MakeFeBlockRouteId ¶
func MakeMethodMapForImpl ¶
func MakeProcRouteId ¶
func MakeTabRouteId ¶
Types ¶
type AbstractRpcClient ¶
type BrokerSubscription ¶
type BrokerSubscription struct {
AllSubs []string // routeids subscribed to "all" events
ScopeSubs map[string][]string // routeids subscribed to specific scopes
StarSubs map[string][]string // routeids subscribed to star scope (scopes with "*" or "**" in them)
}
func (*BrokerSubscription) IsEmpty ¶
func (bs *BrokerSubscription) IsEmpty() bool
type BrokerType ¶
type BrokerType struct {
Lock *sync.Mutex
Client Client
SubMap map[string]*BrokerSubscription
PersistMap map[persistKey]*persistEventWrap
}
func (*BrokerType) GetClient ¶
func (b *BrokerType) GetClient() Client
func (*BrokerType) Publish ¶
func (b *BrokerType) Publish(event EventType)
func (*BrokerType) ReadEventHistory ¶
func (b *BrokerType) ReadEventHistory(eventType string, scope string, maxItems int) []*EventType
does not take wildcards, use "" for all
func (*BrokerType) SetClient ¶
func (b *BrokerType) SetClient(client Client)
func (*BrokerType) Subscribe ¶
func (b *BrokerType) Subscribe(subRouteId string, sub SubscriptionRequest)
if already subscribed, this will *resubscribe* with the new subscription (remove the old one, and replace with this one)
func (*BrokerType) Unsubscribe ¶
func (b *BrokerType) Unsubscribe(subRouteId string, eventName string)
func (*BrokerType) UnsubscribeAll ¶
func (b *BrokerType) UnsubscribeAll(subRouteId string)
type CommandHandlerFnType ¶
type CommandHandlerFnType = func(*RpcResponseHandler) bool
returns true if handler is complete, false for an async handler
type EventListener ¶
func MakeEventListener ¶
func MakeEventListener() *EventListener
func (*EventListener) On ¶
func (el *EventListener) On(eventName string, fn func(*EventType)) string
func (*EventListener) RecvEvent ¶
func (el *EventListener) RecvEvent(e *EventType)
func (*EventListener) Unregister ¶
func (el *EventListener) Unregister(eventName string, id string)
type ExpMap ¶
type ExpMap[T any] struct { // contains filtered or unexported fields }
func MakeExpMap ¶
type RpcClient ¶
type RpcClient struct {
Lock *sync.Mutex
InputCh chan []byte
OutputCh chan []byte
CtxDoneCh chan string // for context cancellation, value is ResId
AuthToken string
RpcMap map[string]*rpcData
ServerImpl RpcServerImpl
EventListener *EventListener
ResponseHandlerMap map[string]*RpcResponseHandler // reqId => handler
Debug bool
DebugName string
ServerDone bool
}
func GetRpcClientFromContext ¶
func MakeRpcClient ¶
func MakeRpcClient(inputCh chan []byte, outputCh chan []byte, serverImpl RpcServerImpl, debugName string) *RpcClient
closes outputCh when inputCh is closed/done
func (*RpcClient) GetAuthToken ¶
func (*RpcClient) IsServerDone ¶
func (*RpcClient) RecvRpcMessage ¶
func (*RpcClient) SendCommand ¶
no response
func (*RpcClient) SendComplexRequest ¶
func (*RpcClient) SendRpcMessage ¶
func (*RpcClient) SendRpcRequest ¶
single response
func (*RpcClient) SetAuthToken ¶
func (*RpcClient) SetServerImpl ¶
func (w *RpcClient) SetServerImpl(serverImpl RpcServerImpl)
type RpcMessage ¶
type RpcMessage struct {
Command string `json:"command,omitempty"`
ReqId string `json:"reqid,omitempty"`
ResId string `json:"resid,omitempty"`
Timeout int64 `json:"timeout,omitempty"`
Route string `json:"route,omitempty"` // to route/forward requests to alternate servers
AuthToken string `json:"authtoken,omitempty"` // needed for routing unauthenticated requests (RpcMultiProxy)
Source string `json:"source,omitempty"` // source route id
Cont bool `json:"cont,omitempty"` // flag if additional requests/responses are forthcoming
Cancel bool `json:"cancel,omitempty"` // used to cancel a streaming request or response (sent from the side that is not streaming)
Error string `json:"error,omitempty"`
DataType string `json:"datatype,omitempty"`
Data any `json:"data,omitempty"`
}
func (*RpcMessage) IsRpcRequest ¶
func (r *RpcMessage) IsRpcRequest() bool
func (*RpcMessage) Validate ¶
func (r *RpcMessage) Validate() error
type RpcMethodDecl ¶
type RpcRequestHandler ¶
type RpcRequestHandler struct {
// contains filtered or unexported fields
}
func (*RpcRequestHandler) Context ¶
func (handler *RpcRequestHandler) Context() context.Context
func (*RpcRequestHandler) NextResponse ¶
func (handler *RpcRequestHandler) NextResponse() (any, error)
func (*RpcRequestHandler) ResponseDone ¶
func (handler *RpcRequestHandler) ResponseDone() bool
func (*RpcRequestHandler) SendCancel ¶
func (handler *RpcRequestHandler) SendCancel()
type RpcResponseHandler ¶
type RpcResponseHandler struct {
// contains filtered or unexported fields
}
func GetRpcResponseHandlerFromContext ¶
func GetRpcResponseHandlerFromContext(ctx context.Context) *RpcResponseHandler
func (*RpcResponseHandler) Context ¶
func (handler *RpcResponseHandler) Context() context.Context
func (*RpcResponseHandler) Finalize ¶
func (handler *RpcResponseHandler) Finalize()
if async, caller must call finalize
func (*RpcResponseHandler) GetCommand ¶
func (handler *RpcResponseHandler) GetCommand() string
func (*RpcResponseHandler) GetCommandRawData ¶
func (handler *RpcResponseHandler) GetCommandRawData() any
func (*RpcResponseHandler) GetSource ¶
func (handler *RpcResponseHandler) GetSource() string
func (*RpcResponseHandler) IsCanceled ¶
func (handler *RpcResponseHandler) IsCanceled() bool
func (*RpcResponseHandler) IsDone ¶
func (handler *RpcResponseHandler) IsDone() bool
func (*RpcResponseHandler) NeedsResponse ¶
func (handler *RpcResponseHandler) NeedsResponse() bool
func (*RpcResponseHandler) SendMessage ¶
func (handler *RpcResponseHandler) SendMessage(msg string)
func (*RpcResponseHandler) SendResponse ¶
func (handler *RpcResponseHandler) SendResponse(data any, done bool) error
func (*RpcResponseHandler) SendResponseError ¶
func (handler *RpcResponseHandler) SendResponseError(err error)
type RpcServerImpl ¶
type RpcServerImpl interface {
RpcServerImpl()
}
type SubscriptionRequest ¶
type SubscriptionRequest = rpctypes.SubscriptionRequest
type WshRouter ¶
type WshRouter struct {
Lock *sync.Mutex
RouteMap map[string]AbstractRpcClient // routeid => client
UpstreamClient AbstractRpcClient // upstream client (if we are not the terminal router)
AnnouncedRoutes map[string]string // routeid => local routeid
RpcMap map[string]*routeInfo // rpcid => routeinfo
SimpleRequestMap map[string]chan *RpcMessage // simple reqid => response channel
InputCh chan msgAndRoute
}
func GetDefaultRouter ¶
func GetDefaultRouter() *WshRouter
GetDefaultRouter returns the DefaultRouter, initializing it if needed
func NewWshRouter ¶
func NewWshRouter() *WshRouter
func (*WshRouter) GetRouteKeys ¶
GetRouteKeys returns a sorted list of all route keys in the router
func (*WshRouter) GetRpc ¶
func (router *WshRouter) GetRpc(routeId string) AbstractRpcClient
this may return nil (returns default only for empty routeId)
func (*WshRouter) GetUpstreamClient ¶
func (router *WshRouter) GetUpstreamClient() AbstractRpcClient
func (*WshRouter) InjectMessage ¶
func (*WshRouter) RegisterRoute ¶
func (router *WshRouter) RegisterRoute(routeId string, rpc AbstractRpcClient, shouldAnnounce bool)
this will also consume the output channel of the abstract client
func (*WshRouter) RunSimpleRawCommand ¶
func (router *WshRouter) RunSimpleRawCommand(ctx context.Context, msg RpcMessage, fromRouteId string) (*RpcMessage, error)
func (*WshRouter) SetUpstreamClient ¶
func (router *WshRouter) SetUpstreamClient(rpc AbstractRpcClient)
func (*WshRouter) UnregisterRoute ¶
type WshRpcProxy ¶
func MakeRpcProxy ¶
func MakeRpcProxy() *WshRpcProxy
func (*WshRpcProxy) RecvRpcMessage ¶
func (p *WshRpcProxy) RecvRpcMessage() ([]byte, bool)
func (*WshRpcProxy) SendRpcMessage ¶
func (p *WshRpcProxy) SendRpcMessage(msg []byte)
Click to show internal directories.
Click to hide internal directories.