 Documentation
      ¶
      Documentation
      ¶
    
    
  
    
  
    Overview ¶
Copyright (c) Microsoft Corporation. All rights reserved. Licensed under the MIT License.
Index ¶
- Constants
- Variables
- func IsCancelError(err error) bool
- func IsFatalEHError(err error) bool
- func IsNotAllowedError(err error) bool
- func IsOwnershipLostError(err error) bool
- func IsQuickRecoveryError(err error) bool
- func NegotiateClaim(ctx context.Context, audience string, conn amqpwrap.AMQPClient, ...) error
- func NewErrNonRetriable(message string) error
- func NewRPCLink(ctx context.Context, args RPCLinkArgs) (amqpwrap.RPCLink, error)
- func TransformError(err error) error
- type AMQPLink
- type AMQPReceiver
- type AMQPReceiverCloser
- type AMQPSender
- type AMQPSenderCloser
- type Closeable
- type FakeAMQPReceiver
- func (r *FakeAMQPReceiver) Close(ctx context.Context) error
- func (r *FakeAMQPReceiver) Credits() uint32
- func (r *FakeAMQPReceiver) IssueCredit(credit uint32) error
- func (r *FakeAMQPReceiver) LinkName() string
- func (r *FakeAMQPReceiver) Receive(ctx context.Context, o *amqp.ReceiveOptions) (*amqp.Message, error)
 
- type FakeAMQPSender
- type FakeAMQPSession
- func (sess *FakeAMQPSession) Close(ctx context.Context) error
- func (sess *FakeAMQPSession) NewReceiver(ctx context.Context, source string, partitionID string, ...) (amqpwrap.AMQPReceiverCloser, error)
- func (sess *FakeAMQPSession) NewSender(ctx context.Context, target string, partitionID string, ...) (AMQPSenderCloser, error)
 
- type FakeNSForPartClient
- func (ns *FakeNSForPartClient) NegotiateClaim(ctx context.Context, entityPath string) (context.CancelFunc, <-chan struct{}, error)
- func (ns *FakeNSForPartClient) NewAMQPSession(ctx context.Context) (amqpwrap.AMQPSession, uint64, error)
- func (ns *FakeNSForPartClient) Recover(ctx context.Context, clientRevision uint64) error
 
- type LinkRetrier
- type LinkWithID
- type Links
- func (l *Links[LinkT]) Close(ctx context.Context) error
- func (l *Links[LinkT]) GetLink(ctx context.Context, partitionID string) (LinkWithID[LinkT], error)
- func (l *Links[LinkT]) GetManagementLink(ctx context.Context) (LinkWithID[amqpwrap.RPCLink], error)
- func (l *Links[LinkT]) Retry(ctx context.Context, eventName azlog.Event, operation string, ...) error
- func (l *Links[LinkT]) RetryManagement(ctx context.Context, eventName azlog.Event, operation string, ...) error
 
- type LinksForPartitionClient
- type Namespace
- func (ns *Namespace) Check() error
- func (ns *Namespace) Close(ctx context.Context, permanently bool) error
- func (ns *Namespace) GetAMQPClientImpl(ctx context.Context) (amqpwrap.AMQPClient, uint64, error)
- func (ns *Namespace) GetEntityAudience(entityPath string) string
- func (ns *Namespace) GetHTTPSHostURI() string
- func (ns *Namespace) GetTokenForEntity(eventHub string) (*auth.Token, error)
- func (ns *Namespace) NegotiateClaim(ctx context.Context, entityPath string) (context.CancelFunc, <-chan struct{}, error)
- func (ns *Namespace) NewAMQPSession(ctx context.Context) (amqpwrap.AMQPSession, uint64, error)
- func (ns *Namespace) NewRPCLink(ctx context.Context, managementPath string) (amqpwrap.RPCLink, uint64, error)
- func (ns *Namespace) Recover(ctx context.Context, theirConnID uint64) error
 
- type NamespaceForAMQPLinks
- type NamespaceForManagementOps
- type NamespaceForProducerOrConsumer
- type NamespaceOption
- func NamespaceWithConnectionString(connStr string) NamespaceOption
- func NamespaceWithCustomEndpoint(customEndpoint string) NamespaceOption
- func NamespaceWithRetryOptions(retryOptions exported.RetryOptions) NamespaceOption
- func NamespaceWithTLSConfig(tlsConfig *tls.Config) NamespaceOption
- func NamespaceWithTokenCredential(fullyQualifiedNamespace string, tokenCredential azcore.TokenCredential) NamespaceOption
- func NamespaceWithUserAgent(userAgent string) NamespaceOption
- func NamespaceWithWebSocket(...) NamespaceOption
 
- type NamespaceWithNewAMQPLinks
- type NewLinksFn
- type RPCError
- type RPCLinkArgs
- type RPCLinkOption
- type RecoveryKind
- type RetryCallback
Constants ¶
const CapabilityGeoDRReplication = "com.microsoft:georeplication"
    CapabilityGeoDRReplication is passed as part of our desired capabilities when creating links.
const Version = "v2.0.0"
    Version is the semantic version number
Variables ¶
var ErrClientClosed = NewErrNonRetriable("client has been closed by user")
    var RPCLinkClosedErr = errors.New("rpc link closed")
    Functions ¶
func IsCancelError ¶
func IsFatalEHError ¶
func IsNotAllowedError ¶
func IsOwnershipLostError ¶
func IsQuickRecoveryError ¶
func NegotiateClaim ¶
func NegotiateClaim(ctx context.Context, audience string, conn amqpwrap.AMQPClient, provider auth.TokenProvider) error
NegotiateClaim attempts to put a token to the $cbs management endpoint to negotiate auth for the given audience
func NewErrNonRetriable ¶
func NewRPCLink ¶
NewRPCLink will build a new request response link
func TransformError ¶
TransformError will create a proper error type that users can potentially inspect. If the error is actionable then it'll be of type exported.Error which has a 'Code' field that can be used programatically. If it's not actionable or if it's nil it'll just be returned.
Types ¶
type AMQPReceiver ¶
type AMQPReceiver = amqpwrap.AMQPReceiver
type AMQPReceiverCloser ¶
type AMQPReceiverCloser = amqpwrap.AMQPReceiverCloser
type AMQPSender ¶
type AMQPSender = amqpwrap.AMQPSender
type AMQPSenderCloser ¶
type AMQPSenderCloser = amqpwrap.AMQPSenderCloser
type Closeable ¶
Closeable is implemented by pretty much any AMQP link/client including our own higher level Receiver/Sender.
type FakeAMQPReceiver ¶
type FakeAMQPReceiver struct {
	amqpwrap.AMQPReceiverCloser
	// ActiveCredits are incremented and decremented by IssueCredit and Receive.
	ActiveCredits int32
	// IssuedCredit just accumulates, so we can get an idea of how many credits we issued overall.
	IssuedCredit []uint32
	// CreditsSetFromOptions is similar to issuedCredit, but only tracks credits added in via the LinkOptions.Credit
	// field (ie, enabling prefetch).
	CreditsSetFromOptions int32
	// ManualCreditsSetFromOptions is the value of the LinkOptions.ManualCredits value.
	ManualCreditsSetFromOptions bool
	Messages []*amqp.Message
	NameForLink string
	CloseCalled int
	CloseError  error
}
    func (*FakeAMQPReceiver) Credits ¶
func (r *FakeAMQPReceiver) Credits() uint32
func (*FakeAMQPReceiver) IssueCredit ¶
func (r *FakeAMQPReceiver) IssueCredit(credit uint32) error
func (*FakeAMQPReceiver) LinkName ¶
func (r *FakeAMQPReceiver) LinkName() string
type FakeAMQPSender ¶
type FakeAMQPSender struct {
	amqpwrap.AMQPSenderCloser
	CloseCalled int
	CloseError  error
}
    type FakeAMQPSession ¶
type FakeAMQPSession struct {
	amqpwrap.AMQPSession
	NS          *FakeNSForPartClient
	CloseCalled int
}
    func (*FakeAMQPSession) NewReceiver ¶
func (sess *FakeAMQPSession) NewReceiver(ctx context.Context, source string, partitionID string, opts *amqp.ReceiverOptions) (amqpwrap.AMQPReceiverCloser, error)
func (*FakeAMQPSession) NewSender ¶
func (sess *FakeAMQPSession) NewSender(ctx context.Context, target string, partitionID string, opts *amqp.SenderOptions) (AMQPSenderCloser, error)
type FakeNSForPartClient ¶
type FakeNSForPartClient struct {
	NamespaceForAMQPLinks
	Receiver          *FakeAMQPReceiver
	NewReceiverErr    error
	NewReceiverCalled int
	Sender          *FakeAMQPSender
	NewSenderErr    error
	NewSenderCalled int
	RecoverFn func(ctx context.Context, clientRevision uint64) error
}
    func (*FakeNSForPartClient) NegotiateClaim ¶
func (ns *FakeNSForPartClient) NegotiateClaim(ctx context.Context, entityPath string) (context.CancelFunc, <-chan struct{}, error)
func (*FakeNSForPartClient) NewAMQPSession ¶
func (ns *FakeNSForPartClient) NewAMQPSession(ctx context.Context) (amqpwrap.AMQPSession, uint64, error)
type LinkRetrier ¶
type LinkRetrier[LinkT AMQPLink] struct { // GetLink is set to [Links.GetLink] GetLink func(ctx context.Context, partitionID string) (LinkWithID[LinkT], error) // CloseLink is set to [Links.closePartitionLinkIfMatch] CloseLink func(ctx context.Context, partitionID string, linkName string) error // NSRecover is set to [Namespace.Recover] NSRecover func(ctx context.Context, connID uint64) error }
func (LinkRetrier[LinkT]) RecoverIfNeeded ¶
func (l LinkRetrier[LinkT]) RecoverIfNeeded(ctx context.Context, err error) error
RecoverIfNeeded will check the error and pick the correct minimal recovery pattern (none, link only, connection and link, etc..) NOTE: if 'ctx' is cancelled this function will still close out all the connections/links involved.
func (LinkRetrier[LinkT]) Retry ¶
func (l LinkRetrier[LinkT]) Retry(ctx context.Context, eventName azlog.Event, operation string, partitionID string, retryOptions exported.RetryOptions, fn RetryCallback[LinkT]) error
Retry runs the fn argument in a loop, respecting retry counts. If connection/link failures occur it also takes care of running recovery logic to bring them back, or return an appropriate error if retries are exhausted.
type LinkWithID ¶
type LinkWithID[LinkT AMQPLink] interface { ConnID() uint64 Link() LinkT PartitionID() string Close(ctx context.Context) error String() string }
LinkWithID is a readonly interface over the top of a linkState.
type Links ¶
type Links[LinkT AMQPLink] struct { // contains filtered or unexported fields }
func NewLinks ¶
func NewLinks[LinkT AMQPLink](ns NamespaceForAMQPLinks, managementPath string, entityPathFn func(partitionID string) string, newLinkFn NewLinksFn[LinkT]) *Links[LinkT]
func (*Links[LinkT]) GetManagementLink ¶
type LinksForPartitionClient ¶
type LinksForPartitionClient[LinkT AMQPLink] interface { // Retry is [Links.Retry] Retry(ctx context.Context, eventName azlog.Event, operation string, partitionID string, retryOptions exported.RetryOptions, fn func(ctx context.Context, lwid LinkWithID[LinkT]) error) error // Close is [Links.Close] Close(ctx context.Context) error }
LinksForPartitionClient are the functions that the PartitionClient uses within Links[T] (for unit testing only)
type Namespace ¶
type Namespace struct {
	FQDN          string
	TokenProvider *sbauth.TokenProvider
	// NOTE: exported only so it can be checked in a test
	RetryOptions exported.RetryOptions
	// contains filtered or unexported fields
}
    Namespace is an abstraction over an amqp.Client, allowing us to hold onto a single instance of a connection per client..
func NewNamespace ¶
func NewNamespace(opts ...NamespaceOption) (*Namespace, error)
NewNamespace creates a new namespace configured through NamespaceOption(s)
func (*Namespace) Check ¶
Check returns an error if the namespace cannot be used (ie, closed permanently), or nil otherwise.
func (*Namespace) GetAMQPClientImpl ¶
func (*Namespace) GetEntityAudience ¶
func (*Namespace) GetHTTPSHostURI ¶
func (*Namespace) GetTokenForEntity ¶
func (*Namespace) NegotiateClaim ¶
func (ns *Namespace) NegotiateClaim(ctx context.Context, entityPath string) (context.CancelFunc, <-chan struct{}, error)
negotiateClaim performs initial authentication and starts periodic refresh of credentials. the returned func is to cancel() the refresh goroutine.
func (*Namespace) NewAMQPSession ¶
NewAMQPSession creates a new AMQP session with the internally cached *amqp.Client. Returns a closeable AMQP session and the current client revision.
func (*Namespace) NewRPCLink ¶
type NamespaceForAMQPLinks ¶
type NamespaceForAMQPLinks interface {
	NegotiateClaim(ctx context.Context, entityPath string) (context.CancelFunc, <-chan struct{}, error)
	NewAMQPSession(ctx context.Context) (amqpwrap.AMQPSession, uint64, error)
	NewRPCLink(ctx context.Context, managementPath string) (amqpwrap.RPCLink, uint64, error)
	GetEntityAudience(entityPath string) string
	// Recover destroys the currently held AMQP connection and recreates it, if needed.
	//
	// NOTE: cancelling the context only cancels the initialization of a new AMQP
	// connection - the previous connection is always closed.
	Recover(ctx context.Context, clientRevision uint64) error
	Close(ctx context.Context, permanently bool) error
}
    NamespaceForAMQPLinks is the Namespace surface needed for the internals of AMQPLinks.
type NamespaceForManagementOps ¶
type NamespaceForManagementOps interface {
	NamespaceForAMQPLinks
	GetTokenForEntity(eventHub string) (*auth.Token, error)
}
    type NamespaceForProducerOrConsumer ¶
type NamespaceForProducerOrConsumer = NamespaceForManagementOps
TODO: might just consolidate.
type NamespaceOption ¶
NamespaceOption provides structure for configuring a new Event Hub namespace
func NamespaceWithConnectionString ¶
func NamespaceWithConnectionString(connStr string) NamespaceOption
NamespaceWithConnectionString configures a namespace with the information provided in a Event Hub connection string
func NamespaceWithCustomEndpoint ¶
func NamespaceWithCustomEndpoint(customEndpoint string) NamespaceOption
NamespaceWithCustomEndpoint sets a custom endpoint, useful for when you're connecting through a TCP proxy. When establishing a TCP connection we connect to this address. The audience is extracted from the fullyQualifiedNamespace given to NamespaceWithTokenCredential or the endpoint in the connection string passed to NamespaceWithConnectionString.
func NamespaceWithRetryOptions ¶
func NamespaceWithRetryOptions(retryOptions exported.RetryOptions) NamespaceOption
func NamespaceWithTLSConfig ¶
func NamespaceWithTLSConfig(tlsConfig *tls.Config) NamespaceOption
NamespaceWithTLSConfig appends to the TLS config.
func NamespaceWithTokenCredential ¶
func NamespaceWithTokenCredential(fullyQualifiedNamespace string, tokenCredential azcore.TokenCredential) NamespaceOption
NamespaceWithTokenCredential sets the token provider on the namespace fullyQualifiedNamespace is the Event Hub namespace name (ex: myservicebus.servicebus.windows.net)
func NamespaceWithUserAgent ¶
func NamespaceWithUserAgent(userAgent string) NamespaceOption
NamespaceWithUserAgent appends to the root user-agent value.
func NamespaceWithWebSocket ¶
func NamespaceWithWebSocket(newWebSocketConn func(ctx context.Context, args exported.WebSocketConnParams) (net.Conn, error)) NamespaceOption
NamespaceWithWebSocket configures the namespace and all entities to use wss:// rather than amqps://
type NamespaceWithNewAMQPLinks ¶
type NamespaceWithNewAMQPLinks interface {
	Check() error
}
    NamespaceWithNewAMQPLinks is the Namespace surface for consumers of AMQPLinks.
type NewLinksFn ¶
type RPCError ¶
type RPCError struct {
	Resp    *amqpwrap.RPCResponse
	Message string
}
    RPCError is an error from an RPCLink. RPCLinks are used for communication with the $management and $cbs links.
type RPCLinkArgs ¶
type RPCLinkOption ¶
type RPCLinkOption func(link *rpcLink) error
RPCLinkOption provides a way to customize the construction of a Link
type RecoveryKind ¶
type RecoveryKind string
RecoveryKind dictates what kind of recovery is possible. Used with GetRecoveryKind().
const ( RecoveryKindNone RecoveryKind = "" RecoveryKindFatal RecoveryKind = "fatal" RecoveryKindLink RecoveryKind = "link" RecoveryKindConn RecoveryKind = "connection" )
func GetRecoveryKind ¶
func GetRecoveryKind(err error) RecoveryKind
GetRecoveryKind determines the recovery type for non-session based links.
type RetryCallback ¶
type RetryCallback[LinkT AMQPLink] func(ctx context.Context, lwid LinkWithID[LinkT]) error
       Source Files
      ¶
      Source Files
      ¶
    
  
       Directories
      ¶
      Directories
      ¶
    
    | Path | Synopsis | 
|---|---|
| Package amqpwrap has some simple wrappers to make it easier to abstract the go-amqp types. | Package amqpwrap has some simple wrappers to make it easier to abstract the go-amqp types. | 
| Package auth provides an abstraction over claims-based security for Azure Event Hub and Service Bus. | Package auth provides an abstraction over claims-based security for Azure Event Hub and Service Bus. | 
| 
          
            stress
            
            command
          
          
         | |
| Package mock is a generated GoMock package. | Package mock is a generated GoMock package. |