Documentation
¶
Overview ¶
Package binding defines interfaces for protocol bindings.
NOTE: Most applications that emit or consume events should use the ../client package, which provides a simpler API to the underlying binding.
The interfaces in this package provide extra encoding and protocol information to allow efficient forwarding and end-to-end reliable delivery between a Receiver and a Sender belonging to different bindings. This is useful for intermediary applications that route or forward events, but not necessary for most "endpoint" applications that emit or consume events.
Protocol Bindings ¶
A protocol binding implements at least Message, Sender and Receiver, and usually Encoder.
Receiver: receives protocol messages and wraps them to implement the Message interface.
Message: converts to protocol-neutral cloudevents.Event or structured event data. It also provides methods to manage acknowledgment for reliable delivery across bindings.
Sender: converts arbitrary Message implementations to a protocol-specific form and sends them. A protocol Sender should preserve the spec-version and structured/binary mode of sent messages as far as possible. This package provides generic Sender wrappers to pre-process messages into a specific spec-version or structured/binary mode when the user requires that.
Message and ExactlyOnceMessage provide methods to allow acknowledgments to propagate when a reliable messages is forwarded from a Receiver to a Sender. QoS 0 (unreliable), 1 (at-least-once) and 2 (exactly-once) are supported.
Intermediaries ¶
Intermediaries can forward Messages from a Receiver to a Sender without knowledge of the underlying protocols. The Message interface allows structured messages to be forwarded without decoding and re-encoding. It also allows any Message to be fully decoded and examined as needed.
Example (Implementing) ¶
Example of implementing a transport including a simple message type, and a transport sender and receiver.
package main
import (
"context"
"encoding/json"
"io"
"github.com/cloudevents/sdk-go/pkg/binding"
"github.com/cloudevents/sdk-go/pkg/cloudevents"
ce "github.com/cloudevents/sdk-go/pkg/cloudevents"
"github.com/cloudevents/sdk-go/pkg/cloudevents/transport"
)
// ExMessage is a json.RawMessage, a byte slice containing a JSON encoded event.
// It implements binding.StructMessage
//
// Note: a good binding implementation should provide an easy way to convert
// between the Message implementation and the "native" message format.
// In this case it's as simple as:
//
// native = ExMessage(impl)
// impl = json.RawMessage(native)
//
// For example in a HTTP binding it should be easy to convert between
// the HTTP binding.Message implementation and net/http.Request and
// Response types. There are no interfaces for this conversion as it
// requires the use of unknown types.
type ExMessage json.RawMessage
func (m ExMessage) Structured() (string, []byte) {
return cloudevents.ApplicationCloudEventsJSON, []byte(m)
}
func (m ExMessage) Event() (e cloudevents.Event, err error) {
err = json.Unmarshal(json.RawMessage(m), &e)
return e, err
}
func (m ExMessage) Finish(error) error { return nil }
// ExSender sends by writing JSON encoded events to an io.Writer
type ExSender struct{ encoder *json.Encoder }
func NewExSender(w io.Writer) ExSender { return ExSender{json.NewEncoder(w)} }
func (s ExSender) Send(ctx context.Context, m binding.Message) error {
if f, b := m.Structured(); f == ce.ApplicationCloudEventsJSON {
// Fast case: Message is already structured JSON.
return s.encoder.Encode(json.RawMessage(b))
}
// Some other message encoding. Decode as generic Event and re-encode.
if e, err := m.Event(); err != nil {
return err
} else if err := s.encoder.Encode(&e); err != nil {
return err
}
return nil
}
func (s ExSender) Close(context.Context) error { return nil }
// ExReceiver receives by reading JSON encoded events from an io.Reader
type ExReceiver struct{ decoder *json.Decoder }
func NewExReceiver(r io.Reader) ExReceiver { return ExReceiver{json.NewDecoder(r)} }
func (r ExReceiver) Receive(context.Context) (binding.Message, error) {
var rm json.RawMessage
err := r.decoder.Decode(&rm) // This is just a byte copy.
return ExMessage(rm), err
}
func (r ExReceiver) Close(context.Context) error { return nil }
// NewExTransport returns a transport.Transport which is implemented by
// an ExSender and an ExReceiver
func NewExTransport(r io.Reader, w io.Writer) transport.Transport {
return binding.NewTransport(NewExSender(w), NewExReceiver(r))
}
// Example of implementing a transport including a simple message type,
// and a transport sender and receiver.
func main() {}
Example (Using) ¶
This example shows how to use a transport in sender, receiver, and intermediary processes.
The sender and receiver use the client.Client API to send and receive messages. the transport. Only the intermediary example actually uses the transport APIs for efficiency and reliability in forwarding events.
package main
import (
"context"
"fmt"
"io"
"strconv"
"github.com/cloudevents/sdk-go/pkg/cloudevents"
"github.com/cloudevents/sdk-go/pkg/cloudevents/client"
)
const count = 3 // Example ends after this many events.
// The sender uses the cloudevents.Client API, not the transport APIs directly.
func runSender(w io.Writer) error {
c, err := client.New(NewExTransport(nil, w))
if err != nil {
return err
}
for i := 0; i < count; i++ {
e := cloudevents.New()
e.SetType("example.com/event")
e.SetSource("example.com/source")
e.SetID(strconv.Itoa(i))
if err := e.SetData(fmt.Sprintf("hello %d", i)); err != nil {
return err
}
if _, _, err := c.Send(context.TODO(), e); err != nil {
return err
}
}
return nil
}
// The receiver uses the cloudevents.Client API, not the transport APIs directly.
func runReceiver(r io.Reader) error {
i := 0
process := func(e cloudevents.Event) error {
fmt.Printf("%s\n", e)
i++
if i == count {
return io.EOF
}
return nil
}
c, err := client.New(NewExTransport(r, nil))
if err != nil {
return err
}
return c.StartReceiver(context.TODO(), process)
}
// The intermediary receives events and forwards them to another
// process using ExReceiver and ExSender directly.
//
// By forwarding a transport.Message instead of a cloudevents.Event,
// it allows the transports to avoid un-necessary decoding of
// structured events, and to exchange delivery status between reliable
// transports. Even transports using different protocols can ensure
// reliable delivery.
func runIntermediary(r io.Reader, w io.WriteCloser) error {
defer w.Close()
for {
receiver := NewExReceiver(r)
sender := NewExSender(w)
for i := 0; i < count; i++ {
if m, err := receiver.Receive(context.TODO()); err != nil {
return err
} else if err := sender.Send(context.TODO(), m); err != nil {
return err
}
}
}
}
// This example shows how to use a transport in sender, receiver,
// and intermediary processes.
//
// The sender and receiver use the client.Client API to send and
// receive messages. the transport. Only the intermediary example
// actually uses the transport APIs for efficiency and reliability in
// forwarding events.
func main() {
r1, w1 := io.Pipe() // The sender-to-intermediary pipe
r2, w2 := io.Pipe() // The intermediary-to-receiver pipe
done := make(chan error)
go func() { done <- runReceiver(r2) }()
go func() { done <- runIntermediary(r1, w2) }()
go func() { done <- runSender(w1) }()
for i := 0; i < 2; i++ {
if err := <-done; err != nil && err != io.EOF {
fmt.Println(err)
}
}
}
Output: Validation: valid Context Attributes, specversion: 0.2 type: example.com/event source: example.com/source id: 0 Data, "hello 0" Validation: valid Context Attributes, specversion: 0.2 type: example.com/event source: example.com/source id: 1 Data, "hello 1" Validation: valid Context Attributes, specversion: 0.2 type: example.com/event source: example.com/source id: 2 Data, "hello 2"
Index ¶
- func Structured(m Message, f format.Format) ([]byte, error)
- type BinaryEncoder
- type ChanReceiver
- type ChanSender
- type Closer
- type Encoder
- type EventMessage
- type ExactlyOnceMessage
- type Message
- type ReceiveCloser
- type Receiver
- type Requester
- type SendCloser
- type Sender
- type StructEncoder
- type StructMessage
- type Transport
- func (t *Transport) HasConverter() bool
- func (t *Transport) Send(ctx context.Context, e ce.Event) (context.Context, *ce.Event, error)
- func (t *Transport) SetConverter(transport.Converter)
- func (t *Transport) SetReceiver(r transport.Receiver)
- func (t *Transport) StartReceiver(ctx context.Context) error
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type BinaryEncoder ¶ added in v0.10.0
type BinaryEncoder struct{}
type ChanReceiver ¶ added in v0.10.0
type ChanReceiver <-chan Message
ChanReceiver implements Receiver by receiving from a channel.
type ChanSender ¶ added in v0.10.0
type ChanSender chan<- Message
ChanSender implements Sender by sending on a channel.
type EventMessage ¶
EventMessage type-converts a cloudevents.Event object to implement Message. This allows local cloudevents.Event objects to be sent directly via Sender.Send()
s.Send(ctx, binding.EventMessage(e))
func (EventMessage) Finish ¶
func (EventMessage) Finish(error) error
func (EventMessage) Structured ¶
func (m EventMessage) Structured() (string, []byte)
type ExactlyOnceMessage ¶
type ExactlyOnceMessage interface {
Message
// Received is called by a forwarding QoS2 Sender when it gets
// acknowledgment of receipt (e.g. AMQP 'accept' or MQTT PUBREC)
//
// The receiver must call settle(nil) when it get's the ack-of-ack
// (e.g. AMQP 'settle' or MQTT PUBCOMP) or settle(err) if the
// transfer fails.
//
// Finally the Sender calls Finish() to indicate the message can be
// discarded.
//
// If sending fails, or if the sender does not support QoS 2, then
// Finish() may be called without any call to Received()
Received(settle func(error))
}
ExactlyOnceMessage is implemented by received Messages that support QoS 2. Only transports that support QoS 2 need to implement or use this interface.
type Message ¶
type Message interface {
// Event decodes and returns the contained Event.
Event() (ce.Event, error)
// Structured optionally returns a structured event encoding and its
// format type, if the message contains such an encoding. Returns
// ("", nil) if not.
//
// This allows Senders to avoid re-encoding messages that are
// already in suitable structured form.
//
Structured() (formatMediaType string, encodedEvent []byte)
// Finish *must* be called when message from a Receiver can be forgotten by
// the receiver. Sender.Send() calls Finish() when the message is sent. A QoS
// 1 sender should not call Finish() until it gets an acknowledgment of
// receipt on the underlying transport. For QoS 2 see ExactlyOnceMessage.
//
// Passing a non-nil err indicates sending or processing failed.
// A non-nil return indicates that the message was not accepted
// by the receivers peer.
Finish(error) error
}
Message is the interface to a binding-specific message containing an event.
Reliable Delivery ¶
There are 3 reliable qualities of service for messages:
0/at-most-once/unreliable: messages can be dropped silently.
1/at-least-once: messages are not dropped without signaling an error to the sender, but they may be duplicated in the event of a re-send.
2/exactly-once: messages are never dropped (without error) or duplicated, as long as both sending and receiving ends maintain some binding-specific delivery state. Whether this is persisted depends on the configuration of the binding implementations.
The Message interface supports QoS 0 and 1, the ExactlyOnceMessage interface supports QoS 2
func WithFinish ¶ added in v0.10.0
WithFinish returns a wrapper for m that calls finish() and m.Finish() in its Finish(). Allows code to be notified when a message is Finished.
type ReceiveCloser ¶ added in v0.10.0
ReceiveCloser is a Receiver that can be closed.
type Receiver ¶
type Receiver interface {
// Receive blocks till a message is received or ctx expires.
//
// A non-nil error means the receiver is closed.
// io.EOF means it closed cleanly, any other value indicates an error.
Receive(ctx context.Context) (Message, error)
}
Receiver receives messages.
type Requester ¶ added in v0.10.0
type Requester interface {
// Request sends m like Sender.Send() but also arranges to receive a response.
// The returned Receiver is used to receive the response.
Request(ctx context.Context, m Message) (Receiver, error)
}
Requester sends a message and receives a response
Optional interface that may be implemented by protocols that support request/response correlation.
type SendCloser ¶ added in v0.10.0
SendCloser is a Sender that can be closed.
type Sender ¶
type Sender interface {
// Send a message.
//
// Send returns when the "outbound" message has been sent. The Sender may
// still be expecting acknowledgment or holding other state for the message.
//
// m.Finish() is called when sending is finished: expected acknowledgments (or
// errors) have been received, the Sender is no longer holding any state for
// the message. m.Finish() may be called during or after Send().
//
// To support optimized forwading of structured-mode messages, Send()
// should use the encoding returned by m.Structured() if there is one.
// Otherwise m.Event() can be encoded as per the binding's rules.
Send(ctx context.Context, m Message) error
}
Sender sends messages.
func BinarySender ¶ added in v0.10.0
BinarySender returns a Sender that transforms messages to binary mode, then calls s.Send().
func StructSender ¶ added in v0.10.0
BinarySender returns a Sender that transforms messages to structured mode with format f, then calls s.Send().
func VersionSender ¶ added in v0.10.0
VersionSender returns a Sender that transforms messages to spec-version v, then calls s.Send().
By default VersionSender creates binary-mode messages. If combined with StructSender it will create structured messages of version v.
type StructEncoder ¶ added in v0.10.0
StructEncoder encodes events as StructMessage using a Format.
type StructMessage ¶ added in v0.10.0
StructMessage implements a structured-mode message as a simple struct.
func (StructMessage) Event ¶ added in v0.10.0
func (m StructMessage) Event() (e ce.Event, err error)
Event can only decode built-in formats supported by format.Unmarshal.
func (StructMessage) Finish ¶ added in v0.10.0
func (StructMessage) Finish(error) error
func (StructMessage) Structured ¶ added in v0.10.0
func (m StructMessage) Structured() (string, []byte)
type Transport ¶
Transport implements transport.Transport using a Sender and Receiver.
func NewTransport ¶
func (*Transport) HasConverter ¶
func (*Transport) SetConverter ¶
func (*Transport) SetReceiver ¶
Directories
¶
| Path | Synopsis |
|---|---|
|
Package format formats structured events.
|
Package format formats structured events. |
|
Package spec provides spec-version metadata.
|
Package spec provides spec-version metadata. |
|
Package test contains test data and generic tests for testing bindings.
|
Package test contains test data and generic tests for testing bindings. |