Documentation ¶
Index ¶
- func SliceToChan[T any](ctx context.Context, items []T) <-chan T
- type BidiStream
- type ClientStream
- type ServerStream
- type StreamResult
- func ProcessBidiStream[InT, OutT any](ctx context.Context, stream BidiStream[InT, OutT], inputCh <-chan *InT) (StreamResult[OutT], error)
- func ProcessClientStream[InT, OutT any](ctx context.Context, stream ClientStream[InT, OutT], inputCh <-chan *InT) (StreamResult[OutT], error)
- func ProcessServerStream[OutT any](ctx context.Context, stream ServerStream[OutT]) (StreamResult[OutT], error)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func SliceToChan ¶
func SliceToChan[T any](ctx context.Context, items []T) <-chan T
SliceToChan converts a slice of items into a channel that emits each item. It respects the provided context for cancellation.
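As a rough illustration of the behaviour described above, the following sketch shows one way such a helper could be written. The names `sliceToChan` and `emitAll` are hypothetical, and the choices to use an unbuffered channel and to close it after the last item are assumptions, not confirmed details of the package.

```go
package main

import (
	"context"
	"fmt"
)

// sliceToChan is a hypothetical stand-in for SliceToChan. It emits each item
// on an unbuffered channel and stops early if ctx is cancelled.
// Assumption: the channel is closed once emission ends, so consumers can range.
func sliceToChan[T any](ctx context.Context, items []T) <-chan T {
	ch := make(chan T)
	go func() {
		defer close(ch)
		for _, item := range items {
			select {
			case ch <- item:
			case <-ctx.Done():
				return // stop emitting once the context is cancelled
			}
		}
	}()
	return ch
}

// emitAll pushes items through sliceToChan and collects whatever arrives.
func emitAll[T any](items []T) []T {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	var out []T
	for v := range sliceToChan(ctx, items) {
		out = append(out, v)
	}
	return out
}

func main() {
	fmt.Println(emitAll([]string{"a", "b", "c"}))
}
```

Because the stream processors take an `inputCh <-chan *InT`, a caller would presumably pass a slice of pointers (for example `[]*Request`, a hypothetical type) to obtain a channel of the matching element type.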
Types ¶
type BidiStream ¶
type BidiStream[InT, OutT any] interface {
	Send(*InT) error
	Recv() (*OutT, error)
	CloseSend() error
}
BidiStream defines the interface for bidirectional streaming. This pattern allows sending and receiving messages independently.
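To make the interface concrete, here is a minimal sketch of an in-memory implementation, which can be handy in tests. `echoStream`, `newEchoStream`, and `roundTrip` are hypothetical names introduced for illustration; the convention that `Recv` returns `io.EOF` after the send side is closed is an assumption borrowed from common gRPC usage.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// BidiStream is copied from the type definition above.
type BidiStream[InT, OutT any] interface {
	Send(*InT) error
	Recv() (*OutT, error)
	CloseSend() error
}

// echoStream is a hypothetical in-memory stream that returns every message
// it is sent. Recv reports io.EOF after CloseSend (an assumed convention).
type echoStream struct{ buf chan *string }

func newEchoStream(capacity int) *echoStream {
	return &echoStream{buf: make(chan *string, capacity)}
}

func (s *echoStream) Send(m *string) error { s.buf <- m; return nil }
func (s *echoStream) CloseSend() error     { close(s.buf); return nil }
func (s *echoStream) Recv() (*string, error) {
	m, ok := <-s.buf
	if !ok {
		return nil, io.EOF
	}
	return m, nil
}

// Compile-time check that echoStream satisfies the interface.
var _ BidiStream[string, string] = (*echoStream)(nil)

// roundTrip sends msgs from one goroutine while receiving on the caller's
// goroutine, so sending and receiving proceed independently.
func roundTrip(s BidiStream[string, string], msgs []string) ([]string, error) {
	sendErr := make(chan error, 1)
	go func() {
		for i := range msgs {
			if err := s.Send(&msgs[i]); err != nil {
				sendErr <- err
				return
			}
		}
		sendErr <- s.CloseSend()
	}()
	var out []string
	for {
		m, err := s.Recv()
		if errors.Is(err, io.EOF) {
			return out, <-sendErr
		}
		if err != nil {
			return out, err
		}
		out = append(out, *m)
	}
}

func main() {
	fmt.Println(roundTrip(newEchoStream(1), []string{"ping", "pong"}))
}
```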
type ClientStream ¶
type ClientStream[InT, OutT any] interface {
	Send(*InT) error
	CloseAndRecv() (*OutT, error)
	CloseSend() error
}
ClientStream defines the interface for client streaming (many inputs → one output). This pattern is used when sending multiple requests and receiving a single response.
type ServerStream ¶
ServerStream defines the interface for server streaming (one input → many outputs). This pattern is used when sending a single request and receiving multiple responses.
type StreamResult ¶
type StreamResult[OutT any] interface {
	// ResCh returns a read-only channel for receiving results of type *OutT.
	// More than one result can be sent before the DoneCh is closed.
	//
	// NOTES:
	//   - For ClientStream, the ResCh can receive a single result before the DoneCh is closed.
	//   - For BidiStream, the ResCh can receive multiple results until the DoneCh is closed.
	ResCh() <-chan *OutT

	// ErrCh returns a read-only channel for receiving errors encountered during processing.
	// Errors are sent to this channel as they occur.
	// More than one error can be sent before the DoneCh is closed.
	ErrCh() <-chan error

	// DoneCh returns a read-only channel that is closed when processing is complete.
	// It is used to signal that no more results or errors will be sent.
	DoneCh() <-chan struct{}
}
StreamResult encapsulates the channels for receiving streaming results, errors, and completion signals. It provides a structured way to handle streaming responses.
Callers that handle StreamResult should:
- Receive from the ResCh to process incoming results.
- Receive from the ErrCh to handle errors as they occur.
- Monitor the DoneCh to know when processing is complete.
If the caller does not read from these channels, the processing goroutines block until the channels are read or the context is cancelled.
Example usage:
for {
	select {
	case res := <-result.ResCh():
		// Process result
		_ = res
	case err := <-result.ErrCh():
		// Handle error
		_ = err
	case <-result.DoneCh():
		// Processing is done; exit the loop
		return
	}
}
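The same loop can be packaged as a reusable helper. The sketch below is only an illustration: `chanResult`, `newDemoResult`, and `drain` are hypothetical names, and the use of unbuffered channels (which guarantees that every send completes before the done channel is closed) is an assumption about the producer, not a documented property of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// StreamResult mirrors the interface documented above (comments omitted).
type StreamResult[OutT any] interface {
	ResCh() <-chan *OutT
	ErrCh() <-chan error
	DoneCh() <-chan struct{}
}

// chanResult is a hypothetical implementation backed by three channels.
type chanResult[OutT any] struct {
	res  chan *OutT
	errs chan error
	done chan struct{}
}

func (r *chanResult[OutT]) ResCh() <-chan *OutT     { return r.res }
func (r *chanResult[OutT]) ErrCh() <-chan error     { return r.errs }
func (r *chanResult[OutT]) DoneCh() <-chan struct{} { return r.done }

// newDemoResult starts a producer that emits two results and one error on
// unbuffered channels, then closes the done channel.
func newDemoResult() StreamResult[int] {
	r := &chanResult[int]{
		res:  make(chan *int),
		errs: make(chan error),
		done: make(chan struct{}),
	}
	go func() {
		defer close(r.done)
		one, two := 1, 2
		r.res <- &one
		r.errs <- errors.New("transient failure")
		r.res <- &two
	}()
	return r
}

// drain collects every result and error until DoneCh is closed.
func drain[OutT any](r StreamResult[OutT]) ([]*OutT, []error) {
	var results []*OutT
	var errs []error
	for {
		select {
		case res := <-r.ResCh():
			results = append(results, res)
		case err := <-r.ErrCh():
			errs = append(errs, err)
		case <-r.DoneCh():
			return results, errs
		}
	}
}

func main() {
	results, errs := drain(newDemoResult())
	fmt.Println(len(results), "results,", len(errs), "errors")
}
```

If a producer used buffered channels instead, items could still be queued when the done channel closes, so a helper like `drain` would need an extra non-blocking pass over the result and error channels before returning.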
func ProcessBidiStream ¶
func ProcessBidiStream[InT, OutT any]( ctx context.Context, stream BidiStream[InT, OutT], inputCh <-chan *InT, ) (StreamResult[OutT], error)
ProcessBidiStream handles concurrent bidirectional streaming.
Pattern: Sender || Receiver (parallel goroutines)
This processor implements true bidirectional streaming with concurrent send and receive operations. It spawns two independent goroutines:
- Sender: Continuously sends inputs from the input channel
- Receiver: Continuously receives outputs and sends them to the result channel
This pattern maximizes throughput by:
- Eliminating round-trip latency between requests
- Allowing the server to batch/buffer/pipeline operations
- Fully utilizing network bandwidth
- Enabling concurrent processing on both client and server
This is useful when:
- High throughput is required
- Large batches of data are processed
- The server can process requests in parallel or in batches
- Responses can arrive in any order or at any time
- Network latency is significant
Returns:
- result: StreamResult containing result, error, and done channels
- error: Immediate error if validation fails
The caller should:
- Receive from the result and error channels to process outputs and errors
- Monitor the DoneCh to know when processing is complete
- Use context cancellation to stop processing early
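The Sender || Receiver pattern described above can be sketched as follows. This is not the package's implementation: `runBidi`, `doubleStream`, and `doubleAll` are hypothetical names, the stream is an in-memory fake, and the sketch returns bare channels rather than a StreamResult to stay short. It also assumes that `Recv` reports `io.EOF` once the send side is closed.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"sync"
)

// BidiStream is copied from the type definition in this documentation.
type BidiStream[InT, OutT any] interface {
	Send(*InT) error
	Recv() (*OutT, error)
	CloseSend() error
}

// doubleStream is a hypothetical fake that answers each input with twice its value.
type doubleStream struct{ buf chan int }

func (s *doubleStream) Send(v *int) error { s.buf <- *v * 2; return nil }
func (s *doubleStream) CloseSend() error  { close(s.buf); return nil }
func (s *doubleStream) Recv() (*int, error) {
	v, ok := <-s.buf
	if !ok {
		return nil, io.EOF
	}
	return &v, nil
}

// runBidi runs a sender and a receiver goroutine in parallel. The done
// channel is closed only after both have finished.
func runBidi[InT, OutT any](ctx context.Context, s BidiStream[InT, OutT], in <-chan *InT) (<-chan *OutT, <-chan error, <-chan struct{}) {
	resCh, errCh, doneCh := make(chan *OutT), make(chan error), make(chan struct{})
	report := func(err error) { // deliver err unless the context ends first
		select {
		case errCh <- err:
		case <-ctx.Done():
		}
	}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // Sender: forwards inputs, then closes the send side
		defer wg.Done()
		defer func() {
			if err := s.CloseSend(); err != nil {
				report(err)
			}
		}()
		for {
			select {
			case <-ctx.Done():
				return
			case msg, ok := <-in:
				if !ok {
					return
				}
				if err := s.Send(msg); err != nil {
					report(err)
					return
				}
			}
		}
	}()
	go func() { // Receiver: forwards outputs until EOF or an error
		defer wg.Done()
		for {
			out, err := s.Recv()
			if errors.Is(err, io.EOF) {
				return
			}
			if err != nil {
				report(err)
				return
			}
			select {
			case resCh <- out:
			case <-ctx.Done():
				return
			}
		}
	}()
	go func() { wg.Wait(); close(doneCh) }()
	return resCh, errCh, doneCh
}

// doubleAll feeds values through the fake stream and collects the outputs.
func doubleAll(values []int) ([]int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	in := make(chan *int, len(values))
	for i := range values {
		in <- &values[i]
	}
	close(in)
	resCh, errCh, doneCh := runBidi[int, int](ctx, &doubleStream{buf: make(chan int)}, in)
	var out []int
	for {
		select {
		case v := <-resCh:
			out = append(out, *v)
		case err := <-errCh:
			return out, err
		case <-doneCh:
			return out, nil
		}
	}
}

func main() { fmt.Println(doubleAll([]int{1, 2, 3})) }
```

Closing the done channel from a third goroutine that waits on both workers is one way to honour the rule that no results or errors follow the completion signal; other designs are possible.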
func ProcessClientStream ¶
func ProcessClientStream[InT, OutT any]( ctx context.Context, stream ClientStream[InT, OutT], inputCh <-chan *InT, ) (StreamResult[OutT], error)
ProcessClientStream handles the client streaming pattern (many inputs → one output).
Pattern: Send → Send → Send → CloseAndRecv()
This processor is ideal for operations where multiple requests are sent to the server, and a single final response is received after all requests have been processed.
The processor:
- Sends all inputs from the channel to the stream
- Closes the send side when the input channel closes
- Receives the final response via CloseAndRecv()
Returns:
- result: StreamResult containing result, error, and done channels
- error: Immediate error if validation fails
The caller should:
- Receive from the result and error channels to process outputs and errors
- Monitor the DoneCh to know when processing is complete
- Use context cancellation to stop processing early
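The Send → Send → Send → CloseAndRecv() sequence can be illustrated with a small synchronous sketch. It is not the package's implementation, which returns a StreamResult: `sendAll`, `sumStream`, and `sum` are hypothetical names, and the fake stream simply adds up the integers it is sent.

```go
package main

import (
	"context"
	"fmt"
)

// ClientStream is copied from the type definition in this documentation.
type ClientStream[InT, OutT any] interface {
	Send(*InT) error
	CloseAndRecv() (*OutT, error)
	CloseSend() error
}

// sumStream is a hypothetical fake whose single response is the sum of all inputs.
type sumStream struct{ total int }

func (s *sumStream) Send(v *int) error { s.total += *v; return nil }
func (s *sumStream) CloseSend() error  { return nil }
func (s *sumStream) CloseAndRecv() (*int, error) {
	t := s.total
	return &t, nil
}

// sendAll forwards every input to the stream, then closes the send side and
// returns the single response once the input channel is closed.
func sendAll[InT, OutT any](ctx context.Context, s ClientStream[InT, OutT], in <-chan *InT) (*OutT, error) {
	for {
		select {
		case <-ctx.Done():
			_ = s.CloseSend() // best effort; the context error takes priority
			return nil, ctx.Err()
		case msg, ok := <-in:
			if !ok {
				return s.CloseAndRecv()
			}
			if err := s.Send(msg); err != nil {
				return nil, err
			}
		}
	}
}

// sum streams values to the fake and returns its final response.
func sum(values []int) (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	in := make(chan *int, len(values))
	for i := range values {
		in <- &values[i]
	}
	close(in)
	total, err := sendAll[int, int](ctx, &sumStream{}, in)
	if err != nil {
		return 0, err
	}
	return *total, nil
}

func main() {
	fmt.Println(sum([]int{1, 2, 3}))
}
```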
func ProcessServerStream ¶
func ProcessServerStream[OutT any]( ctx context.Context, stream ServerStream[OutT], ) (StreamResult[OutT], error)
ProcessServerStream handles the server streaming pattern (one input → many outputs).
Pattern: Send Request → Recv() → Recv() → Recv() → EOF
This processor is ideal for operations where a single request triggers multiple responses from the server, such as:
- Streaming search results
- Listing resources
- Tailing logs
- Real-time event streams
The processor:
- Continuously receives outputs from the stream
- Sends each output to the result channel
- Handles EOF gracefully to signal completion
- Propagates errors to the error channel
Returns:
- result: StreamResult containing result, error, and done channels
- error: Immediate error if validation fails
The caller should:
- Receive from the result and error channels to process outputs and errors
- Monitor the DoneCh to know when streaming is complete
- Use context cancellation to stop processing early
Example usage:
stream, err := client.Listen(ctx, req)
if err != nil {
	return err
}
result, err := streaming.ProcessServerStream(ctx, stream)
if err != nil {
	return err
}
for {
	select {
	case resp := <-result.ResCh():
		// Process response
		_ = resp
	case err := <-result.ErrCh():
		// Handle error
		return err
	case <-result.DoneCh():
		// All responses received
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
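The receive loop behind the Recv() → Recv() → EOF pattern can be sketched in isolation. Note the assumptions: this page does not show the ServerStream definition, so the single-method interface below is a guess at its shape, and `sliceStream`, `recvAll`, and `listAll` are hypothetical names. The sketch is synchronous and returns a slice instead of a StreamResult.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
)

// ServerStream is an ASSUMED shape; the real definition is not shown in
// this documentation.
type ServerStream[OutT any] interface {
	Recv() (*OutT, error)
}

// sliceStream is a hypothetical fake that yields fixed items, then io.EOF.
type sliceStream struct {
	items []string
	next  int
}

func (s *sliceStream) Recv() (*string, error) {
	if s.next >= len(s.items) {
		return nil, io.EOF
	}
	item := &s.items[s.next]
	s.next++
	return item, nil
}

// recvAll receives until EOF, an error, or context cancellation.
// EOF is treated as normal completion rather than as an error.
func recvAll[OutT any](ctx context.Context, s ServerStream[OutT]) ([]*OutT, error) {
	var out []*OutT
	for {
		if err := ctx.Err(); err != nil {
			return out, err
		}
		msg, err := s.Recv()
		if errors.Is(err, io.EOF) {
			return out, nil
		}
		if err != nil {
			return out, err
		}
		out = append(out, msg)
	}
}

// listAll runs recvAll against the fake stream and copies out the values.
func listAll(items []string) ([]string, error) {
	msgs, err := recvAll[string](context.Background(), &sliceStream{items: items})
	out := make([]string, 0, len(msgs))
	for _, m := range msgs {
		out = append(out, *m)
	}
	return out, err
}

func main() {
	fmt.Println(listAll([]string{"event-1", "event-2"}))
}
```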