10 minutes to Koalas

This is a short introduction to Koalas, geared mainly toward new users. This notebook shows some key differences between pandas and Koalas. You can run these examples yourself in a live notebook here. On Databricks Runtime, you can import and run the current .ipynb file out of the box. Try it on Databricks Community Edition for free.

Customarily, we import Koalas as follows:

[1]:
import pandas as pd
import numpy as np
import databricks.koalas as ks
from pyspark.sql import SparkSession

Object Creation

Creating a Koalas Series by passing a list of values, letting Koalas create a default integer index:

[2]:
s = ks.Series([1, 3, 5, np.nan, 6, 8])
[3]:
s
[3]:
0    1.0
1    3.0
2    5.0
3    NaN
4    6.0
5    8.0
dtype: float64

Creating a Koalas DataFrame by passing a dict of objects that can be converted to a series-like structure:

[4]:
kdf = ks.DataFrame(
    {'a': [1, 2, 3, 4, 5, 6],
     'b': [100, 200, 300, 400, 500, 600],
     'c': ["one", "two", "three", "four", "five", "six"]},
    index=[10, 20, 30, 40, 50, 60])
[5]:
kdf
[5]:
     a    b      c
10   1  100    one
20   2  200    two
30   3  300  three
40   4  400   four
50   5  500   five
60   6  600    six

Creating a pandas DataFrame by passing a numpy array, with a datetime index and labeled columns:

[6]:
dates = pd.date_range('20130101', periods=6)
[7]:
dates
[7]:
DatetimeIndex(['2013-01-01', '2013-01-02', '2013-01-03', '2013-01-04',
               '2013-01-05', '2013-01-06'],
              dtype='datetime64[ns]', freq='D')
[8]:
pdf = pd.DataFrame(np.random.randn(6, 4), index=dates, columns=list('ABCD'))
[9]:
pdf
[9]:
A B C D
2013-01-01 -0.621429 1.515041 -1.735483 -1.235009
2013-01-02 0.844961 -0.999771 0.108356 0.109456
2013-01-03 1.343862 -1.257980 0.099766 -0.137677
2013-01-04 3.001767 -0.208167 -1.059449 0.312599
2013-01-05 -0.035864 0.312126 0.252281 0.627551
2013-01-06 -1.200404 0.276134 -0.344308 -0.367934

Now, this pandas DataFrame can be converted to a Koalas DataFrame:

[10]:
kdf = ks.from_pandas(pdf)
[11]:
type(kdf)
[11]:
databricks.koalas.frame.DataFrame

It looks and behaves the same as a pandas DataFrame, though:

[12]:
kdf
[12]:
A B C D
2013-01-01 -0.621429 1.515041 -1.735483 -1.235009
2013-01-02 0.844961 -0.999771 0.108356 0.109456
2013-01-03 1.343862 -1.257980 0.099766 -0.137677
2013-01-04 3.001767 -0.208167 -1.059449 0.312599
2013-01-05 -0.035864 0.312126 0.252281 0.627551
2013-01-06 -1.200404 0.276134 -0.344308 -0.367934

It is also possible to create a Koalas DataFrame from a Spark DataFrame.

Creating a Spark DataFrame from a pandas DataFrame:

[13]:
spark = SparkSession.builder.getOrCreate()
[14]:
sdf = spark.createDataFrame(pdf)
[15]:
sdf.show()
+--------------------+--------------------+--------------------+--------------------+
|                   A|                   B|                   C|                   D|
+--------------------+--------------------+--------------------+--------------------+
| -0.6214290839748133|  1.5150410562536945| -1.7354827055737831| -1.2350091172431052|
|  0.8449607212376394| -0.9997705636655247| 0.10835607649858589|  0.1094555359929294|
|  1.3438622379103737| -1.2579798113362755|  0.0997664833965215|-0.13767658889070905|
|   3.001767403315059|-0.20816676142436616| -1.0594485090898984| 0.31259853367492724|
|-0.03586387305407219|  0.3121259401964947|  0.2522808041799677|  0.6275512901423211|
| -1.2004042904971255| 0.27613400857508563|-0.34430818441482375|-0.36793440398703187|
+--------------------+--------------------+--------------------+--------------------+

Creating a Koalas DataFrame from a Spark DataFrame. to_koalas() is automatically attached to Spark DataFrames and is available as an API once Koalas is imported.

[16]:
kdf = sdf.to_koalas()
[17]:
kdf
[17]:
A B C D
0 -0.621429 1.515041 -1.735483 -1.235009
1 0.844961 -0.999771 0.108356 0.109456
2 1.343862 -1.257980 0.099766 -0.137677
3 3.001767 -0.208167 -1.059449 0.312599
4 -0.035864 0.312126 0.252281 0.627551
5 -1.200404 0.276134 -0.344308 -0.367934

A Koalas DataFrame has specific dtypes. Types that are common to both Spark and pandas are currently supported.

[18]:
kdf.dtypes
[18]:
A    float64
B    float64
C    float64
D    float64
dtype: object

Viewing Data

See the API Reference.

See the top rows of the frame. The results may differ from pandas, though: unlike pandas, the data in a Spark DataFrame is not ordered and has no intrinsic notion of an index. When asked for the head of a DataFrame, Spark just takes the requested number of rows from a partition. Do not rely on it to return specific rows; use .loc or .iloc instead.

[19]:
kdf.head()
[19]:
A B C D
0 -0.621429 1.515041 -1.735483 -1.235009
1 0.844961 -0.999771 0.108356 0.109456
2 1.343862 -1.257980 0.099766 -0.137677
3 3.001767 -0.208167 -1.059449 0.312599
4 -0.035864 0.312126 0.252281 0.627551
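
The caveat about head() can be illustrated with a toy model in plain Python. This is only a sketch, not Spark's actual implementation: the partition lists and both helper functions are hypothetical, and they merely show why taking rows from whichever partition is read first differs from selecting rows by an explicit key.

```python
from itertools import chain, islice

def head_unordered(partitions, n):
    """Take the first n rows in whatever order the partitions are read."""
    return list(islice(chain.from_iterable(partitions), n))

def head_by_key(partitions, n):
    """Sort all rows by their index label first, then take n rows."""
    return sorted(chain.from_iterable(partitions), key=lambda row: row[0])[:n]

# The same four (index, value) rows, stored in two different partition orders.
layout_a = [[(0, "a"), (1, "b")], [(2, "c"), (3, "d")]]
layout_b = [[(2, "c"), (3, "d")], [(0, "a"), (1, "b")]]

unordered_a = head_unordered(layout_a, 2)
unordered_b = head_unordered(layout_b, 2)
ordered_a = head_by_key(layout_a, 2)
ordered_b = head_by_key(layout_b, 2)

print(unordered_a, unordered_b)  # depends on the partition layout
print(ordered_a, ordered_b)      # identical for both layouts
```

Selecting by key gives the same rows regardless of how the data happens to be laid out, which is the reason for preferring an explicit selection when specific rows matter.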

Display the index, the columns, and the underlying NumPy data.

You can also retrieve the index; the index column can be assigned to a DataFrame (see later).

[20]:
kdf.index
[20]:
Int64Index([0, 1, 2, 3, 4, 5], dtype='int64')
[21]:
kdf.columns
[21]:
Index(['A', 'B', 'C', 'D'], dtype='object')
[22]:
kdf.to_numpy()
[22]:
array([[-0.62142908,  1.51504106, -1.73548271, -1.23500912],
       [ 0.84496072, -0.99977056,  0.10835608,  0.10945554],
       [ 1.34386224, -1.25797981,  0.09976648, -0.13767659],
       [ 3.0017674 , -0.20816676, -1.05944851,  0.31259853],
       [-0.03586387,  0.31212594,  0.2522808 ,  0.62755129],
       [-1.20040429,  0.27613401, -0.34430818, -0.3679344 ]])

describe() shows a quick statistical summary of your data:

[23]:
kdf.describe()
[23]:
A B C D
count 6.000000 6.000000 6.000000 6.000000
mean 0.555482 -0.060436 -0.446473 -0.115169
std 1.517076 1.007223 0.792741 0.648616
min -1.200404 -1.257980 -1.735483 -1.235009
25% -0.621429 -0.999771 -1.059449 -0.367934
50% -0.035864 -0.208167 -0.344308 -0.137677
75% 1.343862 0.312126 0.108356 0.312599
max 3.001767 1.515041 0.252281 0.627551
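
In the output above, the 50% value for column A (-0.035864) is one of the six values of A rather than the midpoint between the two middle values. The sketch below uses Python's standard statistics module on the A values from this notebook to contrast the two conventions. That Koalas reports an approximate percentile here is an assumption inferred from the output shown, not something this page states.

```python
import statistics

# Column A of the example DataFrame, copied from the output above.
a_values = [-0.621429, 0.844961, 1.343862, 3.001767, -0.035864, -1.200404]

# Interpolated median: the mean of the two middle values.
interpolated = statistics.median(a_values)

# "Low" median: the smaller of the two middle values, always an actual data point.
low = statistics.median_low(a_values)

print(interpolated, low)
```

The low median matches the 50% row shown for A, while the interpolated median does not, so percentile rows may not match what an interpolating implementation would report on the same data.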

Transposing your data

[24]:
kdf.T
[24]:
0 1 2 3 4 5
A -0.621429 0.844961 1.343862 3.001767 -0.035864 -1.200404
B 1.515041 -0.999771 -1.257980 -0.208167 0.312126 0.276134
C -1.735483 0.108356 0.099766 -1.059449 0.252281 -0.344308
D -1.235009 0.109456 -0.137677 0.312599 0.627551 -0.367934

Sorting by its index

[25]:
kdf.sort_index(ascending=False)
[25]:
A B C D
5 -1.200404 0.276134 -0.344308 -0.367934
4 -0.035864 0.312126 0.252281 0.627551
3 3.001767 -0.208167 -1.059449 0.312599
2 1.343862 -1.257980 0.099766 -0.137677
1 0.844961 -0.999771 0.108356 0.109456
0 -0.621429 1.515041 -1.735483 -1.235009

Sorting by value

[26]:
kdf.sort_values(by='B')
[26]:
A B C D
2 1.343862 -1.257980 0.099766 -0.137677
1 0.844961 -0.999771 0.108356 0.109456
3 3.001767 -0.208167 -1.059449 0.312599
5 -1.200404 0.276134 -0.344308 -0.367934
4 -0.035864 0.312126 0.252281 0.627551
0 -0.621429 1.515041 -1.735483 -1.235009

Missing Data

Koalas primarily uses the value np.nan to represent missing data. By default, it is not included in computations.

[27]:
pdf1 = pdf.reindex(index=dates[0:4], columns=list(pdf.columns) + ['E'])
[28]:
pdf1.loc[dates[0]:dates[1], 'E'] = 1
[29]:
kdf1 = ks.from_pandas(pdf1)
[30]:
kdf1
[30]:
A B C D E
2013-01-01 -0.621429 1.515041 -1.735483 -1.235009 1.0
2013-01-02 0.844961 -0.999771 0.108356 0.109456 1.0
2013-01-03 1.343862 -1.257980 0.099766 -0.137677 NaN
2013-01-04 3.001767 -0.208167 -1.059449 0.312599 NaN

To drop any rows that have missing data:

[31]:
kdf1.dropna(how='any')
[31]:
A B C D E
2013-01-01 -0.621429 1.515041 -1.735483 -1.235009 1.0
2013-01-02 0.844961 -0.999771 0.108356 0.109456 1.0

Filling missing data.

[32]:
kdf1.fillna(value=5)
[32]:
A B C D E
2013-01-01 -0.621429 1.515041 -1.735483 -1.235009 1.0
2013-01-02 0.844961 -0.999771 0.108356 0.109456 1.0
2013-01-03 1.343862 -1.257980 0.099766 -0.137677 5.0
2013-01-04 3.001767 -0.208167 -1.059449 0.312599 5.0
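
As a library-free sketch of what the two calls above do conceptually, the following plain Python code drops rows containing any missing value and fills missing values with a constant. The helper names and the list-of-dicts layout are hypothetical stand-ins chosen for illustration; only columns A and E from the example are reproduced.

```python
import math

def is_missing(value):
    """Return True for a float NaN, the missing-value marker used here."""
    return isinstance(value, float) and math.isnan(value)

def dropna_any(rows):
    """Keep only rows in which no value is missing (like how='any')."""
    return [row for row in rows if not any(is_missing(v) for v in row.values())]

def fillna(rows, value):
    """Return new rows with every missing value replaced by `value`."""
    return [{k: (value if is_missing(v) else v) for k, v in row.items()}
            for row in rows]

rows = [
    {"A": -0.621429, "E": 1.0},
    {"A": 0.844961, "E": 1.0},
    {"A": 1.343862, "E": math.nan},
    {"A": 3.001767, "E": math.nan},
]

kept = dropna_any(rows)
filled = fillna(rows, 5.0)

print(len(kept))                   # rows without any NaN
print([r["E"] for r in filled])    # NaN replaced by 5.0
```

Both helpers build new lists and leave `rows` untouched, mirroring how the calls above return a new result instead of changing kdf1.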

Operations

Stats

Operations in general exclude missing data.

Performing a descriptive statistic:

[33]:
kdf.mean()
[33]:
A    0.555482
B   -0.060436
C   -0.446473
D   -0.115169
dtype: float64
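
To make "operations in general exclude missing data" concrete, here is a minimal plain Python sketch using the values of the Series created at the start of this notebook. The helper mean_skipna is hypothetical and only illustrates the idea; it is not how Koalas computes the mean internally.

```python
import math

def mean_skipna(values):
    """Average the values that are present, ignoring NaN entries."""
    present = [v for v in values if not math.isnan(v)]
    return sum(present) / len(present) if present else math.nan

values = [1, 3, 5, math.nan, 6, 8]

skipping = mean_skipna(values)      # NaN is left out of both sum and count
naive = sum(values) / len(values)   # NaN propagates through the sum

print(skipping, naive)
```

Excluding the missing entry divides the sum of the five present values by five, whereas the naive average is itself NaN.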

Spark Configurations

Various PySpark configurations can be applied internally in Koalas. For example, you can enable Arrow optimization to greatly speed up internal pandas conversion. See the PySpark Usage Guide for Pandas with Apache Arrow.

[34]:
import warnings
prev = spark.conf.get("spark.sql.execution.arrow.enabled")  # Remember the current value so it can be restored later.
ks.set_option("compute.default_index_type", "distributed")  # Use the distributed default index to avoid indexing overhead.
warnings.filterwarnings("ignore")  # Ignore warnings coming from Arrow optimizations.
[35]:
spark.conf.set("spark.sql.execution.arrow.enabled", True)
%timeit ks.range(300000).to_pandas()
311 ms ± 30.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
[36]:
spark.conf.set("spark.sql.execution.arrow.enabled", False)
%timeit ks.range(300000).to_pandas()
1.25 s ± 29.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
[37]:
ks.reset_option("compute.default_index_type")
spark.conf.set("spark.sql.execution.arrow.enabled", prev)  # Restore the previous value.
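
The save, change, and restore pattern used in the cells above can be wrapped in a context manager so that the previous value is restored even if the timed code raises. The sketch below is an assumption-laden illustration: temporary_conf is a hypothetical helper, and FakeConf is a hypothetical in-memory stand-in that only mimics the get/set calls used on spark.conf above, so the example runs without a Spark session. It also assumes the key already has a value.

```python
from contextlib import contextmanager

@contextmanager
def temporary_conf(conf, key, value):
    """Set `key` to `value` inside the block, then restore the old value."""
    previous = conf.get(key)
    conf.set(key, value)
    try:
        yield conf
    finally:
        # Runs on normal exit and on exceptions alike.
        conf.set(key, previous)

class FakeConf:
    """Hypothetical stand-in exposing get/set like the calls used above."""
    def __init__(self, settings):
        self._settings = dict(settings)

    def get(self, key):
        return self._settings[key]

    def set(self, key, value):
        self._settings[key] = value

key = "spark.sql.execution.arrow.enabled"
conf = FakeConf({key: "false"})

with temporary_conf(conf, key, "true"):
    inside = conf.get(key)   # value while the block is active
after = conf.get(key)        # value once the block has exited

print(inside, after)
```

With a real session, one would pass spark.conf in place of the stand-in; that substitution is untested here.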

Grouping

By “group by” we are referring to a process involving one or more of the following steps:

  • Splitting the data into groups based on some criteria

  • Applying a function to each group independently

  • Combining the results into a data structure
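
The three steps above can be sketched in plain Python. This is a conceptual illustration only, with a hypothetical group_sum helper and made-up integer data; it says nothing about how Koalas or Spark execute a group-by internally.

```python
from collections import defaultdict

def group_sum(rows, key, value):
    """Sum `value` per distinct `key`, following split / apply / combine."""
    # Split: collect the values belonging to each group.
    groups = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row[value])
    # Apply: reduce each group independently.
    sums = {name: sum(values) for name, values in groups.items()}
    # Combine: return one result structure, ordered by group name.
    return dict(sorted(sums.items()))

rows = [
    {"A": "foo", "C": 1},
    {"A": "bar", "C": 2},
    {"A": "foo", "C": 3},
    {"A": "bar", "C": 4},
]

totals = group_sum(rows, key="A", value="C")
print(totals)  # {'bar': 6, 'foo': 4}
```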

[38]:
kdf = ks.DataFrame({'A': ['foo', 'bar', 'foo', 'bar',
                          'foo', 'bar', 'foo', 'foo'],
                    'B': ['one', 'one', 'two', 'three',
                          'two', 'two', 'one', 'three'],
                    'C': np.random.randn(8),
                    'D': np.random.randn(8)})
[39]:
kdf
[39]:
A B C D
0 foo one 0.392094 -0.197885
1 bar one 0.397240 0.768301
2 foo two -1.683135 -0.210606
3 bar three -1.776986 -0.092022
4 foo two -0.499332 0.463287
5 bar two 0.386921 1.995358
6 foo one -0.514731 1.042816
7 foo three 0.194186 1.745033

Grouping and then applying the sum() function to the resulting groups.

[40]:
kdf.groupby('A').sum()
[40]:
            C         D
A
bar -0.992825  2.671637
foo -2.110918  2.842644

Grouping by multiple columns forms a hierarchical index, and again we can apply the sum function.

[41]:
kdf.groupby(['A', 'B']).sum()
[41]:
                  C         D
A   B
foo one   -0.122637  0.844931
    two   -2.182467  0.252681
bar three -1.776986 -0.092022
foo three  0.194186  1.745033
bar two    0.386921  1.995358
    one    0.397240  0.768301

Plotting

See the Plotting docs.

[42]:
pser = pd.Series(np.random.randn(1000),
                 index=pd.date_range('1/1/2000', periods=1000))
[43]:
kser = ks.Series(pser)
[44]:
kser = kser.cummax()
[45]:
kser.plot()
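
The series is passed through cummax() before plotting, so each plotted point is the largest value seen so far and the line never decreases. A library-free sketch of that running maximum, using made-up values and the standard itertools module:

```python
from itertools import accumulate

values = [0.3, -1.2, 0.9, 0.5, 1.4, 1.1]

# accumulate() with max keeps the largest value encountered up to each position.
running_max = list(accumulate(values, max))

print(running_max)  # [0.3, 0.3, 0.9, 0.9, 1.4, 1.4]
```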

On a DataFrame, the plot() method is a convenience to plot all of the columns with labels:

[46]:
pdf = pd.DataFrame(np.random.randn(1000, 4), index=pser.index,
                   columns=['A', 'B', 'C', 'D'])
[47]:
kdf = ks.from_pandas(pdf)
[48]:
kdf = kdf.cummax()
[49]:
kdf.plot()

Getting data in/out

See the Input/Output docs.

CSV

CSV is straightforward and easy to use. See here to write a CSV file and here to read a CSV file.

[50]:
kdf.to_csv('foo.csv')
ks.read_csv('foo.csv').head(10)
[50]:
A B C D
0 -0.821342 -0.325142 0.904636 -0.925984
1 1.498758 0.045747 0.904636 0.726606
2 1.498758 0.045747 0.904636 0.726606
3 1.498758 1.534086 0.904636 0.726606
4 1.498758 1.534086 0.904636 0.726606
5 1.498758 1.534086 0.904636 0.726606
6 1.498758 1.534086 0.904636 0.726606
7 1.498758 1.534086 0.904636 0.856176
8 1.498758 1.534086 0.904636 0.856176
9 1.498758 1.534086 0.904636 1.532448

Parquet

Parquet is an efficient and compact file format that is faster to read and write. See here to write a Parquet file and here to read a Parquet file.

[51]:
kdf.to_parquet('bar.parquet')
ks.read_parquet('bar.parquet').head(10)
[51]:
A B C D
0 -0.821342 -0.325142 0.904636 -0.925984
1 1.498758 0.045747 0.904636 0.726606
2 1.498758 0.045747 0.904636 0.726606
3 1.498758 1.534086 0.904636 0.726606
4 1.498758 1.534086 0.904636 0.726606
5 1.498758 1.534086 0.904636 0.726606
6 1.498758 1.534086 0.904636 0.726606
7 1.498758 1.534086 0.904636 0.856176
8 1.498758 1.534086 0.904636 0.856176
9 1.498758 1.534086 0.904636 1.532448

Spark IO

In addition, Koalas fully supports Spark’s various data sources, such as ORC and external data sources. See here to write to the specified data source and here to read from it.

[52]:
kdf.to_spark_io('zoo.orc', format="orc")
ks.read_spark_io('zoo.orc', format="orc").head(10)
[52]:
A B C D
0 -0.821342 -0.325142 0.904636 -0.925984
1 1.498758 0.045747 0.904636 0.726606
2 1.498758 0.045747 0.904636 0.726606
3 1.498758 1.534086 0.904636 0.726606
4 1.498758 1.534086 0.904636 0.726606
5 1.498758 1.534086 0.904636 0.726606
6 1.498758 1.534086 0.904636 0.726606
7 1.498758 1.534086 0.904636 0.856176
8 1.498758 1.534086 0.904636 0.856176
9 1.498758 1.534086 0.904636 1.532448