
Using Pandas with PySpark


To use Anaconda packages (pandas, numpy, statsmodels, etc.) in Spark, submit the Spark job (or start the Spark shell) from within an Anaconda environment.
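
For example, pointing PYSPARK_PYTHON at the environment's interpreter before submitting is usually enough (a minimal sketch; the environment name anaconda matches the one used below, and your_job.py is a placeholder):

$ . activate anaconda
$ export PYSPARK_PYTHON=$(which python)
$ spark-submit your_job.py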

The following script demonstrates converting a DataFrame between Spark and pandas, and the groupby functionality of each:

$ cat << EOF > groupbyDemo.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import (StructType, StructField,
                               IntegerType, StringType)

spark = SparkSession.builder.master("local[*]").appName('demo').getOrCreate()
ls = [['a', 'b', 3], ['a', 'b', 4], ['a', 'c', 3], ['b', 'b', 5]]
df = spark.createDataFrame(ls, schema=['A', 'B', 'C'])
print('Spark DF:')
df.show()
grps = df.groupBy('A')
print('grps.mean():')
grps.mean().show()
print('grps.count():')
grps.count().show()

schema = StructType([StructField("A", StringType(), True),
                     StructField("B", StringType(), True),
                     StructField("C", IntegerType(), True)])
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def grp_self(grp):
    # grp holds one group's rows as a pandas DataFrame
    print('grp.shape:', grp.shape)
    print('grp.info():')
    grp.info()  # info() prints directly and returns None
    return grp.head()

print('Contents of each group (PySpark):')
grps.apply(grp_self).show()

pdf = df.toPandas()
print('Pandas DF (pdf):')
print(pdf)
pgrps = pdf.groupby('A')
print('pgrps.mean():\n%s' % pgrps.mean())
print('pgrps.count():\n%s' % pgrps.count())

print('Contents of each group (Pandas):')
for key, grp in pgrps:  # each item is a (key, sub-DataFrame) pair
    print(key)
    print(grp)

# round-trip: convert the pandas DataFrame back to a Spark DataFrame
fdf = spark.createDataFrame(pdf)
print('Spark DF from pandas DF:')
fdf.show()
spark.stop()

EOF
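
Both df.toPandas() and spark.createDataFrame(pdf) move the whole dataset through the driver, so they only suit data that fits in driver memory. On Spark 2.3+ with pyarrow installed, Arrow can accelerate both directions; a minimal sketch (spark.sql.execution.arrow.enabled is the Spark 2.x config key):

# hedged sketch: enable Arrow before converting (Spark 2.3+, pyarrow installed)
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pdf = df.toPandas()               # Arrow-accelerated collect to pandas
fdf = spark.createDataFrame(pdf)  # Arrow-accelerated path back to Spark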

$ export SPARK_HOME=$HOME/apps/spark-2.3.0-bin-hadoop2.7
$ export PATH=$SPARK_HOME/bin:$PATH
$ export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
$ export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.6-src.zip:$PYTHONPATH"
$ . activate anaconda
$ python groupbyDemo.py
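
Note that pandas_udf with PandasUDFType.GROUPED_MAP first appeared in Spark 2.3 (hence py4j-0.10.6 above) and needs the pyarrow package in the active Python environment; installing it via conda is one option:

$ conda install -c conda-forge pyarrow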

From the program's output we can see:

  • For both kinds of DataFrame, each group produced by groupby keeps the same schema as the original DataFrame;

  • After grouping in PySpark, each group can be processed with pandas as a DataFrame via a pandas_udf (see the sketch after this list);

  • The PySpark grouped object is not iterable, while a pandas groupby yields an iterable that can be traversed with a for loop.
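
A slightly more realistic GROUPED_MAP sketch than grp_self above: subtract each group's mean from column C (assuming the same spark, df, and imports as in the script; demean and demean_schema are names introduced here):

from pyspark.sql.types import DoubleType

demean_schema = StructType([StructField("A", StringType(), True),
                            StructField("C", DoubleType(), True)])

@pandas_udf(demean_schema, PandasUDFType.GROUPED_MAP)
def demean(grp):
    # one output row per input row, with C centered on the group mean
    return grp.assign(C=grp['C'] - grp['C'].mean())[['A', 'C']]

df.groupBy('A').apply(demean).show()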


