Documentation
¶
Index ¶
- func CheckIfAttributionCSVFile(filePath string) bool
- func CopyFrom(ctx context.Context, tx pgx.Tx, scanner *bufio.Scanner, fileID uint, ...) (int, int, error)
- func GetCSVMetadata(path string) (csvFileMetadata, error)
- type CSVFileProcessor
- type CSVImporter
- type CSVParser
- type CclfFileProcessor
- type CclfImporter
- type LocalFileProcessor
- func (processor *LocalFileProcessor) CleanUpCCLF(ctx context.Context, cclfMap map[string][]*cclfZipMetadata) (deletedCount int, err error)
- func (processor *LocalFileProcessor) CleanUpCSV(ctx context.Context, file csvFile) error
- func (processor *LocalFileProcessor) LoadCSV(ctx context.Context, filepath string) (*bytes.Reader, func(), error)
- func (processor *LocalFileProcessor) LoadCclfFiles(ctx context.Context, path string) (cclfList map[string][]*cclfZipMetadata, skipped int, failed int, err error)
- func (processor *LocalFileProcessor) OpenZipArchive(ctx context.Context, filePath string) (*zip.Reader, func(), error)
- type S3FileProcessor
- func (processor *S3FileProcessor) CleanUpCCLF(ctx context.Context, cclfMap map[string][]*cclfZipMetadata) (deletedCount int, err error)
- func (processor *S3FileProcessor) CleanUpCSV(ctx context.Context, file csvFile) error
- func (processor *S3FileProcessor) LoadCSV(ctx context.Context, filepath string) (*bytes.Reader, func(), error)
- func (processor *S3FileProcessor) LoadCclfFiles(ctx context.Context, path string) (cclfMap map[string][]*cclfZipMetadata, skipped int, failed int, err error)
- func (processor *S3FileProcessor) OpenZipArchive(ctx context.Context, filePath string) (*zip.Reader, func(), error)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CopyFrom ¶
func CopyFrom(ctx context.Context, tx pgx.Tx, scanner *bufio.Scanner, fileID uint, reportInterval int, logger logrus.FieldLogger, expectedRecordLength int) (int, int, error)
CopyFrom writes all of the beneficiary data captured in the scanner to the beneficiaries table. It returns two integer counts (including the number of rows written) along with any error that occurred.
func GetCSVMetadata ¶
GetCSVMetadata builds a metadata struct based on the filename parts. The filename regex is part of the ACO configuration.
Types ¶
type CSVFileProcessor ¶
type CSVFileProcessor interface {
// Fetch the csv attribution file to be imported.
LoadCSV(ctx context.Context, path string) (*bytes.Reader, func(), error)
// Remove csv attribution file that was successfully imported.
CleanUpCSV(ctx context.Context, file csvFile) (err error)
}
File processors for attribution are defined as interfaces so that different implementations can be passed in interchangeably; local development and other environments require different processors. This interface has two implementations: one for ingesting and testing locally, and one for ingesting from S3.
type CSVImporter ¶
type CSVImporter struct {
Logger logrus.FieldLogger
FileProcessor CSVFileProcessor
PgxPool *pgxv5Pool.Pool
}
func (CSVImporter) ImportCSV ¶
func (importer CSVImporter) ImportCSV(ctx context.Context, filepath string) error
func (CSVImporter) ProcessCSV ¶
func (importer CSVImporter) ProcessCSV(csv csvFile) error
ProcessCSV takes the provided metadata and writes a new record to the cclf_files table, then takes the contents of the file and writes new record(s) to the cclf_beneficiaries table. If any step of writing to the database fails, the whole transaction fails. If the new records are written successfully, the new record in the cclf_files table has its import status updated.
type CclfFileProcessor ¶
type CclfFileProcessor interface {
// Load a list of valid CCLF files to be imported
LoadCclfFiles(ctx context.Context, path string) (cclfList map[string][]*cclfZipMetadata, skipped int, failed int, err error)
// Clean up CCLF files after failed or successful import runs
CleanUpCCLF(ctx context.Context, cclfMap map[string][]*cclfZipMetadata) (deletedCount int, err error)
// Open a zip archive
OpenZipArchive(ctx context.Context, name string) (*zip.Reader, func(), error)
}
CclfFileProcessor manages interaction with CCLF files from a given source.
type CclfImporter ¶
type CclfImporter struct {
// contains filtered or unexported fields
}
CclfImporter manages the import process for CCLF files from a given source.
func NewCclfImporter ¶
func NewCclfImporter(logger logrus.FieldLogger, fileProcessor CclfFileProcessor, pgxPool *pgxv5Pool.Pool) CclfImporter
func (CclfImporter) ImportCCLFDirectory ¶
func (importer CclfImporter) ImportCCLFDirectory(filePath string) (success, failure, skipped int, err error)
type LocalFileProcessor ¶
type LocalFileProcessor struct {
Handler optout.LocalFileHandler
}
func (*LocalFileProcessor) CleanUpCCLF ¶
func (*LocalFileProcessor) CleanUpCSV ¶
func (processor *LocalFileProcessor) CleanUpCSV(ctx context.Context, file csvFile) error
func (*LocalFileProcessor) LoadCclfFiles ¶
func (*LocalFileProcessor) OpenZipArchive ¶
type S3FileProcessor ¶
type S3FileProcessor struct {
Handler optout.S3FileHandler
}
func (*S3FileProcessor) CleanUpCCLF ¶
func (*S3FileProcessor) CleanUpCSV ¶
func (processor *S3FileProcessor) CleanUpCSV(ctx context.Context, file csvFile) error