jobmanager

package
v0.14.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 11, 2026 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation

Index

Constants

View Source
const (
	CwndSize      = 64 * 1024       // 64 KB window for connected mode
	CirBufSize    = 2 * 1024 * 1024 // 2 MB max buffer size
	DisconnReadSz = 4 * 1024        // 4 KB read chunks when disconnected
	MaxPacketSize = 4 * 1024        // 4 KB max data per packet
)
View Source
const JobAccessTokenLabel = "Wave-JobAccessToken"
View Source
const JobInputQueueSize = 1000
View Source
const JobInputQueueTimeout = 100 * time.Millisecond
View Source
const JobManagerStartLabel = "Wave-JobManagerStart"

Variables

This section is empty.

Functions

func MakeJobDomainSocket

func MakeJobDomainSocket(clientId string, jobId string) error

func SetupJobManager

func SetupJobManager(clientId string, jobId string, publicKeyBytes []byte, jobAuthToken string, readyFile *os.File) error

Types

type CirBuf

type CirBuf struct {
	// contains filtered or unexported fields
}

func MakeCirBuf

func MakeCirBuf(maxSize int, initSyncMode bool) *CirBuf

func (*CirBuf) Consume

func (cb *CirBuf) Consume(numBytes int) error

func (*CirBuf) HeadPos

func (cb *CirBuf) HeadPos() int64

func (*CirBuf) PeekData

func (cb *CirBuf) PeekData(data []byte) int

func (*CirBuf) PeekDataAt

func (cb *CirBuf) PeekDataAt(offset int, data []byte) int

func (*CirBuf) SetEffectiveWindow

func (cb *CirBuf) SetEffectiveWindow(syncMode bool, windowSize int)

SetEffectiveWindow changes the sync mode and effective window size for flow control. The windowSize is capped at the buffer size. When window shrinks: sync mode blocks new writes, async mode truncates old data to enforce limit. When window increases: blocked writers are woken up if space becomes available.

func (*CirBuf) Size

func (cb *CirBuf) Size() int

func (*CirBuf) TotalSize

func (cb *CirBuf) TotalSize() int64

func (*CirBuf) WriteAvailable

func (cb *CirBuf) WriteAvailable(data []byte) (int, <-chan struct{})

WriteAvailable attempts to write as much data as possible without blocking. Returns the number of bytes written and a channel to wait on if buffer is full (nil if not blocking). In sync mode when buffer is full, returns 0 written and a channel that will be closed when space is available. The caller should wait on the channel and retry the write. NOTE: Only one concurrent blocked write is allowed. Multiple blocked writes will panic.

type CmdDef

type CmdDef struct {
	Cmd      string
	Args     []string
	Env      map[string]string
	TermSize waveobj.TermSize
}

type DataSender

type DataSender interface {
	SendData(dataPk wshrpc.CommandStreamData)
}

type JobCmd

type JobCmd struct {
	// contains filtered or unexported fields
}

func MakeJobCmd

func MakeJobCmd(jobId string, cmdDef CmdDef) (*JobCmd, error)

func (*JobCmd) GetCmd

func (jm *JobCmd) GetCmd() (*exec.Cmd, pty.Pty)

func (*JobCmd) GetExitInfo

func (jm *JobCmd) GetExitInfo() (bool, *wshrpc.CommandJobCmdExitedData)

func (*JobCmd) GetPGID

func (jm *JobCmd) GetPGID() (int, error)

func (*JobCmd) HandleInput

func (jm *JobCmd) HandleInput(data wshrpc.CommandJobInputData) error

TODO set up a single input handler loop + queue so we dont need to hold the lock but still get synchronized in-order execution

func (*JobCmd) SetTermSize

func (jm *JobCmd) SetTermSize(termSize waveobj.TermSize) error

func (*JobCmd) TerminateByClosingPtyMaster

func (jm *JobCmd) TerminateByClosingPtyMaster()

type JobManager

type JobManager struct {
	ClientId      string
	JobId         string
	Cmd           *JobCmd
	JwtPublicKey  []byte
	JobAuthToken  string
	StreamManager *StreamManager
	InputQueue    *utilds.QuickReorderQueue[wshrpc.CommandJobInputData]
	// contains filtered or unexported fields
}
var WshCmdJobManager JobManager

func (*JobManager) GetCmd

func (jm *JobManager) GetCmd() *JobCmd

func (*JobManager) GetJobAuthInfo

func (jm *JobManager) GetJobAuthInfo() (string, string)

func (*JobManager) IsJobStarted

func (jm *JobManager) IsJobStarted() bool

func (*JobManager) SetAttachedClient

func (jm *JobManager) SetAttachedClient(msc *MainServerConn)

func (*JobManager) StartJob

func (*JobManager) StartStream

func (jm *JobManager) StartStream(msc *MainServerConn) error

type MainServerConn

type MainServerConn struct {
	PeerAuthenticated atomic.Bool
	SelfAuthenticated atomic.Bool
	WshRpc            *wshutil.WshRpc
	Conn              net.Conn
	// contains filtered or unexported fields
}

func (*MainServerConn) AuthenticateToJobManagerCommand

func (msc *MainServerConn) AuthenticateToJobManagerCommand(ctx context.Context, data wshrpc.CommandAuthenticateToJobData) error

func (*MainServerConn) Close

func (msc *MainServerConn) Close()

func (*MainServerConn) JobInputCommand

func (msc *MainServerConn) JobInputCommand(ctx context.Context, data wshrpc.CommandJobInputData) error

func (*MainServerConn) JobPrepareConnectCommand

func (*MainServerConn) JobStartStreamCommand

func (msc *MainServerConn) JobStartStreamCommand(ctx context.Context, data wshrpc.CommandJobStartStreamData) error

func (*MainServerConn) StartJobCommand

func (*MainServerConn) WshServerImpl

func (*MainServerConn) WshServerImpl()

type StreamManager

type StreamManager struct {
	// contains filtered or unexported fields
}

StreamManager handles PTY output buffering with ACK-based flow control

func MakeStreamManager

func MakeStreamManager() *StreamManager

func MakeStreamManagerWithSizes

func MakeStreamManagerWithSizes(cwndSize, cirbufSize int) *StreamManager

func (*StreamManager) AttachReader

func (sm *StreamManager) AttachReader(r io.Reader) error

AttachReader starts reading from the given reader

func (*StreamManager) ClientConnected

func (sm *StreamManager) ClientConnected(streamId string, dataSender DataSender, rwndSize int, clientSeq int64) (int64, error)

ClientConnected transitions to CONNECTED mode

func (*StreamManager) ClientDisconnected

func (sm *StreamManager) ClientDisconnected()

ClientDisconnected transitions to DISCONNECTED mode

func (*StreamManager) Close

func (sm *StreamManager) Close()

Close shuts down the sender loop. The reader loop will exit on its next iteration or when the underlying reader is closed.

func (*StreamManager) GetStreamDoneInfo

func (sm *StreamManager) GetStreamDoneInfo() (done bool, streamError string)

GetStreamDoneInfo returns whether the stream is done and the error if there was one. The error is only meaningful if done=true, as the error is delivered as part of the stream otherwise.

func (*StreamManager) GetStreamId

func (sm *StreamManager) GetStreamId() string

GetStreamId returns the current stream ID (safe to call with lock held by caller)

func (*StreamManager) RecvAck

func (sm *StreamManager) RecvAck(ackPk wshrpc.CommandStreamAckData)

RecvAck processes an ACK from the client must be connected, and streamid must match

func (*StreamManager) SetRwndSize

func (sm *StreamManager) SetRwndSize(rwndSize int) error

SetRwndSize dynamically updates the receive window size

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL