Documentation
¶
Overview ¶
Package eventsource implements a client and server that stream data one-way over an HTTP connection using the Server-Sent Events API: http://dev.w3.org/html5/eventsource/
The client and server respect the Last-Event-ID header. If the Repository interface is implemented on the server, events can be replayed in case of a network disconnection.
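As a rough illustration of what resuming with Last-Event-ID looks like at the HTTP level, the sketch below builds the kind of request a Server-Sent Events client sends when it reconnects. It uses only net/http. The helper name newResumeRequest and the URL are hypothetical, and this is not the package's own implementation.

```go
package main

import (
	"fmt"
	"net/http"
)

// newResumeRequest is a hypothetical helper (not part of eventsource). It builds
// a GET request for an event stream and, if lastEventID is non-empty, adds the
// Last-Event-ID header so a server with replay support can resend missed events.
func newResumeRequest(url, lastEventID string) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Accept", "text/event-stream")
	req.Header.Set("Cache-Control", "no-cache")
	if lastEventID != "" {
		req.Header.Set("Last-Event-ID", lastEventID)
	}
	return req, nil
}

func main() {
	// The URL and the ID "42" are placeholders for illustration.
	req, err := newResumeRequest("http://127.0.0.1:8080/time", "42")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("Last-Event-ID:", req.Header.Get("Last-Event-ID"))
}
```

A server that has no history for the channel can simply ignore the header and stream new events only.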
Index ¶
- Constants
- Variables
- type Decoder
- type DecoderOption
- type Encoder
- type Event
- type Logger
- type Repository
- type Server
- type SliceRepository
- type Stream
- func Subscribe(url, lastEventID string) (*Stream, error)
- func SubscribeWith(lastEventID string, client *http.Client, request *http.Request) (*Stream, error)
- func SubscribeWithRequest(lastEventID string, request *http.Request) (*Stream, error)
- func SubscribeWithRequestAndOptions(request *http.Request, options ...StreamOption) (*Stream, error)
- func SubscribeWithURL(url string, options ...StreamOption) (*Stream, error)
- type StreamErrorHandler
- type StreamErrorHandlerResult
- type StreamOption
- func StreamOptionCanRetryFirstConnection(initialRetryTimeout time.Duration) StreamOption
- func StreamOptionErrorHandler(handler StreamErrorHandler) StreamOption
- func StreamOptionHTTPClient(client *http.Client) StreamOption
- func StreamOptionInitialRetry(retry time.Duration) StreamOption
- func StreamOptionLastEventID(lastEventID string) StreamOption
- func StreamOptionLogger(logger Logger) StreamOption
- func StreamOptionReadTimeout(timeout time.Duration) StreamOption
- func StreamOptionRetryResetInterval(retryResetInterval time.Duration) StreamOption
- func StreamOptionUseBackoff(maxDelay time.Duration) StreamOption
- func StreamOptionUseJitter(jitterRatio float64) StreamOption
- type SubscriptionError
Constants ¶
const (
	// DefaultInitialRetry is the default value for StreamOptionInitialRetry.
	DefaultInitialRetry = time.Second * 3
	// DefaultRetryResetInterval is the default value for StreamOptionRetryResetInterval.
	DefaultRetryResetInterval = time.Second * 60
)
Variables ¶
var (
	// ErrReadTimeout is the error that will be emitted if a stream was closed due to not
	// receiving any data within the configured read timeout interval.
	ErrReadTimeout = errors.New("Read timeout on stream")
)
Functions ¶
This section is empty.
Types ¶
type Decoder ¶
type Decoder struct {
// contains filtered or unexported fields
}
A Decoder is capable of reading Events from a stream.
func NewDecoder ¶
NewDecoder returns a new Decoder instance that reads events with the given io.Reader.
func NewDecoderWithOptions ¶
func NewDecoderWithOptions(r io.Reader, options ...DecoderOption) *Decoder
NewDecoderWithOptions returns a new Decoder instance that reads events with the given io.Reader, with optional configuration parameters.
func (*Decoder) Decode ¶
Decode reads the next Event from a stream (and will block until one comes in). Graceful disconnects (between events) are indicated by an io.EOF error. Any error occurring mid-event is considered non-graceful and will show up as some other error (most likely io.ErrUnexpectedEOF).
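To make the distinction between a graceful and a non-graceful end of stream concrete, here is a minimal sketch of a line-based event reader. It is an assumption about how such a decoder could be written, not the Decoder's actual code; rawEvent, decodeOne and decodeAll are hypothetical names.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// rawEvent is a hypothetical container for the fields of one decoded event.
type rawEvent struct {
	ID, Name, Data string
}

// decodeOne reads lines until a blank line terminates the event. It returns
// io.EOF if the stream ends cleanly between events, and io.ErrUnexpectedEOF
// if the stream ends after part of an event has been read.
func decodeOne(r *bufio.Reader) (rawEvent, error) {
	var ev rawEvent
	var data []string
	started := false
	for {
		line, err := r.ReadString('\n')
		if err == io.EOF {
			if started || line != "" {
				return rawEvent{}, io.ErrUnexpectedEOF
			}
			return rawEvent{}, io.EOF
		}
		if err != nil {
			return rawEvent{}, err
		}
		line = strings.TrimRight(line, "\r\n")
		if line == "" {
			if !started {
				continue // ignore stray blank lines between events
			}
			ev.Data = strings.Join(data, "\n")
			return ev, nil
		}
		if strings.HasPrefix(line, ":") {
			continue // comment line
		}
		started = true
		field, value, _ := strings.Cut(line, ":")
		value = strings.TrimPrefix(value, " ")
		switch field {
		case "id":
			ev.ID = value
		case "event":
			ev.Name = value
		case "data":
			data = append(data, value)
		}
	}
}

// decodeAll decodes every event in s. A clean end of stream (io.EOF) is
// reported as a nil error; any other error is returned with the events so far.
func decodeAll(s string) ([]rawEvent, error) {
	r := bufio.NewReader(strings.NewReader(s))
	var events []rawEvent
	for {
		ev, err := decodeOne(r)
		if err == io.EOF {
			return events, nil
		}
		if err != nil {
			return events, err
		}
		events = append(events, ev)
	}
}

func main() {
	// The second event is cut off before its terminating blank line.
	events, err := decodeAll("id: 1\nevent: Tick\ndata: hello\n\ndata: cut off")
	fmt.Printf("%+v\n", events)
	fmt.Println("error:", err)
}
```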
type DecoderOption ¶
type DecoderOption interface {
// contains filtered or unexported methods
}
DecoderOption is a common interface for optional configuration parameters that can be used in creating a Decoder.
func DecoderOptionReadTimeout ¶
func DecoderOptionReadTimeout(timeout time.Duration) DecoderOption
DecoderOptionReadTimeout returns an option that sets the read timeout interval for a Decoder when the Decoder is created. If the Decoder does not receive new data within this length of time, it will return an error. By default, there is no read timeout.
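The idea of a read timeout can be sketched with a channel and a timer. This is a minimal illustration under stated assumptions, not the Decoder's implementation: nextWithTimeout and errReadTimeout are hypothetical, and treating a non-positive timeout as "wait forever" is a choice made for this sketch.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errReadTimeout is a stand-in for the package's ErrReadTimeout in this sketch.
var errReadTimeout = errors.New("read timeout on stream")

// nextWithTimeout waits for the next line from lines. In this sketch a timeout
// of zero or less means "wait forever".
func nextWithTimeout(lines <-chan string, timeout time.Duration) (string, error) {
	if timeout <= 0 {
		return <-lines, nil
	}
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case line := <-lines:
		return line, nil
	case <-timer.C:
		return "", errReadTimeout
	}
}

func main() {
	lines := make(chan string, 1)
	lines <- "data: hello"

	line, err := nextWithTimeout(lines, 50*time.Millisecond)
	fmt.Println(line, err)

	// Nothing else is sent, so the second read times out.
	line, err = nextWithTimeout(lines, 50*time.Millisecond)
	fmt.Println(line, err)
}
```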
type Encoder ¶
type Encoder struct {
// contains filtered or unexported fields
}
An Encoder is capable of writing Events to a stream. Optionally Events can be gzip compressed in this process.
func NewEncoder ¶
NewEncoder returns an Encoder for a given io.Writer. When compressed is set to true, a gzip writer will be created.
type Event ¶
type Event interface {
// Id is an identifier that can be used to allow a client to replay
// missed Events by returning the Last-Event-Id header.
// Return empty string if not required.
Id() string
// The name of the event. Return empty string if not required.
Event() string
// The payload of the event.
Data() string
}
Event is the interface for any event received by the client or sent by the server.
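As a sketch of how the three methods map onto the Server-Sent Events text format, the helper below renders an event as id, event and data lines followed by a blank line. The local event interface mirrors the one above; simpleEvent and wireFormat are hypothetical names, and the package's Encoder may differ in detail.

```go
package main

import (
	"fmt"
	"strings"
)

// event mirrors the three methods of the Event interface described above.
type event interface {
	Id() string
	Event() string
	Data() string
}

// simpleEvent is a hypothetical value type used only for this illustration.
type simpleEvent struct{ id, name, data string }

func (e simpleEvent) Id() string    { return e.id }
func (e simpleEvent) Event() string { return e.name }
func (e simpleEvent) Data() string  { return e.data }

// wireFormat renders ev as Server-Sent Events text: optional id and event
// lines, one data line per line of payload, then a blank line as terminator.
func wireFormat(ev event) string {
	var b strings.Builder
	if id := ev.Id(); id != "" {
		b.WriteString("id: " + id + "\n")
	}
	if name := ev.Event(); name != "" {
		b.WriteString("event: " + name + "\n")
	}
	for _, line := range strings.Split(ev.Data(), "\n") {
		b.WriteString("data: " + line + "\n")
	}
	b.WriteString("\n")
	return b.String()
}

func main() {
	fmt.Printf("%q\n", wireFormat(simpleEvent{"7", "Tick", "line one\nline two"}))
}
```

Returning an empty string from Id or Event simply omits that line, which matches the "Return empty string if not required" guidance in the interface comments.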
Example ¶
package main

import (
	"fmt"
	"net"
	"net/http"
	"time"

	"github.com/launchdarkly/eventsource"
)

type TimeEvent time.Time

func (t TimeEvent) Id() string    { return fmt.Sprint(time.Time(t).UnixNano()) }
func (t TimeEvent) Event() string { return "Tick" }
func (t TimeEvent) Data() string  { return time.Time(t).String() }

const (
	TICK_COUNT = 5
)

func TimePublisher(srv *eventsource.Server) {
	start := time.Date(2013, time.January, 1, 0, 0, 0, 0, time.UTC)
	ticker := time.NewTicker(time.Second)
	for i := 0; i < TICK_COUNT; i++ {
		<-ticker.C
		srv.Publish([]string{"time"}, TimeEvent(start))
		start = start.Add(time.Second)
	}
}

func main() {
	srv := eventsource.NewServer()
	srv.Gzip = true
	defer srv.Close()
	l, err := net.Listen("tcp", ":8080")
	if err != nil {
		return
	}
	defer l.Close()
	http.HandleFunc("/time", srv.Handler("time"))
	go http.Serve(l, nil)
	go TimePublisher(srv)
	stream, err := eventsource.Subscribe("http://127.0.0.1:8080/time", "")
	if err != nil {
		return
	}
	for i := 0; i < TICK_COUNT; i++ {
		ev := <-stream.Events
		fmt.Println(ev.Id(), ev.Event(), ev.Data())
	}
}
Output:
1356998400000000000 Tick 2013-01-01 00:00:00 +0000 UTC
1356998401000000000 Tick 2013-01-01 00:00:01 +0000 UTC
1356998402000000000 Tick 2013-01-01 00:00:02 +0000 UTC
1356998403000000000 Tick 2013-01-01 00:00:03 +0000 UTC
1356998404000000000 Tick 2013-01-01 00:00:04 +0000 UTC
type Logger ¶
type Logger interface {
Println(...interface{})
Printf(string, ...interface{})
}
Logger is the interface for a custom logging implementation that can handle log output for a Stream.
type Repository ¶
type Repository interface {
// Gets the Events which should follow on from the specified channel and event id. This method may be called
// from different goroutines, so it must be safe for concurrent access.
Replay(channel, id string) chan Event
}
Repository is an interface to be used with Server.Register() allowing clients to replay previous events through the server, if history is required.
Example ¶
package main

import (
	"encoding/json"
	"fmt"
	"net"
	"net/http"

	"github.com/launchdarkly/eventsource"
)

type NewsArticle struct {
	id             string
	Title, Content string
}

func (a *NewsArticle) Id() string    { return a.id }
func (a *NewsArticle) Event() string { return "News Article" }
func (a *NewsArticle) Data() string  { b, _ := json.Marshal(a); return string(b) }

var articles = []NewsArticle{
	{"2", "Governments struggle to control global price of gas", "Hot air...."},
	{"1", "Tomorrow is another day", "And so is the day after."},
	{"3", "News for news' sake", "Nothing has happened."},
}

func buildRepo(srv *eventsource.Server) {
	repo := eventsource.NewSliceRepository()
	srv.Register("articles", repo)
	for i := range articles {
		repo.Add("articles", &articles[i])
		srv.Publish([]string{"articles"}, &articles[i])
	}
}

func main() {
	srv := eventsource.NewServer()
	defer srv.Close()
	http.HandleFunc("/articles", srv.Handler("articles"))
	l, err := net.Listen("tcp", ":8080")
	if err != nil {
		return
	}
	defer l.Close()
	go http.Serve(l, nil)
	stream, err := eventsource.Subscribe("http://127.0.0.1:8080/articles", "")
	if err != nil {
		return
	}
	go buildRepo(srv)
	// This will receive events in the order that they come
	for i := 0; i < 3; i++ {
		ev := <-stream.Events
		fmt.Println(ev.Id(), ev.Event(), ev.Data())
	}
	stream, err = eventsource.Subscribe("http://127.0.0.1:8080/articles", "1")
	if err != nil {
		fmt.Println(err)
		return
	}
	// This will replay the events in order of id
	for i := 0; i < 3; i++ {
		ev := <-stream.Events
		fmt.Println(ev.Id(), ev.Event(), ev.Data())
	}
}
Output:
2 News Article {"Title":"Governments struggle to control global price of gas","Content":"Hot air...."}
1 News Article {"Title":"Tomorrow is another day","Content":"And so is the day after."}
3 News Article {"Title":"News for news' sake","Content":"Nothing has happened."}
1 News Article {"Title":"Tomorrow is another day","Content":"And so is the day after."}
2 News Article {"Title":"Governments struggle to control global price of gas","Content":"Hot air...."}
3 News Article {"Title":"News for news' sake","Content":"Nothing has happened."}
type Server ¶
type Server struct {
AllowCORS bool // Enable all handlers to be accessible from any origin
ReplayAll bool // Replay repository even if there's no Last-Event-Id specified
BufferSize int // How many messages do we let the client get behind before disconnecting
Gzip bool // Enable compression if client can accept it
MaxConnTime time.Duration // If non-zero, HTTP connections will be automatically closed after this time
Logger Logger // Logger is a logger that, when set, will be used for logging debug messages
// contains filtered or unexported fields
}
Server manages any number of event-publishing channels and allows subscribers to consume them. To use it within an HTTP server, create a handler for each channel with Handler().
func (*Server) Close ¶
func (srv *Server) Close()
Close permanently shuts down the Server. It will no longer allow new subscriptions.
func (*Server) Handler ¶
func (srv *Server) Handler(channel string) http.HandlerFunc
Handler creates a new HTTP handler for serving a specified channel.
func (*Server) PublishComment ¶
PublishComment publishes a comment to one or more channels.
func (*Server) Register ¶
func (srv *Server) Register(channel string, repo Repository)
Register registers the repository to be used for the specified channel.
type SliceRepository ¶
type SliceRepository struct {
// contains filtered or unexported fields
}
SliceRepository is an example repository that uses a slice as storage for past events.
func NewSliceRepository ¶
func NewSliceRepository() *SliceRepository
NewSliceRepository creates a SliceRepository.
func (*SliceRepository) Add ¶
func (repo *SliceRepository) Add(channel string, event Event)
Add adds an event to the repository history.
func (SliceRepository) Replay ¶
func (repo SliceRepository) Replay(channel, id string) (out chan Event)
Replay implements the event replay logic for the Repository interface.
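For readers writing their own repository, the following sketch shows one plausible replay policy: keep events per channel sorted by ID and replay every event whose ID is greater than or equal to the requested one, which is consistent with the example output shown for Repository. The type memoryRepo and the local event interface are hypothetical, IDs are compared as plain strings, and SliceRepository's real logic may differ.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// event mirrors the package's Event interface for this self-contained sketch.
type event interface {
	Id() string
	Event() string
	Data() string
}

type note struct{ id, body string }

func (n note) Id() string    { return n.id }
func (n note) Event() string { return "note" }
func (n note) Data() string  { return n.body }

// memoryRepo is a hypothetical repository that keeps each channel's events
// sorted by ID and is safe for concurrent use.
type memoryRepo struct {
	mu     sync.RWMutex
	events map[string][]event
}

func newMemoryRepo() *memoryRepo {
	return &memoryRepo{events: make(map[string][]event)}
}

// Add inserts ev into the channel's history at its sorted position.
func (r *memoryRepo) Add(channel string, ev event) {
	r.mu.Lock()
	defer r.mu.Unlock()
	evs := r.events[channel]
	i := sort.Search(len(evs), func(i int) bool { return evs[i].Id() >= ev.Id() })
	evs = append(evs, nil)
	copy(evs[i+1:], evs[i:])
	evs[i] = ev
	r.events[channel] = evs
}

// Replay returns a closed, pre-filled channel holding every stored event whose
// ID is >= id, in ID order. The buffer means Replay never blocks on a reader.
func (r *memoryRepo) Replay(channel, id string) chan event {
	r.mu.RLock()
	defer r.mu.RUnlock()
	evs := r.events[channel]
	start := sort.Search(len(evs), func(i int) bool { return evs[i].Id() >= id })
	out := make(chan event, len(evs)-start)
	for _, ev := range evs[start:] {
		out <- ev
	}
	close(out)
	return out
}

func main() {
	repo := newMemoryRepo()
	repo.Add("articles", note{"2", "second"})
	repo.Add("articles", note{"1", "first"})
	repo.Add("articles", note{"3", "third"})
	for ev := range repo.Replay("articles", "2") {
		fmt.Println(ev.Id(), ev.Data())
	}
}
```

String comparison orders "10" before "2", so a real repository with numeric IDs would need to parse them or zero-pad them.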
type Stream ¶
type Stream struct {
// Events emits the events received by the stream
Events chan Event
// Errors emits any errors encountered while reading events from the stream.
//
// Errors during initialization of the stream are not pushed to this channel, since until the
// Subscribe method has returned the caller would not be able to consume the channel. If you have
// configured the Stream to be able to retry on initialization errors, but you still want to know
// about those errors or control how they are handled, use StreamOptionErrorHandler.
//
// If an error handler has been specified with StreamOptionErrorHandler, the Errors channel is
// not used and will be nil.
Errors chan error
// Logger is a logger that, when set, will be used for logging informational messages.
//
// This field is exported for backward compatibility, but should not be set directly because
// it may be used by multiple goroutines. Use SetLogger instead.
Logger Logger
// contains filtered or unexported fields
}
Stream handles a connection for receiving Server-Sent Events. It will try to reconnect if the connection is lost, respecting both received retry delays and event IDs.
func Subscribe ¶
Subscribe subscribes to the Events emitted from the specified URL. If lastEventID is non-empty, it is sent to the server so that the server can replay missed events if it supports doing so.
Deprecated: use SubscribeWithURL instead.
func SubscribeWith ¶
SubscribeWith takes an HTTP client and a request, allowing customization of both the request headers and the HTTP client settings (timeouts, TLS, etc.). If request.Body is set, then request.GetBody should also be set so that the request can be reissued.
Deprecated: use SubscribeWithRequestAndOptions instead.
func SubscribeWithRequest ¶
SubscribeWithRequest takes an http.Request to set up the stream, allowing custom headers to be specified, authentication to be configured, etc.
Deprecated: use SubscribeWithRequestAndOptions instead.
func SubscribeWithRequestAndOptions ¶
func SubscribeWithRequestAndOptions(request *http.Request, options ...StreamOption) (*Stream, error)
SubscribeWithRequestAndOptions takes an initial http.Request to set up the stream - allowing custom headers, authentication, etc. to be configured - and also takes any number of StreamOption values to set other properties of the stream, such as timeouts or a specific HTTP client to use.
func SubscribeWithURL ¶
func SubscribeWithURL(url string, options ...StreamOption) (*Stream, error)
SubscribeWithURL subscribes to the Events emitted from the specified URL. The stream can be configured by providing any number of StreamOption values.
func (*Stream) Close ¶
func (stream *Stream) Close()
Close closes the stream permanently. It is safe for concurrent access and can be called multiple times.
func (*Stream) Restart ¶
func (stream *Stream) Restart()
Restart forces the stream to drop the currently active connection and attempt to connect again, in the same way it would if the connection had failed. There will be a delay before reconnection, as defined by the Stream configuration (StreamOptionInitialRetry, StreamOptionUseBackoff, etc.).
This method is safe for concurrent access. Its behavior is asynchronous: Restart returns immediately and the connection is restarted as soon as possible from another goroutine after that. It is possible for additional events from the original connection to be delivered during that interval.
If the stream has already been closed with Close, Restart has no effect.
type StreamErrorHandler ¶
type StreamErrorHandler func(error) StreamErrorHandlerResult
StreamErrorHandler is a function type used with StreamOptionErrorHandler.
This function will be called whenever Stream encounters either a network error or an HTTP error response status. The returned value determines whether Stream should retry as usual, or immediately stop.
The error may be any I/O error returned by Go's networking types, or it may be the eventsource type SubscriptionError representing an HTTP error response status.
For errors during initialization of the Stream, this function will be called on the same goroutine that called the Subscribe method; for errors on an existing connection, it will be called on a worker goroutine. It should return promptly and not block the goroutine.
In this example, the error handler always logs the error with log.Printf, and it forces the stream to close permanently if there was an HTTP 401 error:
func handleError(err error) eventsource.StreamErrorHandlerResult {
	log.Printf("stream error: %s", err)
	if se, ok := err.(eventsource.SubscriptionError); ok && se.Code == 401 {
		return eventsource.StreamErrorHandlerResult{CloseNow: true}
	}
	return eventsource.StreamErrorHandlerResult{}
}
type StreamErrorHandlerResult ¶
type StreamErrorHandlerResult struct {
// CloseNow can be set to true to tell the Stream to immediately stop and not retry, as if Close had
// been called.
//
// If CloseNow is false, the Stream will proceed as usual after an error: if there is an existing
// connection it will retry the connection, and if the Stream is still being initialized then the
// retry behavior is configurable (see StreamOptionCanRetryFirstConnection).
CloseNow bool
}
StreamErrorHandlerResult contains values returned by StreamErrorHandler.
type StreamOption ¶
type StreamOption interface {
// contains filtered or unexported methods
}
StreamOption is a common interface for optional configuration parameters that can be used in creating a stream.
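An interface with only unexported methods is a common Go way to offer a closed set of configuration options. The sketch below shows the general pattern with hypothetical names (config, option, withReadTimeout, withInitialRetry); the package's internal option types are not shown in this documentation, so none of this should be read as its actual code.

```go
package main

import (
	"fmt"
	"time"
)

// config is a hypothetical settings struct that options modify.
type config struct {
	readTimeout  time.Duration
	initialRetry time.Duration
}

// option has an unexported method, so only this package can define options,
// while callers can still pass them around as opaque values.
type option interface {
	apply(c *config)
}

type readTimeoutOption struct{ timeout time.Duration }

func (o readTimeoutOption) apply(c *config) { c.readTimeout = o.timeout }

type initialRetryOption struct{ retry time.Duration }

func (o initialRetryOption) apply(c *config) { c.initialRetry = o.retry }

func withReadTimeout(d time.Duration) option  { return readTimeoutOption{d} }
func withInitialRetry(d time.Duration) option { return initialRetryOption{d} }

// newConfig starts from defaults and applies each option in order, so a later
// option overrides an earlier one that sets the same field.
func newConfig(options ...option) config {
	c := config{initialRetry: 3 * time.Second} // same value as DefaultInitialRetry
	for _, o := range options {
		o.apply(&c)
	}
	return c
}

func main() {
	c := newConfig(withReadTimeout(5*time.Minute), withInitialRetry(time.Second))
	fmt.Println(c.readTimeout, c.initialRetry)
}
```

The benefit of this design is that new options can be added later without changing the signature of the constructor.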
func StreamOptionCanRetryFirstConnection ¶
func StreamOptionCanRetryFirstConnection(initialRetryTimeout time.Duration) StreamOption
StreamOptionCanRetryFirstConnection returns an option that determines whether to apply retry behavior to the first connection attempt for the stream.
If the timeout is nonzero, an initial connection failure when subscribing will not cause an error result, but will trigger the same retry logic as if an existing connection had failed. The stream constructor will not return until a connection has been made, or until the specified timeout expires, if the timeout is positive; if the timeout is negative, it will continue retrying indefinitely.
The default value is zero: an initial connection failure will not be retried.
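The three cases for the timeout (zero, positive, negative) can be sketched as a small retry loop. This is an illustration of the described semantics only: connectWithRetry, errNotReady and the fixed delay between attempts are assumptions made here, and the real Stream applies its configured retry delays instead.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errNotReady is a hypothetical error used to simulate a failing connection.
var errNotReady = errors.New("server not ready")

// connectWithRetry calls connect until it succeeds or retrying is no longer
// allowed. retryTimeout == 0 means a single attempt, a positive value means
// keep retrying until that much time has elapsed, and a negative value means
// retry indefinitely. delay is a fixed pause between attempts in this sketch.
func connectWithRetry(connect func() error, retryTimeout, delay time.Duration) error {
	var deadline time.Time
	if retryTimeout > 0 {
		deadline = time.Now().Add(retryTimeout)
	}
	for {
		err := connect()
		if err == nil {
			return nil
		}
		if retryTimeout == 0 {
			return err // retries disabled: report the first failure
		}
		if retryTimeout > 0 && time.Now().Add(delay).After(deadline) {
			return err // the next attempt would start after the deadline
		}
		time.Sleep(delay)
	}
}

func main() {
	attempts := 0
	connect := func() error {
		attempts++
		if attempts < 3 {
			return errNotReady
		}
		return nil
	}

	err := connectWithRetry(connect, 0, 10*time.Millisecond)
	fmt.Println("no retry:", err, "attempts:", attempts)

	attempts = 0
	err = connectWithRetry(connect, time.Second, 10*time.Millisecond)
	fmt.Println("retry up to 1s:", err, "attempts:", attempts)
}
```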
func StreamOptionErrorHandler ¶
func StreamOptionErrorHandler(handler StreamErrorHandler) StreamOption
StreamOptionErrorHandler returns an option that causes a Stream to call the specified function for stream errors.
If non-nil, this function will be called whenever Stream encounters either a network error or an HTTP error response status. The returned value determines whether Stream should retry as usual, or immediately stop as if Close had been called.
When used, this mechanism replaces the Errors channel; that channel will be pre-closed and Stream will not push any errors to it, so the caller does not need to consume the channel.
Note that using a handler is the only way to have control over how Stream handles errors during the initial connection attempt, since there would be no way for the caller to consume the Errors channel before the Subscribe method has returned.
func StreamOptionHTTPClient ¶
func StreamOptionHTTPClient(client *http.Client) StreamOption
StreamOptionHTTPClient returns an option that overrides the default HTTP client used by a stream when the stream is created.
func StreamOptionInitialRetry ¶
func StreamOptionInitialRetry(retry time.Duration) StreamOption
StreamOptionInitialRetry returns an option that sets the initial retry delay for a stream when the stream is created.
This delay will be used the first time the stream has to be restarted; the interval will increase exponentially on subsequent reconnections. Each time, there will also be a pseudo-random jitter so that the actual value may be up to 50% less. So, for instance, if you set the initial delay to 1 second, the first reconnection will use a delay between 0.5s and 1s inclusive, and subsequent reconnections will be 1s-2s, 2s-4s, etc.
The default value is DefaultInitialRetry. In a future version, this value may change, so if you need a specific value it is best to set it explicitly.
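The arithmetic in the example above (0.5s to 1s, then 1s to 2s, then 2s to 4s) can be reproduced with a few lines of code. This is a worked calculation under the stated assumptions of doubling on each reconnection and a 50% jitter ratio; delayRange is a hypothetical helper, not a function of this package.

```go
package main

import (
	"fmt"
	"time"
)

// delayRange returns the lowest and highest delay possible for the given
// reconnection attempt (0 for the first reconnection), assuming the base delay
// doubles on each attempt and jitter removes up to jitterRatio of it.
func delayRange(initial time.Duration, attempt int, jitterRatio float64) (lo, hi time.Duration) {
	hi = initial << uint(attempt) // doubles once per previous reconnection
	lo = hi - time.Duration(float64(hi)*jitterRatio)
	return lo, hi
}

func main() {
	for attempt := 0; attempt < 3; attempt++ {
		lo, hi := delayRange(time.Second, attempt, 0.5)
		fmt.Printf("reconnection %d: between %v and %v\n", attempt+1, lo, hi)
	}
}
```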
func StreamOptionLastEventID ¶
func StreamOptionLastEventID(lastEventID string) StreamOption
StreamOptionLastEventID returns an option that sets the initial last event ID for a stream when the stream is created. If specified, this value will be sent to the server in case it can replay missed events.
func StreamOptionLogger ¶
func StreamOptionLogger(logger Logger) StreamOption
StreamOptionLogger returns an option that sets the logger for a stream when the stream is created (to change it later, you can use SetLogger). By default, there is no logger.
func StreamOptionReadTimeout ¶
func StreamOptionReadTimeout(timeout time.Duration) StreamOption
StreamOptionReadTimeout returns an option that sets the read timeout interval for a stream when the stream is created. If the stream does not receive new data within this length of time, it will restart the connection.
By default, there is no read timeout.
func StreamOptionRetryResetInterval ¶
func StreamOptionRetryResetInterval(retryResetInterval time.Duration) StreamOption
StreamOptionRetryResetInterval returns an option that sets the minimum amount of time that a connection must stay open before the Stream resets its backoff delay. This is only relevant if backoff is enabled (see StreamOptionUseBackoff).
If a connection fails before the threshold has elapsed, the delay before reconnecting will be greater than the last delay; if it fails after the threshold, the delay will start over at the initial minimum value. This prevents long delays from occurring on connections that are only rarely restarted.
The default value is DefaultRetryResetInterval.
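The interaction between backoff and the reset interval can be sketched as a small state machine. The type backoff and its method delayAfter are hypothetical, jitter is left out for clarity, and the Stream's internal bookkeeping may be organized differently.

```go
package main

import (
	"fmt"
	"time"
)

// backoff is a hypothetical tracker for reconnection delays.
type backoff struct {
	initial, maxDelay, resetInterval time.Duration
	next                             time.Duration // delay for the next reconnection; zero means "not started"
}

// delayAfter reports how long to wait before reconnecting, given how long the
// connection that just failed had stayed open. A connection that lasted at
// least resetInterval restarts the sequence at the initial delay; otherwise
// the delay keeps doubling, capped at maxDelay.
func (b *backoff) delayAfter(uptime time.Duration) time.Duration {
	if b.next == 0 || uptime >= b.resetInterval {
		b.next = b.initial
	}
	d := b.next
	b.next *= 2
	if b.next > b.maxDelay {
		b.next = b.maxDelay
	}
	return d
}

func main() {
	b := &backoff{initial: time.Second, maxDelay: 30 * time.Second, resetInterval: time.Minute}
	// Three short-lived connections, then one that stayed up for 90 seconds.
	for _, uptime := range []time.Duration{5 * time.Second, 5 * time.Second, 5 * time.Second, 90 * time.Second} {
		fmt.Printf("connection lasted %v, wait %v before reconnecting\n", uptime, b.delayAfter(uptime))
	}
}
```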
func StreamOptionUseBackoff ¶
func StreamOptionUseBackoff(maxDelay time.Duration) StreamOption
StreamOptionUseBackoff returns an option that determines whether to use an exponential backoff for reconnection delays.
If the maxDelay parameter is greater than zero, backoff is enabled. The retry delay interval will be doubled (not counting jitter - see StreamOptionUseJitter) for consecutive stream reconnections, but will never be greater than maxDelay.
For consistency with earlier versions, this is currently zero (disabled) by default. In a future version this may change, so if you do not want backoff behavior you should explicitly set it to zero. It is recommended to use both backoff and jitter, to avoid "thundering herd" behavior in the case of a server outage.
func StreamOptionUseJitter ¶
func StreamOptionUseJitter(jitterRatio float64) StreamOption
StreamOptionUseJitter returns an option that determines whether to use a randomized jitter for reconnection delays.
If jitterRatio is greater than zero, it represents a proportion up to 1.0 (100%) that will be deducted from the retry delay interval that would otherwise be used: for instance, 0.5 means that the delay will be randomly decreased by up to 50%. A value greater than 1.0 is treated as equal to 1.0.
For consistency with earlier versions, this is currently disabled (zero) by default. In a future version this may change, so if you do not want jitter you should explicitly set it to zero. It is recommended to use both backoff and jitter, to avoid "thundering herd" behavior in the case of a server outage.
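A minimal sketch of the jitter calculation described above follows. It assumes a uniform random reduction drawn from math/rand; applyJitter and newRand are hypothetical helpers, and the package may choose its random source or distribution differently.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// newRand returns a seeded random source so the sketch is repeatable.
func newRand(seed int64) *rand.Rand {
	return rand.New(rand.NewSource(seed))
}

// applyJitter randomly reduces delay by up to jitterRatio of its value.
// A ratio of zero or less leaves the delay unchanged, and a ratio above 1.0
// is treated as 1.0.
func applyJitter(delay time.Duration, jitterRatio float64, rnd *rand.Rand) time.Duration {
	if jitterRatio <= 0 {
		return delay
	}
	if jitterRatio > 1 {
		jitterRatio = 1
	}
	maxReduction := float64(delay) * jitterRatio
	return delay - time.Duration(rnd.Float64()*maxReduction)
}

func main() {
	rnd := newRand(1)
	for i := 0; i < 3; i++ {
		// With a ratio of 0.5, each result lies between 1s and 2s.
		fmt.Println(applyJitter(2*time.Second, 0.5, rnd))
	}
}
```

Because each client draws a different reduction, clients that lost their connection at the same moment tend to reconnect at slightly different times.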
type SubscriptionError ¶
SubscriptionError is an error object returned from a stream when there is an HTTP error.
func (SubscriptionError) Error ¶
func (e SubscriptionError) Error() string