diff --git a/.idea/.gitignore b/.idea/.gitignore
new file mode 100644
index 0000000..5c98b42
--- /dev/null
+++ b/.idea/.gitignore
@@ -0,0 +1,2 @@
+# Default ignored files
+/workspace.xml
\ No newline at end of file
diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml
new file mode 100644
index 0000000..105ce2d
--- /dev/null
+++ b/.idea/inspectionProfiles/profiles_settings.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/misc.xml b/.idea/misc.xml
new file mode 100644
index 0000000..a2e120d
--- /dev/null
+++ b/.idea/misc.xml
@@ -0,0 +1,4 @@
+
+
+
+
\ No newline at end of file
diff --git a/.idea/modules.xml b/.idea/modules.xml
new file mode 100644
index 0000000..b52b86f
--- /dev/null
+++ b/.idea/modules.xml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/sparkdevelop.iml b/.idea/sparkdevelop.iml
new file mode 100644
index 0000000..6711606
--- /dev/null
+++ b/.idea/sparkdevelop.iml
@@ -0,0 +1,11 @@
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
new file mode 100644
index 0000000..94a25f7
--- /dev/null
+++ b/.idea/vcs.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/08-RowDemo/RowDemo.py b/08-RowDemo/RowDemo.py
index 2512a67..9a23c89 100644
--- a/08-RowDemo/RowDemo.py
+++ b/08-RowDemo/RowDemo.py
@@ -2,7 +2,7 @@
from pyspark.sql.functions import *
from pyspark.sql.types import *
-from lib.logger import Log4j
+##from lib.logger import Log4j
def to_date_df(df, fmt, fld):
@@ -16,7 +16,7 @@ def to_date_df(df, fmt, fld):
.appName("RowDemo") \
.getOrCreate()
- logger = Log4j(spark)
+ ##logger = Log4j(spark)
my_schema = StructType([
StructField("ID", StringType()),
diff --git a/21-DataFrames/lib/DatabricksSample.py b/21-DataFrames/lib/DatabricksSample.py
new file mode 100644
index 0000000..d77de1b
--- /dev/null
+++ b/21-DataFrames/lib/DatabricksSample.py
@@ -0,0 +1,44 @@
+
+from pyspark.sql import *
+import pyspark.sql.functions as f
+from pyspark.sql.functions import spark_partition_id
+
+if __name__ == "__main__":
+
+ spark = SparkSession \
+ .builder \
+ .appName("DatabricksSample") \
+ .master("local[2]") \
+ .getOrCreate()
+
+    logDF = spark.read \
+        .option("header", True) \
+        .csv("/Users/rahulvenugopalan/Documents/bigDBFS.csv")
+        # .sample(withReplacement=False, fraction=0.3, seed=3)
+
+ from pyspark.sql.functions import col
+
+    serverErrorDF = logDF.filter((col("code") >= 500) & (col("code") < 600)).select("date", "time", "extention", "code")
+    logDF.show()
+    logDF.createOrReplaceTempView("total")
+    sqlDF = spark.sql("select ip, count(ip) as count from total group by ip order by count")
+
+ #ipCountDF = spark.sql("select ip,count(ip) as count from total group by ip order by count desc")
+ sqlDF.show()
+
+ from pyspark.sql.functions import from_utc_timestamp, hour, col
+ countvalue=serverErrorDF.count()
+ print(countvalue)
+
+ serverErrorDF.show(100)
+
+    countsDF = (serverErrorDF
+                .select(f.hour(from_utc_timestamp(col("time"), "GMT")).alias("hour"))
+                .groupBy("hour")
+                .count()
+                .orderBy("hour")
+                )
+
+ countsDF.show()
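+    # spark_partition_id (imported above but unused so far) can help inspect how the CSV
+    # was split across partitions; a minimal sketch:
+    # logDF.withColumn("pid", spark_partition_id()).groupBy("pid").count().show()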
+
+ spark.stop()
\ No newline at end of file
diff --git a/21-DataFrames/lib/Dataframe_Creation.py b/21-DataFrames/lib/Dataframe_Creation.py
new file mode 100644
index 0000000..6de9983
--- /dev/null
+++ b/21-DataFrames/lib/Dataframe_Creation.py
@@ -0,0 +1,124 @@
+
+from pyspark.sql import *
+from pyspark.sql.types import *
+from pyspark.sql.functions import *
+
+if __name__ == "__main__":
+
+ spark = SparkSession \
+ .builder \
+ .appName("dateFormatting") \
+ .master("local[2]") \
+ .getOrCreate()
+
+
+ my_schema_date = StructType([
+ StructField("ID", StringType()),
+ StructField("EventDatestr", StringType())
+ ])
+
+ my_schema_timestamp= StructType([
+ StructField("ID", StringType()),
+ StructField("EventTimestamp", StringType())
+ ])
+
+ my_rows_date = [Row("123", "04/05/2020"),
+ Row("124", "3/05/2020" ),
+ Row("125", "04/05/2020"),
+ Row("126", "04/05/2020"),
+ Row("127", "04/05/2020"),
+ Row("128", "04/05/2020"),
+ Row("129", "04/05/2020"),
+ Row("130", "8/3/2020"),
+ Row("131", "11/12/2020"),
+ Row("132", "04/13/2020")]
+
+ my_rows_timestamp = [Row("123", "2020-02-28 12:30:00"),
+ Row("234", "2020-02-28 12:30:00") ,
+ Row("233", "2020-02-28 10:30:00") ,
+ Row("343", "2020-02-28 11:30:00") ,
+ Row("434", "2020-02-28 14:30:00") ,
+ Row("343", "2020-02-28 10:30:00") ,
+ Row("353", "2020-02-28 10:30:00") ,
+ Row("453", "2020-02-28 23:30:00") ,
+ Row("137", "2020-02-28 14:30:00") ]
+
+ my_schema_list = StructType([
+ StructField("ID", IntegerType())
+ ])
+
+ my_row_list = [Row(123)]
+
+ my_rdd_list = spark.sparkContext.parallelize(my_row_list)
+
+ my_df_list= spark.createDataFrame(my_rdd_list, my_schema_list)
+
+ my_df_list.show()
+
+ print(type(my_df_list))
+
+ mvv = my_df_list.select(max("ID")).rdd.flatMap(lambda x: x).collect()
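+    # collect() returns Row objects; flatMap(lambda x: x) unwraps each Row's values,
+    # so mvv ends up as a plain Python list (here just the single max value).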
+
+ print(type(mvv))
+ print(mvv)
+
+ for i in mvv:
+ xy=i
+ print(xy)
+ print(type(xy))
+
+ #list(my_df_list.select('ID').toPandas()['ID']) # =>
+
+ #print(list)
+
+ my_rdd_date = spark.sparkContext.parallelize(my_rows_date)
+
+ my_df_date = spark.createDataFrame(my_rdd_date, my_schema_date)
+
+ my_rdd2 = spark.sparkContext.parallelize(my_rows_timestamp)
+
+ my_df_timestamp = spark.createDataFrame(my_rdd2, my_schema_timestamp)
+
+ from pyspark.sql.functions import col
+
+ dfwithDate = my_df_date.withColumn("EventDatetype",to_date(col("EventDatestr"), 'M/d/yyyy') )
+
+ dfwithDate.show()
+
+ dfwithTimestamp=my_df_timestamp.withColumn("EventTimestamptype", to_timestamp(col("EventTimestamp"), 'yyyy-MM-dd HH:mm:ss'))
+
+ dfwithTimestamp2 = my_df_timestamp.select("ID",to_timestamp("EventTimestamp",'yyyy-MM-dd HH:mm:ss').alias("new"))
+
+ my_df_timestamp.show()
+
+ dfwithTimestamp2.show()
+
+
+ dfwithTimestamp3 = my_df_timestamp.withColumn("Timeintimestamp", to_timestamp("EventTimestamp", 'yyyy-MM-dd HH:mm:ss')) \
+ .withColumn("ID2", col("ID").cast(IntegerType()))\
+ .withColumn("ID3", col("ID2")*2)\
+ .withColumnRenamed("ID3","Transformed_Integer_column")\
+ .drop("ID2")
+
+ dfwithTimestamp3.show()
+
+ cols = set(dfwithTimestamp3.columns)
+ print(cols)
+
+ print("dfwithTimestamp2")
+
+
+    # collect() returns a list of Row objects; unpack them into a plain Python list
+    minmax = [row[0] for row in dfwithTimestamp3.select(min("Transformed_Integer_column")).collect()]
+
+
+ print(type(minmax))
+
+ dfwithTimestamp3.agg({'Transformed_Integer_column': 'max'}).show()
+
+ newdf5=dfwithTimestamp3.select(hour(from_utc_timestamp(col("Timeintimestamp"), "GMT")).alias("hour"))\
+ .groupBy("hour").count().orderBy("hour")
+
+ newdf5.show()
+
+
+    spark.stop()
diff --git a/21-DataFrames/lib/JSON_read.py b/21-DataFrames/lib/JSON_read.py
new file mode 100644
index 0000000..4c6ba6b
--- /dev/null
+++ b/21-DataFrames/lib/JSON_read.py
@@ -0,0 +1,61 @@
+
+from pyspark.sql import *
+from pyspark.sql.types import *
+from pyspark.sql.functions import *
+from pyspark.sql import functions as f
+
+if __name__ == "__main__":
+
+ spark = SparkSession \
+ .builder \
+ .appName("jsonread") \
+ .master("local[2]") \
+ .getOrCreate()
+
+
+    smartphoneDF = spark.read.json("/Users/rahulvenugopalan/Downloads/sampledb2.txt")
+    smartphoneDF.show()
+
+    smartphoneDF.printSchema()
+    smartphoneDFschema = smartphoneDF.schema
+    from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType, FloatType
+
+    schemaSMS = StructType([
+        StructField("SMS", StringType(), True)
+    ])
+
+    # Here is the full schema as well
+    fullSchema = StructType([
+        StructField("SMS", StructType([
+            StructField("Address", StringType(), True),
+            StructField("date", StringType(), True),
+            StructField("metadata", StructType([
+                StructField("name", StringType(), True)
+            ]), True),
+        ]), True)
+    ])
+
+    ## Filter using SQL expression
+    SMSDF = (spark.read
+             .schema(schemaSMS)
+             .json("/Users/rahulvenugopalan/Downloads/sampledb2.txt")
+             .filter("SMS is not null")
+             )
+    # Filter using column
+    SMSDF2 = (spark.read
+              .schema(fullSchema)
+              .json("/Users/rahulvenugopalan/Downloads/sampledb2.txt")
+              .filter(f.col("SMS").isNotNull())
+              )
+
+    SMSDF.show()
+    SMSDF2.show()
+
+    SMSDF2.select('SMS.Address', 'SMS.date', 'SMS.metadata.name').show(truncate=False)
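+    # Nested fields can equivalently be pulled out with Column.getField,
+    # e.g. SMSDF2.select(f.col("SMS").getField("Address")) gives the same result as the string path above.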
+
+    spark.stop()
diff --git a/21-DataFrames/lib/SampleUDF.py b/21-DataFrames/lib/SampleUDF.py
new file mode 100644
index 0000000..6346f0d
--- /dev/null
+++ b/21-DataFrames/lib/SampleUDF.py
@@ -0,0 +1,52 @@
+
+from pyspark.sql import *
+from pyspark.sql.types import *
+from pyspark.sql.functions import *
+
+if __name__ == "__main__":
+
+ spark = SparkSession \
+ .builder \
+ .appName("dateFormatting") \
+ .master("local[2]") \
+ .getOrCreate()
+
+ # Creating a function
+ def cube(x):
+ return x*x*x
+
+    # Note: this version is not actually null-safe (len(None) raises) and is unused here;
+    # the null-safe variant is registered as a lambda below under the same SQL name.
+    def strlen_nullsafe(x):
+        return len(x)
+
+ # Register the function as udf
+    spark.udf.register("udfcube", cube, LongType())
+
+ spark.range(1, 20).createOrReplaceTempView("test1")
+ sqlDF=spark.sql("select id,udfcube(id) as qubedID from test1")
+ sqlDF.show()
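+    # The registered function is also usable from the DataFrame API; a minimal sketch:
+    # cube_udf = udf(cube, LongType())
+    # spark.range(1, 20).select(col("id"), cube_udf("id").alias("qubedID")).show()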
+
+    spark.udf.register("strlen_nullsafe", lambda s: len(s) if s is not None else -1, "int")
+
+ #spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")
+
+ my_schema = StructType([
+ StructField("ID", StringType()),
+ StructField("EventDate", StringType())])
+
+ my_rows = [Row("123", "04/05/2020"), Row("124", "4/5/2020"), Row(None, "04/5/2020"), Row("126", "4/05/2020")]
+ my_rdd = spark.sparkContext.parallelize(my_rows, 2)
+ my_df = spark.createDataFrame(my_rdd, my_schema)
+
+ my_df.createOrReplaceTempView("test2")
+
+ sqlDF = spark.sql("select id,strlen_nullsafe(id) as strlen_nullsafe from test2")
+
+ print("lambdadf :")
+ sqlDF.show()
+
+ sqldf = spark.sql("select id ,strlen_nullsafe(id) from test2 where id is not null and strlen_nullsafe(id) > 1")
+ print("sqldf :")
+ sqldf.show()
+
+ spark.stop()
+
diff --git a/21-DataFrames/lib/SelectingColumns.py b/21-DataFrames/lib/SelectingColumns.py
new file mode 100644
index 0000000..935a0f4
--- /dev/null
+++ b/21-DataFrames/lib/SelectingColumns.py
@@ -0,0 +1,19 @@
+
+from pyspark.sql import *
+
+
+spark = SparkSession \
+ .builder \
+ .appName("DatabricksSample") \
+ .master("local[2]") \
+ .getOrCreate()
+
+logDF = spark.read\
+ .option("header",True)\
+ .csv("/Users/rahulvenugopalan/Documents/bigDBFS.csv")\
+ .sample(withReplacement=False, fraction=0.3, seed=3)
+
+logDF.show(50)
+
+
+spark.stop()
diff --git a/21-DataFrames/lib/__init__.py b/21-DataFrames/lib/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/21-DataFrames/lib/app-logs/hello-spark.log b/21-DataFrames/lib/app-logs/hello-spark.log
new file mode 100644
index 0000000..e69de29
diff --git a/21-DataFrames/lib/createDFmanually.py b/21-DataFrames/lib/createDFmanually.py
new file mode 100644
index 0000000..b04c067
--- /dev/null
+++ b/21-DataFrames/lib/createDFmanually.py
@@ -0,0 +1,38 @@
+
+from pyspark.sql import *
+from pyspark.sql.types import *
+from pyspark.sql.functions import *
+
+
+if __name__ == "__main__":
+
+ spark = SparkSession \
+ .builder \
+ .appName("createDataframes") \
+ .master("local[2]") \
+ .getOrCreate()
+
+ my_schema = StructType([
+ StructField("ID", StringType()),
+ StructField("EventDatestr", StringType())
+ ])
+
+ ## Method 1 : - Creating dataframes using Dataset of row objects with createDataFrame.
+
+ my_rows_date = [Row("123", "04/05/2020"),
+ Row("124", "3/05/2020" ),
+ Row("125", "04/05/2020"),
+ Row("126", "04/05/2020")
+ ]
+
+ my_rdd_list = spark.sparkContext.parallelize(my_rows_date)
+ print(type(my_rdd_list))
+ my_df_list= spark.createDataFrame(my_rdd_list, my_schema)
+ my_df_list.show()
+
+ ## Method 2 : - Creating dataframes using Dataset of row objects with toDF.
+ columnName=["id","EventDate"]
+ dfnew_toDF=my_rdd_list.toDF(columnName)
+ dfnew_toDF.show()
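+    ## Note: spark.createDataFrame also accepts the list of Row objects directly
+    ## (e.g. spark.createDataFrame(my_rows_date, my_schema)), so the explicit parallelize step is optional.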
+
+    spark.stop()
\ No newline at end of file
diff --git a/21-DataFrames/lib/dfwithDateFormat.py b/21-DataFrames/lib/dfwithDateFormat.py
new file mode 100644
index 0000000..a19698d
--- /dev/null
+++ b/21-DataFrames/lib/dfwithDateFormat.py
@@ -0,0 +1,51 @@
+
+from pyspark.sql import *
+from pyspark.sql.functions import to_timestamp, unix_timestamp, hour, from_utc_timestamp
+from pyspark.sql.types import *
+from pyspark.sql import functions as f
+
+if __name__ == "__main__":
+
+ spark = SparkSession \
+ .builder \
+ .appName("dataframeswithDateformat") \
+ .master("local[2]") \
+ .getOrCreate()
+
+ my_schema = StructType([
+ StructField("ID", StringType()),
+ StructField("EventDatestr", StringType()),
+ StructField("EventTimestampstr", StringType())
+ ])
+
+ my_rows_date = [
+ Row("123", "04/05/2020","2020-02-28 12:30:00"),
+ Row("124", "3/05/2020","2020-02-28 12:30:00" ),
+ Row("125", "04/05/2020","2020-02-28 12:30:00"),
+ Row("126", "04/05/2020","2020-02-28 12:30:00")
+ ]
+
+ my_rdd_list = spark.sparkContext.parallelize(my_rows_date)
+ print(type(my_rdd_list))
+ my_df_list= spark.createDataFrame(my_rdd_list, my_schema)
+ my_df_list.show()
+
+## Scenarios : -
+ # 1) Convert string to Date
+ # 2) Convert string to Timestamp
+ # 3) Convert string to Unixtimestamp with and without casting to timestamptype()
+ # 4) Convert string to get hours/minutes
+ # 5) convert into a different time zone - GMT/UTC to EST
+
+
+ dfwithDate = my_df_list.withColumn("EventDateStringToDateType", f.to_date(f.col("EventDatestr"), 'M/d/yyyy'))\
+ .withColumn("EventDateStringToTimestampType",to_timestamp("EventTimestampstr", 'yyyy-MM-dd HH:mm:ss')) \
+ .withColumn("EventDateStringToUnixtimestampTypeWithCast", unix_timestamp("EventTimestampstr", 'yyyy-MM-dd HH:mm:ss').cast(TimestampType())) \
+ .withColumn("EventDateStringToUnixtimestampType",unix_timestamp("EventTimestampstr", 'yyyy-MM-dd HH:mm:ss')) \
+ .withColumn("EventDatehoursESTformat", hour(from_utc_timestamp("EventTimestampstr", 'EST')))\
+ .withColumn("EventDatehoursUTCformat", hour(to_timestamp("EventTimestampstr", 'yyyy-MM-dd HH:mm:ss')))
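+    # Scenario 5 above only extracts the EST hour; shifting the full timestamp is a one-line sketch:
+    # .withColumn("EventTimestampEST", from_utc_timestamp(to_timestamp("EventTimestampstr", 'yyyy-MM-dd HH:mm:ss'), 'EST'))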
+
+ dfwithDate.printSchema()
+ dfwithDate.show()
+
+    spark.stop()
diff --git a/21-DataFrames/lib/log4j.properties b/21-DataFrames/lib/log4j.properties
new file mode 100644
index 0000000..ef6d2f5
--- /dev/null
+++ b/21-DataFrames/lib/log4j.properties
@@ -0,0 +1,38 @@
+# Set everything to be logged to the console
+log4j.rootCategory=WARN, console
+
+# define console appender
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.out
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+
+#application log
+log4j.logger.guru.learningjournal.spark.examples=INFO, console, file
+log4j.additivity.guru.learningjournal.spark.examples=false
+
+#define rolling file appender
+log4j.appender.file=org.apache.log4j.RollingFileAppender
+log4j.appender.file.File=${spark.yarn.app.container.log.dir}/${logfile.name}.log
+#define following in Java System
+# -Dlog4j.configuration=file:log4j.properties
+# -Dlogfile.name=hello-spark
+# -Dspark.yarn.app.container.log.dir=app-logs
+log4j.appender.file.ImmediateFlush=true
+log4j.appender.file.Append=false
+log4j.appender.file.MaxFileSize=500MB
+log4j.appender.file.MaxBackupIndex=2
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.conversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+
+# Recommendations from Spark template
+log4j.logger.org.apache.spark.repl.Main=WARN
+log4j.logger.org.spark_project.jetty=WARN
+log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
+log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
+log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
+log4j.logger.org.apache.parquet=ERROR
+log4j.logger.parquet=ERROR
+log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
+log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
+
diff --git a/21-DataFrames/lib/logger.py b/21-DataFrames/lib/logger.py
new file mode 100644
index 0000000..d5bc91b
--- /dev/null
+++ b/21-DataFrames/lib/logger.py
@@ -0,0 +1,21 @@
+class Log4j:
+ def __init__(self, spark):
+ log4j = spark._jvm.org.apache.log4j
+
+ root_class = "guru.learningjournal.spark.examples"
+ conf = spark.sparkContext.getConf()
+ app_name = conf.get("spark.app.name")
+
+ self.logger = log4j.LogManager.getLogger(root_class + "." + app_name)
+
+ def warn(self, message):
+ self.logger.warn(message)
+
+ def info(self, message):
+ self.logger.info(message)
+
+ def error(self, message):
+ self.logger.error(message)
+
+ def debug(self, message):
+ self.logger.debug(message)
\ No newline at end of file
diff --git a/21-DataFrames/lib/readFileTypes.py b/21-DataFrames/lib/readFileTypes.py
new file mode 100644
index 0000000..40e44e9
--- /dev/null
+++ b/21-DataFrames/lib/readFileTypes.py
@@ -0,0 +1,73 @@
+
+from pyspark.sql import *
+from pyspark.sql import functions as f
+from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType, FloatType
+if __name__ == "__main__":
+
+ spark = SparkSession \
+ .builder \
+ .appName("readfiles") \
+ .master("local[2]") \
+ .getOrCreate()
+
+## Sampling from big files
+ logDF = spark.read \
+ .option("header", True) \
+ .csv("/Users/rahulvenugopalan/Documents/bigDBFS.csv") \
+ .sample(withReplacement=False, fraction=0.3, seed=3)
+
+ logDF.show(10)
+ logDF.show(10)
+
+
+## Reading files - Different ways
+ # Type 1
+
+ smartphoneDF = spark.read.json("/Users/rahulvenugopalan/Downloads/sampledb2.txt")
+ smartphoneDF.show(10)
+ smartphoneDF.printSchema()
+ smartphoneDFschema = smartphoneDF.schema
+ # Type 2
+ smartphoneDF2 = spark.read \
+ .format("json") \
+ .load("/Users/rahulvenugopalan/Downloads/sampledb2.txt")
+
+ smartphoneDF2.show(10)
+
+
+    schemaSMS = StructType([
+        StructField("SMS", StringType(), True)
+    ])
+
+    # Here is the full schema as well
+    fullSchema = StructType([
+        StructField("SMS", StructType([
+            StructField("Address", StringType(), True),
+            StructField("date", StringType(), True),
+            StructField("metadata", StructType([
+                StructField("name", StringType(), True)
+            ]), True),
+        ]), True)
+    ])
+
+    # Type 3 - Read using Schema and filter
+    ## Filter using SQL expression
+    SMSDF = (spark.read
+             .schema(schemaSMS)
+             .json("/Users/rahulvenugopalan/Downloads/sampledb2.txt")
+             .filter("SMS is not null")
+             )
+    # Filter using column
+    SMSDFullschema = (spark.read
+                      .schema(fullSchema)
+                      .json("/Users/rahulvenugopalan/Downloads/sampledb2.txt")
+                      .filter(f.col("SMS").isNotNull())
+                      )
+
+    SMSDF.show()
+    SMSDFullschema.show()
+
+    SMSDFullschema.select('SMS.Address', 'SMS.date', 'SMS.metadata.name').show(truncate=False)
+
+    spark.stop()
diff --git a/21-DataFrames/lib/readFileTypes2.py b/21-DataFrames/lib/readFileTypes2.py
new file mode 100644
index 0000000..af3fae1
--- /dev/null
+++ b/21-DataFrames/lib/readFileTypes2.py
@@ -0,0 +1,64 @@
+
+from pyspark.sql import *
+from pyspark.sql import functions as f
+if __name__ == "__main__":
+
+ spark = SparkSession \
+ .builder \
+ .appName("readfiles") \
+ .master("local[2]") \
+ .getOrCreate()
+
+## Sampling from big files
+ logDF = spark.read \
+ .option("header", True) \
+ .csv("/Users/rahulvenugopalan/Documents/bigDBFS.csv") \
+ .sample(withReplacement=False, fraction=0.3, seed=3)
+
+ logDF.show(50)
+
+ smartphoneDF = spark.read.json("/Users/rahulvenugopalan/Downloads/sampledb2.txt")
+ smartphoneDF.show()
+
+
+    smartphoneDF.printSchema()
+    smartphoneDFschema = smartphoneDF.schema
+    from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType, FloatType
+
+    schemaSMS = StructType([
+        StructField("SMS", StringType(), True)
+    ])
+
+    # Here is the full schema as well
+    fullSchema = StructType([
+        StructField("SMS", StructType([
+            StructField("Address", StringType(), True),
+            StructField("date", StringType(), True),
+            StructField("metadata", StructType([
+                StructField("name", StringType(), True)
+            ]), True),
+        ]), True)
+    ])
+
+    ## Filter using SQL expression
+    SMSDF = (spark.read
+             .schema(schemaSMS)
+             .json("/Users/rahulvenugopalan/Downloads/sampledb2.txt")
+             .filter("SMS is not null")
+             )
+    # Filter using column
+    SMSDF2 = (spark.read
+              .schema(fullSchema)
+              .json("/Users/rahulvenugopalan/Downloads/sampledb2.txt")
+              .filter(f.col("SMS").isNotNull())
+              )
+
+    SMSDF.show()
+    SMSDF2.show()
+
+    SMSDF2.select('SMS.Address', 'SMS.date', 'SMS.metadata.name').show(truncate=False)
+
+    spark.stop()
diff --git a/21-DataFrames/lib/readWitherrorHandling.py b/21-DataFrames/lib/readWitherrorHandling.py
new file mode 100644
index 0000000..d8e6c3a
--- /dev/null
+++ b/21-DataFrames/lib/readWitherrorHandling.py
@@ -0,0 +1,56 @@
+
+from pyspark.sql import *
+from pyspark.sql import functions as f
+from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType, FloatType
+if __name__ == "__main__":
+
+ spark = SparkSession \
+ .builder \
+ .appName("readfiles") \
+ .master("local[2]") \
+ .getOrCreate()
+    # Types of error handling while reading:
+
+    # PERMISSIVE - Includes corrupt records in a "_corrupt_record" column (by default)
+    # DROPMALFORMED - Ignores all corrupted records
+    # FAILFAST - Throws an exception when it meets corrupted records
+
+    data = """{"a": 1, "b":2, "c":3}|{"a": 1, "b":2, "c":3}|{"a": 1, "b, "c":10}""".split('|')
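+    # Note: the third JSON string above is deliberately malformed ('"b, ' never closes),
+    # which is what exercises the three read modes below.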
+
+    smartphoneDF = spark.read.json("/Users/rahulvenugopalan/Downloads/samplecorrupt.txt")
+    smartphoneDF.show(10)
+
+    sc = spark.sparkContext
+
+    # PERMISSIVE - Includes corrupt records in a "_corrupt_record" column (by default)
+    corruptDF = (spark.read
+                 .option("mode", "PERMISSIVE")
+                 .option("columnNameOfCorruptRecord", "_corrupt_record2")
+                 .json("/Users/rahulvenugopalan/Downloads/samplecorrupt.txt")
+                 )
+    corruptDF.show()
+
+    # DROPMALFORMED - Ignores all corrupted records
+    corruptDF2 = (spark.read
+                  .option("mode", "DROPMALFORMED")
+                  .json(sc.parallelize(data))
+                  )
+    corruptDF2.show()
+
+    # FAILFAST - Throws an exception when it meets corrupted records
+    try:
+        data = """{"a": 1, "b":2, "c":3}|{"a": 1, "b":2, "c":3}|{"a": 1, "b, "c":10}""".split('|')
+
+        corruptDF3 = (spark.read
+                      .option("mode", "FAILFAST")
+                      .json(sc.parallelize(data))
+                      )
+        corruptDF3.show()
+
+    except Exception as e:
+        print(e)
+
+    spark.stop()
diff --git a/21-DataFrames/lib/utils.py b/21-DataFrames/lib/utils.py
new file mode 100644
index 0000000..43c6609
--- /dev/null
+++ b/21-DataFrames/lib/utils.py
@@ -0,0 +1,27 @@
+import configparser
+
+from pyspark import SparkConf
+
+
+def load_survey_df(spark, data_file):
+ return spark.read \
+ .option("header", "true") \
+ .option("inferSchema", "true") \
+ .csv(data_file)
+
+
+def count_by_country(survey_df):
+ return survey_df.filter("Age < 40") \
+ .select("Age", "Gender", "Country", "state") \
+ .groupBy("Country") \
+ .count()
+
+
+def get_spark_app_config():
+ spark_conf = SparkConf()
+ config = configparser.ConfigParser()
+ config.read("spark.conf")
+
+ for (key, val) in config.items("SPARK_APP_CONFIGS"):
+ spark_conf.set(key, val)
+ return spark_conf
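+
+# Example spark.conf layout this helper expects (section name from the code above; values are hypothetical):
+# [SPARK_APP_CONFIGS]
+# spark.app.name = Hello Spark
+# spark.master = local[3]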
diff --git a/21-DataFrames/lib/writeFileTypes.py b/21-DataFrames/lib/writeFileTypes.py
new file mode 100644
index 0000000..926e83b
--- /dev/null
+++ b/21-DataFrames/lib/writeFileTypes.py
@@ -0,0 +1,63 @@
+
+from pyspark.sql import *
+from pyspark.sql.types import *
+from pyspark.sql.functions import *
+from pyspark.sql import functions as f
+
+if __name__ == "__main__":
+
+ spark = SparkSession \
+ .builder \
+ .appName("readfiles") \
+ .master("local[2]") \
+ .getOrCreate()
+
+
+    smartphoneDF = spark.read.json("/Users/rahulvenugopalan/Downloads/sampledb2.txt")
+    smartphoneDF.show()
+
+    smartphoneDF.printSchema()
+    smartphoneDFschema = smartphoneDF.schema
+    from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType, FloatType
+
+    schemaSMS = StructType([
+        StructField("SMS", StringType(), True)
+    ])
+
+    # Here is the full schema as well
+    fullSchema = StructType([
+        StructField("SMS", StructType([
+            StructField("Address", StringType(), True),
+            StructField("date", StringType(), True),
+            StructField("metadata", StructType([
+                StructField("name", StringType(), True)
+            ]), True),
+        ]), True)
+    ])
+
+    ## Filter using SQL expression
+    SMSDF = (spark.read
+             .schema(schemaSMS)
+             .json("/Users/rahulvenugopalan/Downloads/sampledb2.txt")
+             .filter("SMS is not null")
+             )
+    # Filter using column
+    SMSDF2 = (spark.read
+              .schema(fullSchema)
+              .json("/Users/rahulvenugopalan/Downloads/sampledb2.txt")
+              .filter(f.col("SMS").isNotNull())
+              )
+
+    SMSDF.show()
+    SMSDF2.show()
+
+    SMSDF2.select('SMS.Address', 'SMS.date', 'SMS.metadata.name').show(truncate=False)
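+    # The file is named writeFileTypes but no write is exercised yet; a minimal write sketch
+    # (the output path is hypothetical) would be:
+    # SMSDF2.write.mode("overwrite").parquet("/Users/rahulvenugopalan/Downloads/sms_parquet")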
+
+    spark.stop()