cqrs

package
v0.3.0
Warning

This package is not in the latest version of its module.

Published: Feb 13, 2019 License: MIT Imports: 9 Imported by: 80

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func ObjectName

func ObjectName(v interface{}) string

Types

type CommandBus

type CommandBus struct {
	// contains filtered or unexported fields
}

CommandBus transports commands to command handlers.

func NewCommandBus

func NewCommandBus(
	publisher message.Publisher,
	topic string,
	marshaler CommandEventMarshaler,
) *CommandBus

func (CommandBus) Send

func (c CommandBus) Send(cmd interface{}) error

Send sends a command to the command bus.

type CommandEventMarshaler

type CommandEventMarshaler interface {
	// Marshal marshals Command or Event to Watermill's message.
	Marshal(v interface{}) (*message.Message, error)

	// Unmarshal unmarshals Watermill's message into v, a Command or Event.
	Unmarshal(msg *message.Message, v interface{}) (err error)

	// Name returns the name of Command or Event.
	// Name is used to determine whether a received command or event is the one we want to handle.
	Name(v interface{}) string

	// NameFromMessage returns the name of Command or Event from Watermill's message (generated by Marshal).
	//
	// When we have a Command or Event marshaled to Watermill's message,
	// we should use NameFromMessage instead of Name to avoid unnecessary unmarshaling.
	NameFromMessage(msg *message.Message) string
}

CommandEventMarshaler marshals Commands and Events to Watermill's messages and vice versa. The payload of the command or event needs to be marshaled to []byte.
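
To illustrate the contract, here is a minimal sketch of a JSON-based marshaler. It does not import Watermill: the `Message` struct is a hypothetical stand-in for `message.Message` (assumed here to carry a UUID, string metadata, and a `[]byte` payload). The `"name"` metadata key, the naming scheme, and the `OrderPlaced` type are assumptions made for this example and are not taken from `JSONMarshaler`.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// Message is a hypothetical stand-in for Watermill's message.Message.
type Message struct {
	UUID     string
	Metadata map[string]string
	Payload  []byte
}

// sketchMarshaler is an illustrative type with the same four methods as
// CommandEventMarshaler, written against the stand-in Message type.
type sketchMarshaler struct{}

// Name derives a name from the Go type, dropping any pointer prefix,
// so that values and pointers of the same type share one name.
func (sketchMarshaler) Name(v interface{}) string {
	return strings.TrimLeft(fmt.Sprintf("%T", v), "*")
}

// Marshal encodes v as JSON and records its name in the metadata.
func (m sketchMarshaler) Marshal(v interface{}) (*Message, error) {
	payload, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	return &Message{
		UUID:     "example-uuid", // a real marshaler would generate a unique ID
		Metadata: map[string]string{"name": m.Name(v)},
		Payload:  payload,
	}, nil
}

// Unmarshal decodes the payload into v, which must be a pointer.
func (sketchMarshaler) Unmarshal(msg *Message, v interface{}) error {
	return json.Unmarshal(msg.Payload, v)
}

// NameFromMessage reads the name from metadata without decoding the payload.
func (sketchMarshaler) NameFromMessage(msg *Message) string {
	return msg.Metadata["name"]
}

// OrderPlaced is a hypothetical event used only for this example.
type OrderPlaced struct {
	OrderID string `json:"order_id"`
}

// roundTrip marshals an event, reads its name from the message,
// and decodes the payload back into a fresh value.
func roundTrip(in OrderPlaced) (name string, out OrderPlaced, err error) {
	m := sketchMarshaler{}
	msg, err := m.Marshal(&in)
	if err != nil {
		return "", out, err
	}
	name = m.NameFromMessage(msg)
	err = m.Unmarshal(msg, &out)
	return name, out, err
}

func main() {
	name, out, err := roundTrip(OrderPlaced{OrderID: "42"})
	fmt.Println(name, out.OrderID, err)
}
```

Storing the name in metadata is what lets `NameFromMessage` answer without unmarshaling the payload, which is the reason the interface separates it from `Name`.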

type CommandHandler

type CommandHandler interface {
	NewCommand() interface{}
	Handle(cmd interface{}) error
}

CommandHandler receives a command defined by NewCommand and handles it with the Handle method. If using DDD, CommandHandler may modify and persist the aggregate.

In contrast to EventHandler, every Command must have exactly one CommandHandler.
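
As a sketch of what an implementation might look like, the example below redeclares the `CommandHandler` interface locally so it runs without Watermill. The `BookRoom` command and the in-memory `BookRoomHandler` are hypothetical names invented for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// CommandHandler mirrors the interface documented above.
type CommandHandler interface {
	NewCommand() interface{}
	Handle(cmd interface{}) error
}

// BookRoom is a hypothetical command.
type BookRoom struct {
	RoomID string
	Guest  string
}

// BookRoomHandler is a hypothetical handler that keeps bookings in memory.
type BookRoomHandler struct {
	bookings map[string]string // room ID -> guest
}

// NewCommand returns an empty command value for the caller to populate.
func (h *BookRoomHandler) NewCommand() interface{} {
	return &BookRoom{}
}

// Handle type-asserts the command and applies it to the in-memory state.
func (h *BookRoomHandler) Handle(cmd interface{}) error {
	c, ok := cmd.(*BookRoom)
	if !ok {
		return fmt.Errorf("unexpected command type %T", cmd)
	}
	if _, taken := h.bookings[c.RoomID]; taken {
		return errors.New("room already booked")
	}
	h.bookings[c.RoomID] = c.Guest
	return nil
}

// Compile-time check that the handler satisfies the interface.
var _ CommandHandler = (*BookRoomHandler)(nil)

func main() {
	h := &BookRoomHandler{bookings: map[string]string{}}
	cmd := h.NewCommand().(*BookRoom)
	cmd.RoomID, cmd.Guest = "101", "Alice"
	fmt.Println(h.Handle(cmd)) // first booking
	fmt.Println(h.Handle(cmd)) // same room again
}
```

Returning a pointer from `NewCommand` gives the caller something it can decode a payload into, and `Handle` then receives that same pointer type.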

type CommandProcessor

type CommandProcessor struct {
	// contains filtered or unexported fields
}

CommandProcessor determines which CommandHandler should handle the command received from the command bus.
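
The routing idea can be sketched without Watermill as a lookup from command name to handler. This is not how `CommandProcessor` is implemented: the `dispatcher` type, the `nameOf` naming scheme, and the `Ping`/`Pong` commands are all assumptions made for illustration. The sketch also rejects a second handler for the same command, reflecting the one-handler-per-command rule.

```go
package main

import "fmt"

// CommandHandler mirrors the interface documented in this package.
type CommandHandler interface {
	NewCommand() interface{}
	Handle(cmd interface{}) error
}

// nameOf is an assumed naming scheme based on the Go type.
func nameOf(v interface{}) string {
	return fmt.Sprintf("%T", v)
}

// dispatcher is a hypothetical, in-process stand-in for the routing step.
type dispatcher struct {
	handlers map[string]CommandHandler
}

// newDispatcher indexes handlers by the name of the command they accept
// and refuses to register two handlers for the same command.
func newDispatcher(handlers ...CommandHandler) (*dispatcher, error) {
	d := &dispatcher{handlers: map[string]CommandHandler{}}
	for _, h := range handlers {
		name := nameOf(h.NewCommand())
		if _, dup := d.handlers[name]; dup {
			return nil, fmt.Errorf("command %s already has a handler", name)
		}
		d.handlers[name] = h
	}
	return d, nil
}

// dispatch looks up the single handler for cmd and invokes it.
func (d *dispatcher) dispatch(cmd interface{}) error {
	h, ok := d.handlers[nameOf(cmd)]
	if !ok {
		return fmt.Errorf("no handler for %s", nameOf(cmd))
	}
	return h.Handle(cmd)
}

// Ping and Pong are hypothetical commands.
type Ping struct{}
type Pong struct{}

// countingHandler records how often it was invoked.
type countingHandler struct {
	newCmd func() interface{}
	calls  int
}

func (h *countingHandler) NewCommand() interface{} { return h.newCmd() }

func (h *countingHandler) Handle(cmd interface{}) error {
	h.calls++
	return nil
}

func main() {
	ping := &countingHandler{newCmd: func() interface{} { return &Ping{} }}
	pong := &countingHandler{newCmd: func() interface{} { return &Pong{} }}

	d, err := newDispatcher(ping, pong)
	if err != nil {
		panic(err)
	}
	if err := d.dispatch(&Ping{}); err != nil {
		panic(err)
	}
	fmt.Println(ping.calls, pong.calls)
}
```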

func NewCommandProcessor

func NewCommandProcessor(
	handlers []CommandHandler,
	commandsTopic string,
	subscriber message.Subscriber,
	marshaler CommandEventMarshaler,
	logger watermill.LoggerAdapter,
) *CommandProcessor

func (CommandProcessor) AddHandlersToRouter

func (p CommandProcessor) AddHandlersToRouter(r *message.Router) error

func (CommandProcessor) Handlers

func (p CommandProcessor) Handlers() []CommandHandler

func (CommandProcessor) RouterHandlerFunc

func (p CommandProcessor) RouterHandlerFunc(handler CommandHandler) (message.HandlerFunc, error)

type EventBus

type EventBus struct {
	// contains filtered or unexported fields
}

EventBus transports events to event handlers.

func NewEventBus

func NewEventBus(
	publisher message.Publisher,
	topic string,
	marshaler CommandEventMarshaler,
) *EventBus

func (EventBus) Publish

func (c EventBus) Publish(event interface{}) error

Publish sends an event to the event bus.

type EventHandler

type EventHandler interface {
	NewEvent() interface{}
	Handle(event interface{}) error
}

EventHandler receives an event defined by NewEvent and handles it with the Handle method. If using DDD, EventHandler may modify and persist the aggregate. It can also invoke a process manager or a saga, or just build a read model.

In contrast to CommandHandler, every Event can have multiple EventHandlers.
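
The fan-out can be sketched as follows, again with the interface redeclared locally so the example runs without Watermill. The `RoomBooked` event, the `readModel` and `notifier` handlers, and the `deliver` helper are hypothetical and only illustrate that one event may reach several handlers.

```go
package main

import "fmt"

// EventHandler mirrors the interface documented above.
type EventHandler interface {
	NewEvent() interface{}
	Handle(event interface{}) error
}

// RoomBooked is a hypothetical event.
type RoomBooked struct {
	RoomID string
}

// readModel is a hypothetical handler that builds a list of booked rooms.
type readModel struct {
	bookedRooms []string
}

func (r *readModel) NewEvent() interface{} { return &RoomBooked{} }

func (r *readModel) Handle(event interface{}) error {
	e, ok := event.(*RoomBooked)
	if !ok {
		return fmt.Errorf("unexpected event type %T", event)
	}
	r.bookedRooms = append(r.bookedRooms, e.RoomID)
	return nil
}

// notifier is a second hypothetical handler for the same event.
type notifier struct {
	sent int
}

func (n *notifier) NewEvent() interface{} { return &RoomBooked{} }

func (n *notifier) Handle(event interface{}) error {
	if _, ok := event.(*RoomBooked); !ok {
		return fmt.Errorf("unexpected event type %T", event)
	}
	n.sent++
	return nil
}

// deliver passes the event to every handler whose NewEvent type matches it.
// Comparing %T strings is an assumed matching rule for this sketch.
func deliver(event interface{}, handlers []EventHandler) error {
	for _, h := range handlers {
		if fmt.Sprintf("%T", h.NewEvent()) != fmt.Sprintf("%T", event) {
			continue
		}
		if err := h.Handle(event); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	rm, n := &readModel{}, &notifier{}
	err := deliver(&RoomBooked{RoomID: "101"}, []EventHandler{rm, n})
	fmt.Println(rm.bookedRooms, n.sent, err)
}
```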

type EventProcessor

type EventProcessor struct {
	// contains filtered or unexported fields
}

EventProcessor determines which EventHandler should handle the event received from the event bus.

func NewEventProcessor

func NewEventProcessor(
	handlers []EventHandler,
	eventsTopic string,
	subscriber message.Subscriber,
	marshaler CommandEventMarshaler,
	logger watermill.LoggerAdapter,
) *EventProcessor

func (EventProcessor) AddHandlersToRouter

func (p EventProcessor) AddHandlersToRouter(r *message.Router) error

func (EventProcessor) Handlers

func (p EventProcessor) Handlers() []EventHandler

func (EventProcessor) RouterHandlerFunc

func (p EventProcessor) RouterHandlerFunc(handler EventHandler) (message.HandlerFunc, error)

type Facade

type Facade struct {
	// contains filtered or unexported fields
}

Facade is a facade for creating the Command and Event buses and processors. It was created to avoid boilerplate when using CQRS in the standard way. You can also create buses and processors manually, drawing inspiration from how it's done in NewFacade.

func NewFacade

func NewFacade(config FacadeConfig) (*Facade, error)

func (Facade) CommandBus

func (f Facade) CommandBus() *CommandBus

func (Facade) CommandEventMarshaler

func (f Facade) CommandEventMarshaler() CommandEventMarshaler

func (Facade) CommandsTopic

func (f Facade) CommandsTopic() string

func (Facade) EventBus

func (f Facade) EventBus() *EventBus

func (Facade) EventsTopic

func (f Facade) EventsTopic() string

type FacadeConfig

type FacadeConfig struct {
	CommandsTopic   string
	CommandHandlers func(commandBus *CommandBus, eventBus *EventBus) []CommandHandler
	CommandsPubSub  message.PubSub

	EventsTopic   string
	EventHandlers func(commandBus *CommandBus, eventBus *EventBus) []EventHandler
	EventsPubSub  message.PubSub

	Router                *message.Router
	Logger                watermill.LoggerAdapter
	CommandEventMarshaler CommandEventMarshaler
}

func (FacadeConfig) CommandsEnabled

func (c FacadeConfig) CommandsEnabled() bool

func (FacadeConfig) EventsEnabled

func (c FacadeConfig) EventsEnabled() bool

func (FacadeConfig) Validate

func (c FacadeConfig) Validate() error

type JSONMarshaler

type JSONMarshaler struct {
	NewUUID func() string
}

func (JSONMarshaler) Marshal

func (m JSONMarshaler) Marshal(v interface{}) (*message.Message, error)

func (JSONMarshaler) Name

func (m JSONMarshaler) Name(cmdOrEvent interface{}) string

func (JSONMarshaler) NameFromMessage

func (m JSONMarshaler) NameFromMessage(msg *message.Message) string

func (JSONMarshaler) Unmarshal

func (JSONMarshaler) Unmarshal(msg *message.Message, v interface{}) (err error)

type NoProtoMessageError

type NoProtoMessageError struct {
	// contains filtered or unexported fields
}

func (NoProtoMessageError) Error

func (e NoProtoMessageError) Error() string

type NonPointerError

type NonPointerError struct {
	Type reflect.Type
}

func (NonPointerError) Error

func (e NonPointerError) Error() string

type ProtobufMarshaler

type ProtobufMarshaler struct {
	NewUUID func() string
}

func (ProtobufMarshaler) Marshal

func (m ProtobufMarshaler) Marshal(v interface{}) (*message.Message, error)

func (ProtobufMarshaler) Name

func (m ProtobufMarshaler) Name(cmdOrEvent interface{}) string

func (ProtobufMarshaler) NameFromMessage

func (m ProtobufMarshaler) NameFromMessage(msg *message.Message) string

func (ProtobufMarshaler) Unmarshal

func (ProtobufMarshaler) Unmarshal(msg *message.Message, v interface{}) (err error)
