Examples¶
Shell Commands¶
pmake¶
This shell command is used for initial creation of the project structure.
Given a blank project directory
project_dir-----/
Shell Command: path/to/project_dir> pmake FooCollector BarCollector
project_dir-----/
|
apis-----/
| |- __init__.py
| |- FooCollector.py
| |- BarCollector.py
|
AUTH-----/
| |- bigquery_auth_template.json
| |- omnisci_auth_template.json
| |- sql_auth_template.json
|
CACHE/
|
STORE/
|
helpers.py
|
main.py
Note
This command can also be used without any collector arguments, and collectors can be added using the cadd
shell command.
cadd¶
This shell command is used to add a collector to an existing api2db project
Given the following project structure
project_dir-----/
|
apis-----/
| |- __init__.py
| |- FooCollector.py
|
AUTH-----/
| |- bigquery_auth_template.json
| |- omnisci_auth_template.json
| |- sql_auth_template.json
|
CACHE/
|
STORE/
|
helpers.py
|
main.py
Shell Command: path/to/procect_dir> cadd BarCollector
project_dir-----/
|
apis-----/
| |- __init__.py
| |- FooCollector.py
| |- BarCollector.py
|
AUTH-----/
| |- bigquery_auth_template.json
| |- omnisci_auth_template.json
| |- sql_auth_template.json
|
CACHE/
|
STORE/
|
helpers.py
|
main.py
crem¶
This shell command is used to remove a collector registered with an existing api2db project
Given the following project
project_dir-----/
|
apis-----/
| |- __init__.py
| |- FooCollector.py
| |- BarCollector.py
|
AUTH-----/
| |- bigquery_auth_template.json
| |- omnisci_auth_template.json
| |- sql_auth_template.json
|
CACHE/
|
STORE/
|
helpers.py
|
main.py
Shell Command: path/to/project_dir> crem BarCollector
project_dir-----/
|
apis-----/
| |- __init__.py
| |- FooCollector.py
|
AUTH-----/
| |- bigquery_auth_template.json
| |- omnisci_auth_template.json
| |- sql_auth_template.json
|
CACHE/
|
STORE/
|
helpers.py
|
main.py
clist¶
This shell command is used to show a list of collectors registered with an existing api2db project
Given the following project
project_dir-----/
|
apis-----/
| |- __init__.py
| |- FooCollector.py
| |- BarCollector.py
|
AUTH-----/
| |- bigquery_auth_template.json
| |- omnisci_auth_template.json
| |- sql_auth_template.json
|
CACHE/
|
STORE/
|
helpers.py
|
main.py
Shell Command: path/to/procect_dir> clist
Out: ["FooCollector", "BarCollector"]
pclear¶
This shell command is used to clear a project and should ONLY be used if a complete restart is required.
Given the following project
project_dir-----/
|
apis-----/
| |- __init__.py
| |- FooCollector.py
| |- BarCollector.py
|
AUTH-----/
| |- bigquery_auth_template.json
| |- omnisci_auth_template.json
| |- sql_auth_template.json
|
CACHE/
|
STORE/
|
helpers.py
|
main.py
Shell Command: path/to/project_dir> pclear
project_dir-----/
mlab¶
This shell command is used for creation of a lab. Labs offer an easier way to design an ApiForm.
Given a project directory
project_dir-----/
|
apis-----/
| |- __init__.py
| |- FooCollector.py
| |- BarCollector.py
|
AUTH-----/
| |- bigquery_auth_template.json
| |- omnisci_auth_template.json
| |- sql_auth_template.json
|
CACHE/
|
STORE/
|
helpers.py
|
main.py
Shell Command: path/to/project_dir> mlab
project_dir-----/
|
apis-------/
| |- __init__.py
| |- FooCollector.py
| |- BarCollector.py
|
AUTH-------/
| |- bigquery_auth_template.json
| |- omnisci_auth_template.json
| |- sql_auth_template.json
|
CACHE/
|
STORE/
|
laboratory-/
| |- lab.py EDIT THIS FILE!
|
helpers.py
|
main.py
rlab¶
This shell command is used to run a lab.
clab¶
This shell command is used to clear a lab.
project_dir-----/
|
apis-------/
| |- __init__.py
| |- FooCollector.py
| |- BarCollector.py
|
AUTH-------/
| |- bigquery_auth_template.json
| |- omnisci_auth_template.json
| |- sql_auth_template.json
|
CACHE/
|
STORE/
|
laboratory-/
| |- lab.py EDIT THIS FILE!
|
helpers.py
|
main.py
Shell Command: path/to/project_dir> clab
project_dir-----/
|
apis-----/
| |- __init__.py
| |- FooCollector.py
| |- BarCollector.py
|
AUTH-----/
| |- bigquery_auth_template.json
| |- omnisci_auth_template.json
| |- sql_auth_template.json
|
CACHE/
|
STORE/
|
helpers.py
|
main.py
Pre-processing¶
BadRowSwap¶
Note
BadRowSwap should not be used until AFTER ListExtract has been performed on the data, unless performing a list extract is not necessary on the data.
When using BadRowSwap, the following conditions must be met:
The value contained at location
key_1
must be able to be identified as valid, or in need of being swapped without any reference to the value at locationkey_2
. (Typically using regex or performing type-checking)
key_1
andkey_2
must be unique within their respective row of data.data = {"key_1": {"key_1": 1, "key_2": 2}}
would be invalid.
BadRowSwap will potentially drop rows of data. Rows meeting the following conditions will be dropped:
Any row that is missing
key_1
as a key will be dropped.Any row that evaluates as needing to be swapped based on
key_1
that is missingkey_2
will be dropped.
BadRowSwap will keep rows that meet the following conditions:
Any row that evaluates as not needing to be swapped based on
key_1
will be kept, regardless of ifkey_2
exists or not.Any row that evaluates as needing to be swapped based on
key_1
that also containskey_2
will swap the values at the locations of thekey_1
andkey_2
and the row will be kept.
Performing BadRowSwap can be computationally expensive, since it walks all nested data until it finds the desired keys. So here are a few tips to help you determine if you should be using it or not.
Usage Tips for using BadRowSwap:
If both
key_1
andkey_2
are unimportant fields, I.e. Nullable fields and keeping them does not add significant value to the data consider just allowing the collector to Null them if they do not match the types or consider allowing them to simply have the wrong values if they have the same data-types. Otherwise you risk both slowing down data-collection, and dropping rows that have good data other than those swapped rows.Always attempt to place the key at location
key_1
as the more important value to retain. If you need to swap data like a “uuid” and a “description”, use the “uuid” askey_1
If you cannot place the key at location
key_1
as the more important key, consider if the risk of losing data with a valid value for the more important key is worth it in instances where the less important key is missingConsider the frequency that BadRowSwap would need to be run. If 1 out of 1,000,000 data-points contains values with swapped keys, is it worth running the computation on all 1,000,000 rows to save just that 1 row?
Analyze the data by hand. Pull it into a pandas DataFrame, and check it.
How often are is a row incorrect?
Are the erroneous rows ALWAYS the same key?
How often is one of the keys for the row missing when the rows have bad data?
Summary of BadRowSwap usage:
data = [
{
"id": "17.0",
"size": "Foo",
},
{
"id": "Bar",
"size": "10.0"
}
]
pre = BadRowSwap(key_1="id",
key_2="size",
lam=lambda x: re.match("[0-9][0-9]\.[0-9]+", x["id"]) is not None
)
Example Usage of BadRowSwap:
Occasionally when dealing with an API, the data is not always where it is supposed to be. Oftentimes this results in the rows containing the misplaced data being dropped altogether. In the instance that for some unknown reason the incoming data has keys that tend to occasionally have their values swapped so long as it is possible to check to see if the data has been swapped due to what the data should be, use BadRowSwap.
This example assumes that the API occasionally swaps the values for “id” and “latitude”. BadRowSwap can handle any level of nested data in these instances, so long as the keys for the values that are occasionally swapped are unique within a single row
>>> import re
... data = [
... {
... "id": "16.53", # NEEDS SWAP = True
... "place": {
... "coords": {
... "latitude": "ID_1",
... "longitude": "-20.43"
... },
... "name": "place_1"
... },
... "details": "Some details... etc"
... },
...
... {
... "id": "ID_2", # NEEDS SWAP = False
... "place": {
... "coords": {
... "latitude": "15.43",
... "longitude": "-20.43"
... },
... "name": "place_2"
... },
... "details": "Some details... etc"
... },
...
... {
... "id": "10.21", # NEEDS SWAP = True
... "place": {
... "coords": {
... # Missing "latitude" key, results in row being skipped
... "longitude": "-20.43"
... },
... "name": "place_2"
... },
... "details": "Some details... etc"
... },
...
... {
... # Missing "id" key, results in row being skipped
... "place": {
... "coords": {
... "latitude": "ID_4",
... "longitude": "-20.43"
... },
... "name": "place_2"
... },
... "details": "Some details... etc"
... },
...
... {
... "id": "ID_5", # NEEDS SWAP = False
... "place": {
... "coords": {
... # Missing "latitude" row is kept, because no row swap needed
... "longitude": "-20.43"
... },
... "name": "place_2"
... },
... "details": "Some details... etc"
... }
... ]
...
... pre = BadRowSwap(key_1="id",
... key_2="latitude",
... lam=lambda x: re.match("[0-9][0-9]\.[0-9]+", x["id"]) is not None
... )
...
... pre.lam_wrap(data)
[
{
"id": "ID_1", # "id" and "latitude" have been swapped
"place": {
"coords": {
"latitude": "16.53",
"longitude": "-20.43"
},
"name": "place_1"
},
"details": "Some details... etc"
},
{
"id": "ID_2", # No changes required with this row
"place": {
"coords": {
"latitude": "15.43",
"longitude": "-20.43"
},
"name": "place_2"
},
"details": "Some details... etc"
},
# Row 3, and Row 4 have been dropped because they were missing key_1 or they required a swap and were missing key_2
{
"id": "ID_5", # No changes required with this row
"place": {
"coords": {
# The latitude is still missing but that can be handled later, it may be nullable, so it should be kept
"longitude": "-20.43"
},
"name": "place_2"
},
"details": "Some details... etc"
}
]
FeatureFlatten¶
Note
FeatureFlatten should not be used until AFTER ListExtract has been performed on the data, unless performing a list extract is not necessary on the data.
Summary of FeatureFlatten usage:
data = [
{
"data_id": 1,
"data_features": [
{
"x": 5,
"y": 10
},
{
"x": 7,
"y": 15
},
.
.
.
]
}
]
pre = FeatureFlatten(key="data_features")
Example Usage of FeatureFlatten:
>>> data = [
... {
... "data_id": 1,
... "data_features": {
... "Foo": 5,
... "Bar": 10
... }
... },
...
... {
... "data_id": 2,
... "data_features": [
... {
... "Foo": 5,
... "Bar": 10
... },
... {
... "Foo": 7,
... "Bar": 15
... }
... ]
... }
... ]
... pre = FeatureFlatten(key="data_features")
... pre.lam_wrap(data)
[
{
"data_id": 1,
"data_features": {
"Foo": 5,
"Bar": 10
}
},
{
"data_id": 2,
"data_features": {
"Foo": 5,
"Bar": 10
}
},
{
"data_id": 2,
"data_features": {
"Foo": 7,
"Bar": 15
}
}
]
GlobalExtract¶
Summary of GlobalExtract usage:
data = {"date": "2021-04-19", "data_array": [{"id": 1, "name": "Foo"}, {"id": 2, "name": "Bar"}]}
pre = GlobalExtract(key="publish_time",
lam=lambda x: x["date"],
dtype=str
)
Final DataFrame
id |
name |
publish_time |
---|---|---|
1 |
Foo |
2021-04-19 |
2 |
Bar |
2021-04-19 |
Example Usage of GlobalExtract:
>>> # pre-processing operators
... pres = []
... # Dictionary that contains all globally extracted data
... pre_2_post_dict = {}
... # Incoming Data
... data = {"date": "2021-04-19", "data_array": [{"id": 1, "name": "Foo"}, {"id": 2, "name": "Bar"}]}
... # GlobalExtract instance for extracting the "date" from data, but replacing its key with "publish_time"
... pre = GlobalExtract(key="publish_time",
... lam=lambda x: x["date"],
... dtype=str
... )
... # The preprocessor gets added to the list of preprocessors
... pres.append(pre)
... # Each preprocesser gets applied sequentially
... for p in pres:
... if p.ctype == "global_extract":
... pre_2_post_dict[p.key] = p.lam_wrap(data)
... else:
... pass # See other pre-processors
... pre_2_post_dict
{"publish_time": {"value": "2021-04-19", "dtype": str}}
Later after the data has been extracted to a DataFrame df
# Assume df = DataFrame containing extracted data
# Assume dtype_convert is a function that maps a python native type to a pandas dtype
# For each globally extracted item
for k, v in pre_2_post_dict.items():
# Add the item to the DataFrame -> These are GLOBAL values shared amongst ALL rows
df[k] = v["value"]
# Typecast the value to ensure it is the correct dtype
df[k] = df[k].astype(dtype_convert(v["dtype"]))
Example of what DataFrame would be:
id |
name |
publish_time |
---|---|---|
1 |
Foo |
2021-04-19 |
2 |
Bar |
2021-04-19 |
ListExtract¶
Summary of ListExtract Usage:
data = { "actual_data_rows": [{"id": "row1"}, {"id": "row2"}], "erroneous_data": "FooBar" }
pre = ListExtract(lam=lambda x: x["actual_data_rows"])
Example Usage of ListExtract:
>>> data = {
... "Foo": "Metadata",
... "data_array": [
... {
... "data_id": 1,
... "name": "name_1"
... },
... {
... "data_id": 2,
... "name": "name_2"
... }
... ]
... }
...
... pre = ListExtract(lam=lambda x: x["data_array"])
... pre.lam_wrap(data)
[
{
"data_id": 1,
"name": "name_1"
},
{
"data_id": 2,
"name": "name_2"
}
]
Extracting data-features¶
Summary of Feature Usage:¶
data = [{"id": 1, "name": "Foo", "nest0": {"nest1": {"x": True}, "y": 14.3 } }, ... ]
data_features = [
Feature(key="uuid", lam=lambda x: x["id"], dtype=int), # Extracts "id" and rename it to "uuid"
Feature(key="name", lam=lambda x: x["name"], dtype=str), # Will extract "name" keeping the key as "name"
Feature(key="x", lam=lambda x: x["nest0"]["nest1"]["x"], dtype=bool), # Will extract "x"
Feature(key="y", lam=lambda x: x["nest0"]["y"], dtype=bool) # Will extract "y"
]
Post-processing¶
ColumnAdd¶
Summary of ColumnAdd Usage:
DataFrame df
Foo |
Bar |
---|---|
1 |
A |
2 |
B |
3 |
C |
post = ColumnAdd(key="FooBar", lam=lambda: 5, dtype=int)
DataFrame df
Foo |
Bar |
FooBar |
---|---|---|
1 |
A |
5 |
2 |
B |
5 |
3 |
C |
5 |
Example Usage of ColumnAdd:
>>> import pandas as pd
... def f():
... return 5
... df = pd.DataFrame({"Foo": [1, 2, 3], "Bar": ["A", "B", "C"]}) # Setup
...
... post = ColumnAdd(key="timestamp", lam=lambda x: f, dtype=int)
... post.lam_wrap(df)
pd.DataFrame({"Foo": [1, 2, 3], "Bar": ["A", "B", "C"], "FooBar": [5, 5, 5]})
ColumnApply¶
Summary of ColumnApply Usage:
DataFrame df
Foo |
Bar |
---|---|
1 |
A |
2 |
B |
3 |
C |
post = ColumnApply(key="Foo", lam=lambda x: x + 1, dtype=int)
DataFrame df
Foo |
Bar |
---|---|
2 |
A |
3 |
B |
4 |
C |
Example Usage of ColumnApply:
>>> import pandas as pd
... df = pd.DataFrame({"Foo": [1, 2, 3], "Bar": ["A", "B", "C"]}) # Setup
...
... post = ColumnApply(key="Foo", lam=lambda x: x + 1, dtype=int)
... post.lam_wrap(df)
pd.DataFrame({"Foo": [2, 3, 4], "Bar": ["A", "B", "C"]})
ColumnsCalculate¶
Note
ColumnsCalculate can be used to
Replace columns in a DataFrame with calculated values
Add new columns to a DataFrame based on calculations from existing columns
Summary of ColumnsCalculate Usage:
DataFrame df
Foo |
Bar |
---|---|
1 |
2 |
2 |
4 |
3 |
8 |
def foobar(df):
df["Foo+Bar"] = df["Foo"] + df["Bar"]
df["Foo*Bar"] = df["Foo"] * df["Bar"]
return df[["Foo+Bar", "Foo*Bar"]]
post = ColumnsCalculate(keys=["Foo+Bar", "Foo*Bar"], lam=lambda x: foobar(x), dtype=int)
DataFrame df
Foo |
Bar |
Foo+Bar |
Foo*Bar |
---|---|---|---|
1 |
2 |
3 |
2 |
2 |
4 |
6 |
8 |
3 |
8 |
11 |
24 |
Example Usage of ColumnsCalculate:
>>> import pandas as pd
... df = pd.DataFrame({"Foo": [1, 2, 3], "Bar": [2, 4, 8]}) # Setup
...
... def foobar(d):
... d["Foo+Bar"] = d["Foo"] + d["Bar"]
... d["Foo*Bar"] = d["Foo"] * d["Bar"]
... return d[["Foo+Bar", "Foo*Bar"]]
...
... post = ColumnsCalculate(keys=["Foo+Bar", "Foo*Bar"], lam=lambda x: foobar(x), dtype=int)
... post.lam_wrap(df)
pd.DataFrame({"Foo+Bar": [3, 6, 11], "Foo*Bar": [2, 8, 24]})
DateCast¶
Summary of DateCast Usage:
DataFrame df
Foo |
Bar |
---|---|
2021-04-29 01:39:00 |
False |
2021-04-29 01:39:00 |
False |
Bar! |
True |
DataFrame df.dtypes
Foo |
Bar |
---|---|
string |
bool |
post = DateCast(key="Foo", fmt="%Y-%m-%d %H:%M:%S")
DataFrame df
Foo |
Bar |
---|---|
2021-04-29 01:39:00 |
False |
2021-04-29 01:39:00 |
False |
NaT |
True |
DataFrame df.dtypes
Foo |
Bar |
---|---|
datetime64[ns] |
bool |
DropNa¶
Simply a shortcut class for a common operation.
Summary of DropNa Usage:
See pandas Documentation
MergeStatic¶
Note
MergeStatic is used to merge data together. A common use case of this is in situations where a data-vendor provides an API that gives data-points “Foo”, “Bar”, and “location_id” where “location_id” references a different data-set.
It is common for data-providers to have a file that does not update very frequently, i.e. is mostly static that contains this information.
The typical workflow of a MergeStatic instance is as follows:
Create a LocalStream with mode set to update or replace and a target like CACHE/my_local_stream.pickle
Set the LocalStream to run periodically (6 hours, 24 hours, 10 days, whatever frequency this data is updated)
Add a MergeStatic object to the frequently updating datas post-processors and set the path to the LocalStream storage path.