pyspark_db_utils package

Submodules

pyspark_db_utils.example module

A simple example of using the library. It asks for DB connection parameters, creates a DataFrame, writes it to the DB, loads it back and shows it.

pyspark_db_utils.example.get_pg_config()

Ask for DB connection parameters.

Return type: dict

pyspark_db_utils.example.init_spark_context(appname)

Initialize the Spark context.

Return type: Spark context

pyspark_db_utils.example.main(spark)

Run the example.

Return type: None

pyspark_db_utils.pg module

Utils for Postgres.

The most useful functions are read_from_pg(), write_to_pg() and execute_batch().

pyspark_db_utils.pg.read_from_pg(config, sql, sc, logger=None)

Read a DataFrame from Postgres.

Parameters:
  • config (dict) – connection settings
  • sql (str) –

    SQL to read, in one of these formats:

    • 'table_name'
    • 'schema_name.table_name'
    • '(select a, b, c from t1 join t2 …) as foo'
  • sc (SparkContext) – the current Spark context, or None
  • logger (Optional[Logger]) – logger
Return type:

DataFrame

Returns:

the selected DataFrame
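The third sql format wraps a query in parentheses and gives it an alias, so that it can stand in for a table name. A minimal sketch of building such a string follows; the helper name as_subquery is hypothetical and not part of pyspark_db_utils.

```python
def as_subquery(select_sql, alias="foo"):
    """Wrap a SELECT statement so it can be passed where a table name is expected."""
    # Drop surrounding whitespace and a trailing semicolon,
    # which would not be valid inside the parentheses.
    body = select_sql.strip().rstrip(";").strip()
    return "({}) as {}".format(body, alias)


sql = as_subquery("select a, b from t1 join t2 on t1.id = t2.id;")
print(sql)
```

The resulting string could then be passed as the sql argument of read_from_pg, assuming the config dict and Spark context are already set up.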

pyspark_db_utils.pg.write_to_pg(df, config, table, mode='append', logger=None)

Write a DataFrame to Postgres.

Parameters:
  • df (DataFrame) – DataFrame to write
  • config (dict) – config dict
  • table (str) – table name
  • logger (Optional[Logger]) – logger
  • mode (str) –

    mode, one of:

    • append - create the table if it does not exist (with all columns of the DataFrame)
      and write records to it (using only the fields that are table columns)
    • overwrite - truncate the table (if it exists) and write records (using only the fields that are table columns)
    • overwrite_full - drop the table, create a new one with all columns of the DataFrame, and append records to it
    • fail - fail if the table does not exist, otherwise append records to it
Return type:

None

pyspark_db_utils.pg.run_sql(sql, config, logger=None)

Run a SQL statement.

Return type: None

pyspark_db_utils.pg.get_field_names(table_name, config)

Get the field names of a table.

Return type: Set[str]

pyspark_db_utils.pg.get_field_names_stub(df, config, table_name, sc)

Get the field names of a table.

Do not use this function! Use get_field_names instead.

TODO: replace with get_field_names

Return type: Set[str]

pyspark_db_utils.pg.jdbc_connect(config, autocommit=False)

Context manager that opens a connection and closes it correctly.

Parameters:
  • config (Dict) – config
  • autocommit (bool) – enable autocommit
Yields:

tuple – connection, cursor
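The general pattern of a context manager that yields a (connection, cursor) pair can be sketched as below. This is not the library's implementation: it uses the standard-library sqlite3 module as a stand-in for the JDBC connection, the name connect is hypothetical, and the commit-on-success / rollback-on-error behaviour is an assumption.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def connect(path, autocommit=False):
    """Yield (connection, cursor); always close both on exit."""
    # isolation_level=None puts sqlite3 into autocommit mode.
    conn = sqlite3.connect(path, isolation_level=None if autocommit else "")
    cur = conn.cursor()
    try:
        yield conn, cur
        if not autocommit:
            conn.commit()  # assumed: commit when the block succeeds
    except Exception:
        if not autocommit:
            conn.rollback()  # assumed: undo partial work on error
        raise
    finally:
        cur.close()
        conn.close()


with connect(":memory:") as (conn, cur):
    cur.execute("create table t (id integer)")
    cur.execute("insert into t values (1), (2)")
    cur.execute("select count(*) from t")
    row_count = cur.fetchone()[0]
```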

pyspark_db_utils.pg.mogrify(val)

Cast Python values to raw SQL correctly, escaping where necessary.

Parameters: val – some value
Return type: str
Returns: mogrified value

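To illustrate what casting a Python value to raw SQL can involve, here is a minimal sketch. The function to_sql_literal is hypothetical and is not the library's mogrify; the set of handled types and the quoting rules are assumptions chosen for illustration.

```python
import datetime


def to_sql_literal(val):
    """Render a Python value as a PostgreSQL-style literal (illustrative only)."""
    if val is None:
        return "NULL"
    # bool must be checked before int, because bool is a subclass of int.
    if isinstance(val, bool):
        return "TRUE" if val else "FALSE"
    if isinstance(val, (int, float)):
        return repr(val)
    if isinstance(val, (datetime.date, datetime.datetime)):
        return "'{}'".format(val.isoformat())
    # Everything else is treated as text: double embedded single quotes,
    # then wrap the result in single quotes.
    return "'{}'".format(str(val).replace("'", "''"))


print(to_sql_literal("O'Brien"))
```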
class pyspark_db_utils.pg.MogrifyFormatter

Bases: string.Formatter

Custom formatter that mogrifies the values substituted into {}-style format strings.

get_value(key, args, kwargs)
Return type: str

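A string.Formatter subclass can intercept every substituted value by overriding get_value. The sketch below shows the idea under stated assumptions: QuotingFormatter and quote_literal are hypothetical names, and the quoting is deliberately simplified compared with whatever mogrify actually does.

```python
import string


def quote_literal(val):
    """Simplified stand-in for a mogrify-like function."""
    if val is None:
        return "NULL"
    if isinstance(val, (int, float)) and not isinstance(val, bool):
        return str(val)
    return "'" + str(val).replace("'", "''") + "'"


class QuotingFormatter(string.Formatter):
    def get_value(self, key, args, kwargs):
        # Look the value up as usual, then convert it to a SQL literal
        # before it is placed into the output string.
        raw = super().get_value(key, args, kwargs)
        return quote_literal(raw)


fmt = QuotingFormatter()
stmt = fmt.format("update t set name={name} where id={id}", name="O'Brien", id=7)
print(stmt)
```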
pyspark_db_utils.pg.batcher(iterable, batch_size)

Yields batches of an iterable.

Parameters:
  • iterable (Iterable) – something to batch
  • batch_size (int) – batch size
Yields:

batch, until the end of the iterable
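Batching an arbitrary iterable is commonly done with itertools.islice. The following is a minimal sketch of that technique, not the library's own code; the name batches and the choice to yield lists are assumptions.

```python
from itertools import islice


def batches(iterable, batch_size):
    """Yield lists of up to batch_size items until the iterable is exhausted."""
    it = iter(iterable)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


result = list(batches(range(7), 3))
print(result)
```

The last batch may be shorter than batch_size when the input length is not a multiple of it.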

pyspark_db_utils.pg.execute_batch(df, sql_temp, config, batch_size=1000)

Runs custom SQL for each row of a DataFrame, in batches.

For example: UPDATE, DELETE, etc.

Attention! sql_temp is expected to use {}-style formatting (because it is easy to override with a custom formatter). execute_batch replaces {field} with the field value for each row in the DataFrame. So, if you need any other formatting (such as table_name or constant values), use %()s formatting for it.

Examples

Update table rows by id, using values from the DataFrame records:

>>> execute_batch(df, config=config,
...     sql_temp='update %(table_name)s set out_date=%(filename_date)s where id={id}'
...     % {'table_name': table_name, 'filename_date': filename_date})

Update table row fields with a complex SQL expression:

>>> execute_batch(df=df, sql_temp='''
...     UPDATE reporting.cases c
...          SET
...               close_date = {check_date_time},
...               status = 2,
...               lost_sales = EXTRACT(epoch FROM {check_date_time} - c.start_date) * (3.0 / 7) / (24 * 3600)
...          WHERE
...               c.id = {id}
...    ''', config=config)

Return type: None

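The two formatting stages described for execute_batch can be shown in plain Python. In this sketch the table name, status value and rows are made-up illustration data, and str.format stands in for the library's own per-row formatter.

```python
table_name = "reporting.cases"
status = 2

# Stage 1: %-formatting fills in values that are the same for every row.
# The {id} placeholder survives untouched, because % ignores braces.
sql_temp = "update %(table_name)s set status=%(status)s where id={id}" % {
    "table_name": table_name,
    "status": status,
}

# Stage 2: each row supplies the values for the {field} placeholders.
rows = [{"id": 10}, {"id": 11}]
statements = [sql_temp.format(**row) for row in rows]

for statement in statements:
    print(statement)
```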
pyspark_db_utils.pg.update_many(df, table_name, set_to, config, batch_size=1000, id_field='id')

Update the table rows that correspond to the rows of the DataFrame, setting the given fields to new constant values (the same for every row).

Note

This function sets fields to constant values. To update with an SQL expression, use execute_batch.

Parameters:
  • df (DataFrame) – DataFrame
  • table_name (str) – table name
  • set_to (Dict) – dict such as {'field_name1': new_const_value1, 'field_name2': new_const_value2}
  • config (Dict) – config
  • batch_size (int) – batch size
  • id_field (str) – id field
Return type:

None

pyspark_db_utils.pg.insert_values(df, config, batch_size=1000, fields=None, values_temp=None, sql_temp=None, table_name=None, on_conflict_do_nothing=False, on_conflict_do_update=False, drop_duplicates=False, exclude_null_field=None, logger=None, sc=None)

Insert rows from a DataFrame.

Note

Prefer write_to_pg whenever possible.

Unfortunately, write_to_pg cannot use ON CONFLICT and ON UPDATE statements, so a custom function is needed.

Parameters:
  • df (DataFrame) – DataFrame
  • sql_temp (Optional[str]) – SQL template (may contain the values, fields and table_name formatting arguments)
  • values_temp (Optional[str]) – string template for values
  • config (Dict) – config
  • fields (Optional[List[str]]) – list of columns to insert (if None, all columns are used)
  • batch_size (int) – batch size
  • table_name (Optional[str]) – table name argument for string formatting
  • on_conflict_do_nothing (bool) – add an ON CONFLICT DO NOTHING clause to each INSERT
  • on_conflict_do_update (bool) – add an ON CONFLICT DO UPDATE clause to each INSERT
  • drop_duplicates (bool) – drop duplicates if set to True
  • exclude_null_field (Optional[str]) – exclude rows where the field named by exclude_null_field is null
  • logger (Optional[Logger]) – logger
  • sc (Optional[SparkContext]) – Spark context
Return type:

None
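As an illustration of how a multi-row INSERT with an optional ON CONFLICT DO NOTHING clause can be assembled from table_name, fields and values, consider the sketch below. The function build_insert and its template are hypothetical and do not reproduce the library's defaults; the row values are assumed to be already rendered as SQL literals.

```python
def build_insert(table_name, fields, rows, on_conflict_do_nothing=False):
    """Build one INSERT statement for a batch of rows (illustrative only)."""
    # Each row is a sequence of strings that are already valid SQL literals.
    values = ", ".join("(" + ", ".join(row) + ")" for row in rows)
    sql = "INSERT INTO {}({}) VALUES {}".format(
        table_name, ", ".join(fields), values
    )
    if on_conflict_do_nothing:
        sql += " ON CONFLICT DO NOTHING"
    return sql


stmt = build_insert(
    "t",
    ["id", "name"],
    [["1", "'a'"], ["2", "'b'"]],
    on_conflict_do_nothing=True,
)
print(stmt)
```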

Module contents