Documentation
Overview
Package etcd implements the distributed key value store and fs integration. It also manages and clusters the embedded etcd server. The automatic clustering is considered experimental. If you require a more robust, battle-tested etcd cluster, then manage your own, and point each mgmt agent at it with --seeds.
Algorithm
The elastic etcd algorithm works in the following way:
* When you start up mgmt, you can pass it a list of seeds.
* If no seeds are given, then assume you are the first server and start up.
* If a seed is given, connect as a client, and volunteer to be a server.
* All volunteering clients should listen for a nomination message.
* If a client has been nominated, it should start up a server.
* A server should shut down if its nomination is removed.
* The elected leader should decide who to nominate/unnominate as needed.
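The steps above can be modelled as a tiny state machine. The following is a hypothetical sketch, not mgmt's code: `role`, `next`, `startupRole` and `chooseNominees` are invented names, and the leader here simply nominates volunteers in sorted order up to a target size, an assumption made only to keep the example deterministic.

```go
package main

import (
	"fmt"
	"sort"
)

// role is a hypothetical model of a member's position in the elastic
// algorithm; it is not taken from mgmt.
type role int

const (
	roleClient role = iota // connected as a client and volunteering
	roleServer             // nominated and running an embedded server
)

// event is a hypothetical nomination change seen by a volunteer.
type event int

const (
	evNominated event = iota
	evUnnominated
)

// startupRole: with no seeds we assume we are the first server; with seeds
// we join as a client and volunteer.
func startupRole(seeds []string) role {
	if len(seeds) == 0 {
		return roleServer
	}
	return roleClient
}

// next applies a nomination change: nominated clients start a server, and
// servers whose nomination is removed shut down and remain clients.
func next(current role, ev event) role {
	switch ev {
	case evNominated:
		return roleServer
	case evUnnominated:
		return roleClient
	default:
		return current
	}
}

// chooseNominees stands in for the elected leader's decision: nominate
// volunteers in a stable (sorted) order until idealSize is reached.
func chooseNominees(volunteers []string, idealSize int) []string {
	sorted := append([]string(nil), volunteers...)
	sort.Strings(sorted)
	if idealSize < 0 {
		idealSize = 0
	}
	if idealSize > len(sorted) {
		idealSize = len(sorted)
	}
	return sorted[:idealSize]
}

func main() {
	fmt.Println(startupRole(nil) == roleServer) // true: no seeds given
	r := startupRole([]string{"http://127.0.0.1:2379"})
	r = next(r, evNominated)
	fmt.Println(r == roleServer)                               // true
	fmt.Println(chooseNominees([]string{"h3", "h1", "h2"}, 2)) // [h1 h2]
}
```

How the real leader picks nominees is not described here, so treat `chooseNominees` purely as a placeholder.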
Notes
If you attempt to add a new member to the cluster with a duplicate hostname, then the behaviour is undefined, and you could bork your cluster. This is not recommended or supported. Please ensure that your hostnames are unique.
A single ^C requests an orderly shutdown; a third ^C will ask etcd to shut down forcefully. Using this option is not recommended; it exists to make exiting easier if something has deadlocked the cluster. If this was due to user error (e.g. duplicate hostnames), then it was your fault, but if the member did not shut down from a single ^C under normal circumstances, then please file a bug.
There are currently some races in this implementation. In practice, this should not cause any adverse effects unless you simultaneously add or remove members at a high rate. Fixing these races will probably require some internal changes to etcd. Help is welcome if you're interested in working on this.
Smoke testing
Here is a simple way to test etcd clustering basics...
```sh
./mgmt run --tmp-prefix --no-pgp --hostname h1 empty
./mgmt run --tmp-prefix --no-pgp --hostname h2 --seeds=http://127.0.0.1:2379 --client-urls=http://127.0.0.1:2381 --server-urls=http://127.0.0.1:2382 empty
./mgmt run --tmp-prefix --no-pgp --hostname h3 --seeds=http://127.0.0.1:2379 --client-urls=http://127.0.0.1:2383 --server-urls=http://127.0.0.1:2384 empty
ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2379 put /_mgmt/chooser/dynamicsize/idealclustersize 3
./mgmt run --tmp-prefix --no-pgp --hostname h4 --seeds=http://127.0.0.1:2379 --client-urls=http://127.0.0.1:2385 --server-urls=http://127.0.0.1:2386 empty
./mgmt run --tmp-prefix --no-pgp --hostname h5 --seeds=http://127.0.0.1:2379 --client-urls=http://127.0.0.1:2387 --server-urls=http://127.0.0.1:2388 empty
ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2379 member list
ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2381 put /_mgmt/chooser/dynamicsize/idealclustersize 5
ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2381 member list
```
Bugs
A member might occasionally think that an endpoint still exists after it has already shut down. This isn't a major issue, since if that endpoint doesn't respond, the member will automatically choose the next available one. To see this issue, turn on debugging and start H1, H2, and H3, then stop H2; you might see that H3 still knows about H2.
Shutting down a cluster by setting the idealclustersize to zero is currently buggy and not supported. Try this at your own risk.
If a member is nominated but doesn't respond to the nominate event and start up, and we have lost quorum by adding it, then we could be stuck in a blocked state. This could be improved by calling memberRemove after a timeout.
Adding new cluster members very quickly might trigger a `runtime error: error validating peerURLs ... member count is unequal` error. See: https://github.com/etcd-io/etcd/issues/10626 for more information.
If you use the dynamic size feature to start and stop the server process, then once it has started and stopped, it can't be restarted because of a bug in etcd that doesn't free the port. Instead, you'll get a `bind: address already in use` error. See: https://github.com/etcd-io/etcd/issues/6042 for more information.
Index
- Constants
- type EmbdEtcd
- func (obj *EmbdEtcd) Cleanup() error
- func (obj *EmbdEtcd) Err() error
- func (obj *EmbdEtcd) Exited() <-chan struct{}
- func (obj *EmbdEtcd) Init() error
- func (obj *EmbdEtcd) Ready() <-chan struct{}
- func (obj *EmbdEtcd) Run(ctx context.Context) error
- func (obj *EmbdEtcd) ServerExited() (<-chan struct{}, func())
- func (obj *EmbdEtcd) ServerReady() (<-chan struct{}, func())
- func (obj *EmbdEtcd) Validate() error
- type World
- func (obj *World) AddDeploy(ctx context.Context, id uint64, hash, pHash string, data *string) error
- func (obj *World) Cleanup() error
- func (obj *World) Connect(ctx context.Context, init *engine.WorldInit) error
- func (obj *World) Fs(uri string) (engine.Fs, error)
- func (obj *World) GetDeploy(ctx context.Context, id uint64) (string, error)
- func (obj *World) GetDeploys(ctx context.Context) (map[uint64]string, error)
- func (obj *World) GetMaxDeployID(ctx context.Context) (uint64, error)
- func (obj *World) ResCollect(ctx context.Context, filters []*engine.ResFilter) ([]*engine.ResOutput, error)
- func (obj *World) ResDelete(ctx context.Context, resourceDeletes []*engine.ResDelete) (bool, error)
- func (obj *World) ResExport(ctx context.Context, resourceExports []*engine.ResExport) (bool, error)
- func (obj *World) ResWatch(ctx context.Context, kind string) (chan error, error)
- func (obj *World) Scheduled(ctx context.Context, namespace string) (chan *scheduler.ScheduledResult, error)
- func (obj *World) Scheduler(namespace string, opts ...scheduler.Option) (*scheduler.Result, error)
- func (obj *World) StrDel(ctx context.Context, namespace string) error
- func (obj *World) StrGet(ctx context.Context, namespace string) (string, error)
- func (obj *World) StrIsNotExist(err error) bool
- func (obj *World) StrMapDel(ctx context.Context, namespace string) error
- func (obj *World) StrMapGet(ctx context.Context, namespace string) (map[string]string, error)
- func (obj *World) StrMapSet(ctx context.Context, namespace, value string) error
- func (obj *World) StrMapWatch(ctx context.Context, namespace string) (chan error, error)
- func (obj *World) StrSet(ctx context.Context, namespace, value string) error
- func (obj *World) StrWatch(ctx context.Context, namespace string) (chan error, error)
- func (obj *World) URI() string
- func (obj *World) WatchDeploy(ctx context.Context) (chan error, error)
- func (obj *World) WatchMembers(ctx context.Context) (<-chan *interfaces.MembersResult, error)
Constants
```go
const (
	// ConvergedPath is the unprefixed path under which the converger
	// may store data. This is public so that other consumers can know to
	// avoid this key prefix.
	ConvergedPath = "/converged/"

	// SchedulerPath is the unprefixed path under which the scheduler
	// may store data. This is public so that other consumers can know to
	// avoid this key prefix.
	SchedulerPath = "/scheduler/"

	// DefaultClientURL is the default value that is used for client URLs.
	// It is pulled from the upstream etcd package.
	DefaultClientURL = embed.DefaultListenClientURLs // 127.0.0.1:2379

	// DefaultServerURL is the default value that is used for server URLs.
	// It is pulled from the upstream etcd package.
	DefaultServerURL = embed.DefaultListenPeerURLs // 127.0.0.1:2380

	// DefaultMaxTxnOps is the maximum number of operations to run in a
	// single etcd transaction. If you exceed this limit, it is possible
	// that you have either an extremely large code base, or that you have
	// some code which is possibly not as efficient as it could be. Let us
	// know so that we can analyze the situation, and increase this if
	// necessary.
	DefaultMaxTxnOps = 512

	// ClientDialTimeout is the DialTimeout option in the client config.
	ClientDialTimeout = 5 * time.Second

	// ClientDialKeepAliveTime is the DialKeepAliveTime config value for the
	// etcd client. It is recommended that you use this so that dead
	// endpoints don't block any cluster operations.
	ClientDialKeepAliveTime = 2 * time.Second // from etcdctl

	// ClientDialKeepAliveTimeout is the DialKeepAliveTimeout config value
	// for the etcd client. It is recommended that you use this so that dead
	// endpoints don't block any cluster operations.
	ClientDialKeepAliveTimeout = 6 * time.Second // from etcdctl

	// MemberChangeInterval is the polling interval to use when watching for
	// member changes during add or remove.
	MemberChangeInterval = 500 * time.Millisecond

	// SessionTTL is the number of seconds to wait before a dead or
	// unresponsive host has their volunteer keys removed from the cluster.
	// This should be an integer multiple of seconds, since one second is
	// the TTL precision used in etcd.
	SessionTTL = 10 * time.Second // seconds

	// ConvergerHostnameNamespace is a unique key used in the converger.
	ConvergerHostnameNamespace = "etcd-hostname"
)
```
```go
const (
	// MaxServerStartTimeout is the amount of time to wait for the server
	// to start before considering it a failure. If you hit this timeout,
	// let us know so that we can analyze the situation, and increase this
	// if necessary.
	MaxServerStartTimeout = 60 * time.Second

	// MaxServerCloseTimeout is the maximum amount of time we'll wait for
	// the server to close down. If it exceeds this, it's probably a bug.
	MaxServerCloseTimeout = 15 * time.Second

	// MaxServerRetries is the maximum number of times we can try to restart
	// the server if it fails on startup. This can help workaround some
	// timing bugs in etcd.
	MaxServerRetries = 5

	// ServerRetryWait is the amount of time to wait between retries.
	ServerRetryWait = 500 * time.Millisecond
)
```
Variables
This section is empty.
Functions
This section is empty.
Types
type EmbdEtcd
```go
type EmbdEtcd struct {
	// Hostname is the unique identifier for this host.
	Hostname string
	// Seeds is the list of servers that this client could connect to.
	Seeds etcdtypes.URLs
	// ClientURLs are the locations to listen for clients if I am a server.
	ClientURLs etcdtypes.URLs
	// ServerURLs are the locations to listen for servers (peers) if I am a
	// server (peer).
	ServerURLs etcdtypes.URLs
	// AClientURLs are the client urls to advertise.
	AClientURLs etcdtypes.URLs
	// AServerURLs are the server (peer) urls to advertise.
	AServerURLs etcdtypes.URLs
	// NoNetwork causes this to use unix:// sockets instead of TCP for
	// connections.
	NoNetwork bool
	// Converger is a converged coordinator object that can be used to
	// track the converged state.
	Converger *converger.Coordinator
	// NS is a string namespace that we prefix to every key operation.
	NS string
	// Prefix is the directory where any etcd related state is stored. It
	// must be an absolute directory path.
	Prefix string
	Debug  bool
	Logf   func(format string, v ...interface{})
	// contains filtered or unexported fields
}
```
EmbdEtcd provides the embedded server and client etcd functionality. The "elastic" functionality has been removed for the time being because it was too complicated and not stable. It is much better to use mcl to build that system.
func (*EmbdEtcd) Err
func (obj *EmbdEtcd) Err() error
Err will contain the last error when Next shuts down. It waits for all the running processes to exit before it returns.
func (*EmbdEtcd) Exited
func (obj *EmbdEtcd) Exited() <-chan struct{}
Exited returns a channel that closes once we have been destroyed. This happens after Run exits. If Run is never called, this will never happen.
func (*EmbdEtcd) Init
func (obj *EmbdEtcd) Init() error
Init initializes the struct after it has been populated as desired. You must not use the struct if this returns an error.
func (*EmbdEtcd) Ready
func (obj *EmbdEtcd) Ready() <-chan struct{}
Ready returns a channel that closes when we're up and running. This process happens when calling Run. If Run is never called, this will never happen. Our main startup must be running, and our client must be connected to get here.
func (*EmbdEtcd) Run
func (obj *EmbdEtcd) Run(ctx context.Context) error
Run is the main entry point to kick off the embedded etcd server. It blocks until we've exited for shutdown. The shutdown can be triggered by cancelling the context.
func (*EmbdEtcd) ServerExited
func (obj *EmbdEtcd) ServerExited() (<-chan struct{}, func())
ServerExited returns a channel that closes when the server is destroyed. This happens after runServer exits. If runServer is never called, this will never happen. It also returns a cancel/ack function which must be called once the signal is received or we are done watching it. This is needed because the signal is cyclical: it fires, and then gets reset as the server starts up, shuts down, and repeats the cycle. The cancel/ack function ensures that we only watch a signal when it's ready to be read, and only reset it when we are done watching it.
func (*EmbdEtcd) ServerReady
func (obj *EmbdEtcd) ServerReady() (<-chan struct{}, func())
ServerReady returns a channel that closes when we're up and running. This happens when calling runServer. If runServer is never called, this will never happen. It also returns a cancel/ack function which must be called once the signal is received or we are done watching it. This is needed because the signal is cyclical: it fires, and then gets reset as the server starts up, shuts down, and repeats the cycle. The cancel/ack function ensures that we only watch a signal when it's ready to be read, and only reset it when we are done watching it.
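The cyclical, acked signal described for `ServerReady` and `ServerExited` can be modelled with a mutex, a condition variable, and a channel that is replaced on each cycle. This `cyclicSignal` type is a hypothetical illustration of the idea, not the package's implementation: `Watch` hands out the current channel and an ack function, `Fire` closes the channel, and `Reset` waits until every watcher has acked before arming a new channel.

```go
package main

import (
	"fmt"
	"sync"
)

// cyclicSignal is a hypothetical resettable signal: a channel that is
// closed to fire, and replaced with a fresh one on Reset.
type cyclicSignal struct {
	mu     sync.Mutex
	cond   *sync.Cond
	ch     chan struct{}
	fired  bool
	active int // watchers that have not acked yet
}

func newCyclicSignal() *cyclicSignal {
	s := &cyclicSignal{ch: make(chan struct{})}
	s.cond = sync.NewCond(&s.mu)
	return s
}

// Watch returns the current channel and an ack func. The ack must be called
// once the signal was received or the caller stops watching; calling it
// more than once is harmless.
func (s *cyclicSignal) Watch() (<-chan struct{}, func()) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.active++
	var once sync.Once
	ack := func() {
		once.Do(func() {
			s.mu.Lock()
			s.active--
			s.cond.Broadcast()
			s.mu.Unlock()
		})
	}
	return s.ch, ack
}

// Fire closes the current channel, waking every watcher. Repeated calls in
// the same cycle are no-ops.
func (s *cyclicSignal) Fire() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.fired {
		close(s.ch)
		s.fired = true
	}
}

// Reset blocks until all outstanding watchers have acked, then arms a new
// channel for the next cycle.
func (s *cyclicSignal) Reset() {
	s.mu.Lock()
	defer s.mu.Unlock()
	for s.active > 0 {
		s.cond.Wait()
	}
	s.ch = make(chan struct{})
	s.fired = false
}

func main() {
	s := newCyclicSignal()
	ready, ack := s.Watch()
	go s.Fire() // e.g. the server finished starting
	<-ready
	ack()     // done watching this cycle
	s.Reset() // safe now: every watcher has acked
	next, ack2 := s.Watch()
	select {
	case <-next:
		fmt.Println("unexpected: new cycle already fired")
	default:
		fmt.Println("new cycle armed")
	}
	ack2()
}
```

The cost of this design is that a watcher which never acks blocks `Reset` forever, which is why the ack must always be called.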
type World
```go
type World struct {
	// Client is the etcd client to use. This should not be specified; one
	// will be created automatically. This exists for legacy reasons and for
	// the SSH etcd world implementation. Maybe it can be removed in the
	// future.
	Client interfaces.Client
	// Seeds are the list of etcd endpoints to connect to.
	Seeds []string
	// NS is the etcd namespace to use.
	NS             string
	MetadataPrefix string    // expected metadata prefix
	StoragePrefix  string    // storage prefix for etcdfs storage
	StandaloneFs   engine.Fs // store an fs here for local usage
	GetURI         func() string
	// contains filtered or unexported fields
}
```
World is an etcd backed implementation of the World interface.
func (*World) Fs
func (obj *World) Fs(uri string) (engine.Fs, error)
Fs returns a distributed file system from a unique URI. For execution that doesn't span more than a single host, this file system might actually be a local or memory-backed file system, and so it is only distributed within the boredom that is a single host cluster.
func (*World) GetDeploys
func (obj *World) GetDeploys(ctx context.Context) (map[uint64]string, error)
GetDeploys gets all the available deploys.
func (*World) GetMaxDeployID
func (obj *World) GetMaxDeployID(ctx context.Context) (uint64, error)
GetMaxDeployID returns the maximum deploy ID.
func (*World) ResCollect
func (obj *World) ResCollect(ctx context.Context, filters []*engine.ResFilter) ([]*engine.ResOutput, error)
ResCollect gets the collection of exported resources which match the filters. It does this atomically so that a call always returns a complete collection.
func (*World) ResDelete
func (obj *World) ResDelete(ctx context.Context, resourceDeletes []*engine.ResDelete) (bool, error)
ResDelete deletes a number of resources in the world storage system. If this doesn't delete, it returns (true, nil). If it makes a delete, then it returns (false, nil). On any error we return (false, err).
func (*World) ResExport
func (obj *World) ResExport(ctx context.Context, resourceExports []*engine.ResExport) (bool, error)
ResExport stores a number of resources in the world storage system. The individual records should not be updated if they are identical to what is already present. (This is to prevent unnecessary events.) If this makes no changes, it returns (true, nil). If it makes a change, then it returns (false, nil). On any error we return (false, err). It stores the exports under our hostname namespace. Subsequent calls do NOT replace the previously set collection.
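The convention that `(true, nil)` means "nothing changed", together with skipping identical records, can be sketched with an in-memory store. `exportStore` and its `export` method are hypothetical; they only mirror the documented return values and the rule that later calls add to, rather than replace, a host's collection.

```go
package main

import (
	"errors"
	"fmt"
)

// exportStore is a hypothetical in-memory stand-in for the world storage,
// keyed by hostname and then by record name.
type exportStore map[string]map[string]string

// export writes records under host, skipping identical ones so that no
// needless change events would fire. It returns (true, nil) when nothing
// changed, (false, nil) when something was written, and (false, err) on
// error. Records not mentioned in this call are left in place.
func (s exportStore) export(host string, records map[string]string) (bool, error) {
	if host == "" {
		return false, errors.New("empty hostname")
	}
	if s[host] == nil {
		s[host] = map[string]string{}
	}
	unchanged := true
	for name, value := range records {
		if old, ok := s[host][name]; ok && old == value {
			continue // identical: do not rewrite
		}
		s[host][name] = value
		unchanged = false
	}
	return unchanged, nil
}

func main() {
	s := exportStore{}
	fmt.Println(s.export("h1", map[string]string{"file1": "v1"})) // false <nil>
	fmt.Println(s.export("h1", map[string]string{"file1": "v1"})) // true <nil>
	fmt.Println(s.export("h1", map[string]string{"file2": "v2"})) // false <nil>
	fmt.Println(len(s["h1"]))                                     // 2
}
```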
func (*World) ResWatch
func (obj *World) ResWatch(ctx context.Context, kind string) (chan error, error)
ResWatch returns a channel which spits out events on possible exported resource changes.
func (*World) Scheduled
func (obj *World) Scheduled(ctx context.Context, namespace string) (chan *scheduler.ScheduledResult, error)
Scheduled gets the scheduled results without participating.
func (*World) Scheduler
func (obj *World) Scheduler(namespace string, opts ...scheduler.Option) (*scheduler.Result, error)
Scheduler returns a scheduling result of hosts in a particular namespace. XXX: Add a context.Context here
func (*World) StrIsNotExist
func (obj *World) StrIsNotExist(err error) bool
StrIsNotExist returns whether the error from StrGet is a key missing error.
func (*World) StrMapSet
func (obj *World) StrMapSet(ctx context.Context, namespace, value string) error
StrMapSet sets the namespace value to a particular string under the identity of its own hostname.
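To illustrate the per-host semantics, here is an in-memory model in which each host writes only its own entry in a namespace. It assumes, without confirmation from the source, that the map returned by `StrMapGet` is keyed by hostname; `strMapStore` is a hypothetical type.

```go
package main

import "fmt"

// strMapStore is a hypothetical model: namespace -> hostname -> value.
type strMapStore map[string]map[string]string

// set stores value under the caller's own hostname, so a host can only
// replace its own entry.
func (s strMapStore) set(hostname, namespace, value string) {
	if s[namespace] == nil {
		s[namespace] = map[string]string{}
	}
	s[namespace][hostname] = value
}

// get returns a copy of every host's value in the namespace.
func (s strMapStore) get(namespace string) map[string]string {
	out := map[string]string{}
	for host, v := range s[namespace] {
		out[host] = v
	}
	return out
}

func main() {
	s := strMapStore{}
	s.set("h1", "color", "red")
	s.set("h2", "color", "blue")
	s.set("h1", "color", "green") // h1 replaces only its own entry
	fmt.Println(s.get("color"))  // map[h1:green h2:blue]
}
```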
func (*World) StrMapWatch
func (obj *World) StrMapWatch(ctx context.Context, namespace string) (chan error, error)
StrMapWatch returns a channel which spits out events on possible string changes.
func (*World) StrSet
func (obj *World) StrSet(ctx context.Context, namespace, value string) error
StrSet sets the namespace value to a particular string. XXX: This can overwrite another host's value that was set with StrMapSet. Add possible cryptographic signing or special namespacing to prevent such things.
func (*World) StrWatch
func (obj *World) StrWatch(ctx context.Context, namespace string) (chan error, error)
StrWatch returns a channel which spits out events on possible string changes.
func (*World) URI
func (obj *World) URI() string
URI returns the current FS URI. TODO: Can we improve this API or deprecate it entirely?
func (*World) WatchDeploy
func (obj *World) WatchDeploy(ctx context.Context) (chan error, error)
WatchDeploy returns a channel which spits out events on new deploy activity.
func (*World) WatchMembers
func (obj *World) WatchMembers(ctx context.Context) (<-chan *interfaces.MembersResult, error)
WatchMembers returns a channel of changing members in the cluster.
Directories
| Path | Synopsis |
|---|---|
| fs | Package fs implements a very simple and limited file system on top of etcd. |
| scheduler | Package scheduler implements a distributed consensus scheduler with etcd. |
| ssh | Package ssh transports etcd traffic over SSH to provide a special World API. |