diff --git a/.idea/.gitignore b/.idea/.gitignore
new file mode 100644
index 0000000..5c98b42
--- /dev/null
+++ b/.idea/.gitignore
@@ -0,0 +1,2 @@
+# Default ignored files
+/workspace.xml
\ No newline at end of file
[New PyCharm project metadata files are also added: .idea/inspectionProfiles/profiles_settings.xml, .idea/misc.xml, .idea/modules.xml, .idea/sparkdevelop.iml and .idea/vcs.xml (standard IDE XML; bodies omitted).]
diff --git a/08-RowDemo/RowDemo.py b/08-RowDemo/RowDemo.py
index 2512a67..9a23c89 100644
--- a/08-RowDemo/RowDemo.py
+++ b/08-RowDemo/RowDemo.py
@@ -2,7 +2,7 @@
 from pyspark.sql.functions import *
 from pyspark.sql.types import *
 
-from lib.logger import Log4j
+##from lib.logger import Log4j
 
 
 def to_date_df(df, fmt, fld):
@@ -16,7 +16,7 @@ def to_date_df(df, fmt, fld):
         .appName("RowDemo") \
         .getOrCreate()
 
-    logger = Log4j(spark)
+    ##logger = Log4j(spark)
 
     my_schema = StructType([
         StructField("ID", StringType()),
diff --git a/21-DataFrames/lib/DatabricksSample.py b/21-DataFrames/lib/DatabricksSample.py
new file mode 100644
index 0000000..d77de1b
--- /dev/null
+++ b/21-DataFrames/lib/DatabricksSample.py
@@ -0,0 +1,44 @@
+
+from pyspark.sql import *
+import pyspark.sql.functions as f
+from pyspark.sql.functions import spark_partition_id
+
+if __name__ == "__main__":
+
+    spark = SparkSession \
+        .builder \
+        .appName("DatabricksSample") \
+        .master("local[2]") \
+        .getOrCreate()
+
+    logDF = spark.read \
+        .option("header", True) \
+        .csv("/Users/rahulvenugopalan/Documents/bigDBFS.csv")
+        ##.sample(withReplacement=False, fraction=0.3, seed=3)
+
+    from pyspark.sql.functions import col
+
+    serverErrorDF = logDF.filter((col("code") >= 500) & (col("code") < 600)).select("date", "time", "extention", "code")
+    logDF.show()
+    logDF.createOrReplaceTempView("total")
+    sqlDF = spark.sql("select ip, count(ip) as count from total group by ip order by count")
+
+    #ipCountDF = spark.sql("select ip,count(ip) as count from total group by ip order by count desc")
+    sqlDF.show()
+
+    from pyspark.sql.functions import from_utc_timestamp, hour, col
+    countvalue = serverErrorDF.count()
+    print(countvalue)
+
+    serverErrorDF.show(100)
+
+    countsDF = (serverErrorDF
+                .select(f.hour(from_utc_timestamp(col("time"), "GMT")).alias("hour"))
+                .groupBy("hour")
+                .count()
+                .orderBy("hour")
+                )
+
+    countsDF.show()
+
+    spark.stop()
\ No newline at end of file
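The per-IP aggregation in DatabricksSample.py can also be expressed with the DataFrame API instead of a temp view plus SQL. A minimal sketch, assuming the CSV really exposes the ip column the script refers to:

from pyspark.sql import functions as f

# group, count and alias explicitly so orderBy has a real column to sort on
ipCountDF = (logDF
             .groupBy("ip")
             .agg(f.count("ip").alias("count"))
             .orderBy(f.col("count").desc()))
ipCountDF.show(10)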
diff --git a/21-DataFrames/lib/Dataframe_Creation.py b/21-DataFrames/lib/Dataframe_Creation.py
new file mode 100644
index 0000000..6de9983
--- /dev/null
+++ b/21-DataFrames/lib/Dataframe_Creation.py
@@ -0,0 +1,124 @@
+
+from pyspark.sql import *
+from pyspark.sql.types import *
+from pyspark.sql.functions import *
+
+if __name__ == "__main__":
+
+    spark = SparkSession \
+        .builder \
+        .appName("dateFormatting") \
+        .master("local[2]") \
+        .getOrCreate()
+
+    my_schema_date = StructType([
+        StructField("ID", StringType()),
+        StructField("EventDatestr", StringType())
+    ])
+
+    my_schema_timestamp = StructType([
+        StructField("ID", StringType()),
+        StructField("EventTimestamp", StringType())
+    ])
+
+    my_rows_date = [Row("123", "04/05/2020"),
+                    Row("124", "3/05/2020"),
+                    Row("125", "04/05/2020"),
+                    Row("126", "04/05/2020"),
+                    Row("127", "04/05/2020"),
+                    Row("128", "04/05/2020"),
+                    Row("129", "04/05/2020"),
+                    Row("130", "8/3/2020"),
+                    Row("131", "11/12/2020"),
+                    Row("132", "04/13/2020")]
+
+    my_rows_timestamp = [Row("123", "2020-02-28 12:30:00"),
+                         Row("234", "2020-02-28 12:30:00"),
+                         Row("233", "2020-02-28 10:30:00"),
+                         Row("343", "2020-02-28 11:30:00"),
+                         Row("434", "2020-02-28 14:30:00"),
+                         Row("343", "2020-02-28 10:30:00"),
+                         Row("353", "2020-02-28 10:30:00"),
+                         Row("453", "2020-02-28 23:30:00"),
+                         Row("137", "2020-02-28 14:30:00")]
+
+    my_schema_list = StructType([
+        StructField("ID", IntegerType())
+    ])
+
+    my_row_list = [Row(123)]
+
+    my_rdd_list = spark.sparkContext.parallelize(my_row_list)
+
+    my_df_list = spark.createDataFrame(my_rdd_list, my_schema_list)
+
+    my_df_list.show()
+
+    print(type(my_df_list))
+
+    mvv = my_df_list.select(max("ID")).rdd.flatMap(lambda x: x).collect()
+
+    print(type(mvv))
+    print(mvv)
+
+    for i in mvv:
+        xy = i
+        print(xy)
+        print(type(xy))
+
+    #list(my_df_list.select('ID').toPandas()['ID'])   # =>
+
+    #print(list)
+
+    my_rdd_date = spark.sparkContext.parallelize(my_rows_date)
+
+    my_df_date = spark.createDataFrame(my_rdd_date, my_schema_date)
+
+    my_rdd2 = spark.sparkContext.parallelize(my_rows_timestamp)
+
+    my_df_timestamp = spark.createDataFrame(my_rdd2, my_schema_timestamp)
+
+    from pyspark.sql.functions import col
+
+    dfwithDate = my_df_date.withColumn("EventDatetype", to_date(col("EventDatestr"), 'M/d/yyyy'))
+
+    dfwithDate.show()
+
+    dfwithTimestamp = my_df_timestamp.withColumn("EventTimestamptype", to_timestamp(col("EventTimestamp"), 'yyyy-MM-dd HH:mm:ss'))
+
+    dfwithTimestamp2 = my_df_timestamp.select("ID", to_timestamp("EventTimestamp", 'yyyy-MM-dd HH:mm:ss').alias("new"))
+
+    my_df_timestamp.show()
+
+    dfwithTimestamp2.show()
+
+    dfwithTimestamp3 = my_df_timestamp.withColumn("Timeintimestamp", to_timestamp("EventTimestamp", 'yyyy-MM-dd HH:mm:ss')) \
+        .withColumn("ID2", col("ID").cast(IntegerType())) \
+        .withColumn("ID3", col("ID2") * 2) \
+        .withColumnRenamed("ID3", "Transformed_Integer_column") \
+        .drop("ID2")
+
+    dfwithTimestamp3.show()
+
+    cols = set(dfwithTimestamp3.columns)
+    print(cols)
+
+    print("dfwithTimestamp2")
+
+    # collect() returns a plain Python list of Rows; index into it to get the scalar
+    minmax = dfwithTimestamp3.select(min("Transformed_Integer_column")).collect()[0][0]
+
+    print(type(minmax))
+
+    dfwithTimestamp3.agg({'Transformed_Integer_column': 'max'}).show()
+
+    newdf5 = dfwithTimestamp3.select(hour(from_utc_timestamp(col("Timeintimestamp"), "GMT")).alias("hour")) \
+        .groupBy("hour").count().orderBy("hour")
+
+    newdf5.show()
+
+
+spark.stop()
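On pulling a single aggregate value back to the driver (the step that was broken in the minmax line above): collect() returns a list of Row objects, so indexing or first() is all that is needed. A sketch against the dfwithTimestamp3 frame, column name taken from the script:

# min and max here are the Spark SQL functions brought in by the wildcard import
row = dfwithTimestamp3.agg(
    min("Transformed_Integer_column").alias("mn"),
    max("Transformed_Integer_column").alias("mx")
).first()
print(row["mn"], row["mx"])

# equivalent: collect() and index into the list of Rows
mn = dfwithTimestamp3.select(min("Transformed_Integer_column")).collect()[0][0]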
diff --git a/21-DataFrames/lib/JSON_read.py b/21-DataFrames/lib/JSON_read.py
new file mode 100644
index 0000000..4c6ba6b
--- /dev/null
+++ b/21-DataFrames/lib/JSON_read.py
@@ -0,0 +1,61 @@
+
+from pyspark.sql import *
+from pyspark.sql.types import *
+from pyspark.sql.functions import *
+from pyspark.sql import functions as f
+
+if __name__ == "__main__":
+
+    spark = SparkSession \
+        .builder \
+        .appName("jsonread") \
+        .master("local[2]") \
+        .getOrCreate()
+
+
+smartphoneDF = spark.read.json("/Users/rahulvenugopalan/Downloads/sampledb2.txt")
+smartphoneDF.show()
+
+smartphoneDF.printSchema()
+smartphoneDFschema = smartphoneDF.schema
+from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType, FloatType
+
+schemaSMS = StructType([
+    StructField("SMS", StringType(), True)
+])
+
+
+# Here is the full schema as well
+fullSchema = StructType([
+    StructField("SMS", StructType([
+        StructField("Address", StringType(), True),
+        StructField("date", StringType(), True),
+        StructField("metadata", StructType([
+            StructField("name", StringType(), True)
+        ]), True),
+    ]), True)
+])
+
+## Filter using SQL expression
+SMSDF = (spark.read
+    .schema(schemaSMS)
+    .json("/Users/rahulvenugopalan/Downloads/sampledb2.txt")
+    .filter("SMS is not null")
+)
+# Filter using column
+SMSDF2 = (spark.read
+    .schema(fullSchema)
+    .json("/Users/rahulvenugopalan/Downloads/sampledb2.txt")
+    .filter(f.col("SMS").isNotNull())
+)
+
+
+SMSDF.show()
+SMSDF2.show()
+
+
+SMSDF2.select('SMS.Address', 'SMS.date', 'SMS.metadata.name').show(truncate=False)
+
+spark.stop()
diff --git a/21-DataFrames/lib/SampleUDF.py b/21-DataFrames/lib/SampleUDF.py
new file mode 100644
index 0000000..6346f0d
--- /dev/null
+++ b/21-DataFrames/lib/SampleUDF.py
@@ -0,0 +1,52 @@
+
+from pyspark.sql import *
+from pyspark.sql.types import *
+from pyspark.sql.functions import *
+
+if __name__ == "__main__":
+
+    spark = SparkSession \
+        .builder \
+        .appName("dateFormatting") \
+        .master("local[2]") \
+        .getOrCreate()
+
+    # Creating a function
+    def cube(x):
+        return x * x * x
+
+    def strlen_nullsafe(x):
+        return len(x)
+
+    # Register the function as a UDF
+    spark.udf.register("udfcube", cube, LongType())
+
+    spark.range(1, 20).createOrReplaceTempView("test1")
+    sqlDF = spark.sql("select id, udfcube(id) as cubedID from test1")
+    sqlDF.show()
+
+    spark.udf.register("strlen_nullsafe", lambda s: len(s) if s is not None else -1, "int")
+
+    #spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")
+
+    my_schema = StructType([
+        StructField("ID", StringType()),
+        StructField("EventDate", StringType())])
+
+    my_rows = [Row("123", "04/05/2020"), Row("124", "4/5/2020"), Row(None, "04/5/2020"), Row("126", "4/05/2020")]
+    my_rdd = spark.sparkContext.parallelize(my_rows, 2)
+    my_df = spark.createDataFrame(my_rdd, my_schema)
+
+    my_df.createOrReplaceTempView("test2")
+
+    sqlDF = spark.sql("select id, strlen_nullsafe(id) as strlen_nullsafe from test2")
+
+    print("lambdadf :")
+    sqlDF.show()
+
+    sqldf = spark.sql("select id, strlen_nullsafe(id) from test2 where id is not null and strlen_nullsafe(id) > 1")
+    print("sqldf :")
+    sqldf.show()
+
+    spark.stop()
+
diff --git a/21-DataFrames/lib/SelectingColumns.py b/21-DataFrames/lib/SelectingColumns.py
new file mode 100644
index 0000000..935a0f4
--- /dev/null
+++ b/21-DataFrames/lib/SelectingColumns.py
@@ -0,0 +1,19 @@
+
+from pyspark.sql import *
+
+
+spark = SparkSession \
+    .builder \
+    .appName("DatabricksSample") \
+    .master("local[2]") \
+    .getOrCreate()
+
+logDF = spark.read \
+    .option("header", True) \
+    .csv("/Users/rahulvenugopalan/Documents/bigDBFS.csv") \
+    .sample(withReplacement=False, fraction=0.3, seed=3)
+
+logDF.show(50)
+
+
+spark.stop()
diff --git a/21-DataFrames/lib/__init__.py b/21-DataFrames/lib/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/21-DataFrames/lib/app-logs/hello-spark.log b/21-DataFrames/lib/app-logs/hello-spark.log
new file mode 100644
index 0000000..e69de29
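SampleUDF.py registers its functions only for SQL use. To apply the same logic through the DataFrame API, pyspark.sql.functions.udf can wrap them. A sketch, not part of the scripts above:

from pyspark.sql.functions import udf, col
from pyspark.sql.types import LongType, IntegerType

# wrap plain Python callables as column-level UDFs
cube_udf = udf(lambda x: x * x * x, LongType())
strlen_udf = udf(lambda s: len(s) if s is not None else -1, IntegerType())

spark.range(1, 20).withColumn("cubed", cube_udf(col("id"))).show()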
diff --git a/21-DataFrames/lib/createDFmanually.py b/21-DataFrames/lib/createDFmanually.py
new file mode 100644
index 0000000..b04c067
--- /dev/null
+++ b/21-DataFrames/lib/createDFmanually.py
@@ -0,0 +1,38 @@
+
+from pyspark.sql import *
+from pyspark.sql.types import *
+from pyspark.sql.functions import *
+
+
+if __name__ == "__main__":
+
+    spark = SparkSession \
+        .builder \
+        .appName("createDataframes") \
+        .master("local[2]") \
+        .getOrCreate()
+
+    my_schema = StructType([
+        StructField("ID", StringType()),
+        StructField("EventDatestr", StringType())
+    ])
+
+    ## Method 1 : - Creating dataframes using a Dataset of row objects with createDataFrame.
+
+    my_rows_date = [Row("123", "04/05/2020"),
+                    Row("124", "3/05/2020"),
+                    Row("125", "04/05/2020"),
+                    Row("126", "04/05/2020")
+                    ]
+
+    my_rdd_list = spark.sparkContext.parallelize(my_rows_date)
+    print(type(my_rdd_list))
+    my_df_list = spark.createDataFrame(my_rdd_list, my_schema)
+    my_df_list.show()
+
+    ## Method 2 : - Creating dataframes from the RDD of row objects with toDF.
+    columnName = ["id", "EventDate"]
+    dfnew_toDF = my_rdd_list.toDF(columnName)
+    dfnew_toDF.show()
+
+spark.stop()
\ No newline at end of file
diff --git a/21-DataFrames/lib/dfwithDateFormat.py b/21-DataFrames/lib/dfwithDateFormat.py
new file mode 100644
index 0000000..a19698d
--- /dev/null
+++ b/21-DataFrames/lib/dfwithDateFormat.py
@@ -0,0 +1,51 @@
+
+from pyspark.sql import *
+from pyspark.sql.functions import to_timestamp, unix_timestamp, hour, from_utc_timestamp
+from pyspark.sql.types import *
+from pyspark.sql import functions as f
+
+if __name__ == "__main__":
+
+    spark = SparkSession \
+        .builder \
+        .appName("dataframeswithDateformat") \
+        .master("local[2]") \
+        .getOrCreate()
+
+    my_schema = StructType([
+        StructField("ID", StringType()),
+        StructField("EventDatestr", StringType()),
+        StructField("EventTimestampstr", StringType())
+    ])
+
+    my_rows_date = [
+        Row("123", "04/05/2020", "2020-02-28 12:30:00"),
+        Row("124", "3/05/2020", "2020-02-28 12:30:00"),
+        Row("125", "04/05/2020", "2020-02-28 12:30:00"),
+        Row("126", "04/05/2020", "2020-02-28 12:30:00")
+    ]
+
+    my_rdd_list = spark.sparkContext.parallelize(my_rows_date)
+    print(type(my_rdd_list))
+    my_df_list = spark.createDataFrame(my_rdd_list, my_schema)
+    my_df_list.show()
+
+## Scenarios : -
+    # 1) Convert string to Date
+    # 2) Convert string to Timestamp
+    # 3) Convert string to Unixtimestamp with and without casting to TimestampType()
+    # 4) Convert string to get hours/minutes
+    # 5) Convert into a different time zone - GMT/UTC to EST
+
+    dfwithDate = my_df_list.withColumn("EventDateStringToDateType", f.to_date(f.col("EventDatestr"), 'M/d/yyyy')) \
+        .withColumn("EventDateStringToTimestampType", to_timestamp("EventTimestampstr", 'yyyy-MM-dd HH:mm:ss')) \
+        .withColumn("EventDateStringToUnixtimestampTypeWithCast", unix_timestamp("EventTimestampstr", 'yyyy-MM-dd HH:mm:ss').cast(TimestampType())) \
+        .withColumn("EventDateStringToUnixtimestampType", unix_timestamp("EventTimestampstr", 'yyyy-MM-dd HH:mm:ss')) \
+        .withColumn("EventDatehoursESTformat", hour(from_utc_timestamp("EventTimestampstr", 'EST'))) \
+        .withColumn("EventDatehoursUTCformat", hour(to_timestamp("EventTimestampstr", 'yyyy-MM-dd HH:mm:ss')))
+
+    dfwithDate.printSchema()
+    dfwithDate.show()
+
+spark.stop()
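dfwithDateFormat.py covers parsing strings into dates and timestamps; the reverse direction (timestamp back to a formatted string) plus an explicit time-zone shift look like this. A sketch reusing the dfwithDate frame and its column names from the script:

from pyspark.sql import functions as f

dfwithDate.select(
    # format the parsed timestamp back into a display string
    f.date_format("EventDateStringToTimestampType", "yyyy-MM-dd HH:mm").alias("formatted"),
    # shift the same instant from UTC into EST
    f.from_utc_timestamp("EventDateStringToTimestampType", "EST").alias("est_time")
).show(truncate=False)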
diff --git a/21-DataFrames/lib/log4j.properties b/21-DataFrames/lib/log4j.properties
new file mode 100644
index 0000000..ef6d2f5
--- /dev/null
+++ b/21-DataFrames/lib/log4j.properties
@@ -0,0 +1,38 @@
+# Set everything to be logged to the console
+log4j.rootCategory=WARN, console
+
+# define console appender
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.out
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+
+# application log
+log4j.logger.guru.learningjournal.spark.examples=INFO, console, file
+log4j.additivity.guru.learningjournal.spark.examples=false
+
+# define rolling file appender
+log4j.appender.file=org.apache.log4j.RollingFileAppender
+log4j.appender.file.File=${spark.yarn.app.container.log.dir}/${logfile.name}.log
+# define the following Java system properties:
+# -Dlog4j.configuration=file:log4j.properties
+# -Dlogfile.name=hello-spark
+# -Dspark.yarn.app.container.log.dir=app-logs
+log4j.appender.file.ImmediateFlush=true
+log4j.appender.file.Append=false
+log4j.appender.file.MaxFileSize=500MB
+log4j.appender.file.MaxBackupIndex=2
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+
+# Recommendations from Spark template
+log4j.logger.org.apache.spark.repl.Main=WARN
+log4j.logger.org.spark_project.jetty=WARN
+log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
+log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
+log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
+log4j.logger.org.apache.parquet=ERROR
+log4j.logger.parquet=ERROR
+log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
+log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
+
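The comments in log4j.properties list the JVM system properties the file appender expects. For a local run they can be passed via spark.driver.extraJavaOptions; note that with spark-submit the driver JVM may already be started before builder config is read, so --driver-java-options or spark-defaults is the more reliable route. The file names and paths below are illustrative only:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("HelloSpark")
         .master("local[2]")
         .config("spark.driver.extraJavaOptions",
                 "-Dlog4j.configuration=file:log4j.properties "
                 "-Dlogfile.name=hello-spark "
                 "-Dspark.yarn.app.container.log.dir=app-logs")
         .getOrCreate())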
diff --git a/21-DataFrames/lib/logger.py b/21-DataFrames/lib/logger.py
new file mode 100644
index 0000000..d5bc91b
--- /dev/null
+++ b/21-DataFrames/lib/logger.py
@@ -0,0 +1,21 @@
+class Log4j:
+    def __init__(self, spark):
+        log4j = spark._jvm.org.apache.log4j
+
+        root_class = "guru.learningjournal.spark.examples"
+        conf = spark.sparkContext.getConf()
+        app_name = conf.get("spark.app.name")
+
+        self.logger = log4j.LogManager.getLogger(root_class + "." + app_name)
+
+    def warn(self, message):
+        self.logger.warn(message)
+
+    def info(self, message):
+        self.logger.info(message)
+
+    def error(self, message):
+        self.logger.error(message)
+
+    def debug(self, message):
+        self.logger.debug(message)
\ No newline at end of file
diff --git a/21-DataFrames/lib/readFileTypes.py b/21-DataFrames/lib/readFileTypes.py
new file mode 100644
index 0000000..40e44e9
--- /dev/null
+++ b/21-DataFrames/lib/readFileTypes.py
@@ -0,0 +1,73 @@
+
+from pyspark.sql import *
+from pyspark.sql import functions as f
+from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType, FloatType
+
+if __name__ == "__main__":
+
+    spark = SparkSession \
+        .builder \
+        .appName("readfiles") \
+        .master("local[2]") \
+        .getOrCreate()
+
+## Sampling from big files
+    logDF = spark.read \
+        .option("header", True) \
+        .csv("/Users/rahulvenugopalan/Documents/bigDBFS.csv") \
+        .sample(withReplacement=False, fraction=0.3, seed=3)
+
+    logDF.show(10)
+    logDF.show(10)
+
+
+## Reading files - Different ways
+    # Type 1
+    smartphoneDF = spark.read.json("/Users/rahulvenugopalan/Downloads/sampledb2.txt")
+    smartphoneDF.show(10)
+    smartphoneDF.printSchema()
+    smartphoneDFschema = smartphoneDF.schema
+
+    # Type 2
+    smartphoneDF2 = spark.read \
+        .format("json") \
+        .load("/Users/rahulvenugopalan/Downloads/sampledb2.txt")
+
+    smartphoneDF2.show(10)
+
+
+schemaSMS = StructType([
+    StructField("SMS", StringType(), True)
+])
+
+
+# Here is the full schema as well
+fullSchema = StructType([
+    StructField("SMS", StructType([
+        StructField("Address", StringType(), True),
+        StructField("date", StringType(), True),
+        StructField("metadata", StructType([
+            StructField("name", StringType(), True)
+        ]), True),
+    ]), True)
+])
+
+# Type 3 - Read using Schema and filter
+## Filter using SQL expression
+SMSDF = (spark.read
+    .schema(schemaSMS)
+    .json("/Users/rahulvenugopalan/Downloads/sampledb2.txt")
+    .filter("SMS is not null")
+)
+# Filter using column
+SMSDFullschema = (spark.read
+    .schema(fullSchema)
+    .json("/Users/rahulvenugopalan/Downloads/sampledb2.txt")
+    .filter(f.col("SMS").isNotNull())
+)
+
+SMSDF.show()
+SMSDFullschema.show()
+
+SMSDFullschema.select('SMS.Address', 'SMS.date', 'SMS.metadata.name').show(truncate=False)
+
+
+spark.stop()
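The nested StructType used in readFileTypes.py can also be written as a DDL string, which recent Spark versions accept in DataFrameReader.schema(). A sketch, with the field names assumed from the sample file:

# DDL-string equivalent of fullSchema above
ddlSchema = "SMS STRUCT<Address: STRING, date: STRING, metadata: STRUCT<name: STRING>>"

smsDDL = (spark.read
          .schema(ddlSchema)
          .json("/Users/rahulvenugopalan/Downloads/sampledb2.txt")
          .filter("SMS is not null"))
smsDDL.select("SMS.Address", "SMS.metadata.name").show(truncate=False)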
diff --git a/21-DataFrames/lib/readFileTypes2.py b/21-DataFrames/lib/readFileTypes2.py
new file mode 100644
index 0000000..af3fae1
--- /dev/null
+++ b/21-DataFrames/lib/readFileTypes2.py
@@ -0,0 +1,64 @@
+
+from pyspark.sql import *
+from pyspark.sql import functions as f
+
+if __name__ == "__main__":
+
+    spark = SparkSession \
+        .builder \
+        .appName("readfiles") \
+        .master("local[2]") \
+        .getOrCreate()
+
+## Sampling from big files
+    logDF = spark.read \
+        .option("header", True) \
+        .csv("/Users/rahulvenugopalan/Documents/bigDBFS.csv") \
+        .sample(withReplacement=False, fraction=0.3, seed=3)
+
+    logDF.show(50)
+
+    smartphoneDF = spark.read.json("/Users/rahulvenugopalan/Downloads/sampledb2.txt")
+    smartphoneDF.show()
+
+
+smartphoneDF.printSchema()
+smartphoneDFschema = smartphoneDF.schema
+from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType, FloatType
+
+schemaSMS = StructType([
+    StructField("SMS", StringType(), True)
+])
+
+
+# Here is the full schema as well
+fullSchema = StructType([
+    StructField("SMS", StructType([
+        StructField("Address", StringType(), True),
+        StructField("date", StringType(), True),
+        StructField("metadata", StructType([
+            StructField("name", StringType(), True)
+        ]), True),
+    ]), True)
+])
+
+## Filter using SQL expression
+SMSDF = (spark.read
+    .schema(schemaSMS)
+    .json("/Users/rahulvenugopalan/Downloads/sampledb2.txt")
+    .filter("SMS is not null")
+)
+# Filter using column
+SMSDF2 = (spark.read
+    .schema(fullSchema)
+    .json("/Users/rahulvenugopalan/Downloads/sampledb2.txt")
+    .filter(f.col("SMS").isNotNull())
+)
+
+
+SMSDF.show()
+SMSDF2.show()
+
+
+SMSDF2.select('SMS.Address', 'SMS.date', 'SMS.metadata.name').show(truncate=False)
+
+spark.stop()
diff --git a/21-DataFrames/lib/readWitherrorHandling.py b/21-DataFrames/lib/readWitherrorHandling.py
new file mode 100644
index 0000000..d8e6c3a
--- /dev/null
+++ b/21-DataFrames/lib/readWitherrorHandling.py
@@ -0,0 +1,56 @@
+
+from pyspark.sql import *
+from pyspark.sql import functions as f
+from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType, FloatType
+
+if __name__ == "__main__":
+
+    spark = SparkSession \
+        .builder \
+        .appName("readfiles") \
+        .master("local[2]") \
+        .getOrCreate()
+
+# Types of error handling while reading.
+# PERMISSIVE - Includes corrupt records in a "_corrupt_record" column (by default)
+# DROPMALFORMED - Ignores all corrupted records
+# FAILFAST - Throws an exception when it meets corrupted records
+
+data = """{"a": 1, "b":2, "c":3}|{"a": 1, "b":2, "c":3}|{"a": 1, "b, "c":10}""".split('|')
+
+smartphoneDF = spark.read.json("/Users/rahulvenugopalan/Downloads/samplecorrupt.txt")
+smartphoneDF.show(10)
+
+sc = spark.sparkContext
+
+# PERMISSIVE - Includes corrupt records in a "_corrupt_record" column (by default)
+corruptDF = (spark.read
+    .option("mode", "PERMISSIVE")
+    .option("columnNameOfCorruptRecord", "_corrupt_record2")
+    .json("/Users/rahulvenugopalan/Downloads/samplecorrupt.txt")
+)
+corruptDF.show()
+
+# DROPMALFORMED - Ignores all corrupted records
+corruptDF2 = (spark.read
+    .option("mode", "DROPMALFORMED")
+    .json(sc.parallelize(data))
+)
+corruptDF2.show()
+
+# FAILFAST - Throws an exception when it meets corrupted records
+try:
+    data = """{"a": 1, "b":2, "c":3}|{"a": 1, "b":2, "c":3}|{"a": 1, "b, "c":10}""".split('|')
+
+    corruptDF3 = (spark.read
+        .option("mode", "FAILFAST")
+        .json(sc.parallelize(data))
+    )
+    corruptDF3.show()
+
+except Exception as e:
+    print(e)
+##
+
+
+spark.stop()
diff --git a/21-DataFrames/lib/utils.py b/21-DataFrames/lib/utils.py
new file mode 100644
index 0000000..43c6609
--- /dev/null
+++ b/21-DataFrames/lib/utils.py
@@ -0,0 +1,27 @@
+import configparser
+
+from pyspark import SparkConf
+
+
+def load_survey_df(spark, data_file):
+    return spark.read \
+        .option("header", "true") \
+        .option("inferSchema", "true") \
+        .csv(data_file)
+
+
+def count_by_country(survey_df):
+    return survey_df.filter("Age < 40") \
+        .select("Age", "Gender", "Country", "state") \
+        .groupBy("Country") \
+        .count()
+
+
+def get_spark_app_config():
+    spark_conf = SparkConf()
+    config = configparser.ConfigParser()
+    config.read("spark.conf")
+
+    for (key, val) in config.items("SPARK_APP_CONFIGS"):
+        spark_conf.set(key, val)
+    return spark_conf
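utils.py only defines helpers; one way they could be wired together from a driver script, assuming a spark.conf file with a [SPARK_APP_CONFIGS] section sits next to it and using an illustrative CSV path:

from pyspark.sql import SparkSession
from utils import get_spark_app_config, load_survey_df, count_by_country

# build the session from the SparkConf assembled out of spark.conf
conf = get_spark_app_config()
spark = SparkSession.builder.config(conf=conf).getOrCreate()

survey_df = load_survey_df(spark, "data/sample_survey.csv")  # illustrative path
count_by_country(survey_df).show()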
diff --git a/21-DataFrames/lib/writeFileTypes.py b/21-DataFrames/lib/writeFileTypes.py
new file mode 100644
index 0000000..926e83b
--- /dev/null
+++ b/21-DataFrames/lib/writeFileTypes.py
@@ -0,0 +1,63 @@
+
+from pyspark.sql import *
+from pyspark.sql.types import *
+from pyspark.sql.functions import *
+from pyspark.sql import functions as f
+
+if __name__ == "__main__":
+
+    spark = SparkSession \
+        .builder \
+        .appName("readfiles") \
+        .master("local[2]") \
+        .getOrCreate()
+
+
+smartphoneDF = spark.read.json("/Users/rahulvenugopalan/Downloads/sampledb2.txt")
+smartphoneDF.show()
+
+
+smartphoneDF.printSchema()
+smartphoneDFschema = smartphoneDF.schema
+from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType, FloatType
+
+schemaSMS = StructType([
+    StructField("SMS", StringType(), True)
+])
+
+
+# Here is the full schema as well
+fullSchema = StructType([
+    StructField("SMS", StructType([
+        StructField("Address", StringType(), True),
+        StructField("date", StringType(), True),
+        StructField("metadata", StructType([
+            StructField("name", StringType(), True)
+        ]), True),
+    ]), True)
+])
+
+## Filter using SQL expression
+SMSDF = (spark.read
+    .schema(schemaSMS)
+    .json("/Users/rahulvenugopalan/Downloads/sampledb2.txt")
+    .filter("SMS is not null")
+)
+# Filter using column
+SMSDF2 = (spark.read
+    .schema(fullSchema)
+    .json("/Users/rahulvenugopalan/Downloads/sampledb2.txt")
+    .filter(f.col("SMS").isNotNull())
+)
+
+
+SMSDF.show()
+SMSDF2.show()
+
+
+SMSDF2.select('SMS.Address', 'SMS.date', 'SMS.metadata.name').show(truncate=False)
+
+spark.stop()
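Despite its name, writeFileTypes.py never actually writes anything out. A minimal DataFrameWriter sketch for the flattened SMS frame built above; the output paths are illustrative only:

# flatten the nested struct into top-level columns before writing
flatDF = SMSDF2.select("SMS.Address", "SMS.date", "SMS.metadata.name")

# CSV with a header row, overwriting any previous run
flatDF.write.mode("overwrite").option("header", True).csv("/tmp/sms_flat_csv")

# Parquet keeps the schema and is usually the better default for Spark pipelines
flatDF.write.mode("overwrite").parquet("/tmp/sms_flat_parquet")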