README
__ __ ______ __ __ ______
/\_\_\_\/\__ _\/\ \/ / /\__ _\
\/_/\_\/\/_/\ \/\ \ _"-.\/_/\ \/
/\_\/\_\ \ \_\ \ \_\ \_\ \ \_\
\/_/\/_/ \/_/ \/_/\/_/ \/_/
v0.1.2
xtkt ("extract") is an opinionated data extraction tool that follows the Singer.io specification. Supported sources include RESTful APIs, databases and files (csv, jsonl).
xtkt can be piped to any target that meets the Singer.io specification, but it has been designed and tested for databases such as SQLite and Postgres. Each stream is handled independently, and deletion-at-source is not detected.
Extracted records are versioned: new and updated data are treated as distinct records, identified by the keys _sdc_natural_key (identifier) and _sdc_surrogate_key (version key).
Use a bookmark to determine which records xtkt processes and sends to your target. A bookmark is either a field within the records that indicates the latest record processed (e.g. updated_at) or new-record-detection (records.bookmark_path: ["*"], not advised for large data) (see examples below).
In the absence of a bookmark, all records will be processed and sent to your target. This may be suitable if you want to detect deletion in your data model (using _sdc_time_extracted).
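The two bookmark modes can be sketched in Python as follows. This is an illustration only, not xtkt's implementation: the function names, the plain string comparison of timestamps and the set of previously seen record hashes are all assumptions.

```python
import hashlib
import json


def get_path(record, path):
    """Follow a list of keys into a nested dict; return None if a key is missing."""
    value = record
    for key in path:
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value


def filter_by_field(records, bookmark_path, last_bookmark):
    """Field bookmark: keep records whose bookmark value is after the stored one.

    Assumes bookmark values are timestamps in one consistent ISO-8601 format,
    so that string order matches time order.
    """
    kept = []
    for record in records:
        value = get_path(record, bookmark_path)
        if last_bookmark is None or (value is not None and value > last_bookmark):
            kept.append(record)
    return kept


def record_hash(record):
    """Hash the record's JSON form with sorted keys (an assumed canonical form)."""
    payload = json.dumps(record, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def filter_new(records, seen_hashes):
    """New-record-detection: keep records whose hash has not been seen before."""
    kept = []
    for record in records:
        digest = record_hash(record)
        if digest not in seen_hashes:
            seen_hashes.add(digest)
            kept.append(record)
    return kept
```

In this sketch new-record-detection has to remember one hash per record ever seen, which is one plausible reason it is not advised for large data.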
Fields can be dropped from records prior to being sent to your target using the records.drop_field_paths field in your JSON configuration file (see examples below). This may be suitable for dropping redundant, large objects within a record.
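As a rough illustration of what dropping a field path means, the Python sketch below removes nested keys from a record. The helper names are hypothetical, and ignoring paths that do not exist is an assumption about the desired behaviour.

```python
import copy


def drop_path(record, path):
    """Delete the field at `path` (a list of keys) in place; ignore missing paths."""
    node = record
    for key in path[:-1]:
        node = node.get(key) if isinstance(node, dict) else None
        if node is None:
            return
    if isinstance(node, dict):
        node.pop(path[-1], None)


def drop_fields(record, drop_field_paths):
    """Return a copy of `record` with every listed path removed."""
    cleaned = copy.deepcopy(record)
    for path in drop_field_paths:
        if path:  # skip empty paths
            drop_path(cleaned, path)
    return cleaned
```

With the paths from the Rick & Morty example, `drop_fields(record, [["episode"], ["origin", "url"]])` would return the record without its `episode` array and without `origin.url`.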
Fields can be hashed within records prior to being sent to your target using the records.sensitive_field_paths field in your JSON configuration file (see examples below). This may be suitable for handling sensitive data.
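A minimal Python sketch of hashing fields by path is shown here. The README does not state which hash function is applied to sensitive fields, so SHA-256 of the value's string form is an assumption, as are the helper names.

```python
import copy
import hashlib


def hash_value(value):
    """Return the SHA-256 hex digest of the value's string form (assumed scheme)."""
    return hashlib.sha256(str(value).encode("utf-8")).hexdigest()


def hash_fields(record, sensitive_field_paths):
    """Return a copy of `record` with each listed path replaced by its hash."""
    masked = copy.deepcopy(record)
    for path in sensitive_field_paths:
        node = masked
        for key in path[:-1]:
            node = node.get(key) if isinstance(node, dict) else None
        # Only hash fields that are actually present in the record
        if path and isinstance(node, dict) and path[-1] in node:
            node[path[-1]] = hash_value(node[path[-1]])
    return masked
```

The same input always produces the same digest, so hashed fields can still be used for joins and equality checks downstream.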
Both integers and floats are sent as floats. All fields are considered NULLABLE.
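One way to read the typing rule above is in JSON Schema terms, which is what Singer SCHEMA messages carry. The Python sketch below assumes that numbers map to "number" and that every field also allows "null"; the helper name and the handling of the other types are hypothetical and may differ from what xtkt emits.

```python
def json_schema_type(value):
    """Return a nullable JSON Schema type list for a Python value."""
    if isinstance(value, bool):
        base = "boolean"  # checked first because bool is a subclass of int
    elif isinstance(value, (int, float)):
        base = "number"   # integers and floats share a single type
    elif isinstance(value, str):
        base = "string"
    elif isinstance(value, list):
        base = "array"
    elif isinstance(value, dict):
        base = "object"
    else:
        return ["null"]   # None and anything unrecognised
    return [base, "null"]
```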
Installation
Locally: git clone git@github.com:5amCurfew/xtkt.git && cd xtkt && make build
Via Homebrew: brew tap 5amCurfew/5amCurfew; brew install 5amCurfew/5amCurfew/xtkt
$ xtkt --help
xtkt is a command line interface to extract data from a RESTful API, database or files to pipe to any target that meets the Singer.io specification

Usage:
  xtkt [PATH_TO_CONFIG_JSON] [flags]

Flags:
  -h, --help          help for xtkt
      --save-schema   save the schema to a file after extraction
  -v, --version       version for xtkt
Using with Singer.io Targets
Install targets (Python) in _targets/ in virtual environments:
python3 -m venv ./_targets/target-name
source ./_targets/target-name/bin/activate
python3 -m pip install target-name
deactivate
xtkt config.json | ./_targets/target-name/bin/target-name
For example:
xtkt config.json | ./_targets/pipelinewise-target-postgres/bin/target-postgres -c config_target_postgres.json
I have been using jq to view stdout messages in development. For example:
$ xtkt config.json 2>&1 | jq .
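As an alternative to reading the raw stream, the Python sketch below counts messages by type. It relies only on the Singer convention that each message is a JSON object with a "type" field; the function name and the "non-json" bucket for log lines are assumptions for illustration.

```python
import json
from collections import Counter


def summarise(lines):
    """Count Singer messages by their "type" (for example SCHEMA, RECORD, STATE)."""
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            message = json.loads(line)
        except json.JSONDecodeError:
            counts["non-json"] += 1  # e.g. log lines merged in via 2>&1
            continue
        if isinstance(message, dict):
            counts[message.get("type", "unknown")] += 1
        else:
            counts["unknown"] += 1
    return counts
```

Passing `sys.stdin` as `lines` lets the function sit at the end of a pipe in the same way as jq.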
xtkt can be used in a bash script that iterates over a template config.json file to create many data extractions. For example:
#!/bin/bash
# Generate one config per year (2009 to 2019) from the template and run an extraction for each
for year in {2009..2019}
do
  new_config="config_${year}.json"
  sed "s/YYYY/${year}/g" config.json.template > "${new_config}"
  echo "Generated ${new_config}"
  echo "Running xtkt on ${new_config}"
  xtkt "${new_config}" | ./_targets/target-name/bin/target-name -c config_target_name.json
done
# Remove state files and generated configs (this pattern leaves config_target_name.json in place)
rm -f state_* config_[0-9]*.json
Metadata
xtkt adds the following metadata to records:
_sdc_surrogate_key: SHA256 of a record
_sdc_natural_key: the unique key identifier of the record at source
_sdc_time_extracted: a timestamp (RFC 3339) at the time of the data extraction
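The three metadata fields can be sketched in Python as below. The README does not say exactly which bytes are hashed for the surrogate key, so hashing the record's JSON form with sorted keys is an assumption, and `add_metadata` is a hypothetical helper rather than part of xtkt.

```python
import hashlib
import json
from datetime import datetime, timezone


def add_metadata(record, unique_key_path, extracted_at=None):
    """Return a copy of `record` with the three _sdc_ metadata fields added."""
    # Natural key: the value found by following unique_key_path into the record
    key = record
    for part in unique_key_path:
        key = key[part]

    # Surrogate key: SHA256 over an assumed canonical JSON form of the record
    payload = json.dumps(record, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()

    # Extraction time: RFC 3339 timestamp in UTC
    when = extracted_at or datetime.now(timezone.utc)

    out = dict(record)
    out["_sdc_natural_key"] = key
    out["_sdc_surrogate_key"] = digest
    out["_sdc_time_extracted"] = when.isoformat().replace("+00:00", "Z")
    return out
```

Two versions of the same source row share a natural key but get different surrogate keys, which is what lets an updated row arrive at the target as a distinct record.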
Config.json
xtkt
{
  "stream_name": "<stream_name>", // required, <string>: the name of your stream
  "source_type": "<source_type>", // required, <string>: one of csv, db, jsonl, html or rest
  "url": "<url>", // required, <string>: address of the data source (e.g. RESTful API address, database connection URL or relative file path)
  "records": { // required <object>: describes handling of records
    "unique_key_path": ["<key_path_1>", "<key_path_2>", ...], // required <array[string]>: path to unique key of records
    "bookmark_path": ["<key_path_1>", "<key_path_2>", ...], // optional <array[string]>: path to bookmark within records
    "drop_field_paths": [ // optional <array[array]>: paths to remove within records
      ["<key_path_1>", "<key_path_2>", ...], // required <array[string]>
      ...
    ],
    "sensitive_field_paths": [ // optional <array[array]>: array of paths of fields to hash
      ["<sensitive_path_1_1>", "<sensitive_path_1_2>", ...], // required <array[string]>
      ...
    ]
  }
...
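The required fields listed above can be checked before running an extraction. The Python sketch below is a hypothetical pre-flight check, not part of xtkt, and it only covers the rules that the template comments state explicitly.

```python
SOURCE_TYPES = {"csv", "db", "jsonl", "html", "rest"}


def validate_config(config):
    """Return a list of problems; an empty list means the checks passed."""
    problems = []

    # Top-level required strings
    for field in ("stream_name", "source_type", "url"):
        if not isinstance(config.get(field), str) or not config[field]:
            problems.append(f"{field}: required non-empty string")

    source_type = config.get("source_type")
    if isinstance(source_type, str) and source_type and source_type not in SOURCE_TYPES:
        problems.append(f"source_type: must be one of {sorted(SOURCE_TYPES)}")

    # records.unique_key_path is the only required key inside "records"
    records = config.get("records")
    if not isinstance(records, dict):
        problems.append("records: required object")
    else:
        path = records.get("unique_key_path")
        if not (isinstance(path, list) and path and all(isinstance(k, str) for k in path)):
            problems.append("records.unique_key_path: required array of strings")

    # "db" and "rest" objects are required for their matching source types
    if source_type in ("db", "rest") and not isinstance(config.get(source_type), dict):
        problems.append(f"{source_type}: required object when source_type is {source_type!r}")

    return problems
```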
database
...
"db": { // optional <object>: required when "source_type": "db"
  "table": "<table>" // required <string>: table name in database
},
...
rest
...
"rest": { // optional <object>: required when "source_type": "rest"
  "sleep": "<sleep>", // required <int>: number of seconds between pagination requests
  "auth": { // optional <object>: describes the authorisation strategy
    "required": "<required>", // required <boolean>: is authorisation required?
    "strategy": "<strategy>", // optional <string>: required if "required": true, one of basic, token or oauth
    "basic": { // optional <object>: required if "strategy": "basic"
      "username": "<username>", // required <string>
      "password": "<password>" // required <string>
    },
    "token": { // optional <object>: required if "strategy": "token"
      "header": "<header>", // required <string>: authorisation header name
      "header_value": "<header_value>" // required <string>: authorisation header value
    },
    "oauth": { // optional <object>: required if "strategy": "oauth"
      "client_id": "<client_id>", // required <string>
      "client_secret": "<client_secret>", // required <string>
      "refresh_token": "<refresh_token>", // required <string>
      "token_url": "<token_url>" // required <string>
    }
  },
  "response": { // required <object>: describes the RESTful API response handling
    "records_path": ["<records_path_1>", "<records_path_2>", ...], // optional <array[string]>: path to records in response (omit if immediately returned)
    "pagination": "<pagination>", // required <boolean>: is there pagination in the response?
    "pagination_strategy": "<pagination_strategy>", // optional <string>: required if "pagination": true, one of "next" or "query"
    "pagination_next_path": ["<pagination_next_path_1>", "<pagination_next_path_2>", ...], // optional <array[string]>: required if "pagination_strategy": "next", path to "next" URL in response
    "pagination_query": { // optional <object>: required if "pagination_strategy": "query", describes pagination query strategy
      "query_parameter": "<query_parameter>", // required <string>: parameter name for URL pagination
      "query_value": "<query_value>", // required <int>: initial value after base URL is called
      "query_increment": "<query_increment>" // required <int>: query parameter increment
    }
  }
}
...
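The two pagination strategies can be sketched in Python as follows. The sequence of URLs follows the comments above (base URL first, then the query parameter starting at query_value and stepping by query_increment); the function names and the rule for when to stop are assumptions rather than xtkt's actual behaviour.

```python
from itertools import count, islice
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def query_page_urls(base_url, query_parameter, query_value, query_increment):
    """Yield the base URL, then URLs with the pagination parameter set.

    The generator never ends; the caller is assumed to stop once a page
    comes back with no records.
    """
    yield base_url
    parts = urlsplit(base_url)
    # Keep any existing query parameters except the pagination one
    existing = [(k, v) for k, v in parse_qsl(parts.query) if k != query_parameter]
    for value in count(query_value, query_increment):
        query = urlencode(existing + [(query_parameter, str(value))])
        yield urlunsplit(parts._replace(query=query))


def next_url(response, pagination_next_path):
    """Follow pagination_next_path into a parsed response; None means no more pages."""
    value = response
    for key in pagination_next_path:
        if not isinstance(value, dict):
            return None
        value = value.get(key)
    return value or None
```

For instance, `list(islice(query_page_urls(url, "page", 2, 1), 3))` gives the base URL followed by the same URL with `?page=2` and `?page=3`.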
Examples
Rick & Morty API
No authentication required, records found in the response "results" array, paginated using "next", new-record-detection used for bookmark
config.json
{
  "stream_name": "rick_and_morty_characters",
  "source_type": "rest",
  "url": "https://rickandmortyapi.com/api/character",
  "records": {
    "unique_key_path": ["id"],
    "bookmark_path": ["*"],
    "drop_field_paths": [
      ["episode"],
      ["origin", "url"]
    ],
    "sensitive_field_paths": [
      ["name"],
      ["location", "name"]
    ]
  },
  "rest": {
    "sleep": 0,
    "auth": {
      "required": false
    },
    "response": {
      "records_path": ["results"],
      "pagination": true,
      "pagination_strategy": "next",
      "pagination_next_path": ["info", "next"]
    }
  }
}
GitHub API
Token authentication required, records returned immediately as an array, paginated using a query parameter, bookmarked using "commit.author.date" in the record
config.json
{
  "stream_name": "xtkt_github_commits",
  "source_type": "rest",
  "url": "https://api.github.com/repos/5amCurfew/xtkt/commits",
  "records": {
    "unique_key_path": ["sha"],
    "bookmark_path": ["commit", "author", "date"],
    "drop_field_paths": [
      ["author"],
      ["committer", "avatar_url"],
      ["committer", "events_url"]
    ],
    "sensitive_field_paths": [
      ["commit", "author", "email"],
      ["commit", "committer", "email"]
    ]
  },
  "rest": {
    "auth": {
      "required": true,
      "strategy": "token",
      "token": {
        "header": "Authorization",
        "header_value": "Bearer YOUR_GITHUB_API_TOKEN"
      }
    },
    "response": {
      "pagination": true,
      "pagination_strategy": "query",
      "pagination_query": {
        "query_parameter": "page",
        "query_value": 2,
        "query_increment": 1
      }
    }
  }
}
Strava API
OAuth authentication required, records returned immediately in an array, paginated using a query parameter, bookmarked using "start_date" in the record
config.json
{
  "stream_name": "my_strava_activities",
  "source_type": "rest",
  "url": "https://www.strava.com/api/v3/athlete/activities",
  "records": {
    "unique_key_path": ["id"],
    "bookmark_path": ["start_date"]
  },
  "rest": {
    "auth": {
      "required": true,
      "strategy": "oauth",
      "oauth": {
        "client_id": "YOUR_CLIENT_ID",
        "client_secret": "YOUR_CLIENT_SECRET",
        "refresh_token": "YOUR_REFRESH_TOKEN",
        "token_url": "https://www.strava.com/oauth/token"
      }
    },
    "sleep": 1,
    "response": {
      "pagination": true,
      "pagination_strategy": "query",
      "pagination_query": {
        "query_parameter": "page",
        "query_value": 2,
        "query_increment": 1
      }
    }
  }
}
Postgres
config.json
{
  "stream_name": "xtkt_github_commits_from_postgres",
  "source_type": "db",
  "url": "postgres://admin:admin@localhost:5432/postgres?sslmode=disable",
  "records": {
    "unique_key_path": ["_sdc_natural_key"]
  },
  "db": {
    "table": "xtkt_github_commits"
  }
}
SQLite
config.json
{
  "stream_name": "sqlite_customers",
  "source_type": "db",
  "url": "sqlite:///example.db",
  "records": {
    "unique_key_path": ["id"],
    "bookmark_path": ["updated_at"]
  },
  "db": {
    "table": "customers"
  }
}
File
config.json
{
  "stream_name": "xtkt_jsonl",
  "source_type": "jsonl",
  "url": "_config_json/data.jsonl",
  "records": {
    "unique_key_path": ["id"],
    "sensitive_field_paths": [
      ["location", "address"]
    ]
  }
}