Here I will share some information about the usage of DataFrames in Spark.
You can first get a good understanding from the URL below -
https://indatalabs.com/blog/data-engineering/convert-spark-rdd-to-dataframe-dataset
Now below are some commands which will be used throughout this journey on DataFrames (a small sample run follows this list) :
Creating a DataFrame from an existing RDD : val df = rdd.toDF()
Check the schema of the above DataFrame : df.printSchema
Check the contents of the above DataFrame : df.show
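As a quick illustration of the three commands above, here is a minimal sketch for spark-shell; the pair RDD and its contents are placeholder data I made up, not anything from the examples below -
val rdd = sc.parallelize(Seq(("E01", "Nitin"), ("E02", "ABC")))   // small sample RDD with assumed data
val df = rdd.toDF()        // create a DataFrame from the existing RDD
df.printSchema             // shows the inferred schema: _1: string, _2: string
df.show                    // displays the contents as a table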
As you can see in various online documents, DataFrames can be created from various sources like RDDs, tables, etc. But here I will try to share some examples to help you in your journey on Spark.
In the examples below I will be working with RDDs, and I request you to check the solutions carefully, as a small issue can take hours to resolve.....so check the complete statements properly.
Example 1 : We have the following 3 files, which you can create yourself with the schema given below -
a) employees : comma-separated data, with many rows like E01,Nitin
b) salary : comma-separated data, with many rows like E01,10000
c) managers : comma-separated data, with many rows like E01,ABC
Question : Now we have to print the data by combining it from all the above files, and the output should be in the below format -
ID Name Salary Manager Name
So we need the column names exactly as above; note that the column 'Manager Name' even has a space in it.
Solution : Below are the statements to be executed to get the data in the above format -
sc.textFile("file:///home/cloudera/Public/employees").map(l=>(l.split(",")(0), l.split(",")(1))).toDF().withColumnRenamed("_1","ID").withColumnRenamed("_2", "Name").registerTempTable("employees")
sc.textFile("file:///home/cloudera/Public/salary").map(l=>(l.split(",")(0), l.split(",")(1))).toDF().withColumnRenamed("_1","ID").withColumnRenamed("_2", "Salary").registerTempTable("salary")
sc.textFile("file:///home/cloudera/Public/managers").map(l=>(l.split(",")(0), l.split(",")(1))).toDF().withColumnRenamed("_1","ID").withColumnRenamed("_2", "Manager Name").registerTempTable("managers")
sqlContext.sql("select e.id,e.name,s.salary,`Manager Name` from employees e, salary s, managers m where e.id = s.id and e.id = m.id").show
I have used chaining, which produces long statements; we can break them into intermediate variables to make each statement smaller, as sketched below.
We could even join the data directly with RDDs on the key 'ID', but to get the required output format with named columns, I had to use DataFrames.
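Below is a minimal sketch of the same flow using intermediate variables and the DataFrame join API instead of the SQL query; it assumes the same file paths as above and is meant only as an illustration, not the exact statements used above -
val empDF = sc.textFile("file:///home/cloudera/Public/employees").map(l => { val f = l.split(","); (f(0), f(1)) }).toDF("ID", "Name")        // name the columns directly instead of renaming _1/_2
val salDF = sc.textFile("file:///home/cloudera/Public/salary").map(l => { val f = l.split(","); (f(0), f(1)) }).toDF("ID", "Salary")
val mgrDF = sc.textFile("file:///home/cloudera/Public/managers").map(l => { val f = l.split(","); (f(0), f(1)) }).toDF("ID", "Manager Name") // a column name with a space works here too
empDF.join(salDF, Seq("ID")).join(mgrDF, Seq("ID")).show()                                                                                   // equi-join on ID, same output as the SQL version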
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------
Example 2 : Suppose we have created an RDD, populated it with data, and even filtered out the required data as shown at www.nitinagrawal.com/rdds.html
Now if we want to query the data using SQL-like queries, we can convert the RDDs to DataFrames or Datasets in the following ways -
a) rdd.toDF()
b) rdd.toDS()
c) Another way to create a DataFrame is shown below -
scala> import org.apache.spark.sql.SQLContext
scala> val sqlContext = new SQLContext(sc)
scala> import sqlContext.implicits._
scala> val people = sqlContext.createDataFrame(pop)
Now we can display, say, the top 2 records as shown below -
scala> people.show(2)
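For completeness, here is a self-contained sketch of the conversions above; the Person case class and the sample data are my assumptions (the original 'pop' RDD comes from the RDDs page), so adjust them to your own data -
scala> case class Person(name: String, age: Int, salary: Double)                        // assumed schema, matching Example 3 below
scala> val pop = sc.parallelize(Seq(Person("AAA", 30, 0.5), Person("BBB", 41, 0.7)))    // made-up sample data
scala> import sqlContext.implicits._
scala> val peopleDF = pop.toDF()                          // a) RDD -> DataFrame
scala> val peopleDS = pop.toDS()                          // b) RDD -> Dataset[Person] (Spark 1.6+)
scala> val peopleDF2 = sqlContext.createDataFrame(pop)    // c) via SQLContext
scala> peopleDF.show(2)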
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------
Example 3 : Earlier we created a collection of persons as part of the RDD examples on www.nitinagrawal.com/rdds.html, in Example 2 there.
Here we will create DataFrames from those RDDs and then process the data by grouping, calculating the average and total salary in each group.
Solution : Suppose we have the 'people' RDD created and populated with data earlier. We will now create a DataFrame from it and work on that. I have also pasted the spark-shell output below.
scala> val dfPeople = people.toDF()
dfPeople: org.apache.spark.sql.DataFrame = [name: string, age: int, salary: double]
Below is one way to create a Dataset from the DataFrame created above; I will not be using it here, it is mentioned just as an example -
scala> val dsPeople = dfPeople.as[Person]
dsPeople: org.apache.spark.sql.Dataset[Person] = [name: string, age: int, salary: double]
scala> val dfCounts = dfPeople.groupBy(round(($"age"/10)*10,0)).count
dfCounts: org.apache.spark.sql.DataFrame = [round(((age / 10) * 10),0): double, count: bigint]
scala> dfCounts.show
+--------------------------+-----+
|round(((age / 10) * 10),0)|count|
+--------------------------+-----+
| 7.0| 10|
| 60.0| 6|
| 35.0| 4|
| 21.0| 9|
| 81.0| 11|
| 41.0| 8|
| 68.0| 6|
| 24.0| 11|
| 93.0| 10|
| 1.0| 10|
| 27.0| 10|
| 80.0| 12|
| 9.0| 8|
| 47.0| 9|
| 53.0| 16|
| 67.0| 14|
| 92.0| 9|
| 30.0| 9|
| 34.0| 13|
| 12.0| 12|
+--------------------------+-----+
only showing top 20 rows
As we can see in the above output, the column name is not friendly here, so let's change the column names for our use -
scala> val dfCountsNamed = dfCounts.withColumnRenamed("round(((age / 10) * 10),0)","Ages").withColumnRenamed("count","Counts")
dfCountsNamed: org.apache.spark.sql.DataFrame = [Ages: double, Counts: bigint]
scala> dfCountsNamed.show
+----+------+
|Ages|Counts|
+----+------+
| 7.0| 10|
|60.0| 6|
|35.0| 4|
|21.0| 9|
|81.0| 11|
|41.0| 8|
|68.0| 6|
|24.0| 11|
|93.0| 10|
| 1.0| 10|
|27.0| 10|
|80.0| 12|
| 9.0| 8|
|47.0| 9|
|53.0| 16|
|67.0| 14|
|92.0| 9|
|30.0| 9|
|34.0| 13|
|12.0| 12|
+----+------+
only showing top 20 rows
Now it looks better.....right? In the same way we will create the remaining DataFrames and rename their columns.
scala> val dfSalarySum = dfPeople.groupBy(round(($"age"/10)*10,0)).agg(sum($"salary"))
dfSalarySum: org.apache.spark.sql.DataFrame = [round(((age / 10) * 10),0): double, sum(salary): double]
scala> val dfSalarySumNamed = dfSalarySum.withColumnRenamed("round(((age / 10) * 10),0)","Ages").withColumnRenamed("sum(salary)","Sums")
dfSalarySumNamed: org.apache.spark.sql.DataFrame = [Ages: double, Sums: double]
scala> val dfSalaryAvg = dfPeople.groupBy(round(($"age"/10)*10,0)).agg(avg($"salary"))
dfSalaryAvg: org.apache.spark.sql.DataFrame = [round(((age / 10) * 10),0): double, avg(salary): double]
scala> val dfSalaryAvgNamed = dfSalaryAvg.withColumnRenamed("round(((age / 10) * 10),0)","Ages").withColumnRenamed("avg(salary)","Avg")
dfSalaryAvgNamed: org.apache.spark.sql.DataFrame = [Ages: double, Avg: double]
scala> val res1 = dfCountsNamed.join(dfSalarySumNamed,Seq("Ages"))
res1: org.apache.spark.sql.DataFrame = [Ages: double, Counts: bigint, Sums: double]
scala> val res = res1.join(dfSalaryAvgNamed,Seq("Ages"))
res: org.apache.spark.sql.DataFrame = [Ages: double, Counts: bigint, Sums: double, Avg: double]
Below is the result of the work done above -
scala> res.show
+----+------+------------------+-------------------+
|Ages|Counts| Sums| Avg|
+----+------+------------------+-------------------+
| 7.0| 10| 5.115037947364884| 0.5115037947364884|
|21.0| 9| 5.074761901054652| 0.5638624334505169|
|35.0| 4|2.5222579296341796| 0.6305644824085449|
|60.0| 6|1.9025339464995012| 0.3170889910832502|
|81.0| 11| 4.643281578846819| 0.4221165071678927|
|24.0| 11|5.7773278650895925| 0.5252116240990539|
|41.0| 8|3.1941866564147006|0.39927333205183757|
|68.0| 6|3.6103797264786093| 0.6017299544131015|
|93.0| 10| 4.411033980253771| 0.4411033980253771|
| 1.0| 10| 4.963761526922989|0.49637615269229884|
| 9.0| 8|5.6554459923336236| 0.7069307490417029|
|27.0| 10|4.4041057870997955|0.44041057870997957|
|47.0| 9| 4.15880166099757|0.46208907344417444|
|80.0| 12| 7.277746458144803| 0.6064788715120669|
|30.0| 9|5.9201879238122395| 0.65779865820136|
|53.0| 16| 9.871173617382798| 0.6169483510864249|
|67.0| 14| 6.815161380975076|0.48679724149821973|
|92.0| 9| 3.819662203984193| 0.4244069115537992|
|12.0| 12| 3.532754674979648|0.29439622291497064|
|34.0| 13|3.8978548226266208| 0.2998349863558939|
+----+------+------------------+-------------------+
only showing top 20 rows
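As a side note, the three separate groupBy calls and the two joins above can be collapsed into a single aggregation by aliasing the columns up front; this is only a sketch of an alternative and is not what produced the output shown above -
scala> import org.apache.spark.sql.functions._
scala> val dfStats = dfPeople.groupBy(round(($"age"/10)*10,0).alias("Ages")).agg(count("*").alias("Counts"), sum($"salary").alias("Sums"), avg($"salary").alias("Avg"))
scala> dfStats.show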
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------