appsync

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 25, 2025 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Overview

Package appsync is a libraty to connect to AWS's Appsync Event API. See https://docs.aws.amazon.com/appsync/.

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	ErrChannelNotSubscribed = errors.New("channel is not subscribed")
	ErrContextEnded         = errors.New("context ended")
	ErrIDDoesNotExists      = errors.New("id does not exist")
	ErrIDExists             = errors.New("uuid exsists")
	ErrMarshalMsg           = errors.New("failed to marshal message")
	ErrRecieveMsg           = errors.New("failed to receive message")
	ErrServerMsg            = errors.New("server returned error")
	ErrSubscriptionCalled   = errors.New("subscription on channel already called")
	ErrTimeout              = errors.New("server timed out")
	ErrTypeAssertion        = errors.New("faild type assertion")
	ErrUnsubscriptionCalled = errors.New("unsubscription on channel already called")
	ErrUnsupportedMsgFormat = errors.New("unsupported message format")
)

Possible errors returned from creation or usage of the WebSocketClient.

Functions

This section is empty.

Types

type Authorization added in v0.2.0

type Authorization struct {
	Authorization     string `json:"authorization,omitempty"`
	Host              string `json:"host,omitempty"`
	XAmzDate          string `json:"x-amz-date,omitempty"`
	XAmzSecurityToken string `json:"x-amz-security-token,omitempty"`
	XAPIKey           string `json:"x-api-key,omitempty"`
}

Authorization contains the client authentication details. See https://docs.aws.amazon.com/appsync/latest/eventapi/event-api-websocket-protocol.html#authorization-formatting-by-mode for more information on authorization formatting.

type Config

type Config struct {
	Authorization     *Authorization
	Headers           map[string]string
	HTTPEndpoint      string
	HTTPProtocol      string
	RealTimeEndpoint  string
	WebSocketProtocol string
}

Config is the configuration file for creating the WebSocketClient.

func NewAPIKeyConfig

func NewAPIKeyConfig(httpEndpoint, realTimeEndpoint, apiKey string) *Config

NewAPIKeyConfig creates a config for api key authentication. See https://docs.aws.amazon.com/appsync/latest/devguide/security-authz.html for authentication types.

func NewLambdaConfig

func NewLambdaConfig(httpEndpoint, realTimeEndpoint, authorizationToken string) *Config

NewLambdaConfig creates a config for lambda authentication. See https://docs.aws.amazon.com/appsync/latest/devguide/security-authz.html for authentication types.

func (Config) Host added in v0.2.0

func (c Config) Host() (string, error)

Host returns the url of the host.

func (Config) Subprotocols added in v0.2.0

func (c Config) Subprotocols() ([]string, error)

Subprotocols Returns the websocket subprotocols.

func (Config) URL added in v0.2.0

func (c Config) URL() (string, error)

URL returns the url of the webscoket.

type Conn

type Conn interface {
	// Close closes the connection.
	Close() error
	// Read reads data from the connection.
	// Return EOF when it reaches the end of a message.
	Read(ctx context.Context, b []byte) (n int, err error)
	// Write writes data to the connection.
	Write(ctx context.Context, b []byte) (n int, err error)
}

Conn is the websocket connection to the server.

type MessageError

type MessageError struct {
	ErrorType string `json:"errorType"`
	Message   string `json:"message"`
}

MessageError are errors received from the Appsync Event server.

type ReceiveMessage

type ReceiveMessage struct {
	ConnectionTimeoutMs int                     `json:"connectionTimeoutMs,omitempty"`
	Errors              []MessageError          `json:"errors,omitempty"`
	Event               string                  `json:"event,omitempty"`
	Failed              []ReceiveMessageEventID `json:"failed,omitempty"`
	Successful          []ReceiveMessageEventID `json:"successful,omitempty"`
	ID                  string                  `json:"id,omitempty"`
	Type                ReceiveMsgType          `json:"type"`
}

ReceiveMessage are messages that are received from the Appsync Event server.

type ReceiveMessageEventID

type ReceiveMessageEventID struct {
	Identifier string `json:"identifier"`
	Index      int    `json:"index"`
}

ReceiveMessageEventID are the event identifiers that are used for reporting success or failed of individual events.

type ReceiveMsgType

type ReceiveMsgType string

ReceiveMsgType is the message types that can be received from the Appsync Event server.

const (
	ConnectionAckType              ReceiveMsgType = "connection_ack"
	ConnectionErrType              ReceiveMsgType = "connection_error"
	ErrorType                      ReceiveMsgType = "error"
	KeepAliveType                  ReceiveMsgType = "ka"
	PublishErrType                 ReceiveMsgType = "publish_error"
	PublishSuccessType             ReceiveMsgType = "publish_success"
	SubscribeErrType               ReceiveMsgType = "subscribe_error"
	SubscribeSuccessType           ReceiveMsgType = "subscribe_success"
	SubscriptionBroadcastErrorType ReceiveMsgType = "broadcast_error"
	SubscriptionDataType           ReceiveMsgType = "data"
	UnsubscribeErrType             ReceiveMsgType = "unsubscribe_error"
	UnsubscribeSuccessType         ReceiveMsgType = "unsubscribe_success"
)

type SendMessage

type SendMessage struct {
	Authorization *Authorization `json:"authorization,omitempty"`
	Channel       string         `json:"channel,omitempty"`
	Events        []string       `json:"events,omitempty"`
	ID            string         `json:"id,omitempty"`
	Type          SendMsgType    `json:"type"`
}

SendMessage are messages that are sent to the Appsync Event server.

type SendMsgType

type SendMsgType string

SendMsgType is the message types that are sent to the Appsync Event server.

const (
	ConnectionInitType SendMsgType = "connection_init"
	PublishType        SendMsgType = "publish"
	SubscribeType      SendMsgType = "subscribe"
	UnsubscribeType    SendMsgType = "unsubscribe"
)

type SubscriptionMessage

type SubscriptionMessage struct {
	Errors []MessageError `json:"errors,omitempty"`
	Event  string         `json:"event,omitempty"`
	Type   ReceiveMsgType `json:"type"`
}

SubscriptionMessage are the subscription event messages received from the Appsync Event server.

type WebSocketClient

type WebSocketClient struct {
	// Authorization is authorization details sent to the server.
	Authorization *Authorization
	// Conn is the websocket connection to the server.
	Conn Conn
	// Err is the first error found that prvent the client from continuing.
	// These errors range from connection errors to data processing errors.
	Err error
	// contains filtered or unexported fields
}

WebSocketClient is the client for managing a Appsync Event websocket connection.

func DialWebSocketConfig

func DialWebSocketConfig(ctx context.Context, config *Config) (*WebSocketClient, error)

DialWebSocketConfig creates a Appsync websocket client. For more information on the Appsync websocket API, see https://docs.aws.amazon.com/appsync/latest/eventapi/event-api-websocket-protocol.html.

func NewWebSocketClient added in v0.2.0

func NewWebSocketClient(ctx context.Context, conn Conn, auth *Authorization) (*WebSocketClient, error)

NewWebSocketClient creates a websocket client.

func (*WebSocketClient) Close

func (w *WebSocketClient) Close() error

Close closes the connection to the server and all open subscription channels.

func (*WebSocketClient) Publish

func (w *WebSocketClient) Publish(ctx context.Context, channel string, events []string) (sucessIs []int, err error)

Publish publishes an event to Appsync. If you want to send JSON event, marshal the object into a string.

Example
// Local websocket server to mimic Appsync
go runPublishAPI("8001")

// Appsync endpoints
httpEndpoint := "localhost:8001"     // <ID>.appsync-api.<region>.amazonaws.com
realtimeEndpoint := "localhost:8001" // <ID>.appsync-realtime-api.<region>.amazonaws.com

// Authentication method
apiKey := "ab-cdefghijklmnopqrstuvwxyz"
config := appsync.NewAPIKeyConfig(httpEndpoint, realtimeEndpoint, apiKey)

