Documentation
¶
Index ¶
- func IsSocketClosedError(err error) bool
- func ToIntRequestN(n uint32) int
- func ToUint32RequestN(n int) uint32
- type AbstractRSocket
- func (a AbstractRSocket) FireAndForget(message payload.Payload)
- func (a AbstractRSocket) MetadataPush(message payload.Payload)
- func (a AbstractRSocket) RequestChannel(messages flux.Flux) flux.Flux
- func (a AbstractRSocket) RequestResponse(message payload.Payload) mono.Mono
- func (a AbstractRSocket) RequestStream(message payload.Payload) flux.Flux
- type BaseSocket
- func (p *BaseSocket) Close() error
- func (p *BaseSocket) FireAndForget(message payload.Payload)
- func (p *BaseSocket) MetadataPush(message payload.Payload)
- func (p *BaseSocket) OnClose(fn func(error))
- func (p *BaseSocket) RequestChannel(messages flux.Flux) flux.Flux
- func (p *BaseSocket) RequestResponse(message payload.Payload) mono.Mono
- func (p *BaseSocket) RequestStream(message payload.Payload) flux.Flux
- type ClientSocket
- type Closeable
- type DuplexConnection
- func (dc *DuplexConnection) Close() error
- func (dc *DuplexConnection) FireAndForget(req payload.Payload)
- func (dc *DuplexConnection) GetError() (err error)
- func (dc *DuplexConnection) LoopWrite(ctx context.Context) error
- func (dc *DuplexConnection) MetadataPush(payload payload.Payload)
- func (dc *DuplexConnection) RequestChannel(sending flux.Flux) (ret flux.Flux)
- func (dc *DuplexConnection) RequestResponse(req payload.Payload) (res mono.Mono)
- func (dc *DuplexConnection) RequestStream(sending payload.Payload) (ret flux.Flux)
- func (dc *DuplexConnection) SetError(err error)
- func (dc *DuplexConnection) SetResponder(responder Responder)
- func (dc *DuplexConnection) SetTransport(tp *transport.Transport) (ok bool)
- type Keepaliver
- type Responder
- type ServerSocket
- type SetupInfo
- type StreamID
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func IsSocketClosedError ¶ added in v0.6.0
IsSocketClosedError reports whether the input error indicates that the socket is closed.
func ToIntRequestN ¶ added in v0.6.0
ToIntRequestN converts n to a valid request n.
func ToUint32RequestN ¶ added in v0.6.0
ToUint32RequestN converts n to a valid request n.
Types ¶
type AbstractRSocket ¶
type AbstractRSocket struct {
FF func(payload.Payload)
MP func(payload.Payload)
RR func(payload.Payload) mono.Mono
RS func(payload.Payload) flux.Flux
RC func(flux.Flux) flux.Flux
}
AbstractRSocket represents an abstract RSocket.
func (AbstractRSocket) FireAndForget ¶
func (a AbstractRSocket) FireAndForget(message payload.Payload)
FireAndForget starts a request of FireAndForget.
func (AbstractRSocket) MetadataPush ¶
func (a AbstractRSocket) MetadataPush(message payload.Payload)
MetadataPush starts a request of MetadataPush.
func (AbstractRSocket) RequestChannel ¶
func (a AbstractRSocket) RequestChannel(messages flux.Flux) flux.Flux
RequestChannel starts a request of RequestChannel.
func (AbstractRSocket) RequestResponse ¶
func (a AbstractRSocket) RequestResponse(message payload.Payload) mono.Mono
RequestResponse starts a request of RequestResponse.
func (AbstractRSocket) RequestStream ¶
func (a AbstractRSocket) RequestStream(message payload.Payload) flux.Flux
RequestStream starts a request of RequestStream.
type BaseSocket ¶ added in v0.6.0
type BaseSocket struct {
// contains filtered or unexported fields
}
BaseSocket is a basic socket.
func NewBaseSocket ¶ added in v0.6.0
func NewBaseSocket(rawSocket *DuplexConnection) *BaseSocket
NewBaseSocket creates a new BaseSocket.
func (*BaseSocket) FireAndForget ¶ added in v0.6.0
func (p *BaseSocket) FireAndForget(message payload.Payload)
FireAndForget sends FireAndForget request.
func (*BaseSocket) MetadataPush ¶ added in v0.6.0
func (p *BaseSocket) MetadataPush(message payload.Payload)
MetadataPush sends MetadataPush request.
func (*BaseSocket) OnClose ¶ added in v0.6.0
func (p *BaseSocket) OnClose(fn func(error))
OnClose registers a handler for when the socket is closed.
func (*BaseSocket) RequestChannel ¶ added in v0.6.0
func (p *BaseSocket) RequestChannel(messages flux.Flux) flux.Flux
RequestChannel sends RequestChannel request.
func (*BaseSocket) RequestResponse ¶ added in v0.6.0
func (p *BaseSocket) RequestResponse(message payload.Payload) mono.Mono
RequestResponse sends RequestResponse request.
func (*BaseSocket) RequestStream ¶ added in v0.6.0
func (p *BaseSocket) RequestStream(message payload.Payload) flux.Flux
RequestStream sends RequestStream request.
type ClientSocket ¶
type ClientSocket interface {
Closeable
Responder
// Setup sets up the current socket.
Setup(ctx context.Context, connectTimeout time.Duration, setup *SetupInfo) error
}
ClientSocket represents a client-side socket.
func NewClient ¶
func NewClient(tp transport.ClientTransporter, socket *DuplexConnection) ClientSocket
NewClient creates a simple client-side socket.
func NewResumableClientSocket ¶ added in v0.6.0
func NewResumableClientSocket(tp transport.ClientTransporter, socket *DuplexConnection) ClientSocket
NewResumableClientSocket creates a client-side socket with resume support.
type Closeable ¶
type Closeable interface {
io.Closer
// OnClose binds a handler when closing.
OnClose(closer func(error))
}
Closeable represents a closeable target.
type DuplexConnection ¶ added in v0.6.0
type DuplexConnection struct {
// contains filtered or unexported fields
}
DuplexConnection represents a socket of RSocket which can be a requester or a responder.
func NewClientDuplexConnection ¶ added in v0.6.0
func NewClientDuplexConnection(reqSche, resSche scheduler.Scheduler, mtu int, keepaliveInterval time.Duration) *DuplexConnection
NewClientDuplexConnection creates a new client-side DuplexConnection.
func NewServerDuplexConnection ¶ added in v0.6.0
func NewServerDuplexConnection(reqSche, resSche scheduler.Scheduler, mtu int, leases lease.Factory) *DuplexConnection
NewServerDuplexConnection creates a new server-side DuplexConnection.
func (*DuplexConnection) Close ¶ added in v0.6.0
func (dc *DuplexConnection) Close() error
Close closes the current socket.
func (*DuplexConnection) FireAndForget ¶ added in v0.6.0
func (dc *DuplexConnection) FireAndForget(req payload.Payload)
FireAndForget starts a request of FireAndForget.
func (*DuplexConnection) GetError ¶ added in v0.6.0
func (dc *DuplexConnection) GetError() (err error)
GetError gets the error that has been set.
func (*DuplexConnection) LoopWrite ¶ added in v0.6.0
func (dc *DuplexConnection) LoopWrite(ctx context.Context) error
LoopWrite starts the write loop.
func (*DuplexConnection) MetadataPush ¶ added in v0.6.0
func (dc *DuplexConnection) MetadataPush(payload payload.Payload)
MetadataPush starts a request of MetadataPush.
func (*DuplexConnection) RequestChannel ¶ added in v0.6.0
func (dc *DuplexConnection) RequestChannel(sending flux.Flux) (ret flux.Flux)
RequestChannel starts a request of RequestChannel.
func (*DuplexConnection) RequestResponse ¶ added in v0.6.0
func (dc *DuplexConnection) RequestResponse(req payload.Payload) (res mono.Mono)
RequestResponse starts a request of RequestResponse.
func (*DuplexConnection) RequestStream ¶ added in v0.6.0
func (dc *DuplexConnection) RequestStream(sending payload.Payload) (ret flux.Flux)
RequestStream starts a request of RequestStream.
func (*DuplexConnection) SetError ¶ added in v0.6.0
func (dc *DuplexConnection) SetError(err error)
SetError sets error for current socket.
func (*DuplexConnection) SetResponder ¶ added in v0.6.0
func (dc *DuplexConnection) SetResponder(responder Responder)
SetResponder sets a responder for current socket.
func (*DuplexConnection) SetTransport ¶ added in v0.6.0
func (dc *DuplexConnection) SetTransport(tp *transport.Transport) (ok bool)
SetTransport sets a transport for current socket.
type Keepaliver ¶ added in v0.6.0
type Keepaliver struct {
// contains filtered or unexported fields
}
Keepaliver controls connection keepalive.
func NewKeepaliver ¶ added in v0.6.0
func NewKeepaliver(interval time.Duration) *Keepaliver
NewKeepaliver creates a new keepaliver.
func (Keepaliver) Done ¶ added in v0.6.0
func (p Keepaliver) Done() <-chan struct{}
Done returns done chan.
type Responder ¶
type Responder interface {
// FireAndForget is a single one-way message.
FireAndForget(message payload.Payload)
// MetadataPush sends asynchronous Metadata frame.
MetadataPush(message payload.Payload)
// RequestResponse requests a single response.
RequestResponse(message payload.Payload) mono.Mono
// RequestStream requests a completable stream.
RequestStream(message payload.Payload) flux.Flux
// RequestChannel requests a completable stream in both directions.
RequestChannel(messages flux.Flux) flux.Flux
}
Responder is a contract providing different interaction models for RSocket protocol.
type ServerSocket ¶
type ServerSocket interface {
Closeable
Responder
// SetResponder sets a responder for current socket.
SetResponder(responder Responder)
// SetTransport sets a transport for current socket.
SetTransport(tp *transport.Transport)
// Pause pauses the current socket.
Pause() bool
// Start starts current socket.
Start(ctx context.Context) error
// Token returns token of socket.
Token() (token []byte, ok bool)
}
ServerSocket represents a server-side socket.
func NewResumableServerSocket ¶ added in v0.6.0
func NewResumableServerSocket(socket *DuplexConnection, token []byte) ServerSocket
NewResumableServerSocket creates a new server-side socket with resume support.
func NewSimpleServerSocket ¶ added in v0.6.0
func NewSimpleServerSocket(socket *DuplexConnection) ServerSocket
NewSimpleServerSocket creates a new server-side socket.