Documentation
Overview
Implements importer triggering based on SNS queues. Decodes incoming SNS messages and extracts the files that are ready for the importer code to run on.
Example (DecodeImportTrigger_Manual)
Trigger for a manual dataset regeneration (the user clicks the save button on the dataset edit page).
trigger := `{
  "datasetID": "189137412",
  "logID": "dataimport-zmzddoytch2krd7n"
}`
sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger))
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, logID, err)
Output:
Source Bucket: ""
Source file: ""
Dataset: "189137412"
Log: "dataimport-zmzddoytch2krd7n"
Err: "<nil>"
Example (DecodeImportTrigger_ManualBadDatasetID)
trigger := `{
  "datasetID": "",
  "logID": "dataimport-zmzddoytch2krd7n"
}`
sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger))
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, logID, err)
Output:
Source Bucket: ""
Source file: ""
Dataset: ""
Log: ""
Err: "Failed to find dataset ID in reprocess trigger"
Example (DecodeImportTrigger_ManualBadLogID)
trigger := `{
  "datasetID": "qwerty"
}`
sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger))
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, logID, err)
Output:
Source Bucket: ""
Source file: ""
Dataset: ""
Log: ""
Err: "Failed to find log ID in reprocess trigger"
Example (DecodeImportTrigger_ManualBadMsg)
trigger := `{
  "weird": "message"
}`
sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger))
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, logID, err)
Output:
Source Bucket: ""
Source file: ""
Dataset: ""
Log: ""
Err: "Unexpected or no message type embedded in triggering SNS message"
Example (DecodeImportTrigger_OCS)
Trigger sent when a new zip file arrives from the pipeline.
trigger := `{
  "Records": [
    {
      "eventVersion": "2.1",
      "eventSource": "aws:s3",
      "awsRegion": "us-east-1",
      "eventTime": "2022-09-16T09:10:28.417Z",
      "eventName": "ObjectCreated:CompleteMultipartUpload",
      "userIdentity": {
        "principalId": "AWS:AIDA6AOWGDOHF37MOKWLS"
      },
      "requestParameters": {
        "sourceIPAddress": "81.154.57.137"
      },
      "responseElements": {
        "x-amz-request-id": "G3QWWT0BAYKP81QK",
        "x-amz-id-2": "qExUWHHDE1nL+UP3zim1XA7FIXRUoKxlIrJt/7ULAtn08/+EvRCt4sChLhCGEqMo7ny4CU/KufMNmOcyZsDPKGWHT2ukMbo+"
      },
      "s3": {
        "s3SchemaVersion": "1.0",
        "configurationId": "OTBjMjZmYzAtYThlOC00OWRmLWIwMzUtODkyZDk0YmRhNzkz",
        "bucket": {
          "name": "prodpipeline-rawdata202c7bd0-o40ktu17o2oj",
          "ownerIdentity": {
            "principalId": "AP902Y0PI20DF"
          },
          "arn": "arn:aws:s3:::prodpipeline-rawdata202c7bd0-o40ktu17o2oj"
        },
        "object": {
          "key": "189137412-07-09-2022-10-07-57.zip",
          "size": 54237908,
          "eTag": "b21ebca14f67255be1cd28c01d494508-7",
          "sequencer": "0063243D6858D568F0"
        }
      }
    }
  ]
}`
sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger))
// NOTE: we only check the length of the log ID because it contains a timestamp plus random characters. Other code stubs this out, but checking the length is sufficient here.
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog Str Len: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, len(logID), err)
Output:
Source Bucket: "prodpipeline-rawdata202c7bd0-o40ktu17o2oj"
Source file: "189137412-07-09-2022-10-07-57.zip"
Dataset: "189137412"
Log Str Len: "43"
Err: "<nil>"
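Judging by this example and the one that follows, the dataset ID for a pipeline zip appears to be the segment of the object key before the first '-' ("189137412-07-09-2022-10-07-57.zip" yields dataset "189137412"). A minimal stdlib sketch of that extraction, under the assumption that this is how the key is structured (the helper name is hypothetical, not the package's actual code):

```go
package main

import (
	"fmt"
	"strings"
)

// datasetIDFromKey extracts the dataset ID from an incoming zip's S3 object
// key, assuming the ID is everything before the first '-'. This mirrors the
// key/Dataset pairs in the example outputs; it is an illustration only.
func datasetIDFromKey(key string) string {
	return strings.SplitN(key, "-", 2)[0]
}

func main() {
	fmt.Println(datasetIDFromKey("189137412-07-09-2022-10-07-57.zip")) // prints "189137412"
	fmt.Println(datasetIDFromKey("197329413-25-09-2022-14-33-39.zip")) // prints "197329413"
}
```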
Example (DecodeImportTrigger_OCS2)
Trigger sent when a new zip file arrives from the pipeline.
trigger := `{
  "Records": [
    {
      "eventVersion": "2.1",
      "eventSource": "aws:s3",
      "awsRegion": "us-east-1",
      "eventTime": "2022-09-25T14:33:49.456Z",
      "eventName": "ObjectCreated:Put",
      "userIdentity": {
        "principalId": "AWS:AIDA6AOWGDOHF37MOKWLS"
      },
      "requestParameters": {
        "sourceIPAddress": "3.12.95.94"
      },
      "responseElements": {
        "x-amz-request-id": "K811ZDJ52EYBJ8P2",
        "x-amz-id-2": "R7bGQ2fOjvSZHkHez700w3wRVpn32nmr6jVPVYhKtNE2c2KYOmgm9hjmOA5WSQFh8faLRe6fHAmANKSTNRhwCq7Xgol0DgX4"
      },
      "s3": {
        "s3SchemaVersion": "1.0",
        "configurationId": "OTBjMjZmYzAtYThlOC00OWRmLWIwMzUtODkyZDk0YmRhNzkz",
        "bucket": {
          "name": "prodpipeline-rawdata202c7bd0-o40ktu17o2oj",
          "ownerIdentity": {
            "principalId": "AP902Y0PI20DF"
          },
          "arn": "arn:aws:s3:::prodpipeline-rawdata202c7bd0-o40ktu17o2oj"
        },
        "object": {
          "key": "197329413-25-09-2022-14-33-39.zip",
          "size": 1388,
          "eTag": "932bda7d32c05d90ecc550d061862994",
          "sequencer": "00633066CD68A4BF43"
        }
      }
    }
  ]
}`
sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger))
// NOTE: we only check the length of the log ID because it contains a timestamp plus random characters. Other code stubs this out, but checking the length is sufficient here.
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog Str Len: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, len(logID), err)
Output:
Source Bucket: "prodpipeline-rawdata202c7bd0-o40ktu17o2oj"
Source file: "197329413-25-09-2022-14-33-39.zip"
Dataset: "197329413"
Log Str Len: "43"
Err: "<nil>"
Example (DecodeImportTrigger_OCS_BadEventType)
trigger := `{
  "Records": [
    {
      "eventVersion": "2.1",
      "eventSource": "aws:sqs",
      "awsRegion": "us-east-1",
      "eventTime": "2022-09-16T09:10:28.417Z",
      "eventName": "ObjectCreated:CompleteMultipartUpload",
      "userIdentity": {
        "principalId": "AWS:AIDA6AOWGDOHF37MOKWLS"
      },
      "requestParameters": {
        "sourceIPAddress": "81.154.57.137"
      },
      "responseElements": {
        "x-amz-request-id": "G3QWWT0BAYKP81QK",
        "x-amz-id-2": "qExUWHHDE1nL+UP3zim1XA7FIXRUoKxlIrJt/7ULAtn08/+EvRCt4sChLhCGEqMo7ny4CU/KufMNmOcyZsDPKGWHT2ukMbo+"
      }
    }
  ]
}`
sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger))
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, logID, err)
Output:
Source Bucket: ""
Source file: ""
Dataset: ""
Log: ""
Err: "Failed to decode dataset import trigger: Failed to decode sqs body to an S3 event: unexpected end of JSON input"
Example (DecodeImportTrigger_OCS_Error)
trigger := `{
  "Records": []
}`
sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger))
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, logID, err)
Output:
Source Bucket: ""
Source file: ""
Dataset: ""
Log: ""
Err: "Unexpected or no message type embedded in triggering SNS message"
Index
Examples
- Package (DecodeImportTrigger_Manual)
- Package (DecodeImportTrigger_ManualBadDatasetID)
- Package (DecodeImportTrigger_ManualBadLogID)
- Package (DecodeImportTrigger_ManualBadMsg)
- Package (DecodeImportTrigger_OCS)
- Package (DecodeImportTrigger_OCS2)
- Package (DecodeImportTrigger_OCS_BadEventType)
- Package (DecodeImportTrigger_OCS_Error)
Constants
This section is empty.
Variables
This section is empty.
Functions
func TriggerDatasetReprocessViaSNS
func TriggerDatasetReprocessViaSNS(snsSvc awsutil.SNSInterface, idGen services.IDGenerator, datasetID string, snsTopic string) (*sns.PublishOutput, string, error)
Fires a trigger message. Anything calling this triggers a dataset reimport via a lambda function.
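Presumably the message published to the SNS topic is the same manual-trigger JSON that decodeImportTrigger accepts in the examples above (a "datasetID" and "logID" pair). A dependency-free sketch of building that body with the standard library; the helper name is hypothetical, and the actual function additionally publishes the message via its snsSvc parameter, which is omitted here:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// buildReprocessMessage builds the JSON body a reprocess trigger carries,
// matching the manual-trigger shape in the examples ("datasetID" + "logID").
// TriggerDatasetReprocessViaSNS would then publish this to the SNS topic;
// that step is left out to keep the sketch self-contained.
func buildReprocessMessage(datasetID string, logID string) (string, error) {
	b, err := json.Marshal(map[string]string{
		"datasetID": datasetID,
		"logID":     logID,
	})
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	msg, _ := buildReprocessMessage("189137412", "dataimport-zmzddoytch2krd7n")
	fmt.Println(msg)
}
```

In the real function the log ID is generated via the idGen parameter rather than passed in, so treat the signature here as illustrative only.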
Types
type ImportResult
type ImportResult struct {
WorkingDir string // so it can be cleaned up by caller if needed
WhatChanged string // what changed between this import and a previous one, for notification reasons
IsUpdate bool // whether this import updated an existing dataset
DatasetTitle string // Name of the dataset that was imported
DatasetID string // ID of the dataset that was imported
Logger logger.ILogger // Caller must call Close() on it, otherwise we may lose the last few log events
}
Structure returned after importing. NOTE: the logger must have Close() called on it, otherwise we may lose the last few log events.
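The caller-side pattern for the Close() requirement can be sketched as follows. The ILogger and result types here are stand-ins for the package's logger.ILogger and ImportResult, reduced to the fields this sketch needs:

```go
package main

import "fmt"

// ILogger stands in for logger.ILogger; only Close matters for this sketch.
type ILogger interface {
	Close()
}

// bufferedLogger simulates a logger that buffers events and must be closed
// so the final events are flushed.
type bufferedLogger struct{ closed bool }

func (l *bufferedLogger) Close() {
	l.closed = true
	fmt.Println("logger flushed and closed")
}

// importResult mirrors the ImportResult fields relevant here.
type importResult struct {
	DatasetID string
	Logger    ILogger
}

func main() {
	result := importResult{DatasetID: "189137412", Logger: &bufferedLogger{}}
	// The caller must close the logger, or the last few log events may be lost.
	defer result.Logger.Close()
	fmt.Println("imported dataset", result.DatasetID)
}
```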
func ImportForTrigger
func ImportForTrigger(
	triggerMessage []byte,
	envName string,
	configBucket string,
	datasetBucket string,
	manualBucket string,
	log logger.ILogger,
	remoteFS fileaccess.FileAccess,
) (ImportResult, error)
ImportForTrigger parses a trigger message (from SNS) and decides what to import.
Returns:
- Result struct. NOTE: the logger must have Close() called on it, otherwise we may lose the last few log events
- Error (or nil)