Documentation
¶
Index ¶
- func ClearEndpointCache()
- type Cache
- func (c *Cache) AddServiceEndpoints(serviceName string, eps []*serviceEndpoint)
- func (c *Cache) GetLiveServiceEndpoints(serviceName string, version string, ignoreExpired ...bool) (liveEndpoints []*serviceEndpoint)
- func (c *Cache) GetServiceEndpoints(serviceName string) []*serviceEndpoint
- func (c *Cache) PurgeServiceEndpointByHostAndPort(serviceName string, host string, port uint)
- func (c *Cache) PurgeServiceEndpoints(serviceName string)
- type Client
- func (c *Client) ClientConnection() grpc.ClientConnInterface
- func (c *Client) Close()
- func (c *Client) ConfiguredDialMinConnectTimeoutSeconds() uint
- func (c *Client) ConfiguredForClientDial() bool
- func (c *Client) ConfiguredForSNSDiscoveryTopicArn() bool
- func (c *Client) ConfiguredSNSDiscoveryTopicArn() string
- func (c *Client) Dial(ctx context.Context) error
- func (c *Client) DialViaRedis(ctx context.Context) error
- func (c *Client) DoNotifierAlertService() (err error)
- func (c *Client) GetLiveEndpointsCount(updateEndpointsToLoadBalanceResolver bool) (int, error)
- func (c *Client) GetState() connectivity.State
- func (c *Client) HealthProbe(serviceName string, timeoutDuration ...time.Duration) (grpc_health_v1.HealthCheckResponse_ServingStatus, error)
- func (c *Client) PreloadConfigData() error
- func (c *Client) Ready() bool
- func (c *Client) RedisDiscovery() *RedisServiceDiscovery
- func (c *Client) RemoteAddress() string
- func (c *Client) UpdateLoadBalanceResolver() error
- func (c *Client) ZLog() *data.ZapLog
- type HostDiscoveryNotification
- type NotifierClient
- func (n *NotifierClient) Close()
- func (n *NotifierClient) ConfiguredForNotifierClientDial() bool
- func (n *NotifierClient) ConfiguredSNSDiscoveryTopicArn() string
- func (n *NotifierClient) Dial() error
- func (n *NotifierClient) NotifierClientAlertServicesStarted() bool
- func (n *NotifierClient) PurgeEndpointCache()
- func (n *NotifierClient) Subscribe(topicArn string) (err error)
- func (n *NotifierClient) Unsubscribe() (err error)
- type RedisInstanceInfo
- type RedisServiceDiscovery
- type WebServerConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ClearEndpointCache ¶
func ClearEndpointCache()
Types ¶
type Cache ¶
type Cache struct {
DisableLogging bool
// contains filtered or unexported fields
}
func (*Cache) AddServiceEndpoints ¶
AddServiceEndpoints will append slice of service endpoints associated with the given serviceName within map
serviceName = lowercase of servicename.namespacename
func (*Cache) GetLiveServiceEndpoints ¶
func (c *Cache) GetLiveServiceEndpoints(serviceName string, version string, ignoreExpired ...bool) (liveEndpoints []*serviceEndpoint)
GetLiveServiceEndpoints will retrieve currently non-expired service endpoints and remove any expired service endpoints from map, for a given serviceName
serviceName = lowercase of servicename.namespacename
func (*Cache) GetServiceEndpoints ¶
GetServiceEndpoints returns a deep copy of endpoints for the given service
func (*Cache) PurgeServiceEndpointByHostAndPort ¶
PurgeServiceEndpointByHostAndPort will remove a specific endpoint for a service based on host and port info
serviceName = lowercase of servicename.namespacename
func (*Cache) PurgeServiceEndpoints ¶
PurgeServiceEndpoints will remove all endpoints associated with the given serviceName within map
serviceName = lowercase of servicename.namespacename
type Client ¶
type Client struct {
// client properties
AppName string
ConfigFileName string
CustomConfigPath string
// web server config - for optional gin web server to be launched upon grpc client dial
WebServerConfig *WebServerConfig
// indicate if after dial, client will wait for target service health probe success before continuing to allow rpc
WaitForServerReady bool
// P2-14: ConnectionCloseDelay controls how long setConnection waits
// before closing a replaced gRPC connection, allowing in-flight RPCs
// to drain. Zero or negative falls back to the 100ms default
// (connectionCloseDelay). Raise this for services with long-running
// unary calls; lower it for tight test loops.
ConnectionCloseDelay time.Duration
// one or more unary client interceptors for handling wrapping actions
UnaryClientInterceptors []grpc.UnaryClientInterceptor
// one or more stream client interceptors for handling wrapping actions
StreamClientInterceptors []grpc.StreamClientInterceptor
// typically wrapper action to handle monitoring
StatsHandler stats.Handler
// handler to invoke before gRPC client dial is to start
BeforeClientDial func(cli *Client)
// handler to invoke after gRPC client dial performed
AfterClientDial func(cli *Client)
// handler to invoke before gRPC client connection is to close
BeforeClientClose func(cli *Client)
// handler to invoke after gRPC client connection has closed
AfterClientClose func(cli *Client)
// contains filtered or unexported fields
}
Client is the gRPC client lifecycle owner for a connector-managed outbound connection. A Client encapsulates: configuration loading (client.yaml), service-discovery resolution (Cloud Map / DNS / direct), gRPC connection establishment with TLS or plaintext, an optional notifier sub-client for push-based service-discovery updates, an optional sidecar HTTP server for non-gRPC routes, automatic reconnect on notifier-driven service changes, and a graceful Close path that drains in-flight RPCs before tearing down the connection.
Construct one with NewClient, then optionally configure exported fields BEFORE calling Dial. Dial returns once the gRPC connection is READY (or fails fast on misconfiguration). Use the underlying *grpc.ClientConn (accessed via the resolver hooks) to make RPCs after Dial returns. Call Close exactly once when done — Close is idempotent against repeated invocation but a Client is not designed for re-Dial-after-Close cycles unless the caller explicitly resets state.
Concurrency:
- Exported fields (AppName, WebServerConfig, WaitForServerReady, ConnectionCloseDelay, ...) MUST be set before Dial and MUST NOT be mutated after.
- Dial and Close are guarded internally; Dial drains any stale notifier-reconnect goroutine from a prior lifecycle (P2-13) before flipping the closed flag.
- The notifier reconnect goroutine is single-instance via the notifierReconnectActive atomic CAS guard and a defer-clear pattern (see Close + Dial drain logic).
Notes:
Using Compressor with RPC a) import "google.golang.org/grpc/encoding/gzip" b) in RPC Call, pass grpc.UseCompressor(gzip.Name)) in the third parameter example: RPCCall(ctx, &pb.Request{...}, grpc.UseCompressor(gzip.Name))
Notifier Client yaml a) xyz-notifier-client.yaml where xyz is the target gRPC service endpoint name
func NewClient ¶
NewClient constructs a Client with only AppName / ConfigFileName / CustomConfigPath set. Configure all other exported fields (such as WebServerConfig, WaitForServerReady, ConnectionCloseDelay, the notifier handlers) on the returned struct BEFORE calling Dial.
Unlike NewService for the server side, this constructor does not validate inputs — validation happens at Dial time when the config file is read. A returned Client whose AppName is empty will fail at Dial with a config-load error.
func (*Client) ClientConnection ¶
func (c *Client) ClientConnection() grpc.ClientConnInterface
ClientConnection returns the currently loaded grpc client connection
func (*Client) Close ¶
func (c *Client) Close()
Close tears down the Client: it sets the closed flag, signals the notifier reconnect goroutine (if running) to exit on its next iteration, closes the gRPC connection (after a short ConnectionCloseDelay drain window), and stops the optional sidecar HTTP server.
Close is idempotent — calling it more than once on the same Client is safe but only the first call has effect.
Note: Close does NOT eagerly clear the notifierReconnectActive flag (see P2-13). The reconnect goroutine clears that flag itself from its own defer. A subsequent Dial on the same Client drains any in-flight notifier work before flipping the closed flag back to false to avoid spawning a second reconnect goroutine that races with a still-running stale one.
func (*Client) ConfiguredDialMinConnectTimeoutSeconds ¶
ConfiguredDialMinConnectTimeoutSeconds gets the timeout seconds from config yaml
func (*Client) ConfiguredForClientDial ¶
ConfiguredForClientDial checks if the config yaml is ready for client dial operation
func (*Client) ConfiguredForSNSDiscoveryTopicArn ¶
ConfiguredForSNSDiscoveryTopicArn indicates if the sns topic arn for service discovery is configured within the config yaml
func (*Client) ConfiguredSNSDiscoveryTopicArn ¶
ConfiguredSNSDiscoveryTopicArn returns the sns discovery topic arn as configured in config yaml
func (*Client) Dial ¶
Dial loads the client configuration, resolves the target service (Cloud Map, DNS, or direct address depending on the config), opens a gRPC connection, optionally starts the sidecar HTTP server, and initializes the notifier sub-client when configured.
The supplied context governs only this Dial call's deadline / cancellation — once Dial returns, the underlying gRPC connection outlives the context. Pass nil for context.Background semantics.
Dial is single-use per logical lifecycle: calling it again after Close on the same Client is unusual and not the primary supported pattern. When Dial is invoked on a previously-closed Client, the implementation drains any stale notifier-reconnect goroutine from the prior lifecycle (P2-13) before flipping the closed flag back to false. The drain is bounded by notifierReconnectDrainTimeout and will log a warning if a stale goroutine is still in-flight after the drain window.
Returns an error if any step fails — the Client is in a partially- initialized state on failure and should be Closed before discarding.
Concurrency: Dial is internally serialized through the global _mux because gRPC's resolver registration is not thread-safe. This serialization is bounded to Dial only — established connections proceed concurrently for normal RPCs.
func (*Client) DialViaRedis ¶
DialViaRedis is a fallback dial method that uses Redis-based service discovery instead of AWS CloudMap. It is intended to be called when the primary Dial() (which depends on CloudMap) fails due to AWS outages or restarts.
This method:
- Reads Redis config from the client YAML config
- Discovers service endpoints from Redis (registered by servers via RedisServiceRegistry)
- Tries endpoints in round-robin order until one connects successfully
- Optionally waits for health check (if WaitForServerReady is set)
- Stores the RedisServiceDiscovery instance for later use (e.g., removing failed endpoints)
func (*Client) DoNotifierAlertService ¶
DoNotifierAlertService should be called from goroutine after the client dial completes, this service is to subscribe and receive callbacks from notifier server of service host online offline statuses
Example:
go func() {
svc1Cli.DoNotifierAlertService()
}()
func (*Client) GetLiveEndpointsCount ¶
GetLiveEndpointsCount queries cloudmap to retrieve live endpoints count, optionally update endpoints into client cache
if updateEndpointsToLoadBalanceResolver = true, then endpoint addresses will force refresh from cloudmap
func (*Client) GetState ¶
func (c *Client) GetState() connectivity.State
GetState returns the current grpc client connection's state
func (*Client) HealthProbe ¶
func (c *Client) HealthProbe(serviceName string, timeoutDuration ...time.Duration) (grpc_health_v1.HealthCheckResponse_ServingStatus, error)
HealthProbe manually checks service serving health status
func (*Client) PreloadConfigData ¶
PreloadConfigData will load the config data before Dial()
func (*Client) RedisDiscovery ¶
func (c *Client) RedisDiscovery() *RedisServiceDiscovery
RedisDiscovery returns the current Redis service discovery instance (if any). Can be used by callers to manually remove failed instances or refresh.
func (*Client) RemoteAddress ¶
RemoteAddress gets the remote endpoint address currently connected to
func (*Client) UpdateLoadBalanceResolver ¶
UpdateLoadBalanceResolves updates client load balancer resolver state with new endpoint addresses
func (*Client) ZLog ¶
ZLog returns the internal *data.ZapLog used by the Client for structured logging. The logger is created during Dial from the loaded config (Target.ZapLogEnabled / ZapLogOutputConsole). If Dial has not yet been called, or if the receiver is nil, ZLog returns nil — callers should nil-check before logging.
The returned logger is shared across the Client and any notifier sub-client it owns; do not replace it after Dial.
type HostDiscoveryNotification ¶
type HostDiscoveryNotification struct {
MsgType string `json:"msg_type"`
Action string `json:"action"`
Host string `json:"host"`
}
HostDiscoveryNotification struct contains the field values for notification discovery payload
`{"msg_type":"host-discovery", "action":"online | offline", "host":"123.123.123.123:9999"}`
func (*HostDiscoveryNotification) Marshal ¶
func (d *HostDiscoveryNotification) Marshal() (string, error)
func (*HostDiscoveryNotification) Unmarshal ¶
func (d *HostDiscoveryNotification) Unmarshal(jsonData string) error
type NotifierClient ¶
type NotifierClient struct {
AppName string
ConfigFileName string
CustomConfigPath string
BeforeClientDialHandler func(*Client)
AfterClientDialHandler func(*Client)
BeforeClientCloseHandler func(*Client)
AfterClientCloseHandler func(*Client)
UnaryClientInterceptorHandlers []grpc.UnaryClientInterceptor
StreamClientInterceptorHandlers []grpc.StreamClientInterceptor
ServiceAlertStartedHandler func()
ServiceAlertSkippedHandler func(reason string)
ServiceAlertStoppedHandler func(reason string)
ServiceHostOnlineHandler func(host string, port uint)
ServiceHostOfflineHandler func(host string, port uint)
// contains filtered or unexported fields
}
func NewNotifierClient ¶
func NewNotifierClient(appName string, configFileName string, customConfigPath string, enableLogging ...bool) *NotifierClient
NewNotifierClient creates a new prepared notifier client for use in service discovery notification
func (*NotifierClient) Close ¶
func (n *NotifierClient) Close()
Close will disconnect the notifier client from the notifier server
func (*NotifierClient) ConfiguredForNotifierClientDial ¶
func (n *NotifierClient) ConfiguredForNotifierClientDial() bool
ConfiguredForNotifierClientDial checks if the notifier client is configured for options, where Dial can be attempted to invoke
func (*NotifierClient) ConfiguredSNSDiscoveryTopicArn ¶
func (n *NotifierClient) ConfiguredSNSDiscoveryTopicArn() string
ConfiguredSNSDiscoveryTopicArn gets the topicArn defined for the notifier client service discovery endpoints
func (*NotifierClient) Dial ¶
func (n *NotifierClient) Dial() error
Dial will connect the notifier client to the notifier server
func (*NotifierClient) NotifierClientAlertServicesStarted ¶
func (n *NotifierClient) NotifierClientAlertServicesStarted() bool
NotifierClientAlertServicesStarted indicates notifier client services started via Subscribe() action
func (*NotifierClient) PurgeEndpointCache ¶
func (n *NotifierClient) PurgeEndpointCache()
PurgeEndpointCache removes current client connection's service name ip port from cache, if current service name ip port not found, entire cache will be purged.
FIX #1: Uses RemoteAddress() (which holds connMu) instead of directly accessing _remoteAddress. FIX #2: Uses cacheMu-protected helpers instead of directly accessing _cache.
func (*NotifierClient) Subscribe ¶
func (n *NotifierClient) Subscribe(topicArn string) (err error)
Subscribe will subscribe this notifier client to a specified topicArn with sns, via notifier server; this subscription will also start the recurring loop to wait for notifier server stream data, for receiving service discovery host info; when service discovery host info is received, the appropriate ServiceHostOnlineHandler or ServiceHostOfflineHandler is triggered; calling the Close() or Unsubscribe() or receiving error conditions from notifier server will sever the long running service discovery process.
func (*NotifierClient) Unsubscribe ¶
func (n *NotifierClient) Unsubscribe() (err error)
Unsubscribe will stop notification alert services and disconnect from subscription on notifier server
type RedisInstanceInfo ¶
type RedisInstanceInfo struct {
IP string `json:"ip"`
Port uint `json:"port"`
LastUpdate int64 `json:"lastUpdate"`
}
RedisInstanceInfo Redis 中存储的实例信息
type RedisServiceDiscovery ¶
type RedisServiceDiscovery struct {
// contains filtered or unexported fields
}
RedisServiceDiscovery 客户端 Redis 服务发现
func NewRedisServiceDiscovery ¶
func NewRedisServiceDiscovery(writeEndpoint, readEndpoint string, password string, db int, serviceName string, instanceTTL uint) (*RedisServiceDiscovery, error)
NewRedisServiceDiscovery 创建 Redis 服务发现客户端
func (*RedisServiceDiscovery) Close ¶
func (d *RedisServiceDiscovery) Close() error
Close 关闭 Redis 连接
func (*RedisServiceDiscovery) GetInstanceCount ¶
func (d *RedisServiceDiscovery) GetInstanceCount() int
GetInstanceCount 获取当前可用实例数量
func (*RedisServiceDiscovery) GetNextInstance ¶
func (d *RedisServiceDiscovery) GetNextInstance() (string, error)
GetNextInstance 获取下一个可用实例(轮询)
func (*RedisServiceDiscovery) Refresh ¶
func (d *RedisServiceDiscovery) Refresh() error
Refresh 刷新实例列表
func (*RedisServiceDiscovery) RemoveFailedInstance ¶
func (d *RedisServiceDiscovery) RemoveFailedInstance(addr string) error
RemoveFailedInstance 移除连接失败的实例
type WebServerConfig ¶
type WebServerConfig struct {
AppName string
ConfigFileName string
CustomConfigPath string
// define web server router info
WebServerRoutes map[string]*ginw.RouteDefinition
// getter only
WebServerLocalAddress string
// clean up func
CleanUp func()
}
WebServerConfig info, note: WebServerLocalAddress = read only getter
note: WebServerRoutes = map[string]*ginw.RouteDefinition{
"base": {
Routes: []*ginw.Route{
{
Method: ginhttpmethod.GET,
RelativePath: "/",
Handler: func(c *gin.Context, bindingInputPtr interface{}) {
c.String(200, "Connector Client Http Host Up")
},
},
},
},
}