iggcon

package
v0.5.0-edge.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 28, 2025 License: Apache-2.0 Imports: 6 Imported by: 4

Documentation

Index

Constants

View Source
// Units of Duration. Microsecond is the base unit (value 1) and every larger
// unit is an exact multiple of the unit declared before it.
const (
	Microsecond Duration = 1
	Millisecond Duration = 1000 * Microsecond
	Second      Duration = 1000 * Millisecond
	Minute      Duration = 60 * Second
	Hour        Duration = 60 * Minute
)
View Source
const (
	// MaxPayloadSize is the maximum allowed size, in bytes, of a message payload.
	//
	// It is the upper bound on the payload of an IggyMessage. Creating a
	// message whose payload is larger than this value results in an
	// ierror.TooBigUserMessagePayload error.
	//
	//  Constraints
	//  - Minimum payload size: 1 byte (empty payloads are not allowed)
	//  - Maximum payload size: 10 MB (10,000,000 bytes)
	MaxPayloadSize = 10_000_000

	// MaxUserHeadersSize is the maximum allowed size, in bytes, of the
	// user-defined headers.
	//
	// It is the upper bound on the combined size of all user headers of an
	// IggyMessage. Creating a message whose user headers are larger than this
	// value results in an ierror.TooBigUserHeaders error.
	//
	//  Constraints
	//  - Maximum headers size: 100 KB (100,000 bytes)
	//  - Each individual header key is limited to 255 bytes
	//  - Each individual header value is limited to 255 bytes
	MaxUserHeadersSize = 100_000
)
View Source
// MessageHeaderSize is the sum of seven fixed byte widths (56 in total).
//
// NOTE(review): the widths match the MessageHeader fields in declaration
// order, which is what the per-term labels below assume — confirm against
// MessageHeader.ToBytes.
const MessageHeaderSize = 8 + // Checksum (uint64)
	16 + // Id (MessageID, [16]byte)
	8 + // Offset (uint64)
	8 + // Timestamp (uint64)
	8 + // OriginTimestamp (uint64)
	4 + // UserHeaderLength (uint32)
	4 // PayloadLength (uint32)

Variables

This section is empty.

Functions

func DeserializeHeaders

func DeserializeHeaders(userHeadersBytes []byte) (map[HeaderKey]HeaderValue, error)

func GetHeadersBytes

func GetHeadersBytes(headers map[HeaderKey]HeaderValue) []byte

Types

type ChangePasswordRequest

// ChangePasswordRequest holds the fields of a change-password request.
//
// UserID is tagged `json:"-"`, so encoding/json never writes it into the
// body; CurrentPassword and NewPassword are encoded under PascalCase keys.
// NOTE(review): presumably UserID is conveyed outside the JSON body — confirm
// against the transport code.
type ChangePasswordRequest struct {
	UserID          Identifier `json:"-"`
	CurrentPassword string     `json:"CurrentPassword"`
	NewPassword     string     `json:"NewPassword"`
}

type ClientInfo

// ClientInfo is the summary record of a client: its numeric ID, address,
// the numeric ID of its user, the transport name and a consumer-group count.
// All fields are encoded with camelCase JSON keys.
type ClientInfo struct {
	ID                  uint32 `json:"id"`
	Address             string `json:"address"`
	UserID              uint32 `json:"userId"`
	Transport           string `json:"transport"`
	ConsumerGroupsCount uint32 `json:"consumerGroupsCount"`
}

type ClientInfoDetails

// ClientInfoDetails embeds ClientInfo and adds the list of consumer groups.
//
// Because ClientInfo is embedded without a tag, encoding/json places its
// fields in the same JSON object. ConsumerGroups is omitted from the output
// when it is empty.
type ClientInfoDetails struct {
	ClientInfo
	ConsumerGroups []ConsumerGroupInfo `json:"consumerGroups,omitempty"`
}

type CommandCode

// CommandCode is the integer code that identifies a command.
// NOTE(review): presumably this is the code sent to the server with each
// request — confirm against the transport code.
type CommandCode int

// Command codes. Values are assigned explicitly and grouped by range:
// 1 ping, 10 stats, 20-22 current client and clients, 31-39 users and
// login/logout, 41-44 access tokens, 100-101 messages, 120-121 offsets,
// 200-204 streams, 300-304 topics, 402-403 partitions, 600-605 consumer
// groups.
const (
	PingCode                 CommandCode = 1
	GetStatsCode             CommandCode = 10
	GetMeCode                CommandCode = 20
	GetClientCode            CommandCode = 21
	GetClientsCode           CommandCode = 22
	GetUserCode              CommandCode = 31
	GetUsersCode             CommandCode = 32
	CreateUserCode           CommandCode = 33
	DeleteUserCode           CommandCode = 34
	UpdateUserCode           CommandCode = 35
	UpdatePermissionsCode    CommandCode = 36
	ChangePasswordCode       CommandCode = 37
	LoginUserCode            CommandCode = 38
	LogoutUserCode           CommandCode = 39
	GetAccessTokensCode      CommandCode = 41
	CreateAccessTokenCode    CommandCode = 42
	DeleteAccessTokenCode    CommandCode = 43
	LoginWithAccessTokenCode CommandCode = 44
	PollMessagesCode         CommandCode = 100
	SendMessagesCode         CommandCode = 101
	GetOffsetCode            CommandCode = 120
	StoreOffsetCode          CommandCode = 121
	GetStreamCode            CommandCode = 200
	GetStreamsCode           CommandCode = 201
	CreateStreamCode         CommandCode = 202
	DeleteStreamCode         CommandCode = 203
	UpdateStreamCode         CommandCode = 204
	GetTopicCode             CommandCode = 300
	GetTopicsCode            CommandCode = 301
	CreateTopicCode          CommandCode = 302
	DeleteTopicCode          CommandCode = 303
	UpdateTopicCode          CommandCode = 304
	CreatePartitionsCode     CommandCode = 402
	DeletePartitionsCode     CommandCode = 403
	GetGroupCode             CommandCode = 600
	GetGroupsCode            CommandCode = 601
	CreateGroupCode          CommandCode = 602
	DeleteGroupCode          CommandCode = 603
	JoinGroupCode            CommandCode = 604
	LeaveGroupCode           CommandCode = 605
)

type CompressionAlgorithm

// CompressionAlgorithm is a one-byte code naming a compression algorithm.
type CompressionAlgorithm uint8

// Compression algorithm codes. Numbering starts at 1, so the zero value of
// CompressionAlgorithm matches neither constant.
const (
	CompressionAlgorithmNone CompressionAlgorithm = 1
	CompressionAlgorithmGzip CompressionAlgorithm = 2
)

type Consumer

// Consumer pairs a ConsumerKind with the Identifier of the consumer.
// The fields carry no JSON tags.
type Consumer struct {
	Kind ConsumerKind
	Id   Identifier
}

func DefaultConsumer

func DefaultConsumer() Consumer

func NewGroupConsumer

func NewGroupConsumer(id Identifier) Consumer

NewGroupConsumer creates a new Consumer of kind ConsumerKindGroup from the given Identifier.

func NewSingleConsumer

func NewSingleConsumer(id Identifier) Consumer

NewSingleConsumer creates a new Consumer of kind ConsumerKindSingle from the given Identifier.

type ConsumerGroup

// ConsumerGroup is the summary record of a consumer group: numeric ID, name,
// and partition and member counts, encoded with camelCase JSON keys.
type ConsumerGroup struct {
	Id              uint32 `json:"id"`
	Name            string `json:"name"`
	PartitionsCount uint32 `json:"partitionsCount"`
	MembersCount    uint32 `json:"membersCount"`
}

type ConsumerGroupDetails

// ConsumerGroupDetails embeds ConsumerGroup and adds its member list.
//
// Members has no JSON tag, so encoding/json uses the Go field name
// ("Members") as its key, unlike the camelCase keys of the embedded fields.
type ConsumerGroupDetails struct {
	ConsumerGroup
	Members []ConsumerGroupMember
}

type ConsumerGroupInfo

// ConsumerGroupInfo references a consumer group by three numeric IDs:
// stream, topic and consumer group.
type ConsumerGroupInfo struct {
	StreamId        uint32 `json:"streamId"`
	TopicId         uint32 `json:"topicId"`
	ConsumerGroupId uint32 `json:"consumerGroupId"`
}

type ConsumerGroupMember

// ConsumerGroupMember describes one member of a consumer group: its numeric
// ID, a partition count and a list of partition numbers.
// NOTE(review): presumably PartitionsCount equals len(Partitions) — confirm.
// The fields carry no JSON tags.
type ConsumerGroupMember struct {
	ID              uint32
	PartitionsCount uint32
	Partitions      []uint32
}

type ConsumerKind

// ConsumerKind distinguishes a single consumer from a consumer group.
type ConsumerKind int

// Consumer kinds. Numbering starts at 1, so the zero value of ConsumerKind
// matches neither constant.
const (
	ConsumerKindSingle ConsumerKind = 1
	ConsumerKindGroup  ConsumerKind = 2
)

type ConsumerOffsetInfo

// ConsumerOffsetInfo reports, for one partition, the current offset and the
// stored offset.
// NOTE(review): presumably CurrentOffset is the partition's latest offset and
// StoredOffset the consumer's committed one — confirm.
type ConsumerOffsetInfo struct {
	PartitionId   uint32 `json:"partitionId"`
	CurrentOffset uint64 `json:"currentOffset"`
	StoredOffset  uint64 `json:"storedOffset"`
}

type CreateConsumerGroupRequest

// CreateConsumerGroupRequest holds the fields of a create-consumer-group
// request.
//
// ConsumerGroupId is a pointer without omitempty, so a nil value is encoded
// as JSON null.
// NOTE(review): presumably nil asks the server to assign the ID — confirm.
type CreateConsumerGroupRequest struct {
	StreamId        Identifier `json:"streamId"`
	TopicId         Identifier `json:"topicId"`
	ConsumerGroupId *uint32    `json:"consumerGroupId"`
	Name            string     `json:"name"`
}

type CreatePartitionsRequest

// CreatePartitionsRequest holds the fields of a create-partitions request:
// the stream and topic identifiers and a partition count.
type CreatePartitionsRequest struct {
	StreamId        Identifier `json:"streamId"`
	TopicId         Identifier `json:"topicId"`
	PartitionsCount uint32     `json:"partitionsCount"`
}

type CreatePersonalAccessTokenRequest

// CreatePersonalAccessTokenRequest holds the name and expiry of a personal
// access token to create, encoded under PascalCase JSON keys.
// NOTE(review): the unit of Expiry is not visible here — confirm.
type CreatePersonalAccessTokenRequest struct {
	Name   string `json:"Name"`
	Expiry uint32 `json:"Expiry"`
}

type CreateStreamRequest

// CreateStreamRequest holds the numeric ID and name of a stream to create.
// Note that StreamId is a plain int here, whereas the other request types in
// this package use Identifier for stream IDs.
type CreateStreamRequest struct {
	StreamId int    `json:"streamId"`
	Name     string `json:"name"`
}

type CreateTopicRequest

// CreateTopicRequest holds the fields of a create-topic request.
//
// NOTE(review): MessageExpiry is a time.Duration (a nanosecond count), while
// Topic.MessageExpiry uses this package's Duration, which is documented as
// microseconds — confirm that the unit is converted during serialization.
type CreateTopicRequest struct {
	StreamId             Identifier    `json:"streamId"`
	TopicId              uint32        `json:"topicId"`
	PartitionsCount      int           `json:"partitionsCount"`
	CompressionAlgorithm uint8         `json:"compressionAlgorithm"`
	MessageExpiry        time.Duration `json:"messageExpiry"`
	MaxTopicSize         uint64        `json:"maxTopicSize"`
	ReplicationFactor    uint8         `json:"replicationFactor"`
	Name                 string        `json:"name"`
}

type CreateUserRequest

// CreateUserRequest holds the fields of a create-user request.
//
// Permissions is omitted from the JSON output when nil.
// NOTE(review): the JSON keys mix casing ("username" is lower-case, the
// others are PascalCase) — confirm this matches what the server expects.
type CreateUserRequest struct {
	Username    string       `json:"username"`
	Password    string       `json:"Password"`
	Status      UserStatus   `json:"Status"`
	Permissions *Permissions `json:"Permissions,omitempty"`
}

type DeleteConsumerGroupRequest

// DeleteConsumerGroupRequest identifies the consumer group to delete by its
// stream, topic and consumer-group identifiers.
type DeleteConsumerGroupRequest struct {
	StreamId        Identifier `json:"streamId"`
	TopicId         Identifier `json:"topicId"`
	ConsumerGroupId Identifier `json:"consumerGroupId"`
}

type DeletePartitionsRequest

// DeletePartitionsRequest holds the fields of a delete-partitions request:
// the stream and topic identifiers and a partition count.
type DeletePartitionsRequest struct {
	StreamId        Identifier `json:"streamId"`
	TopicId         Identifier `json:"topicId"`
	PartitionsCount uint32     `json:"partitionsCount"`
}

type DeletePersonalAccessTokenRequest

// DeletePersonalAccessTokenRequest names the personal access token to
// delete; the name is encoded under the JSON key "Name".
type DeletePersonalAccessTokenRequest struct {
	Name string `json:"Name"`
}

type Duration

// Duration is an unsigned 64-bit count of microseconds (the Microsecond
// constant has value 1). It is a distinct type from time.Duration.
type Duration uint64

Duration represents an expiration duration in microseconds (µs).

const (
	// IggyExpiryServerDefault is the zero Duration; it selects the server's
	// default expiry time.
	IggyExpiryServerDefault Duration = 0
	// IggyExpiryNeverExpire is the largest Duration value (math.MaxUint64);
	// it means never expire.
	IggyExpiryNeverExpire Duration = math.MaxUint64
)

type GetConsumerOffsetRequest

// GetConsumerOffsetRequest holds the fields of a get-consumer-offset request.
//
// PartitionId is a pointer without omitempty, so a nil value is encoded as
// JSON null.
type GetConsumerOffsetRequest struct {
	StreamId    Identifier `json:"streamId"`
	TopicId     Identifier `json:"topicId"`
	Consumer    Consumer   `json:"consumer"`
	PartitionId *uint32    `json:"partitionId"`
}

type GetStreamRequest

// GetStreamRequest identifies the stream to fetch. StreamID carries no JSON
// tag.
type GetStreamRequest struct {
	StreamID Identifier
}

type GlobalPermissions

// GlobalPermissions is a set of ten boolean permission flags, arranged as
// manage/read pairs for servers, users, streams and topics, plus poll and
// send flags for messages. Each flag is encoded under a PascalCase JSON key
// equal to its field name.
type GlobalPermissions struct {
	ManageServers bool `json:"ManageServers"`
	ReadServers   bool `json:"ReadServers"`
	ManageUsers   bool `json:"ManageUsers"`
	ReadUsers     bool `json:"ReadUsers"`
	ManageStreams bool `json:"ManageStreams"`
	ReadStreams   bool `json:"ReadStreams"`
	ManageTopics  bool `json:"ManageTopics"`
	ReadTopics    bool `json:"ReadTopics"`
	PollMessages  bool `json:"PollMessages"`
	SendMessages  bool `json:"SendMessages"`
}

type HeaderKey

// HeaderKey is the key of a user header, wrapping a single string.
// It is the key type of the map[HeaderKey]HeaderValue header maps used in
// this package. NewHeaderKey builds one and can return an error.
// NOTE(review): presumably NewHeaderKey enforces the key length limit —
// confirm.
type HeaderKey struct {
	Value string
}

func NewHeaderKey

func NewHeaderKey(val string) (HeaderKey, error)

type HeaderKind

// HeaderKind names the data type of a HeaderValue.
type HeaderKind int

// Header kinds, numbered 1 through 15. The zero value of HeaderKind matches
// none of them.
const (
	Raw     HeaderKind = 1
	String  HeaderKind = 2
	Bool    HeaderKind = 3
	Int8    HeaderKind = 4
	Int16   HeaderKind = 5
	Int32   HeaderKind = 6
	Int64   HeaderKind = 7
	Int128  HeaderKind = 8
	Uint8   HeaderKind = 9
	Uint16  HeaderKind = 10
	Uint32  HeaderKind = 11
	Uint64  HeaderKind = 12
	Uint128 HeaderKind = 13
	Float   HeaderKind = 14
	Double  HeaderKind = 15
)

type HeaderValue

// HeaderValue is the value of a user header: a HeaderKind plus the value's
// bytes.
// NOTE(review): the byte encoding used for each kind (e.g. endianness) is
// not visible here — confirm before decoding Value by hand.
type HeaderValue struct {
	Kind  HeaderKind
	Value []byte
}

type IdKind

// IdKind is a one-byte code telling whether an Identifier is numeric or a
// string.
type IdKind uint8

// Identifier kinds. Numbering starts at 1, so the zero value of IdKind
// matches neither constant.
const (
	NumericId IdKind = 1
	StringId  IdKind = 2
)

type Identifier

// Identifier is a resource identifier that is either numeric or a string,
// as indicated by Kind. Value holds the identifier's bytes and Length a
// length.
// NOTE(review): presumably Length equals len(Value) — confirm.
//
// NewIdentifier builds one from a uint32 or a string; the String and Uint32
// methods read the value back and each can return an error.
type Identifier struct {
	Kind   IdKind
	Length int
	Value  []byte
}

func NewIdentifier

func NewIdentifier[T uint32 | string](value T) (Identifier, error)

NewIdentifier creates a new Identifier from a uint32 or a string value.

func (Identifier) String

func (id Identifier) String() (string, error)

String returns the string value of the identifier.

func (Identifier) Uint32

func (id Identifier) Uint32() (uint32, error)

Uint32 returns the numeric value of the identifier.

type IdentityInfo

// IdentityInfo identifies a user and optionally carries an access token.
type IdentityInfo struct {
	// UserId is the unique numeric identifier of the user.
	UserId uint32 `json:"userId"`
	// AccessToken is the optional access token, used only by the HTTP
	// transport. A nil value is encoded as JSON null.
	AccessToken *string `json:"accessToken"`
}

type IggyMessage

// IggyMessage is a message made of a MessageHeader, the payload bytes and
// the user headers in byte form. NewIggyMessage builds one from a payload
// and optional IggyMessageOpt values.
// NOTE(review): presumably UserHeaders is the output of GetHeadersBytes —
// confirm.
type IggyMessage struct {
	Header      MessageHeader
	Payload     []byte
	UserHeaders []byte
}

func NewIggyMessage

func NewIggyMessage(payload []byte, opts ...IggyMessageOpt) (IggyMessage, error)

NewIggyMessage creates a new message from the given payload, customizable through the optional IggyMessageOpt parameters.

type IggyMessageCompression

// IggyMessageCompression is the string name of a message compression mode.
type IggyMessageCompression string

// Message compression modes: none, and three s2 variants ("s2", "s2-better",
// "s2-best").
const (
	MESSAGE_COMPRESSION_NONE      IggyMessageCompression = "none"
	MESSAGE_COMPRESSION_S2        IggyMessageCompression = "s2"
	MESSAGE_COMPRESSION_S2_BETTER IggyMessageCompression = "s2-better"
	MESSAGE_COMPRESSION_S2_BEST   IggyMessageCompression = "s2-best"
)

type IggyMessageOpt

// IggyMessageOpt is a functional option: a function that receives a pointer
// to an IggyMessage. NewIggyMessage accepts any number of them; WithID and
// WithUserHeaders return ready-made options.
type IggyMessageOpt func(message *IggyMessage)

func WithID

func WithID(id [16]byte) IggyMessageOpt

func WithUserHeaders

func WithUserHeaders(userHeaders map[HeaderKey]HeaderValue) IggyMessageOpt

type JoinConsumerGroupRequest

// JoinConsumerGroupRequest identifies the consumer group to join by its
// stream, topic and consumer-group identifiers.
type JoinConsumerGroupRequest struct {
	StreamId        Identifier `json:"streamId"`
	TopicId         Identifier `json:"topicId"`
	ConsumerGroupId Identifier `json:"consumerGroupId"`
}

type LeaveConsumerGroupRequest

// LeaveConsumerGroupRequest identifies the consumer group to leave by its
// stream, topic and consumer-group identifiers.
type LeaveConsumerGroupRequest struct {
	StreamId        Identifier `json:"streamId"`
	TopicId         Identifier `json:"topicId"`
	ConsumerGroupId Identifier `json:"consumerGroupId"`
}

type LoginUserRequest

// LoginUserRequest holds the credentials of a login request, plus optional
// Version and Context strings that are omitted from the JSON output when
// empty.
type LoginUserRequest struct {
	Username string `json:"username"`
	Password string `json:"password"`
	Version  string `json:"version,omitempty"`
	Context  string `json:"context,omitempty"`
}

type LoginWithPersonalAccessTokenRequest

// LoginWithPersonalAccessTokenRequest holds the token string of a login
// request made with a personal access token.
type LoginWithPersonalAccessTokenRequest struct {
	Token string `json:"token"`
}

type MessageHeader

// MessageHeader is the fixed-width header of an IggyMessage.
//
// Its field widths (8, 16, 8, 8, 8, 4 and 4 bytes) add up to
// MessageHeaderSize. Unlike most types in this package, its JSON keys are
// snake_case. NewMessageHeader and MessageHeaderFromBytes construct a header
// and ToBytes serializes one.
type MessageHeader struct {
	Checksum         uint64    `json:"checksum"`
	Id               MessageID `json:"id"`
	Offset           uint64    `json:"offset"`
	Timestamp        uint64    `json:"timestamp"`
	OriginTimestamp  uint64    `json:"origin_timestamp"`
	UserHeaderLength uint32    `json:"user_header_length"`
	PayloadLength    uint32    `json:"payload_length"`
}

func MessageHeaderFromBytes

func MessageHeaderFromBytes(data []byte) (*MessageHeader, error)

func NewMessageHeader

func NewMessageHeader(id MessageID, payloadLength uint32, userHeaderLength uint32) MessageHeader

func (*MessageHeader) ToBytes

func (mh *MessageHeader) ToBytes() []byte

type MessageID

// MessageID is the 16-byte identifier stored in MessageHeader.Id.
type MessageID [16]byte

type MessagePolling

// MessagePolling is a one-byte code naming the kind of a PollingStrategy.
type MessagePolling byte

// Polling kinds, numbered 1 through 5. The zero value of MessagePolling
// matches none of them.
const (
	POLLING_OFFSET    MessagePolling = 1
	POLLING_TIMESTAMP MessagePolling = 2
	POLLING_FIRST     MessagePolling = 3
	POLLING_LAST      MessagePolling = 4
	POLLING_NEXT      MessagePolling = 5
)

type PartitionContract

// PartitionContract is the record of one partition: numeric ID, message and
// segment counts, creation timestamp, current offset and size in bytes,
// encoded with camelCase JSON keys.
// NOTE(review): the unit and epoch of CreatedAt are not visible here —
// confirm.
type PartitionContract struct {
	Id            uint32 `json:"id"`
	MessagesCount uint64 `json:"messagesCount"`
	CreatedAt     uint64 `json:"createdAt"`
	SegmentsCount uint32 `json:"segmentsCount"`
	CurrentOffset uint64 `json:"currentOffset"`
	SizeBytes     uint64 `json:"sizeBytes"`
}

type Partitioning

// Partitioning describes how the partition is chosen when messages are sent:
// a PartitioningKind plus a value in byte form and a length.
// NOTE(review): presumably Length equals len(Value) — confirm.
//
// Values are built with None, PartitionId or one of the EntityId* functions.
type Partitioning struct {
	Kind   PartitioningKind
	Length int
	Value  []byte
}

func EntityIdBytes

func EntityIdBytes(value []byte) (Partitioning, error)

func EntityIdGuid

func EntityIdGuid(value uuid.UUID) Partitioning

func EntityIdInt

func EntityIdInt(value int) Partitioning

func EntityIdString

func EntityIdString(value string) (Partitioning, error)

func EntityIdUlong

func EntityIdUlong(value uint64) Partitioning

func None

func None() Partitioning

func PartitionId

func PartitionId(value uint32) Partitioning

type PartitioningKind

// PartitioningKind names the kind of a Partitioning.
type PartitioningKind int

// Partitioning kinds, numbered 1 through 3. The zero value of
// PartitioningKind matches none of them.
const (
	Balanced        PartitioningKind = 1
	PartitionIdKind PartitioningKind = 2
	MessageKey      PartitioningKind = 3
)

type Permissions

// Permissions combines the global permission flags with optional per-stream
// permissions. Streams is omitted from the JSON output when empty.
// NOTE(review): presumably the Streams map is keyed by numeric stream ID —
// confirm.
type Permissions struct {
	Global  GlobalPermissions          `json:"Global"`
	Streams map[int]*StreamPermissions `json:"Streams,omitempty"`
}

type PersonalAccessTokenInfo

// PersonalAccessTokenInfo holds the name and expiry time of a personal
// access token. Expiry is a pointer without omitempty, so a nil value is
// encoded as JSON null.
// NOTE(review): presumably nil means the token does not expire — confirm.
type PersonalAccessTokenInfo struct {
	Name   string     `json:"Name"`
	Expiry *time.Time `json:"Expiry"`
}

type PollMessageRequest

// PollMessageRequest holds the fields of a poll-messages request: the stream
// and topic identifiers, the consumer, a partition number, the polling
// strategy, a message count and an auto-commit flag.
type PollMessageRequest struct {
	StreamId        Identifier      `json:"streamId"`
	TopicId         Identifier      `json:"topicId"`
	Consumer        Consumer        `json:"consumer"`
	PartitionId     uint32          `json:"partitionId"`
	PollingStrategy PollingStrategy `json:"pollingStrategy"`
	Count           int             `json:"count"`
	AutoCommit      bool            `json:"autoCommit"`
}

type PolledMessage

// PolledMessage is a batch of messages together with a partition number, a
// current offset and a message count.
// NOTE(review): presumably MessageCount equals len(Messages) — confirm.
// The fields carry no JSON tags.
type PolledMessage struct {
	PartitionId   uint32
	CurrentOffset uint64
	MessageCount  uint32
	Messages      []IggyMessage
}

type PollingStrategy

// PollingStrategy pairs a MessagePolling kind with a numeric value.
//
// NewPollingStrategy takes both explicitly. OffsetPollingStrategy and
// TimestampPollingStrategy take only a value, while FirstPollingStrategy,
// LastPollingStrategy and NextPollingStrategy take no arguments.
type PollingStrategy struct {
	Kind  MessagePolling
	Value uint64
}

func FirstPollingStrategy

func FirstPollingStrategy() PollingStrategy

func LastPollingStrategy

func LastPollingStrategy() PollingStrategy

func NewPollingStrategy

func NewPollingStrategy(kind MessagePolling, value uint64) PollingStrategy

func NextPollingStrategy

func NextPollingStrategy() PollingStrategy

func OffsetPollingStrategy

func OffsetPollingStrategy(value uint64) PollingStrategy

func TimestampPollingStrategy

func TimestampPollingStrategy(value uint64) PollingStrategy

type Protocol

// Protocol is the string name of a transport protocol.
type Protocol string

// Supported protocol names.
const (
	Http Protocol = "Http"
	Tcp  Protocol = "Tcp"
	Quic Protocol = "Quic"
)

type RawPersonalAccessToken

// RawPersonalAccessToken wraps the token string of a personal access token,
// encoded under the JSON key "token".
type RawPersonalAccessToken struct {
	Token string `json:"token"`
}

type ReceivedMessage

// ReceivedMessage is a single IggyMessage together with a current offset and
// a partition number. The fields carry no JSON tags.
type ReceivedMessage struct {
	Message       IggyMessage
	CurrentOffset uint64
	PartitionId   uint32
}

type SendMessagesRequest

// SendMessagesRequest holds the fields of a send-messages request: the
// stream and topic identifiers, the partitioning and the messages to send.
type SendMessagesRequest struct {
	StreamId     Identifier    `json:"streamId"`
	TopicId      Identifier    `json:"topicId"`
	Partitioning Partitioning  `json:"partitioning"`
	Messages     []IggyMessage `json:"messages"`
}

type Stats

// Stats is a statistics record covering process and CPU usage, memory, run
// and start time, I/O byte counters, resource counts (streams, topics,
// partitions, segments, messages, clients, consumer groups) and host and OS
// information. Unlike most types in this package, its JSON keys are
// snake_case.
// NOTE(review): the units of the CPU, memory and time fields are not visible
// here — confirm before displaying them.
type Stats struct {
	ProcessId           uint32  `json:"process_id"`
	CpuUsage            float32 `json:"cpu_usage"`
	TotalCpuUsage       float32 `json:"total_cpu_usage"`
	MemoryUsage         uint64  `json:"memory_usage"`
	TotalMemory         uint64  `json:"total_memory"`
	AvailableMemory     uint64  `json:"available_memory"`
	RunTime             uint64  `json:"run_time"`
	StartTime           uint64  `json:"start_time"`
	ReadBytes           uint64  `json:"read_bytes"`
	WrittenBytes        uint64  `json:"written_bytes"`
	MessagesSizeBytes   uint64  `json:"messages_size_bytes"`
	StreamsCount        uint32  `json:"streams_count"`
	TopicsCount         uint32  `json:"topics_count"`
	PartitionsCount     uint32  `json:"partitions_count"`
	SegmentsCount       uint32  `json:"segments_count"`
	MessagesCount       uint64  `json:"messages_count"`
	ClientsCount        uint32  `json:"clients_count"`
	ConsumerGroupsCount uint32  `json:"consumer_groups_count"`
	Hostname            string  `json:"hostname"`
	OsName              string  `json:"os_name"`
	OsVersion           string  `json:"os_version"`
	KernelVersion       string  `json:"kernel_version"`
}

type StoreConsumerOffsetRequest

// StoreConsumerOffsetRequest holds the fields of a store-consumer-offset
// request.
//
// PartitionId is a pointer without omitempty, so a nil value is encoded as
// JSON null.
type StoreConsumerOffsetRequest struct {
	StreamId    Identifier `json:"streamId"`
	TopicId     Identifier `json:"topicId"`
	Consumer    Consumer   `json:"consumer"`
	PartitionId *uint32    `json:"partitionId"`
	Offset      uint64     `json:"offset"`
}

type Stream

// Stream is the summary record of a stream: numeric ID, name, size in bytes,
// creation timestamp and message and topic counts, encoded with camelCase
// JSON keys.
// NOTE(review): the unit and epoch of CreatedAt are not visible here —
// confirm.
type Stream struct {
	Id            uint32 `json:"id"`
	Name          string `json:"name"`
	SizeBytes     uint64 `json:"sizeBytes"`
	CreatedAt     uint64 `json:"createdAt"`
	MessagesCount uint64 `json:"messagesCount"`
	TopicsCount   uint32 `json:"topicsCount"`
}

type StreamDetails

// StreamDetails embeds Stream and adds its topics.
//
// Because Stream is embedded without a tag, encoding/json places its fields
// in the same JSON object. Topics is omitted from the output when empty.
type StreamDetails struct {
	Stream
	Topics []Topic `json:"topics,omitempty"`
}

type StreamPermissions

// StreamPermissions is the set of boolean permission flags for one stream,
// with optional per-topic permissions. Topics is omitted from the JSON
// output when empty.
// NOTE(review): presumably the Topics map is keyed by numeric topic ID —
// confirm.
type StreamPermissions struct {
	ManageStream bool                      `json:"ManageStream"`
	ReadStream   bool                      `json:"ReadStream"`
	ManageTopics bool                      `json:"ManageTopics"`
	ReadTopics   bool                      `json:"ReadTopics"`
	PollMessages bool                      `json:"PollMessages"`
	SendMessages bool                      `json:"SendMessages"`
	Topics       map[int]*TopicPermissions `json:"Topics,omitempty"`
}

type Topic

// Topic is the summary record of a topic, encoded with camelCase JSON keys.
//
// MessageExpiry uses this package's Duration type (microseconds), not
// time.Duration. CompressionAlgorithm is a plain uint8 here rather than the
// CompressionAlgorithm type.
type Topic struct {
	Id                   uint32   `json:"id"`
	CreatedAt            uint64   `json:"createdAt"`
	Name                 string   `json:"name"`
	Size                 uint64   `json:"size"`
	MessageExpiry        Duration `json:"messageExpiry"`
	CompressionAlgorithm uint8    `json:"compressionAlgorithm"`
	MaxTopicSize         uint64   `json:"maxTopicSize"`
	ReplicationFactor    uint8    `json:"replicationFactor"`
	MessagesCount        uint64   `json:"messagesCount"`
	PartitionsCount      uint32   `json:"partitionsCount"`
}

type TopicDetails

// TopicDetails embeds Topic and adds its partitions.
//
// Because Topic is embedded without a tag, encoding/json places its fields
// in the same JSON object. Partitions is omitted from the output when empty.
type TopicDetails struct {
	Topic
	Partitions []PartitionContract `json:"partitions,omitempty"`
}

type TopicPermissions

// TopicPermissions is the set of four boolean permission flags for one
// topic, each encoded under a PascalCase JSON key equal to its field name.
type TopicPermissions struct {
	ManageTopic  bool `json:"ManageTopic"`
	ReadTopic    bool `json:"ReadTopic"`
	PollMessages bool `json:"PollMessages"`
	SendMessages bool `json:"SendMessages"`
}

type UpdatePermissionsRequest

// UpdatePermissionsRequest holds the fields of an update-permissions request.
//
// UserID is tagged `json:"-"`, so encoding/json never writes it into the
// body. Permissions is omitted from the JSON output when nil.
type UpdatePermissionsRequest struct {
	UserID      Identifier   `json:"-"`
	Permissions *Permissions `json:"Permissions,omitempty"`
}

type UpdateTopicRequest

// UpdateTopicRequest holds the fields of an update-topic request.
//
// NOTE(review): MessageExpiry is a time.Duration (a nanosecond count), while
// Topic.MessageExpiry uses this package's Duration, which is documented as
// microseconds — confirm that the unit is converted during serialization.
type UpdateTopicRequest struct {
	StreamId             Identifier    `json:"streamId"`
	TopicId              Identifier    `json:"topicId"`
	CompressionAlgorithm uint8         `json:"compressionAlgorithm"`
	MessageExpiry        time.Duration `json:"messageExpiry"`
	MaxTopicSize         uint64        `json:"maxTopicSize"`
	ReplicationFactor    uint8         `json:"replicationFactor"`
	Name                 string        `json:"name"`
}

type UpdateUserRequest

// UpdateUserRequest holds the fields of an update-user request.
//
// UserID is tagged `json:"-"`, so encoding/json never writes it into the
// body. Username and Status are pointers without omitempty, so nil values
// are encoded as JSON null; Status is encoded under the key "userStatus".
// NOTE(review): presumably a nil field means "leave unchanged" — confirm.
type UpdateUserRequest struct {
	UserID   Identifier  `json:"-"`
	Username *string     `json:"username"`
	Status   *UserStatus `json:"userStatus"`
}

type UserInfo

// UserInfo is the summary record of a user: numeric ID, creation timestamp,
// status and username, encoded with PascalCase JSON keys.
// NOTE(review): the unit and epoch of CreatedAt are not visible here —
// confirm.
type UserInfo struct {
	Id        uint32     `json:"Id"`
	CreatedAt uint64     `json:"CreatedAt"`
	Status    UserStatus `json:"Status"`
	Username  string     `json:"Username"`
}

type UserInfoDetails

// UserInfoDetails embeds UserInfo and adds the user's permissions.
//
// Because UserInfo is embedded without a tag, encoding/json places its
// fields in the same JSON object. Permissions has no omitempty, so a nil
// value is encoded as JSON null.
type UserInfoDetails struct {
	UserInfo
	Permissions *Permissions `json:"Permissions"`
}

type UserStatus

// UserStatus is the status of a user account.
type UserStatus int

// User statuses. Active is the zero value of UserStatus.
const (
	Active   UserStatus = 0
	Inactive UserStatus = 1
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL