Documentation
¶
Overview ¶
Example ¶
// Serve a server
err := Receive().
Acceptor(func(setup SetupPayload, sendingSocket CloseableRSocket) RSocket {
return NewAbstractSocket(
RequestResponse(func(msg Payload) mono.Mono {
log.Println("incoming request:", msg)
return mono.Just(NewString("Pong", time.Now().String()))
}),
)
}).
Transport("tcp://127.0.0.1:7878").
Serve(context.Background())
if err != nil {
panic(err)
}
// Connect to a server.
cli, err := Connect().
SetupPayload(NewString("Hello World", "From Golang")).
Transport("tcp://127.0.0.1:7878").
Start(context.Background())
if err != nil {
panic(err)
}
defer func() {
_ = cli.Close()
}()
cli.RequestResponse(NewString("Ping", time.Now().String())).
DoOnSuccess(func(elem Payload) {
log.Println("incoming response:", elem)
}).
Subscribe(context.Background())
Index ¶
- type Client
- type ClientBuilder
- type ClientResumeOptions
- type ClientSocketAcceptor
- type ClientStarter
- type ClientTransportBuilder
- type CloseableRSocket
- type OpServerResume
- type OptAbstractSocket
- func FireAndForget(fn func(msg payload.Payload)) OptAbstractSocket
- func MetadataPush(fn func(msg payload.Payload)) OptAbstractSocket
- func RequestChannel(fn func(msgs rx.Publisher) flux.Flux) OptAbstractSocket
- func RequestResponse(fn func(msg payload.Payload) mono.Mono) OptAbstractSocket
- func RequestStream(fn func(msg payload.Payload) flux.Flux) OptAbstractSocket
- type RSocket
- type ServerAcceptor
- type ServerBuilder
- type ServerTransportBuilder
- type Start
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Client ¶ added in v0.2.0
type Client interface {
CloseableRSocket
}
Client is Client Side of a RSocket socket. Sends Frames to a RSocket Server.
type ClientBuilder ¶
type ClientBuilder interface {
ClientTransportBuilder
// Fragment set fragmentation size which default is 16_777_215(16MB).
Fragment(mtu int) ClientBuilder
// KeepAlive defines current client keepalive settings.
KeepAlive(tickPeriod, ackTimeout time.Duration, missedAcks int) ClientBuilder
// Resume enable resume for current RSocket.
Resume(opts ...ClientResumeOptions) ClientBuilder
// DataMimeType is used to set payload data MIME type.
// Default MIME type is `application/binary`.
DataMimeType(mime string) ClientBuilder
// MetadataMimeType is used to set payload metadata MIME type.
// Default MIME type is `application/binary`.
MetadataMimeType(mime string) ClientBuilder
// SetupPayload set the setup payload.
SetupPayload(setup payload.Payload) ClientBuilder
// OnClose register handler when client socket closed.
OnClose(fn func()) ClientBuilder
// Acceptor set acceptor for RSocket client.
Acceptor(acceptor ClientSocketAcceptor) ClientTransportBuilder
}
ClientBuilder can be used to build a RSocket client.
func Connect ¶
func Connect() ClientBuilder
Connect create a new RSocket client builder with default settings.
Example ¶
cli, err := Connect().
Resume().
Fragment(65535).
SetupPayload(NewString("Hello", "World")).
Acceptor(func(socket RSocket) RSocket {
return NewAbstractSocket(RequestResponse(func(msg Payload) mono.Mono {
return mono.Just(NewString("Pong", time.Now().String()))
}))
}).
Transport("tcp://127.0.0.1:7878").
Start(context.Background())
if err != nil {
panic(err)
}
defer func() {
_ = cli.Close()
}()
// Simple FireAndForget.
cli.FireAndForget(NewString("This is a FNF message.", ""))
// Simple RequestResponse.
cli.RequestResponse(NewString("This is a RequestResponse message.", "")).
DoOnSuccess(func(elem Payload) {
log.Println("response:", elem)
}).
Subscribe(context.Background())
var s rx.Subscription
// RequestStream with backpressure. (one by one)
cli.RequestStream(NewString("This is a RequestStream message.", "")).
DoOnNext(func(elem Payload) {
log.Println("next element in stream:", elem)
s.Request(1)
}).
Subscribe(context.Background(), rx.OnSubscribe(func(s rx.Subscription) {
s.Request(1)
}))
// Simple RequestChannel.
sendFlux := flux.Create(func(ctx context.Context, s flux.Sink) {
for i := 0; i < 3; i++ {
s.Next(NewString(fmt.Sprintf("This is a RequestChannel message #%d.", i), ""))
}
s.Complete()
})
cli.RequestChannel(sendFlux).
DoOnNext(func(elem Payload) {
log.Println("next element in channel:", elem)
}).
Subscribe(context.Background())
type ClientResumeOptions ¶ added in v0.2.0
type ClientResumeOptions func(opts *resumeOpts)
ClientResumeOptions represents resume options for client.
func WithClientResumeToken ¶ added in v0.2.0
func WithClientResumeToken(gen func() []byte) ClientResumeOptions
WithClientResumeToken creates a resume token generator.
type ClientSocketAcceptor ¶
ClientSocketAcceptor is alias for RSocket handler function.
type ClientStarter ¶
type ClientStarter interface {
// Start start a client socket.
Start(ctx context.Context) (Client, error)
// Start start a client socket with TLS.
// Here's an example:
// tc:=&tls.Config{
// InsecureSkipVerify: true,
//}
StartTLS(ctx context.Context, tc *tls.Config) (Client, error)
}
ClientStarter can be used to start a client.
type ClientTransportBuilder ¶
type ClientTransportBuilder interface {
// Transport set Transport for current RSocket client.
// URI is used to create RSocket Transport:
// Example:
// "tcp://127.0.0.1:7878" means a TCP RSocket transport.
// "ws://127.0.0.1:8080/a/b/c" means a Websocket RSocket transport.
// "wss://127.0.0.1:8080/a/b/c" means a Websocket RSocket transport with HTTPS.
Transport(uri string) ClientStarter
}
ClientTransportBuilder is used to build a RSocket client with custom Transport string.
type CloseableRSocket ¶ added in v0.2.0
CloseableRSocket is a RSocket which support more events.
type OpServerResume ¶ added in v0.2.0
type OpServerResume func(o *serverResumeOptions)
OpServerResume represents resume options for RSocket server.
func WithServerResumeSessionDuration ¶ added in v0.2.0
func WithServerResumeSessionDuration(duration time.Duration) OpServerResume
WithServerResumeSessionDuration sets resume session duration for RSocket server.
type OptAbstractSocket ¶
type OptAbstractSocket func(*socket.AbstractRSocket)
OptAbstractSocket is option for abstract socket.
func FireAndForget ¶
func FireAndForget(fn func(msg payload.Payload)) OptAbstractSocket
FireAndForget register request handler for FireAndForget.
func MetadataPush ¶
func MetadataPush(fn func(msg payload.Payload)) OptAbstractSocket
MetadataPush register request handler for MetadataPush.
func RequestChannel ¶
func RequestChannel(fn func(msgs rx.Publisher) flux.Flux) OptAbstractSocket
RequestChannel register request handler for RequestChannel.
func RequestResponse ¶
func RequestResponse(fn func(msg payload.Payload) mono.Mono) OptAbstractSocket
RequestResponse register request handler for RequestResponse.
func RequestStream ¶
func RequestStream(fn func(msg payload.Payload) flux.Flux) OptAbstractSocket
RequestStream register request handler for RequestStream.
type RSocket ¶
type RSocket interface {
// FireAndForget is a single one-way message.
FireAndForget(msg payload.Payload)
// MetadataPush sends asynchronous Metadata frame.
MetadataPush(msg payload.Payload)
// RequestResponse request single response.
RequestResponse(msg payload.Payload) mono.Mono
// RequestStream request a completable stream.
RequestStream(msg payload.Payload) flux.Flux
// RequestChannel request a completable stream in both directions.
RequestChannel(msgs rx.Publisher) flux.Flux
}
RSocket is a contract providing different interaction models for RSocket protocol.
func NewAbstractSocket ¶
func NewAbstractSocket(opts ...OptAbstractSocket) RSocket
NewAbstractSocket returns an abstract implementation of RSocket. You can specify the actual implementation of any request.
type ServerAcceptor ¶
type ServerAcceptor = func(setup payload.SetupPayload, sendingSocket CloseableRSocket) RSocket
ServerAcceptor is alias for server accepter.
type ServerBuilder ¶
type ServerBuilder interface {
// Fragment set fragmentation size which default is 16_777_215(16MB).
Fragment(mtu int) ServerBuilder
// Resume enable resume for current server.
Resume(opts ...OpServerResume) ServerBuilder
// Acceptor register server acceptor which is used to handle incoming RSockets.
Acceptor(acceptor ServerAcceptor) ServerTransportBuilder
// OnStart register a handler when serve success.
OnStart(onStart func()) ServerBuilder
}
ServerBuilder can be used to build a RSocket server.
func Receive ¶
func Receive() ServerBuilder
Receive receives server connections from client RSockets.
Example ¶
err := Receive().
Resume(WithServerResumeSessionDuration(30 * time.Second)).
Fragment(65535).
Acceptor(func(setup SetupPayload, sendingSocket CloseableRSocket) RSocket {
// Handle close.
sendingSocket.OnClose(func() {
log.Println("sending socket is closed")
})
// Request to client.
sendingSocket.RequestResponse(NewString("Ping", time.Now().String())).
DoOnSuccess(func(elem Payload) {
log.Println("response of Ping from client:", elem)
}).
SubscribeOn(scheduler.Elastic()).
Subscribe(context.Background())
// Return responser which just echo.
return NewAbstractSocket(
FireAndForget(func(msg Payload) {
log.Println("receive fnf:", msg)
}),
RequestResponse(func(msg Payload) mono.Mono {
return mono.Just(msg)
}),
RequestStream(func(msg Payload) flux.Flux {
return flux.Create(func(ctx context.Context, s flux.Sink) {
for i := 0; i < 3; i++ {
s.Next(NewString(msg.DataUTF8(), fmt.Sprintf("This is response #%04d", i)))
}
s.Complete()
})
}),
RequestChannel(func(msgs rx.Publisher) flux.Flux {
return msgs.(flux.Flux)
}),
)
}).
Transport("tcp://0.0.0.0:7878").
Serve(context.Background())
panic(err)
type ServerTransportBuilder ¶
type ServerTransportBuilder interface {
// Transport specify transport string.
Transport(transport string) Start
}
ServerTransportBuilder is used to build a RSocket server with custom Transport string.
type Start ¶
type Start interface {
// Serve serve RSocket server.
Serve(ctx context.Context) error
// Serve serve RSocket server with TLS.
//
// You can generate cert.pem and key.pem for local testing:
//
// go run $GOROOT/src/crypto/tls/generate_cert.go --host localhost
//
// Load X509
// cert, err := tls.LoadX509KeyPair("cert.pem", "key.pem")
// if err != nil {
// panic(err)
// }
// // Init TLS configuration.
// tc := &tls.Config{
// MinVersion: tls.VersionTLS12,
// CurvePreferences: []tls.CurveID{tls.CurveP521, tls.CurveP384, tls.CurveP256},
// PreferServerCipherSuites: true,
// CipherSuites: []uint16{
// tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
// tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
// tls.TLS_RSA_WITH_AES_256_GCM_SHA384,
// tls.TLS_RSA_WITH_AES_256_CBC_SHA,
// },
// Certificates: []tls.Certificate{cert},
// }
ServeTLS(ctx context.Context, c *tls.Config) error
}
Start start a RSocket server.
