Documentation
¶
Overview ¶
Package thriftbp provides Baseplate specific thrift related helpers.
Clients ¶
On the client side, this package provides a middleware framework for thrift.TClient to allow you to automatically run code before and after making a Thrift call. It also includes middleware implementations to wrap each call in a Thrift client span as well as a function that most services can use as the "golden path" for setting up a Thrift client pool.
Servers ¶
On the server side, this package provides middleware implementations for EdgeRequestContext handling and tracing propagation according to Baseplate spec.
Example (ClientPool) ¶
This example demonstrates a typical use case of thriftbp pool in microservice code with custom middleware.
package main
import (
"context"
"time"
"github.com/apache/thrift/lib/go/thrift"
"github.com/reddit/baseplate.go/log"
"github.com/reddit/baseplate.go/thriftbp"
)
// In real code these should be coming from either config file or flags instead.
const (
remoteAddr = "host:port"
socketTimeout = time.Millisecond * 10
initialConnections = 50
maxConnections = 100
clientTTL = time.Minute * 5
poolGaugeInterval = time.Second * 10
)
// BEGIN THRIFT GENERATED CODE SECTION
//
// In real code this section should be from thrift generated code instead,
// but for this example we just define some placeholders here.
type MyEndpointRequest struct{}
type MyEndpointResponse struct{}
type MyService interface {
MyEndpoint(ctx context.Context, req *MyEndpointRequest) (*MyEndpointResponse, error)
}
func NewMyServiceClient(_ thrift.TClient) MyService {
// In real code this certainly won't return nil.
return nil
}
// END THRIFT GENERATED CODE SECTION
func LoggingMiddleware(next thrift.TClient) thrift.TClient {
return thrift.WrappedTClient{
Wrapped: func(ctx context.Context, method string, args, result thrift.TStruct) error {
log.Infof("pre: %s", method)
log.Infof("args: %#v", args)
defer func() {
log.Infof("after: %s", method)
}()
return next.Call(ctx, method, args, result)
},
}
}
// This example demonstrates a typical use case of thriftbp pool in
// microservice code with custom middleware.
func main() {
pool, err := thriftbp.NewBaseplateClientPool(
thriftbp.ClientPoolConfig{
ServiceSlug: "my-service",
Addr: remoteAddr,
InitialConnections: initialConnections,
MaxConnections: maxConnections,
MaxConnectionAge: clientTTL,
SocketTimeout: socketTimeout,
ReportPoolStats: true,
PoolGaugeInterval: poolGaugeInterval,
},
LoggingMiddleware,
)
if err != nil {
panic(err)
}
defer pool.Close()
client := NewMyServiceClient(pool)
if _, err = client.MyEndpoint(context.Background(), &MyEndpointRequest{}); err != nil {
panic(err)
}
}
Index ¶
- Constants
- Variables
- func ApplyBaseplate(bp baseplate.Baseplate, server *thrift.TSimpleServer) baseplate.Server
- func AttachEdgeRequestContext(ctx context.Context, ec *edgecontext.EdgeRequestContext) context.Context
- func BaseplateDefaultClientMiddlewares(args DefaultClientMiddlewareArgs) []thrift.ClientMiddleware
- func BaseplateDefaultProcessorMiddlewares(ecImpl *edgecontext.Impl) []thrift.ProcessorMiddleware
- func CreateThriftContextFromSpan(ctx context.Context, span *tracing.Span) context.Context
- func ExtractDeadlineBudget(name string, next thrift.TProcessorFunction) thrift.TProcessorFunction
- func ForwardEdgeRequestContext(next thrift.TClient) thrift.TClient
- func InitializeEdgeContext(ctx context.Context, impl *edgecontext.Impl) context.Context
- func InjectEdgeContext(impl *edgecontext.Impl) thrift.ProcessorMiddleware
- func InjectServerSpan(name string, next thrift.TProcessorFunction) thrift.TProcessorFunction
- func Merge(processors ...thrift.TProcessor) thrift.TProcessor
- func MonitorClient(service string) thrift.ClientMiddleware
- func NewBaseplateServer(bp baseplate.Baseplate, processor thrift.TProcessor, ...) (baseplate.Server, error)
- func NewServer(cfg ServerConfig, processor thrift.TProcessor, ...) (*thrift.TSimpleServer, error)
- func Retry(defaults ...retry.Option) thrift.ClientMiddleware
- func SetDeadlineBudget(next thrift.TClient) thrift.TClient
- func StartSpanFromThriftContext(ctx context.Context, name string) (context.Context, *tracing.Span)
- func WithDefaultRetryFilters(filters ...retrybp.Filter) []retrybp.Filter
- type AddressGenerator
- type Client
- type ClientPool
- type ClientPoolConfig
- type DefaultClientMiddlewareArgs
- type PoolError
- type ServerConfig
Examples ¶
Constants ¶
const ( // The Trace ID, a 64-bit integer encoded in decimal. HeaderTracingTrace = "Trace" // The Span ID, a 64-bit integer encoded in decimal. HeaderTracingSpan = "Span" // The Parent Span ID, a 64-bit integer encoded in decimal. HeaderTracingParent = "Parent" // The Sampled flag, an ASCII "1" (HeaderTracingSampledTrue) if true, // otherwise false. // If not present, defaults to false. HeaderTracingSampled = "Sampled" // Trace flags, a 64-bit integer encoded in decimal. // If not present, defaults to null. HeaderTracingFlags = "Flags" )
Tracing related headers, as defined in https://pages.github.snooguts.net/reddit/baseplate.spec/component-apis/thrift#tracing
const DefaultMaxConnectionAge = time.Minute * 5
DefaultMaxConnectionAge is the default max age for a Thrift client connection.
const DefaultPoolGaugeInterval = time.Second * 10
DefaultPoolGaugeInterval is the fallback value to be used when ClientPoolConfig.PoolGaugeInterval <= 0.
const (
// Number of milliseconds, 64-bit integer encoded in decimal.
HeaderDeadlineBudget = "Deadline-Budget"
)
Deadline propagation related headers.
const (
HeaderEdgeRequest = "Edge-Request"
)
Edge request context propagation related headers, as defined in https://pages.github.snooguts.net/reddit/baseplate.spec/component-apis/thrift#edge-request-context-propagation
const HeaderTracingSampledTrue = "1"
HeaderTracingSampledTrue is the header value to indicate that this trace should be sampled.
Variables ¶
var HeadersToForward = []string{ HeaderEdgeRequest, HeaderTracingTrace, HeaderTracingSpan, HeaderTracingParent, HeaderTracingSampled, HeaderTracingFlags, }
HeadersToForward are the headers that should always be forwarded to upstream thrift servers, to be used in thrift.TSimpleServer.SetForwardHeaders.
Functions ¶
func ApplyBaseplate ¶ added in v0.2.1
ApplyBaseplate returns the given TSimpleServer as a baseplate Server with the given Baseplate.
You generally don't need to use this, instead use NewBaseplateServer, which will take care of this for you.
func AttachEdgeRequestContext ¶
func AttachEdgeRequestContext(ctx context.Context, ec *edgecontext.EdgeRequestContext) context.Context
AttachEdgeRequestContext returns a context that has the header of the given EdgeRequestContext set to forward using the "Edge-Request" header on any Thrift calls made with that context object.
func BaseplateDefaultClientMiddlewares ¶
func BaseplateDefaultClientMiddlewares(args DefaultClientMiddlewareArgs) []thrift.ClientMiddleware
BaseplateDefaultClientMiddlewares returns the default client middlewares that should be used by a baseplate service.
Currently they are (in order):
1. ForwardEdgeRequestContext
- Retry(retryOptions) - If retryOptions is empty/nil, default to only retry.Attempts(1), this will not actually retry any calls but your client is configured to set retry logic per-call using retrybp.WithOptions.
3. MonitorClient
4. SetDeadlineBudget
func BaseplateDefaultProcessorMiddlewares ¶
func BaseplateDefaultProcessorMiddlewares(ecImpl *edgecontext.Impl) []thrift.ProcessorMiddleware
BaseplateDefaultProcessorMiddlewares returns the default processor
middlewares that should be used by a baseplate Thrift service.
Currently they are (in order):
1. ExtractDeadlineBudget
2. InjectServerSpan
3. InjectEdgeContext
func CreateThriftContextFromSpan ¶
CreateThriftContextFromSpan injects span info into a context object that can be used in thrift client code. If you are using a client pool created using thriftbp.NewBaseplateClientPool, all of your thrift calls will already be call this automatically, so there is no need to use it directly.
Caller should first create a client child-span for the thrift call as usual, then use that span and the parent context object with this call, then use the returned context object in the thrift call. Something like:
span, clientCtx := opentracing.StartSpanFromContext(
ctx,
"myCall",
tracing.SpanTypeOption{Type: tracing.SpanTypeClient},
)
clientCtx = thriftbp.CreateThriftContextFromSpan(clientCtx, tracing.AsSpan(span))
result, err := client.MyCall(clientCtx, arg1, arg2)
span.FinishWithOptions(tracing.FinishOptions{
Ctx: clientCtx,
Err: err,
}.Convert())
func ExtractDeadlineBudget ¶
func ExtractDeadlineBudget(name string, next thrift.TProcessorFunction) thrift.TProcessorFunction
ExtractDeadlineBudget is the server middleware implementing Phase 1 of Baseplate deadline propagation.
It only sets the timeout if the passed in deadline is at least 1ms.
func ForwardEdgeRequestContext ¶
ForwardEdgeRequestContext forwards the EdgeRequestContext set on the context object to the Thrift service being called if one is set.
If you are using a thrift ClientPool created by NewBaseplateClientPool, this will be included automatically and should not be passed in as a ClientMiddleware to NewBaseplateClientPool.
func InitializeEdgeContext ¶
InitializeEdgeContext sets an edge request context created from the Thrift headers set on the context onto the context and configures Thrift to forward the edge requent context header on any Thrift calls made by the server.
func InjectEdgeContext ¶
func InjectEdgeContext(impl *edgecontext.Impl) thrift.ProcessorMiddleware
InjectEdgeContext returns a ProcessorMiddleware that injects an edge request context created from the Thrift headers set on the context into the `next` thrift.TProcessorFunction.
Note, this depends on the edge context headers already being set on the context object. These should be automatically injected by your thrift.TSimpleServer.
func InjectServerSpan ¶
func InjectServerSpan(name string, next thrift.TProcessorFunction) thrift.TProcessorFunction
InjectServerSpan implements thrift.ProcessorMiddleware and injects a server span into the `next` context.
Starts the server span before calling the `next` TProcessorFunction and stops the span after it finishes. If the function returns an error, that will be passed to span.Stop.
Note, the span will be created according to tracing related headers already being set on the context object. These should be automatically injected by your thrift.TSimpleServer.
func Merge ¶
func Merge(processors ...thrift.TProcessor) thrift.TProcessor
Merge merges together multiple processors into the first one.
It's useful when the server needs to support more than one separated thrift file.
It's kind of like thrift's TMultiplexedProcessor. The key difference is that TMultiplexedProcessor requires the client to also use TMultiplexedProtocol, while here the client doesn't need any special handling.
func MonitorClient ¶
func MonitorClient(service string) thrift.ClientMiddleware
MonitorClient is a ClientMiddleware that wraps the inner thrift.TClient.Call in a thrift client span.
If you are using a thrift ClientPool created by NewBaseplateClientPool, this will be included automatically and should not be passed in as a ClientMiddleware to NewBaseplateClientPool.
Example ¶
This example illustrates what thriftbp.MonitorClient does specifically and the details of how thriftbp.WrapClient works, a typical service will not write code like this and will instead be creating a ClientPool using thriftbp.NewBaseplateClientPool.
package main
import (
"context"
"github.com/apache/thrift/lib/go/thrift"
retry "github.com/avast/retry-go"
opentracing "github.com/opentracing/opentracing-go"
"github.com/reddit/baseplate.go/internal/gen-go/reddit/baseplate"
"github.com/reddit/baseplate.go/log"
"github.com/reddit/baseplate.go/retrybp"
"github.com/reddit/baseplate.go/thriftbp"
"github.com/reddit/baseplate.go/tracing"
)
func main() {
// variables should be properly initialized in production code
var (
transport thrift.TTransport
factory thrift.TProtocolFactory
)
// Create an actual service client
client := baseplate.NewBaseplateServiceClient(
// Use MonitoredClient to wrap a standard thrift client
thrift.WrapClient(
thrift.NewTStandardClient(
factory.GetProtocol(transport),
factory.GetProtocol(transport),
),
thriftbp.MonitorClient("service"),
),
)
// Create a context with a server span
_, ctx := opentracing.StartSpanFromContext(
context.Background(),
"test",
tracing.SpanTypeOption{Type: tracing.SpanTypeServer},
)
// Calls should be automatically wrapped using client spans
healthy, err := client.IsHealthy(
// The default middleware does not automatically retry requests but does set
// up the retry middleware so individual requests can be configured to retry
// using retrybp.WithOptions.
retrybp.WithOptions(
ctx,
// This call will make at most 2 attempts, that is the initial attempt and
// a single retry.
retry.Attempts(2),
// Apply the thriftbp default retry filters as well as NetworkErrorFilter
// to retry networking errors.
//
// NetworkErrorFilter should only be used for requests that are safe to
// repeat, such as reads or idempotent requests.
retrybp.Filters(
thriftbp.WithDefaultRetryFilters(retrybp.NetworkErrorFilter)...,
),
),
)
log.Debug("%v, %s", healthy, err)
}
func NewBaseplateServer ¶
func NewBaseplateServer( bp baseplate.Baseplate, processor thrift.TProcessor, middlewares ...thrift.ProcessorMiddleware, ) (baseplate.Server, error)
NewBaseplateServer returns a new Thrift implementation of a Baseplate server with the given TProcessor.
The TProcessor underlying the server will be wrapped in the default Baseplate Middleware and any additional middleware passed in.
Example ¶
This example demonstrates what a typical main function should look like for a Baseplate thrift service.
package main
import (
"context"
baseplate "github.com/reddit/baseplate.go"
bpgen "github.com/reddit/baseplate.go/internal/gen-go/reddit/baseplate"
"github.com/reddit/baseplate.go/log"
"github.com/reddit/baseplate.go/thriftbp"
)
func main() {
ctx, bp, err := baseplate.New(context.Background(), "example.yaml", nil)
if err != nil {
panic(err)
}
defer bp.Close()
// In real prod code, you should define your thrift endpoints and create this
// handler instead.
var handler bpgen.BaseplateService
processor := bpgen.NewBaseplateServiceProcessor(handler)
server, err := thriftbp.NewBaseplateServer(bp, processor)
if err != nil {
log.Fatal(err)
}
log.Info(baseplate.Serve(ctx, server))
}
func NewServer ¶
func NewServer( cfg ServerConfig, processor thrift.TProcessor, middlewares ...thrift.ProcessorMiddleware, ) (*thrift.TSimpleServer, error)
NewServer returns a thrift.TSimpleServer using the THeader transport and protocol to serve the given TProcessor which is wrapped with the given ProcessorMiddlewares.
func Retry ¶ added in v0.3.0
func Retry(defaults ...retry.Option) thrift.ClientMiddleware
Retry returns a thrift.ClientMiddleware that can be used to automatically retry thrift requests.
func SetDeadlineBudget ¶
SetDeadlineBudget is the client middleware implementing Phase 1 of Baseplate deadline propogation.
func StartSpanFromThriftContext ¶
StartSpanFromThriftContext creates a server span from thrift context object.
This span would usually be used as the span of the whole thrift endpoint handler, and the parent of the child-spans.
Caller should pass in the context object they got from thrift library, which would have all the required headers already injected.
Please note that "Sampled" header is default to false according to baseplate spec, so if the context object doesn't have headers injected correctly, this span (and all its child-spans) will never be sampled, unless debug flag was set explicitly later.
If any of the tracing related thrift header is present but malformed, it will be ignored. The error will also be logged if InitGlobalTracer was last called with a non-nil logger. Absent tracing related headers are always silently ignored.
func WithDefaultRetryFilters ¶ added in v0.3.0
WithDefaultRetryFilters returns a list of retrybp.Filters by appending the given filters to the "default" retry filters:
1. ContextErrorFilter - do not retry on context cancellation/timeout.
- UnrecoverableErrorFilter - do not retry errors marked as unrecoverable with retry.Unrecoverable.
3. PoolExhaustedFilter - do retry on clientpool.PoolExhausted errors.
Types ¶
type AddressGenerator ¶
AddressGenerator defines a function that returns the address of a thrift service.
Services should generally not have to use AddressGenerators directly, instead you should use NewBaseplateClientPool which uses the default AddressGenerator for a typical Baseplate Thrift Client.
func SingleAddressGenerator ¶
func SingleAddressGenerator(addr string) AddressGenerator
SingleAddressGenerator returns an AddressGenerator that always returns addr.
Services should generally not have to use SingleAddressGenerator directly, instead you should use NewBaseplateClientPool which uses the default AddressGenerator for a typical Baseplate Thrift Client.
type Client ¶
type Client interface {
clientpool.Client
thrift.TClient
}
Client is a client object that implements both the clientpool.Client and thrift.TCLient interfaces.
This allows it to be managed by a clientpool.Pool and be passed to a thrift client as the base thrift.TClient.
type ClientPool ¶
type ClientPool interface {
// ClientPool implements TClient by grabbing a Client from it's pool and
// releasing that Client after it's Call method completes.
//
// If Call fails to get a client from the pool, it will return PoolError.
// You can check the error returned by Call using:
//
// var poolErr thriftbp.PoolError
// if errors.As(err, &poolErr) {
// // It's unable to get a client from the pool
// } else {
// // It's error from the actual thrift call
// }
//
// If the error is not of type PoolError that means it's returned by the
// Call from the actual client.
//
// If Call fails to release the client back to the pool,
// it will log the error on error level but not return it to the caller.
// It also increase ServiceSlug+".pool-release-error" counter.
thrift.TClient
// Passthrough APIs from clientpool.Pool:
io.Closer
IsExhausted() bool
}
ClientPool defines an object that implements thrift.TClient using a pool of Client objects.
func NewBaseplateClientPool ¶
func NewBaseplateClientPool(cfg ClientPoolConfig, middlewares ...thrift.ClientMiddleware) (ClientPool, error)
NewBaseplateClientPool returns a standard ClientPool wrapped with the BaseplateDefaultClientMiddlewares plus any additional client middlewares passed into this function.
func NewCustomClientPool ¶
func NewCustomClientPool( cfg ClientPoolConfig, genAddr AddressGenerator, protoFactory thrift.TProtocolFactory, middlewares ...thrift.ClientMiddleware, ) (ClientPool, error)
NewCustomClientPool creates a ClientPool that uses a custom AddressGenerator and TProtocolFactory wrapped with the given middleware.
Most services will want to just use NewBaseplateClientPool, this has been provided to support services that have non-standard and/or legacy needs.
type ClientPoolConfig ¶
type ClientPoolConfig struct {
// ServiceSlug is a short identifier for the thrift service you are creating
// clients for. The preferred convention is to take the service's name,
// remove the 'Service' prefix, if present, and convert from camel case to
// all lower case, hyphen separated.
//
// Examples:
//
// AuthenticationService -> authentication
// ImageUploadService -> image-upload
ServiceSlug string
// Addr is the address of a thrift service. Addr must be in the format
// "${host}:${port}"
Addr string
// InitialConnections is the inital number of thrift connections created by
// the client pool.
InitialConnections int
// MaxConnections is the maximum number of thrift connections the client
// pool can maintain.
MaxConnections int
// MaxConnectionAge is the maximum duration that a pooled connection will be
// kept before closing in favor of a new one.
//
// If this is not set, the default duration is 5 minutes.
//
// To disable this and keep connections in the pool indefinetly, set this to
// a negative value.
MaxConnectionAge time.Duration
// SocketTimeout is the timeout on the underling thrift.TSocket.
SocketTimeout time.Duration
// Any labels that should be applied to metrics logged by the ClientPool.
// This includes the optional pool stats.
MetricsLabels metricsbp.Labels
// DefaultRetryOptions is the list of retry.Options to apply as the defaults
// for the Retry middleware.
//
// This is optional, if it is not set, we will use a single option,
// retry.Attempts(1). This sets up the retry middleware but does not
// automatically retry any requests. You can set retry behavior per-call by
// using retrybp.WithOptions.
DefaultRetryOptions []retry.Option
// ReportPoolStats signals to the ClientPool that it should report
// statistics on the underlying clientpool.Pool in a background
// goroutine. If this is set to false, the reporting goroutine will
// not be started and it will not report pool stats.
//
// It reports:
// - the number of active clients to a gauge named
// "${ServiceSlug}.pool-active-connections".
// - the number of allocated clients to a gauge named
// "${ServiceSlug}.pool-allocated-clients".
//
// The reporting goroutine is cancelled when the global metrics client
// context is Done.
ReportPoolStats bool
// PoolGaugeInterval indicates how often we should update the active
// connections gauge when collecting pool stats.
//
// When PoolGaugeInterval <= 0 and ReportPoolStats is true,
// DefaultPoolGaugeInterval will be used instead.
PoolGaugeInterval time.Duration
}
ClientPoolConfig is the configuration struct for creating a new ClientPool.
type DefaultClientMiddlewareArgs ¶ added in v0.3.0
type DefaultClientMiddlewareArgs struct {
// ServiceSlug is a short identifier for the thrift service you are creating
// clients for. The preferred convention is to take the service's name,
// remove the 'Service' prefix, if present, and convert from camel case to
// all lower case, hyphen separated.
//
// Examples:
//
// AuthenticationService -> authentication
// ImageUploadService -> image-upload
ServiceSlug string
// RetryOptions is the list of retry.Options to apply as the defaults for the
// Retry middleware.
//
// This is optional, if it is not set, we will use a single option,
// retry.Attempts(1). This sets up the retry middleware but does not
// automatically retry any requests. You can set retry behavior per-call by
// using retrybp.WithOptions.
RetryOptions []retry.Option
}
DefaultClientMiddlewareArgs is the arg struct for BaseplateDefaultClientMiddlewares.
type PoolError ¶ added in v0.2.0
type PoolError struct {
// Cause is the inner error wrapped by PoolError.
Cause error
}
PoolError is returned by ClientPool.Call when it fails to get a client from its pool.
type ServerConfig ¶
type ServerConfig struct {
// The endpoint address of your thrift service.
//
// This is ignored if Socket is non-nil.
Addr string
// The timeout for the underlying thrift.TServerSocket transport.
//
// This is ignored if Socket is non-nil.
Timeout time.Duration
// A log wrapper that is used by the TSimpleServer.
//
// It's compatible with log.Wrapper (with an extra typecasting),
// but you should not use log.ErrorWithSentryWrapper for this one,
// as it would log all the network I/O errors,
// which would be too spammy for sentry.
Logger thrift.Logger
// Optional TServerSocket you can use instead of setting one up using Addr
// plus timeout. If provided, this will be used rather than Addr and Timeout.
Socket *thrift.TServerSocket
}
ServerConfig is the arg struct for NewServer.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
Package thrifttest contains objects and utility methods to aid with testing code using Thrift clients and servers.
|
Package thrifttest contains objects and utility methods to aid with testing code using Thrift clients and servers. |