subscribe

package
v1.5.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 10, 2026 License: Apache-2.0 Imports: 16 Imported by: 0

README

Subscribe Package

The subscribe package provides gRPC service subscription and connection management for the Lynx framework. It enables applications to connect to upstream gRPC services with service discovery, TLS support, and connection health monitoring.

Overview

This package handles:

  • gRPC Service Subscription - Connect to upstream services by name
  • Service Discovery Integration - Automatic service node discovery
  • TLS Support - Secure connections with certificate management
  • Connection Health Monitoring - Automatic connection state tracking
  • Load Balancing - Node filtering and routing support

File Structure

File Description
subscribe.go Core GrpcSubscribe struct and connection management
loader.go Configuration-based subscription loading
loader_integrated.go Integrated loader with framework support
tls.go TLS configuration and certificate handling

Usage

Basic Subscription
import (
    "github.com/go-lynx/lynx/subscribe"
)

// Create a gRPC subscription
sub := subscribe.NewGrpcSubscribe(
    subscribe.WithServiceName("user-service"),
    subscribe.WithDiscovery(discoveryInstance),
)

// Connect to the service
conn, err := sub.NewGrpcConn()
if err != nil {
    return err
}
defer conn.Close()

// Use the connection
client := pb.NewUserServiceClient(conn)
With TLS
sub := subscribe.NewGrpcSubscribe(
    subscribe.WithServiceName("secure-service"),
    subscribe.WithDiscovery(discoveryInstance),
    subscribe.EnableTls(),
    subscribe.WithRootCAFileName("ca.crt"),
    subscribe.WithRootCAFileGroup("certificates"),
)

conn, err := sub.NewGrpcConn()
Required Dependencies

Mark a service as required to ensure it's available at startup:

sub := subscribe.NewGrpcSubscribe(
    subscribe.WithServiceName("critical-service"),
    subscribe.WithDiscovery(discoveryInstance),
    subscribe.Required(), // Will fail startup if unavailable
)
With Custom Routing
sub := subscribe.NewGrpcSubscribe(
    subscribe.WithServiceName("my-service"),
    subscribe.WithDiscovery(discoveryInstance),
    subscribe.WithRouterFactory(func(service string) selector.NodeFilter {
        return myCustomNodeFilter
    }),
)

Configuration

YAML Configuration
lynx:
  subscriptions:
    grpc:
      - service: "user-service"
        required: true
        tls: false
      - service: "payment-service"
        required: true
        tls: true
        ca_name: "payment-ca.crt"
        ca_group: "certificates"
      - service: "notification-service"
        required: false
        tls: false
Proto Definition
message GrpcSubscription {
  string service = 1;    // Service name in service discovery
  bool required = 2;     // Strong dependency check at startup
  bool tls = 3;          // Enable TLS
  string ca_name = 4;    // CA certificate filename
  string ca_group = 5;   // CA certificate file group
}

Options

Option Description
WithServiceName(name) Set the service name to subscribe to
WithDiscovery(discovery) Set the service discovery instance
EnableTls() Enable TLS encryption
WithRootCAFileName(name) Set the root CA certificate filename
WithRootCAFileGroup(group) Set the CA certificate file group
Required() Mark as required dependency
WithRouterFactory(factory) Set custom node routing
WithConfigProvider(provider) Set configuration source provider
WithDefaultRootCA(provider) Set default root CA provider

Connection States

The package monitors gRPC connection states:

State Description
IDLE Connection is idle
CONNECTING Establishing connection
READY Connection is ready
TRANSIENT_FAILURE Temporary failure, will retry
SHUTDOWN Connection is shut down

Built-in Middleware

Connections are created with standard middleware:

  • Tracing - Distributed tracing support
  • Logging - Request/response logging
  • Recovery - Panic recovery

Integration with Lynx

The subscription system integrates with the Lynx framework:

// In your plugin
func (p *MyPlugin) InitializeResources(rt plugins.Runtime) error {
    // Subscriptions are automatically built from configuration
    // Access them through the application context
    return nil
}

Best Practices

  1. Mark critical services as required - Ensure dependencies are available at startup
  2. Use TLS in production - Enable encryption for secure communication
  3. Configure appropriate timeouts - Prevent hanging connections
  4. Monitor connection states - Track connectivity for observability
  5. Use service discovery - Avoid hardcoded endpoints

License

Apache License 2.0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BuildGrpcSubscriptions

func BuildGrpcSubscriptions(cfg *conf.Subscriptions, discovery registry.Discovery, routerFactory func(string) selector.NodeFilter) (map[string]*ggrpc.ClientConn, error)

BuildGrpcSubscriptions builds gRPC subscription connections based on configuration Returns map where key is service name, value is reusable gRPC ClientConn

func BuildGrpcSubscriptionsLegacy

func BuildGrpcSubscriptionsLegacy(cfg *conf.Subscriptions, discovery registry.Discovery, routerFactory func(string) selector.NodeFilter) (map[string]*ggrpc.ClientConn, error)

BuildGrpcSubscriptionsLegacy builds gRPC subscription connections using the legacy method This is kept for backward compatibility

func BuildGrpcSubscriptionsWithPlugin

func BuildGrpcSubscriptionsWithPlugin(cfg *conf.Subscriptions, discovery registry.Discovery, routerFactory func(string) selector.NodeFilter, pluginManager interface{}) (map[string]*ggrpc.ClientConn, error)

BuildGrpcSubscriptionsWithPlugin builds gRPC subscription connections using the gRPC client plugin This is the new integrated version that uses the gRPC client plugin

func CloseGrpcConnection

func CloseGrpcConnection(serviceName string, pluginManager interface{}) error

CloseGrpcConnection closes a gRPC connection for a specific service

func CreateGrpcConnectionWithConfig

func CreateGrpcConnectionWithConfig(config interface{}, pluginManager interface{}) (*ggrpc.ClientConn, error)

CreateGrpcConnectionWithConfig creates a gRPC connection with custom configuration

func GetGrpcClientPlugin

func GetGrpcClientPlugin(pluginManager interface{}) (interface{}, error)

GetGrpcClientPlugin returns the gRPC client plugin instance

func GetGrpcConnection

func GetGrpcConnection(serviceName string, pluginManager interface{}) (*ggrpc.ClientConn, error)

GetGrpcConnection gets a gRPC connection for a specific service using the plugin

func GetGrpcConnectionCount

func GetGrpcConnectionCount(pluginManager interface{}) (int, error)

GetGrpcConnectionCount returns the number of active gRPC connections

func GetGrpcConnectionStatus

func GetGrpcConnectionStatus(pluginManager interface{}) (map[string]string, error)

GetGrpcConnectionStatus returns the status of all gRPC connections

func GetGrpcMetrics

func GetGrpcMetrics(pluginManager interface{}) (*interface{}, error)

GetGrpcMetrics returns gRPC client metrics

func HealthCheckGrpcConnections

func HealthCheckGrpcConnections(pluginManager interface{}) error

HealthCheckGrpcConnections performs health check on all gRPC connections

func InitializeGrpcClientIntegration

func InitializeGrpcClientIntegration(pluginManager interface{}) error

InitializeGrpcClientIntegration initializes the gRPC client integration This should be called during application startup

Types

type GrpcSubscribe

type GrpcSubscribe struct {
	// contains filtered or unexported fields
}

GrpcSubscribe represents a struct for subscribing to GRPC services. Contains service name, service discovery instance, TLS enablement, root CA filename and file group information.

func NewGrpcSubscribe

func NewGrpcSubscribe(opts ...Option) *GrpcSubscribe

NewGrpcSubscribe creates a new GrpcSubscribe instance using the provided options. If no options are provided, default configuration will be used.

func (*GrpcSubscribe) Subscribe

func (g *GrpcSubscribe) Subscribe() *gGrpc.ClientConn

Subscribe subscribes to the specified GRPC service and returns a gGrpc.ClientConn connection instance. Returns nil if service name is empty.

type Option

type Option func(o *GrpcSubscribe)

Option defines a function type for configuring GrpcSubscribe instances.

func EnableTls

func EnableTls() Option

EnableTls returns an Option function for enabling TLS encryption.

func Required

func Required() Option

Required returns an Option function for setting the service as a strongly dependent upstream service.

func WithConfigProvider

func WithConfigProvider(f func(name, group string) (config.Source, error)) Option

WithConfigProvider injects configuration source provider (name, group) -> config.Source

func WithDefaultRootCA

func WithDefaultRootCA(f func() []byte) Option

WithDefaultRootCA injects default RootCA provider

func WithDiscovery

func WithDiscovery(discovery registry.Discovery) Option

WithDiscovery returns an Option function for setting the service discovery instance.

func WithNodeRouterFactory

func WithNodeRouterFactory(f func(string) selector.NodeFilter) Option

WithNodeRouterFactory injects node router factory (optional)

func WithRootCAFileGroup

func WithRootCAFileGroup(caGroup string) Option

WithRootCAFileGroup returns an Option function for setting the group that the root CA certificate file belongs to.

func WithRootCAFileName

func WithRootCAFileName(caName string) Option

WithRootCAFileName returns an Option function for setting the root CA certificate filename.

func WithServiceName

func WithServiceName(svcName string) Option

WithServiceName returns an Option function for setting the name of the GRPC service to subscribe to.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL