client

package
v0.0.0-...-13cc80a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 8, 2026 License: Apache-2.0 Imports: 49 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ClearEndpointCache

func ClearEndpointCache()

Types

type Cache

type Cache struct {
	DisableLogging bool
	// contains filtered or unexported fields
}

func (*Cache) AddServiceEndpoints

func (c *Cache) AddServiceEndpoints(serviceName string, eps []*serviceEndpoint)

AddServiceEndpoints will append slice of service endpoints associated with the given serviceName within map

serviceName = lowercase of servicename.namespacename

func (*Cache) GetLiveServiceEndpoints

func (c *Cache) GetLiveServiceEndpoints(serviceName string, version string, ignoreExpired ...bool) (liveEndpoints []*serviceEndpoint)

GetLiveServiceEndpoints will retrieve currently non-expired service endpoints and remove any expired service endpoints from map, for a given serviceName

serviceName = lowercase of servicename.namespacename

func (*Cache) GetServiceEndpoints

func (c *Cache) GetServiceEndpoints(serviceName string) []*serviceEndpoint

GetServiceEndpoints returns a deep copy of endpoints for the given service

func (*Cache) PurgeServiceEndpointByHostAndPort

func (c *Cache) PurgeServiceEndpointByHostAndPort(serviceName string, host string, port uint)

PurgeServiceEndpointByHostAndPort will remove a specific endpoint for a service based on host and port info

serviceName = lowercase of servicename.namespacename

func (*Cache) PurgeServiceEndpoints

func (c *Cache) PurgeServiceEndpoints(serviceName string)

PurgeServiceEndpoints will remove all endpoints associated with the given serviceName within map

serviceName = lowercase of servicename.namespacename

type Client

type Client struct {
	// client properties
	AppName          string
	ConfigFileName   string
	CustomConfigPath string

	// web server config - for optional gin web server to be launched upon grpc client dial
	WebServerConfig *WebServerConfig

	// indicate if after dial, client will wait for target service health probe success before continuing to allow rpc
	WaitForServerReady bool

	// P2-14: ConnectionCloseDelay controls how long setConnection waits
	// before closing a replaced gRPC connection, allowing in-flight RPCs
	// to drain. Zero or negative falls back to the 100ms default
	// (connectionCloseDelay). Raise this for services with long-running
	// unary calls; lower it for tight test loops.
	ConnectionCloseDelay time.Duration

	// one or more unary client interceptors for handling wrapping actions
	UnaryClientInterceptors []grpc.UnaryClientInterceptor

	// one or more stream client interceptors for handling wrapping actions
	StreamClientInterceptors []grpc.StreamClientInterceptor

	// typically wrapper action to handle monitoring
	StatsHandler stats.Handler

	// handler to invoke before gRPC client dial is to start
	BeforeClientDial func(cli *Client)

	// handler to invoke after gRPC client dial performed
	AfterClientDial func(cli *Client)

	// handler to invoke before gRPC client connection is to close
	BeforeClientClose func(cli *Client)

	// handler to invoke after gRPC client connection has closed
	AfterClientClose func(cli *Client)
	// contains filtered or unexported fields
}

Client is the gRPC client lifecycle owner for a connector-managed outbound connection. A Client encapsulates: configuration loading (client.yaml), service-discovery resolution (Cloud Map / DNS / direct), gRPC connection establishment with TLS or plaintext, an optional notifier sub-client for push-based service-discovery updates, an optional sidecar HTTP server for non-gRPC routes, automatic reconnect on notifier-driven service changes, and a graceful Close path that drains in-flight RPCs before tearing down the connection.

Construct one with NewClient, then optionally configure exported fields BEFORE calling Dial. Dial returns once the gRPC connection is READY (or fails fast on misconfiguration). Use the underlying *grpc.ClientConn (accessed via the resolver hooks) to make RPCs after Dial returns. Call Close exactly once when done — Close is idempotent against repeated invocation but a Client is not designed for re-Dial-after-Close cycles unless the caller explicitly resets state.

Concurrency:

  • Exported fields (AppName, WebServerConfig, WaitForServerReady, ConnectionCloseDelay, ...) MUST be set before Dial and MUST NOT be mutated after.
  • Dial and Close are guarded internally; Dial drains any stale notifier-reconnect goroutine from a prior lifecycle (P2-13) before flipping the closed flag.
  • The notifier reconnect goroutine is single-instance via the notifierReconnectActive atomic CAS guard and a defer-clear pattern (see Close + Dial drain logic).

Notes:

  1. Using Compressor with RPC a) import "google.golang.org/grpc/encoding/gzip" b) in RPC Call, pass grpc.UseCompressor(gzip.Name)) in the third parameter example: RPCCall(ctx, &pb.Request{...}, grpc.UseCompressor(gzip.Name))

  2. Notifier Client yaml a) xyz-notifier-client.yaml where xyz is the target gRPC service endpoint name

func NewClient

func NewClient(appName string, configFileName string, customConfigPath string) *Client

NewClient constructs a Client with only AppName / ConfigFileName / CustomConfigPath set. Configure all other exported fields (such as WebServerConfig, WaitForServerReady, ConnectionCloseDelay, the notifier handlers) on the returned struct BEFORE calling Dial.

Unlike NewService for the server side, this constructor does not validate inputs — validation happens at Dial time when the config file is read. A returned Client whose AppName is empty will fail at Dial with a config-load error.

func (*Client) ClientConnection

func (c *Client) ClientConnection() grpc.ClientConnInterface

ClientConnection returns the currently loaded grpc client connection

func (*Client) Close

func (c *Client) Close()

Close tears down the Client: it sets the closed flag, signals the notifier reconnect goroutine (if running) to exit on its next iteration, closes the gRPC connection (after a short ConnectionCloseDelay drain window), and stops the optional sidecar HTTP server.

Close is idempotent — calling it more than once on the same Client is safe but only the first call has effect.

Note: Close does NOT eagerly clear the notifierReconnectActive flag (see P2-13). The reconnect goroutine clears that flag itself from its own defer. A subsequent Dial on the same Client drains any in-flight notifier work before flipping the closed flag back to false to avoid spawning a second reconnect goroutine that races with a still-running stale one.

func (*Client) ConfiguredDialMinConnectTimeoutSeconds

func (c *Client) ConfiguredDialMinConnectTimeoutSeconds() uint

ConfiguredDialMinConnectTimeoutSeconds gets the timeout seconds from config yaml

func (*Client) ConfiguredForClientDial

func (c *Client) ConfiguredForClientDial() bool

ConfiguredForClientDial checks if the config yaml is ready for client dial operation

func (*Client) ConfiguredForSNSDiscoveryTopicArn

func (c *Client) ConfiguredForSNSDiscoveryTopicArn() bool

ConfiguredForSNSDiscoveryTopicArn indicates if the sns topic arn for service discovery is configured within the config yaml

func (*Client) ConfiguredSNSDiscoveryTopicArn

func (c *Client) ConfiguredSNSDiscoveryTopicArn() string

ConfiguredSNSDiscoveryTopicArn returns the sns discovery topic arn as configured in config yaml

func (*Client) Dial

func (c *Client) Dial(ctx context.Context) error

Dial loads the client configuration, resolves the target service (Cloud Map, DNS, or direct address depending on the config), opens a gRPC connection, optionally starts the sidecar HTTP server, and initializes the notifier sub-client when configured.

The supplied context governs only this Dial call's deadline / cancellation — once Dial returns, the underlying gRPC connection outlives the context. Pass nil for context.Background semantics.

Dial is single-use per logical lifecycle: calling it again after Close on the same Client is unusual and not the primary supported pattern. When Dial is invoked on a previously-closed Client, the implementation drains any stale notifier-reconnect goroutine from the prior lifecycle (P2-13) before flipping the closed flag back to false. The drain is bounded by notifierReconnectDrainTimeout and will log a warning if a stale goroutine is still in-flight after the drain window.

Returns an error if any step fails — the Client is in a partially- initialized state on failure and should be Closed before discarding.

Concurrency: Dial is internally serialized through the global _mux because gRPC's resolver registration is not thread-safe. This serialization is bounded to Dial only — established connections proceed concurrently for normal RPCs.

func (*Client) DialViaRedis

func (c *Client) DialViaRedis(ctx context.Context) error

DialViaRedis is a fallback dial method that uses Redis-based service discovery instead of AWS CloudMap. It is intended to be called when the primary Dial() (which depends on CloudMap) fails due to AWS outages or restarts.

This method:

  • Reads Redis config from the client YAML config
  • Discovers service endpoints from Redis (registered by servers via RedisServiceRegistry)
  • Tries endpoints in round-robin order until one connects successfully
  • Optionally waits for health check (if WaitForServerReady is set)
  • Stores the RedisServiceDiscovery instance for later use (e.g., removing failed endpoints)

func (*Client) DoNotifierAlertService

func (c *Client) DoNotifierAlertService() (err error)

DoNotifierAlertService should be called from goroutine after the client dial completes, this service is to subscribe and receive callbacks from notifier server of service host online offline statuses

Example:

go func() {
			  svc1Cli.DoNotifierAlertService()
		  }()

func (*Client) GetLiveEndpointsCount

func (c *Client) GetLiveEndpointsCount(updateEndpointsToLoadBalanceResolver bool) (int, error)

GetLiveEndpointsCount queries cloudmap to retrieve live endpoints count, optionally update endpoints into client cache

if updateEndpointsToLoadBalanceResolver = true, then endpoint addresses will force refresh from cloudmap

func (*Client) GetState

func (c *Client) GetState() connectivity.State

GetState returns the current grpc client connection's state

func (*Client) HealthProbe

func (c *Client) HealthProbe(serviceName string, timeoutDuration ...time.Duration) (grpc_health_v1.HealthCheckResponse_ServingStatus, error)

HealthProbe manually checks service serving health status

func (*Client) PreloadConfigData

func (c *Client) PreloadConfigData() error

PreloadConfigData will load the config data before Dial()

func (*Client) Ready

func (c *Client) Ready() bool

Ready indicates client connection is ready to invoke grpc methods

func (*Client) RedisDiscovery

func (c *Client) RedisDiscovery() *RedisServiceDiscovery

RedisDiscovery returns the current Redis service discovery instance (if any). Can be used by callers to manually remove failed instances or refresh.

func (*Client) RemoteAddress

func (c *Client) RemoteAddress() string

RemoteAddress gets the remote endpoint address currently connected to

func (*Client) UpdateLoadBalanceResolver

func (c *Client) UpdateLoadBalanceResolver() error

UpdateLoadBalanceResolves updates client load balancer resolver state with new endpoint addresses

func (*Client) ZLog

func (c *Client) ZLog() *data.ZapLog

ZLog returns the internal *data.ZapLog used by the Client for structured logging. The logger is created during Dial from the loaded config (Target.ZapLogEnabled / ZapLogOutputConsole). If Dial has not yet been called, or if the receiver is nil, ZLog returns nil — callers should nil-check before logging.

The returned logger is shared across the Client and any notifier sub-client it owns; do not replace it after Dial.

type HostDiscoveryNotification

type HostDiscoveryNotification struct {
	MsgType string `json:"msg_type"`
	Action  string `json:"action"`
	Host    string `json:"host"`
}

HostDiscoveryNotification struct contains the field values for notification discovery payload

`{"msg_type":"host-discovery", "action":"online | offline", "host":"123.123.123.123:9999"}`

func (*HostDiscoveryNotification) Marshal

func (d *HostDiscoveryNotification) Marshal() (string, error)

func (*HostDiscoveryNotification) Unmarshal

func (d *HostDiscoveryNotification) Unmarshal(jsonData string) error

type NotifierClient

type NotifierClient struct {
	AppName          string
	ConfigFileName   string
	CustomConfigPath string

	BeforeClientDialHandler  func(*Client)
	AfterClientDialHandler   func(*Client)
	BeforeClientCloseHandler func(*Client)
	AfterClientCloseHandler  func(*Client)

	UnaryClientInterceptorHandlers  []grpc.UnaryClientInterceptor
	StreamClientInterceptorHandlers []grpc.StreamClientInterceptor

	ServiceAlertStartedHandler func()
	ServiceAlertSkippedHandler func(reason string)
	ServiceAlertStoppedHandler func(reason string)
	ServiceHostOnlineHandler   func(host string, port uint)
	ServiceHostOfflineHandler  func(host string, port uint)
	// contains filtered or unexported fields
}

func NewNotifierClient

func NewNotifierClient(appName string, configFileName string, customConfigPath string, enableLogging ...bool) *NotifierClient

NewNotifierClient creates a new prepared notifier client for use in service discovery notification

func (*NotifierClient) Close

func (n *NotifierClient) Close()

Close will disconnect the notifier client from the notifier server

func (*NotifierClient) ConfiguredForNotifierClientDial

func (n *NotifierClient) ConfiguredForNotifierClientDial() bool

ConfiguredForNotifierClientDial checks if the notifier client is configured for options, where Dial can be attempted to invoke

func (*NotifierClient) ConfiguredSNSDiscoveryTopicArn

func (n *NotifierClient) ConfiguredSNSDiscoveryTopicArn() string

ConfiguredSNSDiscoveryTopicArn gets the topicArn defined for the notifier client service discovery endpoints

func (*NotifierClient) Dial

func (n *NotifierClient) Dial() error

Dial will connect the notifier client to the notifier server

func (*NotifierClient) NotifierClientAlertServicesStarted

func (n *NotifierClient) NotifierClientAlertServicesStarted() bool

NotifierClientAlertServicesStarted indicates notifier client services started via Subscribe() action

func (*NotifierClient) PurgeEndpointCache

func (n *NotifierClient) PurgeEndpointCache()

PurgeEndpointCache removes current client connection's service name ip port from cache, if current service name ip port not found, entire cache will be purged.

FIX #1: Uses RemoteAddress() (which holds connMu) instead of directly accessing _remoteAddress. FIX #2: Uses cacheMu-protected helpers instead of directly accessing _cache.

func (*NotifierClient) Subscribe

func (n *NotifierClient) Subscribe(topicArn string) (err error)

Subscribe will subscribe this notifier client to a specified topicArn with sns, via notifier server; this subscription will also start the recurring loop to wait for notifier server stream data, for receiving service discovery host info; when service discovery host info is received, the appropriate ServiceHostOnlineHandler or ServiceHostOfflineHandler is triggered; calling the Close() or Unsubscribe() or receiving error conditions from notifier server will sever the long running service discovery process.

func (*NotifierClient) Unsubscribe

func (n *NotifierClient) Unsubscribe() (err error)

Unsubscribe will stop notification alert services and disconnect from subscription on notifier server

type RedisInstanceInfo

type RedisInstanceInfo struct {
	IP         string `json:"ip"`
	Port       uint   `json:"port"`
	LastUpdate int64  `json:"lastUpdate"`
}

RedisInstanceInfo Redis 中存储的实例信息

type RedisServiceDiscovery

type RedisServiceDiscovery struct {
	// contains filtered or unexported fields
}

RedisServiceDiscovery 客户端 Redis 服务发现

func NewRedisServiceDiscovery

func NewRedisServiceDiscovery(writeEndpoint, readEndpoint string, password string, db int, serviceName string, instanceTTL uint) (*RedisServiceDiscovery, error)

NewRedisServiceDiscovery 创建 Redis 服务发现客户端

func (*RedisServiceDiscovery) Close

func (d *RedisServiceDiscovery) Close() error

Close 关闭 Redis 连接

func (*RedisServiceDiscovery) GetInstanceCount

func (d *RedisServiceDiscovery) GetInstanceCount() int

GetInstanceCount 获取当前可用实例数量

func (*RedisServiceDiscovery) GetNextInstance

func (d *RedisServiceDiscovery) GetNextInstance() (string, error)

GetNextInstance 获取下一个可用实例(轮询)

func (*RedisServiceDiscovery) Refresh

func (d *RedisServiceDiscovery) Refresh() error

Refresh 刷新实例列表

func (*RedisServiceDiscovery) RemoveFailedInstance

func (d *RedisServiceDiscovery) RemoveFailedInstance(addr string) error

RemoveFailedInstance 移除连接失败的实例

type WebServerConfig

type WebServerConfig struct {
	AppName          string
	ConfigFileName   string
	CustomConfigPath string

	// define web server router info
	WebServerRoutes map[string]*ginw.RouteDefinition

	// getter only
	WebServerLocalAddress string

	// clean up func
	CleanUp func()
}

WebServerConfig info, note: WebServerLocalAddress = read only getter

note: WebServerRoutes = map[string]*ginw.RouteDefinition{
		"base": {
			Routes: []*ginw.Route{
				{
					Method: ginhttpmethod.GET,
					RelativePath: "/",
					Handler: func(c *gin.Context, bindingInputPtr interface{}) {
						c.String(200, "Connector Client Http Host Up")
					},
				},
			},
		},
	}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL