notificationcenter

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 29, 2019 License: Apache-2.0 Imports: 5 Imported by: 4

README

Notificationcenter stream library

Build Status Go Report Card GoDoc Coverage Status

License Apache 2.0

The union eventstream wrapper over nifferent stream implementations.

Using examples

Create new stream processor
// Create new stream processor
eventStream, err = nats.NewStream([]string{"event"}, "nats://hostname:4222")
if err != nil {
  log.Fatal(err)
}

// Register stream processor
err = notificationcenter.Register("events", eventStream)
if err != nil {
  log.Fatal(err)
}
Send event to the notification stream
// Send by global functions
notificationcenter.Send("events", message{title: "event 1"})

// Send by logger interface
events := notificationcenter.StreamByName("events")
events.Send(message{title: "event 2"})
Subscribe for the specific notification stream
import (
  nc "github.com/geniusrabbit/notificationcenter"
  "github.com/geniusrabbit/notificationcenter/nats"
)

func main() {
  events := nats.MustNewSubscriber("nats://connection", "group", []string{"events"})
  nc.Register("events", events)

  // Add new handler to process the stream "events"
  nc.Subscribe("events", notificationcenter.FuncHandler(func(msg nc.Message) error {
    fmt.Printf("%v\n", msg.Data())
    return nil
  }))

  // Run seubscribers listeners
  nc.Listen()
}

TODO

  • remove metrics from the stream (DEPRECATED)
  • Add support NATS & NATS stream
  • Add kafka support

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrInvalidObject              = errors.New("[notificationcenter] invalid handler")
	ErrInterfaceAlreadySubscribed = errors.New("[notificationcenter] interface already subscribed")
	ErrInvalidParams              = errors.New("[notificationcenter] invalid params")
)

Errors set

Functions

func Close

func Close()

Close notification center

func Listen

func Listen() (err error)

Listen runs subscribers listen interface

func OnClose

func OnClose() <-chan bool

OnClose event will be executed only after closing all interfaces

Usecases in the application makes subsribing for the finishing event very convinient ```go

func myDatabaseObserver() {
  <- notificationcenter.OnClose()
  // ... Do something
}

```

func Register

func Register(name string, elem ...interface{}) error

Register the new streamer or subscriber interface to future using Object can implement both interface and process all events from start to end

func Send

func Send(name string, msg ...interface{}) error

Send message object to the stream by name

func Subscribe

func Subscribe(name string, h Handler) error

Subscribe new handler on some paticular subscriber interface by name

func Unregister

func Unregister(elem ...interface{}) error

Unregister exist streamer or subscriber interface from the global storage

func UnregisterAllByName

func UnregisterAllByName(name string, _streamers, _subscribers bool)

UnregisterAllByName exist streamer or subscriber interface from the global storage

func Unsubscribe

func Unsubscribe(name string, h Handler) error

Unsubscribe some paticular handler interface from subscriber with the *name*

Types

type FuncHandler

type FuncHandler func(msg Message) error

FuncHandler type

func (FuncHandler) Handle

func (f FuncHandler) Handle(msg Message) error

Handle this item

type Handler

type Handler interface {
	Handle(msg Message) error
}

Handler interface

type Message

type Message interface {
	// Unical message ID (depends on transport)
	ID() string

	// Body returns message data as bytes
	Body() []byte

	// Acknowledgment of the message processing
	Ack() error
}

Message describes the access methods to the message original object

type MultithreadHandler

type MultithreadHandler interface {
	Handler
	Concurrently() int
}

MultithreadHandler it's ext interface which contains count of concurrent processes

func NewMultithreadHandler

func NewMultithreadHandler(count int, handler Handler) MultithreadHandler

NewMultithreadHandler processor

type Streamer

type Streamer interface {
	// Send data to statistic
	Send(messages ...interface{}) error
}

Streamer pipeline base declaration

func StreamByName

func StreamByName(name string) Streamer

StreamByName returns streamer interface by codename if exists

type Subscriber

type Subscriber interface {
	// Subscribe new handler
	// @return error or nil
	Subscribe(h Handler) error

	// Unsubscribe this handler by ptr
	// @return error or nil
	Unsubscribe(h Handler) error

	// Start processing queue
	// @return error or nil
	Listen() error
}

Subscriber data type

func SubscriberByName

func SubscriberByName(name string) Subscriber

SubscriberByName interface

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL