client

package
v1.8.4 Latest
Published: Apr 17, 2026 License: Apache-2.0 Imports: 47 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func ClearEndpointCache added in v1.0.8

func ClearEndpointCache()

Types

type Cache

type Cache struct {
	DisableLogging bool
	// contains filtered or unexported fields
}

func (*Cache) AddServiceEndpoints

func (c *Cache) AddServiceEndpoints(serviceName string, eps []*serviceEndpoint)

AddServiceEndpoints appends the given slice of service endpoints to the map entry for the given serviceName.

serviceName = lowercase of servicename.namespacename
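As an illustration of the key format described above, here is a minimal sketch. The helper name cacheKey and the sample service and namespace names are assumptions made for this example; the package does not document such a helper.

```go
package main

import (
	"fmt"
	"strings"
)

// cacheKey is a hypothetical helper (not part of the package) that builds a
// lookup key in the documented form: lowercase "servicename.namespacename".
func cacheKey(service, namespace string) string {
	return strings.ToLower(service + "." + namespace)
}

func main() {
	fmt.Println(cacheKey("OrderSvc", "Prod")) // prints "ordersvc.prod"
}
```

Normalizing the key in one place avoids cache misses caused by callers that differ only in letter case.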

func (*Cache) GetLiveServiceEndpoints

func (c *Cache) GetLiveServiceEndpoints(serviceName string, version string, ignoreExpired ...bool) (liveEndpoints []*serviceEndpoint)

GetLiveServiceEndpoints retrieves the non-expired service endpoints for the given serviceName and removes any expired service endpoints from the map.

serviceName = lowercase of servicename.namespacename
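The filtering idea behind GetLiveServiceEndpoints can be sketched as follows. This is not the package's implementation: the endpoint type, its fields, the treatment of an empty version as "match any", and the reading of ignoreExpired as "skip the expiry check" are all assumptions for illustration. The sketch only filters; it does not remove expired entries from a backing map.

```go
package main

import (
	"fmt"
	"time"
)

// endpoint is a hypothetical stand-in for the package's unexported
// serviceEndpoint type; its fields are assumed for illustration.
type endpoint struct {
	Host    string
	Port    uint
	Version string
	Expires time.Time
}

// sampleNow is a fixed reference time so the example is deterministic.
var sampleNow = time.Date(2026, time.January, 1, 12, 0, 0, 0, time.UTC)

// sampleEndpoints returns one live and one expired endpoint relative to sampleNow.
func sampleEndpoints() []*endpoint {
	return []*endpoint{
		{Host: "10.0.0.1", Port: 9000, Version: "v1", Expires: sampleNow.Add(time.Minute)},
		{Host: "10.0.0.2", Port: 9000, Version: "v1", Expires: sampleNow.Add(-time.Minute)},
	}
}

// liveEndpoints keeps endpoints that match version (empty matches any) and
// have not expired at now. Passing ignoreExpired=true skips the expiry check.
func liveEndpoints(eps []*endpoint, version string, now time.Time, ignoreExpired ...bool) []*endpoint {
	skipExpiry := len(ignoreExpired) > 0 && ignoreExpired[0]
	var live []*endpoint
	for _, ep := range eps {
		if ep == nil {
			continue
		}
		if version != "" && ep.Version != version {
			continue
		}
		if !skipExpiry && !ep.Expires.After(now) {
			continue
		}
		live = append(live, ep)
	}
	return live
}

func main() {
	live := liveEndpoints(sampleEndpoints(), "v1", sampleNow)
	all := liveEndpoints(sampleEndpoints(), "v1", sampleNow, true)
	fmt.Println(len(live), len(all)) // prints "1 2"
}
```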

func (*Cache) GetServiceEndpoints added in v1.7.0

func (c *Cache) GetServiceEndpoints(serviceName string) []*serviceEndpoint

GetServiceEndpoints returns a deep copy of endpoints for the given service
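To show why a deep copy matters for a slice of pointers, here is a small sketch. The endpoint type and the deepCopy helper are hypothetical stand-ins; the package's own copy logic is not shown in this documentation.

```go
package main

import "fmt"

// endpoint is a hypothetical stand-in for the unexported serviceEndpoint type.
type endpoint struct {
	Host string
	Port uint
}

// deepCopy returns a new slice whose elements point at fresh copies, so that
// callers cannot mutate the structs held by the original slice.
func deepCopy(eps []*endpoint) []*endpoint {
	if eps == nil {
		return nil
	}
	out := make([]*endpoint, 0, len(eps))
	for _, ep := range eps {
		if ep == nil {
			out = append(out, nil)
			continue
		}
		cp := *ep // copy the struct value, not the pointer
		out = append(out, &cp)
	}
	return out
}

func main() {
	orig := []*endpoint{{Host: "10.0.0.1", Port: 9000}}
	cp := deepCopy(orig)
	cp[0].Host = "changed"
	fmt.Println(orig[0].Host) // prints "10.0.0.1"
}
```

A shallow copy of the slice would share the same pointers, so the write through cp[0] would have been visible in orig.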

func (*Cache) PurgeServiceEndpointByHostAndPort added in v1.0.8

func (c *Cache) PurgeServiceEndpointByHostAndPort(serviceName string, host string, port uint)

PurgeServiceEndpointByHostAndPort will remove a specific endpoint for a service based on host and port info

serviceName = lowercase of servicename.namespacename

func (*Cache) PurgeServiceEndpoints

func (c *Cache) PurgeServiceEndpoints(serviceName string)

PurgeServiceEndpoints will remove all endpoints associated with the given serviceName within map

serviceName = lowercase of servicename.namespacename

type Client

type Client struct {
	// client properties
	AppName          string
	ConfigFileName   string
	CustomConfigPath string

	// web server config - for optional gin web server to be launched upon grpc client dial
	WebServerConfig *WebServerConfig

	// indicate if after dial, client will wait for target service health probe success before continuing to allow rpc
	WaitForServerReady bool

	// P2-14: ConnectionCloseDelay controls how long setConnection waits
	// before closing a replaced gRPC connection, allowing in-flight RPCs
	// to drain. Zero or negative falls back to the 100ms default
	// (connectionCloseDelay). Raise this for services with long-running
	// unary calls; lower it for tight test loops.
	ConnectionCloseDelay time.Duration

	// one or more unary client interceptors for handling wrapping actions
	UnaryClientInterceptors []grpc.UnaryClientInterceptor

	// one or more stream client interceptors for handling wrapping actions
	StreamClientInterceptors []grpc.StreamClientInterceptor

	// typically wrapper action to handle monitoring
	StatsHandler stats.Handler

	// handler to invoke before gRPC client dial is to start
	BeforeClientDial func(cli *Client)

	// handler to invoke after gRPC client dial performed
	AfterClientDial func(cli *Client)

	// handler to invoke before gRPC client connection is to close
	BeforeClientClose func(cli *Client)

	// handler to invoke after gRPC client connection has closed
	AfterClientClose func(cli *Client)
	// contains filtered or unexported fields
}

Client is the gRPC client lifecycle owner for a connector-managed outbound connection. A Client encapsulates: configuration loading (client.yaml), service-discovery resolution (Cloud Map / DNS / direct), gRPC connection establishment with TLS or plaintext, an optional notifier sub-client for push-based service-discovery updates, an optional sidecar HTTP server for non-gRPC routes, automatic reconnect on notifier-driven service changes, and a graceful Close path that drains in-flight RPCs before tearing down the connection.

Construct one with NewClient, then optionally configure exported fields BEFORE calling Dial. Dial returns once the gRPC connection is READY (or fails fast on misconfiguration). Use the underlying *grpc.ClientConn (accessed via the resolver hooks) to make RPCs after Dial returns. Call Close exactly once when done — Close is idempotent against repeated invocation but a Client is not designed for re-Dial-after-Close cycles unless the caller explicitly resets state.

Concurrency:

  • Exported fields (AppName, WebServerConfig, WaitForServerReady, ConnectionCloseDelay, ...) MUST be set before Dial and MUST NOT be mutated after.
  • Dial and Close are guarded internally; Dial drains any stale notifier-reconnect goroutine from a prior lifecycle (P2-13) before flipping the closed flag.
  • The notifier reconnect goroutine is single-instance via the notifierReconnectActive atomic CAS guard and a defer-clear pattern (see Close + Dial drain logic).
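The single-instance guard mentioned in the last bullet follows a common pattern: a compare-and-swap on an atomic flag, with the goroutine clearing the flag in its own defer. The sketch below illustrates that pattern only; the reconnector type and its methods are hypothetical, and the real notifierReconnectActive field is unexported.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// reconnector is a hypothetical type used only to demonstrate the guard.
type reconnector struct {
	active atomic.Bool  // true while a worker goroutine is running
	runs   atomic.Int32 // number of workers actually started
	wg     sync.WaitGroup
}

// tryStart launches work in a goroutine only if no other instance is active.
// It reports whether this call won the compare-and-swap.
func (r *reconnector) tryStart(work func()) bool {
	if !r.active.CompareAndSwap(false, true) {
		return false // another instance is already running
	}
	r.wg.Add(1)
	go func() {
		defer r.wg.Done()
		defer r.active.Store(false) // the goroutine clears its own flag on exit
		r.runs.Add(1)
		work()
	}()
	return true
}

// wait blocks until every started worker has exited.
func (r *reconnector) wait() { r.wg.Wait() }

func main() {
	r := &reconnector{}
	release := make(chan struct{})
	first := r.tryStart(func() { <-release }) // runs until released
	second := r.tryStart(func() {})            // rejected: first is still active
	close(release)
	r.wait()
	third := r.tryStart(func() {}) // accepted: the flag was cleared by the defer
	r.wait()
	fmt.Println(first, second, third, r.runs.Load()) // prints "true false true 2"
}
```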

Notes:

  1. Using Compressor with RPC
     a) import "google.golang.org/grpc/encoding/gzip"
     b) in the RPC call, pass grpc.UseCompressor(gzip.Name) as the third parameter
        example: RPCCall(ctx, &pb.Request{...}, grpc.UseCompressor(gzip.Name))

  2. Notifier Client yaml
     a) xyz-notifier-client.yaml, where xyz is the target gRPC service endpoint name

func NewClient

func NewClient(appName string, configFileName string, customConfigPath string) *Client

NewClient constructs a Client with only AppName / ConfigFileName / CustomConfigPath set. Configure all other exported fields (such as WebServerConfig, WaitForServerReady, ConnectionCloseDelay, the notifier handlers) on the returned struct BEFORE calling Dial.

Unlike NewService for the server side, this constructor does not validate inputs — validation happens at Dial time when the config file is read. A returned Client whose AppName is empty will fail at Dial with a config-load error.

func (*Client) ClientConnection

func (c *Client) ClientConnection() grpc.ClientConnInterface

ClientConnection returns the currently loaded grpc client connection

func (*Client) Close

func (c *Client) Close()

Close tears down the Client: it sets the closed flag, signals the notifier reconnect goroutine (if running) to exit on its next iteration, closes the gRPC connection (after a short ConnectionCloseDelay drain window), and stops the optional sidecar HTTP server.
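The drain window used by Close comes from ConnectionCloseDelay, which per the field comment falls back to a 100ms default when zero or negative. A minimal sketch of that fallback rule follows; the names defaultCloseDelay and effectiveCloseDelay are assumptions for this example, not identifiers from the package.

```go
package main

import (
	"fmt"
	"time"
)

// defaultCloseDelay mirrors the documented 100ms default; the constant name
// here is hypothetical.
const defaultCloseDelay = 100 * time.Millisecond

// effectiveCloseDelay applies the documented rule: zero or negative values
// fall back to the default, positive values are used as given.
func effectiveCloseDelay(configured time.Duration) time.Duration {
	if configured <= 0 {
		return defaultCloseDelay
	}
	return configured
}

func main() {
	fmt.Println(effectiveCloseDelay(0))               // prints "100ms"
	fmt.Println(effectiveCloseDelay(-time.Second))    // prints "100ms"
	fmt.Println(effectiveCloseDelay(2 * time.Second)) // prints "2s"
}
```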

Close is idempotent — calling it more than once on the same Client is safe but only the first call has effect.
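One common way to get this kind of idempotency is a compare-and-swap on a closed flag, so that only the first caller performs the teardown. The sketch below shows the pattern with a hypothetical resource type; it is not the Client's actual Close code.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// resource is a hypothetical type standing in for anything with a Close method.
type resource struct {
	closed    atomic.Bool  // set once by the first Close call
	teardowns atomic.Int32 // counts how many times teardown actually ran
}

// Close performs teardown only on the first call; later calls return early.
func (r *resource) Close() {
	if !r.closed.CompareAndSwap(false, true) {
		return // already closed
	}
	r.teardowns.Add(1) // real teardown work would go here
}

func main() {
	r := &resource{}
	r.Close()
	r.Close()
	fmt.Println(r.teardowns.Load()) // prints "1"
}
```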

Note: Close does NOT eagerly clear the notifierReconnectActive flag (see P2-13). The reconnect goroutine clears that flag itself from its own defer. A subsequent Dial on the same Client drains any in-flight notifier work before flipping the closed flag back to false to avoid spawning a second reconnect goroutine that races with a still-running stale one.

func (*Client) ConfiguredDialMinConnectTimeoutSeconds added in v1.0.7

func (c *Client) ConfiguredDialMinConnectTimeoutSeconds() uint

ConfiguredDialMinConnectTimeoutSeconds gets the timeout seconds from config yaml

func (*Client) ConfiguredForClientDial added in v1.0.7

func (c *Client) ConfiguredForClientDial() bool

ConfiguredForClientDial checks if the config yaml is ready for client dial operation

func (*Client) ConfiguredForSNSDiscoveryTopicArn added in v1.0.7

func (c *Client) ConfiguredForSNSDiscoveryTopicArn() bool

ConfiguredForSNSDiscoveryTopicArn indicates if the sns topic arn for service discovery is configured within the config yaml

func (*Client) ConfiguredSNSDiscoveryTopicArn added in v1.0.7

func (c *Client) ConfiguredSNSDiscoveryTopicArn() string

ConfiguredSNSDiscoveryTopicArn returns the sns discovery topic arn as configured in config yaml

func (*Client) Dial

func (c *Client) Dial(ctx context.Context) error

Dial loads the client configuration, resolves the target service (Cloud Map, DNS, or direct address depending on the config), opens a gRPC connection, optionally starts the sidecar HTTP server, and initializes the notifier sub-client when configured.

The supplied context governs only this Dial call's deadline / cancellation — once Dial returns, the underlying gRPC connection outlives the context. Pass nil for context.Background semantics.
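The context behavior described above can be illustrated with a small stand-in. The functions orBackground and dialStep are hypothetical and simulate a dial step with a timer; they only show how a nil context can be normalized and how a deadline bounds the call.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// shortWork is the simulated duration of a fast dial step.
const shortWork = time.Millisecond

// orBackground is a hypothetical helper that treats a nil context as
// context.Background, matching the documented nil semantics.
func orBackground(ctx context.Context) context.Context {
	if ctx == nil {
		return context.Background()
	}
	return ctx
}

// dialStep simulates one dial step that takes the given amount of time and
// gives up with the context error if the context ends first.
func dialStep(ctx context.Context, work time.Duration) error {
	ctx = orBackground(ctx)
	select {
	case <-time.After(work):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	// nil context: no deadline applies, so the step completes.
	fmt.Println(dialStep(nil, shortWork)) // prints "<nil>"

	// A deadline shorter than the work cancels the step.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	fmt.Println(dialStep(ctx, time.Second)) // prints "context deadline exceeded"
}
```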

Dial is single-use per logical lifecycle: calling it again after Close on the same Client is unusual and not the primary supported pattern. When Dial is invoked on a previously-closed Client, the implementation drains any stale notifier-reconnect goroutine from the prior lifecycle (P2-13) before flipping the closed flag back to false. The drain is bounded by notifierReconnectDrainTimeout and will log a warning if a stale goroutine is still in-flight after the drain window.
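A bounded drain of this kind is typically a select between a completion signal and a timer. The sketch below shows that shape under stated assumptions: the drain function, the done channel, and demoDrainTimeout are hypothetical, and the actual value of notifierReconnectDrainTimeout is not given in this documentation.

```go
package main

import (
	"fmt"
	"time"
)

// demoDrainTimeout is an arbitrary value chosen for this example only.
const demoDrainTimeout = 50 * time.Millisecond

// drain waits for done to be closed, but no longer than timeout.
// It reports whether the stale work finished within the window.
func drain(done <-chan struct{}, timeout time.Duration) bool {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case <-done:
		return true
	case <-timer.C:
		return false
	}
}

func main() {
	finished := make(chan struct{})
	close(finished) // the stale goroutine has already exited
	fmt.Println(drain(finished, demoDrainTimeout)) // prints "true"

	stuck := make(chan struct{}) // never closed: the goroutine is still in flight
	if !drain(stuck, demoDrainTimeout) {
		fmt.Println("warning: stale goroutine still running after drain window")
	}
}
```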

Returns an error if any step fails. On failure the Client is left in a partially initialized state and should be closed before it is discarded.

Concurrency: Dial is internally serialized through the global _mux because gRPC's resolver registration is not thread-safe. This serialization is bounded to Dial only — established connections proceed concurrently for normal RPCs.

func (*Client) DoNotifierAlertService added in v1.0.8

func (c *Client) DoNotifierAlertService() (err error)

DoNotifierAlertService should be called from a goroutine after the client Dial completes. It subscribes to the notifier server and receives callbacks when service hosts go online or offline.

Example:

	go func() {
		// the returned error should be handled rather than dropped
		if err := svc1Cli.DoNotifierAlertService(); err != nil {
			log.Println("notifier alert service:", err)
		}
	}()

func (*Client) GetLiveEndpointsCount added in v1.6.1

func (c *Client) GetLiveEndpointsCount(updateEndpointsToLoadBalanceResolver bool) (int, error)

GetLiveEndpointsCount queries Cloud Map for the count of live endpoints and optionally updates the endpoints in the client cache.

If updateEndpointsToLoadBalanceResolver is true, the endpoint addresses are force-refreshed from Cloud Map.

func (*Client) GetState

func (c *Client) GetState() connectivity.State

GetState returns the current grpc client connection's state

func (*Client) HealthProbe

func (c *Client) HealthProbe(serviceName string, timeoutDuration ...time.Duration) (grpc_health_v1.HealthCheckResponse_ServingStatus, error)

HealthProbe manually checks service serving health status

func (*Client) PreloadConfigData added in v1.0.7

func (c *Client) PreloadConfigData() error

PreloadConfigData will load the config data before Dial()

func (*Client) Ready added in v1.0.8

func (c *Client) Ready() bool

Ready indicates client connection is ready to invoke grpc methods

func (*Client) RemoteAddress

func (c *Client) RemoteAddress() string

RemoteAddress gets the remote endpoint address currently connected to

func (*Client) UpdateLoadBalanceResolver added in v1.0.8

func (c *Client) UpdateLoadBalanceResolver() error

UpdateLoadBalanceResolver updates the client load balancer resolver state with new endpoint addresses

func (*Client) ZLog added in v1.0.8

func (c *Client) ZLog() *data.ZapLog

ZLog returns the internal *data.ZapLog used by the Client for structured logging. The logger is created during Dial from the loaded config (Target.ZapLogEnabled / ZapLogOutputConsole). If Dial has not yet been called, or if the receiver is nil, ZLog returns nil — callers should nil-check before logging.

The returned logger is shared across the Client and any notifier sub-client it owns; do not replace it after Dial.

type HostDiscoveryNotification added in v1.0.8

type HostDiscoveryNotification struct {
	MsgType string `json:"msg_type"`
	Action  string `json:"action"`
	Host    string `json:"host"`
}

HostDiscoveryNotification struct contains the field values for notification discovery payload

`{"msg_type":"host-discovery", "action":"online | offline", "host":"123.123.123.123:9999"}`

func (*HostDiscoveryNotification) Marshal added in v1.0.8

func (d *HostDiscoveryNotification) Marshal() (string, error)

func (*HostDiscoveryNotification) Unmarshal added in v1.0.8

func (d *HostDiscoveryNotification) Unmarshal(jsonData string) error
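The payload shape shown above maps directly onto encoding/json. The sketch below mirrors the documented struct tags in a local type and round-trips a payload; the lowercase type and the marshal and unmarshal functions are stand-ins written for this example, not the package's methods.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// hostDiscoveryNotification mirrors the documented fields and JSON tags.
type hostDiscoveryNotification struct {
	MsgType string `json:"msg_type"`
	Action  string `json:"action"`
	Host    string `json:"host"`
}

// marshal encodes the notification as a JSON string.
func marshal(n hostDiscoveryNotification) (string, error) {
	b, err := json.Marshal(n)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// unmarshal decodes a JSON string into a notification.
func unmarshal(jsonData string) (hostDiscoveryNotification, error) {
	var n hostDiscoveryNotification
	err := json.Unmarshal([]byte(jsonData), &n)
	return n, err
}

func main() {
	s, err := marshal(hostDiscoveryNotification{
		MsgType: "host-discovery",
		Action:  "online",
		Host:    "123.123.123.123:9999",
	})
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Println(s)
	// prints {"msg_type":"host-discovery","action":"online","host":"123.123.123.123:9999"}

	n, err := unmarshal(s)
	fmt.Println(n.Action, n.Host, err) // prints "online 123.123.123.123:9999 <nil>"
}
```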

type NotifierClient added in v1.0.8

type NotifierClient struct {
	AppName          string
	ConfigFileName   string
	CustomConfigPath string

	BeforeClientDialHandler  func(*Client)
	AfterClientDialHandler   func(*Client)
	BeforeClientCloseHandler func(*Client)
	AfterClientCloseHandler  func(*Client)

	UnaryClientInterceptorHandlers  []grpc.UnaryClientInterceptor
	StreamClientInterceptorHandlers []grpc.StreamClientInterceptor

	ServiceAlertStartedHandler func()
	ServiceAlertSkippedHandler func(reason string)
	ServiceAlertStoppedHandler func(reason string)
	ServiceHostOnlineHandler   func(host string, port uint)
	ServiceHostOfflineHandler  func(host string, port uint)
	// contains filtered or unexported fields
}

func NewNotifierClient added in v1.0.8

func NewNotifierClient(appName string, configFileName string, customConfigPath string, enableLogging ...bool) *NotifierClient

NewNotifierClient creates a new prepared notifier client for use in service discovery notification

func (*NotifierClient) Close added in v1.0.8

func (n *NotifierClient) Close()

Close will disconnect the notifier client from the notifier server

func (*NotifierClient) ConfiguredForNotifierClientDial added in v1.0.8

func (n *NotifierClient) ConfiguredForNotifierClientDial() bool

ConfiguredForNotifierClientDial checks whether the notifier client's configured options are sufficient for Dial to be attempted

func (*NotifierClient) ConfiguredSNSDiscoveryTopicArn added in v1.0.8

func (n *NotifierClient) ConfiguredSNSDiscoveryTopicArn() string

ConfiguredSNSDiscoveryTopicArn gets the topicArn defined for the notifier client service discovery endpoints

func (*NotifierClient) Dial added in v1.0.8

func (n *NotifierClient) Dial() error

Dial will connect the notifier client to the notifier server

func (*NotifierClient) NotifierClientAlertServicesStarted added in v1.0.8

func (n *NotifierClient) NotifierClientAlertServicesStarted() bool

NotifierClientAlertServicesStarted indicates notifier client services started via Subscribe() action

func (*NotifierClient) PurgeEndpointCache added in v1.0.8

func (n *NotifierClient) PurgeEndpointCache()

PurgeEndpointCache removes the current client connection's service name, IP, and port entry from the cache. If that entry is not found, the entire cache is purged.

FIX #1: Uses RemoteAddress() (which holds connMu) instead of directly accessing _remoteAddress.

FIX #2: Uses cacheMu-protected helpers instead of directly accessing _cache.
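The purge-one-or-purge-all behavior can be sketched with a plain map. This is one reading of the description, labeled as an assumption: "not found" is taken to mean that no cached entry matches the given host and port. The endpoint type and the purge function are hypothetical, and the sketch omits the locking that the FIX notes refer to.

```go
package main

import "fmt"

// endpoint is a hypothetical stand-in for a cached service endpoint.
type endpoint struct {
	Host string
	Port uint
}

// purge removes the host:port entry from the service's cached endpoints and
// returns true. If no such entry exists, it clears the whole cache and
// returns false.
func purge(cache map[string][]endpoint, service, host string, port uint) bool {
	eps := cache[service]
	kept := make([]endpoint, 0, len(eps))
	for _, ep := range eps {
		if ep.Host == host && ep.Port == port {
			continue // drop the matching entry
		}
		kept = append(kept, ep)
	}
	if len(kept) < len(eps) {
		cache[service] = kept
		return true
	}
	for k := range cache {
		delete(cache, k) // fallback: purge everything
	}
	return false
}

func main() {
	cache := map[string][]endpoint{
		"svc.ns":   {{Host: "10.0.0.1", Port: 9000}, {Host: "10.0.0.2", Port: 9000}},
		"other.ns": {{Host: "10.0.0.3", Port: 9000}},
	}
	fmt.Println(purge(cache, "svc.ns", "10.0.0.1", 9000), len(cache)) // prints "true 2"
	fmt.Println(purge(cache, "svc.ns", "10.9.9.9", 1), len(cache))    // prints "false 0"
}
```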

func (*NotifierClient) Subscribe added in v1.0.8

func (n *NotifierClient) Subscribe(topicArn string) (err error)

Subscribe subscribes this notifier client to the specified SNS topicArn via the notifier server. The subscription also starts the recurring loop that waits for stream data from the notifier server carrying service discovery host info. When host info is received, the matching ServiceHostOnlineHandler or ServiceHostOfflineHandler is triggered. Calling Close() or Unsubscribe(), or receiving an error condition from the notifier server, ends the long-running service discovery process.
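The handler dispatch described above can be sketched as a switch on the notification action. The handlers type and the dispatch function are hypothetical; they assume the "host:port" form and the "online" and "offline" action values shown in the HostDiscoveryNotification payload example, and they match the documented handler signature func(host string, port uint).

```go
package main

import (
	"fmt"
	"net"
	"strconv"
	"strings"
)

// handlers groups the two callbacks, using the documented handler signature.
type handlers struct {
	Online  func(host string, port uint)
	Offline func(host string, port uint)
}

// dispatch splits hostPort into host and port and invokes the handler that
// matches action. Unknown actions and malformed addresses return an error.
func dispatch(action, hostPort string, h handlers) error {
	host, portStr, err := net.SplitHostPort(hostPort)
	if err != nil {
		return err
	}
	port, err := strconv.ParseUint(portStr, 10, 16)
	if err != nil {
		return err
	}
	switch strings.ToLower(strings.TrimSpace(action)) {
	case "online":
		if h.Online != nil {
			h.Online(host, uint(port))
		}
	case "offline":
		if h.Offline != nil {
			h.Offline(host, uint(port))
		}
	default:
		return fmt.Errorf("unknown action %q", action)
	}
	return nil
}

func main() {
	h := handlers{
		Online:  func(host string, port uint) { fmt.Println("online:", host, port) },
		Offline: func(host string, port uint) { fmt.Println("offline:", host, port) },
	}
	_ = dispatch("online", "123.123.123.123:9999", h)  // prints "online: 123.123.123.123 9999"
	_ = dispatch("offline", "123.123.123.123:9999", h) // prints "offline: 123.123.123.123 9999"
	fmt.Println(dispatch("restart", "123.123.123.123:9999", h) != nil) // prints "true"
}
```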

func (*NotifierClient) Unsubscribe added in v1.0.8

func (n *NotifierClient) Unsubscribe() (err error)

Unsubscribe will stop notification alert services and disconnect from subscription on notifier server

type WebServerConfig added in v1.0.7

type WebServerConfig struct {
	AppName          string
	ConfigFileName   string
	CustomConfigPath string

	// define web server router info
	WebServerRoutes map[string]*ginw.RouteDefinition

	// getter only
	WebServerLocalAddress string

	// clean up func
	CleanUp func()
}

WebServerConfig holds the settings for the optional web server. Note: WebServerLocalAddress is a read-only getter.

Note: WebServerRoutes can be defined as follows:

	WebServerRoutes = map[string]*ginw.RouteDefinition{
		"base": {
			Routes: []*ginw.Route{
				{
					Method:       ginhttpmethod.GET,
					RelativePath: "/",
					Handler: func(c *gin.Context, bindingInputPtr interface{}) {
						c.String(200, "Connector Client Http Host Up")
					},
				},
			},
		},
	}
