cclf

package
v0.0.0-...-3540b70 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 5, 2026 License: CC0-1.0 Imports: 29 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CheckIfAttributionCSVFile

func CheckIfAttributionCSVFile(filePath string) bool

func CopyFrom

func CopyFrom(ctx context.Context, tx pgx.Tx, scanner *bufio.Scanner, fileID uint, reportInterval int, logger logrus.FieldLogger, expectedRecordLength int) (int, int, error)

CopyFrom writes all of the beneficiary data captured in the scanner to the beneficiaries table. It returns the number of rows written along with any error that occurred.

func GetCSVMetadata

func GetCSVMetadata(path string) (csvFileMetadata, error)

GetCSVMetadata builds a metadata struct based on the filename parts. The filename regex is part of aco configuration.

Types

type CSVFileProcessor

type CSVFileProcessor interface {
	// Fetch the csv attribution file to be imported.
	LoadCSV(ctx context.Context, path string) (*bytes.Reader, func(), error)
	// Remove csv attribution file that was successfully imported.
	CleanUpCSV(ctx context.Context, file csvFile) (err error)
}

FileProcessors for attribution are created as interfaces so that they can be passed in place of the implementation; local development and other envs will require different processors. This interface has two implementations; one for ingesting and testing locally, and one for ingesting in s3.

type CSVImporter

type CSVImporter struct {
	Logger        logrus.FieldLogger
	FileProcessor CSVFileProcessor
	PgxPool       *pgxv5Pool.Pool
}

func (CSVImporter) ImportCSV

func (importer CSVImporter) ImportCSV(ctx context.Context, filepath string) error

func (CSVImporter) ProcessCSV

func (importer CSVImporter) ProcessCSV(csv csvFile) error

ProcessCSV() will take provided metadata and write a new record to the cclf_files table and the contents of the file and write new record(s) to the cclf_beneficiaries table. If any step of writing to the database should fail, the whole transaction will fail. If the new records are written successfully, then the new record in the cclf_files table will have it's import status updated.

type CSVParser

type CSVParser struct {
	FilePath string
}

type CclfFileProcessor

type CclfFileProcessor interface {
	// Load a list of valid CCLF files to be imported
	LoadCclfFiles(ctx context.Context, path string) (cclfList map[string][]*cclfZipMetadata, skipped int, failed int, err error)
	// Clean up CCLF files after failed or successful import runs
	CleanUpCCLF(ctx context.Context, cclfMap map[string][]*cclfZipMetadata) (deletedCount int, err error)
	// Open a zip archive
	OpenZipArchive(ctx context.Context, name string) (*zip.Reader, func(), error)
}

Manages the interaction of CCLF files from a given source

type CclfImporter

type CclfImporter struct {
	// contains filtered or unexported fields
}

Manages the import process for CCLF files from a given source

func NewCclfImporter

func NewCclfImporter(logger logrus.FieldLogger, fileProcessor CclfFileProcessor, pgxPool *pgxv5Pool.Pool) CclfImporter

func (CclfImporter) ImportCCLFDirectory

func (importer CclfImporter) ImportCCLFDirectory(filePath string) (success, failure, skipped int, err error)

type LocalFileProcessor

type LocalFileProcessor struct {
	Handler optout.LocalFileHandler
}

func (*LocalFileProcessor) CleanUpCCLF

func (processor *LocalFileProcessor) CleanUpCCLF(ctx context.Context, cclfMap map[string][]*cclfZipMetadata) (deletedCount int, err error)

func (*LocalFileProcessor) CleanUpCSV

func (processor *LocalFileProcessor) CleanUpCSV(ctx context.Context, file csvFile) error

func (*LocalFileProcessor) LoadCSV

func (processor *LocalFileProcessor) LoadCSV(ctx context.Context, filepath string) (*bytes.Reader, func(), error)

func (*LocalFileProcessor) LoadCclfFiles

func (processor *LocalFileProcessor) LoadCclfFiles(ctx context.Context, path string) (cclfList map[string][]*cclfZipMetadata, skipped int, failed int, err error)

func (*LocalFileProcessor) OpenZipArchive

func (processor *LocalFileProcessor) OpenZipArchive(ctx context.Context, filePath string) (*zip.Reader, func(), error)

type S3FileProcessor

type S3FileProcessor struct {
	Handler optout.S3FileHandler
}

func (*S3FileProcessor) CleanUpCCLF

func (processor *S3FileProcessor) CleanUpCCLF(ctx context.Context, cclfMap map[string][]*cclfZipMetadata) (deletedCount int, err error)

func (*S3FileProcessor) CleanUpCSV

func (processor *S3FileProcessor) CleanUpCSV(ctx context.Context, file csvFile) error

func (*S3FileProcessor) LoadCSV

func (processor *S3FileProcessor) LoadCSV(ctx context.Context, filepath string) (*bytes.Reader, func(), error)

func (*S3FileProcessor) LoadCclfFiles

func (processor *S3FileProcessor) LoadCclfFiles(ctx context.Context, path string) (cclfMap map[string][]*cclfZipMetadata, skipped int, failed int, err error)

func (*S3FileProcessor) OpenZipArchive

func (processor *S3FileProcessor) OpenZipArchive(ctx context.Context, filePath string) (*zip.Reader, func(), error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL