rpc

package
v0.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 27, 2025 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Overview

RPC PubSub

Index

Constants

View Source
const (
	RpcType_Call             = "call"             // single response (regular rpc)
	RpcType_ResponseStream   = "responsestream"   // stream of responses (streaming rpc)
	RpcType_StreamingRequest = "streamingrequest" // streaming request
	RpcType_Complex          = "complex"          // streaming request/response
)
View Source
const (
	DefaultRoute    = "outrigsrv"
	BareClientRoute = "outrigsrv-client"
	UpstreamRoute   = "upstream"
	SysRoute        = "sys" // this route doesn't exist, just a placeholder for system messages
	ElectronRoute   = "electron"

	RoutePrefix_Conn       = "conn:"
	RoutePrefix_Controller = "controller:"
	RoutePrefix_Proc       = "proc:"
	RoutePrefix_Tab        = "tab:"
	RoutePrefix_FeBlock    = "feblock:"
)
View Source
const CtxDoneChSize = 10
View Source
const DefaultInputChSize = 32
View Source
const DefaultMessageChSize = 32
View Source
const DefaultOutputChSize = 32
View Source
const DefaultTimeoutMs = 5000
View Source
const MaxPersist = 4096
View Source
const ReMakeArrThreshold = 10 * 1024
View Source
const RespChSize = 32

Variables

View Source
var Broker = &BrokerType{
	Lock:       &sync.Mutex{},
	SubMap:     make(map[string]*BrokerSubscription),
	PersistMap: make(map[persistKey]*persistEventWrap),
}
View Source
var WshCommandDeclMap = GenerateRpcCommandDeclMap()

Functions

func GenerateRpcCommandDeclMap

func GenerateRpcCommandDeclMap() map[string]*RpcMethodDecl

func GetIsCanceledFromContext

func GetIsCanceledFromContext(ctx context.Context) bool

func GetRpcSourceFromContext

func GetRpcSourceFromContext(ctx context.Context) string

func InitBroker

func InitBroker()

func MakeConnectionRouteId

func MakeConnectionRouteId(connId string) string

func MakeControllerRouteId

func MakeControllerRouteId(blockId string) string

func MakeFeBlockRouteId

func MakeFeBlockRouteId(blockId string) string

func MakeMethodMapForImpl

func MakeMethodMapForImpl(impl any, declMap map[string]*RpcMethodDecl) map[string]reflect.Method

func MakeProcRouteId

func MakeProcRouteId(procId string) string

func MakeTabRouteId

func MakeTabRouteId(tabId string) string

Types

type AbstractRpcClient

type AbstractRpcClient interface {
	SendRpcMessage(msg []byte)
	RecvRpcMessage() ([]byte, bool) // blocking
}

type BrokerSubscription

type BrokerSubscription struct {
	AllSubs   []string            // routeids subscribed to "all" events
	ScopeSubs map[string][]string // routeids subscribed to specific scopes
	StarSubs  map[string][]string // routeids subscribed to star scope (scopes with "*" or "**" in them)
}

func (*BrokerSubscription) IsEmpty

func (bs *BrokerSubscription) IsEmpty() bool

type BrokerType

type BrokerType struct {
	Lock       *sync.Mutex
	Client     Client
	SubMap     map[string]*BrokerSubscription
	PersistMap map[persistKey]*persistEventWrap
}

func (*BrokerType) GetClient

func (b *BrokerType) GetClient() Client

func (*BrokerType) Publish

func (b *BrokerType) Publish(event EventType)

func (*BrokerType) ReadEventHistory

func (b *BrokerType) ReadEventHistory(eventType string, scope string, maxItems int) []*EventType

does not take wildcards, use "" for all

func (*BrokerType) SetClient

func (b *BrokerType) SetClient(client Client)

func (*BrokerType) Subscribe

func (b *BrokerType) Subscribe(subRouteId string, sub SubscriptionRequest)

if already subscribed, this will *resubscribe* with the new subscription (remove the old one, and replace with this one)

func (*BrokerType) Unsubscribe

func (b *BrokerType) Unsubscribe(subRouteId string, eventName string)

func (*BrokerType) UnsubscribeAll

func (b *BrokerType) UnsubscribeAll(subRouteId string)

type Client

type Client interface {
	SendEvent(routeId string, event EventType)
}

type CommandHandlerFnType

type CommandHandlerFnType = func(*RpcResponseHandler) bool

returns true if handler is complete, false for an async handler

type EventListener

type EventListener struct {
	Lock      *sync.Mutex
	Listeners map[string][]singleListener
}

func MakeEventListener

