Documentation
¶
Index ¶
- type AbstractRSocket
- func (p AbstractRSocket) FireAndForget(message payload.Payload)
- func (p AbstractRSocket) MetadataPush(message payload.Payload)
- func (p AbstractRSocket) RequestChannel(messages rx.Publisher) flux.Flux
- func (p AbstractRSocket) RequestResponse(message payload.Payload) mono.Mono
- func (p AbstractRSocket) RequestStream(message payload.Payload) flux.Flux
- type ClientSocket
- type Closeable
- type DuplexRSocket
- func (p *DuplexRSocket) Close() error
- func (p *DuplexRSocket) FireAndForget(sending payload.Payload)
- func (p *DuplexRSocket) MetadataPush(payload payload.Payload)
- func (p *DuplexRSocket) RequestChannel(publisher rx.Publisher) (ret flux.Flux)
- func (p *DuplexRSocket) RequestResponse(pl payload.Payload) (mo mono.Mono)
- func (p *DuplexRSocket) RequestStream(sending payload.Payload) (ret flux.Flux)
- func (p *DuplexRSocket) SetError(e error)
- func (p *DuplexRSocket) SetResponder(responder Responder)
- func (p *DuplexRSocket) SetTransport(tp *transport.Transport)
- type Responder
- type ServerSocket
- type SetupInfo
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AbstractRSocket ¶
type AbstractRSocket struct {
FF func(payload.Payload)
MP func(payload.Payload)
RR func(payload.Payload) mono.Mono
RS func(payload.Payload) flux.Flux
RC func(rx.Publisher) flux.Flux
}
AbstractRSocket represents an abstract RSocket.
func (AbstractRSocket) FireAndForget ¶
func (p AbstractRSocket) FireAndForget(message payload.Payload)
FireAndForget starts a request of FireAndForget.
func (AbstractRSocket) MetadataPush ¶
func (p AbstractRSocket) MetadataPush(message payload.Payload)
MetadataPush starts a request of MetadataPush.
func (AbstractRSocket) RequestChannel ¶
func (p AbstractRSocket) RequestChannel(messages rx.Publisher) flux.Flux
RequestChannel starts a request of RequestChannel.
func (AbstractRSocket) RequestResponse ¶
func (p AbstractRSocket) RequestResponse(message payload.Payload) mono.Mono
RequestResponse starts a request of RequestResponse.
func (AbstractRSocket) RequestStream ¶
func (p AbstractRSocket) RequestStream(message payload.Payload) flux.Flux
RequestStream starts a request of RequestStream.
type ClientSocket ¶
type ClientSocket interface {
Closeable
Responder
// Setup sets up the current socket.
Setup(ctx context.Context, setup *SetupInfo) (err error)
}
ClientSocket represents a client-side socket.
func NewClient ¶
func NewClient(uri *transport.URI, socket *DuplexRSocket, tc *tls.Config, headers map[string][]string) ClientSocket
NewClient creates a simple client-side socket.
func NewClientResume ¶
func NewClientResume(uri *transport.URI, socket *DuplexRSocket, tc *tls.Config, headers map[string][]string) ClientSocket
NewClientResume creates a client-side socket with resume support.
type Closeable ¶
type Closeable interface {
io.Closer
// OnClose binds a handler when closing.
OnClose(closer func(error))
}
Closeable represents a closeable target.
type DuplexRSocket ¶
type DuplexRSocket struct {
// contains filtered or unexported fields
}
DuplexRSocket represents a socket of RSocket which can be a requester or a responder.
func NewClientDuplexRSocket ¶
func NewClientDuplexRSocket( mtu int, keepaliveInterval time.Duration, ) (s *DuplexRSocket)
NewClientDuplexRSocket creates a new client-side DuplexRSocket.
func NewServerDuplexRSocket ¶
func NewServerDuplexRSocket(mtu int, leases lease.Leases) *DuplexRSocket
NewServerDuplexRSocket creates a new server-side DuplexRSocket.
func (*DuplexRSocket) FireAndForget ¶
func (p *DuplexRSocket) FireAndForget(sending payload.Payload)
FireAndForget starts a request of FireAndForget.
func (*DuplexRSocket) MetadataPush ¶
func (p *DuplexRSocket) MetadataPush(payload payload.Payload)
MetadataPush starts a request of MetadataPush.
func (*DuplexRSocket) RequestChannel ¶
func (p *DuplexRSocket) RequestChannel(publisher rx.Publisher) (ret flux.Flux)
RequestChannel starts a request of RequestChannel.
func (*DuplexRSocket) RequestResponse ¶
func (p *DuplexRSocket) RequestResponse(pl payload.Payload) (mo mono.Mono)
RequestResponse starts a request of RequestResponse.
func (*DuplexRSocket) RequestStream ¶
func (p *DuplexRSocket) RequestStream(sending payload.Payload) (ret flux.Flux)
RequestStream starts a request of RequestStream.
func (*DuplexRSocket) SetError ¶ added in v0.4.0
func (p *DuplexRSocket) SetError(e error)
SetError sets error for current socket.
func (*DuplexRSocket) SetResponder ¶
func (p *DuplexRSocket) SetResponder(responder Responder)
SetResponder sets a responder for current socket.
func (*DuplexRSocket) SetTransport ¶
func (p *DuplexRSocket) SetTransport(tp *transport.Transport)
SetTransport sets a transport for current socket.
type Responder ¶
type Responder interface {
// FireAndForget is a single one-way message.
FireAndForget(message payload.Payload)
// MetadataPush sends asynchronous Metadata frame.
MetadataPush(message payload.Payload)
// RequestResponse requests a single response.
RequestResponse(message payload.Payload) mono.Mono
// RequestStream requests a completable stream.
RequestStream(message payload.Payload) flux.Flux
// RequestChannel requests a completable stream in both directions.
RequestChannel(messages rx.Publisher) flux.Flux
}
Responder is a contract providing different interaction models for RSocket protocol.
type ServerSocket ¶
type ServerSocket interface {
Closeable
Responder
// SetResponder sets a responder for current socket.
SetResponder(responder Responder)
// SetTransport sets a transport for current socket.
SetTransport(tp *transport.Transport)
// Pause pauses the current socket.
Pause() bool
// Start starts current socket.
Start(ctx context.Context) error
// Token returns the token of the socket.
Token() (token []byte, ok bool)
}
ServerSocket represents a server-side socket.
func NewServer ¶
func NewServer(socket *DuplexRSocket) ServerSocket
NewServer creates a new server-side socket.
func NewServerResume ¶
func NewServerResume(socket *DuplexRSocket, token []byte) ServerSocket
NewServerResume creates a new server-side socket with resume support.