wshutil

package
v0.14.0-beta.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 11, 2026 License: Apache-2.0 Imports: 33 Imported by: 0

Documentation

Index

Constants

View Source
const (
	Mode_Normal  = "normal"
	Mode_Esc     = "esc"
	Mode_WaveEsc = "waveesc"
)
View Source
const (
	DefaultRoute     = "wavesrv"
	ElectronRoute    = "electron"
	ControlRoute     = "$control"      // control plane route
	ControlRootRoute = "$control:root" // control plane route to root router

	ControlPrefix = "$"

	RoutePrefix_Conn       = "conn:"
	RoutePrefix_Controller = "controller:"
	RoutePrefix_Proc       = "proc:"
	RoutePrefix_Tab        = "tab:"
	RoutePrefix_FeBlock    = "feblock:"
	RoutePrefix_Builder    = "builder:"
	RoutePrefix_Link       = "link:"
	RoutePrefix_Job        = "job:"
	RoutePrefix_Bare       = "bare:"
)
View Source
const BEL = 0x07
View Source
const CtxDoneChSize = 10
View Source
const DefaultInputChSize = 32
View Source
const DefaultMessageChSize = 32
View Source
const DefaultOutputChSize = 32
View Source
const DefaultTimeoutMs = 5000
View Source
const ESC = 0x1b
View Source
const HexChars = "0123456789ABCDEF"
View Source
const LinkKind_Leaf = "leaf"
View Source
const LinkKind_Router = "router"
View Source
const MaxBufferedDataSize = 256 * 1024
View Source
const RespChSize = 32
View Source
const RouterInputChQueueSize = 100
View Source
const ST = 0x9c
View Source
const WaveJwtTokenVarName = wavebase.WaveJwtTokenVarName
View Source
const WaveOSC = "23198"

these should both be 5 characters

View Source
const WaveOSCPrefix = "\x1b]" + WaveOSC + ";"
View Source
const WaveOSCPrefixLen = 5 + 3 // \x1b] + WaveOSC + ; + \x07
View Source
const WaveServerOSC = "23199"
View Source
const WaveServerOSCPrefix = "\x1b]" + WaveServerOSC + ";"

Variables

View Source
var BacklogLogThresholds = map[int]bool{1: true, 5: true, 10: true, 20: true, 30: true, 40: true, 50: true, 100: true, 200: true, 500: true, 1000: true}
View Source
var WshCommandDeclMap = wshrpc.GenerateWshCommandDeclMap()

Functions

func AdaptMsgChToPty

func AdaptMsgChToPty(outputCh chan []byte, oscEsc string, output io.Writer) error

func AdaptOutputChToStream

func AdaptOutputChToStream(outputCh chan []byte, output io.Writer) error

func AdaptStreamToMsgCh

func AdaptStreamToMsgCh(input io.Reader, output chan baseds.RpcInputChType, readCallback func()) error

func DoShutdown

func DoShutdown(reason string, exitCode int, quiet bool)

func EncodeWaveOSCBytes

func EncodeWaveOSCBytes(oscNum string, barr []byte) ([]byte, error)

func EncodeWaveOSCMessageEx

func EncodeWaveOSCMessageEx(oscNum string, msg *RpcMessage) ([]byte, error)

func ExtractUnverifiedRpcContext

func ExtractUnverifiedRpcContext(tokenStr string) (*wshrpc.RpcContext, error)

only for use on client

func ExtractUnverifiedSocketName

func ExtractUnverifiedSocketName(tokenStr string) (string, error)

only for use on client

func GetInfo added in v0.11.0

func GetInfo() wshrpc.RemoteInfo

func GetIsCanceledFromContext

func GetIsCanceledFromContext(ctx context.Context) bool

func GetRpcSourceFromContext

func GetRpcSourceFromContext(ctx context.Context) string

func HandleStdIOClient added in v0.9.0

