pyspark_db_utils.ch package

Submodules

pyspark_db_utils.ch.make_ch_model_for_df module

pyspark_db_utils.ch.make_ch_model_for_df.spark_field2clickhouse_field(spark_field)

Converts a Spark field to a ClickHouse field.

Return type: Field
pyspark_db_utils.ch.make_ch_model_for_df.make_ch_model_for_df(df, date_field_name, table_name, pk_columns=None)
Creates an ORM Model class for a DataFrame. Because models.Model is built by a metaclass, dynamically creating a child class with the given attributes is a bit tricky.

ToDo: Add support for the Memory and Log engines.
Parameters:
  • df – PySpark DataFrame
  • date_field_name – Date-typed field for partitioning
  • pk_columns – primary key columns
  • table_name – table name in DB
Returns:

ORM Model class
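
The dynamic class creation described above can be sketched in plain Python. This is only an illustration of the general technique, not the package's implementation: ModelBase, Field, Model, and make_model below are simplified stand-ins defined for the example, not the infi.clickhouse_orm classes.

```python
class Field:
    """Stand-in for an ORM field; only stores a ClickHouse type name."""

    def __init__(self, db_type):
        self.db_type = db_type


class ModelBase(type):
    """Stand-in metaclass that records which attributes are fields."""

    def __new__(mcs, name, bases, attrs):
        cls = super().__new__(mcs, name, bases, attrs)
        cls.field_names = [k for k, v in attrs.items() if isinstance(v, Field)]
        return cls


class Model(metaclass=ModelBase):
    """Stand-in base class for models."""


def make_model(table_name, columns):
    """Build a Model subclass with one Field attribute per column."""
    attrs = {name: Field(db_type) for name, db_type in columns.items()}
    # Call the metaclass of the base class (not plain `type`),
    # so that ModelBase.__new__ runs for the new class as well.
    return type(Model)(table_name, (Model,), attrs)


Sales = make_model("sales", {"plu": "Int64", "created": "DateTime"})
print(Sales.__name__, Sales.field_names)
```

The key point is that the class body is replaced by a dictionary of attributes, and the class is produced by calling the metaclass directly.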

pyspark_db_utils.ch.read_from_ch module

pyspark_db_utils.ch.read_from_ch.read_from_ch(config, sql, sc, logger=None)

Reads a DataFrame from ClickHouse using the given SQL.

Parameters:
  • config (Dict) – configuration dictionary
  • sql (str) – table expression to read from; it may be in one of these formats:

    • ‘table_name’
    • ‘schema_name.table_name’
    • ‘(select a, b, c from t1 join t2 …) as foo’
  • sc – Spark context
  • logger (Optional[Logger]) – logger
Return type:

DataFrame

Returns:

DataFrame
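
To make the three accepted shapes of the sql argument concrete, here is a small sketch that classifies a string by shape. The helper classify_sql_arg is hypothetical and is not part of pyspark_db_utils; the identifier pattern is also an assumption and is deliberately strict.

```python
import re

# Assumed identifier syntax: letters, digits, underscores, not starting with a digit.
_IDENT = r"[A-Za-z_][A-Za-z0-9_]*"


def classify_sql_arg(sql: str) -> str:
    """Return which of the three documented formats `sql` matches."""
    text = sql.strip()
    if re.fullmatch(_IDENT, text):
        return "table"
    if re.fullmatch(rf"{_IDENT}\.{_IDENT}", text):
        return "schema.table"
    # A parenthesised subquery must carry an alias: "(select ...) as foo".
    if re.fullmatch(r"\(.*\)\s+as\s+" + _IDENT, text, flags=re.IGNORECASE | re.DOTALL):
        return "subquery"
    raise ValueError(f"unsupported sql argument: {sql!r}")


print(classify_sql_arg("schema_name.table_name"))
print(classify_sql_arg("(select a, b from t1 join t2 on t1.id = t2.id) as foo"))
```

Note that a bare SELECT statement without parentheses and an alias does not match any of the documented formats.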

pyspark_db_utils.ch.smart_ch_fillna module

pyspark_db_utils.ch.smart_ch_fillna.check_date_columns_for_nulls(df)

Returns True if any Date or Timestamp column contains a NULL value.

Return type: bool
pyspark_db_utils.ch.smart_ch_fillna.smart_ch_fillna(df)

Replaces NULL values with default values.

Return type: DataFrame
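
The idea behind the two functions in this module can be sketched without Spark, using plain dictionaries as rows. Everything here is an assumption made for illustration: the DEFAULTS table, the simplified type names, and the helpers has_null_dates and fill_nulls are hypothetical and do not reproduce the library's actual rules.

```python
from datetime import date, datetime

# Assumed per-type defaults, keyed by a simplified type name.
DEFAULTS = {
    "int": 0,
    "float": 0.0,
    "string": "",
    "date": date(1970, 1, 1),
    "timestamp": datetime(1970, 1, 1),
}


def has_null_dates(rows, schema):
    """True if any date or timestamp column holds None in any row."""
    date_cols = [c for c, t in schema.items() if t in ("date", "timestamp")]
    return any(row[c] is None for row in rows for c in date_cols)


def fill_nulls(rows, schema):
    """Return new rows with None replaced by the default for the column type."""
    return [
        {c: DEFAULTS[schema[c]] if v is None else v for c, v in row.items()}
        for row in rows
    ]


schema = {"plu": "int", "name": "string", "created": "date"}
rows = [{"plu": 1, "name": None, "created": None}]
print(has_null_dates(rows, schema))
print(fill_nulls(rows, schema))
```

With a real DataFrame the same replacement would be driven by the DataFrame schema rather than a hand-written dictionary.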

pyspark_db_utils.ch.write_to_ch module

class pyspark_db_utils.ch.write_to_ch.CustomDatabase(db_name, db_url='http://localhost:8123/', username=None, password=None, readonly=False, autocreate=True)

Bases: infi.clickhouse_orm.database.Database

ClickHouse database with useful functions

static table2table_name(table)

Returns the table name for a table.

Parameters: table (Union[str, ModelBase]) – either a table name string or a DB Model class
Return type: str
check_table_exist(table)

Checks whether the table exists.

Parameters: table (Union[str, ModelBase]) – table to check
Return type: bool
describe(table)

Returns the result of a DESCRIBE statement on the table.

Parameters: table (Union[ModelBase, str]) – table
Return type: str
Returns: the DESCRIBE output for the table

Examples

Example output:

plu     Int64
shop_id Int64
check_date_time DateTime
clickhouse_date Date
created DateTime
type    UInt8
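
Since describe returns a single string, a caller may want the column types as a mapping. The sketch below assumes the output has one column per line, with the name and type separated by whitespace, as in the example above; parse_describe is a hypothetical helper, not part of the package.

```python
def parse_describe(text):
    """Parse 'name<whitespace>type' lines into a {name: type} dict."""
    columns = {}
    for line in text.splitlines():
        parts = line.split(None, 1)  # split on the first run of whitespace
        if len(parts) == 2:
            columns[parts[0]] = parts[1].strip()
    return columns


sample = "plu\tInt64\nshop_id\tInt64\ncheck_date_time\tDateTime\ntype\tUInt8\n"
print(parse_describe(sample))
```

Blank lines and lines without a type are skipped rather than treated as errors.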
pyspark_db_utils.ch.write_to_ch.make_sure_exsit(df, date_field_name, table_name, mode, config, logger, pk_columns=None)

Drops and re-creates the table if needed.

pyspark_db_utils.ch.write_to_ch.write_to_ch(df, date_field_name, table_name, mode, config, logger, pk_columns=None)

Dumps a PySpark DataFrame to ClickHouse, creating or re-creating the table if needed.

Parameters:
  • df – PySpark DataFrame
  • mode – what to do if the table already exists; must be one of ‘overwrite’ / ‘append’ / ‘fail’:

    • overwrite: drop and re-create the table, then insert rows (the table is dropped because ClickHouse had no TRUNCATE statement when this was written)
    • append: insert rows into the existing table
    • fail: raise an Exception
  • table_name – table name
  • date_field_name – date field for partitioning
  • pk_columns – list/tuple of primary key columns (None for all columns)
Return type:

None
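
The mode semantics listed above can be summarised as a small decision function. This is a sketch only: plan_write is a hypothetical name, the action labels are invented for the example, and the behaviour for a missing table (create, then insert) is an assumption based on the "create or recreate table if needed" wording.

```python
def plan_write(mode, table_exists):
    """Return the ordered list of actions for a given mode and table state."""
    if mode not in ("overwrite", "append", "fail"):
        raise ValueError(f"unknown mode: {mode!r}")
    if not table_exists:
        # Assumption: a missing table is created regardless of mode.
        return ["create", "insert"]
    if mode == "overwrite":
        return ["drop", "create", "insert"]
    if mode == "append":
        return ["insert"]
    # mode == "fail" and the table already exists
    raise RuntimeError("table already exists")


print(plan_write("overwrite", table_exists=True))
print(plan_write("append", table_exists=True))
```

Separating the decision from the execution makes the three modes easy to check without a running ClickHouse server.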

Module contents