api2db.ingest package¶
Subpackages¶
- api2db.ingest.data_feature package
- api2db.ingest.post_process package
- Submodules
- api2db.ingest.post_process.column_add module
- api2db.ingest.post_process.column_apply module
- api2db.ingest.post_process.columns_calculate module
- api2db.ingest.post_process.date_cast module
- api2db.ingest.post_process.drop_na module
- api2db.ingest.post_process.merge_static module
- api2db.ingest.post_process.post module
- Module contents
- api2db.ingest.pre_process package
Submodules¶
api2db.ingest.api2pandas module¶
Contains the Api2Pandas class¶
-
class api2db.ingest.api2pandas.Api2Pandas(api_form: Callable[[], api2db.ingest.api_form.ApiForm])¶
Bases: object
Used to extract incoming data from an API into a pandas DataFrame
-
__init__(api_form: Callable[[], api2db.ingest.api_form.ApiForm])¶
Creates an Api2Pandas object and loads its ApiForm
- Parameters
api_form – The function that generates the ApiForm for the associated collector
-
dependencies_satisfied() → bool¶
Checks to ensure any data-linking dependency files exist
This feature currently only exists for api2db.ingest.post_process.merge_static.MergeStatic
- Returns
True if all dependencies are satisfied, otherwise False
-
extract(data: dict) → Optional[pandas.core.frame.DataFrame]¶
Performs data-extraction on data arriving from an API.
Workflow:
1. Perform all pre-processing on the data
2. Perform all data-feature extraction
3. Perform all post-processing on the data
4. Return a DataFrame containing the cleaned data
- Parameters
data – The data arriving from an API to perform data extraction on.
- Returns
The cleaned data if it is possible to clean the data, otherwise None
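The pre-process → feature-extraction → post-process workflow can be sketched with plain pandas (the payload shape, field names, and each step here are illustrative assumptions, not api2db's actual implementation):

```python
import pandas as pd

# Simulated API payload: a wrapper object with a nested list of records
payload = {"status": "ok",
           "results": [{"id": 1, "temp": "21.5 "}, {"id": 2, "temp": "19.0"}]}

# Pre-processing: extract the list of data-points that will serve as rows
rows = payload["results"]

# Feature extraction: keep only the data-features we require
df = pd.DataFrame([{"id": r["id"], "temp": r["temp"]} for r in rows])

# Post-processing: apply a function across a column (strip whitespace, cast)
df["temp"] = df["temp"].str.strip().astype(float)
```

If any phase fails to produce usable rows, an implementation following this contract would return None instead of a DataFrame.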
-
api2db.ingest.api_form module¶
Contains the ApiForm class¶
-
class api2db.ingest.api_form.ApiForm(name: str, pre_process: Optional[List[api2db.ingest.pre_process.pre.Pre]] = None, data_features: Optional[List[api2db.ingest.data_feature.feature.Feature]] = None, post_process: Optional[List[api2db.ingest.post_process.post.Post]] = None)¶
Bases: object
Used to clean and process incoming data arriving from an API
-
__init__(name: str, pre_process: Optional[List[api2db.ingest.pre_process.pre.Pre]] = None, data_features: Optional[List[api2db.ingest.data_feature.feature.Feature]] = None, post_process: Optional[List[api2db.ingest.post_process.post.Post]] = None)¶
Creates an ApiForm
Note
The ApiForm is used by api2db to do the processing and cleaning of data. Incoming data goes through 3 phases.
Pre-Processing
- Extract global data-features
- Extract a list of data-points that will serve as the rows in a database
- Flatten nested arrays of data
- Swap extraneous rows returned from poorly implemented APIs
Feature Extraction
- Extract the data-features for each row that will be stored in a database
Post-Processing
- Add new columns of data that will be the same globally for the arriving data, e.g. arrival timestamps
- Apply functions across data columns, replacing the data with the calculated value, e.g. reformatting strings, stripping whitespace, etc.
- Add new columns of data derived from calculations on existing columns, e.g. using a latitude and a longitude column to calculate a new column called country
- Cast columns that contain datetime data from strings to datetimes
- Drop rows with null values in columns that should not contain them
- Merge incoming data with locally stored reference tables, e.g. incoming data has a location_id column, and a reference table contains location info with the location_id field linking the two. This allows the data to be merged on location_id so that all data lives in a single table.
- Parameters
name – The name of the collector the ApiForm is associated with
pre_process – An array of pre-processing objects to be applied sequentially to incoming data
data_features – An array of data-features to be extracted from the incoming data. The programmer can choose which data-features they require, and keep only those.
post_process – An array of post-processing objects to be applied sequentially after the data has been cleaned and extracted to a pandas.DataFrame
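The reference-table merge described in the post-processing phase can be sketched with plain pandas (the column names and tables are illustrative, not part of api2db):

```python
import pandas as pd

# Incoming rows reference locations only by id
incoming = pd.DataFrame({"location_id": [1, 2], "temp": [21.5, 19.0]})

# Locally stored reference table holding the full location info
locations = pd.DataFrame({"location_id": [1, 2],
                          "city": ["Oslo", "Bergen"]})

# Merging on location_id keeps all data in a single table
merged = incoming.merge(locations, on="location_id", how="left")
```

A left merge preserves every incoming row even when the reference table has no matching location_id, which mirrors the tolerant handling of inconsistent API data described throughout this module.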
-
add_pre(pre: api2db.ingest.pre_process.pre.Pre) → None¶
Allows the programmer to manually add an item to the pre-processing array.
- Parameters
pre – The pre-processing object to add
- Returns
None
-
add_feature(feat: api2db.ingest.data_feature.feature.Feature) → None¶
Allows the programmer to manually add an item to the data-features array.
- Parameters
feat – The feature object to add
- Returns
None
-
add_post(post: api2db.ingest.post_process.post.Post) → None¶
Allows the programmer to manually add an item to the post-processing array.
- Parameters
post – The post-processing object to add
- Returns
None
-
pandas_typecast() → dict¶
Performs typecasting from Python native types to their pandas counterparts. Currently supported types are:
- int
- float
- bool
- str
Since API data is inconsistent, all typecasting makes the values nullable inside the DataFrame. Null values can be removed during post-processing.
- Returns
A dictionary that can be used to cast a DataFrame's types using DataFrame.astype()
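pandas' nullable extension dtypes show what such a cast produces. The mapping below is an assumption about how the four supported native types might translate, not copied from api2db:

```python
import pandas as pd

# Assumed native -> nullable pandas dtype mapping (illustrative only)
typecast = {"id": "Int64", "score": "Float64",
            "active": "boolean", "name": "string"}

# Missing values stay representable in every column after the cast
df = pd.DataFrame({
    "id": [1, None],
    "score": [9.5, None],
    "active": [True, None],
    "name": ["a", None],
}).astype(typecast)
```

With the plain numpy dtypes (`int64`, `bool`) this cast would fail on the None values; the capitalized nullable dtypes exist precisely for inconsistent data like API responses.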
-
static
typecast
(dtype: Any) → str¶ Yields a string containing the pandas dtype when given a python native type.
- Parameters
dtype – The python native type
- Returns
The string representing the type that the native type converts to when put into a DataFrame
-
experiment(CACHE, import_target) → bool¶
Tool used to build an ApiForm
Note
The laboratory is an experimental feature and does not currently support the MergeStatic post-processor.
- Parameters
CACHE – If the data imports should be cached, i.e. only call the API once
import_target – The target function that performs an API import
- Returns
True if the experiment is ready for export, otherwise False
-
api2db.ingest.base_lam module¶
Contains the BaseLam class¶
-
class api2db.ingest.base_lam.BaseLam¶
Bases: object
Used as a base object for pre-process subclasses, post-process subclasses, and data-features.
-
__call__(lam_arg: Any) → Any¶
Makes the class callable, with the class method lam_wrap as its target. This allows anonymous functions to be passed to the class, and enhances ease of use for library developers.
- Parameters
lam_arg – The argument to be passed to the lam_wrap class method.
- Returns
The response of the lam_wrap class method
-
__getstate__() → dict¶
Allows lambda operations to be serialized so that instances can be passed between processes
- Returns
Customized self.__dict__ items with values serialized using the dill library
-
__setstate__(state: dict) → None¶
Allows lambda operations to be deserialized using the dill library so that instances can be passed between processes
- Parameters
state – Incoming state
- Returns
None
-
lam_wrap(lam_arg: Any) → None¶
Method that performs the class lambda operation on lam_arg
This method will ALWAYS be overridden.
- Parameters
lam_arg – The incoming data to perform the lambda operation on.
- Returns
None if calling BaseLam.lam_wrap directly; the return is dictated by subclasses.
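The callable-wrapper pattern described above can be sketched in plain Python. This is a simplified stand-in for illustration, not api2db's actual implementation, and the subclass name is hypothetical:

```python
from typing import Any, Callable

class BaseLam:
    """Stand-in base: __call__ delegates to lam_wrap."""
    def __call__(self, lam_arg: Any) -> Any:
        return self.lam_wrap(lam_arg)

    def lam_wrap(self, lam_arg: Any) -> None:
        # Always overridden; the base implementation returns None
        return None

class StripLower(BaseLam):
    """Hypothetical subclass wrapping an anonymous function."""
    def __init__(self, lam: Callable[[Any], Any]):
        self.lam = lam

    def lam_wrap(self, lam_arg: Any) -> Any:
        return self.lam(lam_arg)

# The instance itself is callable, so it can be dropped anywhere
# a plain function is expected
clean = StripLower(lambda s: s.strip().lower())
```

Because the lambda is held as instance state, serializing the instance (which api2db does via dill in __getstate__/__setstate__) effectively serializes the anonymous function with it.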
-
api2db.ingest.collector module¶
Contains the Collector class¶
-
class api2db.ingest.collector.Collector(name: str, seconds: int, import_target: Callable[[], Optional[List[dict]]], api_form: Callable[[], api2db.ingest.api_form.ApiForm], streams: Callable[[], List[api2db.stream.stream.Stream]], stores: Callable[[], List[api2db.store.store.Store]], debug: bool = True)¶
Bases: object
Used for creating a data-collection pipeline from API to storage medium
-
__init__(name: str, seconds: int, import_target: Callable[[], Optional[List[dict]]], api_form: Callable[[], api2db.ingest.api_form.ApiForm], streams: Callable[[], List[api2db.stream.stream.Stream]], stores: Callable[[], List[api2db.store.store.Store]], debug: bool = True)¶
Creates a Collector object
Note
Project collectors are disabled by default, which allows the project to run immediately after pmake is run without any code being written. To enable a collector, you must change its seconds parameter to a number greater than zero. This represents the periodic interval at which the collector's import_target is run, i.e. the collector will request data from its configured API every seconds seconds.
A perceptive user may notice that import_target, api_form, streams and stores appear to be wrapped in seemingly extraneous functions. Why not just pass the actual data directly to the Collector object? This is due to the extensive use of anonymous functions, which is what allows the library to be so expressive. Python's native serialization does not support serializing lambdas. When using the multiprocessing module and spawning a new process, the parameters of the process are serialized before being piped into a new Python interpreter instance. It is for this reason that functions are used as parameters rather than their returns: it is possible to pass a function which will instantiate an anonymous function upon call, but not to pass an existing anonymous function to a separate process. Feel free to write a supporting package to make it so this is not the case.
- Parameters
name – The name of the collector. This name is set when using pmake or cadd and should not be changed. Changing it may result in unintended behavior of the api2db library, as the name is used to determine where to store incoming data, what to name database tables, and the location of the dtypes file stored in the project's CACHE/ directory. If you wish to rename a collector, run cadd to add a new collector with the desired name, then move the code from the old collector into the new one.
seconds – Specifies the periodic interval at which data should be imported, i.e. seconds=30 will request data from the collector's API every 30 seconds. This is set to 0 by default, and when set to 0 the collector is disabled and will not be registered with the main program. This allows all necessary collectors to be added to a project, and then each collector to be enabled as its code is written.
import_target – The function, written by the programmer using the library, that performs the initial data import. In most cases this will use a library like urllib to perform the requests. The return of this function should be a list of dictionary objects.
- When dealing with XML data, use a library like xmltodict to convert the data to a Python dictionary.
- When dealing with JSON data, use a library like the built-in json library to convert the data to a Python dictionary.
The implementation of this method is left to the programmer. It could also be written to collect data from a serial stream or a web-scraper if desired; design and implementation of such variants are left to the users of the library.
The import_target MUST return a list of dictionaries, or None. Exceptions that may occur within the function must be handled. The purpose of this design is to allow logic to be written that performs multiple API requests and treats the data as a single incoming request. Most APIs will return a single response; if the implementation of the import_target does not make multiple API calls, simply wrap that data in a list when returning it from the function.
api_form – This is a function that returns an ApiForm.
streams – This is a function that returns a list of Stream object subclasses.
stores – This is a function that returns a list of Store object subclasses.
debug – When set to True, logs will be printed to the console. Set to False for production.
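The serialization constraint behind the function-valued parameters can be demonstrated with the standard library alone (api2db itself is not needed for this; the function names are illustrative):

```python
import pickle

def make_formatter():
    # A named, module-level factory: pickle serializes it by reference,
    # and the lambda is only created after the factory is called
    return lambda s: s.strip().lower()

# A bare lambda cannot be serialized with Python's native pickle
try:
    pickle.dumps(lambda s: s.strip())
    lambda_picklable = True
except Exception:
    lambda_picklable = False

# The factory round-trips fine; calling it on the "other side" of the
# process boundary rebuilds the anonymous function there
formatter = pickle.loads(pickle.dumps(make_formatter))()
```

This is exactly why Collector takes `api_form`, `streams` and `stores` as zero-argument functions: the function survives the trip through multiprocessing's pickling, while the lambdas it builds would not.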
-
q¶
A queue used for message passing if the collector is running in debug mode
- Type
Optional[multiprocessing.Queue]
-
set_q(q: multiprocessing.context.BaseContext.Queue) → None¶
Sets the q class member used for collectors running in debug mode
- Parameters
q – The queue used for message passing
- Returns
None
-
Module contents¶
Original Author | Tristen Harr
Creation Date | 04/28/2021
Revisions | None