To use Anaconda packages (pandas, numpy, statsmodels, etc.) in Spark, submit the Spark job (or start the Spark shell) from within the Anaconda environment.
The following code demonstrates converting a DataFrame between Spark and pandas, and the groupby function of each:
$ cat << EOF > groupbyDemo.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import (avg, count, first,
                                   pandas_udf, concat, col, PandasUDFType)
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import IntegerType, StringType
spark = SparkSession.builder.master("local[*]").appName('demo').getOrCreate()
ls = [['a', 'b', 3], ['a', 'b', 4], ['a', 'c', 3], ['b', 'b', 5]]
df = spark.createDataFrame(ls, schema=['A', 'B', 'C'])
print('Spark DF:')
df.show()
grps = df.groupBy('A')
print('grps.mean():')
grps.mean().show()
print('grps.count():')
grps.count().show()
# Output schema of the grouped-map UDF: same as the input DataFrame's schema
schema = StructType([StructField("A", StringType(), True),
                     StructField("B", StringType(), True),
                     StructField("C", IntegerType(), True)])
# Grouped-map pandas UDF: Spark passes each group in as a pandas DataFrame
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def grp_self(grp):
    print('grp.shape:', grp.shape)
    print('grp.info():', grp.info())
    return grp.head()
print('Contents of each group (PySpark):')
grps.apply(grp_self).show()
pdf = df.toPandas()  # convert the Spark DataFrame to a pandas DataFrame
print('Pandas DF: pdf')
print(pdf)
pgrps = pdf.groupby('A')
print('pgrps.mean():\n%s' % pgrps.mean())
print('pgrps.count():\n%s' % pgrps.count())
print('Contents of each group (Pandas):')
for grp in pgrps:  # each element is a (key, sub-DataFrame) tuple
    print(grp)
fdf = spark.createDataFrame(pdf)  # convert the pandas DataFrame back to Spark
EOF
$ export SPARK_HOME=$HOME/apps/spark-2.3.0-bin-hadoop2.7
$ export PATH=$SPARK_HOME/bin:$PATH
$ export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
$ export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.6-src.zip:$PYTHONPATH"
$ . activate anaconda
$ python groupbyDemo.py
The program's output shows that:

- for both kinds of DataFrame, each group produced by groupby has the same schema as the original DataFrame;
- after grouping in PySpark, each group's DataFrame can be handed to pandas for processing through a pandas_udf;
- the PySpark grouped object is not iterable, whereas pandas groupby returns an iterable that can be traversed with for (see the sketch below).
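The iterability difference can be checked with a minimal sketch, run the same way as groupbyDemo.py above (the file name iterDemo.py is only a placeholder; the toy data simply mirrors the demo):
$ cat << EOF > iterDemo.py
from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder.master("local[*]").appName('iterDemo').getOrCreate()
pdf = pd.DataFrame([['a', 'b', 3], ['a', 'b', 4], ['a', 'c', 3], ['b', 'b', 5]],
                   columns=['A', 'B', 'C'])
df = spark.createDataFrame(pdf)

# pandas: a GroupBy object is iterable and yields (key, sub-DataFrame) pairs
for key, sub in pdf.groupby('A'):
    print(key, sub.shape)

# PySpark: GroupedData only exposes aggregations (mean, count, agg, ...) and
# apply() with a grouped-map pandas_udf; trying to iterate raises TypeError
grps = df.groupBy('A')
try:
    iter(grps)
except TypeError as err:
    print('GroupedData is not iterable:', err)
EOF
$ python iterDemo.py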