51工具盒子


How to transform a DataFrame in PySpark?

Question


I have a data frame in PySpark with the columns id, name, and value (called id, event_name, and event_value in the code below).
For each id, the name column takes the values A, B, and C. The value column holds numeric values.

Sample data frame:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data = [
    (1, "A", 0),
    (1, "B", 2),
    (1, "C", 0),
    (2, "A", 5),
    (2, "B", 0),
    (2, "C", 2),
    (3, "A", 7),
    (3, "B", 8),
    (3, "C", 9),
]
columns = ["id", "event_name", "event_value"]
df = spark.createDataFrame(data, columns)
df.show()
```

id  | name | value
----|------|------
1   | A    | 0
1   | B    | 2
1   | C    | 0
2   | A    | 5
2   | B    | 0
2   | C    | 2
3   | A    | 7
3   | B    | 8
3   | C    | 9

As output I need a data frame with the columns id, event_name, and event_value, built according to the following conditions:

  1. Each id has the value A in one of its rows. If the row with name = A has value 0, delete every row of that id whose value is 0. Keep only the row where name is A and the rows where value > 0.
  2. If the row with name = A has a value > 0, do not delete any row of that id, no matter what the other values for that id are.

So, as a result, I need something like this:

id  | name | value
----|------|------
1   | A    | 0
1   | B    | 2
2   | A    | 5
2   | B    | 0
2   | C    | 2
3   | A    | 7
3   | B    | 8
3   | C    | 9

How can I do that in PySpark?
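The sketch below applies the two conditions in plain Python to the sample rows, with no Spark involved, so they can serve as a reference. The helper `filter_rows` is hypothetical. It assumes that every id has exactly one row with name A; the question itself only says that each id has A in one of its rows. For the sample data it returns the eight rows of the expected output.

```python
def filter_rows(rows):
    """Apply the question's two conditions to (id, name, value) tuples.

    Hypothetical helper; assumes each id has exactly one row with name 'A'.
    """
    # Value of the single 'A' row for each id
    a_value = {id_: value for id_, name, value in rows if name == "A"}
    kept = []
    for id_, name, value in rows:
        if a_value[id_] == 0:
            # Condition 1: keep only the A row and rows with value > 0
            if name == "A" or value > 0:
                kept.append((id_, name, value))
        else:
            # Condition 2 (A row is not 0): keep every row of this id
            kept.append((id_, name, value))
    return kept


data = [
    (1, "A", 0), (1, "B", 2), (1, "C", 0),
    (2, "A", 5), (2, "B", 0), (2, "C", 2),
    (3, "A", 7), (3, "B", 8), (3, "C", 9),
]

for row in filter_rows(data):
    print(row)
```

A filter on a single row only sees that row's own columns. In Spark, the per-id lookup (`a_value` here) therefore has to come from a window over `id` or from a join on `id`.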

Answer 1

Score: 1


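Partition the dataframe by id and build a flag that checks whether any row of that id has (event_name, event_value) = ('A', 0). Then use this flag together with the other conditions to filter the dataframe. A row is dropped only when three things hold: its event_value is 0, its id has such an ('A', 0) row, and it is not the A row itself.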

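```python
from pyspark.sql import Window, functions as F
# 1 for a row that is ('A', 0), else 0; summed over all rows of the same id
exp = F.expr("event_name = 'A' AND event_value = 0").cast('int')
counts = F.sum(exp).over(Window.partitionBy('id'))
# Drop value-0 rows of ids whose A row is 0, but always keep the A row
mask = ~((F.col('event_value') == 0) & (counts > 0)) | (F.col('event_name') == 'A')
df1 = df.withColumn('mask', mask).filter('mask')
df1.show()
```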


+---+----------+-----------+----+
| id|event_name|event_value|mask|
+---+----------+-----------+----+
|  1|         A|          0|true|
|  1|         B|          2|true|
|  2|         A|          5|true|
|  2|         B|          0|true|
|  2|         C|          2|true|
|  3|         A|          7|true|
|  3|         B|          8|true|
|  3|         C|          9|true|
+---+----------+-----------+----+
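In this approach, `counts` has the same value on every row of an id. If each id has one A row, `counts` is 1 when that id's A row has value 0 and 0 otherwise. As a result, the mask is true on every row of an id whose A row is not 0 (condition 2). For the other ids, the mask is false only on non-A rows with value 0 (condition 1).

The plain-Python sketch below mirrors `exp`, `counts`, and `mask` row by row, so the logic can be checked without a Spark session. `answer1_mask` is a hypothetical helper that models the expressions. It does not model how Spark executes them.

```python
def answer1_mask(rows):
    """Mirror the answer's exp / counts / mask columns for (id, name, value) rows.

    Hypothetical helper: models the PySpark expressions in plain Python.
    """
    # exp: 1 for a row that is ('A', 0), else 0
    exp = [int(name == "A" and value == 0) for _, name, value in rows]
    # counts: sum of exp over all rows that share the same id (the window partition)
    per_id = {}
    for (id_, _, _), e in zip(rows, exp):
        per_id[id_] = per_id.get(id_, 0) + e
    counts = [per_id[id_] for id_, _, _ in rows]
    # mask: ~((value == 0) & (counts > 0)) | (name == 'A')
    return [
        (not (value == 0 and c > 0)) or name == "A"
        for (_, name, value), c in zip(rows, counts)
    ]


data = [
    (1, "A", 0), (1, "B", 2), (1, "C", 0),
    (2, "A", 5), (2, "B", 0), (2, "C", 2),
    (3, "A", 7), (3, "B", 8), (3, "C", 9),
]

mask = answer1_mask(data)
kept = [row for row, keep in zip(data, mask) if keep]
print(kept)
```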

Answer 2

Score: 0


Use a window function and then filter based on the requirement. Please check the code below:

scala> spark.table("input").show(false)
+---+----------+-----------+
|id |event_name|event_value|
+---+----------+-----------+
|1  |A         |0          |
|1  |B         |2          |
|1  |C         |0          |
|2  |A         |5          |
|2  |B         |0          |
|2  |C         |2          |
|3  |A         |7          |
|3  |B         |8          |
|3  |C         |9          |
+---+----------+-----------+

scala> :paste
// Entering paste mode (ctrl-D to finish)

spark.sql("""
    WITH wn_input AS (
        SELECT
            *,
            (RANK() OVER(PARTITION BY ID ORDER BY event_name, event_value)) AS rid
        FROM input
    )
    SELECT
        id,
        event_name AS name,
        event_value AS value
    FROM wn_input WHERE not ( rid = 1 AND event_name = 'A' AND event_value = 0)
""").show(false)

// Exiting paste mode, now interpreting.

+---+----+-----+
|id |name|value|
+---+----+-----+
|1  |B   |2    |
|1  |C   |0    |
|2  |A   |5    |
|2  |B   |0    |
|2  |C   |2    |
|3  |A   |7    |
|3  |B   |8    |
|3  |C   |9    |
+---+----+-----+
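This output differs from the expected output in the question. For id 1, the query removes (1, A, 0) and keeps (1, C, 0), which is the opposite of what was asked. The cause is the ordering: `RANK() OVER(PARTITION BY ID ORDER BY event_name, event_value)` gives `rid = 1` to the A row of each id, because A sorts first. The `WHERE` clause therefore deletes the A row itself whenever its value is 0.

The sketch below reproduces this with Python's built-in `sqlite3` module in place of Spark. It assumes SQLite 3.25 or newer, the first version with window functions. It also runs a hypothetical variant that flags ids whose A row is 0 with `MAX(...) OVER (PARTITION BY id)` and then drops only their other zero-valued rows. The variant is an illustration, not part of the original answer. Its SQL should also be accepted by `spark.sql`, but only the SQLite run is shown here.

```python
import sqlite3

# Sample rows from the question: (id, event_name, event_value)
data = [
    (1, "A", 0), (1, "B", 2), (1, "C", 0),
    (2, "A", 5), (2, "B", 0), (2, "C", 2),
    (3, "A", 7), (3, "B", 8), (3, "C", 9),
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE input (id INTEGER, event_name TEXT, event_value INTEGER)")
conn.executemany("INSERT INTO input VALUES (?, ?, ?)", data)

# The answer's query (redundant parentheses removed, ORDER BY added for a stable result)
rank_query = """
WITH wn_input AS (
    SELECT *, RANK() OVER (PARTITION BY id ORDER BY event_name, event_value) AS rid
    FROM input
)
SELECT id, event_name AS name, event_value AS value
FROM wn_input
WHERE NOT (rid = 1 AND event_name = 'A' AND event_value = 0)
ORDER BY id, name
"""

# Hypothetical variant: flag ids whose A row is 0, then drop only their non-A rows with value 0
flag_query = """
WITH flagged AS (
    SELECT *,
           MAX(CASE WHEN event_name = 'A' AND event_value = 0 THEN 1 ELSE 0 END)
               OVER (PARTITION BY id) AS a_is_zero
    FROM input
)
SELECT id, event_name AS name, event_value AS value
FROM flagged
WHERE NOT (a_is_zero = 1 AND event_value = 0 AND event_name <> 'A')
ORDER BY id, name
"""

rank_rows = conn.execute(rank_query).fetchall()
flag_rows = conn.execute(flag_query).fetchall()
print(rank_rows)
print(flag_rows)
conn.close()
```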

