Documentation ¶
Index ¶
- Constants
- Variables
- func Compare(o1, o2 Offset) (int, error)
- func NewLengthDelimitedFrameReader(r io.ReadCloser) io.ReadCloser
- func NewLengthDelimitedFrameWriter(w io.Writer) io.Writer
- func WithUser(ctx context.Context, user User) context.Context
- type Attributes
- type AttributesCodec
- type AttributesDecoder
- type AttributesEncoder
- type DataFrame
- type DataFrameCodec
- type DataFrameDecoder
- type DataFrameEncoder
- type DataFrameReader
- type Index
- type Interface
- type Offset
- type Range
- type StreamStatus
- type Tags
- func (t Tags) Contains(t1 Tags) bool
- func (t Tags) Diff(t1 Tags) (add Tags, del Tags, update Tags)
- func (t Tags) Empty() bool
- func (t Tags) Equals(t1 Tags) bool
- func (t Tags) Get(key string) string
- func (t Tags) Has(key string) bool
- func (t Tags) Set(key string, value string)
- func (t Tags) ToJSON() string
- func (t Tags) Validate() error
- type User
- type UserAware
- type UserWithToken
- type Watcher
- type WorkerStatus
Constants ¶
const (
ContentTypeProtobuf = "application/vnd.google.protobuf"
ContentTypeFlatbuffer = "application/x-flatbuffers"
ContentTypeJSON = "application/json"
)
const (
Backend = "meta.backend"
MaxPayloadBytes = "meta.maxPayloadBytes"
UserIdentifyHeader = "meta.header.userIdentifyHeader"
GroupIdentifyHeader = "meta.header.groupIdentifyHeader"
StreamLength = "stream.length"
StreamApproximateLength = "stream.approxMaxLength"
)
Attribute keys that every queue service implementation must provide.
Variables ¶
var MaxIndex = FromUint64(uint64(math.MaxUint64))
Functions ¶
func NewLengthDelimitedFrameReader ¶
func NewLengthDelimitedFrameReader(r io.ReadCloser) io.ReadCloser
NewLengthDelimitedFrameReader returns an io.ReadCloser that decodes length-prefixed frames from a stream.
The protocol is:
stream: message ...
message: prefix body
prefix: 4 byte uint32 in BigEndian order, denotes length of body
body: bytes (0..prefix)
If the buffer passed to Read is too short to hold an entire frame, Read returns io.ErrShortBuffer along with the number of bytes read.
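The wire format above can be sketched with the standard library alone. The helpers below (writeFrame, readFrame, roundTrip) are hypothetical names used for illustration and are not part of this package; they only show the 4-byte big-endian prefix followed by the body.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// writeFrame writes one message: a 4-byte big-endian length prefix, then the body.
func writeFrame(w io.Writer, body []byte) error {
	var prefix [4]byte
	binary.BigEndian.PutUint32(prefix[:], uint32(len(body)))
	if _, err := w.Write(prefix[:]); err != nil {
		return err
	}
	_, err := w.Write(body)
	return err
}

// readFrame reads one message: the 4-byte prefix, then exactly that many body bytes.
func readFrame(r io.Reader) ([]byte, error) {
	var prefix [4]byte
	if _, err := io.ReadFull(r, prefix[:]); err != nil {
		return nil, err
	}
	body := make([]byte, binary.BigEndian.Uint32(prefix[:]))
	if _, err := io.ReadFull(r, body); err != nil {
		return nil, err
	}
	return body, nil
}

// roundTrip frames every message into one buffer and decodes them back in order.
func roundTrip(msgs ...string) ([]string, error) {
	var buf bytes.Buffer
	for _, m := range msgs {
		if err := writeFrame(&buf, []byte(m)); err != nil {
			return nil, err
		}
	}
	var out []string
	for buf.Len() > 0 {
		body, err := readFrame(&buf)
		if err != nil {
			return nil, err
		}
		out = append(out, string(body))
	}
	return out, nil
}

func main() {
	out, err := roundTrip("hello", "", "queue")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%q\n", out)
}
```

Unlike the reader returned by NewLengthDelimitedFrameReader, readFrame allocates a buffer sized from the prefix, so it never hits the short-buffer case described above.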
Types ¶
type Attributes ¶
type AttributesCodec ¶
type AttributesCodec interface {
MediaType() string
AttributesEncoder
AttributesDecoder
}
AttributesCodec encodes Attributes to bytes and decodes Attributes from bytes.
func AttributesCodecFor ¶
func AttributesCodecFor(contentType string) AttributesCodec
type AttributesDecoder ¶
type AttributesDecoder interface {
Decode([]byte, *Attributes) error
}
type AttributesEncoder ¶
type AttributesEncoder interface {
Encode(Attributes Attributes, w io.Writer) error
}
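As a rough illustration of the three interfaces above, here is a JSON codec with the same method set. It is a sketch under assumptions: Attributes is declared locally as map[string]string (its real definition is not shown on this page), and jsonAttributesCodec and roundTripAttributes are hypothetical names, not the codec that AttributesCodecFor returns.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
)

// Attributes is a stand-in for this sketch only (assumed to be a string map).
type Attributes map[string]string

// jsonAttributesCodec is a hypothetical type with the AttributesCodec method set.
type jsonAttributesCodec struct{}

// MediaType reports the content type this codec handles.
func (jsonAttributesCodec) MediaType() string { return "application/json" }

// Encode writes attrs to w as a JSON object.
func (jsonAttributesCodec) Encode(attrs Attributes, w io.Writer) error {
	return json.NewEncoder(w).Encode(attrs)
}

// Decode parses a JSON object from data into attrs.
func (jsonAttributesCodec) Decode(data []byte, attrs *Attributes) error {
	return json.Unmarshal(data, attrs)
}

// roundTripAttributes encodes attrs to bytes and decodes them into a fresh value.
func roundTripAttributes(attrs Attributes) (Attributes, error) {
	codec := jsonAttributesCodec{}
	var buf bytes.Buffer
	if err := codec.Encode(attrs, &buf); err != nil {
		return nil, err
	}
	var out Attributes
	if err := codec.Decode(buf.Bytes(), &out); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	// "memory" is an illustrative value; "meta.backend" is the Backend key.
	out, err := roundTripAttributes(Attributes{"meta.backend": "memory"})
	if err != nil {
		panic(err)
	}
	fmt.Println(out["meta.backend"])
}
```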
type DataFrameCodec ¶
type DataFrameCodec interface {
MediaType() string
DataFrameEncoder
DataFrameDecoder
}
DataFrameCodec encodes a DataFrame to bytes and decodes a DataFrame from bytes.
func DataFrameCodecFor ¶
func DataFrameCodecFor(contentType string) DataFrameCodec
type DataFrameDecoder ¶
type DataFrameEncoder ¶
type DataFrameReader ¶
type DataFrameReader interface {
// FrameChan returns a receive-only DataFrame channel.
FrameChan() <-chan DataFrame
}
type Interface ¶
type Interface interface {
// End normally emits an 'EOS' symbol to end the queue asynchronously,
// but if force is set to true, the stream ends immediately.
// Undelivered data will be truncated.
End(ctx context.Context, force bool) error
// Truncate truncates data before the specified index.
Truncate(ctx context.Context, index uint64) error
// Put appends new data to the stream.
Put(ctx context.Context, data []byte, tags Tags) (index uint64, err error)
// Get returns data frames from the stream, starting at index.
// Param length specifies the expected message count.
// If timeout is set, the call blocks until length is satisfied or
// the timeout fires.
Get(ctx context.Context, index uint64, length int, timeout time.Duration, tags Tags) (dfs []DataFrame, err error)
// Watch subscribes to the queue service. When a new data frame is appended through Put,
// the watcher emits it on its result channel.
// Param index specifies the message index at which the watch begins.
// Param window specifies the largest size the Watcher can transfer at one time.
Watch(ctx context.Context, index uint64, indexOnly bool, noAck bool, window uint64) (Watcher, error)
// Commit marks the messages at the given indices as consumed.
Commit(ctx context.Context, del bool, indexes ...uint64) error
// Del deletes the messages at the given indices from the stream.
Del(ctx context.Context, indexes ...uint64) error
// Attributes returns the service's dynamic attributes as key/value pairs.
Attributes() Attributes
}
Interface is the QueueService interface, the core abstraction of the streaming framework.
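The Put and Get semantics described in the comments can be pictured with a small in-memory sketch. Everything here is an assumption made for illustration: memQueue is a hypothetical type, it drops ctx, tags and DataFrame in favor of raw byte slices, and it numbers messages from 0, which the package does not state. It only shows Get waiting until length messages exist or the timeout fires.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// memQueue is a hypothetical in-memory stream; it is not part of the package.
type memQueue struct {
	mu     sync.Mutex
	frames [][]byte
	notify chan struct{} // closed and replaced on every Put to wake waiting Gets
}

func newMemQueue() *memQueue {
	return &memQueue{notify: make(chan struct{})}
}

// Put appends data and returns its index (0-based in this sketch).
func (q *memQueue) Put(data []byte) uint64 {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.frames = append(q.frames, data)
	close(q.notify)
	q.notify = make(chan struct{})
	return uint64(len(q.frames) - 1)
}

// Get returns up to length messages starting at index. It waits until that
// many are available or the timeout fires, then returns what it has.
func (q *memQueue) Get(index uint64, length int, timeout time.Duration) [][]byte {
	if length < 0 {
		length = 0
	}
	deadline := time.After(timeout)
	for {
		q.mu.Lock()
		var avail [][]byte
		if index < uint64(len(q.frames)) {
			avail = append(avail, q.frames[index:]...) // copy under the lock
		}
		wake := q.notify
		q.mu.Unlock()

		if len(avail) >= length {
			return avail[:length]
		}
		select {
		case <-wake: // a Put happened; re-check
		case <-deadline:
			return avail
		}
	}
}

func main() {
	q := newMemQueue()
	go func() {
		time.Sleep(10 * time.Millisecond)
		q.Put([]byte("late"))
	}()
	// Blocks until the producer goroutine has appended one message.
	frames := q.Get(0, 1, time.Second)
	fmt.Println(len(frames), string(frames[0]))
}
```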
type Range ¶
func ParseRange ¶
type StreamStatus ¶
type StreamStatus string
const (
StreamOk StreamStatus = "OK"
StreamCancel StreamStatus = "Cancel"
StreamEnd StreamStatus = "End"
)
type User ¶
type User interface {
// Uid represents the user id.
Uid() string
// Gid represents the group id of user.
Gid() string
// Token represents the access token of the queue service.
Token() string
}
User holds the authenticated user's information.
type UserWithToken ¶
type UserWithToken interface {
// Token to access the backend service.
Token() string
}
type Watcher ¶
type Watcher interface {
// Watcher is a kind of DataFrameReader.
DataFrameReader
// Close stops Watcher and closes the FrameChan.
Close()
}
Watcher is the entity following the stream.
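A consumer typically ranges over FrameChan and calls Close when it is done. The sketch below assumes a simplified DataFrame struct (the real type is not shown on this page) and a hypothetical sliceWatcher that replays a fixed slice; a real Watcher would come from Interface.Watch.

```go
package main

import (
	"fmt"
	"sync"
)

// DataFrame is a stand-in for this sketch; the package's real type may differ.
type DataFrame struct {
	Index uint64
	Data  []byte
}

// DataFrameReader and Watcher mirror the interfaces documented above.
type DataFrameReader interface {
	FrameChan() <-chan DataFrame
}

type Watcher interface {
	DataFrameReader
	Close()
}

// sliceWatcher is a hypothetical Watcher that replays a fixed slice of frames.
type sliceWatcher struct {
	ch   chan DataFrame
	done chan struct{}
	once sync.Once
}

func newSliceWatcher(frames []DataFrame) *sliceWatcher {
	w := &sliceWatcher{ch: make(chan DataFrame), done: make(chan struct{})}
	go func() {
		defer close(w.ch) // closing FrameChan ends any range loop over it
		for _, f := range frames {
			select {
			case w.ch <- f:
			case <-w.done:
				return
			}
		}
		<-w.done // keep the channel open until Close is called
	}()
	return w
}

func (w *sliceWatcher) FrameChan() <-chan DataFrame { return w.ch }

// Close stops the watcher; it is safe to call more than once.
func (w *sliceWatcher) Close() { w.once.Do(func() { close(w.done) }) }

// collect reads up to n frames from w and then closes it.
func collect(w Watcher, n int) []DataFrame {
	defer w.Close()
	var out []DataFrame
	for f := range w.FrameChan() {
		out = append(out, f)
		if len(out) >= n {
			break
		}
	}
	return out
}

func main() {
	w := newSliceWatcher([]DataFrame{
		{Index: 1, Data: []byte("a")},
		{Index: 2, Data: []byte("b")},
		{Index: 3, Data: []byte("c")},
	})
	for _, f := range collect(w, 2) {
		fmt.Println(f.Index, string(f.Data))
	}
}
```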
type WorkerStatus ¶
type WorkerStatus string
const (
WorkerRunning WorkerStatus = "Running"
WorkerStopped WorkerStatus = "Stopped"
WorkerError WorkerStatus = "Error"
WorkerUnknown WorkerStatus = "Unknown"
)