func HandleStdIOClient(logName string, input chan utilfn.LineOutput, output io.Writer)

blocking, returns if there is an error, or on EOF of input

func InstallRcFiles added in v0.11.0

func InstallRcFiles() error

func MakeBuilderRouteId added in v0.12.2

func MakeBuilderRouteId(builderId string) string

func MakeClientJWTToken

func MakeClientJWTToken(rpcCtx wshrpc.RpcContext) (string, error)

func MakeConnectionRouteId

func MakeConnectionRouteId(connId string) string

func MakeControllerRouteId

func MakeControllerRouteId(blockId string) string

func MakeFeBlockRouteId added in v0.9.0

func MakeFeBlockRouteId(blockId string) string

func MakeJobRouteId added in v0.14.0

func MakeJobRouteId(jobId string) string

func MakeLinkRouteId added in v0.14.0

func MakeLinkRouteId(linkId baseds.LinkId) string

func MakeProcRouteId

func MakeProcRouteId(procId string) string

func MakeRandomProcRouteId added in v0.14.0

func MakeRandomProcRouteId() string

func MakeTabRouteId added in v0.9.0

func MakeTabRouteId(tabId string) string

func RespErr added in v0.11.0

func RespErr[T any](err error) wshrpc.RespOrErrorUnion[T]

func RunWshRpcOverListener

func RunWshRpcOverListener(listener net.Listener, readCallback func())

func SendErrCh added in v0.11.0

func SendErrCh[T any](err error) <-chan wshrpc.RespOrErrorUnion[T]

func ValidateAndExtractRpcContextFromToken

func ValidateAndExtractRpcContextFromToken(tokenStr string) (*wshrpc.RpcContext, error)

Types

type AbstractRpcClient

type AbstractRpcClient interface {
	GetPeerInfo() string
	SendRpcMessage(msg []byte, ingressLinkId baseds.LinkId, debugStr string) bool
	RecvRpcMessage() ([]byte, bool) // blocking
}

type CommandHandlerFnType

type CommandHandlerFnType = func(*RpcResponseHandler) bool

returns true if handler is complete, false for an async handler

type EventListener

type EventListener struct {
	Lock      *sync.Mutex
	Listeners map[string][]singleListener
}

func MakeEventListener

func MakeEventListener() *EventListener

func (*EventListener) On

func (el *EventListener) On(eventName string, fn func(*wps.WaveEvent)) string

func (*EventListener) RecvEvent

func (el *EventListener) RecvEvent(e *wps.WaveEvent)

func (*EventListener) Unregister

func (el *EventListener) Unregister(eventName string, id string)

type PtyBuffer

type PtyBuffer struct {
	CVar        *sync.Cond
	DataBuf     *bytes.Buffer
	EscMode     string
	EscSeqBuf   []byte
	OSCPrefix   string
	InputReader io.Reader
	MessageCh   chan baseds.RpcInputChType
	AtEOF       bool
	Err         error
}

func MakePtyBuffer

func MakePtyBuffer(oscPrefix string, input io.Reader, messageCh chan baseds.RpcInputChType) *PtyBuffer

closes messageCh when input is closed (or error)

func (*PtyBuffer) Read

func (b *PtyBuffer) Read(p []byte) (n int, err error)

type ResponseFnType

type ResponseFnType = func(any) error

type RpcMessage

type RpcMessage struct {
	Command  string `json:"command,omitempty"`
	ReqId    string `json:"reqid,omitempty"`
	ResId    string `json:"resid,omitempty"`
	Timeout  int64  `json:"timeout,omitempty"`
	Route    string `json:"route,omitempty"`  // to route/forward requests to alternate servers
	Source   string `json:"source,omitempty"` // source route id
	Cont     bool   `json:"cont,omitempty"`   // flag if additional requests/responses are forthcoming
	Cancel   bool   `json:"cancel,omitempty"` // used to cancel a streaming request or response (sent from the side that is not streaming)
	Error    string `json:"error,omitempty"`
	DataType string `json:"datatype,omitempty"`
	Data     any    `json:"data,omitempty"`
}

func (*RpcMessage) IsRpcRequest

func (r *RpcMessage) IsRpcRequest() bool

func (*RpcMessage) Validate

func (r *RpcMessage) Validate() error

type RpcRequestHandler

type RpcRequestHandler struct {
	// contains filtered or unexported fields
}

func (*RpcRequestHandler) Context

func (handler *RpcRequestHandler) Context() context.Context

func (*RpcRequestHandler) NextResponse

func (handler *RpcRequestHandler) NextResponse() (any, error)

func (*RpcRequestHandler) ResponseDone

func (handler *RpcRequestHandler) ResponseDone() bool

func (*RpcRequestHandler) SendCancel

func (handler *RpcRequestHandler) SendCancel(ctx context.Context) error

type RpcResponseHandler

type RpcResponseHandler struct {
	// contains filtered or unexported fields
}

func GetRpcResponseHandlerFromContext

func GetRpcResponseHandlerFromContext(ctx context.Context) *RpcResponseHandler

func (*RpcResponseHandler) Context

func (handler *RpcResponseHandler) Context() context.Context

func (*RpcResponseHandler) Finalize

func (handler *RpcResponseHandler) Finalize()

if async, caller must call finalize

func (*RpcResponseHandler) GetCommand

func (handler *RpcResponseHandler) GetCommand() string

func (*RpcResponseHandler) GetCommandRawData

func (handler *RpcResponseHandler) GetCommandRawData() any

func (*RpcResponseHandler) GetIngressLinkId added in v0.14.0

func (handler *RpcResponseHandler) GetIngressLinkId() baseds.LinkId

func (*RpcResponseHandler) GetRpcContext

func (handler *RpcResponseHandler) GetRpcContext() wshrpc.RpcContext

func (*RpcResponseHandler) GetSource

func (handler *RpcResponseHandler) GetSource() string

func (*RpcResponseHandler) IsCanceled

func (handler *RpcResponseHandler) IsCanceled() bool

func (*RpcResponseHandler) IsDone

func (handler *RpcResponseHandler) IsDone() bool

func (*RpcResponseHandler) NeedsResponse

func (handler *RpcResponseHandler) NeedsResponse() bool

func (*RpcResponseHandler) SendMessage

func (handler *RpcResponseHandler) SendMessage(msg string)

func (*RpcResponseHandler) SendResponse

func (handler *RpcResponseHandler) SendResponse(data any, done bool) error

func (*RpcResponseHandler) SendResponseError

func (handler *RpcResponseHandler) SendResponseError(err error)

type ServerImpl

type ServerImpl interface {
	WshServerImpl()
}

type WriteFlusher added in v0.9.0

type WriteFlusher interface {
	Write([]byte) (int, error)
	Flush() error
}

type WshRouter

type WshRouter struct {
	// contains filtered or unexported fields
}
var DefaultRouter *WshRouter

func NewWshRouter

func NewWshRouter() *WshRouter

func (*WshRouter) GetControlRpc added in v0.14.0

func (router *WshRouter) GetControlRpc() *WshRpc

func (*WshRouter) GetLinkIdForRoute added in v0.14.0

func (router *WshRouter) GetLinkIdForRoute(routeId string) baseds.LinkId

func (*WshRouter) IsRootRouter added in v0.14.0

func (router *WshRouter) IsRootRouter() bool

func (*WshRouter) RegisterTrustedLeaf added in v0.14.0

func (router *WshRouter) RegisterTrustedLeaf(rpc AbstractRpcClient, routeId string) (baseds.LinkId, error)

only for leaves

func (*WshRouter) RegisterTrustedRouter added in v0.14.0

func (router *WshRouter) RegisterTrustedRouter(rpc AbstractRpcClient) baseds.LinkId

only for routers

func (router *WshRouter) RegisterUntrustedLink(client AbstractRpcClient) baseds.LinkId

func (*WshRouter) RegisterUpstream added in v0.14.0

func (router *WshRouter) RegisterUpstream(rpc AbstractRpcClient) baseds.LinkId

func (*WshRouter) SendEvent

func (router *WshRouter) SendEvent(routeId string, event wps.WaveEvent)

func (*WshRouter) SetAsRootRouter added in v0.14.0

func (router *WshRouter) SetAsRootRouter()
func (router *WshRouter) UnregisterLink(linkId baseds.LinkId)

func (*WshRouter) WaitForRegister

func (router *WshRouter) WaitForRegister(ctx context.Context, routeId string) error

type WshRouterControlImpl added in v0.14.0

type WshRouterControlImpl struct {
	Router *WshRouter
}

func (*WshRouterControlImpl) AuthenticateCommand added in v0.14.0

func (impl *WshRouterControlImpl) AuthenticateCommand(ctx context.Context, data string) (wshrpc.CommandAuthenticateRtnData, error)

func (*WshRouterControlImpl) AuthenticateJobManagerCommand added in v0.14.0

func (impl *WshRouterControlImpl) AuthenticateJobManagerCommand(ctx context.Context, data wshrpc.CommandAuthenticateJobManagerData) error

func (*WshRouterControlImpl) AuthenticateJobManagerVerifyCommand added in v0.14.0

func (impl *WshRouterControlImpl) AuthenticateJobManagerVerifyCommand(ctx context.Context, data wshrpc.CommandAuthenticateJobManagerData) error

func (*WshRouterControlImpl) AuthenticateTokenCommand added in v0.14.0

func (*WshRouterControlImpl) AuthenticateTokenVerifyCommand added in v0.14.0

func (*WshRouterControlImpl) ControlGetRouteIdCommand added in v0.14.0

func (impl *WshRouterControlImpl) ControlGetRouteIdCommand(ctx context.Context) (string, error)

func (*WshRouterControlImpl) RouteAnnounceCommand added in v0.14.0

func (impl *WshRouterControlImpl) RouteAnnounceCommand(ctx context.Context) error

func (*WshRouterControlImpl) RouteUnannounceCommand added in v0.14.0

func (impl *WshRouterControlImpl) RouteUnannounceCommand(ctx context.Context) error

func (*WshRouterControlImpl) SetPeerInfoCommand added in v0.14.0

func (impl *WshRouterControlImpl) SetPeerInfoCommand(ctx context.Context, peerInfo string) error

func (*WshRouterControlImpl) WshServerImpl added in v0.14.0

func (impl *WshRouterControlImpl) WshServerImpl()

type WshRpc

type WshRpc struct {
	Lock               *sync.Mutex
	InputCh            chan baseds.RpcInputChType
	OutputCh           chan []byte
	CtxDoneCh          chan string // for context cancellation, value is ResId
	RpcContext         *atomic.Pointer[wshrpc.RpcContext]
	RpcMap             map[string]*rpcData
	ServerImpl         ServerImpl
	EventListener      *EventListener
	ResponseHandlerMap map[string]*RpcResponseHandler // reqId => handler
	StreamBroker       *streamclient.Broker
	Debug              bool
	DebugName          string
	ServerDone         bool
}

func GetWshRpcFromContext

func GetWshRpcFromContext(ctx context.Context) *WshRpc

func MakeWshRpc

func MakeWshRpc(rpcCtx wshrpc.RpcContext, serverImpl ServerImpl, debugName string) *WshRpc

func MakeWshRpcWithChannels added in v0.14.0

func MakeWshRpcWithChannels(inputCh chan baseds.RpcInputChType, outputCh chan []byte, rpcCtx wshrpc.RpcContext, serverImpl ServerImpl, debugName string) *WshRpc

closes outputCh when inputCh is closed/done

func SetupConnRpcClient

func SetupConnRpcClient(conn net.Conn, serverImpl ServerImpl, debugStr string) (*WshRpc, chan error, error)

func SetupDomainSocketRpcClient

func SetupDomainSocketRpcClient(sockName string, serverImpl ServerImpl, debugName string) (*WshRpc, error)

func SetupPacketRpcClient added in v0.9.0

func SetupPacketRpcClient(input io.Reader, output io.Writer, serverImpl ServerImpl, debugStr string) (*WshRpc, chan []byte)

func (*WshRpc) GetPeerInfo added in v0.14.0

func (w *WshRpc) GetPeerInfo() string

func (*WshRpc) GetRpcContext

func (w *WshRpc) GetRpcContext() wshrpc.RpcContext

func (*WshRpc) IsServerDone added in v0.11.0

func (w *WshRpc) IsServerDone() bool

func (*WshRpc) RecvRpcMessage

func (w *WshRpc) RecvRpcMessage() ([]byte, bool)

func (*WshRpc) SendCommand

func (w *WshRpc) SendCommand(command string, data any, opts *wshrpc.RpcOpts) error

no response

func (*WshRpc) SendComplexRequest

func (w *WshRpc) SendComplexRequest(command string, data any, opts *wshrpc.RpcOpts) (rtnHandler *RpcRequestHandler, rtnErr error)

func (*WshRpc) SendRpcMessage

func (w *WshRpc) SendRpcMessage(msg []byte, ingressLinkId baseds.LinkId, debugStr string) bool

func (*WshRpc) SendRpcRequest

func (w *WshRpc) SendRpcRequest(command string, data any, opts *wshrpc.RpcOpts) (any, error)

single response

func (*WshRpc) SetRpcContext

func (w *WshRpc) SetRpcContext(ctx wshrpc.RpcContext)

func (*WshRpc) SetServerImpl

func (w *WshRpc) SetServerImpl(serverImpl ServerImpl)

type WshRpcProxy

type WshRpcProxy struct {
	Lock         *sync.Mutex
	RpcContext   *wshrpc.RpcContext
	ToRemoteCh   chan []byte
	FromRemoteCh chan baseds.RpcInputChType
	PeerInfo     string
}

func MakeRpcProxy

func MakeRpcProxy(peerInfo string) *WshRpcProxy

func MakeRpcProxyWithSize added in v0.14.0

func MakeRpcProxyWithSize(peerInfo string, inputChSize int, outputChSize int) *WshRpcProxy

func (*WshRpcProxy) GetPeerInfo added in v0.14.0

func (p *WshRpcProxy) GetPeerInfo() string

func (*WshRpcProxy) RecvRpcMessage

func (p *WshRpcProxy) RecvRpcMessage() ([]byte, bool)

func (*WshRpcProxy) SendRpcMessage

func (p *WshRpcProxy) SendRpcMessage(msg []byte, ingressLinkId baseds.LinkId, debugStr string) bool

func (*WshRpcProxy) SetPeerInfo added in v0.14.0

func (p *WshRpcProxy) SetPeerInfo(peerInfo string)

type WshRpcStreamClientAdapter added in v0.14.0

type WshRpcStreamClientAdapter struct {
	// contains filtered or unexported fields
}

func AdaptWshRpc added in v0.14.0

func AdaptWshRpc(rpc *WshRpc) *WshRpcStreamClientAdapter

func (*WshRpcStreamClientAdapter) StreamDataAckCommand added in v0.14.0

func (a *WshRpcStreamClientAdapter) StreamDataAckCommand(data wshrpc.CommandStreamAckData, opts *wshrpc.RpcOpts) error

func (*WshRpcStreamClientAdapter) StreamDataCommand added in v0.14.0

func (a *WshRpcStreamClientAdapter) StreamDataCommand(data wshrpc.CommandStreamData, opts *wshrpc.RpcOpts) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL