Documentation
¶
Overview ¶
Example ¶
nc, err := nats.Connect("127.0.0.1:4222")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// endpoint handler - in this case, HandlerFunc is used,
// which is a built-in implementation of Handler interface
echoHandler := func(req micro.Request) {
req.Respond(req.Data())
}
// second endpoint
incrementHandler := func(req micro.Request) {
val, err := strconv.Atoi(string(req.Data()))
if err != nil {
req.Error("400", "request data should be a number", nil)
return
}
responseData := val + 1
req.Respond([]byte(strconv.Itoa(responseData)))
}
// third endpoint
multiplyHandler := func(req micro.Request) {
val, err := strconv.Atoi(string(req.Data()))
if err != nil {
req.Error("400", "request data should be a number", nil)
return
}
responseData := val * 2
req.Respond([]byte(strconv.Itoa(responseData)))
}
config := micro.Config{
Name: "IncrementService",
Version: "0.1.0",
Description: "Increment numbers",
// base handler - for simple services with single endpoints this is sufficient
Endpoint: &micro.EndpointConfig{
Subject: "echo",
Handler: micro.HandlerFunc(echoHandler),
},
}
svc, err := micro.AddService(nc, config)
if err != nil {
log.Fatal(err)
}
defer svc.Stop()
// add a group to aggregate endpoints under common prefix
numbers := svc.AddGroup("numbers")
// register endpoints in a group
err = numbers.AddEndpoint("Increment", micro.HandlerFunc(incrementHandler))
if err != nil {
log.Fatal(err)
}
err = numbers.AddEndpoint("Multiply", micro.HandlerFunc(multiplyHandler))
if err != nil {
log.Fatal(err)
}
// send a request to a service
resp, err := nc.Request("numbers.Increment", []byte("3"), 1*time.Second)
if err != nil {
log.Fatal(err)
}
responseVal, err := strconv.Atoi(string(resp.Data))
if err != nil {
log.Fatal(err)
}
fmt.Println(responseVal)
Index ¶
- Constants
- Variables
- func ControlSubject(verb Verb, name, id string) (string, error)
- type Config
- type DoneHandler
- type Endpoint
- type EndpointConfig
- type EndpointInfo
- type EndpointOpt
- type EndpointStats
- type ErrHandler
- type Group
- type GroupOpt
- type Handler
- type HandlerFunc
- type Headers
- type Info
- type NATSError
- type Ping
- type Request
- type RespondOpt
- type Service
- type ServiceIdentity
- type Stats
- type StatsHandler
- type Verb
Examples ¶
Constants ¶
const (
// Queue Group name used across all services
DefaultQueueGroup = "q"
// APIPrefix is the root of all control subjects
APIPrefix = "$SRV"
)
const (
ErrorHeader = "Nats-Service-Error"
ErrorCodeHeader = "Nats-Service-Error-Code"
)
Service Error headers
const (
InfoResponseType = "io.nats.micro.v1.info_response"
PingResponseType = "io.nats.micro.v1.ping_response"
StatsResponseType = "io.nats.micro.v1.stats_response"
)
Variables ¶
var (
ErrRespond = errors.New("NATS error when sending response")
ErrMarshalResponse = errors.New("marshaling response")
ErrArgRequired = errors.New("argument required")
)
var (
// ErrConfigValidation is returned when service configuration is invalid
ErrConfigValidation = errors.New("validation")
// ErrVerbNotSupported is returned when invalid [Verb] is used (PING, INFO, STATS)
ErrVerbNotSupported = errors.New("unsupported verb")
// ErrServiceNameRequired is returned when attempting to generate control subject with ID but empty name
ErrServiceNameRequired = errors.New("service name is required to generate ID control subject")
)
Common errors returned by the Service framework.
Functions ¶
func ControlSubject ¶
ControlSubject returns monitoring subjects used by the Service. Providing a verb is mandatory (it should be one of Ping, Info or Stats). Depending on whether name and id are provided, ControlSubject will return one of the following:
- verb only: subject used to monitor all available services
- verb and name: subject used to monitor services with the provided name
- verb, name and id: subject used to monitor an instance of a service with the provided ID
Example ¶
package main
import (
"fmt"
"github.com/hanzoai/pubsub-go/micro"
)
func main() {
// subject used to get PING from all services
subjectPINGAll, _ := micro.ControlSubject(micro.PingVerb, "", "")
fmt.Println(subjectPINGAll)
// subject used to get PING from services with provided name
subjectPINGName, _ := micro.ControlSubject(micro.PingVerb, "CoolService", "")
fmt.Println(subjectPINGName)
// subject used to get PING from a service with provided name and ID
subjectPINGInstance, _ := micro.ControlSubject(micro.PingVerb, "CoolService", "123")
fmt.Println(subjectPINGInstance)
}
Output:
$SRV.PING
$SRV.PING.CoolService
$SRV.PING.CoolService.123
Types ¶
type Config ¶
type Config struct {
// Name represents the name of the service.
Name string `json:"name"`
// Endpoint is an optional endpoint configuration.
// More complex, multi-endpoint services can be configured using
// Service.AddGroup and Service.AddEndpoint methods.
Endpoint *EndpointConfig `json:"endpoint"`
// Version is a SemVer compatible version string.
Version string `json:"version"`
// Description of the service.
Description string `json:"description"`
// Metadata annotates the service
Metadata map[string]string `json:"metadata,omitempty"`
// QueueGroup can be used to override the default queue group name.
QueueGroup string `json:"queue_group"`
// QueueGroupDisabled disables the queue group for the service.
QueueGroupDisabled bool `json:"queue_group_disabled"`
// StatsHandler is a user-defined custom function
// used to calculate additional service stats.
StatsHandler StatsHandler
// DoneHandler is invoked when all service subscriptions are stopped.
DoneHandler DoneHandler
// ErrorHandler is invoked on any nats-related service error.
ErrorHandler ErrHandler
}
Config is a configuration of a service.
type DoneHandler ¶
type DoneHandler func(Service)
DoneHandler is a function used to configure a custom done handler for a service.
type Endpoint ¶
type Endpoint struct {
EndpointConfig
Name string
// contains filtered or unexported fields
}
Endpoint manages a service endpoint.
type EndpointConfig ¶
type EndpointConfig struct {
// Subject on which the endpoint is registered.
Subject string
// Handler used by the endpoint.
Handler Handler
// Metadata annotates the endpoint.
Metadata map[string]string `json:"metadata,omitempty"`
// QueueGroup can be used to override the default queue group name.
QueueGroup string `json:"queue_group"`
// QueueGroupDisabled disables the queue group for the endpoint.
QueueGroupDisabled bool `json:"queue_group_disabled"`
}
type EndpointInfo ¶
type EndpointOpt ¶
type EndpointOpt func(*endpointOpts) error
func WithEndpointMetadata ¶
func WithEndpointMetadata(metadata map[string]string) EndpointOpt
func WithEndpointPendingLimits ¶
func WithEndpointPendingLimits(msgLimit, bytesLimit int) EndpointOpt
WithEndpointPendingLimits sets the pending limits for the endpoint's subscription. These limits determine how many messages and/or bytes can be buffered in memory before the subscription is terminated with nats.ErrSlowConsumer. Either limit can be set to -1 to indicate no limit.
func WithEndpointQueueGroup ¶
func WithEndpointQueueGroup(queueGroup string) EndpointOpt
func WithEndpointQueueGroupDisabled ¶
func WithEndpointQueueGroupDisabled() EndpointOpt
func WithEndpointSubject ¶
func WithEndpointSubject(subject string) EndpointOpt
Example ¶
nc, err := nats.Connect("127.0.0.1:4222")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
echoHandler := func(req micro.Request) {
req.Respond(req.Data())
}
config := micro.Config{
Name: "EchoService",
Version: "1.0.0",
}
srv, err := micro.AddService(nc, config)
if err != nil {
log.Fatal(err)
}
// endpoint will be registered under "service.echo" subject
err = srv.AddEndpoint("Echo", micro.HandlerFunc(echoHandler), micro.WithEndpointSubject("service.echo"))
if err != nil {
log.Fatal(err)
}
type EndpointStats ¶
type EndpointStats struct {
Name string `json:"name"`
Subject string `json:"subject"`
QueueGroup string `json:"queue_group"`
NumRequests int `json:"num_requests"`
NumErrors int `json:"num_errors"`
LastError string `json:"last_error"`
ProcessingTime time.Duration `json:"processing_time"`
AverageProcessingTime time.Duration `json:"average_processing_time"`
Data json.RawMessage `json:"data,omitempty"`
}
EndpointStats contains stats for a specific endpoint.
type ErrHandler ¶
ErrHandler is a function used to configure a custom error handler for a service.
type Group ¶
type Group interface {
// AddGroup creates a new group, prefixed by this group's prefix.
AddGroup(string, ...GroupOpt) Group
// AddEndpoint registers new endpoints on a service.
// The endpoint's subject will be prefixed with the group prefix.
AddEndpoint(string, Handler, ...EndpointOpt) error
}
Group allows for grouping endpoints on a service.
Endpoints created using AddEndpoint will be grouped under a common prefix (group name). New groups can also be derived from a group using AddGroup.
type GroupOpt ¶
type GroupOpt func(*groupOpts)
func WithGroupQueueGroup ¶
func WithGroupQueueGroupDisabled ¶
func WithGroupQueueGroupDisabled() GroupOpt
type Handler ¶
type Handler interface {
Handle(Request)
}
Handler is used to respond to service requests.
Example ¶
package main
import (
"log"
"strconv"
"github.com/hanzoai/pubsub-go"
"github.com/hanzoai/pubsub-go/micro"
)
type rectangle struct {
height int
width int
}
// Handle is an implementation of micro.Handler used to
// calculate the area of a rectangle
func (r rectangle) Handle(req micro.Request) {
area := r.height * r.width
req.Respond([]byte(strconv.Itoa(area)))
}
func main() {
nc, err := nats.Connect("127.0.0.1:4222")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
rec := rectangle{10, 5}
config := micro.Config{
Name: "RectangleAreaService",
Version: "0.1.0",
Endpoint: &micro.EndpointConfig{
Subject: "area.rectangle",
Handler: rec,
},
}
svc, err := micro.AddService(nc, config)
if err != nil {
log.Fatal(err)
}
defer svc.Stop()
}
Output:
func ContextHandler ¶
ContextHandler is a helper function used to utilize context.Context in request handlers.
Example ¶
nc, err := nats.Connect("127.0.0.1:4222")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
handler := func(ctx context.Context, req micro.Request) {
select {
case <-ctx.Done():
req.Error("400", "context canceled", nil)
default:
req.Respond([]byte("ok"))
}
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
config := micro.Config{
Name: "EchoService",
Version: "0.1.0",
Endpoint: &micro.EndpointConfig{
Subject: "echo",
Handler: micro.ContextHandler(ctx, handler),
},
}
srv, _ := micro.AddService(nc, config)
defer srv.Stop()
type HandlerFunc ¶
type HandlerFunc func(Request)
HandlerFunc is a function implementing Handler. It allows using a function as a request handler, without having to implement Handle on a separate type.
func (HandlerFunc) Handle ¶
func (fn HandlerFunc) Handle(req Request)
type Headers ¶
type Headers nats.Header
Headers is a wrapper around [nats.Header].
type Info ¶
type Info struct {
ServiceIdentity
Type string `json:"type"`
Description string `json:"description"`
Endpoints []EndpointInfo `json:"endpoints"`
}
Info is the basic information about a service type.
type NATSError ¶
type NATSError struct {
Subject string
Description string
// contains filtered or unexported fields
}
NATSError represents an error returned by a NATS Subscription. It contains a subject on which the subscription failed, so that it can be linked with a specific service endpoint.
type Ping ¶
type Ping struct {
ServiceIdentity
Type string `json:"type"`
}
Ping is the response type for PING monitoring endpoint.
type Request ¶
type Request interface {
// Respond sends the response for the request.
// Additional headers can be passed using [WithHeaders] option.
Respond([]byte, ...RespondOpt) error
// RespondJSON marshals the given response value and responds to the request.
// Additional headers can be passed using [WithHeaders] option.
RespondJSON(any, ...RespondOpt) error
// Error prepares and publishes error response from a handler.
// A response error should be set containing an error code and description.
// Optionally, data can be set as response payload.
Error(code, description string, data []byte, opts ...RespondOpt) error
// Data returns request data.
Data() []byte
// Headers returns request headers.
Headers() Headers
// Subject returns underlying NATS message subject.
Subject() string
// Reply returns underlying NATS message reply subject.
Reply() string
}
Request represents service request available in the service handler. It exposes methods to respond to the request, as well as getting the request data and headers.
type RespondOpt ¶
type RespondOpt func(*nats.Msg)
RespondOpt is a function used to configure [Request.Respond] and [Request.RespondJSON] methods.
func WithHeaders ¶
func WithHeaders(headers Headers) RespondOpt
WithHeaders can be used to configure response with custom headers.
type Service ¶
type Service interface {
// AddEndpoint registers endpoint with given name on a specific subject.
AddEndpoint(string, Handler, ...EndpointOpt) error
// AddGroup returns a Group interface, allowing for more complex endpoint topologies.
// A group can be used to register endpoints with given prefix.
AddGroup(string, ...GroupOpt) Group
// Info returns the service info.
Info() Info
// Stats returns statistics for the service endpoint and all monitoring endpoints.
Stats() Stats
// Reset resets all statistics (for all endpoints) on a service instance.
Reset()
// Stop drains the endpoint subscriptions and marks the service as stopped.
Stop() error
// Stopped informs whether [Stop] was executed on the service.
Stopped() bool
}
Service exposes methods to operate on a service instance.
func AddService ¶
AddService adds a microservice. It will enable internal common services (PING, STATS and INFO). Request handlers are registered either through the optional Config.Endpoint or separately using Service.AddEndpoint. A service name and version are required to add a service. AddService returns a Service interface, allowing service management. Each service is assigned a unique ID.
Example ¶
nc, err := nats.Connect("127.0.0.1:4222")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
echoHandler := func(req micro.Request) {
req.Respond(req.Data())
}
config := micro.Config{
Name: "EchoService",
Version: "1.0.0",
Description: "Send back what you receive",
// DoneHandler can be set to customize behavior on stopping a service.
DoneHandler: func(srv micro.Service) {
info := srv.Info()
fmt.Printf("stopped service %q with ID %q\n", info.Name, info.ID)
},
// ErrorHandler can be used to customize behavior on service execution error.
ErrorHandler: func(srv micro.Service, err *micro.NATSError) {
info := srv.Info()
fmt.Printf("Service %q returned an error on subject %q: %s", info.Name, err.Subject, err.Description)
},
// optional base handler
Endpoint: &micro.EndpointConfig{
Subject: "echo",
Handler: micro.HandlerFunc(echoHandler),
},
}
srv, err := micro.AddService(nc, config)
if err != nil {
log.Fatal(err)
}
defer srv.Stop()
type ServiceIdentity ¶
type ServiceIdentity struct {
Name string `json:"name"`
ID string `json:"id"`
Version string `json:"version"`
Metadata map[string]string `json:"metadata"`
}
ServiceIdentity contains fields helping to identify a service instance.
type Stats ¶
type Stats struct {
ServiceIdentity
Type string `json:"type"`
Started time.Time `json:"started"`
Endpoints []*EndpointStats `json:"endpoints"`
}
Stats is the type returned by STATS monitoring endpoint. It contains stats of all registered endpoints.
type StatsHandler ¶
StatsHandler is a function used to configure a custom STATS endpoint. It should return a value which can be serialized to JSON.