Add a new timestamp column with a fixed interval to a DataFrame in PySpark
Question {#heading}
I'm using PySpark and I have a Spark DataFrame. I want to add a new timestamp column whose values increase in 15-minute intervals within each id. Can anyone help, please?
<pre>
My dataset:
+-----------+-----+------+
|id         |model|price |
+-----------+-----+------+
|2187233    |1    |54.13 |
|2187233    |1    |44.94 |
|2187233    |1    |39.84 |
|2187233    |1    |36.95 |
|99999653468|1    |108.06|
|99999653468|1    |108.96|
|99999653468|1    |108.84|
|99999653468|1    |108.86|
+-----------+-----+------+
</pre>
Suppose the current time is 2023-07-30 00:00:00. Then the result should be:
<pre>
+-----------+-----+------+-------------------+
|id         |model|price |ds                 |
+-----------+-----+------+-------------------+
|2187233    |1    |54.13 |2023-07-30 00:00:00|
|2187233    |1    |44.94 |2023-07-30 00:15:00|
|2187233    |1    |39.84 |2023-07-30 00:30:00|
|2187233    |1    |36.95 |2023-07-30 00:45:00|
|99999653468|1    |108.06|2023-07-30 00:00:00|
|99999653468|1    |108.96|2023-07-30 00:15:00|
|99999653468|1    |108.84|2023-07-30 00:30:00|
|99999653468|1    |108.86|2023-07-30 00:45:00|
+-----------+-----+------+-------------------+
</pre>
Answer 1 {#1}
Score: 0
You can achieve this with `withColumn` and a `row_number()` window function in PySpark: number the rows within each `id`, then add that many 15-minute intervals to the current timestamp. Import the necessary functions and create a new column with the desired timestamp intervals:
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, row_number, unix_timestamp
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("TimestampInterval").getOrCreate()

interval_minutes = 15

# Your DataFrame
data = [
    (2187233, 1, 54.13),
    (2187233, 1, 44.94),
    (2187233, 1, 39.84),
    (2187233, 1, 36.95),
    (99999653468, 1, 108.06),
    (99999653468, 1, 108.96),
    (99999653468, 1, 108.84),
    (99999653468, 1, 108.86),
]
columns = ["id", "model", "price"]
df = spark.createDataFrame(data, columns)

# Number the rows within each id, starting at 0. Every row of an id has the
# same model here, so the order within a group is arbitrary; order by a real
# sequence column if your data has one.
window_spec = Window.partitionBy("id").orderBy("model")
df = df.withColumn("interval_num", row_number().over(window_spec) - 1)

# Add interval_num * 15 minutes (expressed in seconds) to the current time.
# current_timestamp() is the time the query runs; use a fixed timestamp
# instead to reproduce the 2023-07-30 00:00:00 example exactly.
offset_seconds = col("interval_num") * interval_minutes * 60
df = df.withColumn(
    "ds", (unix_timestamp(current_timestamp()) + offset_seconds).cast("timestamp")
)

df = df.drop("interval_num")
df.show(truncate=False)
```
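
To sanity-check the Spark output, the same per-id arithmetic can be sketched in plain Python. This is only an illustration of the logic (a zero-based row number per `id`, multiplied by the interval), not part of the PySpark solution; the helper name `add_interval_timestamps` is hypothetical, and the start time is fixed to the value assumed in the question.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def add_interval_timestamps(rows, start, step_minutes=15):
    """Append start + k * step to the k-th row seen for each id (k starts at 0)."""
    seen = defaultdict(int)  # rows seen so far per id, i.e. row_number() - 1
    result = []
    for row_id, model, price in rows:
        ds = start + timedelta(minutes=step_minutes * seen[row_id])
        seen[row_id] += 1
        result.append((row_id, model, price, ds))
    return result


rows = [
    (2187233, 1, 54.13),
    (2187233, 1, 44.94),
    (2187233, 1, 39.84),
    (2187233, 1, 36.95),
    (99999653468, 1, 108.06),
    (99999653468, 1, 108.96),
    (99999653468, 1, 108.84),
    (99999653468, 1, 108.86),
]

# Fixed start time, as assumed in the question.
start = datetime(2023, 7, 30, 0, 0, 0)

for row_id, model, price, ds in add_interval_timestamps(rows, start):
    print(row_id, model, price, ds.strftime("%Y-%m-%d %H:%M:%S"))
```

Unlike the Spark window, this loop keeps the input order, which is why a real ordering column matters once the data is distributed.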