api2db.ingest package

Submodules

api2db.ingest.api2pandas module

Contains the Api2Pandas class

class api2db.ingest.api2pandas.Api2Pandas(api_form: Callable[[], api2db.ingest.api_form.ApiForm])

Bases: object

Used to extract incoming data from an API into a pandas DataFrame

__init__(api_form: Callable[[], api2db.ingest.api_form.ApiForm])

Creates an Api2Pandas object and loads its ApiForm

Parameters

api_form – The function that generates the ApiForm for the associated collector

dependencies_satisfied() → bool

Checks to ensure any data-linking dependency files exist

This feature currently only exists for api2db.ingest.post_process.merge_static.MergeStatic

Returns

True if all dependencies are satisfied, otherwise False

extract(data: dict) → Optional[pandas.core.frame.DataFrame]

Performs data-extraction from data arriving from an API.

Workflow:

  1. Perform all pre-processing on data

  2. Perform all data-feature extraction

  3. Perform all post-processing on data

  4. Return a DataFrame containing the cleaned data.

Parameters

data – The data arriving from an API to perform data extraction on.

Returns

The cleaned data if it is possible to clean the data, otherwise None
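A minimal usage sketch; the collector name and the api_form body below are hypothetical, and a real ApiForm would populate its pre-processing, feature, and post-processing arrays:

    from api2db.ingest.api2pandas import Api2Pandas
    from api2db.ingest.api_form import ApiForm

    def api_form():
        # Hypothetical: a bare ApiForm for a collector named "weather"
        return ApiForm(name="weather")

    extractor = Api2Pandas(api_form=api_form)

    if extractor.dependencies_satisfied():
        data = {"temp": 21.5, "city": "Omaha"}   # one dict from an import_target
        df = extractor.extract(data)             # DataFrame on success, None otherwise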

api2db.ingest.api_form module

Contains the ApiForm class

class api2db.ingest.api_form.ApiForm(name: str, pre_process: Optional[List[api2db.ingest.pre_process.pre.Pre]] = None, data_features: Optional[List[api2db.ingest.data_feature.feature.Feature]] = None, post_process: Optional[List[api2db.ingest.post_process.post.Post]] = None)

Bases: object

Used to clean and process incoming data arriving from an API

__init__(name: str, pre_process: Optional[List[api2db.ingest.pre_process.pre.Pre]] = None, data_features: Optional[List[api2db.ingest.data_feature.feature.Feature]] = None, post_process: Optional[List[api2db.ingest.post_process.post.Post]] = None)

Creates an ApiForm

Note

The ApiForm is used by api2db to do the processing and cleaning of data. Incoming data goes through 3 phases.

  1. Pre-Processing

    • Extract global data-features

    • Extract a list of data-points that will serve as the rows in a database

    • Flatten nested arrays of data

    • Swap extraneous rows returned from poorly implemented APIs

  2. Feature Extraction

    • Extracts the data features for each row that will be stored in a database

  3. Post-Processing

    • Add new columns of data that will be the same globally for the arriving data, e.g. arrival timestamps

    • Apply functions across data columns, replacing the data with the calculated value, e.g. reformatting strings, stripping whitespace, etc.

    • Add new columns of data derived from performing calculations on existing columns, e.g. using a latitude and a longitude column to calculate a new column called country

    • Cast columns that contain datetime data from strings to datetimes.

    • Drop columns that should not contain null values.

    • Merge incoming data with locally stored reference tables, e.g. incoming data has a location_id column and a reference table contains location info keyed by location_id; merging on location_id allows all data to be contained in a single table.

Parameters
  • name – The name of the collector the ApiForm is associated with

  • pre_process – An array of pre-processing objects to be applied sequentially on incoming data

  • data_features – An array of data features to be extracted from the incoming data. The programmer can choose which data features they require, and keep only those.

  • post_process – An array of post-processing objects to be applied sequentially after the data has been cleaned and extracted into a pandas.DataFrame
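A skeletal construction sketch with a hypothetical collector name; a real project would fill the three arrays with Pre, Feature, and Post subclasses (their constructors are documented in their own modules). The add_pre, add_feature, and add_post methods below populate the same arrays incrementally:

    from api2db.ingest.api_form import ApiForm

    def api_form():
        return ApiForm(
            name="weather",      # must match the collector's name
            pre_process=[],      # Pre subclasses, applied sequentially first
            data_features=[],    # Feature subclasses, one per extracted column
            post_process=[],     # Post subclasses, applied to the built DataFrame
        )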

add_pre(pre: api2db.ingest.pre_process.pre.Pre) → None

Allows the programmer to manually add an item to the pre-processing array.

Parameters

pre – The pre-processing object to add

Returns

None

add_feature(feat: api2db.ingest.data_feature.feature.Feature) → None

Allows the programmer to manually add an item to the data-features array.

Parameters

feat – The feature object to add

Returns

None

add_post(post: api2db.ingest.post_process.post.Post) → None

Allows the programmer to manually add an item to the post-processing array.

Parameters

post – The post-processing object to add

Returns

None

pandas_typecast() → dict

Performs typecasting from Python native types to their pandas counterparts. Currently supported types are:

  • int

  • float

  • bool

  • str

Since API data is inconsistent, all typecasting makes the values nullable inside the DataFrame. Null values can be removed during post-processing.

Returns

A dictionary that can be used to cast a DataFrame's types using DataFrame.astype()
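Typical use, continuing the ApiForm sketch above (the exact dtype strings are an assumption; see typecast below):

    cast_map = form.pandas_typecast()   # e.g. {"temp": "Float64", "city": "string"}
    df = df.astype(cast_map)            # columns become nullable pandas dtypes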

static typecast(dtype: Any) → str

Returns a string containing the pandas dtype when given a Python native type.

Parameters

dtype – The python native type

Returns

The string representing the type that the native type converts to when put into a DataFrame
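Presumably the conversion targets pandas' nullable extension dtypes, along these lines (the exact strings are an assumption):

    ApiForm.typecast(int)    # -> "Int64"    (assumed)
    ApiForm.typecast(float)  # -> "Float64"  (assumed)
    ApiForm.typecast(bool)   # -> "boolean"  (assumed)
    ApiForm.typecast(str)    # -> "string"   (assumed)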

experiment(CACHE, import_target) → bool

Tool used to build an ApiForm

Note

The laboratory is an experimental feature and does not currently support the MergeStatic post-processor.

Parameters
  • CACHE – If the data imports should be cached, i.e. the API is only called once

  • import_target – The target function that performs an API import

Returns

True if the experiment is ready for export, otherwise False
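A hedged sketch of running the laboratory against a cached import; import_weather is a hypothetical import_target:

    def import_weather():
        return [{"temp": 21.5, "city": "Omaha"}]

    form = ApiForm(name="weather")
    ready = form.experiment(CACHE=True, import_target=import_weather)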

api2db.ingest.base_lam module

Contains the BaseLam class

class api2db.ingest.base_lam.BaseLam

Bases: object

Serves as the base class for pre-process subclasses, post-process subclasses, and data-features.

__call__(lam_arg: Any) → Any

Makes the class callable, with the class method lam_wrap as the target. This allows anonymous functions to be passed to the class and enhances ease of use for library developers.

Parameters

lam_arg – The argument to be passed to the lam_wrap class method.

Returns

The response of the lam_wrap class method

__getstate__() → dict

Allows lambda operations to be serialized so that instances can be passed between processes

Returns

Customized self.__dict__ items with values serialized using the dill library

__setstate__(state: dict) → None

Allows lambda operations to be deserialized using the dill library so that instances can be passed between processes

Parameters

state – Incoming state

Returns

None
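A simplified illustration of the pattern these two methods implement, not the library's exact code: every attribute is serialized with dill on the way out and restored on the way in, so lambda-holding instances survive a pickle round trip between processes:

    import pickle

    import dill

    class Portable:
        def __init__(self):
            self.lam = lambda x: x * 2

        def __getstate__(self):
            # Serialize every attribute with dill, which handles lambdas
            return {k: dill.dumps(v) for k, v in self.__dict__.items()}

        def __setstate__(self, state):
            # Restore the attributes on the receiving side
            self.__dict__.update({k: dill.loads(v) for k, v in state.items()})

    clone = pickle.loads(pickle.dumps(Portable()))
    print(clone.lam(2))   # 4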

lam_wrap(lam_arg: Any) → None

Performs the class's lambda operation on lam_arg. This method will ALWAYS be overridden.

Parameters

lam_arg – The incoming data to perform the lambda operation on.

Returns

None if calling BaseLam.lam_wrap directly; the return is dictated by subclasses.
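Because __call__ delegates to lam_wrap, a subclass only needs to store an operation and override lam_wrap to become callable. A hypothetical illustration:

    from api2db.ingest.base_lam import BaseLam

    class Upper(BaseLam):
        def __init__(self):
            self.lam = lambda s: s.upper()

        def lam_wrap(self, lam_arg):
            return self.lam(lam_arg)

    op = Upper()
    print(op("hello"))   # HELLO, dispatched through BaseLam.__call__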

api2db.ingest.collector module

Contains the Collector class

class api2db.ingest.collector.Collector(name: str, seconds: int, import_target: Callable[[], Optional[List[dict]]], api_form: Callable[[], api2db.ingest.api_form.ApiForm], streams: Callable[[], List[api2db.stream.stream.Stream]], stores: Callable[[], List[api2db.store.store.Store]], debug: bool = True)

Bases: object

Used for creating a data-collection pipeline from API to storage medium

__init__(name: str, seconds: int, import_target: Callable[[], Optional[List[dict]]], api_form: Callable[[], api2db.ingest.api_form.ApiForm], streams: Callable[[], List[api2db.stream.stream.Stream]], stores: Callable[[], List[api2db.store.store.Store]], debug: bool = True)

Creates a Collector object

Note

Project collectors are disabled by default; this allows the project to run immediately after pmake without any code having been written. To enable a collector, change its seconds parameter to a number greater than zero. This is the periodic interval at which the collector's import_target runs, i.e. the collector will request data from its configured API every seconds seconds.

A perceptive user may notice that import_target, api_form, streams, and stores are wrapped in seemingly extraneous functions. Why not just pass the actual data directly to the Collector object? This is a consequence of the extensive use of anonymous functions, which is what allows the library to be so expressive. Python's native serialization does not support lambdas, and when the multiprocessing module spawns a new process, the process's parameters are serialized before being piped into a new Python interpreter instance. For this reason functions are used as parameters rather than their return values: it is possible to pass a function that instantiates an anonymous function when called, but not to pass an existing anonymous function to a separate process. Feel free to write a supporting package that makes this unnecessary.
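The constraint described above is easy to reproduce:

    import pickle

    import dill

    f = lambda x: x + 1
    try:
        pickle.dumps(f)               # native pickle cannot serialize lambdas
    except Exception as e:
        print(type(e).__name__)       # PicklingError
    payload = dill.dumps(f)           # dill serializes the lambda by value
    print(dill.loads(payload)(1))     # 2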

Parameters
  • name – The name of the collector. This name is set when using pmake or cadd and should not be changed. Changing it may result in unintended behavior of the api2db library, as the name is used to determine where to store incoming data, what to name database tables, and the location of the dtypes file stored in the project's CACHE/ directory. If you wish to rename a collector, run cadd to add a new collector with the desired name, then move the code from the old collector into the new one.

  • seconds – The periodic interval at which data should be imported, i.e. seconds=30 will request data from the collector's API every 30 seconds. This is set to 0 by default, and when set to 0 the collector is disabled and will not be registered with the main program. This allows all necessary collectors to be added to a project, and then each collector to be enabled as its code is written.

  • import_target

    The import_target is the function, written by the programmer using the library, that performs the initial data import. In most cases this will utilize a library like urllib to perform the requests. The return of this function should be a list of dictionary objects.

    • When dealing with XML data use a library like xmltodict to convert the data to a python dictionary

    • When dealing with JSON data use a library like the built-in json library to convert the data to a python dictionary.

    The implementation of this method is left to the programmer. This method could also be written to collect data from a serial stream, or a web-scraper if desired. Design and implementation of things such as that are left to the users of the library.

    The import_target MUST return a list of dictionaries, or None. Any exceptions that may occur within the function must be handled. The purpose of this contract is to allow logic to be written that performs multiple API requests and treats the data as a single incoming request. Most APIs return a single response; if the implementation of the import_target does not make multiple API calls, simply wrap that data in a list when returning it from the function. A minimal sketch follows this parameter list.

  • api_form – This is a function that returns an ApiForm.

  • streams – This is a function that returns a list of Stream object subclasses.

  • stores – This is a function that returns a list of Store object subclasses.

  • debug – When set to True logs will be printed to the console. Set to False for production.
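As a reference for the import_target contract described above, a minimal sketch; the URL and function name are placeholders:

    import json
    import urllib.request

    def import_weather():
        try:
            with urllib.request.urlopen("https://example.com/api/weather") as resp:
                return [json.loads(resp.read())]   # wrap the single response in a list
        except Exception:
            return None                            # exceptions must be handled here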

q

A queue used for message passing if the collector is running in debug mode

Type

Optional[multiprocessing.Queue]

set_q(q: multiprocessing.context.BaseContext.Queue) → None

Sets the q class member used for collectors running in debug mode

Parameters

q – The queue used for message passing

Returns

None
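Putting the pieces together, a hypothetical wiring for a "weather" collector; import_weather and api_form are the module-level functions sketched earlier, and streams/stores return empty lists where a real project would return Stream and Store subclasses:

    from api2db.ingest.collector import Collector

    def streams():
        return []

    def stores():
        return []

    collector = Collector(
        name="weather",
        seconds=30,                  # request data every 30 seconds
        import_target=import_weather,
        api_form=api_form,
        streams=streams,
        stores=stores,
        debug=True,
    )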

Module contents

Original Author

Tristen Harr

Creation Date

04/28/2021

Revisions

None