api2db.stream package¶
Submodules¶
api2db.stream.file_converter module¶
Contains the FileConverter class¶
class api2db.stream.file_converter.FileConverter(name: Optional[str] = None, dtypes: Optional[dict] = None, path: Optional[str] = None, fmt: Optional[str] = None)¶
Bases: object
Serves as the base class for all Streams/Stores and is used to store/load pandas DataFrames in different formats
__init__(name: Optional[str] = None, dtypes: Optional[dict] = None, path: Optional[str] = None, fmt: Optional[str] = None)¶
Creates a FileConverter object and attempts to build its dtypes
- Parameters
name – The name of the collector associated with the FileConverter
dtypes – The dictionary of dtypes for the collector associated with the FileConverter
path – Either a path to a file or a path to a directory, as dictated by the subclass
fmt –
fmt=”parquet” (recommended) sets the FileConverter format to use parquet format
fmt=”json” sets the FileConverter format to use JSON format
fmt=”pickle” sets the FileConverter format to use pickle format
fmt=”csv” sets the FileConverter format to use CSV format
-
build_dtypes
() → Optional[dict]¶ Attempts to build the dtypes so that a loaded pandas DataFrame can be type-casted
- Returns
dtypes that can be used with pandas.DataFrame.astype(dtypes)
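The returned mapping plugs straight into pandas; a minimal sketch, with hypothetical column names and types:

import pandas as pd

# Hypothetical dtypes such as build_dtypes() might return for a collector
dtypes = {"A": "int64", "B": "float64", "C": "str"}

df = pd.DataFrame({"A": ["1"], "B": ["2.5"], "C": [3]})
df = df.astype(dtypes)  # type-cast the loaded DataFrame to the collector's dtypes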
static static_compose_df_from_dir(path: str, fmt: str, move_shards_path: Optional[str] = None, move_composed_path: Optional[str] = None, force: bool = True) → Optional[pandas.core.frame.DataFrame]¶
Attempts to build a single DataFrame from all files in a directory.
- Parameters
path – The directory path to compose files from
fmt –
fmt=”parquet” (recommended) composes from files stored in parquet format
fmt=”json” composes from files stored in JSON format
fmt=”pickle” composes from files stored in pickle format
fmt=”csv” composes from files stored in CSV format
move_shards_path – The path to move the file shards to after composing the DataFrame
move_composed_path – The path to move the composed shards to, with naming schema of filename1_filenameN.fmt
force – Forces creation of the directories to move files to if they do not exist.
- Returns
The DataFrame composed from the sharded files if successful, else None
Example
Original Directory Structure

store/
 |- file1.parquet
 |- file2.parquet
 |- file3.parquet
sharded/
composed/
main.py
The following files contain pandas DataFrames stored using parquet format.
file1.parquet

A  B  C
1  2  3

file2.parquet

A  B  C
4  5  6

file3.parquet

A  B  C
7  8  9
>>> FileConverter.static_compose_df_from_dir(path="store/", fmt="parquet")
Returns (pandas.DataFrame):

A  B  C
1  2  3
4  5  6
7  8  9
By default, files will be deleted when the DataFrame is returned
FileConverter.static_compose_df_from_dir(path="store/", fmt="parquet", move_shards_path=None, move_composed_path=None)
store/
sharded/
composed/
main.py
move_shards_path specifies the path the sharded files should be moved to.

FileConverter.static_compose_df_from_dir(path="store/", fmt="parquet", move_shards_path="sharded/", move_composed_path=None)

store/
sharded/
 |- file1.parquet
 |- file2.parquet
 |- file3.parquet
composed/
main.py
move_composed_path specifies the path that the recomposed files should be moved to.

FileConverter.static_compose_df_from_dir(path="store/", fmt="parquet", move_shards_path=None, move_composed_path="composed/")

store/
sharded/
composed/
 |- file1_file3.parquet
main.py
static static_store_df(df: pandas.core.frame.DataFrame, path: str, fmt: str) → bool¶
Stores a DataFrame to a file
- Parameters
df – The DataFrame to store to a file
path – The path to the file the DataFrame should be stored in
fmt –
fmt=”parquet” (recommended) stores the DataFrame using parquet format
fmt=”json” stores the DataFrame using JSON format
fmt=”pickle” stores the DataFrame using pickle format
fmt=”csv” stores the DataFrame using csv format
- Returns
True if successful, else False
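A minimal round-trip sketch pairing this method with static_load_df (documented below); the file path is a placeholder:

import pandas as pd
from api2db.stream.file_converter import FileConverter

df = pd.DataFrame({"A": [1], "B": [2], "C": [3]})
ok = FileConverter.static_store_df(df=df, path="store/example.parquet", fmt="parquet")
if ok:
    df2 = FileConverter.static_load_df(path="store/example.parquet", fmt="parquet")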
static pickle_store(path: str, df: pandas.core.frame.DataFrame, force: bool = True) → bool¶
Stores a DataFrame as a .pickle file
- Parameters
path – The path to store the DataFrame to
df – The DataFrame to store
force – If the directories in the path do not exist, forces them to be created
- Returns
True if successful, otherwise False
static json_store(path: str, df: pandas.core.frame.DataFrame, force: bool = True) → bool¶
Stores a DataFrame as a .json file
- Parameters
path – The path to store the DataFrame to
df – The DataFrame to store
force – If the directories in the path do not exist, forces them to be created
- Returns
True if successful, otherwise False
static csv_store(path: str, df: pandas.core.frame.DataFrame, force: bool = True) → bool¶
Stores a DataFrame as a .csv file
- Parameters
path – The path to store the DataFrame to
df – The DataFrame to store
force – If the directories in the path do not exist, forces them to be created
- Returns
True if successful, otherwise False
static parquet_store(path: str, df: pandas.core.frame.DataFrame, force: bool = True) → bool¶
Stores a DataFrame as a .parquet file
- Parameters
path – The path to store the DataFrame to
df – The DataFrame to store
force – If the directories in the path do not exist, forces them to be created
- Returns
True if successful, otherwise False
static store_valid(path: str, force: bool) → bool¶
Determines if a provided path for storage is valid, i.e. the directory structure exists
- Parameters
path – The path to check
force – When True, will attempt to create necessary directories if they do not exist
- Returns
True if path is valid, otherwise False
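A hedged sketch of what such a check might do; the actual implementation may differ:

import os

def store_valid_sketch(path: str, force: bool) -> bool:
    directory = os.path.dirname(path)
    if directory and not os.path.isdir(directory):
        if not force:
            return False  # directories are missing and force-creation is disabled
        os.makedirs(directory, exist_ok=True)  # force-create the missing directories
    return True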
static static_load_df(path: str, fmt: str, dtypes: Optional[dict] = None) → Optional[pandas.core.frame.DataFrame]¶
Loads a DataFrame from a file
- Parameters
path – The path to the file the DataFrame should be loaded from
fmt –
fmt=”parquet” (recommended) loads the DataFrame using parquet format
fmt=”json” loads the DataFrame using JSON format
fmt=”pickle” loads the DataFrame using pickle format
fmt=”csv” loads the DataFrame using csv format
dtypes – The dtypes to cast the DataFrame to before returning it. I.e. DataFrame.astype(dtypes)
- Returns
Loaded DataFrame if successful, otherwise None
static pickle_load(path: str) → Optional[pandas.core.frame.DataFrame]¶
Loads a DataFrame from a .pickle file
- Parameters
path – The path to load the DataFrame from
- Returns
The loaded DataFrame if successful, otherwise None
static json_load(path: str) → Optional[pandas.core.frame.DataFrame]¶
Loads a DataFrame from a .json file
- Parameters
path – The path to load the DataFrame from
- Returns
The loaded DataFrame if successful, otherwise None
static csv_load(path: str) → Optional[pandas.core.frame.DataFrame]¶
Loads a DataFrame from a .csv file
- Parameters
path – The path to load the DataFrame from
- Returns
The loaded DataFrame if successful, otherwise None
static parquet_load(path: str) → Optional[pandas.core.frame.DataFrame]¶
Loads a DataFrame from a .parquet file
- Parameters
path – The path to load the DataFrame from
- Returns
The loaded DataFrame if successful, otherwise None
api2db.stream.stream module¶
Contains the Stream class¶
class api2db.stream.stream.Stream(name: str, path: Optional[str] = None, dtypes: Optional[dict] = None, fmt: Optional[str] = None, chunk_size: int = 0, stream_type: str = 'stream', store: bool = False)¶
Bases: api2db.stream.file_converter.FileConverter
Used for streaming data into a local or external source
__init__(name: str, path: Optional[str] = None, dtypes: Optional[dict] = None, fmt: Optional[str] = None, chunk_size: int = 0, stream_type: str = 'stream', store: bool = False)¶
Creates a Stream object and attempts to build its dtypes. If the store flag is False, spawns a thread that polls the Stream queue for incoming data
- Parameters
name – The name of the collector the stream is associated with
path – The directory path the stream should store to (usage dictated by subclasses)
dtypes – A dictionary containing the dtypes that the stream data DataFrame has
fmt –
The file format that the stream data should be stored as
fmt=”parquet” (recommended) stores the DataFrame using parquet format
fmt=”json” stores the DataFrame using JSON format
fmt=”pickle” stores the DataFrame using pickle format
fmt=”csv” stores the DataFrame using csv format
chunk_size – The size of chunks to send to the stream target. I.e. Insert data in chunks of chunk_size rows
stream_type – The type of the stream (Primarily used for logging)
store – This flag indicates whether or not the stream is being called by a Store object
- Raises
NotImplementedError – chunk_storage is not yet implemented.
is_store_instance¶
True if the instantiated subclass has Store as a base class, otherwise False
- Type
bool
lock¶
Stream lock used to signal if the stream has died
- Type
threading.Lock
q¶
Stream queue used to pass data into the stream
- Type
queue.Queue
start() → None¶
Starts the stream running loop in a new thread
- Returns
None
check_failures() → None¶
Checks whether previous uploads have failed and, if so, loads the previous upload data and attempts to upload it again.
This method searches the directory path
STORE/upload_failed/collector_name/stream_type/
This path is the target location for failed uploads. If an upload fails 5 times in a row, it is stored in this location, with the timestamp at which it is stored as its filename.
- Returns
None
stream_start() → None¶
Starts the stream listener that polls the stream queue for incoming data.
- Returns
None
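A minimal sketch of feeding a running stream through the documented q attribute, using the concrete subclass Stream2Local (documented below); the collector name and path are placeholders:

import pandas as pd
from api2db.stream.stream2local import Stream2Local

s = Stream2Local(name="my_collector", path="store/my_collector/")
s.start()                          # spawn the polling thread
s.q.put(pd.DataFrame({"A": [1]}))  # enqueue incoming data for the stream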
stream(data: pandas.core.frame.DataFrame) → AttributeError¶
Overridden by subclasses; a Stream object is NEVER directly used to stream data. It is ALWAYS inherited from
- Parameters
data – The data to stream
- Raises
AttributeError – Stream does not have the ability to stream data. It must be subclassed.
api2db.stream.stream2bigquery module¶
Contains the Stream2Bigquery class¶
class api2db.stream.stream2bigquery.Stream2Bigquery(name: str, auth_path: str, pid: str, did: str, tid: str, location: str = 'US', if_exists: str = 'append', chunk_size: int = 0, store: bool = False)¶
Bases: api2db.stream.stream.Stream
Streams data from the associated collector directly to Bigquery in real-time
__init__(name: str, auth_path: str, pid: str, did: str, tid: str, location: str = 'US', if_exists: str = 'append', chunk_size: int = 0, store: bool = False)¶
Creates a Stream2Bigquery object and attempts to build its dtypes.
If the dtypes can be successfully created (i.e. data arrives from the API for the first time), the following occurs:
Auto-generates the table schema
Creates the dataset if it does not exist within the project
Creates the table if it does not exist within the project
- Parameters
name – The name of the collector associated with the stream
auth_path – The path to the Google provided authentication file. I.e. AUTH/google_auth_file.json
pid – Google project ID
did – Google dataset ID
tid – Google table ID
location – Location of the Bigquery project
if_exists –
if_exists=”append” Adds the data to the table
if_exists=”replace” Replaces the table with the new data
if_exists=”fail” Fails to upload the new data if the table exists
chunk_size – CURRENTLY NOT SUPPORTED
store – True if the instantiated subclass is a Store object, otherwise False
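A hedged construction sketch; every identifier below is a placeholder for a real Google Cloud project:

from api2db.stream.stream2bigquery import Stream2Bigquery

stream = Stream2Bigquery(name="my_collector",
                         auth_path="AUTH/google_auth_file.json",  # placeholder auth file
                         pid="my-project",   # Google project ID
                         did="my_dataset",   # Google dataset ID
                         tid="my_table",     # Google table ID
                         location="US",
                         if_exists="append")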
schema¶
Contains the schema if dtypes exist, else None
- Type
Optional[List[google.cloud.bigquery.SchemaField]]
bq_schema¶
Contains the bq_schema if dtypes exist, else None
- Type
Optional[List[dict]]
cred¶
Contains the credentials used to authenticate with BigQuery
- Type
google.oauth2.service_account.Credentials
client¶
The BigQuery client
- Type
google.cloud.bigquery.Client
dataset¶
The dataset associated with the collector
- Type
google.cloud.bigquery.Dataset
table¶
The table associated with the collector
- Type
google.cloud.bigquery.Table
connected¶
True if a connection has been established (i.e. credentials have been authenticated), otherwise False
- Type
bool
connect() → bool¶
Attempts to authenticate with the provided credentials.
Workflow
Load the credentials from the service account file
Instantiate the bigquery Client
Attempt to create the dataset and if a Conflict exception is thrown then load the dataset
Attempt to load the table and if a NotFound exception is thrown then create the table
- Returns
True if the table is successfully loaded/created otherwise False
stream(data: pandas.core.frame.DataFrame, retry_depth: int = 5) → None¶
Attempts to store the incoming data into BigQuery
Workflow
If authentication has not been performed, call self.connect()
Attempt to store the DataFrame to bigquery
If successful, check to see if any previous uploads have failed and attempt to store those as well
If the DataFrame cannot be successfully stored, set connected to False
If the retry_depth is not 0 perform a recursive call attempting to store the data again
If the retry_depth has reached zero, log an exception and store the DataFrame locally
Failed uploads will be stored in
STORE/upload_failed/collector_name/bigquery/timestamp_ns.parquet
- Parameters
data – The DataFrame that should be stored to bigquery
retry_depth – Used for a recursive call counter should the DataFrame fail to be stored
- Returns
None
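A generic sketch of the retry workflow described above; upload and store_locally stand in for the real BigQuery insert and the local fallback, which api2db handles internally:

import pandas as pd

def stream_with_retry(data: pd.DataFrame, upload, store_locally, retry_depth: int = 5) -> None:
    # Hedged sketch of the documented workflow, not api2db's actual code
    try:
        upload(data)
    except Exception:
        if retry_depth > 0:
            stream_with_retry(data, upload, store_locally, retry_depth - 1)
        else:
            # After repeated failures the data lands under
            # STORE/upload_failed/collector_name/bigquery/
            store_locally(data)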
build_schema() → Optional[List[google.cloud.bigquery.schema.SchemaField]]¶
Attempts to build the schema that will be used for table creation
Iterates through the dtypes items and generates the appropriate SchemaFields
- Returns
The schema generated if successful otherwise None
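A hedged sketch of the kind of dtype-to-SchemaField mapping such a method performs; the exact type mapping is an assumption:

from google.cloud import bigquery

# Assumed pandas-dtype to BigQuery-type mapping; api2db's real mapping may differ
TYPE_MAP = {"int64": "INTEGER", "float64": "FLOAT", "bool": "BOOLEAN", "object": "STRING"}

def build_schema_sketch(dtypes: dict):
    return [bigquery.SchemaField(name, TYPE_MAP.get(str(dtype), "STRING"))
            for name, dtype in dtypes.items()]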
build_bq_schema() → Optional[List[dict]]¶
Attempts to build the schema that will be used to upload data to BigQuery via DataFrame.to_gbq()
Iterates through the dtypes items and generates the appropriate schema dictionary
- Returns
The schema generated if successful otherwise None
api2db.stream.stream2local module¶
Contains the Stream2Local class¶
class api2db.stream.stream2local.Stream2Local(name: str, path: Optional[str] = None, mode: str = 'shard', fmt: str = 'parquet', drop_duplicate_keys: Optional[List[str]] = None)¶
Bases: api2db.stream.stream.Stream
Streams data from the associated collector directly to a local file in real-time
__init__(name: str, path: Optional[str] = None, mode: str = 'shard', fmt: str = 'parquet', drop_duplicate_keys: Optional[List[str]] = None)¶
Creates a Stream2Local object and attempts to build its dtypes
- Parameters
name – The name of the collector associated with the stream
path – The path to either a single file or a file directory dictated by the mode parameter
mode –
mode=”shard” (default) will store each incoming file independently in the specified path. In shard mode the file will be named timestamp_ns.fmt
mode=”update” will update the file located at the specified path with the new data
mode=”replace” will replace the file located at the specified path with the new data
fmt –
fmt=”parquet” (default/recommended) stores the files using parquet format
fmt=”json” stores the files using JSON format
fmt=”pickle” stores the files using pickle format
fmt=”csv” stores the files using csv format
drop_duplicate_keys –
drop_duplicate_keys=None -> DataFrame.drop_duplicates() performed before storage
drop_duplicate_keys=[“uuid”] -> DataFrame.drop_duplicates(subset=drop_duplicate_keys) performed before storage
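A hedged construction sketch; the collector name, path, and duplicate key are placeholders:

from api2db.stream.stream2local import Stream2Local

s = Stream2Local(name="my_collector",
                 path="store/my_collector/",    # shard mode expects a directory path
                 mode="shard",
                 fmt="parquet",
                 drop_duplicate_keys=["uuid"])  # de-duplicate on "uuid" before storage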
stream(data: pandas.core.frame.DataFrame) → None¶
Stores the incoming data into its stream target using the specified mode
- Parameters
data – The data to be stored
- Returns
None
stream_shard(data: pandas.core.frame.DataFrame) → None¶
Stores the incoming data to the specified directory path using the file naming schema timestamp_ns.fmt
- Parameters
data – The data to store to the file
- Returns
None
stream_update(data: pandas.core.frame.DataFrame) → None¶
Updates the existing data at the specified file path and adds the incoming data
- Parameters
data – The data to add to the file
- Returns
None
stream_replace(data: pandas.core.frame.DataFrame) → None¶
Replaces the existing data at the specified file path with the incoming data
- Parameters
data – The data to replace the file with
- Returns
None
api2db.stream.stream2omnisci module¶
Contains the Stream2Omnisci class¶
Warning
Due to dependency conflicts and issues with the current published branch of the pymapd library, the following steps must be taken to support streaming/storing data to Omnisci:
> pip install pymapd==0.25.0
> pip install pandas --upgrade
> pip install pyarrow --upgrade
This occurs because the dependencies of the pymapd library are locked in place. I’ve opened an issue on this, and they appear to be working on it. The most recent publish seemed to break other things. Until this gets fixed, this is a simple work-around. It allows api2db to work with Omnisci, but there may be issues when attempting to use features of the pymapd library beyond what api2db uses, so use with caution. –Tristen
class api2db.stream.stream2omnisci.Stream2Omnisci(name: str, db_name, username: Optional[str] = None, password: Optional[str] = None, host: Optional[str] = None, auth_path: Optional[str] = None, protocol: str = 'binary', chunk_size: int = 0, store: bool = False)¶
Bases: api2db.stream.stream.Stream
Streams data from the associated collector directly to Omnisci in real-time
__init__(name: str, db_name, username: Optional[str] = None, password: Optional[str] = None, host: Optional[str] = None, auth_path: Optional[str] = None, protocol: str = 'binary', chunk_size: int = 0, store: bool = False)¶
Creates a Stream2Omnisci object and attempts to build its dtypes
If the dtypes can be successfully created (i.e. data arrives from the API for the first time), the following occurs:
Auto-generates the table schema
Casts all string columns to categories as required by Omnisci
Creates an Omnisci table with the name collector_name_stream
Note
Data-attribute fields will have a _t appended to them due to naming conflicts encountered.
Example:
A  B  C
1  2  3
4  5  6
Will become the following in the Omnisci database. (This is only applied to data in the database)
A_t  B_t  C_t
1    2    3
4    5    6
Authentication Methods:
Supply auth_path with a path to an authentication file. Templates for these files can be found in your project’s AUTH/ directory
OR
Supply the username, host, and password
- Parameters
name – The name of the collector associated with the stream
db_name – The name of the database to connect to
username – The username to authenticate with the database
password – The password to authenticate with the database
host – The host of the database
auth_path – The path to the authentication credentials.
protocol – The protocol to use when connecting to the database
chunk_size – CURRENTLY NOT SUPPORTED
store – True if the instantiated subclass is a Store object, otherwise False
- Raises
ValueError – If auth_path is provided but is invalid or has incorrect values
ValueError – If auth_path is not provided and username, password, or host is missing
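A hedged construction sketch; the credentials are placeholders, and auth_path could be supplied instead of the username/password/host trio:

from api2db.stream.stream2omnisci import Stream2Omnisci

s = Stream2Omnisci(name="my_collector",
                   db_name="omnisci",
                   username="admin",      # placeholder credentials
                   password="password",
                   host="localhost")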
con¶
The connection to the database
- Type
Optional[pymapd.Connection]
connected¶
Returns True if a connection is established, else False
- Type
Callable[Optional[pymapd.Connection], bool]
log_str¶
A string used for logging
- Type
str
connect() → Optional[pymapd.Connection]¶
Attempts to establish a connection to an Omnisci database
- Returns
A connection object if a connection can be established, else None
static cast_categorical(data: pandas.core.frame.DataFrame, dtypes: pandas.core.series.Series) → pandas.core.frame.DataFrame¶
Casts all columns with type str to type category as required by Omnisci and appends a _t to column names
- Parameters
data – The DataFrame that will be stored into the omnisci database
dtypes – The dtypes of the DataFrame
- Returns
Modified DataFrame
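A hedged sketch of the documented behaviour (str columns become category, column names gain a _t suffix); not the library’s actual code:

import pandas as pd

def cast_categorical_sketch(data: pd.DataFrame) -> pd.DataFrame:
    for col in data.columns:
        if data[col].dtype == object:                # pandas stores str columns as object
            data[col] = data[col].astype("category")
    data.columns = [f"{c}_t" for c in data.columns]  # _t suffix avoids naming conflicts
    return data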
stream(data, retry_depth=5)¶
Attempts to store the incoming data into Omnisci
Workflow
If authentication has not been performed, call self.connect()
Attempt to store the DataFrame to omnisci
If successful, check to see if any previous uploads have failed and attempt to store those as well
If the DataFrame cannot be successfully stored, set con to None
If the retry_depth is not 0 perform a recursive call attempting to store the data again
If the retry_depth has reached zero, log an exception and store the DataFrame locally
Failed uploads will be stored in
STORE/upload_failed/collector_name/omnisci/timestamp_ns.parquet
- Parameters
data – The DataFrame that should be stored to omnisci
retry_depth – Used for a recursive call counter should the DataFrame fail to be stored
- Returns
None
api2db.stream.stream2sql module¶
Contains the Stream2Sql class¶
class api2db.stream.stream2sql.Stream2Sql(name: str, db_name: str, dialect: str, username: Optional[str] = None, password: Optional[str] = None, host: Optional[str] = None, auth_path: Optional[str] = None, port: str = '', if_exists: str = 'append', chunk_size: int = 0, store: bool = False)¶
Bases: api2db.stream.stream.Stream
Streams data from the associated collector directly to an SQL database target in real-time
__init__(name: str, db_name: str, dialect: str, username: Optional[str] = None, password: Optional[str] = None, host: Optional[str] = None, auth_path: Optional[str] = None, port: str = '', if_exists: str = 'append', chunk_size: int = 0, store: bool = False)¶
Creates a Stream2Sql object and attempts to build its dtypes
If the dtypes can be successfully created (i.e. data arrives from the API for the first time), the following occurs:
Auto-generates the table schema
Creates the table in the database if it does not exist
Authentication Methods:
Supply auth_path with a path to an authentication file. Templates for these files can be found in your project’s AUTH/ directory
OR
Supply the username, host, and password
- Parameters
name – The name of the collector associated with the stream
db_name – The name of the database to connect to
dialect –
dialect=”mysql” -> Use this to connect to a mysql database
dialect=”mariadb” -> Use this to connect to a mariadb database
dialect=”postgresql” -> Use this to connect to a postgresql database
dialect=”amazon_aurora” -> COMING SOON
dialect=”oracle” -> COMING SOON
dialect=”microsoft_sql” -> COMING SOON
dialect=”Something else?” -> Submit a feature request… or even better build it!
username – The username to authenticate with the database
password – The password to authenticate with the database
host – The host of the database
auth_path – The path to the authentication credentials.
port – The port used when establishing a connection to the database
if_exists –
if_exists=”append” Adds the data to the table
if_exists=”replace” Replaces the table with the new data
if_exists=”fail” Fails to upload the new data if the table exists
chunk_size – CURRENTLY NOT SUPPORTED
store – True if the instantiated subclass is a Store object, otherwise False
- Raises
ValueError – If auth_path is provided but is invalid or has incorrect values
ValueError – If auth_path is not provided and username, password, or host is missing
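A hedged construction sketch; all connection values are placeholders:

from api2db.stream.stream2sql import Stream2Sql

s = Stream2Sql(name="my_collector",
               db_name="my_db",
               dialect="mysql",
               username="user",      # placeholder credentials
               password="secret",
               host="localhost",
               port="3306",
               if_exists="append")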
driver¶
The driver to use when connecting with SQLAlchemy
- Type
str
engine_str¶
The full string that is used with sqlalchemy.create_engine
- Type
str
log_str¶
A string used for logging
- Type
str
con¶
The connection to the database
- Type
sqlalchemy.engine.Engine
connected¶
True if connection is established otherwise False
- Type
bool
load() → None¶
Loads the driver and creates the engine string and the log string
- Raises
NotImplementedError – Support for amazon_aurora has not been implemented in api2db yet
NotImplementedError – Support for oracle has not been implemented in api2db yet
NotImplementedError – Support for microsoft_sql has not been implemented in api2db yet
- Returns
None
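A hedged sketch of the kind of SQLAlchemy URL that load() assembles; the driver choices shown are assumptions, not necessarily the ones api2db picks:

# SQLAlchemy URL shape: dialect+driver://username:password@host:port/db_name
drivers = {"mysql": "pymysql", "mariadb": "pymysql", "postgresql": "psycopg2"}
dialect = "mysql"
engine_str = f"{dialect}+{drivers[dialect]}://user:secret@localhost:3306/my_db"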
connect() → bool¶
Attempts to establish a connection to the database
- Returns
True if the connection is established otherwise False
stream(data, retry_depth=5)¶
Attempts to store the incoming data into the SQL database
Workflow
If authentication has not been performed, call self.connect()
Attempt to store the DataFrame to the database
If successful, check to see if any previous uploads have failed and attempt to store those as well
If the DataFrame cannot be successfully stored, set connected to False
If the retry_depth is not 0 perform a recursive call attempting to store the data again
If the retry_depth has reached zero, log an exception and store the DataFrame locally
Failed uploads will be stored in
STORE/upload_failed/collector_name/sql.**dialect**/timestamp_ns.parquet
- Parameters
data – The DataFrame that should be stored to the database
retry_depth – Used for a recursive call counter should the DataFrame fail to be stored
- Returns
None
Module contents¶
Original Author | Tristen Harr
Creation Date | 04/27/2021
Revisions | None