Documentation
¶
Overview ¶
Package etcd provides an etcd implementation of the storage driver interface. It enables using etcd as a distributed key-value storage backend.
Index ¶
Examples ¶
- Driver.Execute (SimpleDelete)
- Driver.Execute (SimpleGet)
- Driver.Execute (SimpleMultiPut)
- Driver.Execute (SimplePut)
- Driver.Execute (With_predicates_multiplePredicates)
- Driver.Execute (With_predicates_valueBased)
- Driver.Execute (With_predicates_versionBased)
- Driver.Watch (Basic)
- Driver.Watch (GracefulTermination)
- Driver.Watch (MultipleOperations)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Client ¶
Client defines the minimal interface needed for etcd operations. This allows for easier testing and mock implementations.
type Driver ¶
type Driver struct {
// contains filtered or unexported fields
}
Driver is an etcd implementation of the storage driver interface. It uses etcd as the underlying key-value storage backend.
func New ¶
New creates a new etcd driver instance using an existing etcd client. The client should be properly configured and connected to an etcd cluster.
func (Driver) Execute ¶
func (d Driver) Execute( ctx context.Context, predicates []predicate.Predicate, thenOps []operation.Operation, elseOps []operation.Operation, ) (tx.Response, error)
Execute executes a transactional operation with conditional logic. It processes predicates to determine whether to execute thenOps or elseOps.
Example (SimpleDelete) ¶
ExampleDriver_Execute_simpleDelete demonstrates a simple Delete operation with the etcd driver.
package main
import (
"context"
"fmt"
"log"
"testing"
"time"
etcdclient "go.etcd.io/etcd/client/v3"
etcdfintegration "go.etcd.io/etcd/tests/v3/framework/integration"
etcdintegration "go.etcd.io/etcd/tests/v3/integration"
"github.com/tarantool/go-storage/driver/etcd"
testingUtils "github.com/tarantool/go-storage/internal/testing"
"github.com/tarantool/go-storage/operation"
)
const exampleTestDialTimeout = 5 * time.Second
// createEtcdDriver builds an etcd driver for the examples on top of the etcd
// integration test framework.
//
// It obtains a cluster from etcdintegration.NewLazyCluster, connects a client
// to the cluster's gRPC endpoints and wraps that client with etcd.New. Both
// the cluster termination and the client close are registered via tb.Cleanup,
// so the returned cleanup function does nothing and exists only so callers
// can defer it.
func createEtcdDriver(tb testing.TB) (*etcd.Driver, func()) {
	tb.Helper()

	etcdfintegration.BeforeTest(tb, etcdfintegration.WithoutGoLeakDetection())

	lazyCluster := etcdintegration.NewLazyCluster()
	tb.Cleanup(func() {
		lazyCluster.Terminate()
	})

	// Fields other than Endpoints and DialTimeout are set to their zero
	// values explicitly.
	cfg := etcdclient.Config{
		Endpoints:             lazyCluster.EndpointsGRPC(),
		DialTimeout:           exampleTestDialTimeout,
		AutoSyncInterval:      0,
		DialKeepAliveTime:     0,
		DialKeepAliveTimeout:  0,
		MaxCallSendMsgSize:    0,
		MaxCallRecvMsgSize:    0,
		TLS:                   nil,
		Username:              "",
		Password:              "",
		RejectOldCluster:      false,
		DialOptions:           nil,
		Context:               nil,
		Logger:                nil,
		LogConfig:             nil,
		PermitWithoutStream:   false,
		MaxUnaryRetries:       0,
		BackoffWaitBetween:    0,
		BackoffJitterFraction: 0,
	}

	cli, err := etcdclient.New(cfg)
	if err != nil {
		tb.Fatalf("Failed to create etcd client: %v", err)
	}

	tb.Cleanup(func() {
		// The close error is deliberately discarded during teardown.
		_ = cli.Close()
	})

	noop := func() {}

	return etcd.New(cli), noop
}
// main demonstrates deleting a key through Driver.Execute: it stores a value
// under the key first and then removes it with a second, predicate-free call.
func main() {
	// Create a testing context for the example.
	t := testingUtils.NewT()
	defer t.Cleanups()

	ctx := context.Background()

	driver, cleanup := createEtcdDriver(t)
	defer cleanup()

	key := []byte("/config/app/version")

	// First store a value. A failure here is reported instead of being
	// discarded, so the delete below is known to act on an existing key.
	_, err := driver.Execute(ctx, nil, []operation.Operation{
		operation.Put(key, []byte("1.0.0")),
	}, nil)
	if err != nil {
		log.Printf("Put operation failed: %v", err)
		return
	}

	// Then delete it.
	_, err = driver.Execute(ctx, nil, []operation.Operation{
		operation.Delete(key),
	}, nil)
	if err != nil {
		log.Printf("Delete operation failed: %v", err)
		return
	}

	fmt.Println("Successfully deleted key:", string(key))
}
Output: Successfully deleted key: /config/app/version
Example (SimpleGet) ¶
ExampleDriver_Execute_simpleGet demonstrates a simple Get operation with the etcd driver.
package main
import (
"context"
"fmt"
"log"
"testing"
"time"
etcdclient "go.etcd.io/etcd/client/v3"
etcdfintegration "go.etcd.io/etcd/tests/v3/framework/integration"
etcdintegration "go.etcd.io/etcd/tests/v3/integration"
"github.com/tarantool/go-storage/driver/etcd"
testingUtils "github.com/tarantool/go-storage/internal/testing"
"github.com/tarantool/go-storage/operation"
)
const exampleTestDialTimeout = 5 * time.Second
// createEtcdDriver builds an etcd driver for the examples on top of the etcd
// integration test framework.
//
// It obtains a cluster from etcdintegration.NewLazyCluster, connects a client
// to the cluster's gRPC endpoints and wraps that client with etcd.New. Both
// the cluster termination and the client close are registered via tb.Cleanup,
// so the returned cleanup function does nothing and exists only so callers
// can defer it.
func createEtcdDriver(tb testing.TB) (*etcd.Driver, func()) {
	tb.Helper()

	etcdfintegration.BeforeTest(tb, etcdfintegration.WithoutGoLeakDetection())

	lazyCluster := etcdintegration.NewLazyCluster()
	tb.Cleanup(func() {
		lazyCluster.Terminate()
	})

	// Fields other than Endpoints and DialTimeout are set to their zero
	// values explicitly.
	cfg := etcdclient.Config{
		Endpoints:             lazyCluster.EndpointsGRPC(),
		DialTimeout:           exampleTestDialTimeout,
		AutoSyncInterval:      0,
		DialKeepAliveTime:     0,
		DialKeepAliveTimeout:  0,
		MaxCallSendMsgSize:    0,
		MaxCallRecvMsgSize:    0,
		TLS:                   nil,
		Username:              "",
		Password:              "",
		RejectOldCluster:      false,
		DialOptions:           nil,
		Context:               nil,
		Logger:                nil,
		LogConfig:             nil,
		PermitWithoutStream:   false,
		MaxUnaryRetries:       0,
		BackoffWaitBetween:    0,
		BackoffJitterFraction: 0,
	}

	cli, err := etcdclient.New(cfg)
	if err != nil {
		tb.Fatalf("Failed to create etcd client: %v", err)
	}

	tb.Cleanup(func() {
		// The close error is deliberately discarded during teardown.
		_ = cli.Close()
	})

	noop := func() {}

	return etcd.New(cli), noop
}
// main demonstrates reading a key through Driver.Execute: it stores a value
// and then fetches it back, printing the key, value and modification
// revision of the first returned entry.
func main() {
	// Create a testing context for the example.
	t := testingUtils.NewT()
	defer t.Cleanups()

	ctx := context.Background()

	driver, cleanup := createEtcdDriver(t)
	defer cleanup()

	key := []byte("/config/app/version")

	// First store a value. A failure here is reported instead of being
	// discarded, because the Get below would otherwise find nothing and the
	// example would end without any output or diagnostic.
	_, err := driver.Execute(ctx, nil, []operation.Operation{
		operation.Put(key, []byte("1.0.0")),
	}, nil)
	if err != nil {
		log.Printf("Put operation failed: %v", err)
		return
	}

	// Then retrieve it.
	response, err := driver.Execute(ctx, nil, []operation.Operation{
		operation.Get(key),
	}, nil)
	if err != nil {
		log.Printf("Get operation failed: %v", err)
		return
	}

	// Nothing is printed unless the transaction succeeded and the first
	// result carries at least one value.
	if !response.Succeeded || len(response.Results) == 0 || len(response.Results[0].Values) == 0 {
		return
	}

	kv := response.Results[0].Values[0]
	fmt.Printf("Retrieved key: %s, value: %s, version: %d\n",
		string(kv.Key), string(kv.Value), kv.ModRevision)
}
Output: Retrieved key: /config/app/version, value: 1.0.0, version: 2
Example (SimpleMultiPut) ¶
ExampleDriver_Execute_simpleMultiPut demonstrates multiple Put operations in a single transaction with the etcd driver.
package main
import (
"context"
"fmt"
"log"
"testing"
"time"
etcdclient "go.etcd.io/etcd/client/v3"
etcdfintegration "go.etcd.io/etcd/tests/v3/framework/integration"
etcdintegration "go.etcd.io/etcd/tests/v3/integration"
"github.com/tarantool/go-storage/driver/etcd"
testingUtils "github.com/tarantool/go-storage/internal/testing"
"github.com/tarantool/go-storage/operation"
)
const exampleTestDialTimeout = 5 * time.Second
// createEtcdDriver builds an etcd driver for the examples on top of the etcd
// integration test framework.
//
// It obtains a cluster from etcdintegration.NewLazyCluster, connects a client
// to the cluster's gRPC endpoints and wraps that client with etcd.New. Both
// the cluster termination and the client close are registered via tb.Cleanup,
// so the returned cleanup function does nothing and exists only so callers
// can defer it.
func createEtcdDriver(tb testing.TB) (*etcd.Driver, func()) {
	tb.Helper()

	etcdfintegration.BeforeTest(tb, etcdfintegration.WithoutGoLeakDetection())

	lazyCluster := etcdintegration.NewLazyCluster()
	tb.Cleanup(func() {
		lazyCluster.Terminate()
	})

	// Fields other than Endpoints and DialTimeout are set to their zero
	// values explicitly.
	cfg := etcdclient.Config{
		Endpoints:             lazyCluster.EndpointsGRPC(),
		DialTimeout:           exampleTestDialTimeout,
		AutoSyncInterval:      0,
		DialKeepAliveTime:     0,
		DialKeepAliveTimeout:  0,
		MaxCallSendMsgSize:    0,
		MaxCallRecvMsgSize:    0,
		TLS:                   nil,
		Username:              "",
		Password:              "",
		RejectOldCluster:      false,
		DialOptions:           nil,
		Context:               nil,
		Logger:                nil,
		LogConfig:             nil,
		PermitWithoutStream:   false,
		MaxUnaryRetries:       0,
		BackoffWaitBetween:    0,
		BackoffJitterFraction: 0,
	}

	cli, err := etcdclient.New(cfg)
	if err != nil {
		tb.Fatalf("Failed to create etcd client: %v", err)
	}

	tb.Cleanup(func() {
		// The close error is deliberately discarded during teardown.
		_ = cli.Close()
	})

	noop := func() {}

	return etcd.New(cli), noop
}
// main demonstrates writing several keys with a single Driver.Execute call
// and reports how many results the response contains.
func main() {
	// Testing context that the example helper needs.
	t := testingUtils.NewT()
	defer t.Cleanups()

	driver, cleanup := createEtcdDriver(t)
	defer cleanup()

	// Both puts are submitted together, with no predicates and no else branch.
	puts := []operation.Operation{
		operation.Put([]byte("/config/app/name"), []byte("MyApp")),
		operation.Put([]byte("/config/app/environment"), []byte("production")),
	}

	response, err := driver.Execute(context.Background(), nil, puts, nil)
	if err != nil {
		log.Printf("Multi-put operation failed: %v", err)
		return
	}

	stored := len(response.Results)
	fmt.Println("Successfully stored", stored, "configuration items")
}
Output: Successfully stored 2 configuration items
Example (SimplePut) ¶
ExampleDriver_Execute_simplePut demonstrates a simple Put operation with the etcd driver.
package main
import (
"context"
"fmt"
"log"
"testing"
"time"
etcdclient "go.etcd.io/etcd/client/v3"
etcdfintegration "go.etcd.io/etcd/tests/v3/framework/integration"
etcdintegration "go.etcd.io/etcd/tests/v3/integration"
"github.com/tarantool/go-storage/driver/etcd"
testingUtils "github.com/tarantool/go-storage/internal/testing"
"github.com/tarantool/go-storage/operation"
)
const exampleTestDialTimeout = 5 * time.Second
// createEtcdDriver builds an etcd driver for the examples on top of the etcd
// integration test framework.
//
// It obtains a cluster from etcdintegration.NewLazyCluster, connects a client
// to the cluster's gRPC endpoints and wraps that client with etcd.New. Both
// the cluster termination and the client close are registered via tb.Cleanup,
// so the returned cleanup function does nothing and exists only so callers
// can defer it.
func createEtcdDriver(tb testing.TB) (*etcd.Driver, func()) {
	tb.Helper()

	etcdfintegration.BeforeTest(tb, etcdfintegration.WithoutGoLeakDetection())

	lazyCluster := etcdintegration.NewLazyCluster()
	tb.Cleanup(func() {
		lazyCluster.Terminate()
	})

	// Fields other than Endpoints and DialTimeout are set to their zero
	// values explicitly.
	cfg := etcdclient.Config{
		Endpoints:             lazyCluster.EndpointsGRPC(),
		DialTimeout:           exampleTestDialTimeout,
		AutoSyncInterval:      0,
		DialKeepAliveTime:     0,
		DialKeepAliveTimeout:  0,
		MaxCallSendMsgSize:    0,
		MaxCallRecvMsgSize:    0,
		TLS:                   nil,
		Username:              "",
		Password:              "",
		RejectOldCluster:      false,
		DialOptions:           nil,
		Context:               nil,
		Logger:                nil,
		LogConfig:             nil,
		PermitWithoutStream:   false,
		MaxUnaryRetries:       0,
		BackoffWaitBetween:    0,
		BackoffJitterFraction: 0,
	}

	cli, err := etcdclient.New(cfg)
	if err != nil {
		tb.Fatalf("Failed to create etcd client: %v", err)
	}

	tb.Cleanup(func() {
		// The close error is deliberately discarded during teardown.
		_ = cli.Close()
	})

	noop := func() {}

	return etcd.New(cli), noop
}
// main demonstrates storing a single key/value pair through Driver.Execute
// and prints the stored pair on success.
func main() {
	// Testing context that the example helper needs.
	t := testingUtils.NewT()
	defer t.Cleanups()

	driver, cleanup := createEtcdDriver(t)
	defer cleanup()

	var (
		key   = []byte("/config/app/version")
		value = []byte("1.0.0")
	)

	// A single put, submitted with no predicates and no else branch.
	put := []operation.Operation{operation.Put(key, value)}

	if _, err := driver.Execute(context.Background(), nil, put, nil); err != nil {
		log.Printf("Put operation failed: %v", err)
		return
	}

	fmt.Println("Key", string(key), "stored with value:", string(value))
}
Output: Key /config/app/version stored with value: 1.0.0
Example (With_predicates_multiplePredicates) ¶
ExampleDriver_Execute_with_predicates_multiplePredicates demonstrates multiple predicates with Else operations.
package main
import (
"context"
"fmt"
"log"
"testing"
"time"
etcdclient "go.etcd.io/etcd/client/v3"
etcdfintegration "go.etcd.io/etcd/tests/v3/framework/integration"
etcdintegration "go.etcd.io/etcd/tests/v3/integration"
"github.com/tarantool/go-storage/driver/etcd"
testingUtils "github.com/tarantool/go-storage/internal/testing"
"github.com/tarantool/go-storage/operation"
"github.com/tarantool/go-storage/predicate"
)
const exampleTestDialTimeout = 5 * time.Second
// createEtcdDriver builds an etcd driver for the examples on top of the etcd
// integration test framework.
//
// It obtains a cluster from etcdintegration.NewLazyCluster, connects a client
// to the cluster's gRPC endpoints and wraps that client with etcd.New. Both
// the cluster termination and the client close are registered via tb.Cleanup,
// so the returned cleanup function does nothing and exists only so callers
// can defer it.
func createEtcdDriver(tb testing.TB) (*etcd.Driver, func()) {
	tb.Helper()

	etcdfintegration.BeforeTest(tb, etcdfintegration.WithoutGoLeakDetection())

	lazyCluster := etcdintegration.NewLazyCluster()
	tb.Cleanup(func() {
		lazyCluster.Terminate()
	})

	// Fields other than Endpoints and DialTimeout are set to their zero
	// values explicitly.
	cfg := etcdclient.Config{
		Endpoints:             lazyCluster.EndpointsGRPC(),
		DialTimeout:           exampleTestDialTimeout,
		AutoSyncInterval:      0,
		DialKeepAliveTime:     0,
		DialKeepAliveTimeout:  0,
		MaxCallSendMsgSize:    0,
		MaxCallRecvMsgSize:    0,
		TLS:                   nil,
		Username:              "",
		Password:              "",
		RejectOldCluster:      false,
		DialOptions:           nil,
		Context:               nil,
		Logger:                nil,
		LogConfig:             nil,
		PermitWithoutStream:   false,
		MaxUnaryRetries:       0,
		BackoffWaitBetween:    0,
		BackoffJitterFraction: 0,
	}

	cli, err := etcdclient.New(cfg)
	if err != nil {
		tb.Fatalf("Failed to create etcd client: %v", err)
	}

	tb.Cleanup(func() {
		// The close error is deliberately discarded during teardown.
		_ = cli.Close()
	})

	noop := func() {}

	return etcd.New(cli), noop
}
// main demonstrates a transaction guarded by several predicates: two keys
// are seeded, then both are updated only if each still holds its seeded
// value; otherwise the else branch deletes them.
func main() {
	// Create a testing context for the example.
	t := testingUtils.NewT()
	defer t.Cleanups()

	ctx := context.Background()

	driver, cleanup := createEtcdDriver(t)
	defer cleanup()

	key1 := []byte("/config/database/host")
	key2 := []byte("/config/database/port")

	// Seed both keys. A failure here is reported instead of being discarded,
	// because the predicates below would otherwise be evaluated against
	// state the example never established.
	_, err := driver.Execute(ctx, nil, []operation.Operation{
		operation.Put(key1, []byte("localhost")),
		operation.Put(key2, []byte("5432")),
	}, nil)
	if err != nil {
		log.Printf("Initial put failed: %v", err)
		return
	}

	response, err := driver.Execute(ctx, []predicate.Predicate{
		predicate.ValueEqual(key1, []byte("localhost")),
		predicate.ValueEqual(key2, []byte("5432")),
	}, []operation.Operation{
		operation.Put(key1, []byte("new-host")),
		operation.Put(key2, []byte("6432")),
	}, []operation.Operation{
		operation.Delete(key1),
		operation.Delete(key2),
	})
	if err != nil {
		log.Printf("Multi-predicate transaction failed: %v", err)
		return
	}

	if response.Succeeded {
		fmt.Println("Multi-predicate transaction succeeded - values were updated")
	} else {
		fmt.Println("Multi-predicate transaction failed - cleanup operations executed")
	}
}
Output: Multi-predicate transaction succeeded - values were updated
Example (With_predicates_valueBased) ¶
ExampleDriver_Execute_with_predicates_valueBased demonstrates value-based conditional updates using predicates.
package main
import (
"context"
"fmt"
"log"
"testing"
"time"
etcdclient "go.etcd.io/etcd/client/v3"
etcdfintegration "go.etcd.io/etcd/tests/v3/framework/integration"
etcdintegration "go.etcd.io/etcd/tests/v3/integration"
"github.com/tarantool/go-storage/driver/etcd"
testingUtils "github.com/tarantool/go-storage/internal/testing"
"github.com/tarantool/go-storage/operation"
"github.com/tarantool/go-storage/predicate"
)
const exampleTestDialTimeout = 5 * time.Second
// createEtcdDriver builds an etcd driver for the examples on top of the etcd
// integration test framework.
//
// It obtains a cluster from etcdintegration.NewLazyCluster, connects a client
// to the cluster's gRPC endpoints and wraps that client with etcd.New. Both
// the cluster termination and the client close are registered via tb.Cleanup,
// so the returned cleanup function does nothing and exists only so callers
// can defer it.
func createEtcdDriver(tb testing.TB) (*etcd.Driver, func()) {
	tb.Helper()

	etcdfintegration.BeforeTest(tb, etcdfintegration.WithoutGoLeakDetection())

	lazyCluster := etcdintegration.NewLazyCluster()
	tb.Cleanup(func() {
		lazyCluster.Terminate()
	})

	// Fields other than Endpoints and DialTimeout are set to their zero
	// values explicitly.
	cfg := etcdclient.Config{
		Endpoints:             lazyCluster.EndpointsGRPC(),
		DialTimeout:           exampleTestDialTimeout,
		AutoSyncInterval:      0,
		DialKeepAliveTime:     0,
		DialKeepAliveTimeout:  0,
		MaxCallSendMsgSize:    0,
		MaxCallRecvMsgSize:    0,
		TLS:                   nil,
		Username:              "",
		Password:              "",
		RejectOldCluster:      false,
		DialOptions:           nil,
		Context:               nil,
		Logger:                nil,
		LogConfig:             nil,
		PermitWithoutStream:   false,
		MaxUnaryRetries:       0,
		BackoffWaitBetween:    0,
		BackoffJitterFraction: 0,
	}

	cli, err := etcdclient.New(cfg)
	if err != nil {
		tb.Fatalf("Failed to create etcd client: %v", err)
	}

	tb.Cleanup(func() {
		// The close error is deliberately discarded during teardown.
		_ = cli.Close()
	})

	noop := func() {}

	return etcd.New(cli), noop
}
// main demonstrates a value-based conditional update: the key is seeded with
// a known value and then overwritten only if it still holds that value.
func main() {
	// Create a testing context for the example.
	t := testingUtils.NewT()
	defer t.Cleanups()

	ctx := context.Background()

	driver, cleanup := createEtcdDriver(t)
	defer cleanup()

	key := []byte("/config/app/settings")
	currentValue := []byte("old-settings")
	newValue := []byte("new-settings")

	// Seed the key. A failure here is reported instead of being discarded,
	// because the predicate below would otherwise be evaluated against
	// state the example never established.
	_, err := driver.Execute(ctx, nil, []operation.Operation{
		operation.Put(key, currentValue),
	}, nil)
	if err != nil {
		log.Printf("Initial put failed: %v", err)
		return
	}

	// The predicate compares against the same value that was just stored.
	response, err := driver.Execute(ctx, []predicate.Predicate{
		predicate.ValueEqual(key, currentValue),
	}, []operation.Operation{
		operation.Put(key, newValue),
	}, nil)
	if err != nil {
		log.Printf("Conditional update failed: %v", err)
		return
	}

	if response.Succeeded {
		fmt.Println("Conditional update succeeded - value was updated")
	} else {
		fmt.Println("Conditional update failed - value did not match")
	}
}
Output: Conditional update succeeded - value was updated
Example (With_predicates_versionBased) ¶
ExampleDriver_Execute_with_predicates_versionBased demonstrates version-based conditional updates using predicates.
package main
import (
"context"
"fmt"
"log"
"testing"
"time"
etcdclient "go.etcd.io/etcd/client/v3"
etcdfintegration "go.etcd.io/etcd/tests/v3/framework/integration"
etcdintegration "go.etcd.io/etcd/tests/v3/integration"
"github.com/tarantool/go-storage/driver/etcd"
testingUtils "github.com/tarantool/go-storage/internal/testing"
"github.com/tarantool/go-storage/operation"
"github.com/tarantool/go-storage/predicate"
)
const exampleTestDialTimeout = 5 * time.Second
// createEtcdDriver builds an etcd driver for the examples on top of the etcd
// integration test framework.
//
// It obtains a cluster from etcdintegration.NewLazyCluster, connects a client
// to the cluster's gRPC endpoints and wraps that client with etcd.New. Both
// the cluster termination and the client close are registered via tb.Cleanup,
// so the returned cleanup function does nothing and exists only so callers
// can defer it.
func createEtcdDriver(tb testing.TB) (*etcd.Driver, func()) {
	tb.Helper()

	etcdfintegration.BeforeTest(tb, etcdfintegration.WithoutGoLeakDetection())

	lazyCluster := etcdintegration.NewLazyCluster()
	tb.Cleanup(func() {
		lazyCluster.Terminate()
	})

	// Fields other than Endpoints and DialTimeout are set to their zero
	// values explicitly.
	cfg := etcdclient.Config{
		Endpoints:             lazyCluster.EndpointsGRPC(),
		DialTimeout:           exampleTestDialTimeout,
		AutoSyncInterval:      0,
		DialKeepAliveTime:     0,
		DialKeepAliveTimeout:  0,
		MaxCallSendMsgSize:    0,
		MaxCallRecvMsgSize:    0,
		TLS:                   nil,
		Username:              "",
		Password:              "",
		RejectOldCluster:      false,
		DialOptions:           nil,
		Context:               nil,
		Logger:                nil,
		LogConfig:             nil,
		PermitWithoutStream:   false,
		MaxUnaryRetries:       0,
		BackoffWaitBetween:    0,
		BackoffJitterFraction: 0,
	}

	cli, err := etcdclient.New(cfg)
	if err != nil {
		tb.Fatalf("Failed to create etcd client: %v", err)
	}

	tb.Cleanup(func() {
		// The close error is deliberately discarded during teardown.
		_ = cli.Close()
	})

	noop := func() {}

	return etcd.New(cli), noop
}
// main demonstrates a version-based conditional update: the key is written,
// its current modification revision is read back, and the key is then
// overwritten only if that revision is still current.
func main() {
	// Create a testing context for the example.
	t := testingUtils.NewT()
	defer t.Cleanups()

	ctx := context.Background()

	driver, cleanup := createEtcdDriver(t)
	defer cleanup()

	key := []byte("/config/app/feature")
	value := []byte("enabled")

	_, err := driver.Execute(ctx, nil, []operation.Operation{
		operation.Put(key, value),
	}, nil)
	if err != nil {
		log.Printf("Initial update failed: %v", err)
		return
	}

	getResponse, err := driver.Execute(ctx, nil, []operation.Operation{
		operation.Get(key),
	}, nil)
	if err != nil {
		log.Printf("Get operation failed: %v", err)
		return
	}

	// Without a returned value there is no revision to compare against.
	// Stop here rather than running the transaction with a zero version,
	// which would not reflect the state of the key written above.
	if len(getResponse.Results) == 0 || len(getResponse.Results[0].Values) == 0 {
		log.Printf("Get operation returned no value for key: %s", string(key))
		return
	}

	currentVersion := getResponse.Results[0].Values[0].ModRevision

	response, err := driver.Execute(ctx, []predicate.Predicate{
		predicate.VersionEqual(key, currentVersion),
	}, []operation.Operation{
		operation.Put(key, []byte("disabled")),
	}, nil)
	if err != nil {
		log.Printf("Version-based update failed: %v", err)
		return
	}

	if response.Succeeded {
		fmt.Println("Version-based update succeeded - no concurrent modification")
	} else {
		fmt.Println("Version-based update failed - version conflict detected")
	}
}
Output: Version-based update succeeded - no concurrent modification
func (Driver) Watch ¶
func (d Driver) Watch(ctx context.Context, key []byte, _ ...watch.Option) (<-chan watch.Event, func(), error)
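Watch monitors changes to a specific key and returns a stream of events together with a function that stops the watch. The variadic watch.Option arguments are accepted for interface compatibility, but the parameter is declared as blank (_), so they are currently ignored.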
Example (Basic) ¶
ExampleDriver_Watch_basic demonstrates basic watch on a single key.
package main
import (
"context"
"fmt"
"log"
"testing"
"time"
etcdclient "go.etcd.io/etcd/client/v3"
etcdfintegration "go.etcd.io/etcd/tests/v3/framework/integration"
etcdintegration "go.etcd.io/etcd/tests/v3/integration"
"github.com/tarantool/go-storage/driver/etcd"
testingUtils "github.com/tarantool/go-storage/internal/testing"
"github.com/tarantool/go-storage/operation"
)
const exampleTestDialTimeout = 5 * time.Second
// createEtcdDriver builds an etcd driver for the examples on top of the etcd
// integration test framework.
//
// It obtains a cluster from etcdintegration.NewLazyCluster, connects a client
// to the cluster's gRPC endpoints and wraps that client with etcd.New. Both
// the cluster termination and the client close are registered via tb.Cleanup,
// so the returned cleanup function does nothing and exists only so callers
// can defer it.
func createEtcdDriver(tb testing.TB) (*etcd.Driver, func()) {
	tb.Helper()

	etcdfintegration.BeforeTest(tb, etcdfintegration.WithoutGoLeakDetection())

	lazyCluster := etcdintegration.NewLazyCluster()
	tb.Cleanup(func() {
		lazyCluster.Terminate()
	})

	// Fields other than Endpoints and DialTimeout are set to their zero
	// values explicitly.
	cfg := etcdclient.Config{
		Endpoints:             lazyCluster.EndpointsGRPC(),
		DialTimeout:           exampleTestDialTimeout,
		AutoSyncInterval:      0,
		DialKeepAliveTime:     0,
		DialKeepAliveTimeout:  0,
		MaxCallSendMsgSize:    0,
		MaxCallRecvMsgSize:    0,
		TLS:                   nil,
		Username:              "",
		Password:              "",
		RejectOldCluster:      false,
		DialOptions:           nil,
		Context:               nil,
		Logger:                nil,
		LogConfig:             nil,
		PermitWithoutStream:   false,
		MaxUnaryRetries:       0,
		BackoffWaitBetween:    0,
		BackoffJitterFraction: 0,
	}

	cli, err := etcdclient.New(cfg)
	if err != nil {
		tb.Fatalf("Failed to create etcd client: %v", err)
	}

	tb.Cleanup(func() {
		// The close error is deliberately discarded during teardown.
		_ = cli.Close()
	})

	noop := func() {}

	return etcd.New(cli), noop
}
// main demonstrates a basic watch: it starts watching a key, writes to that
// key from a background goroutine and prints the first event received.
func main() {
	// Create a testing context for the example.
	t := testingUtils.NewT()
	defer t.Cleanups()

	ctx := context.Background()

	driver, cleanup := createEtcdDriver(t)
	defer cleanup()

	key := []byte("/config/app/status")

	watchCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	eventCh, stopWatch, err := driver.Watch(watchCtx, key)
	if err != nil {
		log.Printf("Failed to start watch: %v", err)
		return
	}
	defer stopWatch()

	fmt.Println("Watching for changes on:", string(key))

	// The writer runs in the background. writerDone lets main wait for it, so
	// the deferred teardown never runs while the write is still in flight.
	writerDone := make(chan struct{})

	go func() {
		defer close(writerDone)

		// Short delay before writing, so the watch above is in place first.
		time.Sleep(100 * time.Millisecond)

		// watchCtx bounds the write with the same timeout as the watch, which
		// also bounds how long main can wait on writerDone.
		_, putErr := driver.Execute(watchCtx, nil, []operation.Operation{
			operation.Put(key, []byte("running")),
		}, nil)
		if putErr != nil {
			log.Printf("Put operation failed: %v", putErr)
		}
	}()

	// Deferred last, so it runs first on return: before stopWatch, cancel and
	// the cleanups.
	defer func() { <-writerDone }()

	select {
	case event, ok := <-eventCh:
		if !ok {
			// A closed channel yields a zero-value event; report it rather
			// than printing it as if it were a real change.
			log.Printf("Watch channel closed before an event was received")
			return
		}

		fmt.Printf("Received watch event for key: %s\n", string(event.Prefix))
	case <-watchCtx.Done():
		fmt.Println("Watch context expired")
	}
}
Output:
Watching for changes on: /config/app/status
Received watch event for key: /config/app/status
Example (GracefulTermination) ¶
ExampleDriver_Watch_gracefulTermination demonstrates graceful watch termination with manual control.
package main
import (
"context"
"fmt"
"log"
"testing"
"time"
etcdclient "go.etcd.io/etcd/client/v3"
etcdfintegration "go.etcd.io/etcd/tests/v3/framework/integration"
etcdintegration "go.etcd.io/etcd/tests/v3/integration"
"github.com/tarantool/go-storage/driver/etcd"
testingUtils "github.com/tarantool/go-storage/internal/testing"
)
const exampleTestDialTimeout = 5 * time.Second
// createEtcdDriver builds an etcd driver for the examples on top of the etcd
// integration test framework.
//
// It obtains a cluster from etcdintegration.NewLazyCluster, connects a client
// to the cluster's gRPC endpoints and wraps that client with etcd.New. Both
// the cluster termination and the client close are registered via tb.Cleanup,
// so the returned cleanup function does nothing and exists only so callers
// can defer it.
func createEtcdDriver(tb testing.TB) (*etcd.Driver, func()) {
	tb.Helper()

	etcdfintegration.BeforeTest(tb, etcdfintegration.WithoutGoLeakDetection())

	lazyCluster := etcdintegration.NewLazyCluster()
	tb.Cleanup(func() {
		lazyCluster.Terminate()
	})

	// Fields other than Endpoints and DialTimeout are set to their zero
	// values explicitly.
	cfg := etcdclient.Config{
		Endpoints:             lazyCluster.EndpointsGRPC(),
		DialTimeout:           exampleTestDialTimeout,
		AutoSyncInterval:      0,
		DialKeepAliveTime:     0,
		DialKeepAliveTimeout:  0,
		MaxCallSendMsgSize:    0,
		MaxCallRecvMsgSize:    0,
		TLS:                   nil,
		Username:              "",
		Password:              "",
		RejectOldCluster:      false,
		DialOptions:           nil,
		Context:               nil,
		Logger:                nil,
		LogConfig:             nil,
		PermitWithoutStream:   false,
		MaxUnaryRetries:       0,
		BackoffWaitBetween:    0,
		BackoffJitterFraction: 0,
	}

	cli, err := etcdclient.New(cfg)
	if err != nil {
		tb.Fatalf("Failed to create etcd client: %v", err)
	}

	tb.Cleanup(func() {
		// The close error is deliberately discarded during teardown.
		_ = cli.Close()
	})

	noop := func() {}

	return etcd.New(cli), noop
}
// main demonstrates stopping a watch on demand: a background goroutine calls
// the stop function returned by Watch and cancels the watch context, while
// main drains events until the channel closes or the context is done.
func main() {
	// Create a testing context for the example.
	t := testingUtils.NewT()
	defer t.Cleanups()

	ctx := context.Background()

	driver, cleanup := createEtcdDriver(t)
	defer cleanup()

	key := []byte("/config/monitoring/metrics")

	watchCtx, cancel := context.WithCancel(ctx)
	// Deferring cancel releases the context on every return path. The
	// goroutine below may also call it; repeated calls are harmless.
	defer cancel()

	eventCh, stopWatch, err := driver.Watch(watchCtx, key)
	if err != nil {
		log.Printf("Failed to start watch: %v", err)
		return
	}

	fmt.Println("Started watch with manual control")

	go func() {
		time.Sleep(100 * time.Millisecond)
		fmt.Println("Stopping watch gracefully...")
		stopWatch()
		cancel()
	}()

	for {
		select {
		case event, ok := <-eventCh:
			if !ok {
				// The event channel was closed; nothing more to read.
				return
			}

			fmt.Printf("Received event: %s\n", string(event.Prefix))
		case <-watchCtx.Done():
			return
		}
	}
}
Output:
Started watch with manual control
Stopping watch gracefully...
Example (MultipleOperations) ¶
ExampleDriver_Watch_multipleOperations demonstrates watching for multiple operations on a key prefix.
package main
import (
"context"
"fmt"
"log"
"testing"
"time"
etcdclient "go.etcd.io/etcd/client/v3"
etcdfintegration "go.etcd.io/etcd/tests/v3/framework/integration"
etcdintegration "go.etcd.io/etcd/tests/v3/integration"
"github.com/tarantool/go-storage/driver/etcd"
testingUtils "github.com/tarantool/go-storage/internal/testing"
"github.com/tarantool/go-storage/operation"
)
const exampleTestDialTimeout = 5 * time.Second
// createEtcdDriver builds an etcd driver for the examples on top of the etcd
// integration test framework.
//
// It obtains a cluster from etcdintegration.NewLazyCluster, connects a client
// to the cluster's gRPC endpoints and wraps that client with etcd.New. Both
// the cluster termination and the client close are registered via tb.Cleanup,
// so the returned cleanup function does nothing and exists only so callers
// can defer it.
func createEtcdDriver(tb testing.TB) (*etcd.Driver, func()) {
	tb.Helper()

	etcdfintegration.BeforeTest(tb, etcdfintegration.WithoutGoLeakDetection())

	lazyCluster := etcdintegration.NewLazyCluster()
	tb.Cleanup(func() {
		lazyCluster.Terminate()
	})

	// Fields other than Endpoints and DialTimeout are set to their zero
	// values explicitly.
	cfg := etcdclient.Config{
		Endpoints:             lazyCluster.EndpointsGRPC(),
		DialTimeout:           exampleTestDialTimeout,
		AutoSyncInterval:      0,
		DialKeepAliveTime:     0,
		DialKeepAliveTimeout:  0,
		MaxCallSendMsgSize:    0,
		MaxCallRecvMsgSize:    0,
		TLS:                   nil,
		Username:              "",
		Password:              "",
		RejectOldCluster:      false,
		DialOptions:           nil,
		Context:               nil,
		Logger:                nil,
		LogConfig:             nil,
		PermitWithoutStream:   false,
		MaxUnaryRetries:       0,
		BackoffWaitBetween:    0,
		BackoffJitterFraction: 0,
	}

	cli, err := etcdclient.New(cfg)
	if err != nil {
		tb.Fatalf("Failed to create etcd client: %v", err)
	}

	tb.Cleanup(func() {
		// The close error is deliberately discarded during teardown.
		_ = cli.Close()
	})

	noop := func() {}

	return etcd.New(cli), noop
}
// main demonstrates watching a key while several operations are applied
// beneath it: a background goroutine performs two puts and a delete, and
// main prints one line per received event until three have arrived.
func main() {
	// Number of events the example waits for: one per background operation.
	const expectedEvents = 3

	// Create a testing context for the example.
	t := testingUtils.NewT()
	defer t.Cleanups()

	ctx := context.Background()

	driver, cleanup := createEtcdDriver(t)
	defer cleanup()

	key := []byte("/config/database/")

	watchCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	eventCh, stopWatch, err := driver.Watch(watchCtx, key)
	if err != nil {
		log.Printf("Failed to start watch: %v", err)
		return
	}
	defer stopWatch()

	fmt.Println("Watching for changes on prefix:", string(key))

	// The writer runs in the background. writerDone lets main wait for it, so
	// the deferred teardown never runs while an operation is still in flight.
	writerDone := make(chan struct{})

	go func() {
		defer close(writerDone)

		steps := []operation.Operation{
			operation.Put([]byte("/config/database/host"), []byte("db1")),
			operation.Put([]byte("/config/database/port"), []byte("5432")),
			operation.Delete([]byte("/config/database/host")),
		}

		// Each step is its own Execute call. Failures are logged rather than
		// discarded; the remaining steps are still attempted. watchCtx bounds
		// every call with the watch timeout.
		for _, step := range steps {
			_, execErr := driver.Execute(watchCtx, nil, []operation.Operation{step}, nil)
			if execErr != nil {
				log.Printf("Background operation failed: %v", execErr)
			}
		}
	}()

	// Deferred last, so it runs first on return: before stopWatch, cancel and
	// the cleanups.
	defer func() { <-writerDone }()

	for received := 0; received < expectedEvents; {
		select {
		case event, ok := <-eventCh:
			if !ok {
				// A closed channel yields zero-value events forever; stop
				// instead of counting them as real changes.
				log.Printf("Watch channel closed after %d events", received)
				return
			}

			received++
			fmt.Printf("Event %d: change detected on %s\n", received, string(event.Prefix))
		case <-watchCtx.Done():
			fmt.Println("Watch context expired")
			return
		}
	}
}
Output:
Watching for changes on prefix: /config/database/
Event 1: change detected on /config/database/
Event 2: change detected on /config/database/
Event 3: change detected on /config/database/
type Watcher ¶
// Watcher is the watch abstraction declared by this package. It consists of
// a Watch method with etcd's signature and a Close method.
type Watcher interface {
// Watch watches for changes on a key (using etcd's signature).
// It takes a context, the key as a string and optional etcd.OpOption
// values, and returns an etcd.WatchChan.
Watch(ctx context.Context, key string, opts ...etcd.OpOption) etcd.WatchChan
// Close closes the watcher.
// It returns an error value.
Close() error
}
Watcher defines the interface for watching etcd changes. It does not embed etcd.Watcher; it declares only the Watch and Close methods, with Watch following etcd's signature, to match this package's usage pattern.
type WatcherFactory ¶
// WatcherFactory creates new watchers from a client.
type WatcherFactory interface {
// NewWatcher creates a new watcher.
// It receives the Client to build the watcher from and returns a Watcher.
NewWatcher(client Client) Watcher
}
WatcherFactory creates new watchers from a client.