cfg

package
v0.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 9, 2023 License: Apache-2.0 Imports: 61 Imported by: 4

README

概述

提供grpc-kit微服务脚手架的通用配置规范,用于初始化框架所需的各个实例,仅允许"pkg/cfg"下的包直接引用"pkg/*"。

避免依赖以及通用性,"pkg/*"除"cfg"外,其他包必须可以独立给外部应用直接引用(非grpc-kit框架)。

术语约定

连接地址
参数 类型 说明 示例
host string 主机IPv4或IPv6地址 127.0.0.1
port int 主机端口号 6379
address string 不包含协议,由主机IP与端口组成 127.0.0.1:6379 或 [fe80::1%lo0]:53 或 unix socket
endpoints []string 由协议、IP、端口组成,多个通过逗号","分割 https://node1:2379,https://node2:2379,https://node3:2379
其他类型
参数 类型 说明 示例
driver string 配置支持的驱动 etcdv3、redis等
enable bool 是否开启这个功能 true、false

配置示例

# https://github.com/grpc-kit/cfg/blob/master/app-sample.yaml

# 基础服务配置
services:
  # 服务注册的前缀,全局统一
  root_path: service
  # 服务注册的空间,全局统一
  namespace: example
  # 服务的代码,设置后不可变更
  service_code: cmdb.v1.commons
  # 接口网关的地址
  api_endpoint: api.grpc-kit.com
  # 服务所监听的grpc地址(如未设置,自动监听在127.0.0.1的随机端口)
  grpc_address: 127.0.0.1:10081
  # 服务所监听的http地址(如未设置,则不开启gateway服务)
  http_address: 127.0.0.1:10080
  # 服务注册,外部网络可连接的grpc地址(一般等同于grpc-address)
  public_address: ""

# 服务注册配置
discover:
  driver: etcdv3
  heartbeat: 15
  endpoints:
    - http://127.0.0.1:2379
  #discover:
  #  tls:
  #    ca_file: /opt/certs/etcd-ca.pem
  #    cert_file: /opt/certs/etcd.pem
  #    key_file: /opt/certs/etcd-key.pem

# 认证鉴权配置
security:
  enable: true
  # 认证:谁在登录
  authentication:
    # 跳过认证的rpc方法
    insecure_rpcs:
      - SearchHosts
    oidc_provider:
      issuer: https://accounts.example.com
      config:
        # 必须验证token.aud是否与client_id相等
        client_id: example
        # 允许的签名算法类别
        supported_signing_algs:
          - RS256
        # 忽略token.aud与client_id的验证
        skip_client_id_check: true
        # 忽略token是否过期的验证
        skip_expiry_check: false
        # 忽略token issuer的验证
        skip_issuer_check: true
    http_users:
      - username: user1
        password: pass1
  # TODO; 鉴权:能做什么
  authorization:

# 关系数据配置
database:
  driver: postgres
  dbname: demo
  user: demo
  password: grpc-kit
  host: 127.0.0.1
  port: 5432
  sslmode: disable
  connect_timeout: 10

# 缓存服务配置
cachebuf:
  enable: true
  driver: redis
  address: 127.0.0.1:6379
  password: ""

# 日志调试配置
debugger:
  enable_pprof: true
  log_level: debug
  log_format: text

# 链路追踪配置
opentracing:
  enable: true
  host: 127.0.0.1
  port: 6831
  
# 事件通道配置
cloudevents:
  protocol: "kafka_sarama"
  kafka_sarama:
    topic: "uptime-test"
    brokers:
      - 127.0.0.1:19092
      - 127.0.0.1:29092
      - 127.0.0.1:39092
    config:
      net:
        tls:
          enable: false
        sasl:
          enable: true
          mechanism: "SCRAM-SHA-256"
          user: "uptime"
          password: "testkey"
      producer:
        required_acks: 1
        return:
          successes: false
          errors: true
      consumer:
        group:
          rebalance:
            strategy: range
        offsets:
          auto_commit:
            enable: true
            interval: 1s
      version: "2.4.0"

# 应用私有配置
independent:
  name: grpc-kit
注册地址说明

在services中所配置的public-address地址是grpc服务,与其他服务之间必须能正常通讯。如果在k8s环境下可考虑设置环境变量GRPC_KIT_PUHLIC_IP把POD IP传递,如:

...
spec:
  template:
    spec:
      containers:
      - args:
        - /opt/service
        - --config
        - /opt/config/app.toml
        env:
        - name: GRPC_KIT_PUHLIC_IP
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
...

Documentation

Overview

Package cfg 提供grpc-kit微服务脚手架的通用配置规范

Index

Constants

View Source
const (
	// HTTPHeaderRequestID 全局请求ID
	HTTPHeaderRequestID = "X-TR-REQUEST-ID"
	// TraceContextHeaderName 链路追踪ID
	TraceContextHeaderName = "jaeger-trace-id"
	// TraceBaggageHeaderPrefix 数据传递头前缀
	TraceBaggageHeaderPrefix = "jaeger-ctx"
	// AuthenticationTypeBasic 用于http basic认证
	AuthenticationTypeBasic = "basic"
	// AuthenticationTypeBearer 用于jwt认证
	AuthenticationTypeBearer = "bearer"
	// AuthenticationTypeNone 用于指明rpc未使用任何认证
	AuthenticationTypeNone = "none"
	// UsernameAnonymous 当未使用任何认证时的用户名
	UsernameAnonymous = "anonymous"
)
View Source
const (
	DatabaseDriverMysql      = "mysql"
	DatabaseDriverPostgresql = "postgres"
)
View Source
const (
	CloudEventsProtocolKafkaSarama = "kafka_sarama"
)

Variables

View Source
var (
	ErrDatabaseNotInit          = errors.New("database not initialize")
	ErrDatabaseNotEnable        = errors.New("database not enable")
	ErrDatabaseNotSupportDriver = errors.New("database driver not support")
	ErrDatabaseParamsMust       = errors.New("database dbname username or password must")
)

SHA256 hash generator function for SCRAM conversation

SHA512 hash generator function for SCRAM conversation

Functions

This section is empty.

Types

type Authentication

type Authentication struct {
	InsecureRPCs []string      `mapstructure:"insecure_rpcs"`
	OIDCProvider *OIDCProvider `mapstructure:"oidc_provider"`
	HTTPUsers    []*BasicAuth  `mapstructure:"http_users"`
}

Authentication 用于认证

type Authorization

type Authorization struct {
	AllowedGroups []string `mapstructure:"allowed_groups"`
}

Authorization 用于鉴权

type BasicAuth

type BasicAuth struct {
	Username string   `mapstructure:"username"`
	Password string   `mapstructure:"password"`
	Groups   []string `mapstructure:"groups"`
}

BasicAuth 用于HTTP基本认证的用户权限定义

type CachebufConfig

type CachebufConfig struct {
	Enable   bool   `mapstructure:"enable"`
	Driver   string `mapstructure:"driver"`
	Address  string `mapstructure:"address"`
	Password string `mapstructure:"password"`
}

CachebufConfig 缓存配置,区别于数据库配置,缓存的数据可以丢失

type CloudEventsConfig

type CloudEventsConfig struct {
	Protocol    string      `mapstructure:"protocol"`
	KafkaSarama KafkaSarama `mapstructure:"kafka_sarama"`
}

CloudEventsConfig cloudevents事件配置

type ConnectionPool

type ConnectionPool struct {
	MaxIdleTime  time.Duration `mapstructure:"max_idle_time"`
	MaxLifeTime  time.Duration `mapstructure:"max_life_time"`
	MaxIdleConns int           `mapstructure:"max_idle_conns"`
	MaxOpenConns int           `mapstructure:"max_open_conns"`
}

ConnectionPool 数据库连接池配置

type DatabaseConfig

type DatabaseConfig struct {
	Enable         bool           `mapstructure:"enable"`
	Driver         string         `mapstructure:"driver"`
	Username       string         `mapstructure:"username"`
	Password       string         `mapstructure:"password"`
	Protocol       string         `mapstructure:"protocol"`
	Address        string         `mapstructure:"address"`
	DBName         string         `mapstructure:"dbname"`
	Parameters     string         `mapstructure:"parameters"`
	ConnectionPool ConnectionPool `mapstructure:"connection_pool"`
	// contains filtered or unexported fields
}

DatabaseConfig 数据库设置,指关系数据库,数据不允许丢失,如postgres、mysql

type DebuggerConfig

type DebuggerConfig struct {
	EnablePprof bool   `mapstructure:"enable_pprof"`
	LogLevel    string `mapstructure:"log_level"`
	LogFormat   string `mapstructure:"log_format"`
}

DebuggerConfig 日志配置,用于设定服务启动后日志输出级别格式等

type DiscoverConfig

type DiscoverConfig struct {
	Driver    string     `mapstructure:"driver"`
	Endpoints []string   `mapstructure:"endpoints"`
	TLS       *TLSConfig `mapstructure:"tls" json:",omitempty"`
	Heartbeat int64      `mapstructure:"heartbeat"`
}

DiscoverConfig 服务注册,服务启动后如何汇报自身

type IDTokenClaims

type IDTokenClaims struct {
	jwt.RegisteredClaims
	Email           string            `json:"email"`
	EmailVerified   bool              `json:"email_verified"`
	Groups          []string          `json:"groups"`
	FederatedClaims map[string]string `json:"federated_claims"`
}

IDTokenClaims 用于框架jwt的数据结构

type KafkaSarama

type KafkaSarama struct {
	Brokers []string     `mapstructure:"brokers"`
	Topic   string       `mapstructure:"topic"`
	Config  SaramaConfig `mapstructure:"config"`
}

KafkaSarama xx

type LocalConfig

type LocalConfig struct {
	Services    *ServicesConfig    `json:",omitempty"` // 基础服务配置
	Discover    *DiscoverConfig    `json:",omitempty"` // 服务注册配置
	Security    *SecurityConfig    `json:",omitempty"` // 认证鉴权配置
	Database    *DatabaseConfig    `json:",omitempty"` // 关系数据配置
	Cachebuf    *CachebufConfig    `json:",omitempty"` // 缓存服务配置
	Debugger    *DebuggerConfig    `json:",omitempty"` // 日志调试配置
	Opentracing *OpentracingConfig `json:",omitempty"` // 链路追踪配置
	CloudEvents *CloudEventsConfig `json:",omitempty"` // 公共事件配置
	Independent interface{}        `json:",omitempty"` // 应用私有配置
	// contains filtered or unexported fields
}

LocalConfig 本地配置,全局微服务配置结构

func New

func New(v *viper.Viper) (*LocalConfig, error)

New 用于初始化获取全局配置实例

func (*LocalConfig) AuthenticationTypeFrom

func (c *LocalConfig) AuthenticationTypeFrom(ctx context.Context) (string, bool)

AuthenticationTypeFrom 用于获取当前会话的认证方式

func (*LocalConfig) Deregister

func (c *LocalConfig) Deregister() error

Deregister 用于撤销注册中心上的服务信息

func (*LocalConfig) GetClientDialOption

func (c *LocalConfig) GetClientDialOption(customOpts ...grpc.DialOption) []grpc.DialOption

GetClientDialOption 获取客户端连接的设置

func (*LocalConfig) GetClientStreamInterceptor

func (c *LocalConfig) GetClientStreamInterceptor() []grpc.StreamClientInterceptor

GetClientStreamInterceptor 获取客户端默认流拦截器

func (*LocalConfig) GetClientUnaryInterceptor

func (c *LocalConfig) GetClientUnaryInterceptor() []grpc.UnaryClientInterceptor

GetClientUnaryInterceptor 获取客户端默认一元拦截器

func (*LocalConfig) GetCloudEvents

func (c *LocalConfig) GetCloudEvents() (eventclient.Client, error)

GetCloudEvents 用于获取 cloudevents 连接客户端

func (*LocalConfig) GetDatabase

func (c *LocalConfig) GetDatabase() (*sql.DB, error)

GetDatabase 获取数据库实例

func (*LocalConfig) GetIndependent

func (c *LocalConfig) GetIndependent(t interface{}) error

GetIndependent 用于获取各个微服务独立的配置

func (*LocalConfig) GetLogger

func (c *LocalConfig) GetLogger() *logrus.Entry

GetLogger 用于获取全局日志

func (*LocalConfig) GetRPCClient

func (c *LocalConfig) GetRPCClient() (*rpc.Client, error)

GetRPCClient 获取rpc客户端实例,TODO;(未完善,当前主要解耦框架模版)

func (*LocalConfig) GetRPCServer

func (c *LocalConfig) GetRPCServer() (*rpc.Server, error)

GetRPCServer 获取rpc服务端实例,TODO;(未完善,当前主要解耦框架模版)

func (*LocalConfig) GetServiceName

func (c *LocalConfig) GetServiceName() string

GetServiceName 用于获取微服务名称

func (*LocalConfig) GetStreamInterceptor

func (c *LocalConfig) GetStreamInterceptor(interceptors ...grpc.StreamServerInterceptor) grpc.ServerOption

GetStreamInterceptor xx

func (*LocalConfig) GetUnaryInterceptor

func (c *LocalConfig) GetUnaryInterceptor(interceptors ...grpc.UnaryServerInterceptor) grpc.ServerOption

GetUnaryInterceptor 用于获取gRPC的一元拦截器

func (*LocalConfig) GroupsFrom

func (c *LocalConfig) GroupsFrom(ctx context.Context) ([]string, bool)

GroupsFrom 用于获取当前会话的用户组列表

func (*LocalConfig) IDTokenFrom

func (c *LocalConfig) IDTokenFrom(ctx context.Context) (IDTokenClaims, bool)

IDTokenFrom 用于获取当前会话的IDToken

func (*LocalConfig) Init

func (c *LocalConfig) Init() error

Init 用于根据配置初始化各个实例,初始化需注意空指针判断

func (*LocalConfig) InitCloudEvents

func (c *LocalConfig) InitCloudEvents() error

InitCloudEvents 初始化 cloudevents 数据实例

func (*LocalConfig) InitDatabase

func (c *LocalConfig) InitDatabase() error

InitDatabase 用于初始化数据库

func (*LocalConfig) InitDebugger

func (c *LocalConfig) InitDebugger() error

InitDebugger 用于初始化日志实例

func (*LocalConfig) InitOpentracing

func (c *LocalConfig) InitOpentracing() (io.Closer, error)

InitOpentracing 初始化全局分布式链路追踪

func (*LocalConfig) InitRPCConfig

func (c *LocalConfig) InitRPCConfig() error

InitRPCConfig 用于初始化rpc客户端、服务端配置

func (*LocalConfig) InitSecurity

func (c *LocalConfig) InitSecurity() error

InitSecurity 初始化认证

func (*LocalConfig) InitServices

func (c *LocalConfig) InitServices() error

InitServices 用于基础服务初始化配置检查

func (*LocalConfig) Register

Register 用于登记服务信息至注册中心

func (*LocalConfig) UsernameFrom

func (c *LocalConfig) UsernameFrom(ctx context.Context) (string, bool)

UsernameFrom 用于获取当前会话的用户名

func (*LocalConfig) WithAuthenticationType

func (c *LocalConfig) WithAuthenticationType(parent context.Context, authType string) context.Context

WithAuthenticationType 用于设置当前会话的认证方式

func (*LocalConfig) WithGroups

func (c *LocalConfig) WithGroups(parent context.Context, groups []string) context.Context

WithGroups 用于设置当前会话用户属于组

func (*LocalConfig) WithIDToken

func (c *LocalConfig) WithIDToken(parent context.Context, token IDTokenClaims) context.Context

WithIDToken 用于设置当前会话的IDToken

func (*LocalConfig) WithUsername

func (c *LocalConfig) WithUsername(parent context.Context, username string) context.Context

WithUsername 用于设置当前会话的用户名

type LogFields

type LogFields struct {
	HTTPBody     bool `mapstructure:"http_body"`
	HTTPResponse bool `mapstructure:"http_response"`
}

LogFields 开启请求追踪属性

type OIDCConfig

type OIDCConfig struct {
	ClientID             string   `mapstructure:"client_id"`
	SupportedSigningAlgs []string `mapstructure:"supported_signing_algs"`
	SkipClientIDCheck    bool     `mapstructure:"skip_client_id_check"`
	SkipExpiryCheck      bool     `mapstructure:"skip_expiry_check"`
	SkipIssuerCheck      bool     `mapstructure:"skip_issuer_check"`
	InsecureSkipVerify   bool     `mapstructure:"insecure_skip_verify"`
}

OIDCConfig 用于OIDC验证相关配置

type OIDCProvider

type OIDCProvider struct {
	Issuer string      `mapstructure:"issuer"`
	Config *OIDCConfig `mapstructure:"config"`
}

OIDCProvider 用于OIDC认证提供方配置

type OpentracingConfig

type OpentracingConfig struct {
	Enable    bool      `mapstructure:"enable"`
	Host      string    `mapstructure:"host"`
	Port      int       `mapstructure:"port"`
	LogFields LogFields `mapstructure:"log_fields"`
}

OpentracingConfig 分布式链路追踪

type SaramaConfig

type SaramaConfig struct {
	Net struct {
		// 默认:5
		MaxOpenRequests int `mapstructure:"max_open_requests"`

		// 以下默认:30s
		DialTimeout  time.Duration `mapstructure:"dial_timeout"`
		ReadTimeout  time.Duration `mapstructure:"read_timeout"`
		WriteTimeout time.Duration `mapstructure:"write_timeout"`

		TLS struct {
			// 默认:false
			Enable bool `mapstructure:"enable"`
		} `mapstructure:"tls"`

		SASL struct {
			Enable    bool   `mapstructure:"enable"`
			Mechanism string `mapstructure:"mechanism"`
			User      string `mapstructure:"user"`
			Password  string `mapstructure:"password"`
		} `mapstructure:"sasl"`

		KeepAlive time.Duration `mapstructure:"keep_alive"`
	} `mapstructure:"net"`

	Metadata struct {
		// 获取元数据的策略
		Retry struct {
			// 当集群处于leader选举时最大重试次数,默认:3
			Max int `mapstructure:"max"`
			// 当集群处于leader选举重试的等扽时间,默认:250ms
			Backoff time.Duration `mapstructure:"backoff"`
		} `mapstructure:"retry"`

		// 后台与集群同步metadata的间隔,默认: 10m
		RefreshFrequency time.Duration `mapstructure:"refresh_frequency"`

		// 是否为所有topic维护元数据,默认: true
		Full bool `mapstructure:"full"`

		// 等待metadata响应的超时时间,默认禁用表示失败则继续重试
		// Net.[Dial|Read]Timeout * BrokerCount * (Metadata.Retry.Max + 1) + Metadata.Retry.Backoff * Metadata.Retry.Max
		Timeout time.Duration `mapstructure:"timeout"`

		// 如果提供的topic不存在是否允许自动创建(前提是集群配置可允许该操作),默认:true
		AllowAutoTopicCreation bool `mapstructure:"allow_auto_topic_creation"`
	} `mapstructure:"metadata"`

	// 生产者相关配置
	Producer struct {
		// 允许的最大消息大小,最好等于集群配置的:message.max.bytes,默认:1000000
		MaxMessageBytes int `mapstructure:"max_message_bytes"`

		// 消息生产被集群接收的策略,主要影响是否会丢消息与性能,默认:1
		// 设置为0: 生产者不等扽集群的响应,继续下一条
		// 设置为1: 生成者等待leader响应,然后在继续下一条
		// 设置为-1: 生产者必须等待所有"in-sync"副本响应完成,继续下一条,这个副本由: min.insync.replicas 决定
		RequiredAcks int16 `mapstructure:"required_acks"`

		// 生产者等扽响应的最长时间,当RequiredAcks设置大于1时才有效,等同于`request.timeout.ms`,默认:10s
		Timeout time.Duration `mapstructure:"timeout"`

		// 生产的消息使用的压缩算法,默认不压缩,默认:0
		Compression int8 `mapstructure:"compression"`

		// 压缩的等级,依赖具体压缩算法
		CompressionLevel int `mapstructure:"commpression_level"`

		// 如果启用,生产者将确保每个消息只写入一个副本。
		Idempotent bool `mapstructure:"idempotent"`

		// 消息响应成功或失败是否写入channel里,如果写入则必须被消费,否则可能出现死锁
		Return struct {
			// 成功的消息是否记录,默认:false
			Successes bool `mapstructure:"successes"`
			// 失败的消息是否记录,默认:true
			Errors bool `mapstructure:"errors"`
		} `mapstructure:"return"`

		// 生产者达到以下阈值时触发打包消息发送至集群
		Flush struct {
			// 最大值被 sarama.MaxRequestSiz 限制,值:100 * 1024 * 1024
			Bytes int `mapstructure:"bytes"`
			// 消息数量阈值,最大限制通过以下MaxMessages控制
			Messages int `mapstructure:"messages"`
			// 等待时间阈值
			Frequency time.Duration `mapstructure:"frequency"`
			// 在单一请求broker时允许的最大消息数,设置为0则不限制
			MaxMessages int `mapstructure:"max_messages"`
		} `mapstructure:"flush"`

		// 生产消息失败的重试策略
		Retry struct {
			// 最大重试次数,等同于jvm的:message.send.max.retries,默认:3
			Max int `mapstructure:"max"`
			// 重试失败之间等待间隔,等同于jvm的:retry.backoff.ms,默认值:100ms
			Backoff time.Duration `mapstructure:"backoff"`
		} `mapstructure:"retry"`
	} `mapstructure:"producer"`

	// 消费者相关配置
	Consumer struct {
		Group struct {
			Session struct {
				// 当broker端未收到消费者的心跳包,超过该时间间隔,则broker认为该消费者离线,将进行重均衡,默认:10s
				// 该值必须在broker配置`group.min.session.timeout.ms`与`group.max.session.timeout.ms`之间
				Timeout time.Duration `mapstructure:"timeout"`
			} `mapstructure:"session"`
			Heartbeat struct {
				// kafka协调者预期的心跳间隔,用于确保消费者session处于活跃状态,值必须小于session.timeout,默认:3s
				// 一般建议设置为session.timeout的3分之一
				Interval time.Duration `mapstructure:"interval"`
			} `mapstructure:"heartbeat"`
			Rebalance struct {
				// topic分区分配给消费者的策略,支持:range, roundrobin, sticky,默认:range
				// range: 标识使用范围分区分配策略的策略
				// roundrobin: 标识使用循环分区分配策略的策略
				// sticky: 标识使用粘性分区分配策略的策略
				Strategy string `mapstructure:"strategy"`
				// 重均衡开始后,消费者加入群组的最大允许时间,默认:60s
				Timeout time.Duration `mapstructure:"timeout"`

				Retry struct {
					// 最大重试次数,默认:4
					Max int `mapstructure:"max"`
					// 重试失败之间等待间隔,默认:2s
					Backoff time.Duration `mapstructure:"backoff"`
				} `mapstructure:"retry"`
			} `mapstructure:"rebalance"`
		} `mapstructure:"group"`

		// 读取分区失败的重试
		Retry struct {
			// 重试失败之间等待间隔,默认:2s
			Backoff time.Duration `mapstructure:"backoff"`
		} `mapstructure:"retry"`

		// 控制每个请求所拉取数据的大小,单位bytes
		Fetch struct {
			// 必须等待的最小消息大小,不要设置为0,等同于jvm `fetch.min.bytes`,默认:1
			Min int32 `mapstructure:"min"`
			// 每请求从broker获取的消息大小,默认:1MB
			// 尽量大于你消息的大部分大小,否则还要做额外的切割,等同于jvm `fetch.message.max.bytes`
			Default int32 `mapstructure:"default"`
			// 每请求可最大获取的消息大小,值为0表示不限制,等同于jvm `fetch.message.max.bytes`,默认:0
			Max int32 `mapstructure:"max"`
		} `mapstructure:"fetch"`

		// broker在等待消息达到 Consumer.Fetch.Min 大小的最大时间,不要设置为0,默认:250ms
		// 建议在 100-500ms,等同于jvm `fetch.wait.max.ms`
		MaxWaitTime time.Duration `mapstructure:"max_wait_time"`

		// 消费者为用户处理消息所需的最长时间,如果写入消息通道所需的时间超过此时间,则该分区将停止获取更多消息,直到可以再次继续。
		// 由于消息通道已缓冲,因此实际宽限时间为 (MaxProcessingTime * ChannelBufferSize),默认:100ms
		MaxProcessingTime time.Duration `mapstructure:"max_processing_time"`

		// 消息响应成功或失败是否写入channel里,如果写入则必须被消费,否则可能出现死锁
		Return struct {
			// 失败的消息是否记录,默认:false
			Errors bool `mapstructure:"errors"`
		} `mapstructure:"return"`

		// 控制如何提交消费offset
		Offsets struct {
			AutoCommit struct {
				// 是否自动更新,默认:true
				Enable bool `mapstructure:"enable"`
				// 自动更新频率,默认:1s
				Interval time.Duration `mapstructure:"interval"`
			} `mapstructure:"auto_commit"`

			// OffsetNewest=-1 代表访问 commit 位置的下一条消息
			// OffsetOldest=-2 消费者可以访问到的 topic 里的最早的消息
			Initial   int64         `mapstructure:"initial"`
			Retention time.Duration `mapstructure:"retention"`

			// 提交offset失败的重试
			Retry struct {
				// 最大重试次数,默认:3
				Max int `mapstructure:"max"`
			} `mapstructure:"retry"`
		} `mapstructure:"offsets"`

		// 消费隔离级别,ReadUncommitted 或 ReadCommitted,默认:ReadUncommitted
		// ReadUncommitted: 可以读取到未提交的数据(报错终止前的数据)
		// ReadCommitted: 生产者已提交的数据才能读取到
		IsolationLevel int8 `mapstructure:"isolation_level"`
	} `mapstructure:"consumer"`

	// 标识该消费者
	ClientID string `mapstructure:"client_id"`
	// 机柜标识,见 'broker.rack'
	RackID string `mapstructure:"rack_id"`
	// 默认:256
	ChannelBufferSize int    `mapstructure:"chnnel_buffer_size"`
	Version           string `mapstructure:"version"`
}

SaramaConfig 用于kafka客户端配置,结构等同于sarama类库 https://pkg.go.dev/github.com/Shopify/sarama#Config

func (*SaramaConfig) Parse

func (s *SaramaConfig) Parse() *sarama.Config

Parse 解析为 https://pkg.go.dev/github.com/Shopify/sarama#Config

type SecurityConfig

type SecurityConfig struct {
	Enable         bool            `mapstructure:"enable"`
	Authentication *Authentication `mapstructure:"authentication"`
	Authorization  *Authorization  `mapstructure:"authorization"`
	// contains filtered or unexported fields
}

SecurityConfig 安全配置,对接认证、鉴权

type ServicesConfig

type ServicesConfig struct {
	RootPath      string `mapstructure:"root_path"`
	Namespace     string `mapstructure:"namespace"`
	ServiceCode   string `mapstructure:"service_code"`
	APIEndpoint   string `mapstructure:"api_endpoint"`
	GRPCAddress   string `mapstructure:"grpc_address"`
	HTTPAddress   string `mapstructure:"http_address"`
	PublicAddress string `mapstructure:"public_address"`
}

ServicesConfig 基础服务配置,用于设定命名空间、注册的路径、监听的地址等

type TLSConfig

type TLSConfig struct {
	CAFile             string `mapstructure:"ca_file"`
	CertFile           string `mapstructure:"cert_file"`
	KeyFile            string `mapstructure:"key_file"`
	InsecureSkipVerify bool   `mapstructure:"insecure_skip_verify"`
}

TLSConfig 证书配置

type XDGSCRAMClient

type XDGSCRAMClient struct {
	*scram.Client
	*scram.ClientConversation
	scram.HashGeneratorFcn
}

XDGSCRAMClient struct to perform SCRAM conversation

func (*XDGSCRAMClient) Begin

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)

Begin starts SCRAM conversation

func (*XDGSCRAMClient) Done

func (x *XDGSCRAMClient) Done() bool

Done completes SCRAM conversation

func (*XDGSCRAMClient) Step

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error)

Step performs step in SCRAM conversation

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL