binding

package
v0.9.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 11, 2019 License: Apache-2.0 Imports: 3 Imported by: 9

Documentation

Overview

Package binding is for implementing transport bindings and intermediaries like importers, brokers or channels that forward messages between bindings.

A transport binding implements Message, Sender and Receiver interfaces. An intermediary uses those interfaces to transfer event messages.

A Message is an abstract container for an Event. It provides additional methods for efficient forwarding of structured events and reliable delivery when the underlying transports support it.

A Receiver can return instances of its own Message implementations. A Sender must be able to send any implementation of the Message interface, but it may provide optimized handling for its own Message implementations.

For transports that support a reliable delivery QoS, the Message interface allows acknowledgment between sender and receiver for QoS level 0, 1 or 2. The effective QoS is the lowest provided by sender or receiver.

Example (Implementing)

Example of implementing a transport including a simple message type, and a transport sender and receiver.

package main

import (
	"context"
	"encoding/json"
	"io"

	"github.com/cloudevents/sdk-go/pkg/binding"
	"github.com/cloudevents/sdk-go/pkg/cloudevents"
	"github.com/cloudevents/sdk-go/pkg/cloudevents/transport"
)

// ExMessage is a json.RawMessage, which is just a byte slice
// containing a JSON encoded event.
type ExMessage struct{ json.RawMessage }

func (m ExMessage) Structured() (string, []byte) {
	return cloudevents.ApplicationCloudEventsJSON, []byte(m.RawMessage)
}

func (m ExMessage) Event() (e cloudevents.Event, err error) {
	err = json.Unmarshal(m.RawMessage, &e)
	return e, err
}

func (m ExMessage) Finish(error) {}

// ExSender sends by writing JSON encoded events to an io.Writer
type ExSender struct{ *json.Encoder }

func NewExSender(w io.Writer) ExSender { return ExSender{json.NewEncoder(w)} }

func (s ExSender) Send(ctx context.Context, m binding.Message) error {
	if t, b := m.Structured(); t != "" {
		// Fast case: if the Message is already structured JSON we can
		// send it directly, no need to decode and re-encode. Encoding a
		// json.RawMessage to a json.Encoder() is just a byte-buffer copy.
		return s.Encode(json.RawMessage(b))
	}
	// Some other message encoding. Decode as a generic cloudevents.Event
	// and then re-encode as JSON
	if e, err := m.Event(); err != nil {
		return err
	} else if err := s.Encode(e); err != nil {
		return err
	}
	return nil
}

// ExReceiver receives by reading JSON encoded events from an io.Reader
type ExReceiver struct{ *json.Decoder }

func NewExReceiver(r io.Reader) ExReceiver { return ExReceiver{json.NewDecoder(r)} }

func (sr ExReceiver) Receive(context.Context) (binding.Message, error) {
	var m ExMessage
	err := sr.Decode(&m) // This is just a byte copy since m is a json.RawMessage
	return m, err
}

// NewExTransport returns a transport.Transport which is implemented by
// an ExSender and an ExReceiver
func NewExTransport(r io.Reader, w io.Writer) transport.Transport {
	return binding.NewTransport(NewExSender(w), NewExReceiver(r))
}

// Example of implementing a transport including a simple message type,
// and a transport sender and receiver.
func main() {}
Example (Using)

This example shows how to use a transport in sender, receiver, and intermediary processes.

The sender and receiver use the client.Client API to send and receive messages. the transport. Only the intermediary example actually uses the transport APIs for efficiency and reliability in forwarding events.

package main

import (
	"context"
	"fmt"
	"io"
	"strconv"

	"github.com/cloudevents/sdk-go/pkg/cloudevents"
	"github.com/cloudevents/sdk-go/pkg/cloudevents/client"
)

const count = 3 // Example ends after this many events.

// The sender uses the cloudevents.Client API, not the transport APIs directly.
func runSender(w io.Writer) error {
	c, err := client.New(NewExTransport(nil, w))
	if err != nil {
		return err
	}
	for i := 0; i < count; i++ {
		e := cloudevents.New()
		e.SetType("example.com/event")
		e.SetSource("example.com/source")
		e.SetID(strconv.Itoa(i))
		if err := e.SetData(fmt.Sprintf("hello %d", i)); err != nil {
			return err
		}
		if _, _, err := c.Send(context.TODO(), e); err != nil {
			return err
		}
	}
	return nil
}

// The receiver uses the cloudevents.Client API, not the transport APIs directly.
func runReceiver(r io.Reader) error {
	i := 0
	process := func(e cloudevents.Event) error {
		fmt.Printf("%s\n", e)
		i++
		if i == count {
			return io.EOF
		}
		return nil
	}
	c, err := client.New(NewExTransport(r, nil))
	if err != nil {
		return err
	}
	return c.StartReceiver(context.TODO(), process)
}

// The intermediary receives events and forwards them to another
// process using ExReceiver and ExSender directly.
//
// By forwarding a transport.Message instead of a cloudevents.Event,
// it allows the transports to avoid un-necessary decoding of
// structured events, and to exchange delivery status between reliable
// transports. Even transports using different protocols can ensure
// reliable delivery.
func runIntermediary(r io.Reader, w io.WriteCloser) error {
	defer w.Close()
	for {
		receiver := NewExReceiver(r)
		sender := NewExSender(w)
		for i := 0; i < count; i++ {
			if m, err := receiver.Receive(context.TODO()); err != nil {
				return err
			} else if err := sender.Send(context.TODO(), m); err != nil {
				return err
			}
		}
	}
}

// This example shows how to use a transport in sender, receiver,
// and intermediary processes.
//
// The sender and receiver use the client.Client API to send and
// receive messages.  the transport.  Only the intermediary example
// actually uses the transport APIs for efficiency and reliability in
// forwarding events.
func main() {
	r1, w1 := io.Pipe() // The sender-to-intermediary pipe
	r2, w2 := io.Pipe() // The intermediary-to-receiver pipe

	done := make(chan error)
	go func() { done <- runReceiver(r2) }()
	go func() { done <- runIntermediary(r1, w2) }()
	go func() { done <- runSender(w1) }()
	for i := 0; i < 2; i++ {
		if err := <-done; err != nil && err != io.EOF {
			fmt.Println(err)
		}
	}

}
Output:

Validation: valid
Context Attributes,
  specversion: 0.2
  type: example.com/event
  source: example.com/source
  id: 0
  contenttype: application/json
Data,
  "hello 0"

Validation: valid
Context Attributes,
  specversion: 0.2
  type: example.com/event
  source: example.com/source
  id: 1
  contenttype: application/json
Data,
  "hello 1"

Validation: valid
Context Attributes,
  specversion: 0.2
  type: example.com/event
  source: example.com/source
  id: 2
  contenttype: application/json
Data,
  "hello 2"

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type EventMessage

type EventMessage cloudevents.Event

EventMessage wraps a local cloudevents.Event as a Message.

func (EventMessage) Event

func (m EventMessage) Event() (cloudevents.Event, error)

Event returns the event.

func (EventMessage) Finish

func (EventMessage) Finish(error)

Finish does nothing.

func (EventMessage) Structured

func (EventMessage) Structured() (string, []byte)

Structured returns ("", nil).

type ExactlyOnceMessage

type ExactlyOnceMessage interface {
	Message

	// Received is called by a Sender when it gets acknowledgment of receipt
	// (e.g. AMQP ACCEPT or MQTT PUBREC)
	//
	// The sender passes a finish() function that the original receiver
	// must call when it get's the ack-of-the-ack (e.g. AMQP SETTLE, MQTT
	// PUBCOMP)
	//
	// If sending fails, the sender must call Finish(err) with a non-nil
	// error instead of Received. ExactlyOnceMessage implementations
	// must also be prepared to handle Finish(nil) if the sender does
	// not support QoS 3.
	Received(finish func(error))
}

ExactlyOnceMessage is implemented by incoming transport messages that support QoS 2. Only transports that support QoS 2 need to implement or use this interface.

type Message

type Message interface {
	// Event decodes and returns the contained Event.
	Event() (cloudevents.Event, error)

	// Structured optionally returns an encoded structured event if it
	// is efficient to do so, or ("", nil) if not.
	//
	// Enables Senders to optimize the case where structured events are
	// passed from transport to transport without being decoded and
	// re-encoded.
	//
	// Transport Message and Sender implementations are not required to
	// implement the optimization, they can ignore it.
	Structured() (encodingMediaType string, encodedEvent []byte)

	// Finish *must* be called when message from a Receiver can be
	// forgotten by the receiver.
	//
	// A QoS 1 sender forwarding messages should not call Finish()
	// until it gets an acknowledgment of receipt.
	//
	// A non-nil error indicates an error in sending or processing.
	Finish(error)
}

Message is the interface to a transport-specific message containing an event.

There are 3 reliable qualities of service for messages:

0/at-most-once/unreliable: messages can be dropped silently.

1/at-least-once: messages are not dropped without signaling an error at the sender but may be duplicated.

2/exactly-once: messages are never dropped (without error) or duplicated, as long as both ends maintain some transport-specific state.

The Message interface supports QoS 0 and 1, the ExactlyOnceMessage inteface supports QoS 2

type Receiver

type Receiver interface {
	Receive(ctx context.Context) (Message, error)
}

Receiver receives messages.

type Sender

type Sender interface {
	Send(ctx context.Context, m Message) error
}

Sender sends messages.

type Transport

type Transport struct {
	Sender   Sender
	Receiver Receiver
	// contains filtered or unexported fields
}

Transport implements the transport.Transport interface using a Sender and Receiver.

func NewTransport

func NewTransport(s Sender, r Receiver) *Transport

func (*Transport) HasConverter

func (t *Transport) HasConverter() bool

func (*Transport) Send

func (*Transport) SetConverter

func (t *Transport) SetConverter(transport.Converter)

func (*Transport) SetReceiver

func (t *Transport) SetReceiver(r transport.Receiver)

func (*Transport) StartReceiver

func (t *Transport) StartReceiver(ctx context.Context) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL