binding

package
v0.10.0
Published: Oct 21, 2019 License: Apache-2.0 Imports: 7 Imported by: 9

Documentation

Overview

Package binding defines interfaces for protocol bindings.

NOTE: Most applications that emit or consume events should use the ../client package, which provides a simpler API to the underlying binding.

The interfaces in this package provide extra encoding and protocol information to allow efficient forwarding and end-to-end reliable delivery between a Receiver and a Sender belonging to different bindings. This is useful for intermediary applications that route or forward events, but not necessary for most "endpoint" applications that emit or consume events.

Protocol Bindings

A protocol binding implements at least Message, Sender and Receiver, and usually Encoder.

Receiver: receives protocol messages and wraps them to implement the Message interface.

Message: converts to protocol-neutral cloudevents.Event or structured event data. It also provides methods to manage acknowledgment for reliable delivery across bindings.

Sender: converts arbitrary Message implementations to a protocol-specific form and sends them. A protocol Sender should preserve the spec-version and structured/binary mode of sent messages as far as possible. This package provides generic Sender wrappers to pre-process messages into a specific spec-version or structured/binary mode when the user requires that.

Message and ExactlyOnceMessage provide methods that allow acknowledgments to propagate when a reliable message is forwarded from a Receiver to a Sender. QoS 0 (unreliable), 1 (at-least-once) and 2 (exactly-once) are supported.

Intermediaries

Intermediaries can forward Messages from a Receiver to a Sender without knowledge of the underlying protocols. The Message interface allows structured messages to be forwarded without decoding and re-encoding. It also allows any Message to be fully decoded and examined as needed.

Example (Implementing)

Example of implementing a transport including a simple message type, and a transport sender and receiver.

package main

import (
	"context"
	"encoding/json"
	"io"

	"github.com/cloudevents/sdk-go/pkg/binding"
	"github.com/cloudevents/sdk-go/pkg/cloudevents"
	ce "github.com/cloudevents/sdk-go/pkg/cloudevents"
	"github.com/cloudevents/sdk-go/pkg/cloudevents/transport"
)

// ExMessage is a json.RawMessage, a byte slice containing a JSON encoded event.
// It implements binding.Message as a structured-mode message.
//
// Note: a good binding implementation should provide an easy way to convert
// between the Message implementation and the "native" message format.
// In this case it's as simple as:
//
//	native = json.RawMessage(impl)
//	impl = ExMessage(native)
//
// For example in a HTTP binding it should be easy to convert between
// the HTTP binding.Message implementation and net/http.Request and
// Response types.  There are no interfaces for this conversion as it
// requires the use of unknown types.
type ExMessage json.RawMessage

func (m ExMessage) Structured() (string, []byte) {
	return cloudevents.ApplicationCloudEventsJSON, []byte(m)
}

func (m ExMessage) Event() (e cloudevents.Event, err error) {
	err = json.Unmarshal(json.RawMessage(m), &e)
	return e, err
}

func (m ExMessage) Finish(error) error { return nil }

// ExSender sends by writing JSON encoded events to an io.Writer
type ExSender struct{ encoder *json.Encoder }

func NewExSender(w io.Writer) ExSender { return ExSender{json.NewEncoder(w)} }

func (s ExSender) Send(ctx context.Context, m binding.Message) error {
	if f, b := m.Structured(); f == ce.ApplicationCloudEventsJSON {
		// Fast case: Message is already structured JSON.
		return s.encoder.Encode(json.RawMessage(b))
	}
	// Some other message encoding. Decode as generic Event and re-encode.
	if e, err := m.Event(); err != nil {
		return err
	} else if err := s.encoder.Encode(&e); err != nil {
		return err
	}
	return nil
}
func (s ExSender) Close(context.Context) error { return nil }

// ExReceiver receives by reading JSON encoded events from an io.Reader
type ExReceiver struct{ decoder *json.Decoder }

func NewExReceiver(r io.Reader) ExReceiver { return ExReceiver{json.NewDecoder(r)} }

func (r ExReceiver) Receive(context.Context) (binding.Message, error) {
	var rm json.RawMessage
	err := r.decoder.Decode(&rm) // This is just a byte copy.
	return ExMessage(rm), err
}
func (r ExReceiver) Close(context.Context) error { return nil }

// NewExTransport returns a transport.Transport which is implemented by
// an ExSender and an ExReceiver
func NewExTransport(r io.Reader, w io.Writer) transport.Transport {
	return binding.NewTransport(NewExSender(w), NewExReceiver(r))
}

// Example of implementing a transport including a simple message type,
// and a transport sender and receiver.
func main() {}
Example (Using)

This example shows how to use a transport in sender, receiver, and intermediary processes.

The sender and receiver use the client.Client API to send and receive messages rather than using the transport directly. Only the intermediary example actually uses the transport APIs, for efficiency and reliability in forwarding events.

package main

import (
	"context"
	"fmt"
	"io"
	"strconv"

	"github.com/cloudevents/sdk-go/pkg/cloudevents"
	"github.com/cloudevents/sdk-go/pkg/cloudevents/client"
)

const count = 3 // Example ends after this many events.

// The sender uses the cloudevents.Client API, not the transport APIs directly.
func runSender(w io.Writer) error {
	c, err := client.New(NewExTransport(nil, w))
	if err != nil {
		return err
	}
	for i := 0; i < count; i++ {
		e := cloudevents.New()
		e.SetType("example.com/event")
		e.SetSource("example.com/source")
		e.SetID(strconv.Itoa(i))
		if err := e.SetData(fmt.Sprintf("hello %d", i)); err != nil {
			return err
		}
		if _, _, err := c.Send(context.TODO(), e); err != nil {
			return err
		}
	}
	return nil
}

// The receiver uses the cloudevents.Client API, not the transport APIs directly.
func runReceiver(r io.Reader) error {
	i := 0
	process := func(e cloudevents.Event) error {
		fmt.Printf("%s\n", e)
		i++
		if i == count {
			return io.EOF
		}
		return nil
	}
	c, err := client.New(NewExTransport(r, nil))
	if err != nil {
		return err
	}
	return c.StartReceiver(context.TODO(), process)
}

// The intermediary receives events and forwards them to another
// process using ExReceiver and ExSender directly.
//
// By forwarding a binding.Message instead of a cloudevents.Event,
// it allows the transports to avoid unnecessary decoding of
// structured events, and to exchange delivery status between reliable
// transports. Even transports using different protocols can ensure
// reliable delivery.
func runIntermediary(r io.Reader, w io.WriteCloser) error {
	defer w.Close()
	for {
		receiver := NewExReceiver(r)
		sender := NewExSender(w)
		for i := 0; i < count; i++ {
			if m, err := receiver.Receive(context.TODO()); err != nil {
				return err
			} else if err := sender.Send(context.TODO(), m); err != nil {
				return err
			}
		}
	}
}

// This example shows how to use a transport in sender, receiver,
// and intermediary processes.
//
// The sender and receiver use the client.Client API to send and
// receive messages rather than using the transport directly. Only the
// intermediary example actually uses the transport APIs, for efficiency
// and reliability in forwarding events.
func main() {
	r1, w1 := io.Pipe() // The sender-to-intermediary pipe
	r2, w2 := io.Pipe() // The intermediary-to-receiver pipe

	done := make(chan error)
	go func() { done <- runReceiver(r2) }()
	go func() { done <- runIntermediary(r1, w2) }()
	go func() { done <- runSender(w1) }()
	for i := 0; i < 2; i++ {
		if err := <-done; err != nil && err != io.EOF {
			fmt.Println(err)
		}
	}

}
Output:

Validation: valid
Context Attributes,
  specversion: 0.2
  type: example.com/event
  source: example.com/source
  id: 0
Data,
  "hello 0"

Validation: valid
Context Attributes,
  specversion: 0.2
  type: example.com/event
  source: example.com/source
  id: 1
Data,
  "hello 1"

Validation: valid
Context Attributes,
  specversion: 0.2
  type: example.com/event
  source: example.com/source
  id: 2
Data,
  "hello 2"

Constants

This section is empty.

Variables

This section is empty.

Functions

func Structured added in v0.10.0

func Structured(m Message, f format.Format) ([]byte, error)

Structured returns the structured encoding of a message using a format. If m.Structured() already returns an encoding in the requested format, that encoding is returned as is. Otherwise the message's event is encoded with f.
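
The fast path and fallback described above can be sketched in isolation. This is a minimal illustration, not the package's source: Event, Message, Format and jsonFormat below are local stand-ins, and the MediaType/Marshal method set on Format is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event is a hypothetical, heavily reduced stand-in for cloudevents.Event.
type Event struct {
	ID   string `json:"id"`
	Type string `json:"type"`
}

// Message is a stand-in for the two binding.Message methods used here.
type Message interface {
	Event() (Event, error)
	Structured() (mediaType string, encoded []byte)
}

// Format is an assumed shape for a structured format: a media type plus
// a marshal function. The real format.Format interface may differ.
type Format interface {
	MediaType() string
	Marshal(Event) ([]byte, error)
}

// structured returns m's existing encoding when it is already in format f,
// and otherwise decodes the event and re-encodes it with f.
func structured(m Message, f Format) ([]byte, error) {
	if mt, b := m.Structured(); b != nil && mt == f.MediaType() {
		return b, nil // Fast path: no decode, no re-encode.
	}
	e, err := m.Event()
	if err != nil {
		return nil, err
	}
	return f.Marshal(e)
}

// jsonFormat is a toy JSON format used only for this sketch.
type jsonFormat struct{}

func (jsonFormat) MediaType() string               { return "application/cloudevents+json" }
func (jsonFormat) Marshal(e Event) ([]byte, error) { return json.Marshal(e) }

// eventMessage holds a decoded event and has no structured encoding.
type eventMessage Event

func (m eventMessage) Event() (Event, error)        { return Event(m), nil }
func (m eventMessage) Structured() (string, []byte) { return "", nil }

func main() {
	b, err := structured(eventMessage{ID: "1", Type: "example.com/event"}, jsonFormat{})
	fmt.Println(string(b), err)
}
```

A message that already carries bytes in the requested media type is returned untouched, which is what lets an intermediary forward structured events without decoding them.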

Types

type BinaryEncoder added in v0.10.0

type BinaryEncoder struct{}

func (BinaryEncoder) Encode added in v0.10.0

func (BinaryEncoder) Encode(e ce.Event) (Message, error)

type ChanReceiver added in v0.10.0

type ChanReceiver <-chan Message

ChanReceiver implements Receiver by receiving from a channel.

func (ChanReceiver) Close added in v0.10.0

func (r ChanReceiver) Close(ctx context.Context) error

func (ChanReceiver) Receive added in v0.10.0

func (r ChanReceiver) Receive(ctx context.Context) (Message, error)

type ChanSender added in v0.10.0

type ChanSender chan<- Message

ChanSender implements Sender by sending on a channel.

func (ChanSender) Close added in v0.10.0

func (s ChanSender) Close(ctx context.Context) (err error)

func (ChanSender) Send added in v0.10.0

func (s ChanSender) Send(ctx context.Context, m Message) (err error)

type Closer added in v0.10.0

type Closer interface {
	Close(ctx context.Context) error
}

Closer is the common interface for things that can be closed.

type Encoder added in v0.10.0

type Encoder interface {
	Encode(ce.Event) (Message, error)
}

Encoder encodes events as messages.

type EventMessage

type EventMessage ce.Event

EventMessage type-converts a cloudevents.Event object to implement Message. This allows local cloudevents.Event objects to be sent directly via Sender.Send():

s.Send(ctx, binding.EventMessage(e))

func (EventMessage) Event

func (m EventMessage) Event() (ce.Event, error)

func (EventMessage) Finish

func (EventMessage) Finish(error) error

func (EventMessage) Structured

func (m EventMessage) Structured() (string, []byte)

type ExactlyOnceMessage

type ExactlyOnceMessage interface {
	Message

	// Received is called by a forwarding QoS2 Sender when it gets
	// acknowledgment of receipt (e.g. AMQP 'accept' or MQTT PUBREC)
	//
	// The receiver must call settle(nil) when it gets the ack-of-ack
	// (e.g. AMQP 'settle' or MQTT PUBCOMP) or settle(err) if the
	// transfer fails.
	//
	// Finally the Sender calls Finish() to indicate the message can be
	// discarded.
	//
	// If sending fails, or if the sender does not support QoS 2, then
	// Finish() may be called without any call to Received()
	Received(settle func(error))
}

ExactlyOnceMessage is implemented by received Messages that support QoS 2. Only transports that support QoS 2 need to implement or use this interface.
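
The order of calls in a QoS 2 hand-off can be traced with a small simulation. Everything here is illustrative: exactlyOnce, inbound and forwardQoS2 are hypothetical names, no real protocol is involved, and the ack-of-ack is assumed to arrive immediately.

```go
package main

import "fmt"

// exactlyOnce is a stand-in for the acknowledgment methods of
// binding.ExactlyOnceMessage (Event and Structured are omitted).
type exactlyOnce interface {
	Received(settle func(error))
	Finish(error) error
}

// inbound is a hypothetical message from a QoS 2 receiver. It logs each
// step so the order of the handshake is visible.
type inbound struct{ log []string }

// Received is called by the forwarding sender once the next hop has
// acknowledged receipt. The ack-of-ack is assumed to arrive at once,
// so settle is called right away.
func (m *inbound) Received(settle func(error)) {
	m.log = append(m.log, "received")
	settle(nil)
}

// Finish records that the sender no longer needs the message.
func (m *inbound) Finish(err error) error {
	m.log = append(m.log, "finish")
	return err
}

// forwardQoS2 plays the sender's part: report receipt, wait to be
// settled, then finish the message with the settlement result.
func forwardQoS2(m exactlyOnce) error {
	settled := make(chan error, 1) // Buffered so a synchronous settle cannot block.
	m.Received(func(err error) { settled <- err })
	return m.Finish(<-settled)
}

func main() {
	m := &inbound{}
	err := forwardQoS2(m)
	fmt.Println(m.log, err)
}
```

In a real binding the settle callback would typically fire later, from whatever goroutine handles the protocol's ack-of-ack frame.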

type Message

type Message interface {
	// Event decodes and returns the contained Event.
	Event() (ce.Event, error)

	// Structured optionally returns a structured event encoding and its
	// format type, if the message contains such an encoding. Returns
	// ("", nil) if not.
	//
	// This allows Senders to avoid re-encoding messages that are
	// already in suitable structured form.
	//
	Structured() (formatMediaType string, encodedEvent []byte)

	// Finish *must* be called when a message from a Receiver can be forgotten by
	// the receiver. Sender.Send() calls Finish() when the message is sent.  A QoS
	// 1 sender should not call Finish() until it gets an acknowledgment of
	// receipt on the underlying transport.  For QoS 2 see ExactlyOnceMessage.
	//
	// Passing a non-nil err indicates sending or processing failed.
	// A non-nil return indicates that the message was not accepted
	// by the receiver's peer.
	Finish(error) error
}

Message is the interface to a binding-specific message containing an event.

Reliable Delivery

There are 3 qualities of service for messages:

0/at-most-once/unreliable: messages can be dropped silently.

1/at-least-once: messages are not dropped without signaling an error to the sender, but they may be duplicated in the event of a re-send.

2/exactly-once: messages are never dropped (without error) or duplicated, as long as both sending and receiving ends maintain some binding-specific delivery state. Whether this is persisted depends on the configuration of the binding implementations.

The Message interface supports QoS 0 and 1; the ExactlyOnceMessage interface supports QoS 2.
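
For QoS 1, the key point is that Finish is deferred until the peer responds. The sketch below shows one way a sender might track that. The ackSender type, its integer delivery tags and the Ack method are all hypothetical and stand in for whatever a concrete protocol provides.

```go
package main

import (
	"errors"
	"fmt"
)

// finisher is the acknowledgment part of binding.Message.
type finisher interface{ Finish(error) error }

// ackSender is a hypothetical at-least-once sender. It keeps each message
// pending until the peer accepts or rejects it, and only then calls Finish.
type ackSender struct {
	next    int
	pending map[int]finisher
}

// Send "transmits" m and returns a delivery tag. Finish is not called yet.
func (s *ackSender) Send(m finisher) int {
	if s.pending == nil {
		s.pending = map[int]finisher{}
	}
	s.next++
	s.pending[s.next] = m
	return s.next
}

// Ack is called when the peer responds for a tag. A nil err means accepted.
func (s *ackSender) Ack(tag int, err error) error {
	m, ok := s.pending[tag]
	if !ok {
		return fmt.Errorf("unknown delivery tag %d", tag)
	}
	delete(s.pending, tag)
	return m.Finish(err)
}

// tracked records the outcome passed to Finish.
type tracked struct {
	finished bool
	err      error
}

func (t *tracked) Finish(err error) error {
	t.finished, t.err = true, err
	return nil
}

func main() {
	s := &ackSender{}
	accepted, rejected := &tracked{}, &tracked{}
	t1, t2 := s.Send(accepted), s.Send(rejected)
	fmt.Println(accepted.finished, rejected.finished) // Nothing is finished before the acks.

	_ = s.Ack(t1, nil)
	_ = s.Ack(t2, errors.New("rejected by peer"))
	fmt.Println(accepted.finished, accepted.err, rejected.finished, rejected.err)
}
```

A QoS 0 sender would instead call Finish as soon as the bytes are written, since no acknowledgment is expected.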

func WithFinish added in v0.10.0

func WithFinish(m Message, finish func(error)) Message

WithFinish returns a wrapper for m that calls finish() and m.Finish() in its Finish(). Allows code to be notified when a message is Finished.
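
A wrapper of this kind is usually a small struct that embeds the original message. The following is a sketch under assumptions, not the package's code: Message is reduced to its Finish method, the names withFinish and finishMessage are hypothetical, and calling the callback before the wrapped Finish is a choice made here.

```go
package main

import "fmt"

// Message is reduced to Finish, the only method the wrapper changes.
type Message interface{ Finish(error) error }

// finishMessage embeds the wrapped Message so every other method
// would pass through unchanged.
type finishMessage struct {
	Message
	finish func(error)
}

// Finish notifies the callback, then finishes the wrapped message.
// The call order is an assumption of this sketch.
func (m finishMessage) Finish(err error) error {
	m.finish(err)
	return m.Message.Finish(err)
}

// withFinish is a hypothetical analogue of WithFinish.
func withFinish(m Message, finish func(error)) Message {
	return finishMessage{Message: m, finish: finish}
}

// nop is a message whose Finish does nothing.
type nop struct{}

func (nop) Finish(error) error { return nil }

func main() {
	done := make(chan error, 1)
	m := withFinish(nop{}, func(err error) { done <- err })
	_ = m.Finish(nil)
	fmt.Println(<-done)
}
```

Signalling a channel from the callback, as in main above, is a convenient way for a caller to block until a forwarded message has been fully acknowledged.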

type ReceiveCloser added in v0.10.0

type ReceiveCloser interface {
	Receiver
	Closer
}

ReceiveCloser is a Receiver that can be closed.

type Receiver

type Receiver interface {
	// Receive blocks till a message is received or ctx expires.
	//
	// A non-nil error means the receiver is closed.
	// io.EOF means it closed cleanly, any other value indicates an error.
	Receive(ctx context.Context) (Message, error)
}

Receiver receives messages.

type Requester added in v0.10.0

type Requester interface {
	// Request sends m like Sender.Send() but also arranges to receive a response.
	// The returned Receiver is used to receive the response.
	Request(ctx context.Context, m Message) (Receiver, error)
}

Requester sends a message and receives a response.

Optional interface that may be implemented by protocols that support request/response correlation.

type SendCloser added in v0.10.0

type SendCloser interface {
	Sender
	Closer
}

SendCloser is a Sender that can be closed.

type Sender

type Sender interface {
	// Send a message.
	//
	// Send returns when the "outbound" message has been sent. The Sender may
	// still be expecting acknowledgment or holding other state for the message.
	//
	// m.Finish() is called when sending is finished: expected acknowledgments (or
	// errors) have been received, the Sender is no longer holding any state for
	// the message. m.Finish() may be called during or after Send().
	//
	// To support optimized forwarding of structured-mode messages, Send()
	// should use the encoding returned by m.Structured() if there is one.
	// Otherwise m.Event() can be encoded as per the binding's rules.
	Send(ctx context.Context, m Message) error
}

Sender sends messages.

func BinarySender added in v0.10.0

func BinarySender(s Sender) Sender

BinarySender returns a Sender that transforms messages to binary mode, then calls s.Send().

func StructSender added in v0.10.0

func StructSender(s Sender, f format.Format) Sender

StructSender returns a Sender that transforms messages to structured mode with format f, then calls s.Send().

func VersionSender added in v0.10.0

func VersionSender(s Sender, v spec.Version) Sender

VersionSender returns a Sender that transforms messages to spec-version v, then calls s.Send().

By default VersionSender creates binary-mode messages. If combined with StructSender it will create structured messages of version v.

type StructEncoder added in v0.10.0

type StructEncoder struct{ Format format.Format }

StructEncoder encodes events as StructMessage using a Format.

func (StructEncoder) Encode added in v0.10.0

func (enc StructEncoder) Encode(e ce.Event) (Message, error)

type StructMessage added in v0.10.0

type StructMessage struct {
	Format string
	Bytes  []byte
}

StructMessage implements a structured-mode message as a simple struct.

func (StructMessage) Event added in v0.10.0

func (m StructMessage) Event() (e ce.Event, err error)

Event can only decode built-in formats supported by format.Unmarshal.

func (StructMessage) Finish added in v0.10.0

func (StructMessage) Finish(error) error

func (StructMessage) Structured added in v0.10.0

func (m StructMessage) Structured() (string, []byte)

type Transport

type Transport struct {
	Sender   Sender
	Receiver Receiver
	// contains filtered or unexported fields
}

Transport implements transport.Transport using a Sender and Receiver.

func NewTransport

func NewTransport(s Sender, r Receiver) *Transport

func (*Transport) HasConverter

func (t *Transport) HasConverter() bool

func (*Transport) Send

func (t *Transport) Send(ctx context.Context, e ce.Event) (context.Context, *ce.Event, error)

func (*Transport) SetConverter

func (t *Transport) SetConverter(transport.Converter)

func (*Transport) SetReceiver

func (t *Transport) SetReceiver(r transport.Receiver)

func (*Transport) StartReceiver

func (t *Transport) StartReceiver(ctx context.Context) error

Directories

Path Synopsis
format Package format formats structured events.
spec Package spec provides spec-version metadata.
test Package test contains test data and generic tests for testing bindings.
