Documentation
¶
Index ¶
- Variables
- type MessageBuffer
- func (mb *MessageBuffer) Add(msg *hubv1.DataPacket) bool
- func (mb *MessageBuffer) DroppedCount() int64
- func (mb *MessageBuffer) Flush(ctx context.Context, downstream hubv1.HubService_ProcessServer) int
- func (mb *MessageBuffer) FlushWithSender(ctx context.Context, send func(*hubv1.DataPacket) error) int
- func (mb *MessageBuffer) LastActive() time.Time
- func (mb *MessageBuffer) Size() int
- type Server
- type Stream
- type StreamManager
- func (sm *StreamManager) AddStream(storyRunName, stepID string, grpcStream hubv1.HubService_ProcessServer)
- func (sm *StreamManager) RemoveStream(storyRunName, stepID string)
- func (sm *StreamManager) SendHeartbeats(ctx context.Context)
- func (sm *StreamManager) SendOrBuffer(ctx context.Context, storyRunName, stepID string, packet *hubv1.DataPacket) bool
- func (sm *StreamManager) StartEvictor(ctx context.Context)
Constants ¶
This section is empty.
Variables ¶
var (
	// MaxBufferSize is the maximum number of messages to buffer per downstream engram.
	MaxBufferSize = getMaxBufferSize()
	// MaxBufferBytes is the maximum total size of buffered messages in bytes.
	MaxBufferBytes = getMaxBufferBytes()
)
Functions ¶
This section is empty.
Types ¶
type MessageBuffer ¶
type MessageBuffer struct {
// contains filtered or unexported fields
}
MessageBuffer holds buffered messages for a downstream engram that is not yet ready.
func NewMessageBuffer ¶
func NewMessageBuffer(storyRunName, stepName string) *MessageBuffer
NewMessageBuffer creates a new message buffer.
func (*MessageBuffer) Add ¶
func (mb *MessageBuffer) Add(msg *hubv1.DataPacket) bool
Add attempts to add a message to the buffer. It returns true if the message was added, or false if the buffer is full.
func (*MessageBuffer) DroppedCount ¶
func (mb *MessageBuffer) DroppedCount() int64
DroppedCount returns the total number of dropped messages.
func (*MessageBuffer) Flush ¶
func (mb *MessageBuffer) Flush(ctx context.Context, downstream hubv1.HubService_ProcessServer) int
Flush attempts to send all buffered messages to the downstream stream. It returns the number of messages flushed.
func (*MessageBuffer) FlushWithSender ¶
func (mb *MessageBuffer) FlushWithSender(ctx context.Context, send func(*hubv1.DataPacket) error) int
FlushWithSender flushes buffered messages using the provided sender function. The sender function should perform the actual send (e.g., stream.Send) and handle any necessary serialization outside this method.
func (*MessageBuffer) LastActive ¶
func (mb *MessageBuffer) LastActive() time.Time
LastActive returns the last time this buffer had activity (add/flush).
func (*MessageBuffer) Size ¶
func (mb *MessageBuffer) Size() int
Size returns the current number of buffered messages.
type Server ¶
type Server struct {
hubv1.UnimplementedHubServiceServer
// contains filtered or unexported fields
}
Server is the gRPC hub server.
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream represents a single client stream.
type StreamManager ¶
type StreamManager struct {
// contains filtered or unexported fields
}
StreamManager manages all active client streams.
func NewStreamManager ¶
func NewStreamManager() *StreamManager
NewStreamManager creates a new StreamManager.
func (*StreamManager) AddStream ¶
func (sm *StreamManager) AddStream(storyRunName, stepID string, grpcStream hubv1.HubService_ProcessServer)
AddStream adds a new stream to the manager.
func (*StreamManager) RemoveStream ¶
func (sm *StreamManager) RemoveStream(storyRunName, stepID string)
RemoveStream removes a stream from the manager.
func (*StreamManager) SendHeartbeats ¶
func (sm *StreamManager) SendHeartbeats(ctx context.Context)
SendHeartbeats sends a heartbeat to all active streams.
func (*StreamManager) SendOrBuffer ¶
func (sm *StreamManager) SendOrBuffer(ctx context.Context, storyRunName, stepID string, packet *hubv1.DataPacket) bool
SendOrBuffer tries to send a packet to a stream, or buffers it if the stream is not yet available.
func (*StreamManager) StartEvictor ¶
func (sm *StreamManager) StartEvictor(ctx context.Context)
StartEvictor starts a background goroutine to clean up old, unused buffers.