api2db.ingest.pre_process package

Submodules

api2db.ingest.pre_process.bad_row_swap module

Contains the BadRowSwap class

Note

BadRowSwap should not be used until AFTER ListExtract has been performed on the data, unless performing a list extract is not necessary on the data.

When using BadRowSwap, the following conditions must be met:

  1. The value contained at location key_1 must be able to be identified as valid, or in need of being swapped without any reference to the value at location key_2. (Typically using regex or performing type-checking)

  2. key_1 and key_2 must be unique within their respective row of data. data = {"key_1": {"key_1": 1, "key_2": 2}} would be invalid.

BadRowSwap will potentially drop rows of data. Rows meeting the following conditions will be dropped:

  • Any row that is missing key_1 as a key will be dropped.

  • Any row that evaluates as needing to be swapped based on key_1 that is missing key_2 will be dropped.

BadRowSwap will keep rows that meet the following conditions:

  • Any row that evaluates as not needing to be swapped based on key_1 will be kept, regardless of if key_2 exists or not.

  • Any row that evaluates as needing to be swapped based on key_1 that also contains key_2 will swap the values at the locations of the key_1 and key_2 and the row will be kept.

Performing BadRowSwap can be computationally expensive, since it walks all nested data until it finds the desired keys. So here are a few tips to help you determine if you should be using it or not.

Usage Tips for using BadRowSwap:

  • If both key_1 and key_2 are unimportant fields, I.e. Nullable fields and keeping them does not add significant value to the data consider just allowing the collector to Null them if they do not match the types or consider allowing them to simply have the wrong values if they have the same data-types. Otherwise you risk both slowing down data-collection, and dropping rows that have good data other than those swapped rows.

  • Always attempt to place the key at location key_1 as the more important value to retain. If you need to swap data like a “uuid” and a “description”, use the “uuid” as key_1

  • If you cannot place the key at location key_1 as the more important key, consider if the risk of losing data with a valid value for the more important key is worth it in instances where the less important key is missing

  • Consider the frequency that BadRowSwap would need to be run. If 1 out of 1,000,000 data-points contains values with swapped keys, is it worth running the computation on all 1,000,000 rows to save just that 1 row?

  • Analyze the data by hand. Pull it into a pandas DataFrame, and check it.

    • How often are is a row incorrect?

    • Are the erroneous rows ALWAYS the same key?

    • How often is one of the keys for the row missing when the rows have bad data?

Summary of BadRowSwap usage:

data = [
    {
        "id": "17.0",
        "size": "Foo",
    },
    {
        "id": "Bar",
        "size": "10.0"
    }
]

pre = BadRowSwap(key_1="id",
                 key_2="size",
                 lam=lambda x: re.match("[0-9][0-9]\.[0-9]+", x["id"]) is not None
                 )

Example Usage of BadRowSwap:

Occasionally when dealing with an API, the data is not always where it is supposed to be. Oftentimes this results in the rows containing the misplaced data being dropped altogether. In the instance that for some unknown reason the incoming data has keys that tend to occasionally have their values swapped so long as it is possible to check to see if the data has been swapped due to what the data should be, use BadRowSwap.

This example assumes that the API occasionally swaps the values for “id” and “latitude”. BadRowSwap can handle any level of nested data in these instances, so long as the keys for the values that are occasionally swapped are unique within a single row

>>> import re
... data = [
...     {
...         "id": "16.53",                                  # NEEDS SWAP = True
...          "place": {
...              "coords": {
...                  "latitude": "ID_1",
...                  "longitude": "-20.43"
...              },
...              "name": "place_1"
...          },
...         "details": "Some details... etc"
...      },
...
...     {
...         "id": "ID_2",                                   # NEEDS SWAP = False
...          "place": {
...              "coords": {
...                  "latitude": "15.43",
...                  "longitude": "-20.43"
...              },
...              "name": "place_2"
...          },
...         "details": "Some details... etc"
...      },
...
...     {
...         "id": "10.21",                                  # NEEDS SWAP = True
...         "place": {
...             "coords": {
...                                                         # Missing "latitude" key, results in row being skipped
...                 "longitude": "-20.43"
...             },
...             "name": "place_2"
...         },
...         "details": "Some details... etc"
...     },
...
...     {
...                                                         # Missing "id" key, results in row being skipped
...         "place": {
...             "coords": {
...                 "latitude": "ID_4",
...                 "longitude": "-20.43"
...             },
...             "name": "place_2"
...         },
...         "details": "Some details... etc"
...     },
...
...     {
...         "id": "ID_5",                                   # NEEDS SWAP = False
...         "place": {
...             "coords": {
...                                                         # Missing "latitude" row is kept, because no row swap needed
...                 "longitude": "-20.43"
...             },
...             "name": "place_2"
...         },
...         "details": "Some details... etc"
...     }
... ]
...
... pre = BadRowSwap(key_1="id",
...                  key_2="latitude",
...                  lam=lambda x: re.match("[0-9][0-9]\.[0-9]+", x["id"]) is not None
...                  )
...
... pre.lam_wrap(data)
[
    {
        "id": "ID_1",  # "id" and "latitude" have been swapped
        "place": {
            "coords": {
                "latitude": "16.53",
                "longitude": "-20.43"
            },
            "name": "place_1"
        },
        "details": "Some details... etc"
    },
    {
        "id": "ID_2",  # No changes required with this row
        "place": {
            "coords": {
                "latitude": "15.43",
                "longitude": "-20.43"
            },
            "name": "place_2"
        },
        "details": "Some details... etc"
    },
    # Row 3, and Row 4 have been dropped because they were missing key_1 or they required a swap and were missing key_2
    {
        "id": "ID_5",  # No changes required with this row
        "place": {
            "coords": {
                # The latitude is still missing but that can be handled later, it may be nullable, so it should be kept
                "longitude": "-20.43"
            },
            "name": "place_2"
        },
        "details": "Some details... etc"
    }
]
class api2db.ingest.pre_process.bad_row_swap.BadRowSwap(key_1: str, key_2: str, lam: Callable[[dict], bool])

Bases: api2db.ingest.pre_process.pre.Pre

Used to swap rows arriving from the API that have the values for the given key swapped occasionally

__init__(key_1: str, key_2: str, lam: Callable[[dict], bool])

Creates a BadRowSwap object

Parameters
  • key_1 – The key of a field that occasionally has its value swapped with the data from key_2

  • key_2 – The key of a field that occasionally has its value swapped with the data from key_1

  • lam – A function (anonymous, or not) that when given the value located under key_1 returns True if the keys need their values swapped otherwise returns False

lam_wrap(lam_arg: List[dict])List[dict]

Overrides super class method

Parameters

lam_arg – A list of dictionaries with each dictionary containing what will become a row in a DataFrame

Returns

A modified list of dictionaries with bad rows being either swapped or dropped.

api2db.ingest.pre_process.feature_flatten module

Contains the FeatureFlatten class

Note

FeatureFlatten should not be used until AFTER ListExtract has been performed on the data, unless performing a list extract is not necessary on the data.

Summary of FeatureFlatten usage:

data = [
    {
        "data_id": 1,
        "data_features": [
                            {
                                "x": 5,
                                "y": 10
                            },
                            {
                                "x": 7,
                                "y": 15
                            },
                            .
                            .
                            .
                         ]
    }
]
pre = FeatureFlatten(key="data_features")

Example Usage of FeatureFlatten:

>>> data = [
...     {
...         "data_id": 1,
...         "data_features": {
...                             "Foo": 5,
...                             "Bar": 10
...                          }
...     },
...
...     {
...         "data_id": 2,
...         "data_features": [
...                             {
...                                 "Foo": 5,
...                                 "Bar": 10
...                             },
...                             {
...                                 "Foo": 7,
...                                 "Bar": 15
...                             }
...                          ]
...     }
... ]
... pre = FeatureFlatten(key="data_features")
... pre.lam_wrap(data)
[
    {
        "data_id": 1,
        "data_features": {
                            "Foo": 5,
                            "Bar": 10
                         }
    },
    {
        "data_id": 2,
        "data_features": {
                            "Foo": 5,
                            "Bar": 10
                         }
    },
    {
        "data_id": 2,
        "data_features": {
                            "Foo": 7,
                            "Bar": 15
                         }
    }
]
class api2db.ingest.pre_process.feature_flatten.FeatureFlatten(key: str)

Bases: api2db.ingest.pre_process.pre.Pre

Used to flatten features containing arrays causing them to be incompatible for storage in a table-based schema

__init__(key: str)

Creates a FeatureFlatten object

Parameters

key – The key containing nested data that each needs to have its own row in the final DataFrame

ctype

type of data processor

Type

str

lam_wrap(lam_arg: Optional[List[dict]])List[dict]

Overrides super class method

Workflow:

  • Create an array of rows

  • For each dictionary d in the array of data-points

    • If the type of self.key is in d.keys() and type(d[self.key]) == list

      • For each item in list

        • Create a new row containing all data-features and the item by itself and add it to rows

    • If the type of self.key is in d.keys() and type(d[self.key]) == dict

      • Keep the row as it is, and add it to rows

Parameters

lam_arg – A list of dictionaries that each represent a row in the final DataFrame (Optional to safeguard against if previous pre-processors could not parse data, i.e. No data-points existed)

Returns

An array of dictionaries that each represent a row, with nested data extracted to their own rows

api2db.ingest.pre_process.global_extract module

Contains the GlobalExtract class

Summary of GlobalExtract usage:

data = {"date": "2021-04-19", "data_array": [{"id": 1, "name": "Foo"}, {"id": 2, "name": "Bar"}]}
pre = GlobalExtract(key="publish_time",
                    lam=lambda x: x["date"],
                    dtype=str
                    )

Final DataFrame

id

name

publish_time

1

Foo

2021-04-19

2

Bar

2021-04-19

Example Usage of GlobalExtract:

>>> # pre-processing operators
... pres = []
... # Dictionary that contains all globally extracted data
... pre_2_post_dict = {}
... # Incoming Data
... data = {"date": "2021-04-19", "data_array": [{"id": 1, "name": "Foo"}, {"id": 2, "name": "Bar"}]}
... # GlobalExtract instance for extracting the "date" from data, but replacing its key with "publish_time"
... pre = GlobalExtract(key="publish_time",
...                     lam=lambda x: x["date"],
...                     dtype=str
...                     )
... # The preprocessor gets added to the list of preprocessors
... pres.append(pre)
... # Each preprocesser gets applied sequentially
... for p in pres:
...     if p.ctype == "global_extract":
...         pre_2_post_dict[p.key] = p.lam_wrap(data)
...     else:
...         pass # See other pre-processors
... pre_2_post_dict
{"publish_time": {"value": "2021-04-19", "dtype": str}}

Later after the data has been extracted to a DataFrame df

# Assume df = DataFrame containing extracted data
# Assume dtype_convert is a function that maps a python native type to a pandas dtype

# For each globally extracted item
for k, v in pre_2_post_dict.items():
    # Add the item to the DataFrame -> These are GLOBAL values shared amongst ALL rows
    df[k] = v["value"]
    # Typecast the value to ensure it is the correct dtype
    df[k] = df[k].astype(dtype_convert(v["dtype"]))

Example of what DataFrame would be:

id

name

publish_time

1

Foo

2021-04-19

2

Bar

2021-04-19

class api2db.ingest.pre_process.global_extract.GlobalExtract(key: str, lam: Callable[[dict], Any], dtype)

Bases: api2db.ingest.pre_process.pre.Pre

Used to extract a global feature from incoming data

__init__(key: str, lam: Callable[[dict], Any], dtype)

Creates a GlobalExtract object

Parameters
  • key – The desired key of the feature for the storage target

  • lam – Anonymous function that specifies where the location of the feature that should be extracted is

  • dtype – The python native datatype the feature is expected to be

ctype

type of the data processor

Type

str

lam_wrap(lam_arg: dict)dict

Overrides super class method

Workflow:

  1. Attempt to perform the lam operation on the incoming data

  2. Attempt to cast the result of the lam operation to the dtype

    • If an exception occurs, returns {“value”: None, “dtype”: dtype}

  3. Return {“value”: result, “dtype”: dtype}

Parameters

lam_arg – A dictionary containing the feature that should be extracted

Returns

result or None, “dtype”: dtype}

Return type

A dictionary containing {“value”

api2db.ingest.pre_process.list_extract module

Contains the ListExtract class

Summary of ListExtract Usage:

data = { "actual_data_rows": [{"id": "row1"}, {"id": "row2"}], "erroneous_data": "FooBar" }
pre = ListExtract(lam=lambda x: x["actual_data_rows"])

Example Usage of ListExtract:

>>> data = {
...    "Foo": "Metadata",
...    "data_array": [
...            {
...                "data_id": 1,
...                "name": "name_1"
...            },
...            {
...                "data_id": 2,
...                "name": "name_2"
...            }
...        ]
... }
...
... pre = ListExtract(lam=lambda x: x["data_array"])
... pre.lam_wrap(data)
[
    {
        "data_id": 1,
        "name": "name_1"
    },
    {
        "data_id": 2,
        "name": "name_2"
    }
]
class api2db.ingest.pre_process.list_extract.ListExtract(lam: Callable[[dict], list])

Bases: api2db.ingest.pre_process.pre.Pre

Used to extract a list of dictionaries that will each represent a single row in a database

__init__(lam: Callable[[dict], list])

Creates a ListExtract object

Parameters

lam – Anonymous function that attempts to extract a list of data that will become rows in a DataFrame

ctype

type of data processor

Type

str

dtype

the datatype performing lam should yield

Type

type(list)

lam_wrap(lam_arg: dict)Optional[List[dict]]

Overrides super class method

Workflow:

  1. Attempt to perform the lam operation on the incoming data

  2. Attempt to cast the result lam operation to a list

    • If an exception occurs, return None

  3. Return the list of data

Parameters

lam_arg – A dictionary containing a list of dictionaries that will become the rows of a DataFrame

Returns

A list of dictionaries that will become the rows of a DataFrame if successful otherwise None

api2db.ingest.pre_process.pre module

Contains the Pre class

class api2db.ingest.pre_process.pre.Pre

Bases: api2db.ingest.base_lam.BaseLam

Direct subclass of BaseLam with no overriders, members, or methods. Exists solely for organizational purposes

Module contents

Original Author

Tristen Harr

Creation Date

04/29/2021

Revisions

None