Documentation
¶
Index ¶
- Constants
- Variables
- func NewSaramaLoggerFromBark(l bark.Logger, module string) sarama.StdLogger
- func OpenKafkaStream(c <-chan *s.ConsumerMessage, kafkaMessageConverter KafkaMessageConverter, ...) stream.BStoreOpenReadStreamOutCall
- type AckID
- type Committer
- type CommitterLevel
- type KafkaCommitter
- func (c *KafkaCommitter) GetCommitLevel() (l CommitterLevel)
- func (c *KafkaCommitter) GetReadLevel() (l CommitterLevel)
- func (c *KafkaCommitter) SetCommitLevel(l CommitterLevel)
- func (c *KafkaCommitter) SetFinalLevel(l CommitterLevel)
- func (c *KafkaCommitter) SetReadLevel(l CommitterLevel)
- func (c *KafkaCommitter) UnlockAndFlush(l sync.Locker) error
- type KafkaGroupMetadata
- type KafkaMessageConverter
- type KafkaMessageConverterConfig
- type KafkaMessageConverterFactory
- type KafkaOffsetMetadata
- type Notifier
- type NotifyMsg
- type NotifyType
- type OutOptions
- type OutputCgConfig
- type OutputHost
- func (h *OutputHost) AckMessages(ctx thrift.Context, ackRequest *cherami.AckMessagesRequest) error
- func (h *OutputHost) ConsumerGroupsUpdated(ctx thrift.Context, request *admin.ConsumerGroupsUpdatedRequest) (err error)
- func (h *OutputHost) ListLoadedConsumerGroups(ctx thrift.Context) (result *admin.ListConsumerGroupsResult_, err error)
- func (h *OutputHost) LoadUconfig()
- func (h *OutputHost) OpenConsumerStream(ctx thrift.Context, call stream.BOutOpenConsumerStreamInCall) error
- func (h *OutputHost) OpenConsumerStreamHandler(w http.ResponseWriter, r *http.Request)
- func (h *OutputHost) OpenStreamingConsumerStream(ctx thrift.Context, call stream.BOutOpenStreamingConsumerStreamInCall) error
- func (h *OutputHost) ReadCgState(ctx thrift.Context, req *admin.ReadConsumerGroupStateRequest) (result *admin.ReadConsumerGroupStateResult_, err error)
- func (h *OutputHost) ReceiveMessageBatch(ctx thrift.Context, request *cherami.ReceiveMessageBatchRequest) (*cherami.ReceiveMessageBatchResult_, error)
- func (h *OutputHost) RegisterWSHandler() *http.ServeMux
- func (h *OutputHost) Report(reporter common.LoadReporter)
- func (h *OutputHost) SetConsumedMessages(ctx thrift.Context, request *cherami.SetConsumedMessagesRequest) error
- func (h *OutputHost) SetFrontendClient(frontendClient ccherami.TChanBFrontend)
- func (h *OutputHost) Shutdown()
- func (h *OutputHost) Start(thriftService []thrift.TChanServer)
- func (h *OutputHost) Stop()
- func (h *OutputHost) UnloadConsumerGroups(ctx thrift.Context, request *admin.UnloadConsumerGroupsRequest) (err error)
- func (h *OutputHost) UtilGetPickedStore(cgName string, path string) (connStore string)
- type Throttler
Constants ¶
const (
	// ThrottleDown is the message to stop throttling the connections
	ThrottleDown = iota
	// ThrottleUp is the message to start throttling connections
	ThrottleUp
	// NumNotifyTypes is the overall number of notification types
	NumNotifyTypes
)
const SmartRetryDisableString = `smartRetryDisable`
SmartRetryDisableString can be added to a destination or CG owner email to request that smart retry be disabled. Note that Google allows something like this: gbailey+smartRetryDisable@uber.com. The above is still a valid email and will be delivered to gbailey@uber.com.
Variables ¶
var ErrCgAlreadyUnloaded = &cherami.InternalServiceError{Message: "Consumer Group already unloaded"}
ErrCgAlreadyUnloaded is returned when the cg is already unloaded
var ErrCgUnloaded = &cherami.InternalServiceError{Message: "ConsumerGroup already unloaded"}
ErrCgUnloaded is returned when the cgCache is already unloaded
var ErrConfigCast = &cherami.InternalServiceError{Message: "Unable to cast to OutputCgConfig"}
ErrConfigCast is returned when we are unable to cast to the CgConfig type
var ErrHostShutdown = &cherami.InternalServiceError{Message: "OutputHost already shutdown"}
ErrHostShutdown is returned when the host is already shutdown
var ErrLimit = &cherami.InternalServiceError{Message: "Outputhost cons connection limit exceeded"}
ErrLimit is returned when the overall limit for the CG is violated
Functions ¶
func NewSaramaLoggerFromBark ¶ added in v1.26.0
NewSaramaLoggerFromBark provides a logger suitable for Sarama from a given bark logger
func OpenKafkaStream ¶ added in v1.26.0
func OpenKafkaStream(c <-chan *s.ConsumerMessage, kafkaMessageConverter KafkaMessageConverter, logger bark.Logger) stream.BStoreOpenReadStreamOutCall
OpenKafkaStream opens a store call simulated from the given sarama message stream
Types ¶
type AckID ¶
type AckID string
AckID is an acknowledgement ID; not the same as common.AckID, which decomposes this string. Capitalized because otherwise the type name conflicts horribly with local variables.
type Committer ¶ added in v1.26.0
type Committer interface {
// SetCommitLevel indicates that work up to and including the message specified by the sequence number and address
// has been acknowledged. Not guaranteed to be persisted until a successful call to UnlockAndFlush()
SetCommitLevel(l CommitterLevel)
// SetReadLevel indicates that a particular message has been read. Metadata not guaranteed to be communicated/persisted
// until a successful call to UnlockAndFlush()
SetReadLevel(l CommitterLevel)
// SetFinalLevel indicates that a particular level is the last that can possibly be read. Metadata not guaranteed
// to be communicated/persisted until a successful call to UnlockAndFlush()
SetFinalLevel(l CommitterLevel)
// UnlockAndFlush copies accumulated commit/read state, unlocks the provided lock, and then commits
// the levels to durable storage, e.g. Kafka offset storage or Cherami-Cassandra AckLevel storage
UnlockAndFlush(l sync.Locker) error
// GetCommitLevel returns the last value given to SetCommitLevel()
GetCommitLevel() (l CommitterLevel)
// GetReadLevel returns the last value given to SetReadLevel()
GetReadLevel() (l CommitterLevel)
}
Committer is an interface that wraps the internals of how offsets/acklevels are committed for a given queueing system (e.g. Kafka or Cherami)
type CommitterLevel ¶ added in v1.26.0
type CommitterLevel struct {
// contains filtered or unexported fields
}
CommitterLevel binds a logical and store address together for committing
type KafkaCommitter ¶ added in v1.26.0
type KafkaCommitter struct {
*sc.OffsetStash
KafkaOffsetMetadata
// contains filtered or unexported fields
}
KafkaCommitter commits ackLevels to Cassandra through the TChanMetadataClient interface
func NewKafkaCommitter ¶ added in v1.26.0
func NewKafkaCommitter( outputHostUUID string, cgUUID string, logger bark.Logger, client **sc.Consumer) *KafkaCommitter
NewKafkaCommitter instantiates a kafkaCommitter
func (*KafkaCommitter) GetCommitLevel ¶ added in v1.26.0
func (c *KafkaCommitter) GetCommitLevel() (l CommitterLevel)
GetCommitLevel returns the next commit level that will be flushed
func (*KafkaCommitter) GetReadLevel ¶ added in v1.26.0
func (c *KafkaCommitter) GetReadLevel() (l CommitterLevel)
GetReadLevel returns the next readlevel that will be flushed
func (*KafkaCommitter) SetCommitLevel ¶ added in v1.26.0
func (c *KafkaCommitter) SetCommitLevel(l CommitterLevel)
SetCommitLevel just updates the next Cherami ack level that will be flushed
func (*KafkaCommitter) SetFinalLevel ¶ added in v1.26.0
func (c *KafkaCommitter) SetFinalLevel(l CommitterLevel)
SetFinalLevel just updates the last possible read level
func (*KafkaCommitter) SetReadLevel ¶ added in v1.26.0
func (c *KafkaCommitter) SetReadLevel(l CommitterLevel)
SetReadLevel just updates the next Cherami read level that will be flushed
func (*KafkaCommitter) UnlockAndFlush ¶ added in v1.26.0
func (c *KafkaCommitter) UnlockAndFlush(l sync.Locker) error
UnlockAndFlush pushes our commit and read levels to Cherami metadata, using SetAckOffset
type KafkaGroupMetadata ¶ added in v1.26.0
type KafkaGroupMetadata struct {
// Version is the version of this structure
Version uint
// CGUUID is the internal Cherami consumer group UUID that committed this offset
CGUUID string
// OutputHostUUID is the UUID of the Cherami Outputhost that committed this offset
OutputHostUUID string
}
KafkaGroupMetadata is a structure used for JSON encoding/decoding of the metadata stored for Kafka groups joined by Cherami
type KafkaMessageConverter ¶ added in v1.26.0
type KafkaMessageConverter func(kMsg *s.ConsumerMessage) (cMsg *store.ReadMessageContent)
KafkaMessageConverter defines the function type used to convert a Kafka message to a Cherami message
type KafkaMessageConverterConfig ¶ added in v1.26.0
type KafkaMessageConverterConfig struct {
// Destination and ConsumerGroup are not needed currently, but may in the future
// Destination *cherami.DestinationDescription
// ConsumerGroup *cherami.ConsumerGroupDescription
KafkaTopics []string // optional: already contained in destination-description
KafkaCluster string // optional: already contained in destination-description
}
KafkaMessageConverterConfig is used to configure a customized converter
type KafkaMessageConverterFactory ¶ added in v1.26.0
type KafkaMessageConverterFactory interface {
GetConverter(cfg *KafkaMessageConverterConfig, log bark.Logger) KafkaMessageConverter
}
KafkaMessageConverterFactory provides a converter from Kafka messages to Cherami messages. In the future, it may provide implementations for BStoreOpenReadStreamOutCall.
type KafkaOffsetMetadata ¶ added in v1.26.0
type KafkaOffsetMetadata struct {
// Version is the version of this structure
Version uint
// CGUUID is the internal Cherami consumer group UUID that committed this offset
CGUUID string
// OutputHostUUID is the UUID of the Cherami Outputhost that committed this offset
OutputHostUUID string
// OutputHostStartTime is the time that the output host started
OutputHostStartTime string
// CommitterStartTime is the time that this committer was started
CommitterStartTime string
}
KafkaOffsetMetadata is a structure used for JSON encoding/decoding of the metadata stored for Kafka offsets committed by Cherami
type Notifier ¶
type Notifier interface {
// Register is the routine to register this connection on the notifier
Register(id int, notifyCh chan NotifyMsg)
// Unregister is the routine to unregister this connection from the notifier
Unregister(id int)
// Notify is the routine to send a notification on the appropriate channel
Notify(id int, notifyMsg NotifyMsg)
}
Notifier is the interface to notify connections about nacks and potentially slowing them down, if needed. Anyone who wants to get notified should Register with the notifier by specifying a unique ID and a channel to receive the notification.
type NotifyMsg ¶
type NotifyMsg struct {
// contains filtered or unexported fields
}
NotifyMsg is the message sent on the notify channel
type OutOptions ¶
type OutOptions struct {
//CacheIdleTimeout
CacheIdleTimeout time.Duration
//KStreamFactory
KStreamFactory KafkaMessageConverterFactory
}
OutOptions holds the options used when instantiating a new host
type OutputCgConfig ¶ added in v0.2.0
type OutputCgConfig struct {
// MessageCacheSize is used to configure the per CG cache size.
// This is a string slice, where each entry is a tuple with the
// destination/CG_name=value.
// For example, we can ideally have two CGs for the same destination
// with different size config as follows:
// "/test/destination//test/cg_1=50,/test/destination//test/cg_2=100"
MessageCacheSize []string `name:"messagecachesize" default:"/=10000"`
// RedeliveryIntervalInMs is used to configure the per CG message redelivery interval.
// Just like MessageCacheSize above
RedeliveryIntervalInMs []string `name:"redeliveryIntervalInMs" default:"/=100"`
}
OutputCgConfig is the per cg config used by the cassandra config manager
type OutputHost ¶
OutputHost is the main server class for OutputHosts
func NewOutputHost ¶
func NewOutputHost( serviceName string, sVice common.SCommon, metadataClient metadata.TChanMetadataService, frontendClient ccherami.TChanBFrontend, opts *OutOptions, kafkaCfg configure.CommonKafkaConfig, ) (*OutputHost, []thrift.TChanServer)
NewOutputHost is the constructor for BOut
func (*OutputHost) AckMessages ¶
func (h *OutputHost) AckMessages(ctx thrift.Context, ackRequest *cherami.AckMessagesRequest) error
AckMessages is the implementation of the thrift handler for the BOut service
func (*OutputHost) ConsumerGroupsUpdated ¶
func (h *OutputHost) ConsumerGroupsUpdated(ctx thrift.Context, request *admin.ConsumerGroupsUpdatedRequest) (err error)
ConsumerGroupsUpdated is the API exposed to ExtentController to communicate any changes to current extent view
func (*OutputHost) ListLoadedConsumerGroups ¶ added in v1.26.0
func (h *OutputHost) ListLoadedConsumerGroups(ctx thrift.Context) (result *admin.ListConsumerGroupsResult_, err error)
ListLoadedConsumerGroups is the API used to list the consumer groups currently loaded on this output host
func (*OutputHost) LoadUconfig ¶
func (h *OutputHost) LoadUconfig()
LoadUconfig loads the dynamic config values for key
func (*OutputHost) OpenConsumerStream ¶
func (h *OutputHost) OpenConsumerStream(ctx thrift.Context, call stream.BOutOpenConsumerStreamInCall) error
OpenConsumerStream is the implementation of the thrift handler for the Out service. TODO: find remote "host" from the context as a tag (pass to newConsConnection)
func (*OutputHost) OpenConsumerStreamHandler ¶
func (h *OutputHost) OpenConsumerStreamHandler(w http.ResponseWriter, r *http.Request)
OpenConsumerStreamHandler is websocket handler for opening consumer stream
func (*OutputHost) OpenStreamingConsumerStream ¶
func (h *OutputHost) OpenStreamingConsumerStream(ctx thrift.Context, call stream.BOutOpenStreamingConsumerStreamInCall) error
OpenStreamingConsumerStream is unimplemented
func (*OutputHost) ReadCgState ¶ added in v1.26.0
func (h *OutputHost) ReadCgState(ctx thrift.Context, req *admin.ReadConsumerGroupStateRequest) (result *admin.ReadConsumerGroupStateResult_, err error)
ReadCgState is the API used to get the cg state
func (*OutputHost) ReceiveMessageBatch ¶
func (h *OutputHost) ReceiveMessageBatch(ctx thrift.Context, request *cherami.ReceiveMessageBatchRequest) (*cherami.ReceiveMessageBatchResult_, error)
ReceiveMessageBatch is a thrift handler. It consumes messages from this output host within the thrift context deadline. It supports long polling.
func (*OutputHost) RegisterWSHandler ¶
func (h *OutputHost) RegisterWSHandler() *http.ServeMux
RegisterWSHandler is the implementation of WSService interface
func (*OutputHost) Report ¶
func (h *OutputHost) Report(reporter common.LoadReporter)
Report is the implementation for reporting host specific load to controller
func (*OutputHost) SetConsumedMessages ¶
func (h *OutputHost) SetConsumedMessages(ctx thrift.Context, request *cherami.SetConsumedMessagesRequest) error
SetConsumedMessages is unimplemented
func (*OutputHost) SetFrontendClient ¶
func (h *OutputHost) SetFrontendClient(frontendClient ccherami.TChanBFrontend)
SetFrontendClient is used to set the frontend client after we start the output
func (*OutputHost) Shutdown ¶
func (h *OutputHost) Shutdown()
Shutdown shuts down the OutputHost cleanly
func (*OutputHost) Start ¶
func (h *OutputHost) Start(thriftService []thrift.TChanServer)
Start starts the outputhost service
func (*OutputHost) UnloadConsumerGroups ¶
func (h *OutputHost) UnloadConsumerGroups(ctx thrift.Context, request *admin.UnloadConsumerGroupsRequest) (err error)
UnloadConsumerGroups is the API used to unload consumer groups to clear the cache
func (*OutputHost) UtilGetPickedStore ¶
func (h *OutputHost) UtilGetPickedStore(cgName string, path string) (connStore string)
UtilGetPickedStore is used by the integration test to figure out which store host we are currently connected to. XXX: This should not be used anywhere else other than the test.
type Throttler ¶
type Throttler interface {
// SetBackoffCoefficient is the routine to set the backoff coefficient for the throttler
SetBackoffCoefficient(backoffCoefficient float64)
// SetMaxThrottleDuration is used to set the max duration to return for the throttler
SetMaxThrottleDuration(max time.Duration)
// SetMinThrottleDuration is used to set the min duration to return for the throttler
SetMinThrottleDuration(min time.Duration)
// GetCurrentSleepDuration returns the current duration for this throttler
GetCurrentSleepDuration() time.Duration
// GetNextSleepDuration returns the sleep duration based on the backoff coefficient
GetNextSleepDuration() time.Duration
// ResetSleepDuration resets the current sleep duration for this throttler
ResetSleepDuration() time.Duration
// IsCurrentSleepDurationAtMaximum tells whether the current sleep duration is maxed out
IsCurrentSleepDurationAtMaximum() bool
}
Throttler is the interface to get the duration to sleep based on a simple backoff formula: sleepDuration = time.Duration(currentSleepDuration * backoffCoefficient). Note: this is lock free and is *not* thread safe. This is intentional because the user is expected to be a single goroutine. If we need this to be thread safe we need to add some synchronization.