cluster

package
v1.64.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 19, 2026 License: MIT Imports: 18 Imported by: 0

README

Package cluster

Пакет cluster предназначен для управления кластером микросервисов с возможностями:

  • Регистрация модулей в кластере
  • Динамическое обновление конфигурации
  • Обмен маршрутами между сервисами
  • Обработка событий кластера

Types

Client

Структура Client представляет собой клиент кластера с функциональностью установки и поддержки соединения с конфигурационным сервисом, регистрации модуля в кластере, обработки жизненного цикла соединения с отправкой метаданных модуля (версия, эндпоинты)

Methods:

NewClient(moduleInfo ModuleInfo, configData ConfigData, hosts []string, handleConfigTimeout time.Duration, logger log.Logger) *Client

Конструктор клиента кластера.

(c *Client) Run(ctx context.Context, eventHandler *EventHandler) error

Запустить клиент кластера с указанным обработчиком событий.

(c *Client) Close() error

Завершить работу клиента.

(c *Client) Healthcheck(ctx context.Context) error

Проверить активна ли текущая сессия.

EventHandler

Структура EventHandler представляет собой компоновку обработчиков основных событий в кластере микросервисов.

Methods:

NewEventHandler() *EventHandler

Конструктор.

(h *EventHandler) RemoteConfigReceiver(receiver RemoteConfigReceiver) *EventHandler

Установить обработчик на получение обновленной динамической-конфигурации текущего модуля. Обработчик должен реализовывать интерфейс RemoteConfigReceiver. По умолчанию выставляет timeout на обработку конфига в 5 секунд.

(h *EventHandler) RoutesReceiver(receiver RoutesReceiver) *EventHandler

Установить обработчик на получение актуальных эндпоинтов, зависимостях и прочей информации о сервисах в кластере. Обработчик должен реализовывать интерфейс RoutesReceiver.

(h *EventHandler) RequireModule(moduleName string, upgrader HostsUpgrader) *EventHandler

Установить обработчик на получение списка адресов модулей, от которых зависит текущий сервис. Обработчик должен реализовывать интерфейс HostsUpgrader.

Usage

Default usage flow
package main

import (
	"context"
	"log"
	"time"

	"github.com/txix-open/isp-kit/app"
	"github.com/txix-open/isp-kit/cluster"
	"github.com/txix-open/isp-kit/config"
	"github.com/txix-open/isp-kit/grpc/client"
	"github.com/txix-open/isp-kit/rc"
	"github.com/txix-open/isp-kit/shutdown"
)

type remoteConfigUpdater struct{}

func (r remoteConfigUpdater) ReceiveConfig(ctx context.Context, remoteConfig []byte) error {
	/* handle & update new remote config */
	return nil
}

type routesHandler struct{}

func (r routesHandler) ReceiveRoutes(ctx context.Context, routes cluster.RoutingConfig) error {
	for _, module := range routes {
		/* handle each module's info */
	}
	return nil
}

func noopHandler() {}

func main() {
	application, err := app.New()
	if err != nil {
		log.Fatal(err)
	}

	moduleInfo := cluster.ModuleInfo{
		ModuleName: "noop-service",
		Version:    "1.0.0",
		Endpoints: []cluster.EndpointDescriptor{{
			Path:    "/api/foo",
			Inner:   false,
			Handler: noopHandler,
		}},
	}
	defaultRemoteConfig := []byte{...}
	configData := cluster.ConfigData{
		Version: "1.0.0",
		Schema:  rc.GenerateConfigSchema(defaultRemoteConfig),
		Config:  defaultRemoteConfig,
	}
	clusterCli := cluster.NewClient(
		moduleInfo,
		configData,
		[]string{"config-service:8080"},
		application.Config().Optional().Duration("remoteConfigReceiverTimeout", 5*time.Second),
		application.Logger(),
	)

	/* client implements HostsUpgrader interface & will have auth-service's addresses */
	authServiceCli, err := client.Default()
	if err != nil {
		log.Fatal(err)
	}

	eventHandler := cluster.NewEventHandler(). /* Настройка обработчиков */
		RemoteConfigReceiver(remoteConfigUpdater{}).
		RoutesReceiver(routesHandler{}).
		RequireModule("auth-service", authServiceCli)
	application.AddRunners(app.RunnerFunc(func(ctx context.Context) error {
		err := clusterCli.Run(ctx, eventHandler)
		if err != nil {
			return err
		}
		return nil
	}))
	application.AddClosers(clusterCli, authServiceCli)

	shutdown.On(func() { /* waiting for SIGINT & SIGTERM signals */
		log.Println("shutting down...")
		application.Shutdown()
		log.Println("shutdown completed")
	})

	err = application.Run() /* blocking here */
	if err != nil {
		log.Fatal(err)
	}
}

Documentation

Index

Constants

View Source
const (
	ErrorConnection = "ERROR_CONNECTION"
	ConfigError     = "ERROR_CONFIG"

	ConfigSendConfigWhenConnected = "CONFIG:SEND_CONFIG_WHEN_CONNECTED"
	ConfigSendConfigChanged       = "CONFIG:SEND_CONFIG_CHANGED"

	ConfigSendRoutesWhenConnected = "CONFIG:SEND_ROUTES_WHEN_CONNECTED"
	ConfigSendRoutesChanged       = "CONFIG:SEND_ROUTES_CHANGED"

	ModuleReady            = "MODULE:READY"
	ModuleSendRequirements = "MODULE:SEND_REQUIREMENTS"
	ModuleSendConfigSchema = "MODULE:SEND_CONFIG_SCHEMA"

	ModuleConnectionSuffix = "MODULE_CONNECTED"
)
View Source
const (
	RequiredAdminPermission = "reqAdminPerm"
)

Variables

This section is empty.

Functions

func GetRequiredAdminPermission

func GetRequiredAdminPermission(desc EndpointDescriptor) (string, bool)

func HideSecrets

func HideSecrets(data []byte) ([]byte, error)

func ModuleConnectedEvent

func ModuleConnectedEvent(moduleName string) string

func RegisterSecretSubstrings added in v1.43.0

func RegisterSecretSubstrings(substrings []string)

func RequireAdminPermission

func RequireAdminPermission(perm string) map[string]any

func UnregisterSecretSubstrings added in v1.43.0

func UnregisterSecretSubstrings(substrings []string)

Types

type AddressConfiguration

type AddressConfiguration struct {
	IP   string `json:"ip"`
	Port string `json:"port"`
}

type BackendDeclaration

type BackendDeclaration struct {
	ModuleName           string
	Version              string
	LibVersion           string
	Endpoints            []EndpointDescriptor
	RequiredModules      []ModuleDependency
	Address              AddressConfiguration
	MetricsAutodiscovery *MetricsAutodiscovery
}

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(
	moduleInfo ModuleInfo,
	configData ConfigData,
	hosts []string,
	handleConfigTimeout time.Duration,
	logger log.Logger,
) *Client

func (*Client) Close

func (c *Client) Close() error

func (*Client) Healthcheck

func (c *Client) Healthcheck(ctx context.Context) error

func (*Client) Run

func (c *Client) Run(ctx context.Context, eventHandler *EventHandler) error

type ConfigData

type ConfigData struct {
	Version string
	Schema  json.RawMessage
	Config  json.RawMessage
}

type EndpointDescriptor

type EndpointDescriptor struct {
	Path             string
	Inner            bool
	UserAuthRequired bool
	Extra            map[string]any
	Handler          any `json:"-"`
}

type EventHandler

type EventHandler struct {
	// contains filtered or unexported fields
}

func NewEventHandler

func NewEventHandler() *EventHandler

func (*EventHandler) RemoteConfigReceiver

func (h *EventHandler) RemoteConfigReceiver(receiver RemoteConfigReceiver) *EventHandler

func (*EventHandler) RequireModule

func (h *EventHandler) RequireModule(moduleName string, upgrader HostsUpgrader) *EventHandler

func (*EventHandler) RoutesReceiver

func (h *EventHandler) RoutesReceiver(receiver RoutesReceiver) *EventHandler

type HostsUpgrader

type HostsUpgrader interface {
	Upgrade(hosts []string)
}

type MetricsAutodiscovery added in v1.52.0

type MetricsAutodiscovery struct {
	Address string
	Labels  map[string]string
}

type ModuleDependency

type ModuleDependency struct {
	Name     string
	Required bool
}

type ModuleInfo

type ModuleInfo struct {
	ModuleName           string
	ModuleVersion        string
	LibVersion           string
	GrpcOuterAddress     AddressConfiguration
	Endpoints            []EndpointDescriptor
	MetricsAutodiscovery *MetricsAutodiscovery
}

type ModuleRequirements

type ModuleRequirements struct {
	RequiredModules []string
	RequireRoutes   bool
}

type OfflineClient added in v1.62.0

type OfflineClient struct {
	// contains filtered or unexported fields
}

func NewOfflineClient added in v1.62.0

func NewOfflineClient(configPath string) OfflineClient

func (OfflineClient) Close added in v1.62.0

func (d OfflineClient) Close() error

func (OfflineClient) Run added in v1.62.0

func (d OfflineClient) Run(ctx context.Context, handler *EventHandler) error

type RemoteConfigReceiver

type RemoteConfigReceiver interface {
	ReceiveConfig(ctx context.Context, remoteConfig []byte) error
}

type RoutesReceiver

type RoutesReceiver interface {
	ReceiveRoutes(ctx context.Context, routes RoutingConfig) error
}

type RoutingConfig

type RoutingConfig []BackendDeclaration

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL