etcd

package
v0.0.0-...-935816b
Published: Mar 20, 2026 License: GPL-3.0 Imports: 27 Imported by: 15

Documentation

Overview

Package etcd implements the distributed key value store and fs integration. It also manages and clusters the embedded etcd server. The automatic clustering is considered experimental. If you require a more robust, battle-tested etcd cluster, then manage your own, and point each mgmt agent at it with --seeds.

Algorithm

The elastic etcd algorithm works in the following way:

* When you start up mgmt, you can pass it a list of seeds.

* If no seeds are given, then assume you are the first server and start up.

* If a seed is given, connect as a client, and volunteer to be a server.

* All volunteering clients should listen for a message for nomination.

* If a client has been nominated, it should start up a server.

* A server should shut down if its nomination is removed.

* The elected leader should decide who to nominate/unnominate as needed.
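
The leader's nominate/unnominate decision from the last step can be pictured with a minimal sketch. This is not the package's implementation: the function name decide, its inputs, and the tie-breaking by sorted hostname are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sort"
)

// decide is a hypothetical helper, not part of this package. Given the
// hostnames that have volunteered, the hostnames currently nominated, and an
// ideal cluster size, it returns which hosts to nominate (add) and which to
// unnominate (remove). Ties are broken by sorted hostname (an assumption).
func decide(volunteers, nominated []string, ideal int) (add, remove []string) {
	if ideal < 0 {
		ideal = 0
	}
	isVolunteer := make(map[string]bool)
	for _, h := range volunteers {
		isVolunteer[h] = true
	}
	isNominated := make(map[string]bool)
	keep := []string{}
	for _, h := range nominated {
		isNominated[h] = true
		if isVolunteer[h] {
			keep = append(keep, h)
		} else {
			remove = append(remove, h) // no longer volunteering
		}
	}
	sort.Strings(keep)
	// Too many servers: unnominate from the end of the sorted list.
	for len(keep) > ideal {
		remove = append(remove, keep[len(keep)-1])
		keep = keep[:len(keep)-1]
	}
	// Too few servers: nominate volunteers that are not yet nominated.
	candidates := []string{}
	for _, h := range volunteers {
		if !isNominated[h] {
			candidates = append(candidates, h)
		}
	}
	sort.Strings(candidates)
	for _, h := range candidates {
		if len(keep)+len(add) >= ideal {
			break
		}
		add = append(add, h)
	}
	return add, remove
}

func main() {
	add, remove := decide([]string{"h1", "h2", "h3", "h4"}, []string{"h1"}, 3)
	fmt.Println("nominate:", add, "unnominate:", remove)
}
```

A nominated host would then start its server, and an unnominated one would shut it down, as described in the list above.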

Notes

If you attempt to add a new member to the cluster with a duplicate hostname, then the behaviour is undefined, and you could bork your cluster. This is not recommended or supported. Please ensure that your hostnames are unique.

A single ^C requests an orderly shutdown, whereas a third ^C asks etcd to shut down forcefully. Using this option is not recommended; it exists as a way to make exit easier if something deadlocked the cluster. If this was due to user error (e.g. duplicate hostnames) then it was your fault, but if the member did not shut down from a single ^C under normal circumstances, then please file a bug.

There are currently some races in this implementation. In practice, this should not cause any adverse effects unless you simultaneously add or remove members at a high rate. Fixing these races will probably require some internal changes to etcd. Help is welcome if you're interested in working on this.

Smoke testing

Here is a simple way to test etcd clustering basics...

./mgmt run --tmp-prefix --no-pgp --hostname h1 empty
./mgmt run --tmp-prefix --no-pgp --hostname h2 --seeds=http://127.0.0.1:2379 --client-urls=http://127.0.0.1:2381 --server-urls=http://127.0.0.1:2382 empty
./mgmt run --tmp-prefix --no-pgp --hostname h3 --seeds=http://127.0.0.1:2379 --client-urls=http://127.0.0.1:2383 --server-urls=http://127.0.0.1:2384 empty
ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2379 put /_mgmt/chooser/dynamicsize/idealclustersize 3
./mgmt run --tmp-prefix --no-pgp --hostname h4 --seeds=http://127.0.0.1:2379 --client-urls=http://127.0.0.1:2385 --server-urls=http://127.0.0.1:2386 empty
./mgmt run --tmp-prefix --no-pgp --hostname h5 --seeds=http://127.0.0.1:2379 --client-urls=http://127.0.0.1:2387 --server-urls=http://127.0.0.1:2388 empty
ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2379 member list
ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2381 put /_mgmt/chooser/dynamicsize/idealclustersize 5
ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2381 member list

Bugs

A member might occasionally think that an endpoint still exists after it has already shut down. This isn't a major issue, since if that endpoint doesn't respond, then it will automatically choose the next available one. To see this issue, turn on debugging and start: H1, H2, H3, then stop H2, and you might see that H3 still knows about H2.

Shutting down a cluster by setting the idealclustersize to zero is currently buggy and not supported. Try this at your own risk.

If a member is nominated, but it doesn't respond to the nominate event and start up, and we lose quorum to add it, then we could end up in a blocked state. This can be improved upon if we can call memberRemove after a timeout.

Adding new cluster members very quickly might trigger a: `runtime error: error validating peerURLs ... member count is unequal` error. See: https://github.com/etcd-io/etcd/issues/10626 for more information.

If you use the dynamic size feature to start and stop the server process, once it has already started and then stopped, it can't be re-started because of a bug in etcd that doesn't free the port. Instead you'll get a: `bind: address already in use` error. See: https://github.com/etcd-io/etcd/issues/6042 for more information.

Constants

const (

	// ConvergedPath is the unprefixed path under which the converger
	// may store data. This is public so that other consumers can know to
	// avoid this key prefix.
	ConvergedPath = "/converged/"

	// SchedulerPath is the unprefixed path under which the scheduler
	// may store data. This is public so that other consumers can know to
	// avoid this key prefix.
	SchedulerPath = "/scheduler/"

	// DefaultClientURL is the default value that is used for client URLs.
	// It is pulled from the upstream etcd package.
	DefaultClientURL = embed.DefaultListenClientURLs // 127.0.0.1:2379

	// DefaultServerURL is the default value that is used for server URLs.
	// It is pulled from the upstream etcd package.
	DefaultServerURL = embed.DefaultListenPeerURLs // 127.0.0.1:2380

	// DefaultMaxTxnOps is the maximum number of operations to run in a
	// single etcd transaction. If you exceed this limit, it is possible
	// that you have either an extremely large code base, or that you have
	// some code which is possibly not as efficient as it could be. Let us
	// know so that we can analyze the situation, and increase this if
	// necessary.
	DefaultMaxTxnOps = 512

	// ClientDialTimeout is the DialTimeout option in the client config.
	ClientDialTimeout = 5 * time.Second

	// ClientDialKeepAliveTime is the DialKeepAliveTime config value for the
	// etcd client. It is recommended that you use this so that dead
	// endpoints don't block any cluster operations.
	ClientDialKeepAliveTime = 2 * time.Second // from etcdctl
	// ClientDialKeepAliveTimeout is the DialKeepAliveTimeout config value
	// for the etcd client. It is recommended that you use this so that dead
	// endpoints don't block any cluster operations.
	ClientDialKeepAliveTimeout = 6 * time.Second // from etcdctl

	// MemberChangeInterval is the polling interval to use when watching for
	// member changes during add or remove.
	MemberChangeInterval = 500 * time.Millisecond

	// SessionTTL is the number of seconds to wait before a dead or
	// unresponsive host has their volunteer keys removed from the cluster.
	// This should be an integer multiple of seconds, since one second is
	// the TTL precision used in etcd.
	SessionTTL = 10 * time.Second // seconds

	// ConvergerHostnameNamespace is a unique key used in the converger.
	ConvergerHostnameNamespace = "etcd-hostname"
)
const (
	// MaxServerStartTimeout is the amount of time to wait for the server
	// to start before considering it a failure. If you hit this timeout,
	// let us know so that we can analyze the situation, and increase this
	// if necessary.
	MaxServerStartTimeout = 60 * time.Second

	// MaxServerCloseTimeout is the maximum amount of time we'll wait for
	// the server to close down. If it exceeds this, it's probably a bug.
	MaxServerCloseTimeout = 15 * time.Second

	// MaxServerRetries is the maximum number of times we can try to restart
	// the server if it fails on startup. This can help workaround some
	// timing bugs in etcd.
	MaxServerRetries = 5

	// ServerRetryWait is the amount of time to wait between retries.
	ServerRetryWait = 500 * time.Millisecond
)

Variables

This section is empty.

Functions

This section is empty.

Types

type EmbdEtcd

type EmbdEtcd struct {
	// Hostname is the unique identifier for this host.
	Hostname string

	// Seeds is the list of servers that this client could connect to.
	Seeds etcdtypes.URLs

	// ClientURLs are the locations to listen for clients if I am a server.
	ClientURLs etcdtypes.URLs

	// ServerURLs are the locations to listen for servers (peers) if I am a
	// server (peer).
	ServerURLs etcdtypes.URLs

	// AClientURLs are the client urls to advertise.
	AClientURLs etcdtypes.URLs

	// AServerURLs are the server (peer) urls to advertise.
	AServerURLs etcdtypes.URLs

	// NoNetwork causes this to use unix:// sockets instead of TCP for
	// connections.
	NoNetwork bool

	// Converger is a converged coordinator object that can be used to
	// track the converged state.
	Converger *converger.Coordinator

	// NS is a string namespace that we prefix to every key operation.
	NS string

	// Prefix is the directory where any etcd related state is stored. It
	// must be an absolute directory path.
	Prefix string

	Debug bool
	Logf  func(format string, v ...interface{})
	// contains filtered or unexported fields
}

EmbdEtcd provides the embedded server and client etcd functionality. The "elastic" functionality has been removed for the time being because it was too complicated and not stable. Much better to use mcl to build that system.

func (*EmbdEtcd) Cleanup

func (obj *EmbdEtcd) Cleanup() error

Cleanup cleans up after you are done using the struct.

func (*EmbdEtcd) Err

func (obj *EmbdEtcd) Err() error

Err will contain the last error when Next shuts down. It waits for all the running processes to exit before it returns.

func (*EmbdEtcd) Exited

func (obj *EmbdEtcd) Exited() <-chan struct{}

Exited returns a channel that closes when we've been destroyed. This process happens after Run exits. If Run is never called, this will never happen.

func (*EmbdEtcd) Init

func (obj *EmbdEtcd) Init() error

Init initializes the struct after it has been populated as desired. You must not use the struct if this returns an error.

func (*EmbdEtcd) Ready

func (obj *EmbdEtcd) Ready() <-chan struct{}

Ready returns a channel that closes when we're up and running. This process happens when calling Run. If Run is never called, this will never happen. Our main startup must be running, and our client must be connected to get here.

func (*EmbdEtcd) Run

func (obj *EmbdEtcd) Run(ctx context.Context) error

Run is the main entry point to kick off the embedded etcd server. It blocks until we've exited for shutdown. The shutdown can be triggered by cancelling the context.

func (*EmbdEtcd) ServerExited

func (obj *EmbdEtcd) ServerExited() (<-chan struct{}, func())

ServerExited returns a channel that closes when the server is destroyed. This process happens after runServer exits. If runServer is never called, this will never happen. It also returns a cancel/ack function which must be called once the signal is received or we are done watching it. This is because this is a cyclical signal which happens, and then gets reset as the server starts up, shuts down, and repeats the cycle. The cancel/ack function ensures that we only watch a signal when it's ready to be read, and only reset it when we are done watching it.

func (*EmbdEtcd) ServerReady

func (obj *EmbdEtcd) ServerReady() (<-chan struct{}, func())

ServerReady returns a channel that closes when we're up and running. This process happens when calling runServer. If runServer is never called, this will never happen. It also returns a cancel/ack function which must be called once the signal is received or we are done watching it. This is because this is a cyclical signal which happens, and then gets reset as the server starts up, shuts down, and repeats the cycle. The cancel/ack function ensures that we only watch a signal when it's ready to be read, and only reset it when we are done watching it.

func (*EmbdEtcd) Validate

func (obj *EmbdEtcd) Validate() error

Validate validates the initial struct. This is called from Init, but can also be called directly if you would like to check that your configuration is correct.
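
The calling order implied by the method descriptions above (Init, then Run, wait for Ready, cancel the context, wait for Exited, then Cleanup) can be sketched without importing the package. The lifecycle interface and the fake type below are stand-ins written for this example; they only mirror the documented signatures and are not the real EmbdEtcd.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// lifecycle mirrors a subset of the EmbdEtcd methods documented above.
type lifecycle interface {
	Init() error
	Run(ctx context.Context) error
	Ready() <-chan struct{}
	Exited() <-chan struct{}
	Cleanup() error
}

// fake is a stand-in used only for this sketch.
type fake struct{ ready, exited chan struct{} }

func (f *fake) Init() error {
	f.ready, f.exited = make(chan struct{}), make(chan struct{})
	return nil
}

func (f *fake) Run(ctx context.Context) error {
	defer close(f.exited)
	close(f.ready)
	<-ctx.Done() // block until shutdown is requested
	return nil
}

func (f *fake) Ready() <-chan struct{}  { return f.ready }
func (f *fake) Exited() <-chan struct{} { return f.exited }
func (f *fake) Cleanup() error          { return nil }

// drive runs one full start/stop cycle and records the order of events.
func drive(obj lifecycle) ([]string, error) {
	steps := []string{}
	if err := obj.Init(); err != nil {
		return steps, err // the struct must not be used after a failed Init
	}
	steps = append(steps, "init")
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	errCh := make(chan error, 1)
	go func() { errCh <- obj.Run(ctx) }()

	select {
	case <-obj.Ready():
		steps = append(steps, "ready")
	case <-time.After(5 * time.Second): // timeout value is arbitrary
		return steps, fmt.Errorf("timed out waiting for Ready")
	}
	cancel()       // request shutdown by cancelling the context
	<-obj.Exited() // closes once Run has exited
	steps = append(steps, "exited")
	if err := <-errCh; err != nil {
		return steps, err
	}
	steps = append(steps, "cleanup")
	return steps, obj.Cleanup()
}

func main() {
	steps, err := drive(&fake{})
	fmt.Println(steps, err)
}
```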

type World

type World struct {

	// Client is the etcd client to use. This should not be specified, one
	// will be created automatically. This exists for legacy reasons and for
	// the SSH etcd world implementation. Maybe it can be removed in the
	// future.
	Client interfaces.Client

	// Seeds are the list of etcd endpoints to connect to.
	Seeds []string

	// NS is the etcd namespace to use.
	NS string

	MetadataPrefix string    // expected metadata prefix
	StoragePrefix  string    // storage prefix for etcdfs storage
	StandaloneFs   engine.Fs // store an fs here for local usage
	GetURI         func() string
	// contains filtered or unexported fields
}

World is an etcd backed implementation of the World interface.

func (*World) AddDeploy

func (obj *World) AddDeploy(ctx context.Context, id uint64, hash, pHash string, data *string) error

AddDeploy adds a new deploy.

func (*World) Cleanup

func (obj *World) Cleanup() error

Cleanup runs last.

func (*World) Connect

func (obj *World) Connect(ctx context.Context, init *engine.WorldInit) error

Connect runs first.

func (*World) Fs

func (obj *World) Fs(uri string) (engine.Fs, error)

Fs returns a distributed file system from a unique URI. For single host execution that doesn't span more than a single host, this file system might actually be a local or memory backed file system, so actually only distributed within the boredom that is a single host cluster.

func (*World) GetDeploy

func (obj *World) GetDeploy(ctx context.Context, id uint64) (string, error)

GetDeploy returns the deploy with the specified id if it exists.

func (*World) GetDeploys

func (obj *World) GetDeploys(ctx context.Context) (map[uint64]string, error)

GetDeploys gets all the available deploys.

func (*World) GetMaxDeployID

func (obj *World) GetMaxDeployID(ctx context.Context) (uint64, error)

GetMaxDeployID returns the maximum deploy id.

func (*World) ResCollect

func (obj *World) ResCollect(ctx context.Context, filters []*engine.ResFilter) ([]*engine.ResOutput, error)

ResCollect gets the collection of exported resources which match the filters. It does this atomically so that a call always returns a complete collection.

func (*World) ResDelete

func (obj *World) ResDelete(ctx context.Context, resourceDeletes []*engine.ResDelete) (bool, error)

ResDelete deletes a number of resources in the world storage system. If this doesn't delete, it returns (true, nil). If it makes a delete, then it returns (false, nil). On any error we return (false, err).

func (*World) ResExport

func (obj *World) ResExport(ctx context.Context, resourceExports []*engine.ResExport) (bool, error)

ResExport stores a number of resources in the world storage system. The individual records should not be updated if they are identical to what is already present. (This is to prevent unnecessary events.) If this makes no changes, it returns (true, nil). If it makes a change, then it returns (false, nil). On any error we return (false, err). It stores the exports under our hostname namespace. Subsequent calls do NOT replace the previously set collection.
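
The boolean returned by ResExport (and ResDelete) reports whether state was already as requested, so true means "nothing changed". A minimal in-memory sketch of that convention follows; exportAll and the string-keyed map are assumptions for illustration and do not reflect how the etcd-backed implementation stores records.

```go
package main

import "fmt"

// exportAll is a hypothetical in-memory stand-in for ResExport. It returns
// (true, nil) when every record was already present and identical, and
// (false, nil) when at least one record had to be written. Existing keys
// that are not mentioned in records are left alone.
func exportAll(store, records map[string]string) (bool, error) {
	if store == nil {
		return false, fmt.Errorf("store is nil")
	}
	unchanged := true
	for k, v := range records {
		if old, ok := store[k]; ok && old == v {
			continue // identical record: skip the write to avoid an event
		}
		store[k] = v
		unchanged = false
	}
	return unchanged, nil
}

func main() {
	store := map[string]string{}
	recs := map[string]string{"file/a": "x"}
	first, _ := exportAll(store, recs)
	second, _ := exportAll(store, recs)
	fmt.Println(first, second) // the first call writes, the repeat does not
}
```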

func (*World) ResWatch

func (obj *World) ResWatch(ctx context.Context, kind string) (chan error, error)

ResWatch returns a channel which spits out events on possible exported resource changes.

func (*World) Scheduled

func (obj *World) Scheduled(ctx context.Context, namespace string) (chan *scheduler.ScheduledResult, error)

Scheduled gets the scheduled results without participating.

func (*World) Scheduler

func (obj *World) Scheduler(namespace string, opts ...scheduler.Option) (*scheduler.Result, error)

Scheduler returns a scheduling result of hosts in a particular namespace. XXX: Add a context.Context here

func (*World) StrDel

func (obj *World) StrDel(ctx context.Context, namespace string) error

StrDel deletes the value in a particular namespace.

func (*World) StrGet

func (obj *World) StrGet(ctx context.Context, namespace string) (string, error)

StrGet returns the value for the given namespace.

func (*World) StrIsNotExist

func (obj *World) StrIsNotExist(err error) bool

StrIsNotExist returns whether the error from StrGet is a key missing error.

func (*World) StrMapDel

func (obj *World) StrMapDel(ctx context.Context, namespace string) error

StrMapDel deletes the value in a particular namespace.

func (*World) StrMapGet

func (obj *World) StrMapGet(ctx context.Context, namespace string) (map[string]string, error)

StrMapGet returns a map of hostnames to values in the given namespace.

func (*World) StrMapSet

func (obj *World) StrMapSet(ctx context.Context, namespace, value string) error

StrMapSet sets the namespace value to a particular string under the identity of its own hostname.

func (*World) StrMapWatch

func (obj *World) StrMapWatch(ctx context.Context, namespace string) (chan error, error)

StrMapWatch returns a channel which spits out events on possible string changes.

func (*World) StrSet

func (obj *World) StrSet(ctx context.Context, namespace, value string) error

StrSet sets the namespace value to a particular string. XXX: This can overwrite another host's value that was set with StrMapSet. Add possible cryptographic signing or special namespacing to prevent such things.

func (*World) StrWatch

func (obj *World) StrWatch(ctx context.Context, namespace string) (chan error, error)

StrWatch returns a channel which spits out events on possible string changes.

func (*World) URI

func (obj *World) URI() string

URI returns the current FS URI. TODO: Can we improve this API or deprecate it entirely?

func (*World) WatchDeploy

func (obj *World) WatchDeploy(ctx context.Context) (chan error, error)

WatchDeploy returns a channel which spits out events on new deploy activity.

func (*World) WatchMembers

func (obj *World) WatchMembers(ctx context.Context) (<-chan *interfaces.MembersResult, error)

WatchMembers returns a channel of changing members in the cluster.

Directories

Path Synopsis
str
Package fs implements a very simple and limited file system on top of etcd.
Package scheduler implements a distributed consensus scheduler with etcd.
Package ssh transports etcd traffic over SSH to provide a special World API.
