streaming

package
v0.6.0 Latest
Warning

This package is not in the latest version of its module.

Published: Dec 19, 2025 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func SliceToChan

func SliceToChan[T any](ctx context.Context, items []T) <-chan T

SliceToChan converts a slice of items into a channel that emits each item. It respects the provided context for cancellation.

Types

type BidiStream

type BidiStream[InT, OutT any] interface {
	Send(*InT) error
	Recv() (*OutT, error)
	CloseSend() error
}

BidiStream defines the interface for bidirectional streaming. This pattern allows sending and receiving messages independently.

type ClientStream

type ClientStream[InT, OutT any] interface {
	Send(*InT) error
	CloseAndRecv() (*OutT, error)
	CloseSend() error
}

ClientStream defines the interface for client streaming (many inputs → one output). This pattern is used when sending multiple requests and receiving a single response.

type ServerStream

type ServerStream[OutT any] interface {
	Recv() (*OutT, error)
}

ServerStream defines the interface for server streaming (one input → many outputs). This pattern is used when sending a single request and receiving multiple responses.

type StreamResult

type StreamResult[OutT any] interface {
	// ResCh returns a read-only channel for receiving results of type *OutT.
	// More than one result can be sent before the DoneCh is closed.
	//
	// NOTES:
	//   - For ClientStream, ResCh delivers a single result before DoneCh is closed.
	//   - For BidiStream, ResCh can deliver multiple results until DoneCh is closed.
	ResCh() <-chan *OutT

	// ErrCh returns a read-only channel for receiving errors encountered during processing.
	// Errors are sent to this channel as they occur.
	// More than one error can be sent before the DoneCh is closed.
	ErrCh() <-chan error

	// DoneCh returns a read-only channel that is closed when processing is complete.
	// It is used to signal that no more results or errors will be sent.
	DoneCh() <-chan struct{}
}

StreamResult encapsulates the channels for receiving streaming results, errors, and completion signals. It provides a structured way to handle streaming responses.

Callers that handle a StreamResult should:

  1. Read from ResCh to process incoming results.
  2. Read from ErrCh to handle errors as they occur.
  3. Monitor DoneCh to know when processing is complete.

If the caller does not read from these channels, the processing goroutines will block until the channels are read or the context is cancelled.

Example usage:

for {
    select {
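    case res := <-result.ResCh():
        // Process result
        _ = res // placeholder: Go rejects unused variables
    case err := <-result.ErrCh():
        // Handle error
        _ = err // placeholder: Go rejects unused variables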
    case <-result.DoneCh():
        // Processing is done
        // Exit loop
        return
    }
}
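To make the consumption loop concrete, here is a self-contained sketch. The streamResult struct, the produce function, and the drain helper are all hypothetical: they only mimic the three accessors of StreamResult with unbuffered channels and say nothing about how the package builds its own results.

```go
package main

import (
	"errors"
	"fmt"
)

// streamResult is a hypothetical type with the same three accessors as StreamResult.
type streamResult[OutT any] struct {
	resCh  chan *OutT
	errCh  chan error
	doneCh chan struct{}
}

func (r *streamResult[OutT]) ResCh() <-chan *OutT     { return r.resCh }
func (r *streamResult[OutT]) ErrCh() <-chan error     { return r.errCh }
func (r *streamResult[OutT]) DoneCh() <-chan struct{} { return r.doneCh }

// drain reads results and errors until DoneCh is closed.
func drain[OutT any](r *streamResult[OutT]) ([]OutT, []error) {
	var results []OutT
	var errs []error
	for {
		select {
		case res := <-r.ResCh():
			results = append(results, *res)
		case err := <-r.ErrCh():
			errs = append(errs, err)
		case <-r.DoneCh():
			return results, errs
		}
	}
}

// produce simulates a processing goroutine: two results, one error, then done.
func produce() *streamResult[string] {
	r := &streamResult[string]{
		resCh:  make(chan *string),
		errCh:  make(chan error),
		doneCh: make(chan struct{}),
	}
	go func() {
		defer close(r.doneCh) // closed only after every send has been received
		for _, s := range []string{"a", "b"} {
			v := s
			r.resCh <- &v
		}
		r.errCh <- errors.New("boom")
	}()
	return r
}

func main() {
	results, errs := drain(produce())
	fmt.Println(results, len(errs)) // prints [a b] 1
}
```

Because the channels in this sketch are unbuffered, every send completes before DoneCh is closed, so the loop never exits with items still pending. Whether the real package gives the same guarantee is not stated in its documentation.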

func ProcessBidiStream

func ProcessBidiStream[InT, OutT any](
	ctx context.Context,
	stream BidiStream[InT, OutT],
	inputCh <-chan *InT,
) (StreamResult[OutT], error)

ProcessBidiStream handles concurrent bidirectional streaming.

Pattern: Sender || Receiver (parallel goroutines)

This processor implements true bidirectional streaming with concurrent send and receive operations. It spawns two independent goroutines:

  • Sender: Continuously sends inputs from the input channel
  • Receiver: Continuously receives outputs from the stream and forwards them to the result channel

This pattern improves throughput by:

  • Removing the round-trip wait between consecutive requests
  • Allowing the server to batch, buffer, or pipeline operations
  • Keeping the connection busy in both directions
  • Enabling concurrent processing on both client and server

This is useful when:

  • High throughput is needed
  • Large batches of data are being processed
  • The server can process requests in parallel or in batches
  • Responses can arrive in any order or at any time
  • Network latency is significant

Returns:

  • result: StreamResult containing result, error, and done channels
  • error: Immediate error if validation fails

The caller should:

  1. Read from the result and error channels to process outputs and errors
  2. Monitor the DoneCh to know when processing is complete
  3. Use context cancellation to stop processing early
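The Sender || Receiver pattern described above can be sketched as follows. This is an illustration under stated assumptions, not the package's implementation: bidiStream copies the BidiStream interface, while echoStream (an in-memory stream that doubles each input), runBidi, and feed are hypothetical names. For brevity the sketch collects outputs into a slice instead of returning a StreamResult, and it omits context handling.

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// bidiStream mirrors the BidiStream interface from this package's documentation.
type bidiStream[InT, OutT any] interface {
	Send(*InT) error
	Recv() (*OutT, error)
	CloseSend() error
}

// echoStream is a hypothetical in-memory stream: every input n comes back as 2*n.
type echoStream struct{ ch chan int }

func (s *echoStream) Send(in *int) error { s.ch <- *in * 2; return nil }
func (s *echoStream) CloseSend() error   { close(s.ch); return nil }
func (s *echoStream) Recv() (*int, error) {
	v, ok := <-s.ch
	if !ok {
		return nil, io.EOF // send side closed and all outputs consumed
	}
	return &v, nil
}

// runBidi runs a sender and a receiver goroutine side by side (hypothetical helper).
func runBidi[InT, OutT any](stream bidiStream[InT, OutT], inputs <-chan *InT) ([]*OutT, error) {
	var (
		wg      sync.WaitGroup
		results []*OutT
		errs    = make(chan error, 2) // room for one error per goroutine
	)
	wg.Add(2)
	go func() { // Sender: forwards inputs, then half-closes the stream
		defer wg.Done()
		defer stream.CloseSend()
		for in := range inputs {
			if err := stream.Send(in); err != nil {
				errs <- err
				return
			}
		}
	}()
	go func() { // Receiver: reads outputs until EOF or an error
		defer wg.Done()
		for {
			out, err := stream.Recv()
			if err == io.EOF {
				return
			}
			if err != nil {
				errs <- err
				return
			}
			results = append(results, out)
		}
	}()
	wg.Wait()
	close(errs)
	return results, <-errs // nil when neither goroutine reported an error
}

// feed emits vals on a channel and closes it (hypothetical helper).
func feed(vals ...int) <-chan *int {
	ch := make(chan *int)
	go func() {
		defer close(ch)
		for _, v := range vals {
			n := v
			ch <- &n
		}
	}()
	return ch
}

func main() {
	results, err := runBidi[int, int](&echoStream{ch: make(chan int, 4)}, feed(1, 2, 3))
	for _, r := range results {
		fmt.Println(*r)
	}
	fmt.Println("err:", err)
}
```

The sender never waits for a response before sending the next input, which is where the throughput gain comes from. A production version would also need to unblock the sender when the receiver fails, typically through context cancellation.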

func ProcessClientStream

func ProcessClientStream[InT, OutT any](
	ctx context.Context,
	stream ClientStream[InT, OutT],
	inputCh <-chan *InT,
) (StreamResult[OutT], error)

ProcessClientStream handles client streaming pattern (many inputs → one output).

Pattern: Send → Send → Send → CloseAndRecv()

This processor is ideal for operations where multiple requests are sent to the server, and a single final response is received after all requests have been processed.

The processor:

  • Sends all inputs from the channel to the stream
  • Closes the send side when the input channel closes
  • Receives the final response via CloseAndRecv()

Returns:

  • result: StreamResult containing result, error, and done channels
  • error: Immediate error if validation fails

The caller should:

  1. Read from the result and error channels to process the output and errors
  2. Monitor the DoneCh to know when processing is complete
  3. Use context cancellation to stop processing early

func ProcessServerStream

func ProcessServerStream[OutT any](
	ctx context.Context,
	stream ServerStream[OutT],
) (StreamResult[OutT], error)

ProcessServerStream handles server streaming pattern (one input → many outputs).

Pattern: Send Request → Recv() → Recv() → Recv() → EOF

This processor is ideal for operations where a single request triggers multiple responses from the server, such as:

  • Streaming search results
  • Listing resources
  • Tailing logs
  • Real-time event streams

The processor:

  • Continuously receives outputs from the stream
  • Sends each output to the result channel
  • Handles EOF gracefully to signal completion
  • Propagates errors to the error channel

Returns:

  • result: StreamResult containing result, error, and done channels
  • error: Immediate error if validation fails

The caller should:

  1. Read from the result and error channels to process outputs and errors
  2. Monitor the DoneCh to know when streaming is complete
  3. Use context cancellation to stop processing early

Example usage:

stream, err := client.Listen(ctx, req)
if err != nil {
    return err
}

result, err := streaming.ProcessServerStream(ctx, stream)
if err != nil {
    return err
}

for {
    select {
    case resp := <-result.ResCh():
        // Process response
        _ = resp // placeholder: Go rejects unused variables
    case err := <-result.ErrCh():
        // Handle error
        return err
    case <-result.DoneCh():
        // All responses received
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}
