cdb

package
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 26, 2026 License: BSD-2-Clause Imports: 27 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var CommandExpiredError = ErrCommandExpired //nolint:errname

Deprecated: Use ErrCommandExpired instead.

View Source
var CommandObjectSkippedError = ErrCommandObjectSkipped //nolint:errname

Deprecated: Use ErrCommandObjectSkipped instead.

View Source
var ErrCommandExpired = stderrors.New("command expired")
View Source
var ErrCommandObjectSkipped = stderrors.New("commandObject skipped")

ErrCommandObjectSkipped allows skipping the command execution without raising an error.

View Source
var ErrUnsupportedOperation = stderrors.New("unsupported operation")
View Source
var SchemaIDV1 = SchemaID{
	Group:   "cdb",
	Kind:    "schema",
	Version: "v1",
}
View Source
var UnsupportedOperationError = ErrUnsupportedOperation //nolint:errname

Deprecated: Use ErrUnsupportedOperation instead.

Functions

func BuildTopic

func BuildTopic(schemaID SchemaID, branch base.Branch, suffix string) libkafka.Topic

BuildTopic constructs a Kafka topic name from schema ID, branch, and suffix.

func CreateCommandMessageHandlerBatch

func CreateCommandMessageHandlerBatch(
	db libkv.DB,
	syncProducer libkafka.SyncProducer,
	schemaID SchemaID,
	ignoreUnsupported bool,
	branch base.Branch,
	commandExpireDuration time.Duration,
	commandObjectExecutors ...CommandObjectExecutorTx,
) libkafka.MessageHandlerBatch

func CreateK8sClientset

func CreateK8sClientset(ctx context.Context, kubeconfig string) (versioned.Interface, error)

func NewCommandObjectMessageHandler

func NewCommandObjectMessageHandler(
	schemaID SchemaID,
	commandObjectHandler CommandObjectHandler,
	commandExpireDuration time.Duration,
) libkafka.MessageHandler

func NewCommandObjectMessageHandlerTx

func NewCommandObjectMessageHandlerTx(
	schemaID SchemaID,
	commandObjectHandler CommandObjectHandlerTx,
	commandExpireDuration time.Duration,
) libkafka.MessageHandlerTx

func NewSchemaMessageHandler

func NewSchemaMessageHandler(
	schemaHandler SchemaHandlerTx,
	logSamplerFactory log.SamplerFactory,
) libkafka.MessageHandlerTx

func RunCommandConsumer

func RunCommandConsumer(
	saramaClientProvider libkafka.SaramaClientProvider,
	syncProducer libkafka.SyncProducer,
	schemaID SchemaID,
	kafkaGroup libkafka.Group,
	batchSize libkafka.BatchSize,
	branch base.Branch,
	ignoreUnsupported bool,
	commandExpireDuration time.Duration,
	trigger run.Trigger,
	commandObjectExecutors CommandObjectExecutors,
	options ...func(*libkafka.ConsumerOptions),
) run.Func

func RunCommandConsumerDefault

func RunCommandConsumerDefault(
	saramaClientProvider libkafka.SaramaClientProvider,
	syncProducer libkafka.SyncProducer,
	kafkaGroup libkafka.Group,
	schemaID SchemaID,
	branch base.Branch,
	ignoreUnsupported bool,
	commandObjectExecutors CommandObjectExecutors,
	options ...func(*libkafka.ConsumerOptions),
) run.Func

func RunCommandConsumerTx

func RunCommandConsumerTx(
	saramaClientProvider libkafka.SaramaClientProvider,
	syncProducer libkafka.SyncProducer,
	db libkv.DB,
	schemaID SchemaID,
	batchSize libkafka.BatchSize,
	branch base.Branch,
	ignoreUnsupported bool,
	commandExpireDuration time.Duration,
	trigger run.Trigger,
	commandObjectExecutors CommandObjectExecutorTxs,
	options ...func(*libkafka.ConsumerOptions),
) run.Func

func RunCommandConsumerTxDefault

func RunCommandConsumerTxDefault(
	saramaClientProvider libkafka.SaramaClientProvider,
	syncProducer libkafka.SyncProducer,
	db libkv.DB,
	schemaID SchemaID,
	branch base.Branch,
	ignoreUnsupported bool,
	commandObjectExecutors CommandObjectExecutorTxs,
	options ...func(*libkafka.ConsumerOptions),
) run.Func

func RunCommandConsumerTxWithOffsetManager

func RunCommandConsumerTxWithOffsetManager(
	saramaClientProvider libkafka.SaramaClientProvider,
	syncProducer libkafka.SyncProducer,
	db libkv.DB,
	schemaID SchemaID,
	batchSize libkafka.BatchSize,
	branch base.Branch,
	ignoreUnsupported bool,
	commandExpireDuration time.Duration,
	trigger run.Trigger,
	commandObjectExecutors CommandObjectExecutorTxs,
	offsetManager libkafka.OffsetManager,
	options ...func(*libkafka.ConsumerOptions),
) run.Func

func RunResultConsumer

func RunResultConsumer(
	saramaClientProvider libkafka.SaramaClientProvider,
	db libkv.DB,
	schemaID SchemaID,
	branch base.Branch,
	batchSize libkafka.BatchSize,
	trigger run.Fire,
	logSamplerFactory log.SamplerFactory,
	resultHandler base.ResultHandler,
) run.Func

func RunResultConsumerDefault

func RunResultConsumerDefault(
	saramaClientProvider libkafka.SaramaClientProvider,
	db libkv.DB,
	schemaID SchemaID,
	branch base.Branch,
	resultHandler base.ResultHandler,
) run.Func

func RunResultConsumerLog

func RunResultConsumerLog(
	saramaClientProvider libkafka.SaramaClientProvider,
	db libkv.DB,
	schemaID SchemaID,
	branch base.Branch,
) run.Func

func RunResultConsumerTx

func RunResultConsumerTx(
	saramaClientProvider libkafka.SaramaClientProvider,
	db libkv.DB,
	schemaID SchemaID,
	branch base.Branch,
	batchSize libkafka.BatchSize,
	trigger run.Fire,
	logSamplerFactory log.SamplerFactory,
	resultHandlerTx base.ResultHandlerTx,
) run.Func

func RunResultConsumerTxDefault

func RunResultConsumerTxDefault(
	saramaClientProvider libkafka.SaramaClientProvider,
	db libkv.DB,
	schemaID SchemaID,
	branch base.Branch,
	resultHandlerTx base.ResultHandlerTx,
) run.Func

func RunSchemaConsumer

func RunSchemaConsumer(
	saramaClient libkafka.SaramaClient,
	db libkv.DB,
	branch base.Branch,
	batchSize libkafka.BatchSize,
	trigger run.Fire,
) run.Func

Types

type CommandObject

type CommandObject struct {
	Command  base.Command
	SchemaID SchemaID
}

func (CommandObject) Ptr

func (c CommandObject) Ptr() *CommandObject

func (CommandObject) Validate

func (c CommandObject) Validate(ctx context.Context) error

type CommandObjectExecutor

type CommandObjectExecutor interface {
	CommandOperation() base.CommandOperation
	HandleCommand(
		ctx context.Context,
		commandObject CommandObject,
	) (*base.EventID, base.Event, error)
	// SendResultEnabled reports whether sending a result after handling the command is enabled.
	SendResultEnabled() bool
}

func CommandObjectExecutorFunc

func CommandObjectExecutorFunc(
	commandOperation base.CommandOperation,
	sendResultEnabled bool,
	handleCommand func(ctx context.Context, commandObject CommandObject) (*base.EventID, base.Event, error),
) CommandObjectExecutor

func NewCommandObjectExecutorMetrics

func NewCommandObjectExecutorMetrics(
	commandObjectExecutor CommandObjectExecutor,
	schemaID SchemaID,
) CommandObjectExecutor

func NewCommandObjectExecutorResultSender

func NewCommandObjectExecutorResultSender(
	commandObjectExecutor CommandObjectExecutor,
	resultObjectSender ResultObjectSender,
	logSamplerFactory log.SamplerFactory,
) CommandObjectExecutor

type CommandObjectExecutorTx

type CommandObjectExecutorTx interface {
	CommandOperation() base.CommandOperation
	HandleCommand(
		ctx context.Context,
		tx libkv.Tx,
		commandObject CommandObject,
	) (*base.EventID, base.Event, error)
	// SendResultEnabled reports whether sending a result after handling the command is enabled.
	SendResultEnabled() bool
}

func CommandObjectExecutorTxFunc

func CommandObjectExecutorTxFunc(
	commandOperation base.CommandOperation,
	sendResultEnabled bool,
	handleCommand HandleCommandFunc,
) CommandObjectExecutorTx

func NewCommandObjectExecutorTxMetrics

func NewCommandObjectExecutorTxMetrics(
	commandObjectExecutor CommandObjectExecutorTx,
	schemaID SchemaID,
) CommandObjectExecutorTx

func NewCommandObjectExecutorTxResultSender

func NewCommandObjectExecutorTxResultSender(
	commandObjectExecutor CommandObjectExecutorTx,
	resultObjectSender ResultObjectSender,
	logSamplerFactory log.SamplerFactory,
) CommandObjectExecutorTx

type CommandObjectExecutorTxs

type CommandObjectExecutorTxs []CommandObjectExecutorTx

func WrapCommandObjectExecutorTxs

func WrapCommandObjectExecutorTxs(
	resultObjectSender ResultObjectSender,
	commandObjectExecutors CommandObjectExecutorTxs,
	schemaID SchemaID,
	logSamplerFactory log.SamplerFactory,
) CommandObjectExecutorTxs

func (CommandObjectExecutorTxs) Find

type CommandObjectExecutors

type CommandObjectExecutors []CommandObjectExecutor

func WrapCommandObjectExecutors

func WrapCommandObjectExecutors(
	resultObjectSender ResultObjectSender,
	commandObjectExecutors CommandObjectExecutors,
	schemaID SchemaID,
	logSamplerFactory log.SamplerFactory,
) CommandObjectExecutors

func (CommandObjectExecutors) Find

type CommandObjectFilter

type CommandObjectFilter interface {
	// Filtered returns true if commandObject should be filtered out.
	Filtered(ctx context.Context, commandObject CommandObject) (bool, error)
}

type CommandObjectFilterFunc

type CommandObjectFilterFunc func(ctx context.Context, commandObject CommandObject) (bool, error)

func (CommandObjectFilterFunc) Filtered

func (a CommandObjectFilterFunc) Filtered(
	ctx context.Context,
	commandObject CommandObject,
) (bool, error)

type CommandObjectFilterList

type CommandObjectFilterList []CommandObjectFilter

func (CommandObjectFilterList) Filtered

func (a CommandObjectFilterList) Filtered(
	ctx context.Context,
	commandObject CommandObject,
) (bool, error)

type CommandObjectFilterTx

type CommandObjectFilterTx interface {
	// Filtered returns true if commandObject should be filtered out.
	Filtered(ctx context.Context, tx libkv.Tx, commandObject CommandObject) (bool, error)
}

type CommandObjectFilterTxFunc

type CommandObjectFilterTxFunc func(ctx context.Context, tx libkv.Tx, commandObject CommandObject) (bool, error)

func (CommandObjectFilterTxFunc) Filtered

func (a CommandObjectFilterTxFunc) Filtered(
	ctx context.Context,
	tx libkv.Tx,
	commandObject CommandObject,
) (bool, error)

type CommandObjectFilterTxList

type CommandObjectFilterTxList []CommandObjectFilterTx

func (CommandObjectFilterTxList) Filtered

func (a CommandObjectFilterTxList) Filtered(
	ctx context.Context,
	tx libkv.Tx,
	commandObject CommandObject,
) (bool, error)

type CommandObjectHandler

type CommandObjectHandler interface {
	Handle(ctx context.Context, commandObject CommandObject) error
}

func NewCommandObjectHandler

func NewCommandObjectHandler(
	ignoreUnsupported bool,
	commandObjectExecutors ...CommandObjectExecutor,
) CommandObjectHandler

func NewCommandObjectHandlerFilter

func NewCommandObjectHandlerFilter(
	commandObjectFilter CommandObjectFilter,
	commandObjectHandler CommandObjectHandler,
) CommandObjectHandler

NewCommandObjectHandlerFilter returns a CommandObjectHandler that removes command objects filtered out by commandObjectFilter.

type CommandObjectHandlerFunc

type CommandObjectHandlerFunc func(ctx context.Context, commandObject CommandObject) error

func (CommandObjectHandlerFunc) Handle

func (o CommandObjectHandlerFunc) Handle(ctx context.Context, commandObject CommandObject) error

type CommandObjectHandlerList

type CommandObjectHandlerList []CommandObjectHandler

func (CommandObjectHandlerList) Handle

func (o CommandObjectHandlerList) Handle(ctx context.Context, commandObject CommandObject) error

type CommandObjectHandlerTx

type CommandObjectHandlerTx interface {
	Handle(ctx context.Context, tx libkv.Tx, commandObject CommandObject) error
}

func NewCommandObjectHandlerTx

func NewCommandObjectHandlerTx(
	ignoreUnsupported bool,
	commandObjectExecutors ...CommandObjectExecutorTx,
) CommandObjectHandlerTx

func NewCommandObjectHandlerTxFilter

func NewCommandObjectHandlerTxFilter(
	commandObjectFilter CommandObjectFilterTx,
	commandObjectHandler CommandObjectHandlerTx,
) CommandObjectHandlerTx

NewCommandObjectHandlerTxFilter returns a CommandObjectHandlerTx that removes command objects filtered out by commandObjectFilter.

type CommandObjectHandlerTxFunc

type CommandObjectHandlerTxFunc func(ctx context.Context, tx libkv.Tx, commandObject CommandObject) error

func (CommandObjectHandlerTxFunc) Handle

func (o CommandObjectHandlerTxFunc) Handle(
	ctx context.Context,
	tx libkv.Tx,
	commandObject CommandObject,
) error

type CommandObjectHandlerTxList

type CommandObjectHandlerTxList []CommandObjectHandlerTx

func (CommandObjectHandlerTxList) Handle

func (o CommandObjectHandlerTxList) Handle(
	ctx context.Context,
	tx libkv.Tx,
	commandObject CommandObject,
) error

type CommandObjectSender

type CommandObjectSender interface {
	SendCommandObject(ctx context.Context, commandObject CommandObject) error
	SendCommandObjects(ctx context.Context, commandObjects CommandObjects) error
}

CommandObjectSender allows sending commands.

func NewCommandObjectSender

func NewCommandObjectSender(
	syncProducer libkafka.SyncProducer,
	branch base.Branch,
	logSamplerFactory log.SamplerFactory,
) CommandObjectSender

type CommandObjects

type CommandObjects []CommandObject

type EventObject

type EventObject struct {
	Event    base.Event
	ID       base.EventID
	SchemaID SchemaID
}

func (EventObject) Ptr

func (e EventObject) Ptr() *EventObject

func (EventObject) Validate

func (e EventObject) Validate(ctx context.Context) error

type EventObjectSender

type EventObjectSender interface {
	SendUpdate(ctx context.Context, event EventObject) error
	SendDelete(ctx context.Context, event EventObject) error
}

EventObjectSender allows easy sending of event objects.

func EventObjectSenderFunc

func EventObjectSenderFunc(
	update func(ctx context.Context, event EventObject) error,
	delete func(ctx context.Context, event EventObject) error,
) EventObjectSender

func NewEventObjectSender

func NewEventObjectSender(
	jsonSender kafka.JsonSender,
	branch base.Branch,
	logSamplerFactory log.SamplerFactory,
) EventObjectSender

type EventObjectStore

type EventObjectStore interface {
	Create(ctx context.Context, eventObject EventObject) error
	Update(ctx context.Context, eventObject EventObject) error
	Patch(ctx context.Context, eventObject EventObject) error
	Delete(ctx context.Context, eventObject EventObject) error
	Get(ctx context.Context, schemaID SchemaID, id base.EventID) (*EventObject, error)
}

func NewEventObjectStore

func NewEventObjectStore(db libkv.DB) EventObjectStore

type EventObjectStoreTx

type EventObjectStoreTx interface {
	Create(ctx context.Context, tx libkv.Tx, eventObject EventObject) error
	Update(ctx context.Context, tx libkv.Tx, eventObject EventObject) error
	Patch(ctx context.Context, tx libkv.Tx, eventObject EventObject) error
	Delete(ctx context.Context, tx libkv.Tx, eventObject EventObject) error
	Get(ctx context.Context, tx libkv.Tx, schemaID SchemaID, id base.EventID) (*EventObject, error)
}

func NewEventObjectStoreTx

func NewEventObjectStoreTx() EventObjectStoreTx

type EventStore

type EventStore interface {
	Create(ctx context.Context, schemaID SchemaID, id base.EventID, data base.Event) error
	Update(ctx context.Context, schemaID SchemaID, id base.EventID, data base.Event) error
	Patch(ctx context.Context, schemaID SchemaID, id base.EventID, data base.Event) error
	Delete(ctx context.Context, schemaID SchemaID, id base.EventID) error
	Get(ctx context.Context, schemaID SchemaID, id base.EventID) (base.Event, error)
}

func NewEventStore

func NewEventStore(db libkv.DB) EventStore

type EventStoreTx

type EventStoreTx interface {
	Create(
		ctx context.Context,
		tx libkv.Tx,
		schemaID SchemaID,
		id base.EventID,
		data base.Event,
	) error
	Update(
		ctx context.Context,
		tx libkv.Tx,
		schemaID SchemaID,
		id base.EventID,
		data base.Event,
	) error
	Patch(
		ctx context.Context,
		tx libkv.Tx,
		schemaID SchemaID,
		id base.EventID,
		data base.Event,
	) error
	Delete(ctx context.Context, tx libkv.Tx, schemaID SchemaID, id base.EventID) error
	Get(ctx context.Context, tx libkv.Tx, schemaID SchemaID, id base.EventID) (base.Event, error)
}

func NewEventStoreTx

func NewEventStoreTx() EventStoreTx

type Group

type Group string

func (Group) String

func (s Group) String() string

type HandleCommandFunc

type HandleCommandFunc func(ctx context.Context, tx libkv.Tx, commandObject CommandObject) (*base.EventID, base.Event, error)

type K8sSchemaConnector

type K8sSchemaConnector interface {
	SetupCustomResourceDefinition(ctx context.Context) error
	Listen(
		ctx context.Context,
		resourceEventHandler cache.ResourceEventHandler,
	) error
}

func NewK8sSchemaConnector

func NewK8sSchemaConnector(
	kubeconfig string,
) K8sSchemaConnector

type K8sSchemaDeployer

type K8sSchemaDeployer interface {
	Deploy(ctx context.Context, cdbSchema v1.Schema) error
	Undeploy(ctx context.Context, namespace k8s.Namespace, name string) error
}

func NewK8sSchemaDeployer

func NewK8sSchemaDeployer(
	cdbClientset versioned.Interface,
) K8sSchemaDeployer

type Kind

type Kind string

func (Kind) String

func (o Kind) String() string

type ResultBroadcaster

type ResultBroadcaster interface {
	Broadcast(ctx context.Context, schemaID SchemaID, result base.Result) error
}

type ResultBroadcasterFunc

type ResultBroadcasterFunc func(ctx context.Context, schemaID SchemaID, result base.Result) error

func (ResultBroadcasterFunc) Broadcast

func (r ResultBroadcasterFunc) Broadcast(
	ctx context.Context,
	schemaID SchemaID,
	result base.Result,
) error

type ResultBroadcasterList

type ResultBroadcasterList []ResultBroadcaster

func (ResultBroadcasterList) Broadcast

func (r ResultBroadcasterList) Broadcast(
	ctx context.Context,
	schemaID SchemaID,
	result base.Result,
) error

type ResultChannelProviderForRequestID

type ResultChannelProviderForRequestID interface {
	ResultProvider
	ResultBroadcaster
}

func NewResultChannelProviderForRequestID

func NewResultChannelProviderForRequestID() ResultChannelProviderForRequestID

type ResultObject

type ResultObject struct {
	Result   base.Result
	SchemaID SchemaID
}

func CreateResultObjectFailure

func CreateResultObjectFailure(
	commandObject CommandObject,
	err error,
) ResultObject

func CreateResultObjectSuccess

func CreateResultObjectSuccess(
	commandObject CommandObject,
	resultEventID *base.EventID,
	resultEventData base.Event,
) ResultObject

func (ResultObject) Validate

func (r ResultObject) Validate(ctx context.Context) error

type ResultObjectSender

type ResultObjectSender interface {
	Send(ctx context.Context, resultObject ResultObject) error
}

ResultObjectSender allows easy sending of result objects.

func NewResultObjectSender

func NewResultObjectSender(
	syncProducer kafka.SyncProducer,
	branch base.Branch,
	logSamplerFactory log.SamplerFactory,
) ResultObjectSender

type ResultObjectSenderFunc

type ResultObjectSenderFunc func(ctx context.Context, resultObject ResultObject) error

func (ResultObjectSenderFunc) Send

func (r ResultObjectSenderFunc) Send(ctx context.Context, resultObject ResultObject) error

type ResultProvider

type ResultProvider interface {
	ResultFor(ctx context.Context, command base.Command) (*base.Result, error)
}

type Schema

type Schema struct {

	// ID uniquely identifies the schema
	ID SchemaID `json:"id"`

	// Label shown in frontend
	Label SchemaLabel `json:"label"`

	// Description of schema
	Description SchemaDescription `json:"description"`
}

func (Schema) Validate

func (s Schema) Validate(ctx context.Context) error

type SchemaAdder

type SchemaAdder interface {
	Add(ctx context.Context, schemas ...Schema) error
}

type SchemaAdderTx

type SchemaAdderTx interface {
	Add(ctx context.Context, tx libkv.Tx, schemas ...Schema) error
}

type SchemaDescription

type SchemaDescription string

func (SchemaDescription) String

func (s SchemaDescription) String() string

func (SchemaDescription) Validate

func (s SchemaDescription) Validate(ctx context.Context) error

type SchemaGetter

type SchemaGetter interface {
	Get(ctx context.Context, id SchemaID) (*Schema, error)
}

type SchemaGetterTx

type SchemaGetterTx interface {
	Get(ctx context.Context, tx libkv.Tx, id SchemaID) (*Schema, error)
}

type SchemaHandler

type SchemaHandler interface {
	UpdateSchema(ctx context.Context, schema Schema) error
	DeleteSchema(ctx context.Context, schemaID SchemaID) error
}

func NewSchemaHandler

func NewSchemaHandler(
	db libkv.DB,
	schemaHandlerTx SchemaHandlerTx,
) SchemaHandler

func SchemaHandlerFunc

func SchemaHandlerFunc(
	update func(ctx context.Context, schema Schema) error,
	delete func(ctx context.Context, schemaID SchemaID) error,
) SchemaHandler

type SchemaHandlerList

type SchemaHandlerList []SchemaHandler

func (SchemaHandlerList) DeleteSchema

func (c SchemaHandlerList) DeleteSchema(ctx context.Context, schemaID SchemaID) error

func (SchemaHandlerList) UpdateSchema

func (c SchemaHandlerList) UpdateSchema(ctx context.Context, schema Schema) error

type SchemaHandlerTx

type SchemaHandlerTx interface {
	UpdateSchema(ctx context.Context, tx libkv.Tx, schema Schema) error
	DeleteSchema(ctx context.Context, tx libkv.Tx, schemaID SchemaID) error
}

func SchemaHandlerTxFunc

func SchemaHandlerTxFunc(
	update func(ctx context.Context, tx libkv.Tx, schema Schema) error,
	delete func(ctx context.Context, tx libkv.Tx, schemaID SchemaID) error,
) SchemaHandlerTx

type SchemaHandlerTxList

type SchemaHandlerTxList []SchemaHandlerTx

func (SchemaHandlerTxList) DeleteSchema

func (c SchemaHandlerTxList) DeleteSchema(
	ctx context.Context,
	tx libkv.Tx,
	schemaID SchemaID,
) error

func (SchemaHandlerTxList) UpdateSchema

func (c SchemaHandlerTxList) UpdateSchema(ctx context.Context, tx libkv.Tx, schema Schema) error

type SchemaID

type SchemaID struct {
	Group   Group   `json:"group"`
	Kind    Kind    `json:"kind"`
	Version Version `json:"version"`
}

func ParseSchemaID

func ParseSchemaID(ctx context.Context, id string) (*SchemaID, error)

func (SchemaID) Bytes

func (s SchemaID) Bytes() []byte

func (SchemaID) CommandTopic

func (s SchemaID) CommandTopic(branch base.Branch) libkafka.Topic

func (SchemaID) Equal

func (s SchemaID) Equal(id SchemaID) bool

func (SchemaID) EventID

func (s SchemaID) EventID() base.EventID

func (SchemaID) EventTopic

func (s SchemaID) EventTopic(branch base.Branch) libkafka.Topic

func (SchemaID) HistoryTopic

func (s SchemaID) HistoryTopic(branch base.Branch) libkafka.Topic

func (SchemaID) Ptr

func (s SchemaID) Ptr() *SchemaID

func (SchemaID) ResultTopic

func (s SchemaID) ResultTopic(branch base.Branch) libkafka.Topic

func (SchemaID) String

func (s SchemaID) String() string

func (SchemaID) Validate

func (s SchemaID) Validate(ctx context.Context) error

type SchemaIDs

type SchemaIDs []SchemaID

func ParseSchemaIDs

func ParseSchemaIDs(ctx context.Context, ids []string) (SchemaIDs, error)

func (SchemaIDs) Contains

func (s SchemaIDs) Contains(schemaID SchemaID) bool

type SchemaLabel

type SchemaLabel string

func (SchemaLabel) String

func (s SchemaLabel) String() string

func (SchemaLabel) Validate

func (s SchemaLabel) Validate(ctx context.Context) error

type SchemaRemover

type SchemaRemover interface {
	Remove(ctx context.Context, ids ...SchemaID) error
}

type SchemaRemoverTx

type SchemaRemoverTx interface {
	Remove(ctx context.Context, tx libkv.Tx, ids ...SchemaID) error
}

type SchemaStore

type SchemaStore interface {
	SchemaStreamer
	SchemaGetter
	SchemaAdder
	SchemaRemover
}

func NewSchemaStore

func NewSchemaStore(db libkv.DB) SchemaStore

type SchemaStoreTx

func NewSchemaStoreTx

func NewSchemaStoreTx() SchemaStoreTx

type SchemaStreamer

type SchemaStreamer interface {
	Stream(ctx context.Context, ch chan<- Schema) error
}

type SchemaStreamerTx

type SchemaStreamerTx interface {
	Stream(ctx context.Context, tx libkv.Tx, ch chan<- Schema) error
}

type Schemas

type Schemas []Schema

type Version

type Version string

func (Version) String

func (v Version) String() string

Directories

Path Synopsis
k8s
apis/cdb.benjamin-borbe.de/v1
Package v1 is the v1 version of the API.
client/clientset/versioned/fake
This package has the automatically generated fake clientset.
client/clientset/versioned/scheme
This package contains the scheme of the automatically generated clientset.
client/clientset/versioned/typed/cdb.benjamin-borbe.de/v1
This package has the automatically generated typed clients.
client/clientset/versioned/typed/cdb.benjamin-borbe.de/v1/fake
Package fake has the automatically generated clients.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL