replicaclient

package module
v0.1.7
Published: Jan 12, 2023 License: MIT Imports: 10 Imported by: 2

README

go-replicaclient

A Netrix client to be run on each replica. It uses the Netrix API to send and receive messages. Refer to the Netrix documentation for details.

Installation

Fetch and install the library using go get:

go get github.com/netrixframework/go-replicaclient

Usage

Define a directive handler to handle Directive messages from Netrix. For example:

type Replica struct {
    ...
}

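// Start is invoked when Netrix directs the replica to start.
func (n *Replica) Start() error { return nil }

// Stop is invoked when Netrix directs the replica to stop.
func (n *Replica) Stop() error { return nil }

// Restart is invoked when Netrix directs the replica to restart.
func (n *Replica) Restart() error { return nil }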
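As a slightly fuller illustration of a directive handler, the following is a minimal sketch, not the library's own code. The DirectiveHandler interface is copied locally so the example compiles on its own, and the Replica type with its running flag is a hypothetical stand-in for a real replica.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// DirectiveHandler mirrors the interface documented by the library,
// copied locally so this sketch is self-contained.
type DirectiveHandler interface {
	Stop() error
	Start() error
	Restart() error
}

// Replica is a hypothetical replica that only tracks whether it is running.
type Replica struct {
	mu      sync.Mutex
	running bool
}

// Start marks the replica as running; starting twice is reported as an error.
func (r *Replica) Start() error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.running {
		return errors.New("replica already running")
	}
	r.running = true
	return nil
}

// Stop marks the replica as stopped; stopping twice is reported as an error.
func (r *Replica) Stop() error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if !r.running {
		return errors.New("replica not running")
	}
	r.running = false
	return nil
}

// Restart leaves the replica running regardless of its previous state.
func (r *Replica) Restart() error {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.running = true
	return nil
}

// Compile-time check that *Replica satisfies DirectiveHandler.
var _ DirectiveHandler = (*Replica)(nil)

func main() {
	var h DirectiveHandler = &Replica{}
	fmt.Println(h.Start(), h.Restart(), h.Stop())
}
```

The compile-time assertion is a common Go idiom for catching a missing method before the handler is passed to the client.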

To initialize the client, pass a Config (which carries the replica ID), the directive handler and, optionally, a logger that the client can use to log debug messages.

import (
    "github.com/netrixframework/go-replicaclient"
    "github.com/netrixframework/netrix/types"
)

func main() {

    replica := NewReplica()
    ...
    err := replicaclient.Init(&replicaclient.Config{
        ReplicaID: types.ReplicaID(replica.ID),
        NetrixAddr: "<netrix_addr>",
        ClientServerAddr: "localhost:7074",
    }, replica, logger)
    if err != nil {
        ...
    }
}

The library maintains a single instance of ReplicaClient, which can be used to send and receive messages or to publish events:

client, _ := replicaclient.GetClient()
client.SendMessage(msgType, to, data, true)
...
message, ok := client.ReceiveMessage()
...

client.PublishEvent("Commit", map[string]string{
    "param1": "val1",
})

Configuration

Init accepts a Config instance as its first argument. Config is defined as:

type Config struct {
    ReplicaID        types.ReplicaID
    NetrixAddr       string
    ClientServerAddr string
    ClientAdvAddr    string
    Info             map[string]interface{}
}

Documentation

Variables

var (
	ErrNoReplicaID        = errors.New("replica ID config should not be empty")
	ErrNoClientServerAddr = errors.New("client server address should not be empty")
	ErrNoNetrixAddr       = errors.New("netrix addr should not be empty")
)
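The three error values above suggest that the configuration is checked for empty required fields. The sketch below shows one way such a check could look. It is an assumption, not the library's actual validation code: the Config here is a trimmed local copy with ReplicaID as a plain string, validate is a hypothetical helper, and the order of the checks is arbitrary.

```go
package main

import (
	"errors"
	"fmt"
)

// Error values copied from the package documentation.
var (
	ErrNoReplicaID        = errors.New("replica ID config should not be empty")
	ErrNoClientServerAddr = errors.New("client server address should not be empty")
	ErrNoNetrixAddr       = errors.New("netrix addr should not be empty")
)

// Config is a trimmed local copy of the documented struct.
// ReplicaID is a plain string here (assumption) to avoid the netrix import.
type Config struct {
	ReplicaID        string
	NetrixAddr       string
	ClientServerAddr string
}

// validate is a hypothetical helper that maps each empty required
// field to the matching error value.
func validate(c *Config) error {
	switch {
	case c.ReplicaID == "":
		return ErrNoReplicaID
	case c.NetrixAddr == "":
		return ErrNoNetrixAddr
	case c.ClientServerAddr == "":
		return ErrNoClientServerAddr
	}
	return nil
}

func main() {
	err := validate(&Config{ReplicaID: "r1", NetrixAddr: "localhost:7074"})
	if errors.Is(err, ErrNoClientServerAddr) {
		fmt.Println("missing field:", err)
	}
}
```

Because the errors are exported package variables, callers can compare against them with errors.Is rather than matching on message text.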
var (
	MessageSendEventType    = "MessageSend"
	MessageReceiveEventType = "MessageReceive"
	TimeoutStartEventType   = "TimeoutStart"
	TimeoutEndEventType     = "TimeoutEnd"
)

Functions

func Init

func Init(
	config *Config,
	d DirectiveHandler,
	logger Logger,
) error

Init initializes the global ReplicaClient instance, which can then be retrieved with GetClient.

Types

type Config

type Config struct {
	ReplicaID        types.ReplicaID
	NetrixAddr       string
	ClientServerAddr string
	ClientAdvAddr    string
	Info             map[string]interface{}
}

type Counter

type Counter struct {
	// contains filtered or unexported fields
}

func NewCounter

func NewCounter() *Counter

func (*Counter) Next

func (id *Counter) Next() int

func (*Counter) NextID

func (id *Counter) NextID(from, to types.ReplicaID) string

func (*Counter) Reset

func (id *Counter) Reset()

type DirectiveHandler

type DirectiveHandler interface {
	Stop() error
	Start() error
	Restart() error
}

DirectiveHandler is used to perform actions on the current replica, such as starting, stopping or restarting it. These actions are implementation specific, so they are encapsulated behind this interface.

type Logger

type Logger interface {
	Debug(msg string, keyvals ...interface{})
	Info(msg string, keyvals ...interface{})
	Error(msg string, keyvals ...interface{})
}

Logger is the interface the client uses for logging information.

type MessageQueue

type MessageQueue struct {
	// contains filtered or unexported fields
}

MessageQueue is a data structure that stores messages in a FIFO queue.

func NewMessageQueue

func NewMessageQueue() *MessageQueue

NewMessageQueue returns an empty MessageQueue

func (*MessageQueue) Add

func (q *MessageQueue) Add(m *types.Message)

Add adds a message to the queue

func (*MessageQueue) Block added in v0.1.6

func (q *MessageQueue) Block()

Block makes the queue ignore additions until UnBlock is called.

func (*MessageQueue) Flush

func (q *MessageQueue) Flush()

Flush clears the queue of all messages

func (*MessageQueue) Pop

func (q *MessageQueue) Pop() (*types.Message, bool)

func (*MessageQueue) UnBlock added in v0.1.6

func (q *MessageQueue) UnBlock()

UnBlock stops blocking, so the queue accepts additions again.
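The fields of MessageQueue are unexported, so its internals are not visible here. To illustrate the documented behaviour (FIFO order, Flush, and additions being ignored while blocked), here is a minimal sketch under stated assumptions: Queue and Message are hypothetical local types, and Pop is assumed to return false immediately when the queue is empty rather than wait.

```go
package main

import (
	"fmt"
	"sync"
)

// Message is a hypothetical stand-in for types.Message.
type Message struct{ ID string }

// Queue is an illustrative FIFO queue, not the library's MessageQueue.
type Queue struct {
	mu      sync.Mutex
	items   []*Message
	blocked bool
}

func NewQueue() *Queue { return &Queue{} }

// Add appends a message unless the queue is currently blocked.
func (q *Queue) Add(m *Message) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.blocked {
		return // additions are ignored while blocked
	}
	q.items = append(q.items, m)
}

// Pop removes and returns the oldest message, or false if the queue is empty.
func (q *Queue) Pop() (*Message, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		return nil, false
	}
	m := q.items[0]
	q.items = q.items[1:]
	return m, true
}

// Flush discards all queued messages.
func (q *Queue) Flush() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = nil
}

// Block makes Add a no-op until UnBlock is called.
func (q *Queue) Block() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.blocked = true
}

// UnBlock lets Add accept messages again.
func (q *Queue) UnBlock() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.blocked = false
}

func main() {
	q := NewQueue()
	q.Add(&Message{ID: "m1"})
	q.Block()
	q.Add(&Message{ID: "m2"}) // ignored while blocked
	q.UnBlock()
	if m, ok := q.Pop(); ok {
		fmt.Println("popped", m.ID)
	}
}
```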

type ReplicaClient

type ReplicaClient struct {
	// contains filtered or unexported fields
}

ReplicaClient should be used as the transport for sending messages between replicas. It encapsulates the logic of sending each message to the `masternode` for further processing. The ReplicaClient also listens for incoming messages from the master, and for directives to start, restart and stop the current replica.

Additionally, the ReplicaClient exposes functions to manage timers, which is key to the testing method. Timers are implemented as message sends and receives, and this detail is hidden from the library user.

func GetClient

func GetClient() (*ReplicaClient, error)

GetClient returns the global ReplicaClient if it has been initialized.

func NewReplicaClient

func NewReplicaClient(
	config *Config,
	directiveHandler DirectiveHandler,
	logger Logger,
) (*ReplicaClient, error)

NewReplicaClient creates a ReplicaClient. It requires a DirectiveHandler, which is used to perform directive actions such as start, stop and restart.

func (*ReplicaClient) IsReady

func (c *ReplicaClient) IsReady() bool

IsReady returns true if the state is set to ready

func (*ReplicaClient) IsRunning

func (c *ReplicaClient) IsRunning() bool

IsRunning returns true if the ReplicaClient is running.

func (*ReplicaClient) Log

func (c *ReplicaClient) Log(params map[string]interface{}, message string)

func (*ReplicaClient) LogAsync

func (c *ReplicaClient) LogAsync(params map[string]interface{}, message string)

func (*ReplicaClient) NotReady

func (c *ReplicaClient) NotReady()

NotReady sets the state of the replica to not ready for testing.

func (*ReplicaClient) PublishEvent

func (c *ReplicaClient) PublishEvent(t string, params map[string]string)

func (*ReplicaClient) PublishEventAsync

func (c *ReplicaClient) PublishEventAsync(t string, params map[string]string)

func (*ReplicaClient) Ready

func (c *ReplicaClient) Ready()

Ready sets the state of the replica to ready for testing.

func (*ReplicaClient) ReceiveMessage

func (c *ReplicaClient) ReceiveMessage() (*types.Message, bool)

func (*ReplicaClient) SendMessage

func (c *ReplicaClient) SendMessage(t string, to types.ReplicaID, msg []byte, intercept bool) error

SendMessage sends a message to another replica. The intercept argument marks whether the message should be intercepted by the testing framework.

func (*ReplicaClient) SendMessageWithID added in v0.1.7

func (c *ReplicaClient) SendMessageWithID(
	id string, t string, to types.ReplicaID,
	msg []byte,
) error

func (*ReplicaClient) Start

func (c *ReplicaClient) Start() error

Start starts the ReplicaClient by spawning the polling goroutines and the server. Start should be called before Ready/NotReady.

func (*ReplicaClient) StartTimer

func (c *ReplicaClient) StartTimer(i TimeoutInfo)

StartTimer schedules a timer for the given TimeoutInfo. Note: timers are implemented as message sends and receives that are scheduled by the testing strategy. If you do not want to instrument timers as message sends/receives, do not use this function.

func (*ReplicaClient) Stop

func (c *ReplicaClient) Stop() error

Stop halts the ReplicaClient and exits gracefully.

func (*ReplicaClient) TimeoutChan

func (c *ReplicaClient) TimeoutChan() chan TimeoutInfo

TimeoutChan returns the channel on which timeouts are delivered.

type TimeoutInfo

type TimeoutInfo interface {
	// Key returns a unique key for the given timeout. Only one timeout for a specific key can be running at any given time.
	Key() string
	// Duration of the timeout
	Duration() time.Duration
}

TimeoutInfo encapsulates the timeout information that needs to be scheduled
