members

package module
v0.0.0-...-2cfe9bb Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 31, 2025 License: MIT Imports: 13 Imported by: 4

README

members

Language Go Report Card codecov Build Status Release GoDoc

This project is based on the HashiCorp memberlist library.

Usage

go get github.com/tsingsun/members
package main

import (
	"context"
	"flag"
	"github.com/tsingsun/members"
	"github.com/vmihailenco/msgpack/v5"
	"strings"
)

// Command-line flags for the example node.
var (
	// peers is a comma-separated list of already-running peers to join.
	peers = flag.String("peers", "", "comma separated list of peers")
	// address is registered with the default ":4001"; nothing in this
	// example reads it yet.
	address = flag.String("address", ":4001", "http host:port")
)

// main joins the cluster described by the -peers flag and registers the
// order shard on the local peer.
//
// Flags are parsed here rather than in an init function: init runs before
// other packages (and the test harness) have registered their own flags, so
// parsing there can reject flags that are defined later.
func main() {
	flag.Parse()

	// Split the peer list, dropping surrounding whitespace and empty entries
	// so that inputs such as "a, b" or "a,,b" do not yield bogus addresses.
	// An empty -peers value leaves known nil, as before.
	var known []string
	for _, p := range strings.Split(*peers, ",") {
		if p = strings.TrimSpace(p); p != "" {
			known = append(known, p)
		}
	}

	group, err := members.NewPeer()
	if err != nil {
		panic(err)
	}
	group.Options.ExistsPeers = known
	if err = group.Join(context.Background()); err != nil {
		panic(err)
	}

	// OrderHandler implements the members.Shard interface.
	orderhdl := &OrderHandler{
		ShardId: "order",
	}
	sd, err := group.AddShard(orderhdl)
	if err != nil {
		panic(err)
	}
	// The handler needs the returned Spreader to broadcast orders in Receive.
	orderhdl.Spreader = sd

	// NOTE(review): main returns right after registering the shard; a
	// long-running node would presumably block here — confirm for real use.
}

// OrderHandler is the example shard: it keeps a list of orders and shares
// newly received ones with the other nodes through its Spreader.
type OrderHandler struct {
	// ShardId is set to "order" by main. Name does not read it and returns
	// the literal "order" instead.
	ShardId  string
	// Spreader is the value returned by Peer.AddShard; Receive uses it to
	// broadcast new orders. It must be assigned before Receive is called.
	Spreader members.Spreader
	// Orders holds the accumulated orders; Merge and Receive append to it
	// and MarshalBinary encodes it.
	Orders   []string
}

// Name reports the identifier of this shard. The handler always identifies
// itself as "order".
func (OrderHandler) Name() string {
	const shardName = "order"
	return shardName
}

// MarshalBinary encodes the handler's Orders with msgpack so the data can be
// synced to other nodes. It returns the encoded bytes together with any
// encoding error.
func (o *OrderHandler) MarshalBinary() ([]byte, error) {
	encoded, err := msgpack.Marshal(o.Orders)
	return encoded, err
}

// Merge folds the orders from a remote node's MarshalBinary result (or a
// Receive broadcast) into the local Orders.
//
// b must be a msgpack-encoded list of strings; a decoding error is returned
// unchanged. Entries that do not contain "order" are ignored. Entries that
// are already present locally, or repeated within b, are skipped, so merging
// the same payload more than once does not grow Orders.
func (o *OrderHandler) Merge(b []byte) error {
	var incoming []string
	if err := msgpack.Unmarshal(b, &incoming); err != nil {
		return err
	}

	// Index what is already stored so duplicates can be detected in O(1).
	seen := make(map[string]struct{}, len(o.Orders)+len(incoming))
	for _, ord := range o.Orders {
		seen[ord] = struct{}{}
	}

	for _, ord := range incoming {
		if !strings.Contains(ord, "order") {
			continue
		}
		if _, dup := seen[ord]; dup {
			continue
		}
		seen[ord] = struct{}{}
		o.Orders = append(o.Orders, ord)
	}
	return nil
}


// Receive records a new order locally and broadcasts it to the other nodes
// through the handler's Spreader.
//
// The order is sent as a one-element msgpack list, the same shape Merge
// decodes. Encoding happens before the local append so that an encoding
// failure leaves Orders untouched. Spreader must be set (main assigns the
// value returned by AddShard) before Receive is called.
func (o *OrderHandler) Receive(ord string) error {
	payload, err := msgpack.Marshal([]string{ord})
	if err != nil {
		return err
	}
	o.Orders = append(o.Orders, ord)
	return o.Spreader.Broadcast(payload)
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func OversizedMessage

func OversizedMessage(b []byte, size int) bool

OversizedMessage indicates whether the byte payload should be sent via TCP.

Types

type Channel

type Channel struct {
	// contains filtered or unexported fields
}

Channel is a channel for communication between shard holding nodes.

func NewChannel

func NewChannel(shardName string, peer *Peer) (*Channel, error)

NewChannel creates a new channel for the given shard.

func (*Channel) Broadcast

func (c *Channel) Broadcast(msg []byte) error

Broadcast sends a message to all nodes in the channel.

func (*Channel) Stop

func (c *Channel) Stop(_ context.Context) error

Stop stops the channel.

type NoopSpreader

type NoopSpreader struct{}

func (*NoopSpreader) Broadcast

func (n *NoopSpreader) Broadcast([]byte) error

type Option

type Option func(*Options)

func WithConfiguration

func WithConfiguration(cnf *conf.Configuration) Option

type Options

type Options struct {
	// ExistsPeers is the list of known peers to join
	ExistsPeers []string      `yaml:"existsPeers" json:"existsPeers"`
	JoinTTL     time.Duration `yaml:"joinTTL" json:"joinTTL"`
	JoinRetry   int           `yaml:"joinRetry" json:"joinRetry"`
	// MembersConfig is the memberlist.Config for the peer.
	// note that: bind address should be ip address, not hostname.
	MembersConfig *memberlist.Config `yaml:"membersConfig" json:"membersConfig"`

	Event    memberlist.EventDelegate `yaml:"-" json:"-"`
	Delegate memberlist.Delegate      `yaml:"-" json:"-"`
	// contains filtered or unexported fields
}

type Payload

type Payload struct {
	Key  string
	Data []byte
}

type Peer

type Peer struct {
	Options
	// contains filtered or unexported fields
}

Peer is a wrapper around a memberlist Node.

func NewPeer

func NewPeer(opts ...Option) (*Peer, error)

func (*Peer) AddShard

func (p *Peer) AddShard(sd Shard) (Spreader, error)

func (*Peer) Address

func (p *Peer) Address() string

func (*Peer) Join

func (p *Peer) Join(ctx context.Context) error

Join joins the other nodes in the cluster, whether or not ExistsPeers is set.

func (*Peer) MemberCount

func (p *Peer) MemberCount() int

func (*Peer) OthersNodes

func (p *Peer) OthersNodes() []*memberlist.Node

func (*Peer) ReliableMsgHandle

func (p *Peer) ReliableMsgHandle(ctx context.Context) error

func (*Peer) SendReliable

func (p *Peer) SendReliable(b []byte)

SendReliable is used to send large messages (see OversizedMessage) to other nodes. It calls memberlist.SendReliable.

func (*Peer) Start

func (p *Peer) Start(ctx context.Context) error

func (*Peer) Stop

func (p *Peer) Stop(ctx context.Context) error

type Shard

type Shard interface {
	// Name returns the name of the shard, which is used to identify the shard.
	Name() string
	// MarshalBinary marshals the shard data into a binary to sync other nodes.
	MarshalBinary() ([]byte, error)
	// Merge data from remote node MarshalBinary result. The Shard should be able to dedupe the data.
	Merge(b []byte) error
}

Shard is a handler for a specific kind of distributed data.

type Spreader

type Spreader interface {
	// Broadcast sends a message to all nodes in the cluster.
	Broadcast([]byte) error
}

Spreader is an interface for transporting messages to other nodes in the cluster.

Directories

Path Synopsis
example module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL