journal

package
v0.19.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 4, 2026 License: Apache-2.0 Imports: 10 Imported by: 0

README

Journal

The Journal service listens to the platform event stream, persists each event to PostgreSQL for auditability and exposes HTTP endpoints to query journals or view per-client telemetry (first/last seen, subscriptions, in/out message counters).

Configuration

The service is configured with the following environment variables (unset values fall back to defaults).

Variable Description Default
SMQ_JOURNAL_LOG_LEVEL Log level for Journal (debug, info, warn, error) info
SMQ_JOURNAL_HTTP_HOST Journal HTTP host localhost
SMQ_JOURNAL_HTTP_PORT Journal HTTP port 9021
SMQ_JOURNAL_HTTP_SERVER_CERT Path to PEM-encoded HTTP server certificate ""
SMQ_JOURNAL_HTTP_SERVER_KEY Path to PEM-encoded HTTP server key ""
SMQ_JOURNAL_HTTP_SERVER_CA_CERTS Path to trusted CA bundle for the HTTP server ""
SMQ_JOURNAL_HTTP_CLIENT_CA_CERTS Path to client CA bundle to require HTTP mTLS ""
SMQ_JOURNAL_DB_HOST Database host address localhost
SMQ_JOURNAL_DB_PORT Database host port 5432
SMQ_JOURNAL_DB_USER Database user supermq
SMQ_JOURNAL_DB_PASS Database password supermq
SMQ_JOURNAL_DB_NAME Name of the database used by the service journal
SMQ_JOURNAL_DB_SSL_MODE Database connection SSL mode (disable, require, verify-ca, verify-full) disable
SMQ_JOURNAL_DB_SSL_CERT Path to the PEM-encoded certificate file ""
SMQ_JOURNAL_DB_SSL_KEY Path to the PEM-encoded key file ""
SMQ_JOURNAL_DB_SSL_ROOT_CERT Path to the PEM-encoded root certificate file ""
SMQ_ES_URL Event store URL (NATS) consumed for journal entries nats://localhost:4222
SMQ_JAEGER_URL Jaeger tracing endpoint http://localhost:4318/v1/traces
SMQ_JAEGER_TRACE_RATIO Trace sampling ratio 1.0
SMQ_SEND_TELEMETRY Send telemetry to the SuperMQ call-home server true
SMQ_AUTH_GRPC_URL Auth service gRPC URL ""
SMQ_AUTH_GRPC_TIMEOUT Auth service gRPC timeout 1s
SMQ_AUTH_GRPC_CLIENT_CERT Path to PEM-encoded Auth gRPC client certificate ""
SMQ_AUTH_GRPC_CLIENT_KEY Path to PEM-encoded Auth gRPC client key ""
SMQ_AUTH_GRPC_SERVER_CA_CERTS Path to PEM-encoded Auth gRPC trusted CA bundle ""
SMQ_DOMAINS_GRPC_URL Domains service gRPC URL ""
SMQ_DOMAINS_GRPC_TIMEOUT Domains service gRPC timeout 1s
SMQ_DOMAINS_GRPC_CLIENT_CERT Path to PEM-encoded Domains gRPC client certificate ""
SMQ_DOMAINS_GRPC_CLIENT_KEY Path to PEM-encoded Domains gRPC client key ""
SMQ_DOMAINS_GRPC_SERVER_CA_CERTS Path to PEM-encoded Domains gRPC trusted CA bundle ""
SMQ_JOURNAL_INSTANCE_ID Journal instance ID (auto-generated when empty) ""
SMQ_ALLOW_UNVERIFIED_USER Allow unverified users to authenticate (useful in dev) false

Deployment

The service is distributed as a Docker container. Check the journals for the journal and journal-db services and how they are wired into the base stack.

To start the service outside of the container, execute the following shell script:

git clone https://github.com/absmach/supermq
cd supermq

# build and install the binary
make journal
make install

# run with the essentials; requires Postgres, Auth gRPC, Domains gRPC, and NATS running
SMQ_JOURNAL_HTTP_HOST=localhost \
SMQ_JOURNAL_HTTP_PORT=9021 \
SMQ_JOURNAL_DB_HOST=localhost \
SMQ_JOURNAL_DB_PORT=5432 \
SMQ_JOURNAL_DB_USER=supermq \
SMQ_JOURNAL_DB_PASS=supermq \
SMQ_JOURNAL_DB_NAME=journal \
SMQ_AUTH_GRPC_URL=localhost:7001 \
SMQ_DOMAINS_GRPC_URL=localhost:7003 \
SMQ_ES_URL=nats://localhost:4222 \
$GOBIN/supermq-journal

HTTP API

Base URL defaults to http://localhost:9021. All journal and telemetry endpoints require Authorization: Bearer <token> (health is public).

Usage
Operation Description
List user journals Page through journals for a user across domains.
List entity journals Page through journals for a group, client, channel, or user within a domain.
View client telemetry Aggregate telemetry counters for a client in a domain.
Health check Liveness and build info.
API examples
List user journals
curl -X GET "http://localhost:9021/journal/user/${USER_ID}?limit=5&with_attributes=true&dir=desc" \
  -H "Authorization: Bearer $TOKEN"

Expected response:

{
  "journals": [
    {
      "operation": "user.create",
      "occurred_at": "2024-01-11T12:05:07.449053Z",
      "attributes": {
        "created_at": "2024-06-12T11:34:32.991591Z",
        "id": "29d425c8-542b-4614-8a4d-a5951945d720",
        "identity": "Gawne-Havlicek@email.com",
        "name": "Newgard-Frisina",
        "status": "enabled",
        "updated_at": "2024-06-12T11:34:33.116795Z",
        "updated_by": "ad228f20-4741-47c5-bef7-d871b541c019"
      },
      "metadata": {
        "Update": "Calvo-Felkins"
      }
    }
  ],
  "total": 1,
  "offset": 0,
  "limit": 5
}
List entity journals in a domain

Retrieves telemetry data for a specific client within a domain. This includes connection status, messages sent/received, and other metrics.

curl -X GET "http://localhost:9021/${DOMAIN_ID}/journal/client/${CLIENT_ID}?operation=client.create&with_metadata=true&dir=desc&limit=10" \
  -H "Authorization: Bearer $TOKEN"

Expected response:

{
  "total": 2,
  "offset": 0,
  "limit": 10,
  "journals": [
    {
      "operation": "client.create",
      "occurred_at": "2024-06-12T11:34:33Z",
      "domain": "29d425c8-542b-4614-8a4d-a5951945d720",
      "attributes": {
        "id": "bb7edb32-2eac-4aad-aebe-ed96fe073879",
        "domain": "29d425c8-542b-4614-8a4d-a5951945d720",
        "name": "clientName",
        "status": "enabled"
      },
      "metadata": {
        "trace_id": "6efb4c24b1b4a684"
      }
    }
  ]
}
View client telemetry

Retrieves telemetry data for a specific client within a domain. This includes connection status, messages sent/received, and other metrics.

curl -X GET "http://localhost:9021/${DOMAIN_ID}/journal/client/${CLIENT_ID}/telemetry" \
  -H "Authorization: Bearer $TOKEN"

Expected response:

{
  "client_id": "bb7edb32-2eac-4aad-aebe-ed96fe073879",
  "domain_id": "29d425c8-542b-4614-8a4d-a5951945d720",
  "subscriptions": 5,
  "inbound_messages": 1234567,
  "outbound_messages": 987654,
  "first_seen": "2024-01-11T10:00:00Z",
  "last_seen": "2024-01-11T12:05:07.449053Z"
}
Health check
curl "http://localhost:9021/health"

Expected response:

{
  "status": "pass",
  "version": "0.18.0",
  "commit": "ffffffff",
  "description": "journal service",
  "build_time": "1970-01-01_00:00:00",
  "instance_id": "b4f1d5d2-4f24-4c2a-9a40-123456789abc"
}

Documentation

Overview

Package journal contains the journal service. This service is responsible for storing events from the event store to a journal log repository. It is also responsible for providing a REST API to query events.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ClientSubscription added in v0.17.0

type ClientSubscription struct {
	ID           string `json:"id" db:"id"`
	SubscriberID string `json:"subscriber_id" db:"subscriber_id"`
	ChannelID    string `json:"channel_id" db:"channel_id"`
	Subtopic     string `json:"subtopic" db:"subtopic"`
	ClientID     string `json:"client_id" db:"client_id"`
}

type ClientTelemetry added in v0.17.0

type ClientTelemetry struct {
	ClientID         string    `json:"client_id"`
	DomainID         string    `json:"domain_id"`
	Subscriptions    uint64    `json:"subscriptions"`
	InboundMessages  uint64    `json:"inbound_messages"`
	OutboundMessages uint64    `json:"outbound_messages"`
	FirstSeen        time.Time `json:"first_seen"`
	LastSeen         time.Time `json:"last_seen"`
}

type EntityType

type EntityType uint8
const (
	UserEntity EntityType = iota
	GroupEntity
	ClientEntity
	ChannelEntity
)

func ToEntityType

func ToEntityType(entityType string) (EntityType, error)

ToEntityType converts string value to a valid entity type.

func (EntityType) Query

func (e EntityType) Query() string

Query returns the SQL condition for the entity type.

func (EntityType) String

func (e EntityType) String() string

String converts entity type to string literal.

type Journal

type Journal struct {
	ID         string         `json:"id,omitempty" db:"id"`
	Domain     string         `json:"domain,omitempty" db:"domain"`
	Operation  string         `json:"operation,omitempty" db:"operation,omitempty"`
	OccurredAt time.Time      `json:"occurred_at,omitempty" db:"occurred_at,omitempty"`
	Attributes map[string]any `json:"attributes,omitempty" db:"attributes,omitempty"` // This is extra information about the journal for example client_id, user_id, group_id etc.
	Metadata   map[string]any `json:"metadata,omitempty" db:"metadata,omitempty"`     // This is decoded metadata from the journal.
}

Journal represents an event journal that occurred in the system.

type JournalsPage

type JournalsPage struct {
	Total    uint64    `json:"total"`
	Offset   uint64    `json:"offset"`
	Limit    uint64    `json:"limit"`
	Journals []Journal `json:"journals"`
}

JournalsPage represents a page of journals.

func (JournalsPage) MarshalJSON

func (page JournalsPage) MarshalJSON() ([]byte, error)

type Page

type Page struct {
	Offset         uint64     `json:"offset" db:"offset"`
	Limit          uint64     `json:"limit" db:"limit"`
	Operation      string     `json:"operation,omitempty" db:"operation,omitempty"`
	From           time.Time  `json:"from,omitempty" db:"from,omitempty"`
	To             time.Time  `json:"to,omitempty" db:"to,omitempty"`
	WithAttributes bool       `json:"with_attributes,omitempty"`
	WithMetadata   bool       `json:"with_metadata,omitempty"`
	EntityID       string     `json:"entity_id,omitempty" db:"entity_id,omitempty"`
	EntityType     EntityType `json:"entity_type,omitempty" db:"entity_type,omitempty"`
	Direction      string     `json:"direction,omitempty"`
}

Page is used to filter journals.

type Repository

type Repository interface {
	// Save persists the journal to a database.
	Save(ctx context.Context, journal Journal) error

	// RetrieveAll retrieves all journals from the database with the given page.
	RetrieveAll(ctx context.Context, page Page) (JournalsPage, error)

	// SaveClientTelemetry persists telemetry data for a client to the database.
	SaveClientTelemetry(ctx context.Context, ct ClientTelemetry) error

	// RetrieveClientTelemetry retrieves telemetry data for a client from the database.
	RetrieveClientTelemetry(ctx context.Context, clientID, domainID string) (ClientTelemetry, error)

	// DeleteClientTelemetry removes telemetry data for a client from the database.
	DeleteClientTelemetry(ctx context.Context, clientID, domainID string) error

	// AddSubscription adds a subscription to the client telemetry.
	AddSubscription(ctx context.Context, sub ClientSubscription) error

	// CountSubscriptions returns the number of subscriptions for a client.
	CountSubscriptions(ctx context.Context, clientID string) (uint64, error)

	// RemoveSubscription removes a subscription from the client telemetry.
	RemoveSubscription(ctx context.Context, subscriberID string) error

	// IncrementInboundMessages increments the inbound messages count for a client.
	IncrementInboundMessages(ctx context.Context, ct ClientTelemetry) error

	// IncrementOutboundMessages increments the outbound messages count for a client.
	IncrementOutboundMessages(ctx context.Context, channelID, subtopic string) error
}

Repository provides access to the journal log database.

type Service

type Service interface {
	// Save saves the journal to the database.
	Save(ctx context.Context, journal Journal) error

	// RetrieveAll retrieves all journals from the database with the given page.
	RetrieveAll(ctx context.Context, session smqauthn.Session, page Page) (JournalsPage, error)

	// RetrieveClientTelemetry retrieves telemetry data for a client.
	RetrieveClientTelemetry(ctx context.Context, session smqauthn.Session, clientID string) (ClientTelemetry, error)
}

Service provides access to the journal log service.

func NewService

func NewService(idp supermq.IDProvider, repository Repository) Service

Directories

Path Synopsis
Package api contains API-related concerns: endpoint definitions, middlewares and all resource representations.
Package api contains API-related concerns: endpoint definitions, middlewares and all resource representations.
Package events provides the event consumer for the journal service.
Package events provides the event consumer for the journal service.
Package middleware provides tracing, logging and metrics middleware for SuperMQ Journal service.
Package middleware provides tracing, logging and metrics middleware for SuperMQ Journal service.
Package mocks contains mocks for testing purposes.
Package mocks contains mocks for testing purposes.
Package postgres provides a postgres implementation of the journal log repository.
Package postgres provides a postgres implementation of the journal log repository.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL