datasvc

package
v1.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 20, 2026 License: Apache-2.0 Imports: 32 Imported by: 0

Documentation

Overview

Package datasvc contains interfaces and helpers for interacting with the ServiceRadar data service.

Package datasvc is a generated GoMock package.

Package datasvc implements the gRPC server for the data service (KV + object store).

Index

Constants

This section is empty.

Variables

View Source
var ErrCASMismatch = errors.New("kv: compare-and-swap mismatch")

ErrCASMismatch indicates a compare-and-swap failure due to a stale revision.

View Source
var ErrKeyExists = errors.New("kv: key already exists")

ErrKeyExists indicates that a create/put-if-absent operation found an existing value.

View Source
var ErrObjectNotFound = errors.New("kv: object not found")

ErrObjectNotFound indicates that a requested object does not exist.

View Source
var (
	// ErrStatusNotReceivedByCore is returned when Core does not acknowledge status receipt.
	ErrStatusNotReceivedByCore = errors.New("status not received by Core")
)

Functions

This section is empty.

Types

type Config

type Config struct {
	ListenAddr    string                 `json:"listen_addr"`
	NATSURL       string                 `json:"nats_url"`
	NATSCredsFile string                 `json:"nats_creds_file,omitempty"`
	Security      *models.SecurityConfig `json:"security"`
	NATSSecurity  *models.SecurityConfig `json:"nats_security"`
	RBAC          struct {
		Roles []RBACRule `json:"roles"`
	} `json:"rbac"`
	Bucket           string            `json:"bucket,omitempty"`            // KV bucket name
	Domain           string            `json:"domain,omitempty"`            // Optional JetStream domain
	ObjectBucket     string            `json:"object_bucket,omitempty"`     // JetStream object store bucket name
	BucketMaxBytes   int64             `json:"bucket_max_bytes,omitempty"`  // Hard cap for bucket size (bytes)
	BucketTTL        models.Duration   `json:"bucket_ttl,omitempty"`        // TTL for entries (0 = no expiry)
	BucketHistory    uint32            `json:"bucket_history,omitempty"`    // History depth per key
	CoreRegistration *CoreRegistration `json:"core_registration,omitempty"` // Core service registration settings

	// NATSOperator configures the NATS account management service for namespace isolation.
	// When configured, datasvc will expose the NATSAccountService gRPC endpoint.
	NATSOperator *accounts.OperatorConfig `json:"nats_operator,omitempty"`
}

Config holds the configuration for the KV service.

func (*Config) Validate

func (c *Config) Validate() error

Validate ensures the configuration is valid.

type CoreRegistration

type CoreRegistration struct {
	Enabled           bool            `json:"enabled"`                      // Enable registration with Core
	CoreEndpoint      string          `json:"core_endpoint"`                // Core gRPC endpoint
	InstanceID        string          `json:"instance_id"`                  // Unique instance ID
	HeartbeatInterval models.Duration `json:"heartbeat_interval,omitempty"` // Heartbeat interval (default: 30s)
}

CoreRegistration contains settings for registering this datasvc with Core.

type Entry

type Entry struct {
	Value    []byte
	Revision uint64
	Found    bool
}

Entry captures the value, revision, and presence metadata for a key lookup.

type KVStore

type KVStore interface {
	// Get retrieves the value associated with the given key.
	// Returns the value as a byte slice, a boolean indicating if the key was found, and an error if the operation fails.
	Get(ctx context.Context, key string) ([]byte, bool, error)

	// GetEntry retrieves the full metadata for the key, including revision numbers.
	GetEntry(ctx context.Context, key string) (Entry, error)

	// Put stores a value under the given key with an optional TTL (time-to-live).
	// If ttl is zero, the value persists indefinitely (or until explicitly deleted, depending on the backend).
	Put(ctx context.Context, key string, value []byte, ttl time.Duration) error

	// PutIfAbsent stores a value only if the key does not already exist.
	// Returns an error if the key exists. TTL semantics mirror Put.
	PutIfAbsent(ctx context.Context, key string, value []byte, ttl time.Duration) error

	// PutMany stores multiple key/value pairs in a single operation.
	// The ttl parameter applies to all entries.
	PutMany(ctx context.Context, entries []KeyValueEntry, ttl time.Duration) error

	// Update performs a compare-and-swap write using the provided revision.
	Update(ctx context.Context, key string, value []byte, revision uint64, ttl time.Duration) (uint64, error)

	// Delete removes the key and its associated value from the store.
	Delete(ctx context.Context, key string) error

	// Watch monitors the specified key for changes and sends updates through a channel.
	// The channel receives the new value (or nil if deleted) whenever the key is modified.
	// The returned channel is closed when the context is canceled or the KV store is closed.
	Watch(ctx context.Context, key string) (<-chan []byte, error)

	// ListKeys returns all keys matching the given prefix filter.
	// If prefix is empty, all keys are returned.
	ListKeys(ctx context.Context, prefix string) ([]string, error)

	// PutObject streams an object payload into the JetStream object store.
	PutObject(ctx context.Context, key string, reader io.Reader, meta ObjectMetadata) (*ObjectInfo, error)

	// GetObject retrieves an object payload from the JetStream object store.
	GetObject(ctx context.Context, key string) (io.ReadCloser, *ObjectInfo, error)

	// DeleteObject removes an object from the JetStream object store.
	DeleteObject(ctx context.Context, key string) error

	// GetObjectInfo returns object metadata without downloading payload data.
	GetObjectInfo(ctx context.Context, key string) (*ObjectInfo, bool, error)

	// Close shuts down the KV store, releasing any resources (e.g., connections).
	Close() error
}

KVStore defines the interface for a key-value store used in ServiceRadar configuration management.

type KeyValueEntry

type KeyValueEntry struct {
	Key   string
	Value []byte
}

KeyValueEntry represents a key-value update with metadata (used internally by NATSStore).

type MockKVStore

type MockKVStore struct {
	// contains filtered or unexported fields
}

MockKVStore is a mock of KVStore interface.

func NewMockKVStore

func NewMockKVStore(ctrl *gomock.Controller) *MockKVStore

NewMockKVStore creates a new mock instance.

func (*MockKVStore) Close

func (m *MockKVStore) Close() error

Close mocks base method.

func (*MockKVStore) Delete

func (m *MockKVStore) Delete(arg0 context.Context, arg1 string) error

Delete mocks base method.

func (*MockKVStore) DeleteObject

func (m *MockKVStore) DeleteObject(arg0 context.Context, arg1 string) error

DeleteObject mocks base method.

func (*MockKVStore) EXPECT

func (m *MockKVStore) EXPECT() *MockKVStoreMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockKVStore) Get

func (m *MockKVStore) Get(arg0 context.Context, arg1 string) ([]byte, bool, error)

Get mocks base method.

func (*MockKVStore) GetEntry

func (m *MockKVStore) GetEntry(arg0 context.Context, arg1 string) (Entry, error)

GetEntry mocks base method.

func (*MockKVStore) GetObject

func (m *MockKVStore) GetObject(arg0 context.Context, arg1 string) (io.ReadCloser, *ObjectInfo, error)

GetObject mocks base method.

func (*MockKVStore) GetObjectInfo

func (m *MockKVStore) GetObjectInfo(arg0 context.Context, arg1 string) (*ObjectInfo, bool, error)

GetObjectInfo mocks base method.

func (*MockKVStore) ListKeys added in v1.0.69

func (m *MockKVStore) ListKeys(arg0 context.Context, arg1 string) ([]string, error)

ListKeys mocks base method.

func (*MockKVStore) Put

func (m *MockKVStore) Put(arg0 context.Context, arg1 string, arg2 []byte, arg3 time.Duration) error

Put mocks base method.

func (*MockKVStore) PutIfAbsent

func (m *MockKVStore) PutIfAbsent(arg0 context.Context, arg1 string, arg2 []byte, arg3 time.Duration) error

PutIfAbsent mocks base method.

func (*MockKVStore) PutMany

func (m *MockKVStore) PutMany(arg0 context.Context, arg1 []KeyValueEntry, arg2 time.Duration) error

PutMany mocks base method.

func (*MockKVStore) PutObject

func (m *MockKVStore) PutObject(arg0 context.Context, arg1 string, arg2 io.Reader, arg3 ObjectMetadata) (*ObjectInfo, error)

PutObject mocks base method.

func (*MockKVStore) Update

func (m *MockKVStore) Update(arg0 context.Context, arg1 string, arg2 []byte, arg3 uint64, arg4 time.Duration) (uint64, error)

Update mocks base method.

func (*MockKVStore) Watch

