postgres

package
v1.0.2 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 2, 2026 License: Apache-2.0 Imports: 22 Imported by: 0

README

MIDAS Database Setup

Simple database setup for the MIDAS authority governance engine.

Quick Start

# Create the database (interactive)
./setup-db.sh

# Or specify the database name directly
./setup-db.sh midas

That's it! The script will create the database and apply the complete schema.

Files

  • schema.sql - Complete MIDAS database schema (single file)
  • setup-db.sh - Setup script that creates and initializes the database

What Gets Created

The script creates these tables:

  1. decision_surfaces - Decision domains with versioning
  2. authority_profiles - Authority configurations per surface
  3. agents - Autonomous actors in the system
  4. agent_authorizations - Grants linking agents to profiles
  5. operational_envelopes - Five-section envelope model for decisions
  6. audit_events - Immutable audit log with hash chain integrity
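The README does not say how the audit_events hash chain is computed. As a rough illustration of the general idea only, the sketch below links each event to its predecessor with SHA-256. The auditEvent type, its field names, and the hashing scheme are all assumptions for illustration and do not describe the actual schema.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// auditEvent is a hypothetical, simplified stand-in for a row in audit_events.
type auditEvent struct {
	Payload  string
	PrevHash string
	Hash     string
}

// hashEvent derives an event hash from the previous hash and the payload.
// The exact inputs used by MIDAS are not documented; this is an assumption.
func hashEvent(prevHash, payload string) string {
	sum := sha256.Sum256([]byte(prevHash + "\n" + payload))
	return hex.EncodeToString(sum[:])
}

// appendEvent links a new event to the tail of the chain.
func appendEvent(chain []auditEvent, payload string) []auditEvent {
	prev := ""
	if n := len(chain); n > 0 {
		prev = chain[n-1].Hash
	}
	return append(chain, auditEvent{Payload: payload, PrevHash: prev, Hash: hashEvent(prev, payload)})
}

// verifyChain reports whether every event matches its recorded hash
// and points at the hash of the event before it.
func verifyChain(chain []auditEvent) bool {
	prev := ""
	for _, ev := range chain {
		if ev.PrevHash != prev || ev.Hash != hashEvent(ev.PrevHash, ev.Payload) {
			return false
		}
		prev = ev.Hash
	}
	return true
}

func main() {
	var chain []auditEvent
	chain = appendEvent(chain, "grant created")
	chain = appendEvent(chain, "grant suspended")
	fmt.Println(verifyChain(chain)) // true

	chain[0].Payload = "tampered"
	fmt.Println(verifyChain(chain)) // false
}
```

Because each hash covers the previous one, editing any earlier row invalidates every later link, which is what makes an append-only log tamper-evident.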

Configuration

Use standard PostgreSQL environment variables:

export PGHOST=localhost      # default: localhost
export PGPORT=5432          # default: 5432
export PGUSER=postgres      # default: postgres
export PGPASSWORD=secret    # your password

Or pass them inline:

PGHOST=db.example.com PGUSER=midas ./setup-db.sh midas
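A Go program that connects to the same database may want to honour the same variables. The sketch below builds a postgres:// URL from PGHOST, PGPORT, PGUSER and PGPASSWORD with the defaults listed above. buildDSN is a hypothetical helper, not part of this package, and it assumes your driver accepts URL-style connection strings.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"os"
)

// buildDSN assembles a connection URL from the standard PG* variables.
// getenv is injected so the function can be exercised without touching
// the real process environment.
func buildDSN(getenv func(string) string, dbName string) string {
	get := func(key, def string) string {
		if v := getenv(key); v != "" {
			return v
		}
		return def
	}

	u := url.URL{
		Scheme: "postgres",
		Host:   net.JoinHostPort(get("PGHOST", "localhost"), get("PGPORT", "5432")),
		Path:   "/" + dbName,
	}

	user := get("PGUSER", "postgres")
	if pw := getenv("PGPASSWORD"); pw != "" {
		u.User = url.UserPassword(user, pw)
	} else {
		u.User = url.User(user)
	}
	return u.String()
}

func main() {
	// The result contains the password when PGPASSWORD is set,
	// so avoid logging it outside of local experiments.
	fmt.Println(buildDSN(os.Getenv, "midas"))
}
```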

Examples

# Create database named 'midas' on localhost
./setup-db.sh midas

# Create database on remote host
PGHOST=db.example.com ./setup-db.sh midas_prod

# Show help
./setup-db.sh --help

Migration from Old Setup

If you're migrating from the old multi-file migration structure:

  1. Back up your data if you have an existing database
  2. Run ./setup-db.sh to create a fresh database
  3. Restore your data from backup if needed

The new schema.sql represents the final state of all previous migrations combined into a single, clean schema definition.

Development

To update the schema:

  1. Edit schema.sql directly
  2. Run ./setup-db.sh to test on a fresh database
  3. For existing databases, create a migration SQL file with the changes

Troubleshooting

"Database already exists"

  • The script will prompt you to drop and recreate
  • Or manually: dropdb midas && ./setup-db.sh midas

"Permission denied"

  • Make sure the script is executable: chmod +x setup-db.sh

Connection errors

  • Check that PostgreSQL is running and reachable: psql -l
  • Verify connection parameters (PGHOST, PGPORT, PGUSER)
  • Check authentication in pg_hba.conf

Wrong table count

  • The script should create exactly 6 tables (the ones listed under "What Gets Created")
  • If count is wrong, check the schema.sql file for errors
  • Look at PostgreSQL logs for detailed error messages
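When the count is off, it helps to know which table is missing. The sketch below compares a list of table names against the six names from "What Gets Created". missingTables is a hypothetical helper. Fetching the list (for example from information_schema.tables) is left to the caller.

```go
package main

import "fmt"

// expectedTables mirrors the six tables listed in this README.
var expectedTables = []string{
	"decision_surfaces",
	"authority_profiles",
	"agents",
	"agent_authorizations",
	"operational_envelopes",
	"audit_events",
}

// missingTables returns the expected tables that are absent from found,
// in the order they appear in expectedTables.
func missingTables(found []string) []string {
	seen := make(map[string]bool, len(found))
	for _, t := range found {
		seen[t] = true
	}
	var missing []string
	for _, t := range expectedTables {
		if !seen[t] {
			missing = append(missing, t)
		}
	}
	return missing
}

func main() {
	// In practice, found would come from a query such as:
	//   SELECT table_name FROM information_schema.tables
	//   WHERE table_schema = 'public'
	found := []string{"decision_surfaces", "authority_profiles", "agents"}
	fmt.Println(missingTables(found))
	// [agent_authorizations operational_envelopes audit_events]
}
```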

Testing

MIDAS has comprehensive test coverage across unit tests and PostgreSQL integration tests.

Quick Start

# Run all tests (automatically starts database)
./test.sh

# Or use Make
make test

Test Categories

  • Unit Tests: Fast in-memory tests, no dependencies required
  • Integration Tests: Validate against real PostgreSQL database

See TESTING.md for detailed testing guide.

Common Commands

./test.sh           # All tests
./test.sh unit      # Unit tests only (fast, no database)
./test.sh db        # Integration tests only
./test.sh coverage  # Generate coverage report

Note: Integration tests require Docker. If Docker is not available, these tests will be automatically skipped.

Documentation

Constants

This section is empty.

Variables

var ErrEnvelopeNotFound = errors.New("envelope not found")

ErrEnvelopeNotFound is returned by Update when no row matches the given ID.
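Callers would typically test for this sentinel with errors.Is rather than comparing error strings. The sketch below uses a local stand-in for the sentinel and a map-backed updateEnvelope function. Both are hypothetical and only mimic the documented behaviour of Update.

```go
package main

import (
	"errors"
	"fmt"
)

// errEnvelopeNotFound stands in for postgres.ErrEnvelopeNotFound.
var errEnvelopeNotFound = errors.New("envelope not found")

// updateEnvelope is a hypothetical stand-in for EnvelopeRepo.Update:
// it fails with the sentinel when no entry matches the given ID.
func updateEnvelope(store map[string]string, id, state string) error {
	if _, ok := store[id]; !ok {
		return fmt.Errorf("update %q: %w", id, errEnvelopeNotFound)
	}
	store[id] = state
	return nil
}

// isNotFound reports whether err is, or wraps, the not-found sentinel.
func isNotFound(err error) bool {
	return errors.Is(err, errEnvelopeNotFound)
}

func main() {
	store := map[string]string{"env-1": "open"}

	fmt.Println(isNotFound(updateEnvelope(store, "env-1", "closed"))) // false
	fmt.Println(isNotFound(updateEnvelope(store, "env-9", "closed"))) // true
}
```

errors.Is matches whether or not the error has been wrapped, so the check keeps working if intermediate layers add context.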

var ErrNilDB = errors.New("postgres db is nil")

Functions

func EnsureSchema

func EnsureSchema(db *sql.DB) error

EnsureSchema applies the MIDAS schema to the database. schema.sql is written with idempotent DDL (CREATE TABLE IF NOT EXISTS, CREATE INDEX IF NOT EXISTS, CREATE OR REPLACE VIEW) so this function is safe to call on every startup against an already-initialised database.

This is intentionally a simple bootstrap mechanism, not a migration system. schema.sql is the single source of truth for the database structure.

Types

type AgentRepo

type AgentRepo struct {
	// contains filtered or unexported fields
}

func NewAgentRepo

func NewAgentRepo(db sqltx.DBTX) (*AgentRepo, error)

func (*AgentRepo) Create

func (r *AgentRepo) Create(ctx context.Context, a *agent.Agent) error

func (*AgentRepo) GetByID

func (r *AgentRepo) GetByID(ctx context.Context, id string) (*agent.Agent, error)

func (*AgentRepo) List

func (r *AgentRepo) List(ctx context.Context) ([]*agent.Agent, error)

func (*AgentRepo) Update

func (r *AgentRepo) Update(ctx context.Context, a *agent.Agent) error

type ControlAuditRepo

type ControlAuditRepo struct {
	// contains filtered or unexported fields
}

ControlAuditRepo implements controlaudit.Repository against Postgres. All writes are INSERT-only; UPDATE and DELETE are never issued.

func NewControlAuditRepo

func NewControlAuditRepo(db sqltx.DBTX) (*ControlAuditRepo, error)

NewControlAuditRepo constructs a ControlAuditRepo. db must be non-nil.

func (*ControlAuditRepo) Append

Append inserts one control-plane audit record. The record is immutable after insert.

func (*ControlAuditRepo) List

List returns control-plane audit records newest-first, applying the filter constraints.

type EnvelopeRepo

type EnvelopeRepo struct {
	// contains filtered or unexported fields
}

EnvelopeRepo implements envelope.EnvelopeRepository against Postgres.

Schema v2.1 column layout (operational_envelopes):

Section 1 — Identity:   id, request_source, request_id, schema_version
Section 2 — Submitted:  submitted_raw (JSONB), submitted_hash (TEXT), received_at
Section 3 — Resolved:   resolved_json (JSONB) + denormalized authority chain columns
Section 4 — Evaluation: state, outcome, reason_code, explanation_json (JSONB), evaluated_at
Section 5 — Integrity:  integrity_json (JSONB)
Review:                 review_json (JSONB)
Lifecycle:              created_at, updated_at, closed_at

Schema v2.1 denormalized authority chain:

resolved_surface_id, resolved_surface_version
resolved_profile_id, resolved_profile_version
resolved_grant_id, resolved_agent_id, resolved_subject_id

func NewEnvelopeRepo

func NewEnvelopeRepo(db sqltx.DBTX) (*EnvelopeRepo, error)

func (*EnvelopeRepo) Create

func (r *EnvelopeRepo) Create(ctx context.Context, e *envelope.Envelope) error

func (*EnvelopeRepo) GetByID

func (r *EnvelopeRepo) GetByID(ctx context.Context, id string) (*envelope.Envelope, error)

func (*EnvelopeRepo) GetByRequestID

func (r *EnvelopeRepo) GetByRequestID(ctx context.Context, requestID string) (*envelope.Envelope, error)

GetByRequestID retrieves by request_id only (legacy compatibility). For schema v2.1, prefer GetByRequestScope which uses (request_source, request_id).

func (*EnvelopeRepo) GetByRequestScope

func (r *EnvelopeRepo) GetByRequestScope(ctx context.Context, requestSource, requestID string) (*envelope.Envelope, error)

GetByRequestScope retrieves by (request_source, request_id) composite key. This is the preferred lookup method for schema v2.1 scoped idempotency.
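To see why the composite key matters, the in-memory sketch below keys envelopes by (request_source, request_id). Two sources can reuse the same request ID without colliding, while a retry from the same source finds the existing envelope. envelopeIndex and getOrCreate are hypothetical names. The real lookup happens in SQL.

```go
package main

import "fmt"

// requestScope is the composite idempotency key.
type requestScope struct {
	Source    string
	RequestID string
}

// envelopeIndex maps a request scope to an envelope ID.
type envelopeIndex struct {
	byScope map[requestScope]string
}

func newEnvelopeIndex() *envelopeIndex {
	return &envelopeIndex{byScope: make(map[requestScope]string)}
}

// getOrCreate returns the envelope ID already registered for the scope,
// or registers newEnvelopeID and reports that it was created.
func (ix *envelopeIndex) getOrCreate(source, requestID, newEnvelopeID string) (string, bool) {
	key := requestScope{Source: source, RequestID: requestID}
	if id, ok := ix.byScope[key]; ok {
		return id, false
	}
	ix.byScope[key] = newEnvelopeID
	return newEnvelopeID, true
}

func main() {
	ix := newEnvelopeIndex()
	fmt.Println(ix.getOrCreate("api", "req-1", "env-1"))   // env-1 true
	fmt.Println(ix.getOrCreate("batch", "req-1", "env-2")) // env-2 true
	fmt.Println(ix.getOrCreate("api", "req-1", "env-3"))   // env-1 false
}
```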

func (*EnvelopeRepo) List

func (r *EnvelopeRepo) List(ctx context.Context) ([]*envelope.Envelope, error)

func (*EnvelopeRepo) ListByState

func (r *EnvelopeRepo) ListByState(ctx context.Context, state envelope.EnvelopeState) ([]*envelope.Envelope, error)

ListByState returns all envelopes in the given lifecycle state, ordered by creation time descending. An empty state returns all envelopes (same as List).

func (*EnvelopeRepo) Update

func (r *EnvelopeRepo) Update(ctx context.Context, e *envelope.Envelope) error

type GrantRepo

type GrantRepo struct {
	// contains filtered or unexported fields
}

func NewGrantRepo

func NewGrantRepo(db sqltx.DBTX) (*GrantRepo, error)

func (*GrantRepo) Create

func (*GrantRepo) FindActiveByAgentAndProfile

func (r *GrantRepo) FindActiveByAgentAndProfile(ctx context.Context, agentID, profileID string) (*authority.AuthorityGrant, error)

FindActiveByAgentAndProfile returns the active grant linking agentID to profileID. Under schema v2.1, a grant counts as active when status='active' AND effective_date <= now AND (expires_at IS NULL OR expires_at > now).
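The same predicate can be expressed in Go, which is handy when checking a grant that has already been loaded. The grant struct below is a hypothetical, trimmed-down stand-in for authority.AuthorityGrant. Only the three fields named in the condition are modelled.

```go
package main

import (
	"fmt"
	"time"
)

// grant is a hypothetical subset of the grant record.
type grant struct {
	Status        string
	EffectiveDate time.Time
	ExpiresAt     *time.Time // nil means the grant never expires
}

// isActive mirrors the documented condition:
// status='active' AND effective_date <= now AND
// (expires_at IS NULL OR expires_at > now).
func isActive(g grant, now time.Time) bool {
	if g.Status != "active" {
		return false
	}
	if g.EffectiveDate.After(now) {
		return false
	}
	return g.ExpiresAt == nil || g.ExpiresAt.After(now)
}

// day returns midnight UTC on the given day of January 2026.
func day(n int) time.Time {
	return time.Date(2026, time.January, n, 0, 0, 0, 0, time.UTC)
}

func main() {
	expiry := day(10)
	g := grant{Status: "active", EffectiveDate: day(1), ExpiresAt: &expiry}

	fmt.Println(isActive(g, day(5)))  // true
	fmt.Println(isActive(g, day(10))) // false: expires_at must be strictly after now
}
```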

func (*GrantRepo) FindByID

func (r *GrantRepo) FindByID(ctx context.Context, id string) (*authority.AuthorityGrant, error)

func (*GrantRepo) ListByAgent

func (r *GrantRepo) ListByAgent(ctx context.Context, agentID string) ([]*authority.AuthorityGrant, error)

func (*GrantRepo) ListByProfile

func (r *GrantRepo) ListByProfile(ctx context.Context, profileID string) ([]*authority.AuthorityGrant, error)

func (*GrantRepo) Reactivate

func (r *GrantRepo) Reactivate(ctx context.Context, id string) error

Reactivate restores a suspended grant. Schema v2.1: sets status='active'; only valid from the suspended state.

func (*GrantRepo) Revoke

func (r *GrantRepo) Revoke(ctx context.Context, id string, revokedBy string) error

Revoke marks a grant as revoked and records revocation metadata. Schema v2.1: Sets status='revoked', revoked_at=NOW(), revoked_by=revokedBy

func (*GrantRepo) Suspend

func (r *GrantRepo) Suspend(ctx context.Context, id string) error

Suspend temporarily disables a grant without full revocation. Schema v2.1: Sets status='suspended'

func (*GrantRepo) Update

Update persists all mutable fields of a grant atomically. Used by grant lifecycle governance (suspend, revoke, reinstate) to write actor, reason, and timestamp fields in a single operation.
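The grant lifecycle described by Suspend, Reactivate and Revoke can be pictured as a small state machine. In the sketch below, only the rule that reactivation is valid solely from the suspended state comes from the documentation. The other transition rules (suspend only from active, revoke from any non-revoked state) are assumptions, as are the next function and its action strings.

```go
package main

import "fmt"

type status string

const (
	statusActive    status = "active"
	statusSuspended status = "suspended"
	statusRevoked   status = "revoked"
)

// next returns the status that results from applying action to from,
// or an error when the transition is not allowed.
func next(from status, action string) (status, error) {
	switch {
	case action == "suspend" && from == statusActive: // assumption
		return statusSuspended, nil
	case action == "reactivate" && from == statusSuspended: // documented rule
		return statusActive, nil
	case action == "revoke" && from != statusRevoked: // assumption
		return statusRevoked, nil
	}
	return from, fmt.Errorf("cannot %s a grant in state %q", action, from)
}

func main() {
	s, err := next(statusSuspended, "reactivate")
	fmt.Println(s, err) // active <nil>

	_, err = next(statusRevoked, "reactivate")
	fmt.Println(err) // cannot reactivate a grant in state "revoked"
}
```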

type LocalSessionRepo

type LocalSessionRepo struct {
	// contains filtered or unexported fields
}

LocalSessionRepo implements localiam.SessionRepository against Postgres.

func NewLocalSessionRepo

func NewLocalSessionRepo(db sqltx.DBTX) (*LocalSessionRepo, error)

func (*LocalSessionRepo) Create

func (*LocalSessionRepo) Delete

func (r *LocalSessionRepo) Delete(ctx context.Context, id string) error

func (*LocalSessionRepo) DeleteExpired

func (r *LocalSessionRepo) DeleteExpired(ctx context.Context) error

func (*LocalSessionRepo) FindByID

func (r *LocalSessionRepo) FindByID(ctx context.Context, id string) (*localiam.Session, error)

type LocalUserRepo

type LocalUserRepo struct {
	// contains filtered or unexported fields
}

LocalUserRepo implements localiam.UserRepository against Postgres.

func NewLocalUserRepo

func NewLocalUserRepo(db sqltx.DBTX) (*LocalUserRepo, error)

func (*LocalUserRepo) Count

func (r *LocalUserRepo) Count(ctx context.Context) (int, error)

func (*LocalUserRepo) Create

func (r *LocalUserRepo) Create(ctx context.Context, u *localiam.User) error

func (*LocalUserRepo) FindByID

func (r *LocalUserRepo) FindByID(ctx context.Context, id string) (*localiam.User, error)

func (*LocalUserRepo) FindByUsername

func (r *LocalUserRepo) FindByUsername(ctx context.Context, username string) (*localiam.User, error)

func (*LocalUserRepo) Update

func (r *LocalUserRepo) Update(ctx context.Context, u *localiam.User) error

type OutboxRepo

type OutboxRepo struct {
	// contains filtered or unexported fields
}

OutboxRepo is the Postgres-backed implementation of outbox.Repository.

Every write method must be called with a db instance that is bound to the current database transaction. The outbox row and the domain row must commit together; rolling back the transaction removes both.

func NewOutboxRepo

func NewOutboxRepo(db sqltx.DBTX) (*OutboxRepo, error)

NewOutboxRepo constructs an OutboxRepo using the supplied DBTX, which may be a *sql.DB for out-of-transaction reads or a *sql.Tx for transactional writes.

func (*OutboxRepo) Append

func (r *OutboxRepo) Append(ctx context.Context, ev *outbox.OutboxEvent) error

Append inserts a single outbox event row. The row inherits the surrounding transaction: if the transaction is rolled back, the row is removed.

func (*OutboxRepo) ClaimUnpublished

func (r *OutboxRepo) ClaimUnpublished(ctx context.Context, limit int) ([]*outbox.OutboxEvent, error)

ClaimUnpublished returns up to limit unpublished rows using SELECT FOR UPDATE SKIP LOCKED, ordered by created_at ASC, id ASC.

When the underlying db is a *sql.DB, ClaimUnpublished opens an internal short-lived transaction: it acquires row-level locks, reads the rows into memory, and immediately commits (releasing the locks). This prevents a concurrent dispatcher instance from claiming the same rows during the same poll window.

When the underlying db is already a *sql.Tx, ClaimUnpublished runs the locking SELECT directly on that transaction; lock lifetime is controlled by the caller.

func (*OutboxRepo) ListUnpublished

func (r *OutboxRepo) ListUnpublished(ctx context.Context) ([]*outbox.OutboxEvent, error)

ListUnpublished returns all rows where published_at IS NULL, ordered by created_at ascending. Dispatcher implementations call this to find events awaiting delivery.

func (*OutboxRepo) MarkPublished

func (r *OutboxRepo) MarkPublished(ctx context.Context, id string) error

MarkPublished sets published_at to the current UTC time for the event with the given ID. Returns an error if the row does not exist.

type ProfileRepo

type ProfileRepo struct {
	// contains filtered or unexported fields
}

func NewProfileRepo

func NewProfileRepo(db sqltx.DBTX) (*ProfileRepo, error)

func (*ProfileRepo) Create

func (*ProfileRepo) FindActiveAt

func (r *ProfileRepo) FindActiveAt(ctx context.Context, id string, at time.Time) (*authority.AuthorityProfile, error)

FindActiveAt resolves the active version where:

  • status = 'active'
  • effective_date <= at
  • (effective_until IS NULL OR effective_until > at)

As of schema v2.1, the status field is checked in addition to the date range.
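The effect of these conditions is a half-open validity window per version. The sketch below picks a version from an in-memory list. profileVersion is a hypothetical stand-in for authority.AuthorityProfile, and choosing the highest version when several qualify is an assumption, not documented behaviour.

```go
package main

import (
	"fmt"
	"time"
)

// profileVersion is a hypothetical subset of a versioned profile record.
type profileVersion struct {
	Version        int
	Status         string
	EffectiveDate  time.Time
	EffectiveUntil *time.Time // nil means open-ended
}

// activeAt returns the version that is active at the given instant.
// If several versions qualify, the highest version wins (assumption).
func activeAt(versions []profileVersion, at time.Time) (profileVersion, bool) {
	var best profileVersion
	found := false
	for _, v := range versions {
		if v.Status != "active" || v.EffectiveDate.After(at) {
			continue
		}
		if v.EffectiveUntil != nil && !v.EffectiveUntil.After(at) {
			continue
		}
		if !found || v.Version > best.Version {
			best, found = v, true
		}
	}
	return best, found
}

// day returns midnight UTC on the given day of January 2026.
func day(n int) time.Time {
	return time.Date(2026, time.January, n, 0, 0, 0, 0, time.UTC)
}

func main() {
	until := day(10)
	versions := []profileVersion{
		{Version: 1, Status: "active", EffectiveDate: day(5), EffectiveUntil: &until},
		{Version: 2, Status: "active", EffectiveDate: day(10)},
	}

	v, ok := activeAt(versions, day(7))
	fmt.Println(v.Version, ok) // 1 true

	v, ok = activeAt(versions, day(10))
	fmt.Println(v.Version, ok) // 2 true
}
```

Because effective_until is exclusive, version 1 stops applying at exactly the instant version 2 takes over, so the two windows never overlap.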

func (*ProfileRepo) FindByID

FindByID returns the latest version of a profile by its logical ID.

func (*ProfileRepo) FindByIDAndVersion

func (r *ProfileRepo) FindByIDAndVersion(ctx context.Context, id string, version int) (*authority.AuthorityProfile, error)

FindByIDAndVersion retrieves a specific profile version.

func (*ProfileRepo) ListBySurface

func (r *ProfileRepo) ListBySurface(ctx context.Context, surfaceID string) ([]*authority.AuthorityProfile, error)

func (*ProfileRepo) ListVersions

func (r *ProfileRepo) ListVersions(ctx context.Context, id string) ([]*authority.AuthorityProfile, error)

ListVersions returns all versions of a profile ordered by version DESC.

func (*ProfileRepo) Update

type Store

type Store struct {
	// contains filtered or unexported fields
}

func NewStore

func NewStore(db *sql.DB, metrics store.TransactionRecorder) (*Store, error)

func (*Store) Repositories

func (s *Store) Repositories() (*store.Repositories, error)

Repositories returns repositories bound to the base DB connection. Use this for read operations that do not require a transaction.

func (*Store) WithTx

func (s *Store) WithTx(ctx context.Context, operation string, fn func(*store.Repositories) error) (err error)

WithTx executes fn with repositories bound to a transaction. operation should describe the business workflow (e.g., "evaluation", "review", "admin_update").
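A helper of this shape usually commits when fn returns nil and rolls back otherwise. The sketch below shows that pattern against a minimal tx interface so that it runs without a database. It is a generic illustration under that assumption, not the Store implementation, and withTx, tx and fakeTx are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// tx is the minimal surface needed from a transaction; *sql.Tx satisfies it.
type tx interface {
	Commit() error
	Rollback() error
}

// withTx begins a transaction, runs fn, and commits on success.
// On error or panic it rolls back. operation labels the workflow in errors.
func withTx(operation string, begin func() (tx, error), fn func(tx) error) (err error) {
	t, err := begin()
	if err != nil {
		return fmt.Errorf("%s: begin: %w", operation, err)
	}
	defer func() {
		if p := recover(); p != nil {
			_ = t.Rollback()
			panic(p) // re-raise after cleaning up
		}
		if err != nil {
			_ = t.Rollback()
			err = fmt.Errorf("%s: %w", operation, err)
			return
		}
		if cerr := t.Commit(); cerr != nil {
			err = fmt.Errorf("%s: commit: %w", operation, cerr)
		}
	}()
	return fn(t)
}

// fakeTx records which terminal call was made, for demonstration.
type fakeTx struct{ committed, rolledBack bool }

func (f *fakeTx) Commit() error   { f.committed = true; return nil }
func (f *fakeTx) Rollback() error { f.rolledBack = true; return nil }

var errRejected = errors.New("rejected")

func main() {
	ok := &fakeTx{}
	err := withTx("evaluation", func() (tx, error) { return ok, nil }, func(_ tx) error { return nil })
	fmt.Println(err, ok.committed, ok.rolledBack) // <nil> true false

	bad := &fakeTx{}
	err = withTx("review", func() (tx, error) { return bad, nil }, func(_ tx) error { return errRejected })
	fmt.Println(err, bad.committed, bad.rolledBack) // review: rejected false true
}
```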

type SurfaceRepo

type SurfaceRepo struct {
	// contains filtered or unexported fields
}

func NewSurfaceRepo

func NewSurfaceRepo(db sqltx.DBTX) (*SurfaceRepo, error)

func (*SurfaceRepo) Create

func (*SurfaceRepo) FindActiveAt

func (r *SurfaceRepo) FindActiveAt(ctx context.Context, id string, at time.Time) (*surface.DecisionSurface, error)

func (*SurfaceRepo) FindByIDVersion

func (r *SurfaceRepo) FindByIDVersion(ctx context.Context, id string, version int) (*surface.DecisionSurface, error)

FindByIDVersion returns a specific version of a surface.

func (*SurfaceRepo) FindLatestByID

func (r *SurfaceRepo) FindLatestByID(ctx context.Context, id string) (*surface.DecisionSurface, error)

FindLatestByID returns the latest version of a surface (renamed from FindByID).

func (*SurfaceRepo) ListAll

func (r *SurfaceRepo) ListAll(ctx context.Context) ([]*surface.DecisionSurface, error)

ListAll returns the latest version of each surface (renamed from List).

func (*SurfaceRepo) ListByDomain

func (r *SurfaceRepo) ListByDomain(ctx context.Context, domain string) ([]*surface.DecisionSurface, error)

ListByDomain returns the latest version of each surface in the given domain.

func (*SurfaceRepo) ListByStatus

func (r *SurfaceRepo) ListByStatus(ctx context.Context, status surface.SurfaceStatus) ([]*surface.DecisionSurface, error)

ListByStatus returns the latest version of each surface with the given status.

func (*SurfaceRepo) ListVersions

func (r *SurfaceRepo) ListVersions(ctx context.Context, id string) ([]*surface.DecisionSurface, error)

ListVersions returns all versions of a surface ordered by version ascending.

func (*SurfaceRepo) Search

Search finds surfaces (latest version) matching criteria. NOTE: This is a simplified implementation that filters by domain and status. Full tag/taxonomy/category filtering requires additional schema columns.

func (*SurfaceRepo) Update
