From c42be849d3404d752c68046de86ce53e8d482e4b Mon Sep 17 00:00:00 2001 From: Prashant Kumar Pandey Date: Fri, 15 Sep 2023 17:01:30 +0530 Subject: [PATCH 1/2] Initial Commit --- SBDL - Starter/.gitignore | 5 + SBDL - Starter/Jenkinsfile | 40 +++ SBDL - Starter/Pipfile | 14 + SBDL - Starter/conf/sbdl.conf | 13 + SBDL - Starter/conf/spark.conf | 20 ++ SBDL - Starter/lib/Utils.py | 17 ++ SBDL - Starter/lib/__init__.py | 0 SBDL - Starter/lib/logger.py | 18 ++ SBDL - Starter/log4j.properties | 27 ++ SBDL - Starter/sbdl_main.py | 18 ++ SBDL - Starter/sbdl_submit.sh | 7 + .../test_data/accounts/account_samples.csv | 10 + .../test_data/parties/party_samples.csv | 12 + .../party_address/address_samples.csv | 10 + .../test_data/results/final_df.json | 8 + SBDL - Starter/test_pytest_sbdl.py | 15 + SBDL/.gitignore | 5 + SBDL/Jenkinsfile | 40 +++ SBDL/Pipfile | 17 ++ SBDL/Pipfile.lock | 259 ++++++++++++++++++ SBDL/conf/sbdl.conf | 42 +++ SBDL/conf/spark.conf | 19 ++ SBDL/lib/ConfigLoader.py | 26 ++ SBDL/lib/DataGen.py | 65 +++++ SBDL/lib/DataLoader.py | 61 +++++ SBDL/lib/Transformations.py | 94 +++++++ SBDL/lib/Utils.py | 20 ++ SBDL/lib/__init__.py | 0 SBDL/lib/logger.py | 18 ++ SBDL/log4j.properties | 27 ++ SBDL/main.py | 69 +++++ SBDL/sbdl_submit.sh | 7 + SBDL/test_data/accounts/account_samples.csv | 10 + .../output-sample/output-sample.json | 112 ++++++++ SBDL/test_data/parties/party_samples.csv | 12 + .../party_address/address_samples.csv | 10 + SBDL/test_data/results/contract_df.json | 8 + SBDL/test_data/results/final_df.json | 8 + SBDL/test_nutter_sbdl.py | 24 ++ SBDL/test_pytest_sbdl.py | 227 +++++++++++++++ 40 files changed, 1414 insertions(+) create mode 100644 SBDL - Starter/.gitignore create mode 100644 SBDL - Starter/Jenkinsfile create mode 100644 SBDL - Starter/Pipfile create mode 100644 SBDL - Starter/conf/sbdl.conf create mode 100644 SBDL - Starter/conf/spark.conf create mode 100644 SBDL - Starter/lib/Utils.py create mode 100644 SBDL - Starter/lib/__init__.py 
create mode 100644 SBDL - Starter/lib/logger.py create mode 100644 SBDL - Starter/log4j.properties create mode 100644 SBDL - Starter/sbdl_main.py create mode 100644 SBDL - Starter/sbdl_submit.sh create mode 100644 SBDL - Starter/test_data/accounts/account_samples.csv create mode 100644 SBDL - Starter/test_data/parties/party_samples.csv create mode 100644 SBDL - Starter/test_data/party_address/address_samples.csv create mode 100644 SBDL - Starter/test_data/results/final_df.json create mode 100644 SBDL - Starter/test_pytest_sbdl.py create mode 100644 SBDL/.gitignore create mode 100644 SBDL/Jenkinsfile create mode 100644 SBDL/Pipfile create mode 100644 SBDL/Pipfile.lock create mode 100644 SBDL/conf/sbdl.conf create mode 100644 SBDL/conf/spark.conf create mode 100644 SBDL/lib/ConfigLoader.py create mode 100644 SBDL/lib/DataGen.py create mode 100644 SBDL/lib/DataLoader.py create mode 100644 SBDL/lib/Transformations.py create mode 100644 SBDL/lib/Utils.py create mode 100644 SBDL/lib/__init__.py create mode 100644 SBDL/lib/logger.py create mode 100644 SBDL/log4j.properties create mode 100644 SBDL/main.py create mode 100644 SBDL/sbdl_submit.sh create mode 100644 SBDL/test_data/accounts/account_samples.csv create mode 100644 SBDL/test_data/output-sample/output-sample.json create mode 100644 SBDL/test_data/parties/party_samples.csv create mode 100644 SBDL/test_data/party_address/address_samples.csv create mode 100644 SBDL/test_data/results/contract_df.json create mode 100644 SBDL/test_data/results/final_df.json create mode 100644 SBDL/test_nutter_sbdl.py create mode 100644 SBDL/test_pytest_sbdl.py diff --git a/SBDL - Starter/.gitignore b/SBDL - Starter/.gitignore new file mode 100644 index 0000000..cc36b36 --- /dev/null +++ b/SBDL - Starter/.gitignore @@ -0,0 +1,5 @@ +# Default ignored files +/workspace.xml +.idea/* +lib/__pycache__/* + diff --git a/SBDL - Starter/Jenkinsfile b/SBDL - Starter/Jenkinsfile new file mode 100644 index 0000000..e6e1300 --- /dev/null +++ b/SBDL - 
Starter/Jenkinsfile @@ -0,0 +1,40 @@ +pipeline { + agent any + + stages { + stage('Build') { + steps { + sh 'pipenv --python python3 sync' + } + } + stage('Test') { + steps { + sh 'pipenv run pytest' + } + } + stage('Package') { + when{ + anyOf{ branch "master" ; branch 'release' } + } + steps { + sh 'zip -r sbdl.zip lib' + } + } + stage('Release') { + when{ + branch 'release' + } + steps { + sh "scp -i /home/prashant/cred/edge-node_key.pem -o 'StrictHostKeyChecking no' -r sbdl.zip log4j.properties sbdl_main.py sbdl_submit.sh conf prashant@40.117.123.105:/home/prashant/sbdl-qa" + } + } + stage('Deploy') { + when{ + branch 'master' + } + steps { + sh "scp -i /home/prashant/cred/edge-node_key.pem -o 'StrictHostKeyChecking no' -r sbdl.zip log4j.properties sbdl_main.py sbdl_submit.sh conf prashant@40.117.123.105:/home/prashant/sbdl-prod" + } + } + } +} diff --git a/SBDL - Starter/Pipfile b/SBDL - Starter/Pipfile new file mode 100644 index 0000000..d0b6fab --- /dev/null +++ b/SBDL - Starter/Pipfile @@ -0,0 +1,14 @@ +[[source]] +url = "https://pypi.org/simple" +verify_ssl = true +name = "pypi" + +[packages] +pyspark = "*" +pytest = "*" + +[dev-packages] + +[requires] +python_version = "3.10" + diff --git a/SBDL - Starter/conf/sbdl.conf b/SBDL - Starter/conf/sbdl.conf new file mode 100644 index 0000000..de5f81b --- /dev/null +++ b/SBDL - Starter/conf/sbdl.conf @@ -0,0 +1,13 @@ +[LOCAL] +enable.hive = false +hive.database = null +kafka.topic = sbdl_kafka_cloud +[QA] +enable.hive = true +hive.database = sbdl_db_qa +kafka.topic = sbdl_kafka_qa +[PROD] +enable.hive = true +hive.database = sbdl_db +kafka.topic = sbdl_kafka + diff --git a/SBDL - Starter/conf/spark.conf b/SBDL - Starter/conf/spark.conf new file mode 100644 index 0000000..afa3809 --- /dev/null +++ b/SBDL - Starter/conf/spark.conf @@ -0,0 +1,20 @@ +[LOCAL] +spark.app.name = sbdl-local +spark.executor.instances = 2 +spark.executor.cores = 1 +spark.executor.memory = 1G +spark.sql.shuffle.partitions = 5 +[QA] 
+spark.app.name = sbdl-qa +spark.executor.instances = 2 +spark.executor.cores = 1 +spark.executor.memory = 4G +spark.sql.shuffle.partitions = 1000 +[PROD] +spark.app.name = sbdl +spark.executor.instances = 2 +spark.executor.cores = 1 +spark.executor.memory = 4G +spark.sql.shuffle.partitions = 1000 + + diff --git a/SBDL - Starter/lib/Utils.py b/SBDL - Starter/lib/Utils.py new file mode 100644 index 0000000..23646cb --- /dev/null +++ b/SBDL - Starter/lib/Utils.py @@ -0,0 +1,17 @@ +from pyspark.sql import SparkSession + + +def get_spark_session(env): + if env == "LOCAL": + return SparkSession.builder \ + .config('spark.driver.extraJavaOptions', + '-Dlog4j.configuration=file:log4j.properties') \ + .master("local[2]") \ + .enableHiveSupport() \ + .getOrCreate() + else: + return SparkSession.builder \ + .enableHiveSupport() \ + .getOrCreate() + + diff --git a/SBDL - Starter/lib/__init__.py b/SBDL - Starter/lib/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/SBDL - Starter/lib/logger.py b/SBDL - Starter/lib/logger.py new file mode 100644 index 0000000..8db6244 --- /dev/null +++ b/SBDL - Starter/lib/logger.py @@ -0,0 +1,18 @@ +class Log4j(object): + def __init__(self, spark): + log4j = spark._jvm.org.apache.log4j + self.logger = log4j.LogManager.getLogger("sbdl") + + def warn(self, message): + self.logger.warn(message) + + def info(self, message): + self.logger.info(message) + + def error(self, message): + self.logger.error(message) + + def debug(self, message): + self.logger.debug(message) + + diff --git a/SBDL - Starter/log4j.properties b/SBDL - Starter/log4j.properties new file mode 100644 index 0000000..f1531ab --- /dev/null +++ b/SBDL - Starter/log4j.properties @@ -0,0 +1,27 @@ +# Set everything to be logged to the console +log4j.rootCategory=WARN, console + +# define console appender +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.out +log4j.appender.console.layout=org.apache.log4j.PatternLayout 
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n + +#application log +log4j.logger.sbdl=INFO, console +log4j.additivity.sbdl=false + +#define following in Java System +# -Dlog4j.configuration=file:log4j.properties + +# Recommendations from Spark template +log4j.logger.org.apache.spark.repl.Main=WARN +log4j.logger.org.spark_project.jetty=WARN +log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR +log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO +log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO +log4j.logger.org.apache.parquet=ERROR +log4j.logger.parquet=ERROR +log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL +log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR + diff --git a/SBDL - Starter/sbdl_main.py b/SBDL - Starter/sbdl_main.py new file mode 100644 index 0000000..b2c79ca --- /dev/null +++ b/SBDL - Starter/sbdl_main.py @@ -0,0 +1,18 @@ +import sys + +from lib import Utils +from lib.logger import Log4j + +if __name__ == '__main__': + + if len(sys.argv) < 3: + print("Usage: sbdl {local, qa, prod} {load_date} : Arguments are missing") + sys.exit(-1) + + job_run_env = sys.argv[1].upper() + load_date = sys.argv[2] + + spark = Utils.get_spark_session(job_run_env) + logger = Log4j(spark) + + logger.info("Finished creating Spark Session") diff --git a/SBDL - Starter/sbdl_submit.sh b/SBDL - Starter/sbdl_submit.sh new file mode 100644 index 0000000..d0a37b5 --- /dev/null +++ b/SBDL - Starter/sbdl_submit.sh @@ -0,0 +1,7 @@ +spark-submit --master yarn --deploy-mode cluster \ +--py-files sbdl.zip \ +--files conf/sbdl.conf,conf/spark.conf,log4j.properties \ +sbdl_main.py qa 2022-08-02 + + + diff --git a/SBDL - Starter/test_data/accounts/account_samples.csv b/SBDL - Starter/test_data/accounts/account_samples.csv new file mode 100644 index 0000000..7498c11 --- /dev/null +++ b/SBDL - Starter/test_data/accounts/account_samples.csv @@ -0,0
+1,10 @@ +load_date,active_ind,account_id,source_sys,account_start_date,legal_title_1,legal_title_2,tax_id_type,tax_id,branch_code,country +2022-08-02,1,6982391060,COH,2018-03-24T13:56:45.000+05:30,Tiffany Riley,Matthew Davies,EIN,ZLCK91795330413525,ACXMGBA5,Mexico +2022-08-02,1,6982391061,ADS,2018-07-19T11:24:49.000+05:30,Garcia and Sons,Taylor Guzman,SSP,CADU39916151090321,SHJFGBML,United States +2022-08-02,1,6982391067,BDL,2018-08-29T17:18:59.000+05:30,Acosta Inc,David Walker,SSP,UJLN20870916345792,WZTEGBTG,Canada +2022-08-02,0,6982391063,ADS,2019-02-15T09:46:10.000+05:30,Christopher Moreno,,CPR,APVN98456428071508,KRGZGB9S,Canada +2022-08-02,1,6982391064,ADS,2018-03-28T15:47:43.000+05:30,Allen Group,,CPR,WJMX61093523376960,OCCKGB65,Canada +2022-08-02,1,6982391065,COR,2018-06-30T22:33:19.000+05:30,Austin Miles,,EIN,TYAB75470638120665,XCVKGB49,Canada +2022-08-02,1,6982391066,BDL,2017-08-25T03:00:00.000+05:30,Miss Lisa Lee,,SSP,WFDP61047142851240,CVYEGBJC,United States +2022-08-02,1,6982391062,CML,2017-09-24T08:14:17.000+05:30,Theresa Mays,,CPR,WTUP76582369402245,XCVKGB49,Canada +2022-08-02,1,6982391068,BDL,2018-05-11T15:24:57.000+05:30,Amanda Martin,,EIN,JDPX30428146546118,ACXMGBA5,Mexico diff --git a/SBDL - Starter/test_data/parties/party_samples.csv b/SBDL - Starter/test_data/parties/party_samples.csv new file mode 100644 index 0000000..c92c0a6 --- /dev/null +++ b/SBDL - Starter/test_data/parties/party_samples.csv @@ -0,0 +1,12 @@ +load_date,account_id,party_id,relation_type,relation_start_date +2022-08-02,6982391060,9823462810,F-N,2019-07-29T06:21:32.000+05:30 +2022-08-02,6982391061,9823462811,F-N,2018-08-31T05:27:22.000+05:30 +2022-08-02,6982391062,9823462812,F-N,2018-08-25T15:50:29.000+05:30 +2022-08-02,6982391063,9823462813,F-N,2018-05-11T07:23:28.000+05:30 +2022-08-02,6982391064,9823462814,F-N,2019-06-06T14:18:12.000+05:30 +2022-08-02,6982391065,9823462815,F-N,2019-05-04T05:12:37.000+05:30 +2022-08-02,6982391066,9823462816,F-N,2019-05-15T10:39:29.000+05:30 
+2022-08-02,6982391067,9823462817,F-N,2018-05-16T09:53:04.000+05:30 +2022-08-02,6982391068,9823462818,F-N,2017-11-27T01:20:12.000+05:30 +2022-08-02,6982391067,9823462820,F-S,2017-11-20T14:18:05.000+05:30 +2022-08-02,6982391067,9823462821,F-S,2018-07-19T18:56:57.000+05:30 diff --git a/SBDL - Starter/test_data/party_address/address_samples.csv b/SBDL - Starter/test_data/party_address/address_samples.csv new file mode 100644 index 0000000..b5278ac --- /dev/null +++ b/SBDL - Starter/test_data/party_address/address_samples.csv @@ -0,0 +1,10 @@ +load_date,party_id,address_line_1,address_line_2,city,postal_code,country_of_address,address_start_date +2022-08-02,9823462810,45229 Drake Route,13306 Corey Point,Shanefort,77163,Canada,2019-02-26 +2022-08-02,9823462811,361 Robinson Green,3511 Rebecca Mission,North Tyler,34118,Canada,2018-01-28 +2022-08-02,9823462812,039 Daniel Mount,8219 Hernandez Lodge Suite 875,Boltonborough,71648,Mexico,2018-12-07 +2022-08-02,9823462813,05550 Nancy Rapids,9471 Zachary Canyon,East Davidport,02504,United States,2019-04-02 +2022-08-02,9823462814,5227 Wagner Pines,189 Julie Throughway,West Amanda,78962,Canada,2018-07-11 +2022-08-02,9823462815,6993 Diane Alley,8945 Trevor Greens,Kendrafurt,50790,United States,2017-10-08 +2022-08-02,9823462816,23450 Timothy Divide,125 Johnson Mountain Suite 701,Osbornetown,04756,Canada,2018-11-28 +2022-08-02,9823462817,251 Lee Tunnel,09795 Tara Station Suite 264,New Michelleborough,05505,United States,2019-04-20 +2022-08-02,9823462818,7537 Clarke Club,74089 Jerry Trail,Hunterville,19596,United States,2018-07-17 diff --git a/SBDL - Starter/test_data/results/final_df.json b/SBDL - Starter/test_data/results/final_df.json new file mode 100644 index 0000000..e87c931 --- /dev/null +++ b/SBDL - Starter/test_data/results/final_df.json @@ -0,0 +1,8 @@ 
+{"eventHeader":{"eventIdentifier":"c361a145-d2fc-434e-a608-9688caa6d22e","eventType":"SBDL-Contract","majorSchemaVersion":1,"minorSchemaVersion":0,"eventDateTime":"2022-09-06T20:49:03+0530"},"keys":[{"keyField":"contractIdentifier","keyValue":"6982391060"}],"payload":{"contractIdentifier":{"operation":"INSERT","newValue":"6982391060"},"sourceSystemIdentifier":{"operation":"INSERT","newValue":"COH"},"contactStartDateTime":{"operation":"INSERT","newValue":"2018-03-24T13:56:45.000+05:30"},"contractTitle":{"operation":"INSERT","newValue":[{"contractTitleLineType":"lgl_ttl_ln_1","contractTitleLine":"Tiffany Riley"},{"contractTitleLineType":"lgl_ttl_ln_2","contractTitleLine":"Matthew Davies"}]},"taxIdentifier":{"operation":"INSERT","newValue":{"taxIdType":"EIN","taxId":"ZLCK91795330413525"}},"contractBranchCode":{"operation":"INSERT","newValue":"ACXMGBA5"},"contractCountry":{"operation":"INSERT","newValue":"Mexico"},"partyRelations":[{"partyIdentifier":{"operation":"INSERT","newValue":"9823462810"},"partyRelationshipType":{"operation":"INSERT","newValue":"F-N"},"partyRelationStartDateTime":{"operation":"INSERT","newValue":"2019-07-29T06:21:32.000+05:30"},"partyAddress":{"operation":"INSERT","newValue":{"addressLine1":"45229 Drake Route","addressLine2":"13306 Corey Point","addressCity":"Shanefort","addressPostalCode":"77163","addressCountry":"Canada","addressStartDate":"2019-02-26"}}}]}} 
+{"eventHeader":{"eventIdentifier":"1c1f4df6-8d4f-4c52-9ba6-165babe48f67","eventType":"SBDL-Contract","majorSchemaVersion":1,"minorSchemaVersion":0,"eventDateTime":"2022-09-06T20:49:03+0530"},"keys":[{"keyField":"contractIdentifier","keyValue":"6982391061"}],"payload":{"contractIdentifier":{"operation":"INSERT","newValue":"6982391061"},"sourceSystemIdentifier":{"operation":"INSERT","newValue":"ADS"},"contactStartDateTime":{"operation":"INSERT","newValue":"2018-07-19T11:24:49.000+05:30"},"contractTitle":{"operation":"INSERT","newValue":[{"contractTitleLineType":"lgl_ttl_ln_1","contractTitleLine":"Garcia and Sons"},{"contractTitleLineType":"lgl_ttl_ln_2","contractTitleLine":"Taylor Guzman"}]},"taxIdentifier":{"operation":"INSERT","newValue":{"taxIdType":"SSP","taxId":"CADU39916151090321"}},"contractBranchCode":{"operation":"INSERT","newValue":"SHJFGBML"},"contractCountry":{"operation":"INSERT","newValue":"United States"},"partyRelations":[{"partyIdentifier":{"operation":"INSERT","newValue":"9823462811"},"partyRelationshipType":{"operation":"INSERT","newValue":"F-N"},"partyRelationStartDateTime":{"operation":"INSERT","newValue":"2018-08-31T05:27:22.000+05:30"},"partyAddress":{"operation":"INSERT","newValue":{"addressLine1":"361 Robinson Green","addressLine2":"3511 Rebecca Mission","addressCity":"North Tyler","addressPostalCode":"34118","addressCountry":"Canada","addressStartDate":"2018-01-28"}}}]}} 
+{"eventHeader":{"eventIdentifier":"38d57237-a566-4ec4-a471-f659e9ce8437","eventType":"SBDL-Contract","majorSchemaVersion":1,"minorSchemaVersion":0,"eventDateTime":"2022-09-06T20:49:03+0530"},"keys":[{"keyField":"contractIdentifier","keyValue":"6982391067"}],"payload":{"contractIdentifier":{"operation":"INSERT","newValue":"6982391067"},"sourceSystemIdentifier":{"operation":"INSERT","newValue":"BDL"},"contactStartDateTime":{"operation":"INSERT","newValue":"2018-08-29T17:18:59.000+05:30"},"contractTitle":{"operation":"INSERT","newValue":[{"contractTitleLineType":"lgl_ttl_ln_1","contractTitleLine":"Acosta Inc"},{"contractTitleLineType":"lgl_ttl_ln_2","contractTitleLine":"David Walker"}]},"taxIdentifier":{"operation":"INSERT","newValue":{"taxIdType":"SSP","taxId":"UJLN20870916345792"}},"contractBranchCode":{"operation":"INSERT","newValue":"WZTEGBTG"},"contractCountry":{"operation":"INSERT","newValue":"Canada"},"partyRelations":[{"partyIdentifier":{"operation":"INSERT","newValue":"9823462817"},"partyRelationshipType":{"operation":"INSERT","newValue":"F-N"},"partyRelationStartDateTime":{"operation":"INSERT","newValue":"2018-05-16T09:53:04.000+05:30"},"partyAddress":{"operation":"INSERT","newValue":{"addressLine1":"251 Lee Tunnel","addressLine2":"09795 Tara Station Suite 264","addressCity":"New Michelleborough","addressPostalCode":"05505","addressCountry":"United States","addressStartDate":"2019-04-20"}}},{"partyIdentifier":{"operation":"INSERT","newValue":"9823462820"},"partyRelationshipType":{"operation":"INSERT","newValue":"F-S"},"partyRelationStartDateTime":{"operation":"INSERT","newValue":"2017-11-20T14:18:05.000+05:30"}},{"partyIdentifier":{"operation":"INSERT","newValue":"9823462821"},"partyRelationshipType":{"operation":"INSERT","newValue":"F-S"},"partyRelationStartDateTime":{"operation":"INSERT","newValue":"2018-07-19T18:56:57.000+05:30"}}]}} 
+{"eventHeader":{"eventIdentifier":"38b24610-3bb9-4adc-8f8c-cfc3c24877f2","eventType":"SBDL-Contract","majorSchemaVersion":1,"minorSchemaVersion":0,"eventDateTime":"2022-09-06T20:49:03+0530"},"keys":[{"keyField":"contractIdentifier","keyValue":"6982391064"}],"payload":{"contractIdentifier":{"operation":"INSERT","newValue":"6982391064"},"sourceSystemIdentifier":{"operation":"INSERT","newValue":"ADS"},"contactStartDateTime":{"operation":"INSERT","newValue":"2018-03-28T15:47:43.000+05:30"},"contractTitle":{"operation":"INSERT","newValue":[{"contractTitleLineType":"lgl_ttl_ln_1","contractTitleLine":"Allen Group"}]},"taxIdentifier":{"operation":"INSERT","newValue":{"taxIdType":"CPR","taxId":"WJMX61093523376960"}},"contractBranchCode":{"operation":"INSERT","newValue":"OCCKGB65"},"contractCountry":{"operation":"INSERT","newValue":"Canada"},"partyRelations":[{"partyIdentifier":{"operation":"INSERT","newValue":"9823462814"},"partyRelationshipType":{"operation":"INSERT","newValue":"F-N"},"partyRelationStartDateTime":{"operation":"INSERT","newValue":"2019-06-06T14:18:12.000+05:30"},"partyAddress":{"operation":"INSERT","newValue":{"addressLine1":"5227 Wagner Pines","addressLine2":"189 Julie Throughway","addressCity":"West Amanda","addressPostalCode":"78962","addressCountry":"Canada","addressStartDate":"2018-07-11"}}}]}} +{"eventHeader":{"eventIdentifier":"40a7cb7f-3996-47db-85ae-13d08d5998ea","eventType":"SBDL-Contract","majorSchemaVersion":1,"minorSchemaVersion":0,"eventDateTime":"2022-09-06T20:49:03+0530"},"keys":[{"keyField":"contractIdentifier","keyValue":"6982391065"}],"payload":{"contractIdentifier":{"operation":"INSERT","newValue":"6982391065"},"sourceSystemIdentifier":{"operation":"INSERT","newValue":"COR"},"contactStartDateTime":{"operation":"INSERT","newValue":"2018-06-30T22:33:19.000+05:30"},"contractTitle":{"operation":"INSERT","newValue":[{"contractTitleLineType":"lgl_ttl_ln_1","contractTitleLine":"Austin 
Miles"}]},"taxIdentifier":{"operation":"INSERT","newValue":{"taxIdType":"EIN","taxId":"TYAB75470638120665"}},"contractBranchCode":{"operation":"INSERT","newValue":"XCVKGB49"},"contractCountry":{"operation":"INSERT","newValue":"Canada"},"partyRelations":[{"partyIdentifier":{"operation":"INSERT","newValue":"9823462815"},"partyRelationshipType":{"operation":"INSERT","newValue":"F-N"},"partyRelationStartDateTime":{"operation":"INSERT","newValue":"2019-05-04T05:12:37.000+05:30"},"partyAddress":{"operation":"INSERT","newValue":{"addressLine1":"6993 Diane Alley","addressLine2":"8945 Trevor Greens","addressCity":"Kendrafurt","addressPostalCode":"50790","addressCountry":"United States","addressStartDate":"2017-10-08"}}}]}} +{"eventHeader":{"eventIdentifier":"b96bc28e-e949-4f08-ae7d-e0787d2b02fc","eventType":"SBDL-Contract","majorSchemaVersion":1,"minorSchemaVersion":0,"eventDateTime":"2022-09-06T20:49:03+0530"},"keys":[{"keyField":"contractIdentifier","keyValue":"6982391066"}],"payload":{"contractIdentifier":{"operation":"INSERT","newValue":"6982391066"},"sourceSystemIdentifier":{"operation":"INSERT","newValue":"BDL"},"contactStartDateTime":{"operation":"INSERT","newValue":"2017-08-25T03:00:00.000+05:30"},"contractTitle":{"operation":"INSERT","newValue":[{"contractTitleLineType":"lgl_ttl_ln_1","contractTitleLine":"Miss Lisa Lee"}]},"taxIdentifier":{"operation":"INSERT","newValue":{"taxIdType":"SSP","taxId":"WFDP61047142851240"}},"contractBranchCode":{"operation":"INSERT","newValue":"CVYEGBJC"},"contractCountry":{"operation":"INSERT","newValue":"United States"},"partyRelations":[{"partyIdentifier":{"operation":"INSERT","newValue":"9823462816"},"partyRelationshipType":{"operation":"INSERT","newValue":"F-N"},"partyRelationStartDateTime":{"operation":"INSERT","newValue":"2019-05-15T10:39:29.000+05:30"},"partyAddress":{"operation":"INSERT","newValue":{"addressLine1":"23450 Timothy Divide","addressLine2":"125 Johnson Mountain Suite 
701","addressCity":"Osbornetown","addressPostalCode":"04756","addressCountry":"Canada","addressStartDate":"2018-11-28"}}}]}} +{"eventHeader":{"eventIdentifier":"2d6234a7-959e-4b89-837e-58f45d66d734","eventType":"SBDL-Contract","majorSchemaVersion":1,"minorSchemaVersion":0,"eventDateTime":"2022-09-06T20:49:03+0530"},"keys":[{"keyField":"contractIdentifier","keyValue":"6982391062"}],"payload":{"contractIdentifier":{"operation":"INSERT","newValue":"6982391062"},"sourceSystemIdentifier":{"operation":"INSERT","newValue":"CML"},"contactStartDateTime":{"operation":"INSERT","newValue":"2017-09-24T08:14:17.000+05:30"},"contractTitle":{"operation":"INSERT","newValue":[{"contractTitleLineType":"lgl_ttl_ln_1","contractTitleLine":"Theresa Mays"}]},"taxIdentifier":{"operation":"INSERT","newValue":{"taxIdType":"CPR","taxId":"WTUP76582369402245"}},"contractBranchCode":{"operation":"INSERT","newValue":"XCVKGB49"},"contractCountry":{"operation":"INSERT","newValue":"Canada"},"partyRelations":[{"partyIdentifier":{"operation":"INSERT","newValue":"9823462812"},"partyRelationshipType":{"operation":"INSERT","newValue":"F-N"},"partyRelationStartDateTime":{"operation":"INSERT","newValue":"2018-08-25T15:50:29.000+05:30"},"partyAddress":{"operation":"INSERT","newValue":{"addressLine1":"039 Daniel Mount","addressLine2":"8219 Hernandez Lodge Suite 875","addressCity":"Boltonborough","addressPostalCode":"71648","addressCountry":"Mexico","addressStartDate":"2018-12-07"}}}]}} 
+{"eventHeader":{"eventIdentifier":"f752ac70-9992-44a5-b5af-579086578c2c","eventType":"SBDL-Contract","majorSchemaVersion":1,"minorSchemaVersion":0,"eventDateTime":"2022-09-06T20:49:03+0530"},"keys":[{"keyField":"contractIdentifier","keyValue":"6982391068"}],"payload":{"contractIdentifier":{"operation":"INSERT","newValue":"6982391068"},"sourceSystemIdentifier":{"operation":"INSERT","newValue":"BDL"},"contactStartDateTime":{"operation":"INSERT","newValue":"2018-05-11T15:24:57.000+05:30"},"contractTitle":{"operation":"INSERT","newValue":[{"contractTitleLineType":"lgl_ttl_ln_1","contractTitleLine":"Amanda Martin"}]},"taxIdentifier":{"operation":"INSERT","newValue":{"taxIdType":"EIN","taxId":"JDPX30428146546118"}},"contractBranchCode":{"operation":"INSERT","newValue":"ACXMGBA5"},"contractCountry":{"operation":"INSERT","newValue":"Mexico"},"partyRelations":[{"partyIdentifier":{"operation":"INSERT","newValue":"9823462818"},"partyRelationshipType":{"operation":"INSERT","newValue":"F-N"},"partyRelationStartDateTime":{"operation":"INSERT","newValue":"2017-11-27T01:20:12.000+05:30"},"partyAddress":{"operation":"INSERT","newValue":{"addressLine1":"7537 Clarke Club","addressLine2":"74089 Jerry Trail","addressCity":"Hunterville","addressPostalCode":"19596","addressCountry":"United States","addressStartDate":"2018-07-17"}}}]}} diff --git a/SBDL - Starter/test_pytest_sbdl.py b/SBDL - Starter/test_pytest_sbdl.py new file mode 100644 index 0000000..2bc81df --- /dev/null +++ b/SBDL - Starter/test_pytest_sbdl.py @@ -0,0 +1,15 @@ +import pytest + +from lib.Utils import get_spark_session + + +@pytest.fixture(scope='session') +def spark(): + return get_spark_session("LOCAL") + + +def test_blank_test(spark): + print(spark.version) + assert spark.version == "3.3.0" + + diff --git a/SBDL/.gitignore b/SBDL/.gitignore new file mode 100644 index 0000000..0435b69 --- /dev/null +++ b/SBDL/.gitignore @@ -0,0 +1,5 @@ +# Default ignored files +/shelf/ +/workspace.xml +.idea/* +lib/__pycache__/* 
diff --git a/SBDL/Jenkinsfile b/SBDL/Jenkinsfile new file mode 100644 index 0000000..e6e1300 --- /dev/null +++ b/SBDL/Jenkinsfile @@ -0,0 +1,40 @@ +pipeline { + agent any + + stages { + stage('Build') { + steps { + sh 'pipenv --python python3 sync' + } + } + stage('Test') { + steps { + sh 'pipenv run pytest' + } + } + stage('Package') { + when{ + anyOf{ branch "master" ; branch 'release' } + } + steps { + sh 'zip -r sbdl.zip lib' + } + } + stage('Release') { + when{ + branch 'release' + } + steps { + sh "scp -i /home/prashant/cred/edge-node_key.pem -o 'StrictHostKeyChecking no' -r sbdl.zip log4j.properties sbdl_main.py sbdl_submit.sh conf prashant@40.117.123.105:/home/prashant/sbdl-qa" + } + } + stage('Deploy') { + when{ + branch 'master' + } + steps { + sh "scp -i /home/prashant/cred/edge-node_key.pem -o 'StrictHostKeyChecking no' -r sbdl.zip log4j.properties sbdl_main.py sbdl_submit.sh conf prashant@40.117.123.105:/home/prashant/sbdl-prod" + } + } + } +} diff --git a/SBDL/Pipfile b/SBDL/Pipfile new file mode 100644 index 0000000..e90b12c --- /dev/null +++ b/SBDL/Pipfile @@ -0,0 +1,17 @@ +[[source]] +url = "https://pypi.org/simple" +verify_ssl = true +name = "pypi" + +[packages] +pyspark = "*" +runtime = "*" +pytest = "*" +chispa = "*" +nutter = "*" +faker = "*" + +[dev-packages] + +[requires] +python_version = "3.10" diff --git a/SBDL/Pipfile.lock b/SBDL/Pipfile.lock new file mode 100644 index 0000000..0de6075 --- /dev/null +++ b/SBDL/Pipfile.lock @@ -0,0 +1,259 @@ +{ + "_meta": { + "hash": { + "sha256": "7c7828685b3eae15aed441d755d78d9fe5f6ccc8dacdd3269ed8ca50be0338b1" + }, + "pipfile-spec": 6, + "requires": { + "python_version": "3.10" + }, + "sources": [ + { + "name": "pypi", + "url": "https://pypi.org/simple", + "verify_ssl": true + } + ] + }, + "default": { + "attrs": { + "hashes": [ + "sha256:29adc2665447e5191d0e7c568fde78b21f9672d344281d0c6e1ab085429b22b6", + "sha256:86efa402f67bf2df34f51a335487cf46b1ec130d02b8d39fd248abfd30da551c" + ], + "markers": 
"python_version >= '3.5'", + "version": "==22.1.0" + }, + "certifi": { + "hashes": [ + "sha256:84c85a9078b11105f04f3036a9482ae10e4621616db313fe045dd24743a0820d", + "sha256:fe86415d55e84719d75f8b69414f6438ac3547d2078ab91b67e779ef69378412" + ], + "markers": "python_version >= '3.6'", + "version": "==2022.6.15" + }, + "charset-normalizer": { + "hashes": [ + "sha256:5a3d016c7c547f69d6f81fb0db9449ce888b418b5b9952cc5e6e66843e9dd845", + "sha256:83e9a75d1911279afd89352c68b45348559d1fc0506b054b346651b5e7fee29f" + ], + "markers": "python_version >= '3.6'", + "version": "==2.1.1" + }, + "chispa": { + "hashes": [ + "sha256:621ad2e64fd27e7372c7b90ab2d5ad1f8dd69b737a3421ba5b6f84b113a18b84", + "sha256:c6eae922f5c3ccd08f4dc3707202291bb249e68e319d0641795d92d80cfb1cad" + ], + "index": "pypi", + "version": "==0.9.2" + }, + "click": { + "hashes": [ + "sha256:7682dc8afb30297001674575ea00d1814d808d6a36af415a82bd481d37ba7b8e", + "sha256:bb4d8133cb15a609f44e8213d9b391b0809795062913b383c62be0ee95b1db48" + ], + "markers": "python_version >= '3.7'", + "version": "==8.1.3" + }, + "colorama": { + "hashes": [ + "sha256:854bf444933e37f5824ae7bfc1e98d5bce2ebe4160d46b5edf346a89358e99da", + "sha256:e6c6b4334fc50988a639d9b98aa429a0b57da6e17b9a44f0451f930b6967b7a4" + ], + "markers": "sys_platform == 'win32'", + "version": "==0.4.5" + }, + "databricks-api": { + "hashes": [ + "sha256:49cbf99a4faef50b57ec79e74820bf9ff1f690090359b78eb82889f9962714e6", + "sha256:d689b9453f4772277f7df179a9e0abd0d5389590eb790cdfa14d714fd1541a9e" + ], + "markers": "python_version >= '3.6' and python_version < '4'", + "version": "==0.8.0" + }, + "databricks-cli": { + "hashes": [ + "sha256:2f00f3e70e859809f0595885ec76fc73ba60ad0cccd69564f7df5d95b6c90066", + "sha256:f090c2e4f99c39d69a7f7228e6c7df8cb1cebd5fddad6292e0625daf29d4be01" + ], + "version": "==0.17.3" + }, + "faker": { + "hashes": [ + "sha256:6db56e2c43a2b74250d1c332ef25fef7dc07dcb6c5fab5329dd7b4467b8ed7b9", + 
"sha256:e02c55a5b0586caaf913cc6c254b3de178e08b031c5922e590fd033ebbdbfd02" + ], + "index": "pypi", + "version": "==14.2.0" + }, + "fire": { + "hashes": [ + "sha256:c5e2b8763699d1142393a46d0e3e790c5eb2f0706082df8f647878842c216a62" + ], + "version": "==0.4.0" + }, + "idna": { + "hashes": [ + "sha256:84d9dd047ffa80596e0f246e2eab0b391788b0503584e8945f2368256d2735ff", + "sha256:9d643ff0a55b762d5cdb124b8eaa99c66322e2157b69160bc32796e824360e6d" + ], + "markers": "python_version >= '3.5'", + "version": "==3.3" + }, + "iniconfig": { + "hashes": [ + "sha256:011e24c64b7f47f6ebd835bb12a743f2fbe9a26d4cecaa7f53bc4f35ee9da8b3", + "sha256:bc3af051d7d14b2ee5ef9969666def0cd1a000e121eaea580d4a313df4b37f32" + ], + "version": "==1.1.1" + }, + "junit-xml": { + "hashes": [ + "sha256:ec5ca1a55aefdd76d28fcc0b135251d156c7106fa979686a4b48d62b761b4732" + ], + "version": "==1.9" + }, + "nutter": { + "hashes": [ + "sha256:20cf0b020ff536c0d760794c8dc6b8cb5dabf30591654fb1c403208a27446540", + "sha256:ec95132b17282d340ca0e7bfbd72fda16dc7888bc2913f71d459c9e449e53ee8" + ], + "index": "pypi", + "version": "==0.1.34" + }, + "oauthlib": { + "hashes": [ + "sha256:23a8208d75b902797ea29fd31fa80a15ed9dc2c6c16fe73f5d346f83f6fa27a2", + "sha256:6db33440354787f9b7f3a6dbd4febf5d0f93758354060e802f6c06cb493022fe" + ], + "markers": "python_version >= '3.6'", + "version": "==3.2.0" + }, + "packaging": { + "hashes": [ + "sha256:dd47c42927d89ab911e606518907cc2d3a1f38bbd026385970643f9c5b8ecfeb", + "sha256:ef103e05f519cdc783ae24ea4e2e0f508a9c99b2d4969652eed6a2e1ea5bd522" + ], + "markers": "python_version >= '3.6'", + "version": "==21.3" + }, + "pluggy": { + "hashes": [ + "sha256:4224373bacce55f955a878bf9cfa763c1e360858e330072059e10bad68531159", + "sha256:74134bbf457f031a36d68416e1509f34bd5ccc019f0bcc952c7b909d06b37bd3" + ], + "markers": "python_version >= '3.6'", + "version": "==1.0.0" + }, + "py": { + "hashes": [ + "sha256:51c75c4126074b472f746a24399ad32f6053d1b34b68d2fa41e558e6f4a98719", + 
"sha256:607c53218732647dff4acdfcd50cb62615cedf612e72d1724fb1a0cc6405b378" + ], + "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'", + "version": "==1.11.0" + }, + "py4j": { + "hashes": [ + "sha256:276a4a3c5a2154df1860ef3303a927460e02e97b047dc0a47c1c3fb8cce34db6", + "sha256:52d171a6a2b031d8a5d1de6efe451cf4f5baff1a2819aabc3741c8406539ba04" + ], + "version": "==0.10.9.5" + }, + "pyjwt": { + "hashes": [ + "sha256:72d1d253f32dbd4f5c88eaf1fdc62f3a19f676ccbadb9dbc5d07e951b2b26daf", + "sha256:d42908208c699b3b973cbeb01a969ba6a96c821eefb1c5bfe4c390c01d67abba" + ], + "markers": "python_version >= '3.6'", + "version": "==2.4.0" + }, + "pyparsing": { + "hashes": [ + "sha256:2b020ecf7d21b687f219b71ecad3631f644a47f01403fa1d1036b0c6416d70fb", + "sha256:5026bae9a10eeaefb61dab2f09052b9f4307d44aee4eda64b309723d8d206bbc" + ], + "markers": "python_full_version >= '3.6.8'", + "version": "==3.0.9" + }, + "pyspark": { + "hashes": [ + "sha256:7ebe8e9505647b4d124d5a82fca60dfd3891021cf8ad6c5ec88777eeece92cf7" + ], + "index": "pypi", + "version": "==3.3.0" + }, + "pytest": { + "hashes": [ + "sha256:1377bda3466d70b55e3f5cecfa55bb7cfcf219c7964629b967c37cf0bda818b7", + "sha256:4f365fec2dff9c1162f834d9f18af1ba13062db0c708bf7b946f8a5c76180c39" + ], + "index": "pypi", + "version": "==7.1.3" + }, + "python-dateutil": { + "hashes": [ + "sha256:0123cacc1627ae19ddf3c27a5de5bd67ee4586fbdd6440d9748f8abb483d3e86", + "sha256:961d03dc3453ebbc59dbdea9e4e11c5651520a876d0f4db161e8674aae935da9" + ], + "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2'", + "version": "==2.8.2" + }, + "requests": { + "hashes": [ + "sha256:7c5599b102feddaa661c826c56ab4fee28bfd17f5abca1ebbe3e7f19d7c97983", + "sha256:8fefa2a1a1365bf5520aac41836fbee479da67864514bdb821f31ce07ce65349" + ], + "markers": "python_version >= '3.7' and python_version < '4'", + "version": "==2.28.1" + }, + "runtime": { + "hashes": [ + 
"sha256:5f604b41baba272385840270d0ad48493839b41e8ab880eaa1c6aa687f1686e8" + ], + "index": "pypi", + "version": "==0.1.4" + }, + "six": { + "hashes": [ + "sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926", + "sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254" + ], + "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2'", + "version": "==1.16.0" + }, + "tabulate": { + "hashes": [ + "sha256:0ba055423dbaa164b9e456abe7920c5e8ed33fcc16f6d1b2f2d152c8e1e8b4fc", + "sha256:436f1c768b424654fce8597290d2764def1eea6a77cfa5c33be00b1bc0f4f63d", + "sha256:6c57f3f3dd7ac2782770155f3adb2db0b1a269637e42f27599925e64b114f519" + ], + "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'", + "version": "==0.8.10" + }, + "termcolor": { + "hashes": [ + "sha256:1d6d69ce66211143803fbc56652b41d73b4a400a2891d7bf7a1cdf4c02de613b" + ], + "version": "==1.1.0" + }, + "tomli": { + "hashes": [ + "sha256:939de3e7a6161af0c887ef91b7d41a53e7c5a1ca976325f429cb46ea9bc30ecc", + "sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f" + ], + "markers": "python_version >= '3.7'", + "version": "==2.0.1" + }, + "urllib3": { + "hashes": [ + "sha256:3fa96cf423e6987997fc326ae8df396db2a8b7c667747d47ddd8ecba91f4a74e", + "sha256:b930dd878d5a8afb066a637fbb35144fe7901e3b209d1cd4f524bd0e9deee997" + ], + "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4, 3.5' and python_version < '4'", + "version": "==1.26.12" + } + }, + "develop": {} +} diff --git a/SBDL/conf/sbdl.conf b/SBDL/conf/sbdl.conf new file mode 100644 index 0000000..bf58b53 --- /dev/null +++ b/SBDL/conf/sbdl.conf @@ -0,0 +1,42 @@ +[LOCAL] +enable.hive = false +hive.database = null +account.filter = active_ind = 1 +party.filter = +address.filter = +kafka.topic = sbdl_kafka_cloud +kafka.bootstrap.servers = pkc-4j8dq.southeastasia.azure.confluent.cloud:9092 +kafka.security.protocol = SASL_SSL 
+kafka.sasl.mechanism = PLAIN +kafka.client.dns.lookup = use_all_dns_ips +kafka.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}'; +kafka.api_key = S5R6XQ67ENKF7JUM +kafka.api_secret = gs37/I3xWlpjxe8ZrG60q0dIZQKYmmUAcQ0HPdi9ntuhs365mW3opAVWUZqwi5Bi +[QA] +enable.hive = true +hive.database = sbdl_db_qa +account.filter = active_ind = 1 +party.filter = +address.filter = +kafka.topic = sbdl_kafka_cloud +kafka.bootstrap.servers = pkc-56d1g.eastus.azure.confluent.cloud:9092 +kafka.security.protocol = SASL_SSL +kafka.sasl.mechanism = PLAIN +kafka.client.dns.lookup = use_all_dns_ips +kafka.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}'; +kafka.api_key = FTQVQAJIX55A4ZO4 +kafka.api_secret = LOC/IGgsAdapaPLi7KG2xR3TW6QfowEigLqZkIP7hX4v0mcbCXO3uZVA09RxTcAQ +[PROD] +enable.hive = true +hive.database = sbdl_db +account.filter = active_ind = 1 +party.filter = +address.filter = +kafka.topic = sbdl_kafka_cloud +kafka.bootstrap.servers = pkc-56d1g.eastus.azure.confluent.cloud:9092 +kafka.security.protocol = SASL_SSL +kafka.sasl.mechanism = PLAIN +kafka.client.dns.lookup = use_all_dns_ips +kafka.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}'; +kafka.api_key = FTQVQAJIX55A4ZO4 +kafka.api_secret = LOC/IGgsAdapaPLi7KG2xR3TW6QfowEigLqZkIP7hX4v0mcbCXO3uZVA09RxTcAQ \ No newline at end of file diff --git a/SBDL/conf/spark.conf b/SBDL/conf/spark.conf new file mode 100644 index 0000000..7c890be --- /dev/null +++ b/SBDL/conf/spark.conf @@ -0,0 +1,19 @@ +[LOCAL] +spark.app.name = sbdl-local +spark.jars.packages = org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 +[QA] +spark.app.name = sbdl-qa +spark.jars.packages = org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 +spark.executor.cores = 5 +spark.executor.memory = 10GB +spark.executor.memoryOverhead = 1GB +spark.executor.instances = 20 
+spark.sql.shuffle.partitions = 800 +[PROD] +spark.app.name = sbdl +spark.jars.packages = org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 +spark.executor.cores = 5 +spark.executor.memory = 10GB +spark.executor.memoryOverhead = 1GB +spark.executor.instances = 20 +spark.sql.shuffle.partitions = 800 \ No newline at end of file diff --git a/SBDL/lib/ConfigLoader.py b/SBDL/lib/ConfigLoader.py new file mode 100644 index 0000000..607cb57 --- /dev/null +++ b/SBDL/lib/ConfigLoader.py @@ -0,0 +1,26 @@ +import configparser +from pyspark import SparkConf + + +def get_config(env): + config = configparser.ConfigParser() + config.read("conf/sbdl.conf") + conf = {} + for (key, val) in config.items(env): + conf[key] = val + return conf + + +def get_spark_conf(env): + spark_conf = SparkConf() + config = configparser.ConfigParser() + config.read("conf/spark.conf") + + for (key, val) in config.items(env): + spark_conf.set(key, val) + return spark_conf + + +def get_data_filter(env, data_filter): + conf = get_config(env) + return "true" if conf[data_filter] == "" else conf[data_filter] diff --git a/SBDL/lib/DataGen.py b/SBDL/lib/DataGen.py new file mode 100644 index 0000000..04015b0 --- /dev/null +++ b/SBDL/lib/DataGen.py @@ -0,0 +1,65 @@ +from faker import Faker +from random import choice, choices + +account_id_offset = 6982391059 +part_id_offset = 9823462809 +fake = Faker() +Faker.seed(0) +source_sys = ["COR", "COH", "BDL", "ADS", "CML"] +tax_id_type = ["EIN", "SSP", "CPR"] +country = ["United States", "Canada", "Mexico"] + + +def gen_accounts_df(spark, load_date, num_records): + branch = [(fake.swift8(), choice(country)) for r in range(1, 20)] + data_list = [(load_date, choices([0,1],cum_weights=[10, 90], k=1 )[0], + account_id_offset + i, choice(source_sys), + fake.date_time_between(start_date='-5y', end_date='-3y'), + choice([fake.company(), fake.name()]), choice([fake.name(), None]), + choice(tax_id_type), fake.bban()) + (choice(branch)) + for i in range(1, num_records)] + + 
return spark.createDataFrame(data_list) \ + .toDF("load_date", "active_ind", "account_id", "source_sys", "account_start_date", + "legal_title_1", "legal_title_2", "tax_id_type", "tax_id", "branch_code", "country") + + +def gen_account_party(spark, load_date, num_records): + data_list_f = [(load_date, account_id_offset + i, part_id_offset + i, + "F-N", fake.date_time_between(start_date='-5y', end_date='-3y')) + for i in range(1, num_records)] + + data_list_s = [(load_date, account_id_offset + fake.pyint(1, num_records), + part_id_offset + num_records + i, "F-S", + fake.date_time_between(start_date='-5y', end_date='-3y')) + for i in range(1, int(num_records / 3))] + + return spark.createDataFrame(data_list_f + data_list_s) \ + .toDF("load_date", "account_id", + "party_id", "relation_type", "relation_start_date") + + +def gen_party_address(spark, load_date, num_records): + data_list_f = [(load_date, part_id_offset + i, fake.building_number() + " " + fake.street_name(), + fake.street_address(), fake.city(), fake.postcode(), choice(country), + fake.date_between(start_date='-5y', end_date='-3y')) + for i in range(1, num_records)] + + return spark.createDataFrame(data_list_f) \ + .toDF("load_date", "party_id", "address_line_1", + "address_line_2", "city", "postal_code", "country_of_address", + "address_start_date") + + +def create_data_files(spark, load_date, num_records): + accounts_df = gen_accounts_df(spark, load_date, num_records) + accounts_df.coalesce(1) \ + .write.format("csv").option("header", "true").mode("overwrite").save("test_data/accounts") + + party_df = gen_account_party(spark, load_date, num_records) + party_df.coalesce(1) \ + .write.format("csv").option("header", "true").mode("overwrite").save("test_data/parties") + + address_df = gen_party_address(spark, load_date, num_records) + address_df.coalesce(1) \ + .write.format("csv").option("header", "true").mode("overwrite").save("test_data/party_address") diff --git a/SBDL/lib/DataLoader.py 
b/SBDL/lib/DataLoader.py new file mode 100644 index 0000000..d77619e --- /dev/null +++ b/SBDL/lib/DataLoader.py @@ -0,0 +1,61 @@ +from lib import ConfigLoader + + +def get_account_schema(): + schema = """load_date date,active_ind int,account_id string, + source_sys string,account_start_date timestamp, + legal_title_1 string,legal_title_2 string, + tax_id_type string,tax_id string,branch_code string,country string""" + return schema + + +def get_party_schema(): + schema = """load_date date,account_id string,party_id string, + relation_type string,relation_start_date timestamp""" + return schema + + +def get_address_schema(): + schema = """load_date date,party_id string,address_line_1 string, + address_line_2 string,city string,postal_code string, + country_of_address string,address_start_date date""" + return schema + + +def read_accounts(spark, env, enable_hive, hive_db): + runtime_filter = ConfigLoader.get_data_filter(env, "account.filter") + if enable_hive: + return spark.sql("select * from " + hive_db + ".accounts").where(runtime_filter) + else: + return spark.read \ + .format("csv") \ + .option("header", "true") \ + .schema(get_account_schema()) \ + .load("test_data/accounts/") \ + .where(runtime_filter) + + +def read_parties(spark, env, enable_hive, hive_db): + runtime_filter = ConfigLoader.get_data_filter(env, "party.filter") + if enable_hive: + return spark.sql("select * from " + hive_db + ".parties").where(runtime_filter) + else: + return spark.read \ + .format("csv") \ + .option("header", "true") \ + .schema(get_party_schema()) \ + .load("test_data/parties/") \ + .where(runtime_filter) + + +def read_address(spark, env, enable_hive, hive_db): + runtime_filter = ConfigLoader.get_data_filter(env, "address.filter") + if enable_hive: + return spark.sql("select * from " + hive_db + ".party_address").where(runtime_filter) + else: + return spark.read \ + .format("csv") \ + .option("header", "true") \ + .schema(get_address_schema()) \ + 
.load("test_data/party_address/") \ + .where(runtime_filter) diff --git a/SBDL/lib/Transformations.py b/SBDL/lib/Transformations.py new file mode 100644 index 0000000..5f55e34 --- /dev/null +++ b/SBDL/lib/Transformations.py @@ -0,0 +1,94 @@ +from pyspark.sql.functions import struct, lit, col, array, when, isnull, filter, current_timestamp, date_format, expr, \ + collect_list + + +def get_insert_operation(column, alias): + return struct(lit("INSERT").alias("operation"), + column.alias("newValue"), + lit(None).alias("oldValue")).alias(alias) + + +def get_contract(df): + contract_title = array(when(~isnull("legal_title_1"), + struct(lit("lgl_ttl_ln_1").alias("contractTitleLineType"), + col("legal_title_1").alias("contractTitleLine")).alias("contractTitle")), + when(~isnull("legal_title_2"), + struct(lit("lgl_ttl_ln_2").alias("contractTitleLineType"), + col("legal_title_2").alias("contractTitleLine")).alias("contractTitle")) + ) + + contract_title_nl = filter(contract_title, lambda x: ~isnull(x)) + + tax_identifier = struct(col("tax_id_type").alias("taxIdType"), + col("tax_id").alias("taxId")).alias("taxIdentifier") + + return df.select("account_id", get_insert_operation(col("account_id"), "contractIdentifier"), + get_insert_operation(col("source_sys"), "sourceSystemIdentifier"), + get_insert_operation(col("account_start_date"), "contactStartDateTime"), + get_insert_operation(contract_title_nl, "contractTitle"), + get_insert_operation(tax_identifier, "taxIdentifier"), + get_insert_operation(col("branch_code"), "contractBranchCode"), + get_insert_operation(col("country"), "contractCountry"), + ) + + +def get_relations(df): + return df.select("account_id", "party_id", + get_insert_operation(col("party_id"), "partyIdentifier"), + get_insert_operation(col("relation_type"), "partyRelationshipType"), + get_insert_operation(col("relation_start_date"), "partyRelationStartDateTime") + ) + + +def get_address(df): + address = struct(col("address_line_1").alias("addressLine1"), + 
col("address_line_2").alias("addressLine2"), + col("city").alias("addressCity"), + col("postal_code").alias("addressPostalCode"), + col("country_of_address").alias("addressCountry"), + col("address_start_date").alias("addressStartDate") + ) + + return df.select("party_id", get_insert_operation(address, "partyAddress")) + + +def join_party_address(p_df, a_df): + return p_df.join(a_df, "party_id", "left_outer") \ + .groupBy("account_id") \ + .agg(collect_list(struct("partyIdentifier", + "partyRelationshipType", + "partyRelationStartDateTime", + "partyAddress" + ).alias("partyDetails") + ).alias("partyRelations")) + + +def join_contract_party(c_df, p_df): + return c_df.join(p_df, "account_id", "left_outer") + + +def apply_header(spark, df): + header_info = [("SBDL-Contract", 1, 0), ] + header_df = spark.createDataFrame(header_info) \ + .toDF("eventType", "majorSchemaVersion", "minorSchemaVersion") + + event_df = header_df.hint("broadcast").crossJoin(df) \ + .select(struct(expr("uuid()").alias("eventIdentifier"), + col("eventType"), col("majorSchemaVersion"), col("minorSchemaVersion"), + lit(date_format(current_timestamp(), "yyyy-MM-dd'T'HH:mm:ssZ")).alias("eventDateTime") + ).alias("eventHeader"), + array(struct(lit("contractIdentifier").alias("keyField"), + col("account_id").alias("keyValue") + )).alias("keys"), + struct(col("contractIdentifier"), + col("sourceSystemIdentifier"), + col("contactStartDateTime"), + col("contractTitle"), + col("taxIdentifier"), + col("contractBranchCode"), + col("contractCountry"), + col("partyRelations") + ).alias("payload") + ) + + return event_df diff --git a/SBDL/lib/Utils.py b/SBDL/lib/Utils.py new file mode 100644 index 0000000..32713c1 --- /dev/null +++ b/SBDL/lib/Utils.py @@ -0,0 +1,20 @@ +from pyspark.sql import SparkSession +from lib.ConfigLoader import get_spark_conf + + +def get_spark_session(env): + if env == "LOCAL": + return SparkSession.builder \ + .config(conf=get_spark_conf(env)) \ + 
.config('spark.sql.autoBroadcastJoinThreshold',-1) \ + .config('spark.sql.adaptive.enabled','false') \ + .config('spark.driver.extraJavaOptions', + '-Dlog4j.configuration=file:log4j.properties') \ + .master("local[2]") \ + .enableHiveSupport() \ + .getOrCreate() + else: + return SparkSession.builder \ + .config(conf=get_spark_conf(env)) \ + .enableHiveSupport() \ + .getOrCreate() diff --git a/SBDL/lib/__init__.py b/SBDL/lib/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/SBDL/lib/logger.py b/SBDL/lib/logger.py new file mode 100644 index 0000000..8db6244 --- /dev/null +++ b/SBDL/lib/logger.py @@ -0,0 +1,18 @@ +class Log4j(object): + def __init__(self, spark): + log4j = spark._jvm.org.apache.log4j + self.logger = log4j.LogManager.getLogger("sbdl") + + def warn(self, message): + self.logger.warn(message) + + def info(self, message): + self.logger.info(message) + + def error(self, message): + self.logger.error(message) + + def debug(self, message): + self.logger.debug(message) + + diff --git a/SBDL/log4j.properties b/SBDL/log4j.properties new file mode 100644 index 0000000..dc334bd --- /dev/null +++ b/SBDL/log4j.properties @@ -0,0 +1,27 @@ +# Set everything to be logged to the console +log4j.rootCategory=WARN, console + +# define console appender +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.out +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n + +#application log +log4j.logger.sbdl=INFO, console +log4j.additivity.sbdl=false + +#define following in Java System +# -Dlog4j.configuration=file:log4j.properties + +# Recommendations from Spark template +log4j.logger.org.apache.spark.repl.Main=WARN +log4j.logger.org.spark_project.jetty=WARN +log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR +log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO 
+log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO +log4j.logger.org.apache.parquet=ERROR +log4j.logger.parquet=ERROR +log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL +log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR + diff --git a/SBDL/main.py b/SBDL/main.py new file mode 100644 index 0000000..2af51b5 --- /dev/null +++ b/SBDL/main.py @@ -0,0 +1,69 @@ +import sys +import uuid + +from pyspark.sql.functions import struct, col, to_json + +from lib import ConfigLoader, Utils, DataLoader, Transformations +from lib.logger import Log4j + +if __name__ == '__main__': + + if len(sys.argv) < 3: + print("Usage: sbdl {local, qa, prod} {load_date} : Arguments are missing") + sys.exit(-1) + + job_run_env = sys.argv[1].upper() + load_date = sys.argv[2] + job_run_id = "SBDL-" + str(uuid.uuid4()) + + print("Initializing SBDL Job in " + job_run_env + " Job ID: " + job_run_id) + conf = ConfigLoader.get_config(job_run_env) + enable_hive = True if conf["enable.hive"] == "true" else False + hive_db = conf["hive.database"] + + print("Creating Spark Session") + spark = Utils.get_spark_session(job_run_env) + + logger = Log4j(spark) + + logger.info("Reading SBDL Account DF") + accounts_df = DataLoader.read_accounts(spark, job_run_env, enable_hive, hive_db) + contract_df = Transformations.get_contract(accounts_df) + + logger.info("Reading SBDL Party DF") + parties_df = DataLoader.read_parties(spark, job_run_env, enable_hive, hive_db) + relations_df = Transformations.get_relations(parties_df) + + logger.info("Reading SBDL Address DF") + address_df = DataLoader.read_address(spark, job_run_env, enable_hive, hive_db) + relation_address_df = Transformations.get_address(address_df) + + logger.info("Join Party Relations and Address") + party_address_df = Transformations.join_party_address(relations_df, relation_address_df) + + logger.info("Join Account and Parties") + data_df = Transformations.join_contract_party(contract_df, 
party_address_df) + + logger.info("Apply Header and create Event") + final_df = Transformations.apply_header(spark, data_df) + logger.info("Preparing to send data to Kafka") + kafka_kv_df = final_df.select(col("payload.contractIdentifier.newValue").alias("key"), + to_json(struct("*")).alias("value")) + input("Press Any Key") + # kafka_kv_df.write.format("noop").mode("overwrite").save("test_data\noop") + + # Keep it in vault or other secure place, authorize application to extract it from there + api_key = conf["kafka.api_key"] + api_secret = conf["kafka.api_secret"] + + kafka_kv_df.write \ + .format("kafka") \ + .option("kafka.bootstrap.servers", conf["kafka.bootstrap.servers"]) \ + .option("topic", conf["kafka.topic"]) \ + .option("kafka.security.protocol", conf["kafka.security.protocol"]) \ + .option("kafka.sasl.jaas.config", conf["kafka.sasl.jaas.config"].format(api_key, api_secret)) \ + .option("kafka.sasl.mechanism", conf["kafka.sasl.mechanism"]) \ + .option("kafka.client.dns.lookup", conf["kafka.client.dns.lookup"]) \ + .save() + + logger.info("Finished sending data to Kafka") diff --git a/SBDL/sbdl_submit.sh b/SBDL/sbdl_submit.sh new file mode 100644 index 0000000..fad42ed --- /dev/null +++ b/SBDL/sbdl_submit.sh @@ -0,0 +1,7 @@ +spark-submit --master yarn --deploy-mode cluster \ +--py-files sdbl_lib.zip \ +--files conf/sbdl.conf,conf/spark.conf,log4j.properties \ +--driver-cores 2 \ +--driver-memory 3G \ +--conf spark.driver.memoryOverhead=1G \ +main.py qa 2022-08-03 \ No newline at end of file diff --git a/SBDL/test_data/accounts/account_samples.csv b/SBDL/test_data/accounts/account_samples.csv new file mode 100644 index 0000000..7498c11 --- /dev/null +++ b/SBDL/test_data/accounts/account_samples.csv @@ -0,0 +1,10 @@ +load_date,active_ind,account_id,source_sys,account_start_date,legal_title_1,legal_title_2,tax_id_type,tax_id,branch_code,country +2022-08-02,1,6982391060,COH,2018-03-24T13:56:45.000+05:30,Tiffany Riley,Matthew
Davies,EIN,ZLCK91795330413525,ACXMGBA5,Mexico +2022-08-02,1,6982391061,ADS,2018-07-19T11:24:49.000+05:30,Garcia and Sons,Taylor Guzman,SSP,CADU39916151090321,SHJFGBML,United States +2022-08-02,1,6982391067,BDL,2018-08-29T17:18:59.000+05:30,Acosta Inc,David Walker,SSP,UJLN20870916345792,WZTEGBTG,Canada +2022-08-02,0,6982391063,ADS,2019-02-15T09:46:10.000+05:30,Christopher Moreno,,CPR,APVN98456428071508,KRGZGB9S,Canada +2022-08-02,1,6982391064,ADS,2018-03-28T15:47:43.000+05:30,Allen Group,,CPR,WJMX61093523376960,OCCKGB65,Canada +2022-08-02,1,6982391065,COR,2018-06-30T22:33:19.000+05:30,Austin Miles,,EIN,TYAB75470638120665,XCVKGB49,Canada +2022-08-02,1,6982391066,BDL,2017-08-25T03:00:00.000+05:30,Miss Lisa Lee,,SSP,WFDP61047142851240,CVYEGBJC,United States +2022-08-02,1,6982391062,CML,2017-09-24T08:14:17.000+05:30,Theresa Mays,,CPR,WTUP76582369402245,XCVKGB49,Canada +2022-08-02,1,6982391068,BDL,2018-05-11T15:24:57.000+05:30,Amanda Martin,,EIN,JDPX30428146546118,ACXMGBA5,Mexico diff --git a/SBDL/test_data/output-sample/output-sample.json b/SBDL/test_data/output-sample/output-sample.json new file mode 100644 index 0000000..2a4d302 --- /dev/null +++ b/SBDL/test_data/output-sample/output-sample.json @@ -0,0 +1,112 @@ +{ + "eventHeader": { + "eventIdentifier": "38d57237-a566-4ec4-a471-f659e9ce8437", + "eventType": "SBDL-Contract", + "majorSchemaVersion": 1, + "minorSchemaVersion": 0, + "eventDateTime": "2022-09-06T20:49:03+0530" + }, + "keys": [ + { + "keyField": "contractIdentifier", + "keyValue": "6982391067" + } + ], + "payload": { + "contractIdentifier": { + "operation": "INSERT", + "newValue": "6982391067" + }, + "sourceSystemIdentifier": { + "operation": "INSERT", + "newValue": "BDL" + }, + "contactStartDateTime": { + "operation": "INSERT", + "newValue": "2018-08-29T17:18:59.000+05:30" + }, + "contractTitle": { + "operation": "INSERT", + "newValue": [ + { + "contractTitleLineType": "lgl_ttl_ln_1", + "contractTitleLine": "Acosta Inc" + }, + { + 
"contractTitleLineType": "lgl_ttl_ln_2", + "contractTitleLine": "David Walker" + } + ] + }, + "taxIdentifier": { + "operation": "INSERT", + "newValue": { + "taxIdType": "SSP", + "taxId": "UJLN20870916345792" + } + }, + "contractBranchCode": { + "operation": "INSERT", + "newValue": "WZTEGBTG" + }, + "contractCountry": { + "operation": "INSERT", + "newValue": "Canada" + }, + "partyRelations": [ + { + "partyIdentifier": { + "operation": "INSERT", + "newValue": "9823462817" + }, + "partyRelationshipType": { + "operation": "INSERT", + "newValue": "F-N" + }, + "partyRelationStartDateTime": { + "operation": "INSERT", + "newValue": "2018-05-16T09:53:04.000+05:30" + }, + "partyAddress": { + "operation": "INSERT", + "newValue": { + "addressLine1": "251 Lee Tunnel", + "addressLine2": "09795 Tara Station Suite 264", + "addressCity": "New Michelleborough", + "addressPostalCode": "05505", + "addressCountry": "United States", + "addressStartDate": "2019-04-20" + } + } + }, + { + "partyIdentifier": { + "operation": "INSERT", + "newValue": "9823462820" + }, + "partyRelationshipType": { + "operation": "INSERT", + "newValue": "F-S" + }, + "partyRelationStartDateTime": { + "operation": "INSERT", + "newValue": "2017-11-20T14:18:05.000+05:30" + } + }, + { + "partyIdentifier": { + "operation": "INSERT", + "newValue": "9823462821" + }, + "partyRelationshipType": { + "operation": "INSERT", + "newValue": "F-S" + }, + "partyRelationStartDateTime": { + "operation": "INSERT", + "newValue": "2018-07-19T18:56:57.000+05:30" + } + } + ] + } +} \ No newline at end of file diff --git a/SBDL/test_data/parties/party_samples.csv b/SBDL/test_data/parties/party_samples.csv new file mode 100644 index 0000000..c92c0a6 --- /dev/null +++ b/SBDL/test_data/parties/party_samples.csv @@ -0,0 +1,12 @@ +load_date,account_id,party_id,relation_type,relation_start_date +2022-08-02,6982391060,9823462810,F-N,2019-07-29T06:21:32.000+05:30 +2022-08-02,6982391061,9823462811,F-N,2018-08-31T05:27:22.000+05:30 
+2022-08-02,6982391062,9823462812,F-N,2018-08-25T15:50:29.000+05:30 +2022-08-02,6982391063,9823462813,F-N,2018-05-11T07:23:28.000+05:30 +2022-08-02,6982391064,9823462814,F-N,2019-06-06T14:18:12.000+05:30 +2022-08-02,6982391065,9823462815,F-N,2019-05-04T05:12:37.000+05:30 +2022-08-02,6982391066,9823462816,F-N,2019-05-15T10:39:29.000+05:30 +2022-08-02,6982391067,9823462817,F-N,2018-05-16T09:53:04.000+05:30 +2022-08-02,6982391068,9823462818,F-N,2017-11-27T01:20:12.000+05:30 +2022-08-02,6982391067,9823462820,F-S,2017-11-20T14:18:05.000+05:30 +2022-08-02,6982391067,9823462821,F-S,2018-07-19T18:56:57.000+05:30 diff --git a/SBDL/test_data/party_address/address_samples.csv b/SBDL/test_data/party_address/address_samples.csv new file mode 100644 index 0000000..b5278ac --- /dev/null +++ b/SBDL/test_data/party_address/address_samples.csv @@ -0,0 +1,10 @@ +load_date,party_id,address_line_1,address_line_2,city,postal_code,country_of_address,address_start_date +2022-08-02,9823462810,45229 Drake Route,13306 Corey Point,Shanefort,77163,Canada,2019-02-26 +2022-08-02,9823462811,361 Robinson Green,3511 Rebecca Mission,North Tyler,34118,Canada,2018-01-28 +2022-08-02,9823462812,039 Daniel Mount,8219 Hernandez Lodge Suite 875,Boltonborough,71648,Mexico,2018-12-07 +2022-08-02,9823462813,05550 Nancy Rapids,9471 Zachary Canyon,East Davidport,02504,United States,2019-04-02 +2022-08-02,9823462814,5227 Wagner Pines,189 Julie Throughway,West Amanda,78962,Canada,2018-07-11 +2022-08-02,9823462815,6993 Diane Alley,8945 Trevor Greens,Kendrafurt,50790,United States,2017-10-08 +2022-08-02,9823462816,23450 Timothy Divide,125 Johnson Mountain Suite 701,Osbornetown,04756,Canada,2018-11-28 +2022-08-02,9823462817,251 Lee Tunnel,09795 Tara Station Suite 264,New Michelleborough,05505,United States,2019-04-20 +2022-08-02,9823462818,7537 Clarke Club,74089 Jerry Trail,Hunterville,19596,United States,2018-07-17 diff --git a/SBDL/test_data/results/contract_df.json b/SBDL/test_data/results/contract_df.json new 
file mode 100644 index 0000000..04dabdd --- /dev/null +++ b/SBDL/test_data/results/contract_df.json @@ -0,0 +1,8 @@ +{"account_id":"6982391060","contractIdentifier":{"operation":"INSERT","newValue":"6982391060"},"sourceSystemIdentifier":{"operation":"INSERT","newValue":"COH"},"contactStartDateTime":{"operation":"INSERT","newValue":"2018-03-24T13:56:45.000+05:30"},"contractTitle":{"operation":"INSERT","newValue":[{"contractTitleLineType":"lgl_ttl_ln_1","contractTitleLine":"Tiffany Riley"},{"contractTitleLineType":"lgl_ttl_ln_2","contractTitleLine":"Matthew Davies"}]},"taxIdentifier":{"operation":"INSERT","newValue":{"taxIdType":"EIN","taxId":"ZLCK91795330413525"}},"contractBranchCode":{"operation":"INSERT","newValue":"ACXMGBA5"},"contractCountry":{"operation":"INSERT","newValue":"Mexico"}} +{"account_id":"6982391061","contractIdentifier":{"operation":"INSERT","newValue":"6982391061"},"sourceSystemIdentifier":{"operation":"INSERT","newValue":"ADS"},"contactStartDateTime":{"operation":"INSERT","newValue":"2018-07-19T11:24:49.000+05:30"},"contractTitle":{"operation":"INSERT","newValue":[{"contractTitleLineType":"lgl_ttl_ln_1","contractTitleLine":"Garcia and Sons"},{"contractTitleLineType":"lgl_ttl_ln_2","contractTitleLine":"Taylor Guzman"}]},"taxIdentifier":{"operation":"INSERT","newValue":{"taxIdType":"SSP","taxId":"CADU39916151090321"}},"contractBranchCode":{"operation":"INSERT","newValue":"SHJFGBML"},"contractCountry":{"operation":"INSERT","newValue":"United States"}} +{"account_id":"6982391067","contractIdentifier":{"operation":"INSERT","newValue":"6982391067"},"sourceSystemIdentifier":{"operation":"INSERT","newValue":"BDL"},"contactStartDateTime":{"operation":"INSERT","newValue":"2018-08-29T17:18:59.000+05:30"},"contractTitle":{"operation":"INSERT","newValue":[{"contractTitleLineType":"lgl_ttl_ln_1","contractTitleLine":"Acosta Inc"},{"contractTitleLineType":"lgl_ttl_ln_2","contractTitleLine":"David 
Walker"}]},"taxIdentifier":{"operation":"INSERT","newValue":{"taxIdType":"SSP","taxId":"UJLN20870916345792"}},"contractBranchCode":{"operation":"INSERT","newValue":"WZTEGBTG"},"contractCountry":{"operation":"INSERT","newValue":"Canada"}} +{"account_id":"6982391064","contractIdentifier":{"operation":"INSERT","newValue":"6982391064"},"sourceSystemIdentifier":{"operation":"INSERT","newValue":"ADS"},"contactStartDateTime":{"operation":"INSERT","newValue":"2018-03-28T15:47:43.000+05:30"},"contractTitle":{"operation":"INSERT","newValue":[{"contractTitleLineType":"lgl_ttl_ln_1","contractTitleLine":"Allen Group"}]},"taxIdentifier":{"operation":"INSERT","newValue":{"taxIdType":"CPR","taxId":"WJMX61093523376960"}},"contractBranchCode":{"operation":"INSERT","newValue":"OCCKGB65"},"contractCountry":{"operation":"INSERT","newValue":"Canada"}} +{"account_id":"6982391065","contractIdentifier":{"operation":"INSERT","newValue":"6982391065"},"sourceSystemIdentifier":{"operation":"INSERT","newValue":"COR"},"contactStartDateTime":{"operation":"INSERT","newValue":"2018-06-30T22:33:19.000+05:30"},"contractTitle":{"operation":"INSERT","newValue":[{"contractTitleLineType":"lgl_ttl_ln_1","contractTitleLine":"Austin Miles"}]},"taxIdentifier":{"operation":"INSERT","newValue":{"taxIdType":"EIN","taxId":"TYAB75470638120665"}},"contractBranchCode":{"operation":"INSERT","newValue":"XCVKGB49"},"contractCountry":{"operation":"INSERT","newValue":"Canada"}} +{"account_id":"6982391066","contractIdentifier":{"operation":"INSERT","newValue":"6982391066"},"sourceSystemIdentifier":{"operation":"INSERT","newValue":"BDL"},"contactStartDateTime":{"operation":"INSERT","newValue":"2017-08-25T03:00:00.000+05:30"},"contractTitle":{"operation":"INSERT","newValue":[{"contractTitleLineType":"lgl_ttl_ln_1","contractTitleLine":"Miss Lisa 
Lee"}]},"taxIdentifier":{"operation":"INSERT","newValue":{"taxIdType":"SSP","taxId":"WFDP61047142851240"}},"contractBranchCode":{"operation":"INSERT","newValue":"CVYEGBJC"},"contractCountry":{"operation":"INSERT","newValue":"United States"}} +{"account_id":"6982391062","contractIdentifier":{"operation":"INSERT","newValue":"6982391062"},"sourceSystemIdentifier":{"operation":"INSERT","newValue":"CML"},"contactStartDateTime":{"operation":"INSERT","newValue":"2017-09-24T08:14:17.000+05:30"},"contractTitle":{"operation":"INSERT","newValue":[{"contractTitleLineType":"lgl_ttl_ln_1","contractTitleLine":"Theresa Mays"}]},"taxIdentifier":{"operation":"INSERT","newValue":{"taxIdType":"CPR","taxId":"WTUP76582369402245"}},"contractBranchCode":{"operation":"INSERT","newValue":"XCVKGB49"},"contractCountry":{"operation":"INSERT","newValue":"Canada"}} +{"account_id":"6982391068","contractIdentifier":{"operation":"INSERT","newValue":"6982391068"},"sourceSystemIdentifier":{"operation":"INSERT","newValue":"BDL"},"contactStartDateTime":{"operation":"INSERT","newValue":"2018-05-11T15:24:57.000+05:30"},"contractTitle":{"operation":"INSERT","newValue":[{"contractTitleLineType":"lgl_ttl_ln_1","contractTitleLine":"Amanda Martin"}]},"taxIdentifier":{"operation":"INSERT","newValue":{"taxIdType":"EIN","taxId":"JDPX30428146546118"}},"contractBranchCode":{"operation":"INSERT","newValue":"ACXMGBA5"},"contractCountry":{"operation":"INSERT","newValue":"Mexico"}} diff --git a/SBDL/test_data/results/final_df.json b/SBDL/test_data/results/final_df.json new file mode 100644 index 0000000..e87c931 --- /dev/null +++ b/SBDL/test_data/results/final_df.json @@ -0,0 +1,8 @@ 
+{"eventHeader":{"eventIdentifier":"c361a145-d2fc-434e-a608-9688caa6d22e","eventType":"SBDL-Contract","majorSchemaVersion":1,"minorSchemaVersion":0,"eventDateTime":"2022-09-06T20:49:03+0530"},"keys":[{"keyField":"contractIdentifier","keyValue":"6982391060"}],"payload":{"contractIdentifier":{"operation":"INSERT","newValue":"6982391060"},"sourceSystemIdentifier":{"operation":"INSERT","newValue":"COH"},"contactStartDateTime":{"operation":"INSERT","newValue":"2018-03-24T13:56:45.000+05:30"},"contractTitle":{"operation":"INSERT","newValue":[{"contractTitleLineType":"lgl_ttl_ln_1","contractTitleLine":"Tiffany Riley"},{"contractTitleLineType":"lgl_ttl_ln_2","contractTitleLine":"Matthew Davies"}]},"taxIdentifier":{"operation":"INSERT","newValue":{"taxIdType":"EIN","taxId":"ZLCK91795330413525"}},"contractBranchCode":{"operation":"INSERT","newValue":"ACXMGBA5"},"contractCountry":{"operation":"INSERT","newValue":"Mexico"},"partyRelations":[{"partyIdentifier":{"operation":"INSERT","newValue":"9823462810"},"partyRelationshipType":{"operation":"INSERT","newValue":"F-N"},"partyRelationStartDateTime":{"operation":"INSERT","newValue":"2019-07-29T06:21:32.000+05:30"},"partyAddress":{"operation":"INSERT","newValue":{"addressLine1":"45229 Drake Route","addressLine2":"13306 Corey Point","addressCity":"Shanefort","addressPostalCode":"77163","addressCountry":"Canada","addressStartDate":"2019-02-26"}}}]}} 
+{"eventHeader":{"eventIdentifier":"1c1f4df6-8d4f-4c52-9ba6-165babe48f67","eventType":"SBDL-Contract","majorSchemaVersion":1,"minorSchemaVersion":0,"eventDateTime":"2022-09-06T20:49:03+0530"},"keys":[{"keyField":"contractIdentifier","keyValue":"6982391061"}],"payload":{"contractIdentifier":{"operation":"INSERT","newValue":"6982391061"},"sourceSystemIdentifier":{"operation":"INSERT","newValue":"ADS"},"contactStartDateTime":{"operation":"INSERT","newValue":"2018-07-19T11:24:49.000+05:30"},"contractTitle":{"operation":"INSERT","newValue":[{"contractTitleLineType":"lgl_ttl_ln_1","contractTitleLine":"Garcia and Sons"},{"contractTitleLineType":"lgl_ttl_ln_2","contractTitleLine":"Taylor Guzman"}]},"taxIdentifier":{"operation":"INSERT","newValue":{"taxIdType":"SSP","taxId":"CADU39916151090321"}},"contractBranchCode":{"operation":"INSERT","newValue":"SHJFGBML"},"contractCountry":{"operation":"INSERT","newValue":"United States"},"partyRelations":[{"partyIdentifier":{"operation":"INSERT","newValue":"9823462811"},"partyRelationshipType":{"operation":"INSERT","newValue":"F-N"},"partyRelationStartDateTime":{"operation":"INSERT","newValue":"2018-08-31T05:27:22.000+05:30"},"partyAddress":{"operation":"INSERT","newValue":{"addressLine1":"361 Robinson Green","addressLine2":"3511 Rebecca Mission","addressCity":"North Tyler","addressPostalCode":"34118","addressCountry":"Canada","addressStartDate":"2018-01-28"}}}]}} 
+{"eventHeader":{"eventIdentifier":"38d57237-a566-4ec4-a471-f659e9ce8437","eventType":"SBDL-Contract","majorSchemaVersion":1,"minorSchemaVersion":0,"eventDateTime":"2022-09-06T20:49:03+0530"},"keys":[{"keyField":"contractIdentifier","keyValue":"6982391067"}],"payload":{"contractIdentifier":{"operation":"INSERT","newValue":"6982391067"},"sourceSystemIdentifier":{"operation":"INSERT","newValue":"BDL"},"contactStartDateTime":{"operation":"INSERT","newValue":"2018-08-29T17:18:59.000+05:30"},"contractTitle":{"operation":"INSERT","newValue":[{"contractTitleLineType":"lgl_ttl_ln_1","contractTitleLine":"Acosta Inc"},{"contractTitleLineType":"lgl_ttl_ln_2","contractTitleLine":"David Walker"}]},"taxIdentifier":{"operation":"INSERT","newValue":{"taxIdType":"SSP","taxId":"UJLN20870916345792"}},"contractBranchCode":{"operation":"INSERT","newValue":"WZTEGBTG"},"contractCountry":{"operation":"INSERT","newValue":"Canada"},"partyRelations":[{"partyIdentifier":{"operation":"INSERT","newValue":"9823462817"},"partyRelationshipType":{"operation":"INSERT","newValue":"F-N"},"partyRelationStartDateTime":{"operation":"INSERT","newValue":"2018-05-16T09:53:04.000+05:30"},"partyAddress":{"operation":"INSERT","newValue":{"addressLine1":"251 Lee Tunnel","addressLine2":"09795 Tara Station Suite 264","addressCity":"New Michelleborough","addressPostalCode":"05505","addressCountry":"United States","addressStartDate":"2019-04-20"}}},{"partyIdentifier":{"operation":"INSERT","newValue":"9823462820"},"partyRelationshipType":{"operation":"INSERT","newValue":"F-S"},"partyRelationStartDateTime":{"operation":"INSERT","newValue":"2017-11-20T14:18:05.000+05:30"}},{"partyIdentifier":{"operation":"INSERT","newValue":"9823462821"},"partyRelationshipType":{"operation":"INSERT","newValue":"F-S"},"partyRelationStartDateTime":{"operation":"INSERT","newValue":"2018-07-19T18:56:57.000+05:30"}}]}} 
+{"eventHeader":{"eventIdentifier":"38b24610-3bb9-4adc-8f8c-cfc3c24877f2","eventType":"SBDL-Contract","majorSchemaVersion":1,"minorSchemaVersion":0,"eventDateTime":"2022-09-06T20:49:03+0530"},"keys":[{"keyField":"contractIdentifier","keyValue":"6982391064"}],"payload":{"contractIdentifier":{"operation":"INSERT","newValue":"6982391064"},"sourceSystemIdentifier":{"operation":"INSERT","newValue":"ADS"},"contactStartDateTime":{"operation":"INSERT","newValue":"2018-03-28T15:47:43.000+05:30"},"contractTitle":{"operation":"INSERT","newValue":[{"contractTitleLineType":"lgl_ttl_ln_1","contractTitleLine":"Allen Group"}]},"taxIdentifier":{"operation":"INSERT","newValue":{"taxIdType":"CPR","taxId":"WJMX61093523376960"}},"contractBranchCode":{"operation":"INSERT","newValue":"OCCKGB65"},"contractCountry":{"operation":"INSERT","newValue":"Canada"},"partyRelations":[{"partyIdentifier":{"operation":"INSERT","newValue":"9823462814"},"partyRelationshipType":{"operation":"INSERT","newValue":"F-N"},"partyRelationStartDateTime":{"operation":"INSERT","newValue":"2019-06-06T14:18:12.000+05:30"},"partyAddress":{"operation":"INSERT","newValue":{"addressLine1":"5227 Wagner Pines","addressLine2":"189 Julie Throughway","addressCity":"West Amanda","addressPostalCode":"78962","addressCountry":"Canada","addressStartDate":"2018-07-11"}}}]}} +{"eventHeader":{"eventIdentifier":"40a7cb7f-3996-47db-85ae-13d08d5998ea","eventType":"SBDL-Contract","majorSchemaVersion":1,"minorSchemaVersion":0,"eventDateTime":"2022-09-06T20:49:03+0530"},"keys":[{"keyField":"contractIdentifier","keyValue":"6982391065"}],"payload":{"contractIdentifier":{"operation":"INSERT","newValue":"6982391065"},"sourceSystemIdentifier":{"operation":"INSERT","newValue":"COR"},"contactStartDateTime":{"operation":"INSERT","newValue":"2018-06-30T22:33:19.000+05:30"},"contractTitle":{"operation":"INSERT","newValue":[{"contractTitleLineType":"lgl_ttl_ln_1","contractTitleLine":"Austin 
Miles"}]},"taxIdentifier":{"operation":"INSERT","newValue":{"taxIdType":"EIN","taxId":"TYAB75470638120665"}},"contractBranchCode":{"operation":"INSERT","newValue":"XCVKGB49"},"contractCountry":{"operation":"INSERT","newValue":"Canada"},"partyRelations":[{"partyIdentifier":{"operation":"INSERT","newValue":"9823462815"},"partyRelationshipType":{"operation":"INSERT","newValue":"F-N"},"partyRelationStartDateTime":{"operation":"INSERT","newValue":"2019-05-04T05:12:37.000+05:30"},"partyAddress":{"operation":"INSERT","newValue":{"addressLine1":"6993 Diane Alley","addressLine2":"8945 Trevor Greens","addressCity":"Kendrafurt","addressPostalCode":"50790","addressCountry":"United States","addressStartDate":"2017-10-08"}}}]}} +{"eventHeader":{"eventIdentifier":"b96bc28e-e949-4f08-ae7d-e0787d2b02fc","eventType":"SBDL-Contract","majorSchemaVersion":1,"minorSchemaVersion":0,"eventDateTime":"2022-09-06T20:49:03+0530"},"keys":[{"keyField":"contractIdentifier","keyValue":"6982391066"}],"payload":{"contractIdentifier":{"operation":"INSERT","newValue":"6982391066"},"sourceSystemIdentifier":{"operation":"INSERT","newValue":"BDL"},"contactStartDateTime":{"operation":"INSERT","newValue":"2017-08-25T03:00:00.000+05:30"},"contractTitle":{"operation":"INSERT","newValue":[{"contractTitleLineType":"lgl_ttl_ln_1","contractTitleLine":"Miss Lisa Lee"}]},"taxIdentifier":{"operation":"INSERT","newValue":{"taxIdType":"SSP","taxId":"WFDP61047142851240"}},"contractBranchCode":{"operation":"INSERT","newValue":"CVYEGBJC"},"contractCountry":{"operation":"INSERT","newValue":"United States"},"partyRelations":[{"partyIdentifier":{"operation":"INSERT","newValue":"9823462816"},"partyRelationshipType":{"operation":"INSERT","newValue":"F-N"},"partyRelationStartDateTime":{"operation":"INSERT","newValue":"2019-05-15T10:39:29.000+05:30"},"partyAddress":{"operation":"INSERT","newValue":{"addressLine1":"23450 Timothy Divide","addressLine2":"125 Johnson Mountain Suite 
701","addressCity":"Osbornetown","addressPostalCode":"04756","addressCountry":"Canada","addressStartDate":"2018-11-28"}}}]}} +{"eventHeader":{"eventIdentifier":"2d6234a7-959e-4b89-837e-58f45d66d734","eventType":"SBDL-Contract","majorSchemaVersion":1,"minorSchemaVersion":0,"eventDateTime":"2022-09-06T20:49:03+0530"},"keys":[{"keyField":"contractIdentifier","keyValue":"6982391062"}],"payload":{"contractIdentifier":{"operation":"INSERT","newValue":"6982391062"},"sourceSystemIdentifier":{"operation":"INSERT","newValue":"CML"},"contactStartDateTime":{"operation":"INSERT","newValue":"2017-09-24T08:14:17.000+05:30"},"contractTitle":{"operation":"INSERT","newValue":[{"contractTitleLineType":"lgl_ttl_ln_1","contractTitleLine":"Theresa Mays"}]},"taxIdentifier":{"operation":"INSERT","newValue":{"taxIdType":"CPR","taxId":"WTUP76582369402245"}},"contractBranchCode":{"operation":"INSERT","newValue":"XCVKGB49"},"contractCountry":{"operation":"INSERT","newValue":"Canada"},"partyRelations":[{"partyIdentifier":{"operation":"INSERT","newValue":"9823462812"},"partyRelationshipType":{"operation":"INSERT","newValue":"F-N"},"partyRelationStartDateTime":{"operation":"INSERT","newValue":"2018-08-25T15:50:29.000+05:30"},"partyAddress":{"operation":"INSERT","newValue":{"addressLine1":"039 Daniel Mount","addressLine2":"8219 Hernandez Lodge Suite 875","addressCity":"Boltonborough","addressPostalCode":"71648","addressCountry":"Mexico","addressStartDate":"2018-12-07"}}}]}} 
+{"eventHeader":{"eventIdentifier":"f752ac70-9992-44a5-b5af-579086578c2c","eventType":"SBDL-Contract","majorSchemaVersion":1,"minorSchemaVersion":0,"eventDateTime":"2022-09-06T20:49:03+0530"},"keys":[{"keyField":"contractIdentifier","keyValue":"6982391068"}],"payload":{"contractIdentifier":{"operation":"INSERT","newValue":"6982391068"},"sourceSystemIdentifier":{"operation":"INSERT","newValue":"BDL"},"contactStartDateTime":{"operation":"INSERT","newValue":"2018-05-11T15:24:57.000+05:30"},"contractTitle":{"operation":"INSERT","newValue":[{"contractTitleLineType":"lgl_ttl_ln_1","contractTitleLine":"Amanda Martin"}]},"taxIdentifier":{"operation":"INSERT","newValue":{"taxIdType":"EIN","taxId":"JDPX30428146546118"}},"contractBranchCode":{"operation":"INSERT","newValue":"ACXMGBA5"},"contractCountry":{"operation":"INSERT","newValue":"Mexico"},"partyRelations":[{"partyIdentifier":{"operation":"INSERT","newValue":"9823462818"},"partyRelationshipType":{"operation":"INSERT","newValue":"F-N"},"partyRelationStartDateTime":{"operation":"INSERT","newValue":"2017-11-27T01:20:12.000+05:30"},"partyAddress":{"operation":"INSERT","newValue":{"addressLine1":"7537 Clarke Club","addressLine2":"74089 Jerry Trail","addressCity":"Hunterville","addressPostalCode":"19596","addressCountry":"United States","addressStartDate":"2018-07-17"}}}]}} diff --git a/SBDL/test_nutter_sbdl.py b/SBDL/test_nutter_sbdl.py new file mode 100644 index 0000000..c7db2ea --- /dev/null +++ b/SBDL/test_nutter_sbdl.py @@ -0,0 +1,24 @@ +from pyspark.sql import SparkSession +from runtime.nutterfixture import NutterFixture, tag + +from lib import DataLoader + + +class MyTestFixture(NutterFixture): + def __init__(self): + self.spark = SparkSession.builder \ + .appName("TEST_SBDL") \ + .master("local[2]") \ + .enableHiveSupport() \ + .getOrCreate() + NutterFixture.__init__(self) + + def assertion_read_accounts(self): + accounts_df = DataLoader.read_accounts(self.spark, "LOCAL", False, None) + assert (accounts_df.count() == 
8) + + + + +result = MyTestFixture().execute_tests() +print(result.to_string()) diff --git a/SBDL/test_pytest_sbdl.py b/SBDL/test_pytest_sbdl.py new file mode 100644 index 0000000..bf599c4 --- /dev/null +++ b/SBDL/test_pytest_sbdl.py @@ -0,0 +1,227 @@ +import pytest +from chispa import assert_df_equality +from datetime import datetime, date + +from pyspark.sql.types import StructType, StructField, StringType, NullType, TimestampType, ArrayType, DateType, Row + +from lib import DataLoader, Transformations +from lib.ConfigLoader import get_config +from lib.DataLoader import get_party_schema +from lib.Utils import get_spark_session + + +@pytest.fixture(scope='session') +def spark(): + return get_spark_session("LOCAL") + + +@pytest.fixture(scope='session') +def expected_party_rows(): + return [Row(load_date=date(2022, 8, 2), account_id='6982391060', + party_id='9823462810', relation_type='F-N', relation_start_date=datetime(2019, 7, 29, 6, 21, 32)), + Row(load_date=date(2022, 8, 2), account_id='6982391061', party_id='9823462811', relation_type='F-N', + relation_start_date=datetime(2018, 8, 31, 5, 27, 22)), + Row(load_date=date(2022, 8, 2), account_id='6982391062', party_id='9823462812', relation_type='F-N', + relation_start_date=datetime(2018, 8, 25, 15, 50, 29)), + Row(load_date=date(2022, 8, 2), account_id='6982391063', party_id='9823462813', relation_type='F-N', + relation_start_date=datetime(2018, 5, 11, 7, 23, 28)), + Row(load_date=date(2022, 8, 2), account_id='6982391064', party_id='9823462814', relation_type='F-N', + relation_start_date=datetime(2019, 6, 6, 14, 18, 12)), + Row(load_date=date(2022, 8, 2), account_id='6982391065', party_id='9823462815', relation_type='F-N', + relation_start_date=datetime(2019, 5, 4, 5, 12, 37)), + Row(load_date=date(2022, 8, 2), account_id='6982391066', party_id='9823462816', relation_type='F-N', + relation_start_date=datetime(2019, 5, 15, 10, 39, 29)), + Row(load_date=date(2022, 8, 2), account_id='6982391067', 
party_id='9823462817', relation_type='F-N', + relation_start_date=datetime(2018, 5, 16, 9, 53, 4)), + Row(load_date=date(2022, 8, 2), account_id='6982391068', party_id='9823462818', relation_type='F-N', + relation_start_date=datetime(2017, 11, 27, 1, 20, 12)), + Row(load_date=date(2022, 8, 2), account_id='6982391067', party_id='9823462820', relation_type='F-S', + relation_start_date=datetime(2017, 11, 20, 14, 18, 5)), + Row(load_date=date(2022, 8, 2), account_id='6982391067', party_id='9823462821', relation_type='F-S', + relation_start_date=datetime(2018, 7, 19, 18, 56, 57))] + + +@pytest.fixture(scope='session') +def parties_list(): + return [ + (date(2022, 8, 2), '6982391060', '9823462810', 'F-N', datetime.fromisoformat('2019-07-29 06:21:32.000+05:30')), + (date(2022, 8, 2), '6982391061', '9823462811', 'F-N', datetime.fromisoformat('2018-08-31 05:27:22.000+05:30')), + (date(2022, 8, 2), '6982391062', '9823462812', 'F-N', datetime.fromisoformat('2018-08-25 15:50:29.000+05:30')), + (date(2022, 8, 2), '6982391063', '9823462813', 'F-N', datetime.fromisoformat('2018-05-11 07:23:28.000+05:30')), + (date(2022, 8, 2), '6982391064', '9823462814', 'F-N', datetime.fromisoformat('2019-06-06 14:18:12.000+05:30')), + (date(2022, 8, 2), '6982391065', '9823462815', 'F-N', datetime.fromisoformat('2019-05-04 05:12:37.000+05:30')), + (date(2022, 8, 2), '6982391066', '9823462816', 'F-N', datetime.fromisoformat('2019-05-15 10:39:29.000+05:30')), + (date(2022, 8, 2), '6982391067', '9823462817', 'F-N', datetime.fromisoformat('2018-05-16 09:53:04.000+05:30')), + (date(2022, 8, 2), '6982391068', '9823462818', 'F-N', datetime.fromisoformat('2017-11-27 01:20:12.000+05:30')), + (date(2022, 8, 2), '6982391067', '9823462820', 'F-S', datetime.fromisoformat('2017-11-20 14:18:05.000+05:30')), + (date(2022, 8, 2), '6982391067', '9823462821', 'F-S', datetime.fromisoformat('2018-07-19 18:56:57.000+05:30'))] + + +@pytest.fixture(scope='session') +def expected_contract_df(spark): + schema = 
StructType([StructField('account_id', StringType()), + StructField('contractIdentifier', + StructType([StructField('operation', StringType()), + StructField('newValue', StringType()), + StructField('oldValue', NullType())])), + StructField('sourceSystemIdentifier', + StructType([StructField('operation', StringType()), + StructField('newValue', StringType()), + StructField('oldValue', NullType())])), + StructField('contactStartDateTime', + StructType([StructField('operation', StringType()), + StructField('newValue', TimestampType()), + StructField('oldValue', NullType())])), + StructField('contractTitle', + StructType([StructField('operation', StringType()), + StructField('newValue', + ArrayType(StructType( + [StructField('contractTitleLineType', StringType()), + StructField('contractTitleLine', StringType())]))), + StructField('oldValue', NullType())])), + StructField('taxIdentifier', + StructType([StructField('operation', StringType()), + StructField('newValue', + StructType([StructField('taxIdType', StringType()), + StructField('taxId', StringType())])), + StructField('oldValue', NullType())])), + StructField('contractBranchCode', + StructType([StructField('operation', StringType()), + StructField('newValue', StringType()), + StructField('oldValue', NullType())])), + StructField('contractCountry', + StructType([StructField('operation', StringType()), + StructField('newValue', StringType()), + StructField('oldValue', NullType())]))]) + + return spark.read.format("json").schema(schema).load("test_data/results/contract_df.json") + + +@pytest.fixture(scope='session') +def expected_final_df(spark): + schema = StructType( + [StructField('keys', + ArrayType(StructType([StructField('keyField', StringType()), + StructField('keyValue', StringType())]))), + StructField('payload', + StructType([ + StructField('contractIdentifier', + StructType([StructField('operation', StringType()), + StructField('newValue', StringType()), + StructField('oldValue', NullType())])), + 
StructField('sourceSystemIdentifier', + StructType([StructField('operation', StringType()), + StructField('newValue', StringType()), + StructField('oldValue', NullType())])), + StructField('contactStartDateTime', + StructType([StructField('operation', StringType()), + StructField('newValue', TimestampType()), + StructField('oldValue', NullType())])), + StructField('contractTitle', + StructType([StructField('operation', StringType()), + StructField('newValue', ArrayType( + StructType([StructField('contractTitleLineType', StringType()), + StructField('contractTitleLine', StringType())]))), + StructField('oldValue', NullType())])), + StructField('taxIdentifier', + StructType([StructField('operation', StringType()), + StructField('newValue', + StructType([StructField('taxIdType', StringType()), + StructField('taxId', StringType())])), + StructField('oldValue', NullType())])), + StructField('contractBranchCode', + StructType([StructField('operation', StringType()), + StructField('newValue', StringType()), + StructField('oldValue', NullType())])), + StructField('contractCountry', + StructType([StructField('operation', StringType()), + StructField('newValue', StringType()), + StructField('oldValue', NullType())])), + StructField('partyRelations', + ArrayType(StructType([ + StructField('partyIdentifier', + StructType([ + StructField('operation', StringType()), + StructField('newValue', StringType()), + StructField('oldValue', NullType())])), + StructField('partyRelationshipType', + StructType([ + StructField('operation', StringType()), + StructField('newValue', StringType()), + StructField('oldValue', NullType())])), + StructField('partyRelationStartDateTime', + StructType([ + StructField('operation', StringType()), + StructField('newValue', TimestampType()), + StructField('oldValue', NullType())])), + StructField('partyAddress', + StructType([StructField('operation', StringType()), + StructField( + 'newValue', + StructType( + [StructField('addressLine1', StringType()), + 
StructField('addressLine2', StringType()), + StructField('addressCity', StringType()), + StructField('addressPostalCode', + StringType()), + StructField('addressCountry', StringType()), + StructField('addressStartDate', DateType()) + ])), + StructField('oldValue', NullType())]))])))]))]) + return spark.read.format("json").schema(schema).load("test_data/results/final_df.json") \ + .select("keys", "payload") + + +def test_blank_test(spark): + print(spark.version) + assert spark.version == "3.3.0" + + +def test_get_config(): + conf_local = get_config("LOCAL") + conf_qa = get_config("QA") + assert conf_local["kafka.topic"] == "sbdl_kafka_cloud" + assert conf_qa["hive.database"] == "sbdl_db_qa" + + +def test_read_accounts(spark): + accounts_df = DataLoader.read_accounts(spark, "LOCAL", False, None) + assert accounts_df.count() == 8 + + +def test_read_parties_row(spark, expected_party_rows): + actual_party_rows = DataLoader.read_parties(spark, "LOCAL", False, None).collect() + assert expected_party_rows == actual_party_rows + + +def test_read_parties(spark, parties_list): + expected_df = spark.createDataFrame(parties_list) + actual_df = DataLoader.read_parties(spark, "LOCAL", False, None) + assert_df_equality(expected_df, actual_df, ignore_schema=True) + + +def test_read_party_schema(spark, parties_list): + expected_df = spark.createDataFrame(parties_list, get_party_schema()) + actual_df = DataLoader.read_parties(spark, "LOCAL", False, None) + assert_df_equality(expected_df, actual_df) + + +def test_get_contract(spark, expected_contract_df): + accounts_df = DataLoader.read_accounts(spark, "LOCAL", False, None) + actual_contract_df = Transformations.get_contract(accounts_df) + assert expected_contract_df.collect() == actual_contract_df.collect() + assert_df_equality(expected_contract_df, actual_contract_df, ignore_schema=True) + + +def test_kafka_kv_df(spark, expected_final_df): + accounts_df = DataLoader.read_accounts(spark, "LOCAL", False, None) + contract_df = 
Transformations.get_contract(accounts_df) + parties_df = DataLoader.read_parties(spark, "LOCAL", False, None) + relations_df = Transformations.get_relations(parties_df) + address_df = DataLoader.read_address(spark, "LOCAL", False, None) + relation_address_df = Transformations.get_address(address_df) + party_address_df = Transformations.join_party_address(relations_df, relation_address_df) + data_df = Transformations.join_contract_party(contract_df, party_address_df) + actual_final_df = Transformations.apply_header(spark, data_df) \ + .select("keys", "payload") + assert_df_equality(actual_final_df, expected_final_df, ignore_schema=True) From 33d16ad22c3168785f8c682013d73eae21406ed1 Mon Sep 17 00:00:00 2001 From: Prashant Kumar Pandey Date: Fri, 29 Dec 2023 11:11:19 +0530 Subject: [PATCH 2/2] Added Missing Notebooks --- 01-getting-started.ipynb | 1 + 02-spark-dataframe-demo.ipynb | 1 + 03-spark-table-demo.sql | 71 +++++++++++++++++ 04-spark-sql-demo.sql | 135 ++++++++++++++++++++++++++++++++ 05-working-with-dataframe.ipynb | 1 + HelloSpark.py | 17 ++++ 6 files changed, 226 insertions(+) create mode 100644 01-getting-started.ipynb create mode 100644 02-spark-dataframe-demo.ipynb create mode 100644 03-spark-table-demo.sql create mode 100644 04-spark-sql-demo.sql create mode 100644 05-working-with-dataframe.ipynb create mode 100644 HelloSpark.py diff --git a/01-getting-started.ipynb b/01-getting-started.ipynb new file mode 100644 index 0000000..b1d9c5e --- /dev/null +++ b/01-getting-started.ipynb @@ -0,0 +1 @@ +{"cells":[{"cell_type":"code","source":["diamonds_df = spark.read.format(\"csv\") \\\n .option(\"header\", \"true\") \\\n .option(\"inferSchema\", \"true\") \\\n 
.load(\"/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv\")\n\ndiamonds_df.show(10)"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"b1b0d245-342b-472b-9c14-9e7883cf73f4"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"data":"","errorSummary":"","metadata":{},"errorTraceType":null,"type":"ipynbError","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"code","source":["from pyspark.sql.functions import avg\n\nresults_df = diamonds_df.select(\"color\", \"price\") \\\n .groupBy(\"color\") \\\n .agg(avg(\"price\")) \\\n .sort(\"color\")\n\nresults_df.show()"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"015c9e24-1834-4494-a61e-f842bf41371d"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"data":"","errorSummary":"","metadata":{},"errorTraceType":null,"type":"ipynbError","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"code","source":["display(results_df)"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"c6d78c3a-7eb6-4376-af28-4bde47c83b4c"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"data":"","errorSummary":"","metadata":{},"errorTraceType":null,"type":"ipynbError","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"code","source":[""],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"32c660e0-a73f-43d4-9695-affcc19a0a2d"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"data":"","errorSummary":"","metadata":{},"errorTraceType":null,"type":"ipynbError","arguments":{}}}
,"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0}],"metadata":{"application/vnd.databricks.v1+notebook":{"notebookName":"01-getting-started","dashboards":[],"notebookMetadata":{"pythonIndentUnit":4},"language":"python","widgets":{},"notebookOrigID":2879982568079096}},"nbformat":4,"nbformat_minor":0} diff --git a/02-spark-dataframe-demo.ipynb b/02-spark-dataframe-demo.ipynb new file mode 100644 index 0000000..d73616b --- /dev/null +++ b/02-spark-dataframe-demo.ipynb @@ -0,0 +1 @@ +{"cells":[{"cell_type":"code","source":["raw_fire_df = spark.read \\\n .format(\"csv\") \\\n .option(\"header\", \"true\") \\\n .option(\"inferSchema\", \"true\") \\\n .load(\"/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv\")"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"1493b35d-a05e-4322-949c-2c6a7db9e146"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"data":"","errorSummary":"","metadata":{},"errorTraceType":null,"type":"ipynbError","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"code","source":["raw_fire_df.show(10)"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"33c9f1f1-299b-45d9-b7cf-7940ac9e1d80"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"data":"","errorSummary":"","metadata":{},"errorTraceType":null,"type":"ipynbError","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"code","source":["display(raw_fire_df)"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"b333860d-334b-42a4-b073-a98bc58b1c43"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"data":"","errorSummary":"","metadata":{},"errorTraceType":n
ull,"type":"ipynbError","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"code","source":["raw_fire_df.createGlobalTempView(\"fire_service_calls_view\")"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"56d2f0d6-90a7-4399-8f09-9dead0bbf526"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"data":"","errorSummary":"","metadata":{},"errorTraceType":null,"type":"ipynbError","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"code","source":["%sql\nselect * from global_temp.fire_service_calls_view"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"75c57597-0f81-40a7-885f-64829f3db180"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"data":"","errorSummary":"","metadata":{},"errorTraceType":null,"type":"ipynbError","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"code","source":[""],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"19f40c42-622a-43a2-bd56-d182e528fe6b"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"data":"","errorSummary":"","metadata":{},"errorTraceType":null,"type":"ipynbError","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0}],"metadata":{"application/vnd.databricks.v1+notebook":{"notebookName":"02-spark-dataframe-demo","dashboards":[],"notebookMetadata":{"pythonIndentUnit":4},"language":"python","widgets":{},"notebookOrigID":2787702214819532}},"nbformat":4,"nbformat_minor":0} diff --git a/03-spark-table-demo.sql b/03-spark-table-demo.sql new file mode 100644 index 0000000..9d3d908 --- /dev/null +++ b/03-spark-table-demo.sql @@ 
-0,0 +1,71 @@ +-- Databricks notebook source +drop table if exists demo_db.fire_service_calls_tbl; +drop view if exists demo_db; + +-- COMMAND ---------- + +-- MAGIC %fs rm -r /user/hive/warehouse/demo_db.db + +-- COMMAND ---------- + +create database if not exists demo_db + +-- COMMAND ---------- + +create table if not exists demo_db.fire_service_calls_tbl( + CallNumber integer, + UnitID string, + IncidentNumber integer, + CallType string, + CallDate string, + WatchDate string, + CallFinalDisposition string, + AvailableDtTm string, + Address string, + City string, + Zipcode integer, + Battalion string, + StationArea string, + Box string, + OriginalPriority string, + Priority string, + FinalPriority integer, + ALSUnit boolean, + CallTypeGroup string, + NumAlarms integer, + UnitType string, + UnitSequenceInCallDispatch integer, + FirePreventionDistrict string, + SupervisorDistrict string, + Neighborhood string, + Location string, + RowID string, + Delay float +) using parquet + +-- COMMAND ---------- + +insert into demo_db.fire_service_calls_tbl +values(1234, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, +null, null, null, null, null, null, null, null, null) + +-- COMMAND ---------- + +select * from demo_db.fire_service_calls_tbl + +-- COMMAND ---------- + +truncate table demo_db.fire_service_calls_tbl + +-- COMMAND ---------- + +insert into demo_db.fire_service_calls_tbl +select * from global_temp.fire_service_calls_view + +-- COMMAND ---------- + +select * from demo_db.fire_service_calls_tbl + +-- COMMAND ---------- + + diff --git a/04-spark-sql-demo.sql b/04-spark-sql-demo.sql new file mode 100644 index 0000000..cb1a5d9 --- /dev/null +++ b/04-spark-sql-demo.sql @@ -0,0 +1,135 @@ +-- Databricks notebook source +select * from demo_db.fire_service_calls_tbl limit 100 + +-- COMMAND ---------- + +drop view if exists fire_service_calls_tbl_cache; + +-- COMMAND ---------- + +cache lazy table 
fire_service_calls_tbl_cache as +select * from demo_db.fire_service_calls_tbl + +-- COMMAND ---------- + +select count(*) from demo_db.fire_service_calls_tbl + +-- COMMAND ---------- + +-- MAGIC %md +-- MAGIC ##### Q1. How many distinct types of calls were made to the Fire Department? + +-- COMMAND ---------- + +select count(distinct callType) as distinct_call_type_count +from demo_db.fire_service_calls_tbl +where callType is not null + +-- COMMAND ---------- + +-- MAGIC %md +-- MAGIC ##### Q2. What were distinct types of calls made to the Fire Department? + +-- COMMAND ---------- + +select distinct callType as distinct_call_types +from demo_db.fire_service_calls_tbl +where callType is not null + +-- COMMAND ---------- + +-- MAGIC %md +-- MAGIC ##### Q3. Find out all response for delayed times greater than 5 mins? + +-- COMMAND ---------- + +select callNumber, Delay +from demo_db.fire_service_calls_tbl +where Delay > 5 + +-- COMMAND ---------- + +-- MAGIC %md +-- MAGIC ##### Q4. What were the most common call types? + +-- COMMAND ---------- + +select callType, count(*) as count +from demo_db.fire_service_calls_tbl +where callType is not null +group by callType +order by count desc + +-- COMMAND ---------- + +-- MAGIC %md +-- MAGIC ##### Q5. What zip codes accounted for most common calls? + +-- COMMAND ---------- + +select callType, zipCode, count(*) as count +from demo_db.fire_service_calls_tbl +where callType is not null +group by callType, zipCode +order by count desc + +-- COMMAND ---------- + +-- MAGIC %md +-- MAGIC ##### Q6. What San Francisco neighborhoods are in the zip codes 94102 and 94103? + +-- COMMAND ---------- + +select zipCode, neighborhood +from demo_db.fire_service_calls_tbl +where zipCode == 94102 or zipCode == 94103 + +-- COMMAND ---------- + +-- MAGIC %md +-- MAGIC #####Q7. What was the sum of all call alarms, average, min, and max of the call response times? 
+ +-- COMMAND ---------- + +select sum(NumAlarms), avg(Delay), min(Delay), max(Delay) +from demo_db.fire_service_calls_tbl + +-- COMMAND ---------- + +-- MAGIC %md +-- MAGIC ##### Q8. How many distinct years of data is in the data set? + +-- COMMAND ---------- + +select distinct year(to_date(callDate, "MM/dd/yyyy")) as year_num +from demo_db.fire_service_calls_tbl +order by year_num + +-- COMMAND ---------- + +-- MAGIC %md +-- MAGIC ##### Q9. What week of the year in 2018 had the most fire calls? + +-- COMMAND ---------- + +select weekofyear(to_date(callDate, "MM/dd/yyyy")) week_year, count(*) as count +from demo_db.fire_service_calls_tbl +where year(to_date(callDate, "MM/dd/yyyy")) == 2018 +group by week_year +order by count desc + +-- COMMAND ---------- + +-- MAGIC %md +-- MAGIC ##### Q10. What neighborhoods in San Francisco had the worst response time in 2018? + +-- COMMAND ---------- + +select neighborhood, delay +from demo_db.fire_service_calls_tbl +where year(to_date(callDate, "MM/dd/yyyy")) == 2018 +order by delay desc + +-- COMMAND ---------- + + diff --git a/05-working-with-dataframe.ipynb b/05-working-with-dataframe.ipynb new file mode 100644 index 0000000..53247e4 --- /dev/null +++ b/05-working-with-dataframe.ipynb @@ -0,0 +1 @@ +{"cells":[{"cell_type":"code","source":["from pyspark.sql.functions import *"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"714857fa-35e9-45e3-a6d3-1d16a0cc7f91"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"data":"","errorSummary":"","metadata":{},"errorTraceType":null,"type":"ipynbError","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"code","source":["raw_fire_df = spark.read \\\n .format(\"csv\") \\\n .option(\"header\", \"true\") \\\n .option(\"inferSchema\",\"true\") \\\n 
.load(\"/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv\")"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"6fac9a4c-8d5f-4eda-aca1-180eb49086c1"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"data":"","errorSummary":"","metadata":{},"errorTraceType":null,"type":"ipynbError","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"code","source":["display(raw_fire_df)"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"28a6868e-f4be-4406-ae8d-f67613d9f493"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"data":"","errorSummary":"","metadata":{},"errorTraceType":null,"type":"ipynbError","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"code","source":["renamed_fire_df = raw_fire_df \\\n .withColumnRenamed(\"Call Number\", \"CallNumber\") \\\n .withColumnRenamed(\"Unit ID\", \"UnitID\") \\\n .withColumnRenamed(\"Incident Number\", \"IncidentNumber\") \\\n .withColumnRenamed(\"Call Date\", \"CallDate\") \\\n .withColumnRenamed(\"Watch Date\", \"WatchDate\") \\\n .withColumnRenamed(\"Call Final Disposition\", \"CallFinalDisposition\") \\\n .withColumnRenamed(\"Available DtTm\", \"AvailableDtTm\") \\\n .withColumnRenamed(\"Zipcode of Incident\", \"Zipcode\") \\\n .withColumnRenamed(\"Station Area\", \"StationArea\") \\\n .withColumnRenamed(\"Final Priority\", \"FinalPriority\") \\\n .withColumnRenamed(\"ALS Unit\", \"ALSUnit\") \\\n .withColumnRenamed(\"Call Type Group\", \"CallTypeGroup\") \\\n .withColumnRenamed(\"Unit sequence in call dispatch\", \"UnitSequenceInCallDispatch\") \\\n .withColumnRenamed(\"Fire Prevention District\", \"FirePreventionDistrict\") \\\n .withColumnRenamed(\"Supervisor District\", 
\"SupervisorDistrict\")"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"3a45840e-00d7-403c-bcb5-f4c2b0e1dbba"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"data":"","errorSummary":"","metadata":{},"errorTraceType":null,"type":"ipynbError","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"code","source":["display(renamed_fire_df)"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"34acbca6-5403-4389-812b-47d79aae4d6d"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"data":"","errorSummary":"","metadata":{},"errorTraceType":null,"type":"ipynbError","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"code","source":["renamed_fire_df.printSchema()"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"12343768-27f0-4b8c-bd7d-e43ea856ea6a"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"data":"","errorSummary":"","metadata":{},"errorTraceType":null,"type":"ipynbError","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"code","source":["fire_df = renamed_fire_df \\\n .withColumn(\"CallDate\", to_date(\"CallDate\", \"MM/dd/yyyy\")) \\\n .withColumn(\"WatchDate\", to_date(\"WatchDate\", \"MM/dd/yyyy\")) \\\n .withColumn(\"AvailableDtTm\", to_timestamp(\"AvailableDtTm\", \"MM/dd/yyyy hh:mm:ss a\")) \\\n .withColumn(\"Delay\", round(\"Delay\", 
2))"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"1bfbd70e-ee18-49d6-affd-bc4033116772"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"data":"","errorSummary":"","metadata":{},"errorTraceType":null,"type":"ipynbError","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"code","source":["display(fire_df)"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"b2605e0c-3bdb-4e27-ad86-939c80bd3a9d"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"data":"","errorSummary":"","metadata":{},"errorTraceType":null,"type":"ipynbError","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"code","source":["fire_df.printSchema()"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"4ed3cca5-aac7-4529-bbec-8c8c85e11b5d"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"data":"","errorSummary":"","metadata":{},"errorTraceType":null,"type":"ipynbError","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"code","source":["fire_df.cache()"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"2cf3909b-63a5-4140-a744-b3ff9213544e"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"data":"","errorSummary":"","metadata":{},"errorTraceType":null,"type":"ipynbError","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["##### Q1. 
How many distinct types of calls were made to the Fire Department?\n```SQL\nselect count(distinct CallType) as distinct_call_type_count\nfrom fire_service_calls_tbl\nwhere CallType is not null\n```"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"7596246e-e79c-4e85-81eb-d05047216212"}}},{"cell_type":"code","source":["fire_df.createOrReplaceTempView(\"fire_service_calls_view\")\nq1_sql_df = spark.sql(\"\"\"\n select count(distinct CallType) as distinct_call_type_count\n from fire_service_calls_view\n where CallType is not null\n \"\"\")\ndisplay(q1_sql_df)"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"92360fa0-13d4-4834-bfc6-bf462125e615"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"data":"","errorSummary":"","metadata":{},"errorTraceType":null,"type":"ipynbError","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"code","source":["q1_df = fire_df.where(\"CallType is not null\") \\\n .select(\"CallType\") \\\n .distinct()\nprint(q1_df.count())"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"6efce8ff-052a-4bdf-9c1c-0602a4837653"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"data":"","errorSummary":"","metadata":{},"errorTraceType":null,"type":"ipynbError","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"code","source":["q1_df1 = fire_df.where(\"CallType is not null\")\nq1_df2 = q1_df1.select(\"CallType\")\nq1_df3 = 
q1_df2.distinct()\nprint(q1_df3.count())"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"cbdf0db8-e0a3-41b7-b672-b134b6c811ba"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"data":"","errorSummary":"","metadata":{},"errorTraceType":null,"type":"ipynbError","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["##### Q2. What were distinct types of calls made to the Fire Department?\n```sql\nselect distinct CallType as distinct_call_types\nfrom fire_service_calls_tbl\nwhere CallType is not null\n```"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"7d4f11c9-374c-41df-b4bb-9a0d812a7975"}}},{"cell_type":"code","source":["q2_df = fire_df.where(\"CallType is not null\") \\\n .select(expr(\"CallType as distinct_call_type\")) \\\n .distinct()\nq2_df.show()"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"7af8c2e5-864b-4196-8315-2ef9f9d82c64"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"data":"","errorSummary":"","metadata":{},"errorTraceType":null,"type":"ipynbError","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"code","source":["display(q2_df)"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"822d62b3-8c09-4237-92fb-80a855c202c1"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"data":"","errorSummary":"","metadata":{},"errorTraceType":null,"type":"ipynbError","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["##### Q3. 
Find out all response for delayed times greater than 5 mins?\n``` sql\nselect CallNumber, Delay\nfrom fire_service_calls_tbl\nwhere Delay > 5\n```"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"85b11023-f976-465a-9aed-0d7de4383722"}}},{"cell_type":"code","source":["fire_df.where(\"Delay > 5\") \\\n .select(\"CallNumber\", \"Delay\") \\\n .show()"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"06543f39-21ea-4bda-a228-03257788c5f9"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"data":"","errorSummary":"","metadata":{},"errorTraceType":null,"type":"ipynbError","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["##### Q4. What were the most common call types?\n```sql\nselect CallType, count(*) as count\nfrom fire_service_calls_tbl\nwhere CallType is not null\ngroup by CallType\norder by count desc\n```"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"d682e1e3-28f2-4cb1-87af-3b2a41fa1f9f"}}},{"cell_type":"code","source":["fire_df.select(\"CallType\") \\\n .where(\"CallType is not null\") \\\n .groupBy(\"CallType\") \\\n .count() \\\n .orderBy(\"count\", ascending=False) \\\n .show()"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"b8c3327f-104b-4e9e-b941-9b0a57571d0b"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"data":"","errorSummary":"","metadata":{},"errorTraceType":null,"type":"ipynbError","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["##### Q5. 
What zip codes accounted for most common calls?\n```sql\nselect CallType, ZipCode, count(*) as count\nfrom fire_service_calls_tbl\nwhere CallType is not null\ngroup by CallType, Zipcode\norder by count desc\n```"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"05668ab2-6c49-46f7-bf75-05bcaa7ca666"}}},{"cell_type":"code","source":[""],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"c88d0eb0-fd17-424e-bc49-d0a2231d7d06"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"data":"","errorSummary":"","metadata":{},"errorTraceType":null,"type":"ipynbError","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["##### Q6. What San Francisco neighborhoods are in the zip codes 94102 and 94103\n```sql\nselect distinct Neighborhood, Zipcode\nfrom fire_service_calls_tbl\nwhere Zipcode== 94102 or Zipcode == 94103\n```"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"48e6bc0a-bcec-4d20-a165-9079aa74adb2"}}},{"cell_type":"code","source":[""],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"21ff24e5-845d-42ad-bba1-157f7cb1dab0"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"data":"","errorSummary":"","metadata":{},"errorTraceType":null,"type":"ipynbError","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["##### Q7. 
What was the sum of all calls, average, min and max of the response times for calls?\n```sql\nselect sum(NumAlarms), avg(Delay), min(Delay), max(Delay)\nfrom fire_service_calls_tbl\n```"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"bb33f91e-0d67-414b-b056-88fc38b6a6bd"}}},{"cell_type":"code","source":[""],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"aa54b23f-6b28-4b07-80c8-38c319216d11"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"data":"","errorSummary":"","metadata":{},"errorTraceType":null,"type":"ipynbError","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["##### Q8. How many distinct years of data is in the CSV file?\n```sql\nselect distinct year(to_timestamp(CallDate, \"MM/dd/yyyy\")) as year_num\nfrom fire_service_calls_tbl\norder by year_num\n```"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"8894d83e-905c-4b33-85ba-a85c0b41b5cb"}}},{"cell_type":"code","source":[""],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"cc09d106-3298-459a-905a-7efaea20437a"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"data":"","errorSummary":"","metadata":{},"errorTraceType":null,"type":"ipynbError","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["##### Q9. 
What week of the year in 2018 had the most fire calls?\n```sql\nselect weekofyear(to_timestamp(CallDate, \"MM/dd/yyyy\")) week_year, count(*) as count\nfrom fire_service_calls_tbl \nwhere year(to_timestamp(CallDate, \"MM/dd/yyyy\")) == 2018\ngroup by week_year\norder by count desc\n```"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"b52c7b9e-6493-4572-85e0-4bd5e8534372"}}},{"cell_type":"code","source":[""],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"f8aa82ee-5140-4e78-9d78-35b0c8d40385"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"data":"","errorSummary":"","metadata":{},"errorTraceType":null,"type":"ipynbError","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["##### Q10. What neighborhoods in San Francisco had the worst response time in 2018?\n```sql\nselect Neighborhood, Delay\nfrom fire_service_calls_tbl \nwhere year(to_timestamp(CallDate, \"MM/dd/yyyy\")) == 2018\n```"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"63144f63-f0e7-4361-97ca-78173498fc2f"}}},{"cell_type":"code","source":[""],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"c03714de-f4af-407f-9227-dae8df634a4a"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"data":"","errorSummary":"","metadata":{},"errorTraceType":null,"type":"ipynbError","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0}],"metadata":{"application/vnd.databricks.v1+notebook":{"notebookName":"05-working-with-dataframe","dashboards":[],"notebookMetadata":{"pythonIndentUnit":4},"language":"python","widgets":{},"notebookOrigID":384359114294295}},"nbformat":4,"nbformat_minor":0} diff 
--git a/HelloSpark.py b/HelloSpark.py new file mode 100644 index 0000000..07f7b53 --- /dev/null +++ b/HelloSpark.py @@ -0,0 +1,17 @@ +from pyspark.sql import * + +if __name__ == "__main__": + + spark = SparkSession.builder \ + .appName("Hello Spark") \ + .master("local[2]") \ + .getOrCreate() + + data_list = [("Ravi", 28), + ("David", 45), + ("Abdul", 27)] + + + + df = spark.createDataFrame(data_list).toDF("Name", "Age") + df.show() \ No newline at end of file