Documentation
¶
Index ¶
- Constants
- func MakeJobDomainSocket(clientId string, jobId string) error
- func SetupJobManager(clientId string, jobId string, publicKeyBytes []byte, jobAuthToken string, ...) error
- type CirBuf
- func (cb *CirBuf) Consume(numBytes int) error
- func (cb *CirBuf) HeadPos() int64
- func (cb *CirBuf) PeekData(data []byte) int
- func (cb *CirBuf) PeekDataAt(offset int, data []byte) int
- func (cb *CirBuf) SetEffectiveWindow(syncMode bool, windowSize int)
- func (cb *CirBuf) Size() int
- func (cb *CirBuf) TotalSize() int64
- func (cb *CirBuf) WriteAvailable(data []byte) (int, <-chan struct{})
- type CmdDef
- type DataSender
- type JobCmd
- func (jm *JobCmd) GetCmd() (*exec.Cmd, pty.Pty)
- func (jm *JobCmd) GetExitInfo() (bool, *wshrpc.CommandJobCmdExitedData)
- func (jm *JobCmd) GetPGID() (int, error)
- func (jm *JobCmd) HandleInput(data wshrpc.CommandJobInputData) error
- func (jm *JobCmd) SetTermSize(termSize waveobj.TermSize) error
- func (jm *JobCmd) TerminateByClosingPtyMaster()
- type JobManager
- func (jm *JobManager) GetCmd() *JobCmd
- func (jm *JobManager) GetJobAuthInfo() (string, string)
- func (jm *JobManager) IsJobStarted() bool
- func (jm *JobManager) PrepareConnect(msc *MainServerConn, data wshrpc.CommandJobPrepareConnectData) (*wshrpc.CommandJobConnectRtnData, error)
- func (jm *JobManager) SetAttachedClient(msc *MainServerConn)
- func (jm *JobManager) StartJob(msc *MainServerConn, data wshrpc.CommandStartJobData) (*wshrpc.CommandStartJobRtnData, error)
- func (jm *JobManager) StartStream(msc *MainServerConn) error
- type MainServerConn
- func (msc *MainServerConn) AuthenticateToJobManagerCommand(ctx context.Context, data wshrpc.CommandAuthenticateToJobData) error
- func (msc *MainServerConn) Close()
- func (msc *MainServerConn) JobInputCommand(ctx context.Context, data wshrpc.CommandJobInputData) error
- func (msc *MainServerConn) JobPrepareConnectCommand(ctx context.Context, data wshrpc.CommandJobPrepareConnectData) (*wshrpc.CommandJobConnectRtnData, error)
- func (msc *MainServerConn) JobStartStreamCommand(ctx context.Context, data wshrpc.CommandJobStartStreamData) error
- func (msc *MainServerConn) StartJobCommand(ctx context.Context, data wshrpc.CommandStartJobData) (*wshrpc.CommandStartJobRtnData, error)
- func (*MainServerConn) WshServerImpl()
- type StreamManager
- func (sm *StreamManager) AttachReader(r io.Reader) error
- func (sm *StreamManager) ClientConnected(streamId string, dataSender DataSender, rwndSize int, clientSeq int64) (int64, error)
- func (sm *StreamManager) ClientDisconnected()
- func (sm *StreamManager) Close()
- func (sm *StreamManager) GetStreamDoneInfo() (done bool, streamError string)
- func (sm *StreamManager) GetStreamId() string
- func (sm *StreamManager) RecvAck(ackPk wshrpc.CommandStreamAckData)
- func (sm *StreamManager) SetRwndSize(rwndSize int) error
Constants ¶
const ( CwndSize = 64 * 1024 // 64 KB window for connected mode CirBufSize = 2 * 1024 * 1024 // 2 MB max buffer size DisconnReadSz = 4 * 1024 // 4 KB read chunks when disconnected MaxPacketSize = 4 * 1024 // 4 KB max data per packet )
const JobAccessTokenLabel = "Wave-JobAccessToken"
const JobInputQueueSize = 1000
const JobInputQueueTimeout = 100 * time.Millisecond
const JobManagerStartLabel = "Wave-JobManagerStart"
Variables ¶
This section is empty.
Functions ¶
func MakeJobDomainSocket ¶
Types ¶
type CirBuf ¶
type CirBuf struct {
// contains filtered or unexported fields
}
func MakeCirBuf ¶
func (*CirBuf) SetEffectiveWindow ¶
SetEffectiveWindow changes the sync mode and effective window size for flow control. The windowSize is capped at the buffer size. When window shrinks: sync mode blocks new writes, async mode truncates old data to enforce limit. When window increases: blocked writers are woken up if space becomes available.
func (*CirBuf) WriteAvailable ¶
WriteAvailable attempts to write as much data as possible without blocking. Returns the number of bytes written and a channel to wait on if buffer is full (nil if not blocking). In sync mode when buffer is full, returns 0 written and a channel that will be closed when space is available. The caller should wait on the channel and retry the write. NOTE: Only one concurrent blocked write is allowed. Multiple blocked writes will panic.
type DataSender ¶
type DataSender interface {
SendData(dataPk wshrpc.CommandStreamData)
}
type JobCmd ¶
type JobCmd struct {
// contains filtered or unexported fields
}
func (*JobCmd) GetExitInfo ¶
func (jm *JobCmd) GetExitInfo() (bool, *wshrpc.CommandJobCmdExitedData)
func (*JobCmd) HandleInput ¶
func (jm *JobCmd) HandleInput(data wshrpc.CommandJobInputData) error
TODO set up a single input handler loop + queue so we dont need to hold the lock but still get synchronized in-order execution
func (*JobCmd) TerminateByClosingPtyMaster ¶
func (jm *JobCmd) TerminateByClosingPtyMaster()
type JobManager ¶
type JobManager struct {
ClientId string
JobId string
Cmd *JobCmd
JwtPublicKey []byte
JobAuthToken string
StreamManager *StreamManager
InputQueue *utilds.QuickReorderQueue[wshrpc.CommandJobInputData]
// contains filtered or unexported fields
}
var WshCmdJobManager JobManager
func (*JobManager) GetCmd ¶
func (jm *JobManager) GetCmd() *JobCmd
func (*JobManager) GetJobAuthInfo ¶
func (jm *JobManager) GetJobAuthInfo() (string, string)
func (*JobManager) IsJobStarted ¶
func (jm *JobManager) IsJobStarted() bool
func (*JobManager) PrepareConnect ¶
func (jm *JobManager) PrepareConnect(msc *MainServerConn, data wshrpc.CommandJobPrepareConnectData) (*wshrpc.CommandJobConnectRtnData, error)
func (*JobManager) SetAttachedClient ¶
func (jm *JobManager) SetAttachedClient(msc *MainServerConn)
func (*JobManager) StartJob ¶
func (jm *JobManager) StartJob(msc *MainServerConn, data wshrpc.CommandStartJobData) (*wshrpc.CommandStartJobRtnData, error)
func (*JobManager) StartStream ¶
func (jm *JobManager) StartStream(msc *MainServerConn) error
type MainServerConn ¶
type MainServerConn struct {
PeerAuthenticated atomic.Bool
SelfAuthenticated atomic.Bool
WshRpc *wshutil.WshRpc
Conn net.Conn
// contains filtered or unexported fields
}
func (*MainServerConn) AuthenticateToJobManagerCommand ¶
func (msc *MainServerConn) AuthenticateToJobManagerCommand(ctx context.Context, data wshrpc.CommandAuthenticateToJobData) error
func (*MainServerConn) Close ¶
func (msc *MainServerConn) Close()
func (*MainServerConn) JobInputCommand ¶
func (msc *MainServerConn) JobInputCommand(ctx context.Context, data wshrpc.CommandJobInputData) error
func (*MainServerConn) JobPrepareConnectCommand ¶
func (msc *MainServerConn) JobPrepareConnectCommand(ctx context.Context, data wshrpc.CommandJobPrepareConnectData) (*wshrpc.CommandJobConnectRtnData, error)
func (*MainServerConn) JobStartStreamCommand ¶
func (msc *MainServerConn) JobStartStreamCommand(ctx context.Context, data wshrpc.CommandJobStartStreamData) error
func (*MainServerConn) StartJobCommand ¶
func (msc *MainServerConn) StartJobCommand(ctx context.Context, data wshrpc.CommandStartJobData) (*wshrpc.CommandStartJobRtnData, error)
func (*MainServerConn) WshServerImpl ¶
func (*MainServerConn) WshServerImpl()
type StreamManager ¶
type StreamManager struct {
// contains filtered or unexported fields
}
StreamManager handles PTY output buffering with ACK-based flow control
func MakeStreamManager ¶
func MakeStreamManager() *StreamManager
func MakeStreamManagerWithSizes ¶
func MakeStreamManagerWithSizes(cwndSize, cirbufSize int) *StreamManager
func (*StreamManager) AttachReader ¶
func (sm *StreamManager) AttachReader(r io.Reader) error
AttachReader starts reading from the given reader
func (*StreamManager) ClientConnected ¶
func (sm *StreamManager) ClientConnected(streamId string, dataSender DataSender, rwndSize int, clientSeq int64) (int64, error)
ClientConnected transitions to CONNECTED mode
func (*StreamManager) ClientDisconnected ¶
func (sm *StreamManager) ClientDisconnected()
ClientDisconnected transitions to DISCONNECTED mode
func (*StreamManager) Close ¶
func (sm *StreamManager) Close()
Close shuts down the sender loop. The reader loop will exit on its next iteration or when the underlying reader is closed.
func (*StreamManager) GetStreamDoneInfo ¶
func (sm *StreamManager) GetStreamDoneInfo() (done bool, streamError string)
GetStreamDoneInfo returns whether the stream is done and the error if there was one. The error is only meaningful if done=true, as the error is delivered as part of the stream otherwise.
func (*StreamManager) GetStreamId ¶
func (sm *StreamManager) GetStreamId() string
GetStreamId returns the current stream ID (safe to call with lock held by caller)
func (*StreamManager) RecvAck ¶
func (sm *StreamManager) RecvAck(ackPk wshrpc.CommandStreamAckData)
RecvAck processes an ACK from the client must be connected, and streamid must match
func (*StreamManager) SetRwndSize ¶
func (sm *StreamManager) SetRwndSize(rwndSize int) error
SetRwndSize dynamically updates the receive window size