Documentation ¶
Overview ¶
Package streaming defines the streaming interfaces.
Index ¶
- Constants
- func CallWithTimeout(timeout time.Duration, cancel context.CancelFunc, f func() (err error)) error
- func FinishClientStream(s ClientStream, err error)
- func FinishStream(s Stream, err error)
- func NewCtxWithStream(ctx context.Context, stream Stream) context.Context
- func UnaryCompatibleMiddleware(mode serviceinfo.StreamingMode, allow bool) bool
- type Args
- type BidiStreamingClient
- type BidiStreamingServer
- type ClientStream
- type ClientStreamingClient
- type ClientStreamingServer
- type CloseCallbackRegister
- type EventHandler
- type GRPCStreamGetter
- type Header
- type Result
- type ServerStream
- type ServerStreamingClient
- type ServerStreamingServer
- type Stream
- type Trailer
- type WithDoFinish
Constants ¶
const KitexUnusedProtection = 0
KitexUnusedProtection may be anonymously referenced in another package to avoid a build error.
Variables ¶
This section is empty.
Functions ¶
func CallWithTimeout ¶ added in v0.9.0
func CallWithTimeout(timeout time.Duration, cancel context.CancelFunc, f func() (err error)) error
CallWithTimeout executes a function with a timeout:
- If timeout is 0, the function is executed without a timeout, and a panic is not recovered.
- If time runs out, it returns kerrors.ErrRPCTimeout.
- If the function panics, it also returns kerrors.ErrRPCTimeout, with the panic details.
- Any other error is returned unchanged from the function.
NOTE: the `cancel` function is required to cancel the underlying transport. Without it, the recv/send call blocks until the transport is closed, which may cause a surge of goroutines.
func FinishClientStream ¶ added in v0.13.0
func FinishClientStream(s ClientStream, err error)
FinishClientStream records the end of the stream. You can call it manually when all business logic is done and you don't want to call Recv/Send until io.EOF (which triggers DoFinish automatically). Note: if you wrap the original stream in a client middleware, you should also implement WithDoFinish in your ClientStream implementation.
func FinishStream ¶ added in v0.9.0
func FinishStream(s Stream, err error)
FinishStream records the end of the stream. You can call it manually when all business logic is done and you don't want to call Recv/Send until io.EOF (which triggers DoFinish automatically). Note: if you wrap the original stream in a client middleware, you should also implement WithDoFinish in your Stream implementation.
Deprecated: use FinishClientStream instead; this requires enabling the streamx feature.
func NewCtxWithStream ¶ added in v0.9.0
func NewCtxWithStream(ctx context.Context, stream Stream) context.Context
Deprecated.
func UnaryCompatibleMiddleware ¶ added in v0.9.0
func UnaryCompatibleMiddleware(mode serviceinfo.StreamingMode, allow bool) bool
UnaryCompatibleMiddleware returns whether to use compatible middleware for unary.
Types ¶
type Args ¶
type Args struct {
ServerStream ServerStream
ClientStream ClientStream
// for gRPC compatible
Stream Stream
}
Args is the endpoint request.
type BidiStreamingClient ¶ added in v0.13.0
type BidiStreamingClient[Req, Res any] interface {
Send(ctx context.Context, req *Req) error
Recv(ctx context.Context) (*Res, error)
ClientStream
}
BidiStreamingClient defines the client-side bidirectional streaming APIs.
func NewBidiStreamingClient ¶ added in v0.13.0
func NewBidiStreamingClient[Req, Res any](st ClientStream) BidiStreamingClient[Req, Res]
NewBidiStreamingClient creates an implementation of BidiStreamingClient[Req, Res].
type BidiStreamingServer ¶ added in v0.13.0
type BidiStreamingServer[Req, Res any] interface {
Recv(ctx context.Context) (*Req, error)
Send(ctx context.Context, res *Res) error
ServerStream
}
BidiStreamingServer defines the server-side bidirectional streaming APIs.
func NewBidiStreamingServer ¶ added in v0.13.0
func NewBidiStreamingServer[Req, Res any](st ServerStream) BidiStreamingServer[Req, Res]
NewBidiStreamingServer creates an implementation of BidiStreamingServer[Req, Res].
type ClientStream ¶ added in v0.13.0
type ClientStream interface {
// SendMsg sends a message to the peer.
// It blocks until an error occurs or there is enough buffer to send.
// It is not safe for concurrent use.
SendMsg(ctx context.Context, m any) error
// RecvMsg receives a message from the peer.
// It blocks until an error occurs or a message is received.
// It is not safe for concurrent use.
RecvMsg(ctx context.Context, m any) error
// Header returns the header info received from the server if there
// is any. It blocks if the header info is not ready to read. If the header info
// is nil and the error is also nil, then the stream was terminated without
// headers, and the error can be discovered by calling RecvMsg.
Header() (Header, error)
// Trailer returns the trailer info from the server, if there is any.
// It must only be called after stream.CloseAndRecv has returned, or
// stream.Recv has returned a non-nil error (including io.EOF).
Trailer() (Trailer, error)
// CloseSend closes the send direction of the stream. It closes the stream
// when non-nil error is met. It is also not safe to call CloseSend
// concurrently with SendMsg.
CloseSend(ctx context.Context) error
// Context returns the stream's context.Context.
Context() context.Context
}
ClientStream defines the client stream APIs.
type ClientStreamingClient ¶ added in v0.13.0
type ClientStreamingClient[Req, Res any] interface {
Send(ctx context.Context, req *Req) error
CloseAndRecv(ctx context.Context) (*Res, error)
ClientStream
}
ClientStreamingClient defines the client-side client streaming APIs.
func NewClientStreamingClient ¶ added in v0.13.0
func NewClientStreamingClient[Req, Res any](st ClientStream) ClientStreamingClient[Req, Res]
NewClientStreamingClient creates an implementation of ClientStreamingClient[Req, Res].
type ClientStreamingServer ¶ added in v0.13.0
type ClientStreamingServer[Req, Res any] interface {
Recv(ctx context.Context) (*Req, error)
SendAndClose(ctx context.Context, res *Res) error
ServerStream
}
ClientStreamingServer defines the server-side client streaming APIs.
func NewClientStreamingServer ¶ added in v0.13.0
func NewClientStreamingServer[Req, Res any](st ServerStream) ClientStreamingServer[Req, Res]
NewClientStreamingServer creates an implementation of ClientStreamingServer[Req, Res].
type CloseCallbackRegister ¶ added in v0.13.0
type CloseCallbackRegister interface {
RegisterCloseCallback(cb func(error))
}
CloseCallbackRegister registers a callback that is invoked when the stream is closed.
type EventHandler ¶ added in v0.13.0
EventHandler defines the stats event handler.
type GRPCStreamGetter ¶ added in v0.13.0
type GRPCStreamGetter interface {
GetGRPCStream() Stream
}
type Result ¶
type Result struct {
ServerStream ServerStream
ClientStream ClientStream
// for gRPC compatible
Stream Stream
}
Result is the endpoint response.
type ServerStream ¶ added in v0.13.0
type ServerStream interface {
// SendMsg sends a message to the peer.
// It blocks until an error occurs or there is enough buffer to send.
// It is not safe for concurrent use.
SendMsg(ctx context.Context, m any) error
// RecvMsg receives a message from the peer.
// It blocks until an error occurs or a message is received.
// It is not safe for concurrent use.
RecvMsg(ctx context.Context, m any) error
// SetHeader sets the header info. It may be called multiple times.
// When called multiple times, all the provided header info will be merged.
// All the header info will be sent out when one of the following happens:
// - ServerStream.SendHeader() is called;
// - The first response is sent out;
// - The server handler returns (error or nil).
SetHeader(hd Header) error
// SendHeader sends the header info.
// The provided hd and headers set by SetHeader() will be sent.
// It fails if called multiple times or called after the first response is sent out.
SendHeader(hd Header) error
// SetTrailer sets the trailer info which will be sent with the RPC status.
// When called more than once, all the provided trailer info will be merged.
SetTrailer(hd Trailer) error
}
ServerStream defines the server stream APIs.
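The SetHeader/SendHeader rules in the comments above (repeated SetHeader calls merge, SendHeader flushes the merged set and fails on a second call) can be modeled in a few lines. This is a sketch, not the library's logic: `headerBuffer` is a hypothetical helper, `Header` is assumed to be a string map because this page lists the type without its definition, and both overwriting duplicate keys and rejecting SetHeader after the header was sent are choices of this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// Header is assumed to be a string map for this sketch.
type Header map[string]string

// headerBuffer is a hypothetical helper that models the SetHeader/SendHeader rules.
type headerBuffer struct {
	pending Header
	sent    bool
}

// SetHeader merges hd into the pending header. Later values overwrite earlier
// ones for the same key, and calls after sending are rejected (both assumptions).
func (b *headerBuffer) SetHeader(hd Header) error {
	if b.sent {
		return errors.New("header already sent")
	}
	if b.pending == nil {
		b.pending = Header{}
	}
	for k, v := range hd {
		b.pending[k] = v
	}
	return nil
}

// SendHeader merges hd with everything set so far and marks the header as sent.
// A second call fails, matching the documented rule.
func (b *headerBuffer) SendHeader(hd Header) error {
	if err := b.SetHeader(hd); err != nil {
		return err
	}
	b.sent = true
	return nil
}

func main() {
	b := &headerBuffer{}
	_ = b.SetHeader(Header{"trace-id": "abc"})
	_ = b.SetHeader(Header{"region": "eu"})
	fmt.Println(b.SendHeader(Header{"version": "1"}), len(b.pending)) // prints "<nil> 3"
	fmt.Println(b.SendHeader(nil))                                    // prints "header already sent"
}
```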
func GetServerStreamFromArg ¶ added in v0.13.0
func GetServerStreamFromArg(arg interface{}) (ServerStream, error)
GetServerStreamFromArg gets the streamx.ServerStream from the arg. It is for Kitex generated code ONLY.
type ServerStreamingClient ¶ added in v0.13.0
type ServerStreamingClient[Res any] interface {
Recv(ctx context.Context) (*Res, error)
ClientStream
}
ServerStreamingClient defines the client-side server streaming APIs.
func NewServerStreamingClient ¶ added in v0.13.0
func NewServerStreamingClient[Res any](st ClientStream) ServerStreamingClient[Res]
NewServerStreamingClient creates an implementation of ServerStreamingClient[Res].
type ServerStreamingServer ¶ added in v0.13.0
type ServerStreamingServer[Res any] interface {
Send(ctx context.Context, res *Res) error
ServerStream
}
ServerStreamingServer defines the server-side server streaming APIs.
func NewServerStreamingServer ¶ added in v0.13.0
func NewServerStreamingServer[Res any](st ServerStream) ServerStreamingServer[Res]
NewServerStreamingServer creates an implementation of ServerStreamingServer[Res].
type Stream ¶
type Stream interface {
// SetHeader sets the header metadata. It may be called multiple times.
// When called multiple times, all the provided metadata will be merged.
// All the metadata will be sent out when one of the following happens:
// - ServerStream.SendHeader() is called;
// - The first response is sent out;
// - An RPC status is sent out (error or success).
SetHeader(metadata.MD) error
// SendHeader sends the header metadata.
// The provided md and headers set by SetHeader() will be sent.
// It fails if called multiple times.
SendHeader(metadata.MD) error
// SetTrailer sets the trailer metadata which will be sent with the RPC status.
// When called more than once, all the provided metadata will be merged.
SetTrailer(metadata.MD)
// Header is used for client side stream to receive header from server.
Header() (metadata.MD, error)
// Trailer is used for client side stream to receive trailer from server.
Trailer() metadata.MD
// Context returns the stream's context.Context.
Context() context.Context
// RecvMsg receives a message from the peer.
// It blocks until an error occurs or a message is received.
// It is not safe for concurrent use.
RecvMsg(m interface{}) error
// SendMsg sends a message to the peer.
// It blocks until an error occurs or there is enough buffer to send.
// It is not safe for concurrent use.
SendMsg(m interface{}) error
// Close is not safe to call concurrently with SendMsg.
io.Closer
}
Stream is both a client and a server stream.
Deprecated: it is only for gRPC; use ClientStream or ServerStream instead.
type WithDoFinish ¶ added in v0.9.0
type WithDoFinish interface {
DoFinish(error)
}
WithDoFinish should be implemented when: (1) you want to wrap a stream in a client middleware, and (2) you want to manually call streaming.FinishStream(stream, error) to record the end of the stream. Note: DoFinish should be reentrant, preferably guarded by a sync.Once.