# Credit:
# https://spark.apache.org/docs/latest/api/python/getting_started/index.html
This is a short introduction to pandas API on Spark, geared mainly for new users. This notebook shows you some key differences between pandas and pandas API on Spark. You can run these examples yourself in 'Live Notebook: pandas API on Spark' at the quickstart page.
Customarily, we import pandas API on Spark as follows:
import pandas as pd
import numpy as np
import pyspark.pandas as ps
from pyspark.sql import SparkSession
WARNING:root:'PYARROW_IGNORE_TIMEZONE' environment variable was not set. It is required to set this environment variable to '1' in both driver and executor sides if you use pyarrow>=2.0.0. pandas-on-Spark will set it for you but it does not work if there is a Spark context already launched.
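As the warning explains, the environment variable has to be set before any Spark context is launched (and on the executors as well). A minimal sketch of setting it up front, before the imports above:
import os
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"  # must be set before a Spark context is created (pyarrow>=2.0.0)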
Creating a pandas-on-Spark Series by passing a list of values, letting pandas API on Spark create a default integer index:
s = ps.Series([1, 3, 5, np.nan, 6, 8])
s
0    1.0
1    3.0
2    5.0
3    NaN
4    6.0
5    8.0
dtype: float64
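You can also pass an explicit index instead of relying on the default one; a small sketch:
s2 = ps.Series([1, 3, 5, np.nan, 6, 8], index=[10, 20, 30, 40, 50, 60])  # explicit index instead of the default 0-5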
Creating a pandas-on-Spark DataFrame by passing a dict of objects that can be converted to series-like structures.
psdf = ps.DataFrame(
{'a': [1, 2, 3, 4, 5, 6],
'b': [100, 200, 300, 400, 500, 600],
'c': ["one", "two", "three", "four", "five", "six"]},
index=[10, 20, 30, 40, 50, 60])
psdf
|    | a | b   | c     |
|----|---|-----|-------|
10 | 1 | 100 | one |
20 | 2 | 200 | two |
30 | 3 | 300 | three |
40 | 4 | 400 | four |
50 | 5 | 500 | five |
60 | 6 | 600 | six |
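As in pandas, selecting a single column returns a pandas-on-Spark Series and selecting a list of columns returns a DataFrame; a quick sketch:
psdf['a']         # a pandas-on-Spark Series, keeping the index 10..60
psdf[['a', 'b']]  # a pandas-on-Spark DataFrame with the selected columns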
Creating a pandas DataFrame by passing a numpy array, with a datetime index and labeled columns:
dates = pd.date_range('20130101', periods=6)
dates
DatetimeIndex(['2013-01-01', '2013-01-02', '2013-01-03', '2013-01-04', '2013-01-05', '2013-01-06'], dtype='datetime64[ns]', freq='D')
pdf = pd.DataFrame(np.random.randn(6, 4), index=dates, columns=list('ABCD'))
pdf
|            | A | B | C | D |
|------------|---|---|---|---|
2013-01-01 | -1.254848 | 1.833178 | -1.616502 | -0.294077 |
2013-01-02 | 0.086338 | 2.020962 | -2.323341 | 0.124711 |
2013-01-03 | 1.864849 | 0.745987 | -1.310137 | -0.686164 |
2013-01-04 | -2.527620 | 1.559778 | 1.721010 | -0.670992 |
2013-01-05 | 0.360286 | -0.709993 | 0.285817 | 0.557672 |
2013-01-06 | 1.186880 | -1.433712 | 0.514280 | 1.186713 |
Now, this pandas DataFrame can be converted to a pandas-on-Spark DataFrame:
psdf = ps.from_pandas(pdf)
type(psdf)
pyspark.pandas.frame.DataFrame
It looks and behaves the same as a pandas DataFrame.
psdf
|            | A | B | C | D |
|------------|---|---|---|---|
2013-01-01 | -1.254848 | 1.833178 | -1.616502 | -0.294077 |
2013-01-02 | 0.086338 | 2.020962 | -2.323341 | 0.124711 |
2013-01-03 | 1.864849 | 0.745987 | -1.310137 | -0.686164 |
2013-01-04 | -2.527620 | 1.559778 | 1.721010 | -0.670992 |
2013-01-05 | 0.360286 | -0.709993 | 0.285817 | 0.557672 |
2013-01-06 | 1.186880 | -1.433712 | 0.514280 | 1.186713 |
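If you need a plain pandas DataFrame again, to_pandas() converts back. Note that it collects all data to the driver, so the sketch below is only sensible for small frames:
pdf_back = psdf.to_pandas()  # collects everything to the driver; avoid on large data
type(pdf_back)               # pandas.core.frame.DataFrame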
Also, it is possible to create a pandas-on-Spark DataFrame from a Spark DataFrame easily.
Creating a Spark DataFrame from a pandas DataFrame:
spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(pdf)
sdf.show()
+-------------------+-------------------+-------------------+--------------------+
|                  A|                  B|                  C|                   D|
+-------------------+-------------------+-------------------+--------------------+
|-1.2548475122772038| 1.8331776546258787|-1.6165023351730972|-0.29407692972315247|
|0.08633757294824139| 2.0209622174421855| -2.323341385632726| 0.12471075099908735|
| 1.8648494672695133| 0.7459872878575147|-1.3101369253075392| -0.6861636491200168|
|-2.5276197779369665| 1.5597776580668408| 1.7210095019868354| -0.6709920793357562|
| 0.3602862541233372| -0.709992866867548| 0.2858171860843292|  0.5576724861220438|
| 1.1868800841372706|-1.4337124371098868| 0.5142800919416487|  1.1867128608828876|
+-------------------+-------------------+-------------------+--------------------+
Creating a pandas-on-Spark DataFrame from the Spark DataFrame:
psdf = sdf.to_pandas_on_spark()
psdf
|   | A | B | C | D |
|---|---|---|---|---|
0 | -1.254848 | 1.833178 | -1.616502 | -0.294077 |
1 | 0.086338 | 2.020962 | -2.323341 | 0.124711 |
2 | 1.864849 | 0.745987 | -1.310137 | -0.686164 |
3 | -2.527620 | 1.559778 | 1.721010 | -0.670992 |
4 | 0.360286 | -0.709993 | 0.285817 | 0.557672 |
5 | 1.186880 | -1.433712 | 0.514280 | 1.186713 |
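The conversion works in the other direction too: to_spark() turns a pandas-on-Spark DataFrame back into a PySpark DataFrame. A minimal sketch:
sdf2 = psdf.to_spark()  # back to a PySpark DataFrame
sdf2.printSchema()
Depending on your Spark release, the Spark-to-pandas-on-Spark conversion may also be available as sdf.pandas_api(); check the documentation for your version.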
Having specific dtypes. Types that are common to both Spark and pandas are currently supported.
psdf.dtypes
A    float64
B    float64
C    float64
D    float64
dtype: object
Here is how to show the top rows of the frame. Note that the data in a Spark DataFrame does not preserve the natural order by default. The natural order can be preserved by setting the compute.ordered_head option, but it causes a performance overhead because of the internal sorting.
psdf.head()
|   | A | B | C | D |
|---|---|---|---|---|
0 | -1.254848 | 1.833178 | -1.616502 | -0.294077 |
1 | 0.086338 | 2.020962 | -2.323341 | 0.124711 |
2 | 1.864849 | 0.745987 | -1.310137 | -0.686164 |
3 | -2.527620 | 1.559778 | 1.721010 | -0.670992 |
4 | 0.360286 | -0.709993 | 0.285817 | 0.557672 |
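If a deterministic head() matters more than speed, the compute.ordered_head option mentioned above can be enabled; a small sketch (it adds an internal sort):
ps.set_option("compute.ordered_head", True)  # preserve the natural order in head(), at the cost of sorting
psdf.head()
ps.reset_option("compute.ordered_head")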
Displaying the index, columns, and the underlying numpy data.
psdf.index
Int64Index([0, 1, 2, 3, 4, 5], dtype='int64')
psdf.columns
Index(['A', 'B', 'C', 'D'], dtype='object')
psdf.to_numpy()
array([[-1.25484751,  1.83317765, -1.61650234, -0.29407693],
       [ 0.08633757,  2.02096222, -2.32334139,  0.12471075],
       [ 1.86484947,  0.74598729, -1.31013693, -0.68616365],
       [-2.52761978,  1.55977766,  1.7210095 , -0.67099208],
       [ 0.36028625, -0.70999287,  0.28581719,  0.55767249],
       [ 1.18688008, -1.43371244,  0.51428009,  1.18671286]])
Showing a quick statistic summary of your data
psdf.describe()
|       | A | B | C | D |
|-------|---|---|---|---|
count | 6.000000 | 6.000000 | 6.000000 | 6.000000 |
mean | -0.047352 | 0.669367 | -0.454812 | 0.036311 |
std | 1.610817 | 1.435656 | 1.535863 | 0.739521 |
min | -2.527620 | -1.433712 | -2.323341 | -0.686164 |
25% | -1.254848 | -0.709993 | -1.616502 | -0.670992 |
50% | 0.086338 | 0.745987 | -1.310137 | -0.294077 |
75% | 1.186880 | 1.833178 | 0.514280 | 0.557672 |
max | 1.864849 | 2.020962 | 1.721010 | 1.186713 |
Transposing your data
psdf.T
|   | 0 | 1 | 2 | 3 | 4 | 5 |
|---|---|---|---|---|---|---|
A | -1.254848 | 0.086338 | 1.864849 | -2.527620 | 0.360286 | 1.186880 |
B | 1.833178 | 2.020962 | 0.745987 | 1.559778 | -0.709993 | -1.433712 |
C | -1.616502 | -2.323341 | -1.310137 | 1.721010 | 0.285817 | 0.514280 |
D | -0.294077 | 0.124711 | -0.686164 | -0.670992 | 0.557672 | 1.186713 |
Sorting by its index
psdf.sort_index(ascending=False)
|   | A | B | C | D |
|---|---|---|---|---|
5 | 1.186880 | -1.433712 | 0.514280 | 1.186713 |
4 | 0.360286 | -0.709993 | 0.285817 | 0.557672 |
3 | -2.527620 | 1.559778 | 1.721010 | -0.670992 |
2 | 1.864849 | 0.745987 | -1.310137 | -0.686164 |
1 | 0.086338 | 2.020962 | -2.323341 | 0.124711 |
0 | -1.254848 | 1.833178 | -1.616502 | -0.294077 |
Sorting by values
psdf.sort_values(by='B')
|   | A | B | C | D |
|---|---|---|---|---|
5 | 1.186880 | -1.433712 | 0.514280 | 1.186713 |
4 | 0.360286 | -0.709993 | 0.285817 | 0.557672 |
2 | 1.864849 | 0.745987 | -1.310137 | -0.686164 |
3 | -2.527620 | 1.559778 | 1.721010 | -0.670992 |
0 | -1.254848 | 1.833178 | -1.616502 | -0.294077 |
1 | 0.086338 | 2.020962 | -2.323341 | 0.124711 |
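sort_values() also accepts a list of columns and a descending order, as in pandas; a quick sketch:
psdf.sort_values(by=['A', 'B'], ascending=False)  # sort by A, then B, both descending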
Pandas API on Spark primarily uses the value np.nan to represent missing data. By default, it is not included in computations.
pdf1 = pdf.reindex(index=dates[0:4], columns=list(pdf.columns) + ['E'])
pdf1.loc[dates[0]:dates[1], 'E'] = 1
psdf1 = ps.from_pandas(pdf1)
psdf1
|            | A | B | C | D | E |
|------------|---|---|---|---|---|
2013-01-01 | -1.254848 | 1.833178 | -1.616502 | -0.294077 | 1.0 |
2013-01-02 | 0.086338 | 2.020962 | -2.323341 | 0.124711 | 1.0 |
2013-01-03 | 1.864849 | 0.745987 | -1.310137 | -0.686164 | NaN |
2013-01-04 | -2.527620 | 1.559778 | 1.721010 | -0.670992 | NaN |
Dropping any rows that have missing data:
psdf1.dropna(how='any')
|            | A | B | C | D | E |
|------------|---|---|---|---|---|
2013-01-01 | -1.254848 | 1.833178 | -1.616502 | -0.294077 | 1.0 |
2013-01-02 | 0.086338 | 2.020962 | -2.323341 | 0.124711 | 1.0 |
Filling missing data.
psdf1.fillna(value=5)
|            | A | B | C | D | E |
|------------|---|---|---|---|---|
2013-01-01 | -1.254848 | 1.833178 | -1.616502 | -0.294077 | 1.0 |
2013-01-02 | 0.086338 | 2.020962 | -2.323341 | 0.124711 | 1.0 |
2013-01-03 | 1.864849 | 0.745987 | -1.310137 | -0.686164 | 5.0 |
2013-01-04 | -2.527620 | 1.559778 | 1.721010 | -0.670992 | 5.0 |
Performing a descriptive statistic:
psdf.mean()
A   -0.047352
B    0.669367
C   -0.454812
D    0.036311
dtype: float64
Various configurations in PySpark can be applied internally in pandas API on Spark. For example, you can enable Arrow optimization to significantly speed up internal pandas conversion. See also the PySpark Usage Guide for Pandas with Apache Arrow in the PySpark documentation.
prev = spark.conf.get("spark.sql.execution.arrow.enabled") # Keep its default value.
ps.set_option("compute.default_index_type", "distributed") # Use default index prevent overhead.
import warnings
warnings.filterwarnings("ignore") # Ignore warnings coming from Arrow optimizations.
spark.conf.set("spark.sql.execution.arrow.enabled", True)
%timeit ps.range(300000).to_pandas()
22/01/25 04:14:58 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
674 ms ± 171 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
spark.conf.set("spark.sql.execution.arrow.enabled", False)
%timeit ps.range(300000).to_pandas()
1.86 s ± 177 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
ps.reset_option("compute.default_index_type")
spark.conf.set("spark.sql.execution.arrow.enabled", prev) # Set its default value back.
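As the deprecation warning above notes, on Spark 3.0+ the same toggle is exposed under a non-deprecated key; a sketch of the equivalent call:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)  # non-deprecated key on Spark 3.0+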
By “group by” we are referring to a process involving one or more of the following steps:
- Splitting the data into groups based on some criteria
- Applying a function to each group independently
- Combining the results into a data structure
psdf = ps.DataFrame({'A': ['foo', 'bar', 'foo', 'bar',
'foo', 'bar', 'foo', 'foo'],
'B': ['one', 'one', 'two', 'three',
'two', 'two', 'one', 'three'],
'C': np.random.randn(8),
'D': np.random.randn(8)})
psdf
|   | A | B | C | D |
|---|---|---|---|---|
0 | foo | one | -0.481606 | 0.291328 |
1 | bar | one | 0.199865 | 1.459356 |
2 | foo | two | 1.126719 | -0.833016 |
3 | bar | three | -0.935977 | 0.090776 |
4 | foo | two | 0.953372 | 1.827039 |
5 | bar | two | -1.084966 | 0.141863 |
6 | foo | one | 1.319954 | 0.009976 |
7 | foo | three | -0.365080 | -0.150485 |
Grouping and then applying the sum() function to the resulting groups.
psdf.groupby('A').sum()
| A   | C | D |
|-----|---|---|
bar | -1.821077 | 1.691994 |
foo | 2.553360 | 1.144842 |
Grouping by multiple columns forms a hierarchical index, and again we can apply the sum function.
psdf.groupby(['A', 'B']).sum()
| A   | B     | C         | D         |
|-----|-------|-----------|-----------|
| foo | one   | 0.838349  | 0.301304  |
| foo | two   | 2.080091  | 0.994023  |
| bar | three | -0.935977 | 0.090776  |
| foo | three | -0.365080 | -0.150485 |
| bar | two   | -1.084966 | 0.141863  |
| bar | one   | 0.199865  | 1.459356  |
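Different aggregations per column can be applied in one pass with agg(); a small sketch, assuming mean of C and sum of D are the statistics you want:
psdf.groupby('A').agg({'C': 'mean', 'D': 'sum'})  # one aggregation per column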
pser = pd.Series(np.random.randn(1000),
index=pd.date_range('1/1/2000', periods=1000))
psser = ps.Series(pser)
psser = psser.cummax()
psser.plot()
On a DataFrame, the plot() method is a convenience to plot all of the columns with labels:
pdf = pd.DataFrame(np.random.randn(1000, 4), index=pser.index,
columns=['A', 'B', 'C', 'D'])
psdf = ps.from_pandas(pdf)
psdf = psdf.cummax()
psdf.plot()
For more details, see the Plotting documentation.
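The plotting backend is configurable through an option; a minimal sketch of switching to matplotlib, assuming it is installed:
ps.set_option("plotting.backend", "matplotlib")  # default backend is plotly
psdf.plot()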
psdf.to_csv('foo.csv')
ps.read_csv('foo.csv').head(10)
|   | A | B | C | D |
|---|---|---|---|---|
0 | 0.511439 | -0.221687 | -0.288679 | 0.461406 |
1 | 0.511439 | 1.134861 | -0.013088 | 0.461406 |
2 | 0.511439 | 1.134861 | 0.193259 | 1.719523 |
3 | 1.158817 | 1.134861 | 0.193259 | 1.719523 |
4 | 1.158817 | 1.134861 | 2.104593 | 1.719523 |
5 | 1.158817 | 1.771571 | 2.104593 | 1.719523 |
6 | 1.158817 | 1.850353 | 2.104593 | 1.719523 |
7 | 1.313855 | 1.850353 | 2.104593 | 1.719523 |
8 | 1.313855 | 1.850353 | 2.104593 | 1.719523 |
9 | 1.313855 | 1.850353 | 2.104593 | 1.719523 |
psdf.to_parquet('bar.parquet')
ps.read_parquet('bar.parquet').head(10)
|   | A | B | C | D |
|---|---|---|---|---|
0 | 0.511439 | -0.221687 | -0.288679 | 0.461406 |
1 | 0.511439 | 1.134861 | -0.013088 | 0.461406 |
2 | 0.511439 | 1.134861 | 0.193259 | 1.719523 |
3 | 1.158817 | 1.134861 | 0.193259 | 1.719523 |
4 | 1.158817 | 1.134861 | 2.104593 | 1.719523 |
5 | 1.158817 | 1.771571 | 2.104593 | 1.719523 |
6 | 1.158817 | 1.850353 | 2.104593 | 1.719523 |
7 | 1.313855 | 1.850353 | 2.104593 | 1.719523 |
8 | 1.313855 | 1.850353 | 2.104593 | 1.719523 |
9 | 1.313855 | 1.850353 | 2.104593 | 1.719523 |
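read_parquet() can also load just a subset of columns, which avoids reading the rest of the file; a quick sketch:
ps.read_parquet('bar.parquet', columns=['A', 'B']).head(10)  # read only columns A and B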
psdf.to_spark_io('zoo.orc', format="orc")
ps.read_spark_io('zoo.orc', format="orc").head(10)
|   | A | B | C | D |
|---|---|---|---|---|
0 | 0.511439 | -0.221687 | -0.288679 | 0.461406 |
1 | 0.511439 | 1.134861 | -0.013088 | 0.461406 |
2 | 0.511439 | 1.134861 | 0.193259 | 1.719523 |
3 | 1.158817 | 1.134861 | 0.193259 | 1.719523 |
4 | 1.158817 | 1.134861 | 2.104593 | 1.719523 |
5 | 1.158817 | 1.771571 | 2.104593 | 1.719523 |
6 | 1.158817 | 1.850353 | 2.104593 | 1.719523 |
7 | 1.313855 | 1.850353 | 2.104593 | 1.719523 |
8 | 1.313855 | 1.850353 | 2.104593 | 1.719523 |
9 | 1.313855 | 1.850353 | 2.104593 | 1.719523 |
See the Input/Output documentation for more details.