apache spark - Partition Location of RDD/Dataframe

I have a pretty large DataFrame (think 10e7 rows) from which I filter elements based on some property:

val res = data.filter(data(FieldNames.myValue) === 2).select(pk.name, FieldNames.myValue)

My DataFrame has n partitions (data.rdd.getNumPartitions).

Now I want to know which partition my rows originated from. I am aware that I could just iterate through all partitions with something like this:

val temp = res.first() // or foreach; this is just an example
data.foreachPartition(f => {
    // compare PKs: true if this partition contains the row
    val found = f.exists(row => row.get(0) == temp.get(0))
    // my code here
})

or data.rdd.mapPartitionsWithIndex((idx, f) => ...)

However, this seems excessive and also not very performant if my results and my DataFrame become large.

Is there a Spark way to do this after I've performed the filter() operation?

Or alternatively, is there a way to rewrite the filter() statement, or an alternative to it, so that it returns the origin of each row?

I could also save the partition location in my DataFrame and update it on repartitioning, but I'd rather do it in a Spark way.

(The only similar question I found was here, and neither the question nor the comment is very helpful. I also found this, which might be similar but is not the same.)

Thanks in advance for any help/pointers, and I apologize if I missed a question similar to mine that has already been answered.

1 Answer

  1. Frank

    2019-11-14

    Partition numbers and counts are not stable, because Spark automatically expands and reduces the number of partitions. This means, for example, that the input partition count may not match the input file count.

    The general pattern in these situations is to create some type of composite key based on the data in each input file. If the key is big you can hash it to reduce the size. If you don't care about collisions much, use Murmur3. If you are concerned about collisions, use MD5, which is still quite fast.
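    As a rough sketch of the hashing step, assuming Spark 2.0 or later: the built-in hash() column function is Murmur3-based, and md5() is also available in org.apache.spark.sql.functions. The object name, the helper withOriginKeys, and the column names below are made up for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, concat_ws, hash, md5}

// Hypothetical helper object; the names are illustrative only.
object CompositeKeyExample {

  // Adds two candidate origin keys built from the given columns.
  def withOriginKeys(df: DataFrame, keyCols: Seq[String]): DataFrame = {
    val cols = keyCols.map(col)
    df
      // hash() is Spark's Murmur3-based hash: a 32-bit Int, so collisions are possible.
      .withColumn("key_murmur3", hash(cols: _*))
      // md5() takes a single input, so cast the columns to strings and join them first.
      // Note that concat_ws skips null values.
      .withColumn("key_md5", md5(concat_ws("|", cols.map(_.cast("string")): _*)))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("composite-key-sketch")
      .getOrCreate()
    import spark.implicits._

    // Toy data standing in for rows read from two different input files.
    val df = Seq(("fileA", 1, 2), ("fileB", 1, 2)).toDF("source", "id", "myValue")
    withOriginKeys(df, Seq("source", "id")).show(false)

    spark.stop()
  }
}
```

    The Murmur3 key is cheaper and smaller, while the MD5 key is a 32-character hex string; which one fits depends on how much collision risk is acceptable.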

    If the only unique feature you have is the path of the input file, you'll have to add the file path as the distinguishing column. Here is a way to do this:

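    import org.apache.spark.sql.functions.lit

    val paths = Seq(...)
    val df = paths
      .map { path =>
        // read one input file and tag every row with the path it came from
        sqlContext.read.parquet(path)
          .withColumn("path", lit(path))
      }
      // combine the per-file DataFrames (unionAll is an alias of union on Spark 2.0+)
      .reduceLeft(_ unionAll _)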

    The idea is simple: read the input files one at a time, add a unique column associated with them, and then combine them together using UNION ALL.
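    Another option may be worth trying, assuming Spark 2.x or later: the built-in column functions spark_partition_id() and input_file_name() can tag each row before the filter runs, so the filtered rows carry their origin with them. This is only a sketch; the object name, the helper tagOrigin, and the toy data are hypothetical. The partition ID reflects the partitioning at the point the column is evaluated, so it changes after a repartition, in line with the caveat above.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, input_file_name, spark_partition_id}

// Hypothetical helper object; the names are illustrative only.
object RowOriginExample {

  // Tags each row with its current partition ID and source file.
  // Call this BEFORE filtering so the surviving rows keep their origin.
  def tagOrigin(df: DataFrame): DataFrame =
    df.withColumn("origin_partition", spark_partition_id())
      // input_file_name() is an empty string when the rows do not come from files.
      .withColumn("origin_file", input_file_name())

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("row-origin-sketch")
      .getOrCreate()
    import spark.implicits._

    // Toy stand-in for the large DataFrame from the question.
    val data = (1 to 100).map(i => (i, i % 5)).toDF("pk", "myValue").repartition(4)

    val res = tagOrigin(data)
      .filter(col("myValue") === 2)
      .select("pk", "myValue", "origin_partition")
    res.show(5)

    spark.stop()
  }
}
```

    Because filter() is a narrow transformation, it does not move rows between partitions, so the recorded ID matches the partition each row sat in when the filter was applied.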
