unitdb

v0.1.0
Published: Aug 20, 2021 License: MIT Imports: 23 Imported by: 2

README

Unitdb Go client

Unitdb is an open source, real-time messaging system for microservices and internet-connected devices. The Unitdb messaging API is built for speed and security.

The system is based on gRPC communication. It is designed to meet the requirements for low latency and binary messaging, which makes it well suited to internet-connected devices.

Quick Start

To build unitdb from source, use the go get command, then copy unitdb.conf to the directory where the unitdb binary is placed.

go get -u github.com/unit-io/unitdb/server

Usage

Detailed API documentation is available using the godoc.org service.

Make use of the client by importing it in your Go client source code. For example,

import "github.com/unit-io/unitdb-go"

Samples are available in the examples directory for reference. To build the unitdb server from the latest source code, use a "replace" directive in go.mod to point to your local module.

go mod edit -replace github.com/unit-io/unitdb=$GOPATH/src/github.com/unit-io/unitdb

Contributing

If you'd like to contribute, please fork the repository and use a feature branch. Pull requests are welcome.

Licensing

This project is licensed under the MIT License.

Documentation

Constants

const (
	TopicInvalid = uint8(iota)
	TopicStatic
	TopicWildcard
	TopicWildcardSymbol      = "*"
	TopicMultiWildcardSymbol = "..."
	TopicKeySeparator        = '/'
	TopicSeparator           = '.' // The separator character.
	TopicMaxLength           = 65535
	TopicMaxDepth            = 100 // Maximum depth for topic using a separator
)

Various constants for Topic.
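As a rough illustration of how these constants fit together, the sketch below classifies a topic string as invalid, static, or wildcard. The helper classifyTopic and the lowercase constant copies are hypothetical and not part of the package. The parsing rules used here (a trailing "..." treated as a multi-level wildcard, empty parts rejected, key prefix ignored) are assumptions, not the library's actual parser.

```go
package main

import (
	"fmt"
	"strings"
)

// Local copies of the documented constants so the sketch compiles on its own.
const (
	topicInvalid = uint8(iota)
	topicStatic
	topicWildcard
)

const (
	wildcardSymbol      = "*"
	multiWildcardSymbol = "..."
	topicSeparator      = "." // a string here for strings.Split; the package uses a rune
	topicMaxLength      = 65535
	topicMaxDepth       = 100
)

// classifyTopic is a hypothetical helper showing one way the constants could
// be applied. It does not handle the "key/" prefix.
func classifyTopic(topic string) uint8 {
	if topic == "" || len(topic) > topicMaxLength {
		return topicInvalid
	}
	kind := topicStatic
	// Strip a trailing "..." first, because it would otherwise be split
	// on the "." separator into empty parts.
	if strings.HasSuffix(topic, multiWildcardSymbol) {
		kind = topicWildcard
		topic = strings.TrimSuffix(topic, multiWildcardSymbol)
		if topic == "" {
			return kind
		}
	}
	parts := strings.Split(topic, topicSeparator)
	if len(parts) > topicMaxDepth {
		return topicInvalid
	}
	for _, p := range parts {
		if p == "" {
			return topicInvalid
		}
		if p == wildcardSymbol {
			kind = topicWildcard
		}
	}
	return kind
}

func main() {
	for _, t := range []string{"teams.alpha.ch1", "teams.*.ch1", "teams.alpha...", "teams..ch1"} {
		fmt.Printf("%-16s -> %d\n", t, classifyTopic(t))
	}
}
```

The returned values line up with TopicInvalid, TopicStatic, and TopicWildcard as declared in the constant block.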

Variables

This section is empty.

Functions

func Connect

func Connect(conn net.Conn, cm *utp.Connect) (rc uint8, epoch int32, cid int32, err error)

Connect takes a connected net.Conn and performs the initial handshake. Parameters are:

conn - connected net.Conn
cm - Connect message

func StreamConn

func StreamConn(
	stream grpc.Stream,
) *common.Conn

func TimeNow

func TimeNow() time.Time

TimeNow returns current wall time in UTC rounded to milliseconds.
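The behavior described for TimeNow can be approximated with the standard library alone. This is a minimal sketch, assuming the rounding is done with time.Time.Round; timeNow and isMillisecondAligned are hypothetical names, and the package's actual implementation may differ.

```go
package main

import (
	"fmt"
	"time"
)

// timeNow is a hypothetical stand-in for TimeNow: wall time in UTC,
// rounded to the nearest millisecond.
func timeNow() time.Time {
	return time.Now().UTC().Round(time.Millisecond)
}

// isMillisecondAligned reports whether t has no sub-millisecond component.
func isMillisecondAligned(t time.Time) bool {
	return t.Nanosecond()%int(time.Millisecond) == 0
}

func main() {
	t := timeNow()
	fmt.Println(t, isMillisecondAligned(t))
}
```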

Types

type Client

type Client interface {
	// Connect will create a connection to the server.
	Connect() error
	// ConnectContext will create a connection to the server.
	// The context will be used in the grpc stream connection.
	ConnectContext(ctx context.Context) error
	// Disconnect will end the connection with the server, but not before waiting
	// for the client wait group to be done.
	Disconnect() error
	// DisconnectContext will end the connection with the server, but not before waiting
	// for the client wait group to be done.
	// The context is used by the grpc stream to signal context done.
	DisconnectContext(ctx context.Context) error
	// TopicFilter is used to receive filtered messages on a specific topic.
	TopicFilter(subTopic string) (*TopicFilter, error)
	// Publish will publish a message with the specified DeliveryMode and content
	// to the specified topic.
	Publish(topic string, payload []byte, pubOpts ...PubOptions) Result
	// Relay sends a request to relay messages for one or more topics that are persisted on the server.
	// Provide a MessageHandler to be executed when a message is published on the topic provided,
	// or nil for the default handler.
	Relay(topics []string, relOpts ...RelOptions) Result
	// Subscribe starts a new subscription. Provide a MessageHandler to be executed when
	// a message is published on the topic provided, or nil for the default handler.
	Subscribe(topic string, subOpts ...SubOptions) Result
	// SubscribeMultiple starts a new subscription for multiple topics. Provide a MessageHandler to be executed when
	// a message is published on the topic provided, or nil for the default handler.
	SubscribeMultiple(subs []string, subOpts ...SubOptions) Result
	// Unsubscribe will end the subscription from each of the topics provided.
	// Messages published to those topics from other clients will no longer be
	// received.
	Unsubscribe(topics ...string) Result
}

func NewClient

func NewClient(target, clientID string, opts ...Options) (Client, error)

type ConnectResult

type ConnectResult struct {
	// contains filtered or unexported fields
}

ConnectResult is an extension of result containing extra fields that provide information about calls to Connect().

func (*ConnectResult) Get

func (r *ConnectResult) Get(ctx context.Context, d time.Duration) (bool, error)

Get reports whether the server call is complete, along with any error from the call. Get blocks until the server call completes, the context is done, or the specified duration elapses.
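The blocking behavior of Get, which every result type on this page shares, can be pictured as a select over three events. The following is a minimal sketch, not the package's code: the result type, its fields, the timeout error text, and the demo helper are all assumptions made for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// result is a hypothetical stand-in for the package's unexported result type.
type result struct {
	done chan struct{} // closed when the server call completes
	err  error         // written before done is closed
}

func newResult() *result { return &result{done: make(chan struct{})} }

// complete records the outcome and unblocks every waiter.
func (r *result) complete(err error) {
	r.err = err
	close(r.done)
}

// Get blocks until the call completes, ctx is done, or d elapses.
// Returning false on timeout or cancellation is an assumption.
func (r *result) Get(ctx context.Context, d time.Duration) (bool, error) {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-r.done:
		return true, r.err
	case <-ctx.Done():
		return false, ctx.Err()
	case <-timer.C:
		return false, errors.New("timed out waiting for result")
	}
}

// demo completes a result after completeAfterMs and waits up to waitMs for it.
func demo(completeAfterMs, waitMs int) bool {
	r := newResult()
	go func() {
		time.Sleep(time.Duration(completeAfterMs) * time.Millisecond)
		r.complete(nil)
	}()
	ok, _ := r.Get(context.Background(), time.Duration(waitMs)*time.Millisecond)
	return ok
}

func main() {
	fmt.Println("completed in time:", demo(10, 2000))
	fmt.Println("completed in time:", demo(500, 10))
}
```

Closing a channel rather than sending on it lets any number of callers wait on the same result.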

func (*ConnectResult) ReturnCode

func (r *ConnectResult) ReturnCode() int32

ReturnCode returns the acknowledgement code in the connack sent in response to a Connect()

func (*ConnectResult) SessionPresent

func (r *ConnectResult) SessionPresent() bool

SessionPresent returns a bool representing the value of the session present field in the connack sent in response to a Connect()

type ConnectionHandler

type ConnectionHandler func(Client)

ConnectionHandler is a callback that is called when connection to the server is established.

type ConnectionLostHandler

type ConnectionLostHandler func(Client, error)

ConnectionLostHandler is a callback that is set to be executed upon an unintended disconnection from the server.

type DisconnectResult

type DisconnectResult struct {
	// contains filtered or unexported fields
}

DisconnectResult is an extension of result containing the extra fields required to provide information about calls to Disconnect()

func (*DisconnectResult) Get

func (r *DisconnectResult) Get(ctx context.Context, d time.Duration) (bool, error)

Get reports whether the server call is complete, along with any error from the call. Get blocks until the server call completes, the context is done, or the specified duration elapses.

type MID

type MID int32

MID is a 32-bit local message identifier.

type Message

type Message interface {
	DeliveryMode() uint8
	MessageID() uint16
	Messages() []*PubMessage
	Ack()
}

Message defines the externals that a message implementation must support. These are the received messages that are passed along, not internal messages.

type MessageAndResult

type MessageAndResult struct {
	// contains filtered or unexported fields
}

MessageAndResult is a type that contains both a Message and a Result. This type is passed via channels between the client connection interface and the goroutines responsible for sending and receiving messages from the server.

type Notice

type Notice struct {
	// contains filtered or unexported fields
}

type Options

type Options interface {
	// contains filtered or unexported methods
}

Options contains configurable options for the client.

func AddServer

func AddServer(target string) Options

AddServer returns an Option that sets the server URL for the client connection.

func WithBatchByteThreshold

func WithBatchByteThreshold(size int) Options

WithBatchByteThreshold sets byte threshold for publish batch.

func WithBatchCountThreshold

func WithBatchCountThreshold(count int) Options

WithBatchCountThreshold sets message count threshold for publish batch.

func WithBatchDuration

func WithBatchDuration(dur time.Duration) Options

WithBatchDuration sets the batch duration used to group publish requests into a single batch.
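To make the two size-based thresholds concrete, here is a small sketch of a batcher that flushes when either the message count or the byte total reaches its limit. The batcher type and demoBatches helper are hypothetical, since the client's real batching code is unexported. The time-based trigger set by WithBatchDuration is left out; a timer calling flush would be one way to add it.

```go
package main

import "fmt"

// batcher is a hypothetical type illustrating count and byte thresholds.
type batcher struct {
	countThreshold int        // flush once this many messages are pending
	byteThreshold  int        // flush once pending payloads reach this size
	pending        [][]byte   // messages waiting for the next flush
	pendingBytes   int        // total size of pending payloads
	flushed        [][][]byte // batches already handed off, kept for inspection
}

// add queues a payload and flushes if either threshold is reached.
func (b *batcher) add(payload []byte) {
	b.pending = append(b.pending, payload)
	b.pendingBytes += len(payload)
	if len(b.pending) >= b.countThreshold || b.pendingBytes >= b.byteThreshold {
		b.flush()
	}
}

// flush hands off the pending messages as one batch and resets the counters.
func (b *batcher) flush() {
	if len(b.pending) == 0 {
		return
	}
	b.flushed = append(b.flushed, b.pending)
	b.pending = nil
	b.pendingBytes = 0
}

// demoBatches feeds payloads of the given sizes and returns how many
// batches were flushed by the thresholds alone.
func demoBatches(countLimit, byteLimit int, sizes ...int) int {
	b := &batcher{countThreshold: countLimit, byteThreshold: byteLimit}
	for _, n := range sizes {
		b.add(make([]byte, n))
	}
	return len(b.flushed)
}

func main() {
	fmt.Println(demoBatches(3, 100, 1, 1, 1, 1)) // count threshold reached once
	fmt.Println(demoBatches(10, 8, 5, 5, 1))     // byte threshold reached once
}
```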

func WithCleanSession

func WithCleanSession() Options

WithCleanSession returns an Option that sets CleanSession for the client connection.

func WithClientID

func WithClientID(clientID string) Options

WithClientID returns an Option that sets the ClientID for the client connection.

func WithConnectTimeout

func WithConnectTimeout(t time.Duration) Options

WithConnectTimeout limits how long the client will wait when trying to open a connection to server before timing out and erroring the attempt. A duration of 0 never times out. Default 30 seconds.
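The "0 never times out" rule maps naturally onto the standard context package. The sketch below shows one way to express it; connectContext and hasDeadline are hypothetical helpers, and whether the client uses a context internally for this option is an assumption.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// connectContext returns a context that expires after d, or one without a
// deadline when d is zero (or negative), mirroring "0 never times out".
func connectContext(parent context.Context, d time.Duration) (context.Context, context.CancelFunc) {
	if d <= 0 {
		return context.WithCancel(parent)
	}
	return context.WithTimeout(parent, d)
}

// hasDeadline reports whether a timeout of ms milliseconds yields a deadline.
func hasDeadline(ms int) bool {
	ctx, cancel := connectContext(context.Background(), time.Duration(ms)*time.Millisecond)
	defer cancel()
	_, ok := ctx.Deadline()
	return ok
}

func main() {
	fmt.Println("30s timeout has deadline:", hasDeadline(30000))
	fmt.Println("0 timeout has deadline:", hasDeadline(0))
}
```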

func WithConnectionHandler

func WithConnectionHandler(handler ConnectionHandler) Options

WithConnectionHandler sets handler function to be called when client is connected.

func WithConnectionLostHandler

func WithConnectionLostHandler(handler ConnectionLostHandler) Options

WithConnectionLostHandler sets the handler function to be called when the client's connection is lost.

func WithDefaultOptions

func WithDefaultOptions() Options

WithDefaultOptions creates a client connection with the following default values:

CleanSession: True
KeepAlive: 30 (seconds)
ConnectTimeout: 30 (seconds)
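The Options values on this page follow what is commonly called the functional options pattern: each With... function returns a value that mutates a private settings struct. The sketch below is an assumption about the general shape only. The options struct, its field names, and the apply helper are hypothetical; the defaults are the three listed above.

```go
package main

import (
	"fmt"
	"time"
)

// options is a hypothetical settings struct; the field names are assumptions.
type options struct {
	cleanSession   bool
	keepAlive      time.Duration
	connectTimeout time.Duration
}

// Option mirrors an interface with an unexported method, so only this
// package can define new options.
type Option interface{ set(*options) }

// optionFunc adapts a plain function to the Option interface.
type optionFunc func(*options)

func (f optionFunc) set(o *options) { f(o) }

// withDefaultOptions applies the documented defaults.
func withDefaultOptions() Option {
	return optionFunc(func(o *options) {
		o.cleanSession = true
		o.keepAlive = 30 * time.Second
		o.connectTimeout = 30 * time.Second
	})
}

// withKeepAlive overrides the keep-alive interval.
func withKeepAlive(k time.Duration) Option {
	return optionFunc(func(o *options) { o.keepAlive = k })
}

// apply runs the options in order; later options win.
func apply(opts ...Option) options {
	var o options
	for _, opt := range opts {
		opt.set(&o)
	}
	return o
}

func main() {
	o := apply(withDefaultOptions(), withKeepAlive(5*time.Second))
	fmt.Println(o.cleanSession, o.keepAlive, o.connectTimeout)
}
```

Because options are applied in order, passing the defaults first and specific overrides afterwards gives the expected result.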

func WithInsecure

func WithInsecure() Options

WithInsecure returns an Option that sets the insecure flag on the client connection so that the client can provide a topic with a key prefix. Use the insecure flag only for test and debug connections, not for live clients.

func WithKeepAlive

func WithKeepAlive(k time.Duration) Options

WithKeepAlive will set the amount of time (in seconds) that the client should wait before sending a PING request to the server. This will allow the client to know that a connection has not been lost with the server.

func WithPingTimeout

func WithPingTimeout(k time.Duration) Options

WithPingTimeout will set the amount of time (in seconds) that the client will wait after sending a PING request to the server, before deciding that the connection has been lost. Default is 10 seconds.

func WithResumeSubs

func WithResumeSubs() Options

WithResumeSubs will enable resuming stored subscribe/unsubscribe messages when connecting but not reconnecting if CleanSession is false.

func WithSessionKey

func WithSessionKey(sessKey uint32) Options

WithSessionKey returns an Option that makes the client connect with an existing SessionKey.

func WithStoreLogReleaseDuration

func WithStoreLogReleaseDuration(dur time.Duration) Options

WithStoreLogReleaseDuration sets the log release duration. It must be greater than WriteTimeout.

func WithStorePath

func WithStorePath(path string) Options

WithStorePath sets the database directory.

func WithStoreSize

func WithStoreSize(size int) Options

WithStoreSize sets the buffer size the store uses to write messages to the log.

func WithTLSConfig

func WithTLSConfig(t *tls.Config) Options

WithTLSConfig will set an SSL/TLS configuration to be used when connecting to server.

func WithUserNamePassword

func WithUserNamePassword(userName string, password []byte) Options

WithUserNamePassword returns an Option that passes the user name and password for the client connection.

func WithWriteTimeout

func WithWriteTimeout(t time.Duration) Options

WithWriteTimeout puts a limit on how long a publish should block until it unblocks with a timeout error. A duration of 0 never times out. The default is to never time out.

type PubMessage

type PubMessage struct {
	Topic   string
	Payload []byte
}

type PubOptions

type PubOptions interface {
	// contains filtered or unexported methods
}

PubOptions contains configurable options for Publish.

func WithPubDelay

func WithPubDelay(delay time.Duration) PubOptions

WithPubDelay specifies the delay, in milliseconds, for delivery of messages when DeliveryMode is set to RELIABLE or BATCH.

func WithPubDeliveryMode

func WithPubDeliveryMode(deliveryMode uint8) PubOptions

WithPubDeliveryMode sets the DeliveryMode of a publish packet:

0 EXPRESS
1 RELIABLE
2 BATCH
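Since the delivery mode is passed as a bare uint8, callers may find it clearer to name the three values. The constants and the deliveryModeName helper below are hypothetical and local to this sketch; the package page does not list exported constants for them.

```go
package main

import "fmt"

// Hypothetical names for the documented delivery mode values.
const (
	deliveryExpress  uint8 = iota // 0
	deliveryReliable              // 1
	deliveryBatch                 // 2
)

// deliveryModeName maps a delivery mode value to its documented name.
func deliveryModeName(mode uint8) string {
	switch mode {
	case deliveryExpress:
		return "EXPRESS"
	case deliveryReliable:
		return "RELIABLE"
	case deliveryBatch:
		return "BATCH"
	default:
		return "UNKNOWN"
	}
}

func main() {
	for _, m := range []uint8{deliveryExpress, deliveryReliable, deliveryBatch} {
		fmt.Println(m, deliveryModeName(m))
	}
}
```

A value such as deliveryReliable could then be passed wherever a deliveryMode uint8 argument is expected.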

func WithTTL

func WithTTL(ttl string) PubOptions

WithTTL specifies the time to live for a publish packet.

type PublishResult

type PublishResult struct {
	// contains filtered or unexported fields
}

PublishResult is an extension of result containing the extra fields required to provide information about calls to Publish()

func (*PublishResult) Get

func (r *PublishResult) Get(ctx context.Context, d time.Duration) (bool, error)

Get reports whether the server call is complete, along with any error from the call. Get blocks until the server call completes, the context is done, or the specified duration elapses.

func (*PublishResult) MessageID

func (r *PublishResult) MessageID() uint16

MessageID returns the message ID that was assigned to the Publish Message when it was sent to the server

type PutResult

type PutResult struct {
	// contains filtered or unexported fields
}

PutResult is an extension of result containing the extra fields required to provide information about calls to Put()

func (*PutResult) Get

func (r *PutResult) Get(ctx context.Context, d time.Duration) (bool, error)

Get reports whether the server call is complete, along with any error from the call. Get blocks until the server call completes, the context is done, or the specified duration elapses.

func (*PutResult) MessageID

func (r *PutResult) MessageID() uint16

MessageID returns the message ID that was assigned to the Publish Message when it was sent to the server

type RelOptions

type RelOptions interface {
	// contains filtered or unexported methods
}

RelOptions contains configurable options for Relay.

func WithLast

func WithLast(last string) RelOptions

WithLast specifies the duration for which stored messages are retrieved on a new relay request.

type RelayResult

type RelayResult struct {
	// contains filtered or unexported fields
}

RelayResult is an extension of result containing the extra fields required to provide information about calls to Relay()

func (*RelayResult) Get

func (r *RelayResult) Get(ctx context.Context, d time.Duration) (bool, error)

Get reports whether the server call is complete, along with any error from the call. Get blocks until the server call completes, the context is done, or the specified duration elapses.

func (*RelayResult) Result

func (r *RelayResult) Result() map[string]byte

Result returns a map of the topics that were requested, along with the matching return code from the server.

type Result

type Result interface {
	Get(ctx context.Context, d time.Duration) (bool, error)
	// contains filtered or unexported methods
}

type SubOptions

type SubOptions interface {
	// contains filtered or unexported methods
}

SubOptions contains configurable options for Subscribe.

func WithSubDelay

func WithSubDelay(delay time.Duration) SubOptions

WithSubDelay specifies the delay, in milliseconds, for delivery of messages when DeliveryMode is set to RELIABLE or BATCH.

func WithSubDeliveryMode

func WithSubDeliveryMode(deliveryMode uint8) SubOptions

WithSubDeliveryMode sets the DeliveryMode of a subscription:

0 EXPRESS
1 RELIABLE
2 BATCH

type SubscribeResult

type SubscribeResult struct {
	// contains filtered or unexported fields
}

SubscribeResult is an extension of result containing the extra fields required to provide information about calls to Subscribe()

func (*SubscribeResult) Get

func (r *SubscribeResult) Get(ctx context.Context, d time.Duration) (bool, error)

Get reports whether the server call is complete, along with any error from the call. Get blocks until the server call completes, the context is done, or the specified duration elapses.

func (*SubscribeResult) Result

func (r *SubscribeResult) Result() map[string]byte

Result returns a map of topics that were subscribed to along with the matching return code from the server. This is either the DeliveryMode value of the subscription or an error code.

type TopicFilter

type TopicFilter struct {
	// contains filtered or unexported fields
}

TopicFilter represents a parsed topic.

func (*TopicFilter) Updates

func (t *TopicFilter) Updates() <-chan []*PubMessage
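Updates returns a receive-only channel of message batches, so a consumer typically ranges over it in its own goroutine. The sketch below is self-contained: it declares a local PubMessage with the same fields as the documented struct and feeds the channel by hand. The drain and demoUpdates helpers are hypothetical, and whether the real channel is ever closed is an assumption.

```go
package main

import "fmt"

// PubMessage is a local copy of the documented struct, for illustration only.
type PubMessage struct {
	Topic   string
	Payload []byte
}

// drain reads batches until the channel is closed and returns the number of
// messages seen. With a real TopicFilter, the channel would come from Updates().
func drain(updates <-chan []*PubMessage) int {
	n := 0
	for batch := range updates {
		for _, m := range batch {
			fmt.Printf("%s: %s\n", m.Topic, m.Payload)
			n++
		}
	}
	return n
}

// demoUpdates simulates a filter delivering two batches, then closing.
func demoUpdates() int {
	ch := make(chan []*PubMessage, 2)
	ch <- []*PubMessage{
		{Topic: "teams.alpha.ch1", Payload: []byte("hello")},
		{Topic: "teams.alpha.ch1", Payload: []byte("world")},
	}
	ch <- []*PubMessage{{Topic: "teams.alpha.ch2", Payload: []byte("again")}}
	close(ch)
	return drain(ch)
}

func main() {
	fmt.Println("messages received:", demoUpdates())
}
```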

type UnsubscribeResult

type UnsubscribeResult struct {
	// contains filtered or unexported fields
}

UnsubscribeResult is an extension of result containing the extra fields required to provide information about calls to Unsubscribe()

func (*UnsubscribeResult) Get

func (r *UnsubscribeResult) Get(ctx context.Context, d time.Duration) (bool, error)

Get reports whether the server call is complete, along with any error from the call. Get blocks until the server call completes, the context is done, or the specified duration elapses.

Directories

Path Synopsis
examples/sample (command)
examples/simple (command)
internal/db
internal/net
