Documentation
¶
Overview ¶
Package appsync is a libraty to connect to AWS's Appsync Event API. See https://docs.aws.amazon.com/appsync/.
Index ¶
- Variables
- type Config
- type Conn
- type MessageError
- type ReceiveMessage
- type ReceiveMessageEventID
- type ReceiveMsgType
- type SendMessage
- type SendMessageAuthorization
- type SendMsgType
- type SubscriptionMessage
- type WebSocketClient
- func (w *WebSocketClient) Close() error
- func (w *WebSocketClient) Publish(ctx context.Context, channel string, events []string) ([]int, error)
- func (w *WebSocketClient) Subscribe(ctx context.Context, channel string, channelC chan *SubscriptionMessage) error
- func (w *WebSocketClient) Unsubscribe(ctx context.Context, channel string) error
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrChannelNotSubscribed = errors.New("channel is not subscribed")
var ErrMarshalMsg = errors.New("failed to marshal message")
var ErrRecieveMsg = errors.New("failed to receive message")
var ErrServerMsg = errors.New("server returned error")
var ErrTimeout = errors.New("server timed out")
var ErrUnknownMessageID = errors.New("unknown message id")
var ErrUnsupportedMsgFormat = errors.New("unsupported message format")
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
Authorization *SendMessageAuthorization
Headers map[string]string
HTTPEndpoint string
HTTPProtocol string
RealTimeEndpoint string
WebSocketProtocol string
}
func NewAPIKeyConfig ¶
NewAPIKeyConfig creates a config for api key authentication. See https://docs.aws.amazon.com/appsync/latest/devguide/security-authz.html.
func NewLambdaConfig ¶
NewLambdaConfig creates a config for lambda authentication. See https://docs.aws.amazon.com/appsync/latest/devguide/security-authz.html.
type Conn ¶
type Conn interface {
Close(code websocket.StatusCode, reason string) error
Reader(ctx context.Context) (websocket.MessageType, io.Reader, error)
Writer(ctx context.Context, typ websocket.MessageType) (io.WriteCloser, error)
}
type MessageError ¶
MessageError are errors received from the server.
type ReceiveMessage ¶
type ReceiveMessage struct {
ConnectionTimeoutMs int `json:"connectionTimeoutMs,omitempty"`
Errors []MessageError `json:"errors,omitempty"`
Event string `json:"event,omitempty"`
Failed []ReceiveMessageEventID `json:"failed,omitempty"`
Successful []ReceiveMessageEventID `json:"successful,omitempty"`
ID string `json:"id,omitempty"`
Type ReceiveMsgType `json:"type"`
}
ReceiveMessage are messages that are received from the server.
type ReceiveMessageEventID ¶
type ReceiveMessageEventID struct {
Identifier string `json:"identifier"`
Index int `json:"index"`
}
ReceiveMessageEventID are the event identifiers that are used for reporting success or failed of individual events.
type ReceiveMsgType ¶
type ReceiveMsgType string
ReceiveMsgType is the message types that can be received from the server.
const ( ConnectionAckType ReceiveMsgType = "connection_ack" ConnectionErrType ReceiveMsgType = "connection_error" ErrorType ReceiveMsgType = "error" KeepAliveType ReceiveMsgType = "ka" PublishErrType ReceiveMsgType = "publish_error" PublishSuccessType ReceiveMsgType = "publish_success" SubscribeErrType ReceiveMsgType = "subscribe_error" SubscribeSuccessType ReceiveMsgType = "subscribe_success" SubscriptionBroadcastErrorType ReceiveMsgType = "broadcast_error" SubscriptionDataType ReceiveMsgType = "data" UnsubscribeErrType ReceiveMsgType = "unsubscribe_error" UnsubscribeSuccessType ReceiveMsgType = "unsubscribe_success" )
type SendMessage ¶
type SendMessage struct {
Authorization *SendMessageAuthorization `json:"authorization,omitempty"`
Channel string `json:"channel,omitempty"`
Events []string `json:"events,omitempty"`
ID string `json:"id,omitempty"`
Type SendMsgType `json:"type"`
}
SendMessage are messages that are sent to the server.
func (*SendMessage) Equal ¶
func (m *SendMessage) Equal(otherM *SendMessage) bool
type SendMessageAuthorization ¶
type SendMessageAuthorization struct {
Authorization string `json:"authorization,omitempty"`
Host string `json:"host,omitempty"`
XAmzDate string `json:"x-amz-date,omitempty"`
XAmzSecurityToken string `json:"x-amz-security-token,omitempty"`
XAPIKey string `json:"x-api-key,omitempty"`
}
SendMessageAuthorization contain the client authentication details. See https://docs.aws.amazon.com/appsync/latest/eventapi/event-api-websocket-protocol.html#authorization-formatting-by-mode.
func (*SendMessageAuthorization) Equal ¶
func (m *SendMessageAuthorization) Equal(otherM *SendMessageAuthorization) bool
type SendMsgType ¶
type SendMsgType string
SendMsgType is the message types that can be sent to the server.
const ( ConnectionInitType SendMsgType = "connection_init" PublishType SendMsgType = "publish" SubscribeType SendMsgType = "subscribe" UnsubscribeType SendMsgType = "unsubscribe" )
type SubscriptionMessage ¶
type SubscriptionMessage struct {
Errors []MessageError `json:"errors,omitempty"`
Event string `json:"event,omitempty"`
Type ReceiveMsgType `json:"type"`
}
SubscriptionMessage are the subscription event messages received from the server.
func (*SubscriptionMessage) Equal ¶
func (m *SubscriptionMessage) Equal(otherM *SubscriptionMessage) bool
type WebSocketClient ¶
type WebSocketClient struct {
Err error
// contains filtered or unexported fields
}
WebSocketClient is the client for managing a Appsync event websocket connection. See https://docs.aws.amazon.com/appsync/latest/eventapi/event-api-websocket-protocol.html.
func DialWebSocketConfig ¶
func DialWebSocketConfig(ctx context.Context, config *Config) (*WebSocketClient, error)
DialWebSocketConfig creates a Appsync websocket client. For more information on the Appsync websocket API, see https://docs.aws.amazon.com/appsync/latest/eventapi/event-api-websocket-protocol.html.
func (*WebSocketClient) Close ¶
func (w *WebSocketClient) Close() error
Close closes the connection to the server and all open subscription channels.
func (*WebSocketClient) Publish ¶
func (w *WebSocketClient) Publish(ctx context.Context, channel string, events []string) ([]int, error)
Publish publishes an event to Appsync.
Example ¶
// Local websocket server to mimic Appsync
go runPublishAPI("8001")
// Appsync endpoints
httpEndpoint := "localhost:8001" // <ID>.appsync-api.<region>.amazonaws.com
realtimeEndpoint := "localhost:8001" // <ID>.appsync-realtime-api.<region>.amazonaws.com
// Authentication method
apiKey := "ab-cdefghijklmnopqrstuvwxyz"
config := appsync.NewAPIKeyConfig(httpEndpoint, realtimeEndpoint, apiKey)
// Use non encypted connection for local calls only
config.HTTPProtocol = "http"
config.WebSocketProtocol = "ws"
// Client
ctx := context.Background()
client, err := appsync.DialWebSocketConfig(ctx, config)
if err != nil {
panic(err)
}
// JSON event(s) to publish
eventABytes, err := json.Marshal(struct {
A string `json:"a"`
}{
A: "123",
})
if err != nil {
panic(err)
}
eventBBytes, err := json.Marshal(struct {
B string `json:"b"`
}{
B: "123",
})
if err != nil {
panic(err)
}
events := []string{string(eventABytes), string(eventBBytes)}
// Publish
channel := "/default/example"
successIndicies, err := client.Publish(ctx, channel, events)
if err != nil {
panic(err)
}
// Results
fmt.Println(successIndicies)
// Close
err = client.Close()
if err != nil {
panic(err)
}
Output: [0 1]
func (*WebSocketClient) Subscribe ¶
func (w *WebSocketClient) Subscribe(ctx context.Context, channel string, channelC chan *SubscriptionMessage) error
Subscribe subscribes to an event channel. The chan returned will return events for the channel subscribed to. The chan is closed when the connection to the server is closed. The order of events are not preserved.
Example ¶
// Local websocket server to mimic Appsync
go runSubscribeAPI("8001")
// Appsync endpoints
httpEndpoint := "localhost:8001" // <ID>.appsync-api.<region>.amazonaws.com
realtimeEndpoint := "localhost:8001" // <ID>.appsync-realtime-api.<region>.amazonaws.com
// Authentication method
apiKey := "ab-cdefghijklmnopqrstuvwxyz"
config := appsync.NewAPIKeyConfig(httpEndpoint, realtimeEndpoint, apiKey)
// Use non encypted connection for local calls only
config.HTTPProtocol = "http"
config.WebSocketProtocol = "ws"
// Client
ctx := context.Background()
client, err := appsync.DialWebSocketConfig(ctx, config)
if err != nil {
panic(err)
}
// Subscribe
channel := "/default/example"
msgC := make(chan *appsync.SubscriptionMessage)
err = client.Subscribe(ctx, channel, msgC)
if err != nil {
panic(err)
}
// Result
msg, ok := <-msgC
if !ok {
panic(client.Err)
}
fmt.Println(msg.Event)
// Close
err = client.Close()
if err != nil {
panic(err)
}
Output: eventa
func (*WebSocketClient) Unsubscribe ¶
func (w *WebSocketClient) Unsubscribe(ctx context.Context, channel string) error
Unsubscribe unsubscribes to an event channel. The chan used to receive events is not closed after unsubscribing.
Example ¶
// Local websocket server to mimic Appsync
go runUnsubscribeAPI("8001")
// Appsync endpoints
httpEndpoint := "localhost:8001" // <ID>.appsync-api.<region>.amazonaws.com
realtimeEndpoint := "localhost:8001" // <ID>.appsync-realtime-api.<region>.amazonaws.com
// Authentication method
apiKey := "ab-cdefghijklmnopqrstuvwxyz"
config := appsync.NewAPIKeyConfig(httpEndpoint, realtimeEndpoint, apiKey)
// Use non encypted connection for local calls only
config.HTTPProtocol = "http"
config.WebSocketProtocol = "ws"
// Client
ctx := context.Background()
client, err := appsync.DialWebSocketConfig(ctx, config)
if err != nil {
panic(err)
}
// Subscribe
channel := "/default/example"
msgC := make(chan *appsync.SubscriptionMessage)
err = client.Subscribe(ctx, channel, msgC)
if err != nil {
panic(err)
}
// Unsubscribe
err = client.Unsubscribe(ctx, channel)
if err != nil {
panic(err)
}
fmt.Println("channel is unsubscribed")
// Close
err = client.Close()
if err != nil {
panic(err)
}
Output: channel is unsubscribed