streaming

package
v0.15.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 26, 2025 License: Apache-2.0 Imports: 12 Imported by: 907

Documentation

Overview

Package streaming interface

Index

Constants

View Source
const KitexUnusedProtection = 0

KitexUnusedProtection may be anonymously referenced in another package to avoid build error

Variables

This section is empty.

Functions

func CallWithTimeout added in v0.9.0

func CallWithTimeout(timeout time.Duration, cancel context.CancelFunc, f func() (err error)) error

CallWithTimeout executes a function with timeout. If timeout is 0, the function will be executed without timeout; panic is not recovered in this case; If time runs out, it will return a kerrors.ErrRPCTimeout; If your function panics, it will also return a kerrors.ErrRPCTimeout with the panic details; Other kinds of errors are always returned by your function.

NOTE: the `cancel` function is necessary to cancel the underlying transport, otherwise the recv/send call will block until the transport is closed, which may cause surge of goroutines.

func FinishClientStream added in v0.13.0

func FinishClientStream(s ClientStream, err error)

FinishClientStream records the end of stream you can call it manually when all business logic is done, and you don't want to call Recv/Send for the io.EOF (which triggers the DoFinish automatically). Note: if you're to wrap the original stream in a Client middleware, you should also implement WithDoFinish in your ClientStream implementation.

func FinishStream added in v0.9.0

func FinishStream(s Stream, err error)

FinishStream records the end of stream you can call it manually when all business logic is done, and you don't want to call Recv/Send for the io.EOF (which triggers the DoFinish automatically). Note: if you're to wrap the original stream in a Client middleware, you should also implement WithDoFinish in your Stream implementation. Deprecated: use FinishClientStream instead, this requires enabling the streamx feature.

func NewCtxWithStream added in v0.9.0

func NewCtxWithStream(ctx context.Context, stream Stream) context.Context

Deprecated.

func UnaryCompatibleMiddleware added in v0.9.0

func UnaryCompatibleMiddleware(mode serviceinfo.StreamingMode, allow bool) bool

UnaryCompatibleMiddleware returns whether to use compatible middleware for unary.

Types

type Args

type Args struct {
	ServerStream ServerStream
	ClientStream ClientStream
	// for gRPC compatible
	Stream Stream
}

Args endpoint request

type BidiStreamingClient added in v0.13.0

type BidiStreamingClient[Req, Res any] interface {
	Send(ctx context.Context, req *Req) error
	Recv(ctx context.Context) (*Res, error)
	ClientStream
}

BidiStreamingClient define client side bidi streaming APIs

func NewBidiStreamingClient added in v0.13.0

func NewBidiStreamingClient[Req, Res any](st ClientStream) BidiStreamingClient[Req, Res]

NewBidiStreamingClient creates an implementation of BidiStreamingClient[Req, Res].

type BidiStreamingServer added in v0.13.0

type BidiStreamingServer[Req, Res any] interface {
	Recv(ctx context.Context) (*Req, error)
	Send(ctx context.Context, res *Res) error
	ServerStream
}

BidiStreamingServer define server side bidi streaming APIs

func NewBidiStreamingServer added in v0.13.0

func NewBidiStreamingServer[Req, Res any](st ServerStream) BidiStreamingServer[Req, Res]

NewBidiStreamingServer creates an implementation of BidiStreamingServer[Req, Res].

type ClientStream added in v0.13.0

type ClientStream interface {
	// SendMsg send message to peer
	// will block until an error or enough buffer to send
	// not concurrent-safety
	SendMsg(ctx context.Context, m any) error
	// RecvMsg recvive message from peer
	// will block until an error or a message received
	// not concurrent-safety
	RecvMsg(ctx context.Context, m any) error
	// Header returns the header info received from the server if there
	// is any. It blocks if the header info is not ready to read.  If the header info
	// is nil and the error is also nil, then the stream was terminated without
	// headers, and the error can be discovered by calling RecvMsg.
	Header() (Header, error)
	// Trailer returns the trailer info from the server, if there is any.
	// It must only be called after stream.CloseAndRecv has returned, or
	// stream.Recv has returned a non-nil error (including io.EOF).
	Trailer() (Trailer, error)
	// CloseSend closes the send direction of the stream. It closes the stream
	// when non-nil error is met. It is also not safe to call CloseSend
	// concurrently with SendMsg.
	CloseSend(ctx context.Context) error
	// Context the stream context.Context
	Context() context.Context
}

ClientStream define client stream APIs

type ClientStreamingClient added in v0.13.0

type ClientStreamingClient[Req, Res any] interface {
	Send(ctx context.Context, req *Req) error
	CloseAndRecv(ctx context.Context) (*Res, error)
	ClientStream
}

ClientStreamingClient define client side client streaming APIs

func NewClientStreamingClient added in v0.13.0

func NewClientStreamingClient[Req, Res any](st ClientStream) ClientStreamingClient[Req, Res]

NewClientStreamingClient creates an implementation of ClientStreamingClient[Req, Res].

type ClientStreamingServer added in v0.13.0

type ClientStreamingServer[Req, Res any] interface {
	Recv(ctx context.Context) (*Req, error)
	SendAndClose(ctx context.Context, res *Res) error
	ServerStream
}

ClientStreamingServer define server side client streaming APIs

func NewClientStreamingServer added in v0.13.0

func NewClientStreamingServer[Req, Res any](st ServerStream) ClientStreamingServer[Req, Res]

NewClientStreamingServer creates an implementation of ClientStreamingServer[Req, Res].

type CloseCallbackRegister added in v0.13.0

type CloseCallbackRegister interface {
	RegisterCloseCallback(cb func(error))
}

CloseCallbackRegister register a callback when stream closed.

type EventHandler added in v0.13.0

type EventHandler func(ctx context.Context, evt stats.Event, err error)

EventHandler define stats event handler

type GRPCStreamGetter added in v0.13.0

type GRPCStreamGetter interface {
	GetGRPCStream() Stream
}
type Header map[string]string

type Result

type Result struct {
	ServerStream ServerStream
	ClientStream ClientStream
	// for gRPC compatible
	Stream Stream
}

Result endpoint response

type ServerStream added in v0.13.0

type ServerStream interface {
	// SendMsg send message to peer
	// will block until an error or enough buffer to send
	// not concurrent-safety
	SendMsg(ctx context.Context, m any) error
	// RecvMsg recvive message from peer
	// will block until an error or a message received
	// not concurrent-safety
	RecvMsg(ctx context.Context, m any) error
	// SetHeader sets the header info. It may be called multiple times.
	// When call multiple times, all the provided header info will be merged.
	// All the header info will be sent out when one of the following happens:
	//  - ServerStream.SendHeader() is called;
	//  - The first response is sent out;
	//  - The server handler returns (error or nil).
	SetHeader(hd Header) error
	// SendHeader sends the header info.
	// The provided hd and headers set by SetHeader() will be sent.
	// It fails if called multiple times or called after the first response is sent out.
	SendHeader(hd Header) error
	// SetTrailer sets the trailer info which will be sent with the RPC status.
	// When called more than once, all the provided trailer info will be merged.
	SetTrailer(hd Trailer) error
}

ServerStream define server stream APIs

func GetServerStreamFromArg added in v0.13.0

func GetServerStreamFromArg(arg interface{}) (ServerStream, error)

GetServerStream gets streamx.ServerStream from the arg. It's for kitex gen code ONLY.

type ServerStreamingClient added in v0.13.0

type ServerStreamingClient[Res any] interface {
	Recv(ctx context.Context) (*Res, error)
	ClientStream
}

ServerStreamingClient define client side server streaming APIs

func NewServerStreamingClient added in v0.13.0

func NewServerStreamingClient[Res any](st ClientStream) ServerStreamingClient[Res]

NewServerStreamingClient creates an implementation of ServerStreamingClient[Res].

type ServerStreamingServer added in v0.13.0

type ServerStreamingServer[Res any] interface {
	Send(ctx context.Context, res *Res) error
	ServerStream
}

ServerStreamingServer define server side server streaming APIs

func NewServerStreamingServer added in v0.13.0

func NewServerStreamingServer[Res any](st ServerStream) ServerStreamingServer[Res]

NewServerStreamingServer creates an implementation of ServerStreamingServer[Res].

type Stream

type Stream interface {
	// SetHeader sets the header metadata. It may be called multiple times.
	// When call multiple times, all the provided metadata will be merged.
	// All the metadata will be sent out when one of the following happens:
	//  - ServerStream.SendHeader() is called;
	//  - The first response is sent out;
	//  - An RPC status is sent out (error or success).
	SetHeader(metadata.MD) error
	// SendHeader sends the header metadata.
	// The provided md and headers set by SetHeader() will be sent.
	// It fails if called multiple times.
	SendHeader(metadata.MD) error
	// SetTrailer sets the trailer metadata which will be sent with the RPC status.
	// When called more than once, all the provided metadata will be merged.
	SetTrailer(metadata.MD)
	// Header is used for client side stream to receive header from server.
	Header() (metadata.MD, error)
	// Trailer is used for client side stream to receive trailer from server.
	Trailer() metadata.MD
	// Context the stream context.Context
	Context() context.Context
	// RecvMsg recvive message from peer
	// will block until an error or a message received
	// not concurrent-safety
	RecvMsg(m interface{}) error
	// SendMsg send message to peer
	// will block until an error or enough buffer to send
	// not concurrent-safety
	SendMsg(m interface{}) error
	// not concurrent-safety with SendMsg
	io.Closer
}

Stream both client and server stream Deprecated: It's only for gRPC, use ClientStream or ServerStream instead.

func GetStream added in v0.9.0

func GetStream(ctx context.Context) Stream

Deprecated.

type Trailer added in v0.13.0

type Trailer map[string]string

type WithDoFinish added in v0.9.0

type WithDoFinish interface {
	DoFinish(error)
}

WithDoFinish should be implemented when: (1) you want to wrap a stream in client middleware, and (2) you want to manually call streaming.FinishStream(stream, error) to record the end of stream Note: the DoFinish should be reentrant, better with a sync.Once.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL