pip install api2db
api2db is designed to make building data-pipelines easy, because nobody enjoys messy data and scope creep.¶
Note
Project requirements changed? No problem.
API got updated? Easy.
Changing databases? Change 1 line of code.
Multiple storage targets? Done.
Stop focusing on collecting data, and start focusing on using it.
Use api2db to:
Collect data from any API
Perform pre-processing on data
Clean and extract features from data
Perform post-processing on data
Store data
api2db supports any/all of the following concurrently:
Streaming data live to
Local storage of data in files using Parquet, pickle, JSON, and CSV format
MySQL
Bigquery
MariaDB
PostgreSQL
Omnisci
Storing data periodically to
MySQL
Bigquery
MariaDB
PostgreSQL
Omnisci
api2db is currently adding support for:
Oracle
Amazon Aurora
Microsoft SQL Server
Firebase RTDB
Don’t see your database? Submit a feature request.. or even better add it. api2db is open-source.
Contents¶
Installation and Quickstart¶
Installation¶
Install the library
pip install api2db
To add MySQL support
pip install pymysql
To add MariaDB support
pip install mariadb
To add PostgreSQL support
pip install psycopg2
To add Omnisci support
pip install pymapd==0.25.0
pip install pyarrow==3.0.0
pip install pandas --upgrade
Quickstart¶
Create a project with the pmake shell command
Initial directory structure
project_dir/
path/to/project_dir/> pmake
New project directory structure
project_dir-----/
|
apis-----/ # Each API will get its own file
| |- __init__.py
|
AUTH-----/ # These templates are used for adding database targets
| |- bigquery_auth_template.json
| |- omnisci_auth_template.json
| |- sql_auth_template.json
|
CACHE/ # Application cache files will be stored and used here.
|
STORE/ # This is where incoming data can be stored locally.
|
LOGS/ # Each collector will receive its own log file.
|
helpers.py # Common helper functions can be written here.
|
main.py # This is the application entry point.
Choose an API. This example uses the CoinCap API because it is free, does not require an API key, and seems to have good uptime. (This project has no affiliation with CoinCap.)
Create a collector with the cadd shell command
path/to/project_dir/> cadd coincap
project_dir-----/
|
apis-----/
| |- __init__.py
| |- coincap.py # This is where you'll write code!
|
AUTH-----/
| |- bigquery_auth_template.json
| |- omnisci_auth_template.json
| |- sql_auth_template.json
|
CACHE/
|
STORE/
|
LOGS/
|
helpers.py
|
main.py
Understanding project_dir/apis/some_api_collector.py
Each collector has 4 parts.
- Data Import
Performs a call to an API to request data.
- Data Processing
Processes and cleans incoming data, making it useful.
- Data Streams
Streams data as it arrives to a storage target.
- Data Stores
Stores data periodically to a storage target.
The base template for the coincap collector looks like this
from api2db.ingest import *
from api2db.stream import *
from api2db.store import *
from helpers import *
def coincap_import():
return None
def coincap_form():
pre_process = [
# Preprocessing Here
]
data_features = [
# Data Features Here
]
post_process = [
# Postprocessing Here
]
return ApiForm(name="coincap", pre_process=pre_process, data_features=data_features, post_process=post_process)
def coincap_streams():
streams = [
]
return streams
def coincap_stores():
stores = [
]
return stores
coincap_info = Collector(name="coincap",
seconds=0, # Import frequency of 0 disables collector
import_target=coincap_import,
api_form=coincap_form,
streams=coincap_streams,
stores=coincap_stores,
debug=True # Set to False for production
)
Using the lab to build ApiForms¶
To simplify setting up the data import and data-processing, first run the mlab shell command
path/to/project_dir/> mlab
project_dir-----/
|
apis------/
| |- __init__.py
| |- coincap.py
|
AUTH------/
| |- bigquery_auth_template.json
| |- omnisci_auth_template.json
| |- sql_auth_template.json
|
CACHE/
|
laboratory/
| |- lab.py # This is where you can experiment with pre-processing
|
STORE/
|
LOGS/
|
helpers.py
|
main.py
A blank lab.py file will look like this
from api2db.ingest import *
CACHE=True # Caches API data so that only a single API call is made if True
def import_target():
return None
def pre_process():
return None
def data_features():
return None
def post_process():
return None
if __name__ == "__main__":
api_form = ApiForm(name="lab",
pre_process=pre_process(),
data_features=data_features(),
post_process=post_process()
)
api_form.experiment(CACHE, import_target)
Importing data¶
Perform a data import by writing the code for the import_target function
lab.py
.
.
.
import requests
import logging
def import_target():
"""
Data returned by the import target must be an array of dicts.
This allows for either a single API call to be returned, or an array of them.
"""
data = None
url = "https://api.coincap.io/v2/assets/"
try:
data = [requests.get(url).json()]
except Exception as e:
logging.exception(e)
return data
.
.
.
Use the rlab shell command to run the lab
Note
Watch the laboratory directory closely. Data will be dumped into JSON files at different points during data-processing to provide the programmer with an easier to read format.
path/to/project_dir/> rlab
Output:
data:
{
"data": [
{
"id": "bitcoin",
"rank": "1",
"symbol": "BTC",
"name": "Bitcoin",
"supply": "18698850.0000000000000000",
"maxSupply": "21000000.0000000000000000",
"marketCapUsd": "1041388865130.8623213956691350",
"volumeUsd24Hr": "12822561919.6746830356589619",
"priceUsd": "55692.6690748822693051",
"changePercent24Hr": "-4.1033665252363403",
"vwap24Hr": "57708.7312639442977184",
"explorer": "https://blockchain.info/",
},
.
.
.
],
"timestamp": 1620100433183,
}
data keys:
dict_keys(['data', 'timestamp'])
pre_process must return a list of 0 or more pre-processors.
pre_process:
None
Performing pre-processing on data¶
Perform pre-processing on data by writing the code for the pre_process function
lab.py
.
.
.
def pre_process():
"""
Pre-processors are applied sequentially.
In this example, we will:
1. Extract the timestamp and make it a global feature using GlobalExtract
2. Perform a ListExtract to extract the list of data which will become the rows in the storage target table
"""
return [
GlobalExtract(key="timestamp",
lam=lambda x: x["timestamp"],
dtype=int
),
ListExtract(lam=lambda x: x["data"])
]
.
.
.
Use the rlab shell command to run the lab
path/to/project_dir/> rlab
Output:
data point 1:
{'id': 'bitcoin', 'rank': '1', 'symbol': 'BTC', 'name': 'Bitcoin', 'supply': '18698850.0000000000000000', 'maxSupply': '21000000.0000000000000000', 'marketCapUsd': '1
041388865130.8623213956691350', 'volumeUsd24Hr': '12822561919.6746830356589619', 'priceUsd': '55692.6690748822693051', 'changePercent24Hr': '-4.1033665252363403', 'vw
ap24Hr': '57708.7312639442977184', 'explorer': 'https://blockchain.info/'}
data point 2:
{'id': 'ethereum', 'rank': '2', 'symbol': 'ETH', 'name': 'Ethereum', 'supply': '115729464.3115000000000000', 'maxSupply': None, 'marketCapUsd': '376411190202.66581272
13330461', 'volumeUsd24Hr': '17656637086.6618270054805080', 'priceUsd': '3252.5095699873722881', 'changePercent24Hr': '6.4420494833790460', 'vwap24Hr': '3234.41835079
37765772', 'explorer': 'https://etherscan.io/'}
data point 3:
{'id': 'binance-coin', 'rank': '3', 'symbol': 'BNB', 'name': 'Binance Coin', 'supply': '153432897.0000000000000000', 'maxSupply': '170532785.0000000000000000', 'marke
tCapUsd': '98431624817.6777436959489247', 'volumeUsd24Hr': '254674805.8210425908376882', 'priceUsd': '641.5288164550379551', 'changePercent24Hr': '1.1504585233985471'
, 'vwap24Hr': '653.0516845642682435', 'explorer': 'https://etherscan.io/token/0xB8c77482e45F1F44dE1745F52C74426C631bDD52'}
data_features must return a list of data-features.
data_features:
None
Extracting features from data¶
Extract data-features from data by writing the code for the data_features function
Note
Pick and choose which data-features you wish to extract from your data. This example will extract id, rank, symbol, name, priceUsd, and volumeUsd24Hr.
Feature extraction will handle null data and data of the wrong type automatically.
lab.py
.
.
.
def data_features():
return [
Feature(key="id",
lam=lambda x: x["id"],
dtype=str),
Feature(key="rank",
lam=lambda x: x["rank"],
dtype=int),
Feature(key="symbol",
lam=lambda x: x["symbol"],
dtype=str),
Feature(key="name",
lam=lambda x: x["name"],
dtype=str),
Feature(key="price_usd", # Keys support renaming
lam=lambda x: x["priceUsd"],
dtype=float),
Feature(key="volume_usd_24_hr",
lam=lambda x: x["volumeUsd24Hr"],
dtype=float)
]
.
.
.
Use the rlab shell command to run the lab
path/to/project_dir/> rlab
Output:
data point 1:
{'id': 'bitcoin', 'rank': '1', 'symbol': 'BTC', 'name': 'Bitcoin', 'supply': '18698850.0000000000000000', 'maxSupply': '21000000.0000000000000000', 'marketCapUsd': '1
041388865130.8623213956691350', 'volumeUsd24Hr': '12822561919.6746830356589619', 'priceUsd': '55692.6690748822693051', 'changePercent24Hr': '-4.1033665252363403', 'vw
ap24Hr': '57708.7312639442977184', 'explorer': 'https://blockchain.info/'}
data point 2:
{'id': 'ethereum', 'rank': '2', 'symbol': 'ETH', 'name': 'Ethereum', 'supply': '115729464.3115000000000000', 'maxSupply': None, 'marketCapUsd': '376411190202.66581272
13330461', 'volumeUsd24Hr': '17656637086.6618270054805080', 'priceUsd': '3252.5095699873722881', 'changePercent24Hr': '6.4420494833790460', 'vwap24Hr': '3234.41835079
37765772', 'explorer': 'https://etherscan.io/'}
data point 3:
{'id': 'binance-coin', 'rank': '3', 'symbol': 'BNB', 'name': 'Binance Coin', 'supply': '153432897.0000000000000000', 'maxSupply': '170532785.0000000000000000', 'marke
tCapUsd': '98431624817.6777436959489247', 'volumeUsd24Hr': '254674805.8210425908376882', 'priceUsd': '641.5288164550379551', 'changePercent24Hr': '1.1504585233985471'
, 'vwap24Hr': '653.0516845642682435', 'explorer': 'https://etherscan.io/token/0xB8c77482e45F1F44dE1745F52C74426C631bDD52'}
data:
id rank symbol name price_usd volume_usd_24_hr timestamp
0 bitcoin 1 BTC Bitcoin 55692.669075 12822561919.674683 1620100433183
1 ethereum 2 ETH Ethereum 3252.50957 17656637086.661827 1620100433183
2 binance-coin 3 BNB Binance Coin 641.528816 254674805.821043 1620100433183
3 xrp 4 XRP XRP 1.461734 1969092162.016667 1620100433183
4 dogecoin 5 DOGE Dogecoin 0.419828 2694025432.110168 1620100433183
.. ... ... ... ... ... ... ...
95 abbc-coin 96 ABBC ABBC Coin 0.755244 355316.252287 1620100433183
96 status 97 SNT Status 0.169848 5966843.243043 1620100433183
97 nxm 98 NXM NXM 90.764252 7577199.874023 1620100433183
98 ocean-protocol 99 OCEAN Ocean Protocol 1.357968 9131449.423728 1620100433183
99 iotex 100 IOTX IoTeX 0.057802 576658.038699 1620100433183
[100 rows x 7 columns]
data dtypes:
id string
rank Int64
symbol string
name string
price_usd Float64
volume_usd_24_hr Float64
timestamp Int64
dtype: object
Performing post-processing on data¶
Perform post-processing on data by writing the code for the post_process function
Note
Post-processors can be applied to alter the data, or extract new information from the data.
lab.py
.
.
.
import time
def post_process():
"""
In this example we will add a timestamp for the arrival time of the data.
"""
return [
ColumnAdd(key="arrival_time",
lam=lambda: int(time.time()*1000),
dtype=int
)
]
.
.
.
Use the rlab shell command to run the lab
path/to/project_dir/> rlab
Output:
data point 1:
{'id': 'bitcoin', 'rank': '1', 'symbol': 'BTC', 'name': 'Bitcoin', 'supply': '18698850.0000000000000000', 'maxSupply': '21000000.0000000000000000', 'marketCapUsd': '1
041388865130.8623213956691350', 'volumeUsd24Hr': '12822561919.6746830356589619', 'priceUsd': '55692.6690748822693051', 'changePercent24Hr': '-4.1033665252363403', 'vw
ap24Hr': '57708.7312639442977184', 'explorer': 'https://blockchain.info/'}
data point 2:
{'id': 'ethereum', 'rank': '2', 'symbol': 'ETH', 'name': 'Ethereum', 'supply': '115729464.3115000000000000', 'maxSupply': None, 'marketCapUsd': '376411190202.66581272
13330461', 'volumeUsd24Hr': '17656637086.6618270054805080', 'priceUsd': '3252.5095699873722881', 'changePercent24Hr': '6.4420494833790460', 'vwap24Hr': '3234.41835079
37765772', 'explorer': 'https://etherscan.io/'}
data point 3:
{'id': 'binance-coin', 'rank': '3', 'symbol': 'BNB', 'name': 'Binance Coin', 'supply': '153432897.0000000000000000', 'maxSupply': '170532785.0000000000000000', 'marke
tCapUsd': '98431624817.6777436959489247', 'volumeUsd24Hr': '254674805.8210425908376882', 'priceUsd': '641.5288164550379551', 'changePercent24Hr': '1.1504585233985471'
, 'vwap24Hr': '653.0516845642682435', 'explorer': 'https://etherscan.io/token/0xB8c77482e45F1F44dE1745F52C74426C631bDD52'}
finalized data:
id rank symbol name price_usd volume_usd_24_hr timestamp arrival_time
0 bitcoin 1 BTC Bitcoin 55692.669075 12822561919.674683 1620100433183 1620104839526
1 ethereum 2 ETH Ethereum 3252.50957 17656637086.661827 1620100433183 1620104839526
2 binance-coin 3 BNB Binance Coin 641.528816 254674805.821043 1620100433183 1620104839526
3 xrp 4 XRP XRP 1.461734 1969092162.016667 1620100433183 1620104839526
4 dogecoin 5 DOGE Dogecoin 0.419828 2694025432.110168 1620100433183 1620104839526
.. ... ... ... ... ... ... ... ...
95 abbc-coin 96 ABBC ABBC Coin 0.755244 355316.252287 1620100433183 1620104839526
96 status 97 SNT Status 0.169848 5966843.243043 1620100433183 1620104839526
97 nxm 98 NXM NXM 90.764252 7577199.874023 1620100433183 1620104839526
98 ocean-protocol 99 OCEAN Ocean Protocol 1.357968 9131449.423728 1620100433183 1620104839526
99 iotex 100 IOTX IoTeX 0.057802 576658.038699 1620100433183 1620104839526
[100 rows x 8 columns]
finalized data dtypes:
id string
rank Int64
symbol string
name string
price_usd Float64
volume_usd_24_hr Float64
timestamp Int64
arrival_time Int64
dtype: object
Exporting data from the lab to a collector¶
Note
Once the lab has been used to build the form fields for an ApiForm, move the code to the collector.
It is not necessary to use the lab feature of the library to perform data-extraction; it just makes things a bit easier.
Move the code from lab.py to coincap.py
coincap.py
.
.
.
import requests
import logging
import time
def coincap_import():
data = None
url = "https://api.coincap.io/v2/assets/"
try:
data = [requests.get(url).json()]
except Exception as e:
logging.exception(e)
return data
def coincap_form():
pre_process = [
GlobalExtract(key="timestamp",
lam=lambda x: x["timestamp"],
dtype=int
),
ListExtract(lam=lambda x: x["data"])
]
data_features = [
Feature(key="id",
lam=lambda x: x["id"],
dtype=str),
Feature(key="rank",
lam=lambda x: x["rank"],
dtype=int),
Feature(key="symbol",
lam=lambda x: x["symbol"],
dtype=str),
Feature(key="name",
lam=lambda x: x["name"],
dtype=str),
Feature(key="price_usd", # Keys support renaming
lam=lambda x: x["priceUsd"],
dtype=float),
Feature(key="volume_usd_24_hr",
lam=lambda x: x["volumeUsd24Hr"],
dtype=float)
]
post_process = [
ColumnAdd(key="arrival_time",
lam=lambda: int(time.time()*1000),
dtype=int
)
]
return ApiForm(name="coincap", pre_process=pre_process, data_features=data_features, post_process=post_process)
.
.
.
Once the lab has been moved over, you can optionally run the clab shell command to delete the lab.
Setting up an authentication file for database targets¶
Create a JSON file in the AUTH directory
Copy the template for the database target you wish to use
Fill out the template
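As a rough illustration only, an SQL-style auth file is a small JSON document holding connection credentials. The exact field names come from the templates in the AUTH directory (e.g. sql_auth_template.json), so the keys below are placeholders rather than the library's required schema:

# Minimal sketch: write hypothetical MySQL credentials to AUTH/mysql_auth.json.
# Check AUTH/sql_auth_template.json for the real field names api2db expects.
import json

sql_auth = {                 # placeholder keys, for illustration only
    "username": "db_user",
    "password": "db_password",
    "host": "127.0.0.1",
}

with open("AUTH/mysql_auth.json", "w") as f:
    json.dump(sql_auth, f, indent=4)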
Setting up a stream target for live data¶
The following code will set up live streaming both to a local file location, and to a MySQL database
coincap.py
.
.
.
def coincap_streams():
"""
In this example, we will stream data live into a local file, and directly into a MySQL database.
"""
streams = [
Stream2Local(name="coincap",
path="STORE/coincap/live"
),
Stream2Sql(name="coincap",
auth_path="AUTH/mysql_auth.json",
db_name="stream_coincap",
dialect="mysql",
port="3306"
)
]
return streams
.
.
.
Yes, it is that easy, and no, you do not have to build the tables.
Setting up a store target for data¶
The following will set up a storage target that pulls data from STORE/coincap/live and stores it to a MariaDB database periodically
coincap.py
.
.
.
def coincap_stores():
"""
In this example, we will store data every 10 minutes to a MariaDB database.
The files we store will then be composed into a single file, and stored in a different storage location.
"""
stores = [
Store2Sql(name="coincap",
seconds=600,
path="STORE/coincap/live",
db_name="store_coincap",
auth_path="AUTH/mariadb_auth.json",
port="3306",
dialect="mariadb",
move_composed_path="STORE/coincap/ten_minute_intervals/"
)
]
return stores
.
.
.
Registering a collector to run¶
To register a collector, all that needs to be done is to set the import frequency by changing the seconds parameter
coincap.py
.
.
.
coincap_info = Collector(name="coincap",
seconds=30, # Import data from the API every 30 seconds
import_target=coincap_import,
api_form=coincap_form,
streams=coincap_streams,
stores=coincap_stores,
debug=True # Set to False for production
)
.
.
.
Running the application¶
Run main.py
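A minimal main.py might look like the sketch below. It is based on the Run class documented in the api2db.app.run reference further down; the apis.coincap import path is an assumption about the generated project layout, so prefer the main.py that pmake created for you if it differs.

# Sketch of an application entry point (assumes the pmake-style project layout).
from api2db.app.run import Run            # Run starts every registered collector
from apis.coincap import coincap_info     # the Collector registered above (assumed import path)

if __name__ == "__main__":
    Run(collectors=[coincap_info]).run()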
Info Log Outputs:
2021-05-04 01:01:14 stream.py INFO stream starting -> (local.parquet)
2021-05-04 01:01:14 stream.py INFO stream starting -> (sql.mysql)
2021-05-04 01:01:14 api2db.py INFO import scheduled: [30 seconds] (api request data) -> (streams)
2021-05-04 01:01:14 api2db.py INFO storage refresh scheduled: [30 seconds] -> (check stores)
2021-05-04 01:01:15 api2db.py INFO storage scheduled: [600 seconds] (STORE/coincap/live) -> (store)
2021-05-04 01:01:15 stream2sql.py INFO establishing connection to mysql://***/stream_coincap
2021-05-04 01:01:15 stream2sql.py INFO database not found mysql://***.com/stream_coincap... creating database
2021-05-04 01:01:15 stream2sql.py INFO connection established mysql://***/stream_coincap
2021-05-04 01:01:25 store.py INFO storage files composed, attempting to store 3600 rows to mariadb://***/store_coincap
2021-05-04 01:01:25 stream2sql.py INFO establishing connection to mariadb://***/store_coincap
2021-05-04 01:01:25 stream2sql.py INFO database not found mariadb://***/store_coincap... creating database
2021-05-04 01:01:25 stream2sql.py INFO connection established mariadb://***/store_coincap
Debug Log Outputs:
.
.
.
2021-05-04 01:01:24 stream2sql.py DEBUG 100 rows inserted into mysql://***/stream_coincap
2021-05-04 01:01:24 stream2local.py DEBUG storing 100 rows to STORE/coincap/live
2021-05-04 01:01:24 stream2sql.py DEBUG 100 rows inserted into mysql://***/stream_coincap
2021-05-04 01:01:25 stream2local.py DEBUG storing 100 rows to STORE/coincap/live
2021-05-04 01:01:25 stream2sql.py DEBUG 100 rows inserted into mysql://***/stream_coincap
2021-05-04 01:01:25 stream2sql.py DEBUG 3600 rows inserted into mariadb://***/store_coincap
2021-05-04 01:01:25 stream2local.py DEBUG storing 100 rows to STORE/coincap/live
2021-05-04 01:01:25 stream2sql.py DEBUG 100 rows inserted into mysql://***/stream_coincap
2021-05-04 01:01:26 stream2local.py DEBUG storing 100 rows to STORE/coincap/live
2021-05-04 01:01:26 stream2sql.py DEBUG 100 rows inserted into mysql://***/stream_coincap
2021-05-04 01:01:26 stream2local.py DEBUG storing 100 rows to STORE/coincap/live
2021-05-04 01:01:26 stream2sql.py DEBUG 100 rows inserted into mysql://***/stream_coincap
.
.
.
Examples¶
Shell Commands¶
pmake¶
This shell command is used for initial creation of the project structure.
Given a blank project directory
project_dir-----/
Shell Command: path/to/project_dir> pmake FooCollector BarCollector
project_dir-----/
|
apis-----/
| |- __init__.py
| |- FooCollector.py
| |- BarCollector.py
|
AUTH-----/
| |- bigquery_auth_template.json
| |- omnisci_auth_template.json
| |- sql_auth_template.json
|
CACHE/
|
STORE/
|
helpers.py
|
main.py
Note
This command can also be used without any collector arguments, and collectors can be added later using the cadd shell command.
cadd¶
This shell command is used to add a collector to an existing api2db project
Given the following project structure
project_dir-----/
|
apis-----/
| |- __init__.py
| |- FooCollector.py
|
AUTH-----/
| |- bigquery_auth_template.json
| |- omnisci_auth_template.json
| |- sql_auth_template.json
|
CACHE/
|
STORE/
|
helpers.py
|
main.py
Shell Command: path/to/project_dir> cadd BarCollector
project_dir-----/
|
apis-----/
| |- __init__.py
| |- FooCollector.py
| |- BarCollector.py
|
AUTH-----/
| |- bigquery_auth_template.json
| |- omnisci_auth_template.json
| |- sql_auth_template.json
|
CACHE/
|
STORE/
|
helpers.py
|
main.py
crem¶
This shell command is used to remove a collector registered with an existing api2db project
Given the following project
project_dir-----/
|
apis-----/
| |- __init__.py
| |- FooCollector.py
| |- BarCollector.py
|
AUTH-----/
| |- bigquery_auth_template.json
| |- omnisci_auth_template.json
| |- sql_auth_template.json
|
CACHE/
|
STORE/
|
helpers.py
|
main.py
Shell Command: path/to/project_dir> crem BarCollector
project_dir-----/
|
apis-----/
| |- __init__.py
| |- FooCollector.py
|
AUTH-----/
| |- bigquery_auth_template.json
| |- omnisci_auth_template.json
| |- sql_auth_template.json
|
CACHE/
|
STORE/
|
helpers.py
|
main.py
clist¶
This shell command is used to show a list of collectors registered with an existing api2db project
Given the following project
project_dir-----/
|
apis-----/
| |- __init__.py
| |- FooCollector.py
| |- BarCollector.py
|
AUTH-----/
| |- bigquery_auth_template.json
| |- omnisci_auth_template.json
| |- sql_auth_template.json
|
CACHE/
|
STORE/
|
helpers.py
|
main.py
Shell Command: path/to/project_dir> clist
Out: ["FooCollector", "BarCollector"]
pclear¶
This shell command is used to clear a project and should ONLY be used if a complete restart is required.
Given the following project
project_dir-----/
|
apis-----/
| |- __init__.py
| |- FooCollector.py
| |- BarCollector.py
|
AUTH-----/
| |- bigquery_auth_template.json
| |- omnisci_auth_template.json
| |- sql_auth_template.json
|
CACHE/
|
STORE/
|
helpers.py
|
main.py
Shell Command: path/to/project_dir> pclear
project_dir-----/
mlab¶
This shell command is used for creation of a lab. Labs offer an easier way to design an ApiForm.
Given a project directory
project_dir-----/
|
apis-----/
| |- __init__.py
| |- FooCollector.py
| |- BarCollector.py
|
AUTH-----/
| |- bigquery_auth_template.json
| |- omnisci_auth_template.json
| |- sql_auth_template.json
|
CACHE/
|
STORE/
|
helpers.py
|
main.py
Shell Command: path/to/project_dir> mlab
project_dir-----/
|
apis-------/
| |- __init__.py
| |- FooCollector.py
| |- BarCollector.py
|
AUTH-------/
| |- bigquery_auth_template.json
| |- omnisci_auth_template.json
| |- sql_auth_template.json
|
CACHE/
|
STORE/
|
laboratory-/
| |- lab.py EDIT THIS FILE!
|
helpers.py
|
main.py
rlab¶
This shell command is used to run a lab.
clab¶
This shell command is used to clear a lab.
project_dir-----/
|
apis-------/
| |- __init__.py
| |- FooCollector.py
| |- BarCollector.py
|
AUTH-------/
| |- bigquery_auth_template.json
| |- omnisci_auth_template.json
| |- sql_auth_template.json
|
CACHE/
|
STORE/
|
laboratory-/
| |- lab.py EDIT THIS FILE!
|
helpers.py
|
main.py
Shell Command: path/to/project_dir> clab
project_dir-----/
|
apis-----/
| |- __init__.py
| |- FooCollector.py
| |- BarCollector.py
|
AUTH-----/
| |- bigquery_auth_template.json
| |- omnisci_auth_template.json
| |- sql_auth_template.json
|
CACHE/
|
STORE/
|
helpers.py
|
main.py
Pre-processing¶
BadRowSwap¶
Note
BadRowSwap should not be used until AFTER ListExtract has been performed on the data, unless performing a list extract is not necessary on the data.
When using BadRowSwap, the following conditions must be met:
The value contained at location key_1 must be able to be identified as valid, or as in need of being swapped, without any reference to the value at location key_2. (Typically using regex or performing type-checking.)
key_1 and key_2 must be unique within their respective row of data. data = {"key_1": {"key_1": 1, "key_2": 2}} would be invalid.
BadRowSwap will potentially drop rows of data. Rows meeting the following conditions will be dropped:
Any row that is missing key_1 as a key will be dropped.
Any row that evaluates as needing to be swapped based on key_1 but is missing key_2 will be dropped.
BadRowSwap will keep rows that meet the following conditions:
Any row that evaluates as not needing to be swapped based on key_1 will be kept, regardless of whether key_2 exists.
Any row that evaluates as needing to be swapped based on key_1 and that also contains key_2 will have the values at key_1 and key_2 swapped, and the row will be kept.
Performing BadRowSwap can be computationally expensive, since it walks all nested data until it finds the desired keys. So here are a few tips to help you determine if you should be using it or not.
Usage Tips for using BadRowSwap:
If both key_1 and key_2 are unimportant (i.e. nullable) fields and keeping them does not add significant value to the data, consider simply allowing the collector to null them when they do not match the expected types, or allowing them to keep the wrong values when they share a data-type. Otherwise you risk both slowing down data-collection and dropping rows that contain good data apart from the swapped fields.
Always place the more important value to retain at location key_1. If you need to swap data like a “uuid” and a “description”, use the “uuid” as key_1.
If you cannot place the more important key at location key_1, consider whether the risk of losing data with a valid value for the more important key is worth it in the instances where the less important key is missing.
Consider the frequency that BadRowSwap would need to run. If 1 out of 1,000,000 data-points contains values with swapped keys, is it worth running the computation on all 1,000,000 rows to save that 1 row?
Analyze the data by hand. Pull it into a pandas DataFrame, and check it.
How often is a row incorrect?
Do the erroneous rows ALWAYS involve the same key?
How often is one of the keys for the row missing when the rows have bad data?
Summary of BadRowSwap usage:
data = [
{
"id": "17.0",
"size": "Foo",
},
{
"id": "Bar",
"size": "10.0"
}
]
pre = BadRowSwap(key_1="id",
key_2="size",
lam=lambda x: re.match(r"[0-9][0-9]\.[0-9]+", x["id"]) is not None
)
Example Usage of BadRowSwap:
Occasionally when dealing with an API, the data is not always where it is supposed to be, and the rows containing the misplaced data are often dropped altogether. If the incoming data has keys whose values occasionally get swapped, and it is possible to detect the swap from what the data should look like, use BadRowSwap.
This example assumes that the API occasionally swaps the values for “id” and “latitude”. BadRowSwap can handle any level of nested data in these instances, so long as the keys for the values that are occasionally swapped are unique within a single row.
>>> import re
... data = [
... {
... "id": "16.53", # NEEDS SWAP = True
... "place": {
... "coords": {
... "latitude": "ID_1",
... "longitude": "-20.43"
... },
... "name": "place_1"
... },
... "details": "Some details... etc"
... },
...
... {
... "id": "ID_2", # NEEDS SWAP = False
... "place": {
... "coords": {
... "latitude": "15.43",
... "longitude": "-20.43"
... },
... "name": "place_2"
... },
... "details": "Some details... etc"
... },
...
... {
... "id": "10.21", # NEEDS SWAP = True
... "place": {
... "coords": {
... # Missing "latitude" key, results in row being skipped
... "longitude": "-20.43"
... },
... "name": "place_2"
... },
... "details": "Some details... etc"
... },
...
... {
... # Missing "id" key, results in row being skipped
... "place": {
... "coords": {
... "latitude": "ID_4",
... "longitude": "-20.43"
... },
... "name": "place_2"
... },
... "details": "Some details... etc"
... },
...
... {
... "id": "ID_5", # NEEDS SWAP = False
... "place": {
... "coords": {
... # Missing "latitude" row is kept, because no row swap needed
... "longitude": "-20.43"
... },
... "name": "place_2"
... },
... "details": "Some details... etc"
... }
... ]
...
... pre = BadRowSwap(key_1="id",
... key_2="latitude",
... lam=lambda x: re.match(r"[0-9][0-9]\.[0-9]+", x["id"]) is not None
... )
...
... pre.lam_wrap(data)
[
{
"id": "ID_1", # "id" and "latitude" have been swapped
"place": {
"coords": {
"latitude": "16.53",
"longitude": "-20.43"
},
"name": "place_1"
},
"details": "Some details... etc"
},
{
"id": "ID_2", # No changes required with this row
"place": {
"coords": {
"latitude": "15.43",
"longitude": "-20.43"
},
"name": "place_2"
},
"details": "Some details... etc"
},
# Row 3, and Row 4 have been dropped because they were missing key_1 or they required a swap and were missing key_2
{
"id": "ID_5", # No changes required with this row
"place": {
"coords": {
# The latitude is still missing but that can be handled later, it may be nullable, so it should be kept
"longitude": "-20.43"
},
"name": "place_2"
},
"details": "Some details... etc"
}
]
FeatureFlatten¶
Note
FeatureFlatten should not be used until AFTER ListExtract has been performed on the data, unless performing a list extract is not necessary on the data.
Summary of FeatureFlatten usage:
data = [
{
"data_id": 1,
"data_features": [
{
"x": 5,
"y": 10
},
{
"x": 7,
"y": 15
},
.
.
.
]
}
]
pre = FeatureFlatten(key="data_features")
Example Usage of FeatureFlatten:
>>> data = [
... {
... "data_id": 1,
... "data_features": {
... "Foo": 5,
... "Bar": 10
... }
... },
...
... {
... "data_id": 2,
... "data_features": [
... {
... "Foo": 5,
... "Bar": 10
... },
... {
... "Foo": 7,
... "Bar": 15
... }
... ]
... }
... ]
... pre = FeatureFlatten(key="data_features")
... pre.lam_wrap(data)
[
{
"data_id": 1,
"data_features": {
"Foo": 5,
"Bar": 10
}
},
{
"data_id": 2,
"data_features": {
"Foo": 5,
"Bar": 10
}
},
{
"data_id": 2,
"data_features": {
"Foo": 7,
"Bar": 15
}
}
]
GlobalExtract¶
Summary of GlobalExtract usage:
data = {"date": "2021-04-19", "data_array": [{"id": 1, "name": "Foo"}, {"id": 2, "name": "Bar"}]}
pre = GlobalExtract(key="publish_time",
lam=lambda x: x["date"],
dtype=str
)
Final DataFrame
id | name | publish_time
---|------|-------------
1  | Foo  | 2021-04-19
2  | Bar  | 2021-04-19
Example Usage of GlobalExtract:
>>> # pre-processing operators
... pres = []
... # Dictionary that contains all globally extracted data
... pre_2_post_dict = {}
... # Incoming Data
... data = {"date": "2021-04-19", "data_array": [{"id": 1, "name": "Foo"}, {"id": 2, "name": "Bar"}]}
... # GlobalExtract instance for extracting the "date" from data, but replacing its key with "publish_time"
... pre = GlobalExtract(key="publish_time",
... lam=lambda x: x["date"],
... dtype=str
... )
... # The preprocessor gets added to the list of preprocessors
... pres.append(pre)
... # Each preprocessor gets applied sequentially
... for p in pres:
... if p.ctype == "global_extract":
... pre_2_post_dict[p.key] = p.lam_wrap(data)
... else:
... pass # See other pre-processors
... pre_2_post_dict
{"publish_time": {"value": "2021-04-19", "dtype": str}}
Later after the data has been extracted to a DataFrame df
# Assume df = DataFrame containing extracted data
# Assume dtype_convert is a function that maps a python native type to a pandas dtype
# For each globally extracted item
for k, v in pre_2_post_dict.items():
# Add the item to the DataFrame -> These are GLOBAL values shared amongst ALL rows
df[k] = v["value"]
# Typecast the value to ensure it is the correct dtype
df[k] = df[k].astype(dtype_convert(v["dtype"]))
Example of what DataFrame would be:
id | name | publish_time
---|------|-------------
1  | Foo  | 2021-04-19
2  | Bar  | 2021-04-19
ListExtract¶
Summary of ListExtract Usage:
data = { "actual_data_rows": [{"id": "row1"}, {"id": "row2"}], "erroneous_data": "FooBar" }
pre = ListExtract(lam=lambda x: x["actual_data_rows"])
Example Usage of ListExtract:
>>> data = {
... "Foo": "Metadata",
... "data_array": [
... {
... "data_id": 1,
... "name": "name_1"
... },
... {
... "data_id": 2,
... "name": "name_2"
... }
... ]
... }
...
... pre = ListExtract(lam=lambda x: x["data_array"])
... pre.lam_wrap(data)
[
{
"data_id": 1,
"name": "name_1"
},
{
"data_id": 2,
"name": "name_2"
}
]
Extracting data-features¶
Summary of Feature Usage:¶
data = [{"id": 1, "name": "Foo", "nest0": {"nest1": {"x": True}, "y": 14.3 } }, ... ]
data_features = [
Feature(key="uuid", lam=lambda x: x["id"], dtype=int), # Extracts "id" and rename it to "uuid"
Feature(key="name", lam=lambda x: x["name"], dtype=str), # Will extract "name" keeping the key as "name"
Feature(key="x", lam=lambda x: x["nest0"]["nest1"]["x"], dtype=bool), # Will extract "x"
Feature(key="y", lam=lambda x: x["nest0"]["y"], dtype=bool) # Will extract "y"
]
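As a standalone sketch of how a single Feature behaves per row (based on the Feature.lam_wrap workflow documented in the reference section below), values that cannot be cast to the declared dtype are nulled, or replaced by the matching nan_* value if one was provided:

# Minimal sketch: applying one Feature to individual rows of incoming data.
price = Feature(key="price_usd", lam=lambda x: x["priceUsd"], dtype=float)

price.lam_wrap({"priceUsd": "55692.66"})      # -> 55692.66 (cast from str to float)
price.lam_wrap({"priceUsd": "not-a-number"})  # -> None (un-castable, nulled by default)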
Post-processing¶
ColumnAdd¶
Summary of ColumnAdd Usage:
DataFrame df
Foo | Bar
----|----
1   | A
2   | B
3   | C
post = ColumnAdd(key="FooBar", lam=lambda: 5, dtype=int)
DataFrame df
Foo | Bar | FooBar
----|-----|-------
1   | A   | 5
2   | B   | 5
3   | C   | 5
Example Usage of ColumnAdd:
>>> import pandas as pd
... def f():
... return 5
... df = pd.DataFrame({"Foo": [1, 2, 3], "Bar": ["A", "B", "C"]}) # Setup
...
... post = ColumnAdd(key="FooBar", lam=f, dtype=int)
... post.lam_wrap(df)
pd.DataFrame({"Foo": [1, 2, 3], "Bar": ["A", "B", "C"], "FooBar": [5, 5, 5]})
ColumnApply¶
Summary of ColumnApply Usage:
DataFrame df
Foo | Bar
----|----
1   | A
2   | B
3   | C
post = ColumnApply(key="Foo", lam=lambda x: x + 1, dtype=int)
DataFrame df
Foo | Bar
----|----
2   | A
3   | B
4   | C
Example Usage of ColumnApply:
>>> import pandas as pd
... df = pd.DataFrame({"Foo": [1, 2, 3], "Bar": ["A", "B", "C"]}) # Setup
...
... post = ColumnApply(key="Foo", lam=lambda x: x + 1, dtype=int)
... post.lam_wrap(df)
pd.DataFrame({"Foo": [2, 3, 4], "Bar": ["A", "B", "C"]})
ColumnsCalculate¶
Note
ColumnsCalculate can be used to
Replace columns in a DataFrame with calculated values
Add new columns to a DataFrame based on calculations from existing columns
Summary of ColumnsCalculate Usage:
DataFrame df
Foo | Bar
----|----
1   | 2
2   | 4
3   | 8
def foobar(df):
df["Foo+Bar"] = df["Foo"] + df["Bar"]
df["Foo*Bar"] = df["Foo"] * df["Bar"]
return df[["Foo+Bar", "Foo*Bar"]]
post = ColumnsCalculate(keys=["Foo+Bar", "Foo*Bar"], lam=lambda x: foobar(x), dtypes=[int, int])
DataFrame df
Foo | Bar | Foo+Bar | Foo*Bar
----|-----|---------|--------
1   | 2   | 3       | 2
2   | 4   | 6       | 8
3   | 8   | 11      | 24
Example Usage of ColumnsCalculate:
>>> import pandas as pd
... df = pd.DataFrame({"Foo": [1, 2, 3], "Bar": [2, 4, 8]}) # Setup
...
... def foobar(d):
... d["Foo+Bar"] = d["Foo"] + d["Bar"]
... d["Foo*Bar"] = d["Foo"] * d["Bar"]
... return d[["Foo+Bar", "Foo*Bar"]]
...
... post = ColumnsCalculate(keys=["Foo+Bar", "Foo*Bar"], lam=lambda x: foobar(x), dtypes=[int, int])
... post.lam_wrap(df)
pd.DataFrame({"Foo+Bar": [3, 6, 11], "Foo*Bar": [2, 8, 24]})
DateCast¶
Summary of DateCast Usage:
DataFrame df
Foo                 | Bar
--------------------|------
2021-04-29 01:39:00 | False
2021-04-29 01:39:00 | False
Bar!                | True
DataFrame df.dtypes
Foo    | Bar
-------|-----
string | bool
post = DateCast(key="Foo", fmt="%Y-%m-%d %H:%M:%S")
DataFrame df
Foo                 | Bar
--------------------|------
2021-04-29 01:39:00 | False
2021-04-29 01:39:00 | False
NaT                 | True
DataFrame df.dtypes
Foo            | Bar
---------------|-----
datetime64[ns] | bool
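A sketch of DateCast usage in the style of the other post-processors, assuming only the (key, fmt) signature documented in the reference section:

>>> import pandas as pd
... df = pd.DataFrame({"Foo": ["2021-04-29 01:39:00", "2021-04-29 01:39:00", "Bar!"],
...                    "Bar": [False, False, True]})   # Setup
...
... post = DateCast(key="Foo", fmt="%Y-%m-%d %H:%M:%S")
... post.lam_wrap(df)
# "Foo" is now datetime64[ns]; the unparseable "Bar!" entry becomes NaT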
DropNa¶
Simply a shortcut class for a common operation.
Summary of DropNa Usage:
See pandas Documentation
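A minimal sketch of DropNa usage, assuming the DropNa(keys) signature and the dropna(subset=keys) behaviour described in the reference section:

>>> import pandas as pd
... df = pd.DataFrame({"id": [1, 2, None], "name": ["Foo", None, "Bar"]})   # Setup
...
... post = DropNa(keys=["id"])
... post.lam_wrap(df)
# The row with a null "id" is dropped; the row with a null "name" is kept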
MergeStatic¶
Note
MergeStatic is used to merge data together. A common use case of this is in situations where a data-vendor provides an API that gives data-points “Foo”, “Bar”, and “location_id” where “location_id” references a different data-set.
It is common for data-providers to have a mostly static file, i.e. one that does not update very frequently, that contains this reference information.
The typical workflow of a MergeStatic instance is as follows:
Create a LocalStream with mode set to update or replace and a target like CACHE/my_local_stream.pickle
Set the LocalStream to run periodically (6 hours, 24 hours, 10 days, whatever frequency this data is updated)
Add a MergeStatic object to the frequently updating data's post-processors and set the path to the LocalStream storage path, as sketched below.
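A minimal sketch of that last step, assuming the MergeStatic(key, path) signature from the reference section and reusing the hypothetical location_id key and CACHE path from the workflow above:

post_process = [
    # Left-merge each incoming batch with the mostly-static reference data
    # that the LocalStream keeps refreshed in CACHE/my_local_stream.pickle
    MergeStatic(key="location_id",
                path="CACHE/my_local_stream.pickle"
                )
]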
The future of api2db¶
Future Plans¶
Add support for more storage targets
Oracle
Amazon Aurora
MariaDB
Microsoft SQL Server
Firebase RTDB
Add support for uploading in chunks and resumable uploads
Add support for sharding data collection across multiple servers.
If one server does not send a heartbeat out after a certain period of time, another server begins collecting data.
Add 100% test coverage for code-base
Add library support utilizing the ApiForm to create database migration support
Treat a database as a stream, and pull data from it in chunks before migrating to a new target
Used to switch databases, and also clean messy database data
Add ML targets that can be attached directly to streams
Allow for streams to feed directly into predictive models
Add support for an api2db implementation
Performs things in a manner opposite of api2db
Objects such as an EndPoint object used to create Api Endpoints
Take any database, and turn it into an API
Include role-based authentication
Remove BaseLam object
Since collectors run in a single process, this needs to be deprecated. There is no need to serialize the state
Add additional pre/post processors
Listen to what users want upon release, and implement them
Remove unnecessary strings
Fix implementations that use strings to represent object types, i.e. ctype in processors
Use isinstance, and pandas.api.types.is_x_type
Add support for GPU processing of data
Allow for ingestion to be performed on a GPU for high-volume streams
Rewrite performance critical areas of application in C
Create a Store2Local object that can be used to aggregate storage in time intervals
Add support for messaging
Redis Pub/Sub
Google Cloud Pub/Sub
Kafka Pub/Sub
Using Firestore
Add the ability to generate live insights
As data arrives, create the ability to perform rolling averages of data
Allow for chaining messaging abilities onto streams
api2db¶
api2db package¶
Subpackages¶
api2db.app package¶
Submodules¶
api2db.app.api2db module¶
-
api2db.app.api2db.
DEV_SHRINK_DATA
= 0¶ Library developer setting to shrink incoming data to the first DEV_SHRINK_DATA rows
- Type
int
-
class
api2db.app.api2db.
Api2Db
(collector: api2db.ingest.collector.Collector)¶ Bases:
object
Performs data import, passes data to streams, and schedules data storage
-
__init__
(collector: api2db.ingest.collector.Collector)¶ Creates an Api2Db object and attaches the collector
- Parameters
collector – The collector object to attach to
-
wrap_start
() → multiprocessing.context.Process¶ Starts the running loop of an Api2Db instance in a spawned process
- Returns
The process spawned with target start
-
start
() → None¶ The target for Api2Db main process running loop
- Returns
None
-
schedule
() → None¶ schedule starts the streams, schedules collector refresh, schedules storage refresh
- Returns
None
- Raises
NameError or ModuleNotFoundError if streams cannot be created –
-
static
collect_wrap
(import_target: Callable[], Optional[List[dict]]], api_form: Callable[], api2db.ingest.api_form.ApiForm], stream_qs: List[queue.Queue], stream_locks: List[_thread.allocate_lock]) → Optional[type]¶ Starts/restarts dead streams, and calls method collect to import data
- Parameters
import_target – Function that returns data imported from an Api
api_form – Function that instantiates and returns an ApiForm object
stream_qs – A list of queues to pass the incoming data into to be handled by stream targets
stream_locks – A list of locks that become acquirable if their respective stream has died
- Returns
CancelJob if stream has died, restarting the streams, None otherwise
-
static
collect
(import_target: Callable[], Optional[List[dict]]], api_form: Callable[], api2db.ingest.api_form.ApiForm], stream_qs: List[queue.Queue]) → None¶ Performs a data-import, cleans the data, and sends the data into its stream queues
- Parameters
import_target – Function that returns data imported from an Api
api_form – Function that instantiates and returns an ApiForm object
stream_qs – A list of queues to pass the incoming data into to be handled by stream targets
- Returns
None
-
static
store_wrap
(stores: Callable[], List[api2db.store.store.Store]]) → None¶ Checks to ensure that storage jobs are scheduled to run and schedules any jobs that have been unscheduled
- Parameters
stores – Function that returns a list of Store subclassed objects
- Returns
None
- Raises
NameError or ModuleNotFoundError if stores cannot be created –
-
static
store
(store: api2db.store.store.Store) → None¶ Performs the data storage operation of a Store subclass
- Parameters
store – The Store to perform storage on
- Returns
None
-
static
import_handle
(e: Exception) → Exception¶ Handles import errors. Informs the user of libraries they need
- Parameters
e – The raised Exception
- Returns
ModuleNotFoundError if dependencies missing otherwise the original exception
-
api2db.app.auth_manager module¶
-
api2db.app.auth_manager.
auth_manage
(path: str) → Optional[dict]¶ Loads authentication credentials from the specified path
- Parameters
path – The path where the authentication file resides
- Returns
Authentication credentials if file successfully loaded, None otherwise
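A small usage sketch, assuming an auth file such as the AUTH/mysql_auth.json used in the quickstart already exists:

from api2db.app.auth_manager import auth_manage

creds = auth_manage(path="AUTH/mysql_auth.json")
if creds is None:
    # The file is missing or could not be parsed
    raise FileNotFoundError("AUTH/mysql_auth.json could not be loaded")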
api2db.app.log module¶
-
api2db.app.log.
get_logger
(filename: Optional[str] = None, q: Optional[multiprocessing.context.BaseContext.Queue] = None) → logging.Logger¶ Retrieves the logger for the current process for logging to the log file
If no filename is provided, the logger for the current process is assumed to already have handlers registered, and will be returned.
If a filename is provided and the logger has no handlers, a handler will be created and registered
- Parameters
filename – The name of the file to log to
q – The queue used to pass messages if the collector is running in debug mode
- Returns
A logger that can be used to log messages
api2db.app.run module¶
-
class
api2db.app.run.
Run
(collectors: List[api2db.ingest.collector.Collector])¶ Bases:
object
Serves as the main entry point for the application
-
__init__
(collectors: List[api2db.ingest.collector.Collector])¶ The Run object is the application entry point
- Parameters
collectors – A list of collector objects to collect data for
-
q
¶ Used for message passing for collectors with debug mode enabled
- Type
multiprocessing.Queue
-
run
()¶ Starts the application
- Returns
None
-
multiprocessing_start
()¶ Starts each collector in its own process
- Returns
None
-
Module contents¶
Original Author | Tristen Harr
Creation Date | 04/27/2021
Revisions | None
api2db.ingest package¶
Subpackages¶
data = [{"id": 1, "name": "Foo", "nest0": {"nest1": {"x": True}, "y": 14.3 } }, ... ]
data_features = [
Feature(key="uuid", lam=lambda x: x["id"], dtype=int), # Extracts "id" and rename it to "uuid"
Feature(key="name", lam=lambda x: x["name"], dtype=str), # Will extract "name" keeping the key as "name"
Feature(key="x", lam=lambda x: x["nest0"]["nest1"]["x"], dtype=bool), # Will extract "x"
Feature(key="y", lam=lambda x: x["nest0"]["y"], dtype=bool) # Will extract "y"
]
-
class
api2db.ingest.data_feature.feature.
Feature
(key: str, lam: Callable[[dict], Any], dtype: Any, nan_int: Optional[int] = None, nan_float: Optional[float] = None, nan_bool: Optional[bool] = False, nan_str: Optional[str] = None)¶ Bases:
api2db.ingest.base_lam.BaseLam
Used to extract a data-feature from incoming data
-
__init__
(key: str, lam: Callable[[dict], Any], dtype: Any, nan_int: Optional[int] = None, nan_float: Optional[float] = None, nan_bool: Optional[bool] = False, nan_str: Optional[str] = None)¶ Creates a Feature object
Note
All values default to nulling the data that cannot be type-casted to its expected type. For the majority of instances this is going to be the programmer's desired effect. If there is a way to make it so that the data can be cleaned in order to prevent it from being nulled, that should be done using the library's pre-processing tools. If the data cannot be cleaned in pre-processing and it cannot be type-casted to its expected type, then it is by definition worthless. If it is possible to clean it, it can be cleaned in pre-processing, although it may require the programmer to subclass
Pre
- Parameters
key – The name of the column that will be stored in the storage target
lam – Function that takes a dictionary as its parameter and returns the value located where the desired data should be. api2db handles null data and unexpected data types automatically
dtype – The python native type of the data feature
nan_int – If specified and dtype is int, this value will be used to replace null values and values that fail to be cast to type int
nan_float – If specified and dtype is float, this value will be used to replace null values and values that fail to be cast to type float
nan_bool – If specified and dtype is bool, this value will be used to replace null values and values that fail to be cast to type bool
nan_str – If specified and dtype is str, this value will be used to replace null values and values that fail to be cast to type str
-
lam_wrap
(data: dict) → Any¶ Overrides super class method
Extracts a feature from incoming data
Workflow:
Attempt to call lam on data to get the data-feature
Attempt to typecast the result to dtype
If dtype is str and the result.lower() is “none”, “nan”, “null”, or “nil”, replace it with nan_str
If an exception occurs when attempting any of the above, set the result to None
Return the result
- Parameters
data – A dictionary of incoming data representing a single row in a DataFrame
- Returns
The extracted data-feature
-
Original Author | Tristen Harr
Creation Date | 04/29/2021
Revisions | None
DataFrame df
Foo | Bar
----|----
1   | A
2   | B
3   | C
post = ColumnAdd(key="FooBar", lam=lambda: 5, dtype=int)
DataFrame df
Foo | Bar | FooBar
----|-----|-------
1   | A   | 5
2   | B   | 5
3   | C   | 5
>>> import pandas as pd
... def f():
... return 5
... df = pd.DataFrame({"Foo": [1, 2, 3], "Bar": ["A", "B", "C"]}) # Setup
...
... post = ColumnAdd(key="FooBar", lam=f, dtype=int)
... post.lam_wrap(df)
pd.DataFrame({"Foo": [1, 2, 3], "Bar": ["A", "B", "C"], "FooBar": [5, 5, 5]})
-
class
api2db.ingest.post_process.column_add.
ColumnAdd
(key: str, lam: Callable[], Any], dtype: Any)¶ Bases:
api2db.ingest.post_process.post.Post
Used to add global values to a DataFrame, primarily for timestamps/ids
-
__init__
(key: str, lam: Callable[], Any], dtype: Any)¶ Creates a ColumnAdd object
- Parameters
key – The column name for the DataFrame
lam – A function that returns the value that should be globally placed into the DataFrame in the key column
dtype – The python native type of the function's return
-
ctype
¶ type of the data processor
- Type
str
-
lam_wrap
(lam_arg: pandas.core.frame.DataFrame) → pandas.core.frame.DataFrame¶ Overrides super class method
Workflow:
Assign the lam function return to lam_arg[self.key]
Typecast lam_arg[self.key] to dtype
Return lam_arg
- Parameters
lam_arg – The DataFrame to add a column to
- Returns
The modified DataFrame
-
DataFrame df
Foo | Bar
----|----
1   | A
2   | B
3   | C
post = ColumnApply(key="Foo", lam=lambda x: x + 1, dtype=int)
DataFrame df
Foo | Bar
----|----
2   | A
3   | B
4   | C
>>> import pandas as pd
... df = pd.DataFrame({"Foo": [1, 2, 3], "Bar": ["A", "B", "C"]}) # Setup
...
... post = ColumnApply(key="Foo", lam=lambda x: x + 1, dtype=int)
... post.lam_wrap(df)
pd.DataFrame({"Foo": [2, 3, 4], "Bar": ["A", "B", "C"]})
-
class
api2db.ingest.post_process.column_apply.
ColumnApply
(key: str, lam: Callable[[Any], Any], dtype: Any)¶ Bases:
api2db.ingest.post_process.post.Post
Used to apply a function across the rows in a column of a DataFrame
-
__init__
(key: str, lam: Callable[[Any], Any], dtype: Any)¶ Creates a ColumnApply Object
- Parameters
key – The column to apply the function to
lam – The function to apply
dtype – The python native type of the function output
-
ctype
¶ type of data processor
- Type
str
-
lam_wrap
(lam_arg: pandas.core.frame.DataFrame) → pandas.core.frame.DataFrame¶ Overrides a super class method
Workflow:
Apply lam to lam_arg[self.key]
Cast lam_arg[self.key] to dtype
Return lam_arg
- Parameters
lam_arg – The DataFrame to modify
- Returns
The modified DataFrame
-
Note
ColumnsCalculate can be used to
Replace columns in a DataFrame with calculated values
Add new columns to a DataFrame based on calculations from existing columns
DataFrame df
Foo | Bar
----|----
1   | 2
2   | 4
3   | 8
def foobar(df):
df["Foo+Bar"] = df["Foo"] + df["Bar"]
df["Foo*Bar"] = df["Foo"] * df["Bar"]
return df[["Foo+Bar", "Foo*Bar"]]
post = ColumnsCalculate(keys=["Foo+Bar", "Foo*Bar"], lam=lambda x: foobar(x), dtypes=[int, int])
DataFrame df
Foo | Bar | Foo+Bar | Foo*Bar
----|-----|---------|--------
1   | 2   | 3       | 2
2   | 4   | 6       | 8
3   | 8   | 11      | 24
>>> import pandas as pd
... df = pd.DataFrame({"Foo": [1, 2, 3], "Bar": [2, 4, 8]}) # Setup
...
... def foobar(d):
... d["Foo+Bar"] = d["Foo"] + d["Bar"]
... d["Foo*Bar"] = d["Foo"] * d["Bar"]
... return d[["Foo+Bar", "Foo*Bar"]]
...
... post = ColumnsCalculate(keys=["Foo+Bar", "Foo*Bar"], lam=lambda x: foobar(x), dtypes=[int, int])
... post.lam_wrap(df)
pd.DataFrame({"Foo+Bar": [3, 6, 11], "Foo*Bar": [2, 8, 24]})
-
class
api2db.ingest.post_process.columns_calculate.
ColumnsCalculate
(keys: List[str], lam: Callable[[pandas.core.frame.DataFrame], pandas.core.frame.DataFrame], dtypes: List[Any])¶ Bases:
api2db.ingest.post_process.post.Post
Used to calculate new column values to add to the DataFrame
-
__init__
(keys: List[str], lam: Callable[[pandas.core.frame.DataFrame], pandas.core.frame.DataFrame], dtypes: List[Any])¶ Creates a ColumnsCalculate object
- Parameters
keys – A list of the keys to add/replace in the existing DataFrame
lam – A function that takes a DataFrame as its parameter and returns a DataFrame with column names matching keys, where the columns have, or are castable to, dtypes
dtypes – A list of python native types that are associated with keys
-
ctype
¶ type of data processor
- Type
str
-
lam_wrap
(lam_arg: pandas.core.frame.DataFrame) → pandas.core.frame.DataFrame¶ Overrides super class method
Workflow:
Create a temporary DataFrame tmp_df by applying lam to lam_arg
For each key in self.keys, set lam_arg[key] = tmp_df[key]
For each key in self.keys, cast lam_arg[key] to the appropriate pandas dtype
Return lam_arg
- Parameters
lam_arg – The DataFrame to modify
- Returns
The modified DataFrame
-
DataFrame df
Foo                 | Bar
--------------------|------
2021-04-29 01:39:00 | False
2021-04-29 01:39:00 | False
Bar!                | True
DataFrame df.dtypes
Foo    | Bar
-------|-----
string | bool
post = DateCast(key="Foo", fmt="%Y-%m-%d %H:%M:%S")
DataFrame df
Foo                 | Bar
--------------------|------
2021-04-29 01:39:00 | False
2021-04-29 01:39:00 | False
NaT                 | True
DataFrame df.dtypes
Foo            | Bar
---------------|-----
datetime64[ns] | bool
-
class
api2db.ingest.post_process.date_cast.
DateCast
(key: str, fmt: str)¶ Bases:
api2db.ingest.post_process.post.Post
Used to cast columns containing dates in string format to pandas DateTimes
-
__init__
(key: str, fmt: str)¶ Creates a DateCast object
- Parameters
key – The name of the column containing strings that should be cast to datetimes
fmt – A string formatter that specifies the datetime format of the strings in the column named key
-
ctype
¶ type of data processor
- Type
str
-
lam_wrap
(lam_arg: pandas.core.frame.DataFrame) → pandas.core.frame.DataFrame¶ Overrides super class method
Workflow:
Attempt to cast lam_arg[self.key] from strings to datetimes
Return the modified lam_arg
- Parameters
lam_arg – The DataFrame to modify
- Returns
The modified DataFrame
-
Simply a shortcut class for a common operation.
See pandas Documentation
-
class
api2db.ingest.post_process.drop_na.
DropNa
(keys: List[str])¶ Bases:
api2db.ingest.post_process.post.Post
Used to drop rows containing null values in the specified key columns
-
__init__
(keys: List[str])¶ Creates a DropNa object
- Parameters
keys – The subset of keys to drop if the keys are null
-
ctype
¶ type of data processor
- Type
str
-
lam_wrap
(lam_arg: pandas.core.frame.DataFrame) → pandas.core.frame.DataFrame¶ Overrides super class method
Shortcut used to drop null values. Performs
pd.DataFrame.dropna(subset=self.keys)
- Parameters
lam_arg – The DataFrame to modify
- Returns
The modified DataFrame
-
Note
MergeStatic is used to merge data together. A common use case of this is in situations where a data-vendor provides an API that gives data-points “Foo”, “Bar”, and “location_id” where “location_id” references a different data-set.
It is common for data-providers to have a mostly static file, i.e. one that does not update very frequently, that contains this reference information.
The typical workflow of a MergeStatic instance is as follows:
Create a LocalStream with mode set to update or replace and a target like CACHE/my_local_stream.pickle
Set the LocalStream to run periodically (6 hours, 24 hours, 10 days, whatever frequency this data is updated)
Add a MergeStatic object to the frequently updating data's post-processors and set the path to the LocalStream storage path.
-
class
api2db.ingest.post_process.merge_static.
MergeStatic
(key: str, path: str)¶ Bases:
api2db.ingest.post_process.post.Post
Merges incoming data with a locally stored DataFrame
-
__init__
(key: str, path: str)¶ Creates a MergeStatic object
- Parameters
key – The key that the DataFrames should be merged on
path – The path to the locally stored file containing the pickled DataFrame to merge with
-
ctype
¶ type of data processor
- Type
str
-
lam_wrap
(lam_arg: pandas.core.frame.DataFrame) → pandas.core.frame.DataFrame¶ Overrides super class method
Workflow:
Load DataFrame df from the file specified at self.path
Use lam_arg to perform a left-merge on self.key, merging with df
Return the modified DataFrame
- Parameters
lam_arg – The DataFrame to modify
- Returns
The modified DataFrame
-
-
class
api2db.ingest.post_process.post.
Post
¶ Bases:
api2db.ingest.base_lam.BaseLam
Used as a BaseClass for all PostProcessors
-
static
typecast
(dtype: Any) → str¶ Yields a string that can be used for typecasting to pandas dtype.
- Parameters
dtype – A python native type
- Returns
A string that can be used in conjunction with a pandas DataFrame/Series for typecasting
-
Original Author | Tristen Harr
Creation Date | 04/29/2021
Revisions | None
Note
BadRowSwap should not be used until AFTER ListExtract has been performed on the data, unless performing a list extract is not necessary on the data.
When using BadRowSwap, the following conditions must be met:
The value contained at location key_1 must be able to be identified as valid, or as in need of being swapped, without any reference to the value at location key_2. (Typically using regex or performing type-checking.)
key_1 and key_2 must be unique within their respective row of data. data = {"key_1": {"key_1": 1, "key_2": 2}} would be invalid.
BadRowSwap will potentially drop rows of data. Rows meeting the following conditions will be dropped:
Any row that is missing key_1 as a key will be dropped.
Any row that evaluates as needing to be swapped based on key_1 but is missing key_2 will be dropped.
BadRowSwap will keep rows that meet the following conditions:
Any row that evaluates as not needing to be swapped based on key_1 will be kept, regardless of whether key_2 exists.
Any row that evaluates as needing to be swapped based on key_1 and that also contains key_2 will have the values at key_1 and key_2 swapped, and the row will be kept.
Performing BadRowSwap can be computationally expensive, since it walks all nested data until it finds the desired keys. So here are a few tips to help you determine if you should be using it or not.
Usage Tips for using BadRowSwap:
If both key_1 and key_2 are unimportant (i.e. nullable) fields and keeping them does not add significant value to the data, consider simply allowing the collector to null them when they do not match the expected types, or allowing them to keep the wrong values when they share a data-type. Otherwise you risk both slowing down data-collection and dropping rows that contain good data apart from the swapped fields.
Always place the more important value to retain at location key_1. If you need to swap data like a “uuid” and a “description”, use the “uuid” as key_1.
If you cannot place the more important key at location key_1, consider whether the risk of losing data with a valid value for the more important key is worth it in the instances where the less important key is missing.
Consider the frequency that BadRowSwap would need to run. If 1 out of 1,000,000 data-points contains values with swapped keys, is it worth running the computation on all 1,000,000 rows to save that 1 row?
Analyze the data by hand. Pull it into a pandas DataFrame, and check it.
How often is a row incorrect?
Do the erroneous rows ALWAYS involve the same key?
How often is one of the keys for the row missing when the rows have bad data?
data = [
{
"id": "17.0",
"size": "Foo",
},
{
"id": "Bar",
"size": "10.0"
}
]
pre = BadRowSwap(key_1="id",
key_2="size",
lam=lambda x: re.match("[0-9][0-9]\.[0-9]+", x["id"]) is not None
)
Occasionally when dealing with an API, the data is not always where it is supposed to be. Oftentimes this results in the rows containing the misplaced data being dropped altogether. If the incoming data has keys whose values are occasionally swapped, and it is possible to check whether a swap has occurred based on what the data should look like, use BadRowSwap.
This example assumes that the API occasionally swaps the values for "id" and "latitude". BadRowSwap can handle any level of nested data in these instances, so long as the keys for the values that are occasionally swapped are unique within a single row.
>>> import re
... data = [
... {
... "id": "16.53", # NEEDS SWAP = True
... "place": {
... "coords": {
... "latitude": "ID_1",
... "longitude": "-20.43"
... },
... "name": "place_1"
... },
... "details": "Some details... etc"
... },
...
... {
... "id": "ID_2", # NEEDS SWAP = False
... "place": {
... "coords": {
... "latitude": "15.43",
... "longitude": "-20.43"
... },
... "name": "place_2"
... },
... "details": "Some details... etc"
... },
...
... {
... "id": "10.21", # NEEDS SWAP = True
... "place": {
... "coords": {
... # Missing "latitude" key, results in row being skipped
... "longitude": "-20.43"
... },
... "name": "place_2"
... },
... "details": "Some details... etc"
... },
...
... {
... # Missing "id" key, results in row being skipped
... "place": {
... "coords": {
... "latitude": "ID_4",
... "longitude": "-20.43"
... },
... "name": "place_2"
... },
... "details": "Some details... etc"
... },
...
... {
... "id": "ID_5", # NEEDS SWAP = False
... "place": {
... "coords": {
... # Missing "latitude" row is kept, because no row swap needed
... "longitude": "-20.43"
... },
... "name": "place_2"
... },
... "details": "Some details... etc"
... }
... ]
...
... pre = BadRowSwap(key_1="id",
... key_2="latitude",
... lam=lambda x: re.match("[0-9][0-9]\.[0-9]+", x["id"]) is not None
... )
...
... pre.lam_wrap(data)
[
{
"id": "ID_1", # "id" and "latitude" have been swapped
"place": {
"coords": {
"latitude": "16.53",
"longitude": "-20.43"
},
"name": "place_1"
},
"details": "Some details... etc"
},
{
"id": "ID_2", # No changes required with this row
"place": {
"coords": {
"latitude": "15.43",
"longitude": "-20.43"
},
"name": "place_2"
},
"details": "Some details... etc"
},
# Row 3, and Row 4 have been dropped because they were missing key_1 or they required a swap and were missing key_2
{
"id": "ID_5", # No changes required with this row
"place": {
"coords": {
# The latitude is still missing but that can be handled later, it may be nullable, so it should be kept
"longitude": "-20.43"
},
"name": "place_2"
},
"details": "Some details... etc"
}
]
-
class
api2db.ingest.pre_process.bad_row_swap.
BadRowSwap
(key_1: str, key_2: str, lam: Callable[[dict], bool])¶ Bases:
api2db.ingest.pre_process.pre.Pre
Used to repair rows arriving from the API in which two keys occasionally have their values swapped
-
__init__
(key_1: str, key_2: str, lam: Callable[[dict], bool])¶ Creates a BadRowSwap object
- Parameters
key_1 – The key of a field that occasionally has its value swapped with the data from key_2
key_2 – The key of a field that occasionally has its value swapped with the data from key_1
lam – A function (anonymous or not) that, when given a row of data, returns True if the keys need their values swapped, otherwise returns False
-
lam_wrap
(lam_arg: List[dict]) → List[dict]¶ Overrides super class method
- Parameters
lam_arg – A list of dictionaries with each dictionary containing what will become a row in a DataFrame
- Returns
A modified list of dictionaries with bad rows being either swapped or dropped.
-
Note
FeatureFlatten should not be used until AFTER ListExtract has been performed on the data, unless performing a list extract is not necessary on the data.
data = [
{
"data_id": 1,
"data_features": [
{
"x": 5,
"y": 10
},
{
"x": 7,
"y": 15
},
.
.
.
]
}
]
pre = FeatureFlatten(key="data_features")
>>> data = [
... {
... "data_id": 1,
... "data_features": {
... "Foo": 5,
... "Bar": 10
... }
... },
...
... {
... "data_id": 2,
... "data_features": [
... {
... "Foo": 5,
... "Bar": 10
... },
... {
... "Foo": 7,
... "Bar": 15
... }
... ]
... }
... ]
... pre = FeatureFlatten(key="data_features")
... pre.lam_wrap(data)
[
{
"data_id": 1,
"data_features": {
"Foo": 5,
"Bar": 10
}
},
{
"data_id": 2,
"data_features": {
"Foo": 5,
"Bar": 10
}
},
{
"data_id": 2,
"data_features": {
"Foo": 7,
"Bar": 15
}
}
]
-
class
api2db.ingest.pre_process.feature_flatten.
FeatureFlatten
(key: str)¶ Bases:
api2db.ingest.pre_process.pre.Pre
Used to flatten features containing arrays, which would otherwise be incompatible with storage in a table-based schema
-
__init__
(key: str)¶ Creates a FeatureFlatten object
- Parameters
key – The key containing nested data that each needs to have its own row in the final DataFrame
-
ctype
¶ type of data processor
- Type
str
-
lam_wrap
(lam_arg: Optional[List[dict]]) → List[dict]¶ Overrides super class method
Workflow:
Create an array of rows
For each dictionary d in the array of data-points:
If self.key is in d.keys() and type(d[self.key]) == list: for each item in the list, create a new row containing all data-features and the item by itself, and add it to rows
If self.key is in d.keys() and type(d[self.key]) == dict: keep the row as it is and add it to rows
- Parameters
lam_arg – A list of dictionaries that each represent a row in the final DataFrame (Optional to safeguard against if previous pre-processors could not parse data, i.e. No data-points existed)
- Returns
An array of dictionaries that each represent a row, with nested data extracted to their own rows
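The workflow above can be summarised with a short standalone sketch (illustrative only, not the library's internal implementation):
from typing import List, Optional

def flatten_rows(rows: Optional[List[dict]], key: str) -> List[dict]:
    """Illustrative version of the FeatureFlatten workflow described above."""
    out = []
    for d in rows or []:
        nested = d.get(key)
        if isinstance(nested, list):
            # One new row per item in the nested list
            for item in nested:
                out.append({**d, key: item})
        elif isinstance(nested, dict):
            # Already a single nested object: keep the row as-is
            out.append(d)
    return out

rows = [{"data_id": 2, "data_features": [{"Foo": 5}, {"Foo": 7}]}]
print(flatten_rows(rows, key="data_features"))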
-
data = {"date": "2021-04-19", "data_array": [{"id": 1, "name": "Foo"}, {"id": 2, "name": "Bar"}]}
pre = GlobalExtract(key="publish_time",
lam=lambda x: x["date"],
dtype=str
)
Final DataFrame
id | name | publish_time
---|---|---
1 | Foo | 2021-04-19
2 | Bar | 2021-04-19
>>> # pre-processing operators
... pres = []
... # Dictionary that contains all globally extracted data
... pre_2_post_dict = {}
... # Incoming Data
... data = {"date": "2021-04-19", "data_array": [{"id": 1, "name": "Foo"}, {"id": 2, "name": "Bar"}]}
... # GlobalExtract instance for extracting the "date" from data, but replacing its key with "publish_time"
... pre = GlobalExtract(key="publish_time",
... lam=lambda x: x["date"],
... dtype=str
... )
... # The preprocessor gets added to the list of preprocessors
... pres.append(pre)
... # Each preprocessor gets applied sequentially
... for p in pres:
... if p.ctype == "global_extract":
... pre_2_post_dict[p.key] = p.lam_wrap(data)
... else:
... pass # See other pre-processors
... pre_2_post_dict
{"publish_time": {"value": "2021-04-19", "dtype": str}}
Later after the data has been extracted to a DataFrame df
# Assume df = DataFrame containing extracted data
# Assume dtype_convert is a function that maps a python native type to a pandas dtype
# For each globally extracted item
for k, v in pre_2_post_dict.items():
# Add the item to the DataFrame -> These are GLOBAL values shared amongst ALL rows
df[k] = v["value"]
# Typecast the value to ensure it is the correct dtype
df[k] = df[k].astype(dtype_convert(v["dtype"]))
Example of what DataFrame would be:
id | name | publish_time
---|---|---
1 | Foo | 2021-04-19
2 | Bar | 2021-04-19
-
class
api2db.ingest.pre_process.global_extract.
GlobalExtract
(key: str, lam: Callable[[dict], Any], dtype)¶ Bases:
api2db.ingest.pre_process.pre.Pre
Used to extract a global feature from incoming data
-
__init__
(key: str, lam: Callable[[dict], Any], dtype)¶ Creates a GlobalExtract object
- Parameters
key – The desired key of the feature for the storage target
lam – Anonymous function that specifies the location of the feature to be extracted
dtype – The python native datatype the feature is expected to be
-
ctype
¶ type of the data processor
- Type
str
-
lam_wrap
(lam_arg: dict) → dict¶ Overrides super class method
Workflow:
Attempt to perform the lam operation on the incoming data
Attempt to cast the result of the lam operation to the dtype
If an exception occurs, return {"value": None, "dtype": dtype}
Otherwise, return {"value": result, "dtype": dtype}
- Parameters
lam_arg – A dictionary containing the feature that should be extracted
- Returns
A dictionary of the form {"value": result or None, "dtype": dtype}
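A minimal sketch of the try/except behaviour described above (the helper name is made up; GlobalExtract.lam_wrap implements this internally):
from typing import Any, Callable

def extract_global(lam: Callable[[dict], Any], dtype, data: dict) -> dict:
    """Illustrative equivalent of the documented lam_wrap workflow."""
    try:
        return {"value": dtype(lam(data)), "dtype": dtype}
    except Exception:
        # Extraction or typecasting failed: keep the dtype, null the value
        return {"value": None, "dtype": dtype}

print(extract_global(lambda x: x["date"], str, {"date": "2021-04-19"}))
print(extract_global(lambda x: x["date"], str, {}))  # -> {"value": None, "dtype": str}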
-
data = { "actual_data_rows": [{"id": "row1"}, {"id": "row2"}], "erroneous_data": "FooBar" }
pre = ListExtract(lam=lambda x: x["actual_data_rows"])
>>> data = {
... "Foo": "Metadata",
... "data_array": [
... {
... "data_id": 1,
... "name": "name_1"
... },
... {
... "data_id": 2,
... "name": "name_2"
... }
... ]
... }
...
... pre = ListExtract(lam=lambda x: x["data_array"])
... pre.lam_wrap(data)
[
{
"data_id": 1,
"name": "name_1"
},
{
"data_id": 2,
"name": "name_2"
}
]
-
class
api2db.ingest.pre_process.list_extract.
ListExtract
(lam: Callable[[dict], list])¶ Bases:
api2db.ingest.pre_process.pre.Pre
Used to extract a list of dictionaries that will each represent a single row in a database
-
__init__
(lam: Callable[[dict], list])¶ Creates a ListExtract object
- Parameters
lam – Anonymous function that attempts to extract a list of data that will become rows in a DataFrame
-
ctype
¶ type of data processor
- Type
str
-
dtype
¶ the datatype performing lam should yield
- Type
type(list)
-
lam_wrap
(lam_arg: dict) → Optional[List[dict]]¶ Overrides super class method
Workflow:
Attempt to perform the lam operation on the incoming data
Attempt to cast the result of the lam operation to a list
If an exception occurs, return None
Otherwise, return the list of data
- Parameters
lam_arg – A dictionary containing a list of dictionaries that will become the rows of a DataFrame
- Returns
A list of dictionaries that will become the rows of a DataFrame if successful otherwise None
-
-
class
api2db.ingest.pre_process.pre.
Pre
¶ Bases:
api2db.ingest.base_lam.BaseLam
Direct subclass of BaseLam with no overrides, members, or methods. Exists solely for organizational purposes.
Original Author | Tristen Harr
Creation Date | 04/29/2021
Revisions | None
Submodules¶
api2db.ingest.api2pandas module¶
-
class
api2db.ingest.api2pandas.
Api2Pandas
(api_form: Callable[[], api2db.ingest.api_form.ApiForm])¶ Bases:
object
Used to extract incoming data from an API into a pandas DataFrame
-
__init__
(api_form: Callable[[], api2db.ingest.api_form.ApiForm])¶ Creates an Api2Pandas object and loads its ApiForm
- Parameters
api_form – The function that generates the ApiForm for the associated collector
-
dependencies_satisfied
() → bool¶ Checks to ensure any data-linking dependency files exist
This feature currently only exists for
api2db.ingest.post_process.merge_static.MergeStatic
- Returns
True if all dependencies are satisfied, otherwise False
-
extract
(data: dict) → Optional[pandas.core.frame.DataFrame]¶ Performs data-extraction from data arriving from an API.
Workflow:
Perform all pre-processing on data
Perform all data-feature extraction
Perform all post-processing on data
Return a DataFrame containing the cleaned data.
- Parameters
data – The data arriving from an API to perform data extraction on.
- Returns
The cleaned data if it is possible to clean the data otherwise None
-
api2db.ingest.api_form module¶
-
class
api2db.ingest.api_form.
ApiForm
(name: str, pre_process: Optional[List[api2db.ingest.pre_process.pre.Pre]] = None, data_features: Optional[List[api2db.ingest.data_feature.feature.Feature]] = None, post_process: Optional[List[api2db.ingest.post_process.post.Post]] = None)¶ Bases:
object
Used to clean and process incoming data arriving from an Api
-
__init__
(name: str, pre_process: Optional[List[api2db.ingest.pre_process.pre.Pre]] = None, data_features: Optional[List[api2db.ingest.data_feature.feature.Feature]] = None, post_process: Optional[List[api2db.ingest.post_process.post.Post]] = None)¶ Creates an ApiForm
Note
The ApiForm is used by api2db to do the processing and cleaning of data. Incoming data goes through 3 phases.
Pre-Processing
Extract global data-features
Extract a list of data-points that will serve as the rows in a database
Flatten nested arrays of data
Swap extraneous rows returned from poorly implemented APIs
Feature Extraction
Extracts the data features for each row that will be stored in a database
Post-Processing
Add new columns of data that will be the same globally for the arriving data. I.e. arrival timestamps
Apply functions across data columns, replacing the data with the calculated value. I.e. Reformat strings, strip whitespace, etc.
Add new columns of data that are derived from performing calculations on existing columns. I.e. Use a latitude and longitude column to calculate a new column called country
Cast columns that contain datetime data from strings to date times.
Drop columns that should not contain null values.
Perform merging of incoming data with locally stored reference tables. I.e. the incoming data has a location_id column, and a locally stored reference table contains location info keyed by location_id, linking the two. This allows data to be merged on location_id so that all of the data ends up in a single table. (A short sketch of assembling an ApiForm follows the parameter list below.)
- Parameters
name – The name of the collector the ApiForm is associated with
pre_process – An array of pre-processing objects to be applied sequentially on incoming data
data_features – An array of data features to be extracted from the incoming data. The programmer can choose which data features they require, and keep only those.
post_process – An array of post-processing objects to be applied sequentially on the data after data has been cleaned and extracted to a pandas.DataFrame
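A minimal sketch of assembling an ApiForm from the pre-processors documented in this package (the collector name and lambdas are hypothetical, and data_features is left empty because Feature construction is documented elsewhere; the project template's wildcard import may also expose shorter names):
from api2db.ingest.api_form import ApiForm
from api2db.ingest.pre_process.global_extract import GlobalExtract
from api2db.ingest.pre_process.list_extract import ListExtract

# Hypothetical collector whose API returns {"date": ..., "data_array": [...]}
pre_process = [
    GlobalExtract(key="publish_time", lam=lambda x: x["date"], dtype=str),
    ListExtract(lam=lambda x: x["data_array"]),
]

form = ApiForm(name="example",
               pre_process=pre_process,
               data_features=[],   # Feature objects would normally go here
               post_process=[])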
-
add_pre
(pre: api2db.ingest.pre_process.pre.Pre) → None¶ Allows the programmer to manually add an item to the pre-processing array.
- Parameters
pre – The pre-processing object to add
- Returns
None
-
add_feature
(feat: api2db.ingest.data_feature.feature.Feature) → None¶ Allows the programmer to manually add an item to the data-features array.
- Parameters
feat – The feature object to add
- Returns
None
-
add_post
(post: api2db.ingest.post_process.post.Post) → None¶ Allows the programmer to manually add an item to the post-processing array.
- Parameters
post – The post-processing object to add
- Returns
None
-
pandas_typecast
() → dict¶ Performs typecasting from python native types to their pandas counterparts. Currently supported types are:
int
float
bool
str
Since API data is inconsistent, all typecasting makes the values nullable inside the DataFrame. Null values can be removed during post-processing.
- Returns
A dictionary that can be used to cast a DataFrame's types using DataFrame.astype()
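The exact mapping is internal to api2db, but the idea can be sketched with pandas nullable dtypes (the mapping below is an assumption used for illustration, not necessarily the library's):
import pandas as pd

# Assumed python-native -> pandas dtype mapping (illustration only)
TYPEMAP = {int: "Int64", float: "float64", bool: "boolean", str: "string"}

df = pd.DataFrame({"id": [1, 2, None], "name": ["Foo", "Bar", None]})
df = df.astype({"id": TYPEMAP[int], "name": TYPEMAP[str]})
print(df.dtypes)  # missing values remain as <NA> instead of raising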
-
static
typecast
(dtype: Any) → str¶ Yields a string containing the pandas dtype when given a python native type.
- Parameters
dtype – The python native type
- Returns
The string representing the type that the native type converts to when put into a DataFrame
-
experiment
(CACHE, import_target) → bool¶ Tool used to build an ApiForm
Note
The laboratory is an experimental feature and does not currently support the MergeStatic post-processor.
- Parameters
CACHE – If the data imports should be cached. I.e. Only call the API once
import_target – The target function that performs an API import
- Returns
True if experiment is ready for export otherwise False
-
api2db.ingest.base_lam module¶
-
class
api2db.ingest.base_lam.
BaseLam
¶ Bases:
object
Used as a Base object for pre-process subclasses, post-process subclasses, and data-features.
-
__call__
(lam_arg: Any) → Any¶ Makes the class callable, with the target being the class method lam_wrap. This is used to allow anonymous functions to be passed to the class, and to enhance ease of use for library developers.
- Parameters
lam_arg – The argument to be passed to the lam_wrap class method.
- Returns
The response of the lam_wrap class method
-
__getstate__
() → dict¶ Allows lambda operations to be serialized so that instances can be passed between processes
- Returns
Customized self.__dict__ items with values serialized using the dill library
-
__setstate__
(state: dict) → None¶ Allows lambda operations to be deserialized using the dill library so that instances can be passed between processes
- Parameters
state – Incoming state
- Returns
None
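As a rough illustration of how dill-based state handling can round-trip a lambda (a sketch, not BaseLam's actual implementation; the Holder class is made up):
import pickle
import dill

class Holder:
    def __init__(self, lam):
        self.lam = lam

    def __getstate__(self):
        # Serialize each attribute with dill so the lambda survives pickling
        return {k: dill.dumps(v) for k, v in self.__dict__.items()}

    def __setstate__(self, state):
        self.__dict__ = {k: dill.loads(v) for k, v in state.items()}

h = pickle.loads(pickle.dumps(Holder(lambda x: x + 1)))
print(h.lam(1))  # -> 2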
-
lam_wrap
(lam_arg: Any) → None¶ Method that performs the class lambda operation on lam_arg. This method will ALWAYS be overridden.
- Parameters
lam_arg – The incoming data to perform the lambda operation on.
- Returns
None if BaseLam.lam_wrap is called directly; the return is otherwise dictated by subclasses.
-
api2db.ingest.collector module¶
-
class
api2db.ingest.collector.
Collector
(name: str, seconds: int, import_target: Callable[[], Optional[List[dict]]], api_form: Callable[[], api2db.ingest.api_form.ApiForm], streams: Callable[[], List[api2db.stream.stream.Stream]], stores: Callable[[], List[api2db.store.store.Store]], debug: bool = True)¶ Bases:
object
Used for creating a data-collection pipeline from API to storage medium
-
__init__
(name: str, seconds: int, import_target: Callable[[], Optional[List[dict]]], api_form: Callable[[], api2db.ingest.api_form.ApiForm], streams: Callable[[], List[api2db.stream.stream.Stream]], stores: Callable[[], List[api2db.store.store.Store]], debug: bool = True)¶ Creates a Collector object
Note
Project collectors are disabled by default; this allows the project to run immediately after pmake is run without any code being written. To enable a collector, you must change its seconds parameter to a number greater than zero. This represents the periodic interval at which the collector's import_target is run, i.e. the collector will request data from its configured API every seconds seconds.
A perceptive user may notice that import_target, api_form, streams and stores appear to be written as seemingly extraneous functions. Why not just pass the actual data directly to the Collector object? This is due to the extensive use of anonymous functions, which is what allows the library to be so expressive. Python's native serialization does not support serializing lambdas. When using the multiprocessing module and spawning a new process, the parameters of the process are serialized before being piped into a new python interpreter instance. For this reason functions are used as parameters rather than their returns, since it is possible to pass a function that will instantiate an anonymous function when called, but not to pass an existing anonymous function to a separate process. Feel free to write a supporting package to make it so this is not the case.
- Parameters
name – The name of the collector. This name is set when using pmake or cadd and should not be changed. Changing it may result in unintended behaviour of the api2db library, as the name is used to determine where to store incoming data, what to name database tables, and the location of the dtypes file stored in the project's CACHE/ directory. If you wish to rename a collector, run cadd to add a new collector with the desired name and then move the code from the old collector into the new one.
seconds – Specifies the periodic interval that data should be imported at, i.e. seconds=30 will request data from the collector's API every 30 seconds. This is set to 0 by default, and when set to 0 the collector is disabled and will not be registered with the main program. This allows all necessary collectors to be added to a project, and then each collector to be enabled as its code is written.
import_target – The function that the programmer using the library writes to perform the initial data import (a minimal sketch follows this parameter list). In most cases this will use a library like urllib to perform the requests. The return of this function should be a list of dictionary objects.
When dealing with XML data use a library like xmltodict to convert the data to a python dictionary.
When dealing with JSON data use a library like the built-in json library to convert the data to a python dictionary.
The implementation of this method is left to the programmer. It could also be written to collect data from a serial stream or a web-scraper if desired; design and implementation of such things are left to the users of the library.
The import_target MUST return a list of dictionaries, or None. Exceptions that may occur within the function must be handled. The purpose of this design is to allow logic to be written that performs multiple API requests and treats the data as a single incoming request. Most APIs return a single response, and if the implementation of the import_target does not make multiple API calls, simply wrap that data in a list when returning it from the function.
api_form – This is a function that returns an ApiForm.
streams – This is a function that returns a list of Stream object subclasses.
stores – This is a function that returns a list of Store object subclasses.
debug – When set to True, logs will be printed to the console. Set to False for production.
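A minimal sketch of an import_target that satisfies the contract described above (the URL and function name are hypothetical; any request or parsing error is handled and None is returned):
import json
import urllib.request
from typing import List, Optional

def example_import() -> Optional[List[dict]]:
    """Hypothetical import_target: must return a list of dicts or None."""
    try:
        with urllib.request.urlopen("https://api.example.com/data", timeout=10) as r:
            payload = json.loads(r.read().decode("utf-8"))
    except Exception:
        # Exceptions must be handled inside the import_target
        return None
    # Most APIs return a single response object; wrap it in a list
    return [payload] if isinstance(payload, dict) else payload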
-
q
¶ A queue used for message passing if collector is running in debug mode
- Type
Optional[multiprocessing.Queue]
-
set_q
(q: multiprocessing.context.BaseContext.Queue) → None¶ Sets the q class member used for collectors running in debug mode
- Parameters
q – The queue used for message passing
- Returns
None
-
Module contents¶
Original Author | Tristen Harr
Creation Date | 04/28/2021
Revisions | None
api2db.install package¶
Submodules¶
api2db.install.clear_lab module¶
-
api2db.install.clear_lab.
clab
()¶ This shell command is used to clear a lab.
project_dir-----/
|
apis-------/
| |- __init__.py
| |- FooCollector.py
| |- BarCollector.py
|
AUTH-------/
| |- bigquery_auth_template.json
| |- omnisci_auth_template.json
| |- sql_auth_template.json
|
CACHE/
|
STORE/
|
laboratory-/
| |- lab.py EDIT THIS FILE!
|
helpers.py
|
main.py
Shell Command:
path/to/project_dir> clab
project_dir-----/
|
apis-----/
| |- __init__.py
| |- FooCollector.py
| |- BarCollector.py
|
AUTH-----/
| |- bigquery_auth_template.json
| |- omnisci_auth_template.json
| |- sql_auth_template.json
|
CACHE/
|
STORE/
|
helpers.py
|
main.py
api2db.install.collector_add module¶
-
api2db.install.collector_add.
cadd
(ad: str) → None¶ This shell command is used to add a collector to an existing api2db project
Given the following project structure
project_dir-----/
|
apis-----/
| |- __init__.py
| |- FooCollector.py
|
AUTH-----/
| |- bigquery_auth_template.json
| |- omnisci_auth_template.json
| |- sql_auth_template.json
|
CACHE/
|
STORE/
|
helpers.py
|
main.py
Shell Command:
path/to/project_dir> cadd BarCollector
project_dir-----/
|
apis-----/
| |- __init__.py
| |- FooCollector.py
| |- BarCollector.py
|
AUTH-----/
| |- bigquery_auth_template.json
| |- omnisci_auth_template.json
| |- sql_auth_template.json
|
CACHE/
|
STORE/
|
helpers.py
|
main.py
- Parameters
ad – The name of the collector to add.
- Returns
None
api2db.install.collector_list module¶
-
api2db.install.collector_list.
clist
() → None¶ This shell command is used to show a list of collectors registered with an existing api2db project
Given the following project
project_dir-----/
|
apis-----/
| |- __init__.py
| |- FooCollector.py
| |- BarCollector.py
|
AUTH-----/
| |- bigquery_auth_template.json
| |- omnisci_auth_template.json
| |- sql_auth_template.json
|
CACHE/
|
STORE/
|
helpers.py
|
main.py
Shell Command:
path/to/project_dir> clist
Out:
["FooCollector", "BarCollector"]
- Returns
None
api2db.install.collector_remove module¶
-
api2db.install.collector_remove.
crem
(rem: str) → None¶ This shell command is used to remove a collector registered with an existing api2db project
Given the following project
project_dir-----/
|
apis-----/
| |- __init__.py
| |- FooCollector.py
| |- BarCollector.py
|
AUTH-----/
| |- bigquery_auth_template.json
| |- omnisci_auth_template.json
| |- sql_auth_template.json
|
CACHE/
|
STORE/
|
helpers.py
|
main.py
Shell Command:
path/to/project_dir> crem BarCollector
project_dir-----/
|
apis-----/
| |- __init__.py
| |- FooCollector.py
|
AUTH-----/
| |- bigquery_auth_template.json
| |- omnisci_auth_template.json
| |- sql_auth_template.json
|
CACHE/
|
STORE/
|
helpers.py
|
main.py
- Parameters
rem – The name of the collector to remove
- Returns
None
api2db.install.make_lab module¶
-
api2db.install.make_lab.
mlab
()¶ This shell command is used for creation of a lab. Labs offer an easier way to design an ApiForm.
Given a project directory
project_dir-----/
|
apis-----/
| |- __init__.py
| |- FooCollector.py
| |- BarCollector.py
|
AUTH-----/
| |- bigquery_auth_template.json
| |- omnisci_auth_template.json
| |- sql_auth_template.json
|
CACHE/
|
STORE/
|
helpers.py
|
main.py
Shell Command:
path/to/project_dir> mlab
project_dir-----/
|
apis-------/
| |- __init__.py
| |- FooCollector.py
| |- BarCollector.py
|
AUTH-------/
| |- bigquery_auth_template.json
| |- omnisci_auth_template.json
| |- sql_auth_template.json
|
CACHE/
|
STORE/
|
laboratory-/
| |- lab.py EDIT THIS FILE!
|
helpers.py
|
main.py
- Returns
None
api2db.install.project_clear module¶
Warning
Usage of this will completely clear out the project directory. This includes all collectors, all code, and all files. This is a nuclear delete option for when your foo doesn’t want to bar and so you need to start over. Use with caution.
-
api2db.install.project_clear.
pclear
() → None¶ This shell command is used to clear a project and should ONLY be used if a complete restart is required.
Given the following project
project_dir-----/
|
apis-----/
| |- __init__.py
| |- FooCollector.py
| |- BarCollector.py
|
AUTH-----/
| |- bigquery_auth_template.json
| |- omnisci_auth_template.json
| |- sql_auth_template.json
|
CACHE/
|
STORE/
|
helpers.py
|
main.py
Shell Command:
path/to/project_dir> pclear
project_dir-----/
- Returns
None
api2db.install.project_make module¶
-
api2db.install.project_make.
pmake
(apis: Optional[List[str]] = None) → None¶ This shell command is used for initial creation of the project structure.
Given a blank project directory
project_dir-----/
Shell Command:
path/to/project_dir> pmake FooCollector BarCollector
project_dir-----/
|
apis-----/
| |- __init__.py
| |- FooCollector.py
| |- BarCollector.py
|
AUTH-----/
| |- bigquery_auth_template.json
| |- omnisci_auth_template.json
| |- sql_auth_template.json
|
CACHE/
|
STORE/
|
helpers.py
|
main.py
Note
This command can also be used without any collector arguments, and collectors can be added later using the cadd shell command.
- Parameters
apis – The collector names that should be created for the project APIs
- Returns
None
api2db.install.run_lab module¶
-
api2db.install.run_lab.
rlab
()¶ This shell command is used to run a lab.
Module contents¶
Original Author | Tristen Harr
Creation Date | 04/28/2021
Revisions | None
api2db.store package¶
Submodules¶
api2db.store.store module¶
-
class
api2db.store.store.
Store
(name: str, seconds: int, path: Optional[str] = None, fmt: str = 'parquet', drop_duplicate_exclude: Optional[List[str]] = None, move_shards_path: Optional[str] = None, move_composed_path: Optional[str] = None, chunk_size: int = 0)¶ Bases:
api2db.stream.stream.Stream
Used for storing data into a local or external source periodically
-
__init__
(name: str, seconds: int, path: Optional[str] = None, fmt: str = 'parquet', drop_duplicate_exclude: Optional[List[str]] = None, move_shards_path: Optional[str] = None, move_composed_path: Optional[str] = None, chunk_size: int = 0)¶ Creates a Store object and attempts to build its dtypes.
- Parameters
name – The name of the collector the store is associated with
seconds – The number of seconds between storage cycles
path – The path to the directory that will contain sharded files that should be recomposed for storage
fmt –
The file format of the sharded files
fmt=”parquet” (recommended) stores the DataFrame using parquet format
fmt=”json” stores the DataFrame using JSON format
fmt=”pickle” stores the DataFrame using pickle format
fmt=”csv” stores the DataFrame using csv format
drop_duplicate_exclude –
drop_duplicate_exclude=None -> DataFrame.drop_duplicates() performed before storage
drop_duplicate_exclude=["request_millis"] -> DataFrame.drop_duplicates(subset=df.columns.difference(drop_duplicate_exclude)) performed before storage.
Primarily used for arrival timestamps, i.e. an API sends the same data on sequential requests, but in most applications the programmer will want to timestamp the arrival of the data, which would lead to duplicate rows whose only difference is the arrival timestamp. (A short sketch follows this parameter list.)
move_shards_path –
Documentation and Examples found here
move_composed_path –
Documentation and Examples found here
chunk_size – CURRENTLY NOT SUPPORTED
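The sketch referenced in drop_duplicate_exclude above, showing why excluding an arrival-timestamp column matters (illustrative data):
import pandas as pd

df = pd.DataFrame({
    "id": [1, 1],
    "value": [10, 10],
    "request_millis": [1000, 2000],   # only the arrival timestamp differs
})

# Plain drop_duplicates() keeps both rows because request_millis differs
print(len(df.drop_duplicates()))                              # 2

# Excluding the timestamp column treats the rows as duplicates
subset = df.columns.difference(["request_millis"])
print(len(df.drop_duplicates(subset=subset)))                 # 1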
-
store_str
¶ A string used for logging
- Type
Optional[str]
-
build_dependencies
() → None¶ Builds the dependencies for the storage object, i.e. makes the directories for the move_shards_path and the move_composed_path
- Returns
None
-
store
() → None¶ Composes a DataFrame from the files in the store's path, and stores the data to the storage target.
- Returns
None
-
start
()¶ Store objects subclass Stream but do not contain a start method. Stores should NEVER use start
- Raises
AttributeError – ‘Store’ object has no attribute ‘start’
-
stream_start
()¶ Store objects subclass Stream but do not contain a stream_start method. Stores should NEVER use stream_start
- Raises
AttributeError – ‘Store’ object has no attribute ‘stream_start’
-
api2db.store.store2bigquery module¶
-
class
api2db.store.store2bigquery.
Store2Bigquery
(name: str, seconds: int, auth_path: str, pid: str, did: str, tid: str, path: Optional[str] = None, fmt: str = 'parquet', drop_duplicate_exclude: Optional[List[str]] = None, move_shards_path: Optional[str] = None, move_composed_path: Optional[str] = None, location: str = 'US', if_exists: str = 'append', chunk_size: int = 0)¶ Bases:
api2db.store.store.Store
Used for storing data to bigquery periodically
-
__init__
(name: str, seconds: int, auth_path: str, pid: str, did: str, tid: str, path: Optional[str] = None, fmt: str = 'parquet', drop_duplicate_exclude: Optional[List[str]] = None, move_shards_path: Optional[str] = None, move_composed_path: Optional[str] = None, location: str = 'US', if_exists: str = 'append', chunk_size: int = 0)¶ Creates a Store2Bigquery object and attempts to build its dtypes.
- Parameters
name – The name of the collector the store is associated with
seconds – The number of seconds between storage cycles
auth_path – The path to the Google provided authentication file. I.e. AUTH/google_auth_file.json
pid – Google project ID
did – Google dataset ID
tid – Google table ID
path – The path to the directory that will contain sharded files that should be recomposed for storage
fmt –
The file format of the sharded files
fmt=”parquet” (recommended) loads the sharded files using parquet format
fmt=”json” loads the sharded files using JSON format
fmt=”pickle” loads the sharded files using pickle format
fmt=”csv” loads the sharded files using csv format
drop_duplicate_exclude –
drop_duplicate_exclude=None
DataFrame.drop_duplicates() performed before storage
drop_duplicate_exclude=[“request_millis”]
.drop_duplicates(subset=df.columns.difference(drop_duplicate_exclude)) performed before storage.
Primarily used for arrival timestamps. I.e. API sends the same data on sequential requests but in most applications the programmer will want to timestamp the arrival time of data, which would lead to duplicate data with the only difference being arrival timestamps
move_shards_path –
Documentation and Examples found here
move_composed_path –
Documentation and Examples found here
location – Location of the Bigquery project
if_exists –
if_exists=”append” Adds the data to the table
if_exists=”replace” Replaces the table with the new data
if_exists=”fail” Fails to upload the new data if the table exists
chunk_size – CURRENTLY NOT SUPPORTED
-
api2db.store.store2omnisci module¶
-
class
api2db.store.store2omnisci.
Store2Omnisci
(name: str, seconds: int, db_name: str, username: Optional[str] = None, password: Optional[str] = None, host: Optional[str] = None, auth_path: Optional[str] = None, path: Optional[str] = None, fmt: str = 'parquet', drop_duplicate_exclude: Optional[List[str]] = None, move_shards_path: Optional[str] = None, move_composed_path: Optional[str] = None, protocol: str = 'binary', chunk_size: int = 0)¶ Bases:
api2db.store.store.Store
Used for storing data to omnisci periodically
-
__init__
(name: str, seconds: int, db_name: str, username: Optional[str] = None, password: Optional[str] = None, host: Optional[str] = None, auth_path: Optional[str] = None, path: Optional[str] = None, fmt: str = 'parquet', drop_duplicate_exclude: Optional[List[str]] = None, move_shards_path: Optional[str] = None, move_composed_path: Optional[str] = None, protocol: str = 'binary', chunk_size: int = 0)¶ Creates a Store2Omnisci object and attempts to build its dtypes.
- Parameters
name – The name of the collector the store is associated with
seconds – The number of seconds between storage cycles
db_name – The name of the database to connect to
username – The username to authenticate with the database
password – The password to authenticate with the database
host – The host of the database
auth_path – The path to the authentication credentials.
path – The path to the directory that will contain sharded files that should be recomposed for storage
fmt –
The file format of the sharded files
fmt=”parquet” (recommended) loads the sharded files using parquet format
fmt=”json” loads the sharded files using JSON format
fmt=”pickle” loads the sharded files using pickle format
fmt=”csv” loads the sharded files using csv format
drop_duplicate_exclude –
drop_duplicate_exclude=None
DataFrame.drop_duplicates() performed before storage
drop_duplicate_exclude=[“request_millis”]
.drop_duplicates(subset=df.columns.difference(drop_duplicate_exclude)) performed before storage.
Primarily used for arrival timestamps. I.e. API sends the same data on sequential requests but in most applications the programmer will want to timestamp the arrival time of data, which would lead to duplicate data with the only difference being arrival timestamps
move_shards_path –
Documentation and Examples found here
move_composed_path –
Documentation and Examples found here
protocol – The protocol to use when connecting to the database
chunk_size – CURRENTLY NOT SUPPORTED
-
api2db.store.store2sql module¶
-
class
api2db.store.store2sql.
Store2Sql
(name: str, seconds: int, db_name: str, dialect: str, username: Optional[str] = None, password: Optional[str] = None, host: Optional[str] = None, auth_path: Optional[str] = None, port: str = '', path: Optional[str] = None, fmt: str = 'parquet', drop_duplicate_exclude: Optional[List[str]] = None, move_shards_path: Optional[str] = None, move_composed_path: Optional[str] = None, if_exists: str = 'append', chunk_size: int = 0)¶ Bases:
api2db.store.store.Store
Used for storing data to an SQL database periodically
-
__init__
(name: str, seconds: int, db_name: str, dialect: str, username: Optional[str] = None, password: Optional[str] = None, host: Optional[str] = None, auth_path: Optional[str] = None, port: str = '', path: Optional[str] = None, fmt: str = 'parquet', drop_duplicate_exclude: Optional[List[str]] = None, move_shards_path: Optional[str] = None, move_composed_path: Optional[str] = None, if_exists: str = 'append', chunk_size: int = 0)¶ Creates a Store2Sql object and attempts to build its dtypes.
- Parameters
name – The name of the collector the store is associated with
seconds – The number of seconds between storage cycles
db_name – The name of the database to connect to
dialect –
dialect=”mysql” -> Use this to connect to a mysql database
dialect=”mariadb” -> Use this to connect to a mariadb database
dialect=”postgresql” -> Use this to connect to a postgresql database
dialect=”amazon_aurora” -> COMING SOON
dialect=”oracle” -> COMING SOON
dialect=”microsoft_sql” -> COMING SOON
dialect=”Something else?” -> Submit a feature request… or even better build it!
username – The username to authenticate with the database
password – The password to authenticate with the database
host – The host of the database
auth_path – The path to the authentication credentials.
port – The port to connect to the database with
path – The path to the directory that will contain sharded files that should be recomposed for storage
fmt –
The file format of the sharded files
fmt=”parquet” (recommended) loads the sharded files using parquet format
fmt=”json” loads the sharded files using JSON format
fmt=”pickle” loads the sharded files using pickle format
fmt=”csv” loads the sharded files using csv format
drop_duplicate_exclude –
drop_duplicate_exclude=None
DataFrame.drop_duplicates() performed before storage
drop_duplicate_exclude=[“request_millis”]
.drop_duplicates(subset=df.columns.difference(drop_duplicate_exclude)) performed before storage.
Primarily used for arrival timestamps. I.e. API sends the same data on sequential requests but in most applications the programmer will want to timestamp the arrival time of data, which would lead to duplicate data with the only difference being arrival timestamps
move_shards_path –
Documentation and Examples found here
move_composed_path –
Documentation and Examples found here
chunk_size – CURRENTLY NOT SUPPORTED
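api2db builds the database connection internally; purely as a mental model (an assumption, not the library's code), the dialect/username/password/host/port/db_name parameters map onto something like a SQLAlchemy URL plus DataFrame.to_sql. The pymysql driver shown here is also an assumption:
import pandas as pd
from sqlalchemy import create_engine

# Placeholder credentials; in api2db these come from the Store2Sql parameters
engine = create_engine("mysql+pymysql://username:password@host:3306/db_name")

df = pd.DataFrame({"id": [1], "name": ["Foo"]})
df.to_sql("example_table", engine, if_exists="append", index=False)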
-
Module contents¶
Original Author | Tristen Harr
Creation Date | 04/28/2021
Revisions | None
api2db.stream package¶
Submodules¶
api2db.stream.file_converter module¶
-
class
api2db.stream.file_converter.
FileConverter
(name: Optional[str] = None, dtypes: Optional[dict] = None, path: Optional[str] = None, fmt: Optional[str] = None)¶ Bases:
object
Serves as a base-class for all Streams/Stores and is used to store/load pandas DataFrames to different formats
-
__init__
(name: Optional[str] = None, dtypes: Optional[dict] = None, path: Optional[str] = None, fmt: Optional[str] = None)¶ Creates a FileConverter object and attempts to build its dtypes
- Parameters
name – The name of the collector associated with the FileConverter
dtypes – The dictionary of dtypes for the collector associated with the FileConverter
path – Either a path to a file, or a path to a directory, dictated by super class
fmt –
fmt=”parquet” (recommended) sets the FileConverter format to use parquet format
fmt=”json” sets the FileConverter format to use JSON format
fmt=”pickle” sets the FileConverter format to use pickle format
fmt=”csv” sets the FileConverter format to use CSV format
-
build_dtypes
() → Optional[dict]¶ Attempts to build the dtypes so that a loaded pandas DataFrame can be type-casted
- Returns
dtypes that can be used with pandas.DataFrame.astype(dtypes)
-
static
static_compose_df_from_dir
(path: str, fmt: str, move_shards_path: Optional[str] = None, move_composed_path: Optional[str] = None, force: bool = True) → Optional[pandas.core.frame.DataFrame]¶ Attempts to build a single DataFrame from all files in a directory.
- Parameters
path – The directory path to compose files from
fmt –
fmt=”parquet” (recommended) stores the DataFrame using parquet format
fmt=”json” stores the DataFrame using JSON format
fmt=”pickle” stores the DataFrame using pickle format
fmt=”csv” stores the DataFrame using csv format
move_shards_path – The path to move the file shards to after composing the DataFrame
move_composed_path – The path to move the composed shards to, with naming schema of filename1_filenameN.fmt
force – Forces creation of the directories to move files to if they do not exist.
- Returns
The DataFrame composed from the sharded files if successful, else None
Example
Original Directory Structure
store/
| |- file1.parquet
| |- file2.parquet
| |- file3.parquet
|
sharded/
|
composed/
|
main.py
The following files contain pandas DataFrames stored using parquet format.
file1.parquet
A | B | C
---|---|---
1 | 2 | 3

file2.parquet
A | B | C
---|---|---
4 | 5 | 6

file3.parquet
A | B | C
---|---|---
7 | 8 | 9
>>> FileConverter.static_compose_df_from_dir(path="store/", fmt="parquet")
Returns (pandas.DataFrame):
A | B | C
---|---|---
1 | 2 | 3
4 | 5 | 6
7 | 8 | 9
By default, files will be deleted when the DataFrame is returned
FileConverter.static_compose_df_from_dir(path="store/", fmt="parquet", move_shards_path=None, move_composed_path=None)
store/
|
sharded/
|
composed/
|
main.py
move_shards_path specifies the path the sharded files should be moved to
FileConverter.static_compose_df_from_dir(path="store/", fmt="parquet", move_shards_path="sharded/", move_composed_path=None)
store/
|
sharded/
| |- file1.parquet
| |- file2.parquet
| |- file3.parquet
|
composed/
|
main.py
move_composed_path specifies the path that the recomposed files should be moved to
FileConverter.static_compose_df_from_dir(path="store/", fmt="parquet", move_shards_path=None, move_composed_path="composed/")
store/
|
sharded/
|
composed/
| |- file1_file3.parquet
|
main.py
-
static
static_store_df
(df: pandas.core.frame.DataFrame, path: str, fmt: str) → bool¶ Stores a DataFrame to a file
- Parameters
df – The DataFrame to store to a file
path – The path to the file the DataFrame should be stored in
fmt –
fmt=”parquet” (recommended) stores the DataFrame using parquet format
fmt=”json” stores the DataFrame using JSON format
fmt=”pickle” stores the DataFrame using pickle format
fmt=”csv” stores the DataFrame using csv format
- Returns
True if successful, else False
-
static
pickle_store
(path: str, df: pandas.core.frame.DataFrame, force: bool = True) → bool¶ Stores a DataFrame as a .pickle file
- Parameters
path – The path to store the DataFrame to
df – The DataFrame to store
force – If the directories in the path do not exist, forces them to be created
- Returns
True if successful, otherwise False
-
static
json_store
(path: str, df: pandas.core.frame.DataFrame, force: bool = True) → bool¶ Stores a DataFrame as a .json file
- Parameters
path – The path to store the DataFrame to
df – The DataFrame to store
force – If the directories in the path do not exist, forces them to be created
- Returns
True if successful, otherwise False
-
static
csv_store
(path: str, df: pandas.core.frame.DataFrame, force: bool = True) → bool¶ Stores a DataFrame as a .csv file
- Parameters
path – The path to store the DataFrame to
df – The DataFrame to store
force – If the directories in the path do not exist, forces them to be created
- Returns
True if successful, otherwise False
-
static
parquet_store
(path: str, df: pandas.core.frame.DataFrame, force: bool = True) → bool¶ Stores a DataFrame as a .parquet file
- Parameters
path – The path to store the DataFrame to
df – The DataFrame to store
force – If the directories in the path do not exist, forces them to be created
- Returns
True if successful, otherwise False
-
static
store_valid
(path: str, force: bool) → bool¶ Determines if a provided path for storage is valid. I.e. The directory structure exists
- Parameters
path – The path to check
force – When True, will attempt to create necessary directories if they do not exist
- Returns
True if path is valid, otherwise False
-
static
static_load_df
(path: str, fmt: str, dtypes: Optional[dict] = None) → Optional[pandas.core.frame.DataFrame]¶ Loads a DataFrame from a file
- Parameters
path – The path to the file the DataFrame should be loaded from
fmt –
fmt=”parquet” (recommended) loads the DataFrame using parquet format
fmt=”json” loads the DataFrame using JSON format
fmt=”pickle” loads the DataFrame using pickle format
fmt=”csv” loads the DataFrame using csv format
dtypes – The dtypes to cast the DataFrame to before returning it. I.e. DataFrame.astype(dtypes)
- Returns
Loaded DataFrame if successful, otherwise None
-
static
pickle_load
(path: str) → Optional[pandas.core.frame.DataFrame]¶ Loads a DataFrame from a .pickle file
- Parameters
path – The path to load the DataFrame from
- Returns
The loaded DataFrame if successful, otherwise None
-
static
json_load
(path: str) → Optional[pandas.core.frame.DataFrame]¶ Loads a DataFrame from a .json file
- Parameters
path – The path to load the DataFrame from
- Returns
The loaded DataFrame if successful, otherwise None
-
static
csv_load
(path: str) → Optional[pandas.core.frame.DataFrame]¶ Loads a DataFrame from a .csv file
- Parameters
path – The path to load the DataFrame from
- Returns
The loaded DataFrame if successful, otherwise None
-
static
parquet_load
(path: str) → Optional[pandas.core.frame.DataFrame]¶ Loads a DataFrame from a .parquet file
- Parameters
path – The path to load the DataFrame from
- Returns
The loaded DataFrame if successful, otherwise None
-
api2db.stream.stream module¶
-
class
api2db.stream.stream.
Stream
(name: str, path: Optional[str] = None, dtypes: Optional[dict] = None, fmt: Optional[str] = None, chunk_size: int = 0, stream_type: str = 'stream', store: bool = False)¶ Bases:
api2db.stream.file_converter.FileConverter
Used for streaming data into a local or external source
-
__init__
(name: str, path: Optional[str] = None, dtypes: Optional[dict] = None, fmt: Optional[str] = None, chunk_size: int = 0, stream_type: str = 'stream', store: bool = False)¶ Creates a Stream object and attempts to build its dtypes. If store flag is false, spawns a thread that polls the Stream queue for incoming data
- Parameters
name – The name of the collector the stream is associated with
path – The directory path the stream should store to (Usage dictated by super classes)
dtypes – A dictionary containing the dtypes that the stream data DataFrame has
fmt –
The file format that the stream data should be stored as
fmt=”parquet” (recommended) stores the DataFrame using parquet format
fmt=”json” stores the DataFrame using JSON format
fmt=”pickle” stores the DataFrame using pickle format
fmt=”csv” stores the DataFrame using csv format
chunk_size – The size of chunks to send to the stream target. I.e. Insert data in chunks of chunk_size rows
stream_type – The type of the stream (Primarily used for logging)
store – This flag indicates whether or not the stream is being called by a Store object
- Raises
NotImplementedError – chunk_storage is not yet implemented.
-
is_store_instance
¶ True if the super-class has base-class Store otherwise False
- Type
bool
-
lock
¶ Stream Lock used to signal if the stream has died
- Type
threading.Lock
-
q
¶ Stream queue used to pass data into
- Type
queue.Queue
-
start
() → None¶ Starts the stream running loop in a new thread
- Returns
None
-
check_failures
() → None¶ Checks to see if previous uploads have failed and if so, loads the previous upload data and attempts to upload it again.
This method searches the directory path
STORE/upload_failed/collector_name/stream_type/
This path is the target location for failed uploads. If an upload fails 5 times in a row, it is stored in this location with the filename being the timestamp at which it was stored.
- Returns
None
-
stream_start
() → None¶ Starts the stream listener that polls the stream queue for incoming data.
- Returns
None
-
stream
(data: pandas.core.frame.DataFrame) → AttributeError¶ Overridden by subclasses; a Stream object is NEVER used directly to stream data. It is ALWAYS inherited from
- Parameters
data – The data to stream
- Raises
AttributeError – Stream does not have the ability to stream data. It must be subclassed.
-
api2db.stream.stream2bigquery module¶
-
class
api2db.stream.stream2bigquery.
Stream2Bigquery
(name: str, auth_path: str, pid: str, did: str, tid: str, location: str = 'US', if_exists: str = 'append', chunk_size: int = 0, store: bool = False)¶ Bases:
api2db.stream.stream.Stream
Streams data from the associated collector directly to Bigquery in real-time
-
__init__
(name: str, auth_path: str, pid: str, did: str, tid: str, location: str = 'US', if_exists: str = 'append', chunk_size: int = 0, store: bool = False)¶ Creates a Stream2Bigquery object and attempts to build its dtypes.
If dtypes can successfully be created (i.e. data has arrived from the API for the first time), the following occurs:
Auto-generates the table schema
Creates the dataset if it does not exist within the project
Creates the table if it does not exist within the project
- Parameters
name – The name of the collector associated with the stream
auth_path – The path to the Google provided authentication file. I.e. AUTH/google_auth_file.json
pid – Google project ID
did – Google dataset ID
tid – Google table ID
location – Location of the Bigquery project
if_exists –
if_exists=”append” Adds the data to the table
if_exists=”replace” Replaces the table with the new data
if_exists=”fail” Fails to upload the new data if the table exists
chunk_size – CURRENTLY NOT SUPPORTED
store – True if the super class is a Store object, otherwise False
-
schema
¶ contains schema if dtypes exist else None
- Type
Optional[List[google.cloud.bigquery.SchemaField]]
-
bq_schema
¶ contains bq_schema if dtypes exist else None
- Type
Optional[List[dict]]
-
cred
¶ contains the credentials used to authenticate with bigquery
- Type
google.oauth2.service_account.Credentials
-
client
¶ The bigquery client
- Type
google.cloud.bigquery.Client
-
dataset
¶ The dataset associated with the collector
- Type
google.cloud.bigquery.Dataset
-
table
¶ The table associated with the collector
- Type
google.cloud.bigquery.Table
-
connected
¶ True if a connection has been established I.e. credentials have been authenticated, otherwise False
- Type
bool
-
connect
() → bool¶ Attempts to authenticate with provided credentials.
Workflow
Load the credentials from the service account file
Instantiate the bigquery Client
Attempt to create the dataset and if a Conflict exception is thrown then load the dataset
Attempt to load the table and if a NotFound exception is thrown then create the table
- Returns
True if the table is successfully loaded/created otherwise False
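The workflow above corresponds roughly to the following google-cloud-bigquery calls (a sketch under the assumption that the standard client library is used; the project, dataset, and table IDs are placeholders):
from google.cloud import bigquery
from google.oauth2 import service_account
from google.api_core.exceptions import Conflict, NotFound

cred = service_account.Credentials.from_service_account_file("AUTH/google_auth_file.json")
client = bigquery.Client(project="pid", credentials=cred)

try:
    dataset = client.create_dataset("did")      # create the dataset, or...
except Conflict:
    dataset = client.get_dataset("did")         # ...load it if it already exists

table_ref = dataset.table("tid")
try:
    table = client.get_table(table_ref)         # load the table, or...
except NotFound:
    table = client.create_table(bigquery.Table(table_ref, schema=[]))  # ...create it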
-
stream
(data: pandas.core.frame.DataFrame, retry_depth: int = 5) → None¶ Attempts to store the incoming data into bigquery
Workflow
If authentication has not been performed, call self.connect()
Attempt to store the DataFrame to bigquery
If successful, check to see if any previous uploads have failed and attempt to store those as well
If the DataFrame cannot be successfully stored set the connection to False
If the retry_depth is not 0 perform a recursive call attempting to store the data again
If the retry_depth has reached zero, log an exception and store the DataFrame locally
Failed uploads will be stored in
STORE/upload_failed/collector_name/bigquery/timestamp_ns.parquet
- Parameters
data – The DataFrame that should be stored to bigquery
retry_depth – Used for a recursive call counter should the DataFrame fail to be stored
- Returns
None
-
build_schema
() → Optional[List[google.cloud.bigquery.schema.SchemaField]]¶ Attempts to build the schema that will be used for table creation
Iterates through the dtypes items and generates the appropriate SchemaFields
- Returns
The schema generated if successful otherwise None
-
build_bq_schema
() → Optional[List[dict]]¶ Attempts to build the schema that will be used to upload data to bigquery via DataFrame.to_gbq()
Iterates through the dtypes items and generates the appropriate schema dictionary
- Returns
The schema generated if successful otherwise None
-
api2db.stream.stream2local module¶
-
class
api2db.stream.stream2local.
Stream2Local
(name: str, path: Optional[str] = None, mode: str = 'shard', fmt: str = 'parquet', drop_duplicate_keys: Optional[List[str]] = None)¶ Bases:
api2db.stream.stream.Stream
Streams data from the associated collector directly to a local file in real-time
-
__init__
(name: str, path: Optional[str] = None, mode: str = 'shard', fmt: str = 'parquet', drop_duplicate_keys: Optional[List[str]] = None)¶ Creates a Stream2Local object and attempts to build its dtypes
- Parameters
name – The name of the collector associated with the stream
path – The path to either a single file or a file directory dictated by the mode parameter
mode –
mode="shard" (default) will store each incoming file independently in the specified path. In shard mode the file will be named timestamp_ns.fmt
mode=”update” will update the file located at the specified path with the new data
mode=”replace” will replace the file located at the specified path with the new data
fmt –
fmt=”parquet” (default/recommended) stores the files using parquet format
fmt=”json” stores the files using JSON format
fmt=”pickle” stores the files using pickle format
fmt=”csv” stores the files using csv format
drop_duplicate_keys –
drop_duplicate_keys=None -> DataFrame.drop_duplicates() performed before storage
drop_duplicate_keys=[“uuid”] -> DataFrame.drop_duplicates(subset=drop_duplicate_keys) performed before storage
-
stream
(data: pandas.core.frame.DataFrame) → None¶ Stores the incoming data into its stream target using the specified mode
- Parameters
data – The data to be stored
- Returns
None
-
stream_shard
(data: pandas.core.frame.DataFrame) → None¶ Stores the incoming data to the specified directory path using the file naming schema timestamp_ns.fmt
- Parameters
data – The data to store to the file
- Returns
None
-
stream_update
(data: pandas.core.frame.DataFrame) → None¶ Updates the existing data at the specified file path and adds the incoming data
- Parameters
data – The data to add to the file
- Returns
None
-
stream_replace
(data: pandas.core.frame.DataFrame) → None¶ Replaces the existing data at the specified file path with the incoming data
- Parameters
data – The data to replace the file with
- Returns
None
-
api2db.stream.stream2omnisci module¶
Warning
Due to dependency conflicts and issues with the current published branch of the pymapd library the following steps must be taken to support streaming/storing data to Omnisci
> pip install pymapd==0.25.0
> pip install pandas --upgrade
> pip install pyarrow --upgrade
This occurs because of issues with the dependencies of the pymapd library being locked in place. I’ve opened an issue on this, and they appear to be working on it. The most recent publish seemed to break other things. Until this gets fixed this is a simple work-around. This will allow api2db to work with Omnisci, however there may be issues with attempts to utilize features of the pymapd library outside of what api2db uses, so use with caution. –Tristen
-
class
api2db.stream.stream2omnisci.
Stream2Omnisci
(name: str, db_name, username: Optional[str] = None, password: Optional[str] = None, host: Optional[str] = None, auth_path: Optional[str] = None, protocol: str = 'binary', chunk_size: int = 0, store: bool = False)¶ Bases:
api2db.stream.stream.Stream
Streams data from the associated collector directly to Omnisci in real-time
-
__init__
(name: str, db_name, username: Optional[str] = None, password: Optional[str] = None, host: Optional[str] = None, auth_path: Optional[str] = None, protocol: str = 'binary', chunk_size: int = 0, store: bool = False)¶ Creates a Stream2Omnisci object and attempts to build its dtypes
If dtypes can successfully be created (i.e. data has arrived from the API for the first time), the following occurs:
Auto-generates the table schema
Casts all string columns to categories as required by Omnisci
Creates an Omnisci table with the name collector_name_stream
Note
Data-attribute fields will have a _t appended to them due to naming conflicts encountered.
Example:
A | B | C
---|---|---
1 | 2 | 3
4 | 5 | 6

Will become the following in the Omnisci database. (This is only applied to data in the database)

A_t | B_t | C_t
---|---|---
1 | 2 | 3
4 | 5 | 6
Authentication Methods:
Supply auth_path with a path to an authentication file. Templates for these files can be found in your project's AUTH/ directory
OR
Supply the username, host, and password
- Parameters
name – The name of the collector associated with the stream
db_name – The name of the database to connect to
username – The username to authenticate with the database
password – The password to authenticate with the database
host – The host of the database
auth_path – The path to the authentication credentials.
protocol – The protocol to use when connecting to the database
chunk_size – CURRENTLY NOT SUPPORTED
store – True if the super class is a Store object, otherwise False
- Raises
ValueError – If auth_path is provided but is invalid or has incorrect values
ValueError – If auth_path is not provided and username, password, or host is missing
-
con
¶ The connection to the database
- Type
Optional[pymapd.Connection]
-
connected
¶ returns True if connection is established else False
- Type
Callable[Optional[pymapd.Connection], bool]
-
log_str
¶ A string used for logging
- Type
str
-
connect
() → Optional[pymapd.Connection]¶ Attempts to establish a connection to an Omnisci database
- Returns
A connection object if a connection can be established, else None
-
static
cast_categorical
(data: pandas.core.frame.DataFrame, dtypes: pandas.core.series.Series) → pandas.core.frame.DataFrame¶ Casts all columns with type str to type category as required by Omnisci and appends a _t to column names
- Parameters
data – The DataFrame that will be stored into the omnisci database
dtypes – The dtypes of the DataFrame
- Returns
Modified DataFrame
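The casting behavior can be illustrated with plain pandas. This is a conceptual sketch of the effect described above, not api2db's actual implementation.
# Conceptual sketch of the cast_categorical effect using plain pandas.
import pandas as pd

df = pd.DataFrame({"A": ["x", "y"], "B": [1, 2]})

# Cast string (object) columns to category, as Omnisci requires
for col in df.columns:
    if df[col].dtype == object:
        df[col] = df[col].astype("category")

# Append _t to every column name, matching the table naming shown above
df = df.rename(columns={col: f"{col}_t" for col in df.columns})

print(df.dtypes)  # A_t -> category, B_t -> int64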
-
stream
(data, retry_depth=5)¶ Attempts to store the incoming data into Omnisci
Workflow
If authentication has not been performed, call self.connect()
Attempt to store the DataFrame to Omnisci
If successful, check whether any previous uploads have failed and attempt to store those as well
If the DataFrame cannot be successfully stored, set con to None
If retry_depth is not 0, perform a recursive call attempting to store the data again
If retry_depth has reached zero, log an exception and store the DataFrame locally
Failed uploads will be stored in
STORE/upload_failed/collector_name/omnisci/timestamp_ns.parquet
- Parameters
data – The DataFrame that should be stored to Omnisci
retry_depth – Used as a recursive call counter should the DataFrame fail to be stored
- Returns
None
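The retry-and-fallback workflow above can be sketched roughly as follows. The helper functions are stand-ins used only to show the control flow; they are not api2db APIs.
# Rough sketch of the retry/fallback control flow. The helpers are stand-ins,
# not api2db APIs; only the structure mirrors the workflow described above.
import logging
import time

def _upload(data):
    raise ConnectionError("database unreachable")  # simulate a failed upload

def _save_failed_locally(data, path):
    print(f"would write failed upload to {path}")

def stream(data, retry_depth=5):
    try:
        _upload(data)  # attempt to store the DataFrame
        # on success, previously failed uploads would also be retried here
    except Exception:
        if retry_depth > 0:
            stream(data, retry_depth - 1)  # recursive retry
        else:
            logging.exception("upload failed, storing locally")
            # e.g. STORE/upload_failed/coincap/omnisci/<timestamp_ns>.parquet
            _save_failed_locally(data, f"{time.time_ns()}.parquet")

stream({"A": [1, 2, 3]})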
-
api2db.stream.stream2sql module¶
-
class
api2db.stream.stream2sql.
Stream2Sql
(name: str, db_name: str, dialect: str, username: Optional[str] = None, password: Optional[str] = None, host: Optional[str] = None, auth_path: Optional[str] = None, port: str = '', if_exists: str = 'append', chunk_size: int = 0, store: bool = False)¶ Bases:
api2db.stream.stream.Stream
Streams data from the associated collector directly to an SQL database target in real-time
-
__init__
(name: str, db_name: str, dialect: str, username: Optional[str] = None, password: Optional[str] = None, host: Optional[str] = None, auth_path: Optional[str] = None, port: str = '', if_exists: str = 'append', chunk_size: int = 0, store: bool = False)¶ Creates a Stream2Sql object and attempts to build its dtypes
If dtypes can be successfully created (i.e. data has arrived from the API for the first time), the following occurs:
Auto-generates the table schema
Creates the table in the database if it does not exist
Authentication Methods:
Supply auth_path with a path to an authentication file. Templates for these files can be found in your project's AUTH/ directory
OR
Supply the username, host, and password
(A usage sketch follows the parameter list below.)
- Parameters
name – The name of the collector associated with the stream
db_name – The name of the database to connect to
dialect –
dialect=”mysql” -> Use this to connect to a mysql database
dialect=”mariadb” -> Use this to connect to a mariadb database
dialect=”postgresql” -> Use this to connect to a postgresql database
dialect=”amazon_aurora” -> COMING SOON
dialect=”oracle” -> COMING SOON
dialect=”microsoft_sql” -> COMING SOON
dialect=”Something else?” -> Submit a feature request… or even better build it!
username – The username to authenticate with the database
password – The password to authenticate with the database
host – The host of the database
auth_path – The path to the authentication credentials.
port – The port used when establishing a connection to the database
if_exists –
if_exists=”append” Adds the data to the table
if_exists=”replace” Replaces the table with the new data
if_exists=”fail” Fails to upload the new data if the table exists
chunk_size – CURRENTLY NOT SUPPORTED
store – True if the super class is a Store object, otherwise False
- Raises
ValueError – If auth_path is provided but is invalid or has incorrect values
ValueError – If auth_path is not provided and username, password, or host is missing
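A hedged sketch of adding a Stream2Sql target to a collector's streams list follows. The collector name, database name, credentials, and auth file name are illustrative assumptions.
# Hypothetical sketch: adding Stream2Sql to a collector's streams list.
# Collector name, db_name, credentials, and auth file name are assumptions.
from api2db.stream.stream2sql import Stream2Sql

def coincap_streams():
    streams = [
        # MySQL target authenticated with explicit credentials
        Stream2Sql(name="coincap",
                   db_name="crypto",
                   dialect="mysql",
                   username="user",
                   password="password",
                   host="localhost",
                   port="3306",
                   if_exists="append"),
        # Alternatively, authenticate with a file based on AUTH/sql_auth_template.json
        # Stream2Sql(name="coincap", db_name="crypto", dialect="mysql",
        #            auth_path="AUTH/sql_auth.json"),
    ]
    return streams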
-
driver
¶ The driver to use when connecting with SQLAlchemy
- Type
str
-
engine_str
¶ The full string that is used with sqlalchemy.create_engine
- Type
str
-
log_str
¶ A string used for logging
- Type
str
-
con
¶ The connection to the database
- Type
sqlalchemy.engine.Engine
-
connected
¶ True if connection is established otherwise False
- Type
bool
-
load
() → None¶ Loads the driver and creates the engine string and the log string
- Raises
NotImplementedError – Support for amazon_aurora has not been implemented in api2db yet
NotImplementedError – Support for oracle has not been implemented in api2db yet
NotImplementedError – Support for microsoft_sql has not been implemented in api2db yet
- Returns
None
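For reference, the engine_str built by load() is a standard SQLAlchemy URL. The driver names below are common choices and an assumption; api2db may select different drivers internally.
# Illustrative SQLAlchemy engine URLs per dialect. The drivers shown (pymysql,
# mariadbconnector, psycopg2) are common choices and an assumption, not
# necessarily what api2db builds internally.
from sqlalchemy import create_engine

engine_strs = {
    "mysql":      "mysql+pymysql://user:password@localhost:3306/crypto",
    "mariadb":    "mariadb+mariadbconnector://user:password@localhost:3306/crypto",
    "postgresql": "postgresql+psycopg2://user:password@localhost:5432/crypto",
}

engine = create_engine(engine_strs["mysql"])  # the con attribute holds an Engine like this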
-
connect
() → bool¶ Attempts to establish a connection to the database
- Returns
True if the connection is established otherwise False
-
stream
(data, retry_depth=5)¶ Attempts to store the incoming data into the SQL database
Workflow
If authentication has not been performed, call self.connect()
Attempt to store the DataFrame to the database
If successful, check whether any previous uploads have failed and attempt to store those as well
If the DataFrame cannot be successfully stored, set connected to False
If retry_depth is not 0, perform a recursive call attempting to store the data again
If retry_depth has reached zero, log an exception and store the DataFrame locally
Failed uploads will be stored in
STORE/upload_failed/collector_name/sql.**dialect**/timestamp_ns.parquet
- Parameters
data – The DataFrame that should be stored to the database
retry_depth – Used as a recursive call counter should the DataFrame fail to be stored
- Returns
None
-
Module contents¶
Original Author | Tristen Harr
Creation Date | 04/27/2021
Revisions | None
The creator of api2db is currently searching for a job. He graduates with a bachelor's in CS on May 15th.
Contact him by emailing tristenharr@gmail.com