This is a short introduction to Koalas, geared mainly for new users. This notebook shows you some key differences between pandas and Koalas. You can run this examples by yourself on a live notebook here. For Databricks users, you can import the current .ipynb file and run it after installing Koalas.
Customarily, we import Koalas as follows:
[1]:
import pandas as pd import numpy as np import databricks.koalas as ks from pyspark.sql import SparkSession
Creating a Koalas Series by passing a list of values, letting Koalas create a default integer index:
[2]:
s = ks.Series([1, 3, 5, np.nan, 6, 8])
[3]:
s
0 1.0 1 3.0 2 5.0 3 NaN 4 6.0 5 8.0 Name: 0, dtype: float64
Creating a Koalas DataFrame by passing a dict of objects that can be converted to series-like.
[4]:
kdf = ks.DataFrame( {'a': [1, 2, 3, 4, 5, 6], 'b': [100, 200, 300, 400, 500, 600], 'c': ["one", "two", "three", "four", "five", "six"]}, index=[10, 20, 30, 40, 50, 60])
[5]:
kdf
Creating a pandas DataFrame by passing a numpy array, with a datetime index and labeled columns:
[6]:
dates = pd.date_range('20130101', periods=6)
[7]:
dates
DatetimeIndex(['2013-01-01', '2013-01-02', '2013-01-03', '2013-01-04', '2013-01-05', '2013-01-06'], dtype='datetime64[ns]', freq='D')
[8]:
pdf = pd.DataFrame(np.random.randn(6, 4), index=dates, columns=list('ABCD'))
[9]:
pdf
Now, this pandas DataFrame can be converted to a Koalas DataFrame
[10]:
kdf = ks.from_pandas(pdf)
[11]:
type(kdf)
databricks.koalas.frame.DataFrame
It looks and behaves the same as a pandas DataFrame though
[12]:
Also, it is possible to create a Koalas DataFrame from Spark DataFrame.
Creating a Spark DataFrame from pandas DataFrame
[13]:
spark = SparkSession.builder.getOrCreate()
[14]:
sdf = spark.createDataFrame(pdf)
[15]:
sdf.show()
+--------------------+-------------------+--------------------+-------------------+ | A| B| C| D| +--------------------+-------------------+--------------------+-------------------+ |-0.40729126067930577|0.06655086061836445|-0.07314878758440578| 0.6482187447085683| | -0.848735274668907|0.43727685786558224| 0.6326566086816865| 0.312860815784838| |-0.41553692955141575|-1.7870717259038067| 0.24222142308402184| 0.125543462922973| | -1.637270523583917| 1.1348099198020765| 0.2825324338895592|0.13399483028402598| | -1.2304766522352943|-1.9257342346663335| 0.7362879432261002|-0.5476765308367703| | 1.0928943198263723|-1.0712812856772376| 0.31875224896792975|-0.4775906715060247| +--------------------+-------------------+--------------------+-------------------+
Creating Koalas DataFrame from Spark DataFrame. to_koalas() is automatically attached to Spark DataFrame and available as an API when Koalas is imported.
to_koalas()
[16]:
kdf = sdf.to_koalas()
[17]:
Having specific dtypes . Types that are common to both Spark and pandas are currently supported.
[18]:
kdf.dtypes
A float64 B float64 C float64 D float64 dtype: object
See the API Reference.
See the top rows of the frame. The results may not be the same as pandas though: unlike pandas, the data in a Spark dataframe is not ordered, it has no intrinsic notion of index. When asked for the head of a dataframe, Spark will just take the requested number of rows from a partition. Do not rely on it to return specific rows, use .loc or iloc instead.
.loc
iloc
[19]:
kdf.head()
Display the index, columns, and the underlying numpy data.
You can also retrieve the index; the index column can be ascribed to a DataFrame, see later
[20]:
kdf.index
Int64Index([0, 1, 2, 3, 4, 5], dtype='int64')
[21]:
kdf.columns
Index(['A', 'B', 'C', 'D'], dtype='object')
[22]:
kdf.to_numpy()
array([[-0.40729126, 0.06655086, -0.07314879, 0.64821874], [-0.84873527, 0.43727686, 0.63265661, 0.31286082], [-0.41553693, -1.78707173, 0.24222142, 0.12554346], [-1.63727052, 1.13480992, 0.28253243, 0.13399483], [-1.23047665, -1.92573423, 0.73628794, -0.54767653], [ 1.09289432, -1.07128129, 0.31875225, -0.47759067]])
Describe shows a quick statistic summary of your data
[23]:
kdf.describe()
Transposing your data
[24]:
kdf.T
Sorting by its index
[25]:
kdf.sort_index(ascending=False)
Sorting by value
[26]:
kdf.sort_values(by='B')
Koalas primarily uses the value np.nan to represent missing data. It is by default not included in computations.
np.nan
[27]:
pdf1 = pdf.reindex(index=dates[0:4], columns=list(pdf.columns) + ['E'])
[28]:
pdf1.loc[dates[0]:dates[1], 'E'] = 1
[29]:
kdf1 = ks.from_pandas(pdf1)
[30]:
kdf1
To drop any rows that have missing data.
[31]:
kdf1.dropna(how='any')
Filling missing data.
[32]:
kdf1.fillna(value=5)
Operations in general exclude missing data.
Performing a descriptive statistic:
[33]:
kdf.mean()
A -0.574403 B -0.524242 C 0.356550 D 0.032558 dtype: float64
Various configurations in PySpark could be applied internally in Koalas. For example, you can enable Arrow optimization to hugely speed up internal pandas conversion. See PySpark Usage Guide for Pandas with Apache Arrow.
[34]:
prev = spark.conf.get("spark.sql.execution.arrow.enabled") # Keep its default value. ks.set_option("compute.default_index_type", "distributed") # Use default index prevent overhead. import warnings warnings.filterwarnings("ignore") # Ignore warnings coming from Arrow optimizations.
[35]:
spark.conf.set("spark.sql.execution.arrow.enabled", True) %timeit ks.range(300000).to_pandas()
493 ms ± 157 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
[36]:
spark.conf.set("spark.sql.execution.arrow.enabled", False) %timeit ks.range(300000).to_pandas()
1.39 s ± 109 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
[37]:
ks.reset_option("compute.default_index_type") spark.conf.set("spark.sql.execution.arrow.enabled", prev) # Set its default value back.
By “group by” we are referring to a process involving one or more of the following steps:
Splitting the data into groups based on some criteria
Applying a function to each group independently
Combining the results into a data structure
[38]:
kdf = ks.DataFrame({'A': ['foo', 'bar', 'foo', 'bar', 'foo', 'bar', 'foo', 'foo'], 'B': ['one', 'one', 'two', 'three', 'two', 'two', 'one', 'three'], 'C': np.random.randn(8), 'D': np.random.randn(8)})
[39]:
Grouping and then applying the sum() function to the resulting groups.
[40]:
kdf.groupby('A').sum()
Grouping by multiple columns forms a hierarchical index, and again we can apply the sum function.
[41]:
kdf.groupby(['A', 'B']).sum()
See the Plotting docs.
[42]:
%matplotlib inline from matplotlib import pyplot as plt
[43]:
pser = pd.Series(np.random.randn(1000), index=pd.date_range('1/1/2000', periods=1000))
[44]:
kser = ks.Series(pser)
[45]:
kser = kser.cummax()
[46]:
kser.plot()
<matplotlib.axes._subplots.AxesSubplot at 0x7feeb4b350b8>
On a DataFrame, the plot() method is a convenience to plot all of the columns with labels:
[47]:
pdf = pd.DataFrame(np.random.randn(1000, 4), index=pser.index, columns=['A', 'B', 'C', 'D'])
[48]:
[49]:
kdf = kdf.cummax()
[50]:
kdf.plot()
<matplotlib.axes._subplots.AxesSubplot at 0x7feebe266978>
See the Input/Output docs.
CSV is straightforward and easy to use. See here to write a CSV file and here to read a CSV file.
[51]:
kdf.to_csv('foo.csv') ks.read_csv('foo.csv').head(10)
Parquet is an efficient and compact file format to read and write faster. See here to write a Parquet file and here to read a Parquet file.
[52]:
kdf.to_parquet('bar.parquet') ks.read_parquet('bar.parquet').head(10)
In addition, Koalas fully support Spark’s various datasources such as ORC and an external datasource. See here to write it to the specified datasource and here to read it from the datasource.
[53]:
kdf.to_spark_io('zoo.orc', format="orc") ks.read_spark_io('zoo.orc', format="orc").head(10)