// Use non encypted connection for local calls only
config.HTTPProtocol = "http"
config.WebSocketProtocol = "ws"

// Client
ctx := context.Background()
client, err := appsync.DialWebSocketConfig(ctx, config)
if err != nil {
	panic(err)
}

// JSON event(s) to publish
eventABytes, err := json.Marshal(struct {
	A string `json:"a"`
}{
	A: "123",
})
if err != nil {
	panic(err)
}
eventBBytes, err := json.Marshal(struct {
	B string `json:"b"`
}{
	B: "123",
})
if err != nil {
	panic(err)
}
events := []string{string(eventABytes), string(eventBBytes)}

// Publish
channel := "/default/example"
successIndicies, err := client.Publish(ctx, channel, events)
if err != nil {
	panic(err)
}

// Results
fmt.Println(successIndicies)

// Close
err = client.Close()
if err != nil {
	panic(err)
}
Output:

[0 1]

func (*WebSocketClient) Subscribe

func (w *WebSocketClient) Subscribe(ctx context.Context, channel string, channelC chan *SubscriptionMessage) error

Subscribe subscribes to an event channel. The chan returned, channelC, will return events for the channel subscription. channelC can be buffered or unbuffered. It is closed when the connection to the server is closed or channel is unsubscribed.

Example
// Local websocket server to mimic Appsync
go runSubscribeAPI("8001")

// Appsync endpoints
httpEndpoint := "localhost:8001"     // <ID>.appsync-api.<region>.amazonaws.com
realtimeEndpoint := "localhost:8001" // <ID>.appsync-realtime-api.<region>.amazonaws.com

// Authentication method
apiKey := "ab-cdefghijklmnopqrstuvwxyz"
config := appsync.NewAPIKeyConfig(httpEndpoint, realtimeEndpoint, apiKey)

// Use non encypted connection for local calls only
config.HTTPProtocol = "http"
config.WebSocketProtocol = "ws"

// Client
ctx := context.Background()
client, err := appsync.DialWebSocketConfig(ctx, config)
if err != nil {
	panic(err)
}

// Subscribe
channel := "/default/example"
msgC := make(chan *appsync.SubscriptionMessage)
err = client.Subscribe(ctx, channel, msgC)
if err != nil {
	panic(err)
}

// Result
msg, ok := <-msgC
if !ok {
	panic(client.Err)
}
fmt.Println(msg.Event)

// Close
err = client.Close()
if err != nil {
	panic(err)
}
Output:

eventa

func (*WebSocketClient) Unsubscribe

func (w *WebSocketClient) Unsubscribe(ctx context.Context, channel string) error

Unsubscribe unsubscribes to an event channel. The chan used to receive events is not closed after unsubscribing.

Example
// Local websocket server to mimic Appsync
go runUnsubscribeAPI("8001")

// Appsync endpoints
httpEndpoint := "localhost:8001"     // <ID>.appsync-api.<region>.amazonaws.com
realtimeEndpoint := "localhost:8001" // <ID>.appsync-realtime-api.<region>.amazonaws.com

// Authentication method
apiKey := "ab-cdefghijklmnopqrstuvwxyz"
config := appsync.NewAPIKeyConfig(httpEndpoint, realtimeEndpoint, apiKey)

// Use non encypted connection for local calls only
config.HTTPProtocol = "http"
config.WebSocketProtocol = "ws"

// Client
ctx := context.Background()
client, err := appsync.DialWebSocketConfig(ctx, config)
if err != nil {
	panic(err)
}

// Subscribe
channel := "/default/example"
msgC := make(chan *appsync.SubscriptionMessage)
err = client.Subscribe(ctx, channel, msgC)
if err != nil {
	panic(err)
}

// Unsubscribe
err = client.Unsubscribe(ctx, channel)
if err != nil {
	panic(err)
}

fmt.Println("channel is unsubscribed")

// Close
err = client.Close()
if err != nil {
	panic(err)
}
Output:

channel is unsubscribed

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL