etcd

package
v0.0.0-...-c36b4a8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 26, 2025 License: BSD-2-Clause Imports: 11 Imported by: 0

Documentation

Overview

Package etcd provides an etcd implementation of the storage driver interface. It enables using etcd as a distributed key-value storage backend.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

// Client defines the minimal interface needed for etcd operations: the
// ability to create a transaction. Keeping it this small allows for easier
// testing and mock implementations.
//
// NOTE(review): the etcd qualifier here presumably refers to the etcd client
// package, not to this package - confirm against the source imports.
type Client interface {
	// Txn creates a new transaction. It takes a context and returns an etcd.Txn.
	Txn(ctx context.Context) etcd.Txn
}

Client defines the minimal interface needed for etcd operations. This allows for easier testing and mock implementations.

type Driver

// Driver is an etcd implementation of the storage driver interface.
// Its fields are not shown on this page.
type Driver struct {
	// contains filtered or unexported fields
}

Driver is an etcd implementation of the storage driver interface. It uses etcd as the underlying key-value storage backend.

func New

func New(client *etcd.Client) *Driver

New creates a new etcd driver instance using an existing etcd client. The client should be properly configured and connected to an etcd cluster.

func (Driver) Execute

func (d Driver) Execute(
	ctx context.Context,
	predicates []predicate.Predicate,
	thenOps []operation.Operation,
	elseOps []operation.Operation,
) (tx.Response, error)

Execute executes a transactional operation with conditional logic. It processes predicates to determine whether to execute thenOps or elseOps.

Example (SimpleDelete)

ExampleDriver_Execute_simpleDelete demonstrates a simple Delete operation with the etcd driver.

package main

import (
	"context"
	"fmt"
	"log"
	"testing"
	"time"

	etcdclient "go.etcd.io/etcd/client/v3"
	etcdfintegration "go.etcd.io/etcd/tests/v3/framework/integration"

	etcdintegration "go.etcd.io/etcd/tests/v3/integration"

	"github.com/tarantool/go-storage/driver/etcd"
	testingUtils "github.com/tarantool/go-storage/internal/testing"
	"github.com/tarantool/go-storage/operation"
)

// exampleTestDialTimeout is the DialTimeout given to the etcd client that the examples create.
const exampleTestDialTimeout = 5 * time.Second

// createEtcdDriver builds an etcd driver for the examples on top of the etcd
// integration-test framework: it creates a lazy cluster, connects a client to
// the cluster's gRPC endpoints and wraps that client with etcd.New.
//
// Cluster termination and client shutdown are registered with tb.Cleanup, so
// the second return value is an empty function that callers may still defer.
// A client creation failure is reported through tb.Fatalf.
func createEtcdDriver(tb testing.TB) (*etcd.Driver, func()) {
	tb.Helper()

	etcdfintegration.BeforeTest(tb, etcdfintegration.WithoutGoLeakDetection())

	lazyCluster := etcdintegration.NewLazyCluster()
	tb.Cleanup(func() { lazyCluster.Terminate() })

	// Only Endpoints and DialTimeout are given non-zero values; the remaining
	// fields are listed explicitly with their zero values.
	clientConfig := etcdclient.Config{
		Endpoints:   lazyCluster.EndpointsGRPC(),
		DialTimeout: exampleTestDialTimeout,

		AutoSyncInterval:      0,
		DialKeepAliveTime:     0,
		DialKeepAliveTimeout:  0,
		MaxCallSendMsgSize:    0,
		MaxCallRecvMsgSize:    0,
		TLS:                   nil,
		Username:              "",
		Password:              "",
		RejectOldCluster:      false,
		DialOptions:           nil,
		Context:               nil,
		Logger:                nil,
		LogConfig:             nil,
		PermitWithoutStream:   false,
		MaxUnaryRetries:       0,
		BackoffWaitBetween:    0,
		BackoffJitterFraction: 0,
	}

	etcdClient, newErr := etcdclient.New(clientConfig)
	if newErr != nil {
		tb.Fatalf("Failed to create etcd client: %v", newErr)
	}

	// The error from Close is intentionally discarded during teardown.
	tb.Cleanup(func() { _ = etcdClient.Close() })

	noCleanup := func() {}

	return etcd.New(etcdClient), noCleanup
}

// main stores a value under a key, deletes that key again and prints a
// confirmation once the delete has succeeded. Any failing step is logged and
// ends the example without printing the confirmation.
func main() {
	// Create a testing context for the example.
	t := testingUtils.NewT()
	defer t.Cleanups()

	ctx := context.Background()

	driver, cleanup := createEtcdDriver(t)
	defer cleanup()

	key := []byte("/config/app/version")

	// First store a value. The error is checked so that the delete below is
	// never reported as successful for a key that was not written.
	_, err := driver.Execute(ctx, nil, []operation.Operation{
		operation.Put(key, []byte("1.0.0")),
	}, nil)
	if err != nil {
		log.Printf("Put operation failed: %v", err)
		return
	}

	// Then delete it.
	_, err = driver.Execute(ctx, nil, []operation.Operation{
		operation.Delete(key),
	}, nil)
	if err != nil {
		log.Printf("Delete operation failed: %v", err)
		return
	}

	fmt.Println("Successfully deleted key:", string(key))
}
Output:

Successfully deleted key: /config/app/version
Example (SimpleGet)

ExampleDriver_Execute_simpleGet demonstrates a simple Get operation with the etcd driver.

package main

import (
	"context"
	"fmt"
	"log"
	"testing"
	"time"

	etcdclient "go.etcd.io/etcd/client/v3"
	etcdfintegration "go.etcd.io/etcd/tests/v3/framework/integration"

	etcdintegration "go.etcd.io/etcd/tests/v3/integration"

	"github.com/tarantool/go-storage/driver/etcd"
	testingUtils "github.com/tarantool/go-storage/internal/testing"
	"github.com/tarantool/go-storage/operation"
)

// exampleTestDialTimeout is the DialTimeout given to the etcd client that the examples create.
const exampleTestDialTimeout = 5 * time.Second

// createEtcdDriver builds an etcd driver for the examples on top of the etcd
// integration-test framework: it creates a lazy cluster, connects a client to
// the cluster's gRPC endpoints and wraps that client with etcd.New.
//
// Cluster termination and client shutdown are registered with tb.Cleanup, so
// the second return value is an empty function that callers may still defer.
// A client creation failure is reported through tb.Fatalf.
func createEtcdDriver(tb testing.TB) (*etcd.Driver, func()) {
	tb.Helper()

	etcdfintegration.BeforeTest(tb, etcdfintegration.WithoutGoLeakDetection())

	lazyCluster := etcdintegration.NewLazyCluster()
	tb.Cleanup(func() { lazyCluster.Terminate() })

	// Only Endpoints and DialTimeout are given non-zero values; the remaining
	// fields are listed explicitly with their zero values.
	clientConfig := etcdclient.Config{
		Endpoints:   lazyCluster.EndpointsGRPC(),
		DialTimeout: exampleTestDialTimeout,

		AutoSyncInterval:      0,
		DialKeepAliveTime:     0,
		DialKeepAliveTimeout:  0,
		MaxCallSendMsgSize:    0,
		MaxCallRecvMsgSize:    0,
		TLS:                   nil,
		Username:              "",
		Password:              "",
		RejectOldCluster:      false,
		DialOptions:           nil,
		Context:               nil,
		Logger:                nil,
		LogConfig:             nil,
		PermitWithoutStream:   false,
		MaxUnaryRetries:       0,
		BackoffWaitBetween:    0,
		BackoffJitterFraction: 0,
	}

	etcdClient, newErr := etcdclient.New(clientConfig)
	if newErr != nil {
		tb.Fatalf("Failed to create etcd client: %v", newErr)
	}

	// The error from Close is intentionally discarded during teardown.
	tb.Cleanup(func() { _ = etcdClient.Close() })

	noCleanup := func() {}

	return etcd.New(etcdClient), noCleanup
}

// main stores a value under a key, reads it back with a Get operation and
// prints the key, the value and the ModRevision of the returned entry. Any
// failing step is logged and ends the example.
func main() {
	// Create a testing context for the example.
	t := testingUtils.NewT()
	defer t.Cleanups()

	ctx := context.Background()

	driver, cleanup := createEtcdDriver(t)
	defer cleanup()

	key := []byte("/config/app/version")

	// First store a value. The error is checked so a failed write is reported
	// here rather than surfacing as a silently empty read below.
	_, err := driver.Execute(ctx, nil, []operation.Operation{
		operation.Put(key, []byte("1.0.0")),
	}, nil)
	if err != nil {
		log.Printf("Put operation failed: %v", err)
		return
	}

	// Then retrieve it.
	response, err := driver.Execute(ctx, nil, []operation.Operation{
		operation.Get(key),
	}, nil)
	if err != nil {
		log.Printf("Get operation failed: %v", err)
		return
	}

	// Nothing is printed unless the transaction succeeded and the first
	// result carries at least one value.
	if !response.Succeeded || len(response.Results) == 0 || len(response.Results[0].Values) == 0 {
		return
	}

	// The number printed as "version" is the entry's ModRevision field.
	kv := response.Results[0].Values[0]
	fmt.Printf("Retrieved key: %s, value: %s, version: %d\n",
		string(kv.Key), string(kv.Value), kv.ModRevision)
}
Output:

Retrieved key: /config/app/version, value: 1.0.0, version: 2
Example (SimpleMultiPut)

ExampleDriver_Execute_simpleMultiPut demonstrates multiple Put operations in a single transaction with the etcd driver.

package main

import (
	"context"
	"fmt"
	"log"
	"testing"
	"time"

	etcdclient "go.etcd.io/etcd/client/v3"
	etcdfintegration "go.etcd.io/etcd/tests/v3/framework/integration"

	etcdintegration "go.etcd.io/etcd/tests/v3/integration"

	"github.com/tarantool/go-storage/driver/etcd"
	testingUtils "github.com/tarantool/go-storage/internal/testing"
	"github.com/tarantool/go-storage/operation"
)

// exampleTestDialTimeout is the DialTimeout given to the etcd client that the examples create.
const exampleTestDialTimeout = 5 * time.Second

// createEtcdDriver builds an etcd driver for the examples on top of the etcd
// integration-test framework: it creates a lazy cluster, connects a client to
// the cluster's gRPC endpoints and wraps that client with etcd.New.
//
// Cluster termination and client shutdown are registered with tb.Cleanup, so
// the second return value is an empty function that callers may still defer.
// A client creation failure is reported through tb.Fatalf.
func createEtcdDriver(tb testing.TB) (*etcd.Driver, func()) {
	tb.Helper()

	etcdfintegration.BeforeTest(tb, etcdfintegration.WithoutGoLeakDetection())

	lazyCluster := etcdintegration.NewLazyCluster()
	tb.Cleanup(func() { lazyCluster.Terminate() })

	// Only Endpoints and DialTimeout are given non-zero values; the remaining
	// fields are listed explicitly with their zero values.
	clientConfig := etcdclient.Config{
		Endpoints:   lazyCluster.EndpointsGRPC(),
		DialTimeout: exampleTestDialTimeout,

		AutoSyncInterval:      0,
		DialKeepAliveTime:     0,
		DialKeepAliveTimeout:  0,
		MaxCallSendMsgSize:    0,
		MaxCallRecvMsgSize:    0,
		TLS:                   nil,
		Username:              "",
		Password:              "",
		RejectOldCluster:      false,
		DialOptions:           nil,
		Context:               nil,
		Logger:                nil,
		LogConfig:             nil,
		PermitWithoutStream:   false,
		MaxUnaryRetries:       0,
		BackoffWaitBetween:    0,
		BackoffJitterFraction: 0,
	}

	etcdClient, newErr := etcdclient.New(clientConfig)
	if newErr != nil {
		tb.Fatalf("Failed to create etcd client: %v", newErr)
	}

	// The error from Close is intentionally discarded during teardown.
	tb.Cleanup(func() { _ = etcdClient.Close() })

	noCleanup := func() {}

	return etcd.New(etcdClient), noCleanup
}

// main writes two configuration keys with a single Execute call and prints
// how many results the call returned. A failed call is logged instead.
func main() {
	// The etcd test helpers need a testing context; create one for the example.
	exampleT := testingUtils.NewT()
	defer exampleT.Cleanups()

	etcdDriver, done := createEtcdDriver(exampleT)
	defer done()

	// Both puts are handed to Execute together, with no predicates and no
	// else-operations.
	puts := []operation.Operation{
		operation.Put([]byte("/config/app/name"), []byte("MyApp")),
		operation.Put([]byte("/config/app/environment"), []byte("production")),
	}

	result, execErr := etcdDriver.Execute(context.Background(), nil, puts, nil)
	if execErr != nil {
		log.Printf("Multi-put operation failed: %v", execErr)
		return
	}

	fmt.Println("Successfully stored", len(result.Results), "configuration items")
}
Output:

Successfully stored 2 configuration items
Example (SimplePut)

ExampleDriver_Execute_simplePut demonstrates a simple Put operation with the etcd driver.

package main

import (
	"context"
	"fmt"
	"log"
	"testing"
	"time"

	etcdclient "go.etcd.io/etcd/client/v3"
	etcdfintegration "go.etcd.io/etcd/tests/v3/framework/integration"

	etcdintegration "go.etcd.io/etcd/tests/v3/integration"

	"github.com/tarantool/go-storage/driver/etcd"
	testingUtils "github.com/tarantool/go-storage/internal/testing"
	"github.com/tarantool/go-storage/operation"
)

// exampleTestDialTimeout is the DialTimeout given to the etcd client that the examples create.
const exampleTestDialTimeout = 5 * time.Second

// createEtcdDriver builds an etcd driver for the examples on top of the etcd
// integration-test framework: it creates a lazy cluster, connects a client to
// the cluster's gRPC endpoints and wraps that client with etcd.New.
//
// Cluster termination and client shutdown are registered with tb.Cleanup, so
// the second return value is an empty function that callers may still defer.
// A client creation failure is reported through tb.Fatalf.
func createEtcdDriver(tb testing.TB) (*etcd.Driver, func()) {
	tb.Helper()

	etcdfintegration.BeforeTest(tb, etcdfintegration.WithoutGoLeakDetection())

	lazyCluster := etcdintegration.NewLazyCluster()
	tb.Cleanup(func() { lazyCluster.Terminate() })

	// Only Endpoints and DialTimeout are given non-zero values; the remaining
	// fields are listed explicitly with their zero values.
	clientConfig := etcdclient.Config{
		Endpoints:   lazyCluster.EndpointsGRPC(),
		DialTimeout: exampleTestDialTimeout,

		AutoSyncInterval:      0,
		DialKeepAliveTime:     0,
		DialKeepAliveTimeout:  0,
		MaxCallSendMsgSize:    0,
		MaxCallRecvMsgSize:    0,
		TLS:                   nil,
		Username:              "",
		Password:              "",
		RejectOldCluster:      false,
		DialOptions:           nil,
		Context:               nil,
		Logger:                nil,
		LogConfig:             nil,
		PermitWithoutStream:   false,
		MaxUnaryRetries:       0,
		BackoffWaitBetween:    0,
		BackoffJitterFraction: 0,
	}

	etcdClient, newErr := etcdclient.New(clientConfig)
	if newErr != nil {
		tb.Fatalf("Failed to create etcd client: %v", newErr)
	}

	// The error from Close is intentionally discarded during teardown.
	tb.Cleanup(func() { _ = etcdClient.Close() })

	noCleanup := func() {}

	return etcd.New(etcdClient), noCleanup
}

// main stores one value under one key with a single Put operation and prints
// the key/value pair that was written. A failed call is logged instead.
func main() {
	// The etcd test helpers need a testing context; create one for the example.
	exampleT := testingUtils.NewT()
	defer exampleT.Cleanups()

	etcdDriver, done := createEtcdDriver(exampleT)
	defer done()

	var (
		versionKey   = []byte("/config/app/version")
		versionValue = []byte("1.0.0")
	)

	// A single Put, executed without predicates or else-operations.
	put := []operation.Operation{operation.Put(versionKey, versionValue)}

	if _, execErr := etcdDriver.Execute(context.Background(), nil, put, nil); execErr != nil {
		log.Printf("Put operation failed: %v", execErr)
		return
	}

	fmt.Println("Key", string(versionKey), "stored with value:", string(versionValue))
}
Output:

Key /config/app/version stored with value: 1.0.0
Example (With_predicates_multiplePredicates)

ExampleDriver_Execute_with_predicates_multiplePredicates demonstrates multiple predicates with Else operations.

package main

import (
	"context"
	"fmt"
	"log"
	"testing"
	"time"

	etcdclient "go.etcd.io/etcd/client/v3"
	etcdfintegration "go.etcd.io/etcd/tests/v3/framework/integration"

	etcdintegration "go.etcd.io/etcd/tests/v3/integration"

	"github.com/tarantool/go-storage/driver/etcd"
	testingUtils "github.com/tarantool/go-storage/internal/testing"
	"github.com/tarantool/go-storage/operation"
	"github.com/tarantool/go-storage/predicate"
)

// exampleTestDialTimeout is the DialTimeout given to the etcd client that the examples create.
const exampleTestDialTimeout = 5 * time.Second

// createEtcdDriver builds an etcd driver for the examples on top of the etcd
// integration-test framework: it creates a lazy cluster, connects a client to
// the cluster's gRPC endpoints and wraps that client with etcd.New.
//
// Cluster termination and client shutdown are registered with tb.Cleanup, so
// the second return value is an empty function that callers may still defer.
// A client creation failure is reported through tb.Fatalf.
func createEtcdDriver(tb testing.TB) (*etcd.Driver, func()) {
	tb.Helper()

	etcdfintegration.BeforeTest(tb, etcdfintegration.WithoutGoLeakDetection())

	lazyCluster := etcdintegration.NewLazyCluster()
	tb.Cleanup(func() { lazyCluster.Terminate() })

	// Only Endpoints and DialTimeout are given non-zero values; the remaining
	// fields are listed explicitly with their zero values.
	clientConfig := etcdclient.Config{
		Endpoints:   lazyCluster.EndpointsGRPC(),
		DialTimeout: exampleTestDialTimeout,

		AutoSyncInterval:      0,
		DialKeepAliveTime:     0,
		DialKeepAliveTimeout:  0,
		MaxCallSendMsgSize:    0,
		MaxCallRecvMsgSize:    0,
		TLS:                   nil,
		Username:              "",
		Password:              "",
		RejectOldCluster:      false,
		DialOptions:           nil,
		Context:               nil,
		Logger:                nil,
		LogConfig:             nil,
		PermitWithoutStream:   false,
		MaxUnaryRetries:       0,
		BackoffWaitBetween:    0,
		BackoffJitterFraction: 0,
	}

	etcdClient, newErr := etcdclient.New(clientConfig)
	if newErr != nil {
		tb.Fatalf("Failed to create etcd client: %v", newErr)
	}

	// The error from Close is intentionally discarded during teardown.
	tb.Cleanup(func() { _ = etcdClient.Close() })

	noCleanup := func() {}

	return etcd.New(etcdClient), noCleanup
}

// main seeds two keys, then runs one Execute call guarded by two value
// predicates: the then-operations overwrite both keys, the else-operations
// delete them. It prints which branch was taken according to
// response.Succeeded. Any failing step is logged and ends the example.
func main() {
	// Create a testing context for the example.
	t := testingUtils.NewT()
	defer t.Cleanups()

	ctx := context.Background()

	driver, cleanup := createEtcdDriver(t)
	defer cleanup()

	key1 := []byte("/config/database/host")
	key2 := []byte("/config/database/port")

	// Seed the values the predicates below compare against. The error is
	// checked so a failed seed is reported as such instead of showing up as an
	// unexplained predicate mismatch.
	_, err := driver.Execute(ctx, nil, []operation.Operation{
		operation.Put(key1, []byte("localhost")),
		operation.Put(key2, []byte("5432")),
	}, nil)
	if err != nil {
		log.Printf("Initial update failed: %v", err)
		return
	}

	response, err := driver.Execute(ctx, []predicate.Predicate{
		predicate.ValueEqual(key1, []byte("localhost")),
		predicate.ValueEqual(key2, []byte("5432")),
	}, []operation.Operation{
		operation.Put(key1, []byte("new-host")),
		operation.Put(key2, []byte("6432")),
	}, []operation.Operation{
		operation.Delete(key1),
		operation.Delete(key2),
	})
	if err != nil {
		log.Printf("Multi-predicate transaction failed: %v", err)
		return
	}

	if response.Succeeded {
		fmt.Println("Multi-predicate transaction succeeded - values were updated")
	} else {
		fmt.Println("Multi-predicate transaction failed - cleanup operations executed")
	}
}
Output:

Multi-predicate transaction succeeded - values were updated
Example (With_predicates_valueBased)

ExampleDriver_Execute_with_predicates_valueBased demonstrates value-based conditional updates using predicates.

package main

import (
	"context"
	"fmt"
	"log"
	"testing"
	"time"

	etcdclient "go.etcd.io/etcd/client/v3"
	etcdfintegration "go.etcd.io/etcd/tests/v3/framework/integration"

	etcdintegration "go.etcd.io/etcd/tests/v3/integration"

	"github.com/tarantool/go-storage/driver/etcd"
	testingUtils "github.com/tarantool/go-storage/internal/testing"
	"github.com/tarantool/go-storage/operation"
	"github.com/tarantool/go-storage/predicate"
)

// exampleTestDialTimeout is the DialTimeout given to the etcd client that the examples create.
const exampleTestDialTimeout = 5 * time.Second

// createEtcdDriver builds an etcd driver for the examples on top of the etcd
// integration-test framework: it creates a lazy cluster, connects a client to
// the cluster's gRPC endpoints and wraps that client with etcd.New.
//
// Cluster termination and client shutdown are registered with tb.Cleanup, so
// the second return value is an empty function that callers may still defer.
// A client creation failure is reported through tb.Fatalf.
func createEtcdDriver(tb testing.TB) (*etcd.Driver, func()) {
	tb.Helper()

	etcdfintegration.BeforeTest(tb, etcdfintegration.WithoutGoLeakDetection())

	lazyCluster := etcdintegration.NewLazyCluster()
	tb.Cleanup(func() { lazyCluster.Terminate() })

	// Only Endpoints and DialTimeout are given non-zero values; the remaining
	// fields are listed explicitly with their zero values.
	clientConfig := etcdclient.Config{
		Endpoints:   lazyCluster.EndpointsGRPC(),
		DialTimeout: exampleTestDialTimeout,

		AutoSyncInterval:      0,
		DialKeepAliveTime:     0,
		DialKeepAliveTimeout:  0,
		MaxCallSendMsgSize:    0,
		MaxCallRecvMsgSize:    0,
		TLS:                   nil,
		Username:              "",
		Password:              "",
		RejectOldCluster:      false,
		DialOptions:           nil,
		Context:               nil,
		Logger:                nil,
		LogConfig:             nil,
		PermitWithoutStream:   false,
		MaxUnaryRetries:       0,
		BackoffWaitBetween:    0,
		BackoffJitterFraction: 0,
	}

	etcdClient, newErr := etcdclient.New(clientConfig)
	if newErr != nil {
		tb.Fatalf("Failed to create etcd client: %v", newErr)
	}

	// The error from Close is intentionally discarded during teardown.
	tb.Cleanup(func() { _ = etcdClient.Close() })

	noCleanup := func() {}

	return etcd.New(etcdClient), noCleanup
}

// main seeds a key with a known value, then replaces it with a new value
// under a predicate requiring the stored value to equal the seeded one. It
// prints whether the update was applied according to response.Succeeded. Any
// failing step is logged and ends the example.
func main() {
	// Create a testing context for the example.
	t := testingUtils.NewT()
	defer t.Cleanups()

	ctx := context.Background()

	driver, cleanup := createEtcdDriver(t)
	defer cleanup()

	key := []byte("/config/app/settings")
	currentValue := []byte("old-settings")
	newValue := []byte("new-settings")

	// Seed the value the predicate below compares against. The error is
	// checked so a failed seed is not misreported as "value did not match".
	_, err := driver.Execute(ctx, nil, []operation.Operation{
		operation.Put(key, currentValue),
	}, nil)
	if err != nil {
		log.Printf("Initial update failed: %v", err)
		return
	}

	// The predicate reuses currentValue so the seeded value and the expected
	// value cannot drift apart.
	response, err := driver.Execute(ctx, []predicate.Predicate{
		predicate.ValueEqual(key, currentValue),
	}, []operation.Operation{
		operation.Put(key, newValue),
	}, nil)
	if err != nil {
		log.Printf("Conditional update failed: %v", err)
		return
	}

	if response.Succeeded {
		fmt.Println("Conditional update succeeded - value was updated")
	} else {
		fmt.Println("Conditional update failed - value did not match")
	}
}
Output:

Conditional update succeeded - value was updated
Example (With_predicates_versionBased)

ExampleDriver_Execute_with_predicates_versionBased demonstrates version-based conditional updates using predicates.

package main

import (
	"context"
	"fmt"
	"log"
	"testing"
	"time"

	etcdclient "go.etcd.io/etcd/client/v3"
	etcdfintegration "go.etcd.io/etcd/tests/v3/framework/integration"

	etcdintegration "go.etcd.io/etcd/tests/v3/integration"

	"github.com/tarantool/go-storage/driver/etcd"
	testingUtils "github.com/tarantool/go-storage/internal/testing"
	"github.com/tarantool/go-storage/operation"
	"github.com/tarantool/go-storage/predicate"
)

// exampleTestDialTimeout is the DialTimeout given to the etcd client that the examples create.
const exampleTestDialTimeout = 5 * time.Second

// createEtcdDriver builds an etcd driver for the examples on top of the etcd
// integration-test framework: it creates a lazy cluster, connects a client to
// the cluster's gRPC endpoints and wraps that client with etcd.New.
//
// Cluster termination and client shutdown are registered with tb.Cleanup, so
// the second return value is an empty function that callers may still defer.
// A client creation failure is reported through tb.Fatalf.
func createEtcdDriver(tb testing.TB) (*etcd.Driver, func()) {
	tb.Helper()

	etcdfintegration.BeforeTest(tb, etcdfintegration.WithoutGoLeakDetection())

	lazyCluster := etcdintegration.NewLazyCluster()
	tb.Cleanup(func() { lazyCluster.Terminate() })

	// Only Endpoints and DialTimeout are given non-zero values; the remaining
	// fields are listed explicitly with their zero values.
	clientConfig := etcdclient.Config{
		Endpoints:   lazyCluster.EndpointsGRPC(),
		DialTimeout: exampleTestDialTimeout,

		AutoSyncInterval:      0,
		DialKeepAliveTime:     0,
		DialKeepAliveTimeout:  0,
		MaxCallSendMsgSize:    0,
		MaxCallRecvMsgSize:    0,
		TLS:                   nil,
		Username:              "",
		Password:              "",
		RejectOldCluster:      false,
		DialOptions:           nil,
		Context:               nil,
		Logger:                nil,
		LogConfig:             nil,
		PermitWithoutStream:   false,
		MaxUnaryRetries:       0,
		BackoffWaitBetween:    0,
		BackoffJitterFraction: 0,
	}

	etcdClient, newErr := etcdclient.New(clientConfig)
	if newErr != nil {
		tb.Fatalf("Failed to create etcd client: %v", newErr)
	}

	// The error from Close is intentionally discarded during teardown.
	tb.Cleanup(func() { _ = etcdClient.Close() })

	noCleanup := func() {}

	return etcd.New(etcdClient), noCleanup
}

// main writes a key, reads it back to learn its ModRevision, and then updates
// the key under a predicate requiring that version to be unchanged. It prints
// whether the update was applied according to response.Succeeded. Any failing
// step is logged and ends the example.
func main() {
	// Create a testing context for the example.
	t := testingUtils.NewT()
	defer t.Cleanups()

	ctx := context.Background()

	driver, cleanup := createEtcdDriver(t)
	defer cleanup()

	key := []byte("/config/app/feature")
	value := []byte("enabled")

	_, err := driver.Execute(ctx, nil, []operation.Operation{
		operation.Put(key, value),
	}, nil)
	if err != nil {
		log.Printf("Initial update failed: %v", err)
		return
	}

	getResponse, err := driver.Execute(ctx, nil, []operation.Operation{
		operation.Get(key),
	}, nil)
	if err != nil {
		log.Printf("Get operation failed: %v", err)
		return
	}

	// Without a returned value there is no version to compare against; stop
	// here rather than running the predicate with a zero version and
	// reporting the outcome as a version conflict.
	if len(getResponse.Results) == 0 || len(getResponse.Results[0].Values) == 0 {
		log.Printf("Get operation returned no value for key: %s", string(key))
		return
	}

	currentVersion := getResponse.Results[0].Values[0].ModRevision

	response, err := driver.Execute(ctx, []predicate.Predicate{
		predicate.VersionEqual(key, currentVersion),
	}, []operation.Operation{
		operation.Put(key, []byte("disabled")),
	}, nil)
	if err != nil {
		log.Printf("Version-based update failed: %v", err)
		return
	}

	if response.Succeeded {
		fmt.Println("Version-based update succeeded - no concurrent modification")
	} else {
		fmt.Println("Version-based update failed - version conflict detected")
	}
}
Output:

Version-based update succeeded - no concurrent modification

func (Driver) Watch

func (d Driver) Watch(ctx context.Context, key []byte, _ ...watch.Option) (<-chan watch.Event, func(), error)

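Watch monitors changes to a specific key and returns a stream of events together with a function that stops the watch. Optional watch.Option values are accepted, but the signature leaves that parameter unnamed (_), so they appear to have no effect in this driver.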

Example (Basic)

ExampleDriver_Watch_basic demonstrates basic watch on a single key.

package main

import (
	"context"
	"fmt"
	"log"
	"testing"
	"time"

	etcdclient "go.etcd.io/etcd/client/v3"
	etcdfintegration "go.etcd.io/etcd/tests/v3/framework/integration"

	etcdintegration "go.etcd.io/etcd/tests/v3/integration"

	"github.com/tarantool/go-storage/driver/etcd"
	testingUtils "github.com/tarantool/go-storage/internal/testing"
	"github.com/tarantool/go-storage/operation"
)

// exampleTestDialTimeout is the DialTimeout given to the etcd client that the examples create.
const exampleTestDialTimeout = 5 * time.Second

// createEtcdDriver builds an etcd driver for the examples on top of the etcd
// integration-test framework: it creates a lazy cluster, connects a client to
// the cluster's gRPC endpoints and wraps that client with etcd.New.
//
// Cluster termination and client shutdown are registered with tb.Cleanup, so
// the second return value is an empty function that callers may still defer.
// A client creation failure is reported through tb.Fatalf.
func createEtcdDriver(tb testing.TB) (*etcd.Driver, func()) {
	tb.Helper()

	etcdfintegration.BeforeTest(tb, etcdfintegration.WithoutGoLeakDetection())

	lazyCluster := etcdintegration.NewLazyCluster()
	tb.Cleanup(func() { lazyCluster.Terminate() })

	// Only Endpoints and DialTimeout are given non-zero values; the remaining
	// fields are listed explicitly with their zero values.
	clientConfig := etcdclient.Config{
		Endpoints:   lazyCluster.EndpointsGRPC(),
		DialTimeout: exampleTestDialTimeout,

		AutoSyncInterval:      0,
		DialKeepAliveTime:     0,
		DialKeepAliveTimeout:  0,
		MaxCallSendMsgSize:    0,
		MaxCallRecvMsgSize:    0,
		TLS:                   nil,
		Username:              "",
		Password:              "",
		RejectOldCluster:      false,
		DialOptions:           nil,
		Context:               nil,
		Logger:                nil,
		LogConfig:             nil,
		PermitWithoutStream:   false,
		MaxUnaryRetries:       0,
		BackoffWaitBetween:    0,
		BackoffJitterFraction: 0,
	}

	etcdClient, newErr := etcdclient.New(clientConfig)
	if newErr != nil {
		tb.Fatalf("Failed to create etcd client: %v", newErr)
	}

	// The error from Close is intentionally discarded during teardown.
	tb.Cleanup(func() { _ = etcdClient.Close() })

	noCleanup := func() {}

	return etcd.New(etcdClient), noCleanup
}

// main starts a watch on a single key, writes to that key from a goroutine
// and prints the first event delivered on the watch channel. If the watch
// context expires first, that is printed instead; if the channel is closed
// before any event arrives, that is logged.
func main() {
	// Create a testing context for the example.
	t := testingUtils.NewT()
	defer t.Cleanups()

	ctx := context.Background()

	driver, cleanup := createEtcdDriver(t)
	defer cleanup()

	key := []byte("/config/app/status")

	watchCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	eventCh, stopWatch, err := driver.Watch(watchCtx, key)
	if err != nil {
		log.Printf("Failed to start watch: %v", err)
		return
	}
	defer stopWatch()

	fmt.Println("Watching for changes on:", string(key))

	go func() {
		// Delay the write briefly; presumably this gives the watch time to be
		// established before the change happens.
		time.Sleep(100 * time.Millisecond)

		// Report a failed write: without it no event will ever arrive and the
		// example would otherwise just sit until the watch context expires.
		_, putErr := driver.Execute(ctx, nil, []operation.Operation{
			operation.Put(key, []byte("running")),
		}, nil)
		if putErr != nil {
			log.Printf("Put operation failed: %v", putErr)
		}
	}()

	select {
	case event, ok := <-eventCh:
		// A closed channel yields a zero-value event; do not present it as a
		// real change notification.
		if !ok {
			log.Println("Watch channel closed before an event arrived")
			return
		}

		fmt.Printf("Received watch event for key: %s\n", string(event.Prefix))
	case <-watchCtx.Done():
		fmt.Println("Watch context expired")
	}
}
Output:

Watching for changes on: /config/app/status
Received watch event for key: /config/app/status
Example (GracefulTermination)

ExampleDriver_Watch_gracefulTermination demonstrates graceful watch termination with manual control.

package main

import (
	"context"
	"fmt"
	"log"
	"testing"
	"time"

	etcdclient "go.etcd.io/etcd/client/v3"
	etcdfintegration "go.etcd.io/etcd/tests/v3/framework/integration"

	etcdintegration "go.etcd.io/etcd/tests/v3/integration"

	"github.com/tarantool/go-storage/driver/etcd"
	testingUtils "github.com/tarantool/go-storage/internal/testing"
)

// exampleTestDialTimeout is the DialTimeout given to the etcd client that the examples create.
const exampleTestDialTimeout = 5 * time.Second

// createEtcdDriver builds an etcd driver for the examples on top of the etcd
// integration-test framework: it creates a lazy cluster, connects a client to
// the cluster's gRPC endpoints and wraps that client with etcd.New.
//
// Cluster termination and client shutdown are registered with tb.Cleanup, so
// the second return value is an empty function that callers may still defer.
// A client creation failure is reported through tb.Fatalf.
func createEtcdDriver(tb testing.TB) (*etcd.Driver, func()) {
	tb.Helper()

	etcdfintegration.BeforeTest(tb, etcdfintegration.WithoutGoLeakDetection())

	lazyCluster := etcdintegration.NewLazyCluster()
	tb.Cleanup(func() { lazyCluster.Terminate() })

	// Only Endpoints and DialTimeout are given non-zero values; the remaining
	// fields are listed explicitly with their zero values.
	clientConfig := etcdclient.Config{
		Endpoints:   lazyCluster.EndpointsGRPC(),
		DialTimeout: exampleTestDialTimeout,

		AutoSyncInterval:      0,
		DialKeepAliveTime:     0,
		DialKeepAliveTimeout:  0,
		MaxCallSendMsgSize:    0,
		MaxCallRecvMsgSize:    0,
		TLS:                   nil,
		Username:              "",
		Password:              "",
		RejectOldCluster:      false,
		DialOptions:           nil,
		Context:               nil,
		Logger:                nil,
		LogConfig:             nil,
		PermitWithoutStream:   false,
		MaxUnaryRetries:       0,
		BackoffWaitBetween:    0,
		BackoffJitterFraction: 0,
	}

	etcdClient, newErr := etcdclient.New(clientConfig)
	if newErr != nil {
		tb.Fatalf("Failed to create etcd client: %v", newErr)
	}

	// The error from Close is intentionally discarded during teardown.
	tb.Cleanup(func() { _ = etcdClient.Close() })

	noCleanup := func() {}

	return etcd.New(etcdClient), noCleanup
}

// main starts a watch under a cancellable context and stops it by hand from a
// goroutine: after a short delay the goroutine announces the shutdown, calls
// the stop function returned by Watch and cancels the context. The main loop
// prints any events it receives and returns once the event channel is closed
// or the context is done.
func main() {
	// Create a testing context for the example.
	t := testingUtils.NewT()
	defer t.Cleanups()

	ctx := context.Background()

	driver, cleanup := createEtcdDriver(t)
	defer cleanup()

	key := []byte("/config/monitoring/metrics")

	watchCtx, cancel := context.WithCancel(ctx)
	// Deferred so every return path releases the context; calling a
	// CancelFunc more than once is harmless, so the explicit call in the
	// goroutine below remains valid.
	defer cancel()

	eventCh, stopWatch, err := driver.Watch(watchCtx, key)
	if err != nil {
		log.Printf("Failed to start watch: %v", err)
		return
	}

	fmt.Println("Started watch with manual control")

	go func() {
		time.Sleep(100 * time.Millisecond)
		fmt.Println("Stopping watch gracefully...")
		stopWatch()
		cancel()
	}()

	for {
		select {
		case event, ok := <-eventCh:
			// A closed channel means the watch has ended.
			if !ok {
				return
			}

			fmt.Printf("Received event: %s\n", string(event.Prefix))
		case <-watchCtx.Done():
			return
		}
	}
}
Output:

Started watch with manual control
Stopping watch gracefully...
Example (MultipleOperations)

ExampleDriver_Watch_multipleOperations demonstrates watching for multiple operations on a key prefix.

package main

import (
	"context"
	"fmt"
	"log"
	"testing"
	"time"

	etcdclient "go.etcd.io/etcd/client/v3"
	etcdfintegration "go.etcd.io/etcd/tests/v3/framework/integration"

	etcdintegration "go.etcd.io/etcd/tests/v3/integration"

	"github.com/tarantool/go-storage/driver/etcd"
	testingUtils "github.com/tarantool/go-storage/internal/testing"
	"github.com/tarantool/go-storage/operation"
)

// exampleTestDialTimeout is the DialTimeout given to the etcd client that the examples create.
const exampleTestDialTimeout = 5 * time.Second

// createEtcdDriver builds an etcd driver for the examples on top of the etcd
// integration-test framework: it creates a lazy cluster, connects a client to
// the cluster's gRPC endpoints and wraps that client with etcd.New.
//
// Cluster termination and client shutdown are registered with tb.Cleanup, so
// the second return value is an empty function that callers may still defer.
// A client creation failure is reported through tb.Fatalf.
func createEtcdDriver(tb testing.TB) (*etcd.Driver, func()) {
	tb.Helper()

	etcdfintegration.BeforeTest(tb, etcdfintegration.WithoutGoLeakDetection())

	lazyCluster := etcdintegration.NewLazyCluster()
	tb.Cleanup(func() { lazyCluster.Terminate() })

	// Only Endpoints and DialTimeout are given non-zero values; the remaining
	// fields are listed explicitly with their zero values.
	clientConfig := etcdclient.Config{
		Endpoints:   lazyCluster.EndpointsGRPC(),
		DialTimeout: exampleTestDialTimeout,

		AutoSyncInterval:      0,
		DialKeepAliveTime:     0,
		DialKeepAliveTimeout:  0,
		MaxCallSendMsgSize:    0,
		MaxCallRecvMsgSize:    0,
		TLS:                   nil,
		Username:              "",
		Password:              "",
		RejectOldCluster:      false,
		DialOptions:           nil,
		Context:               nil,
		Logger:                nil,
		LogConfig:             nil,
		PermitWithoutStream:   false,
		MaxUnaryRetries:       0,
		BackoffWaitBetween:    0,
		BackoffJitterFraction: 0,
	}

	etcdClient, newErr := etcdclient.New(clientConfig)
	if newErr != nil {
		tb.Fatalf("Failed to create etcd client: %v", newErr)
	}

	// The error from Close is intentionally discarded during teardown.
	tb.Cleanup(func() { _ = etcdClient.Close() })

	noCleanup := func() {}

	return etcd.New(etcdClient), noCleanup
}

// main starts a watch on the "/config/database/" prefix, applies three
// changes to keys under it from a goroutine (two puts and a delete, each in
// its own Execute call) and prints one line per received event until three
// events have arrived. It stops early if the watch context expires or the
// event channel is closed.
func main() {
	// Create a testing context for the example.
	t := testingUtils.NewT()
	defer t.Cleanups()

	ctx := context.Background()

	driver, cleanup := createEtcdDriver(t)
	defer cleanup()

	key := []byte("/config/database/")

	watchCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	eventCh, stopWatch, err := driver.Watch(watchCtx, key)
	if err != nil {
		log.Printf("Failed to start watch: %v", err)
		return
	}
	defer stopWatch()

	fmt.Println("Watching for changes on prefix:", string(key))

	go func() {
		changes := []operation.Operation{
			operation.Put([]byte("/config/database/host"), []byte("db1")),
			operation.Put([]byte("/config/database/port"), []byte("5432")),
			operation.Delete([]byte("/config/database/host")),
		}

		// Each change goes through its own Execute call. A failure is logged
		// and ends the sequence, since the events the loop below waits for
		// can no longer all arrive.
		for _, change := range changes {
			_, execErr := driver.Execute(ctx, nil, []operation.Operation{change}, nil)
			if execErr != nil {
				log.Printf("Operation failed: %v", execErr)
				return
			}
		}
	}()

	// One event is expected per change applied above.
	const expectedEvents = 3

	for eventNumber := 1; eventNumber <= expectedEvents; eventNumber++ {
		select {
		case event, ok := <-eventCh:
			// A closed channel yields zero-value events; stop instead of
			// counting them as change notifications.
			if !ok {
				log.Println("Watch channel closed before all events arrived")
				return
			}

			fmt.Printf("Event %d: change detected on %s\n", eventNumber, string(event.Prefix))
		case <-watchCtx.Done():
			fmt.Println("Watch context expired")
			return
		}
	}
}
Output:

Watching for changes on prefix: /config/database/
Event 1: change detected on /config/database/
Event 2: change detected on /config/database/
Event 3: change detected on /config/database/

type Watcher

// Watcher defines the interface for watching etcd changes: it can open a
// watch on a key and be closed.
//
// NOTE(review): the etcd qualifier here presumably refers to the etcd client
// package, not to this package - confirm against the source imports.
type Watcher interface {
	// Watch watches for changes on a key (using etcd's signature). It takes a
	// context, the key as a string and optional etcd.OpOption values, and
	// returns an etcd.WatchChan.
	Watch(ctx context.Context, key string, opts ...etcd.OpOption) etcd.WatchChan
	// Close closes the watcher.
	Close() error
}

Watcher defines the interface for watching etcd changes. This extends the etcd.Watcher interface to match our usage pattern.

type WatcherFactory

// WatcherFactory creates new watchers from a client.
type WatcherFactory interface {
	// NewWatcher creates a new watcher. It takes a Client and returns a
	// Watcher.
	NewWatcher(client Client) Watcher
}

WatcherFactory creates new watchers from a client.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL