Documentation ¶
Index ¶
- Constants
- Variables
- func BuildTopic(schemaID SchemaID, branch base.Branch, suffix string) libkafka.Topic
- func CreateK8sClientset(ctx context.Context, kubeconfig string) (versioned.Interface, error)
- func FetchTimestampFromHeader(ctx context.Context, header libkafka.Header) (*time.Time, error)
- func FetchTimestampFromHeaders(ctx context.Context, headers []*sarama.RecordHeader) (*time.Time, error)
- func FetchTimestampHeader(now time.Time) sarama.RecordHeader
- func NewCommandObjectMessageHandler(schemaID SchemaID, commandObjectHandler CommandObjectHandler, ...) libkafka.MessageHandlerTx
- func RunCommandConsumer(saramaClientProvider libkafka.SaramaClientProvider, ...) run.Func
- func RunCommandConsumerDefault(saramaClientProvider libkafka.SaramaClientProvider, ...) run.Func
- type CommandObject
- type CommandObjectExecutor
- func CommandObjectExecutorFunc(commandOperation base.CommandOperation, sendResultEnabled bool, ...) CommandObjectExecutor
- func NewCommandObjectExecutorMetrics(commandObjectExecutor CommandObjectExecutor, schemaID SchemaID) CommandObjectExecutor
- func NewCommandObjectExecutorResultSender(commandObjectExecutor CommandObjectExecutor, ...) CommandObjectExecutor
- type CommandObjectExecutors
- type CommandObjectHandler
- type CommandObjectHandlerFunc
- type CommandObjectHandlerList
- type CommandObjectSender
- type CommandObjects
- type EventObject
- type Group
- type InputSender
- type K8sSchemaConnector
- type K8sSchemaDeployer
- type Kind
- type ResultObject
- type ResultObjectSender
- type ResultObjectSenderFunc
- type SchemaID
- func (s SchemaID) Bytes() []byte
- func (s SchemaID) CommandTopic(branch base.Branch) libkafka.Topic
- func (s SchemaID) Equal(id SchemaID) bool
- func (s SchemaID) EventTopic(branch base.Branch) libkafka.Topic
- func (s SchemaID) InputTopic(branch base.Branch) libkafka.Topic
- func (s SchemaID) ResultTopic(branch base.Branch) libkafka.Topic
- func (s SchemaID) String() string
- func (s SchemaID) Validate(ctx context.Context) error
- type SchemaIDs
- type Version
Constants ¶
View Source
const FetchTimestampFieldname = "fetchTimestamp"
View Source
const FetchTimestampFormat = time.RFC3339Nano
Variables ¶
View Source
var CommandExpiredError = ErrCommandExpired //nolint:errname
Deprecated: Use ErrCommandExpired instead.
View Source
var CommandObjectSkippedError = ErrCommandObjectSkipped //nolint:errname
Deprecated: Use ErrCommandObjectSkipped instead.
View Source
var ErrCommandExpired = stderrors.New("command expired")
View Source
var ErrCommandObjectSkipped = stderrors.New("commandObject skipped")
View Source
var ErrUnsupportedOperation = stderrors.New("unsupported operation")
View Source
var UnsupportedOperationError = ErrUnsupportedOperation //nolint:errname
Deprecated: Use ErrUnsupportedOperation instead.
Functions ¶
func BuildTopic ¶
BuildTopic constructs a Kafka topic name from schema ID, branch, and suffix.
func CreateK8sClientset ¶
func FetchTimestampHeader ¶
func FetchTimestampHeader(now time.Time) sarama.RecordHeader
func NewCommandObjectMessageHandler ¶
func NewCommandObjectMessageHandler( schemaID SchemaID, commandObjectHandler CommandObjectHandler, commandExpireDuration time.Duration, ) libkafka.MessageHandlerTx
func RunCommandConsumer ¶
func RunCommandConsumer( saramaClientProvider libkafka.SaramaClientProvider, syncProducer libkafka.SyncProducer, db libkv.DB, schemaID SchemaID, batchSize libkafka.BatchSize, branch base.Branch, ignoreUnsupported bool, commandExpireDuration time.Duration, trigger run.Trigger, commandObjectExecutors CommandObjectExecutors, ) run.Func
func RunCommandConsumerDefault ¶
func RunCommandConsumerDefault( saramaClientProvider libkafka.SaramaClientProvider, syncProducer libkafka.SyncProducer, db libkv.DB, schemaID SchemaID, branch base.Branch, ignoreUnsupported bool, commandObjectExecutors CommandObjectExecutors, ) run.Func
Types ¶
type CommandObject ¶
func (CommandObject) Ptr ¶
func (c CommandObject) Ptr() *CommandObject
type CommandObjectExecutor ¶
type CommandObjectExecutor interface {
CommandOperation() base.CommandOperation
HandleCommand(
ctx context.Context,
tx libkv.Tx,
commandObject CommandObject,
) (*base.EventID, base.Event, error)
// SendResultEnabled reports whether a result is sent after the command is handled.
SendResultEnabled() bool
}
func CommandObjectExecutorFunc ¶
func CommandObjectExecutorFunc( commandOperation base.CommandOperation, sendResultEnabled bool, handleCommand func(ctx context.Context, tx libkv.Tx, commandObject CommandObject) (*base.EventID, base.Event, error), ) CommandObjectExecutor
func NewCommandObjectExecutorMetrics ¶
func NewCommandObjectExecutorMetrics( commandObjectExecutor CommandObjectExecutor, schemaID SchemaID, ) CommandObjectExecutor
func NewCommandObjectExecutorResultSender ¶
func NewCommandObjectExecutorResultSender( commandObjectExecutor CommandObjectExecutor, resultObjectSender ResultObjectSender, logSamplerFactory log.SamplerFactory, ) CommandObjectExecutor
type CommandObjectExecutors ¶
type CommandObjectExecutors []CommandObjectExecutor
func WrapCommandObjectExecutors ¶
func WrapCommandObjectExecutors( resultObjectSender ResultObjectSender, commandObjectExecutors CommandObjectExecutors, schemaID SchemaID, logSamplerFactory log.SamplerFactory, ) CommandObjectExecutors
func (CommandObjectExecutors) Find ¶
func (c CommandObjectExecutors) Find( commandOperation base.CommandOperation, ) *CommandObjectExecutor
type CommandObjectHandler ¶
type CommandObjectHandler interface {
Handle(ctx context.Context, tx libkv.Tx, commandObject CommandObject) error
}
func NewCommandObjectHandler ¶
func NewCommandObjectHandler( ignoreUnsupported bool, commandObjectExecutors ...CommandObjectExecutor, ) CommandObjectHandler
type CommandObjectHandlerFunc ¶
type CommandObjectHandlerFunc func(ctx context.Context, tx libkv.Tx, commandObject CommandObject) error
func (CommandObjectHandlerFunc) Handle ¶
func (o CommandObjectHandlerFunc) Handle( ctx context.Context, tx libkv.Tx, commandObject CommandObject, ) error
type CommandObjectHandlerList ¶
type CommandObjectHandlerList []CommandObjectHandler
func (CommandObjectHandlerList) Handle ¶
func (o CommandObjectHandlerList) Handle( ctx context.Context, tx libkv.Tx, commandObject CommandObject, ) error
type CommandObjectSender ¶
type CommandObjectSender interface {
SendCommandObject(ctx context.Context, commandObject CommandObject) error
SendCommandObjects(ctx context.Context, commandObjects CommandObjects) error
}
CommandObjectSender allows sending commands.
func NewCommandObjectSender ¶
func NewCommandObjectSender( syncProducer libkafka.SyncProducer, branch base.Branch, logSamplerFactory log.SamplerFactory, ) CommandObjectSender
type CommandObjects ¶
type CommandObjects []CommandObject
type EventObject ¶
func (EventObject) Ptr ¶
func (e EventObject) Ptr() *EventObject
type InputSender ¶
type InputSender interface {
Send(ctx context.Context, eventObject EventObject) error
}
func NewInputSender ¶
func NewInputSender( syncProducer kafka.SyncProducer, branch base.Branch, logSamplerFactory log.SamplerFactory, ) InputSender
type K8sSchemaConnector ¶
type K8sSchemaConnector interface {
SetupCustomResourceDefinition(ctx context.Context) error
Listen(
ctx context.Context,
resourceEventHandler cache.ResourceEventHandler,
) error
}
func NewK8sSchemaConnector ¶
func NewK8sSchemaConnector( kubeconfig string, ) K8sSchemaConnector
type K8sSchemaDeployer ¶
type K8sSchemaDeployer interface {
Deploy(ctx context.Context, rawSchema v1.Schema) error
Undeploy(ctx context.Context, namespace k8s.Namespace, name string) error
}
func NewK8sSchemaDeployer ¶
func NewK8sSchemaDeployer( rawClientset versioned.Interface, ) K8sSchemaDeployer
type ResultObject ¶
func CreateResultObjectFailure ¶
func CreateResultObjectFailure( commandObject CommandObject, err error, ) ResultObject
func CreateResultObjectSuccess ¶
func CreateResultObjectSuccess( commandObject CommandObject, resultEventID *base.EventID, resultEventData base.Event, ) ResultObject
type ResultObjectSender ¶
type ResultObjectSender interface {
Send(ctx context.Context, resultObject ResultObject) error
}
ResultObjectSender allows easy sending of result objects.
func NewResultObjectSender ¶
func NewResultObjectSender( syncProducer kafka.SyncProducer, branch base.Branch, logSamplerFactory log.SamplerFactory, ) ResultObjectSender
type ResultObjectSenderFunc ¶
type ResultObjectSenderFunc func(ctx context.Context, resultObject ResultObject) error
func (ResultObjectSenderFunc) Send ¶
func (r ResultObjectSenderFunc) Send(ctx context.Context, resultObject ResultObject) error
Source Files ¶
- raw_build-topic.go
- raw_command-object-executor-func.go
- raw_command-object-executor-metrics.go
- raw_command-object-executor-result-sender.go
- raw_command-object-executor-wrap.go
- raw_command-object-executor.go
- raw_command-object-handler.go
- raw_command-object-message-handler.go
- raw_command-object-sender.go
- raw_command-object.go
- raw_event-object.go
- raw_fetch-timestamp.go
- raw_input-sender.go
- raw_k8s-clientset.go
- raw_k8s-schema-connector.go
- raw_k8s-schema-deployer.go
- raw_result-object-creator.go
- raw_result-object-sender.go
- raw_result-object.go
- raw_run-command-consumer.go
- raw_schema-id.go
Directories ¶
| Path | Synopsis |
|---|---|
| k8s | |
| apis/raw.benjamin-borbe.de/v1 | Package v1 is the v1 version of the API. |
| client/clientset/versioned/fake | This package has the automatically generated fake clientset. |
| client/clientset/versioned/scheme | This package contains the scheme of the automatically generated clientset. |
| client/clientset/versioned/typed/raw.benjamin-borbe.de/v1 | This package has the automatically generated typed clients. |
| client/clientset/versioned/typed/raw.benjamin-borbe.de/v1/fake | Package fake has the automatically generated clients. |
Click to show internal directories.
Click to hide internal directories.