Installation and Quickstart¶
Installation¶
Install the library
pip install api2db
To add MySQL support
pip install pymysql
To add MariaDB support
pip install mariadb
To add PostgreSQL support
pip install psycopg2
To add OmniSci support
pip install pymapd==0.25.0
pip install pyarrow==3.0.0
pip install pandas --upgrade
Quickstart¶
Create a project with the pmake shell command
Initial directory structure
project_dir/
path/to/project_dir/> pmake
New project directory structure
project_dir-----/
        |
        |- apis-----/                # Each API will get its own file
        |       |- __init__.py
        |
        |- AUTH-----/                # These templates are used for adding database targets
        |       |- bigquery_auth_template.json
        |       |- omnisci_auth_template.json
        |       |- sql_auth_template.json
        |
        |- CACHE/                    # Application cache files will be stored and used here.
        |
        |- STORE/                    # This is where incoming data can be stored locally.
        |
        |- LOGS/                     # Each collector will receive its own log file.
        |
        |- helpers.py                # Common helper functions can be written here.
        |
        |- main.py                   # This is the application entry point.
Choose an API. This example will use the CoinCap API, as it is free, does not require an API key, and seems to have good uptime. (This project has no affiliation with CoinCap.)
Create a collector with the cadd shell command
path/to/project_dir/> cadd coincap
project_dir-----/
        |
        |- apis-----/
        |       |- __init__.py
        |       |- coincap.py        # This is where you'll write code!
        |
        |- AUTH-----/
        |       |- bigquery_auth_template.json
        |       |- omnisci_auth_template.json
        |       |- sql_auth_template.json
        |
        |- CACHE/
        |
        |- STORE/
        |
        |- LOGS/
        |
        |- helpers.py
        |
        |- main.py
Understanding project_dir/apis/some_api_collector.py
Each collector has four parts.
- Data Import
Performs a call to an API to request data.
- Data Processing
Processes and cleans incoming data, making it useful.
- Data Streams
Streams data as it arrives to a storage target.
- Data Stores
Stores data periodically to a storage target.
The base template for the coincap collector looks like this:
from api2db.ingest import *
from api2db.stream import *
from api2db.store import *
from helpers import *


def coincap_import():
    return None


def coincap_form():
    pre_process = [
        # Pre-processing here
    ]

    data_features = [
        # Data features here
    ]

    post_process = [
        # Post-processing here
    ]

    return ApiForm(name="coincap", pre_process=pre_process, data_features=data_features, post_process=post_process)


def coincap_streams():
    streams = [
    ]
    return streams


def coincap_stores():
    stores = [
    ]
    return stores


coincap_info = Collector(name="coincap",
                         seconds=0,  # Import frequency of 0 disables the collector
                         import_target=coincap_import,
                         api_form=coincap_form,
                         streams=coincap_streams,
                         stores=coincap_stores,
                         debug=True  # Set to False for production
                         )
Using the lab to build ApiForms¶
To simplify setting up the data import and data-processing, first run the mlab shell command
path/to/project_dir/> mlab
project_dir-----/
        |
        |- apis-----/
        |       |- __init__.py
        |       |- coincap.py
        |
        |- AUTH-----/
        |       |- bigquery_auth_template.json
        |       |- omnisci_auth_template.json
        |       |- sql_auth_template.json
        |
        |- CACHE/
        |
        |- laboratory/
        |       |- lab.py            # This is where you can experiment with pre-processing
        |
        |- STORE/
        |
        |- LOGS/
        |
        |- helpers.py
        |
        |- main.py
A blank lab.py file will look like this:
from api2db.ingest import *

CACHE = True  # Caches API data so that only a single API call is made if True


def import_target():
    return None


def pre_process():
    return None


def data_features():
    return None


def post_process():
    return None


if __name__ == "__main__":
    api_form = ApiForm(name="lab",
                       pre_process=pre_process(),
                       data_features=data_features(),
                       post_process=post_process()
                       )
    api_form.experiment(CACHE, import_target)
Importing data¶
Perform a data import by writing the code for the import_target function
lab.py
.
.
.
import requests
import logging


def import_target():
    """
    Data returned by the import target must be an array of dicts.
    This allows for either a single API call to be returned, or an array of them.
    """
    data = None
    url = "https://api.coincap.io/v2/assets/"
    try:
        data = [requests.get(url).json()]
    except Exception as e:
        logging.exception(e)
    return data
.
.
.
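The import target above returns None on any failure, which simply skips that collection cycle. As a hedged sketch (this is not part of api2db), the same pattern can be hardened with an explicit request timeout; the getter is injected here purely so the function can be exercised without the network, and in practice you would pass requests.get:

```python
import logging


def import_with_timeout(url, getter, timeout=10):
    """Variant of the import target above with an explicit timeout.

    `getter` is injected (pass requests.get in production) so the
    function can be tested without touching the network.
    Returns a one-element list of dicts, or None on any failure.
    """
    data = None
    try:
        data = [getter(url, timeout=timeout).json()]
    except Exception as e:
        logging.exception(e)
    return data
```

Keeping the contract identical (a list of dicts, or None) means this drops into the collector unchanged.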
Use the rlab shell command to run the lab
Note
Watch the laboratory directory closely. Data will be dumped into JSON files at different points during data-processing to give the programmer an easier-to-read format.
path/to/project_dir/> rlab
Output:
data:
{
"data": [
{
"id": "bitcoin",
"rank": "1",
"symbol": "BTC",
"name": "Bitcoin",
"supply": "18698850.0000000000000000",
"maxSupply": "21000000.0000000000000000",
"marketCapUsd": "1041388865130.8623213956691350",
"volumeUsd24Hr": "12822561919.6746830356589619",
"priceUsd": "55692.6690748822693051",
"changePercent24Hr": "-4.1033665252363403",
"vwap24Hr": "57708.7312639442977184",
"explorer": "https://blockchain.info/",
},
.
.
.
],
"timestamp": 1620100433183,
}
data keys:
dict_keys(['data', 'timestamp'])
pre_process must return a list of 0 or more pre-processors.
pre_process:
None
Performing pre-processing on data¶
Perform pre-processing on data by writing the code for the pre_process function
lab.py
.
.
.
def pre_process():
    """
    Pre-processors are applied sequentially.
    In this example, we will:

    1. Extract the timestamp and make it a global feature using GlobalExtract
    2. Perform a ListExtract to extract the list of data, which will become the rows in the storage target table
    """
    return [
        GlobalExtract(key="timestamp",
                      lam=lambda x: x["timestamp"],
                      dtype=int
                      ),
        ListExtract(lam=lambda x: x["data"])
    ]
.
.
.
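Conceptually, the two pre-processors above split the payload into one global value and a list of rows. The following is a plain-Python sketch of that idea only; api2db's real classes also handle dtype coercion, missing keys, and logging:

```python
# Plain-Python sketch of the two pre-processors above.
raw = {"data": [{"id": "bitcoin"}, {"id": "ethereum"}],
       "timestamp": 1620100433183}

# GlobalExtract: pull a single value out of the payload; it is later
# attached to every row as a column.
timestamp = (lambda x: x["timestamp"])(raw)

# ListExtract: the extracted list becomes the rows of the final table.
rows = (lambda x: x["data"])(raw)

print(timestamp)  # 1620100433183
print(len(rows))  # 2
```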
Use the rlab shell command to run the lab
path/to/project_dir/> rlab
Output:
data point 1:
{'id': 'bitcoin', 'rank': '1', 'symbol': 'BTC', 'name': 'Bitcoin', 'supply': '18698850.0000000000000000', 'maxSupply': '21000000.0000000000000000', 'marketCapUsd': '1041388865130.8623213956691350', 'volumeUsd24Hr': '12822561919.6746830356589619', 'priceUsd': '55692.6690748822693051', 'changePercent24Hr': '-4.1033665252363403', 'vwap24Hr': '57708.7312639442977184', 'explorer': 'https://blockchain.info/'}
data point 2:
{'id': 'ethereum', 'rank': '2', 'symbol': 'ETH', 'name': 'Ethereum', 'supply': '115729464.3115000000000000', 'maxSupply': None, 'marketCapUsd': '376411190202.6658127213330461', 'volumeUsd24Hr': '17656637086.6618270054805080', 'priceUsd': '3252.5095699873722881', 'changePercent24Hr': '6.4420494833790460', 'vwap24Hr': '3234.4183507937765772', 'explorer': 'https://etherscan.io/'}
data point 3:
{'id': 'binance-coin', 'rank': '3', 'symbol': 'BNB', 'name': 'Binance Coin', 'supply': '153432897.0000000000000000', 'maxSupply': '170532785.0000000000000000', 'marketCapUsd': '98431624817.6777436959489247', 'volumeUsd24Hr': '254674805.8210425908376882', 'priceUsd': '641.5288164550379551', 'changePercent24Hr': '1.1504585233985471', 'vwap24Hr': '653.0516845642682435', 'explorer': 'https://etherscan.io/token/0xB8c77482e45F1F44dE1745F52C74426C631bDD52'}
data_features must return a list of data-features.
data_features:
None
Extracting features from data¶
Extract data-features from data by writing the code for the data_features function
Note
Pick and choose which data-features you wish to extract from your data. This example will extract the id, rank, symbol, name, priceUsd, and volumeUsd24Hr features.
Feature extraction will handle null data and data of the wrong type automatically.
lab.py
.
.
.
def data_features():
    return [
        Feature(key="id",
                lam=lambda x: x["id"],
                dtype=str),
        Feature(key="rank",
                lam=lambda x: x["rank"],
                dtype=int),
        Feature(key="symbol",
                lam=lambda x: x["symbol"],
                dtype=str),
        Feature(key="name",
                lam=lambda x: x["name"],
                dtype=str),
        Feature(key="price_usd",  # Keys support renaming
                lam=lambda x: x["priceUsd"],
                dtype=float),
        Feature(key="volume_usd_24_hr",
                lam=lambda x: x["volumeUsd24Hr"],
                dtype=float)
    ]
.
.
.
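The note above says that feature extraction handles null data and data of the wrong type automatically. A hedged, plain-Python sketch of that behaviour (api2db's actual Feature class is more involved) might look like:

```python
def safe_extract(row, lam, dtype):
    """Sketch of a single Feature extraction: apply the lambda and
    coerce to dtype, falling back to None on missing or invalid data."""
    try:
        return dtype(lam(row))
    except (KeyError, TypeError, ValueError):
        return None


row = {"priceUsd": "55692.6690748822693051", "maxSupply": None}
print(safe_extract(row, lambda x: x["priceUsd"], float))   # coerced to float
print(safe_extract(row, lambda x: x["maxSupply"], float))  # None (null value)
print(safe_extract(row, lambda x: x["missing"], float))    # None (missing key)
```

Falling back to None rather than raising is what lets a collector keep running when the API occasionally returns malformed rows.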
Use the rlab shell command to run the lab
path/to/project_dir/> rlab
Output:
data point 1:
{'id': 'bitcoin', 'rank': '1', 'symbol': 'BTC', 'name': 'Bitcoin', 'supply': '18698850.0000000000000000', 'maxSupply': '21000000.0000000000000000', 'marketCapUsd': '1041388865130.8623213956691350', 'volumeUsd24Hr': '12822561919.6746830356589619', 'priceUsd': '55692.6690748822693051', 'changePercent24Hr': '-4.1033665252363403', 'vwap24Hr': '57708.7312639442977184', 'explorer': 'https://blockchain.info/'}
data point 2:
{'id': 'ethereum', 'rank': '2', 'symbol': 'ETH', 'name': 'Ethereum', 'supply': '115729464.3115000000000000', 'maxSupply': None, 'marketCapUsd': '376411190202.6658127213330461', 'volumeUsd24Hr': '17656637086.6618270054805080', 'priceUsd': '3252.5095699873722881', 'changePercent24Hr': '6.4420494833790460', 'vwap24Hr': '3234.4183507937765772', 'explorer': 'https://etherscan.io/'}
data point 3:
{'id': 'binance-coin', 'rank': '3', 'symbol': 'BNB', 'name': 'Binance Coin', 'supply': '153432897.0000000000000000', 'maxSupply': '170532785.0000000000000000', 'marketCapUsd': '98431624817.6777436959489247', 'volumeUsd24Hr': '254674805.8210425908376882', 'priceUsd': '641.5288164550379551', 'changePercent24Hr': '1.1504585233985471', 'vwap24Hr': '653.0516845642682435', 'explorer': 'https://etherscan.io/token/0xB8c77482e45F1F44dE1745F52C74426C631bDD52'}
data:
id rank symbol name price_usd volume_usd_24_hr timestamp
0 bitcoin 1 BTC Bitcoin 55692.669075 12822561919.674683 1620100433183
1 ethereum 2 ETH Ethereum 3252.50957 17656637086.661827 1620100433183
2 binance-coin 3 BNB Binance Coin 641.528816 254674805.821043 1620100433183
3 xrp 4 XRP XRP 1.461734 1969092162.016667 1620100433183
4 dogecoin 5 DOGE Dogecoin 0.419828 2694025432.110168 1620100433183
.. ... ... ... ... ... ... ...
95 abbc-coin 96 ABBC ABBC Coin 0.755244 355316.252287 1620100433183
96 status 97 SNT Status 0.169848 5966843.243043 1620100433183
97 nxm 98 NXM NXM 90.764252 7577199.874023 1620100433183
98 ocean-protocol 99 OCEAN Ocean Protocol 1.357968 9131449.423728 1620100433183
99 iotex 100 IOTX IoTeX 0.057802 576658.038699 1620100433183
[100 rows x 7 columns]
data dtypes:
id string
rank Int64
symbol string
name string
price_usd Float64
volume_usd_24_hr Float64
timestamp Int64
dtype: object
Performing post-processing on data¶
Perform post-processing on data by writing the code for the post_process function
Note
Post-processors can be applied to alter the data, or extract new information from the data.
lab.py
.
.
.
import time


def post_process():
    """
    In this example we will add a timestamp for the arrival time of the data.
    """
    return [
        ColumnAdd(key="arrival_time",
                  lam=lambda: int(time.time()*1000),
                  dtype=int
                  )
    ]
.
.
.
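Conceptually, ColumnAdd evaluates its lambda once per batch and attaches the result to every row as a new column, which is why every row ends up sharing the same arrival_time. A minimal plain-Python sketch of that idea, with rows modelled as dicts:

```python
import time

rows = [{"id": "bitcoin"}, {"id": "ethereum"}]

# Evaluate once per batch so every row in the batch shares the same value.
arrival_time = int(time.time() * 1000)  # milliseconds since the epoch
for row in rows:
    row["arrival_time"] = arrival_time
```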
Use the rlab shell command to run the lab
path/to/project_dir/> rlab
Output:
data point 1:
{'id': 'bitcoin', 'rank': '1', 'symbol': 'BTC', 'name': 'Bitcoin', 'supply': '18698850.0000000000000000', 'maxSupply': '21000000.0000000000000000', 'marketCapUsd': '1041388865130.8623213956691350', 'volumeUsd24Hr': '12822561919.6746830356589619', 'priceUsd': '55692.6690748822693051', 'changePercent24Hr': '-4.1033665252363403', 'vwap24Hr': '57708.7312639442977184', 'explorer': 'https://blockchain.info/'}
data point 2:
{'id': 'ethereum', 'rank': '2', 'symbol': 'ETH', 'name': 'Ethereum', 'supply': '115729464.3115000000000000', 'maxSupply': None, 'marketCapUsd': '376411190202.6658127213330461', 'volumeUsd24Hr': '17656637086.6618270054805080', 'priceUsd': '3252.5095699873722881', 'changePercent24Hr': '6.4420494833790460', 'vwap24Hr': '3234.4183507937765772', 'explorer': 'https://etherscan.io/'}
data point 3:
{'id': 'binance-coin', 'rank': '3', 'symbol': 'BNB', 'name': 'Binance Coin', 'supply': '153432897.0000000000000000', 'maxSupply': '170532785.0000000000000000', 'marketCapUsd': '98431624817.6777436959489247', 'volumeUsd24Hr': '254674805.8210425908376882', 'priceUsd': '641.5288164550379551', 'changePercent24Hr': '1.1504585233985471', 'vwap24Hr': '653.0516845642682435', 'explorer': 'https://etherscan.io/token/0xB8c77482e45F1F44dE1745F52C74426C631bDD52'}
finalized data:
id rank symbol name price_usd volume_usd_24_hr timestamp arrival_time
0 bitcoin 1 BTC Bitcoin 55692.669075 12822561919.674683 1620100433183 1620104839526
1 ethereum 2 ETH Ethereum 3252.50957 17656637086.661827 1620100433183 1620104839526
2 binance-coin 3 BNB Binance Coin 641.528816 254674805.821043 1620100433183 1620104839526
3 xrp 4 XRP XRP 1.461734 1969092162.016667 1620100433183 1620104839526
4 dogecoin 5 DOGE Dogecoin 0.419828 2694025432.110168 1620100433183 1620104839526
.. ... ... ... ... ... ... ... ...
95 abbc-coin 96 ABBC ABBC Coin 0.755244 355316.252287 1620100433183 1620104839526
96 status 97 SNT Status 0.169848 5966843.243043 1620100433183 1620104839526
97 nxm 98 NXM NXM 90.764252 7577199.874023 1620100433183 1620104839526
98 ocean-protocol 99 OCEAN Ocean Protocol 1.357968 9131449.423728 1620100433183 1620104839526
99 iotex 100 IOTX IoTeX 0.057802 576658.038699 1620100433183 1620104839526
[100 rows x 8 columns]
finalized data dtypes:
id string
rank Int64
symbol string
name string
price_usd Float64
volume_usd_24_hr Float64
timestamp Int64
arrival_time Int64
dtype: object
Exporting data from the lab to a collector¶
Note
Once the lab has been used to build the form fields for an ApiForm, move the code to the collector.
It is not necessary to use the lab feature of the library to perform data-extraction; it just makes things a bit easier.
Move the code from lab.py to coincap.py
coincap.py
.
.
.
import requests
import logging
import time


def coincap_import():
    data = None
    url = "https://api.coincap.io/v2/assets/"
    try:
        data = [requests.get(url).json()]
    except Exception as e:
        logging.exception(e)
    return data


def coincap_form():
    pre_process = [
        GlobalExtract(key="timestamp",
                      lam=lambda x: x["timestamp"],
                      dtype=int
                      ),
        ListExtract(lam=lambda x: x["data"])
    ]

    data_features = [
        Feature(key="id",
                lam=lambda x: x["id"],
                dtype=str),
        Feature(key="rank",
                lam=lambda x: x["rank"],
                dtype=int),
        Feature(key="symbol",
                lam=lambda x: x["symbol"],
                dtype=str),
        Feature(key="name",
                lam=lambda x: x["name"],
                dtype=str),
        Feature(key="price_usd",  # Keys support renaming
                lam=lambda x: x["priceUsd"],
                dtype=float),
        Feature(key="volume_usd_24_hr",
                lam=lambda x: x["volumeUsd24Hr"],
                dtype=float)
    ]

    post_process = [
        ColumnAdd(key="arrival_time",
                  lam=lambda: int(time.time()*1000),
                  dtype=int
                  )
    ]

    return ApiForm(name="coincap", pre_process=pre_process, data_features=data_features, post_process=post_process)
.
.
.
Once the lab has been moved over, you can optionally run the clab shell command to delete the lab
Setting up an authentication file for database targets¶
1. Create a JSON file in the AUTH directory
2. Copy the template for the database target you wish to use
3. Fill out the template
Setting up a stream target for live data¶
The following code will set up live streaming both to a local file location and to a MySQL database
coincap.py
.
.
.
def coincap_streams():
    """
    In this example, we will stream data live into a local file, and directly into a MySQL database.
    """
    streams = [
        Stream2Local(name="coincap",
                     path="STORE/coincap/live"
                     ),
        Stream2Sql(name="coincap",
                   auth_path="AUTH/mysql_auth.json",
                   db_name="stream_coincap",
                   dialect="mysql",
                   port="3306"
                   )
    ]
    return streams
.
.
.
Yes, it is that easy, and no, you do not have to build the tables yourself.
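As a rough mental model of the local stream target (a hedged sketch, not api2db's implementation, which writes parquet files and manages batching), each arriving batch is simply dropped into its own timestamped file under the configured path:

```python
import json
import os
import time


def stream_batch_to_local(rows, path):
    """Write one arriving batch to its own timestamped file under `path`.

    api2db's Stream2Local uses parquet; JSON keeps this sketch
    dependency-free. Returns the path of the file written.
    """
    os.makedirs(path, exist_ok=True)
    fname = os.path.join(path, f"{int(time.time() * 1000)}.json")
    with open(fname, "w") as f:
        json.dump(rows, f)
    return fname
```

One file per batch is what makes the periodic store step possible: the store target can later sweep up every file in the directory without coordinating with the stream.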
Setting up a store target for data¶
The following will set up a storage target that pulls data from STORE/coincap/live and stores it to a MariaDB database periodically
coincap.py
.
.
.
def coincap_stores():
    """
    In this example, we will store data every 10 minutes to a MariaDB database.
    The files we store will then be composed into a single file, and stored in a different storage location.
    """
    stores = [
        Store2Sql(name="coincap",
                  seconds=600,
                  path="STORE/coincap/live",
                  db_name="store_coincap",
                  auth_path="AUTH/mariadb_auth.json",
                  port="3306",
                  dialect="mariadb",
                  move_composed_path="STORE/coincap/ten_minute_intervals/"
                  )
    ]
    return stores
.
.
.
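As a rough mental model of the store step (a hedged sketch, not api2db's implementation), the collector periodically composes the small batch files written by the stream into a single dataset, inserts it into the database, and clears the live directory; with move_composed_path set, the composed result is kept in a second location:

```python
import json
import os


def compose_and_move(src_dir, dest_dir):
    """Compose every batch file under src_dir into one list of rows
    (which the real Store2Sql would insert into the database), write the
    composed result to dest_dir, and clear the live directory.

    Sketch only: api2db works with parquet files, not JSON.
    """
    rows = []
    for name in sorted(os.listdir(src_dir)):
        with open(os.path.join(src_dir, name)) as f:
            rows.extend(json.load(f))
    os.makedirs(dest_dir, exist_ok=True)
    with open(os.path.join(dest_dir, "composed.json"), "w") as f:
        json.dump(rows, f)
    for name in os.listdir(src_dir):
        os.remove(os.path.join(src_dir, name))
    return rows
```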
Registering a collector to run¶
To register a collector, all that needs to be done is to set the import frequency by changing the seconds parameter
coincap.py
.
.
.
coincap_info = Collector(name="coincap",
                         seconds=30,  # Import data from the API every 30 seconds
                         import_target=coincap_import,
                         api_form=coincap_form,
                         streams=coincap_streams,
                         stores=coincap_stores,
                         debug=True  # Set to False for production
                         )
.
.
.
Running the application¶
Run main.py
Info Log Outputs:
2021-05-04 01:01:14 stream.py INFO stream starting -> (local.parquet)
2021-05-04 01:01:14 stream.py INFO stream starting -> (sql.mysql)
2021-05-04 01:01:14 api2db.py INFO import scheduled: [30 seconds] (api request data) -> (streams)
2021-05-04 01:01:14 api2db.py INFO storage refresh scheduled: [30 seconds] -> (check stores)
2021-05-04 01:01:15 api2db.py INFO storage scheduled: [600 seconds] (STORE/coincap/live) -> (store)
2021-05-04 01:01:15 stream2sql.py INFO establishing connection to mysql://***/stream_coincap
2021-05-04 01:01:15 stream2sql.py INFO database not found mysql://***.com/stream_coincap... creating database
2021-05-04 01:01:15 stream2sql.py INFO connection established mysql://***/stream_coincap
2021-05-04 01:01:25 store.py INFO storage files composed, attempting to store 3600 rows to mariadb://***/store_coincap
2021-05-04 01:01:25 stream2sql.py INFO establishing connection to mariadb://***/store_coincap
2021-05-04 01:01:25 stream2sql.py INFO database not found mariadb://***/store_coincap... creating database
2021-05-04 01:01:25 stream2sql.py INFO connection established mariadb://***/store_coincap
Debug Log Outputs:
.
.
.
2021-05-04 01:01:24 stream2sql.py DEBUG 100 rows inserted into mysql://***/stream_coincap
2021-05-04 01:01:24 stream2local.py DEBUG storing 100 rows to STORE/coincap/live
2021-05-04 01:01:24 stream2sql.py DEBUG 100 rows inserted into mysql://***/stream_coincap
2021-05-04 01:01:25 stream2local.py DEBUG storing 100 rows to STORE/coincap/live
2021-05-04 01:01:25 stream2sql.py DEBUG 100 rows inserted into mysql://***/stream_coincap
2021-05-04 01:01:25 stream2sql.py DEBUG 3600 rows inserted into mariadb://***/store_coincap
2021-05-04 01:01:25 stream2local.py DEBUG storing 100 rows to STORE/coincap/live
2021-05-04 01:01:25 stream2sql.py DEBUG 100 rows inserted into mysql://***/stream_coincap
2021-05-04 01:01:26 stream2local.py DEBUG storing 100 rows to STORE/coincap/live
2021-05-04 01:01:26 stream2sql.py DEBUG 100 rows inserted into mysql://***/stream_coincap
2021-05-04 01:01:26 stream2local.py DEBUG storing 100 rows to STORE/coincap/live
2021-05-04 01:01:26 stream2sql.py DEBUG 100 rows inserted into mysql://***/stream_coincap
.
.
.