func MakeEventListener() *EventListener

func (*EventListener) On

func (el *EventListener) On(eventName string, fn func(*EventType)) string

func (*EventListener) RecvEvent

func (el *EventListener) RecvEvent(e *EventType)

func (*EventListener) Unregister

func (el *EventListener) Unregister(eventName string, id string)

type EventType

type EventType = rpctypes.EventType

type ExpMap

type ExpMap[T any] struct {
	// contains filtered or unexported fields
}

func MakeExpMap

func MakeExpMap[T any]() *ExpMap[T]

func (*ExpMap[T]) Get

func (em *ExpMap[T]) Get(key string) (T, bool)

func (*ExpMap[T]) Set

func (em *ExpMap[T]) Set(key string, value T, exp time.Time)

type RpcClient

type RpcClient struct {
	Lock               *sync.Mutex
	InputCh            chan []byte
	OutputCh           chan []byte
	CtxDoneCh          chan string // for context cancellation, value is ResId
	AuthToken          string
	RpcMap             map[string]*rpcData
	ServerImpl         RpcServerImpl
	EventListener      *EventListener
	ResponseHandlerMap map[string]*RpcResponseHandler // reqId => handler
	Debug              bool
	DebugName          string
	ServerDone         bool
}

func GetRpcClientFromContext

func GetRpcClientFromContext(ctx context.Context) *RpcClient

func MakeRpcClient

func MakeRpcClient(inputCh chan []byte, outputCh chan []byte, serverImpl RpcServerImpl, debugName string) *RpcClient

closes outputCh when inputCh is closed/done

func (*RpcClient) GetAuthToken

func (w *RpcClient) GetAuthToken() string

func (*RpcClient) IsServerDone

func (w *RpcClient) IsServerDone() bool

func (*RpcClient) RecvRpcMessage

func (w *RpcClient) RecvRpcMessage() ([]byte, bool)

func (*RpcClient) SendCommand

func (w *RpcClient) SendCommand(command string, data any, opts *RpcOpts) error

no response

func (*RpcClient) SendComplexRequest

func (w *RpcClient) SendComplexRequest(command string, data any, opts *RpcOpts) (rtnHandler *RpcRequestHandler, rtnErr error)

func (*RpcClient) SendRpcMessage

func (w *RpcClient) SendRpcMessage(msg []byte)

func (*RpcClient) SendRpcRequest

func (w *RpcClient) SendRpcRequest(command string, data any, opts *RpcOpts) (any, error)

single response

func (*RpcClient) SetAuthToken

func (w *RpcClient) SetAuthToken(token string)

func (*RpcClient) SetServerImpl

func (w *RpcClient) SetServerImpl(serverImpl RpcServerImpl)

type RpcMessage

type RpcMessage struct {
	Command   string `json:"command,omitempty"`
	ReqId     string `json:"reqid,omitempty"`
	ResId     string `json:"resid,omitempty"`
	Timeout   int64  `json:"timeout,omitempty"`
	Route     string `json:"route,omitempty"`     // to route/forward requests to alternate servers
	AuthToken string `json:"authtoken,omitempty"` // needed for routing unauthenticated requests (RpcMultiProxy)
	Source    string `json:"source,omitempty"`    // source route id
	Cont      bool   `json:"cont,omitempty"`      // flag if additional requests/responses are forthcoming
	Cancel    bool   `json:"cancel,omitempty"`    // used to cancel a streaming request or response (sent from the side that is not streaming)
	Error     string `json:"error,omitempty"`
	DataType  string `json:"datatype,omitempty"`
	Data      any    `json:"data,omitempty"`
}

func (*RpcMessage) IsRpcRequest

func (r *RpcMessage) IsRpcRequest() bool

func (*RpcMessage) Validate

func (r *RpcMessage) Validate() error

type RpcMethodDecl

type RpcMethodDecl struct {
	Command                 string
	CommandType             string
	MethodName              string
	CommandDataType         reflect.Type
	DefaultResponseDataType reflect.Type
}

type RpcOpts

type RpcOpts struct {
	Timeout        int64  `json:"timeout,omitempty"`
	NoResponse     bool   `json:"noresponse,omitempty"`
	Route          string `json:"route,omitempty"`
	StreamCancelFn func() `json:"-"` // this is an *output* parameter, set by the handler
}

type RpcRequestHandler

type RpcRequestHandler struct {
	// contains filtered or unexported fields
}

func (*RpcRequestHandler) Context

func (handler *RpcRequestHandler) Context() context.Context

func (*RpcRequestHandler) NextResponse

func (handler *RpcRequestHandler) NextResponse() (any, error)

func (*RpcRequestHandler) ResponseDone

func (handler *RpcRequestHandler) ResponseDone() bool

func (*RpcRequestHandler) SendCancel

func (handler *RpcRequestHandler) SendCancel()

type RpcResponseHandler

type RpcResponseHandler struct {
	// contains filtered or unexported fields
}

func GetRpcResponseHandlerFromContext

func GetRpcResponseHandlerFromContext(ctx context.Context) *RpcResponseHandler

func (*RpcResponseHandler) Context

func (handler *RpcResponseHandler) Context() context.Context

func (*RpcResponseHandler) Finalize

func (handler *RpcResponseHandler) Finalize()

if async, caller must call finalize

func (*RpcResponseHandler) GetCommand

func (handler *RpcResponseHandler) GetCommand() string

func (*RpcResponseHandler) GetCommandRawData

func (handler *RpcResponseHandler) GetCommandRawData() any

func (*RpcResponseHandler) GetSource

func (handler *RpcResponseHandler) GetSource() string

func (*RpcResponseHandler) IsCanceled

func (handler *RpcResponseHandler) IsCanceled() bool

func (*RpcResponseHandler) IsDone

func (handler *RpcResponseHandler) IsDone() bool

func (*RpcResponseHandler) NeedsResponse

func (handler *RpcResponseHandler) NeedsResponse() bool

func (*RpcResponseHandler) SendMessage

func (handler *RpcResponseHandler) SendMessage(msg string)

func (*RpcResponseHandler) SendResponse

func (handler *RpcResponseHandler) SendResponse(data any, done bool) error

func (*RpcResponseHandler) SendResponseError

func (handler *RpcResponseHandler) SendResponseError(err error)

type RpcServerImpl

type RpcServerImpl interface {
	RpcServerImpl()
}

type SubscriptionRequest

type SubscriptionRequest = rpctypes.SubscriptionRequest

type WshRouter

type WshRouter struct {
	Lock             *sync.Mutex
	RouteMap         map[string]AbstractRpcClient // routeid => client
	UpstreamClient   AbstractRpcClient            // upstream client (if we are not the terminal router)
	AnnouncedRoutes  map[string]string            // routeid => local routeid
	RpcMap           map[string]*routeInfo        // rpcid => routeinfo
	SimpleRequestMap map[string]chan *RpcMessage  // simple reqid => response channel
	InputCh          chan msgAndRoute
}

func GetDefaultRouter

func GetDefaultRouter() *WshRouter

GetDefaultRouter returns the DefaultRouter, initializing it if needed

func NewWshRouter

func NewWshRouter() *WshRouter

func (*WshRouter) GetRouteKeys

func (router *WshRouter) GetRouteKeys() []string

GetRouteKeys returns a sorted list of all route keys in the router

func (*WshRouter) GetRpc

func (router *WshRouter) GetRpc(routeId string) AbstractRpcClient

this may return nil (returns default only for empty routeId)

func (*WshRouter) GetUpstreamClient

func (router *WshRouter) GetUpstreamClient() AbstractRpcClient

func (*WshRouter) InjectMessage

func (router *WshRouter) InjectMessage(msgBytes []byte, fromRouteId string)

func (*WshRouter) RegisterRoute

func (router *WshRouter) RegisterRoute(routeId string, rpc AbstractRpcClient, shouldAnnounce bool)

this will also consume the output channel of the abstract client

func (*WshRouter) RunSimpleRawCommand

func (router *WshRouter) RunSimpleRawCommand(ctx context.Context, msg RpcMessage, fromRouteId string) (*RpcMessage, error)

func (*WshRouter) SendEvent

func (router *WshRouter) SendEvent(routeId string, event EventType)

func (*WshRouter) SetUpstreamClient

func (router *WshRouter) SetUpstreamClient(rpc AbstractRpcClient)

func (*WshRouter) UnregisterRoute

func (router *WshRouter) UnregisterRoute(routeId string)

func (*WshRouter) WaitForRegister

func (router *WshRouter) WaitForRegister(ctx context.Context, routeId string) error

type WshRpcProxy

type WshRpcProxy struct {
	Lock         *sync.Mutex
	ToRemoteCh   chan []byte
	FromRemoteCh chan []byte
}

func MakeRpcProxy

func MakeRpcProxy() *WshRpcProxy

func (*WshRpcProxy) RecvRpcMessage

func (p *WshRpcProxy) RecvRpcMessage() ([]byte, bool)

func (*WshRpcProxy) SendRpcMessage

func (p *WshRpcProxy) SendRpcMessage(msg []byte)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL