Documentation
¶
Index ¶
- Variables
- func Handle(h Handler) error
- func Init(opts ...Option)
- func Run() error
- func Start() error
- func Stop() error
- func String() string
- func Subscribe(s Subscriber) error
- type Handler
- type HandlerFunc
- type HandlerOption
- type HandlerOptions
- type HandlerWrapper
- type Message
- type Option
- func Address(a string) Option
- func Advertise(a string) Option
- func Broker(b broker.Broker) Option
- func Codec(contentType string, c codec.NewCodec) Option
- func Context(ctx context.Context) Option
- func Id(id string) Option
- func Metadata(md map[string]string) Option
- func Name(n string) Option
- func RegisterCheck(fn func(context.Context) error) Option
- func RegisterInterval(t time.Duration) Option
- func RegisterTTL(t time.Duration) Option
- func Registry(r registry.Registry) Option
- func TLSConfig(t *tls.Config) Option
- func Transport(t transport.Transport) Option
- func Version(v string) Option
- func Wait(wg *sync.WaitGroup) Option
- func WithRouter(r Router) Option
- func WrapHandler(w HandlerWrapper) Option
- func WrapSubscriber(w SubscriberWrapper) Option
- type Options
- type Request
- type Response
- type Router
- type Server
- type Stream
- type StreamWrapper
- type Subscriber
- type SubscriberFunc
- type SubscriberOption
- type SubscriberOptions
- type SubscriberWrapper
Constants ¶
This section is empty.
Variables ¶
Functions ¶
func Handle ¶
Handle registers a handler interface with the default server to handle inbound requests.
func Run ¶
func Run() error
Run starts the default server and waits for a kill signal before existing. Also registers/deregisters the server
func Subscribe ¶
func Subscribe(s Subscriber) error
Subscribe registers a subscriber interface with the default server which subscribes to specified topic with the broker
Types ¶
type Handler ¶
type Handler interface {
Name() string
Handler() interface{}
Endpoints() []*regpb.Endpoint
Options() HandlerOptions
}
Handler interface represents a request handler. It's generated by passing any type of public concrete object with endpoints into server.NewHandler. Most will pass in a struct.
Example:
type Greeter struct{}
func (g *Greeter) Hello(context, request, response) error {
return nil
}
func NewHandler ¶
func NewHandler(h interface{}, opts ...HandlerOption) Handler
NewHandler creates a new handler interface using the default server Handlers are required to be a public object with public endpoint. Call to a service endpoint such as Foo.Bar expects the type:
type Foo struct{}
func (f *Foo) Bar(ctx, req, rsp) error {
return nil
}
type HandlerFunc ¶
HandlerFunc represents a single method of a handler. It's used primarily for the wrappers. What's handed to the actual method is the concrete request and response types.
type HandlerOption ¶
type HandlerOption func(*HandlerOptions)
func EndpointMetadata ¶
func EndpointMetadata(name string, md map[string]string) HandlerOption
EndpointMetadata is a Handler option that allows metadata to be added to individual endpoints.
func InternalHandler ¶
func InternalHandler(b bool) HandlerOption
InternalHandler options specifies that a handler is not advertised to the discovery system. In the future this may also limit request to the internal network or authorised user.
func OpenAPIHandler ¶
func OpenAPIHandler(openAPI *openapipb.OpenAPI) HandlerOption
OpenAPIHandler is a Handler option that allows swagger openapi to be added to individual endpoints.
type HandlerOptions ¶
type HandlerWrapper ¶
type HandlerWrapper func(HandlerFunc) HandlerFunc
HandlerWrapper wraps the HandlerFunc and returns the equivalent
type Message ¶
type Message interface {
// Topic of the message
Topic() string
// Payload the decoded payload value
Payload() interface{}
// ContentType the content type of the payload
ContentType() string
// Header the raw headers of the message
Header() map[string]string
// Body the raw body of the message
Body() []byte
// Codec used tp decode the message
Codec() codec.Reader
}
Message is an async message interface
type Option ¶
type Option func(*Options)
func Context ¶
Context specifies a context for the service. Can be used to signal shutdown of the service Can be used for extra option values.
func RegisterCheck ¶
RegisterCheck run func before registry service
func RegisterInterval ¶
RegisterInterval register the service with at interval
func RegisterTTL ¶
RegisterTTL register the service with a TTL
func Wait ¶
Wait tells the server to wait for requests to finish before exiting If `wg` is nil, server only wait for completion of rpc handler. For user need finer grained control, pass a concrete `wg` here, server will wait against it on stop.
func WrapHandler ¶
func WrapHandler(w HandlerWrapper) Option
WrapHandler adds a handler Wrapper to a list of options passed into the server
func WrapSubscriber ¶
func WrapSubscriber(w SubscriberWrapper) Option
WrapSubscriber adds a subscriber Wrapper to a list of options passed into the server
type Options ¶
type Options struct {
Codecs map[string]codec.NewCodec
Broker broker.Broker
Registry registry.Registry
Transport transport.Transport
Metadata map[string]string
Name string
Address string
Advertise string
Id string
Version string
HdlrWrappers []HandlerWrapper
SubWrappers []SubscriberWrapper
// RegisterCheck runs a check function before registering the service
RegisterCheck func(context.Context) error
// The register expiry time
RegisterTTL time.Duration
// The interval on which to register
RegisterInterval time.Duration
// The router for requests
Router Router
// TLSConfig specifies tls.Config for secure serving
TLSConfig *tls.Config
// Other options for implementations of the interface
// can be stored in a context
Context context.Context
}
func DefaultOptions ¶
func DefaultOptions() Options
DefaultOptions returns config options for the default service
func NewOptions ¶
type Request ¶
type Request interface {
// Service name requested
Service() string
// Method the action requested
Method() string
// Endpoint name requested
Endpoint() string
// ContentType Content Type provided
ContentType() string
// Header of the request
Header() map[string]string
// Body is the initial decoded value
Body() interface{}
// Read the undecoded request body
Read() ([]byte, error)
// Codec the encoded message body
Codec() codec.Reader
// Stream indicates whether its a stream
Stream() bool
}
Request is a synchronous request interface
type Response ¶
type Response interface {
// Codec encoded writer
Codec() codec.Writer
// WriteHeader write the header
WriteHeader(map[string]string)
// Write a response directly to the client
Write([]byte) error
}
Response is the response write for unencoded messages
type Router ¶
type Router interface {
// ProcessMessage processes a message
ProcessMessage(context.Context, Message) error
// ServeRequest processes a request to completion
ServeRequest(context.Context, Request, Response) error
}
Router handle serving messages
type Server ¶
type Server interface {
// Init initialise options
Init(...Option) error
// Options retrieve the options
Options() Options
// Handle register a handler
Handle(Handler) error
// NewHandler create a new handler
NewHandler(interface{}, ...HandlerOption) Handler
// NewSubscriber create a new subscriber
NewSubscriber(string, interface{}, ...SubscriberOption) Subscriber
// Subscribe register a subscriber
Subscribe(Subscriber) error
// Start the server
Start() error
// Stop the server
Stop() error
// String implementation
String() string
}
Server is a simple vine server abstraction
type Stream ¶
type Stream interface {
Context() context.Context
Request() Request
Send(interface{}) error
Recv(interface{}) error
Error() error
Close() error
}
Stream represents a stream established with a client. A stream can be bidirectional which is indicated by the request. The last error will be left in Error(). EOF indicates end of the stream.
type StreamWrapper ¶
StreamWrapper wraps a Stream interface and returns the equivalent. Because streams exist for the lifetime of a method invocation this is a convenient way to wrap a Stream as its in use for trace, monitoring. metrics, etc.
type Subscriber ¶
type Subscriber interface {
Topic() string
Subscriber() interface{}
Endpoints() []*regpb.Endpoint
Options() SubscriberOptions
}
Subscriber interface represents a subscription to a given topic using a specific subscriber function or object with endpoints. It mirrors the handler in its behaviour.
func NewSubscriber ¶
func NewSubscriber(topic string, h interface{}, opts ...SubscriberOption) Subscriber
NewSubscriber creates a new subscriber interface with the given topic and handler using the default server
type SubscriberFunc ¶
SubscriberFunc represents a single method of a subscriber. It's used primarily for the wrappers. What's handed to the actual method is the concrete publication message.
type SubscriberOption ¶
type SubscriberOption func(*SubscriberOptions)
func DisableAutoAck ¶
func DisableAutoAck() SubscriberOption
DisableAutoAck will disable auto acking of messages after they have been handled.
func InternalSubscriber ¶
func InternalSubscriber(b bool) SubscriberOption
InternalSubscriber options specifies that a subscriber is not advertised to the discovery system.
func SubscriberContext ¶
func SubscriberContext(ctx context.Context) SubscriberOption
SubscriberContext set context options to allow broker SubscriberOption passed
func SubscriberQueue ¶
func SubscriberQueue(n string) SubscriberOption
SubscriberQueue Shared queue name distributed messages across subscribers
type SubscriberOptions ¶
type SubscriberOptions struct {
// AutoAck defaults to true. When a handler returns
// with a nil error the message is acked.
AutoAck bool
Queue string
Internal bool
Context context.Context
}
func NewSubscriberOptions ¶
func NewSubscriberOptions(opts ...SubscriberOption) SubscriberOptions
type SubscriberWrapper ¶
type SubscriberWrapper func(SubscriberFunc) SubscriberFunc
SubscriberWrapper wraps the SubscriberFunc and returns the equivalent