Documentation ¶
Overview ¶
Package balancer defines APIs for load balancing in RSocket.
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Balancer ¶
type Balancer interface {
	io.Closer
	// Put adds a new client.
	Put(client rsocket.Client)
	// PutLabel adds a new client with a label.
	PutLabel(label string, client rsocket.Client)
	// Next returns the next balanced RSocket client.
	Next() rsocket.Client
	// OnLeave registers a handler that is called when a client exits.
	OnLeave(fn func(label string))
}
Balancer manages the RSocket clients registered with it.
func NewRoundRobinBalancer ¶
func NewRoundRobinBalancer() Balancer
NewRoundRobinBalancer returns a new Round-Robin Balancer.
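To illustrate what round-robin selection means here, the following is a minimal, self-contained sketch. It is not the library's implementation: the `roundRobin` type is hypothetical, it stores plain string IDs instead of `rsocket.Client` values, and its `Next` reports an empty rotation with a boolean, which is an assumption made only to keep the sketch runnable on its own.

```go
package main

import (
	"fmt"
	"sync"
)

// roundRobin is a hypothetical, simplified stand-in for a round-robin balancer.
// It holds string IDs rather than rsocket.Client values.
type roundRobin struct {
	mu      sync.Mutex
	clients []string
	next    int
}

// Put appends a client to the end of the rotation.
func (r *roundRobin) Put(client string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.clients = append(r.clients, client)
}

// Next returns the next client in rotation, or false if none is registered.
func (r *roundRobin) Next() (string, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if len(r.clients) == 0 {
		return "", false
	}
	c := r.clients[r.next%len(r.clients)]
	r.next = (r.next + 1) % len(r.clients)
	return c, true
}

func main() {
	rr := &roundRobin{}
	rr.Put("a")
	rr.Put("b")
	// Three calls cycle through the clients and wrap around.
	for i := 0; i < 3; i++ {
		c, _ := rr.Next()
		fmt.Println(c)
	}
}
```

Each call to `Next` advances a shared cursor, so successive requests are spread evenly across the registered clients regardless of which goroutine makes the call.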
type Group ¶
type Group struct {
// contains filtered or unexported fields
}
Group manages a group of Balancers. A Group can be used to build a simple RSocket broker.
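The idea of a group of balancers keyed by an ID can be sketched as follows. This is an illustration under stated assumptions, not the library's code: the `group` and `counter` types are hypothetical, and creating a balancer lazily on the first `Get` for an ID is an assumption about how such a group might behave.

```go
package main

import (
	"fmt"
	"sync"
)

// counter is a hypothetical stand-in for a Balancer.
// It only counts how many clients were registered.
type counter struct{ n int }

// Put records one more registered client.
func (c *counter) Put(string) { c.n++ }

// group is a hypothetical sketch of a keyed collection of balancers.
type group struct {
	mu  sync.Mutex
	gen func() *counter
	m   map[string]*counter
}

// newGroup builds a group that uses gen to create balancers on demand.
func newGroup(gen func() *counter) *group {
	return &group{gen: gen, m: make(map[string]*counter)}
}

// Get returns the balancer for id, creating it with gen on first use
// (assumed behavior for this sketch).
func (g *group) Get(id string) *counter {
	g.mu.Lock()
	defer g.mu.Unlock()
	b, ok := g.m[id]
	if !ok {
		b = g.gen()
		g.m[id] = b
	}
	return b
}

func main() {
	g := newGroup(func() *counter { return &counter{} })
	g.Get("orders").Put("client-1")
	g.Get("orders").Put("client-2")
	g.Get("billing").Put("client-3")
	fmt.Println(g.Get("orders").n, g.Get("billing").n)
}
```

Keying balancers by service ID is what lets a broker register providers under one ID and route each incoming request to the balancer for the ID it names.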
func NewGroup ¶
NewGroup returns a new Group.
Example ¶
group := NewGroup(func() Balancer {
	return NewRoundRobinBalancer()
})
defer func() {
	_ = group.Close()
}()
// Create a broker with resume.
err := Receive().
	Resume(WithServerResumeSessionDuration(10 * time.Second)).
	Acceptor(func(setup SetupPayload, sendingSocket CloseableRSocket) RSocket {
		// Register service using Setup Metadata as service ID.
		if serviceID, ok := setup.MetadataUTF8(); ok {
			group.Get(serviceID).Put(sendingSocket)
		}
		// Proxy requests by group.
		return NewAbstractSocket(RequestResponse(func(msg Payload) mono.Mono {
			requestServiceID, ok := msg.MetadataUTF8()
			if !ok {
				panic(errors.New("missing service ID in metadata"))
			}
			log.Println("[broker] redirect request to service", requestServiceID)
			return group.Get(requestServiceID).Next().RequestResponse(msg)
		}))
	}).
	Transport(uri).
	Serve(context.Background())
if err != nil {
	panic(err)
}