raw

package
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 26, 2026 License: BSD-2-Clause Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
const FetchTimestampFieldname = "fetchTimestamp"
View Source
const FetchTimestampFormat = time.RFC3339Nano

Variables

View Source
var CommandExpiredError = ErrCommandExpired //nolint:errname

Deprecated: Use ErrCommandExpired instead.

View Source
var CommandObjectSkippedError = ErrCommandObjectSkipped //nolint:errname

Deprecated: Use ErrCommandObjectSkipped instead.

View Source
var ErrCommandExpired = stderrors.New("command expired")
View Source
var ErrCommandObjectSkipped = stderrors.New("commandObject skipped")
View Source
var ErrUnsupportedOperation = stderrors.New("unsupported operation")
View Source
var UnsupportedOperationError = ErrUnsupportedOperation //nolint:errname

Deprecated: Use ErrUnsupportedOperation instead.

Functions

func BuildTopic

func BuildTopic(schemaID SchemaID, branch base.Branch, suffix string) libkafka.Topic

BuildTopic constructs a Kafka topic name from schema ID, branch, and suffix.

func CreateK8sClientset

func CreateK8sClientset(ctx context.Context, kubeconfig string) (versioned.Interface, error)

func FetchTimestampFromHeader

func FetchTimestampFromHeader(ctx context.Context, header libkafka.Header) (*time.Time, error)

func FetchTimestampFromHeaders

func FetchTimestampFromHeaders(
	ctx context.Context,
	headers []*sarama.RecordHeader,
) (*time.Time, error)

func FetchTimestampHeader

func FetchTimestampHeader(now time.Time) sarama.RecordHeader

func NewCommandObjectMessageHandler

func NewCommandObjectMessageHandler(
	schemaID SchemaID,
	commandObjectHandler CommandObjectHandler,
	commandExpireDuration time.Duration,
) libkafka.MessageHandlerTx

func RunCommandConsumer

func RunCommandConsumer(
	saramaClientProvider libkafka.SaramaClientProvider,
	syncProducer libkafka.SyncProducer,
	db libkv.DB,
	schemaID SchemaID,
	batchSize libkafka.BatchSize,
	branch base.Branch,
	ignoreUnsupported bool,
	commandExpireDuration time.Duration,
	trigger run.Trigger,
	commandObjectExecutors CommandObjectExecutors,
) run.Func

func RunCommandConsumerDefault

func RunCommandConsumerDefault(
	saramaClientProvider libkafka.SaramaClientProvider,
	syncProducer libkafka.SyncProducer,
	db libkv.DB,
	schemaID SchemaID,
	branch base.Branch,
	ignoreUnsupported bool,
	commandObjectExecutors CommandObjectExecutors,
) run.Func

Types

type CommandObject

type CommandObject struct {
	Command  base.Command
	SchemaID SchemaID
}

func (CommandObject) Ptr

func (c CommandObject) Ptr() *CommandObject

func (CommandObject) Validate

func (c CommandObject) Validate(ctx context.Context) error

type CommandObjectExecutor

type CommandObjectExecutor interface {
	CommandOperation() base.CommandOperation
	HandleCommand(
		ctx context.Context,
		tx libkv.Tx,
		commandObject CommandObject,
	) (*base.EventID, base.Event, error)
	// SendResultEnabled enables a sending of a result after handling command
	SendResultEnabled() bool
}

func CommandObjectExecutorFunc

func CommandObjectExecutorFunc(
	commandOperation base.CommandOperation,
	sendResultEnabled bool,
	handleCommand func(ctx context.Context, tx libkv.Tx, commandObject CommandObject) (*base.EventID, base.Event, error),
) CommandObjectExecutor

func NewCommandObjectExecutorMetrics

func NewCommandObjectExecutorMetrics(
	commandObjectExecutor CommandObjectExecutor,
	schemaID SchemaID,
) CommandObjectExecutor

func NewCommandObjectExecutorResultSender

func NewCommandObjectExecutorResultSender(
	commandObjectExecutor CommandObjectExecutor,
	resultObjectSender ResultObjectSender,
	logSamplerFactory log.SamplerFactory,
) CommandObjectExecutor

type CommandObjectExecutors

type CommandObjectExecutors []CommandObjectExecutor

func WrapCommandObjectExecutors

func WrapCommandObjectExecutors(
	resultObjectSender ResultObjectSender,
	commandObjectExecutors CommandObjectExecutors,
	schemaID SchemaID,
	logSamplerFactory log.SamplerFactory,
) CommandObjectExecutors

func (CommandObjectExecutors) Find

type CommandObjectHandler

type CommandObjectHandler interface {
	Handle(ctx context.Context, tx libkv.Tx, commandObject CommandObject) error
}

func NewCommandObjectHandler

func NewCommandObjectHandler(
	ignoreUnsupported bool,
	commandObjectExecutors ...CommandObjectExecutor,
) CommandObjectHandler

type CommandObjectHandlerFunc

type CommandObjectHandlerFunc func(ctx context.Context, tx libkv.Tx, commandObject CommandObject) error

func (CommandObjectHandlerFunc) Handle

func (o CommandObjectHandlerFunc) Handle(
	ctx context.Context,
	tx libkv.Tx,
	commandObject CommandObject,
) error

type CommandObjectHandlerList

type CommandObjectHandlerList []CommandObjectHandler

func (CommandObjectHandlerList) Handle

func (o CommandObjectHandlerList) Handle(
	ctx context.Context,
	tx libkv.Tx,
	commandObject CommandObject,
) error

type CommandObjectSender

type CommandObjectSender interface {
	SendCommandObject(ctx context.Context, commandObject CommandObject) error
	SendCommandObjects(ctx context.Context, commandObjects CommandObjects) error
}

CommandObjectSender allow send commands

func NewCommandObjectSender

func NewCommandObjectSender(
	syncProducer libkafka.SyncProducer,
	branch base.Branch,
	logSamplerFactory log.SamplerFactory,
) CommandObjectSender

type CommandObjects

type CommandObjects []CommandObject

type EventObject

type EventObject struct {
	Event    base.Event
	ID       base.EventID
	SchemaID SchemaID
}

func (EventObject) Ptr

func (e EventObject) Ptr() *EventObject

func (EventObject) Validate

func (e EventObject) Validate(ctx context.Context) error

type Group

type Group string

func (Group) String

func (g Group) String() string

type InputSender

type InputSender interface {
	Send(ctx context.Context, eventObject EventObject) error
}

func NewInputSender

func NewInputSender(
	syncProducer kafka.SyncProducer,
	branch base.Branch,
	logSamplerFactory log.SamplerFactory,
) InputSender

type K8sSchemaConnector

type K8sSchemaConnector interface {
	SetupCustomResourceDefinition(ctx context.Context) error
	Listen(
		ctx context.Context,
		resourceEventHandler cache.ResourceEventHandler,
	) error
}

func NewK8sSchemaConnector

func NewK8sSchemaConnector(
	kubeconfig string,
) K8sSchemaConnector

type K8sSchemaDeployer

type K8sSchemaDeployer interface {
	Deploy(ctx context.Context, rawSchema v1.Schema) error
	Undeploy(ctx context.Context, namespace k8s.Namespace, name string) error
}

func NewK8sSchemaDeployer

func NewK8sSchemaDeployer(
	rawClientset versioned.Interface,
) K8sSchemaDeployer

type Kind

type Kind string

func (Kind) String

func (k Kind) String() string

type ResultObject

type ResultObject struct {
	Result   base.Result
	SchemaID SchemaID
}

func CreateResultObjectFailure

func CreateResultObjectFailure(
	commandObject CommandObject,
	err error,
) ResultObject

func CreateResultObjectSuccess

func CreateResultObjectSuccess(
	commandObject CommandObject,
	resultEventID *base.EventID,
	resultEventData base.Event,
) ResultObject

func (ResultObject) Validate

func (r ResultObject) Validate(ctx context.Context) error

type ResultObjectSender

type ResultObjectSender interface {
	Send(ctx context.Context, resultObject ResultObject) error
}

ResultObjectSender all easy send of objects

func NewResultObjectSender

func NewResultObjectSender(
	syncProducer kafka.SyncProducer,
	branch base.Branch,
	logSamplerFactory log.SamplerFactory,
) ResultObjectSender

type ResultObjectSenderFunc

type ResultObjectSenderFunc func(ctx context.Context, resultObject ResultObject) error

func (ResultObjectSenderFunc) Send

func (r ResultObjectSenderFunc) Send(ctx context.Context, resultObject ResultObject) error

type SchemaID

type SchemaID struct {
	Group Group `json:"group"`
	Kind  Kind  `json:"kind"`
}

func ParseSchemaID

func ParseSchemaID(ctx context.Context, id string) (*SchemaID, error)

func (SchemaID) Bytes

func (s SchemaID) Bytes() []byte

func (SchemaID) CommandTopic

func (s SchemaID) CommandTopic(branch base.Branch) libkafka.Topic

func (SchemaID) Equal

func (s SchemaID) Equal(id SchemaID) bool

func (SchemaID) EventTopic

func (s SchemaID) EventTopic(branch base.Branch) libkafka.Topic

func (SchemaID) InputTopic

func (s SchemaID) InputTopic(branch base.Branch) libkafka.Topic

func (SchemaID) ResultTopic

func (s SchemaID) ResultTopic(branch base.Branch) libkafka.Topic

func (SchemaID) String

func (s SchemaID) String() string

func (SchemaID) Validate

func (s SchemaID) Validate(ctx context.Context) error

type SchemaIDs

type SchemaIDs []SchemaID

func ParseSchemaIDs

func ParseSchemaIDs(ctx context.Context, ids []string) (SchemaIDs, error)

func (SchemaIDs) Contains

func (s SchemaIDs) Contains(schemaID SchemaID) bool

type Version

type Version string

func (Version) String

func (v Version) String() string

Directories

Path Synopsis
k8s
apis/raw.benjamin-borbe.de/v1
Package v1 is the v1 version of the API.
Package v1 is the v1 version of the API.
client/clientset/versioned/fake
This package has the automatically generated fake clientset.
This package has the automatically generated fake clientset.
client/clientset/versioned/scheme
This package contains the scheme of the automatically generated clientset.
This package contains the scheme of the automatically generated clientset.
client/clientset/versioned/typed/raw.benjamin-borbe.de/v1
This package has the automatically generated typed clients.
This package has the automatically generated typed clients.
client/clientset/versioned/typed/raw.benjamin-borbe.de/v1/fake
Package fake has the automatically generated clients.
Package fake has the automatically generated clients.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL