readers

package
v0.40.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 30, 2026 License: Apache-2.0 Imports: 4 Imported by: 0

README

Readers

Readers provide an implementation of various message readers. The service exposes an HTTP API for querying messages that have been persisted by the Writers (consumers).

Implementations

Two backend implementations are provided, each as its own binary:

Implementation Binary Description
PostgreSQL mainfluxlabs-postgres-reader Reads SenML and JSON messages from PostgreSQL
TimescaleDB mainfluxlabs-timescale-reader Reads SenML and JSON messages from TimescaleDB

Message Formats

The service supports two message formats:

SenML — structured IoT measurement records with well-known fields (n, v, vb, vs, vd, t, u).

JSON — free-form JSON records, queried by arbitrary filter expressions.

Authentication

Requests are authenticated using the Thing key:

Authorization: Thing <thing_key>

The authenticated thing determines which messages are accessible.

Configuration

PostgreSQL Reader

The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values.

Variable Description Default
MF_POSTGRES_READER_LOG_LEVEL Log level (debug, info, warn, error) error
MF_POSTGRES_READER_PORT HTTP port 8180
MF_POSTGRES_READER_CLIENT_TLS Flag that indicates if TLS should be turned on false
MF_POSTGRES_READER_CA_CERTS Path to trusted CAs in PEM format
MF_POSTGRES_READER_SERVER_CERT Path to server certificate in PEM format
MF_POSTGRES_READER_SERVER_KEY Path to server key in PEM format
MF_POSTGRES_READER_DB_HOST Database host address localhost
MF_POSTGRES_READER_DB_PORT Database host port 5432
MF_POSTGRES_READER_DB_USER Database user mainflux
MF_POSTGRES_READER_DB_PASS Database password mainflux
MF_POSTGRES_READER_DB Database name mainflux
MF_POSTGRES_READER_DB_SSL_MODE Database connection SSL mode (disable, require, verify-ca, verify-full) disable
MF_POSTGRES_READER_DB_SSL_CERT Path to the PEM encoded certificate file
MF_POSTGRES_READER_DB_SSL_KEY Path to the PEM encoded key file
MF_POSTGRES_READER_DB_SSL_ROOT_CERT Path to the PEM encoded root certificate file
MF_THINGS_AUTH_GRPC_URL Things service Auth gRPC URL localhost:8183
MF_THINGS_AUTH_GRPC_TIMEOUT Things service Auth gRPC request timeout in seconds 1s
MF_AUTH_GRPC_URL Auth service gRPC URL localhost:8181
MF_AUTH_GRPC_TIMEOUT Auth service gRPC request timeout 1s
MF_JAEGER_URL Jaeger server URL for distributed tracing. Leave empty to disable tracing.
TimescaleDB Reader

Uses the same environment variable names with MF_TIMESCALE_READER_ prefix (except shared vars like MF_THINGS_AUTH_GRPC_URL).

Deployment

The service itself is distributed as Docker container. Check the postgres-reader service section in docker-compose to see how the service is deployed.

To start the PostgreSQL reader service, execute the following shell script:

# Download the latest version of the service
git clone https://github.com/MainfluxLabs/mainflux

cd mainflux

# compile the postgres-reader service
make postgres-reader

# Copy binary to bin
make install

# Set the environment variables and run the service
MF_POSTGRES_READER_LOG_LEVEL=[Log level] \
MF_POSTGRES_READER_PORT=[HTTP port] \
MF_POSTGRES_READER_DB_HOST=[Database host address] \
MF_POSTGRES_READER_DB_PORT=[Database host port] \
MF_POSTGRES_READER_DB_USER=[Database user] \
MF_POSTGRES_READER_DB_PASS=[Database password] \
MF_POSTGRES_READER_DB=[Database name] \
MF_THINGS_AUTH_GRPC_URL=[Things service Auth gRPC URL] \
$GOBIN/mainfluxlabs-postgres-reader

Usage

Starting the service exposes the HTTP API for querying persisted messages. Authentication is performed using the Thing key passed in the Authorization header.

For the full API reference, see the API documentation.

gRPC API

In addition to the HTTP API, the postgres-reader exposes a gRPC API that allows other internal services to query messages by thing key.

Environment Variables (postgres-reader)
Variable Description Default
MF_POSTGRES_READER_GRPC_PORT gRPC server port 8184
MF_POSTGRES_READER_GRPC_SERVER_CERT Path to gRPC server TLS certificate (optional) ""
MF_POSTGRES_READER_GRPC_SERVER_KEY Path to gRPC server TLS key (optional) ""
Methods
  • ListJSONMessages — returns a page of JSON messages filtered by thing key and page metadata
  • ListSenMLMessages — returns a page of SenML messages filtered by thing key and page metadata

Authentication is done via thing key (no user token required).

Consuming Services

Services that query the postgres-reader gRPC API (e.g. rules) use the following environment variable to locate the endpoint:

Variable Description Default
MF_POSTGRES_READER_GRPC_URL postgres-reader gRPC endpoint localhost:8184 (Docker: postgres-reader:8184)
MF_POSTGRES_READER_GRPC_TIMEOUT gRPC request timeout 1s

Documentation

Index

Constants

View Source
const (
	// EqualKey represents the equal comparison operator key.
	EqualKey = "eq"
	// LowerThanKey represents the lower-than comparison operator key.
	LowerThanKey = "lt"
	// LowerThanEqualKey represents the lower-than-or-equal comparison operator key.
	LowerThanEqualKey = "le"
	// GreaterThanKey represents the greater-than-or-equal comparison operator key.
	GreaterThanKey = "gt"
	// GreaterThanEqualKey represents the greater-than-or-equal comparison operator key.
	GreaterThanEqualKey = "ge"
	// AggregationMin represents the minimum aggregation key.
	AggregationMin = "min"
	// AggregationMax represents the maximum aggregation key.
	AggregationMax = "max"
	// AggregationAvg represents the average aggregation key.
	AggregationAvg = "avg"
	// AggregationCount represents the count aggregation key.
	AggregationCount = "count"
	// AggregationSum represents the sum aggregation key.
	AggregationSum = "sum"
	// AggregationFirst represents the first-in-bucket aggregation key.
	AggregationFirst = "first"
	// AggregationLast represents the last-in-bucket aggregation key.
	AggregationLast = "last"
)

Variables

View Source
var ErrReadMessages = errors.New("failed to read messages from database")

ErrReadMessages indicates failure occurred while reading messages from database.

Functions

func ComparatorSymbol added in v0.36.0

func ComparatorSymbol(key string) string

ComparatorSymbol converts a comparison operator key into its SQL symbol.

func ParseValueComparator

func ParseValueComparator(query map[string]any) string

ParseValueComparator converts comparison operator keys into mathematical notation.

Types

type Backup added in v0.35.0

type Backup struct {
	JSONMessages  JSONMessagesPage
	SenMLMessages SenMLMessagesPage
}

type JSONMessageRepository added in v0.30.1

type JSONMessageRepository interface {
	// Retrieve retrieves the json messages with given filters.
	Retrieve(ctx context.Context, rpm JSONPageMetadata) (JSONMessagesPage, error)

	// Backup backups the json messages with given filters.
	Backup(ctx context.Context, rpm JSONPageMetadata) (JSONMessagesPage, error)

	// Restore restores the json messages.
	Restore(ctx context.Context, messages ...Message) error

	// Remove deletes the json messages within a time range.
	Remove(ctx context.Context, rpm JSONPageMetadata) error
}

type JSONMessagesPage added in v0.30.0

type JSONMessagesPage = domain.JSONMessagesPage

Domain type aliases

type JSONPageMetadata added in v0.30.0

type JSONPageMetadata = domain.JSONPageMetadata

Domain type aliases

type Message

type Message = domain.Message

Domain type aliases

type MessagesPage

type MessagesPage = domain.MessagesPage

Domain type aliases

type SenMLMessageRepository added in v0.30.1

type SenMLMessageRepository interface {
	// Retrieve retrieves the senml messages with given filters.
	Retrieve(ctx context.Context, rpm SenMLPageMetadata) (SenMLMessagesPage, error)

	// Backup backups the senml messages with given filters.
	Backup(ctx context.Context, rpm SenMLPageMetadata) (SenMLMessagesPage, error)

	// Restore restores the senml messages.
	Restore(ctx context.Context, messages ...Message) error

	// Remove deletes the senml messages within a time range.
	Remove(ctx context.Context, rpm SenMLPageMetadata) error
}

type SenMLMessagesPage added in v0.30.0

type SenMLMessagesPage = domain.SenMLMessagesPage

Domain type aliases

type SenMLPageMetadata added in v0.30.0

type SenMLPageMetadata = domain.SenMLPageMetadata

Domain type aliases

type Service added in v0.30.1

type Service interface {
	// ListJSONMessages retrieves the json messages with given filters.
	ListJSONMessages(ctx context.Context, token string, key domain.ThingKey, rpm JSONPageMetadata) (JSONMessagesPage, error)

	// ListSenMLMessages retrieves the senml messages with given filters.
	ListSenMLMessages(ctx context.Context, token string, key domain.ThingKey, rpm SenMLPageMetadata) (SenMLMessagesPage, error)

	// ExportJSONMessages retrieves the json messages with given filters, intended for exporting.
	ExportJSONMessages(ctx context.Context, token string, rpm JSONPageMetadata) (JSONMessagesPage, error)

	// ExportSenMLMessages retrieves the senml messages with given filters, intended for exporting.
	ExportSenMLMessages(ctx context.Context, token string, rpm SenMLPageMetadata) (SenMLMessagesPage, error)

	// Backup backups all json and senml messages.
	Backup(ctx context.Context, token string) (Backup, error)

	// Restore restores json and senml messages.
	Restore(ctx context.Context, token string, backup Backup) error

	// DeleteJSONMessages deletes the json messages by publisher within a time range.
	DeleteJSONMessages(ctx context.Context, token string, rpm JSONPageMetadata) error

	// DeleteSenMLMessages deletes the senml messages by publisher within a time range.
	DeleteSenMLMessages(ctx context.Context, token string, rpm SenMLPageMetadata) error

	// DeleteAllJSONMessages deletes the senml messages within a time range, requires admin privileges.
	DeleteAllJSONMessages(ctx context.Context, token string, rpm JSONPageMetadata) error

	// DeleteAllSenMLMessages deletes the senml messages within a time range, requires admin privileges.
	DeleteAllSenMLMessages(ctx context.Context, token string, rpm SenMLPageMetadata) error
}

Service specifies an API that must be fullfiled by the domain service implementation, and all of its decorators (e.g. logging & metrics).

func New added in v0.30.1

Directories

Path Synopsis
api
Package postgres contains repository implementations using Postgres as the underlying database.
Package postgres contains repository implementations using Postgres as the underlying database.
Package timescale contains repository implementations using Timescale as the underlying database.
Package timescale contains repository implementations using Timescale as the underlying database.
Package tracing contains middlewares that will add spans to existing traces.
Package tracing contains middlewares that will add spans to existing traces.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL