Documentation ¶
Overview ¶
Package bqwriter provides a compact Streamer API for writing data concurrently to BigQuery using the insertAll or Storage API.
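A minimal usage sketch follows. The import path, project, dataset and table IDs are assumptions for illustration; passing a nil StreamerConfig means all defaults are used, which selects the insertAll client under the hood.

package main

import (
    "context"
    "log"

    "github.com/OTA-Insight/bqwriter" // assumed import path
)

func main() {
    ctx := context.Background()

    // A nil StreamerConfig uses all defaults, meaning the insertAll
    // API client is used under the hood.
    streamer, err := bqwriter.NewStreamer(ctx, "my-project", "my_dataset", "my_table", nil)
    if err != nil {
        log.Fatalf("create streamer: %v", err)
    }
    // Close flushes any locally batched rows and stops the worker goroutines.
    defer streamer.Close()

    // Write a single row; the actual write happens asynchronously
    // on one of the streamer's worker goroutines.
    type contact struct {
        Name string
        Age  int
    }
    if err := streamer.Write(contact{Name: "alice", Age: 32}); err != nil {
        log.Printf("failed to queue row: %v", err)
    }
}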
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type InsertAllClientConfig ¶
type InsertAllClientConfig struct {
    // FailOnInvalidRows causes the attempt to insert a row
    // containing invalid data to fail.
    //
    // Defaults to false, in which case invalid rows are skipped and these errors are silently ignored.
    FailOnInvalidRows bool
    // FailForUnknownValues causes records containing unknown values
    // to be treated as invalid records.
    //
    // Defaults to false, in which case unknown values are silently ignored
    // and the rows are published with those values removed from them.
    FailForUnknownValues bool
    // BatchSize defines the amount of rows (data) batched by a worker, prior to the worker
    // actually writing them to BQ. Should a worker have rows left in its local cache when closing,
    // it will flush/write these rows prior to closing.
    //
    // Defaults to constant.DefaultBatchSize if BatchSize == 0;
    // use a negative value or an explicit value of 1
    // in case you want to write each row directly.
    BatchSize int
    // MaxRetryDeadlineOffset is the max amount of time the back-off algorithm is allowed to take
    // for its initial as well as all retry attempts. No retry should be attempted once this limit is exceeded.
    // This offset is a soft maximum: it can be exceeded, but not by much.
    //
    // Defaults to constant.DefaultMaxRetryDeadlineOffset if MaxRetryDeadlineOffset == 0.
    MaxRetryDeadlineOffset time.Duration
}
InsertAllClientConfig is used to configure an InsertAll client API driven Streamer Client. All properties have sane defaults as defined and used by this Go package.
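As a sketch of how this configuration could be tuned (the import path and helper name are assumptions; fields left at their zero value fall back to the defaults documented above):

package example

import (
    "time"

    "github.com/OTA-Insight/bqwriter" // assumed import path
)

// newInsertAllConfig is a hypothetical helper returning a StreamerConfig
// tuned for the insertAll-driven client.
func newInsertAllConfig() *bqwriter.StreamerConfig {
    return &bqwriter.StreamerConfig{
        InsertAllClient: &bqwriter.InsertAllClientConfig{
            // Fail loudly instead of silently ignoring invalid rows and unknown values.
            FailOnInvalidRows:    true,
            FailForUnknownValues: true,
            // A value of 1 (or any negative value) writes each row directly.
            BatchSize: 1,
            // Cap the total time a write and its retries may take.
            MaxRetryDeadlineOffset: 30 * time.Second,
        },
    }
}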
type StorageClientConfig ¶
type StorageClientConfig struct {
    // BigQuerySchema can be used to configure a data encoder for the StorageClient
    // based on a dynamically defined BigQuery schema, so that any struct,
    // json.Marshaler, JSON-encoded byte slice, Stringer (text proto) or string (also text proto)
    // can be encoded as a valid protobuf message based on the given BigQuery schema.
    //
    // This config is required only if ProtobufDescriptor is not defined
    // and it will be ignored in case ProtobufDescriptor is defined. The latter is recommended,
    // as a BigQuery-schema-based encoder has a possible performance penalty.
    BigQuerySchema *bigquery.Schema
    // ProtobufDescriptor can be used to configure a data encoder for the StorageClient
    // based on a pre-compiled protobuf schema, so that any proto Message
    // adhering to this descriptor can be encoded.
    //
    // This config is required only if BigQuerySchema is not defined.
    // It is however recommended to use the ProtobufDescriptor,
    // as a BigQuerySchema-based encoder has a possible performance penalty.
    ProtobufDescriptor *descriptorpb.DescriptorProto
    // MaxRetries is the max amount of times that the retry logic will retry a retryable
    // BQ write error, prior to giving up. Note that non-retryable errors will immediately stop,
    // and that there is also an upper limit of MaxTotalElapsedRetryTime within which these max retries are executed in the worst case.
    //
    // Defaults to constant.DefaultMaxRetries if MaxRetries == 0,
    // or use MaxRetries < 0 if you want to explicitly disable retrying.
    MaxRetries int
    // InitialRetryDelay is the initial time the back-off algorithm will wait and which will
    // be used as the base value to be multiplied for any possible sequential retries.
    //
    // Defaults to constant.DefaultInitialRetryDelay if InitialRetryDelay == 0.
    InitialRetryDelay time.Duration
    // MaxRetryDeadlineOffset is the max amount of time the back-off algorithm is allowed to take
    // for its initial as well as all retry attempts. No retry should be attempted once this limit is exceeded.
    // This offset is a soft maximum: it can be exceeded, but not by much.
    //
    // Defaults to constant.DefaultMaxRetryDeadlineOffset if MaxRetryDeadlineOffset == 0.
    MaxRetryDeadlineOffset time.Duration
    // RetryDelayMultiplier is the retry delay multiplier used by the retry
    // back-off algorithm in order to increase the delay in between each sequential write-retry of the
    // same back-off sequence.
    //
    // Defaults to constant.DefaultRetryDelayMultiplier if RetryDelayMultiplier < 1,
    // with 2 being the lowest multiplier accepted.
    RetryDelayMultiplier float64
}
StorageClientConfig is used to configure a storage client API driven Streamer Client. All properties have sane defaults as defined and used by this Go package.
A non-nil StorageClientConfig instance has to be passed in to the StorageClient property of a StreamerConfig in order to indicate the Streamer should be built using the Storage API Client under the hood.
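For example, a Storage-API-driven configuration using a dynamically defined BigQuery schema could look like this sketch (the import path, helper name and two-column schema are assumptions for illustration):

package example

import (
    "cloud.google.com/go/bigquery"

    "github.com/OTA-Insight/bqwriter" // assumed import path
)

// newStorageConfig is a hypothetical helper building a StreamerConfig that
// selects the Storage API client, using a dynamically defined BigQuery schema
// as its encoder.
func newStorageConfig() *bqwriter.StreamerConfig {
    schema := bigquery.Schema{
        {Name: "name", Type: bigquery.StringFieldType},
        {Name: "age", Type: bigquery.IntegerFieldType},
    }
    return &bqwriter.StreamerConfig{
        StorageClient: &bqwriter.StorageClientConfig{
            // A ProtobufDescriptor is recommended instead when the possible
            // performance penalty of schema-based encoding matters.
            BigQuerySchema: &schema,
        },
    }
}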
type Streamer ¶
type Streamer struct {
    // contains filtered or unexported fields
}
Streamer is a simple BQ stream-writer, allowing you to write data to a BQ table concurrently.
func NewStreamer ¶
func NewStreamer(ctx context.Context, projectID, dataSetID, tableID string, cfg *StreamerConfig) (*Streamer, error)
NewStreamer creates a new Streamer Client. StreamerConfig is optional; all other parameters are required.
An error is returned in case the Streamer Client couldn't be created for some unexpected reason, most likely due to an issue in the underlying interaction with GCloud.
func (*Streamer) Close ¶
func (s *Streamer) Close()
Close closes the streamer and all its worker goroutines.
func (*Streamer) Write ¶
func (s *Streamer) Write(data interface{}) error
Write a row of data to a BQ table within the streamer's project. The row will be written as soon as all previous rows have been written and a worker goroutine becomes available to write it.
Jobs that failed to write but which are retryable can be retried on the same goroutine using an exponential back-off approach, should the streamer be configured to do so.
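A small sketch of queueing multiple rows (hypothetical helper; the import path is an assumption):

package example

import (
    "log"

    "github.com/OTA-Insight/bqwriter" // assumed import path
)

// writeAll queues every row on the given streamer. The actual writes happen
// asynchronously on the streamer's worker goroutines.
func writeAll(s *bqwriter.Streamer, rows []interface{}) {
    for _, row := range rows {
        if err := s.Write(row); err != nil {
            log.Printf("failed to queue row: %v", err)
        }
    }
}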
type StreamerConfig ¶
type StreamerConfig struct {
    // WorkerCount defines the amount of workers to be used,
    // each on their own goroutine and with an opt-out channel buffer per routine.
    // Use a negative value if you want just a single worker (same as explicitly defining it as 1).
    //
    // Defaults to constant.DefaultWorkerCount if not defined explicitly.
    WorkerCount int
    // WorkerQueueSize defines the size of the job queue per worker, which
    // allows the Streamer's users to write rows even if all workers are currently
    // too busy to accept new incoming rows.
    //
    // Use a negative value in order to provide no buffer for the workers at all;
    // not recommended, but a possibility for you to choose nonetheless.
    //
    // Defaults to constant.DefaultWorkerQueueSize if not defined explicitly.
    WorkerQueueSize int
    // MaxBatchDelay defines the max amount of time a worker batches rows,
    // prior to writing the batched rows, even when the batch is not yet full.
    //
    // Defaults to constant.DefaultMaxBatchDelay if MaxBatchDelay == 0.
    MaxBatchDelay time.Duration
    // Logger allows you to attach a logger to be used by the streamer,
    // instead of the default built-in STDERR logging implementation,
    // with the latter being used as the default in case this logger isn't defined explicitly.
    Logger log.Logger
    // InsertAllClient allows you to overwrite any or all of the defaults used to configure an
    // InsertAll client API driven Streamer Client. Note that this optional configuration is ignored
    // altogether in case StorageClient is defined as a non-nil value.
    InsertAllClient *InsertAllClientConfig
    // StorageClient allows you to create a Storage API driven Streamer Client.
    // You can do so using `new(StorageClientConfig)` in order to create a StorageClient
    // with all possible configurations configured using their defaults as defined by this Go package.
    StorageClient *StorageClientConfig
}
StreamerConfig is used to build a Streamer (client). All configurations found in this structure are optional and have sane defaults defined for them. All required parameters are to be passed in as separate arguments to the NewStreamer constructor method.
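To tie the options together, a sketch of an explicitly tuned StreamerConfig (hypothetical values; the import path and helper name are assumptions, and any field left at its zero value falls back to the documented default):

package example

import (
    "time"

    "github.com/OTA-Insight/bqwriter" // assumed import path
)

// newStreamerConfig sketches an explicitly tuned StreamerConfig.
func newStreamerConfig() *bqwriter.StreamerConfig {
    return &bqwriter.StreamerConfig{
        // Four workers, each on its own goroutine with its own job queue.
        WorkerCount:     4,
        WorkerQueueSize: 64,
        // Flush partially filled batches at least every 5 seconds.
        MaxBatchDelay: 5 * time.Second,
        // Logger, InsertAllClient and StorageClient are left nil:
        // the built-in STDERR logger is used and the insertAll client
        // is created with all of its defaults.
    }
}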