Input:
1. Partner data is given below in a comma-separated file (file name: Partner.csv).
2. A list of Invalid Party IDs is given.

Requirement:
1. Read the input Partner CSV file and the Invalid Party IDs list as DataFrames using Spark APIs (a reading sketch follows this list).
2. Retrieve the latest record per Party ID from the input Partner data, based on the Last Updated Date.
3. Retrieve the valid Party IDs from the latest-record DataFrame by looking them up against the Invalid Party DataFrame.
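For requirement 1, a minimal sketch of reading both inputs as DataFrames could look like the code below. The file paths and DataFrame names are placeholders, and the invalid-ID file is assumed to be a single-column CSV with a header row; the toDF call renames the CSV headers (Party Id, Country, Role, Last Updated Date) to the column names used in the expected output.

Scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("PartnerExercise").getOrCreate()

// Partner data, renamed to the expected column names (header row assumed)
val partnerDf = spark.read
  .option("header", "true")
  .csv("/FileStore/tables/Partner.csv")            // placeholder path
  .toDF("prty_id", "country", "role", "lst_upd_dt")

// Invalid Party IDs: a single prty_id column (header row assumed)
val invalidDf = spark.read
  .option("header", "true")
  .csv("/FileStore/tables/InvalidPartyIds.csv")    // placeholder path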

Input:
1) Partner Input DataFrame:
prty_id country role lst_upd_dt
P1 IN Partner 2022/03/01
P1 IN Default 2020/12/31
P2 JP Customer 2021/07/25
P2 JP VSI 2022/01/01
P2 JP Default 2020/12/31
P3 CS Vendor 2021/05/18
P3 CS Default 2020/09/09
P4 US Partner 2021/04/23
P4 US Customer 2022/03/12
P5 CA Partner 2020/08/07
P5 CA Partner 2022/10/01
P6 IN Customer 2019/03/01
P7 CN Vendor 2022/02/01
P7 CN Default 2020/12/31
P8 BZ Vendor 2020/09/15


Partner.csv file contents are in the format:
Party Id,Country,Role,Last Updated Date
P1,IN,Partner,2022/03/01
P1,IN,Default,2020/12/31
……………………….. etc

2) Invalid Party Id Records
prty_id
P1
P7
P4

Expected Output:
1) Latest Party Records (Primary Key: prty_id, latest record identified using: lst_upd_dt)
prty_id country role lst_upd_dt
P1 IN Partner 2022/03/01
P2 JP VSI 2022/01/01
P3 CS Vendor 2021/05/18
P4 US Customer 2022/03/12
P5 CA Partner 2022/10/01
P6 IN Customer 2019/03/01
P7 CN Vendor 2022/02/01
P8 BZ Vendor 2020/09/15

2) Valid Party IDs from the Latest Records.

prty_id country role lst_upd_dt
P2 JP VSI 2022/01/01
P3 CS Vendor 2021/05/18
P5 CA Partner 2022/10/01
P6 IN Customer 2019/03/01
P8 BZ Vendor 2020/09/15

What I have tried:

import org.apache.spark.sql.functions._

// Read the Partner CSV with its header row
val df = spark.read.option("header", true)
  .csv("/FileStore/tables/prty.csv")

// createOrReplaceTempView returns Unit, so its result cannot be assigned and shown;
// register the view and show the DataFrame itself instead
df.createOrReplaceTempView("Contracts")
//val orders = spark.sql("SELECT lst_upd_dt FROM Contracts")
df.show()
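Building on the df read above, one possible way to get the latest record per party ID with the DataFrame API (rather than SQL) is a window partitioned by prty_id and ordered by the update date descending. This is only a sketch; it assumes the columns are named prty_id and lst_upd_dt as in the sample data, and that lst_upd_dt uses the yyyy/MM/dd format shown.

Scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Parse lst_upd_dt (yyyy/MM/dd) so the ordering is chronological
val withDate = df.withColumn("upd_dt", to_date(col("lst_upd_dt"), "yyyy/MM/dd"))

// Number the rows within each prty_id, newest first, and keep only the first row
val w = Window.partitionBy("prty_id").orderBy(col("upd_dt").desc)
val latestDf = withDate
  .withColumn("rn", row_number().over(w))
  .filter(col("rn") === 1)
  .drop("rn", "upd_dt")

latestDf.show(false)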
Posted
Updated 29-Aug-22 17:11pm
Comments
Richard MacCutchan 27-Aug-22 4:10am    
Do you have a question?
Silpa Silpa 27-Aug-22 4:33am    
Spark Scala coding exercise based on the requirement below.
The expectation is to implement all the requirements using the latest Spark API (not SQL) and the functionality available in our upgraded cluster version (Spark 2.4.0, Scala 2.11.8).

Input:
1. Partner data is given below in a comma-separated file (file name: Partner.csv).
2. A list of Invalid Party IDs is given.

Requirement:
1. Read the input Partner CSV file and the Invalid Party IDs list as DataFrames using Spark APIs.
2. Retrieve the latest record per Party ID from the input Partner data, based on the Last Updated Date.
3. Retrieve the valid Party IDs from the latest-record DataFrame by looking them up against the Invalid Party DataFrame.

Richard MacCutchan 27-Aug-22 4:53am    
Yes, you already posted that detail. So what is your actual question? If you are expecting someone here to do your work for you, then I am afraid you will be disappointed. This forum is here to help people with problems in the code that they write, not to do their work.
Silpa Silpa 27-Aug-22 5:05am    
Oh ok, thank you for the confirmation. I am not expecting someone to write the full code for me. I use SQL and I know how to filter in SQL, but this exercise asks for the Spark API, and filtering with the Spark API in Scala is the part I don't know. I posted the full question for clarification only; I am just looking for an example, not the full code. Thank you. I think I reached the wrong site...
Richard MacCutchan 27-Aug-22 5:34am    
Well again, this is not a teaching site; it is named Quick Answers for a reason. If you want to understand how to use Spark then you should go to Overview - Spark 3.3.0 Documentation[^].

1 solution

Here is an example of the kind you are looking for: Spark DataFrame Where Filter | Multiple Conditions - Spark by {Examples}[^]

Scala
package com.sparkbyexamples.spark.dataframe

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{ArrayType, StringType, StructType}
import org.apache.spark.sql.functions.array_contains

object FilterExample extends App{
  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")

  val arrayStructureData = Seq(
    Row(Row("James","","Smith"),List("Java","Scala","C++"),"OH","M"),
    Row(Row("Anna","Rose",""),List("Spark","Java","C++"),"NY","F"),
    Row(Row("Julia","","Williams"),List("CSharp","VB"),"OH","F"),
    Row(Row("Maria","Anne","Jones"),List("CSharp","VB"),"NY","M"),
    Row(Row("Jen","Mary","Brown"),List("CSharp","VB"),"NY","M"),
    Row(Row("Mike","Mary","Williams"),List("Python","VB"),"OH","M")
  )

  val arrayStructureSchema = new StructType()
    .add("name",new StructType()
    .add("firstname",StringType)
    .add("middlename",StringType)
    .add("lastname",StringType))
    .add("languages", ArrayType(StringType))
    .add("state", StringType)
    .add("gender", StringType)

  val df = spark.createDataFrame(
   spark.sparkContext.parallelize(arrayStructureData),arrayStructureSchema)
  df.printSchema()
  df.show()

  //Condition
  df.filter(df("state") === "OH")
    .show(false)

  //SQL Expression
  df.filter("gender == 'M'")
    .show(false)

  //multiple condition
  df.filter(df("state") === "OH" && df("gender") === "M")
    .show(false)

  //Array condition
  df.filter(array_contains(df("languages"),"Java"))
    .show(false)

  //Struct condition
  df.filter(df("name.lastname") === "Williams")
    .show(false)
}
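For the last step of the original question, the same DataFrame API can remove the invalid party IDs from the latest-record DataFrame with a left_anti join. This is only a sketch: latestDf and invalidDf stand in for the latest-record DataFrame and the Invalid Party ID DataFrame described in the question, whatever they are actually named in your code.

Scala
// Keep only the rows whose prty_id has no match in the invalid-ID DataFrame
val validDf = latestDf.join(invalidDf, Seq("prty_id"), "left_anti")
validDf.show(false)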
 
