dkv

package module
v0.1.13 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 29, 2025 License: MIT Imports: 19 Imported by: 0

README

dkv

Библиотека для развертывания и взаимодействия с узлами распределенного KV-хранилища.

Основные классы

Classes

  • LocalKVRepo - интерфейс локального хранилища. К нему будут проходить обращения, когда текущий узел будет для ключа держателем или репликой.

    • Библиотека предоставляет стандартную реализацию этого интерфейса - персистентное хранилище на основе файловой системы. Под капотом используется badger.
  • RemoteKVRepo - интерфейс, обобщающий обращения к другим узлам системы. Будет использоваться, когда текущий узел не будет являться держателем ключа.

    • Библиотека предоставляет стандартную реализацию этого интерфейса - обертку над HTTP клиентом для обращения к соответствующим контроллерам на других узлах.
  • Controller - интерфейс, обобщающий сущность, ответственную запрослушивание обращений извне к узлу.

    • Реализация по умолчанию - HTTP контроллер.
  • Merger - Интерфейс, обобщающий механизм слияния старой и новой версии ключ-значения.

    • Реализация по умолчанию - новая версия затирает старую.
  • Discovery - интерфейс клиента сервиса типа Discovery. Подразумеваетналичие такого сервиса и его постоянную доступность. Реализации по умолчанию быть не может, но как вариант решения предлагается использовать клиента horockey/service_discovery.

  • Hashring - сущность, описывающая хэш-кольцо - основную сущность алгоритма консистентного хэширования (почитать можно ТУТ и ТУТ). Для непоредствено хэширования используется HashFunc - функция оговоренной сигнатуры. По умолчанию это функция-оберкта над MD5 хэшированием.

  • Processor - центральная сущность библиотеки, содержащая в себе бизнес-логику ее работы. Принимает обращения от контроллера и клиента, обращается к нижестоящим адаптерам, обеспечивает запись реплик, транзакционность операций, при необходимости осуществляет их откаты.

  • Client - Сущность - обертка над процессором. Обеспечивает старт процессора и контроллера, предоставляет возможность разработчику взаимодейтвовать с системой.

Пример использования

package main

import (
 "context"
 "errors"
 "fmt"
 "math/rand/v2"
 "net/http"
 "os"
 "os/signal"
 "strconv"
 "sync"
 "syscall"
 "time"

 "github.com/google/uuid"
 "github.com/horockey/dkv"
 "github.com/horockey/dkv_monkey_service/internal/model"
 serdisc "github.com/horockey/service_discovery/api"
 "github.com/rs/zerolog"
)

const TotalStorageCap = 1_000_000

func main() {
 serv := http.Server{
  Addr: "0.0.0.0:80",
 }

 logger := zerolog.New(zerolog.ConsoleWriter{
  Out:        os.Stdout,
  TimeFormat: time.RFC3339,
 }).With().Timestamp().Logger()

 disc, err := serdisc.NewClient(
  "dkv_monkey_service",
  "http://discovery:6500",
  "ak",
  &serv,
  logger.
   With().
   Str("scope", "serdisc_client").
   Logger(),
 )
 if err != nil {
  logger.
   Fatal().
   Err(fmt.Errorf("creating serdisc client: %w", err)).
   Send()
 }

 hostname, _ := os.Hostname()
 cl, err := dkv.NewClient(
  "dkv_ak",
  hostname,
  &model.DiscoveryImpl{Cl: disc},
  dkv.WithServicePort[model.Value](7000),
  dkv.WithLogger[model.Value](
   logger.
    With().
    Str("scope", "dkv_client").
    Logger(),
  ),
 )
 if err != nil {
  logger.
   Fatal().
   Err(fmt.Errorf("creating dkv client: %w", err)).
   Send()
 }

 ctx, cancel := signal.NotifyContext(
  context.Background(),
  syscall.SIGTERM,
  syscall.SIGABRT,
  syscall.SIGINT,
  syscall.SIGQUIT,
  syscall.SIGKILL,
 )
 defer cancel()

 var wg sync.WaitGroup

 wg.Add(1)
 go func() {
  defer wg.Done()
  if err := serv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
   logger.
    Error().
    Err(fmt.Errorf("running http_server")).
    Send()
  }
 }()

 wg.Add(1)
 go func() {
  defer wg.Done()

  if err := cl.Start(ctx); err != nil && !errors.Is(err, context.Canceled) {
   logger.
    Error().
    Err(fmt.Errorf("running dkv client: %w", err)).
    Send()
   cancel()
  }
 }()

 wg.Add(1)
 go func() {
  defer wg.Done()

  time.Sleep(time.Second * 5)

  for {
   select {
   case <-ctx.Done():
    return
   default:
    idx := rand.IntN(TotalStorageCap)
    key := "monkey_" + strconv.Itoa(idx)
    value := model.Value{
     Foo: uuid.NewString(),
     Bar: uuid.NewString(),
    }
    action := rand.IntN(2)
    switch action {
    case 0:
     // write
     if err := cl.AddOrUpdate(ctx, key, value); err != nil {
      logger.Error().Err(fmt.Errorf("writing to client: %w", err)).Send()
     }
    case 1:
     // read
     if _, err := cl.Get(ctx, key); err != nil && !errors.Is(err, dkv.ErrKeyNotFoundError{Key: key}) {
      logger.Error().Err(fmt.Errorf("reading from client: %w", err)).Send()
     }
    }

    time.Sleep(time.Millisecond * 100)
   }
  }
 }()

 logger.Info().Msg("Service started")

 <-ctx.Done()
 _ = serv.Close()
 wg.Wait()

 logger.Info().Msg("Service stopped")
}

Диаграмма последовательности

Seq

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func WithBadgerDir

func WithBadgerDir[V any](dir string) options.Option[createClientParams[V]]

Sets custom badger root dir. Default is ./badger

func WithHashFunc

func WithHashFunc[V any](hf hashringx.HashFunc) options.Option[createClientParams[V]]

Sets custom hashring func. Default is md5.

func WithLocalRepo

func WithLocalRepo[V any](
	repo local_kv_pairs.Repository[V],
) options.Option[createClientParams[V]]

Sets user-defined implementation of local repository. Default is badger persistent repo.

WARNING! Apply this opt only if you know what you are doing.

func WithLocalRepoTombstonesTTL

func WithLocalRepoTombstonesTTL[V any](ttl time.Duration) options.Option[createClientParams[V]]

Sets custom tombstone TTL. Default is 1 day.

func WithLogger

func WithLogger[V any](l zerolog.Logger) options.Option[createClientParams[V]]

Sets custom logger. Default is stdout logger.

func WithMerger

func WithMerger[V any](m Merger[V]) options.Option[createClientParams[V]]

Sets custom merger. By default newest data overrites old one.

func WithNodeWeight

func WithNodeWeight[V any](w uint) options.Option[createClientParams[V]]

Sets custom node weight to report to discovery. Default is 1.

func WithRemoteRepoAndController

func WithRemoteRepoAndController[V any](
	repo remote_kv_pairs.Gateway[V],
	ctrl Controller[V],
) options.Option[createClientParams[V]]

Sets user-defined implementations of remote repo and coresponding controller. Default are HTTP.

WARNING! Apply this opt only if you know what you are doing.

func WithReplicasCount

func WithReplicasCount[V any](r uint8) options.Option[createClientParams[V]]

Sets custom write replicas count. Default is 2.

func WithRevertTimeout

func WithRevertTimeout[V any](to time.Duration) options.Option[createClientParams[V]]

Sets custom timeout for transaction revert. Default is 10s.

func WithServicePort

func WithServicePort[V any](p int) options.Option[createClientParams[V]]

Sets custom service port. Default is 7000.

Types

type Client

type Client[V any] struct {
	*processor.Processor[V]
	// contains filtered or unexported fields
}

func NewClient

func NewClient[V any](
	apiKey string,
	hostname string,
	discovery Discovery,
	opts ...options.Option[createClientParams[V]],
) (*Client[V], error)

func (*Client[V]) AddOrUpdate added in v0.1.12

func (cl *Client[V]) AddOrUpdate(ctx context.Context, key string, value V) error

func (*Client[V]) Metrics

func (cl *Client[V]) Metrics() []prometheus.Collector

func (*Client[V]) Remove added in v0.1.12

func (cl *Client[V]) Remove(ctx context.Context, key string) error

func (*Client[V]) Start

func (cl *Client[V]) Start(ctx context.Context) error

type Controller

type Controller[V any] interface {
	model.MetricsProvider
	Start(ctx context.Context, proc *Processor[V]) error
}

type Discovery

type Discovery = model.Discovery

type ErrKeyNotFoundError

type ErrKeyNotFoundError = model.KeyNotFoundError

type KvTooOldError

type KvTooOldError = model.KvTooOldError

type Merger

type Merger[V any] = model.Merger[V]

type Node

type Node = model.Node

type Processor

type Processor[V any] = processor.Processor[V]

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL