想了解pyspark数据框将多列转换为浮点数的新动态吗?本文将为您提供详细的信息,我们还将为您解答关于pyspark行转列的相关问题,此外,我们还将为您介绍关于Pandas数据框到Spark数据框,是
想了解pyspark数据框将多列转换为浮点数的新动态吗?本文将为您提供详细的信息,我们还将为您解答关于pyspark 行转列的相关问题,此外,我们还将为您介绍关于Pandas数据框到Spark数据框,是否将NaN转换为实际的null?、Pyspark 数据框将映射转换为字符串、PySpark将类型为“映射”的列转换为数据框中的多个列、Pyspark数据框上的数据透视字符串列的新知识。
本文目录一览:- pyspark数据框将多列转换为浮点数(pyspark 行转列)
- Pandas数据框到Spark数据框,是否将NaN转换为实际的null?
- Pyspark 数据框将映射转换为字符串
- PySpark将类型为“映射”的列转换为数据框中的多个列
- Pyspark数据框上的数据透视字符串列
pyspark数据框将多列转换为浮点数(pyspark 行转列)
我正在尝试将数据框的多列从字符串转换为浮动
df_temp = sc.parallelize([("1", "2", "3.4555"), ("5.6", "6.7", "7.8")]).toDF(("x", "y", "z"))df_temp.select(*(float(col(c)).alias(c) for c in df_temp.columns)).show()
但我得到了错误
select() argument after * must be a sequence, not generator
我不明白为什么会引发此错误
答案1
小编典典float()
不是Spark函数,您需要该函数cast()
:
from pyspark.sql.functions import coldf_temp.select(*(col(c).cast("float").alias(c) for c in df_temp.columns))
Pandas数据框到Spark数据框,是否将NaN转换为实际的null?
我想将数据框从熊猫转换为Spark,并且正在使用spark_context.createDataFrame()
创建数据框的方法。我还在方法中指定了架构createDataFrame()
。
我想知道的是如何处理特殊情况。例如,当转换为Spark数据帧时,熊猫中的 NaN 最终为字符串“
NaN”。我正在寻找如何获取实际的空值而不是“ NaN”的方法。
Pyspark 数据框将映射转换为字符串
如何解决Pyspark 数据框将映射转换为字符串?
我尝试使用这种方法将数据从 spark rdd 插入到 postgres
def handle_rdd(rdd):
if not rdd.isEmpty():
mode="append"
url = "jdbc:postgresql://localhost:port/db_name"
properties = {"user": "username","password": "password"}
df = ss.createDataFrame(rdd,schema=[''data''])
df.show()
df.printSchema()
print(type(df))
print(df)
df.write.jdbc(url=url,table="schema.table_name",mode=mode,properties=properties)
使用 Json 输入返回错误:
: java.lang.IllegalArgumentException: 无法获取 map
我可以像使用 Pandas 数据帧一样访问 pyspark 数据帧,或者有什么方法可以将 itu 转换为字符串?
这是一些输出
+--------------------+
| data|
+--------------------+
|[invoiceNumber ->...|
+--------------------+
root
|-- data: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
<class ''pyspark.sql.dataframe.DataFrame''>
DataFrame[data: map<string,string>]
我正在使用
解析json数据kafkaStream = KafkaUtils.createDirectStream(ssc,[topic],{''bootstrap.servers'':brokers})
lines = kafkaStream.map(lambda x: json.loads(x[1]))
lines.foreachRDD(handle_rdd)
谢谢
备注我正在使用: 火花 2.4.8 Python 3.7
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)
PySpark将类型为“映射”的列转换为数据框中的多个列
输入值
我有一列Parameters
类型map
的表格:
>>> from pyspark.sql import SQLContext
>>> sqlContext = SQLContext(sc)
>>> d = [{'Parameters': {'foo': '1','bar': '2','baz': 'aaa'}}]
>>> df = sqlContext.createDataFrame(d)
>>> df.collect()
[Row(Parameters={'foo': '1','baz': 'aaa'})]
输出量
我想在pyspark重塑它,这样所有的按键(foo
,bar
,等)都列,分别为:
[Row(foo='1',bar='2',baz='aaa')]
使用withColumn
作品:
(df
.withColumn('foo',df.Parameters['foo'])
.withColumn('bar',df.Parameters['bar'])
.withColumn('baz',df.Parameters['baz'])
.drop('Parameters')
).collect()
但是 我需要一个解决方案, 因为我有很多 列名称,所以没有明确提及列名称 。
架构图
>>> df.printSchema()
root
|-- Parameters: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
Pyspark数据框上的数据透视字符串列
我有一个像这样的简单数据框:
rdd = sc.parallelize( [ (0, "A", 223,"201603", "PORT"), (0, "A", 22,"201602", "PORT"), (0, "A", 422,"201601", "DOCK"), (1,"B", 3213,"201602", "DOCK"), (1,"B", 3213,"201601", "PORT"), (2,"C", 2321,"201601", "DOCK") ])df_data = sqlContext.createDataFrame(rdd, ["id","type", "cost", "date", "ship"])df_data.show() +---+----+----+------+----+| id|type|cost| date|ship|+---+----+----+------+----+| 0| A| 223|201603|PORT|| 0| A| 22|201602|PORT|| 0| A| 422|201601|DOCK|| 1| B|3213|201602|DOCK|| 1| B|3213|201601|PORT|| 2| C|2321|201601|DOCK|+---+----+----+------+----+
我需要按日期进行调整:
df_data.groupby(df_data.id, df_data.type).pivot("date").avg("cost").show()+---+----+------+------+------+| id|type|201601|201602|201603|+---+----+------+------+------+| 2| C|2321.0| null| null|| 0| A| 422.0| 22.0| 223.0|| 1| B|3213.0|3213.0| null|+---+----+------+------+------+
一切正常。但是现在我需要对其进行透视,并获得一个非数字列:
df_data.groupby(df_data.id, df_data.type).pivot("date").avg("ship").show()
当然,我会得到一个例外:
AnalysisException: u''"ship" is not a numeric column. Aggregation function can only be applied on a numeric column.;''
我想产生一些东西
+---+----+------+------+------+| id|type|201601|201602|201603|+---+----+------+------+------+| 2| C|DOCK | null| null|| 0| A| DOCK | PORT| DOCK|| 1| B|DOCK |PORT | null|+---+----+------+------+------+
有可能pivot
吗?
答案1
小编典典假设(id |type | date)
组合是唯一的,并且您的唯一目标是枢纽而不是合计,则可以使用first
(或任何其他不限于数值的函数):
from pyspark.sql.functions import first(df_data .groupby(df_data.id, df_data.type) .pivot("date") .agg(first("ship")) .show())## +---+----+------+------+------+## | id|type|201601|201602|201603|## +---+----+------+------+------+## | 2| C| DOCK| null| null|## | 0| A| DOCK| PORT| PORT|## | 1| B| PORT| DOCK| null|## +---+----+------+------+------+
如果这些假设不正确,则必须预先汇总数据。例如,对于最常见的ship
值:
from pyspark.sql.functions import max, struct(df_data .groupby("id", "type", "date", "ship") .count() .groupby("id", "type") .pivot("date") .agg(max(struct("count", "ship"))) .show())## +---+----+--------+--------+--------+## | id|type| 201601| 201602| 201603|## +---+----+--------+--------+--------+## | 2| C|[1,DOCK]| null| null|## | 0| A|[1,DOCK]|[1,PORT]|[1,PORT]|## | 1| B|[1,PORT]|[1,DOCK]| null|## +---+----+--------+--------+--------+
今天关于pyspark数据框将多列转换为浮点数和pyspark 行转列的讲解已经结束,谢谢您的阅读,如果想了解更多关于Pandas数据框到Spark数据框,是否将NaN转换为实际的null?、Pyspark 数据框将映射转换为字符串、PySpark将类型为“映射”的列转换为数据框中的多个列、Pyspark数据框上的数据透视字符串列的相关知识,请在本站搜索。
本文标签: