51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

在Pyspark中,在数据框中添加带有时间间隔的新时间戳列。

英文:

Add new timestamp column with interval in dataframe in pyspark

问题 {#heading}

我正在使用PySpark,并且有一个Spark数据框。我想要添加一个新列"timestamp interval",间隔为15分钟。请问有人可以帮忙吗?

我的数据集如下:

+-------------+-----+-------+
|id           |model|price  |
+-------------+-----+-------+
|2187233      |1    |54.13  |
|2187233      |1    |44.94  |
|2187233      |1    |39.84  |
|2187233      |1    |36.95  |
|99999653468  |1    |108.06 |
|99999653468  |1    |108.96 |
|99999653468  |1    |108.84 |
|99999653468  |1    |108.86 |
+-------------+-----+-------+

假设当前时间是2023-07-30 00:00:00

那么结果应该如下:

+-------------+-----+------------------+-------------------+
|id           |model|price             |ds                 |
+-------------+-----+------------------+-------------------+
|2187233      |1    |54.13             |2023-07-30 00:00:00|
|2187233      |1    |44.94             |2023-07-30 00:15:00|
|2187233      |1    |39.84             |2023-07-30 00:30:00|
|2187233      |1    |36.95             |2023-07-30 00:45:00|
|99999653468  |1    |108.06            |2023-07-30 00:00:00|
|99999653468  |1    |108.96            |2023-07-30 00:15:00|
|99999653468  |1    |108.84            |2023-07-30 00:30:00|
|99999653468  |1    |108.86            |2023-07-30 00:45:00|
+-------------+-----+------------------+-------------------+

英文:

I'm using PySpark and I have a Spark dataframe. I want to add a new column timestamp interval with 15 minutes. Can anyone help please.

<pre>
My Dataset:
+-------------+-----+-------
|id |model|price |
+-------------+-----+-------
|2187233 |1 |54.13 |
|2187233 |1 |44.94 |
|2187233 |1 |39.84 |
|2187233 |1 |36.95 |
|99999653468|1 |108.06 |
|99999653468|1 |108.96 |
|99999653468|1 |108.84 |
|99999653468|1 |108.86 |
+-------------+-----+--------
</pre>

Suppose current time is 2023-07-30 00:00:00

then Result should be:

<pre>
+-------------+-----+------------------+-------------------+
|id |model|price |ds |
+-------------+-----+------------------+-------------------+
|2187233 |1 |54.13 |2023-07-30 00:00:00|
|2187233 |1 |44.94 |2023-07-30 00:15:00|
|2187233 |1 |39.84 |2023-07-30 00:30:00|
|2187233 |1 |36.95 |2023-07-30 00:45:00|
|99999653468|1 |108.06 |2023-07-30 00:00:00|
|99999653468|1 |108.96 |2023-07-30 00:15:00|
|99999653468|1 |108.84 |2023-07-30 00:30:00|
|99999653468|1 |108.86 |2023-07-30 00:45:00|
+-------------+-----+------------------+-------------------+
</pre>

答案1 {#1}

得分: 0

使用PySpark中的withColumn函数和expr函数,您可以实现这一点。您需要导入必要的函数并创建一个具有所需时间戳间隔的新列

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, current_timestamp
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("TimestampInterval").getOrCreate()
interval_minutes = 15

# 您的DataFrame
data = [
    (2187233, 1, 54.13),
    (2187233, 1, 44.94),
    (2187233, 1, 39.84),
    (2187233, 1, 36.95),
    (99999653468, 1, 108.06),
    (99999653468, 1, 108.96),
    (99999653468, 1, 108.84),
    (99999653468, 1, 108.86)
]

columns = ["id", "model", "price"]
df = spark.createDataFrame(data, columns)

window_spec = Window.partitionBy("id").orderBy("model")
df = df.withColumn("interval_num", (expr("row_number() OVER PARTITION BY id ORDER BY model") - 1))
df = df.withColumn("interval", expr(f"INTERVAL {interval_minutes} MINUTES * interval_num"))
df = df.withColumn("ds", current_timestamp() + col("interval"))

df = df.drop("interval_num", "interval")
df.show(truncate=False)

请注意,以上是您提供的代码的翻译部分。 英文:

U can achieve this using the withColumn function along with the expr function in PySpark. U need to import the necessary functions and create a new column with the desired timestamp intervals

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, current_timestamp
from pyspark.sql.window import Window

spark = SparkSession.builder.appName(\&quot;TimestampInterval\&quot;).getOrCreate()
interval_minutes = 15


Your DataFrame
==============



data = \[
(2187233, 1, 54.13),
(2187233, 1, 44.94),
(2187233, 1, 39.84),
(2187233, 1, 36.95),
(99999653468, 1, 108.06),
(99999653468, 1, 108.96),
(99999653468, 1, 108.84),
(99999653468, 1, 108.86)
\]


columns = \[\&quot;id\&quot;, \&quot;model\&quot;, \&quot;price\&quot;\]
df = spark.createDataFrame(data, columns)


window_spec = Window.partitionBy(\&quot;id\&quot;).orderBy(\&quot;model\&quot;)
df = df.withColumn(\&quot;interval_num\&quot;, (expr(\&quot;row_number() OVER PARTITION BY id ORDER BY model\&quot;) - 1))
df = df.withColumn(\&quot;interval\&quot;, expr(f\&quot;INTERVAL {interval_minutes} MINUTES \* interval_num\&quot;))
df = df.withColumn(\&quot;ds\&quot;, current_timestamp() + col(\&quot;interval\&quot;))

`df = df.drop(&quot;interval_num&quot;, &quot;interval&quot;)
df.show(truncate=False)
`


赞(1)
未经允许不得转载:工具盒子 » 在Pyspark中,在数据框中添加带有时间间隔的新时间戳列。