



How to transform a DataFrame in PySpark?

Question {#heading}


I have a DataFrame in PySpark with the columns id, name, value (named event_name and event_value in the code below).
For each id, the name column takes the values A, B, C. The value column holds numeric values.

Sample DataFrame:

data = [
    (1, "A", 0),
    (1, "B", 2),
    (1, "C", 0),
    (2, "A", 5),
    (2, "B", 0),
    (2, "C", 2),
    (3, "A", 7),
    (3, "B", 8),
    (3, "C", 9),
]
columns = ["id", "event_name", "event_value"]
df = spark.createDataFrame(data, columns)

id  | name | value
1   | A    | 0
1   | B    | 2
1   | C    | 0
2   | A    | 5
2   | B    | 0
2   | C    | 2
3   | A    | 7
3   | B    | 8
3   | C    | 9

The output should be a DataFrame with the columns id, event_name, event_value, built according to the following conditions:

  1. Every id has a row with name = A. If value is 0 in that row, delete all rows for this id where value is 0, keeping only the row with name = A and the rows with value > 0.
  2. If value > 0 in the row with name = A, do not delete any rows for that id, regardless of the values in the value column.

So the result should look like this:

id  | name | value
1   | A    | 0
1   | B    | 2
2   | A    | 5
2   | B    | 0
2   | C    | 2
3   | A    | 7
3   | B    | 8
3   | C    | 9

How can I do that in PySpark?

Answer 1 {#1}

Score: 1

Partition the DataFrame by id and build a flag that checks whether any row in the partition has (event_name, event_value) equal to ('A', 0). Then combine this flag with the other conditions to filter the DataFrame.

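from pyspark.sql import functions as F, Window

# 1 for an ('A', 0) row, 0 otherwise; summed per id, this counts such rows
exp = F.expr("event_name = 'A' AND event_value = 0").cast('int')
counts = F.sum(exp).over(Window.partitionBy('id'))

# Drop zero-valued rows for ids that have an ('A', 0) row, but always keep the A row
mask = ~((F.col('event_value') == 0) & (counts > 0)) | (F.col('event_name') == 'A')
df1 = df.withColumn('mask', mask).filter('mask')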

| id|event_name|event_value|mask|
|  1|         A|          0|true|
|  1|         B|          2|true|
|  2|         A|          5|true|
|  2|         B|          0|true|
|  2|         C|          2|true|
|  3|         A|          7|true|
|  3|         B|          8|true|
|  3|         C|          9|true|
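
The mask logic above can be sanity-checked without a Spark session. The following is a minimal plain-Python sketch of the same rule applied to the sample data from the question; the helper name `filter_rows` and the `flagged` set are illustrative assumptions, not part of the answer's code.

```python
# Plain-Python sketch of the filter rule (no Spark needed).
data = [
    (1, "A", 0), (1, "B", 2), (1, "C", 0),
    (2, "A", 5), (2, "B", 0), (2, "C", 2),
    (3, "A", 7), (3, "B", 8), (3, "C", 9),
]


def filter_rows(rows):
    """Drop zero-valued rows for ids whose A row is 0; always keep A rows."""
    # ids whose A row has value 0 (plays the role of `counts > 0` per partition)
    flagged = {i for i, name, value in rows if name == "A" and value == 0}
    return [
        (i, name, value)
        for i, name, value in rows
        if name == "A" or not (value == 0 and i in flagged)
    ]


result = filter_rows(data)
for row in result:
    print(row)
```

Only the row (1, "C", 0) is removed here, which agrees with the expected output in the question.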

Answer 2 {#2}

Score: 0


Use a window function and then filter based on the requirement. See the code below.

scala> spark.table("input").show(false)
|id |event_name|event_value|
|1  |A         |0          |
|1  |B         |2          |
|1  |C         |0          |
|2  |A         |5          |
|2  |B         |0          |
|2  |C         |2          |
|3  |A         |7          |
|3  |B         |8          |
|3  |C         |9          |

scala> :paste
// Entering paste mode (ctrl-D to finish)

spark.sql("""
    WITH wn_input AS (
        SELECT
            *,
            (RANK() OVER(PARTITION BY ID ORDER BY event_name, event_value)) AS rid
        FROM input
    )
    SELECT
        id,
        event_name AS name,
        event_value AS value
    FROM wn_input WHERE not ( rid = 1 AND event_name = 'A' AND event_value = 0)
""").show(false)

// Exiting paste mode, now interpreting.

|id |name|value|
|1  |B   |2    |
|1  |C   |0    |
|2  |A   |5    |
|2  |B   |0    |
|2  |C   |2    |
|3  |A   |7    |
|3  |B   |8    |
|3  |C   |9    |
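
Note that the output above drops the row (1, A, 0) and keeps (1, C, 0), which differs from the expected result in the question. A possible SQL variant that follows the stated rule replaces the rank with a per-id window flag. The sketch below runs the query through Python's built-in `sqlite3` module purely as a stand-in so it can be tried without a cluster; that the same query text also works unchanged in `spark.sql` is an assumption, and the CTE name `flagged` and column `a_is_zero` are illustrative.

```python
import sqlite3

# In-memory table mirroring the "input" table from the answer.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE input (id INTEGER, event_name TEXT, event_value INTEGER)"
)
conn.executemany(
    "INSERT INTO input VALUES (?, ?, ?)",
    [
        (1, "A", 0), (1, "B", 2), (1, "C", 0),
        (2, "A", 5), (2, "B", 0), (2, "C", 2),
        (3, "A", 7), (3, "B", 8), (3, "C", 9),
    ],
)

# a_is_zero is 1 on every row of an id whose A row has value 0.
query = """
WITH flagged AS (
    SELECT
        *,
        MAX(CASE WHEN event_name = 'A' AND event_value = 0 THEN 1 ELSE 0 END)
            OVER (PARTITION BY id) AS a_is_zero
    FROM input
)
SELECT id, event_name, event_value
FROM flagged
WHERE event_name = 'A' OR NOT (event_value = 0 AND a_is_zero = 1)
ORDER BY id, event_name
"""

rows = conn.execute(query).fetchall()
for row in rows:
    print(row)
```

Window functions require SQLite 3.25 or newer, which recent Python builds normally bundle.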
