appsync

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 13, 2025 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Overview

Package appsync is a libraty to connect to AWS's Appsync Event API. See https://docs.aws.amazon.com/appsync/.

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrChannelNotSubscribed = errors.New("channel is not subscribed")
View Source
var ErrMarshalMsg = errors.New("failed to marshal message")
View Source
var ErrRecieveMsg = errors.New("failed to receive message")
View Source
var ErrServerMsg = errors.New("server returned error")
View Source
var ErrTimeout = errors.New("server timed out")
View Source
var ErrUnknownMessageID = errors.New("unknown message id")
View Source
var ErrUnsupportedMsgFormat = errors.New("unsupported message format")

Functions

This section is empty.

Types

type Config

type Config struct {
	Authorization     *SendMessageAuthorization
	Headers           map[string]string
	HTTPEndpoint      string
	HTTPProtocol      string
	RealTimeEndpoint  string
	WebSocketProtocol string
}

func NewAPIKeyConfig

func NewAPIKeyConfig(httpEndpoint, realTimeEndpoint, apiKey string) *Config

NewAPIKeyConfig creates a config for api key authentication. See https://docs.aws.amazon.com/appsync/latest/devguide/security-authz.html.

func NewLambdaConfig

func NewLambdaConfig(httpEndpoint, realTimeEndpoint, authorizationToken string) *Config

NewLambdaConfig creates a config for lambda authentication. See https://docs.aws.amazon.com/appsync/latest/devguide/security-authz.html.

type Conn

type Conn interface {
	Close(code websocket.StatusCode, reason string) error
	Reader(ctx context.Context) (websocket.MessageType, io.Reader, error)
	Writer(ctx context.Context, typ websocket.MessageType) (io.WriteCloser, error)
}

type MessageError

type MessageError struct {
	ErrorType string `json:"errorType"`
	Message   string `json:"message"`
}

MessageError are errors received from the server.

type ReceiveMessage

type ReceiveMessage struct {
	ConnectionTimeoutMs int                     `json:"connectionTimeoutMs,omitempty"`
	Errors              []MessageError          `json:"errors,omitempty"`
	Event               string                  `json:"event,omitempty"`
	Failed              []ReceiveMessageEventID `json:"failed,omitempty"`
	Successful          []ReceiveMessageEventID `json:"successful,omitempty"`
	ID                  string                  `json:"id,omitempty"`
	Type                ReceiveMsgType          `json:"type"`
}

ReceiveMessage are messages that are received from the server.

type ReceiveMessageEventID

type ReceiveMessageEventID struct {
	Identifier string `json:"identifier"`
	Index      int    `json:"index"`
}

ReceiveMessageEventID are the event identifiers that are used for reporting success or failed of individual events.

type ReceiveMsgType

type ReceiveMsgType string

ReceiveMsgType is the message types that can be received from the server.

const (
	ConnectionAckType              ReceiveMsgType = "connection_ack"
	ConnectionErrType              ReceiveMsgType = "connection_error"
	ErrorType                      ReceiveMsgType = "error"
	KeepAliveType                  ReceiveMsgType = "ka"
	PublishErrType                 ReceiveMsgType = "publish_error"
	PublishSuccessType             ReceiveMsgType = "publish_success"
	SubscribeErrType               ReceiveMsgType = "subscribe_error"
	SubscribeSuccessType           ReceiveMsgType = "subscribe_success"
	SubscriptionBroadcastErrorType ReceiveMsgType = "broadcast_error"
	SubscriptionDataType           ReceiveMsgType = "data"
	UnsubscribeErrType             ReceiveMsgType = "unsubscribe_error"
	UnsubscribeSuccessType         ReceiveMsgType = "unsubscribe_success"
)

type SendMessage

type SendMessage struct {
	Authorization *SendMessageAuthorization `json:"authorization,omitempty"`
	Channel       string                    `json:"channel,omitempty"`
	Events        []string                  `json:"events,omitempty"`
	ID            string                    `json:"id,omitempty"`
	Type          SendMsgType               `json:"type"`
}

SendMessage are messages that are sent to the server.

func (*SendMessage) Equal

func (m *SendMessage) Equal(otherM *SendMessage) bool

type SendMessageAuthorization

type SendMessageAuthorization struct {
	Authorization     string `json:"authorization,omitempty"`
	Host              string `json:"host,omitempty"`
	XAmzDate          string `json:"x-amz-date,omitempty"`
	XAmzSecurityToken string `json:"x-amz-security-token,omitempty"`
	XAPIKey           string `json:"x-api-key,omitempty"`
}

SendMessageAuthorization contain the client authentication details. See https://docs.aws.amazon.com/appsync/latest/eventapi/event-api-websocket-protocol.html#authorization-formatting-by-mode.

func (*SendMessageAuthorization) Equal

type SendMsgType

type SendMsgType string

SendMsgType is the message types that can be sent to the server.

const (
	ConnectionInitType SendMsgType = "connection_init"
	PublishType        SendMsgType = "publish"
	SubscribeType      SendMsgType = "subscribe"
	UnsubscribeType    SendMsgType = "unsubscribe"
)

type SubscriptionMessage

type SubscriptionMessage struct {
	Errors []MessageError `json:"errors,omitempty"`
	Event  string         `json:"event,omitempty"`
	Type   ReceiveMsgType `json:"type"`
}

SubscriptionMessage are the subscription event messages received from the server.

func (*SubscriptionMessage) Equal

func (m *SubscriptionMessage) Equal(otherM *SubscriptionMessage) bool

type WebSocketClient

type WebSocketClient struct {
	Err error
	// contains filtered or unexported fields
}

WebSocketClient is the client for managing a Appsync event websocket connection. See https://docs.aws.amazon.com/appsync/latest/eventapi/event-api-websocket-protocol.html.

func DialWebSocketConfig

func DialWebSocketConfig(ctx context.Context, config *Config) (*WebSocketClient, error)

DialWebSocketConfig creates a Appsync websocket client. For more information on the Appsync websocket API, see https://docs.aws.amazon.com/appsync/latest/eventapi/event-api-websocket-protocol.html.

func (*WebSocketClient) Close

func (w *WebSocketClient) Close() error

Close closes the connection to the server and all open subscription channels.

func (*WebSocketClient) Publish

func (w *WebSocketClient) Publish(ctx context.Context, channel string, events []string) ([]int, error)

Publish publishes an event to Appsync.

Example
// Local websocket server to mimic Appsync
go runPublishAPI("8001")

// Appsync endpoints
httpEndpoint := "localhost:8001"     // <ID>.appsync-api.<region>.amazonaws.com
realtimeEndpoint := "localhost:8001" // <ID>.appsync-realtime-api.<region>.amazonaws.com

// Authentication method
apiKey := "ab-cdefghijklmnopqrstuvwxyz"
config := appsync.NewAPIKeyConfig(httpEndpoint, realtimeEndpoint, apiKey)

// Use non encypted connection for local calls only
config.HTTPProtocol = "http"
config.WebSocketProtocol = "ws"

// Client
ctx := context.Background()
client, err := appsync.DialWebSocketConfig(ctx, config)
if err != nil {
	panic(err)
}

// JSON event(s) to publish
eventABytes, err := json.Marshal(struct {
	A string `json:"a"`
}{
	A: "123",
})
if err != nil {
	panic(err)
}
eventBBytes, err := json.Marshal(struct {
	B string `json:"b"`
}{
	B: "123",
})
if err != nil {
	panic(err)
}
events := []string{string(eventABytes), string(eventBBytes)}

// Publish
channel := "/default/example"
successIndicies, err := client.Publish(ctx, channel, events)
if err != nil {
	panic(err)
}

// Results
fmt.Println(successIndicies)

// Close
err = client.Close()
if err != nil {
	panic(err)
}
Output:

[0 1]

func (*WebSocketClient) Subscribe

func (w *WebSocketClient) Subscribe(ctx context.Context, channel string, channelC chan *SubscriptionMessage) error

Subscribe subscribes to an event channel. The chan returned will return events for the channel subscribed to. The chan is closed when the connection to the server is closed. The order of events are not preserved.

Example
// Local websocket server to mimic Appsync
go runSubscribeAPI("8001")

// Appsync endpoints
httpEndpoint := "localhost:8001"     // <ID>.appsync-api.<region>.amazonaws.com
realtimeEndpoint := "localhost:8001" // <ID>.appsync-realtime-api.<region>.amazonaws.com

// Authentication method
apiKey := "ab-cdefghijklmnopqrstuvwxyz"
config := appsync.NewAPIKeyConfig(httpEndpoint, realtimeEndpoint, apiKey)

// Use non encypted connection for local calls only
config.HTTPProtocol = "http"
config.WebSocketProtocol = "ws"

// Client
ctx := context.Background()
client, err := appsync.DialWebSocketConfig(ctx, config)
if err != nil {
	panic(err)
}

// Subscribe
channel := "/default/example"
msgC := make(chan *appsync.SubscriptionMessage)
err = client.Subscribe(ctx, channel, msgC)
if err != nil {
	panic(err)
}

// Result
msg, ok := <-msgC
if !ok {
	panic(client.Err)
}
fmt.Println(msg.Event)

// Close
err = client.Close()
if err != nil {
	panic(err)
}
Output:

eventa

func (*WebSocketClient) Unsubscribe

func (w *WebSocketClient) Unsubscribe(ctx context.Context, channel string) error

Unsubscribe unsubscribes to an event channel. The chan used to receive events is not closed after unsubscribing.

Example
// Local websocket server to mimic Appsync
go runUnsubscribeAPI("8001")

// Appsync endpoints
httpEndpoint := "localhost:8001"     // <ID>.appsync-api.<region>.amazonaws.com
realtimeEndpoint := "localhost:8001" // <ID>.appsync-realtime-api.<region>.amazonaws.com

// Authentication method
apiKey := "ab-cdefghijklmnopqrstuvwxyz"
config := appsync.NewAPIKeyConfig(httpEndpoint, realtimeEndpoint, apiKey)

// Use non encypted connection for local calls only
config.HTTPProtocol = "http"
config.WebSocketProtocol = "ws"

// Client
ctx := context.Background()
client, err := appsync.DialWebSocketConfig(ctx, config)
if err != nil {
	panic(err)
}

// Subscribe
channel := "/default/example"
msgC := make(chan *appsync.SubscriptionMessage)
err = client.Subscribe(ctx, channel, msgC)
if err != nil {
	panic(err)
}

// Unsubscribe
err = client.Unsubscribe(ctx, channel)
if err != nil {
	panic(err)
}

fmt.Println("channel is unsubscribed")

// Close
err = client.Close()
if err != nil {
	panic(err)
}
Output:

channel is unsubscribed

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL