51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

如何使用Pyspark获取跨多个文件排序的Parquet行组统计信息?

英文:

How to get Parquet row groups stats sorted across multiple files with Pyspark?

问题 {#heading}

你可以尝试使用repartition方法来改变数据分区的分布,从而达到你想要的效果。例如:

df = df.repartition(2, 'longColumn')

这将会根据longColumn列重新分区成两个分区,这可能会改变统计信息的分布情况。你可以根据需要调整分区的数量,以达到你想要的结果。 英文:

My process is creating as outcome multiple parquet files. In my specific case 64 as the number of final partitions. Single file is around ~400MB divided by 3 row groups of ~128MB each one. If I look at single file, every row group has own stats min/max which is well distributed per column. For example, given a long column that has for one file (part-00000-*):

- group 1 - min:102 max: 9992
- group 2 - min:9994 max: 21990
- group 3 - min:22098 max: 35764

and the same column on the next file (part-00001-*) with similar distribution

- group 1 - min:99 max: 9882
- group 2 - min:9980 max: 21979
- group 3 - min:22018 max: 32764

for a total of 2 files.
What I need is having a distribution like

part-00000-*:
- group 1 - min:99 max: 8662
- group 2 - min:8994 max: 13986
- group 3 - min:14333 max: 19845

part-00001-\*

`
`
* group 1 - min:19877 max: 25621


* group 2 - min:25654 max: 30091



* `group 3 - min:31094 max: 35764
  `

How can I get that when writing files in PySpark? I know I can simply sort by df.orderBy('longColumn') but that affects performance too much so I am looking for an alternative.

答案1 {#1}

得分: 1

假设你的longColumn中的值分布比较均匀,也许可以采用repartitionByRange接着是sortWithinPartitions来优化策略。

df.
  repartitionByRange(64, "longColumn").
  sortWithinPartitions("longColumn", ascending=True).
  write.parquet("myFile.parquet")

这样,你就能避免在一个执行器上收集和排序整个数据框。 英文:

Assuming more or less uniform distribution of values within your longColumn, perhaps a repartitionByRange followed by sortWithinPartitions would be a better strategy.

df.
  repartitionByRange(64,"longColumn").
  sortWithinPartitions("longColumn",ascending=True).
  write.parquet("myFile.parquet")

This way, you'll avoid collecting and sorting the whole dataframe on one executor.

答案2 {#2}

得分: 0

你可以通过确保在写入文件之前对该列进行全局排序来实现这一点。假设你的数据框名为 df,你的长列名为 longColumn

然后你可以这样做:

df.orderBy("longColumn").write.parquet("myFile.parquet")

如果你接着检查 longColumn 中的行组的最小/最大值,你会发现你所描述的内容。 英文:

You can achieve this by making sure you have a global ordering on that column before you write your file. Let's say your dataframe is called df and your long column is called longColumn.

Then you do something like:

df.orderBy("longColumn").write.parquet("myFile.parquet")

If you then check the min/max values for your row groups in longColumn you will see you have what you just described.


赞(1)
未经允许不得转载:工具盒子 » 如何使用Pyspark获取跨多个文件排序的Parquet行组统计信息?