pyspark_db_utils package¶
Subpackages¶
Submodules¶
pyspark_db_utils.example module¶
A simple example of using the library. It asks for DB connection parameters, builds a DataFrame, writes it to the DB, loads it back, and shows it.
-
pyspark_db_utils.example.get_pg_config()¶ Ask for DB connection parameters
Return type: dict
-
pyspark_db_utils.example.init_spark_context(appname)¶ Initialize the Spark context
Return type: not resolved (rendered as a MagicMock placeholder by the documentation build)
-
pyspark_db_utils.example.main(spark)¶ Run the example
Return type: None
pyspark_db_utils.pg module¶
Utils for Postgres.
The most useful functions are read_from_pg(), write_to_pg(), and execute_batch().
-
pyspark_db_utils.pg.read_from_pg(config, sql, sc, logger=None)¶ Read a DataFrame from Postgres
Parameters:
- config (dict) – connection settings
- sql (str) – SQL to read; it may take one of these forms:
  - 'table_name'
  - 'schema_name.table_name'
  - '(select a, b, c from t1 join t2 …) as foo'
- sc – the current Spark context, or None
- logger (Optional[Logger]) – logger
Return type: DataFrame
Returns: selected DF
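The third form of sql wraps a query in parentheses and gives it an alias. A minimal sketch of building such a string follows; the helper name as_subquery is hypothetical and is not part of pyspark_db_utils.

```python
def as_subquery(query: str, alias: str = "foo") -> str:
    """Wrap a SELECT statement so it can be used where a table name is expected."""
    # A trailing semicolon is not valid inside the parentheses, so strip it.
    cleaned = query.strip().rstrip(";").strip()
    return f"({cleaned}) as {alias}"


sql = as_subquery("select a, b from t1 join t2 using (id);")
print(sql)
```

The resulting string could then be passed as the sql argument, for example read_from_pg(config, sql, sc).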
-
pyspark_db_utils.pg.write_to_pg(df, config, table, mode='append', logger=None)¶ Write a DataFrame to Postgres
Parameters:
- df (DataFrame) – DataFrame to write
- config (dict) – config dict
- table (str) – table name
- logger (Optional[Logger]) – logger
- mode (str) – write mode, one of:
  - append - create the table if it does not exist (with all columns of the DataFrame) and write records to it (using only the fields that are table columns)
  - overwrite - truncate the table (if it exists) and write records (using only the fields that are table columns)
  - overwrite_full - drop the table, create a new one with all columns of the DataFrame, and append records to it
  - fail - fail if the table does not exist, otherwise append records to it
Return type: None
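The four modes can be read as a small decision table. The sketch below only models the documented behaviour as a list of steps; planned_actions is a hypothetical name, it is not how write_to_pg is implemented, and the cases the documentation leaves open are marked in comments.

```python
def planned_actions(mode: str, table_exists: bool) -> list:
    """Return the documented steps for a write mode as readable strings."""
    if mode == "append":
        steps = [] if table_exists else ["create table with all DataFrame columns"]
        return steps + ["write records"]
    if mode == "overwrite":
        # What happens when the table is missing is not documented;
        # this sketch simply skips the truncate step (assumption).
        steps = ["truncate table"] if table_exists else []
        return steps + ["write records"]
    if mode == "overwrite_full":
        # Skipping the drop for a missing table is an assumption of this sketch.
        steps = ["drop table"] if table_exists else []
        return steps + ["create table with all DataFrame columns", "write records"]
    if mode == "fail":
        if not table_exists:
            raise ValueError("table does not exist")
        return ["write records"]
    raise ValueError(f"unknown mode: {mode!r}")


for mode in ("append", "overwrite", "overwrite_full"):
    print(mode, planned_actions(mode, table_exists=True))
```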
-
pyspark_db_utils.pg.run_sql(sql, config, logger=None)¶ Run a SQL statement
Return type: None
-
pyspark_db_utils.pg.get_field_names(table_name, config)¶ Get the field names of a table
Return type: Set[str]
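How get_field_names queries the database is not shown here. As a rough illustration of the idea, the sketch below collects column names into a set from a DB-API cursor. It uses sqlite3 as a stand-in for Postgres, and field_names_sketch is a hypothetical helper that takes a cursor instead of a config dict.

```python
import sqlite3
from typing import Set


def field_names_sketch(cursor, table_name: str) -> Set[str]:
    # table_name is interpolated directly, so it must come from trusted code.
    cursor.execute(f"SELECT * FROM {table_name} LIMIT 0")
    # DB-API cursors describe result columns in .description; item 0 is the name.
    return {column[0] for column in cursor.description}


conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE cases (id INTEGER, status INTEGER, close_date TEXT)")
names = field_names_sketch(cur, "cases")
conn.close()
print(sorted(names))
```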
-
pyspark_db_utils.pg.get_field_names_stub(df, config, table_name, sc)¶ Get the field names of a table
Do not use this function; use get_field_names instead.
TODO: replace with get_field_names
Return type: Set[str]
-
pyspark_db_utils.pg.jdbc_connect(config, autocommit=False)¶ Context manager that opens and closes the connection correctly
Parameters:
- config (Dict) – config
- autocommit (bool) – enable autocommit
Yields: tuple – connection, cursor
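The pattern described here, a context manager that yields a (connection, cursor) pair and always closes both, can be sketched with the standard library. This is not the jdbc_connect implementation: sqlite3 stands in for the JDBC connection, connect_sketch is a hypothetical name, and it takes a database path rather than a config dict.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def connect_sketch(path: str, autocommit: bool = False):
    conn = sqlite3.connect(path)
    if autocommit:
        # In sqlite3, isolation_level=None means statements are not wrapped
        # in implicit transactions.
        conn.isolation_level = None
    cur = conn.cursor()
    try:
        yield conn, cur
    finally:
        # Runs on normal exit and on exceptions, so nothing is left open.
        cur.close()
        conn.close()


with connect_sketch(":memory:", autocommit=True) as (conn, cur):
    cur.execute("CREATE TABLE t (x INTEGER)")
    cur.execute("INSERT INTO t VALUES (1)")
    cur.execute("SELECT x FROM t")
    rows = cur.fetchall()
print(rows)
```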
-
pyspark_db_utils.pg.mogrify(val)¶ Cast Python values to raw SQL correctly, escaping if necessary
Parameters: val – some value
Return type: str
Returns: mogrified value
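To make "cast to raw SQL and escape" concrete, here is a rough approximation of what such a function might do for a few common types. The name mogrify_sketch and the exact rules are assumptions; the library's own mogrify may handle more types or format them differently.

```python
import datetime
from decimal import Decimal


def mogrify_sketch(val) -> str:
    if val is None:
        return "NULL"
    if isinstance(val, bool):  # check bool before int: bool is a subclass of int
        return "TRUE" if val else "FALSE"
    if isinstance(val, (int, float, Decimal)):
        return str(val)
    if isinstance(val, (datetime.date, datetime.datetime)):
        return "'" + val.isoformat() + "'"
    # Everything else is treated as text; single quotes are doubled to escape them.
    return "'" + str(val).replace("'", "''") + "'"


print(mogrify_sketch("O'Brien"))
print(mogrify_sketch(datetime.date(2020, 1, 2)))
```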
-
class pyspark_db_utils.pg.MogrifyFormatter¶
Bases: string.Formatter
Custom formatter to mogrify {}-like formatting strings
-
get_value(key, args, kwargs)¶ Return type: str
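Overriding get_value on string.Formatter is enough to make every {field} placeholder come out as a SQL literal. The sketch below shows the mechanism under stated assumptions: QuotingFormatter and quote_sketch are hypothetical names, and the quoting rules are deliberately simplified compared with whatever mogrify does.

```python
import string


def quote_sketch(val) -> str:
    # Simplified stand-in for mogrify: NULL, numbers, and quoted text only.
    if val is None:
        return "NULL"
    if isinstance(val, (int, float)):
        return str(val)
    return "'" + str(val).replace("'", "''") + "'"


class QuotingFormatter(string.Formatter):
    def get_value(self, key, args, kwargs) -> str:
        # Look the value up as usual, then turn it into a SQL literal.
        value = super().get_value(key, args, kwargs)
        return quote_sketch(value)


sql = QuotingFormatter().format(
    "UPDATE t SET name = {name} WHERE id = {id}", name="O'Brien", id=7
)
print(sql)
```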
-
-
pyspark_db_utils.pg.batcher(iterable, batch_size)¶ Yields batches of an iterable
Parameters:
- iterable (Iterable) – something to batch
- batch_size (int) – batch size
Yields: batch, until the end of the iterable
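A generator with this contract is commonly written with itertools.islice. The following is a minimal sketch, not the library's source; batcher_sketch is a hypothetical name and returning each batch as a list is an assumption.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batcher_sketch(iterable: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    if batch_size < 1:
        raise ValueError("batch_size must be positive")
    iterator = iter(iterable)
    while True:
        # Take up to batch_size items; an empty list means the input is exhausted.
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


batches = list(batcher_sketch(range(5), 2))
print(batches)
```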
-
pyspark_db_utils.pg.execute_batch(df, sql_temp, config, batch_size=1000)¶ Run custom SQL for each row of a DataFrame, in batches.
For example UPDATE, DELETE, etc.
Attention! The sql_temp string is expected to use {}-style formatting (because it is easy to override with a custom formatter). execute_batch replaces {field} with the field's value for each row in the DataFrame. So if you need other formatting (such as a table name or constant values), use %()s formatting.
Examples
Update table rows by id and values for DF records:
>> execute_batch(df, config=config,
       sql_temp='update %(table_name)s set out_date=%(filename_date)s where id={id}'
                % {'table_name': table_name, 'filename_date': filename_date})
Update table row fields with a complex SQL expression:
>> execute_batch(df=df, sql_temp='''
       UPDATE reporting.cases c
       SET close_date = {check_date_time},
           status = 2,
           lost_sales = EXTRACT(epoch FROM {check_date_time} - c.start_date) * (3.0 / 7) / (24 * 3600)
       WHERE c.id = {id}
   ''', config=config)
Return type: None
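The two formatting stages described above can be seen in isolation with plain strings. In this sketch, rows stands in for the DataFrame records and str.format stands in for the library's formatter, so no escaping is applied; the table name and values are made up for illustration.

```python
constants = {"table_name": "reporting.cases", "status": 2}

# Stage 1: %-formatting fills in the constants and leaves {id} untouched.
sql_temp = "UPDATE %(table_name)s SET status = %(status)s WHERE id = {id}" % constants

# Stage 2: {}-formatting is applied once per row.
rows = [{"id": 10}, {"id": 11}]
statements = [sql_temp.format(**row) for row in rows]

for statement in statements:
    print(statement)
```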
-
pyspark_db_utils.pg.update_many(df, table_name, set_to, config, batch_size=1000, id_field='id')¶ Update rows for the records in a DataFrame. Set some fields to new constant values (the same values for every row).
Note
This function sets fields to constant values. If you need an UPDATE with SQL expressions, use execute_batch.
Parameters:
- df (DataFrame) – DataFrame
- table_name (str) – table name
- set_to (Dict) – dict such as {'field_name1': new_const_value1, 'field_name2': new_const_value2}
- config (Dict) – config
- batch_size (int) – batch size
- id_field (str) – id field
Return type: None
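One way to picture a constant-value update is a single parameterized UPDATE executed once per id. The sketch below is an illustration only, not the library's implementation: build_update is a hypothetical helper, sqlite3 and its ? placeholders stand in for Postgres, and the ids list stands in for the id column of the DataFrame.

```python
import sqlite3


def build_update(table_name, set_to, id_field="id"):
    # Table and field names are interpolated as identifiers, so they must be trusted.
    fields = list(set_to)
    assignments = ", ".join(f"{field} = ?" for field in fields)
    sql = f"UPDATE {table_name} SET {assignments} WHERE {id_field} = ?"
    return sql, [set_to[field] for field in fields]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE cases (id INTEGER PRIMARY KEY, status INTEGER, note TEXT)")
conn.executemany("INSERT INTO cases VALUES (?, ?, ?)", [(1, 0, "a"), (2, 0, "b"), (3, 0, "c")])

sql, constants = build_update("cases", {"status": 2, "note": "closed"})
ids = [1, 3]  # stands in for the id column of the DataFrame
conn.executemany(sql, [constants + [row_id] for row_id in ids])
result = conn.execute("SELECT id, status, note FROM cases ORDER BY id").fetchall()
conn.close()
print(result)
```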
-
pyspark_db_utils.pg.insert_values(df, config, batch_size=1000, fields=None, values_temp=None, sql_temp=None, table_name=None, on_conflict_do_nothing=False, on_conflict_do_update=False, drop_duplicates=False, exclude_null_field=None, logger=None, sc=None)¶ Insert rows from a DataFrame.
Note
Prefer write_to_pg whenever possible.
Unfortunately, write_to_pg cannot use ON CONFLICT and ON UPDATE statements, so a custom function is needed.
Parameters:
- df (DataFrame) – DataFrame
- sql_temp (Optional[str]) – SQL template (may contain the values, fields, and table_name formatting arguments)
- values_temp (Optional[str]) – string template for values
- config (Dict) – config
- fields (Optional[List[str]]) – list of columns to insert (if None, all columns are used)
- batch_size (int) – batch size
- table_name (Optional[str]) – table name argument for string formatting
- on_conflict_do_nothing (bool) – add an ON CONFLICT DO NOTHING statement to each INSERT
- on_conflict_do_update (bool) – add an ON CONFLICT DO UPDATE statement to each INSERT
- drop_duplicates (bool) – drop duplicates if set to True
- exclude_null_field (Optional[str]) – exclude rows where the field named by exclude_null_field is null
- logger (Optional[Logger]) – logger
- sc (optional) – Spark context
Return type: None
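To show how some of these options could fit together, the sketch below filters and de-duplicates plain dict rows and builds an INSERT with an optional ON CONFLICT DO NOTHING suffix. Both helpers are hypothetical, the %s placeholders are an assumption about the driver, and ON CONFLICT DO UPDATE is left out because its target and SET clause are not documented here.

```python
def prepare_rows(rows, fields, drop_duplicates=False, exclude_null_field=None):
    if exclude_null_field is not None:
        # Skip rows whose value in the named field is null.
        rows = [row for row in rows if row.get(exclude_null_field) is not None]
    tuples = [tuple(row[field] for field in fields) for row in rows]
    if drop_duplicates:
        # dict.fromkeys keeps the first occurrence and preserves order.
        tuples = list(dict.fromkeys(tuples))
    return tuples


def build_insert(table_name, fields, on_conflict_do_nothing=False):
    # Table and field names are interpolated as identifiers, so they must be trusted.
    placeholders = ", ".join(["%s"] * len(fields))
    sql = f"INSERT INTO {table_name} ({', '.join(fields)}) VALUES ({placeholders})"
    if on_conflict_do_nothing:
        sql += " ON CONFLICT DO NOTHING"
    return sql


rows = [{"id": 1, "name": "a"}, {"id": 1, "name": "a"}, {"id": 2, "name": None}]
values = prepare_rows(rows, ["id", "name"], drop_duplicates=True, exclude_null_field="name")
insert_sql = build_insert("cases", ["id", "name"], on_conflict_do_nothing=True)
print(insert_sql, values)
```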