Documentation ¶
Overview ¶
Example ¶
// Start a server.
err := rsocket.Receive().
Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
return rsocket.NewAbstractSocket(
rsocket.RequestResponse(func(msg payload.Payload) mono.Mono {
log.Println("incoming request:", msg)
return mono.Just(payload.NewString("Pong", time.Now().String()))
}),
), nil
}).
Transport(rsocket.TCPServer().SetAddr(":7878").Build()).
Serve(context.Background())
if err != nil {
panic(err)
}
// Connect to a server.
cli, err := rsocket.Connect().
SetupPayload(payload.NewString("Hello World", "From Golang")).
Transport(rsocket.TCPClient().SetHostAndPort("127.0.0.1", 7878).Build()).
Start(context.Background())
if err != nil {
panic(err)
}
defer cli.Close()
cli.RequestResponse(payload.NewString("Ping", time.Now().String())).
DoOnSuccess(func(elem payload.Payload) error {
log.Println("incoming response:", elem)
return nil
}).
Subscribe(context.Background())
Index ¶
- Constants
- type Client
- type ClientBuilder
- type ClientResumeOptions
- type ClientSocketAcceptor
- type ClientStarter
- type CloseableRSocket
- type Error
- type ErrorCode
- type OpServerResume
- type OptAbstractSocket
- func FireAndForget(fn func(request payload.Payload)) OptAbstractSocket
- func MetadataPush(fn func(request payload.Payload)) OptAbstractSocket
- func RequestChannel(fn func(requests flux.Flux) (responses flux.Flux)) OptAbstractSocket
- func RequestResponse(fn func(request payload.Payload) (response mono.Mono)) OptAbstractSocket
- func RequestStream(fn func(request payload.Payload) (responses flux.Flux)) OptAbstractSocket
- type RSocket
- type ServerAcceptor
- type ServerBuilder
- type Start
- type TCPClientBuilder
- type TCPServerBuilder
- type ToClientStarter
- type ToServerStarter
- type UnixClientBuilder
- type UnixServerBuilder
- type WebsocketClientBuilder
- func (wc *WebsocketClientBuilder) Build() transport.ClientTransporter
- func (wc *WebsocketClientBuilder) SetHeader(header http.Header) *WebsocketClientBuilder
- func (wc *WebsocketClientBuilder) SetTLSConfig(c *tls.Config) *WebsocketClientBuilder
- func (wc *WebsocketClientBuilder) SetURL(url string) *WebsocketClientBuilder
- type WebsocketServerBuilder
- func (ws *WebsocketServerBuilder) Build() transport.ServerTransporter
- func (ws *WebsocketServerBuilder) SetAddr(addr string) *WebsocketServerBuilder
- func (ws *WebsocketServerBuilder) SetPath(path string) *WebsocketServerBuilder
- func (ws *WebsocketServerBuilder) SetTLSConfig(c *tls.Config) *WebsocketServerBuilder
- func (ws *WebsocketServerBuilder) SetUpgrader(upgrader *websocket.Upgrader) *WebsocketServerBuilder
Constants ¶
const (
// ErrorCodeInvalidSetup means the setup frame is invalid for the server.
ErrorCodeInvalidSetup = core.ErrorCodeInvalidSetup
// ErrorCodeUnsupportedSetup means some (or all) of the parameters specified by the client are unsupported by the server.
ErrorCodeUnsupportedSetup = core.ErrorCodeUnsupportedSetup
// ErrorCodeRejectedSetup means server rejected the setup, it can specify the reason in the payload.
ErrorCodeRejectedSetup = core.ErrorCodeRejectedSetup
// ErrorCodeRejectedResume means server rejected the resume, it can specify the reason in the payload.
ErrorCodeRejectedResume = core.ErrorCodeRejectedResume
// ErrorCodeConnectionError means the connection is being terminated.
ErrorCodeConnectionError = core.ErrorCodeConnectionError
// ErrorCodeConnectionClose means the connection is being terminated.
ErrorCodeConnectionClose = core.ErrorCodeConnectionClose
// ErrorCodeApplicationError means application layer logic generating a Reactive Streams onError event.
ErrorCodeApplicationError = core.ErrorCodeApplicationError
// ErrorCodeRejected means Responder reject it.
ErrorCodeRejected = core.ErrorCodeRejected
// ErrorCodeCanceled means the Responder canceled the request but may have started processing it (similar to REJECTED but doesn't guarantee lack of side-effects).
ErrorCodeCanceled = core.ErrorCodeCanceled
// ErrorCodeInvalid means the request is invalid.
ErrorCodeInvalid = core.ErrorCodeInvalid
)
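The constants above re-export values from `core`, and this page does not show the numbers behind them. As a rough illustration only, the sketch below mirrors them with a local `errorCode` type (hypothetical, not the package's `ErrorCode`) and separates codes that concern the whole connection from codes that concern a single request. The hex values are the ones listed for the ERROR frame in the RSocket protocol specification; that they match `core` exactly is an assumption worth verifying.

```go
package main

import "fmt"

// errorCode is a local stand-in for the package's ErrorCode type (assumption).
type errorCode uint32

// Values as listed in the RSocket protocol specification for ERROR frames.
const (
	invalidSetup     errorCode = 0x00000001
	unsupportedSetup errorCode = 0x00000002
	rejectedSetup    errorCode = 0x00000003
	rejectedResume   errorCode = 0x00000004
	connectionError  errorCode = 0x00000101
	connectionClose  errorCode = 0x00000102
	applicationError errorCode = 0x00000201
	rejected         errorCode = 0x00000202
	canceled         errorCode = 0x00000203
	invalid          errorCode = 0x00000204
)

// isConnectionLevel reports whether the code concerns setup, resume, or
// termination of the whole connection rather than one request.
func isConnectionLevel(c errorCode) bool {
	switch c {
	case invalidSetup, unsupportedSetup, rejectedSetup, rejectedResume,
		connectionError, connectionClose:
		return true
	}
	return false
}

func main() {
	for _, c := range []errorCode{rejectedSetup, connectionClose, applicationError, canceled} {
		fmt.Printf("0x%08X connection-level=%v\n", uint32(c), isConnectionLevel(c))
	}
}
```

A caller might use such a split to decide whether to retry one request or to reconnect.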
const (
// DefaultUnixSockPath is the default UDS sock file path.
DefaultUnixSockPath = "/var/run/rsocket.sock"
// DefaultPort is the default port RSocket used.
DefaultPort = 7878
)
Types ¶
type Client ¶ added in v0.2.0
type Client interface {
CloseableRSocket
}
Client is the client side of an RSocket connection. It sends frames to an RSocket server.
type ClientBuilder ¶
type ClientBuilder interface {
ToClientStarter
// Fragment sets the fragmentation size; the default is 16_777_215 (16MB).
// A zero mtu selects the default fragmentation size.
Fragment(mtu int) ClientBuilder
// KeepAlive defines the keepalive settings of the current client.
KeepAlive(tickPeriod, ackTimeout time.Duration, missedAcks int) ClientBuilder
// Resume enables the resume feature.
Resume(opts ...ClientResumeOptions) ClientBuilder
// Lease enables the lease feature.
Lease() ClientBuilder
// DataMimeType sets the payload data MIME type.
// The default MIME type is `application/binary`.
DataMimeType(mime string) ClientBuilder
// MetadataMimeType sets the payload metadata MIME type.
// The default MIME type is `application/binary`.
MetadataMimeType(mime string) ClientBuilder
// SetupPayload sets the setup payload.
SetupPayload(setup payload.Payload) ClientBuilder
// ConnectTimeout sets the connect timeout.
ConnectTimeout(timeout time.Duration) ClientBuilder
// OnClose registers a handler that is called when the client socket is closed.
OnClose(func(error)) ClientBuilder
// OnConnect registers a handler that is called when the client socket connects.
OnConnect(func(Client, error)) ClientBuilder
// Acceptor sets the acceptor for the RSocket client.
Acceptor(acceptor ClientSocketAcceptor) ToClientStarter
}
ClientBuilder can be used to build a RSocket client.
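To make the `Fragment(mtu)` comment concrete, here is a minimal standard-library sketch of splitting a byte slice by an mtu, with zero falling back to the documented default of 16_777_215 bytes (2^24 - 1, just under 16 MiB). The `splitPayload` helper is hypothetical and ignores frame headers and metadata, which real RSocket fragmentation has to account for; it is not how the library implements the feature.

```go
package main

import "fmt"

// defaultFragmentSize is the default quoted in the Fragment doc comment.
const defaultFragmentSize = 16_777_215

// splitPayload cuts data into chunks of at most mtu bytes.
// mtu == 0 selects the default, as the doc comment describes; treating
// negative values the same way is an assumption of this sketch.
func splitPayload(data []byte, mtu int) [][]byte {
	if mtu <= 0 {
		mtu = defaultFragmentSize
	}
	var chunks [][]byte
	for len(data) > mtu {
		chunks = append(chunks, data[:mtu])
		data = data[mtu:]
	}
	// The final chunk holds whatever is left (possibly the whole input).
	return append(chunks, data)
}

func main() {
	payload := make([]byte, 10_000)
	fmt.Println(len(splitPayload(payload, 4096))) // 3 chunks: 4096 + 4096 + 1808
	fmt.Println(len(splitPayload(payload, 0)))    // 1 chunk: fits in the default size
}
```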
func Connect ¶
func Connect() ClientBuilder
Connect creates a new RSocket client builder with default settings.
Example ¶
cli, err := rsocket.Connect().
Resume(). // Enable RESUME.
Lease(). // Enable LEASE.
Fragment(4096).
SetupPayload(payload.NewString("Hello", "World")).
Acceptor(func(socket rsocket.RSocket) rsocket.RSocket {
return rsocket.NewAbstractSocket(
rsocket.RequestResponse(func(msg payload.Payload) mono.Mono {
return mono.Just(payload.NewString("Pong", time.Now().String()))
}),
)
}).
Transport(rsocket.TCPClient().SetAddr("127.0.0.1:7878").Build()).
Start(context.Background())
if err != nil {
panic(err)
}
defer func() {
_ = cli.Close()
}()
// Simple FireAndForget.
cli.FireAndForget(payload.NewString("This is a FNF message.", ""))
// Simple RequestResponse.
cli.RequestResponse(payload.NewString("This is a RequestResponse message.", "")).
DoOnSuccess(func(elem payload.Payload) error {
log.Println("response:", elem)
return nil
}).
Subscribe(context.Background())
var s rx.Subscription
// RequestStream with backpressure. (one by one)
cli.RequestStream(payload.NewString("This is a RequestStream message.", "")).
DoOnNext(func(elem payload.Payload) error {
log.Println("next element in stream:", elem)
s.Request(1)
return nil
}).
Subscribe(context.Background(), rx.OnSubscribe(func(ctx context.Context, su rx.Subscription) {
// Keep the subscription so that DoOnNext can request the next element.
s = su
s.Request(1)
}))
// Simple RequestChannel.
sendFlux := flux.Create(func(ctx context.Context, s flux.Sink) {
for i := 0; i < 3; i++ {
s.Next(payload.NewString(fmt.Sprintf("This is a RequestChannel message #%d.", i), ""))
}
s.Complete()
})
cli.RequestChannel(sendFlux).
DoOnNext(func(elem payload.Payload) error {
log.Println("next element in channel:", elem)
return nil
}).
Subscribe(context.Background())
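The "one by one" backpressure in the RequestStream part of the example relies on the consumer calling `Request(1)` once at subscription time and again after each element. The following sketch models that demand-driven flow with a hypothetical, single-goroutine `stream` type built only on the standard library. It is meant to show the idea of request(n) signalling, not the behaviour of the library's `flux` or `rx` packages.

```go
package main

import "fmt"

// stream is a hypothetical synchronous publisher that emits items only
// when the consumer has signalled demand through Request.
type stream struct {
	items    []string
	next     int  // index of the next item to emit
	pending  int  // outstanding demand
	draining bool // true while Request is already emitting
	onNext   func(item string)
}

// Request adds n to the outstanding demand and emits as many items as
// the demand allows. Calls made from inside onNext only record demand;
// the outer loop picks it up.
func (s *stream) Request(n int) {
	s.pending += n
	if s.draining {
		return
	}
	s.draining = true
	defer func() { s.draining = false }()
	for s.pending > 0 && s.next < len(s.items) {
		s.pending--
		item := s.items[s.next]
		s.next++
		s.onNext(item)
	}
}

func main() {
	var received []string
	s := &stream{items: []string{"a", "b", "c"}}
	s.onNext = func(item string) {
		received = append(received, item)
		s.Request(1) // ask for the next element only after handling this one
	}
	s.Request(1) // initial demand, playing the role of the OnSubscribe hook
	fmt.Println(received) // [a b c]
}
```

If `onNext` never calls `Request`, only the initially requested element is delivered, which is the point of backpressure: the consumer sets the pace.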
type ClientResumeOptions ¶ added in v0.2.0
type ClientResumeOptions func(opts *resumeOpts)
ClientResumeOptions represents resume options for client.
func WithClientResumeToken ¶ added in v0.2.0
func WithClientResumeToken(gen func() []byte) ClientResumeOptions
WithClientResumeToken creates a resume token generator.
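Since `WithClientResumeToken` accepts any `func() []byte`, one plausible generator is a function that returns random bytes. The sketch below uses only the standard library; the name `newResumeToken` and the 16-byte length are arbitrary choices for illustration, not requirements stated on this page.

```go
package main

import (
	"crypto/rand"
	"fmt"
)

// newResumeToken returns 16 random bytes. It has the func() []byte shape
// that WithClientResumeToken expects; the length is an assumption.
func newResumeToken() []byte {
	token := make([]byte, 16)
	if _, err := rand.Read(token); err != nil {
		panic(err) // no entropy available; nothing sensible to fall back to
	}
	return token
}

func main() {
	fmt.Printf("%x\n", newResumeToken())
}
```

Going by the signatures above, it could then be passed as `rsocket.Connect().Resume(rsocket.WithClientResumeToken(newResumeToken))`.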
type ClientSocketAcceptor ¶
ClientSocketAcceptor is an alias for the RSocket handler function.
type ClientStarter ¶
type ClientStarter interface {
// Start starts a client socket.
Start(ctx context.Context) (Client, error)
}
ClientStarter can be used to start a client.
type CloseableRSocket ¶ added in v0.2.0
CloseableRSocket is an RSocket that can be closed and can handle close events.
type Error ¶ added in v0.5.10
type Error = core.CustomError
Error provides a method of accessing code and data.
type OpServerResume ¶ added in v0.2.0
type OpServerResume func(o *serverResumeOptions)
OpServerResume represents resume options for RSocket server.
func WithServerResumeSessionDuration ¶ added in v0.2.0
func WithServerResumeSessionDuration(duration time.Duration) OpServerResume
WithServerResumeSessionDuration sets resume session duration for RSocket server.
type OptAbstractSocket ¶
type OptAbstractSocket func(*socket.AbstractRSocket)
OptAbstractSocket is an option for the abstract socket.
func FireAndForget ¶
func FireAndForget(fn func(request payload.Payload)) OptAbstractSocket
FireAndForget registers the request handler for FireAndForget.
func MetadataPush ¶
func MetadataPush(fn func(request payload.Payload)) OptAbstractSocket
MetadataPush registers the request handler for MetadataPush.
func RequestChannel ¶
func RequestChannel(fn func(requests flux.Flux) (responses flux.Flux)) OptAbstractSocket
RequestChannel registers the request handler for RequestChannel.
func RequestResponse ¶
func RequestResponse(fn func(request payload.Payload) (response mono.Mono)) OptAbstractSocket
RequestResponse registers the request handler for RequestResponse.
func RequestStream ¶
func RequestStream(fn func(request payload.Payload) (responses flux.Flux)) OptAbstractSocket
RequestStream registers the request handler for RequestStream.
type RSocket ¶
type RSocket interface {
// FireAndForget is a single one-way message.
FireAndForget(message payload.Payload)
// MetadataPush sends an asynchronous Metadata frame.
MetadataPush(message payload.Payload)
// RequestResponse requests a single response.
RequestResponse(message payload.Payload) mono.Mono
// RequestStream requests a completable stream.
RequestStream(message payload.Payload) flux.Flux
// RequestChannel requests a completable stream in both directions.
RequestChannel(messages flux.Flux) flux.Flux
}
RSocket is a contract providing different interaction models for RSocket protocol.
func NewAbstractSocket ¶
func NewAbstractSocket(opts ...OptAbstractSocket) RSocket
NewAbstractSocket returns an abstract implementation of RSocket. You can specify the actual implementation of any request.
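`NewAbstractSocket` takes a variadic list of `OptAbstractSocket` values, each of which is a function that mutates the socket under construction. This is the functional options pattern. The sketch below reproduces the pattern with hypothetical local types (`handlers`, `option`, `withRequestResponse`, `withFireAndForget`) and plain strings instead of payloads, so it runs without the library. What the real abstract socket does for a handler that was never registered is not stated on this page; returning an error here is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// handlers is a hypothetical stand-in for the abstract socket's handler table.
type handlers struct {
	requestResponse func(req string) (string, error)
	fireAndForget   func(req string)
}

// option mutates a handlers value, in the same spirit as OptAbstractSocket.
type option func(*handlers)

func withRequestResponse(fn func(string) (string, error)) option {
	return func(h *handlers) { h.requestResponse = fn }
}

func withFireAndForget(fn func(string)) option {
	return func(h *handlers) { h.fireAndForget = fn }
}

var errNotImplemented = errors.New("handler not implemented")

// newHandlers starts from defaults and applies each option in order, so
// callers only specify the interactions they care about.
func newHandlers(opts ...option) *handlers {
	h := &handlers{
		requestResponse: func(string) (string, error) { return "", errNotImplemented },
		fireAndForget:   func(string) {},
	}
	for _, opt := range opts {
		opt(h)
	}
	return h
}

func main() {
	h := newHandlers(withRequestResponse(func(req string) (string, error) {
		return "Pong: " + req, nil
	}))
	resp, err := h.requestResponse("Ping")
	fmt.Println(resp, err) // Pong: Ping <nil>
	h.fireAndForget("dropped by the default no-op handler")
}
```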
type ServerAcceptor ¶
type ServerAcceptor = func(setup payload.SetupPayload, sendingSocket CloseableRSocket) (RSocket, error)
ServerAcceptor is an alias for the server acceptor function.
type ServerBuilder ¶
type ServerBuilder interface {
// Fragment sets the fragmentation size; the default is 16_777_215 (16MB).
Fragment(mtu int) ServerBuilder
// Lease enables the lease feature.
Lease(leases lease.Factory) ServerBuilder
// Resume enables resume for the current server.
Resume(opts ...OpServerResume) ServerBuilder
// Acceptor registers the server acceptor, which handles incoming RSockets.
Acceptor(acceptor ServerAcceptor) ToServerStarter
// OnStart registers a handler that is called when the server starts successfully.
OnStart(onStart func()) ServerBuilder
}
ServerBuilder can be used to build a RSocket server.
func Receive ¶
func Receive() ServerBuilder
Receive receives server connections from client RSockets.
Example ¶
err := rsocket.Receive().
Resume(rsocket.WithServerResumeSessionDuration(30 * time.Second)).
Fragment(65535).
Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
// Handle close.
sendingSocket.OnClose(func(err error) {
log.Println("sending socket is closed")
})
// You can reject connection. For example, do some authorization.
// return nil, errors.New("ACCESS_DENY")
// Request to client.
sendingSocket.RequestResponse(payload.NewString("Ping", time.Now().String())).
DoOnSuccess(func(elem payload.Payload) error {
log.Println("response of Ping from client:", elem)
return nil
}).
SubscribeOn(scheduler.Parallel()).
Subscribe(context.Background())
// Return a responder that simply echoes.
return rsocket.NewAbstractSocket(
rsocket.FireAndForget(func(msg payload.Payload) {
log.Println("receive fnf:", msg)
}),
rsocket.RequestResponse(func(msg payload.Payload) mono.Mono {
return mono.Just(msg)
}),
rsocket.RequestStream(func(msg payload.Payload) flux.Flux {
return flux.Create(func(ctx context.Context, s flux.Sink) {
for i := 0; i < 3; i++ {
s.Next(payload.NewString(msg.DataUTF8(), fmt.Sprintf("This is response #%04d", i)))
}
s.Complete()
})
}),
rsocket.RequestChannel(func(requests flux.Flux) flux.Flux {
return requests
}),
), nil
}).
Transport(rsocket.TCPServer().SetHostAndPort("127.0.0.1", 7878).Build()).
Serve(context.Background())
if err != nil {
panic(err)
}
type TCPClientBuilder ¶ added in v0.6.0
type TCPClientBuilder struct {
// contains filtered or unexported fields
}
TCPClientBuilder provides builder which can be used to create a client-side TCP transport easily.
func TCPClient ¶ added in v0.6.0
func TCPClient() *TCPClientBuilder
TCPClient creates a new TCPClientBuilder.
func (*TCPClientBuilder) Build ¶ added in v0.6.0
func (tc *TCPClientBuilder) Build() transport.ClientTransporter
Build builds and returns a new TCP ClientTransporter.
func (*TCPClientBuilder) SetAddr ¶ added in v0.6.0
func (tc *TCPClientBuilder) SetAddr(addr string) *TCPClientBuilder
SetAddr sets the addr.
func (*TCPClientBuilder) SetHostAndPort ¶ added in v0.6.0
func (tc *TCPClientBuilder) SetHostAndPort(host string, port int) *TCPClientBuilder
SetHostAndPort sets the host and port.
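The examples on this page address the same server both as `SetAddr("127.0.0.1:7878")` and as `SetHostAndPort("127.0.0.1", 7878)`. The page does not say how the builder combines host and port, so the following is only a standard-library sketch of how the two spellings relate, using a hypothetical `hostPort` helper. `net.JoinHostPort` also brackets IPv6 literals, which a naive `host + ":" + port` concatenation would not.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// hostPort joins a host and a numeric port into a "host:port" address string.
func hostPort(host string, port int) string {
	return net.JoinHostPort(host, strconv.Itoa(port))
}

func main() {
	fmt.Println(hostPort("127.0.0.1", 7878)) // 127.0.0.1:7878
	fmt.Println(hostPort("::1", 7878))       // [::1]:7878
}
```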
func (*TCPClientBuilder) SetTLSConfig ¶ added in v0.6.0
func (tc *TCPClientBuilder) SetTLSConfig(c *tls.Config) *TCPClientBuilder
SetTLSConfig sets the tls config.
Here's an example:
tc := &tls.Config{
InsecureSkipVerify: true,
}
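Note that `InsecureSkipVerify: true` disables certificate verification, which is only reasonable for local testing. As a hedged alternative, a client can trust a specific certificate instead, for example a self-signed `cert.pem` generated for local use. The helper name `verifyingTLSConfig` below is hypothetical; the `crypto/tls` and `crypto/x509` calls are standard library.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
)

// verifyingTLSConfig builds a client config that trusts only the given
// PEM-encoded certificates and checks the server name.
func verifyingTLSConfig(caPEM []byte, serverName string) (*tls.Config, error) {
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, errors.New("no certificates found in PEM data")
	}
	return &tls.Config{
		RootCAs:    pool,
		ServerName: serverName,
		MinVersion: tls.VersionTLS12,
	}, nil
}

func main() {
	// In real use caPEM would come from a file such as cert.pem.
	_, err := verifyingTLSConfig([]byte("not a certificate"), "localhost")
	fmt.Println(err) // no certificates found in PEM data
}
```

The resulting `*tls.Config` has the type that `SetTLSConfig` accepts.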
type TCPServerBuilder ¶ added in v0.6.0
type TCPServerBuilder struct {
// contains filtered or unexported fields
}
TCPServerBuilder provides builder which can be used to create a server-side TCP transport easily.
func TCPServer ¶ added in v0.6.0
func TCPServer() *TCPServerBuilder
TCPServer creates a new TCPServerBuilder.
func (*TCPServerBuilder) Build ¶ added in v0.6.0
func (ts *TCPServerBuilder) Build() transport.ServerTransporter
Build builds and returns a new TCP ServerTransporter.
func (*TCPServerBuilder) SetAddr ¶ added in v0.6.0
func (ts *TCPServerBuilder) SetAddr(addr string) *TCPServerBuilder
SetAddr sets the addr.
func (*TCPServerBuilder) SetHostAndPort ¶ added in v0.6.0
func (ts *TCPServerBuilder) SetHostAndPort(host string, port int) *TCPServerBuilder
SetHostAndPort sets the host and port.
func (*TCPServerBuilder) SetTLSConfig ¶ added in v0.6.0
func (ts *TCPServerBuilder) SetTLSConfig(c *tls.Config) *TCPServerBuilder
SetTLSConfig sets the tls config.
You can generate cert.pem and key.pem for local testing:
go run $GOROOT/src/crypto/tls/generate_cert.go --host localhost
Load the X509 key pair:
cert, err := tls.LoadX509KeyPair("cert.pem", "key.pem")
if err != nil {
panic(err)
}
// Init TLS configuration.
tc := &tls.Config{
MinVersion: tls.VersionTLS12,
CurvePreferences: []tls.CurveID{tls.CurveP521, tls.CurveP384, tls.CurveP256},
PreferServerCipherSuites: true,
CipherSuites: []uint16{
tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
tls.TLS_RSA_WITH_AES_256_GCM_SHA384,
tls.TLS_RSA_WITH_AES_256_CBC_SHA,
},
Certificates: []tls.Certificate{cert},
}
type ToClientStarter ¶ added in v0.6.0
type ToClientStarter interface {
// Transport sets the transport for the current RSocket client.
//
// Examples:
//
// rsocket.TCPClient().SetHostAndPort("127.0.0.1", 7878).Build()
// rsocket.WebsocketClient().SetURL("ws://127.0.0.1:8080/hello").Build()
// rsocket.UnixClient().SetPath("/var/run/rsocket.sock").Build()
Transport(transport.ClientTransporter) ClientStarter
}
ToClientStarter is used to build a RSocket client with custom Transport.
type ToServerStarter ¶ added in v0.6.0
type ToServerStarter interface {
// Transport specifies the server transport.
// Example:
// rsocket.TCPServer().SetAddr(":8888").Build()
Transport(t transport.ServerTransporter) Start
}
ToServerStarter is used to build an RSocket server with a custom transport.
type UnixClientBuilder ¶ added in v0.6.0
type UnixClientBuilder struct {
// contains filtered or unexported fields
}
UnixClientBuilder provides builder which can be used to create a client-side UDS transport easily.
func UnixClient ¶ added in v0.6.0
func UnixClient() *UnixClientBuilder
UnixClient creates a new UnixClientBuilder.
func (UnixClientBuilder) Build ¶ added in v0.6.0
func (uc UnixClientBuilder) Build() transport.ClientTransporter
Build builds and returns a new ClientTransporter.
func (*UnixClientBuilder) SetPath ¶ added in v0.6.0
func (uc *UnixClientBuilder) SetPath(path string) *UnixClientBuilder
SetPath sets UDS sock file path.
type UnixServerBuilder ¶ added in v0.6.0
type UnixServerBuilder struct {
// contains filtered or unexported fields
}
UnixServerBuilder provides builder which can be used to create a server-side UDS transport easily.
func UnixServer ¶ added in v0.6.0
func UnixServer() *UnixServerBuilder
UnixServer creates a new UnixServerBuilder.
func (*UnixServerBuilder) Build ¶ added in v0.6.0
func (us *UnixServerBuilder) Build() transport.ServerTransporter
Build builds and returns a new ServerTransporter.
func (*UnixServerBuilder) SetPath ¶ added in v0.6.0
func (us *UnixServerBuilder) SetPath(path string) *UnixServerBuilder
SetPath sets UDS sock file path.
type WebsocketClientBuilder ¶ added in v0.6.0
type WebsocketClientBuilder struct {
// contains filtered or unexported fields
}
WebsocketClientBuilder provides builder which can be used to create a client-side Websocket transport easily.
func WebsocketClient ¶ added in v0.6.0
func WebsocketClient() *WebsocketClientBuilder
WebsocketClient creates a new WebsocketClientBuilder.
func (*WebsocketClientBuilder) Build ¶ added in v0.6.0
func (wc *WebsocketClientBuilder) Build() transport.ClientTransporter
Build builds and returns a new websocket ClientTransporter.
func (*WebsocketClientBuilder) SetHeader ¶ added in v0.6.0
func (wc *WebsocketClientBuilder) SetHeader(header http.Header) *WebsocketClientBuilder
SetHeader sets header.
func (*WebsocketClientBuilder) SetTLSConfig ¶ added in v0.6.0
func (wc *WebsocketClientBuilder) SetTLSConfig(c *tls.Config) *WebsocketClientBuilder
SetTLSConfig sets the tls config.
Here's an example:
tc := &tls.Config{
InsecureSkipVerify: true,
}
func (*WebsocketClientBuilder) SetURL ¶ added in v0.6.0
func (wc *WebsocketClientBuilder) SetURL(url string) *WebsocketClientBuilder
SetURL sets the target url. Example: ws://127.0.0.1:7878/hello/world
type WebsocketServerBuilder ¶ added in v0.6.0
type WebsocketServerBuilder struct {
// contains filtered or unexported fields
}
WebsocketServerBuilder provides builder which can be used to create a server-side Websocket transport easily.
func WebsocketServer ¶ added in v0.6.0
func WebsocketServer() *WebsocketServerBuilder
WebsocketServer creates a new WebsocketServerBuilder.
func (*WebsocketServerBuilder) Build ¶ added in v0.6.0
func (ws *WebsocketServerBuilder) Build() transport.ServerTransporter
Build builds and returns a new websocket ServerTransporter.
func (*WebsocketServerBuilder) SetAddr ¶ added in v0.6.0
func (ws *WebsocketServerBuilder) SetAddr(addr string) *WebsocketServerBuilder
SetAddr sets the websocket listen addr. Default addr is "127.0.0.1:7878".
func (*WebsocketServerBuilder) SetPath ¶ added in v0.6.0
func (ws *WebsocketServerBuilder) SetPath(path string) *WebsocketServerBuilder
SetPath sets the path of websocket.
func (*WebsocketServerBuilder) SetTLSConfig ¶ added in v0.6.0
func (ws *WebsocketServerBuilder) SetTLSConfig(c *tls.Config) *WebsocketServerBuilder
SetTLSConfig sets the tls config.
You can generate cert.pem and key.pem for local testing:
go run $GOROOT/src/crypto/tls/generate_cert.go --host localhost
Load the X509 key pair:
cert, err := tls.LoadX509KeyPair("cert.pem", "key.pem")
if err != nil {
panic(err)
}
// Init TLS configuration.
tc := &tls.Config{
MinVersion: tls.VersionTLS12,
CurvePreferences: []tls.CurveID{tls.CurveP521, tls.CurveP384, tls.CurveP256},
PreferServerCipherSuites: true,
CipherSuites: []uint16{
tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
tls.TLS_RSA_WITH_AES_256_GCM_SHA384,
tls.TLS_RSA_WITH_AES_256_CBC_SHA,
},
Certificates: []tls.Certificate{cert},
}
func (*WebsocketServerBuilder) SetUpgrader ¶ added in v0.6.0
func (ws *WebsocketServerBuilder) SetUpgrader(upgrader *websocket.Upgrader) *WebsocketServerBuilder
SetUpgrader sets websocket upgrader. You can customize your own websocket upgrader instead of the default upgrader.
Example (also the default value):
upgrader := &websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true
},
}
Directories ¶
| Path | Synopsis |
|---|---|
| balancer | Package balancer defines APIs for load balancing in RSocket. |
| cmd | |
| rsocket-cli (command) | |
| examples | |
| echo (command) | |
| echo_bench (command) | |
| fibonacci (command) | |
| word_counter (command) | |
| internal | |
