Click here to Skip to main content
15,867,686 members
Articles / Hosted Services / Azure
Article

Streaming at Scale with Kafka and Azure HDInsight Part Three: Analyzing Streaming Data on HDInsight with PySpark

Rate me:
Please Sign up or sign in to vote.
0.00/5 (No votes)
29 Apr 2022CPOL5 min read 3.7K   6  
In this article we use PySpark to stream and analyze our data.
Here we show you how to use PySpark on HDInsight to analyze the streaming data ingestion we set up in the previous article. We look at how HDInsight lets developers quickly set up and use PySpark without needing to spend time and effort administering it.

This article is a sponsored article. Articles such as these are intended to provide you with information on products and services that we consider useful and of value to developers

The first article of this series stepped through building a Kafka cluster on HDInsight. Then, the second part explored adding data to the sample application. This article walks through the process of streaming that data to Spark using PySpark in a Jupyter notebook. This notebook and the other required files can be found here.

Prerequisites

As in the previous article, this demonstration will obtain host information from Secure Shell Protocol (SSH) interfaces.

In a browser, navigate to the Jupyter engine in the cluster. Enter the login credentials for the previously established admin profile.

Notably, the notebook output differs from the command-line output.

The same code appears twice. First, it appears in the Jupyter notebook, kafka_streaming.ipyn. Then, it appears in the Python code file, kafka_streaming.py. Each file performs the same function with the same code.

Next, upload the Python script to the main Spark cluster node and execute the command-line script using the following command:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 kafka_streaming.py

To access the Jupyter notebook, initiate the environment in a browser. It is important to replace CLUSTERNAME with the Spark cluster name. The Jupyter setup on HDInsight Spark is automatic and already configured on setup. The URL is as follows:

https://CLUSTERNAME.azurehdinsight.net/jupyter

Instead of downloading the notebook, it is also possible to create a new PySpark notebook.

Now, link the notebook to the necessary Java packages using the following commands:

%%configure -f
{
    "Conf":	{"spark.jars.packages":
               "org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0"
}

The –f option restarts the Spark session. This can be omitted if the magic runs in the first cell. However, note that if this command is re-executed, all of the pre-run data is erased.

At the time of writing, the above configuration worked properly. However, the software environment changes all too quickly. For this reason, the next cell contains this command:

sc.version

This can determine the correct Spark version above.

Occasionally, there may be errors locating the library (both on premises and on HDInsight). Fortunately, it’s often solvable by using the following command in the command prompt. On HDInsight, this requires connecting to the Spark cluster through SSH.

pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0

This appears to be more successful in locating and retrieving the library.

Once this has been configured, it’s time to execute the rest of the notebook.

First, set the kafkaBroker hosts and topic connection data:

kafkaBrokers = "YOUR_MAIN_KAFKA_BROKER"
kafkaTopic = "SensorReadings"
print("Done!")

Working with this code restarts or reruns cells. Ensure that only one Spark session executes.

To avoid unintentionally starting a second session, create a global singleton using the following function:

Python
def getSparkSessionInstance():
    if ('sparkSessionSingletonInstance' not in globals()):
        globals()['sparkSessionSingletonInstance'] = SparkSession\
            .builder\
            .appName("StructuredStreaming") \
            .master("local[*]") \
            .getOrCreate()
    return globals()['sparkSessionSingletonInstance']

Then, this function can be called at any point.

The following cell re-assigns the Spark session, context, and streaming context if necessary. To avoid all of the INFO and WARN messages, set the context to only log errors.

Python
spark = getSparkSessionInstance()
sc = spark.sparkContext
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 1)

The next cell connects to the kafkaBrokers and reads the stream from Kafka into a data frame. However, nothing happens until the start method of the stream writer triggers the streaming.

Python
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafkaBrokers) \
    .option("subscribe", kafkaTopic) \
    .option("startingOffsets", "earliest") \
    .load()

In a Kafka stream, both the key and value are byte arrays. Cast them as strings for use later. Additionally, the timestamp is in a long integer format and later will be converted to a Unix timestamp format.

For now, cast the key and value columns into strings in a new data frame.

Python
df1 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)",
                           "timestamp")

The data from the hypothetical sensors is stored in JSON format as follows:

JSON
{
"sensor": "mass",
"machine": "mixer",
"units": "kg",
"time": 1646861863942,
"value": 144.174
}

The next step is to extract the necessary data (the returned values) for later use. To achieve this, define the schema to extract each streamed record. The schema below defines the structure.

Python
value_schema = StructType([
        StructField("sensor",StringType(),True),
        StructField("machine",StringType(),True),
        StructField("units",StringType(),True),
        StructField("time", LongType(), True),
        StructField("value", DoubleType(), True)
])

In the following cell, use the schema to extract the data into another data frame.

This is a convenient time to cast the Kafka-style timestamp to a Unix-style timestamp. However, it’s only necessary if using windowed streaming later.

First, cast the timestamp as a long integer, express it in seconds, and then cast it as a Unix timestamp.

Df2 = df1 \
  .withColumn("jsonData", from_json(col("value"), value_schema)) \
  .withColumn("timestamp", (col("timestamp").cast("long")/1000).cast("timestamp")) \
 .select("key", "jsonData.*", "timestamp")

The environment uses two output forms.

Grouped form shows the average value for each sensor. This option lists the sensor and average value over all readings. As more records are streamed, the process adjusts the averages. This isn’t very useful in a real environment. As the number of readings increases, an outlier will have less effect on the average. This can be detrimental when using the average to track a machine’s performance.

In contrast, windowed form is more realistic. Depending on window values, this displays a long list of values. However, the shorter list is preferable for display in a Jupyter notebook or on the command line.

Python
outStream = streamingDataFrame. \
    groupBy("key"). \
    avg("value")

For windowed mode, use the following lines and adjust the windows as preferred.

outStream = streamingDataFrame. \
    withWatermark("timestamp", "1 minute"). \
    groupBy(window("timestamp", "10 minutes", "5 minutes"), "key"). \
    groupBy("key"). \
    avg('value')

Then, stream the results to the console sink. The command below starts the streaming process. Once complete, the command waits for further streamed data from Kafka.

Python
query = outStream.writeStream. \
    outputMode("complete"). \
    option("numRows", 100). \
    option("truncate", "false"). \
    format("console"). \
    start(). \
    awaitTermination()

The streaming starts when the producer application executes the Kafka broker.

The Results

The results of this are somewhat bland, but the functionality is clear and straightforward. This is a simple sink application with minimal processing or analysis. The ability to set up and interact with two clusters with a single click is incredibly significant to a resource-strapped developer.

The console application displays the data as a table of average values for each sensor. When using the windowing code, the output shows the time windows. The notebook displays the streaming activity in both list and graphical formats.

Moreover, executing the producer application streams new batches to the sink, which updates the output figures.

Conclusion

Setting up and using clusters on HDInsight makes a simple process out of what used to be strenuous. The clusters are equally easy to provision and delete. All the virtual machines exist within a virtual network. Installing the software and configuring the servers can happen without an on-hand admin resource.

Furthermore, the PySpark notebook is ready for use with the PySpark kernel, which proves convenient time and again.

Overall, using HDInsight for cloud-based Kafka development represents a seamless and intuitive next step.

To learn more about streaming at scale in HDInsight, and Apache Kafka in Azure HDInsight, check out these Microsoft Azure Developer Community resources.

This article is part of the series 'Streaming at Scale with Kafka and Azure HDInsight View All

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
United States United States
This member has not yet provided a Biography. Assume it's interesting and varied, and probably something to do with programming.

Comments and Discussions

 
-- There are no messages in this forum --