
Commit ca2292a

update notes
1 parent 049167c commit ca2292a

9 files changed, +39 -15 lines changed

01-HelloSpark/HelloSpark.py

Lines changed: 4 additions & 3 deletions
@@ -23,7 +23,8 @@
 survey_raw_df = load_survey_df(spark, sys.argv[1])
 partitioned_survey_df = survey_raw_df.repartition(2)
 count_df = count_by_country(partitioned_survey_df)
-count_df.show()
+count_df.collect()
 
-logger.info("Finished HelloSpark")
-spark.stop()
+input("Please Print Enter")
+# logger.info("Finished HelloSpark")
+# spark.stop()
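The switch from show() to collect(), together with the input() pause and the commented-out spark.stop(), looks like the usual trick for keeping the driver alive so the Spark UI can be inspected after the job runs. A minimal sketch of that pattern, with a toy DataFrame standing in for the survey data:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[3]").appName("HelloSparkSketch").getOrCreate()

df = spark.range(1, 101).repartition(2)
count_df = df.groupBy((df.id % 10).alias("bucket")).count()
count_df.collect()  # collect() runs the job on the driver without printing a table

# Keep the driver alive so the Spark UI (usually http://localhost:4040) stays available
input("Press Enter to stop the application...")
spark.stop()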

03-HelloSparkSQL/HelloSparkSQL.py

Lines changed: 1 addition & 1 deletion
@@ -22,7 +22,7 @@
     .option("inferSchema", "true") \
     .csv(sys.argv[1])
 
-surveyDF.createOrReplaceTempView("survey_tbl")
+surveyDF.createOrReplaceTempView("survey_tbl")  # create the table (temp view)
 countDF = spark.sql("select Country, count(1) as count from survey_tbl where Age<40 group by Country")
 
 countDF.show()
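For context, a self-contained sketch of the temp-view-plus-spark.sql pattern used here, with hypothetical in-memory rows in place of the CSV passed via sys.argv[1]:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").appName("TempViewSketch").getOrCreate()

surveyDF = spark.createDataFrame(
    [("United States", 27), ("India", 35), ("United States", 51)],
    ["Country", "Age"])

# The view name acts as a SQL table for the lifetime of this SparkSession
surveyDF.createOrReplaceTempView("survey_tbl")
spark.sql("select Country, count(1) as count from survey_tbl where Age < 40 group by Country").show()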

05-DataSinkDemo/DataSinkDemo.py

Lines changed: 5 additions & 5 deletions
@@ -23,11 +23,11 @@
 logger.info("Num Partitions after: " + str(partitionedDF.rdd.getNumPartitions()))
 partitionedDF.groupBy(spark_partition_id()).count().show()
 
-partitionedDF.write \
-    .format("avro") \
-    .mode("overwrite") \
-    .option("path", "dataSink/avro/") \
-    .save()
+# partitionedDF.write \
+#     .format("avro") \
+#     .mode("overwrite") \
+#     .option("path", "dataSink/avro/") \
+#     .save()
 
 flightTimeParquetDF.write \
     .format("json") \
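Note that the avro format is not bundled with Spark and needs the external spark-avro package on the classpath, which may be why that block is commented out here. A minimal sketch of the same DataFrameWriter pattern with a built-in format, using a toy DataFrame in place of flightTimeParquetDF:

from pyspark.sql import SparkSession
from pyspark.sql.functions import spark_partition_id

spark = SparkSession.builder.master("local[3]").appName("DataSinkSketch").getOrCreate()

df = spark.range(20).repartition(4)
df.groupBy(spark_partition_id()).count().show()  # rows per partition

# JSON is a built-in source; mode("overwrite") replaces any existing output at the path
df.write \
    .format("json") \
    .mode("overwrite") \
    .option("path", "dataSink/json_sketch/") \
    .save()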

06-SparkSQLTableDemo/SparkSQLTableDemo.py

Lines changed: 9 additions & 1 deletion
@@ -19,8 +19,16 @@
 spark.sql("CREATE DATABASE IF NOT EXISTS AIRLINE_DB")
 spark.catalog.setCurrentDatabase("AIRLINE_DB")
 
+# flightTimeParquetDF.write \
+#     .mode("overwrite") \
+#     .partitionBy("ORIGIN", "OP_CARRIER") \
+#     .saveAsTable("flight_data_tbl")
+#
 flightTimeParquetDF.write \
+    .format("csv") \
     .mode("overwrite") \
+    .bucketBy(5, "OP_CARRIER", "ORIGIN") \
+    .sortBy("OP_CARRIER", "ORIGIN") \
     .saveAsTable("flight_data_tbl")
 
-logger.info(spark.catalog.listTables("AIRLINE_DB"))
+logger.info(spark.catalog.listTables("AIRLINE_DB"))
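A minimal sketch of the bucketed, sorted managed table that the new write block produces, assuming a SparkSession with Hive support enabled and a toy DataFrame in place of flightTimeParquetDF:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[3]") \
    .appName("BucketedTableSketch") \
    .enableHiveSupport() \
    .getOrCreate()

df = spark.createDataFrame(
    [("AA", "JFK", 1100), ("DL", "ATL", 250), ("AA", "ATL", 730)],
    ["OP_CARRIER", "ORIGIN", "DISTANCE"])

spark.sql("CREATE DATABASE IF NOT EXISTS AIRLINE_DB")
spark.catalog.setCurrentDatabase("AIRLINE_DB")

# bucketBy()/sortBy() only work with saveAsTable(), i.e. a managed table, not a plain path-based save()
df.write \
    .format("csv") \
    .mode("overwrite") \
    .bucketBy(5, "OP_CARRIER", "ORIGIN") \
    .sortBy("OP_CARRIER", "ORIGIN") \
    .saveAsTable("flight_data_tbl")

print(spark.catalog.listTables("AIRLINE_DB"))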

11-UDFDemo/UDFDemo.py

Lines changed: 7 additions & 0 deletions
@@ -33,14 +33,21 @@ def parse_gender(gender):
 
 survey_df.show(10)
 
+# Register a UDF, method 1: use your function in a DataFrame column object expression
 parse_gender_udf = udf(parse_gender, returnType=StringType())
+
+# The udf() function does not create a catalog entry, so we should not get any output here
 logger.info("Catalog Entry:")
 [logger.info(r) for r in spark.catalog.listFunctions() if "parse_gender" in r.name]
 
+# We use the withColumn() transformation; it only affects the Gender column
 survey_df2 = survey_df.withColumn("Gender", parse_gender_udf("Gender"))
 survey_df2.show(10)
 
+# Register a UDF, method 2: register it as a SQL function and use it in SQL expressions
 spark.udf.register("parse_gender_udf", parse_gender, StringType())
+
+# This time we find an entry and print the function's details
 logger.info("Catalog Entry:")
 [logger.info(r) for r in spark.catalog.listFunctions() if "parse_gender" in r.name]
 
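A self-contained sketch of the two registration approaches the new comments describe, with a simplified parse_gender-style function and toy data (not the course's survey file):

import re

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, expr
from pyspark.sql.types import StringType

spark = SparkSession.builder.master("local[2]").appName("UDFSketch").getOrCreate()
survey_df = spark.createDataFrame([("f",), ("M",), ("couldn't say",)], ["Gender"])

def parse_gender(gender):
    if re.search(r"^f$|f.m|w.m", gender.lower()):
        return "Female"
    elif re.search(r"^m$|ma|m.l", gender.lower()):
        return "Male"
    return "Unknown"

# Method 1: wrap the function with udf() and use it in a column expression (no catalog entry is created)
parse_gender_udf = udf(parse_gender, StringType())
survey_df.withColumn("Gender", parse_gender_udf("Gender")).show()

# Method 2: register it in the catalog so it can be called from SQL expressions
spark.udf.register("parse_gender_sql_udf", parse_gender, StringType())
survey_df.withColumn("Gender", expr("parse_gender_sql_udf(Gender)")).show()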

12-MiscDemo/MiscDemo.py

Lines changed: 6 additions & 3 deletions
@@ -19,20 +19,23 @@
 ("Rosy", "7", "8", "63"), # 1963
 ("Abdul", "23", "5", "81")] # 1981
 
+# create a new DataFrame with createDataFrame
 raw_df = spark.createDataFrame(data_list).toDF("name", "day", "month", "year").repartition(3)
 raw_df.printSchema()
 
+# know the use of cast()
 final_df = raw_df.withColumn("id", monotonically_increasing_id()) \
     .withColumn("day", col("day").cast(IntegerType())) \
     .withColumn("month", col("month").cast(IntegerType())) \
     .withColumn("year", col("year").cast(IntegerType())) \
-    .withColumn("year", when(col("year") < 20, col("year") + 2000)
+    .withColumn("year", \
+                when(col("year") < 20, col("year") + 2000)
     .when(col("year") < 100, col("year") + 1900)
     .otherwise(col("year"))) \
     .withColumn("dob", expr("to_date(concat(day,'/',month,'/',year), 'd/M/y')")) \
     .drop("day", "month", "year") \
     .dropDuplicates(["name", "dob"]) \
-# .sort(expr("dob desc")) This doesn't seem to be working
     .sort(col("dob").desc())
+# .sort(expr("dob desc")) This doesn't seem to be working
 
-final_df.show()
+final_df.show()
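On the trailing comment: expr("dob desc") is parsed as a single expression (most likely the dob column with an alias), not as an ORDER BY clause, so the descending intent is quietly lost. A small sketch of descending sorts that do work, on a hypothetical dob date column:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, desc, expr, to_date

spark = SparkSession.builder.master("local[2]").appName("SortSketch").getOrCreate()

df = spark.createDataFrame([("Ravi", "2002-05-28"), ("Rosy", "1963-08-07")], ["name", "dob_str"]) \
    .withColumn("dob", to_date("dob_str"))

df.sort(col("dob").desc()).show()       # what the committed code uses
df.sort(desc("dob")).show()             # the functions.desc() helper
df.orderBy(expr("dob").desc()).show()   # expr() only for the column, desc() applied to the Column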

13-AggDemo/AggDemo.py

Lines changed: 1 addition & 0 deletions
@@ -31,6 +31,7 @@
     "avg(UnitPrice) as AvgPrice"
 ).show()
 
+# First create the table (a temp view), then query it with spark.sql
 invoice_df.createOrReplaceTempView("sales")
 summary_sql = spark.sql("""
     SELECT Country, InvoiceNo,
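A short, self-contained sketch of the view-then-spark.sql aggregation this note points at, on toy invoice rows (column names are illustrative, not the course dataset):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").appName("AggSQLSketch").getOrCreate()

invoice_df = spark.createDataFrame(
    [("UK", "536365", 6, 2.55), ("UK", "536365", 8, 3.39), ("France", "536370", 12, 1.85)],
    ["Country", "InvoiceNo", "Quantity", "UnitPrice"])

# First register the DataFrame as a view, then aggregate it with spark.sql
invoice_df.createOrReplaceTempView("sales")
spark.sql("""
    SELECT Country, InvoiceNo,
           sum(Quantity) as TotalQuantity,
           round(sum(Quantity * UnitPrice), 2) as InvoiceValue
    FROM sales
    GROUP BY Country, InvoiceNo
""").show()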

16-RankingDemo/RankingDemo.py

Lines changed: 1 addition & 0 deletions
@@ -20,6 +20,7 @@
     .orderBy(f.col("InvoiceValue").desc()) \
     .rowsBetween(Window.unboundedPreceding, Window.currentRow)
 
+# Find the top InvoiceValue for each country
 df = summary_df.withColumn("Rank", f.dense_rank().over(rank_window)) \
     .where(f.col("Rank") == 1) \
     .sort("Country", "WeekNumber") \
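A self-contained sketch of the "top InvoiceValue per country" pattern the new comment describes, with a hypothetical weekly summary in place of summary_df:

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as f

spark = SparkSession.builder.master("local[2]").appName("RankingSketch").getOrCreate()

summary_df = spark.createDataFrame(
    [("India", 49, 1350.5), ("India", 50, 2150.0), ("UK", 49, 910.0), ("UK", 50, 880.0)],
    ["Country", "WeekNumber", "InvoiceValue"])

# Rank weeks within each country by InvoiceValue, highest first
rank_window = Window.partitionBy("Country") \
    .orderBy(f.col("InvoiceValue").desc()) \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Keep only the top-ranked week for each country
summary_df.withColumn("Rank", f.dense_rank().over(rank_window)) \
    .where(f.col("Rank") == 1) \
    .sort("Country", "WeekNumber") \
    .show()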

19-ShuffleJoinDemo/SuffleJoinDemo.py

Lines changed: 5 additions & 2 deletions
@@ -1,4 +1,5 @@
 from pyspark.sql import SparkSession
+from pyspark.sql.functions import broadcast
 
 from lib.logger import Log4j
 

@@ -14,10 +15,12 @@
 flight_time_df1 = spark.read.json("data/d1/")
 flight_time_df2 = spark.read.json("data/d2/")
 
+# This setting ensures we get three partitions after the shuffle, i.e. three reduce exchanges
 spark.conf.set("spark.sql.shuffle.partitions", 3)
 
 join_expr = flight_time_df1.id == flight_time_df2.id
-join_df = flight_time_df1.join(flight_time_df2, join_expr, "inner")
+# join_df = flight_time_df1.join(flight_time_df2, join_expr, "inner")
+join_df = flight_time_df1.join(broadcast(flight_time_df2), join_expr, "inner")
 
 join_df.collect()
-input("press a key to stop...")
+input("press a key to stop...")
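A minimal sketch contrasting the shuffle join with the broadcast join introduced here, using small in-memory DataFrames in place of the JSON inputs under data/d1/ and data/d2/:

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.master("local[3]").appName("JoinSketch").getOrCreate()

flight_time_df1 = spark.createDataFrame([(1, "AA"), (2, "DL"), (3, "UA")], ["id", "carrier"])
flight_time_df2 = spark.createDataFrame([(1, "JFK"), (2, "ATL")], ["id", "origin"])

# Three shuffle partitions -> three reduce-side exchanges for the shuffle join
spark.conf.set("spark.sql.shuffle.partitions", 3)

join_expr = flight_time_df1.id == flight_time_df2.id

# Shuffle join: both sides are repartitioned on the join key before matching
shuffle_join_df = flight_time_df1.join(flight_time_df2, join_expr, "inner")

# Broadcast hash join: the smaller side is copied to every executor, so no shuffle is needed
broadcast_join_df = flight_time_df1.join(broadcast(flight_time_df2), join_expr, "inner")

shuffle_join_df.show()
broadcast_join_df.show()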
