importer

package
v2.0.12-core Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 9, 2022 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Overview

Example (DecodeImportTrigger_Manual)

Trigger for a manual dataset regeneration (user clicks save button on dataset edit page)

trigger := `{
	"datasetID": "189137412",
	"logID": "dataimport-zmzddoytch2krd7n"
}`

sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger))
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, logID, err)
Output:
Source Bucket: ""
Source file: ""
Dataset: "189137412"
Log: "dataimport-zmzddoytch2krd7n"
Err: "<nil>"
Example (DecodeImportTrigger_ManualBadDatasetID)
trigger := `{
	"datasetID": "",
	"logID": "dataimport-zmzddoytch2krd7n"
}`

sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger))
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, logID, err)
Output:
Source Bucket: ""
Source file: ""
Dataset: ""
Log: ""
Err: "Failed to find dataset ID in reprocess trigger"
Example (DecodeImportTrigger_ManualBadLogID)
trigger := `{
		"datasetID": "qwerty"
	}`

sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger))
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, logID, err)
Output:
Source Bucket: ""
Source file: ""
Dataset: ""
Log: ""
Err: "Failed to find log ID in reprocess trigger"
Example (DecodeImportTrigger_ManualBadMsg)
trigger := `{
	"weird": "message"
}`

sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger))
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, logID, err)
Output:
Source Bucket: ""
Source file: ""
Dataset: ""
Log: ""
Err: "Unexpected or no message type embedded in triggering SNS message"
Example (DecodeImportTrigger_OCS)

Trigger from when a new zip arrives from the pipeline

trigger := `{
    "Records": [
        {
            "eventVersion": "2.1",
            "eventSource": "aws:s3",
            "awsRegion": "us-east-1",
            "eventTime": "2022-09-16T09:10:28.417Z",
            "eventName": "ObjectCreated:CompleteMultipartUpload",
            "userIdentity": {
                "principalId": "AWS:AIDA6AOWGDOHF37MOKWLS"
            },
            "requestParameters": {
                "sourceIPAddress": "81.154.57.137"
            },
            "responseElements": {
                "x-amz-request-id": "G3QWWT0BAYKP81QK",
                "x-amz-id-2": "qExUWHHDE1nL+UP3zim1XA7FIXRUoKxlIrJt/7ULAtn08/+EvRCt4sChLhCGEqMo7ny4CU/KufMNmOcyZsDPKGWHT2ukMbo+"
            },
            "s3": {
                "s3SchemaVersion": "1.0",
                "configurationId": "OTBjMjZmYzAtYThlOC00OWRmLWIwMzUtODkyZDk0YmRhNzkz",
                "bucket": {
                    "name": "prodpipeline-rawdata202c7bd0-o40ktu17o2oj",
                    "ownerIdentity": {
                        "principalId": "AP902Y0PI20DF"
                    },
                    "arn": "arn:aws:s3:::prodpipeline-rawdata202c7bd0-o40ktu17o2oj"
                },
                "object": {
                    "key": "189137412-07-09-2022-10-07-57.zip",
                    "size": 54237908,
                    "eTag": "b21ebca14f67255be1cd28c01d494508-7",
                    "sequencer": "0063243D6858D568F0"
                }
            }
        }
    ]
}`

sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger))

// NOTE: we're only checking the length of the log ID because it's a timestamp+random chars. Other code has this stubbed out but here it's probably sufficient
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog Str Len: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, len(logID), err)
Output:
Source Bucket: "prodpipeline-rawdata202c7bd0-o40ktu17o2oj"
Source file: "189137412-07-09-2022-10-07-57.zip"
Dataset: "189137412"
Log Str Len: "43"
Err: "<nil>"
Example (DecodeImportTrigger_OCS2)

Trigger from when a new zip arrives from the pipeline

trigger := `{
    "Records": [
        {
            "eventVersion": "2.1",
            "eventSource": "aws:s3",
            "awsRegion": "us-east-1",
            "eventTime": "2022-09-25T14:33:49.456Z",
            "eventName": "ObjectCreated:Put",
            "userIdentity": {
                "principalId": "AWS:AIDA6AOWGDOHF37MOKWLS"
            },
            "requestParameters": {
                "sourceIPAddress": "3.12.95.94"
            },
            "responseElements": {
                "x-amz-request-id": "K811ZDJ52EYBJ8P2",
                "x-amz-id-2": "R7bGQ2fOjvSZHkHez700w3wRVpn32nmr6jVPVYhKtNE2c2KYOmgm9hjmOA5WSQFh8faLRe6fHAmANKSTNRhwCq7Xgol0DgX4"
            },
            "s3": {
                "s3SchemaVersion": "1.0",
                "configurationId": "OTBjMjZmYzAtYThlOC00OWRmLWIwMzUtODkyZDk0YmRhNzkz",
                "bucket": {
                    "name": "prodpipeline-rawdata202c7bd0-o40ktu17o2oj",
                    "ownerIdentity": {
                        "principalId": "AP902Y0PI20DF"
                    },
                    "arn": "arn:aws:s3:::prodpipeline-rawdata202c7bd0-o40ktu17o2oj"
                },
                "object": {
                    "key": "197329413-25-09-2022-14-33-39.zip",
                    "size": 1388,
                    "eTag": "932bda7d32c05d90ecc550d061862994",
                    "sequencer": "00633066CD68A4BF43"
                }
            }
        }
    ]
}`

sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger))

// NOTE: we're only checking the length of the log ID because it's a timestamp+random chars. Other code has this stubbed out but here it's probably sufficient
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog Str Len: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, len(logID), err)
Output:
Source Bucket: "prodpipeline-rawdata202c7bd0-o40ktu17o2oj"
Source file: "197329413-25-09-2022-14-33-39.zip"
Dataset: "197329413"
Log Str Len: "43"
Err: "<nil>"
Example (DecodeImportTrigger_OCS_BadEventType)
trigger := `{
    "Records": [
        {
            "eventVersion": "2.1",
            "eventSource": "aws:sqs",
            "awsRegion": "us-east-1",
            "eventTime": "2022-09-16T09:10:28.417Z",
            "eventName": "ObjectCreated:CompleteMultipartUpload",
            "userIdentity": {
                "principalId": "AWS:AIDA6AOWGDOHF37MOKWLS"
            },
            "requestParameters": {
                "sourceIPAddress": "81.154.57.137"
            },
            "responseElements": {
                "x-amz-request-id": "G3QWWT0BAYKP81QK",
                "x-amz-id-2": "qExUWHHDE1nL+UP3zim1XA7FIXRUoKxlIrJt/7ULAtn08/+EvRCt4sChLhCGEqMo7ny4CU/KufMNmOcyZsDPKGWHT2ukMbo+"
            }
        }
    ]
}`

sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger))
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, logID, err)
Output:
Source Bucket: ""
Source file: ""
Dataset: ""
Log: ""
Err: "Failed to decode dataset import trigger: Failed to decode sqs body to an S3 event: unexpected end of JSON input"
Example (DecodeImportTrigger_OCS_Error)
trigger := `{
		"Records": []
}`
sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger))
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, logID, err)
Output:
Source Bucket: ""
Source file: ""
Dataset: ""
Log: ""
Err: "Unexpected or no message type embedded in triggering SNS message"

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func ImportForTrigger

func ImportForTrigger(triggerMessage []byte, envName string, configBucket string, datasetBucket string, manualBucket string, log logger.ILogger, remoteFS fileaccess.FileAccess) (string, error)

func TriggerDatasetReprocessViaSNS

func TriggerDatasetReprocessViaSNS(snsSvc awsutil.SNSInterface, idGen services.IDGenerator, datasetID string, snsTopic string) (*sns.PublishOutput, string, error)

Firing a trigger message. Anything calling this is triggering a dataset reimport via a lambda function

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL