Documentation
¶
Overview ¶
Package binding is for implementing transport bindings and intermediaries like importers, brokers or channels that forward messages between bindings.
A transport binding implements Message, Sender and Receiver interfaces. An intermediary uses those interfaces to transfer event messages.
A Message is an abstract container for an Event. It provides additional methods for efficient forwarding of structured events and reliable delivery when the underlying transports support it.
A Receiver can return instances of its own Message implementations. A Sender must be able to send any implementation of the Message interface, but it may provide optimized handling for its own Message implementations.
For transports that support a reliable delivery QoS, the Message interface allows acknowledgment between sender and receiver for QoS level 0, 1 or 2. The effective QoS is the lowest provided by sender or receiver.
Example (Implementing) ¶
Example of implementing a transport including a simple message type, and a transport sender and receiver.
package main
import (
"context"
"encoding/json"
"io"
"github.com/cloudevents/sdk-go/pkg/binding"
"github.com/cloudevents/sdk-go/pkg/cloudevents"
"github.com/cloudevents/sdk-go/pkg/cloudevents/transport"
)
// ExMessage carries a single JSON encoded event. It embeds
// json.RawMessage, which is a byte slice holding the raw JSON text.
type ExMessage struct {
	json.RawMessage
}
// Structured returns the message's raw bytes together with the
// cloudevents.ApplicationCloudEventsJSON media type.
func (m ExMessage) Structured() (string, []byte) {
	encoded := []byte(m.RawMessage)
	return cloudevents.ApplicationCloudEventsJSON, encoded
}
// Event unmarshals the raw JSON bytes into a cloudevents.Event. The
// event is returned alongside any unmarshalling error.
func (m ExMessage) Event() (e cloudevents.Event, err error) {
	if err = json.Unmarshal(m.RawMessage, &e); err != nil {
		return e, err
	}
	return e, nil
}
// Finish is a no-op: ExMessage takes no action when a message is finished.
func (ExMessage) Finish(error) {}
// ExSender is a sender that emits events as JSON values on an io.Writer.
type ExSender struct {
	*json.Encoder
}

// NewExSender returns an ExSender whose JSON encoder writes to w.
func NewExSender(w io.Writer) ExSender {
	enc := json.NewEncoder(w)
	return ExSender{Encoder: enc}
}
// Send writes the event carried by m to the underlying JSON encoder.
//
// If m already holds a structured event whose media type is
// cloudevents.ApplicationCloudEventsJSON, the encoded bytes are written
// directly. Any other message, including one structured in a different
// media type, is decoded with m.Event() and re-encoded as JSON.
//
// ctx is not consulted. The result is the decode or encode error, or nil
// on success.
func (s ExSender) Send(ctx context.Context, m binding.Message) error {
	// Fast case: the Message is already structured JSON, so it can be sent
	// without decoding and re-encoding. The media type is compared
	// explicitly because a json.RawMessage must hold valid JSON; bytes in
	// some other structured format must take the slow path below.
	if t, b := m.Structured(); t == cloudevents.ApplicationCloudEventsJSON {
		return s.Encode(json.RawMessage(b))
	}
	// Some other message encoding: decode as a generic cloudevents.Event
	// and then re-encode as JSON.
	e, err := m.Event()
	if err != nil {
		return err
	}
	return s.Encode(e)
}
// ExReceiver is a receiver that reads JSON encoded events from an
// io.Reader.
type ExReceiver struct {
	*json.Decoder
}

// NewExReceiver returns an ExReceiver whose JSON decoder reads from r.
func NewExReceiver(r io.Reader) ExReceiver {
	dec := json.NewDecoder(r)
	return ExReceiver{Decoder: dec}
}
// Receive decodes the next JSON value from the underlying decoder into
// an ExMessage and returns it together with the decode error, if any.
// The context argument is not used.
func (sr ExReceiver) Receive(context.Context) (binding.Message, error) {
	msg := ExMessage{}
	// msg wraps a json.RawMessage, so decoding only copies bytes.
	if err := sr.Decode(&msg); err != nil {
		return msg, err
	}
	return msg, nil
}
// NewExTransport builds a transport.Transport by passing an ExSender
// that writes to w and an ExReceiver that reads from r to
// binding.NewTransport.
func NewExTransport(r io.Reader, w io.Writer) transport.Transport {
	sender := NewExSender(w)
	receiver := NewExReceiver(r)
	return binding.NewTransport(sender, receiver)
}
// main is intentionally empty. This example shows how to implement a
// transport (a simple message type plus a transport sender and
// receiver); it defines no runtime behavior of its own.
func main() {}
Example (Using) ¶
This example shows how to use a transport in sender, receiver, and intermediary processes.
The sender and receiver use the client.Client API to send and receive messages over the transport. Only the intermediary example actually uses the transport APIs for efficiency and reliability in forwarding events.
package main
import (
"context"
"fmt"
"io"
"strconv"
"github.com/cloudevents/sdk-go/pkg/cloudevents"
"github.com/cloudevents/sdk-go/pkg/cloudevents/client"
)
// count is the number of events the sender emits and the number the
// receiver handles before it stops. The example ends after this many events.
const count = 3
// runSender emits count events to w. It goes through the
// cloudevents.Client API rather than calling the transport APIs directly.
//
// Each event has ID i and data "hello i". The first error from creating
// the client, setting the data, or sending is returned.
func runSender(w io.Writer) error {
	c, err := client.New(NewExTransport(nil, w))
	if err != nil {
		return err
	}
	for seq := 0; seq < count; seq++ {
		ev := cloudevents.New()
		ev.SetType("example.com/event")
		ev.SetSource("example.com/source")
		ev.SetID(strconv.Itoa(seq))

		payload := fmt.Sprintf("hello %d", seq)
		if err = ev.SetData(payload); err != nil {
			return err
		}

		_, _, err = c.Send(context.TODO(), ev)
		if err != nil {
			return err
		}
	}
	return nil
}
// The receiver uses the cloudevents.Client API, not the transport APIs directly.
func runReceiver(r io.Reader) error {
i := 0
process := func(e cloudevents.Event) error {
fmt.Printf("%s\n", e)
i++
if i == count {
return io.EOF
}
return nil
}
c, err := client.New(NewExTransport(r, nil))
if err != nil {
return err
}
return c.StartReceiver(context.TODO(), process)
}
// runIntermediary receives events from r and forwards them to w using
// ExReceiver and ExSender directly.
//
// By forwarding a binding.Message instead of a cloudevents.Event,
// it allows the transports to avoid unnecessary decoding of
// structured events, and to exchange delivery status between reliable
// transports. Even transports using different protocols can ensure
// reliable delivery.
//
// It returns only when a receive or send fails, and returns that error.
// w is closed on return.
func runIntermediary(r io.Reader, w io.WriteCloser) error {
	defer w.Close()

	// Build the receiver and sender once. Each ExReceiver wraps its own
	// json.Decoder, which may buffer input beyond the value it has
	// returned; replacing it mid-stream could drop that buffered data.
	receiver := NewExReceiver(r)
	sender := NewExSender(w)
	for {
		m, err := receiver.Receive(context.TODO())
		if err != nil {
			return err
		}
		if err := sender.Send(context.TODO(), m); err != nil {
			return err
		}
	}
}
// This example shows how to use a transport in sender, receiver,
// and intermediary processes.
//
// The sender and receiver use the client.Client API to send and
// receive messages over the transport. Only the intermediary example
// actually uses the transport APIs for efficiency and reliability in
// forwarding events.
func main() {
	r1, w1 := io.Pipe() // The sender-to-intermediary pipe
	r2, w2 := io.Pipe() // The intermediary-to-receiver pipe

	// One buffer slot per goroutine, so that a goroutine finishing after
	// main has stopped listening does not block forever on its send.
	done := make(chan error, 3)
	go func() { done <- runReceiver(r2) }()
	go func() { done <- runIntermediary(r1, w2) }()
	go func() { done <- runSender(w1) }()

	// Only two results are awaited.
	// NOTE(review): presumably these are the sender and the receiver. w1 is
	// never closed here and the intermediary keeps forwarding until a
	// receive or send fails, so it may never report. Confirm.
	for i := 0; i < 2; i++ {
		if err := <-done; err != nil && err != io.EOF {
			fmt.Println(err)
		}
	}
}
Output: Validation: valid Context Attributes, specversion: 0.2 type: example.com/event source: example.com/source id: 0 contenttype: application/json Data, "hello 0" Validation: valid Context Attributes, specversion: 0.2 type: example.com/event source: example.com/source id: 1 contenttype: application/json Data, "hello 1" Validation: valid Context Attributes, specversion: 0.2 type: example.com/event source: example.com/source id: 2 contenttype: application/json Data, "hello 2"
Index ¶
- type EventMessage
- type ExactlyOnceMessage
- type Message
- type Receiver
- type Sender
- type Transport
- func (t *Transport) HasConverter() bool
- func (t *Transport) Send(ctx context.Context, e cloudevents.Event) (context.Context, *cloudevents.Event, error)
- func (t *Transport) SetConverter(transport.Converter)
- func (t *Transport) SetReceiver(r transport.Receiver)
- func (t *Transport) StartReceiver(ctx context.Context) error
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type EventMessage ¶
type EventMessage cloudevents.Event
EventMessage wraps a local cloudevents.Event as a Message.
func (EventMessage) Event ¶
func (m EventMessage) Event() (cloudevents.Event, error)
Event returns the event.
func (EventMessage) Structured ¶
func (EventMessage) Structured() (string, []byte)
Structured returns ("", nil).
type ExactlyOnceMessage ¶
type ExactlyOnceMessage interface {
Message
// Received is called by a Sender when it gets acknowledgment of receipt
// (e.g. AMQP ACCEPT or MQTT PUBREC).
//
// The sender passes a finish() function that the original receiver
// must call when it gets the ack-of-the-ack (e.g. AMQP SETTLE, MQTT
// PUBCOMP).
//
// If sending fails, the sender must call Finish(err) with a non-nil
// error instead of Received. ExactlyOnceMessage implementations
// must also be prepared to handle Finish(nil) if the sender does
// not support QoS 2.
Received(finish func(error))
}
ExactlyOnceMessage is implemented by incoming transport messages that support QoS 2. Only transports that support QoS 2 need to implement or use this interface.
type Message ¶
type Message interface {
// Event decodes and returns the contained Event.
Event() (cloudevents.Event, error)
// Structured optionally returns an encoded structured event if it
// is efficient to do so, or ("", nil) if not.
//
// Enables Senders to optimize the case where structured events are
// passed from transport to transport without being decoded and
// re-encoded.
//
// Transport Message and Sender implementations are not required to
// implement the optimization; they can ignore it.
Structured() (encodingMediaType string, encodedEvent []byte)
// Finish *must* be called when a message from a Receiver can be
// forgotten by the receiver.
//
// A QoS 1 sender forwarding messages should not call Finish()
// until it gets an acknowledgment of receipt.
//
// A non-nil error indicates an error in sending or processing.
Finish(error)
}
Message is the interface to a transport-specific message containing an event.
There are 3 qualities of service (QoS) for message delivery:
0/at-most-once/unreliable: messages can be dropped silently.
1/at-least-once: messages are not dropped without signaling an error at the sender but may be duplicated.
2/exactly-once: messages are never dropped (without error) or duplicated, as long as both ends maintain some transport-specific state.
The Message interface supports QoS 0 and 1; the ExactlyOnceMessage interface supports QoS 2.
type Transport ¶
Transport implements the transport.Transport interface using a Sender and Receiver.
func NewTransport ¶
func (*Transport) HasConverter ¶
func (*Transport) Send ¶
func (t *Transport) Send(ctx context.Context, e cloudevents.Event) (context.Context, *cloudevents.Event, error)