hub

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 17, 2025 License: Apache-2.0 Imports: 37 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// MaxBufferSize is the maximum number of messages to buffer per downstream engram
	MaxBufferSize = getMaxBufferSize()
	// MaxBufferBytes is the maximum total size of buffered messages in bytes
	MaxBufferBytes = getMaxBufferBytes()
)

Functions

This section is empty.

Types

type MessageBuffer

type MessageBuffer struct {
	// contains filtered or unexported fields
}

MessageBuffer holds buffered messages for a downstream engram that's not ready

func NewMessageBuffer

func NewMessageBuffer(storyRunName, stepName string) *MessageBuffer

NewMessageBuffer creates a new message buffer

func (*MessageBuffer) Add

func (mb *MessageBuffer) Add(msg *hubv1.DataPacket) bool

Add attempts to add a message to the buffer. Returns true if added, false if buffer is full.

func (*MessageBuffer) DroppedCount

func (mb *MessageBuffer) DroppedCount() int64

DroppedCount returns the total number of dropped messages

func (*MessageBuffer) Flush

func (mb *MessageBuffer) Flush(ctx context.Context, downstream hubv1.HubService_ProcessServer) int

Flush attempts to send all buffered messages to the downstream. Returns number of messages flushed.

func (*MessageBuffer) FlushWithSender

func (mb *MessageBuffer) FlushWithSender(ctx context.Context, send func(*hubv1.DataPacket) error) int

FlushWithSender flushes buffered messages using the provided sender function. The sender function should perform the actual send (e.g., stream.Send) and handle any necessary serialization outside this method.

func (*MessageBuffer) LastActive

func (mb *MessageBuffer) LastActive() time.Time

LastActive returns the last time this buffer had activity (add/flush)

func (*MessageBuffer) Size

func (mb *MessageBuffer) Size() int

Size returns the current number of buffered messages

type Server

type Server struct {
	hubv1.UnimplementedHubServiceServer
	// contains filtered or unexported fields
}

Server is the gRPC hub server.

func NewServer

func NewServer(k8sClient client.Client) *Server

NewServer creates a new hub server.

func (*Server) Process

func (s *Server) Process(stream hubv1.HubService_ProcessServer) error

Process is the bidirectional streaming RPC for the hub.

func (*Server) Start

func (s *Server) Start(ctx context.Context, port int) error

Start starts the gRPC server.

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream represents a single client stream.

func (*Stream) Send

func (s *Stream) Send(ctx context.Context, req *hubv1.DataPacket) error

Send sends a packet to the stream.

type StreamManager

type StreamManager struct {
	// contains filtered or unexported fields
}

StreamManager manages all active client streams.

func NewStreamManager

func NewStreamManager() *StreamManager

NewStreamManager creates a new StreamManager.

func (*StreamManager) AddStream

func (sm *StreamManager) AddStream(storyRunName, stepID string, grpcStream hubv1.HubService_ProcessServer)

AddStream adds a new stream to the manager.

func (*StreamManager) RemoveStream

func (sm *StreamManager) RemoveStream(storyRunName, stepID string)

RemoveStream removes a stream from the manager.

func (*StreamManager) SendHeartbeats

func (sm *StreamManager) SendHeartbeats(ctx context.Context)

SendHeartbeats sends a heartbeat to all active streams.

func (*StreamManager) SendOrBuffer

func (sm *StreamManager) SendOrBuffer(ctx context.Context, storyRunName, stepID string, packet *hubv1.DataPacket) bool

SendOrBuffer tries to send a packet to a stream, or buffers it if the stream is not yet available.

func (*StreamManager) StartEvictor

func (sm *StreamManager) StartEvictor(ctx context.Context)

StartEvictor starts a background goroutine to clean up old, unused buffers.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL