operationsbus

package
v0.0.40-testproto Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 7, 2025 License: MIT Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var File_operation_request_proto protoreflect.FileDescriptor

Functions

func CreateProcessor

func CreateProcessor(
	serviceBusReceiver sb.ReceiverInterface,
	matcher *Matcher,
	operationContainer oc.OperationContainerClient,
	entityController EntityController,
	logger *slog.Logger,
	customHandler shuttle.HandlerFunc,
	processorOptions *shuttle.ProcessorOptions,
	hooks []BaseOperationHooksInterface,
	marshaler Marshaler,
) (*shuttle.Processor, error)

The processor will be utilized to "process" all the operations by receiving the message, guarding against concurrency, running the operation, and updating the right database status.

func DefaultHandlers added in v0.0.31

func DefaultHandlers(
	serviceBusReceiver sb.ReceiverInterface,
	matcher *Matcher,
	operationContainer oc.OperationContainerClient,
	entityController EntityController,
	logger *slog.Logger,
	hooks []BaseOperationHooksInterface,
	marshaler Marshaler,
) shuttle.HandlerFunc

func NewErrorHandler added in v0.0.30

func NewErrorHandler(errHandler ErrorHandlerFunc, receiver sb.ReceiverInterface, next shuttle.HandlerFunc, marshaler Marshaler) shuttle.HandlerFunc

An error handler that continues the normal shuttle.HandlerFunc handler chain.

func NewLogHandler added in v0.0.30

func NewLogHandler(logger *slog.Logger, next shuttle.HandlerFunc, marshaler Marshaler) shuttle.HandlerFunc

NewLogHandler creates a new log handler with the provided logger.

func NewQoSHandler added in v0.0.30

func NewQoSHandler(logger *slog.Logger, next shuttle.HandlerFunc) shuttle.HandlerFunc

NewQoSHandler creates a new QoS handler with the provided logger.

func NewQosErrorHandler added in v0.0.30

func NewQosErrorHandler(errHandler ErrorHandlerFunc) shuttle.HandlerFunc

A QoS handler that is able to log the errors as well.

Types

type ApiOperation added in v0.0.22

type ApiOperation interface {
	InitOperation(context.Context, OperationRequest) (ApiOperation, error)
	GuardConcurrency(context.Context, Entity) *CategorizedError
	Run(context.Context) error
	GetOperationRequest() *OperationRequest
}

ApiOperation is the interface all operations will need to implement.

type BaseOperationHooksInterface added in v0.0.22

type BaseOperationHooksInterface interface {
	BeforeInitOperation(ctx context.Context, req OperationRequest) error
	AfterInitOperation(ctx context.Context, op ApiOperation, req OperationRequest, err error) error

	BeforeGuardConcurrency(ctx context.Context, op ApiOperation, entity Entity) error
	AfterGuardConcurrency(ctx context.Context, op ApiOperation, ce *CategorizedError) error

	BeforeRun(ctx context.Context, op ApiOperation) error
	AfterRun(ctx context.Context, op ApiOperation, err error) error
}

type CategorizedError

type CategorizedError struct {
	Message      string
	InnerMessage string
	ErrorCode    int
	Err          error
}

func NewCategorizedError

func NewCategorizedError(message string, innerMessage string, errorCode int, err error) *CategorizedError

func (*CategorizedError) Error

func (ce *CategorizedError) Error() string

type Entity

type Entity interface {
	GetLatestOperationID() string
}

type EntityController added in v0.0.32

type EntityController interface {
	GetEntity(context.Context, OperationRequest) (Entity, error)
}

type EntityFactoryFunc added in v0.0.37

type EntityFactoryFunc func(string) Entity

type ErrorHandler added in v0.0.30

type ErrorHandler interface {
	Handle(ctx context.Context, settler shuttle.MessageSettler, message *azservicebus.ReceivedMessage) error
}

ErrorHandler interface that returns an error. Required for any error handling and not depending on panics.

type ErrorHandlerFunc added in v0.0.30

type ErrorHandlerFunc func(ctx context.Context, settler shuttle.MessageSettler, message *azservicebus.ReceivedMessage) error

func NewErrorReturnHandler added in v0.0.30

func NewErrorReturnHandler(errHandler ErrorHandlerFunc, receiver sb.ReceiverInterface, next shuttle.HandlerFunc, marshaler Marshaler) ErrorHandlerFunc

An error handler that provides the error to the parent handler for logging.

func NewOperationContainerHandler added in v0.0.32

func NewOperationContainerHandler(errHandler ErrorHandlerFunc, operationContainer oc.OperationContainerClient, marshaler Marshaler) ErrorHandlerFunc

Handler for when the user uses the OperationContainer

func OperationHandler added in v0.0.31

func OperationHandler(matcher *Matcher, hooks []BaseOperationHooksInterface, entityController EntityController, marshaler Marshaler) ErrorHandlerFunc

func (ErrorHandlerFunc) Handle added in v0.0.30

type HookedApiOperation added in v0.0.22

type HookedApiOperation struct {
	Operation      ApiOperation
	OperationHooks []BaseOperationHooksInterface
}

func (*HookedApiOperation) AfterGuardConcurrency added in v0.0.22

func (h *HookedApiOperation) AfterGuardConcurrency(ctx context.Context, op ApiOperation, ce *CategorizedError) error

func (*HookedApiOperation) AfterInitOperation added in v0.0.25

func (h *HookedApiOperation) AfterInitOperation(ctx context.Context, op ApiOperation, req OperationRequest, err error) error

func (*HookedApiOperation) AfterRun added in v0.0.22

func (h *HookedApiOperation) AfterRun(ctx context.Context, op ApiOperation, err error) error

func (*HookedApiOperation) BeforeGuardConcurrency added in v0.0.22

func (h *HookedApiOperation) BeforeGuardConcurrency(ctx context.Context, op ApiOperation, entity Entity) error

func (*HookedApiOperation) BeforeInitOperation added in v0.0.25

func (h *HookedApiOperation) BeforeInitOperation(ctx context.Context, req OperationRequest) error

func (*HookedApiOperation) BeforeRun added in v0.0.22

func (h *HookedApiOperation) BeforeRun(ctx context.Context, op ApiOperation) error

func (*HookedApiOperation) GuardConcurrency added in v0.0.22

func (h *HookedApiOperation) GuardConcurrency(ctx context.Context, entity Entity) *CategorizedError

func (*HookedApiOperation) InitOperation added in v0.0.25

func (h *HookedApiOperation) InitOperation(ctx context.Context, opReq OperationRequest) (ApiOperation, error)

func (*HookedApiOperation) Run added in v0.0.22

type Marshaler

type Marshaler interface {
	Marshal(v interface{}) ([]byte, error)
	Unmarshal(data []byte, v interface{}) error
}

type Matcher

type Matcher struct {
	Types          map[string]reflect.Type
	EntityCreators map[string]EntityFactoryFunc
}

The matcher is utilized in order to keep track of the name and type of each operation. This is required because we only send the OperationRequest through the service bus, but we utilize the name shown in that struct in order to create an instance of the right operation type (e.g. LongRunning) and Run with the correct logic.

func NewMatcher

func NewMatcher() *Matcher

func (*Matcher) CreateEntityInstance added in v0.0.36

func (m *Matcher) CreateEntityInstance(key string, lastOperationId string) (Entity, error)

func (*Matcher) CreateHookedInstace added in v0.0.22

func (m *Matcher) CreateHookedInstace(key string, hooks []BaseOperationHooksInterface) (*HookedApiOperation, error)

func (*Matcher) CreateOperationInstance added in v0.0.36

func (m *Matcher) CreateOperationInstance(key string) (ApiOperation, error)

This will create an empty instance of the type, with which you can then call op.Init() and initialize any info you need.

func (*Matcher) Get

func (m *Matcher) Get(key string) (reflect.Type, bool)

Get retrieves a value from the map by its key

func (*Matcher) Register

func (m *Matcher) Register(key string, value ApiOperation)

Set adds a key-value pair to the map Ex: matcher.Register("LongRunning", &LongRunning{})

func (*Matcher) RegisterEntity added in v0.0.36

func (m *Matcher) RegisterEntity(key string, value EntityFactoryFunc)

Set adds a key-value pair to the map Ex: matcher.Register("LongRunning", &LongRunning{})

type NonRetryError added in v0.0.30

type NonRetryError struct {
	Message string
}

func (*NonRetryError) Error added in v0.0.30

func (e *NonRetryError) Error() string

type OperationRequest

type OperationRequest struct {
	OperationName       string               `protobuf:"bytes,1,opt,name=operation_name,json=operationName,proto3" json:"operation_name,omitempty"`                   // Name of the operation being processed. Used to match the ApiOperation with the right implementation.
	ApiVersion          string               `protobuf:"bytes,2,opt,name=api_version,json=apiVersion,proto3" json:"api_version,omitempty"`                            // Specifies the version of the API the operation is associated with, ensuring compatibility.
	RetryCount          int32                `protobuf:"varint,3,opt,name=retry_count,json=retryCount,proto3" json:"retry_count,omitempty"`                           // Tracks the number of retries of the operation to prevent infinite looping or special logic around retries.
	OperationId         string               `protobuf:"bytes,4,opt,name=operation_id,json=operationId,proto3" json:"operation_id,omitempty"`                         // A unique identifier for the operation.
	EntityId            string               `protobuf:"bytes,5,opt,name=entity_id,json=entityId,proto3" json:"entity_id,omitempty"`                                  // A unique identifier for the entity (resource) the operation is acting on, used with EntityType to ensure we have selected the right entity.
	EntityType          string               `protobuf:"bytes,6,opt,name=entity_type,json=entityType,proto3" json:"entity_type,omitempty"`                            // Specified the type of entity the operation is acting on, used with EntityId to ensure we have selected the right Entity.
	ExpirationTimestamp *timestamp.Timestamp `protobuf:"bytes,7,opt,name=expiration_timestamp,json=expirationTimestamp,proto3" json:"expiration_timestamp,omitempty"` // Defines when the operation should expire and prevent execution, should it have passed this date.
	Body                []byte               `protobuf:"bytes,8,opt,name=body,proto3" json:"body,omitempty"`                                                          // Contains request payload or data needed for the operation in HTTP operations.
	HttpMethod          string               `protobuf:"bytes,9,opt,name=http_method,json=httpMethod,proto3" json:"http_method,omitempty"`                            // Indicated the GGPT method if the operation requires HTTP-based communication.
	Extension           *any1.Any            `protobuf:"bytes,10,opt,name=extension,proto3" json:"extension,omitempty"`                                               // An optional and flexible field to add any data the user may require.
	// contains filtered or unexported fields
}

func (*OperationRequest) Descriptor deprecated

func (*OperationRequest) Descriptor() ([]byte, []int)

Deprecated: Use OperationRequest.ProtoReflect.Descriptor instead.

func (*OperationRequest) GetApiVersion

func (x *OperationRequest) GetApiVersion() string

func (*OperationRequest) GetBody

func (x *OperationRequest) GetBody() []byte

func (*OperationRequest) GetEntityId

func (x *OperationRequest) GetEntityId() string

func (*OperationRequest) GetEntityType

func (x *OperationRequest) GetEntityType() string

func (*OperationRequest) GetExpirationTimestamp

func (x *OperationRequest) GetExpirationTimestamp() *timestamp.Timestamp

func (*OperationRequest) GetExtension

func (x *OperationRequest) GetExtension() *any1.Any

func (*OperationRequest) GetHttpMethod

func (x *OperationRequest) GetHttpMethod() string

func (*OperationRequest) GetOperationId

func (x *OperationRequest) GetOperationId() string

func (*OperationRequest) GetOperationName

func (x *OperationRequest) GetOperationName() string

func (*OperationRequest) GetRetryCount

func (x *OperationRequest) GetRetryCount() int32

func (*OperationRequest) ProtoMessage

func (*OperationRequest) ProtoMessage()

func (*OperationRequest) ProtoReflect

func (x *OperationRequest) ProtoReflect() protoreflect.Message

func (*OperationRequest) Reset

func (x *OperationRequest) Reset()

func (*OperationRequest) String

func (x *OperationRequest) String() string

type ProtoMarshaler

type ProtoMarshaler struct{}

func (ProtoMarshaler) Marshal

func (p ProtoMarshaler) Marshal(v interface{}) ([]byte, error)

func (ProtoMarshaler) Unmarshal

func (p ProtoMarshaler) Unmarshal(data []byte, v interface{}) error

type RetryError added in v0.0.30

type RetryError struct {
	Message string
}

Default errors for the error handler.

func (*RetryError) Error added in v0.0.30

func (e *RetryError) Error() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL