cluster

package
v1.67.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 17, 2026 License: MIT Imports: 18 Imported by: 2

README

Package cluster

Пакет cluster предназначен для управления кластером микросервисов с возможностями:

  • Регистрация модулей в кластере
  • Динамическое обновление конфигурации
  • Обмен маршрутами между сервисами
  • Обработка событий кластера

Types

Client

Структура Client представляет собой клиент кластера: он устанавливает и поддерживает соединение с конфигурационным сервисом, регистрирует модуль в кластере и обрабатывает жизненный цикл соединения, отправляя метаданные модуля (версия, эндпоинты).

Methods:

NewClient(moduleInfo ModuleInfo, configData ConfigData, hosts []string, handleConfigTimeout time.Duration, logger log.Logger) *Client

Конструктор клиента кластера.

(c *Client) Run(ctx context.Context, eventHandler *EventHandler) error

Запустить клиент кластера с указанным обработчиком событий.

(c *Client) Close() error

Завершить работу клиента.

(c *Client) Healthcheck(ctx context.Context) error

Проверить, активна ли текущая сессия.

EventHandler

Структура EventHandler представляет собой компоновку обработчиков основных событий в кластере микросервисов.

Methods:

NewEventHandler() *EventHandler

Конструктор.

(h *EventHandler) RemoteConfigReceiver(receiver RemoteConfigReceiver) *EventHandler

Установить обработчик на получение обновленной динамической конфигурации текущего модуля. Обработчик должен реализовывать интерфейс RemoteConfigReceiver. По умолчанию выставляет timeout на обработку конфига в 5 секунд.

(h *EventHandler) RoutesReceiver(receiver RoutesReceiver) *EventHandler

Установить обработчик на получение актуальных эндпоинтов, зависимостей и прочей информации о сервисах в кластере. Обработчик должен реализовывать интерфейс RoutesReceiver.

(h *EventHandler) RequireModule(moduleName string, upgrader HostsUpgrader) *EventHandler

Установить обработчик на получение списка адресов модулей, от которых зависит текущий сервис. Обработчик должен реализовывать интерфейс HostsUpgrader.

Usage

Default usage flow
package main

import (
	"context"
	"log"
	"time"

	"github.com/txix-open/isp-kit/app"
	"github.com/txix-open/isp-kit/cluster"
	"github.com/txix-open/isp-kit/config"
	"github.com/txix-open/isp-kit/grpc/client"
	"github.com/txix-open/isp-kit/rc"
	"github.com/txix-open/isp-kit/shutdown"
)

// remoteConfigUpdater is a stateless example implementation of
// cluster.RemoteConfigReceiver.
type remoteConfigUpdater struct{}

// ReceiveConfig is invoked with the raw bytes of the remote configuration.
// This example accepts every payload and reports success.
func (remoteConfigUpdater) ReceiveConfig(ctx context.Context, remoteConfig []byte) error {
	// Handle the payload and apply the new remote config here.
	return nil
}

// routesHandler is a stateless example implementation of
// cluster.RoutesReceiver.
type routesHandler struct{}

// ReceiveRoutes is invoked with the routing configuration, which is a list of
// backend declarations. This example visits every entry and reports success.
func (r routesHandler) ReceiveRoutes(ctx context.Context, routes cluster.RoutingConfig) error {
	for _, module := range routes {
		// The blank assignment keeps the example compilable: Go rejects a
		// declared-but-unused loop variable.
		_ = module /* handle each module's info */
	}
	return nil
}

// noopHandler is a placeholder endpoint handler; it takes no input and does
// nothing.
func noopHandler() {
}

func main() {
	application, err := app.New()
	if err != nil {
		log.Fatal(err)
	}

	moduleInfo := cluster.ModuleInfo{
		ModuleName: "noop-service",
		Version:    "1.0.0",
		Endpoints: []cluster.EndpointDescriptor{{
			Path:    "/api/foo",
			Inner:   false,
			Handler: noopHandler,
		}},
	}
	defaultRemoteConfig := []byte{...}
	configData := cluster.ConfigData{
		Version: "1.0.0",
		Schema:  rc.GenerateConfigSchema(defaultRemoteConfig),
		Config:  defaultRemoteConfig,
	}
	clusterCli := cluster.NewClient(
		moduleInfo,
		configData,
		[]string{"config-service:8080"},
		application.Config().Optional().Duration("remoteConfigReceiverTimeout", 5*time.Second),
		application.Logger(),
	)

	/* client implements HostsUpgrader interface & will have auth-service's addresses */
	authServiceCli, err := client.Default()
	if err != nil {
		log.Fatal(err)
	}

	eventHandler := cluster.NewEventHandler(). /* Настройка обработчиков */
		RemoteConfigReceiver(remoteConfigUpdater{}).
		RoutesReceiver(routesHandler{}).
		RequireModule("auth-service", authServiceCli)
	application.AddRunners(app.RunnerFunc(func(ctx context.Context) error {
		err := clusterCli.Run(ctx, eventHandler)
		if err != nil {
			return err
		}
		return nil
	}))
	application.AddClosers(clusterCli, authServiceCli)

	shutdown.On(func() { /* waiting for SIGINT & SIGTERM signals */
		log.Println("shutting down...")
		application.Shutdown()
		log.Println("shutdown completed")
	})

	err = application.Run() /* blocking here */
	if err != nil {
		log.Fatal(err)
	}
}

Documentation

Overview

Package cluster provides client-side functionality for connecting to an isp-config-service in a distributed system. It handles session management, event-driven configuration updates, and module dependency management.

The package uses the ETP (Event Transport Protocol) for communication and supports automatic load balancing, liveness probing, and graceful reconnection.

Basic usage:

client := cluster.NewClient(moduleInfo, configData, hosts, timeout, logger)
eventHandler := cluster.NewEventHandler().
	RemoteConfigReceiver(myReceiver).
	RoutesReceiver(myRoutesReceiver)
err := client.Run(ctx, eventHandler)

Index

Constants

View Source
const (
	// RequiredAdminPermission is the metadata key under which
	// RequireAdminPermission stores an endpoint's admin permission string.
	RequiredAdminPermission = "reqAdminPerm"

	// Transport identifiers. Presumably these are the values carried by the
	// Transport field of ModuleInfo and BackendDeclaration — TODO confirm.
	GrpcTransport  = "grpc"
	HttpTransport  = "http"
	EmptyTransport = "empty"
)
View Source
// Event names used by the cluster package. All of them are plain string
// constants.
const (
	// ErrorConnection is the event name for connection errors.
	ErrorConnection = "ERROR_CONNECTION"
	// ConfigError is the event name for configuration errors.
	ConfigError = "ERROR_CONFIG"

	// ConfigSendConfigWhenConnected is the event for sending config when connected.
	ConfigSendConfigWhenConnected = "CONFIG:SEND_CONFIG_WHEN_CONNECTED"
	// ConfigSendConfigChanged is the event for config change notifications.
	ConfigSendConfigChanged = "CONFIG:SEND_CONFIG_CHANGED"

	// ConfigSendRoutesWhenConnected is the event for sending routes when connected.
	ConfigSendRoutesWhenConnected = "CONFIG:SEND_ROUTES_WHEN_CONNECTED"
	// ConfigSendRoutesChanged is the event for route change notifications.
	ConfigSendRoutesChanged = "CONFIG:SEND_ROUTES_CHANGED"

	// ModuleReady is the event sent when a module is ready.
	ModuleReady = "MODULE:READY"
	// ModuleSendRequirements is the event for sending module requirements.
	ModuleSendRequirements = "MODULE:SEND_REQUIREMENTS"
	// ModuleSendConfigSchema is the event for sending configuration schema.
	ModuleSendConfigSchema = "MODULE:SEND_CONFIG_SCHEMA"

	// ModuleConnectionSuffix is the suffix appended to module names for connection events.
	// ModuleConnectedEvent combines a module name with this suffix.
	ModuleConnectionSuffix = "MODULE_CONNECTED"
)
View Source
const (
	// EventIdContextKey is the context key used for event IDs.
	// NOTE(review): this is an untyped string constant. If it is passed
	// directly to context.WithValue, a plain string key can collide with keys
	// from other packages — confirm how callers use it.
	EventIdContextKey = "eventId"
)

Variables

This section is empty.

Functions

func GetRequiredAdminPermission

func GetRequiredAdminPermission(desc EndpointDescriptor) (string, bool)

GetRequiredAdminPermission extracts the admin permission requirement from an EndpointDescriptor. Returns the permission string and a boolean indicating whether the permission requirement was found.

func HideSecrets

func HideSecrets(data []byte) ([]byte, error)

HideSecrets masks sensitive data (passwords, secrets, tokens) in JSON configuration by replacing their values with "***". Returns the masked JSON or an error.

func ModuleConnectedEvent

func ModuleConnectedEvent(moduleName string) string

ModuleConnectedEvent generates the event name for a module connection event by combining the module name with the ModuleConnectionSuffix.

func RegisterSecretSubstrings added in v1.43.0

func RegisterSecretSubstrings(substrings []string)

RegisterSecretSubstrings adds custom field name substrings to the list of patterns that indicate sensitive data.

func RequireAdminPermission

func RequireAdminPermission(perm string) map[string]any

RequireAdminPermission creates endpoint metadata that specifies an admin permission requirement for the endpoint. The permission string is stored under the RequiredAdminPermission key.

func UnregisterSecretSubstrings added in v1.43.0

func UnregisterSecretSubstrings(substrings []string)

UnregisterSecretSubstrings removes custom field name substrings from the list of patterns that indicate sensitive data.

Types

type AddressConfiguration

// AddressConfiguration represents the IP address and port for a service
// endpoint. Both fields are strings (the port is not numeric) and are
// serialized under the lowercase JSON keys "ip" and "port".
type AddressConfiguration struct {
	IP   string `json:"ip"`
	Port string `json:"port"`
}

AddressConfiguration represents the IP address and port for a service endpoint.

type BackendDeclaration

// BackendDeclaration describes a backend service, including its metadata,
// endpoints, dependencies, and network configuration. RoutingConfig is a
// slice of these declarations.
type BackendDeclaration struct {
	ModuleName string
	Version    string
	LibVersion string
	// Presumably one of GrpcTransport, HttpTransport or EmptyTransport — TODO confirm.
	Transport       string
	Endpoints       []EndpointDescriptor
	RequiredModules []ModuleDependency
	Address         AddressConfiguration
	// Pointer field, so it may be nil; presumably nil when metrics
	// autodiscovery is not configured — verify against the producer.
	MetricsAutodiscovery *MetricsAutodiscovery
}

BackendDeclaration describes a backend service, including its metadata, endpoints, dependencies, and network configuration.

type Client

// Client manages the connection to an isp-config-service, handling session
// lifecycle, event processing, and automatic reconnection. Create it with
// NewClient; its methods (Run, Close, Healthcheck) use pointer receivers.
type Client struct {
	// contains filtered or unexported fields
}

Client manages the connection to an isp-config-service, handling session lifecycle, event processing, and automatic reconnection.

func NewClient

func NewClient(
	moduleInfo ModuleInfo,
	configData ConfigData,
	hosts []string,
	handleConfigTimeout time.Duration,
	logger log.Logger,
) *Client

NewClient creates a new Client with the provided module information, configuration data, list of hosts, timeout duration, and logger.

func (*Client) Close

func (c *Client) Close() error

Close terminates the client and releases associated resources.

func (*Client) Healthcheck

func (c *Client) Healthcheck(ctx context.Context) error

Healthcheck returns an error if the session is inactive, otherwise returns nil.

func (*Client) Run

func (c *Client) Run(ctx context.Context, eventHandler *EventHandler) error

Run starts the client's main loop, establishing a connection to an isp-config-service and handling events through the provided event handler. It blocks until the context is canceled or the client is closed.

type ConfigData

// ConfigData contains configuration data for a module, including version
// information and JSON-encoded schema and configuration payloads.
type ConfigData struct {
	// Version of the configuration data.
	Version string
	// Schema is the raw JSON schema payload.
	Schema json.RawMessage
	// Config is the raw JSON configuration payload.
	Config json.RawMessage
}

ConfigData contains configuration data for a module, including version information and JSON-encoded schema and configuration payloads.

type EndpointDescriptor

// EndpointDescriptor describes an endpoint, including its path, method,
// authentication requirements, and optional metadata.
type EndpointDescriptor struct {
	Path             string
	Inner            bool
	UserAuthRequired bool
	HttpMethod       string
	// Extra holds optional metadata. Presumably this is where the map built by
	// RequireAdminPermission goes and where GetRequiredAdminPermission looks
	// it up — TODO confirm.
	Extra map[string]any
	// Handler is excluded from JSON serialization by its `json:"-"` tag.
	Handler any `json:"-"`
}

EndpointDescriptor describes an HTTP endpoint, including its path, method, authentication requirements, and optional metadata. The Handler field is excluded from JSON serialization.

type EventHandler

// EventHandler manages event handling for remote configuration, routes, and
// module dependencies. Create it with NewEventHandler; the setters
// RemoteConfigReceiver, RoutesReceiver and RequireModule each return the
// same *EventHandler so calls can be chained.
type EventHandler struct {
	// contains filtered or unexported fields
}

EventHandler manages event handling for remote configuration, routes, and module dependencies.

func NewEventHandler

func NewEventHandler() *EventHandler

NewEventHandler creates a new EventHandler with default settings.

func (*EventHandler) RemoteConfigReceiver

func (h *EventHandler) RemoteConfigReceiver(receiver RemoteConfigReceiver) *EventHandler

RemoteConfigReceiver sets the receiver for remote configuration updates and returns the EventHandler for method chaining.

func (*EventHandler) RequireModule

func (h *EventHandler) RequireModule(moduleName string, upgrader HostsUpgrader) *EventHandler

RequireModule registers a module dependency with its corresponding hosts upgrader and returns the EventHandler for method chaining.

func (*EventHandler) RoutesReceiver

func (h *EventHandler) RoutesReceiver(receiver RoutesReceiver) *EventHandler

RoutesReceiver sets the receiver for routing configuration updates and returns the EventHandler for method chaining.

type HostsUpgrader

// HostsUpgrader is an interface for upgrading or updating host lists.
// EventHandler.RequireModule takes one per required module name.
type HostsUpgrader interface {
	// Upgrade updates the list of hosts.
	Upgrade(hosts []string)
}

HostsUpgrader is an interface for upgrading or updating host lists.

type MetricsAutodiscovery added in v1.52.0

// MetricsAutodiscovery holds configuration for automatic metrics endpoint
// discovery.
type MetricsAutodiscovery struct {
	// Address of the metrics endpoint.
	Address string
	// Labels are optional Prometheus labels.
	Labels map[string]string
}

MetricsAutodiscovery holds configuration for automatic metrics endpoint discovery, including the address and optional Prometheus labels.

type ModuleDependency

// ModuleDependency represents a dependency on another module.
type ModuleDependency struct {
	// Name of the module depended upon.
	Name string
	// Required indicates whether the dependency is required for operation.
	Required bool
}

ModuleDependency represents a dependency on another module, indicating whether the dependency is required for operation.

type ModuleInfo

// ModuleInfo describes a module's metadata, including its name, version,
// transport configuration, exposed endpoints, and metrics autodiscovery
// settings. It is the first argument of NewClient.
type ModuleInfo struct {
	ModuleName string
	// ModuleVersion is the module's version; note the field is not named
	// Version.
	ModuleVersion string
	LibVersion    string
	// Presumably one of GrpcTransport, HttpTransport or EmptyTransport — TODO confirm.
	Transport        string
	GrpcOuterAddress AddressConfiguration
	Endpoints        []EndpointDescriptor
	// Pointer field, so it may be left nil.
	MetricsAutodiscovery *MetricsAutodiscovery
}

ModuleInfo describes a module's metadata, including its name, version, transport configuration, exposed endpoints, and metrics autodiscovery settings.

type ModuleRequirements

// ModuleRequirements specifies the modules and routes that a module depends
// on.
type ModuleRequirements struct {
	// RequiredModules lists the required modules as strings (presumably
	// module names — TODO confirm).
	RequiredModules []string
	// RequireRoutes presumably signals that the module wants routing
	// information — verify against the sender of ModuleSendRequirements.
	RequireRoutes bool
}

ModuleRequirements specifies the modules and routes that a module depends on.

type OfflineClient added in v1.62.0

// OfflineClient provides configuration loading from a local file instead of
// a remote isp-config-service. It is useful for offline or testing
// scenarios. NewOfflineClient returns it by value, and its Run and Close
// methods use value receivers.
type OfflineClient struct {
	// contains filtered or unexported fields
}

OfflineClient provides configuration loading from a local file instead of a remote isp-config-service. It is useful for offline or testing scenarios.

func NewOfflineClient added in v1.62.0

func NewOfflineClient(configPath string) OfflineClient

NewOfflineClient creates a new OfflineClient that reads configuration from the specified path.

func (OfflineClient) Close added in v1.62.0

func (d OfflineClient) Close() error

Close releases resources associated with the offline client.

func (OfflineClient) Run added in v1.62.0

func (d OfflineClient) Run(ctx context.Context, handler *EventHandler) error

Run loads the configuration from the local file and applies it using the provided event handler. Returns nil if the handler is nil.

type RemoteConfigReceiver

// RemoteConfigReceiver is an interface for receiving and processing remote
// configuration updates. Register an implementation with
// EventHandler.RemoteConfigReceiver.
type RemoteConfigReceiver interface {
	// ReceiveConfig processes the remote configuration.
	// remoteConfig carries the configuration as raw bytes.
	ReceiveConfig(ctx context.Context, remoteConfig []byte) error
}

RemoteConfigReceiver is an interface for receiving and processing remote configuration updates.

type RoutesReceiver

// RoutesReceiver is an interface for receiving and processing routing
// configuration updates. Register an implementation with
// EventHandler.RoutesReceiver.
type RoutesReceiver interface {
	// ReceiveRoutes processes the routing configuration.
	ReceiveRoutes(ctx context.Context, routes RoutingConfig) error
}

RoutesReceiver is an interface for receiving and processing routing configuration updates.

type RoutingConfig

// RoutingConfig represents a list of backend declarations for routing
// configuration. It is the payload passed to RoutesReceiver.ReceiveRoutes.
type RoutingConfig []BackendDeclaration

RoutingConfig represents a list of backend declarations for routing configuration.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL