tcs

package
v0.0.0-...-c36b4a8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 26, 2025 License: BSD-2-Clause Imports: 12 Imported by: 0

Documentation

Overview

Package tcs provides a Tarantool config storage driver implementation. It enables using Tarantool as a distributed key-value storage backend.

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	// ErrUnknownOperator is returned when the operator is unknown.
	ErrUnknownOperator = errors.New("unknown operator")
	// ErrUnknownTarget is returned when the target is unknown.
	ErrUnknownTarget = errors.New("unknown target")
)
View Source
var (

	// ErrUnexpectedResponse is returned when the response from tarantool has unexpected format.
	ErrUnexpectedResponse = errors.New("unexpected response from tarantool")
)
View Source
var (
	// ErrUnknownOperation is returned when the operation is unknown.
	ErrUnknownOperation = errors.New("unknown operation")
)

Error definitions for err113 compliance.

Functions

func NewOperationEncodingError

func NewOperationEncodingError(text string, err error) error

NewOperationEncodingError returns a new operation encoding error.

func NewPredicateEncodingError

func NewPredicateEncodingError(text string, err error) error

NewPredicateEncodingError returns a new predicate encoding error.

func NewTxnOpResponseDecodingError

func NewTxnOpResponseDecodingError(err error) error

NewTxnOpResponseDecodingError returns a new txnOpResponse decoding error.

Types

type DecodingError

type DecodingError struct {
	ObjectType string
	Text       string
	Err        error
}

DecodingError represents an error that occurs during decoding operations.

func (DecodingError) Error

func (e DecodingError) Error() string

Error returns the error message.

func (DecodingError) Unwrap

func (e DecodingError) Unwrap() error

type DoerWatcher

type DoerWatcher interface {
	Do(req tarantool.Request) (fut *tarantool.Future)
	NewWatcher(key string, callback tarantool.WatchCallback) (tarantool.Watcher, error)
}

DoerWatcher is an interface that combines tarantool.Doer and NewWatcher method. tarantool.Connection and pool.ConnectionAdapter implement this interface.

type Driver

type Driver struct {
	// contains filtered or unexported fields
}

Driver is a Tarantool implementation of the storage driver interface. It uses TCS as the underlying key-value storage backend.

func New

func New(doer DoerWatcher) *Driver

New creates a new Tarantool driver instance. It establishes connections to Tarantool instances using the provided addresses.

func (Driver) Execute

func (d Driver) Execute(
	ctx context.Context,
	predicates []goPredicate.Predicate,
	thenOps []goOperation.Operation,
	elseOps []goOperation.Operation,
) (tx.Response, error)

Execute executes a transactional operation with conditional logic. It processes predicates to determine whether to execute thenOps or elseOps.

Example (Simple)

ExampleExecuteBasicOperations demonstrates basic Execute operations with the TCS driver. This example shows Put, Get, and Delete operations without predicates.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/tarantool/go-storage/driver/tcs"
	gsTesting "github.com/tarantool/go-storage/internal/testing"
	"github.com/tarantool/go-storage/operation"
)

func newResponse(success bool, data [][]any) *gsTesting.MockResponse {
	return gsTesting.NewMockResponse(gsTesting.NewT(), []any{
		map[string]any{
			"data": map[string]any{
				"responses":  data,
				"is_success": success,
			},
			"revision": 1000,
		},
	})
}

func createTCSDriverExecuteSimple() *tcs.Driver {
	mock := gsTesting.NewMockDoer(gsTesting.NewT(),

		newResponse(true, [][]any{{}}),
		newResponse(true, [][]any{
			{
				map[string]any{
					"path":         []byte("/config/app/version"),
					"value":        []byte("1.0.0"),
					"mod_revision": 1000,
				},
			},
		}),
		newResponse(true, [][]any{{}}),
		newResponse(true, [][]any{{}, {}}),
	)

	return tcs.New(gsTesting.NewMockDoerWithWatcher(mock, nil))
}

func main() {
	ctx := context.Background()

	driver := createTCSDriverExecuteSimple()

	// Example 1: Simple Put operation.
	{
		key := []byte("/config/app/version")
		value := []byte("1.0.0")

		_, err := driver.Execute(ctx, nil, []operation.Operation{
			operation.Put(key, value),
		}, nil)
		if err != nil {
			log.Printf("Put operation failed: %v", err)
			return
		}

		fmt.Println("Key", string(key), "stored with value:", string(value))
	}

	// Example 2: Simple Get operation.
	{
		key := []byte("/config/app/version")

		response, err := driver.Execute(ctx, nil, []operation.Operation{
			operation.Get(key),
		}, nil)
		if err != nil {
			log.Printf("Get operation failed: %v", err)
			return
		}

		if response.Succeeded && len(response.Results) > 0 {
			if len(response.Results[0].Values) > 0 {
				kv := response.Results[0].Values[0]
				fmt.Printf("Retrieved key: %s, value: %s, version: %d\n",
					string(kv.Key), string(kv.Value), kv.ModRevision)
			}
		}
	}

	// Example 3: Simple Delete operation.
	{
		key := []byte("/config/app/version")

		_, err := driver.Execute(ctx, nil, []operation.Operation{
			operation.Delete(key),
		}, nil)
		if err != nil {
			log.Printf("Delete operation failed: %v", err)
			return
		}

		fmt.Println("Successfully deleted key:", string(key))
	}

	// Example 4: Multiple operations in single transaction.
	{
		response, err := driver.Execute(ctx, nil, []operation.Operation{
			operation.Put([]byte("/config/app/name"), []byte("MyApp")),
			operation.Put([]byte("/config/app/environment"), []byte("production")),
		}, nil)
		if err != nil {
			log.Printf("Multi-put operation failed: %v", err)
			return
		}

		fmt.Println("Successfully stored", len(response.Results), "configuration items")
	}

}
Output:

Key /config/app/version stored with value: 1.0.0
Retrieved key: /config/app/version, value: 1.0.0, version: 1000
Successfully deleted key: /config/app/version
Successfully stored 2 configuration items
Example (With_predicates)

ExampleDriver_Execute_WithPredicates demonstrates conditional Execute operations using predicates. This example shows how to use value and version predicates for conditional execution.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/tarantool/go-storage/driver/tcs"
	gsTesting "github.com/tarantool/go-storage/internal/testing"
	"github.com/tarantool/go-storage/operation"
	"github.com/tarantool/go-storage/predicate"
)

func newResponse(success bool, data [][]any) *gsTesting.MockResponse {
	return gsTesting.NewMockResponse(gsTesting.NewT(), []any{
		map[string]any{
			"data": map[string]any{
				"responses":  data,
				"is_success": success,
			},
			"revision": 1000,
		},
	})
}

func createTCSDriverExecuteWithPredicates() *tcs.Driver {
	mock := gsTesting.NewMockDoer(gsTesting.NewT(),

		newResponse(true, [][]any{{}}),
		newResponse(true, [][]any{{}}),
		newResponse(true, [][]any{
			{
				map[string]any{
					"path":         []byte("/config/app/feature"),
					"value":        []byte("enabled"),
					"mod_revision": 1000,
				},
			},
		}),
		newResponse(true, [][]any{{}}),
		newResponse(true, [][]any{{}, {}}),
		newResponse(true, [][]any{{}, {}}),
		newResponse(false, [][]any{{}, {}}),
	)

	return tcs.New(gsTesting.NewMockDoerWithWatcher(mock, nil))
}

func main() {
	ctx := context.Background()

	driver := createTCSDriverExecuteWithPredicates()

	// Example 1: Value-based conditional update.
	{
		key := []byte("/config/app/settings")
		currentValue := []byte("old-settings")
		newValue := []byte("new-settings")

		_, _ = driver.Execute(ctx, nil, []operation.Operation{
			operation.Put(key, currentValue),
		}, nil)

		response, err := driver.Execute(ctx, []predicate.Predicate{
			predicate.ValueEqual(key, "old-settings"),
		}, []operation.Operation{
			operation.Put(key, newValue),
		}, nil)
		if err != nil {
			log.Printf("Conditional update failed: %v", err)
			return
		}

		if response.Succeeded {
			fmt.Println("Conditional update succeeded - value was updated")
		} else {
			fmt.Println("Conditional update failed - value did not match")
		}
	}

	// Example 2: Version-based conditional update.
	{
		key := []byte("/config/app/feature")
		value := []byte("enabled")

		_, err := driver.Execute(ctx, nil, []operation.Operation{
			operation.Put(key, value),
		}, nil)
		if err != nil {
			log.Printf("Initial update failed: %v", err)
			return
		}

		getResponse, err := driver.Execute(ctx, nil, []operation.Operation{
			operation.Get(key),
		}, nil)
		if err != nil {
			log.Printf("Get operation failed: %v", err)
			return
		}

		var currentVersion int64
		if len(getResponse.Results) > 0 && len(getResponse.Results[0].Values) > 0 {
			currentVersion = getResponse.Results[0].Values[0].ModRevision
		}

		response, err := driver.Execute(ctx, []predicate.Predicate{
			predicate.VersionEqual(key, currentVersion),
		}, []operation.Operation{
			operation.Put(key, []byte("disabled")),
		}, nil)
		if err != nil {
			log.Printf("Version-based update failed: %v", err)
			return
		}

		if response.Succeeded {
			fmt.Println("Version-based update succeeded - no concurrent modification")
		} else {
			fmt.Println("Version-based update failed - version conflict detected")
		}
	}

	// Example 3: Multiple predicates with Else operations.
	{
		key1 := []byte("/config/database/host")
		key2 := []byte("/config/database/port")

		_, _ = driver.Execute(ctx, nil, []operation.Operation{
			operation.Put(key1, []byte("localhost")),
			operation.Put(key2, []byte("5432")),
		}, nil)

		response, err := driver.Execute(ctx, []predicate.Predicate{
			predicate.ValueEqual(key1, "localhost"),
			predicate.ValueEqual(key2, "5432"),
		}, []operation.Operation{
			operation.Put(key1, []byte("new-host")),
			operation.Put(key2, []byte("6432")),
		}, []operation.Operation{
			operation.Delete(key1),
			operation.Delete(key2),
		})
		if err != nil {
			log.Printf("Multi-predicate transaction failed: %v", err)
			return
		}

		if response.Succeeded {
			fmt.Println("Multi-predicate transaction succeeded - values were updated")
		} else {
			fmt.Println("Multi-predicate transaction failed - cleanup operations executed")
		}
	}

}
Output:

Conditional update succeeded - value was updated
Version-based update succeeded - no concurrent modification
Multi-predicate transaction failed - cleanup operations executed

func (Driver) Watch

func (d Driver) Watch(ctx context.Context, key []byte, _ ...watch.Option) (<-chan watch.Event, func(), error)

Watch monitors changes to a specific key and returns a stream of events. It supports optional watch configuration through the opts parameter. To watch for config storage key "config.storage:" prefix should be used.

Example

ExampleWatchOperations demonstrates how to use Watch for real-time change notifications. This example shows watching individual keys and handling watch events.

ctx := context.Background()

driver := createTCSDriverWatch()

// Example 1: Basic watch on a single key.
{
	key := []byte("/config/app/status")

	watchCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	eventCh, stopWatch, err := driver.Watch(watchCtx, key)
	if err != nil {
		log.Printf("Failed to start watch: %v", err)
		return
	}
	defer stopWatch()

	fmt.Println("Watching for changes on:", string(key))

	go func() {
		time.Sleep(100 * time.Millisecond)

		_, _ = driver.Execute(ctx, nil, []operation.Operation{
			operation.Put(key, []byte("running")),
		}, nil)
	}()

	select {
	case event := <-eventCh:
		fmt.Printf("Received watch event for key: %s\n", string(event.Prefix))
	case <-watchCtx.Done():
		fmt.Println("Watch context expired")
	}
}

// Example 2: Watch with multiple operations.
{
	key := []byte("/config/database/")

	watchCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	eventCh, stopWatch, err := driver.Watch(watchCtx, key)
	if err != nil {
		log.Printf("Failed to start watch: %v", err)
		return
	}
	defer stopWatch()

	fmt.Println("Watching for changes on prefix:", string(key))

	go func() {
		time.Sleep(100 * time.Millisecond)

		_, _ = driver.Execute(ctx, nil, []operation.Operation{
			operation.Put([]byte("/config/database/host"), []byte("db1")),
		}, nil)

		time.Sleep(200 * time.Millisecond)

		_, _ = driver.Execute(ctx, nil, []operation.Operation{
			operation.Put([]byte("/config/database/port"), []byte("5432")),
		}, nil)

		time.Sleep(300 * time.Millisecond)

		_, _ = driver.Execute(ctx, nil, []operation.Operation{
			operation.Delete([]byte("/config/database/host")),
		}, nil)
	}()

	eventCount := 0
	for eventCount < 3 {
		select {
		case event := <-eventCh:
			fmt.Printf("Event %d: change detected on %s\n", eventCount+1, string(event.Prefix))

			eventCount++
		case <-watchCtx.Done():
			fmt.Println("Watch context expired")
			return
		}
	}
}

// Example 3: Graceful watch termination.
{
	key := []byte("/config/monitoring/metrics")

	watchCtx, cancel := context.WithCancel(ctx)

	eventCh, stopWatch, err := driver.Watch(watchCtx, key)
	if err != nil {
		log.Printf("Failed to start watch: %v", err)
		cancel()

		return
	}

	fmt.Println("Started watch with manual control")

	go func() {
		time.Sleep(100 * time.Millisecond)
		fmt.Println("Stopping watch gracefully...")
		stopWatch()
		cancel()
	}()

	for {
		select {
		case event, ok := <-eventCh:
			if !ok {
				return
			}

			fmt.Printf("Received event: %s\n", string(event.Prefix))
		case <-watchCtx.Done():
			return
		}
	}
}
Output:

Watching for changes on: /config/app/status
Received watch event for key: /config/app/status
Watching for changes on prefix: /config/database/
Event 1: change detected on /config/database/
Event 2: change detected on /config/database/
Event 3: change detected on /config/database/
Started watch with manual control
Stopping watch gracefully...

type EncodingError

type EncodingError struct {
	ObjectType string
	Text       string
	Err        error
}

EncodingError represents an error that occurs during encoding operations.

func (EncodingError) Error

func (e EncodingError) Error() string

Error returns the error message.

func (EncodingError) Unwrap

func (e EncodingError) Unwrap() error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL