thriftbp

package
v0.2.0
Published: May 15, 2020 License: BSD-3-Clause Imports: 15 Imported by: 0

Documentation

Overview

Package thriftbp provides Baseplate-specific, Thrift-related helpers.

Clients

On the client side, this package provides a middleware framework for thrift.TClient to allow you to automatically run code before and after making a Thrift call. It also includes middleware implementations to wrap each call in a Thrift client span as well as a function that most services can use as the "golden path" for setting up a Thrift client pool.

Servers

On the server side, this package provides middleware implementations for EdgeRequestContext handling and tracing propagation according to the Baseplate spec.

Example (ClientPool)

This example demonstrates a typical use case of thriftbp pool in microservice code with custom middleware.

package main

import (
	"context"
	"time"

	"github.com/apache/thrift/lib/go/thrift"
	"github.com/reddit/baseplate.go/log"
	"github.com/reddit/baseplate.go/thriftbp"
)

// In real code these should be coming from either config file or flags instead.
const (
	remoteAddr    = "host:port"
	socketTimeout = time.Millisecond * 10

	initialConnections = 50
	maxConnections     = 100

	clientTTL = time.Minute * 5

	poolGaugeInterval = time.Second * 10
)

// BEGIN THRIFT GENERATED CODE SECTION
//
// In real code this section should be from thrift generated code instead,
// but for this example we just define some placeholders here.

type MyEndpointRequest struct{}

type MyEndpointResponse struct{}

type MyService interface {
	MyEndpoint(ctx context.Context, req *MyEndpointRequest) (*MyEndpointResponse, error)
}

func NewMyServiceClient(_ thrift.TClient) MyService {
	// In real code this certainly won't return nil.
	return nil
}

// END THRIFT GENERATED CODE SECTION

func LoggingMiddleware(next thrift.TClient) thrift.TClient {
	return thrift.WrappedTClient{
		Wrapped: func(ctx context.Context, method string, args, result thrift.TStruct) error {
			log.Infof("pre: %s", method)
			log.Infof("args: %#v", args)
			defer func() {
				log.Infof("after: %s", method)
			}()

			return next.Call(ctx, method, args, result)
		},
	}
}

// This example demonstrates a typical use case of thriftbp pool in
// microservice code with custom middleware.
func main() {
	pool, err := thriftbp.NewBaseplateClientPool(
		thriftbp.ClientPoolConfig{
			ServiceSlug:        "my-service",
			Addr:               remoteAddr,
			InitialConnections: initialConnections,
			MaxConnections:     maxConnections,
			MaxConnectionAge:   clientTTL,
			SocketTimeout:      socketTimeout,
			ReportPoolStats:    true,
			PoolGaugeInterval:  poolGaugeInterval,
		},
		LoggingMiddleware,
	)
	if err != nil {
		panic(err)
	}
	defer pool.Close()

	client := NewMyServiceClient(pool)
	if _, err = client.MyEndpoint(context.Background(), &MyEndpointRequest{}); err != nil {
		panic(err)
	}
}

Constants

const (
	// The Trace ID, a 64-bit integer encoded in decimal.
	HeaderTracingTrace = "Trace"
	// The Span ID, a 64-bit integer encoded in decimal.
	HeaderTracingSpan = "Span"
	// The Parent Span ID, a 64-bit integer encoded in decimal.
	HeaderTracingParent = "Parent"
	// The Sampled flag, an ASCII "1" (HeaderTracingSampledTrue) if true,
	// otherwise false.
	// If not present, defaults to false.
	HeaderTracingSampled = "Sampled"
	// Trace flags, a 64-bit integer encoded in decimal.
	// If not present, defaults to null.
	HeaderTracingFlags = "Flags"
)

Tracing related headers, as defined in https://pages.github.snooguts.net/reddit/baseplate.spec/component-apis/thrift#tracing
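As a rough illustration of the encodings described in the comments above, the following sketch decodes these headers from a plain map. The traceInfo type, the parseTraceHeaders function, and the use of a map as the header source are all hypothetical and not part of thriftbp. It also assumes the IDs are unsigned and that Parent is optional, and it returns errors on malformed values, which is stricter than what StartSpanFromThriftContext is documented to do.

```go
package main

import (
	"fmt"
	"strconv"
)

// Header names and the sampled value, copied from the constants above.
const (
	headerTrace   = "Trace"
	headerSpan    = "Span"
	headerParent  = "Parent"
	headerSampled = "Sampled"
	headerFlags   = "Flags"

	sampledTrue = "1"
)

// traceInfo is a hypothetical holder for the decoded values.
type traceInfo struct {
	TraceID  uint64
	SpanID   uint64
	ParentID uint64 // 0 when the Parent header is absent (assumption)
	Sampled  bool   // false when the Sampled header is absent
	Flags    *int64 // nil when the Flags header is absent
}

// parseTraceHeaders decodes decimal-encoded 64-bit values from h.
func parseTraceHeaders(h map[string]string) (traceInfo, error) {
	var info traceInfo
	var err error
	if info.TraceID, err = strconv.ParseUint(h[headerTrace], 10, 64); err != nil {
		return traceInfo{}, fmt.Errorf("%s header: %w", headerTrace, err)
	}
	if info.SpanID, err = strconv.ParseUint(h[headerSpan], 10, 64); err != nil {
		return traceInfo{}, fmt.Errorf("%s header: %w", headerSpan, err)
	}
	if raw, ok := h[headerParent]; ok {
		if info.ParentID, err = strconv.ParseUint(raw, 10, 64); err != nil {
			return traceInfo{}, fmt.Errorf("%s header: %w", headerParent, err)
		}
	}
	// Only an ASCII "1" means sampled; anything else, or absence, is false.
	info.Sampled = h[headerSampled] == sampledTrue
	if raw, ok := h[headerFlags]; ok {
		flags, err := strconv.ParseInt(raw, 10, 64)
		if err != nil {
			return traceInfo{}, fmt.Errorf("%s header: %w", headerFlags, err)
		}
		info.Flags = &flags
	}
	return info, nil
}

func main() {
	info, err := parseTraceHeaders(map[string]string{
		headerTrace:   "12345",
		headerSpan:    "678",
		headerSampled: "1",
	})
	fmt.Println(info.TraceID, info.SpanID, info.Sampled, info.Flags == nil, err)
}
```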

const DefaultMaxConnectionAge = time.Minute * 5

DefaultMaxConnectionAge is the default max age for a Thrift client connection.

const DefaultPoolGaugeInterval = time.Second * 10

DefaultPoolGaugeInterval is the fallback value to be used when ClientPoolConfig.PoolGaugeInterval <= 0.

const (
	// Number of milliseconds, 64-bit integer encoded in decimal.
	HeaderDeadlineBudget = "Deadline-Budget"
)

Deadline propagation related headers.

const (
	HeaderEdgeRequest = "Edge-Request"
)

Edge request context propagation related headers, as defined in https://pages.github.snooguts.net/reddit/baseplate.spec/component-apis/thrift#edge-request-context-propagation

const HeaderTracingSampledTrue = "1"

HeaderTracingSampledTrue is the header value to indicate that this trace should be sampled.

Variables

HeadersToForward are the headers that should always be forwarded to upstream thrift servers, to be used in thrift.TSimpleServer.SetForwardHeaders.

Functions

func AttachEdgeRequestContext

func AttachEdgeRequestContext(ctx context.Context, ec *edgecontext.EdgeRequestContext) context.Context

AttachEdgeRequestContext returns a context that has the header of the given EdgeRequestContext set to forward using the "Edge-Request" header on any Thrift calls made with that context object.

func BaseplateDefaultClientMiddlewares

func BaseplateDefaultClientMiddlewares() []thrift.ClientMiddleware

BaseplateDefaultClientMiddlewares returns the default client middlewares that should be used by a baseplate service.

Currently they are (in order):

1. MonitorClient

2. ForwardEdgeRequestContext

3. SetDeadlineBudget
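To make the effect of middleware ordering concrete, here is a minimal sketch using stand-in types rather than the real thrift.TClient and thrift.ClientMiddleware. It assumes that "in order" means the first-listed middleware is the outermost wrapper, so it runs first before the call and last after it. The client, middleware, wrap, and named identifiers are hypothetical.

```go
package main

import "fmt"

// client stands in for thrift.TClient.Call, reduced to a method name.
type client func(method string) error

// middleware stands in for thrift.ClientMiddleware.
type middleware func(next client) client

// wrap applies mws so that the first middleware is the outermost wrapper
// (an assumption about how the real middlewares are composed).
func wrap(c client, mws ...middleware) client {
	for i := len(mws) - 1; i >= 0; i-- {
		c = mws[i](c)
	}
	return c
}

// named returns a middleware that records when it runs around the call.
func named(name string, trace *[]string) middleware {
	return func(next client) client {
		return func(method string) error {
			*trace = append(*trace, name+":before")
			err := next(method)
			*trace = append(*trace, name+":after")
			return err
		}
	}
}

func main() {
	var trace []string
	base := func(method string) error {
		trace = append(trace, "call:"+method)
		return nil
	}
	c := wrap(
		base,
		named("MonitorClient", &trace),
		named("ForwardEdgeRequestContext", &trace),
		named("SetDeadlineBudget", &trace),
	)
	if err := c("is_healthy"); err != nil {
		fmt.Println("error:", err)
	}
	for _, step := range trace {
		fmt.Println(step)
	}
}
```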

func BaseplateDefaultProcessorMiddlewares

func BaseplateDefaultProcessorMiddlewares(ecImpl *edgecontext.Impl) []thrift.ProcessorMiddleware

BaseplateDefaultProcessorMiddlewares returns the default processor middlewares that should be used by a baseplate Thrift service.

Currently they are (in order):

1. ExtractDeadlineBudget

2. InjectServerSpan

3. InjectEdgeContext

func CreateThriftContextFromSpan

func CreateThriftContextFromSpan(ctx context.Context, span *tracing.Span) context.Context

CreateThriftContextFromSpan injects span info into a context object that can be used in thrift client code. If you are using a client pool created using thriftbp.NewBaseplateClientPool, all of your thrift calls will already call this automatically, so there is no need to use it directly.

Caller should first create a client child-span for the thrift call as usual, then use that span and the parent context object with this call, then use the returned context object in the thrift call. Something like:

span, clientCtx := opentracing.StartSpanFromContext(
  ctx,
  "myCall",
  tracing.SpanTypeOption{Type: tracing.SpanTypeClient},
)
clientCtx = thriftbp.CreateThriftContextFromSpan(clientCtx, tracing.AsSpan(span))
result, err := client.MyCall(clientCtx, arg1, arg2)
span.FinishWithOptions(tracing.FinishOptions{
  Ctx: clientCtx,
  Err: err,
}.Convert())

func ExtractDeadlineBudget

func ExtractDeadlineBudget(name string, next thrift.TProcessorFunction) thrift.TProcessorFunction

ExtractDeadlineBudget is the server middleware implementing Phase 1 of Baseplate deadline propagation.

It only sets the timeout if the passed in deadline is at least 1ms.

func ForwardEdgeRequestContext

func ForwardEdgeRequestContext(next thrift.TClient) thrift.TClient

ForwardEdgeRequestContext forwards the EdgeRequestContext set on the context object to the Thrift service being called if one is set.

If you are using a thrift ClientPool created by NewBaseplateClientPool, this will be included automatically and should not be passed in as a ClientMiddleware to NewBaseplateClientPool.

func InitializeEdgeContext

func InitializeEdgeContext(ctx context.Context, impl *edgecontext.Impl) context.Context

InitializeEdgeContext sets an edge request context created from the Thrift headers set on the context onto the context and configures Thrift to forward the edge request context header on any Thrift calls made by the server.

func InjectEdgeContext

func InjectEdgeContext(impl *edgecontext.Impl) thrift.ProcessorMiddleware

InjectEdgeContext returns a ProcessorMiddleware that injects an edge request context created from the Thrift headers set on the context into the `next` thrift.TProcessorFunction.

Note, this depends on the edge context headers already being set on the context object. These should be automatically injected by your thrift.TSimpleServer.

func InjectServerSpan

func InjectServerSpan(name string, next thrift.TProcessorFunction) thrift.TProcessorFunction

InjectServerSpan implements thrift.ProcessorMiddleware and injects a server span into the `next` context.

Starts the server span before calling the `next` TProcessorFunction and stops the span after it finishes. If the function returns an error, that will be passed to span.Stop.

Note, the span will be created according to tracing related headers already being set on the context object. These should be automatically injected by your thrift.TSimpleServer.

func Merge

func Merge(processors ...thrift.TProcessor) thrift.TProcessor

Merge merges together multiple processors into the first one.

It's useful when the server needs to support more than one separate thrift file.

It's kind of like thrift's TMultiplexedProcessor. The key difference is that TMultiplexedProcessor requires the client to also use TMultiplexedProtocol, while here the client doesn't need any special handling.

func MonitorClient

func MonitorClient(next thrift.TClient) thrift.TClient

MonitorClient is a ClientMiddleware that wraps the inner thrift.TClient.Call in a thrift client span.

If you are using a thrift ClientPool created by NewBaseplateClientPool, this will be included automatically and should not be passed in as a ClientMiddleware to NewBaseplateClientPool.

Example

This example illustrates what thriftbp.MonitorClient does specifically and the details of how thrift.WrapClient works. A typical service will not write code like this and will instead create a ClientPool using thriftbp.NewBaseplateClientPool.

package main

import (
	"context"

	"github.com/apache/thrift/lib/go/thrift"
	opentracing "github.com/opentracing/opentracing-go"
	"github.com/reddit/baseplate.go/internal/gen-go/reddit/baseplate"
	"github.com/reddit/baseplate.go/log"
	"github.com/reddit/baseplate.go/thriftbp"
	"github.com/reddit/baseplate.go/tracing"
)

func main() {
	// variables should be properly initialized in production code
	var (
		transport thrift.TTransport
		factory   thrift.TProtocolFactory
	)
	// Create an actual service client
	client := baseplate.NewBaseplateServiceClient(
		// Use MonitorClient to wrap a standard thrift client
		thrift.WrapClient(
			thrift.NewTStandardClient(
				factory.GetProtocol(transport),
				factory.GetProtocol(transport),
			),
			thriftbp.MonitorClient,
		),
	)
	// Create a context with a server span
	_, ctx := opentracing.StartSpanFromContext(
		context.Background(),
		"test",
		tracing.SpanTypeOption{Type: tracing.SpanTypeServer},
	)
	// Calls should be automatically wrapped using client spans
	healthy, err := client.IsHealthy(ctx)
	log.Debugf("%v, %v", healthy, err)
}

func NewBaseplateServer

func NewBaseplateServer(
	bp baseplate.Baseplate,
	processor thrift.TProcessor,
	middlewares ...thrift.ProcessorMiddleware,
) (baseplate.Server, error)

NewBaseplateServer returns a new Thrift implementation of a Baseplate server with the given TProcessor.

The TProcessor underlying the server will be wrapped in the default Baseplate Middleware and any additional middleware passed in.

Example

This example demonstrates what a typical main function should look like for a Baseplate thrift service.

package main

import (
	"context"

	baseplate "github.com/reddit/baseplate.go"
	bpgen "github.com/reddit/baseplate.go/internal/gen-go/reddit/baseplate"
	"github.com/reddit/baseplate.go/log"
	"github.com/reddit/baseplate.go/thriftbp"
)

func main() {
	ctx, bp, err := baseplate.New(context.Background(), "example.yaml", nil)
	if err != nil {
		panic(err)
	}
	defer bp.Close()

	// In real prod code, you should define your thrift endpoints and create this
	// handler instead.
	var handler bpgen.BaseplateService
	processor := bpgen.NewBaseplateServiceProcessor(handler)

	server, err := thriftbp.NewBaseplateServer(bp, processor)
	if err != nil {
		log.Fatal(err)
	}

	log.Info(baseplate.Serve(ctx, server))
}

func NewServer

func NewServer(
	cfg ServerConfig,
	processor thrift.TProcessor,
	middlewares ...thrift.ProcessorMiddleware,
) (*thrift.TSimpleServer, error)

NewServer returns a thrift.TSimpleServer using the THeader transport and protocol to serve the given TProcessor which is wrapped with the given ProcessorMiddlewares.

func SetDeadlineBudget

func SetDeadlineBudget(next thrift.TClient) thrift.TClient

SetDeadlineBudget is the client middleware implementing Phase 1 of Baseplate deadline propagation.

func StartSpanFromThriftContext

func StartSpanFromThriftContext(ctx context.Context, name string) (context.Context, *tracing.Span)

StartSpanFromThriftContext creates a server span from thrift context object.

This span would usually be used as the span of the whole thrift endpoint handler, and the parent of the child-spans.

Caller should pass in the context object they got from thrift library, which would have all the required headers already injected.

Please note that the "Sampled" header defaults to false according to the baseplate spec, so if the context object doesn't have headers injected correctly, this span (and all its child-spans) will never be sampled, unless the debug flag is set explicitly later.

If any of the tracing related thrift headers is present but malformed, it will be ignored. The error will also be logged if InitGlobalTracer was last called with a non-nil logger. Absent tracing related headers are always silently ignored.

Types

type AddressGenerator

type AddressGenerator func() (string, error)

AddressGenerator defines a function that returns the address of a thrift service.

Services should generally not have to use AddressGenerators directly. Instead, use NewBaseplateClientPool, which uses the default AddressGenerator for a typical Baseplate Thrift Client.

func SingleAddressGenerator

func SingleAddressGenerator(addr string) AddressGenerator

SingleAddressGenerator returns an AddressGenerator that always returns addr.

Services should generally not have to use SingleAddressGenerator directly. Instead, use NewBaseplateClientPool, which uses the default AddressGenerator for a typical Baseplate Thrift Client.
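For services that do need a custom generator (for example with NewCustomClientPool), the shape of the type can be sketched with local stand-ins. The AddressGenerator type below copies the documented signature. The singleAddress body is one plausible way to satisfy the documented behavior, not the package's source, and roundRobin is a purely hypothetical generator added for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// AddressGenerator copies the documented function type.
type AddressGenerator func() (string, error)

// singleAddress always returns addr, matching the documented behavior of
// SingleAddressGenerator.
func singleAddress(addr string) AddressGenerator {
	return func() (string, error) {
		return addr, nil
	}
}

// roundRobin is a hypothetical generator that cycles through addrs.
// It is safe to call from multiple goroutines.
func roundRobin(addrs ...string) AddressGenerator {
	var next uint64
	return func() (string, error) {
		if len(addrs) == 0 {
			return "", errors.New("no addresses configured")
		}
		n := atomic.AddUint64(&next, 1) - 1
		return addrs[n%uint64(len(addrs))], nil
	}
}

func main() {
	gen := roundRobin("10.0.0.1:9090", "10.0.0.2:9090")
	for i := 0; i < 3; i++ {
		addr, err := gen()
		fmt.Println(addr, err)
	}
}
```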

type Client

type Client interface {
	clientpool.Client
	thrift.TClient
}

Client is a client object that implements both the clientpool.Client and thrift.TClient interfaces.

This allows it to be managed by a clientpool.Pool and be passed to a thrift client as the base thrift.TClient.

type ClientPool

type ClientPool interface {
	// ClientPool implements TClient by grabbing a Client from its pool and
	// releasing that Client after its Call method completes.
	//
	// If Call fails to get a client from the pool, it will return PoolError.
	// You can check the error returned by Call using:
	//
	//     var poolErr thriftbp.PoolError
	//     if errors.As(err, &poolErr) {
	//       // It's unable to get a client from the pool
	//     } else {
	//       // It's an error from the actual thrift call
	//     }
	//
	// If the error is not of type PoolError that means it's returned by the
	// Call from the actual client.
	//
	// If Call fails to release the client back to the pool,
	// it will log the error on error level but not return it to the caller.
	// It also increases the ServiceSlug+".pool-release-error" counter.
	thrift.TClient

	// Passthrough APIs from clientpool.Pool:
	io.Closer
	IsExhausted() bool
}

ClientPool defines an object that implements thrift.TClient using a pool of Client objects.

func NewBaseplateClientPool

func NewBaseplateClientPool(cfg ClientPoolConfig, middlewares ...thrift.ClientMiddleware) (ClientPool, error)

NewBaseplateClientPool returns a standard ClientPool wrapped with the BaseplateDefaultClientMiddlewares plus any additional client middlewares passed into this function.

func NewCustomClientPool

func NewCustomClientPool(
	cfg ClientPoolConfig,
	genAddr AddressGenerator,
	protoFactory thrift.TProtocolFactory,
	middlewares ...thrift.ClientMiddleware,
) (ClientPool, error)

NewCustomClientPool creates a ClientPool that uses a custom AddressGenerator and TProtocolFactory wrapped with the given middleware.

Most services will want to just use NewBaseplateClientPool. NewCustomClientPool is provided to support services that have non-standard and/or legacy needs.

type ClientPoolConfig

type ClientPoolConfig struct {
	// ServiceSlug is a short identifier for the thrift service you are creating
	// clients for.  The preferred convention is to take the service's name,
	// remove the 'Service' suffix, if present, and convert from camel case to
	// all lower case, hyphen separated.
	//
	// Examples:
	//
	//     AuthenticationService -> authentication
	//     ImageUploadService -> image-upload
	ServiceSlug string

	// Addr is the address of a thrift service.  Addr must be in the format
	// "${host}:${port}"
	Addr string

	// InitialConnections is the initial number of thrift connections created by
	// the client pool.
	InitialConnections int

	// MaxConnections is the maximum number of thrift connections the client
	// pool can maintain.
	MaxConnections int

	// MaxConnectionAge is the maximum duration that a pooled connection will be
	// kept before closing in favor of a new one.
	//
	// If this is not set, the default duration is 5 minutes.
	//
	// To disable this and keep connections in the pool indefinitely, set this to
	// a negative value.
	MaxConnectionAge time.Duration

	// SocketTimeout is the timeout on the underlying thrift.TSocket.
	SocketTimeout time.Duration

	// Any labels that should be applied to metrics logged by the ClientPool.
	// This includes the optional pool stats.
	MetricsLabels metricsbp.Labels

	// ReportPoolStats signals to the ClientPool that it should report
	// statistics on the underlying clientpool.Pool in a background
	// goroutine.  If this is set to false, the reporting goroutine will
	// not be started and it will not report pool stats.
	//
	// It reports:
	// - the number of active clients to a gauge named
	//   "${ServiceSlug}.pool-active-connections".
	// - the number of allocated clients to a gauge named
	//   "${ServiceSlug}.pool-allocated-clients".
	//
	// The reporting goroutine is cancelled when the global metrics client
	// context is Done.
	ReportPoolStats bool

	// PoolGaugeInterval indicates how often we should update the active
	// connections gauge when collecting pool stats.
	//
	// When PoolGaugeInterval <= 0 and ReportPoolStats is true,
	// DefaultPoolGaugeInterval will be used instead.
	PoolGaugeInterval time.Duration
}

ClientPoolConfig is the configuration struct for creating a new ClientPool.
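The ServiceSlug convention can be sketched as a small helper. The serviceSlug function is hypothetical and not part of thriftbp. Following the two examples given in the ServiceSlug comment, it strips a trailing "Service" and inserts a hyphen before each remaining upper-case letter. It makes no attempt to handle acronyms, so a name like "HTTPService" would not produce a useful slug.

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// serviceSlug converts a camel-case service name into a lower-case,
// hyphen-separated slug, dropping a trailing "Service" if present.
func serviceSlug(name string) string {
	name = strings.TrimSuffix(name, "Service")
	var b strings.Builder
	for i, r := range name {
		if unicode.IsUpper(r) {
			// Start a new hyphen-separated word, except at the beginning.
			if i > 0 {
				b.WriteByte('-')
			}
			b.WriteRune(unicode.ToLower(r))
			continue
		}
		b.WriteRune(r)
	}
	return b.String()
}

func main() {
	for _, name := range []string{"AuthenticationService", "ImageUploadService"} {
		fmt.Printf("%s -> %s\n", name, serviceSlug(name))
	}
}
```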

type PoolError added in v0.2.0

type PoolError struct {
	// Cause is the inner error wrapped by PoolError.
	Cause error
}

PoolError is returned by ClientPool.Call when it fails to get a client from its pool.

func (PoolError) Error added in v0.2.0

func (err PoolError) Error() string

func (PoolError) Unwrap added in v0.2.0

func (err PoolError) Unwrap() error

type ServerConfig

type ServerConfig struct {
	// The endpoint address of your thrift service
	Addr string

	// The timeout for the underlying thrift.TServerSocket transport.
	Timeout time.Duration

	// A log wrapper that is used by the TSimpleServer.
	//
	// It's compatible with log.Wrapper (with an extra typecasting),
	// but you should not use log.ErrorWithSentryWrapper for this one,
	// as it would log all the network I/O errors,
	// which would be too spammy for sentry.
	Logger thrift.Logger
}

ServerConfig is the arg struct for NewServer.

Directories

Path Synopsis
Package thrifttest contains objects and utility methods to aid with testing code using Thrift clients and servers.
