wshutil

package
v0.18.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 28, 2026 License: Apache-2.0 Imports: 33 Imported by: 0

Documentation

Index

Constants

View Source
// String mode identifiers.
// NOTE(review): these look like the states stored in PtyBuffer.EscMode while
// input is scanned for escape sequences — confirm against the parser.
const (
	Mode_Normal  = "normal"
	Mode_Esc     = "esc"
	Mode_WaveEsc = "waveesc"
)
View Source
// Well-known route ids and the string prefixes used for typed route ids.
const (
	DefaultRoute     = "wavesrv"
	ElectronRoute    = "electron"
	ControlRoute     = "$control"      // control plane route
	ControlRootRoute = "$control:root" // control plane route to root router

	// ControlPrefix is the leading character shared by ControlRoute and
	// ControlRootRoute.
	ControlPrefix = "$"

	// Prefixes for typed route ids; each ends in ":".
	// NOTE(review): presumably prepended to an id by the matching
	// Make*RouteId helper (e.g. MakeTabRouteId) — confirm.
	RoutePrefix_Conn       = "conn:"
	RoutePrefix_Controller = "controller:"
	RoutePrefix_Proc       = "proc:"
	RoutePrefix_Tab        = "tab:"
	RoutePrefix_FeBlock    = "feblock:"
	RoutePrefix_Builder    = "builder:"
	RoutePrefix_Link       = "link:"
	RoutePrefix_Job        = "job:"
	RoutePrefix_Bare       = "bare:"
)
View Source
const BEL = 0x07
View Source
const CtxDoneChSize = 10
View Source
const DefaultInputChSize = 32
View Source
const DefaultMessageChSize = 32
View Source
const DefaultOutputChSize = 32
View Source
const DefaultTimeoutMs = 5000
View Source
const ESC = 0x1b
View Source
const HexChars = "0123456789ABCDEF"
View Source
const LinkKind_Leaf = "leaf"
View Source
const LinkKind_Router = "router"
View Source
const MaxBufferedDataSize = 256 * 1024
View Source
const RespChSize = 32
View Source
const RouterInputChQueueSize = 100
View Source
const ST = 0x9c
View Source
const WaveJwtTokenVarName = wavebase.WaveJwtTokenVarName
View Source
const WaveOSC = "23198"

WaveOSC and WaveServerOSC should both be exactly 5 characters long.

View Source
const WaveOSCPrefix = "\x1b]" + WaveOSC + ";"
View Source
const WaveOSCPrefixLen = 5 + 3 // len(WaveOSC) + len("\x1b]") + len(";"); equals len(WaveOSCPrefix), which contains no \x07
View Source
const WaveServerOSC = "23199"
View Source
const WaveServerOSCPrefix = "\x1b]" + WaveServerOSC + ";"

Variables

View Source
var BacklogLogThresholds = map[int]bool{1: true, 5: true, 10: true, 20: true, 30: true, 40: true, 50: true, 100: true, 200: true, 500: true, 1000: true}
View Source
var WshCommandDeclMap = wshrpc.GenerateWshCommandDeclMap()

Functions

func AdaptMsgChToPty

func AdaptMsgChToPty(outputCh chan []byte, oscEsc string, output io.Writer) error

func AdaptOutputChToStream

func AdaptOutputChToStream(outputCh chan []byte, output io.Writer) error

func AdaptStreamToMsgCh

func AdaptStreamToMsgCh(input io.Reader, output chan baseds.RpcInputChType, readCallback func()) error

func DoShutdown

func DoShutdown(reason string, exitCode int, quiet bool)

func EncodeWaveOSCBytes

func EncodeWaveOSCBytes(oscNum string, barr []byte) ([]byte, error)

func EncodeWaveOSCMessageEx

func EncodeWaveOSCMessageEx(oscNum string, msg *RpcMessage) ([]byte, error)

func ExtractUnverifiedRpcContext

func ExtractUnverifiedRpcContext(tokenStr string) (*wshrpc.RpcContext, error)

only for use on client

func ExtractUnverifiedSocketName

func ExtractUnverifiedSocketName(tokenStr string) (string, error)

only for use on client

func GetInfo

func GetInfo() wshrpc.RemoteInfo

func GetIsCanceledFromContext

func GetIsCanceledFromContext(ctx context.Context) bool

func GetRpcSourceFromContext

func GetRpcSourceFromContext(ctx context.Context) string

func HandleStdIOClient

func HandleStdIOClient(logName string, input chan utilfn.LineOutput, output io.Writer)

Blocking: returns when an error occurs or when input reaches EOF.

func InstallRcFiles

func InstallRcFiles() error

func MakeBuilderRouteId

func MakeBuilderRouteId(builderId string) string

func MakeClientJWTToken

func MakeClientJWTToken(rpcCtx wshrpc.RpcContext) (string, error)

func MakeConnectionRouteId

func MakeConnectionRouteId(connId string) string

func MakeControllerRouteId

func MakeControllerRouteId(blockId string) string

func MakeFeBlockRouteId

func MakeFeBlockRouteId(blockId string) string

func MakeJobRouteId

func MakeJobRouteId(jobId string) string

func MakeLinkRouteId

func MakeLinkRouteId(linkId baseds.LinkId) string

func MakeProcRouteId

func MakeProcRouteId(procId string) string

func MakeRandomProcRouteId

func MakeRandomProcRouteId() string

func MakeTabRouteId

func MakeTabRouteId(tabId string) string

func RespErr

func RespErr[T any](err error) wshrpc.RespOrErrorUnion[T]

func RunWshRpcOverListener

func RunWshRpcOverListener(listener net.Listener, readCallback func())

func SendErrCh

func SendErrCh[T any](err error) <-chan wshrpc.RespOrErrorUnion[T]

func ValidateAndExtractRpcContextFromToken

func ValidateAndExtractRpcContextFromToken(tokenStr string) (*wshrpc.RpcContext, error)

Types

type AbstractRpcClient

// AbstractRpcClient is the message-transport interface accepted by the
// WshRouter Register* methods. Both *WshRpc and *WshRpcProxy declare methods
// with these exact signatures.
// NOTE(review): the meaning of the bool results is not documented here —
// presumably "sent"/"ok"; confirm against the implementations.
type AbstractRpcClient interface {
	GetPeerInfo() string
	SendRpcMessage(msg []byte, ingressLinkId baseds.LinkId, debugStr string) bool
	RecvRpcMessage() ([]byte, bool) // blocking
}

type CommandHandlerFnType

type CommandHandlerFnType = func(*RpcResponseHandler) bool

returns true if handler is complete, false for an async handler

type EventListener

// EventListener stores registered listeners grouped under a string key.
// Instances are created with MakeEventListener.
// NOTE(review): the key is presumably the eventName passed to On/Unregister,
// and Lock presumably guards Listeners — confirm.
type EventListener struct {
	Lock      *sync.Mutex
	Listeners map[string][]singleListener
}

func MakeEventListener

func MakeEventListener() *EventListener

func (*EventListener) On

func (el *EventListener) On(eventName string, fn func(*wps.WaveEvent)) string

func (*EventListener) RecvEvent

func (el *EventListener) RecvEvent(e *wps.WaveEvent)

func (*EventListener) Unregister

func (el *EventListener) Unregister(eventName string, id string)

type PtyBuffer

// PtyBuffer wraps InputReader and exposes its own Read method. It is created
// with MakePtyBuffer, which documents that MessageCh is closed when the input
// is closed or fails.
// NOTE(review): presumably Read yields ordinary output while escape sequences
// starting with OSCPrefix are diverted to MessageCh — confirm.
type PtyBuffer struct {
	CVar        *sync.Cond
	DataBuf     *bytes.Buffer
	// NOTE(review): presumably one of the Mode_* constants — confirm.
	EscMode     string
	EscSeqBuf   []byte
	OSCPrefix   string
	InputReader io.Reader
	MessageCh   chan baseds.RpcInputChType
	AtEOF       bool
	Err         error
}

func MakePtyBuffer

func MakePtyBuffer(oscPrefix string, input io.Reader, messageCh chan baseds.RpcInputChType) *PtyBuffer

closes messageCh when input is closed (or error)

func (*PtyBuffer) Read

func (b *PtyBuffer) Read(p []byte) (n int, err error)

type ResponseFnType

type ResponseFnType = func(any) error

type RpcMessage

// RpcMessage is the JSON envelope for RPC traffic. Every field is tagged
// omitempty, so zero-valued fields are left out of the encoded JSON.
// NOTE(review): ReqId presumably identifies a request and ResId the request a
// response belongs to — confirm against IsRpcRequest and Validate.
type RpcMessage struct {
	Command  string `json:"command,omitempty"`
	ReqId    string `json:"reqid,omitempty"`
	ResId    string `json:"resid,omitempty"`
	// NOTE(review): presumably milliseconds (cf. DefaultTimeoutMs) — confirm.
	Timeout  int64  `json:"timeout,omitempty"`
	Route    string `json:"route,omitempty"`  // to route/forward requests to alternate servers
	Source   string `json:"source,omitempty"` // source route id
	Cont     bool   `json:"cont,omitempty"`   // flag if additional requests/responses are forthcoming
	Cancel   bool   `json:"cancel,omitempty"` // used to cancel a streaming request or response (sent from the side that is not streaming)
	Error    string `json:"error,omitempty"`
	DataType string `json:"datatype,omitempty"`
	Data     any    `json:"data,omitempty"`
}

func (*RpcMessage) IsRpcRequest

func (r *RpcMessage) IsRpcRequest() bool

func (*RpcMessage) Validate

func (r *RpcMessage) Validate() error

type RpcRequestHandler

// RpcRequestHandler is the handle returned by WshRpc.SendComplexRequest.
// Its methods are Context, NextResponse, ResponseDone and SendCancel.
type RpcRequestHandler struct {
	// contains filtered or unexported fields
}

func (*RpcRequestHandler) Context

func (handler *RpcRequestHandler) Context() context.Context

func (*RpcRequestHandler) NextResponse

func (handler *RpcRequestHandler) NextResponse() (any, error)

func (*RpcRequestHandler) ResponseDone

func (handler *RpcRequestHandler) ResponseDone() bool

func (*RpcRequestHandler) SendCancel

func (handler *RpcRequestHandler) SendCancel(ctx context.Context) error

type RpcResponseHandler

// RpcResponseHandler is the argument passed to a CommandHandlerFnType and can
// also be obtained with GetRpcResponseHandlerFromContext. It exposes accessors
// for the incoming command (GetCommand, GetCommandRawData, GetSource, ...) and
// methods for replying (SendResponse, SendResponseError, Finalize).
type RpcResponseHandler struct {
	// contains filtered or unexported fields
}

func GetRpcResponseHandlerFromContext

func GetRpcResponseHandlerFromContext(ctx context.Context) *RpcResponseHandler

func (*RpcResponseHandler) Context

func (handler *RpcResponseHandler) Context() context.Context

func (*RpcResponseHandler) Finalize

func (handler *RpcResponseHandler) Finalize()

If the handler is async, the caller must call Finalize.

func (*RpcResponseHandler) GetCommand

func (handler *RpcResponseHandler) GetCommand() string

func (*RpcResponseHandler) GetCommandRawData

func (handler *RpcResponseHandler) GetCommandRawData() any

func (*RpcResponseHandler) GetIngressLinkId

func (handler *RpcResponseHandler) GetIngressLinkId() baseds.LinkId

func (*RpcResponseHandler) GetRpcContext

func (handler *RpcResponseHandler) GetRpcContext() wshrpc.RpcContext

func (*RpcResponseHandler) GetSource

func (handler *RpcResponseHandler) GetSource() string

func (*RpcResponseHandler) IsCanceled

func (handler *RpcResponseHandler) IsCanceled() bool

func (*RpcResponseHandler) IsDone

func (handler *RpcResponseHandler) IsDone() bool

func (*RpcResponseHandler) NeedsResponse

func (handler *RpcResponseHandler) NeedsResponse() bool

func (*RpcResponseHandler) SendMessage

func (handler *RpcResponseHandler) SendMessage(msg string)

func (*RpcResponseHandler) SendResponse

func (handler *RpcResponseHandler) SendResponse(data any, done bool) error

func (*RpcResponseHandler) SendResponseError

func (handler *RpcResponseHandler) SendResponseError(err error)

type ServerImpl

// ServerImpl is the interface taken by MakeWshRpc and WshRpc.SetServerImpl.
// Its only method takes no arguments and returns nothing;
// WshRouterControlImpl declares it.
type ServerImpl interface {
	WshServerImpl()
}

type WriteFlusher

// WriteFlusher pairs a Write method (same signature as io.Writer's) with a
// Flush method that reports an error.
type WriteFlusher interface {
	Write([]byte) (int, error)
	Flush() error
}

type WshRouter

// WshRouter is created with NewWshRouter. Its Register* methods accept an
// AbstractRpcClient and return a baseds.LinkId; UnregisterLink takes that
// LinkId back.
type WshRouter struct {
	// contains filtered or unexported fields
}
var DefaultRouter *WshRouter

func NewWshRouter

func NewWshRouter() *WshRouter

func (*WshRouter) GetControlRpc

func (router *WshRouter) GetControlRpc() *WshRpc

func (*WshRouter) GetLinkIdForRoute

func (router *WshRouter) GetLinkIdForRoute(routeId string) baseds.LinkId

func (*WshRouter) IsRootRouter

func (router *WshRouter) IsRootRouter() bool

func (*WshRouter) RegisterTrustedLeaf

func (router *WshRouter) RegisterTrustedLeaf(rpc AbstractRpcClient, routeId string) (baseds.LinkId, error)

only for leaves

func (*WshRouter) RegisterTrustedRouter

func (router *WshRouter) RegisterTrustedRouter(rpc AbstractRpcClient) baseds.LinkId

only for routers

func (*WshRouter) RegisterUntrustedLink

func (router *WshRouter) RegisterUntrustedLink(client AbstractRpcClient) baseds.LinkId

func (*WshRouter) RegisterUpstream

func (router *WshRouter) RegisterUpstream(rpc AbstractRpcClient) baseds.LinkId

func (*WshRouter) SendEvent

func (router *WshRouter) SendEvent(routeId string, event wps.WaveEvent)

func (*WshRouter) SetAsRootRouter

func (router *WshRouter) SetAsRootRouter()

func (*WshRouter) UnregisterLink

func (router *WshRouter) UnregisterLink(linkId baseds.LinkId)

func (*WshRouter) WaitForRegister

func (router *WshRouter) WaitForRegister(ctx context.Context, routeId string) error

type WshRouterControlImpl

// WshRouterControlImpl carries the control-plane command methods
// (Authenticate*, ControlGetRouteId, RouteAnnounce, RouteUnannounce,
// SetPeerInfo) and declares WshServerImpl, the method ServerImpl requires.
type WshRouterControlImpl struct {
	// Router is the WshRouter this implementation is attached to.
	Router *WshRouter
}

func (*WshRouterControlImpl) AuthenticateCommand

func (impl *WshRouterControlImpl) AuthenticateCommand(ctx context.Context, data string) (wshrpc.CommandAuthenticateRtnData, error)

func (*WshRouterControlImpl) AuthenticateJobManagerCommand

func (impl *WshRouterControlImpl) AuthenticateJobManagerCommand(ctx context.Context, data wshrpc.CommandAuthenticateJobManagerData) error

func (*WshRouterControlImpl) AuthenticateJobManagerVerifyCommand

func (impl *WshRouterControlImpl) AuthenticateJobManagerVerifyCommand(ctx context.Context, data wshrpc.CommandAuthenticateJobManagerData) error

func (*WshRouterControlImpl) AuthenticateTokenCommand

func (*WshRouterControlImpl) AuthenticateTokenVerifyCommand

func (*WshRouterControlImpl) ControlGetRouteIdCommand

func (impl *WshRouterControlImpl) ControlGetRouteIdCommand(ctx context.Context) (string, error)

func (*WshRouterControlImpl) RouteAnnounceCommand

func (impl *WshRouterControlImpl) RouteAnnounceCommand(ctx context.Context) error

func (*WshRouterControlImpl) RouteUnannounceCommand

func (impl *WshRouterControlImpl) RouteUnannounceCommand(ctx context.Context) error

func (*WshRouterControlImpl) SetPeerInfoCommand

func (impl *WshRouterControlImpl) SetPeerInfoCommand(ctx context.Context, peerInfo string) error

func (*WshRouterControlImpl) WshServerImpl

func (impl *WshRouterControlImpl) WshServerImpl()

type WshRpc

// WshRpc is an RPC endpoint built by MakeWshRpc or MakeWshRpcWithChannels.
// It declares GetPeerInfo, SendRpcMessage and RecvRpcMessage with the same
// signatures as AbstractRpcClient.
// NOTE(review): Lock presumably guards the maps and ServerDone — confirm.
type WshRpc struct {
	Lock               *sync.Mutex
	InputCh            chan baseds.RpcInputChType
	OutputCh           chan []byte
	CtxDoneCh          chan string // for context cancellation, value is ResId
	RpcContext         *atomic.Pointer[wshrpc.RpcContext]
	RpcMap             map[string]*rpcData
	ServerImpl         ServerImpl
	EventListener      *EventListener
	ResponseHandlerMap map[string]*RpcResponseHandler // reqId => handler
	StreamBroker       *streamclient.Broker
	Debug              bool
	DebugName          string
	ServerDone         bool
}

func GetWshRpcFromContext

func GetWshRpcFromContext(ctx context.Context) *WshRpc

func MakeWshRpc

func MakeWshRpc(rpcCtx wshrpc.RpcContext, serverImpl ServerImpl, debugName string) *WshRpc

func MakeWshRpcWithChannels

func MakeWshRpcWithChannels(inputCh chan baseds.RpcInputChType, outputCh chan []byte, rpcCtx wshrpc.RpcContext, serverImpl ServerImpl, debugName string) *WshRpc

closes outputCh when inputCh is closed/done

func SetupConnRpcClient

func SetupConnRpcClient(conn net.Conn, serverImpl ServerImpl, debugStr string) (*WshRpc, chan error, error)

func SetupDomainSocketRpcClient

func SetupDomainSocketRpcClient(sockName string, serverImpl ServerImpl, debugName string) (*WshRpc, error)

func SetupPacketRpcClient

func SetupPacketRpcClient(input io.Reader, output io.Writer, serverImpl ServerImpl, debugStr string) (*WshRpc, chan []byte)

func (*WshRpc) GetPeerInfo

func (w *WshRpc) GetPeerInfo() string

func (*WshRpc) GetRpcContext

func (w *WshRpc) GetRpcContext() wshrpc.RpcContext

func (*WshRpc) IsServerDone

func (w *WshRpc) IsServerDone() bool

func (*WshRpc) RecvRpcMessage

func (w *WshRpc) RecvRpcMessage() ([]byte, bool)

func (*WshRpc) SendCommand

func (w *WshRpc) SendCommand(command string, data any, opts *wshrpc.RpcOpts) error

no response

func (*WshRpc) SendComplexRequest

func (w *WshRpc) SendComplexRequest(command string, data any, opts *wshrpc.RpcOpts) (rtnHandler *RpcRequestHandler, rtnErr error)

func (*WshRpc) SendRpcMessage

func (w *WshRpc) SendRpcMessage(msg []byte, ingressLinkId baseds.LinkId, debugStr string) bool

func (*WshRpc) SendRpcRequest

func (w *WshRpc) SendRpcRequest(command string, data any, opts *wshrpc.RpcOpts) (any, error)

single response

func (*WshRpc) SetRpcContext

func (w *WshRpc) SetRpcContext(ctx wshrpc.RpcContext)

func (*WshRpc) SetServerImpl

func (w *WshRpc) SetServerImpl(serverImpl ServerImpl)

type WshRpcProxy

// WshRpcProxy is built by MakeRpcProxy or MakeRpcProxyWithSize and declares
// GetPeerInfo, SendRpcMessage and RecvRpcMessage with the same signatures as
// AbstractRpcClient.
// NOTE(review): presumably SendRpcMessage feeds ToRemoteCh and RecvRpcMessage
// drains FromRemoteCh (or the reverse) — confirm against the implementation.
type WshRpcProxy struct {
	Lock         *sync.Mutex
	RpcContext   *wshrpc.RpcContext
	ToRemoteCh   chan []byte
	FromRemoteCh chan baseds.RpcInputChType
	PeerInfo     string
}

func MakeRpcProxy

func MakeRpcProxy(peerInfo string) *WshRpcProxy

func MakeRpcProxyWithSize

func MakeRpcProxyWithSize(peerInfo string, inputChSize int, outputChSize int) *WshRpcProxy

func (*WshRpcProxy) GetPeerInfo

func (p *WshRpcProxy) GetPeerInfo() string

func (*WshRpcProxy) RecvRpcMessage

func (p *WshRpcProxy) RecvRpcMessage() ([]byte, bool)

func (*WshRpcProxy) SendRpcMessage

func (p *WshRpcProxy) SendRpcMessage(msg []byte, ingressLinkId baseds.LinkId, debugStr string) bool

func (*WshRpcProxy) SetPeerInfo

func (p *WshRpcProxy) SetPeerInfo(peerInfo string)

type WshRpcStreamClientAdapter

// WshRpcStreamClientAdapter is produced by AdaptWshRpc from a *WshRpc and
// exposes StreamDataCommand and StreamDataAckCommand.
type WshRpcStreamClientAdapter struct {
	// contains filtered or unexported fields
}

func AdaptWshRpc

func AdaptWshRpc(rpc *WshRpc) *WshRpcStreamClientAdapter

func (*WshRpcStreamClientAdapter) StreamDataAckCommand

func (a *WshRpcStreamClientAdapter) StreamDataAckCommand(data wshrpc.CommandStreamAckData, opts *wshrpc.RpcOpts) error

func (*WshRpcStreamClientAdapter) StreamDataCommand

func (a *WshRpcStreamClientAdapter) StreamDataCommand(data wshrpc.CommandStreamData, opts *wshrpc.RpcOpts) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL