api2db.stream package

Submodules

api2db.stream.file_converter module

Contains the FileConverter class

class api2db.stream.file_converter.FileConverter(name: Optional[str] = None, dtypes: Optional[dict] = None, path: Optional[str] = None, fmt: Optional[str] = None)

Bases: object

Serves as a base-class for all Streams/Stores and is used to store and load pandas DataFrames in different file formats

__init__(name: Optional[str] = None, dtypes: Optional[dict] = None, path: Optional[str] = None, fmt: Optional[str] = None)

Creates a FileConverter object and attempts to build its dtypes

Parameters
  • name – The name of the collector associated with the FileConverter

  • dtypes – The dictionary of dtypes for the collector associated with the FileConverter

  • path – Either a path to a file, or a path to a directory, dictated by super class

  • fmt

    • fmt="parquet" (recommended) sets the FileConverter format to use parquet format

    • fmt="json" sets the FileConverter format to use JSON format

    • fmt="pickle" sets the FileConverter format to use pickle format

    • fmt="csv" sets the FileConverter format to use CSV format

build_dtypes() → Optional[dict]

Attempts to build the dtypes so that a loaded pandas DataFrame can be type-cast

Returns

dtypes that can be used with pandas.DataFrame.astype(dtypes)
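
Example

A minimal sketch of applying a dtypes mapping of this shape with pandas; the column names and types below are hypothetical, the real keys come from the collector configuration.

import pandas as pd

# Hypothetical dtypes of the shape build_dtypes() is documented to return
dtypes = {"A": "int64", "B": "float64", "C": "string"}

# A freshly loaded frame often arrives with everything as strings
df = pd.DataFrame({"A": ["1", "2"], "B": ["1.5", "2.5"], "C": ["x", "y"]})
df = df.astype(dtypes)
print(df.dtypes)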

static static_compose_df_from_dir(path: str, fmt: str, move_shards_path: Optional[str] = None, move_composed_path: Optional[str] = None, force: bool = True) → Optional[pandas.core.frame.DataFrame]

Attempts to build a single DataFrame from all files in a directory.

Parameters
  • path – The directory path to compose files from

  • fmt

    • fmt="parquet" (recommended) composes files stored in parquet format

    • fmt="json" composes files stored in JSON format

    • fmt="pickle" composes files stored in pickle format

    • fmt="csv" composes files stored in CSV format

  • move_shards_path – The path to move the file shards to after composing the DataFrame

  • move_composed_path – The path to move the composed file to, using the naming scheme filename1_filenameN.fmt

  • force – Forces creation of the directories to move files to if they do not exist.

Returns

The DataFrame composed from the sharded files if successful, else None

Example

Original Directory Structure

store/
|    |- file1.parquet
|    |- file2.parquet
|    |- file3.parquet
|
sharded/
|
composed/
|
main.py

The following files contain pandas DataFrames stored using parquet format.

file1.parquet

    A   B   C
    1   2   3

file2.parquet

    A   B   C
    4   5   6

file3.parquet

    A   B   C
    7   8   9

>>> FileConverter.static_compose_df_from_dir(path="store/", fmt="parquet")

Returns (pandas.DataFrame):

    A   B   C
    1   2   3
    4   5   6
    7   8   9

By default, files will be deleted when the DataFrame is returned

FileConverter.static_compose_df_from_dir(path="store/",
                                         fmt="parquet",
                                         move_shards_path=None,
                                         move_composed_path=None)
store/
|
sharded/
|
composed/
|
main.py

move_shards_path specifies the path the sharded files should be moved to

FileConverter.static_compose_df_from_dir(path="store/",
                                         fmt="parquet",
                                         move_shards_path="sharded/",
                                         move_composed_path=None)
store/
|
sharded/
|      |- file1.parquet
|      |- file2.parquet
|      |- file3.parquet
|
composed/
|
main.py

move_composed_path specifies the path that the composed file should be moved to

FileConverter.static_compose_df_from_dir(path="store/",
                                         fmt="parquet",
                                         move_shards_path=None,
                                         move_composed_path="composed/")
store/
|
sharded/
|
composed/
|       |- file1_file3.parquet
|
main.py

static static_store_df(df: pandas.core.frame.DataFrame, path: str, fmt: str) → bool

Stores a DataFrame to a file

Parameters
  • df – The DataFrame to store to a file

  • path – The path to the file the DataFrame should be stored in

  • fmt

    • fmt="parquet" (recommended) stores the DataFrame using parquet format

    • fmt="json" stores the DataFrame using JSON format

    • fmt="pickle" stores the DataFrame using pickle format

    • fmt="csv" stores the DataFrame using CSV format

Returns

True if successful, else False
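
Example

A minimal usage sketch; the frame and path below are placeholders, and the target directory is assumed to already exist.

import pandas as pd
from api2db.stream.file_converter import FileConverter

df = pd.DataFrame({"A": [1], "B": [2], "C": [3]})

# Store the frame in parquet format; returns True on success, else False
ok = FileConverter.static_store_df(df=df, path="store/file1.parquet", fmt="parquet")
if not ok:
    print("store failed")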

static pickle_store(path: str, df: pandas.core.frame.DataFrame, force: bool = True) → bool

Stores a DataFrame as a .pickle file

Parameters
  • path – The path to store the DataFrame to

  • df – The DataFrame to store

  • force – If the directories in the path do not exist, forces them to be created

Returns

True if successful, otherwise False

static json_store(path: str, df: pandas.core.frame.DataFrame, force: bool = True) → bool

Stores a DataFrame as a .json file

Parameters
  • path – The path to store the DataFrame to

  • df – The DataFrame to store

  • force – If the directories in the path do not exist, forces them to be created

Returns

True if successful, otherwise False

static csv_store(path: str, df: pandas.core.frame.DataFrame, force: bool = True) → bool

Stores a DataFrame as a .csv file

Parameters
  • path – The path to store the DataFrame to

  • df – The DataFrame to store

  • force – If the directories in the path do not exist, forces them to be created

Returns

True if successful, otherwise False

static parquet_store(path: str, df: pandas.core.frame.DataFrame, force: bool = True) → bool

Stores a DataFrame as a .parquet file

Parameters
  • path – The path to store the DataFrame to

  • df – The DataFrame to store

  • force – If the directories in the path do not exist, forces them to be created

Returns

True if successful, otherwise False

static store_valid(path: str, force: bool) → bool

Determines if a provided path for storage is valid. I.e. The directory structure exists

Parameters
  • path – The path to check

  • force – When True, will attempt to create necessary directories if they do not exist

Returns

True if path is valid, otherwise False

static static_load_df(path: str, fmt: str, dtypes: Optional[dict] = None) → Optional[pandas.core.frame.DataFrame]

Loads a DataFrame from a file

Parameters
  • path – The path to the file the DataFrame should be loaded from

  • fmt

    • fmt="parquet" (recommended) loads the DataFrame using parquet format

    • fmt="json" loads the DataFrame using JSON format

    • fmt="pickle" loads the DataFrame using pickle format

    • fmt="csv" loads the DataFrame using CSV format

  • dtypes – The dtypes to cast the DataFrame to before returning it. I.e. DataFrame.astype(dtypes)

Returns

Loaded DataFrame if successful, otherwise None
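
Example

A matching load sketch; the path and dtypes mapping below are placeholders.

from api2db.stream.file_converter import FileConverter

dtypes = {"A": "int64", "B": "int64", "C": "int64"}  # hypothetical collector dtypes

df = FileConverter.static_load_df(path="store/file1.parquet", fmt="parquet", dtypes=dtypes)
if df is None:
    print("load failed")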

static pickle_load(path: str) → Optional[pandas.core.frame.DataFrame]

Loads a DataFrame from a .pickle file

Parameters

path – The path to load the DataFrame from

Returns

The loaded DataFrame if successful, otherwise None

static json_load(path: str) → Optional[pandas.core.frame.DataFrame]

Loads a DataFrame from a .json file

Parameters

path – The path to load the DataFrame from

Returns

The loaded DataFrame if successful, otherwise None

static csv_load(path: str) → Optional[pandas.core.frame.DataFrame]

Loads a DataFrame from a .csv file

Parameters

path – The path to load the DataFrame from

Returns

The loaded DataFrame if successful, otherwise None

static parquet_load(path: str) → Optional[pandas.core.frame.DataFrame]

Loads a DataFrame from a .parquet file

Parameters

path – The path to load the DataFrame from

Returns

The loaded DataFrame if successful, otherwise None

api2db.stream.stream module

Contains the Stream class

class api2db.stream.stream.Stream(name: str, path: Optional[str] = None, dtypes: Optional[dict] = None, fmt: Optional[str] = None, chunk_size: int = 0, stream_type: str = 'stream', store: bool = False)

Bases: api2db.stream.file_converter.FileConverter

Used for streaming data into a local or external source

__init__(name: str, path: Optional[str] = None, dtypes: Optional[dict] = None, fmt: Optional[str] = None, chunk_size: int = 0, stream_type: str = 'stream', store: bool = False)

Creates a Stream object and attempts to build its dtypes. If the store flag is False, spawns a thread that polls the Stream queue for incoming data

Parameters
  • name – The name of the collector the stream is associated with

  • path – The directory path the stream should store to (Usage dictated by super classes)

  • dtypes – A dictionary containing the dtypes that the stream data DataFrame has

  • fmt

    The file format that the stream data should be stored as

    • fmt="parquet" (recommended) stores the DataFrame using parquet format

    • fmt="json" stores the DataFrame using JSON format

    • fmt="pickle" stores the DataFrame using pickle format

    • fmt="csv" stores the DataFrame using CSV format

  • chunk_size – The size of chunks to send to the stream target. I.e. Insert data in chunks of chunk_size rows

  • stream_type – The type of the stream (Primarily used for logging)

  • store – This flag indicates whether or not the stream is being called by a Store object

Raises

NotImplementedError – chunk_storage is not yet implemented.

is_store_instance

True if the super-class has base-class Store, otherwise False

Type

bool

lock

Stream Lock used to signal if the stream has died

Type

threading.Lock

q

Stream queue used to pass data into

Type

queue.Queue

start() → None

Starts the stream running loop in a new thread

Returns

None

check_failures() → None

Checks to see if previous uploads have failed and if so, loads the previous upload data and attempts to upload it again.

This method searches the directory path

STORE/upload_failed/collector_name/stream_type/

This path is the target location for failed uploads. If an upload fails 5 times in a row, it is stored in this location with its filename being the timestamp at which it was stored.

Returns

None

stream_start() → None

Starts the stream listener that polls the stream queue for incoming data.

Returns

None

stream(data: pandas.core.frame.DataFrame) → AttributeError

Overridden by subclasses; a Stream object is NEVER used directly to stream data. It is ALWAYS inherited from

Parameters

data – The data to stream

Raises

AttributeError – Stream does not have the ability to stream data. It must be subclassed.
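
Example

A rough sketch of the intended pattern: a subclass overrides stream() to deliver data to its target, as the shipped subclasses (Stream2Local, Stream2Sql, Stream2Bigquery, Stream2Omnisci) do. PrintStream and its constructor arguments are purely illustrative and are not part of api2db.

import pandas as pd
from api2db.stream.stream import Stream

class PrintStream(Stream):
    """Illustrative only: delivers each incoming DataFrame to stdout."""

    def __init__(self, name: str):
        # stream_type is primarily used for logging (see Stream.__init__ above)
        super().__init__(name=name, stream_type="print")

    def stream(self, data: pd.DataFrame) -> None:
        # Subclasses override stream() to send data to their target
        print(data)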

api2db.stream.stream2bigquery module

Contains the Stream2Bigquery class

class api2db.stream.stream2bigquery.Stream2Bigquery(name: str, auth_path: str, pid: str, did: str, tid: str, location: str = 'US', if_exists: str = 'append', chunk_size: int = 0, store: bool = False)

Bases: api2db.stream.stream.Stream

Streams data from the associated collector directly to Bigquery in real-time

__init__(name: str, auth_path: str, pid: str, did: str, tid: str, location: str = 'US', if_exists: str = 'append', chunk_size: int = 0, store: bool = False)

Creates a Stream2Bigquery object and attempts to build its dtypes.

If dtypes can successfully be created, i.e. data arrives from the API for the first time, the following occurs:

  • Auto-generates the table schema

  • Creates the dataset if it does not exist within the project

  • Creates the table if it does not exist within the project

Parameters
  • name – The name of the collector associated with the stream

  • auth_path – The path to the Google provided authentication file. I.e. AUTH/google_auth_file.json

  • pid – Google project ID

  • did – Google dataset ID

  • tid – Google table ID

  • location – Location of the Bigquery project

  • if_exists

    • if_exists="append" Adds the data to the table

    • if_exists="replace" Replaces the table with the new data

    • if_exists="fail" Fails to upload the new data if the table exists

  • chunk_size – CURRENTLY NOT SUPPORTED

  • store – True if the super class is a Store object, otherwise False
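
Example

An instantiation sketch using the documented constructor; every identifier below (collector name, auth file, project, dataset, and table IDs) is a placeholder for your own values.

from api2db.stream.stream2bigquery import Stream2Bigquery

stream = Stream2Bigquery(name="weather",
                         auth_path="AUTH/google_auth_file.json",
                         pid="my-project",
                         did="my_dataset",
                         tid="my_table",
                         location="US",
                         if_exists="append")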

schema

Contains the schema if dtypes exist, else None

Type

Optional[List[google.cloud.bigquery.SchemaField]]

bq_schema

Contains the bq_schema if dtypes exist, else None

Type

Optional[List[dict]]

cred

contains the credentials used to authenticate with bigquery

Type

google.oauth2.service_account.Credentials

client

The bigquery client

Type

google.cloud.bigquery.Client

dataset

The dataset associated with the collector

Type

google.cloud.bigquery.Dataset

table

The table associated with the collector

Type

google.cloud.bigquery.Table

connected

True if a connection has been established I.e. credentials have been authenticated, otherwise False

Type

bool

connect() → bool

Attempts to authenticate with provided credentials.

Workflow

  1. Load the credentials from the service account file

  2. Instantiate the bigquery Client

  3. Attempt to create the dataset and if a Conflict exception is thrown then load the dataset

  4. Attempt to load the table and if a NotFound exception is thrown then create the table

Returns

True if the table is successfully loaded/created otherwise False

stream(data: pandas.core.frame.DataFrame, retry_depth: int = 5) → None

Attempts to store the incoming data into bigquery

Workflow

  1. If authentication has not been performed, call self.connect()

  2. Attempt to store the DataFrame to bigquery

    • If successful, check to see if any previous uploads have failed and attempt to store those as well

  3. If the DataFrame cannot be successfully stored, set connected to False

  4. If retry_depth is not 0, perform a recursive call attempting to store the data again

  5. If retry_depth has reached zero, log an exception and store the DataFrame locally

Failed uploads will be stored in

  • STORE/upload_failed/collector_name/bigquery/timestamp_ns.parquet

Parameters
  • data – The DataFrame that should be stored to bigquery

  • retry_depth – Used as a recursive call counter should the DataFrame fail to be stored

Returns

None

build_schema() → Optional[List[google.cloud.bigquery.schema.SchemaField]]

Attempts to build the schema that will be used for table creation

Iterates through the dtypes items and generates the appropriate SchemaFields

Returns

The schema generated if successful otherwise None

build_bq_schema() → Optional[List[dict]]

Attempts to build the schema that will be used to upload data to bigquery via DataFrame.to_gbq()

Iterates through the dtypes items and generates the appropriate schema dictionary

Returns

The schema generated if successful otherwise None
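
Example

For reference, pandas.DataFrame.to_gbq() accepts a table_schema as a list of dictionaries with "name" and "type" keys. The listing below is a hypothetical illustration of that shape, not the exact output api2db generates.

# Hypothetical bq_schema for a collector with dtypes {"A": "int64", "B": "float64", "C": "object"}
bq_schema = [
    {"name": "A", "type": "INTEGER"},
    {"name": "B", "type": "FLOAT"},
    {"name": "C", "type": "STRING"},
]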

api2db.stream.stream2local module

Contains the Stream2Local class

class api2db.stream.stream2local.Stream2Local(name: str, path: Optional[str] = None, mode: str = 'shard', fmt: str = 'parquet', drop_duplicate_keys: Optional[List[str]] = None)

Bases: api2db.stream.stream.Stream

Streams data from the associated collector directly to a local file in real-time

__init__(name: str, path: Optional[str] = None, mode: str = 'shard', fmt: str = 'parquet', drop_duplicate_keys: Optional[List[str]] = None)

Creates a Stream2Local object and attempts to build its dtypes

Parameters
  • name – The name of the collector associated with the stream

  • path – The path to either a single file or a file directory dictated by the mode parameter

  • mode

    • mode="shard" (default) will store each incoming file independently in the specified path. In shard mode the file will be named timestamp_ns.fmt

    • mode="update" will update the file located at the specified path with the new data

    • mode="replace" will replace the file located at the specified path with the new data

  • fmt

    • fmt="parquet" (default/recommended) stores the files using parquet format

    • fmt="json" stores the files using JSON format

    • fmt="pickle" stores the files using pickle format

    • fmt="csv" stores the files using CSV format

  • drop_duplicate_keys

    • drop_duplicate_keys=None -> DataFrame.drop_duplicates() performed before storage

    • drop_duplicate_keys=["uuid"] -> DataFrame.drop_duplicates(subset=drop_duplicate_keys) performed before storage
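
Example

A hedged instantiation sketch; the collector name, directory path, and duplicate key below are placeholders.

from api2db.stream.stream2local import Stream2Local

# Shard mode: each incoming DataFrame is written to store/weather/ as timestamp_ns.parquet
stream = Stream2Local(name="weather",
                      path="store/weather/",
                      mode="shard",
                      fmt="parquet",
                      drop_duplicate_keys=["uuid"])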

stream(data: pandas.core.frame.DataFrame) → None

Stores the incoming data into its stream target using the specified mode

Parameters

data – The data to be stored

Returns

None

stream_shard(data: pandas.core.frame.DataFrame) → None

Stores the incoming data to the specified directory path using the file naming scheme timestamp_ns.fmt

Parameters

data – The data to store to the file

Returns

None

stream_update(data: pandas.core.frame.DataFrame) → None

Updates the existing data at the specified file path and adds the incoming data

Parameters

data – The data to add to the file

Returns

None

stream_replace(data: pandas.core.frame.DataFrame) → None

Replaces the existing data at the specified file path with the incoming data

Parameters

data – The data to replace the file with

Returns

None

api2db.stream.stream2omnisci module

Contains the Stream2Omnisci class

Warning

Due to dependency conflicts and issues with the current published branch of the pymapd library, the following steps must be taken to support streaming/storing data to Omnisci

> pip install pymapd==0.25.0

> pip install pandas --upgrade

> pip install pyarrow --upgrade

This occurs because of issues with the dependencies of the pymapd library being locked in place. I’ve opened an issue on this, and they appear to be working on it. The most recent publish seemed to break other things. Until this gets fixed this is a simple work-around. This will allow api2db to work with Omnisci, however there may be issues with attempts to utilize features of the pymapd library outside of what api2db uses, so use with caution. –Tristen

class api2db.stream.stream2omnisci.Stream2Omnisci(name: str, db_name, username: Optional[str] = None, password: Optional[str] = None, host: Optional[str] = None, auth_path: Optional[str] = None, protocol: str = 'binary', chunk_size: int = 0, store: bool = False)

Bases: api2db.stream.stream.Stream

Streams data from the associated collector directly to Omnisci in real-time

__init__(name: str, db_name, username: Optional[str] = None, password: Optional[str] = None, host: Optional[str] = None, auth_path: Optional[str] = None, protocol: str = 'binary', chunk_size: int = 0, store: bool = False)

Creates a Stream2Omnisci object and attempts to build its dtypes

If dtypes can successfully be created, i.e. data arrives from the API for the first time, the following occurs:

  • Auto-generates the table schema

  • Casts all string columns to categories as required by Omnisci

  • Creates an Omnisci table with the name collector_name_stream

Note

Data-attribute fields will have a _t appended to them due to naming conflicts encountered.

Example:

    A   B   C
    1   2   3
    4   5   6

Will become the following in the Omnisci database. (This is only applied to data in the database)

    A_t   B_t   C_t
    1     2     3
    4     5     6

Authentication Methods:

  • Supply auth_path with a path to an authentication file. Templates for these files can be found in your project's AUTH/ directory

OR

  • Supply the username, host, and password

Parameters
  • name – The name of the collector associated with the stream

  • db_name – The name of the database to connect to

  • username – The username to authenticate with the database

  • password – The password to authenticate with the database

  • host – The host of the database

  • auth_path – The path to the authentication credentials.

  • protocol – The protocol to use when connecting to the database

  • chunk_size – CURRENTLY NOT SUPPORTED

  • store – True if the super class is a Store object, otherwise False

Raises
  • ValueError – If auth_path is provided but is invalid or has incorrect values

  • ValueError – If auth_path is not provided and username, password or host is missing
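
Example

An instantiation sketch using explicit credentials; every value below is a placeholder, and auth_path may be supplied instead as described above.

from api2db.stream.stream2omnisci import Stream2Omnisci

stream = Stream2Omnisci(name="weather",
                        db_name="omnisci",
                        username="admin",
                        password="secret",
                        host="localhost",
                        protocol="binary")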

con

The connection to the database

Type

Optional[pymapd.Connection]

connected

Returns True if a connection is established, else False

Type

Callable[Optional[pymapd.Connection], bool]

log_str

A string used for logging

Type

str

connect() → Optional[pymapd.Connection]

Attempts to establish a connection to an Omnisci database

Returns

A connection object if a connection can be established, else None

static cast_categorical(data: pandas.core.frame.DataFrame, dtypes: pandas.core.series.Series) → pandas.core.frame.DataFrame

Casts all columns with type str to type category as required by omnisci and appends a _t to column names

Parameters
  • data – The DataFrame that will be stored into the omnisci database

  • dtypes – The dtypes of the DataFrame

Returns

Modified DataFrame
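
Example

The transformation is roughly equivalent to the pandas operations below. This is an illustrative sketch, not the library's implementation; the real method also receives the dtypes Series rather than inspecting the frame.

import pandas as pd

def cast_categorical_sketch(data: pd.DataFrame) -> pd.DataFrame:
    out = data.copy()
    for col in out.columns:
        # Omnisci requires text columns to be categorical
        if out[col].dtype == object:
            out[col] = out[col].astype("category")
    # Append _t to every column name, matching the example above
    out.columns = [f"{col}_t" for col in out.columns]
    return out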

stream(data, retry_depth=5)

Attempts to store the incoming data into omnisci

Workflow

  1. If authentication has not been performed, call self.connect()

  2. Attempt to store the DataFrame to omnisci

    • If successful, check to see if any previous uploads have failed and attempt to store those as well

  3. If the DataFrame cannot be successfully stored, set con to None

  4. If retry_depth is not 0, perform a recursive call attempting to store the data again

  5. If retry_depth has reached zero, log an exception and store the DataFrame locally

Failed uploads will be stored in

  • STORE/upload_failed/collector_name/omnisci/timestamp_ns.parquet

Parameters
  • data – The DataFrame that should be stored to omnisci

  • retry_depth – Used as a recursive call counter should the DataFrame fail to be stored

Returns

None

api2db.stream.stream2sql module

Contains the Stream2Sql class

class api2db.stream.stream2sql.Stream2Sql(name: str, db_name: str, dialect: str, username: Optional[str] = None, password: Optional[str] = None, host: Optional[str] = None, auth_path: Optional[str] = None, port: str = '', if_exists: str = 'append', chunk_size: int = 0, store: bool = False)

Bases: api2db.stream.stream.Stream

Streams data from the associated collector directly to an SQL database target in real-time

__init__(name: str, db_name: str, dialect: str, username: Optional[str] = None, password: Optional[str] = None, host: Optional[str] = None, auth_path: Optional[str] = None, port: str = '', if_exists: str = 'append', chunk_size: int = 0, store: bool = False)

Creates a Stream2Sql object and attempts to build its dtypes

If dtypes can successfully be created, i.e. data arrives from the API for the first time, the following occurs:

  • Auto-generates the table schema

  • Creates the table in the database if it does not exist

Authentication Methods:

  • Supply auth_path with a path to an authentication file. Templates for these files can be found in your project's AUTH/ directory

OR

  • Supply the username, host, and password

Parameters
  • name – The name of the collector associated with the stream

  • db_name – The name of the database to connect to

  • dialect

    • dialect="mysql" -> Use this to connect to a mysql database

    • dialect="mariadb" -> Use this to connect to a mariadb database

    • dialect="postgresql" -> Use this to connect to a postgresql database

    • dialect="amazon_aurora" -> COMING SOON

    • dialect="oracle" -> COMING SOON

    • dialect="microsoft_sql" -> COMING SOON

    • dialect="Something else?" -> Submit a feature request… or even better build it!

  • username – The username to authenticate with the database

  • password – The password to authenticate with the database

  • host – The host of the database

  • auth_path – The path to the authentication credentials.

  • port – The port used when establishing a connection to the database

  • if_exists

    • if_exists="append" Adds the data to the table

    • if_exists="replace" Replaces the table with the new data

    • if_exists="fail" Fails to upload the new data if the table exists

  • chunk_size – CURRENTLY NOT SUPPORTED

  • store – True if the super class is a Store object, otherwise False

Raises
  • ValueError – If auth_path is provided but is invalid or has incorrect values

  • ValueError – If auth_path is not provided and username, password or host is missing
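
Example

An instantiation sketch with explicit credentials; the values below are placeholders, and an auth_path file may be supplied instead.

from api2db.stream.stream2sql import Stream2Sql

stream = Stream2Sql(name="weather",
                    db_name="weather_db",
                    dialect="mysql",
                    username="user",
                    password="password",
                    host="localhost",
                    port="3306",
                    if_exists="append")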

driver

The driver to use when connecting with SQLAlchemy

Type

str

engine_str

The full string that is used with sqlalchemy.create_engine

Type

str

log_str

A string used for logging

Type

str

con

The connection to the database

Type

sqlalchemy.engine.Engine

connected

True if connection is established otherwise False

Type

bool

load() → None

Loads the driver and creates the engine string and the log string

Raises
  • NotImplementedError – Support for amazon_aurora has not been implemented in api2db yet

  • NotImplementedError – Support for oracle has not been implemented in api2db yet

  • NotImplementedError – Support for microsoft_sql has not been implemented in api2db yet

Returns

None
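
Example

For orientation only, SQLAlchemy engine strings take the general form dialect+driver://username:password@host:port/db_name. The driver shown below (pymysql) and the credentials are assumptions for illustration, not necessarily what load() selects.

from sqlalchemy import create_engine

# Placeholder URL of the general form assembled for the mysql dialect
engine = create_engine("mysql+pymysql://user:password@localhost:3306/weather_db")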

connect() → bool

Attempts to establish a connection to the database

Returns

True if the connection is established otherwise False

stream(data, retry_depth=5)

Attempts to store the incoming data into the SQL database

Workflow

  1. If authentication has not been performed, call self.connect()

  2. Attempt to store the DataFrame to the database

    • If successful, check to see if any previous uploads have failed and attempt to store those as well

  3. If the DataFrame cannot be successfully stored, set connected to False

  4. If retry_depth is not 0, perform a recursive call attempting to store the data again

  5. If retry_depth has reached zero, log an exception and store the DataFrame locally

Failed uploads will be stored in

  • STORE/upload_failed/collector_name/sql.<dialect>/timestamp_ns.parquet

Parameters
  • data – The DataFrame that should be stored to the database

  • retry_depth – Used as a recursive call counter should the DataFrame fail to be stored

Returns

None

Module contents

Original Author

Tristen Harr

Creation Date

04/27/2021

Revisions

None