api2db.ingest.pre_process package¶
Submodules¶
api2db.ingest.pre_process.bad_row_swap module¶
Contains the BadRowSwap class¶
Note
BadRowSwap should not be used until AFTER ListExtract has been performed on the data, unless performing a list extract is not necessary on the data.
When using BadRowSwap, the following conditions must be met:
- The value contained at location key_1 must be able to be identified as valid, or as in need of being swapped, without any reference to the value at location key_2. (Typically using regex or performing type-checking)
- key_1 and key_2 must be unique within their respective row of data. data = {"key_1": {"key_1": 1, "key_2": 2}} would be invalid.
BadRowSwap will potentially drop rows of data. Rows meeting the following conditions will be dropped:
- Any row that is missing key_1 as a key will be dropped.
- Any row that evaluates as needing to be swapped based on key_1 but is missing key_2 will be dropped.
BadRowSwap will keep rows that meet the following conditions:
- Any row that evaluates as not needing to be swapped based on key_1 will be kept, regardless of whether key_2 exists.
- Any row that evaluates as needing to be swapped based on key_1 and that also contains key_2 will have the values at key_1 and key_2 swapped, and the row will be kept.
Performing BadRowSwap can be computationally expensive, since it walks all nested data until it finds the desired keys. Here are a few tips to help you determine whether you should use it.
Usage Tips for using BadRowSwap:
- If both key_1 and key_2 are unimportant fields, i.e. nullable fields whose retention does not add significant value to the data, consider allowing the collector to null them when they do not match the expected types, or allowing them to simply hold the wrong values when they share the same data-type. Otherwise you risk both slowing down data-collection and dropping rows that contain good data apart from the swapped fields.
- Always attempt to make key_1 the more important value to retain. If you need to swap data like a "uuid" and a "description", use the "uuid" as key_1.
- If you cannot make key_1 the more important key, consider whether the risk of losing rows with a valid value for the more important key is worth it in instances where the less important key is missing.
- Consider the frequency with which BadRowSwap would need to run. If 1 out of 1,000,000 data-points contains values with swapped keys, is it worth running the computation on all 1,000,000 rows to save just that 1 row?
- Analyze the data by hand. Pull it into a pandas DataFrame, and check it.
  - How often is a row incorrect?
  - Are the erroneous rows ALWAYS the same key?
  - How often is one of the keys for the row missing when the rows have bad data?
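For the hand-analysis step above, a quick frequency check in pandas might look like the following sketch. The sample rows and the float-pattern regex are illustrative assumptions, not part of the library:

```python
import pandas as pd

# Hypothetical sample pulled from the API: "id" should look like "ID_x",
# but occasionally holds a float that belongs under "size".
data = [
    {"id": "ID_1", "size": "10.0"},
    {"id": "17.0", "size": "Foo"},   # swapped row
    {"id": "ID_3", "size": "12.5"},
]

df = pd.DataFrame(data)

# A row is "bad" when "id" matches the float pattern the lam would use.
bad = df["id"].str.match(r"[0-9]+\.[0-9]+")

print(f"{bad.sum()} of {len(df)} rows appear swapped ({bad.mean():.1%})")
```

If the bad-row rate is tiny relative to the cost of walking every row, that is a signal BadRowSwap may not be worth running.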
Summary of BadRowSwap usage:¶
data = [
{
"id": "17.0",
"size": "Foo",
},
{
"id": "Bar",
"size": "10.0"
}
]
pre = BadRowSwap(key_1="id",
                 key_2="size",
                 lam=lambda x: re.match(r"[0-9][0-9]\.[0-9]+", x["id"]) is not None
                 )
Example Usage of BadRowSwap:¶
Occasionally when dealing with an API, the data is not always where it is supposed to be. Oftentimes this results in the rows containing the misplaced data being dropped altogether. If the incoming data has keys whose values occasionally arrive swapped, and it is possible to detect the swap from what the data should look like, use BadRowSwap.
This example assumes that the API occasionally swaps the values for "id" and "latitude". BadRowSwap can handle any level of nested data in these instances, so long as the keys for the values that are occasionally swapped are unique within a single row.
>>> import re
... data = [
... {
... "id": "16.53", # NEEDS SWAP = True
... "place": {
... "coords": {
... "latitude": "ID_1",
... "longitude": "-20.43"
... },
... "name": "place_1"
... },
... "details": "Some details... etc"
... },
...
... {
... "id": "ID_2", # NEEDS SWAP = False
... "place": {
... "coords": {
... "latitude": "15.43",
... "longitude": "-20.43"
... },
... "name": "place_2"
... },
... "details": "Some details... etc"
... },
...
... {
... "id": "10.21", # NEEDS SWAP = True
... "place": {
... "coords": {
... # Missing "latitude" key, results in row being skipped
... "longitude": "-20.43"
... },
... "name": "place_2"
... },
... "details": "Some details... etc"
... },
...
... {
... # Missing "id" key, results in row being skipped
... "place": {
... "coords": {
... "latitude": "ID_4",
... "longitude": "-20.43"
... },
... "name": "place_2"
... },
... "details": "Some details... etc"
... },
...
... {
... "id": "ID_5", # NEEDS SWAP = False
... "place": {
... "coords": {
... # Missing "latitude" row is kept, because no row swap needed
... "longitude": "-20.43"
... },
... "name": "place_2"
... },
... "details": "Some details... etc"
... }
... ]
...
... pre = BadRowSwap(key_1="id",
... key_2="latitude",
... lam=lambda x: re.match(r"[0-9][0-9]\.[0-9]+", x["id"]) is not None
... )
...
... pre.lam_wrap(data)
[
{
"id": "ID_1", # "id" and "latitude" have been swapped
"place": {
"coords": {
"latitude": "16.53",
"longitude": "-20.43"
},
"name": "place_1"
},
"details": "Some details... etc"
},
{
"id": "ID_2", # No changes required with this row
"place": {
"coords": {
"latitude": "15.43",
"longitude": "-20.43"
},
"name": "place_2"
},
"details": "Some details... etc"
},
# Rows 3 and 4 have been dropped: each was either missing key_1, or required a swap and was missing key_2
{
"id": "ID_5", # No changes required with this row
"place": {
"coords": {
# The latitude is still missing but that can be handled later, it may be nullable, so it should be kept
"longitude": "-20.43"
},
"name": "place_2"
},
"details": "Some details... etc"
}
]
class api2db.ingest.pre_process.bad_row_swap.BadRowSwap(key_1: str, key_2: str, lam: Callable[[dict], bool])¶
Bases: api2db.ingest.pre_process.pre.Pre
Used to repair rows arriving from the API that occasionally have the values of two keys swapped
__init__(key_1: str, key_2: str, lam: Callable[[dict], bool])¶
Creates a BadRowSwap object
Parameters
- key_1 – The key of a field that occasionally has its value swapped with the data from key_2
- key_2 – The key of a field that occasionally has its value swapped with the data from key_1
- lam – A function (anonymous or not) that, when given the value located under key_1, returns True if the keys need their values swapped, otherwise returns False
lam_wrap(lam_arg: List[dict]) → List[dict]¶
Overrides super class method
Parameters
- lam_arg – A list of dictionaries, with each dictionary containing what will become a row in a DataFrame
Returns
- A modified list of dictionaries, with bad rows either swapped or dropped
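The keep/drop/swap rules can be sketched as a plain-Python reimplementation. This is an illustrative sketch of the documented behaviour, not the library's actual code: `find_parent` and `bad_row_swap` are hypothetical names, and `lam` is assumed to receive the dictionary that directly contains key_1 (matching the x["id"] usage in the examples):

```python
import re
from typing import Callable, List, Optional

def find_parent(d: dict, key: str) -> Optional[dict]:
    """Return the (sub-)dictionary that directly contains `key`, walking
    nested dicts depth-first. Assumes the key is unique within the row."""
    if key in d:
        return d
    for v in d.values():
        if isinstance(v, dict):
            parent = find_parent(v, key)
            if parent is not None:
                return parent
    return None

def bad_row_swap(rows: List[dict], key_1: str, key_2: str,
                 lam: Callable[[dict], bool]) -> List[dict]:
    kept = []
    for row in rows:
        p1 = find_parent(row, key_1)
        if p1 is None:                  # missing key_1 -> drop the row
            continue
        if not lam(p1):                 # no swap needed -> keep unchanged
            kept.append(row)
            continue
        p2 = find_parent(row, key_2)
        if p2 is None:                  # swap needed but key_2 missing -> drop
            continue
        p1[key_1], p2[key_2] = p2[key_2], p1[key_1]
        kept.append(row)
    return kept

rows = [{"id": "16.53",
         "place": {"coords": {"latitude": "ID_1", "longitude": "-20.43"}}}]
out = bad_row_swap(rows, "id", "latitude",
                   lambda x: re.match(r"[0-9]+\.[0-9]+", x["id"]) is not None)
```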
api2db.ingest.pre_process.feature_flatten module¶
Contains the FeatureFlatten class¶
Note
FeatureFlatten should not be used until AFTER ListExtract has been performed on the data, unless performing a list extract is not necessary on the data.
Summary of FeatureFlatten usage:¶
data = [
{
"data_id": 1,
"data_features": [
{
"x": 5,
"y": 10
},
{
"x": 7,
"y": 15
},
...
]
}
]
pre = FeatureFlatten(key="data_features")
Example Usage of FeatureFlatten:¶
>>> data = [
... {
... "data_id": 1,
... "data_features": {
... "Foo": 5,
... "Bar": 10
... }
... },
...
... {
... "data_id": 2,
... "data_features": [
... {
... "Foo": 5,
... "Bar": 10
... },
... {
... "Foo": 7,
... "Bar": 15
... }
... ]
... }
... ]
... pre = FeatureFlatten(key="data_features")
... pre.lam_wrap(data)
[
{
"data_id": 1,
"data_features": {
"Foo": 5,
"Bar": 10
}
},
{
"data_id": 2,
"data_features": {
"Foo": 5,
"Bar": 10
}
},
{
"data_id": 2,
"data_features": {
"Foo": 7,
"Bar": 15
}
}
]
class api2db.ingest.pre_process.feature_flatten.FeatureFlatten(key: str)¶
Bases: api2db.ingest.pre_process.pre.Pre
Used to flatten features containing arrays, which would otherwise be incompatible with storage in a table-based schema
__init__(key: str)¶
Creates a FeatureFlatten object
Parameters
- key – The key containing nested data, each element of which needs its own row in the final DataFrame
ctype¶
type of data processor
Type
- str
lam_wrap(lam_arg: Optional[List[dict]]) → List[dict]¶
Overrides super class method
Workflow:
- Create an array of rows
- For each dictionary d in the array of data-points:
  - If self.key is in d.keys() and type(d[self.key]) == list:
    - For each item in the list, create a new row containing all other data-features plus that item by itself, and add it to rows
  - If self.key is in d.keys() and type(d[self.key]) == dict:
    - Keep the row as it is, and add it to rows
Parameters
- lam_arg – A list of dictionaries that each represent a row in the final DataFrame (Optional, to safeguard against previous pre-processors failing to parse data, i.e. no data-points existed)
Returns
- An array of dictionaries that each represent a row, with nested data extracted to its own rows
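The workflow above can be sketched as a standalone function. `feature_flatten` is a hypothetical name for illustration; the real logic lives on the FeatureFlatten class and may differ:

```python
from typing import List, Optional

def feature_flatten(rows: Optional[List[dict]], key: str) -> List[dict]:
    """Illustrative sketch of the documented workflow, not the library code."""
    out = []
    for d in rows or []:                      # tolerate None from earlier steps
        if key in d and isinstance(d[key], list):
            for item in d[key]:               # one new row per list item
                row = dict(d)                 # shallow copy of the other features
                row[key] = item
                out.append(row)
        else:                                 # dict (or absent): keep row as-is
            out.append(d)
    return out

data = [
    {"data_id": 1, "data_features": {"Foo": 5, "Bar": 10}},
    {"data_id": 2, "data_features": [{"Foo": 5, "Bar": 10},
                                     {"Foo": 7, "Bar": 15}]},
]
flat = feature_flatten(data, "data_features")
```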
api2db.ingest.pre_process.global_extract module¶
Contains the GlobalExtract class¶
Summary of GlobalExtract usage:¶
data = {"date": "2021-04-19", "data_array": [{"id": 1, "name": "Foo"}, {"id": 2, "name": "Bar"}]}
pre = GlobalExtract(key="publish_time",
                    lam=lambda x: x["date"],
                    dtype=str
                    )
Final DataFrame:

id | name | publish_time
---|------|-------------
1  | Foo  | 2021-04-19
2  | Bar  | 2021-04-19
Example Usage of GlobalExtract:¶
>>> # pre-processing operators
... pres = []
... # Dictionary that contains all globally extracted data
... pre_2_post_dict = {}
... # Incoming Data
... data = {"date": "2021-04-19", "data_array": [{"id": 1, "name": "Foo"}, {"id": 2, "name": "Bar"}]}
... # GlobalExtract instance for extracting the "date" from data, but replacing its key with "publish_time"
... pre = GlobalExtract(key="publish_time",
... lam=lambda x: x["date"],
... dtype=str
... )
... # The preprocessor gets added to the list of preprocessors
... pres.append(pre)
... # Each preprocessor gets applied sequentially
... for p in pres:
... if p.ctype == "global_extract":
... pre_2_post_dict[p.key] = p.lam_wrap(data)
... else:
... pass # See other pre-processors
... pre_2_post_dict
{"publish_time": {"value": "2021-04-19", "dtype": str}}
Later after the data has been extracted to a DataFrame df
# Assume df = DataFrame containing extracted data
# Assume dtype_convert is a function that maps a python native type to a pandas dtype
# For each globally extracted item
for k, v in pre_2_post_dict.items():
# Add the item to the DataFrame -> These are GLOBAL values shared amongst ALL rows
df[k] = v["value"]
# Typecast the value to ensure it is the correct dtype
df[k] = df[k].astype(dtype_convert(v["dtype"]))
Example of what the DataFrame would be:

id | name | publish_time
---|------|-------------
1  | Foo  | 2021-04-19
2  | Bar  | 2021-04-19
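dtype_convert is referenced above but not defined in this snippet; a minimal stand-in mapping (an assumption for illustration, not the actual api2db helper) lets the example run end-to-end:

```python
import pandas as pd

def dtype_convert(py_type):
    """Hypothetical stand-in for the dtype_convert helper mentioned above;
    the real api2db mapping may differ."""
    return {str: "object", int: "Int64",
            float: "float64", bool: "bool"}.get(py_type, "object")

df = pd.DataFrame({"id": [1, 2], "name": ["Foo", "Bar"]})
pre_2_post_dict = {"publish_time": {"value": "2021-04-19", "dtype": str}}

for k, v in pre_2_post_dict.items():
    df[k] = v["value"]                              # broadcast to all rows
    df[k] = df[k].astype(dtype_convert(v["dtype"]))
```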
class api2db.ingest.pre_process.global_extract.GlobalExtract(key: str, lam: Callable[[dict], Any], dtype)¶
Bases: api2db.ingest.pre_process.pre.Pre
Used to extract a global feature from incoming data
__init__(key: str, lam: Callable[[dict], Any], dtype)¶
Creates a GlobalExtract object
Parameters
- key – The desired key of the feature for the storage target
- lam – Anonymous function that specifies the location of the feature to extract
- dtype – The python native datatype the feature is expected to be
ctype¶
type of the data processor
Type
- str
lam_wrap(lam_arg: dict) → dict¶
Overrides super class method
Workflow:
- Attempt to perform the lam operation on the incoming data
- Attempt to cast the result of the lam operation to the dtype
- If an exception occurs, return {"value": None, "dtype": dtype}
- Return {"value": result, "dtype": dtype}
Parameters
- lam_arg – A dictionary containing the feature that should be extracted
Returns
- A dictionary containing {"value": result or None, "dtype": dtype}
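The try/cast/fallback workflow above can be sketched as follows. `global_extract_lam_wrap` is a hypothetical free function written for illustration; the real method lives on the GlobalExtract class:

```python
from typing import Any, Callable

def global_extract_lam_wrap(lam: Callable[[dict], Any], dtype,
                            lam_arg: dict) -> dict:
    """Sketch of the documented workflow under assumed behaviour."""
    try:
        # Perform the lam operation, then cast the result to dtype
        return {"value": dtype(lam(lam_arg)), "dtype": dtype}
    except Exception:
        # Any failure yields a None value with the expected dtype
        return {"value": None, "dtype": dtype}

data = {"date": "2021-04-19"}
print(global_extract_lam_wrap(lambda x: x["date"], str, data))
```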
api2db.ingest.pre_process.list_extract module¶
Contains the ListExtract class¶
Summary of ListExtract Usage:¶
data = { "actual_data_rows": [{"id": "row1"}, {"id": "row2"}], "erroneous_data": "FooBar" }
pre = ListExtract(lam=lambda x: x["actual_data_rows"])
Example Usage of ListExtract:¶
>>> data = {
... "Foo": "Metadata",
... "data_array": [
... {
... "data_id": 1,
... "name": "name_1"
... },
... {
... "data_id": 2,
... "name": "name_2"
... }
... ]
... }
...
... pre = ListExtract(lam=lambda x: x["data_array"])
... pre.lam_wrap(data)
[
{
"data_id": 1,
"name": "name_1"
},
{
"data_id": 2,
"name": "name_2"
}
]
class api2db.ingest.pre_process.list_extract.ListExtract(lam: Callable[[dict], list])¶
Bases: api2db.ingest.pre_process.pre.Pre
Used to extract a list of dictionaries that will each represent a single row in a database
__init__(lam: Callable[[dict], list])¶
Creates a ListExtract object
Parameters
- lam – Anonymous function that attempts to extract a list of data that will become rows in a DataFrame
ctype¶
type of data processor
Type
- str
dtype¶
the datatype that performing lam should yield
Type
- type(list)
lam_wrap(lam_arg: dict) → Optional[List[dict]]¶
Overrides super class method
Workflow:
- Attempt to perform the lam operation on the incoming data
- Attempt to cast the result of the lam operation to a list
- If an exception occurs, return None
- Return the list of data
Parameters
- lam_arg – A dictionary containing a list of dictionaries that will become the rows of a DataFrame
Returns
- A list of dictionaries that will become the rows of a DataFrame if successful, otherwise None
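The workflow above amounts to a guarded extraction. `list_extract_lam_wrap` is a hypothetical free function sketched for illustration; the real method lives on the ListExtract class:

```python
from typing import Callable, List, Optional

def list_extract_lam_wrap(lam: Callable[[dict], list],
                          lam_arg: dict) -> Optional[List[dict]]:
    """Sketch of the documented workflow under assumed behaviour."""
    try:
        # Perform the lam operation and cast its result to a list
        return list(lam(lam_arg))
    except Exception:
        # Missing key, wrong shape, etc. -> no rows to extract
        return None

data = {"actual_data_rows": [{"id": "row1"}, {"id": "row2"}],
        "erroneous_data": "FooBar"}
print(list_extract_lam_wrap(lambda x: x["actual_data_rows"], data))
```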
api2db.ingest.pre_process.pre module¶
Contains the Pre class¶
class api2db.ingest.pre_process.pre.Pre¶
Bases: api2db.ingest.base_lam.BaseLam
Direct subclass of BaseLam with no overrides, members, or methods. Exists solely for organizational purposes
Module contents¶

Original Author | Tristen Harr
Creation Date   | 04/29/2021
Revisions       | None