package sevsu.spark

import java.nio.file.Paths

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Application {
  private val conf: SparkConf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("spark_example")
    .set("spark.ui.showConsoleProgress", "false")

  private val sc: SparkContext = getSparkContext(conf)

  private val resourcesRoot: String = this.getClass.getResource("/").toString
  private val personPath: String = resourcesRoot + "person.csv"
  private val apartmentPath: String = resourcesRoot + "apartment.csv"

  case class Person(id: Int, name: String)

  case class Apartment(id_apartment: Int, id_human: Int, num_rooms: Int, address: String)

  def main(args: Array[String]): Unit = {
    val rawPersonRdd: RDD[String] = sc.textFile(personPath)
    val rawApartmentRdd: RDD[String] = sc.textFile(apartmentPath)

    // Parse each CSV line into a Person(id, name).
    val persons: RDD[Person] = rawPersonRdd.map { strPerson =>
      strPerson.split(",").map(_.trim) match {
        case Array(id, name) => Person(id.toInt, name)
      }
    }

    // Parse each CSV line into an Apartment(id_apartment, id_human, num_rooms, address).
    val apartments: RDD[Apartment] = rawApartmentRdd.map { strApartment =>
      strApartment.split(",").map(_.trim) match {
        case Array(id_apartment, id_human, num_rooms, address) =>
          Apartment(id_apartment.toInt, id_human.toInt, num_rooms.toInt, address)
      }
    }

    // ====== Task 1 ======
    // Count apartments per person, then group person ids by apartment count.
    val personCounter: RDD[(Int, Int)] = apartments.map(item => (item.id_human, 1))
    val numPersonApartments = personCounter.reduceByKey((a, b) => a + b)
    val numApartmentsPerson = numPersonApartments.map(_.swap).groupByKey().sortByKey()
    println(numApartmentsPerson.collect().mkString("\n"))

    // ====== Task 2 ======
    // Join person names with their apartment counts.
    val personPairRDD = persons.map(item => (item.id, item.name))
    val joined = personPairRDD join numPersonApartments
    val numPersonNameApartments = joined.map(item => item._2)
    println(numPersonNameApartments.collect().mkString("\n"))

    // ====== Task 3 ======
    // For apartments with more than two rooms, pair each address with the owner's name.
    val personApartmentsAddress = apartments
      .filter(_.num_rooms > 2)
      .map(item => (item.id_human, item.address))

    val personNameAddressWithId = personApartmentsAddress join personPairRDD
    val personNameAddress = personNameAddressWithId.map(item => item._2)
    println(personNameAddress.collect().mkString("\n"))

    sc.stop()
  }

  private def getSparkContext(conf: SparkConf): SparkContext = {
    // On Windows, point Hadoop at the bundled winutils binaries before creating the context.
    if (System.getProperty("os.name").toLowerCase.contains("windows")) {
      System.setProperty(
        "hadoop.home.dir",
        Paths.get(this.getClass.getResource("/winutils/hadoop-2.7.1/").toURI).toString
      )
    }
    new SparkContext(conf)
  }
}