cluster

package
v1.57.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 12, 2025 License: MIT Imports: 17 Imported by: 0

README

Package cluster

Пакет cluster предназначен для управления кластером микросервисов с возможностями:

  • Регистрация модулей в кластере
  • Динамическое обновление конфигурации
  • Обмен маршрутами между сервисами
  • Обработка событий кластера

Types

Client

Структура Client представляет собой клиент кластера с функциональностью установки и поддержки соединения с конфигурационным сервисом, регистрации модуля в кластере, обработки жизненного цикла соединения с отправкой метаданных модуля (версия, эндпоинты)

Methods:

NewClient(moduleInfo ModuleInfo, configData ConfigData, hosts []string, handleConfigTimeout time.Duration, logger log.Logger) *Client

Конструктор клиента кластера.

(c *Client) Run(ctx context.Context, eventHandler *EventHandler) error

Запустить клиент кластера с указанным обработчиком событий.

(c *Client) Close() error

Завершить работу клиента.

(c *Client) Healthcheck(ctx context.Context) error

Проверить активна ли текущая сессия.

EventHandler

Структура EventHandler представляет собой компоновку обработчиков основных событий в кластере микросервисов.

Methods:

NewEventHandler() *EventHandler

Конструктор.

(h *EventHandler) RemoteConfigReceiver(receiver RemoteConfigReceiver) *EventHandler

Установить обработчик на получение обновленной динамической-конфигурации текущего модуля. Обработчик должен реализовывать интерфейс RemoteConfigReceiver. По умолчанию выставляет timeout на обработку конфига в 5 секунд.

(h *EventHandler) RoutesReceiver(receiver RoutesReceiver) *EventHandler

Установить обработчик на получение актуальных эндпоинтов, зависимостях и прочей информации о сервисах в кластере. Обработчик должен реализовывать интерфейс RoutesReceiver.

(h *EventHandler) RequireModule(moduleName string, upgrader HostsUpgrader) *EventHandler

Установить обработчик на получение списка адресов модулей, от которых зависит текущий сервис. Обработчик должен реализовывать интерфейс HostsUpgrader.

Usage

Default usage flow
package main

import (
	"context"
	"log"
	"time"

	"github.com/txix-open/isp-kit/app"
	"github.com/txix-open/isp-kit/cluster"
	"github.com/txix-open/isp-kit/config"
	"github.com/txix-open/isp-kit/grpc/client"
	"github.com/txix-open/isp-kit/rc"
	"github.com/txix-open/isp-kit/shutdown"
)

type remoteConfigUpdater struct{}

func (r remoteConfigUpdater) ReceiveConfig(ctx context.Context, remoteConfig []byte) error {
	/* handle & update new remote config */
	return nil
}

type routesHandler struct{}

func (r routesHandler) ReceiveRoutes(ctx context.Context, routes cluster.RoutingConfig) error {
	for _, module := range routes {
		/* handle each module's info */
	}
	return nil
}

func noopHandler() {}

func main() {
	application, err := app.New()
	if err != nil {
		log.Fatal(err)
	}

	moduleInfo := cluster.ModuleInfo{
		ModuleName: "noop-service",
		Version:    "1.0.0",
		Endpoints: []cluster.EndpointDescriptor{{
			Path:    "/api/foo",
			Inner:   false,
			Handler: noopHandler,
		}},
	}
	defaultRemoteConfig := []byte{...}
	configData := cluster.ConfigData{
		Version: "1.0.0",
		Schema:  rc.GenerateConfigSchema(defaultRemoteConfig),
		Config:  defaultRemoteConfig,
	}
	clusterCli := cluster.NewClient(
		moduleInfo,
		configData,
		[]string{"config-service:8080"},
		application.Config().Optional().Duration("remoteConfigReceiverTimeout", 5*time.Second),
		application.Logger(),
	)

	/* client implements HostsUpgrader interface & will have auth-service's addresses */
	authServiceCli, err := client.Default()
	if err != nil {
		log.Fatal(err)
	}

	eventHandler := cluster.NewEventHandler(). /* Настройка обработчиков */
		RemoteConfigReceiver(remoteConfigUpdater{}).
		RoutesReceiver(routesHandler{}).
		RequireModule("auth-service", authServiceCli)
	application.AddRunners(app.RunnerFunc(func(ctx context.Context) error {
		err := clusterCli.Run(ctx, eventHandler)
		if err != nil {
			return err
		}
		return nil
	}))
	application.AddClosers(clusterCli, authServiceCli)

	shutdown.On(func() { /* waiting for SIGINT & SIGTERM signals */
		log.Println("shutting down...")
		application.Shutdown()
		log.Println("shutdown completed")
	})

	err = application.Run() /* blocking here */
	if err != nil {
		log.Fatal(err)
	}
}

Documentation

Index

Constants

View Source
const (
	ErrorConnection = "ERROR_CONNECTION"
	ConfigError     = "ERROR_CONFIG"

	ConfigSendConfigWhenConnected = "CONFIG:SEND_CONFIG_WHEN_CONNECTED"
	ConfigSendConfigChanged       = "CONFIG:SEND_CONFIG_CHANGED"

	ConfigSendRoutesWhenConnected = "CONFIG:SEND_ROUTES_WHEN_CONNECTED"
	ConfigSendRoutesChanged       = "CONFIG:SEND_ROUTES_CHANGED"

	ModuleReady            = "MODULE:READY"
	ModuleSendRequirements = "MODULE:SEND_REQUIREMENTS"
	ModuleSendConfigSchema = "MODULE:SEND_CONFIG_SCHEMA"

	ModuleConnectionSuffix = "MODULE_CONNECTED"
)
View Source
const (
	RequiredAdminPermission = "reqAdminPerm"
)

Variables

This section is empty.

Functions

func GetRequiredAdminPermission

func GetRequiredAdminPermission(desc EndpointDescriptor) (string, bool)

func HideSecrets

func HideSecrets(data []byte) ([]byte, error)

func ModuleConnectedEvent

func ModuleConnectedEvent(moduleName string) string

func RegisterSecretSubstrings added in v1.43.0

func RegisterSecretSubstrings(substrings []string)

func RequireAdminPermission

func RequireAdminPermission(perm string) map[string]any

func UnregisterSecretSubstrings added in v1.43.0

func UnregisterSecretSubstrings(substrings []string)

Types

type AddressConfiguration

type AddressConfiguration struct {
	IP   string `json:"ip"`
	Port string `json:"port"`
}

type BackendDeclaration

type BackendDeclaration struct {
	ModuleName           string
	Version              string
	LibVersion           string
	Endpoints            []EndpointDescriptor
	RequiredModules      []ModuleDependency
	Address              AddressConfiguration
	MetricsAutodiscovery *MetricsAutodiscovery
}

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(
	moduleInfo ModuleInfo,
	configData ConfigData,
	hosts []string,
	handleConfigTimeout time.Duration,
	logger log.Logger,
) *Client

func (*Client) Close

func (c *Client) Close() error

func (*Client) Healthcheck

func (c *Client) Healthcheck(ctx context.Context) error

func (*Client) Run

func (c *Client) Run(ctx context.Context, eventHandler *EventHandler) error

type ConfigData

type ConfigData struct {
	Version string
	Schema  json.RawMessage
	Config  json.RawMessage
}

type EndpointDescriptor

type EndpointDescriptor struct {
	Path             string
	Inner            bool
	UserAuthRequired bool
	Extra            map[string]any
	Handler          any `json:"-"`
}

type EventHandler

type EventHandler struct {
	// contains filtered or unexported fields
}

func NewEventHandler

func NewEventHandler() *EventHandler

func (*EventHandler) RemoteConfigReceiver

func (h *EventHandler) RemoteConfigReceiver(receiver RemoteConfigReceiver) *EventHandler

func (*EventHandler) RequireModule

func (h *EventHandler) RequireModule(moduleName string, upgrader HostsUpgrader) *EventHandler

func (*EventHandler) RoutesReceiver

func (h *EventHandler) RoutesReceiver(receiver RoutesReceiver) *EventHandler

type HostsUpgrader

type HostsUpgrader interface {
	Upgrade(hosts []string)
}

type MetricsAutodiscovery added in v1.52.0

type MetricsAutodiscovery struct {
	Address string
	Labels  map[string]string
}

type ModuleDependency

type ModuleDependency struct {
	Name     string
	Required bool
}

type ModuleInfo

type ModuleInfo struct {
	ModuleName           string
	ModuleVersion        string
	LibVersion           string
	GrpcOuterAddress     AddressConfiguration
	Endpoints            []EndpointDescriptor
	MetricsAutodiscovery *MetricsAutodiscovery
}

type ModuleRequirements

type ModuleRequirements struct {
	RequiredModules []string
	RequireRoutes   bool
}

type RemoteConfigReceiver

type RemoteConfigReceiver interface {
	ReceiveConfig(ctx context.Context, remoteConfig []byte) error
}

type RoutesReceiver

type RoutesReceiver interface {
	ReceiveRoutes(ctx context.Context, routes RoutingConfig) error
}

type RoutingConfig

type RoutingConfig []BackendDeclaration

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL