tools

package module v0.1.0
Published: Apr 5, 2019 License: Apache-2.0 Imports: 36 Imported by: 0

README

SpotHero Tools Library for Go


The SpotHero Tools Library is used internally at SpotHero across our Go programs. This library is a collection of common utilities and functions that don't yet stand on their own as individual libraries.

Additionally, an example server, a template Makefile, and a Yeoman project generator are provided as a convenience for users.

We welcome community usage and collaboration.

Running the Example Server
  1. Install Go
    1. brew install golang
    2. Set your GOPATH in your .zshrc/.bashrc/etc
    3. Add GOPATH/bin to your PATH
      1. export PATH=$GOPATH/bin:$PATH
  2. Clone this repository
  3. make
  4. ./example_server
  5. Open your browser to http://localhost:8080
Overview

This library contains common modules for use in all Go projects across SpotHero. To use this library, simply add it as a dependency in your dep Gopkg.toml using the latest release.

Currently, this library supports the following features:

  • AWS Utilities
  • CLI Utilities
  • Kafka
    • Support for consuming and producing metrics
    • Support for goroutine-based callback functions where types are automatically deduced and unpacked
    • Schema Registry
  • Avro Decoding
  • HTTP Server with instrumentation
  • Prometheus Metrics
  • Kubernetes API Listeners
  • High-Performance Logging
  • Sentry Integration
  • OpenTracing/Jaeger Tracing Support

In addition, all the above packages may automatically be integrated with Cobra/Viper CLIs for 12-factor application compatibility via the CLI module.

Getting Setup

Usage of this library simply requires you to specify this package in your dep Gopkg.toml.

For example:

...
[[constraint]]
  branch = "master"
  name = "github.com/spothero/tools"

Then, in your application you can simply do the following:

package coolpkg

import (
  "github.com/spothero/tools"
  ...
)
...
Usage

A simple example is provided under examples/example_server.go, which shows how to use this library to create a simple 12-factor Go web application with tracing, logging, metrics, Sentry, and local caching enabled.

For production applications, we recommend separating the Cobra/Viper command portion into its own cmd/ directory, and your application logic into a pkg/ directory as is standard with most Go applications.

Additionally, the Makefile for this project is an excellent example which you can (and should) borrow for your own projects.

License

Apache 2.0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var Logger = zap.NewNop()

Logger is a zap logger. If performance is a concern, use this logger.

View Source
var SugaredLogger = Logger.Sugar()

SugaredLogger abstracts away types and lets the zap library figure them out, so the caller doesn't have to import zap into their package. It is slightly slower than Logger and creates more garbage.

Functions

func BaseHTTPMonitoringHandler

func BaseHTTPMonitoringHandler(next http.Handler, serverName string) http.HandlerFunc

BaseHTTPMonitoringHandler is meant to be used as middleware for every request. It will:

  • Start an opentracing span, place it in the http.Request context, and close the span when the request completes
  • Capture any unhandled errors and send them to Sentry
  • Capture metrics to Prometheus for the duration of the HTTP request

func CobraBindEnvironmentVariables

func CobraBindEnvironmentVariables(prefix string) func(cmd *cobra.Command, _ []string)

CobraBindEnvironmentVariables can be used at the root command level of a cobra CLI hierarchy to allow all command-line variables to also be set by environment variables. Note that kebab-case flag names such as skewered-variable-names are automatically translated to skewered_variable_names for compatibility with environment variable naming.

In addition, you can pass in an application name prefix such that all environment variables will need to start with PREFIX_ to be picked up as valid environment variables. For example, if you specified the prefix as "availability", then the program would only detect environment variables like "AVAILABILITY_KAFKA_BROKER" and not "KAFKA_BROKER". There is no need to capitalize the prefix name.

Note: CLI arguments (e.g. --address=localhost) always take precedence over environment variables.

func CreateStdLogger

func CreateStdLogger(zapLogger *zap.Logger, logLevel string) (*log.Logger, error)

CreateStdLogger returns a standard-library compatible logger

func LoadLocation

func LoadLocation(name string) (*time.Location, error)

LoadLocation is a drop-in replacement for time.LoadLocation that caches all loaded locations so that subsequent loads do not require additional filesystem lookups.

func TraceOutbound

func TraceOutbound(r *http.Request, span opentracing.Span)

TraceOutbound injects outbound HTTP requests with OpenTracing headers

Types

type HTTPMetricsRecorder

type HTTPMetricsRecorder interface {
	RecordHttpMetrics(w http.ResponseWriter, r *http.Request) *prometheus.Timer
}

HTTPMetricsRecorder defines an interface for recording prometheus metrics on HTTP requests

type HTTPServerConfig

type HTTPServerConfig struct {
	Address string
	Port    int
	Name    string
}

HTTPServerConfig contains the basic configuration necessary for running an HTTP Server

func (*HTTPServerConfig) RegisterFlags

func (c *HTTPServerConfig) RegisterFlags(flags *pflag.FlagSet, defaultPort int, defaultName string)

RegisterFlags registers HTTP flags with pflags

func (*HTTPServerConfig) RunHTTPServer

func (c *HTTPServerConfig) RunHTTPServer(
	preStart func(ctx context.Context, mux *http.ServeMux, server *http.Server),
	postShutdown func(ctx context.Context),
	registerMuxes func(*http.ServeMux),
)

RunHTTPServer starts and runs a web server, waiting for a cancellation signal to exit

func (*HTTPServerConfig) RunWebServer

func (c *HTTPServerConfig) RunWebServer(
	ctx context.Context,
	wg *sync.WaitGroup,
	preStart func(ctx context.Context, mux *http.ServeMux, server *http.Server),
	postShutdown func(ctx context.Context),
	registerMuxes func(*http.ServeMux),
)

RunWebServer starts and runs a new web server

type KafkaClient

type KafkaClient struct {
	KafkaConfig
	// contains filtered or unexported fields
}

KafkaClient wraps a sarama client and Kafka configuration and can be used to create producers and consumers

func (KafkaClient) Close

func (kc KafkaClient) Close()

Close the underlying Kafka client

func (KafkaClient) NewKafkaConsumer

func (kc KafkaClient) NewKafkaConsumer() (KafkaConsumer, error)

NewKafkaConsumer sets up a Kafka consumer

func (KafkaClient) NewKafkaProducer

func (kc KafkaClient) NewKafkaProducer() (KafkaProducer, error)

NewKafkaProducer creates a sarama producer from a client

type KafkaConfig

type KafkaConfig struct {
	Broker                   string
	ClientID                 string
	TLSCaCrtPath             string
	TLSCrtPath               string
	TLSKeyPath               string
	Handlers                 map[string]KafkaMessageHandler
	JSONEnabled              bool
	Verbose                  bool
	KafkaVersion             string
	ProducerCompressionCodec string
	ProducerCompressionLevel int
	SchemaRegistry           *SchemaRegistryConfig
	// contains filtered or unexported fields
}

KafkaConfig contains connection settings and configuration for communicating with a Kafka cluster

func (KafkaConfig) NewKafkaClient

func (kc KafkaConfig) NewKafkaClient(ctx context.Context) (KafkaClient, error)

NewKafkaClient creates a Kafka client with metrics exporting and optional TLS that can be used to create consumers or producers

func (*KafkaConfig) RegisterFlags

func (kc *KafkaConfig) RegisterFlags(flags *pflag.FlagSet)

RegisterFlags registers Kafka flags with pflags

type KafkaConsumer

type KafkaConsumer struct {
	KafkaClient
	// contains filtered or unexported fields
}

KafkaConsumer contains a sarama client, consumer, and implementation of the KafkaMessageUnmarshaler interface

func (KafkaConsumer) Close

func (kc KafkaConsumer) Close()

Close Sarama consumer and client

func (KafkaConsumer) ConsumeTopic

func (kc KafkaConsumer) ConsumeTopic(
	ctx context.Context,
	handler KafkaMessageHandler,
	topic string,
	offsets PartitionOffsets,
	readResult chan PartitionOffsets,
	catchupWg *sync.WaitGroup,
	exitAfterCaughtUp bool,
) error

ConsumeTopic consumes a particular Kafka topic from startOffset to endOffset or from startOffset to forever

This function creates consumers for all partitions in a topic and reads each partition from the given offset up to the latest offset at the time the consumer started, then notifies the caller via catchupWg. If exitAfterCaughtUp is true, the consumer exits after reading the message at that latest offset. When all partition consumers are closed, the last offset read on each partition is sent through the readResult channel.

func (KafkaConsumer) ConsumeTopicFromBeginning

func (kc KafkaConsumer) ConsumeTopicFromBeginning(
	ctx context.Context,
	handler KafkaMessageHandler,
	topic string,
	readResult chan PartitionOffsets,
	catchupWg *sync.WaitGroup,
	exitAfterCaughtUp bool,
) error

ConsumeTopicFromBeginning starts Kafka consumers on all partitions in a given topic from the message with the oldest offset.

func (KafkaConsumer) ConsumeTopicFromLatest

func (kc KafkaConsumer) ConsumeTopicFromLatest(
	ctx context.Context,
	handler KafkaMessageHandler,
	topic string,
	readResult chan PartitionOffsets,
) error

ConsumeTopicFromLatest starts Kafka consumers on all partitions in a given topic from the message with the latest offset.

type KafkaConsumerIface

type KafkaConsumerIface interface {
	ConsumeTopic(ctx context.Context, handler KafkaMessageHandler, topic string, offsets PartitionOffsets, readResult chan PartitionOffsets, catchupWg *sync.WaitGroup, exitAfterCaughtUp bool) error
	ConsumeTopicFromBeginning(ctx context.Context, handler KafkaMessageHandler, topic string, readResult chan PartitionOffsets, catchupWg *sync.WaitGroup, exitAfterCaughtUp bool) error
	ConsumeTopicFromLatest(ctx context.Context, handler KafkaMessageHandler, topic string, readResult chan PartitionOffsets) error
	Close()
}

KafkaConsumerIface is an interface for consuming messages from a Kafka topic

type KafkaMessageHandler

type KafkaMessageHandler interface {
	HandleMessage(ctx context.Context, msg *sarama.ConsumerMessage, unmarshaler KafkaMessageUnmarshaler) error
}

KafkaMessageHandler defines an interface for handling new messages received by the Kafka consumer

type KafkaMessageUnmarshaler

type KafkaMessageUnmarshaler interface {
	UnmarshalMessage(ctx context.Context, msg *sarama.ConsumerMessage, target interface{}) error
}

KafkaMessageUnmarshaler defines an interface for unmarshaling messages received from Kafka to Go types

type KafkaProducer

type KafkaProducer struct {
	KafkaClient
	// contains filtered or unexported fields
}

KafkaProducer contains a sarama client and async producer

func (KafkaProducer) RunProducer

func (kp KafkaProducer) RunProducer(messages <-chan *sarama.ProducerMessage, done chan bool)

RunProducer wraps the sarama AsyncProducer and adds metrics, logging, and a shutdown procedure to the producer. To stop the producer, close the messages channel; when the producer is shutdown a signal will be emitted on the done channel. If the messages channel is unbuffered, each message sent to the producer is guaranteed to at least have been attempted to be produced to Kafka.

type LoggingConfig

type LoggingConfig struct {
	SentryLoggingEnabled bool
	UseDevelopmentLogger bool
	OutputPaths          []string
	ErrorOutputPaths     []string
	Level                string
	SamplingInitial      int
	SamplingThereafter   int
	AppVersion           string
	GitSha               string
}

LoggingConfig defines the necessary configuration for instantiating a Logger

func (*LoggingConfig) InitializeLogger

func (lc *LoggingConfig) InitializeLogger() error

InitializeLogger sets up the logger. This function should be called as soon as possible. Any use of the logger provided by this package will be a nop until this function is called

func (*LoggingConfig) RegisterFlags

func (lc *LoggingConfig) RegisterFlags(flags *pflag.FlagSet)

RegisterFlags registers Logging flags with pflags

type MockKafkaConsumer

type MockKafkaConsumer struct {
	mock.Mock
	sync.Mutex
	// contains filtered or unexported fields
}

MockKafkaConsumer implements KafkaConsumerIface for testing purposes

func (*MockKafkaConsumer) Close

func (m *MockKafkaConsumer) Close()

Close mocks the Kafka consumer Close method

func (*MockKafkaConsumer) ConsumeTopic

func (m *MockKafkaConsumer) ConsumeTopic(ctx context.Context, handler KafkaMessageHandler, topic string, offsets PartitionOffsets, readResult chan PartitionOffsets, catchupWg *sync.WaitGroup, exitAfterCaughtUp bool) error

ConsumeTopic mocks the Kafka consumer ConsumeTopic method

func (*MockKafkaConsumer) ConsumeTopicFromBeginning

func (m *MockKafkaConsumer) ConsumeTopicFromBeginning(ctx context.Context, handler KafkaMessageHandler, topic string, readResult chan PartitionOffsets, catchupWg *sync.WaitGroup, exitAfterCaughtUp bool) error

ConsumeTopicFromBeginning mocks the Kafka consumer ConsumeTopicFromBeginning method

func (*MockKafkaConsumer) ConsumeTopicFromLatest

func (m *MockKafkaConsumer) ConsumeTopicFromLatest(ctx context.Context, handler KafkaMessageHandler, topic string, readResult chan PartitionOffsets) error

ConsumeTopicFromLatest mocks the Kafka consumer ConsumeTopicFromLatest method

func (*MockKafkaConsumer) EmitReadResult

func (m *MockKafkaConsumer) EmitReadResult(offsets PartitionOffsets)

EmitReadResult allows tests to send values through the readResult channel passed into the mock consumer.

type PartitionOffsets

type PartitionOffsets map[int32]int64

PartitionOffsets is a mapping of partition ID to an offset to which a consumer read on that partition

type SchemaRegistryConfig

type SchemaRegistryConfig struct {
	SchemaRegistryURL string
	// contains filtered or unexported fields
}

SchemaRegistryConfig defines the necessary configuration for interacting with Schema Registry

func (*SchemaRegistryConfig) RegisterFlags

func (src *SchemaRegistryConfig) RegisterFlags(flags *pflag.FlagSet)

RegisterFlags registers Schema Registry flags with pflags

func (*SchemaRegistryConfig) UnmarshalMessage

func (src *SchemaRegistryConfig) UnmarshalMessage(
	ctx context.Context,
	msg *sarama.ConsumerMessage,
	target interface{},
) error

UnmarshalMessage implements the KafkaMessageUnmarshaler interface. It decodes an Avro message from Kafka into a Go struct; Avro schemas are fetched from the Kafka Schema Registry. To use this function, tag each field of the target struct with a `kafka` tag whose value indicates which key of the Avro message to assign to that field.

type SentryConfig

type SentryConfig struct {
	DSN         string
	AppPackage  string
	Environment string
	AppVersion  string
}

SentryConfig defines the necessary configuration for instantiating a Sentry Reporter

func (*SentryConfig) InitializeRaven

func (sc *SentryConfig) InitializeRaven()

InitializeRaven initializes the Raven client. This function should be called as soon as possible after the application configuration is loaded so that Raven is set up.

func (*SentryConfig) RegisterFlags

func (sc *SentryConfig) RegisterFlags(flags *pflag.FlagSet)

RegisterFlags registers Sentry flags with pflags

type SentryCore

type SentryCore struct {
	zapcore.LevelEnabler
	// contains filtered or unexported fields
}

SentryCore implements a zapcore.Core that sends logged errors to Sentry

func (*SentryCore) Check

func (c *SentryCore) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry

Check must be called before calling Write. This determines whether or not logs are sent to Sentry

func (*SentryCore) Sync

func (c *SentryCore) Sync() error

Sync flushes any buffered logs

func (*SentryCore) With

func (c *SentryCore) With(fields []zapcore.Field) zapcore.Core

With adds structured context to the Sentry Core

func (*SentryCore) Write

func (c *SentryCore) Write(ent zapcore.Entry, fields []zapcore.Field) error

Write logs the entry and fields supplied at the log site and writes them to their destination

type TracingConfig

type TracingConfig struct {
	Enabled               bool
	SamplerType           string
	SamplerParam          float64
	ReporterLogSpans      bool
	ReporterMaxQueueSize  int
	ReporterFlushInterval time.Duration
	AgentHost             string
	AgentPort             int
	ServiceName           string
}

TracingConfig defines the necessary configuration for instantiating a Tracer

func (*TracingConfig) ConfigureTracer

func (tc *TracingConfig) ConfigureTracer() io.Closer

ConfigureTracer instantiates and configures the OpenTracer and returns the tracer closer

func (*TracingConfig) RegisterFlags

func (tc *TracingConfig) RegisterFlags(flags *pflag.FlagSet, defaultTracerName string)

RegisterFlags registers Tracer flags with pflags
