hedge

v0.1.5
Published: Nov 19, 2025 License: Apache-2.0 Imports: 30 Imported by: 0

README

hedge-cb

An AWS-native, cluster membership Go library. It is built on spindle-cb, a distributed locking library built on aws/clock-bound and PostgreSQL. It is a port (subset only) of hedge. Ported features include:

  • Tracking of member nodes - good for clusters whose sizes can change dynamically over time, such as AWS Auto Scaling Groups and Kubernetes Deployments;
  • Leader election - the cluster elects and maintains a single leader node at all times;
  • List of members - get a list of all member nodes at any time;
  • [Streaming] Send - any member node can send messages to the leader at any time;
  • [Streaming] Broadcast - any member node can broadcast messages to all nodes at any time.

Requirements

  • A PostgreSQL database - a requirement of spindle-cb.
  • The ClockBound daemon - a requirement of spindle-cb.
  • All nodes within a cluster should be able to contact each other via TCP (host:port).
  • Each hedge-cb instance's id should be set to the node's host:port. The host part can also be inferred internally.

Running the sample

A sample cloud-init startup script is provided for spinning up an Auto Scaling Group with the ClockBound daemon already set up and running. You need to update the ExecStart section first with a working PostgreSQL connection value. Note that hardcoding the connection value in the script is NOT recommended; use something like an IAM role with Secrets Manager instead.

# Create a launch template. ImageId here is Amazon Linux, default VPC.
# You can remove the "KeyName" line if SSH access is not needed.
# (Added newlines for readability. Might not run when copied as is.)
$ aws ec2 create-launch-template \
  --launch-template-name hedge-lt \
  --version-description version1 \
  --launch-template-data '
  {
    "UserData":"'"$(cat startup-aws-asg.sh | base64 -w 0)"'",
    "ImageId":"ami-0fe289b44779ce58a",
    "InstanceType":"t3.medium",
    "KeyName":"keyName"
  }'

# Create the single-zone ASG; update {target-zone} with actual value:
$ aws autoscaling create-auto-scaling-group \
  --auto-scaling-group-name hedge-asg \
  --launch-template LaunchTemplateName=hedge-lt,Version='1' \
  --min-size 3 \
  --max-size 3 \
  --tags Key=Name,Value=hedge-asg \
  --availability-zones {target-zone}

# or a multi-zone ASG; update {subnet(?)} with actual value(s):
$ aws autoscaling create-auto-scaling-group \
  --auto-scaling-group-name hedge-asg \
  --launch-template LaunchTemplateName=hedge-lt,Version='1' \
  --min-size 3 \
  --max-size 3 \
  --tags Key=Name,Value=hedge-asg \
  --vpc-zone-identifier "{subnet1,subnet2,subnet3}"

# You can now SSH to the instance(s). Note that it might take some time
# before ClockBound is running due to the need to build it in Rust. You
# can wait for the `clockbound` process, or tail the startup script output,
# like so:
$ tail -f /var/log/cloud-init-output.log

# Tail the service logs:
$ journalctl -f -u hedge

# Test the Send API (send message to current leader):
$ curl -v localhost:9090/send -d "hello-leader"

# Test the Broadcast API (send message to all nodes):
$ curl -v localhost:9090/broadcast -d "hello-all"

License

This library is licensed under the Apache 2.0 License.

Documentation

Index

Constants

const (
	CmdLeader     = "LDR" // for leader confirmation, reply="ACK"
	CmdWrite      = "PUT" // write key/value, fmt="PUT <base64(payload)> [noappend]"
	CmdSend       = "SND" // member to leader, fmt="SND <base64(payload)>"
	CmdPing       = "HEY" // heartbeat to indicate availability, fmt="HEY [id]"
	CmdMembers    = "MEM" // members info from leader to all, fmt="MEM base64(JSON(members))"
	CmdBroadcast  = "ALL" // broadcast to all, fmt="ALL base64(payload)"
	CmdAck        = "ACK" // generic reply, fmt="ACK"|"ACK base64(err)"|"ACK base64(JSON(members))"
	CmdSemaphore  = "SEM" // create semaphore, fmt="SEM {name} {limit} {caller}", reply="ACK"
	CmdSemAcquire = "SEA" // acquire semaphore, fmt="SEA {name} {caller}", reply="ACK[ base64([0:|1:]err)]" (0=final,1=retry)
	CmdSemRelease = "SER" // release semaphore, fmt="SER {name} {caller}"

	FlagNoAppend = "noappend"
)
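
The formats in the comments above share a "<CMD> <base64(payload)>" shape. The following is a minimal sketch of building and parsing such a line. The helper names encodeCmd and decodeCmd are hypothetical and not part of this package, and the choice of the standard, padded base64 alphabet is an assumption; the package's actual encoding is not stated here.

```go
package main

import (
	"encoding/base64"
	"fmt"
	"strings"
)

// encodeCmd builds a "<CMD> <base64(payload)>" line, following the fmt
// notes on CmdSend and CmdBroadcast.
// Assumption: standard base64 alphabet with padding.
func encodeCmd(cmd string, payload []byte) string {
	return cmd + " " + base64.StdEncoding.EncodeToString(payload)
}

// decodeCmd splits a line into its command and decoded payload.
// A bare command such as "ACK" yields a nil payload.
func decodeCmd(line string) (string, []byte, error) {
	cmd, rest, found := strings.Cut(line, " ")
	if !found {
		return cmd, nil, nil
	}
	payload, err := base64.StdEncoding.DecodeString(rest)
	if err != nil {
		return "", nil, fmt.Errorf("decode %q payload: %w", cmd, err)
	}
	return cmd, payload, nil
}
```

Under these assumptions, encodeCmd("SND", []byte("hello-leader")) yields "SND aGVsbG8tbGVhZGVy", and decodeCmd reverses it.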

Variables

var (
	ErrNotRunning   = fmt.Errorf("hedge: not running")
	ErrNoLeader     = fmt.Errorf("hedge: no leader available")
	ErrNoHandler    = fmt.Errorf("hedge: no message handler")
	ErrNotSupported = fmt.Errorf("hedge: not supported")
	ErrInvalidConn  = fmt.Errorf("hedge: invalid connection")
)

Functions

func SendToLeader

func SendToLeader(ctx context.Context, op *Op, m []byte, args ...*SendToLeaderArgs) ([]byte, error)

SendToLeader is a wrapper around hedge.Send() with a built-in retry mechanism.

Types

type BroadcastArgs

type BroadcastArgs struct {
	SkipSelf bool // if true, skip broadcasting to self
	Out      chan BroadcastOutput
}

type BroadcastOutput

type BroadcastOutput struct {
	Id    string `json:"id,omitempty"`
	Reply []byte `json:"reply,omitempty"`
	Error error  `json:"error,omitempty"`
}

type FnMsgHandler

type FnMsgHandler func(data any, msg []byte) ([]byte, error)

type KeyValue

type KeyValue struct {
	Key       string    `json:"key"`
	Value     string    `json:"value"`
	Timestamp time.Time `json:"timestamp"` // read-only, populated when Get()
}

KeyValue is for Put()/Get() callers.

type LogItem

type LogItem struct {
	Id        string
	Key       string
	Value     string
	Leader    string
	Timestamp time.Time
}

LogItem represents an item in our log.

type Op

type Op struct {
	*spindle.Lock // handles our distributed lock
	// contains filtered or unexported fields
}

Op is our main instance for hedge operations.

func New

func New(db *sql.DB, hostPort, lockTable, lockName string, opts ...Option) *Op

New creates an instance of Op. hostPort can be in "ip:port" format; in ":port" format, in which case the IP part will be resolved internally; or empty, in which case port 8080 will be used. The internal spindle object's lock table name will be lockTable, and lockName is the lock name. logTable will serve as our append-only, distributed key/value storage table. If logTable is empty, Put, Get, and Semaphore features will be disabled.
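
The three accepted hostPort forms can be illustrated with a small sketch. This is not the package's actual resolution code: resolveHostPort and its localIP parameter are hypothetical, and using the local IP for the empty form is an assumption (the text above only says port 8080 is used).

```go
package main

import (
	"fmt"
	"net"
)

// defaultPort is the port the documentation says is used for an empty hostPort.
const defaultPort = "8080"

// resolveHostPort normalizes "ip:port", ":port", or "" into "ip:port".
// localIP stands in for whatever address the library resolves internally.
func resolveHostPort(hostPort, localIP string) (string, error) {
	if hostPort == "" {
		return net.JoinHostPort(localIP, defaultPort), nil
	}
	host, port, err := net.SplitHostPort(hostPort)
	if err != nil {
		return "", fmt.Errorf("invalid hostPort %q: %w", hostPort, err)
	}
	if host == "" {
		host = localIP // ":port" form: fill in the resolved IP
	}
	return net.JoinHostPort(host, port), nil
}
```

For example, with localIP set to "192.168.1.7", the input ":9000" becomes "192.168.1.7:9000" and the empty string becomes "192.168.1.7:8080".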

func (*Op) Broadcast

func (op *Op) Broadcast(ctx context.Context, msg []byte, args ...BroadcastArgs) []BroadcastOutput

Broadcast sends msg to all nodes (send to all). Any node can broadcast messages, including the leader itself. Note that this works on a best-effort basis only; by the time you call this API, the handler might not have all the active members on record yet, as is the usual situation with k8s deployments, where pods come and go and our internal heartbeat protocol hasn't completed yet. This call will also block until it receives the replies from all nodes' broadcast handlers.

If args[].Out is set, the output will be streamed to that channel instead. This is useful if you prefer a streamed output (as replies arrive) instead of waiting for all replies before returning. If set, the return value (output slice) will be empty. Also, close() will be called on the Out channel to indicate the end of the stream.

func (*Op) HostPort

func (op *Op) HostPort() string

HostPort returns the host:port (or name) of this instance.

func (*Op) IsRunning

func (op *Op) IsRunning() bool

IsRunning returns true if Op is already running.

func (*Op) Members

func (op *Op) Members() []string

Members returns a list of members in the cluster/group.

func (*Op) Name

func (op *Op) Name() string

Name is the same as HostPort.

func (*Op) NewSoS

func (op *Op) NewSoS(name string, opts ...*SoSOptions) *SoS

NewSoS returns an object for writing data to spill-over storage across the cluster. The order of writing is local memory, local disk, other pod's memory, other pod's disk, and so on.

func (*Op) Run

func (op *Op) Run(ctx context.Context, done ...chan error) error

Run starts the main handler. It blocks until ctx is cancelled, optionally sending an error message to done when finished.

func (*Op) Send

func (op *Op) Send(ctx context.Context, msg []byte) ([]byte, error)

Send sends msg to the current leader. Any node can send messages, including the leader itself (send to self). It also blocks until it receives the reply from the leader's message handler.

func (*Op) StreamBroadcast

func (op *Op) StreamBroadcast(ctx context.Context, args ...StreamBroadcastArgs) (*StreamBroadcastOutput, error)

StreamBroadcast returns input and output channels for doing streaming broadcasts. Any node can broadcast messages, including the leader itself. Note that this works on a best-effort basis only; by the time you call this API, the handler might not have all the active members on record yet, as is the usual situation with k8s deployments, where pods come and go and our internal heartbeat protocol hasn't completed yet. This call will also block until it receives the replies from all nodes' broadcast handlers.

To use the channels, send your request message(s) to the input channel, close it (i.e. close(input)), then read the replies from the output channels. This function will close all output channels when done.

StreamBroadcast is sequential in the sense that you need to send all your input messages first before getting any response from all the nodes.
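
Since the output is a map of per-node channels that are all closed when done, one way to consume it is to drain every channel concurrently. The sketch below shows that fan-in pattern only. The reply type is a hypothetical stand-in for *StreamMessage (whose pb.Payload fields are not shown here), and collect is not part of this package.

```go
package main

import "sync"

// reply is a simplified stand-in for *StreamMessage.
type reply struct {
	Data string
	Err  error
}

// collect drains every per-node channel until it is closed and groups
// the successful replies by node id.
func collect(outs map[string]chan *reply) map[string][]string {
	var (
		mu  sync.Mutex
		wg  sync.WaitGroup
		got = make(map[string][]string, len(outs))
	)
	for id, ch := range outs {
		wg.Add(1)
		go func(id string, ch <-chan *reply) {
			defer wg.Done()
			for m := range ch { // ends when the channel is closed
				if m == nil || m.Err != nil {
					continue // skip empty or failed replies in this sketch
				}
				mu.Lock()
				got[id] = append(got[id], m.Data)
				mu.Unlock()
			}
		}(id, ch)
	}
	wg.Wait()
	return got
}
```

Draining the channels concurrently avoids one slow node holding up reads from the others.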

func (*Op) StreamToLeader

func (op *Op) StreamToLeader(ctx context.Context) (*StreamToLeaderOutput, error)

StreamToLeader returns input and output channels for streaming to the leader. To use the channels, send your request message(s) to the input channel, close it (i.e. close(input)), then read the replies from the output channel. This function will close the output channel when done.

StreamToLeader is sequential in the sense that you need to send all your input messages first before getting any response from the leader.
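
The send, close, then read sequence can be sketched with plain channels. Everything here is a stand-in: stream mimics the In/Out pair of StreamToLeaderOutput using strings, and newEchoStream plays the role of a leader that reads all input before replying. None of these names come from this package.

```go
package main

import "strings"

// stream mimics the In/Out channel pair with string messages.
type stream struct {
	In  chan string
	Out chan string
}

// newEchoStream starts a stand-in "leader" that consumes all input
// first, then replies, then closes Out.
func newEchoStream() *stream {
	s := &stream{In: make(chan string), Out: make(chan string)}
	go func() {
		defer close(s.Out)
		var pending []string
		for m := range s.In { // ends when the caller closes In
			pending = append(pending, m)
		}
		for _, m := range pending {
			s.Out <- strings.ToUpper(m)
		}
	}()
	return s
}

// exchange follows the documented order: send everything, close the
// input, then read replies until the output is closed.
func exchange(s *stream, msgs []string) []string {
	for _, m := range msgs {
		s.In <- m
	}
	close(s.In)
	var replies []string
	for r := range s.Out {
		replies = append(replies, r)
	}
	return replies
}
```

Forgetting to close the input would leave the reader blocked, because the stand-in leader only starts replying once its input ends.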

func (*Op) String

func (op *Op) String() string

String implements the Stringer interface.

type Option

type Option interface {
	Apply(*Op)
}

func WithBroadcastHandler

func WithBroadcastHandler(d any, h FnMsgHandler) Option

WithBroadcastHandler sets the node's callback function for broadcast messages from anyone in the group using the Broadcast(...) API. Any arbitrary data represented by d will be passed to the callback h every time it is called. If d is nil, the default callback data will be the *Op object itself. The []byte returned by the handler serves as the reply.

A nil broadcast handler disables the internal heartbeat function.

func WithBroadcastStreamChannels

func WithBroadcastStreamChannels(in chan *StreamMessage, out chan *StreamMessage) Option

WithBroadcastStreamChannels sets the streaming input and output channels for broadcasting messages to all nodes. All incoming stream messages will be sent to the `in` channel. A nil message indicates the end of the streaming data. After sending all messages to `in`, the handler will then listen to the `out` channel for reply messages. A nil message indicates the end of the reply stream.

func WithDuration

func WithDuration(v int64) Option

WithDuration sets Op's internal spindle object's lease duration in milliseconds. Defaults to 30000ms (30s) when not set. Minimum value is 2000ms (2s).

func WithGroupSyncInterval

func WithGroupSyncInterval(v time.Duration) Option

WithGroupSyncInterval sets the internal interval timeout to sync membership within the group in seconds. If not set, defaults to 30s. Minimum value is 2s.

func WithGrpcHostPort

func WithGrpcHostPort(v string) Option

WithGrpcHostPort sets Op's internal gRPC host/port address. Defaults to the internal TCP host:port+1.

func WithLeaderCallback

func WithLeaderCallback(d any, f spindle.FnLeaderCallback) Option

WithLeaderCallback sets the node's callback function that will be called when a leader node is selected (or deselected). The msg arg for f will be set to either 0 or 1.

func WithLeaderHandler

func WithLeaderHandler(d any, h FnMsgHandler) Option

WithLeaderHandler sets the node's callback function when it is the current leader and when members send messages to it using the Send(...) API. Any arbitrary data represented by d will be passed to the callback h every time it is called. If d is nil, the default callback data will be the *Op object itself. The []byte returned by the handler serves as the reply.

Typical flow would be:

  1. Any node (including the leader) calls the Send(...) API.
  2. The current leader handles the call by reading the input.
  3. The leader then calls the handler h, passing the arbitrary data along with the message.
  4. The handler processes the message as leader, then returns the reply to the calling member.
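
A handler matching the documented FnMsgHandler signature might look like the sketch below. The local fnMsgHandler type only mirrors that signature so the example compiles on its own; counter and countingHandler are hypothetical names, and the "ack:" reply format is made up for illustration.

```go
package main

import (
	"errors"
	"sync/atomic"
)

// fnMsgHandler mirrors the documented FnMsgHandler signature.
type fnMsgHandler func(data any, msg []byte) ([]byte, error)

// counter is example callback data (the d argument).
type counter struct{ n atomic.Int64 }

// countingHandler counts messages and echoes them back with a prefix.
// The returned []byte is what the calling member receives as the reply.
func countingHandler(data any, msg []byte) ([]byte, error) {
	c, ok := data.(*counter)
	if !ok {
		// d was nil or of another type; per the docs a nil d would be
		// replaced by the *Op object, which this sketch does not handle.
		return nil, errors.New("unexpected callback data")
	}
	c.n.Add(1)
	return append([]byte("ack:"), msg...), nil
}

// Compile-time check that the function fits the handler signature.
var _ fnMsgHandler = countingHandler
```

Passing shared state through d keeps the handler free of package-level variables; the atomic counter is used because handlers may plausibly be invoked concurrently.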

func WithLeaderStreamChannels

func WithLeaderStreamChannels(in chan *StreamMessage, out chan *StreamMessage) Option

WithLeaderStreamChannels sets the streaming input and output channels for sending streaming messages to the leader. All incoming stream messages to the leader will be sent to the `in` channel. A nil message indicates the end of the streaming data. After sending all messages to `in`, the handler will then listen to the `out` channel for reply messages. A nil message indicates the end of the reply stream.
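
The nil-terminated protocol described above can be sketched from the receiving side: read from `in` until a nil message, then write replies to `out` and finish with a nil. The message type is a hypothetical stand-in for *StreamMessage, and serveOnce is not part of this package.

```go
package main

// message is a simplified stand-in for *StreamMessage.
type message struct{ Data string }

// serveOnce handles a single stream: it reads until the nil end marker,
// then sends one reply per input followed by a nil end marker.
func serveOnce(in <-chan *message, out chan<- *message) {
	var batch []string
	for m := range in {
		if m == nil {
			break // nil marks the end of the incoming stream
		}
		batch = append(batch, m.Data)
	}
	for _, d := range batch {
		out <- &message{Data: "seen:" + d}
	}
	out <- nil // nil marks the end of the reply stream
}
```

Note that the end of a stream is signaled with a nil message here, not by closing the channel, so the same channels can serve later streams.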

func WithLogger

func WithLogger(v *log.Logger) Option

WithLogger sets Op's logger object. Can be silenced by setting v to:

log.New(io.Discard, "", 0)

type Reader

type Reader struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func (*Reader) Close

func (r *Reader) Close()

Close closes the reader object.

func (*Reader) Err

func (r *Reader) Err() error

Err returns the last recorded error, if any, during the read operation.

func (*Reader) Read

func (r *Reader) Read(out chan []byte)

Read reads the underlying data and streams them to the `out` channel.

type SendToLeaderArgs

type SendToLeaderArgs struct {
	// Number of retry attempts to contact the leader.
	// Defaults to 10. If set to a negative number, it
	// will retry forever.
	Retries int
}
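
The Retries semantics (default of 10, negative meaning retry forever) can be sketched as a generic loop. This is not SendToLeader's implementation: sendWithRetry, its wait parameter, and the fixed delay between attempts are assumptions, as is treating Retries as the total number of attempts and a zero value as "use the default".

```go
package main

import (
	"context"
	"errors"
	"time"
)

const defaultRetries = 10 // documented default

// sendWithRetry calls send until it succeeds, the attempts run out, or
// ctx is done. A negative retries value retries until ctx is done.
func sendWithRetry(ctx context.Context, retries int, wait time.Duration,
	send func(context.Context) ([]byte, error)) ([]byte, error) {
	if retries == 0 {
		retries = defaultRetries
	}
	var lastErr error
	for attempt := 0; retries < 0 || attempt < retries; attempt++ {
		if attempt > 0 {
			select { // pause between attempts, but honor cancellation
			case <-ctx.Done():
				return nil, errors.Join(ctx.Err(), lastErr)
			case <-time.After(wait):
			}
		}
		reply, err := send(ctx)
		if err == nil {
			return reply, nil
		}
		lastErr = err
	}
	return nil, lastErr
}

// flakyDemo fails twice, then succeeds; it returns the reply and the
// number of attempts made.
func flakyDemo() (string, int, error) {
	attempts := 0
	reply, err := sendWithRetry(context.Background(), 5, time.Millisecond,
		func(context.Context) ([]byte, error) {
			attempts++
			if attempts < 3 {
				return nil, errors.New("no leader available")
			}
			return []byte("ok"), nil
		})
	return string(reply), attempts, err
}
```

With a negative retries value, the context is the only way out of the loop, so callers would normally pass a context with a deadline or cancel function.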

type SoS

type SoS struct {
	sync.Mutex

	Name string // the name of this instance
	// contains filtered or unexported fields
}

SoS (Spillover-Store) represents an object for spill-over (or stitched) storage. Useful for load-process-discard types of data processing. The order of storage priority is local memory, local disk, other pod's memory, other pod's disk, and so on.

Limitation: At the moment, an SoS name cannot be reused once it has been used and closed within hedge's lifetime.
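
The storage priority order can be pictured as a first-fit walk over an ordered list of tiers. The sketch below is only a model of that idea: tier, place, and errFull are hypothetical, and it assumes a write goes entirely to the first tier with enough room, which may differ from how SoS actually splits data.

```go
package main

import "errors"

// tier models one storage location, listed in priority order
// (local memory, local disk, another node's memory, and so on).
type tier struct {
	Name  string
	Limit uint64 // capacity in bytes, like MemLimit or DiskLimit
	Used  uint64 // bytes already placed; never exceeds Limit
}

var errFull = errors.New("all tiers full")

// place records n bytes in the first tier that still has room and
// returns that tier's name.
func place(tiers []*tier, n uint64) (string, error) {
	for _, t := range tiers {
		if t.Limit-t.Used >= n {
			t.Used += n
			return t.Name, nil
		}
	}
	return "", errFull
}
```

With a 10-byte memory tier followed by a 20-byte disk tier, two 8-byte writes land in memory and then on disk, since the second no longer fits in memory.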

func (*SoS) Close

func (sos *SoS) Close()

Close closes the SoS object.

func (*SoS) Reader

func (sos *SoS) Reader(opts ...*readerOptions) (*Reader, error)

Reader returns a reader object for reading data from SoS. The caller needs to call reader.Close() after use. Options are only used internally and are not exposed to callers.

func (*SoS) Writer

func (sos *SoS) Writer(opts ...*writerOptions) (*Writer, error)

Writer returns a writer object for writing data to SoS. The caller needs to call writer.Close() after use. Options are only used internally and are not exposed to callers.

type SoSOptions

type SoSOptions struct {
	// MemLimit sets the memory limit in bytes to be used per node.
	MemLimit uint64

	// DiskLimit sets the disk limit in bytes to be used per node.
	DiskLimit uint64

	// Expiration sets the TTL (time-to-live) of the backing storage.
	// If not set, the default is 30s.
	Expiration int64
}

type StreamBroadcastArgs

type StreamBroadcastArgs struct {
	SkipSelf bool // if true, skip broadcasting to self
}

type StreamBroadcastOutput

type StreamBroadcastOutput struct {
	In   chan *StreamMessage
	Outs map[string]chan *StreamMessage
}

type StreamMessage

type StreamMessage struct {
	Payload *pb.Payload `json:"payload"`
	Error   error       `json:"error"`
}

type StreamToLeaderOutput

type StreamToLeaderOutput struct {
	In  chan *StreamMessage `json:"in"`
	Out chan *StreamMessage `json:"out"`
}

type Writer

type Writer struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func (*Writer) Close

func (w *Writer) Close()

Close closes the writer object.

func (*Writer) Err

func (w *Writer) Err() error

Err returns the last recorded error during the write operation.

func (*Writer) Write

func (w *Writer) Write(data []byte)

Write writes data to the underlying storage.
