dyb

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 9, 2020 License: Apache-2.0 Imports: 4 Imported by: 0

README

dinhhungle/dyb

dinhhungle/dyb is a synchronous publish/subscribe library for DynamoDB Streams written in Go.
The library can be used to filter incoming DynamoDB Stream records and push the records to the appropriate handlers.

This library is still a work in progress and should not be considered production-ready. Any Feedback/Suggestions/Issues are welcome.

Motivation

By using a single table design in DynamoDB described in talks/guides like

various kinds of entities are stored in the same table. DynamoDB Streams can be used to decouple a lot of business logic especially if strong consistency is not a requirement.

For example:

  • on insert/modification of user data, create a new versioning item in db
  • send a warning email if user email was changed
  • broadcast specific data to different microservices
  • handle edge updates for adjacency lists

This is a lightweight library which organizes handlers and their respective processed records, see examples



Install

With an installed Go Toolchain

go get -u github.com/dinhhungle/dyb

Examples

Here is how you can simply subscribe a handler to all published records

func setupSubscriber(t *dyb.Topic) {
    t.Subscribe(SomeSubscriber)
}

func main() {
    t := dyb.New()
    setupSubscriber(t)
}

A Subscriber must fulfill the interface

type Subscriber interface {
	Handle(ctx context.Context, rec *events.DynamoDBEventRecord) error
}

Or you can subscribe a function either by wrapping the function with SubscriberFunc or using the shorthandle Topic.SubscribeFunc

func SomeSubscriberFunc(ctx context.Context, rec *events.DynamoDBEventRecord) error {
    return nil 
}

func setupSubscriber(t *dyb.Topic) {
    t.Subscribe(SubscriberFunc(SomeSubscriberFunc))
    // or use shorthandle 
    // t.SubscribeFunc(SomeSubsciberFunc)
}

Publishing can be done from a Lambda Handler like this

var t *dyb.Topic 

func LambdaHandler(ctx context.Context, e events.DynamoDBEvent) error {
    for _, rec := range e.Records {
        if err := t.Publish(ctx, &rec); err != nil {
            return err
        }
    }
}

Filtering

The previous example would call the handler for every record. To subscribe a handler to specific events add additional filtering SubOption to Subscribe.

t.Subscribe(SomeHandler,
   // new item must have attribute "model" with value "user"
   dyb.FilterAttr(dyb.ItemTypeNewImage, "model", dyb.AttrtringEqual("user")),
   // only interested in MODIFY events 
   dyb.FilterEventNames(dyb.EventNameModify), 
   // modification must have changed attribute email
   dyb.FilterRecord(dyb.RecordAttrChanged("email")),
}

Middleware

Middlewares have the signature

func(Subscriber) Subscriber

They can be added as SubOption

func Logger(s Subscriber) Subscriber {
    return SubscriberFunc(func(ctx context.Context, rec *events.DynamoDBEventRecord) error {
        sub := dyb.GetSubscription(ctx)

        fmt.Printf("%v was called with pk=%v", sub.Name(), rec.Change.Keys["pk"])
        return s.Handle(ctx, rec)

    }
}


func setupSubscriber(t *dyb.Topic) {
    t.Subscribe(SomeHandler,
        dyb.Name("Some Handler"),
        dyb.Middlewares(Logger, AnotherMiddleware),
        // AnotherMiddleware equivalent to additional suboption
        // dyb.Middlewares(AnotherMiddleware),
    }
}

Documentation

Index

Constants

View Source
const (
	EventNameInsert = "INSERT"
	EventNameModify = "MODIFY"
	EventNameRemove = "REMOVE"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type AttributeFilter

type AttributeFilter interface {
	Match(ctx context.Context, attr events.DynamoDBAttributeValue) bool
}

ItemFilter filter on an AttributeValue level. It is applied to a specific item in a record (Keys, NewImage, OldImage) and on a specific attribute name.

func AttrIsNull

func AttrIsNull() AttributeFilter

AttrIsNull returns an AttributeFilter which matches if attribute is null

func AttrNot

func AttrNot(filter AttributeFilter) AttributeFilter

AttrNotNull returns an AttributeFilter which matches only if filter does not match

func AttrNotNull

func AttrNotNull() AttributeFilter

AttrNotNull returns an AttributeFilter which matches if attribute is not null

func AttrStringEqual

func AttrStringEqual(val string) AttributeFilter

AttrStringEqual returns an AttributeFilter which matches if attribute is a string and matches val

func AttrStringPrefix

func AttrStringPrefix(prefix string) AttributeFilter

AttrStringPrefix returns an AttributeFilter which matches if attribute is a string and is prefixed with prefix

type AttributeFilterFunc

type AttributeFilterFunc func(context.Context, events.DynamoDBAttributeValue) bool

AttributeFilterFunc type is an adapter to allow the use of ordinary functions as AttributeFilter. If f is a function with the appropriate signature, AttributeFilterFunc(f) is a Filter that calls f.

func (AttributeFilterFunc) Match

Match calls f(ctx, attr).

type ItemFilter

type ItemFilter interface {
	Match(ctx context.Context, item map[string]events.DynamoDBAttributeValue) bool
}

ItemFilter filter on an item level. It is applied to a specific item in a record (Keys, NewImage, OldImage) Filters should not modify the provided item.

func Attr

func Attr(attrName string, filters ...AttributeFilter) ItemFilter

Attr returns a ItemFilter which applies AttributeFilters to an attribute named attrName in an item. Matches if attribute exists and all AttributeFilters are matching

type ItemFilterFunc

type ItemFilterFunc func(context.Context, map[string]events.DynamoDBAttributeValue) bool

ItemFilterFunc type is an adapter to allow the use of ordinary functions as ItemFilter. If f is a function with the appropriate signature, ItemFilterFunc(f) is a Filter that calls f.

func (ItemFilterFunc) Match

Match calls f(ctx, item).

type ItemType

type ItemType string

ItemType is a selector for ItemFilter and is used to choose the item an ItemFilter is applied to.

const (
	// selects record.Change.Keys
	ItemTypeKeys ItemType = "KEYS"

	// selects record.Change.NewImage
	ItemTypeNewImage ItemType = "NEW_IMAGE"

	// selects record.Change.OldImage
	ItemTypeOldImage ItemType = "OLD_IMAGE"
)

type RecordFilter

type RecordFilter interface {
	Match(ctx context.Context, rec *events.DynamoDBEventRecord) bool
}

RecordFilter filters on record level Filters should not modify the provided record

func AttrChanged

func AttrChanged(attrName string) RecordFilter

AttrChanged returns a RecordFilter which filters a record based on any change to a specific attribute. The Filter matches if the attribute attrName differs between NewImage and OldImage. It also matches if exclusively one of NewImage or OldImage is missing.

func EventNames

func EventNames(names ...string) RecordFilter

EventNames returns a RecordFilter which filters a record based on event name. The Filter matches if the record event name matches anything in names.

func Item

func Item(itemType ItemType, filters ...ItemFilter) RecordFilter

Item returns a RecordFilter which applies ItemFilters on a specific Item in a Record selected by itemType. Matches if all ItemFilters are matching

type RecordFilterFunc

type RecordFilterFunc func(context.Context, *events.DynamoDBEventRecord) bool

RecordFilterFunc type is an adapter to allow the use of ordinary functions as RecordFilter. If f is a function with the appropriate signature, RecordFilterFunc(f) is a Filter that calls f.

func (RecordFilterFunc) Match

Match calls f(ctx, rec).

type SubOption

type SubOption interface {
	// contains filtered or unexported methods
}

SubscribeFunc is a convenience function for Subscribe(SubscriberFunc(f)) .

func FilterAttr

func FilterAttr(itemType ItemType, attrName string, filters ...AttributeFilter) SubOption

FilterAttr returns a SubOption which filter published records based on the AttributeValue for an Attribute named attrName in a specific Item (map[string]events.DynamoDBAttributeValue). Only records with contains the Item with attrName and matches every AttributeFilter are passed through to the Subscriber

func FilterEventNames

func FilterEventNames(names ...string) SubOption

FilterEventNames is a short handle for commonly used FilterRecord(EventNames(names...))

func FilterItem

func FilterItem(itemType ItemType, filters ...ItemFilter) SubOption

FilterRecord returns a SubOption which filter published records based on a specific Item (map[string]events.DynamoDBAttributeValue). ItemFilter will be applied to either Keys, NewImage, OldImage depending on itemType. Only records which contain the Item and matches every ItemFilter are passed through to the Subscriber.

func FilterRecord

func FilterRecord(filters ...RecordFilter) SubOption

FilterRecord returns a SubOption which filter published records. Only records which match every RecordFilter are passed through to the Subscriber

func Middlewares

func Middlewares(middlewares ...func(Subscriber) Subscriber) SubOption

Middlewares returns a SubOption to append middlewares to a subscription. Middlewares can be used to intercept a matched subscription and are called in order they are applied. Middlewares should not modify the originally provided record

func Name

func Name(name string) SubOption

Name returns a SubOption which sets the name the subscription It can be retrieved in Subscribers and middlewares by calling GetSubscription(context.Context).Name()

type Subscriber

type Subscriber interface {
	Handle(ctx context.Context, rec *events.DynamoDBEventRecord) error
}

A Subscriber receives records after registration to a topic with Subscribe

Subscribers should not modify the provided record

func MultiSubscriber

func MultiSubscriber(subscribers ...Subscriber) Subscriber

MultiSubscriber creates a new Subscriber which executes all provided subscribers in order. MultiSubscriber can be used to register multiple subscribers with the same Subscription.

Subscribe(MultiSubscriber(s1, s2), opts...)

Any error returned by a subscriber will failfast and return. Following subscriber are not executed

func MultiSubscriberFunc

func MultiSubscriberFunc(subscriberFuncs ...func(ctx context.Context, rec *events.DynamoDBEventRecord) error) Subscriber

MultiSubscriberFunc is a short handle for MultiSubscriber(SubscriberFunc(f1), SubscriberFunc(f2), ...)

type SubscriberFunc

type SubscriberFunc func(ctx context.Context, rec *events.DynamoDBEventRecord) error

SubscriberFunc type is an adapter to allow the use of ordinary functions as Subscriber. If f is a function with the appropriate signature, SubscriberFunc(f) is a Subscriber that calls f.

func (SubscriberFunc) Handle

Handle calls f(ctx, rec).

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription stores information registered with Subscribe

func GetSubscription

func GetSubscription(ctx context.Context) *Subscription

GetSubscription returns the subscription which matched the record It is available in Subscriber.Handle and in middlewares

func (*Subscription) Name

func (s *Subscription) Name() string

Name returns the name for the subscription, if a name was provided.

type Topic

type Topic struct {
	// contains filtered or unexported fields
}

Topic provides functions to publish and subscribe to DynamoDB Stream Records

func New

func New() *Topic

New returns a new topic instance.

func (*Topic) Publish

func (t *Topic) Publish(ctx context.Context, rec *events.DynamoDBEventRecord) error

Publish publishes a record to all registered subscriber. All Subscribers are called synchronously. Publish will fails fast and return with any error returned by a subscriber

func (*Topic) Subscribe

func (t *Topic) Subscribe(subscriber Subscriber, opts ...SubOption)

Subscribe registers a listener for records. Subscribers receive events in order of registration.

Subscription can be limited to specific types of record by providing filtering SubOptions.

func (*Topic) SubscribeFunc

func (t *Topic) SubscribeFunc(subFunc func(ctx context.Context, evt *events.DynamoDBEventRecord) error, opts ...SubOption)

Subscribe implements Topic.SubscriberFunc

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL