zbus

v0.1.4
Published: Oct 29, 2020 License: Apache-2.0 Imports: 12 Imported by: 59

README

Motivation

A lightweight bus replacement for local inter-process communication. The main goal is to decouple components from each other by using a lightweight message bus (currently implemented on Redis) to queue messages and deliver them to the component that can serve them.

Goal

  • Each module has a name; a single module can host one or more objects.
  • An object can implement one or more interfaces, although this is not required.
  • Each object must have a name and a version.
  • Interfaces are mainly used to generate client stubs, but an object does not need one. Without an interface, the client must know the exact method signature (name, plus the number and types of the arguments) and the types of the return values.
  • A consumer with a connection to the message broker can call methods on remote objects knowing only the module name, object name, method name, and argument list. The current client implementation supports only synchronous calls; in that respect it is similar to RPC.
  • A consumer of a component can use a stub to abstract the calls to the remote module.
  • Events are supported: clients can listen to announcements from different components.

Installation

Installing the zbus compiler zbusc

go install github.com/threefoldtech/zbus/zbusc

Specs

Please check specs here

Usage

Usage is simple; see the examples.

The api.go file there contains go generate lines that run the zbusc tool.

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BaseServer

type BaseServer struct {
	// contains filtered or unexported fields
}

BaseServer implements the basic server functionality. Use it if you are building your own zbus server.

func (*BaseServer) Register

func (s *BaseServer) Register(id ObjectID, object interface{}) error

Register registers an object on server

func (*BaseServer) Start

func (s *BaseServer) Start(ctx context.Context, wg *sync.WaitGroup, workers uint, cb Callback) chan<- *Request

Start starts the workers. Workers call cb with the results of requests. The caller feeds requests to the workers by sending them on the returned channel. Start panics if the number of workers is zero.
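
The worker model described for Start can be sketched with the standard library alone. This is a minimal illustration of the pattern, not the package's implementation: the request, response, and callback types and the start and runBatch functions are hypothetical stand-ins for zbus.Request, zbus.Response, zbus.Callback, and BaseServer.Start.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// request and response are hypothetical stand-ins for zbus.Request and zbus.Response.
type request struct{ ID string }
type response struct{ ID, Output string }

// callback mirrors the shape of zbus.Callback.
type callback func(req *request, resp *response)

// start launches the given number of workers and returns the send-only
// channel that feeds them. Like the documented Start, it panics on zero workers.
func start(ctx context.Context, wg *sync.WaitGroup, workers uint, cb callback) chan<- *request {
	if workers == 0 {
		panic("invalid number of workers")
	}
	ch := make(chan *request)
	for i := uint(0); i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case req := <-ch:
					// A real worker would dispatch to the registered object here.
					cb(req, &response{ID: req.ID, Output: "handled " + req.ID})
				}
			}
		}()
	}
	return ch
}

// runBatch feeds ids through a pool of the given size and collects outputs by id.
func runBatch(workers uint, ids []string) map[string]string {
	ctx, cancel := context.WithCancel(context.Background())
	var wg, pending sync.WaitGroup
	var mu sync.Mutex
	results := make(map[string]string)
	pending.Add(len(ids))
	feed := start(ctx, &wg, workers, func(req *request, resp *response) {
		mu.Lock()
		results[resp.ID] = resp.Output
		mu.Unlock()
		pending.Done()
	})
	for _, id := range ids {
		feed <- &request{ID: id}
	}
	pending.Wait() // every callback has fired
	cancel()       // stop the workers
	wg.Wait()
	return results
}

func main() {
	fmt.Println(runBatch(2, []string{"a", "b", "c"}))
}
```

Because the channel in this sketch is unbuffered, a send only completes when a worker is ready to receive, so the caller is naturally throttled by the pool size.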

func (*BaseServer) StartStreams

func (s *BaseServer) StartStreams(ctx context.Context, cb EventCallback)

StartStreams starts the stream (event) workers in the background. Cancel ctx to stop the stream workers.

func (*BaseServer) Status

func (s *BaseServer) Status() Status

Status returns a copy of the internal worker status

type Callback

type Callback func(request *Request, response *Response)

Callback defines a callback method signature for responses

type Client

type Client interface {
	// Request [DEPRECATED] makes a request and returns the response data
	Request(module string, object ObjectID, method string, args ...interface{}) (*Response, error)

	RequestContext(ctx context.Context, module string, object ObjectID, method string, args ...interface{}) (*Response, error)

	// Stream listens to a stream of events from the server
	Stream(ctx context.Context, module string, object ObjectID, event string) (<-chan Event, error)

	Status(ctx context.Context, module string) (Status, error)
}

Client defines client interface

func NewRedisClient

func NewRedisClient(address string) (Client, error)

NewRedisClient creates a new redis client

type Event

type Event []byte

func (Event) Unmarshal

func (e Event) Unmarshal(o interface{}) error

type EventCallback

type EventCallback func(key string, event interface{})

EventCallback is called by the base server once an event is available

type Message

type Message struct {
	ID        string
	Arguments [][]byte
}

Message is base message object

func NewMessage

func NewMessage(id string, args ...interface{}) (msg Message, err error)

NewMessage creates a new message

func (*Message) Argument

func (m *Message) Argument(i int, t reflect.Type) (value reflect.Value, err error)

Argument loads an argument into a reflect.Value of type t

func (*Message) NumArguments

func (m *Message) NumArguments() int

NumArguments returns the length of the argument list

func (*Message) Unmarshal

func (m *Message) Unmarshal(i int, v interface{}) error

Unmarshal decodes the argument at position i into v

func (*Message) Value

func (m *Message) Value(i int, t reflect.Type) (interface{}, error)

Value gets the concrete value stored at argument index i
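
The Message layout, where each argument is encoded on its own into Arguments and decoded by index, can be sketched as follows. This is an illustration under stated assumptions: the message type and its helpers are hypothetical copies of the documented shapes, and encoding/json is used purely as a standard-library stand-in for the msgpack encoding the package mentions.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// message mirrors the documented shape of zbus.Message.
type message struct {
	ID        string
	Arguments [][]byte
}

// newMessage encodes every argument independently.
// JSON is a stand-in here; it is not the encoding zbus uses.
func newMessage(id string, args ...interface{}) (message, error) {
	msg := message{ID: id, Arguments: make([][]byte, 0, len(args))}
	for _, arg := range args {
		data, err := json.Marshal(arg)
		if err != nil {
			return msg, err
		}
		msg.Arguments = append(msg.Arguments, data)
	}
	return msg, nil
}

// unmarshal decodes argument i into v, which must be a pointer.
func (m *message) unmarshal(i int, v interface{}) error {
	if i < 0 || i >= len(m.Arguments) {
		return fmt.Errorf("argument index %d out of range", i)
	}
	return json.Unmarshal(m.Arguments[i], v)
}

// value decodes argument i into a freshly allocated value of type t.
func (m *message) value(i int, t reflect.Type) (interface{}, error) {
	ptr := reflect.New(t)
	if err := m.unmarshal(i, ptr.Interface()); err != nil {
		return nil, err
	}
	return ptr.Elem().Interface(), nil
}

// intType is a convenience for the usage below.
var intType = reflect.TypeOf(0)

func main() {
	msg, err := newMessage("req-1", "hello", 42)
	if err != nil {
		panic(err)
	}
	var s string
	if err := msg.unmarshal(0, &s); err != nil {
		panic(err)
	}
	n, err := msg.value(1, intType)
	if err != nil {
		panic(err)
	}
	fmt.Println(s, n)
}
```

Decoding by reflect.Type is what lets a server turn raw arguments into the parameter types of a method it only discovers at run time.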

type ObjectID

type ObjectID struct {
	Name    string
	Version Version
}

ObjectID defines an object id

func ObjectIDFromString

func ObjectIDFromString(id string) ObjectID

ObjectIDFromString parses an object id from string

func (ObjectID) String

func (o ObjectID) String() string

type RedisClient

type RedisClient struct {
	// contains filtered or unexported fields
}

RedisClient is client implementation for redis broker

func (*RedisClient) Request

func (c *RedisClient) Request(module string, object ObjectID, method string, args ...interface{}) (*Response, error)

Request makes a request to object.Method hosted by module. A module name is the queue name used in the server part.

func (*RedisClient) RequestContext

func (c *RedisClient) RequestContext(ctx context.Context, module string, object ObjectID, method string, args ...interface{}) (*Response, error)

RequestContext makes a request to object.Method hosted by module. A module name is the queue name used in the server part.

func (*RedisClient) Status

func (c *RedisClient) Status(ctx context.Context, module string) (Status, error)

Status returns the module status

func (*RedisClient) Stream

func (c *RedisClient) Stream(ctx context.Context, module string, object ObjectID, event string) (<-chan Event, error)

Stream listens to a stream of events from the server

type RedisServer

type RedisServer struct {
	BaseServer
	// contains filtered or unexported fields
}

RedisServer is the server implementation for the Redis broker

func (*RedisServer) Run

func (s *RedisServer) Run(ctx context.Context) error

Run starts the ZBus server

type RemoteError

type RemoteError struct {
	Message string
}

RemoteError is a concrete type used to wrap all errors returned by services. For example, if a method `f` returns an `error`, the result of its Error() call is stored in a RemoteError struct.

func (*RemoteError) Error

func (r *RemoteError) Error() string
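
On the client side, a service error wrapped this way can be told apart from other failures with errors.As. The sketch below copies the documented struct locally; the body of Error and the remoteMessage helper are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// RemoteError copies the documented struct.
type RemoteError struct {
	Message string
}

// Error returns the stored message. This body is an assumption;
// the documentation only shows the signature.
func (r *RemoteError) Error() string { return r.Message }

// remoteMessage reports whether err is, or wraps, a *RemoteError
// and returns the service message if so. It is a hypothetical helper.
func remoteMessage(err error) (string, bool) {
	var remote *RemoteError
	if errors.As(err, &remote) {
		return remote.Message, true
	}
	return "", false
}

func main() {
	err := fmt.Errorf("call failed: %w", &RemoteError{Message: "division by zero"})
	msg, ok := remoteMessage(err)
	fmt.Println(msg, ok)
}
```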

type Request

type Request struct {
	Message
	Object  ObjectID
	ReplyTo string
	Method  string
}

Request is a carrier of byte data. It does not assume any particular encoding for individual objects.

var (
	// NoOP request will cause the worker to try polling again from the queue
	// without doing anything. The idea is that we can use this to check
	// if there are any free workers, by pushing this to the channel in a select
	// and see if any of the workers receives it.
	NoOP Request
)
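
The probing idea in the NoOP comment can be sketched as a non-blocking send. This is an illustration only: request, noOP, worker, hasFreeWorker, and waitForFreeWorker are hypothetical stand-ins, and the sketch assumes an unbuffered request channel.

```go
package main

import (
	"fmt"
	"time"
)

// request is a hypothetical stand-in for zbus.Request.
type request struct{ Method string }

// noOP plays the role of the package-level NoOP value.
var noOP request

// hasFreeWorker attempts a non-blocking send of noOP. On an unbuffered
// channel the send succeeds only if a worker is waiting to receive.
func hasFreeWorker(feed chan<- *request) bool {
	select {
	case feed <- &noOP:
		return true
	default:
		return false
	}
}

// waitForFreeWorker retries the probe a bounded number of times.
func waitForFreeWorker(feed chan<- *request, attempts int) bool {
	for i := 0; i < attempts; i++ {
		if hasFreeWorker(feed) {
			return true
		}
		time.Sleep(time.Millisecond)
	}
	return false
}

// worker drains the channel; a noOP just sends it back to polling.
func worker(feed <-chan *request) {
	for req := range feed {
		if req == &noOP {
			continue
		}
		// A real worker would dispatch req here.
	}
}

func main() {
	feed := make(chan *request)
	fmt.Println("free worker before start:", hasFreeWorker(feed))
	go worker(feed)
	fmt.Println("free worker after start:", waitForFreeWorker(feed, 1000))
}
```

The retry loop exists because a freshly started goroutine may not have reached its receive yet when the first probe runs.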

func LoadRequest

func LoadRequest(data []byte) (*Request, error)

LoadRequest loads a request from bytes

func NewRequest

func NewRequest(id, replyTo string, object ObjectID, method string, args ...interface{}) (*Request, error)

NewRequest creates a message that carries the given values

func (*Request) Encode

func (m *Request) Encode() ([]byte, error)

Encode converts a message into byte data suitable for sending over the wire. Encode always uses msgpack.

type Response

type Response struct {
	Message
	// Error here will carry any protocol error
	Error string
}

Response object

func LoadResponse

func LoadResponse(data []byte) (*Response, error)

LoadResponse loads response from data

func NewResponse

func NewResponse(id, errMsg string, values ...interface{}) (*Response, error)

NewResponse creates a response with the given id, errMsg, and return values. Note that errMsg carries protocol-level errors (no such method, unknown object, etc.); errors returned by the service method itself should be encapsulated in the values.

func (*Response) Encode

func (m *Response) Encode() ([]byte, error)

Encode converts a response into byte data suitable for sending over the wire. Encode always uses msgpack.

type Return

type Return []interface{}

Return results from a call

type Server

type Server interface {
	Register(id ObjectID, object interface{}) error
	Run(ctx context.Context) error
}

Server is server interface

func NewRedisServer

func NewRedisServer(module, address string, workers uint) (Server, error)

NewRedisServer builds a new ZBus server that uses Redis as the message broker

type Status

type Status struct {
	Objects []ObjectID     `json:"objects" yaml:"objects"`
	Workers []WorkerStatus `json:"workers" yaml:"workers"`
}

Status is returned by the server Status method

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream represents a channel of events

func (*Stream) Name

func (s *Stream) Name() string

func (*Stream) Run

func (s *Stream) Run(ctx context.Context) <-chan interface{}

Run runs the stream to completion

type Surrogate

type Surrogate struct {
	// contains filtered or unexported fields
}

Surrogate is a wrapper around an object to support dynamic method calls

func NewSurrogate

func NewSurrogate(object interface{}) *Surrogate

NewSurrogate creates a new surrogate object

func (*Surrogate) Call

func (s *Surrogate) Call(name string, args ...interface{}) (Return, error)

Call dynamically calls a method
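
Dynamic invocation by method name can be sketched with the reflect package. This is a minimal illustration of the technique, not Surrogate's implementation: the calculator type and the call function are hypothetical, and the sketch rejects variadic methods and nil arguments for brevity.

```go
package main

import (
	"fmt"
	"reflect"
)

// calculator is a hypothetical service object.
type calculator struct{}

// Add is exported, so reflection can find it by name.
func (calculator) Add(a, b int) int { return a + b }

// call looks up an exported method by name, checks the arguments
// against its signature, and invokes it.
func call(object interface{}, name string, args ...interface{}) ([]interface{}, error) {
	method := reflect.ValueOf(object).MethodByName(name)
	if !method.IsValid() {
		return nil, fmt.Errorf("no such method: %s", name)
	}
	mt := method.Type()
	if mt.IsVariadic() || mt.NumIn() != len(args) {
		return nil, fmt.Errorf("%s expects %d arguments, got %d", name, mt.NumIn(), len(args))
	}
	in := make([]reflect.Value, len(args))
	for i, arg := range args {
		v := reflect.ValueOf(arg)
		// Validate before calling, since reflect panics on a type mismatch.
		if !v.IsValid() || !v.Type().AssignableTo(mt.In(i)) {
			return nil, fmt.Errorf("argument %d: cannot use %T as %s", i, arg, mt.In(i))
		}
		in[i] = v
	}
	out := method.Call(in)
	results := make([]interface{}, len(out))
	for i, v := range out {
		results[i] = v.Interface()
	}
	return results, nil
}

func main() {
	out, err := call(calculator{}, "Add", 2, 3)
	fmt.Println(out, err)
}
```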

func (*Surrogate) CallRequest

func (s *Surrogate) CallRequest(request *Request) (Return, error)

CallRequest calls a method defined by request

func (*Surrogate) Streams

func (s *Surrogate) Streams() []Stream

Streams returns all stream objects associated with this object. Stream methods take only one argument (a context) and must return a single value: a chan of a static type (struct or primitive).
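
A method with the stream shape described above, and one way to detect such methods by reflection, might look like the sketch below. The sensor type and the streamMethods function are hypothetical; the sketch checks only the argument and result shape, not whether the channel element is a static type.

```go
package main

import (
	"context"
	"fmt"
	"reflect"
)

// sensor is a hypothetical object that exposes one stream.
type sensor struct{}

// Readings has the stream shape: one context argument, one channel result.
func (sensor) Readings(ctx context.Context) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for i := 0; ; i++ {
			select {
			case <-ctx.Done():
				return
			case ch <- i:
			}
		}
	}()
	return ch
}

// Name is an ordinary method and should not be treated as a stream.
func (sensor) Name() string { return "sensor" }

var contextType = reflect.TypeOf((*context.Context)(nil)).Elem()

// streamMethods returns the names of exported methods shaped like
// func(context.Context) <-chan T.
func streamMethods(object interface{}) []string {
	var names []string
	t := reflect.TypeOf(object)
	for i := 0; i < t.NumMethod(); i++ {
		m := t.Method(i)
		// m.Type lists the receiver as its first input.
		if m.Type.NumIn() != 2 || m.Type.In(1) != contextType {
			continue
		}
		if m.Type.NumOut() != 1 || m.Type.Out(0).Kind() != reflect.Chan {
			continue
		}
		names = append(names, m.Name)
	}
	return names
}

func main() {
	fmt.Println(streamMethods(sensor{}))
}
```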

type Version

type Version string

Version defines the object version

type WorkerState

type WorkerState string

WorkerState represents the current worker state (free or busy)

const (
	// WorkerFree free state
	WorkerFree WorkerState = "free"
	// WorkerBusy busy state
	WorkerBusy WorkerState = "busy"
)

type WorkerStatus

type WorkerStatus struct {
	State     WorkerState `json:"state" yaml:"state"`
	StartTime time.Time   `json:"time,omitempty" yaml:"time,omitempty"`
	Action    string      `json:"action,omitempty" yaml:"action,omitempty"`
}

WorkerStatus represents the full worker status including request time and method that it is working on.
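
To show how the struct tags above shape the serialized form, the sketch below copies WorkerState and WorkerStatus locally and encodes them with encoding/json. The encode helper and the Action string are hypothetical; the documentation does not specify the format of Action.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// WorkerState and WorkerStatus are local copies of the documented types.
type WorkerState string

const (
	WorkerFree WorkerState = "free"
	WorkerBusy WorkerState = "busy"
)

type WorkerStatus struct {
	State     WorkerState `json:"state" yaml:"state"`
	StartTime time.Time   `json:"time,omitempty" yaml:"time,omitempty"`
	Action    string      `json:"action,omitempty" yaml:"action,omitempty"`
}

// encode is a hypothetical helper that returns the JSON form of a status.
func encode(w WorkerStatus) string {
	data, err := json.Marshal(w)
	if err != nil {
		panic(err)
	}
	return string(data)
}

func main() {
	busy := WorkerStatus{
		State:     WorkerBusy,
		StartTime: time.Date(2020, 10, 29, 12, 0, 0, 0, time.UTC),
		Action:    "calculator.Add", // hypothetical action label
	}
	fmt.Println(encode(busy))
	fmt.Println(encode(WorkerStatus{State: WorkerFree}))
}
```

One detail worth knowing when reading such output: encoding/json's omitempty drops the empty Action string but does not drop a zero time.Time, because struct values are never considered empty, so a free worker still carries a "time" field.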

Directories

Path Synopsis
examples
client command
server command
