core

package
v0.1.0
Published: Aug 11, 2025 License: MIT Imports: 72 Imported by: 0

Documentation

Overview

Package core implements the core functionality shared across the Data Master project, serving as the backbone for both the CLI and the data processing components.

It provides reusable integrations with AWS services (S3, DynamoDB, Glue, DMS, Lake Formation), along with processing control logic, data catalog synchronization, infrastructure operations, benchmarking support, and orchestration utilities.

All orchestration logic, resource abstraction, and operational primitives used by the CLI commands and ECS/Lambda workers are defined in this package, enabling consistency, automation, and decoupling across the Medallion data pipeline.

Index

Constants

This section is empty.

Variables

var SupportedStreams = map[string]func(SimOptions){
	"profile": SimulateProfileStream,
	"review":  SimulateReviewStream,
}
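
Each entry maps a stream name to its simulator function. A minimal usage sketch (illustrative; the field values are arbitrary, and Quality is omitted):

	if simulate, ok := SupportedStreams["profile"]; ok {
		simulate(SimOptions{RPS: 5, Duration: time.Minute, Parallel: 2})
	}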

Functions

func CastPgType

func CastPgType(pgattribute dialect.PgAttribute, pgtype dialect.PgType, db *gorm.DB) (def string)

CastPgType converts a PostgreSQL type (from pg_type and pg_attribute) into a logical representation used for Glue, Iceberg, or other downstream systems.

It handles scalar types, arrays, enums, and user-defined types, and falls back to a category-based mapping when no direct match exists.

Parameters:

  • pgattribute: the pg_attribute entry for the column.
  • pgtype: the pg_type definition for the column's data type.
  • db: GORM DB instance used for recursive type resolution (e.g., array elements).

Returns:

  • string: logical type (e.g., "string", "int", "timestamp", "map", "array").

Panics:

  • if a type cannot be mapped or an array element's type fails to load.

func CloseConnection

func CloseConnection() error

CloseConnection gracefully closes the underlying database connection if it's open.

Returns:

  • error: if fetching the raw connection or closing it fails.

func ConfigureS3Notification

func ConfigureS3Notification(ctx context.Context, s3Client *s3.Client, lambdaClient *lambda.Client) error

ConfigureS3Notification sets up an S3 event notification to trigger a Lambda function when `.gz` files are uploaded to the `raw/` prefix of the staging bucket.

It first grants invoke permission to the S3 service on the specified Lambda, and then configures the notification rules on the bucket. If the permission already exists, it skips that step.

Parameters:

  • ctx: The context for the AWS SDK requests.
  • s3Client: A configured S3 client used to set the bucket notification.
  • lambdaClient: A configured Lambda client used to add invoke permissions.

Returns:

  • error: Any error encountered while configuring the notification or permission. Returns nil if successful.
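
A minimal usage sketch (illustrative; assumes a context ctx and an AWS configuration cfg obtained elsewhere, e.g. via GetAWSConfig):

	err := ConfigureS3Notification(ctx, s3.NewFromConfig(cfg), lambda.NewFromConfig(cfg))
	if err != nil {
		log.Fatal(err)
	}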

func ConvertPgAttributesToGlueColumns

func ConvertPgAttributesToGlueColumns(layerType string, class *dialect.PgClass, db *gorm.DB) (columns []types.Column)

ConvertPgAttributesToGlueColumns converts a loaded PgClass (PostgreSQL table definition) into a slice of AWS Glue Column definitions compatible with Glue Catalog.

Behavior:

  • For the "bronze" layer, all columns are forced to type "string", and an additional column named "operation" is injected at the end.
  • For "silver" and "gold", types are inferred from PostgreSQL definitions.

Parameters:

  • layerType: lakehouse layer ("bronze", "silver", or "gold").
  • class: pointer to PgClass containing attributes.
  • db: GORM database connection for resolving PostgreSQL types.

Returns:

  • []types.Column: list of Glue-compatible column definitions.

func CreateGrafanaToken

func CreateGrafanaToken(cfg aws.Config, ttlSeconds int32) (string, error)

CreateGrafanaToken generates a temporary API token for a service account in an AWS-managed Grafana workspace.

It retrieves the workspace ID from the CloudFormation stack, ensures that the required service account exists, and then creates a new token with a unique name and the specified TTL (time-to-live).

Parameters:

  • cfg: AWS configuration used to authenticate and interact with the Grafana service.
  • ttlSeconds: time-to-live for the token, in seconds.

Returns:

  • string: the token key that can be used to authenticate Grafana API requests.
  • error: an error if the token creation fails at any step.

func CreateLfTag

func CreateLfTag(cfg aws.Config) error

CreateLfTag creates an AWS Lake Formation tag with the key "classification" and value "PII".

This tag can be used to classify sensitive data, such as fields containing Personally Identifiable Information (PII), for access control and governance purposes.

Parameters:

  • cfg: aws.Config - The AWS SDK configuration used to initialize the Lake Formation client.

Returns:

  • error: Any error encountered during tag creation, or nil if the operation was successful.

func CsvToMap

func CsvToMap(r io.Reader, model any) ([]map[string]any, error)

CsvToMap reads a CSV stream and converts each valid row into a map[string]any.

This function parses the CSV header to determine column names and processes each row, trimming whitespace and converting empty strings to `nil`. Each row is validated using the provided model via `ValidateRow`. Invalid rows are skipped with warnings.

Parameters:

  • r: An `io.Reader` containing the CSV data.
  • model: A reference model used to validate each row's structure and content.

Returns:

  • []map[string]any: A slice of validated row maps with column names as keys.
  • error: An error if reading the header fails.
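
A minimal sketch (illustrative; beerRow is a hypothetical model struct with JSON tags, as expected by ValidateRow):

	type beerRow struct {
		ID   int64  `json:"id"`
		Name string `json:"name"`
	}

	rows, err := CsvToMap(strings.NewReader("id,name\n1,IPA\n"), &beerRow{})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(len(rows)) // each valid row becomes a map keyed by the CSV header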

func DeployAllStacks

func DeployAllStacks(templates, artifacts, scripts embed.FS) error

DeployAllStacks deploys all the predefined CloudFormation stacks in logical order. It uses DeployStack internally and stops at the first encountered failure.

Parameters:

  • templates: the embedded file system with all template definitions.
  • artifacts: the embedded file system with deployment artifacts.
  • scripts: the embedded file system with Python scripts.

Returns:

  • error: if any stack deployment fails.
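
A minimal sketch (illustrative; the //go:embed patterns are hypothetical and must match the caller's project layout; assumes the embed and log packages are imported):

	//go:embed templates/*
	var templates embed.FS

	//go:embed artifacts/*
	var artifacts embed.FS

	//go:embed scripts/*
	var scripts embed.FS

	func main() {
		if err := DeployAllStacks(templates, artifacts, scripts); err != nil {
			log.Fatal(err)
		}
	}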

func DeployLambdaFromArtifact

func DeployLambdaFromArtifact(artifacts embed.FS, name string, memory, timeout int) error

DeployLambdaFromArtifact uploads the specified Lambda ZIP artifact to S3 and deploys the function. If the function does not exist, it is created. If it exists, its code and configuration are updated.

Parameters:

  • artifacts: an embedded filesystem containing the ZIP artifacts.
  • name: the name of the Lambda function (must match the artifact filename without extension).
  • memory: the memory size in MB to assign to the Lambda.
  • timeout: the timeout in seconds to assign to the Lambda.

Returns:

  • error: if any step in the deployment process fails.

func DeployStack

func DeployStack(stack *Stack, templates, artifacts, scripts embed.FS) error

DeployStack deploys or updates a specific CloudFormation stack using the provided template.

It handles creation, update (when it already exists), and waits for completion.

Parameters:

  • stack: the stack definition (name, params, etc.).
  • templates: embedded file system containing template files.
  • artifacts: embedded file system containing deployment artifacts.
  • scripts: embedded file system containing Python scripts.

Returns:

  • error: if the stack creation or update fails.

func DetectPIIFields

func DetectPIIFields(cfg aws.Config, ctx context.Context, sample []map[string]any) ([]string, error)

DetectPIIFields analyzes sample data to identify fields that likely contain personally identifiable information (PII).

It uses AWS Comprehend to evaluate the combined text of each field across multiple records. A field is considered PII if at least 50% of its non-empty values are detected as PII with an average confidence score of 0.7 or higher.

Parameters:

  • cfg: aws.Config - AWS configuration used to initialize the Comprehend client.
  • ctx: context.Context - Context for the Comprehend API call.
  • sample: []map[string]any - Sample data used to evaluate potential PII fields.

Returns:

  • []string: A list of field names detected as containing PII.
  • error: An error if the detection fails.
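
A minimal sketch combining SampleData and DetectPIIFields (illustrative; records is a previously loaded []map[string]any, and the sample size of 25 is arbitrary):

	fields, err := DetectPIIFields(cfg, ctx, SampleData(records, 25))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("PII fields:", fields)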

func DownloadS3Object

func DownloadS3Object(ctx context.Context, cfg aws.Config, bucket, key string) (*s3.GetObjectOutput, error)

DownloadS3Object retrieves an object from an S3 bucket using the given bucket and key.

It uses the provided AWS configuration to create an S3 client and performs a `GetObject` call.

Parameters:

  • ctx: Context for the request.
  • cfg: AWS configuration used to create the S3 client.
  • bucket: Name of the S3 bucket.
  • key: Key (path) of the object to retrieve.

Returns:

  • *s3.GetObjectOutput: The S3 object output, including metadata and the object's body.
  • error: Any error encountered during the download operation. Returns nil if successful.
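
A minimal usage sketch (illustrative; the bucket and key values are hypothetical):

	out, err := DownloadS3Object(ctx, cfg, "my-staging-bucket", "raw/review/batch.gz")
	if err != nil {
		log.Fatal(err)
	}
	defer out.Body.Close()

	data, err := io.ReadAll(out.Body)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("downloaded", len(data), "bytes")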

func EnsureECSServiceLinkedRole

func EnsureECSServiceLinkedRole(cfg aws.Config) error

EnsureECSServiceLinkedRole checks for the existence of the ECS service-linked role and creates it if missing.

Parameters:

  • cfg: the AWS configuration used to initialize the IAM client.

Returns:

  • error: an error if the role creation fails or IAM access is misconfigured.

func ExtractNameFromArn

func ExtractNameFromArn(arn string) string

ExtractNameFromArn extracts the resource name from a full AWS ARN.

func FetchPendingTables

func FetchPendingTables(cfg aws.Config, schema string) ([]string, error)

FetchPendingTables returns a list of unique table names that have pending items for the given schema name in the dm-processing-control table.

Parameters:

  • cfg: AWS configuration used to connect to DynamoDB.
  • schema: the schema name to filter by (e.g., "dm_silver" or "dm_gold").

Returns:

  • []string: the list of pending table names.
  • error: an error if the DynamoDB query fails.

func FindOrCreateServiceAccount

func FindOrCreateServiceAccount(cfg aws.Config, workspaceID string) (string, error)

FindOrCreateServiceAccount locates an existing service account in a Grafana workspace or creates a new one if it doesn't exist.

It searches for a service account with the predefined name and returns its ID. If not found, it creates a new service account with `admin` privileges.

Parameters:

  • cfg: AWS configuration used for authenticating Grafana service calls.
  • workspaceID: ID of the Grafana workspace where the service account should be found or created.

Returns:

  • string: the ID of the existing or newly created service account.
  • error: an error if the listing or creation of the service account fails.

func GetAWSConfig

func GetAWSConfig() aws.Config

GetAWSConfig returns the currently persisted AWS configuration. If the configuration has not been initialized via PersistAWSConfig, it returns an empty aws.Config, and any AWS SDK client built from it will likely fail.

func GetCallerIdentity

func GetCallerIdentity(ctx context.Context, cfg aws.Config) (*sts.GetCallerIdentityOutput, error)

GetCallerIdentity returns the identity of the current AWS caller, including the account, ARN, and user ID.

Parameters:

  • ctx: context for cancellation and timeout.
  • cfg: an initialized AWS configuration.

Returns:

  • *sts.GetCallerIdentityOutput: information about the caller identity.
  • error: if the STS call fails.

func GetConnection

func GetConnection() (*gorm.DB, error)

GetConnection returns a singleton *gorm.DB instance, initializing it on first use.

Returns:

  • *gorm.DB: the initialized database connection.
  • error: if connection initialization fails.

func GetMaxAttempts

func GetMaxAttempts() int

GetMaxAttempts returns the maximum number of processing attempts allowed. It reads the MAX_ATTEMPTS environment variable and falls back to a default value if the variable is unset, invalid, or less than or equal to zero.

Returns:

  • int: the configured or default maximum number of attempts.

func GetStorageLocation

func GetStorageLocation(layerType, tableName string) (string, error)

GetStorageLocation returns the S3 path for storing a table's data, based on its lakehouse layer and table name.

The location follows the structure:

s3://<DataLakeBucketName>/<layerType>/<tableName>/

Parameters:

  • layerType: the lakehouse layer name ("bronze", "silver", or "gold").
  • tableName: name of the table.

Returns:

  • string: fully qualified S3 path.
  • error: if fetching stack outputs or mapping schema fails.
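
A minimal sketch (illustrative):

	location, err := GetStorageLocation("silver", "review")
	if err != nil {
		log.Fatal(err)
	}
	// location is of the form "s3://<DataLakeBucketName>/silver/review/"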

func GrantDataLocationAccess

func GrantDataLocationAccess(cfg aws.Config) error

GrantDataLocationAccess grants Lake Formation data location access to the Glue service role.

It retrieves the data lake bucket name and the Glue service role ARN from CloudFormation stack outputs, then issues a GrantPermissions call to Lake Formation, allowing Glue to access the specified S3 bucket.

Parameters:

  • cfg: AWS configuration used to create the Lake Formation client and access stack outputs.

Returns:

  • error: An error if retrieving outputs or granting permissions fails.

func LayerToSchema

func LayerToSchema(layerType string) string

LayerToSchema maps a lakehouse layer name (bronze, silver, or gold) to its corresponding PostgreSQL schema name.

Parameters:

  • layerType: the logical lakehouse layer ("bronze", "silver", or "gold").

Returns:

  • string: the associated PostgreSQL schema name.

Panics:

  • if the provided layerType is not recognized.

func LoadAWSConfig

func LoadAWSConfig(profile, accessKey, secretKey, region string) (aws.Config, error)

LoadAWSConfig loads an AWS configuration based on the input parameters. It supports named profiles, static credentials, or defaults.

Parameters:

  • profile: AWS profile name (overrides other methods if set).
  • accessKey: AWS access key ID (used only if profile is empty).
  • secretKey: AWS secret access key (used only if profile is empty).
  • region: AWS region (defaults to "us-east-1" if empty).

Returns:

  • aws.Config: the loaded AWS configuration.
  • error: if loading the configuration fails.

func LoadAuroraTablesWithColumns

func LoadAuroraTablesWithColumns(db *gorm.DB, schema string, tables ...string) (classes []dialect.PgClass, err error)

LoadAuroraTablesWithColumns queries PostgreSQL system catalogs to load user-defined tables, their columns, and data types from a given schema.

Optionally filters by table name. Returns a slice of PgClass structs populated with PgAttributes.

Parameters:

  • db: GORM database connection to the Aurora PostgreSQL instance.
  • schema: name of the schema to inspect.
  • tables: optional list of specific table names.

Returns:

  • []dialect.PgClass: table metadata with column details.
  • error: if the query fails.

func LoadCsvToAurora

func LoadCsvToAurora(r io.Reader, name string) error

LoadCsvToAurora reads CSV data from a reader, parses it into a slice of records, and inserts the data into the corresponding Aurora PostgreSQL table.

Parameters:

  • r: Reader containing CSV data (typically a gzip-decompressed CSV).
  • name: Name of the table to insert the data into (e.g., "brewery", "beer").

Returns:

  • error: If any step fails (e.g., DB connection, model loading, CSV parsing, or insertion), an error is returned.

func LoadRawS3Data

func LoadRawS3Data(cfg aws.Config, ctx context.Context, item *ProcessingControl) ([]map[string]any, error)

LoadRawS3Data downloads and parses a raw data file from S3 based on the metadata in ProcessingControl.

The file is expected to be compressed with gzip and stored in the `StageBucket`. It supports CSV and JSON formats.

Parameters:

  • cfg: AWS configuration used to access S3.
  • ctx: Context for the request.
  • item: ProcessingControl containing metadata like ObjectKey and FileFormat.

Returns:

  • []map[string]any: A slice of parsed records, where each record is a key-value map.
  • error: Any error encountered during the process (S3 access, decompression, or parsing).

func LoginToECRWithSDK

func LoginToECRWithSDK(cfg aws.Config) error

LoginToECRWithSDK authenticates the Docker CLI with AWS ECR using credentials retrieved via the AWS SDK. It fetches the ECR authorization token, decodes it, and performs a `docker login` using `--password-stdin`.

Parameters:

  • cfg: the AWS configuration used to create the ECR client.

Returns:

  • error: an error if the token retrieval, decoding, or Docker login fails.

func MaskPIIData

func MaskPIIData(cfg aws.Config, ctx context.Context, data []map[string]any) ([]map[string]any, error)

MaskPIIData scans and masks personally identifiable information (PII) fields in the provided dataset.

It uses a sample of the data to detect potential PII fields and then masks their values across the entire dataset.

Parameters:

  • cfg: aws.Config - AWS configuration used for service clients.
  • ctx: context.Context - Context for the AWS operations.
  • data: []map[string]any - The dataset where PII fields may be masked.

Returns:

  • []map[string]any: The dataset with masked PII values (if any).
  • error: An error if PII detection or masking fails.

func MaskValue

func MaskValue(val any) any

MaskValue returns a masked version of the input value for PII obfuscation.

The function converts the input to a string and applies masking as follows:

  • If the string is empty, returns an empty string.
  • If the string has 4 or fewer characters, replaces all characters with asterisks.
  • If the string has more than 4 characters, keeps the first and last characters and replaces the middle with asterisks.

Parameters:

  • val: any - The value to be masked.

Returns:

  • any: The masked string, or the original value if it could not be converted.
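
Illustrative outcomes based on the rules above:

	MaskValue("")         // ""
	MaskValue("abc")      // "***"
	MaskValue("john.doe") // "j******e"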

func NameWithPrefix

func NameWithPrefix(name string) string

NameWithPrefix applies the default project prefix (e.g., "dm") to a schema, database, or layer name.

func PersistAWSConfig

func PersistAWSConfig(profile, accessKey, secretKey, region string) (err error)

PersistAWSConfig loads and persists the AWS configuration globally, based on the provided profile or static credentials. If both profile and static credentials are provided, the profile takes precedence.

Parameters:

  • profile: name of the AWS named profile to load.
  • accessKey: AWS access key ID (used if profile is empty).
  • secretKey: AWS secret access key (used if profile is empty).
  • region: AWS region to use (default is "us-east-1" if empty).

Returns:

  • error: if loading the configuration fails.
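
A minimal sketch showing how PersistAWSConfig and GetAWSConfig work together (illustrative; the profile name is hypothetical):

	if err := PersistAWSConfig("dev", "", "", "us-east-1"); err != nil {
		log.Fatal(err)
	}
	cfg := GetAWSConfig() // returns the configuration persisted above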

func PrintBenchmarkReport

func PrintBenchmarkReport(result BenchmarkResult)

PrintBenchmarkReport displays the results of a benchmark execution.

This function prints a formatted report with key metrics such as implementation type, orchestration time, task execution time, file paths, and performance statistics (e.g., CSV read time, Parquet write time, memory usage). It gracefully handles missing fields and only prints populated values.

The function is intended to be called after decoding a BenchmarkResult, typically produced by an ECS task, Glue Job, or EMR Serverless job.

Parameters:

  • result: BenchmarkResult containing all relevant benchmark metrics.

Returns:

  • none

func PublishDockerImages

func PublishDockerImages(cfg aws.Config, artifacts embed.FS, images ...string) (map[string]string, error)

PublishDockerImages loads, tags, and pushes Docker images from embedded .tar artifacts to AWS ECR. If image names are provided, only those will be published. Otherwise, all .tar files in the artifacts directory are processed. The function requires Docker to be installed and available in the system PATH.

Parameters:

  • cfg: the AWS configuration used for ECR authentication and account resolution.
  • artifacts: the embedded filesystem containing Docker .tar files.
  • images: optional list of image base names (without extension) to publish.

Returns:

  • map[string]string: a mapping of image names to their full ECR URIs.
  • error: an error if any step in the publishing process fails.

func PushAllDashboards

func PushAllDashboards(fs embed.FS) error

PushAllDashboards reads all JSON dashboards from the embedded filesystem and pushes them to Grafana.

Parameters:

  • fs: the embedded filesystem containing JSON dashboard files under the "dashboards" directory.

Returns:

  • error: an error if reading the directory or pushing any dashboard fails.

func PushDashboard

func PushDashboard(name string, fs embed.FS) error

PushDashboard uploads a JSON dashboard to Grafana, replacing its datasource placeholder.

Parameters:

  • name: the name of the dashboard file (without .json extension) located in the "dashboards" folder.
  • fs: the embedded filesystem containing the dashboard definitions.

Returns:

  • error: an error if the dashboard file is missing, the datasource is not created, or the upload fails.

func PutDynamoDBItem

func PutDynamoDBItem(cfg aws.Config, ctx context.Context, item *ProcessingControl) error

PutDynamoDBItem stores a ProcessingControl item in the DynamoDB table "dm-processing-control". It marshals the item into a DynamoDB attribute map and performs a PutItem operation.

Parameters:

  • cfg: the AWS configuration used to connect to DynamoDB.
  • ctx: the context for the AWS request.
  • item: the ProcessingControl struct to persist.

Returns:

  • error: an error if the item fails to be marshaled or inserted into DynamoDB.

func QueryByObjectKey

func QueryByObjectKey(cfg aws.Config, ctx context.Context, objectKey string, md any) error

QueryByObjectKey retrieves a single item from the ProcessingControl table using the object_key GSI. It queries the appropriate DynamoDB index and unmarshals the result into the provided destination.

Parameters:

  • cfg: the AWS configuration used to connect to DynamoDB.
  • ctx: the context for the AWS request.
  • objectKey: the S3 object key to query.
  • md: a pointer to a struct where the result will be unmarshaled.

Returns:

  • error: an error if the query fails, no record is found, or unmarshaling fails.
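
A minimal sketch (illustrative; the object key is hypothetical):

	var pc ProcessingControl
	if err := QueryByObjectKey(cfg, ctx, "raw/review/batch.gz", &pc); err != nil {
		log.Fatal(err)
	}
	fmt.Println(pc.Status, pc.TableName)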

func RunAllMigrations

func RunAllMigrations(fs embed.FS) error

RunAllMigrations executes all predefined SQL migration scripts in order.

It runs the core, view, and mart scripts sequentially using the embedded filesystem.

Parameters:

  • fs: Embedded filesystem containing the SQL migration scripts.

Returns:

  • error: An error if any of the migration scripts fail to execute.

func RunBenchmarkPostDeployment

func RunBenchmarkPostDeployment(cfg aws.Config, scripts embed.FS) error

RunBenchmarkPostDeployment performs post-deployment setup for the benchmark environment.

This function uploads the required benchmark input file and the Python scripts (`emr.py` and `glue.py`) to the appropriate S3 bucket. It assumes that the infrastructure stack has already been deployed and that the bucket output is available.

Parameters:

  • cfg: AWS configuration used to access S3.
  • scripts: embedded file system containing the Python scripts to be uploaded.

Returns:

  • error: if any step (retrieving the bucket, uploading input or scripts) fails.

func RunEcsBenchmark

func RunEcsBenchmark() error

RunEcsBenchmark executes a benchmark task in ECS using a Go-based container image.

The function retrieves necessary stack outputs such as ECS cluster name, task definition, subnets, and security group from CloudFormation. It then runs a Fargate task configured with environment variables pointing to input, output, and result file paths in S3.

The ECS task reads a `.csv.gz` file, writes the output as a Parquet file, and uploads a JSON result file with metrics. The function downloads this result, decodes it, and prints a benchmark report to stdout.

Parameters:

  • none

Returns:

  • error: an error if any step in the ECS task lifecycle or S3 download fails.

func RunEmrBenchmark

func RunEmrBenchmark() error

RunEmrBenchmark executes a PySpark benchmark using EMR Serverless.

This function starts a job run on an existing EMR Serverless application, submitting a benchmark script located in S3 (`scripts/test.py`) and passing input/output/result paths as arguments. The script processes a `.csv.gz` file and outputs a Parquet file along with a JSON file containing performance metrics.

After the job finishes, the result JSON is downloaded, decoded, and a benchmark report is printed to stdout. The total elapsed time is also measured and included in the report.

Parameters:

  • none

Returns:

  • error: if the job fails to start, complete, or if the result cannot be downloaded or parsed.

func RunGlueBenchmark

func RunGlueBenchmark() error

RunGlueBenchmark executes a benchmark using a PySpark Glue Job.

This function launches a managed AWS Glue job that processes a `.csv.gz` input file from S3 and writes the result as a Parquet file. It also outputs a result JSON with benchmark metrics. The benchmark job is named "dm-benchmark-glue" and is parameterized with input, output, and result paths.

After the job completes, the result JSON is downloaded, parsed, and printed as a report. The function measures the total elapsed time of the benchmark and includes it in the report.

Parameters:

  • none

Returns:

  • error: if the Glue job fails to start, complete, or the result cannot be downloaded or parsed.

func RunMigration

func RunMigration(fs embed.FS, script string) error

RunMigration executes an SQL script embedded via embed.FS against the Aurora PostgreSQL database.

Parameters:

  • fs: embedded file system containing SQL scripts.
  • script: path to the SQL script within the embedded FS.

Behavior:

  • Reads the script file from the embedded FS.
  • Opens a connection to Aurora using GetConnection.
  • Executes the SQL content (removing optional BOM prefix if present).

Returns:

  • error: if reading the script, connecting to the DB, or executing the SQL fails.

func SampleData

func SampleData(data []map[string]any, n int) []map[string]any

SampleData returns a subset of the input data containing at most `n` elements.

If the length of the input slice is less than `n`, it returns the entire slice. Otherwise, it returns the first `n` elements.

Parameters:

  • data: []map[string]any - The input dataset.
  • n: int - The maximum number of elements to return.

Returns:

  • []map[string]any: A subslice of the original dataset with at most `n` entries.

func SchemaToLayer

func SchemaToLayer(schema string) string

SchemaToLayer maps a PostgreSQL schema name to its corresponding logical lakehouse layer name ("bronze", "silver", or "gold").

Parameters:

  • schema: the physical PostgreSQL schema name (e.g., "dm_core", "dm_view", "dm_mart").

Returns:

  • string: the associated logical lakehouse layer name.

Panics:

  • if the provided schema name is not recognized.

func SeedFile

func SeedFile(name string) error

SeedFile downloads a compressed CSV dataset from GitHub and seeds it.

Parameters:

  • name: Name of the dataset (e.g., "brewery", "beer").

Returns:

  • error: An error if the download, read, or seeding fails; otherwise, nil.

func SeedFromReader

func SeedFromReader(data []byte, name string) error

SeedFromReader seeds a dataset into Aurora or uploads it to S3, depending on the table name.

It checks for duplicates by querying the ProcessingControl table using the schema ("dm_bronze") and table name. If an entry already exists, the operation is skipped.

Parameters:

  • data: Compressed CSV content in bytes (gzip).
  • name: Dataset name. Must be one of: "brewery", "beer", "profile", or "review".

Returns:

  • error: An error if decompression, duplicate check, or seeding/upload fails.

func SeedThemAll

func SeedThemAll() error

SeedThemAll seeds all predefined datasets sequentially.

This function iterates over a hardcoded list of dataset names and calls `SeedFile` for each one. If any dataset fails to seed, the process is aborted.

Returns:

  • error: An error if any seeding operation fails; otherwise, nil.

func SimulateProfileStream

func SimulateProfileStream(opts SimOptions)

SimulateProfileStream simulates the customer scenario only.

func SimulateReviewStream

func SimulateReviewStream(opts SimOptions)

SimulateReviewStream simulates the purchase scenario (customer + purchase + delivery).

func StartReplication

func StartReplication() error

StartReplication initiates or resumes an AWS DMS replication task based on its current status. If the task is in "ready" state, it starts the task from the beginning. If the task is "stopped", it resumes processing. If already "running", no action is taken. The function waits until the task reaches the "running" state before returning.

Returns:

  • error: an error if the task cannot be described, started, resumed, or confirmed as running.

func StreamCsvToParquet

func StreamCsvToParquet(r io.Reader, model bronze.Model) (*bytes.Buffer, error)

StreamCsvToParquet reads CSV data from an input stream, parses it into model instances, and encodes the result into a Parquet-formatted buffer using concurrent workers.

This function uses the CSV header to map column names to indices and processes each record concurrently. Each line is converted into a model instance via `FromCSV`. The resulting records are written to a Parquet buffer using Snappy compression.

Parameters:

  • r: CSV input stream (typically a .csv.gz file decompressed).
  • model: An implementation of the bronze.Model interface used to instantiate records.

Returns:

  • *bytes.Buffer: A buffer containing the resulting Parquet data.
  • error: An error if any step in the parsing or writing process fails.

func StreamJsonToParquet

func StreamJsonToParquet(r io.Reader, model bronze.Model) (*bytes.Buffer, error)

func SyncCatalogFromDatabaseSchema

func SyncCatalogFromDatabaseSchema(db *gorm.DB, layerType string, tableList ...string) error

SyncCatalogFromDatabaseSchema inspects a lakehouse layer (bronze, silver, or gold), loads metadata from the corresponding Aurora PostgreSQL schema, and ensures each table is registered as an Iceberg table in the AWS Glue Catalog.

Parameters:

  • db: GORM database connection to the Aurora PostgreSQL instance.
  • layerType: logical layer name ("bronze", "silver", or "gold").
  • tableList: optional list of table names to filter.

Returns:

  • error: if any step of the synchronization process fails.
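
A minimal sketch (illustrative; the table name is hypothetical):

	db, err := GetConnection()
	if err != nil {
		log.Fatal(err)
	}
	if err := SyncCatalogFromDatabaseSchema(db, "silver", "review"); err != nil {
		log.Fatal(err)
	}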

func SyncGlueTable

func SyncGlueTable(layerType, tableName string, columns []types.Column) error

SyncGlueTable creates or updates a Glue table in the AWS Glue Catalog, using the appropriate format based on the lakehouse layer.

For the "bronze" layer, it creates a standard Hive table using Parquet, compatible with Athena and Glue without Iceberg dependencies.

For the "silver" and "gold" layers, it creates a native Iceberg table, enabling support for ACID operations, versioning, and time travel.

Parameters:

  • layerType: the lakehouse layer ("bronze", "silver", or "gold").
  • tableName: the name of the table to create or update.
  • columns: a slice of Glue-compatible column definitions.

Returns:

  • error: if creation or update fails.

func TagColumnAsPII

func TagColumnAsPII(cfg aws.Config, schema, table, column string) error

TagColumnAsPII applies the Lake Formation tag "classification=PII" to a specific column of a table.

This is useful for marking columns that contain Personally Identifiable Information (PII) so that access control and data governance rules can be applied accordingly.

Parameters:

  • cfg: aws.Config - The AWS SDK configuration used to initialize the Lake Formation client.
  • schema: string - The name of the AWS Glue database (schema).
  • table: string - The name of the table within the database.
  • column: string - The name of the column to tag.

Returns:

  • error: Any error encountered while tagging the column, or nil if successful.

func TriggerProcessing

func TriggerProcessing(cfg aws.Config, layer string, tables []string) error

TriggerProcessing starts a Step Function execution for the specified layer and optional tables.

Parameters:

  • cfg: AWS configuration used to connect to the Step Functions service.
  • layer: processing layer (e.g. "silver", "gold").
  • tables: optional list of table names.

Returns:

  • error: any error that occurs while triggering the pipeline.

func UnmarshalRecords

func UnmarshalRecords(model any, data []map[string]any) (any, error)

UnmarshalRecords converts a slice of generic maps into a typed slice using reflection.

This function uses the type of the `model` parameter to infer the target slice type, allocates a new slice of that type, and fills in the values using `misc.Copier`.

Parameters:

  • model: An instance of the target struct type (e.g., &MyStruct{}) used to determine the slice element type.
  • data: A slice of `map[string]any`, typically obtained from JSON or dynamic decoding.

Returns:

  • any: A pointer to a slice of the inferred type, e.g., *[]MyStruct.
  • error: An error if the conversion fails.

func UploadArtifacts

func UploadArtifacts(artifacts embed.FS, functions ...string) error

UploadArtifacts uploads one or more embedded Lambda ZIP artifacts to the configured S3 bucket.

Parameters:

  • artifacts: an embedded filesystem containing the ZIP artifacts.
  • functions: optional list of function names to upload (without ".zip"); if empty, all ZIPs are uploaded.

Returns:

  • error: if reading embedded files or uploading to S3 fails.

func UploadBatchFile

func UploadBatchFile(reader io.Reader, tableName string) error

UploadBatchFile uploads a batch data file to the raw ingestion path in S3.

The uploaded object is placed under the "raw/{tableName}/" prefix with a unique name based on timestamp and UUID.

Parameters:

  • reader: io.Reader providing the compressed file content (e.g., gzip).
  • tableName: Name of the logical table associated with the batch file.

Returns:

  • error: If the upload fails due to stack resolution, read error, or S3 operation.

func UploadBenchmarkInput

func UploadBenchmarkInput() error

UploadBenchmarkInput downloads the benchmark CSV file and uploads it to the benchmark S3 bucket.

This function fetches the file `review.csv.gz` from a public GitHub repository and uploads it to the `input/` prefix of the S3 bucket defined in the benchmark CloudFormation stack. It ensures the file is downloaded successfully, fully read into memory, and then stored in S3.

Parameters:

  • none

Returns:

  • error: if the file download, read, or upload to S3 fails.

func UploadDataToS3

func UploadDataToS3(cfg aws.Config, ctx context.Context, bucket, key string, data []byte) error

UploadDataToS3 uploads a byte slice to a specified S3 bucket and key.

It creates an S3 client from the provided AWS config and sends the data using `PutObject`.

Parameters:

  • cfg: The AWS configuration used to create the S3 client.
  • ctx: The context for the upload operation.
  • bucket: The name of the S3 bucket.
  • key: The key (path) where the object will be stored.
  • data: The byte slice to be uploaded.

Returns:

  • error: Any error encountered during the upload. Returns nil if the upload is successful.

func UploadPythonScripts

func UploadPythonScripts(cfg aws.Config, scripts embed.FS, bucket string, files ...string) error

UploadPythonScripts uploads the Python scripts from the embedded filesystem to the S3 artifacts bucket.

Parameters:

  • cfg: the AWS configuration used to access S3 and resolve the bucket name.
  • scripts: the embedded filesystem containing the Python script files.
  • bucket: the name of the destination S3 bucket.
  • files: optional list of specific script files to upload.

Returns:

  • error: an error if any of the assets fail to be read or uploaded.

func ValidateRow

func ValidateRow(row map[string]any, model any, line int) error

ValidateRow checks if a given CSV row map conforms to the structure and types defined in a model struct.

It verifies the presence of all required fields (based on JSON tags) and attempts to convert values to the expected types. Pointer fields are allowed to be nil. Supported types include string, int64, float64, json.Number, and time.Time (RFC3339). Unsupported types result in an error.

Parameters:

  • row: A map representing a parsed CSV row, with column names as keys.
  • model: A struct type used to validate field presence and types (must be a pointer to struct).
  • line: The line number in the CSV file, used for error context.

Returns:

  • error: A validation error if the row is invalid, otherwise nil.

func WaitForEMRJobFinished

func WaitForEMRJobFinished(ctx context.Context, client *emrserverless.Client, appID, jobID string, timeout time.Duration) error

WaitForEMRJobFinished polls the status of an EMR Serverless job until it completes or a timeout occurs.

Parameters:

  • ctx: context used for cancellation and timeouts.
  • client: an instance of the EMR Serverless client.
  • appID: the EMR Serverless application ID.
  • jobID: the ID of the job run to monitor.
  • timeout: the maximum duration to wait for job completion.

Returns:

  • error: nil if the job succeeded, or an error if it failed, was cancelled, or timed out.

func WaitForGlueJobFinished

func WaitForGlueJobFinished(ctx context.Context, client *glue.Client, jobName, runID string, timeout time.Duration) error

WaitForGlueJobFinished waits until a Glue job run completes or the timeout is reached.

It polls the Glue job status periodically (every few seconds) and checks for terminal states: SUCCEEDED, FAILED, TIMEOUT, or STOPPED. If the job completes successfully, it returns nil. If the job ends in a failure state or the timeout is exceeded, it returns an error.

Parameters:

  • ctx: context used to cancel or time out the polling loop.
  • client: AWS Glue client used to query job status.
  • jobName: name of the Glue job being monitored.
  • runID: ID of the specific Glue job run.
  • timeout: maximum duration to wait before giving up.

Returns:

  • error: if the job fails, is stopped, or exceeds the timeout duration.

func WriteParquet

func WriteParquet(records any, writer io.Writer) error

WriteParquet writes a slice of records to the provided io.Writer in Parquet format.

The function expects a slice or pointer to slice as input. It uses the schema of the first element in the slice to construct the Parquet schema, compresses with Snappy, and encodes with Plain encoding. The function uses concurrency to prepare the records for writing.

Parameters:

  • records: A slice or pointer to a slice of structs representing the data.
  • writer: Destination writer to which the Parquet data will be written.

Returns:

  • error: An error if writing fails, the input is invalid, or the slice is empty.
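
A minimal sketch (illustrative; beerRow is a hypothetical record struct):

	type beerRow struct {
		ID   int64
		Name string
	}

	var buf bytes.Buffer
	records := []beerRow{{ID: 1, Name: "IPA"}, {ID: 2, Name: "Stout"}}
	if err := WriteParquet(records, &buf); err != nil {
		log.Fatal(err)
	}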

Types

type BenchmarkResult

type BenchmarkResult struct {
	Implementation     string        `json:"implementation,omitempty"`
	StartTime          time.Time     `json:"start_time,omitempty"`
	EndTime            time.Time     `json:"end_time,omitempty"`
	StartClusterTime   time.Time     `json:"start_cluster_time,omitempty"`
	EndClusterTime     time.Time     `json:"end_cluster_time,omitempty"`
	StartTaskTime      time.Time     `json:"start_task_time,omitempty"`
	EndTaskTime        time.Time     `json:"end_task_time,omitempty"`
	InputFile          string        `json:"input_file,omitempty"`
	OutputFile         string        `json:"output_file,omitempty"`
	CSVReadTime        time.Duration `json:"csv_read_time,omitempty"`
	ParquetWriteTime   time.Duration `json:"parquet_write_time,omitempty"`
	TotalTaskTime      time.Duration `json:"total_task_time,omitempty"`
	TotalBenchmarkTime time.Duration `json:"total_benchmark_time,omitempty"`
	MemoryUsedMB       float64       `json:"memory_used_mb,omitempty"`
	Notes              string        `json:"notes,omitempty"`
}

BenchmarkResult holds detailed timing and resource usage information for a single benchmark run.

This structure captures both the ECS task orchestration metadata (at the cluster level) and the actual execution metrics from within the container.

Fields:

  • Implementation: programming language used ("go" or "python").
  • StartTime: timestamp when the benchmark command was triggered.
  • EndTime: timestamp when the full benchmark flow completed (including ECS orchestration and result retrieval).
  • StartClusterTime: timestamp when the ECS task was requested to start.
  • EndClusterTime: timestamp when the ECS task reached the STOPPED status.
  • StartTaskTime: timestamp at the beginning of the container's execution (inside the task).
  • EndTaskTime: timestamp at the end of the container's execution (just before producing the result).
  • InputFile: path to the input file (e.g., a `.csv.gz`) used in the benchmark.
  • OutputFile: path to the output file (e.g., a `.parquet`) generated by the benchmark.
  • CSVReadTime: time spent reading and parsing the input CSV.
  • ParquetWriteTime: time spent writing the output to Parquet format.
  • TotalTaskTime: total time elapsed inside the container (EndTaskTime - StartTaskTime).
  • TotalBenchmarkTime: total time elapsed for the entire benchmark flow (EndTime - StartTime).
  • MemoryUsedMB: memory usage observed inside the container (in megabytes).
  • Notes: any additional contextual information (e.g., anomalies, comments).

type GrafanaDatasource

type GrafanaDatasource struct {
	ID   int64  `json:"id"`
	UID  string `json:"uid"`
	Name string `json:"name"`
}

func FindOrCreateAthenaDatasource

func FindOrCreateAthenaDatasource(cfg aws.Config) (*GrafanaDatasource, error)

FindOrCreateAthenaDatasource ensures that a Grafana Athena datasource exists and returns its metadata.

If the datasource named "Athena" exists, it is returned. Otherwise, it is created using the default configuration with IAM role authentication and the S3 output path based on the `ObserverBucketName` stack output.

Parameters:

  • cfg: AWS configuration used to resolve stack outputs and region.

Returns:

  • *GrafanaDatasource: the existing or newly created datasource metadata.
  • error: an error if the operation fails at any step.

func FindOrCreateCloudWatchDatasource

func FindOrCreateCloudWatchDatasource(cfg aws.Config) (*GrafanaDatasource, error)

FindOrCreateCloudWatchDatasource ensures that a Grafana CloudWatch datasource exists and returns its metadata.

If a datasource named "CloudWatch" already exists in Grafana, it is returned. Otherwise, the function creates a new CloudWatch datasource using the default authentication method (IAM role from environment) and region specified in the AWS config.

Parameters:

  • cfg: AWS configuration used to determine the region and credentials.

Returns:

  • *GrafanaDatasource: the existing or newly created CloudWatch datasource.
  • error: an error if the operation fails at any step.

type ProcessingControl

type ProcessingControl struct {
	ControlID     string  `dynamodbav:"control_id"`
	ObjectKey     string  `dynamodbav:"object_key"`
	SchemaName    string  `dynamodbav:"schema_name"`
	TableName     string  `dynamodbav:"table_name"`
	RecordCount   int     `dynamodbav:"record_count"`
	FileFormat    string  `dynamodbav:"file_format"`
	FileSize      int64   `dynamodbav:"file_size"`
	Checksum      string  `dynamodbav:"checksum"`
	Status        string  `dynamodbav:"status"`
	AttemptCount  int     `dynamodbav:"attempt_count"`
	ComputeTarget string  `dynamodbav:"compute_target"`
	CreatedAt     string  `dynamodbav:"created_at"`
	UpdatedAt     string  `dynamodbav:"updated_at"`
	StartedAt     *string `dynamodbav:"started_at,omitempty"`
	FinishedAt    *string `dynamodbav:"finished_at,omitempty"`
	Duration      *int64  `dynamodbav:"duration,omitempty"`
	ErrorMessage  *string `dynamodbav:"error_message,omitempty"`
}

func NewProcessingControl

func NewProcessingControl(layer, key string) (*ProcessingControl, error)

NewProcessingControl creates a new ProcessingControl instance based on the given data lake layer and S3 object key.

Parameters:

  • layer: the logical lakehouse layer name (e.g., "bronze", "silver", "gold").
  • key: the S3 object key in the format "<prefix>/<table>/<filename>".

Returns:

  • *ProcessingControl: the initialized control record for tracking processing status.
  • error: an error if the key format is invalid.

func (*ProcessingControl) Delete

func (m *ProcessingControl) Delete(ctx context.Context, cfg aws.Config) error

Delete removes the current ProcessingControl item from the DynamoDB table.

It uses the control_id field as the primary key for the DeleteItem operation. The table name is resolved dynamically from the stack output "ProcessingControlTableName".

Parameters:

  • ctx: the context for the AWS request.
  • cfg: the AWS configuration used to connect to DynamoDB.

Returns:

  • error: an error if the table name cannot be resolved or the item fails to be deleted.

func (*ProcessingControl) DestinationKey

func (m *ProcessingControl) DestinationKey(targetLayer string) string

DestinationKey returns the destination S3 key for the processed file in the specified lakehouse layer.

If the target layer is "raw", the key will preserve the flat structure and original `.gz` extension. For all other layers (e.g., "bronze", "silver", "gold"), the key will follow a Hive-style partitioning layout based on the `created_at` timestamp, and the file will be renamed with a `.parquet` extension.

Parameters:

  • targetLayer: the destination layer name ("raw", "bronze", "silver", or "gold").

Returns:

  • string: the destination S3 object key, formatted according to the layer's storage strategy.

func (*ProcessingControl) Finish

func (m *ProcessingControl) Finish(ctx context.Context, cfg aws.Config, failure error) error

Finish finalizes the processing execution by updating the control item with completion metadata. It sets the finish timestamp, calculates the duration (if start time is available), and updates the status to "success" or "error" based on the presence of a failure.

Parameters:

  • ctx: the context for the AWS request.
  • cfg: the AWS configuration used to connect to DynamoDB.
  • failure: an error indicating if the execution failed; nil means success.

Returns:

  • error: the original failure if present, or an error saving the updated item to DynamoDB.

func (*ProcessingControl) Put

func (m *ProcessingControl) Put(ctx context.Context, cfg aws.Config) error

Put stores the ProcessingControl item in the DynamoDB table if it does not already exist. It uses a conditional expression to prevent overwriting existing items.

Parameters:

  • ctx: the context for the AWS request.
  • cfg: the AWS configuration used to create the DynamoDB client.

Returns:

  • error: an error if the operation fails (except when the item already exists).

func (*ProcessingControl) RegisterNextLayerControl

func (m *ProcessingControl) RegisterNextLayerControl(ctx context.Context, cfg aws.Config, data []byte) (*ProcessingControl, error)

RegisterNextLayerControl creates and stores a new ProcessingControl entry for the next lakehouse layer. It derives the destination key from the current control, calculates metadata (e.g., file size, checksum), and sets attributes such as format, compute target, and record count.

Parameters:

  • ctx: the context for the AWS request.
  • cfg: the AWS configuration used to connect to DynamoDB.
  • data: the raw byte content of the transformed file.

Returns:

  • *ProcessingControl: the newly created control item for the next layer.
  • error: an error if the control creation or persistence fails.

func (*ProcessingControl) Start

func (m *ProcessingControl) Start(ctx context.Context, cfg aws.Config) error

Start marks the beginning of the processing execution for the current control item. It updates the status to "running", increments the attempt count, resets the duration and error state, and sets the start timestamp.

Parameters:

  • ctx: the context for the AWS request.
  • cfg: the AWS configuration used to connect to DynamoDB.

Returns:

  • error: an error if the item fails to be persisted in DynamoDB.
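
A minimal sketch of the control lifecycle (illustrative; the layer, object key, and the process function are hypothetical):

	pc, err := NewProcessingControl("bronze", "raw/review/batch.gz")
	if err != nil {
		log.Fatal(err)
	}
	if err := pc.Put(ctx, cfg); err != nil {
		log.Fatal(err)
	}
	if err := pc.Start(ctx, cfg); err != nil {
		log.Fatal(err)
	}

	processErr := process(pc) // hypothetical processing step

	if err := pc.Finish(ctx, cfg, processErr); err != nil {
		log.Fatal(err)
	}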

type SimOptions

type SimOptions struct {
	RPS      int
	Duration time.Duration
	Quality  string
	Parallel int
}

type Stack

type Stack struct {
	Name   string            // Logical name of the stack (e.g., "network", "roles").
	Prefix string            // Optional prefix to be prepended to the stack name.
	Params []types.Parameter // Parameters to pass to the CloudFormation template.
}

Stack represents a CloudFormation stack.

func (*Stack) FullStackName

func (s *Stack) FullStackName() string

FullStackName returns the full stack name by concatenating the prefix (or default) and name.

func (*Stack) GetStackOutput

func (s *Stack) GetStackOutput(cfg aws.Config, key string) (string, error)

GetStackOutput retrieves a specific output value from a given CloudFormation stack.

Parameters:

  • cfg: AWS configuration used to initialize the CloudFormation client.
  • key: the name of the output key to retrieve.

Returns:

  • string: the value associated with the given output key.
  • error: if the stack cannot be described or the output key is not found.
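
A minimal sketch (illustrative; the stack name is hypothetical, and "DataLakeBucketName" is one of the outputs referenced elsewhere in this package):

	stack := &Stack{Name: "storage"}
	bucket, err := stack.GetStackOutput(GetAWSConfig(), "DataLakeBucketName")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("bucket:", bucket)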

func (*Stack) GetStackOutputs

func (s *Stack) GetStackOutputs(cfg aws.Config) (map[string]string, error)

GetStackOutputs retrieves all output key-value pairs from the specified CloudFormation stack.

It describes the stack using its full name and returns a map of outputs, where each key is the output name and each value is the output value.

Returns:

  • map[string]string: output values from the stack.
  • error: if the stack cannot be found or described.

func (*Stack) GetTemplateBody

func (s *Stack) GetTemplateBody(templates embed.FS) ([]byte, error)

GetTemplateBody reads and returns the contents of the embedded CloudFormation template for the stack.

Parameters:

  • templates: the embedded file system containing all templates.

Returns:

  • []byte: the contents of the template file.
  • error: if reading the file fails.

func (*Stack) TemplateFilePath

func (s *Stack) TemplateFilePath() string

TemplateFilePath returns the relative path of the CloudFormation template file for the stack.
