10 minutes to Koalas¶
This is a short introduction to Koalas, geared mainly for new users. This notebook shows you some key differences between pandas and Koalas. You can run these examples yourself in a live notebook here. For Databricks Runtime, you can import and run the current .ipynb file out of the box. Try it on Databricks Community Edition for free.
Customarily, we import Koalas as follows:
[1]:
import pandas as pd
import numpy as np
import databricks.koalas as ks
from pyspark.sql import SparkSession
Object Creation¶
Creating a Koalas Series by passing a list of values, letting Koalas create a default integer index:
[2]:
s = ks.Series([1, 3, 5, np.nan, 6, 8])
[3]:
s
[3]:
0 1.0
1 3.0
2 5.0
3 NaN
4 6.0
5 8.0
dtype: float64
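A Koalas Series can be brought back to pandas when the data is small enough to fit on the driver; a minimal sketch:

# Collect the distributed Series into a pandas Series on the driver.
# Only do this when the result is expected to be small.
s.to_pandas()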
Creating a Koalas DataFrame by passing a dict of objects that can be converted to series-like objects:
[4]:
kdf = ks.DataFrame(
    {'a': [1, 2, 3, 4, 5, 6],
     'b': [100, 200, 300, 400, 500, 600],
     'c': ["one", "two", "three", "four", "five", "six"]},
    index=[10, 20, 30, 40, 50, 60])
[5]:
kdf
[5]:
|    | a | b   | c     |
|----|---|-----|-------|
| 10 | 1 | 100 | one   |
| 20 | 2 | 200 | two   |
| 30 | 3 | 300 | three |
| 40 | 4 | 400 | four  |
| 50 | 5 | 500 | five  |
| 60 | 6 | 600 | six   |
Creating a pandas DataFrame by passing a numpy array, with a datetime index and labeled columns:
[6]:
dates = pd.date_range('20130101', periods=6)
[7]:
dates
[7]:
DatetimeIndex(['2013-01-01', '2013-01-02', '2013-01-03', '2013-01-04',
               '2013-01-05', '2013-01-06'],
              dtype='datetime64[ns]', freq='D')
[8]:
pdf = pd.DataFrame(np.random.randn(6, 4), index=dates, columns=list('ABCD'))
[9]:
pdf
[9]:
|            | A         | B         | C         | D         |
|------------|-----------|-----------|-----------|-----------|
| 2013-01-01 | -0.621429 | 1.515041  | -1.735483 | -1.235009 |
| 2013-01-02 | 0.844961  | -0.999771 | 0.108356  | 0.109456  |
| 2013-01-03 | 1.343862  | -1.257980 | 0.099766  | -0.137677 |
| 2013-01-04 | 3.001767  | -0.208167 | -1.059449 | 0.312599  |
| 2013-01-05 | -0.035864 | 0.312126  | 0.252281  | 0.627551  |
| 2013-01-06 | -1.200404 | 0.276134  | -0.344308 | -0.367934 |
Now, this pandas DataFrame can be converted to a Koalas DataFrame:
[10]:
kdf = ks.from_pandas(pdf)
[11]:
type(kdf)
[11]:
databricks.koalas.frame.DataFrame
It looks and behaves the same as a pandas DataFrame, though:
[12]:
kdf
[12]:
|            | A         | B         | C         | D         |
|------------|-----------|-----------|-----------|-----------|
| 2013-01-01 | -0.621429 | 1.515041  | -1.735483 | -1.235009 |
| 2013-01-02 | 0.844961  | -0.999771 | 0.108356  | 0.109456  |
| 2013-01-03 | 1.343862  | -1.257980 | 0.099766  | -0.137677 |
| 2013-01-04 | 3.001767  | -0.208167 | -1.059449 | 0.312599  |
| 2013-01-05 | -0.035864 | 0.312126  | 0.252281  | 0.627551  |
| 2013-01-06 | -1.200404 | 0.276134  | -0.344308 | -0.367934 |
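For example, pandas-style column operations can be applied directly; a small sketch using the columns created above:

# Arithmetic between columns of the same Koalas DataFrame works as in pandas.
(kdf['A'] + kdf['B']).head()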
Also, it is possible to create a Koalas DataFrame from a Spark DataFrame.
Creating a Spark DataFrame from a pandas DataFrame:
[13]:
spark = SparkSession.builder.getOrCreate()
[14]:
sdf = spark.createDataFrame(pdf)
[15]:
sdf.show()
+--------------------+--------------------+--------------------+--------------------+
| A| B| C| D|
+--------------------+--------------------+--------------------+--------------------+
| -0.6214290839748133| 1.5150410562536945| -1.7354827055737831| -1.2350091172431052|
| 0.8449607212376394| -0.9997705636655247| 0.10835607649858589| 0.1094555359929294|
| 1.3438622379103737| -1.2579798113362755| 0.0997664833965215|-0.13767658889070905|
| 3.001767403315059|-0.20816676142436616| -1.0594485090898984| 0.31259853367492724|
|-0.03586387305407219| 0.3121259401964947| 0.2522808041799677| 0.6275512901423211|
| -1.2004042904971255| 0.27613400857508563|-0.34430818441482375|-0.36793440398703187|
+--------------------+--------------------+--------------------+--------------------+
Creating a Koalas DataFrame from a Spark DataFrame. to_koalas() is automatically attached to Spark DataFrames and is available as an API when Koalas is imported.
[16]:
kdf = sdf.to_koalas()
[17]:
kdf
[17]:
|   | A         | B         | C         | D         |
|---|-----------|-----------|-----------|-----------|
| 0 | -0.621429 | 1.515041  | -1.735483 | -1.235009 |
| 1 | 0.844961  | -0.999771 | 0.108356  | 0.109456  |
| 2 | 1.343862  | -1.257980 | 0.099766  | -0.137677 |
| 3 | 3.001767  | -0.208167 | -1.059449 | 0.312599  |
| 4 | -0.035864 | 0.312126  | 0.252281  | 0.627551  |
| 5 | -1.200404 | 0.276134  | -0.344308 | -0.367934 |
Having specific dtypes. Types that are common to both Spark and pandas are currently supported:
[18]:
kdf.dtypes
[18]:
A float64
B float64
C float64
D float64
dtype: object
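Casting between these common types also works as in pandas; a minimal sketch using astype():

# Cast a float column to integer; the computation happens on Spark.
kdf['A'].astype('int64')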
Viewing Data¶
See the API Reference.
See the top rows of the frame. The results may not be the same as in pandas, though: unlike pandas, the data in a Spark DataFrame is not ordered and has no intrinsic notion of an index. When asked for the head of a DataFrame, Spark just takes the requested number of rows from a partition. Do not rely on it to return specific rows; use .loc or .iloc instead.
[19]:
kdf.head()
[19]:
|   | A         | B         | C         | D         |
|---|-----------|-----------|-----------|-----------|
| 0 | -0.621429 | 1.515041  | -1.735483 | -1.235009 |
| 1 | 0.844961  | -0.999771 | 0.108356  | 0.109456  |
| 2 | 1.343862  | -1.257980 | 0.099766  | -0.137677 |
| 3 | 3.001767  | -0.208167 | -1.059449 | 0.312599  |
| 4 | -0.035864 | 0.312126  | 0.252281  | 0.627551  |
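A deterministic alternative is label-based selection; a minimal sketch using .loc with the default integer index shown above:

# Select rows by index label; unlike head(), this refers to specific rows.
kdf.loc[0:2]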
Display the index, columns, and the underlying numpy data.
You can also retrieve the index; the index column can be assigned to a DataFrame, as shown later. Note that to_numpy() collects all of the data to the driver, so it should only be used when the result is expected to be small.
[20]:
kdf.index
[20]:
Int64Index([0, 1, 2, 3, 4, 5], dtype='int64')
[21]:
kdf.columns
[21]:
Index(['A', 'B', 'C', 'D'], dtype='object')
[22]:
kdf.to_numpy()
[22]:
array([[-0.62142908,  1.51504106, -1.73548271, -1.23500912],
       [ 0.84496072, -0.99977056,  0.10835608,  0.10945554],
       [ 1.34386224, -1.25797981,  0.09976648, -0.13767659],
       [ 3.0017674 , -0.20816676, -1.05944851,  0.31259853],
       [-0.03586387,  0.31212594,  0.2522808 ,  0.62755129],
       [-1.20040429,  0.27613401, -0.34430818, -0.3679344 ]])
describe() shows a quick statistical summary of your data:
[23]:
kdf.describe()
[23]:
|       | A         | B         | C         | D         |
|-------|-----------|-----------|-----------|-----------|
| count | 6.000000  | 6.000000  | 6.000000  | 6.000000  |
| mean  | 0.555482  | -0.060436 | -0.446473 | -0.115169 |
| std   | 1.517076  | 1.007223  | 0.792741  | 0.648616  |
| min   | -1.200404 | -1.257980 | -1.735483 | -1.235009 |
| 25%   | -0.621429 | -0.999771 | -1.059449 | -0.367934 |
| 50%   | -0.035864 | -0.208167 | -0.344308 | -0.137677 |
| 75%   | 1.343862  | 0.312126  | 0.108356  | 0.312599  |
| max   | 3.001767  | 1.515041  | 0.252281  | 0.627551  |
Transposing your data
[24]:
kdf.T
[24]:
|   | 0         | 1         | 2         | 3         | 4         | 5         |
|---|-----------|-----------|-----------|-----------|-----------|-----------|
| A | -0.621429 | 0.844961  | 1.343862  | 3.001767  | -0.035864 | -1.200404 |
| B | 1.515041  | -0.999771 | -1.257980 | -0.208167 | 0.312126  | 0.276134  |
| C | -1.735483 | 0.108356  | 0.099766  | -1.059449 | 0.252281  | -0.344308 |
| D | -1.235009 | 0.109456  | -0.137677 | 0.312599  | 0.627551  | -0.367934 |
Sorting by its index
[25]:
kdf.sort_index(ascending=False)
[25]:
|   | A         | B         | C         | D         |
|---|-----------|-----------|-----------|-----------|
| 5 | -1.200404 | 0.276134  | -0.344308 | -0.367934 |
| 4 | -0.035864 | 0.312126  | 0.252281  | 0.627551  |
| 3 | 3.001767  | -0.208167 | -1.059449 | 0.312599  |
| 2 | 1.343862  | -1.257980 | 0.099766  | -0.137677 |
| 1 | 0.844961  | -0.999771 | 0.108356  | 0.109456  |
| 0 | -0.621429 | 1.515041  | -1.735483 | -1.235009 |
Sorting by value
[26]:
kdf.sort_values(by='B')
[26]:
|   | A         | B         | C         | D         |
|---|-----------|-----------|-----------|-----------|
| 2 | 1.343862  | -1.257980 | 0.099766  | -0.137677 |
| 1 | 0.844961  | -0.999771 | 0.108356  | 0.109456  |
| 3 | 3.001767  | -0.208167 | -1.059449 | 0.312599  |
| 5 | -1.200404 | 0.276134  | -0.344308 | -0.367934 |
| 4 | -0.035864 | 0.312126  | 0.252281  | 0.627551  |
| 0 | -0.621429 | 1.515041  | -1.735483 | -1.235009 |
Missing Data¶
Koalas primarily uses the value np.nan to represent missing data. By default it is not included in computations.
[27]:
pdf1 = pdf.reindex(index=dates[0:4], columns=list(pdf.columns) + ['E'])
[28]:
pdf1.loc[dates[0]:dates[1], 'E'] = 1
[29]:
kdf1 = ks.from_pandas(pdf1)
[30]:
kdf1
[30]:
|            | A         | B         | C         | D         | E   |
|------------|-----------|-----------|-----------|-----------|-----|
| 2013-01-01 | -0.621429 | 1.515041  | -1.735483 | -1.235009 | 1.0 |
| 2013-01-02 | 0.844961  | -0.999771 | 0.108356  | 0.109456  | 1.0 |
| 2013-01-03 | 1.343862  | -1.257980 | 0.099766  | -0.137677 | NaN |
| 2013-01-04 | 3.001767  | -0.208167 | -1.059449 | 0.312599  | NaN |
Drop any rows that have missing data:
[31]:
kdf1.dropna(how='any')
[31]:
|            | A         | B         | C         | D         | E   |
|------------|-----------|-----------|-----------|-----------|-----|
| 2013-01-01 | -0.621429 | 1.515041  | -1.735483 | -1.235009 | 1.0 |
| 2013-01-02 | 0.844961  | -0.999771 | 0.108356  | 0.109456  | 1.0 |
Filling missing data:
[32]:
kdf1.fillna(value=5)
[32]:
|            | A         | B         | C         | D         | E   |
|------------|-----------|-----------|-----------|-----------|-----|
| 2013-01-01 | -0.621429 | 1.515041  | -1.735483 | -1.235009 | 1.0 |
| 2013-01-02 | 0.844961  | -0.999771 | 0.108356  | 0.109456  | 1.0 |
| 2013-01-03 | 1.343862  | -1.257980 | 0.099766  | -0.137677 | 5.0 |
| 2013-01-04 | 3.001767  | -0.208167 | -1.059449 | 0.312599  | 5.0 |
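As in pandas, fillna() also accepts a dict mapping column names to fill values; a small sketch, assuming the same kdf1:

# Fill missing values only in column 'E', leaving the other columns as-is.
kdf1.fillna(value={'E': 0.0})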
Operations¶
Stats¶
Operations in general exclude missing data.
Performing a descriptive statistic:
[33]:
kdf.mean()
[33]:
A 0.555482
B -0.060436
C -0.446473
D -0.115169
dtype: float64
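Other aggregations follow the same pattern; a small sketch:

# Per-column standard deviation, computed on Spark.
kdf.std()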
Spark Configurations¶
Various PySpark configurations can be applied internally in Koalas. For example, you can enable Arrow optimization to greatly speed up the internal pandas conversion. See the PySpark Usage Guide for Pandas with Apache Arrow.
[34]:
prev = spark.conf.get("spark.sql.execution.arrow.enabled")  # Keep its default value.
ks.set_option("compute.default_index_type", "distributed")  # Use the distributed default index to avoid overhead.
import warnings
warnings.filterwarnings("ignore") # Ignore warnings coming from Arrow optimizations.
[35]:
spark.conf.set("spark.sql.execution.arrow.enabled", True)
%timeit ks.range(300000).to_pandas()
311 ms ± 30.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
[36]:
spark.conf.set("spark.sql.execution.arrow.enabled", False)
%timeit ks.range(300000).to_pandas()
1.25 s ± 29.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
[37]:
ks.reset_option("compute.default_index_type")
spark.conf.set("spark.sql.execution.arrow.enabled", prev) # Set its default value back.
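Koalas options can also be scoped so that they are restored automatically; a minimal sketch using ks.option_context:

# The option applies only inside the with-block and is reset on exit.
with ks.option_context("compute.default_index_type", "distributed"):
    print(ks.get_option("compute.default_index_type"))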
Grouping¶
By “group by” we are referring to a process involving one or more of the following steps:
- Splitting the data into groups based on some criteria
- Applying a function to each group independently
- Combining the results into a data structure
[38]:
kdf = ks.DataFrame({'A': ['foo', 'bar', 'foo', 'bar',
                          'foo', 'bar', 'foo', 'foo'],
                    'B': ['one', 'one', 'two', 'three',
                          'two', 'two', 'one', 'three'],
                    'C': np.random.randn(8),
                    'D': np.random.randn(8)})
[39]:
kdf
[39]:
|   | A   | B     | C         | D         |
|---|-----|-------|-----------|-----------|
| 0 | foo | one   | 0.392094  | -0.197885 |
| 1 | bar | one   | 0.397240  | 0.768301  |
| 2 | foo | two   | -1.683135 | -0.210606 |
| 3 | bar | three | -1.776986 | -0.092022 |
| 4 | foo | two   | -0.499332 | 0.463287  |
| 5 | bar | two   | 0.386921  | 1.995358  |
| 6 | foo | one   | -0.514731 | 1.042816  |
| 7 | foo | three | 0.194186  | 1.745033  |
Grouping and then applying the sum() function to the resulting groups.
[40]:
kdf.groupby('A').sum()
[40]:
| A   | C         | D        |
|-----|-----------|----------|
| bar | -0.992825 | 2.671637 |
| foo | -2.110918 | 2.842644 |
Grouping by multiple columns forms a hierarchical index, and again we can apply the sum function.
[41]:
kdf.groupby(['A', 'B']).sum()
[41]:
| A   | B     | C         | D         |
|-----|-------|-----------|-----------|
| foo | one   | -0.122637 | 0.844931  |
| foo | two   | -2.182467 | 0.252681  |
| bar | three | -1.776986 | -0.092022 |
| foo | three | 0.194186  | 1.745033  |
| bar | two   | 0.386921  | 1.995358  |
| bar | one   | 0.397240  | 0.768301  |
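Different aggregations per column can be expressed with agg(), as in pandas; a small sketch:

# Sum column C but average column D within each group.
kdf.groupby('A').agg({'C': 'sum', 'D': 'mean'})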
Plotting¶
See the Plotting docs.
[42]:
pser = pd.Series(np.random.randn(1000),
                 index=pd.date_range('1/1/2000', periods=1000))
[43]:
kser = ks.Series(pser)
[44]:
kser = kser.cummax()
[45]:
kser.plot()
On a DataFrame, the plot() method is a convenience to plot all of the columns with labels:
[46]:
pdf = pd.DataFrame(np.random.randn(1000, 4), index=pser.index,
                   columns=['A', 'B', 'C', 'D'])
[47]:
kdf = ks.from_pandas(pdf)
[48]:
kdf = kdf.cummax()
[49]:
kdf.plot()
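With the matplotlib-based plotting in these Koalas versions, plot() returns a matplotlib Axes, so the figure can be saved in the usual way; a small sketch (the file name is arbitrary):

# Save the rendered plot via the underlying matplotlib figure.
ax = kdf.plot()
ax.get_figure().savefig('cummax.png')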
Getting data in/out¶
See the Input/Output docs.
CSV¶
CSV is straightforward and easy to use. See here to write a CSV file and here to read a CSV file.
[50]:
kdf.to_csv('foo.csv')
ks.read_csv('foo.csv').head(10)
[50]:
|   | A         | B         | C        | D         |
|---|-----------|-----------|----------|-----------|
| 0 | -0.821342 | -0.325142 | 0.904636 | -0.925984 |
| 1 | 1.498758  | 0.045747  | 0.904636 | 0.726606  |
| 2 | 1.498758  | 0.045747  | 0.904636 | 0.726606  |
| 3 | 1.498758  | 1.534086  | 0.904636 | 0.726606  |
| 4 | 1.498758  | 1.534086  | 0.904636 | 0.726606  |
| 5 | 1.498758  | 1.534086  | 0.904636 | 0.726606  |
| 6 | 1.498758  | 1.534086  | 0.904636 | 0.726606  |
| 7 | 1.498758  | 1.534086  | 0.904636 | 0.856176  |
| 8 | 1.498758  | 1.534086  | 0.904636 | 0.856176  |
| 9 | 1.498758  | 1.534086  | 0.904636 | 1.532448  |
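Because the write runs on Spark, 'foo.csv' is actually a directory of part files; a hedged sketch, assuming your Koalas version supports the num_files argument:

# Coalesce the output into a single part file before writing.
kdf.to_csv('foo.csv', num_files=1)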
Parquet¶
Parquet is an efficient and compact file format that is fast to read and write. See here to write a Parquet file and here to read a Parquet file.
[51]:
kdf.to_parquet('bar.parquet')
ks.read_parquet('bar.parquet').head(10)
[51]:
|   | A         | B         | C        | D         |
|---|-----------|-----------|----------|-----------|
| 0 | -0.821342 | -0.325142 | 0.904636 | -0.925984 |
| 1 | 1.498758  | 0.045747  | 0.904636 | 0.726606  |
| 2 | 1.498758  | 0.045747  | 0.904636 | 0.726606  |
| 3 | 1.498758  | 1.534086  | 0.904636 | 0.726606  |
| 4 | 1.498758  | 1.534086  | 0.904636 | 0.726606  |
| 5 | 1.498758  | 1.534086  | 0.904636 | 0.726606  |
| 6 | 1.498758  | 1.534086  | 0.904636 | 0.726606  |
| 7 | 1.498758  | 1.534086  | 0.904636 | 0.856176  |
| 8 | 1.498758  | 1.534086  | 0.904636 | 0.856176  |
| 9 | 1.498758  | 1.534086  | 0.904636 | 1.532448  |
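Parquet output can also be partitioned by column, as in Spark; a small sketch, assuming partition_cols is available in your Koalas version:

# Overwrite any existing output and partition it by the values of column 'A'.
kdf.to_parquet('bar.parquet', mode='overwrite', partition_cols='A')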
Spark IO¶
In addition, Koalas fully supports Spark's various data sources, such as ORC and external data sources. See here to write to a specified data source and here to read from one.
[52]:
kdf.to_spark_io('zoo.orc', format="orc")
ks.read_spark_io('zoo.orc', format="orc").head(10)
[52]:
|   | A         | B         | C        | D         |
|---|-----------|-----------|----------|-----------|
| 0 | -0.821342 | -0.325142 | 0.904636 | -0.925984 |
| 1 | 1.498758  | 0.045747  | 0.904636 | 0.726606  |
| 2 | 1.498758  | 0.045747  | 0.904636 | 0.726606  |
| 3 | 1.498758  | 1.534086  | 0.904636 | 0.726606  |
| 4 | 1.498758  | 1.534086  | 0.904636 | 0.726606  |
| 5 | 1.498758  | 1.534086  | 0.904636 | 0.726606  |
| 6 | 1.498758  | 1.534086  | 0.904636 | 0.726606  |
| 7 | 1.498758  | 1.534086  | 0.904636 | 0.856176  |
| 8 | 1.498758  | 1.534086  | 0.904636 | 0.856176  |
| 9 | 1.498758  | 1.534086  | 0.904636 | 1.532448  |
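to_spark_io() passes options through to the underlying Spark writer, so the usual Spark save modes apply; a minimal sketch:

# Overwrite the existing ORC output instead of failing when it exists.
kdf.to_spark_io('zoo.orc', format="orc", mode="overwrite")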