Here I will share some information about the usage of DataFrames in Spark.

You can first get a good understanding from the URL below -
https://indatalabs.com/blog/data-engineering/convert-spark-rdd-to-dataframe-dataset

Now, below are some commands that will be used throughout this page on DataFrames (put together in the short sketch that follows):
Creating a DataFrame from an existing RDD : val df = rdd.toDF()
Check the schema of the above DataFrame   : df.printSchema
Check the contents of the above DataFrame : df.show
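For instance, a minimal end-to-end sequence in spark-shell could look like this (the file path and the two-column split are just placeholder assumptions, chosen to match the files used in Example 1 below):

import sqlContext.implicits._   // needed for toDF() if the shell has not already imported it
val rdd = sc.textFile("file:///home/cloudera/Public/employees").map(l => (l.split(",")(0), l.split(",")(1)))
val df = rdd.toDF()             // columns get the default names _1, _2
df.printSchema                  // verify the inferred schema
df.show                         // inspect the first 20 rows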

As you can see in various online documents, we can create DataFrames from various sources like RDDs, tables, etc. Here I will try to share some examples to help you in your journey with Spark.
In the examples below I will be working with RDDs. Please go through the solutions carefully, as a small issue can take hours to resolve, so check the complete statements properly.

Example 1 : We have the following 3 files, which you can create yourself with the given schema -
                    a) employees : comma-separated data with many rows like E01,Nitin
                    b) salary    : comma-separated data with many rows like E01,10000
                    c) managers  : comma-separated data with many rows like E01,ABC
          Question : Print the data by combining the data from all the above files; the output should be in the format below -
                                          ID   Name    Salary     Manager Name
                            So we also need the column names as shown above; note that one column, 'Manager Name', even has a space in it.
           Solution : Below are the statements to be executed to get the data in the above format -
 
sc.textFile("file:///home/cloudera/Public/employees").map(l=>(l.split(",")(0), l.split(",")(1))).toDF().withColumnRenamed("_1","ID").withColumnRenamed("_2", "Name").registerTempTable("employees")
sc.textFile("file:///home/cloudera/Public/salary").map(l=>(l.split(",")(0), l.split(",")(1))).toDF().withColumnRenamed("_1","ID").withColumnRenamed("_2", "Salary").registerTempTable("salary")
sc.textFile("file:///home/cloudera/Public/managers").map(l=>(l.split(",")(0), l.split(",")(1))).toDF().withColumnRenamed("_1","ID").withColumnRenamed("_2", "Manager Name").registerTempTable("managers")
sqlContext.sql("select e.id,e.name,s.salary,`Manager Name` from employees e, salary s, managers m where e.id = s.id and e.id = m.id").show​

I have used chaining, which makes the statements long; we can break them up with intermediate variables to make each statement smaller, for example as in the sketch below.
We could even join the data directly on the RDDs using 'ID' as the key, but to get the required output format I had to use DataFrames.
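For instance, the first chained statement could be split like this (a sketch only; the behaviour is the same as the one-liner above):

val empLines = sc.textFile("file:///home/cloudera/Public/employees")
val empPairs = empLines.map(l => (l.split(",")(0), l.split(",")(1)))
val empDF    = empPairs.toDF().withColumnRenamed("_1", "ID").withColumnRenamed("_2", "Name")
empDF.registerTempTable("employees")   // same temp table as before, built in smaller steps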
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------
Example 2 : Suppose we have created an RDD, populated it with data and even filtered the required data as shown on www.nitinagrawal.com/rdds.html
                     Now, if we want to query the data using SQL-like queries and want to convert the RDDs to DataFrames or Datasets, we can create those
                     in the following ways -
                 a)  rdd.toDF()
                 b)  rdd.toDS()
                 c)  Another way to create a DataFrame is shown below -
                      scala> import org.apache.spark.sql.SQLContext
                      scala> val sqlContext = new SQLContext(sc)
                      scala> import sqlContext.implicits._
                      scala> val people = sqlContext.createDataFrame(pop)
                 Now we can display, say, the top 2 records as shown below -
                 scala> people.show(2)
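In case you have not built 'pop' from the RDDs page, a minimal stand-in to try these commands could be the following (the Person case class and the sample values are my own assumptions, chosen to match the schema used in Example 3):

scala> case class Person(name: String, age: Int, salary: Double)
scala> val pop = sc.parallelize(Seq(Person("Nitin", 23, 1000.0), Person("ABC", 45, 2500.0), Person("XYZ", 31, 1800.0)))
scala> val people = sqlContext.createDataFrame(pop)
scala> people.show(2)   // prints only the first 2 rows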

-----------------------------------------------------------------------------------------------------------------------------------------------------------------------
​
Example 3 : Earlier, we created a collection of persons as an RDD on www.nitinagrawal.com/rdds.html in Example 2.
                     Here we will create DataFrames from those RDDs and then process the data by grouping it and calculating the average and total salary in each group.
Solution :  Suppose we have 'people' created as an RDD and populated with data earlier. We will now create a DataFrame from it and work on
                   that. I have also pasted the shell output below each statement.
scala> val dfPeople = people.toDF()
dfPeople: org.apache.spark.sql.DataFrame = [name: string, age: int, salary: double]

Below is one way to create a Dataset from the DataFrame created above; I will not be using it further here, it is mentioned just as an example -
scala> val dsPeople = dfPeople.as[Person]
dsPeople: org.apache.spark.sql.Dataset[Person] = [name: string, age: int, salary: double]
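Just to illustrate what the Dataset gives you (not used further in this example), you get typed access to the Person fields, e.g. -

scala> dsPeople.filter(p => p.age > 30).show(5)   // typed filter on the case class fields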

scala> val dfCounts = dfPeople.groupBy(round(($"age"/10)*10,0)).count
dfCounts: org.apache.spark.sql.DataFrame = [round(((age / 10) * 10),0): double, count: bigint]

scala> dfCounts.show
+--------------------------+-----+                                             
|round(((age / 10) * 10),0)|count|
+--------------------------+-----+
|                       7.0|   10|
|                      60.0|    6|
|                      35.0|    4|
|                      21.0|    9|
|                      81.0|   11|
|                      41.0|    8|
|                      68.0|    6|
|                      24.0|   11|
|                      93.0|   10|
|                       1.0|   10|
|                      27.0|   10|
|                      80.0|   12|
|                       9.0|    8|
|                      47.0|    9|
|                      53.0|   16|
|                      67.0|   14|
|                      92.0|    9|
|                      30.0|    9|
|                      34.0|   13|
|                      12.0|   12|
+--------------------------+-----+
only showing top 20 rows


As we can see in the above output, the column name is not friendly, so let's rename the columns for our use -
scala> val dfCountsNamed = dfCounts.withColumnRenamed("round(((age / 10) * 10),0)","Ages").withColumnRenamed("count","Counts")
dfCountsNamed: org.apache.spark.sql.DataFrame = [Ages: double, Counts: bigint]

scala> dfCountsNamed.show
+----+------+                                                                  
|Ages|Counts|
+----+------+
| 7.0|    10|
|60.0|     6|
|35.0|     4|
|21.0|     9|
|81.0|    11|
|41.0|     8|
|68.0|     6|
|24.0|    11|
|93.0|    10|
| 1.0|    10|
|27.0|    10|
|80.0|    12|
| 9.0|     8|
|47.0|     9|
|53.0|    16|
|67.0|    14|
|92.0|     9|
|30.0|     9|
|34.0|    13|
|12.0|    12|
+----+------+
only showing top 20 rows

Now it looks better, right? In the same way, we will create the remaining DataFrames and rename their columns.

scala> val dfSalarySum = dfPeople.groupBy(round(($"age"/10)*10,0)).agg(sum($"salary"))
dfSalarySum: org.apache.spark.sql.DataFrame = [round(((age / 10) * 10),0): double, sum(salary): double]

scala> val dfSalarySumNamed = dfSalarySum.withColumnRenamed("round(((age / 10) * 10),0)","Ages").withColumnRenamed("sum(salary)","Sums")
dfSalarySumNamed: org.apache.spark.sql.DataFrame = [Ages: double, Sums: double]

scala> val dfSalaryAvg = dfPeople.groupBy(round(($"age"/10)*10,0)).agg(avg($"salary"))
dfSalaryAvg: org.apache.spark.sql.DataFrame = [round(((age / 10) * 10),0): double, avg(salary): double]

scala> val dfSalaryAvgNamed = dfSalaryAvg.withColumnRenamed("round(((age / 10) * 10),0)","Ages").withColumnRenamed("avg(salary)","Avg")
dfSalaryAvgNamed: org.apache.spark.sql.DataFrame = [Ages: double, Avg: double]

scala> val res1 = dfCountsNamed.join(dfSalarySumNamed,Seq("Ages"))
res1: org.apache.spark.sql.DataFrame = [Ages: double, Counts: bigint, Sums: double]

scala> val res = res1.join(dfSalaryAvgNamed,Seq("Ages"))
res: org.apache.spark.sql.DataFrame = [Ages: double, Counts: bigint, Sums: double, Avg: double]

Below is the result of the work done above -

scala> res.show
+----+------+------------------+-------------------+                           
|Ages|Counts|              Sums|                Avg|
+----+------+------------------+-------------------+
| 7.0|    10| 5.115037947364884| 0.5115037947364884|
|21.0|     9| 5.074761901054652| 0.5638624334505169|
|35.0|     4|2.5222579296341796| 0.6305644824085449|
|60.0|     6|1.9025339464995012| 0.3170889910832502|
|81.0|    11| 4.643281578846819| 0.4221165071678927|
|24.0|    11|5.7773278650895925| 0.5252116240990539|
|41.0|     8|3.1941866564147006|0.39927333205183757|
|68.0|     6|3.6103797264786093| 0.6017299544131015|
|93.0|    10| 4.411033980253771| 0.4411033980253771|
| 1.0|    10| 4.963761526922989|0.49637615269229884|
| 9.0|     8|5.6554459923336236| 0.7069307490417029|
|27.0|    10|4.4041057870997955|0.44041057870997957|
|47.0|     9|  4.15880166099757|0.46208907344417444|
|80.0|    12| 7.277746458144803| 0.6064788715120669|
|30.0|     9|5.9201879238122395|   0.65779865820136|
|53.0|    16| 9.871173617382798| 0.6169483510864249|
|67.0|    14| 6.815161380975076|0.48679724149821973|
|92.0|     9| 3.819662203984193| 0.4244069115537992|
|12.0|    12| 3.532754674979648|0.29439622291497064|
|34.0|    13|3.8978548226266208| 0.2998349863558939|
+----+------+------------------+-------------------+
only showing top 20 rows
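
As a side note (not part of the original session), the three aggregations above could also be computed in a single pass by giving several aggregates to one agg call and aliasing the columns directly; a sketch, assuming the same dfPeople as above -

scala> val resOnePass = dfPeople.groupBy(round(($"age"/10)*10, 0).alias("Ages")).agg(count("*").alias("Counts"), sum($"salary").alias("Sums"), avg($"salary").alias("Avg"))
scala> resOnePass.show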

-----------------------------------------------------------------------------------------------------------------------------------------------------------------------