Kotlin API for Apache Spark v1.2: UDTs, UDFs, RDDs, Compatibility, and More!

Hi everyone, Jolan here, with my first actual blog post! It’s been a couple of months since the last release of the Kotlin API for Apache Spark and we feel like all of the exciting changes since then are worth another announcement.

Let me remind you what the Kotlin API for Apache Spark is and why it was created. Apache Spark is a framework for distributed computation that data engineers commonly use for tasks such as ETL. It supports multiple languages out of the box, including Java, Scala, Python, and R. We at JetBrains are committed to supporting one more language for Apache Spark – Kotlin. We believe it can combine the strengths of the other language APIs while avoiding their weaknesses.

If you don’t want to read about it and just want to try it out – here is the link to the repo.

Kotlin Spark API on GitHub

Otherwise, let’s begin!

UDTs & UDFs

User-defined types (UDTs) are Spark’s go-to method for allowing any instance of a data-containing class to be stored in and read from a Dataset. One Spark extension that relies on these types, and that we now fully support, is MLlib, a library that aims to make practical machine learning scalable and easy. An example using MLlib can be found here (in Datalore). To define your own UDTs, here’s another example (in Datalore).

While it’s generally recommended to use Spark’s column functions for your Dataset or SQL operations, sometimes a user-defined function (UDF) or user-defined aggregate function (UDAF) can provide extra flexibility. The Kotlin Spark API already contained basic UDF support, and this release greatly expands it. If you want, you can jump straight to all examples (in Datalore).

Similar to Spark’s API, you can create a UDF or UDAF and register it to make it executable from SQL. You can follow the original API, but you can also use some neat Kotlin tricks to make it a bit easier:
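As a minimal sketch of the two styles: the first registration goes through Spark’s own `UDFRegistration.register` with a `UDF1` and an explicit return `DataType`, while the second assumes the Kotlin Spark API’s `udf.register(name) { ... }` shorthand inside `withSpark`, which infers the return type from the lambda. The function names `plusOne` and `plusTwo` are made up for illustration, and the exact shorthand signature should be checked against the linked examples.

```kotlin
import org.apache.spark.sql.api.java.UDF1
import org.apache.spark.sql.types.DataTypes
import org.jetbrains.kotlinx.spark.api.*

// Plain Kotlin function wrapped by the first UDF (hypothetical example logic).
fun plusOne(x: Int): Int = x + 1

fun main() = withSpark {
    // Original Spark API: wrap the function in a UDF1 and state the return type explicitly.
    spark.udf().register("plusOne", UDF1<Int, Int> { x -> plusOne(x) }, DataTypes.IntegerType)

    // Kotlin shorthand (assumed signature): a typed lambda, return type inferred.
    udf.register("plusTwo") { x: Int -> x + 2 }

    // Both functions are now callable from SQL.
    spark.sql("SELECT plusOne(5) AS a, plusTwo(5) AS b").show()
}
```

The explicit `DataTypes.IntegerType` in the first call is what the Kotlin variant is meant to spare you.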

In contrast to Spark’s API, the created UDFs are now also typed. This means that when you directly invoke a UDF instance using Dataset.select() calls with the correctly typed TypedColumns, the UDF will return the correct types too:

Finally, we added simple vararg UDFs, which do not even exist in the Scala API. These allow a UDF to accept any number of same-type arguments, and they work both from SQL and from Dataset invocations.


(Note, the following example also works when you define a UDF the “normal” way using an array of any type as a single argument.)

We strongly encourage you to take a look at the examples (in Datalore) since they go over everything that’s possible with the new UDFs, and most importantly, we encourage you to play around with them and let us know what you think!

RDDs

Resilient Distributed Datasets (RDDs) are the backbone of Apache Spark. While in most cases typed Datasets are preferred, RDDs still have a special place thanks to their flexibility and speed. They are also still used under the hood of Datasets. While the Java API of RDDs is perfectly usable from Kotlin, we thought we could port over some of the niceties from the Scala API.

Similar to Scala, when you’re working with an RDD with the type Tuple2, you can utilize all kinds of handy key / value functions, like aggregateByKey(), reduceByKey(), groupByKey(), filterByRange(), and values() – no mapToPair necessary!

This also holds true for RDDs with a numeric type. Whereas previously you’d need to mapToDouble, you can now call mean(), sum(), stdev(), max(), etc. directly.

Finally, we added some Kotlin-esque creation functions surrounding RDDs, such as rddOf(), List.toRDD(), and RDD.toDS().
For an extensive example surrounding generating unique groups of indices using RDDs, look no further (in Datalore).
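For a much smaller illustration of the helpers named in this section, here is a hedged sketch. It assumes that `rddOf()`, `List.toRDD()`, `reduceByKey()`, and `mean()` are available inside `withSpark` with the lambda and return types shown; `sumByKey` and `meanOfList` are hypothetical helper names introduced only for this example.

```kotlin
import org.apache.spark.api.java.JavaRDD
import org.jetbrains.kotlinx.spark.api.*
import scala.Tuple2

// Sums the values per key and returns a plain Kotlin map for easy inspection.
fun sumByKey(): Map<String, Int> {
    var result = emptyMap<String, Int>()
    withSpark {
        // rddOf() builds an RDD straight from the given elements.
        val pairs: JavaRDD<Tuple2<String, Int>> =
            rddOf(Tuple2("a", 1), Tuple2("b", 2), Tuple2("a", 3))

        // Key/value functions called directly on an RDD of Tuple2 (assumed lambda signature).
        result = pairs
            .reduceByKey { a, b -> a + b }
            .collect()
            .associate { it._1() to it._2() }
    }
    return result
}

// Mean of a numeric RDD, without an intermediate mapToDouble (assumed to return Double).
fun meanOfList(): Double {
    var result = Double.NaN
    withSpark {
        result = listOf(1, 2, 3, 4).toRDD().mean()
    }
    return result
}

fun main() {
    println(sumByKey())
    println(meanOfList())
}
```

With the plain Java API, the first function would need a `mapToPair` step and the second a `mapToDouble` step before the same calls become available.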

Jupyter

We are excited to see people prototyping and preprocessing their data using Spark from Jupyter notebooks in Kotlin! We further enhanced support for this by adding some essential features:

While adding %use spark is usually enough to get you going, sometimes it’s essential to change some of the properties Spark uses. This is why from release v1.2 (and the Kotlin Jupyter kernel v0.11.0-134 and up), you can now define Spark properties directly in the use-magic!

For example:  %use spark(spark=3.3.0, scala=2.12, spark.master=local[4]).

Display options were also overhauled. You can now set them using a nice little DSL:
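A minimal sketch of such a notebook cell follows. The block name `sparkProperties` and the property names `displayLimit` and `displayTruncate` are assumptions here and should be checked against the Datalore example.

```kotlin
// Notebook cell, run after `%use spark`.
// The names below are assumptions, not confirmed by this post.
sparkProperties {
    displayLimit = 2        // show at most 2 rows per Dataset
    displayTruncate = -1    // -1 is assumed to disable truncation of cell text
}
```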

This specific example will limit the number of rows displayed to only 2 and remove the truncation of text in cells entirely. These properties can also be provided in the use-magic.
Check Datalore for an example!

Compatibility

The largest changes for this release happened under the hood. We migrated the build system from Maven to Gradle and designed a new building method so that we can (as of right now) build 14 different versions of the Kotlin Spark API at once!

“Why 14 versions?” you might ask. Well… it’s complicated… But let me attempt to explain. You can also skip this and just jump to the new naming scheme below.

While most projects using Spark operate just fine with a patch-version change, the Kotlin Spark API works by replacing a core file of Spark. This file only operates well with the patch version it was created for, because Spark uses reflection to call parts of it.

Before, we would push releases only for the latest patch version of each minor Spark version (e.g. 3.0.3, 3.1.3, and 3.2.1). This worked fine, as long as you used the exact same Spark version. However, not everyone has the luxury of having the latest patch version of Spark, or even the same Scala version, especially when bound to an external server environment.

To ensure all of our users are able to use the latest version of the Kotlin Spark API, no matter their setup, we now build our API for all patch versions and all core Scala versions of Apache Spark from 3.0.0 onward, as found on Maven Central. A preprocessor and lots of tests handle the compatibility issues between versions.

The new naming scheme for the packages now contains both the Apache Spark patch version and the Scala core version:

[name]_[Apache Spark version]_[Scala core version]:[Kotlin for Apache Spark API version]

The only exception to this is scala-tuples-in-kotlin_[Scala core version]:[Kotlin for Apache Spark API version], which is independent of Spark and can be used on its own without Spark in any of your Kotlin / Scala hybrid projects!

The new release scheme looks as follows:

Apache Spark   Scala   Kotlin Spark API
3.3.0          2.13    kotlin-spark-api_3.3.0_2.13:VERSION
3.3.0          2.12    kotlin-spark-api_3.3.0_2.12:VERSION
3.2.1          2.13    kotlin-spark-api_3.2.1_2.13:VERSION
3.2.1          2.12    kotlin-spark-api_3.2.1_2.12:VERSION
3.2.0          2.13    kotlin-spark-api_3.2.0_2.13:VERSION
3.2.0          2.12    kotlin-spark-api_3.2.0_2.12:VERSION
3.1.3          2.12    kotlin-spark-api_3.1.3_2.12:VERSION
3.1.2          2.12    kotlin-spark-api_3.1.2_2.12:VERSION
3.1.1          2.12    kotlin-spark-api_3.1.1_2.12:VERSION
3.1.0          2.12    kotlin-spark-api_3.1.0_2.12:VERSION
3.0.3          2.12    kotlin-spark-api_3.0.3_2.12:VERSION
3.0.2          2.12    kotlin-spark-api_3.0.2_2.12:VERSION
3.0.1          2.12    kotlin-spark-api_3.0.1_2.12:VERSION
3.0.0          2.12    kotlin-spark-api_3.0.0_2.12:VERSION

Where VERSION, at the time of writing, is “1.2.1”.
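To make the scheme concrete, here is a sketch of a Gradle Kotlin DSL dependency block for Spark 3.3.0 on Scala 2.13. The group ID `org.jetbrains.kotlinx.spark` and the separate Spark dependency are assumptions not stated in this post, so verify them against the repository’s README before copying.

```kotlin
// build.gradle.kts (sketch)
repositories {
    mavenCentral()
}

dependencies {
    // [name]_[Apache Spark version]_[Scala core version]:[Kotlin for Apache Spark API version]
    implementation("org.jetbrains.kotlinx.spark:kotlin-spark-api_3.3.0_2.13:1.2.1")

    // Spark itself, with matching Spark and Scala versions (assumed to be declared by the project).
    implementation("org.apache.spark:spark-sql_2.13:3.3.0")
}
```

The Spark and Scala versions in the artifact name should match those of the Spark runtime you deploy to, since each build targets one specific patch version.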

Conclusion

These are the most important changes to the Kotlin API for Apache Spark version 1.2. If you want to take a deeper look into everything that’s changed, we now use Milestones to keep track of the changes, as well as GitHub Releases.

As usual, you can find the release on Maven Central.

In other news, we now also publish SNAPSHOT versions to GitHub Packages during development. If you want to be the first to try out new features and provide us with feedback, give it a try!

If you have any ideas or if you need help or support, please contact us on Slack or GitHub issues. We gladly welcome any feedback or feature requests!
As usual, thanks to Pasha Finkelshteyn for his continued support during the development of the project!