{"id":249904,"date":"2022-05-26T12:15:15","date_gmt":"2022-05-26T11:15:15","guid":{"rendered":"https:\/\/blog.jetbrains.com\/?post_type=big-data-tools&#038;p=249904"},"modified":"2022-11-30T13:45:27","modified_gmt":"2022-11-30T12:45:27","slug":"kotlin-api-for-apache-spark-streaming-jupyter-and-more","status":"publish","type":"big-data-tools","link":"https:\/\/blog.jetbrains.com\/zh-hans\/big-data-tools\/2022\/05\/26\/kotlin-api-for-apache-spark-streaming-jupyter-and-more","title":{"rendered":"Kotlin API for Apache Spark: Streaming, Jupyter, and More"},"content":{"rendered":"\n<p>Hello, fellow data engineers! It\u2019s Pasha here, and today I&#8217;m going to introduce you to the new release of Kotlin API for Apache Spark. It&#8217;s been a long time since the last major release announcements, mainly because we wanted to avoid bothering you with minor improvements. But today&#8217;s announcement is huge!<\/p>\n\n\n\n<p>First, let me remind you what the Kotlin API for Apache Spark is and why it was created. Apache Spark is a framework for distributed computations. It is usually used by data engineers for solving different tasks, for example for the ETL process. It supports multiple languages straight out of the box: Java, Scala, Python, and R. We at JetBrains are committed to supporting one more language for Apache Spark \u2013 Kotlin, as we believe it can combine multiple pros from other language APIs while avoiding their cons.<\/p>\n\n\n\n<p>If you don&#8217;t want to read and just want to try it out \u2013 here is the link to the repo:<\/p>\n\n\n\n<p align=\"center\"><a class=\"ek-link jb-download-button\" title=\"Repository\" href=\"https:\/\/github.com\/Kotlin\/kotlin-spark-api\/\" target=\"_blank\" rel=\"noopener\">Repository<\/a><\/p>\n\n\n\n<p>Otherwise, let&#8217;s begin our overview.<\/p>\n\n\n\n<h1 class=\"wp-block-heading\">Spark Streaming<\/h1>\n\n\n\n<p>For a long time, we&#8217;d supported only one API from Apache Spark: <code>Dataset<\/code> API. 
While it&#8217;s widely popular, we can&#8217;t just ignore the fact that there is at least one more trendy extension to Apache Spark: <em>Spark Streaming<\/em>. Its name is self-explanatory, but just to be sure that we&#8217;re on the same page, allow me to elaborate a little.<\/p>\n\n\n\n<p><em>Spark Streaming<\/em> is a solution for building stream processing systems with Spark. Unlike many other stream processing solutions, <em>Spark Streaming<\/em> works with micro-batches. When reading data from a source, instead of working on one element at a time, it reads all the data that arrives within a defined time frame, or &#8220;batch&#8221; (for example, it might read everything available every 100 ms).<\/p>\n\n\n\n<p>There are two core entities in <em>Spark Streaming<\/em>:<\/p>\n\n\n\n<ol><li>Discretized stream (<code>DStream<\/code>) represents a continuous stream of data. It can be created from an input source (socket, Kafka, or even a text file) or from another <code>DStream<\/code>. A <code>DStream<\/code> is represented as a sequence of <a href=\"https:\/\/spark.apache.org\/docs\/latest\/api\/scala\/org\/apache\/spark\/rdd\/RDD.html\" target=\"_blank\" rel=\"noopener\"><code>RDD<\/code>s (Resilient Distributed Datasets)<\/a>.<\/li><li>Spark streaming context (<a href=\"https:\/\/spark.apache.org\/docs\/latest\/api\/scala\/org\/apache\/spark\/streaming\/StreamingContext.html\" target=\"_blank\" rel=\"noopener\"><code>StreamingContext<\/code><\/a>) is the main entry point for working with <code>DStream<\/code>s. Its main purpose is to provide methods for creating streams from different sources.<\/li><\/ol>\n\n\n\n<p>As you might already know, we have a special <code>withSpark<\/code> function in our core API (you can view it <a href=\"https:\/\/blog.jetbrains.com\/kotlin\/2021\/02\/kotlin-for-apache-spark-one-step-closer-to-your-production-cluster\/#Support_for_the_custom_SparkSessionBuilder\">here<\/a>, for example). 
Of course, for Streaming, we have something similar: <code>withSparkStreaming<\/code>. It has some defaults that we think are reasonable. You can take a look at them <a href=\"https:\/\/github.com\/Kotlin\/kotlin-spark-api\/blob\/spark-3.2\/kotlin-spark-api\/3.2\/src\/main\/kotlin\/org\/jetbrains\/kotlinx\/spark\/api\/SparkSession.kt#L300-L309\" target=\"_blank\" rel=\"noreferrer noopener\">here<\/a> if you want.<\/p>\n\n\n\n<!--more-->\n\n\n\n<p>A very basic usage example looks like this:<\/p>\n\n\n\n<pre class=\"kotlin-code\" data-highlight-only=\"true\" theme=\"idea\" indent=\"4\" style=\"visibility: hidden; padding: 36px 0;\">\nwithSparkStreaming { \/\/ this: KSparkStreamingSession\n   val lines: JavaReceiverInputDStream&lt;String&gt; = TODO() \/\/ create some string stream, for example, from socket\n   val words: JavaDStream&lt;String&gt; = TODO() \/\/ some transformation\n   words.foreachRDD { rdd: JavaRDD&lt;String&gt;, _: Time -&gt;\n      withSpark(rdd) { \/\/ this: KSparkSession\n         val dataframe: Dataset&lt;TestRow&gt; = rdd.map { TestRow(it) }.toDS()\n         dataframe\n            .groupByKey { it.word }\n            .count()\n            .show()\n      }\n   }\n}\n<\/pre>\n\n\n\n<p>What can we see here? We create a <em>Spark Streaming<\/em> context with the call <code>withSparkStreaming<\/code>. Many useful things are available inside it, for example, a <code>withSpark<\/code> function that will obtain or create a <em>Spark Session<\/em>. It also provides access to the <code>ssc<\/code> variable (which stands for \u2013 you guessed it \u2013 &#8220;spark streaming context&#8221;).<\/p>\n\n\n\n<p>As you can see, no other non-obvious abstractions are involved. We can work with the <code>RDD<\/code>s, <code>JavaDStream<\/code>s, etc. 
that we are familiar with.<\/p>\n\n\n\n<p>The <code>withSpark<\/code> function inside <code>withSparkStreaming<\/code> is slightly different from the one you&#8217;re familiar with. It can find the right Spark Session from the <code>SparkConf<\/code> of the <code>ssc<\/code> variable or (as seen in the example) from an <code>RDD<\/code>. You still get a <code>KSparkSession<\/code> context, which lets you create <code>Dataset<\/code>s, broadcast variables, and so on. Unlike its batch counterpart, however, the Spark Session won&#8217;t be closed at the end of the <code>withSpark<\/code> block. Lastly, it behaves like Kotlin&#8217;s <code>run<\/code> function: it returns the value of the last expression in its block.<\/p>\n\n\n\n<p>You can find more examples <a href=\"https:\/\/github.com\/Kotlin\/kotlin-spark-api\/tree\/204ac2b96844a8df20ad700e652085d385d5ff6c\/examples\/src\/main\/kotlin\/org\/jetbrains\/kotlinx\/spark\/examples\/streaming\" target=\"_blank\" rel=\"noreferrer noopener\">on our GitHub<\/a>, and more detailed documentation is available on our <a href=\"https:\/\/github.com\/Kotlin\/kotlin-spark-api\/wiki\/Streaming\" target=\"_blank\" rel=\"noreferrer noopener\">wiki<\/a>.<\/p>\n\n\n\n<h1 class=\"wp-block-heading\">Jupyter support<\/h1>\n\n\n\n<p>A <a href=\"https:\/\/github.com\/Kotlin\/kotlin-jupyter\" target=\"_blank\" rel=\"noopener\">Kotlin kernel for Jupyter<\/a> has existed for some time already. 
You can perform experiments with different Kotlin for Data Science tools, such as <a href=\"https:\/\/github.com\/Kotlin\/multik\" target=\"_blank\" rel=\"noopener\">multik<\/a> (a library for multi-dimensional arrays in Kotlin) or <a href=\"https:\/\/github.com\/Kotlin\/kotlindl\" target=\"_blank\" rel=\"noopener\">KotlinDL<\/a> (a Deep Learning API written in Kotlin and inspired by Keras, working on top of TensorFlow), and others described in the <a href=\"https:\/\/kotlinlang.org\/docs\/data-science-overview.html#libraries\" target=\"_blank\" rel=\"noopener\">Kotlin documentation<\/a>. But we are aware that Jupyter is quite popular among data engineers, too, so we&#8217;ve added support for the Kotlin API for Apache Spark as well. In fact, all you need to do to start using it is put <code>%use spark<\/code> in a cell of your notebook. You can see an example on <a href=\"https:\/\/github.com\/Kotlin\/kotlin-spark-api\/blob\/204ac2b96844a8df20ad700e652085d385d5ff6c\/examples\/src\/main\/kotlin\/org\/jetbrains\/kotlinx\/spark\/examples\/JupyterExample.ipynb\" target=\"_blank\" rel=\"noreferrer noopener\">our GitHub<\/a>.<\/p>\n\n\n\n<p>The main features you will find in this support are autocompletion and table rendering. When you use <code>%use spark<\/code>, all of the notebook cells are automagically wrapped into one implicit <code>withSpark<\/code> block, which gives you access to all the sugar we provide.<\/p>\n\n\n\n<p>The aforementioned <em>Spark Streaming<\/em> is supported too. To use it, all you need to do is add <code>%use spark-streaming<\/code>. Of course, all the features of dynamic execution are supported \u2013 for example, tables will update automatically.<\/p>\n\n\n\n<p>Please be aware that Jupyter support for the Kotlin API for Apache Spark is experimental and may have some limitations. 
One such limitation we are aware of is that you cannot mix batching and streaming in the same notebook \u2013 <code>withSparkStreaming<\/code> doesn&#8217;t work inside a <code>withSpark<\/code> block. Don&#8217;t hesitate to send us examples if something doesn&#8217;t seem to behave as it should. We&#8217;re always happy to help!<\/p>\n\n\n\n<p>You can find an example of a notebook with streaming <a href=\"https:\/\/github.com\/Kotlin\/kotlin-spark-api\/blob\/204ac2b96844a8df20ad700e652085d385d5ff6c\/examples\/src\/main\/kotlin\/org\/jetbrains\/kotlinx\/spark\/examples\/streaming\/JupyterStreamingExample.ipynb\" target=\"_blank\" rel=\"noreferrer noopener\">here<\/a>.<\/p>\n\n\n\n<p>Of course, it works in <a href=\"https:\/\/datalore.jetbrains.com\/\" target=\"_blank\" rel=\"noopener\">Datalore<\/a> too. Here is an <a href=\"https:\/\/datalore.jetbrains.com\/view\/notebook\/xkFtYiFscUHx6f0oi59sQv\" target=\"_blank\" rel=\"noreferrer noopener\">example notebook<\/a>, as well as an <a href=\"https:\/\/datalore.jetbrains.com\/view\/notebook\/Tz2xEUrtLZvRgPg0xHGTMD\" target=\"_blank\" rel=\"noreferrer noopener\">example notebook for streaming<\/a>. In case you\u2019re not familiar with Datalore, it\u2019s an online environment for Jupyter notebooks developed by JetBrains.<\/p>\n\n\n\n<p>A bit more information on Jupyter integration can be found on our <a href=\"https:\/\/github.com\/JetBrains\/kotlin-spark-api\/wiki\/Jupyter\" target=\"_blank\" rel=\"noopener\">wiki<\/a>.<\/p>\n\n\n\n<h1 class=\"wp-block-heading\">Deprecating c in favor of t<\/h1>\n\n\n\n<p>While preparing this release, we noticed that our own reinvented tuples (called <code>ArityN<\/code>) aren\u2019t actually that effective when used extensively, and it&#8217;s more effective to reuse Scala <code>Tuple<\/code>s. So, we&#8217;ve deprecated our factory <code>c<\/code> method in favor of the factory <code>t<\/code> method. 
The semantics are exactly the same, but the method creates a native Scala <code>Tuple<\/code> instead of a Kotlin one.<\/p>\n\n\n\n<p>In Kotlin-esque fashion, here are some useful extension methods for tuples:<\/p>\n\n\n\n<pre class=\"kotlin-code\" data-highlight-only=\"true\" theme=\"idea\" indent=\"4\" style=\"visibility: hidden; padding: 36px 0;\">\nval a: Tuple2&lt;Int, Long&gt; = tupleOf(1, 2L) \/\/ explicit tupleOf, same convention as with `listOf`\nval b: Tuple3&lt;String, Double, Int&gt; = t(&quot;test&quot;, 1.0, 2) \/\/ `t` as an alternative to `c`\nval c: Tuple3&lt;Float, String, Int&gt; = 5f X &quot;aaa&quot; X 1 \/\/ infix function which creates a tuple of 3 elements\ntupleOf(1) + 2 == tupleOf(1, 2) \/\/ &#039;+&#039;-syntax can be used to extend tuples\n<\/pre>\n\n\n\n<p>There is much more information about the new Tuples API available in the <a href=\"https:\/\/github.com\/Kotlin\/kotlin-spark-api\/wiki\/Tuples\" target=\"_blank\" rel=\"noreferrer noopener\">wiki<\/a>.<\/p>\n\n\n\n<p>We have also changed the structure of the documentation.<\/p>\n\n\n\n<p>The readme grew too large to easily digest, so we&#8217;ve split it up and put its contents into the wiki on GitHub.<\/p>\n\n\n\n<h1 class=\"wp-block-heading\">Conclusion<\/h1>\n\n\n\n<p>These are the most important changes in version 1.1 of the Kotlin API for Apache Spark. As usual, you can find the release on <a href=\"https:\/\/search.maven.org\/search?q=g:org.jetbrains.kotlinx.spark%20AND%20v:1.1.0\" target=\"_blank\" rel=\"noopener\">Maven Central<\/a>. If you have any ideas or if you need help or support, please contact us on <a href=\"https:\/\/kotlinlang.slack.com\/archives\/C4W52CFEZ\" target=\"_blank\" rel=\"noopener\">Slack<\/a> or via GitHub issues.<\/p>\n\n\n\n<p>Also, please give a warm welcome to <strong>Jolan Rensen<\/strong>, who had been contributing to the project for some time and now works at JetBrains. 
He&#8217;s the main person in charge of maintaining the project, while yours truly only tries to help him out wherever he can. Jolan, we\u2019re thrilled to have you with us!<\/p>\n","protected":false},"author":1234,"featured_media":249850,"comment_status":"closed","ping_status":"closed","template":"","categories":[594],"tags":[21,91,590,6607],"cross-post-tag":[6355],"acf":[],"_links":{"self":[{"href":"https:\/\/blog.jetbrains.com\/zh-hans\/wp-json\/wp\/v2\/big-data-tools\/249904"}],"collection":[{"href":"https:\/\/blog.jetbrains.com\/zh-hans\/wp-json\/wp\/v2\/big-data-tools"}],"about":[{"href":"https:\/\/blog.jetbrains.com\/zh-hans\/wp-json\/wp\/v2\/types\/big-data-tools"}],"author":[{"embeddable":true,"href":"https:\/\/blog.jetbrains.com\/zh-hans\/wp-json\/wp\/v2\/users\/1234"}],"replies":[{"embeddable":true,"href":"https:\/\/blog.jetbrains.com\/zh-hans\/wp-json\/wp\/v2\/comments?post=249904"}],"version-history":[{"count":8,"href":"https:\/\/blog.jetbrains.com\/zh-hans\/wp-json\/wp\/v2\/big-data-tools\/249904\/revisions"}],"predecessor-version":[{"id":250326,"href":"https:\/\/blog.jetbrains.com\/zh-hans\/wp-json\/wp\/v2\/big-data-tools\/249904\/revisions\/250326"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/blog.jetbrains.com\/zh-hans\/wp-json\/wp\/v2\/media\/249850"}],"wp:attachment":[{"href":"https:\/\/blog.jetbrains.com\/zh-hans\/wp-json\/wp\/v2\/media?parent=249904"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/blog.jetbrains.com\/zh-hans\/wp-json\/wp\/v2\/categories?post=249904"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/blog.jetbrains.com\/zh-hans\/wp-json\/wp\/v2\/tags?post=249904"},{"taxonomy":"cross-post-tag","embeddable":true,"href":"https:\/\/blog.jetbrains.com\/zh-hans\/wp-json\/wp\/v2\/cross-post-tag?post=249904"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}