func (m *MockKVStore) Watch(arg0 context.Context, arg1 string) (<-chan []byte, error)

Watch mocks base method.

type MockKVStoreMockRecorder

type MockKVStoreMockRecorder struct {
	// contains filtered or unexported fields
}

MockKVStoreMockRecorder is the mock recorder for MockKVStore.

func (*MockKVStoreMockRecorder) Close

func (mr *MockKVStoreMockRecorder) Close() *gomock.Call

Close indicates an expected call of Close.

func (*MockKVStoreMockRecorder) Delete

func (mr *MockKVStoreMockRecorder) Delete(arg0, arg1 any) *gomock.Call

Delete indicates an expected call of Delete.

func (*MockKVStoreMockRecorder) DeleteObject

func (mr *MockKVStoreMockRecorder) DeleteObject(arg0, arg1 any) *gomock.Call

DeleteObject indicates an expected call of DeleteObject.

func (*MockKVStoreMockRecorder) Get

func (mr *MockKVStoreMockRecorder) Get(arg0, arg1 any) *gomock.Call

Get indicates an expected call of Get.

func (*MockKVStoreMockRecorder) GetEntry

func (mr *MockKVStoreMockRecorder) GetEntry(arg0, arg1 any) *gomock.Call

GetEntry indicates an expected call of GetEntry.

func (*MockKVStoreMockRecorder) GetObject

func (mr *MockKVStoreMockRecorder) GetObject(arg0, arg1 any) *gomock.Call

GetObject indicates an expected call of GetObject.

func (*MockKVStoreMockRecorder) GetObjectInfo

func (mr *MockKVStoreMockRecorder) GetObjectInfo(arg0, arg1 any) *gomock.Call

GetObjectInfo indicates an expected call of GetObjectInfo.

func (*MockKVStoreMockRecorder) ListKeys added in v1.0.69

func (mr *MockKVStoreMockRecorder) ListKeys(arg0, arg1 any) *gomock.Call

ListKeys indicates an expected call of ListKeys.

func (*MockKVStoreMockRecorder) Put

func (mr *MockKVStoreMockRecorder) Put(arg0, arg1, arg2, arg3 any) *gomock.Call

Put indicates an expected call of Put.

func (*MockKVStoreMockRecorder) PutIfAbsent

func (mr *MockKVStoreMockRecorder) PutIfAbsent(arg0, arg1, arg2, arg3 any) *gomock.Call

PutIfAbsent indicates an expected call of PutIfAbsent.

func (*MockKVStoreMockRecorder) PutMany

func (mr *MockKVStoreMockRecorder) PutMany(arg0, arg1, arg2 any) *gomock.Call

PutMany indicates an expected call of PutMany.

func (*MockKVStoreMockRecorder) PutObject

func (mr *MockKVStoreMockRecorder) PutObject(arg0, arg1, arg2, arg3 any) *gomock.Call

PutObject indicates an expected call of PutObject.

func (*MockKVStoreMockRecorder) Update

func (mr *MockKVStoreMockRecorder) Update(arg0, arg1, arg2, arg3, arg4 any) *gomock.Call

Update indicates an expected call of Update.

func (*MockKVStoreMockRecorder) Watch

func (mr *MockKVStoreMockRecorder) Watch(arg0, arg1 any) *gomock.Call

Watch indicates an expected call of Watch.

type NATSAccountServer added in v1.0.79

type NATSAccountServer struct {
	proto.UnimplementedNATSAccountServiceServer
	// contains filtered or unexported fields
}

NATSAccountServer implements the NATSAccountService gRPC interface. This is a stateless service that performs NATS JWT/NKeys cryptographic operations. Account state (seeds, JWTs) is stored by the caller (Elixir/CNPG with AshCloak).

func NewNATSAccountServer added in v1.0.79

func NewNATSAccountServer(operator *accounts.Operator) *NATSAccountServer

NewNATSAccountServer creates a new NATSAccountServer with the given operator. The server is stateless - it only holds the operator key for signing operations. If operator is nil, the server will start in uninitialized state and require bootstrap.

func (*NATSAccountServer) BootstrapOperator added in v1.0.79

BootstrapOperator initializes the NATS operator for the platform. This can either generate a new operator key pair or import an existing seed. Should be called once during initial platform setup.

func (*NATSAccountServer) CreateAccount added in v1.0.79

CreateAccount generates new account NKeys and a signed account JWT. The returned account_seed should be stored encrypted by the caller (Elixir/AshCloak).

func (*NATSAccountServer) GenerateUserCredentials added in v1.0.79

GenerateUserCredentials creates NATS user credentials for an account. Requires the account_seed (from Elixir storage) to sign the user JWT.

func (*NATSAccountServer) GetOperatorInfo added in v1.0.79

GetOperatorInfo returns the current operator status and public key. Used to verify the operator is initialized before account operations.

func (*NATSAccountServer) PushAccountJWT added in v1.0.79

PushAccountJWT pushes an account JWT to the NATS resolver via $SYS. This makes the account immediately available without NATS restart.

func (*NATSAccountServer) SetAllowedClientIdentities added in v1.0.79

func (s *NATSAccountServer) SetAllowedClientIdentities(identities []string)

SetAllowedClientIdentities configures which mTLS identities may call this service.

func (*NATSAccountServer) SetNATSStore added in v1.0.79

func (s *NATSAccountServer) SetNATSStore(store *NATSStore)

SetNATSStore sets the NATS store for JWT push operations. Must be called before PushAccountJWT can work.

func (*NATSAccountServer) SetResolverClient added in v1.0.79

func (s *NATSAccountServer) SetResolverClient(natsURL string, security *models.SecurityConfig, credsFile string)

SetResolverClient configures how account JWTs are pushed to the NATS resolver. credsFile should point to a system-account .creds file authorized for $SYS updates.

func (*NATSAccountServer) SetResolverPaths added in v1.0.79

func (s *NATSAccountServer) SetResolverPaths(operatorConfigPath, resolverBasePath string)

SetResolverPaths configures the paths for file-based JWT resolver. operatorConfigPath: where to write operator.conf for NATS to include resolverBasePath: base directory for account JWT files

func (*NATSAccountServer) SignAccountJWT added in v1.0.79

SignAccountJWT regenerates an account JWT with updated claims. Use this when revocations or limits change. Requires account_seed from Elixir storage.

func (*NATSAccountServer) WriteAccountJWT added in v1.0.79

func (s *NATSAccountServer) WriteAccountJWT(accountPublicKey, accountJWT string) error

WriteAccountJWT writes an account JWT to the file-based resolver directory. This allows the NATS server to pick up new accounts without restart.

func (*NATSAccountServer) WriteOperatorConfig added in v1.0.79

func (s *NATSAccountServer) WriteOperatorConfig() error

WriteOperatorConfig writes the operator configuration file for NATS. This includes the operator JWT and system account configuration. Must be called after BootstrapOperator to enable JWT-based account resolution.

type NATSStore

type NATSStore struct {
	// contains filtered or unexported fields
}

func NewNATSStore

func NewNATSStore(ctx context.Context, cfg *Config) (*NATSStore, error)

func (*NATSStore) Close

func (n *NATSStore) Close() error

func (*NATSStore) Create

func (n *NATSStore) Create(ctx context.Context, key string, value []byte, _ time.Duration) error

Create stores a key-value pair only if it doesn't already exist.

func (*NATSStore) Delete

func (n *NATSStore) Delete(ctx context.Context, key string) error

func (*NATSStore) DeleteObject

func (n *NATSStore) DeleteObject(ctx context.Context, key string) error

func (*NATSStore) Get

func (n *NATSStore) Get(ctx context.Context, key string) ([]byte, bool, error)

func (*NATSStore) GetEntry

func (n *NATSStore) GetEntry(ctx context.Context, key string) (Entry, error)

GetEntry retrieves the value and revision metadata for a given key.

func (*NATSStore) GetObject

func (n *NATSStore) GetObject(ctx context.Context, key string) (io.ReadCloser, *ObjectInfo, error)

func (*NATSStore) GetObjectInfo

func (n *NATSStore) GetObjectInfo(ctx context.Context, key string) (*ObjectInfo, bool, error)

func (*NATSStore) ListKeys added in v1.0.69

func (n *NATSStore) ListKeys(ctx context.Context, prefix string) ([]string, error)

ListKeys returns all keys matching the given prefix filter. If prefix is empty, all keys are returned.

func (*NATSStore) Put

func (n *NATSStore) Put(ctx context.Context, key string, value []byte, _ time.Duration) error

Put stores a key-value pair in the NATS key-value store. It accepts a context, key, value, and TTL. The TTL is not used in this implementation, as it is handled at the bucket level.

func (*NATSStore) PutIfAbsent

func (n *NATSStore) PutIfAbsent(ctx context.Context, key string, value []byte, ttl time.Duration) error

PutIfAbsent stores a key-value pair only if it doesn't already exist.

func (*NATSStore) PutMany

func (n *NATSStore) PutMany(ctx context.Context, entries []KeyValueEntry, _ time.Duration) error

PutMany stores multiple key/value pairs. TTL is ignored in this implementation.

func (*NATSStore) PutObject

func (n *NATSStore) PutObject(ctx context.Context, key string, reader io.Reader, meta ObjectMetadata) (*ObjectInfo, error)

func (*NATSStore) Update

func (n *NATSStore) Update(ctx context.Context, key string, value []byte, revision uint64, _ time.Duration) (uint64, error)

Update performs a compare-and-swap using JetStream revisions.

func (*NATSStore) Watch

func (n *NATSStore) Watch(ctx context.Context, key string) (<-chan []byte, error)

type ObjectInfo

type ObjectInfo struct {
	Key            string
	Domain         string
	SHA256         string
	Size           int64
	CreatedAtUnix  int64
	ModifiedAtUnix int64
	Chunks         uint64
	Metadata       ObjectMetadata
}

ObjectInfo reflects server-observed metadata for stored objects.

type ObjectMetadata

type ObjectMetadata struct {
	Domain      string
	ContentType string
	Compression string
	SHA256      string
	TotalSize   int64
	Attributes  map[string]string
}

ObjectMetadata captures descriptive attributes for JetStream objects.

type RBACRule

type RBACRule struct {
	Identity string `json:"identity"`
	Role     Role   `json:"role"`
}

RBACRule maps a client identity to a role.

type Role

type Role string

Role defines a role in the RBAC system.

const (
	RoleReader Role = "reader"
	RoleWriter Role = "writer"
)

type Server

type Server struct {
	proto.UnimplementedKVServiceServer
	proto.UnimplementedDataServiceServer
	// contains filtered or unexported fields
}

Server implements the KVService gRPC interface and lifecycle.Service.

func NewServer

func NewServer(ctx context.Context, cfg *Config) (*Server, error)

func (*Server) BatchGet

BatchGet implements the BatchGet RPC.

func (*Server) Delete

Delete implements the Delete RPC.

func (*Server) DeleteObject

DeleteObject removes an object from the store.

func (*Server) DownloadObject

DownloadObject streams an object to the caller.

func (*Server) Get

func (s *Server) Get(ctx context.Context, req *proto.GetRequest) (*proto.GetResponse, error)

Get implements the Get RPC.

func (*Server) GetObjectInfo

GetObjectInfo retrieves metadata describing an object.

func (*Server) Info

Info implements the Info RPC (domain and bucket for introspection).

func (*Server) ListKeys added in v1.0.69

ListKeys implements the ListKeys RPC to return keys matching a prefix filter.

func (*Server) Put

func (s *Server) Put(ctx context.Context, req *proto.PutRequest) (*proto.PutResponse, error)

Put implements the Put RPC.

func (*Server) PutIfAbsent

func (s *Server) PutIfAbsent(ctx context.Context, req *proto.PutRequest) (*proto.PutResponse, error)

PutIfAbsent implements the PutIfAbsent RPC.

func (*Server) PutMany

PutMany implements the PutMany RPC.

func (*Server) Start

func (s *Server) Start(ctx context.Context) error

func (*Server) StartCoreRegistration

func (s *Server) StartCoreRegistration(ctx context.Context, cfg *CoreRegistration, listenAddr string, security *models.SecurityConfig)

StartCoreRegistration starts the background goroutine that registers this datasvc instance with Core and sends periodic heartbeats.

func (*Server) Stop

func (s *Server) Stop(_ context.Context) error

func (*Server) Store

func (s *Server) Store() KVStore

func (*Server) Update

Update implements the Update (CAS) RPC.

func (*Server) UploadObject

func (s *Server) UploadObject(stream proto.DataService_UploadObjectServer) error

UploadObject implements the client-streaming object upload RPC.

func (*Server) Watch

func (s *Server) Watch(req *proto.WatchRequest, stream proto.KVService_WatchServer) error

Watch implements the Watch RPC.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL