Kotlin API for Apache Spark 1.0 Released
The Kotlin API for Apache Spark is now widely available. This is the first stable release of the API that we consider to be feature-complete with respect to the user experience and compatibility with core Spark APIs.
Let’s take a look at the new features this release brings to the API.
- Typed select and sort
- More column functions
- More KeyValueGroupedDataset wrapper functions
- Support for Scala TupleN classes
- Support for date and time types
- Support for maps encoded as tuples
- Conclusion
Typed select and sort
The Scala API has a typed select method that returns Datasets of Tuples. Sometimes using them can be more idiomatic or convenient than using the map function. Here’s what the syntax for this method looks like:
case class TestData(id: Long, name: String, url: String)

// ds is of type Dataset[TestData]
val result: Dataset[Tuple2[String, Long]] =
  ds.select($"name".as[String], $"id".as[Long])
Sometimes obtaining just a tuple may be really convenient, but this method has a drawback: you have to select a column by name and explicitly provide the type. This can lead to errors, which might be hard to fix in long pipelines.
We’re trying to address this issue at least partially in our extension to the Scala API. Consider the following Kotlin code:
data class TestData(val id: Long, val name: String, val url: String)

// ds is of type Dataset<TestData>
val result: Dataset<Arity2<String, Long>> =
    ds.selectTyped(TestData::name, TestData::id)
The result is the same, but the call is entirely type-safe: there are no strings and no casts, and both the column name and the type are inferred via reflection.
We have also added a similarly reflective syntax to the sort function.
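For example, assuming the reflective sort overload accepts the same property references as selectTyped, sorting the dataset above could look like this sketch:

// A minimal sketch, not the exact API surface: sort by property references
// instead of string column names. `ds` is the Dataset<TestData> from above.
val sorted: Dataset<TestData> = ds.sort(TestData::name, TestData::id)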
In Scala, this API supports arities up to 5, and we decided to be as consistent with the Scala API as possible. We also think that the usage of tuples with arities above five is an indication that something is going wrong. For example, maybe it would be better to extract a new domain object or, conversely, to work with untyped datasets.
More column functions
The Scala API is very rich in terms of functions that can be called on columns. We cannot make them identical to the Scala API because of the limitations of Kotlin: for example, overriding class members with extensions is forbidden, and the Dataset class is not extensible. But we can at least use infix functions and names in backticks to implement operator-like functions.
Here are the operator-like functions that we currently support: ==, !=, eq / `===`, neq / `=!=`, -col(...), !col(...), gt, lt, geq, leq, or, and / `&&`, +, -, *, /, and %.
Luckily, very few of these functions require backticks, and those that do can be autocompleted, so you don’t have to type the backticks yourself.
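As a quick, hedged illustration (the people dataset and its columns are invented for this example, and the infix wrappers are assumed to delegate to Spark’s Column methods), column expressions read quite naturally with these functions:

// A sketch using the operator-like functions listed above.
// `people` is a hypothetical Dataset with `age` and `name` columns;
// `col` is org.apache.spark.sql.functions.col.
val adults = people.filter(col("age") geq 18 and (col("name") neq "unknown"))
val nextYear = people.withColumn("ageNextYear", col("age") + 1)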
More KeyValueGroupedDataset wrapper functions
We initially designed the API so that anyone could call any function that requires a Decoder and Encoder simply by using the magic encoder() function, which generates everything automagically. This gave our users some flexibility, and it also allowed us not to implement all the functions that the Dataset API offers. But we would ultimately like to provide our users with the best developer experience possible. This is why we’ve implemented the necessary wrappers over KeyValueGroupedDataset and added support for the following functions (a short sketch follows the list):
- cogroup
- flatMapGroupsWithState
- mapGroupsWithState
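Here is a rough sketch of what cogroup enables. The Click and Purchase classes are made up for the example, and the exact shape of the Kotlin wrapper (a trailing lambda over the key and the two iterators, with the result encoder generated for you) is an assumption based on the other wrappers:

data class Click(val userId: Long, val url: String)        // hypothetical domain types
data class Purchase(val userId: Long, val amount: Double)

// clicks: Dataset<Click>, purchases: Dataset<Purchase>
val spentPerActiveUser = clicks.groupByKey { it.userId }
    .cogroup(purchases.groupByKey { it.userId }) { userId, clickIter, purchaseIter ->
        // Keep only users who clicked at least once and sum what they spent.
        if (clickIter.hasNext())
            listOf(Tuple2(userId, purchaseIter.asSequence().sumOf { it.amount })).iterator()
        else
            emptyList<Tuple2<Long, Double>>().iterator()
    }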
Support for Scala TupleN classes
Several functions in the Spark API return Datasets of Tuples, for example select and joinWith. Before this release, users had to manually find an encoder for tuples:
val encoder = Encoders.tuple(Encoders.STRING(), Encoders.INT())
ds
    .select(ds.col("a").`as`<String>(), ds.col("b").`as`<Int>())
    .map({ Tuple2(it._1(), it._2() + 1) }, encoder)
And the more we work with tuples, the more encoders we need, which leads to verbosity and forces us to come up with more and more names for new encoders.
After this change, code becomes as simple as any usual Kotlin API code:
ds
    .select(ds.col("a").`as`<String>(), ds.col("b").`as`<Int>())
    .map { Tuple2(it._1(), it._2() + 1) }
You no longer need to rely on specific encoders or lambdas inside argument lists.
Support for date and time types
Working with dates and times is an important part of many data engineering workflows. For Spark 3.0, we had default encoders registered for Date and Timestamp, but inside data structures we only supported LocalDate and Instant, which is obviously not enough. We now have full support for LocalDate, Date, Timestamp, and Instant, both as top-level entities of dataframes and as fields inside of structures.

We have also added support for Date and Timestamp as fields inside of structures for Spark 2.
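As a small sketch (the Event and Log classes are invented for this example, and withSpark / dsOf are the API’s helpers for creating a session and a dataset), a class with date and time fields can now be used both at the top level and nested inside another structure:

import java.time.Instant
import java.time.LocalDate

data class Event(val name: String, val day: LocalDate, val happenedAt: Instant)
data class Log(val source: String, val event: Event)   // date/time types nested in a structure

fun main() = withSpark {
    dsOf(Log("web", Event("release", LocalDate.of(2021, 1, 1), Instant.now())))
        .show()
}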
Support for maps encoded as tuples
There is a well-known practice of encoding maps as tuples. For example, rather than storing an entity’s ID and name in a map, it is fairly common to store them as two columns in a structure like Dataset<Pair<Long, String>> (which is how relational databases usually work).
We are aware of this, and we’ve decided to add support for working with such datasets in the same way you work with maps. We have added the functions takeKeys and takeValues to Dataset<Tuple2<T1, T2>>, Dataset<Pair<T1, T2>>, and Dataset<Arity2<T1, T2>>.
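For example (idToName here is just an illustrative dataset name):

// idToName: Dataset<Pair<Long, String>> holding (id, name) pairs
val ids: Dataset<Long> = idToName.takeKeys()
val names: Dataset<String> = idToName.takeValues()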
Conclusion
We want to say a huge “thank you” to Jolan Rensen, who helped us tremendously by offering feedback, assisting with the implementation of features, and fixing bugs in this release. He worked with our project while writing his thesis, and we’re happy that we can help him with his brilliant work. If you want to read more about Jolan, please visit his site.
If you want to read more about the details of the new release, please check out the changelog.
As usual, the latest release is available at Maven Central. And we would love to get your feedback, which you can leave in: