
Streaming at Scale with Kafka and Azure HDInsight, Part Two: Streaming Data into Kafka on HDInsight

28 Apr 2022
In this article we explore how to stream data into Kafka.
Here we walk through the process of connecting a streaming data source to the Kafka cluster set up in the previous article. We pick a data set to work with, and then create a simple Python application to stream it into our HDInsight Kafka cluster.

This article is a sponsored article. Articles such as these are intended to provide you with information on products and services that we consider useful and of value to developers.

The first article in this series explored the simple setup process for a Kafka cluster on Azure HDInsight.

This article explores creating and deploying the Kafka producer: a Python script that simulates a stream of data from factory sensors. Because the stream is simulated, it can be adjusted and resubmitted as needed.

The Producer

The first step in creating the application is to define the machine and sensor information. After this, the Kafka broker list and topic name will be configured. The broker names on Azure are accessible using a Secure Shell Protocol (SSH) connection to the cluster. See the previous article for this cluster’s SSH credentials.
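As a rough illustration of that first step, the machine and sensor definitions could be kept in a small table next to the broker and topic settings. This is only a sketch, not the contents of the article's producer.py: the Sensor class, the sensors_for helper, and the value ranges are hypothetical, while the sensor names, machines, and units are taken from the sample output shown later in this article.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Sensor:
    name: str
    machine: str
    units: str
    low: float   # hypothetical lower bound for simulated values
    high: float  # hypothetical upper bound for simulated values


# Names, machines, and units follow the article's sample output;
# the numeric ranges are invented for illustration only.
SENSORS = [
    Sensor("mass", "mixer", "kg", 100.0, 200.0),
    Sensor("rotation", "mixer", "rpm", 50.0, 80.0),
    Sensor("fillpressure", "mixer", "kPa", 500.0, 650.0),
    Sensor("bowltemp", "mixer", "DegC", 20.0, 30.0),
    Sensor("ovenspeed", "oven", "m/s", 0.1, 0.3),
]

# Placeholders: replace with the broker list and topic of your own cluster.
KAFKA_BROKERS = "<broker1>:9092,<broker2>:9092"
TOPIC = "SensorReadings"


def sensors_for(machine: str) -> List[Sensor]:
    """Return every sensor attached to the given machine."""
    return [s for s in SENSORS if s.machine == machine]
```

Keeping the definitions in one table makes it easy to add a machine or sensor without touching the sending logic.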

Alternatively, the topic can be created using an SSH connection with PowerShell. While this demonstration will only overview the relevant steps, Microsoft provides a complete tutorial on using PowerShell for Azure.

To use this method, it’s necessary to first install the Az PowerShell module, which is already on the cloud shell accessible from the portal. If you’re using SSH from a local machine, ensure that PowerShell and the Az module have both been installed.

First, connect to the primary head node of the cluster, replacing CLUSTERNAME with the name of the Kafka cluster you created. Also replace sshuser if the default was changed during the setup process:

ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net

If it’s not yet present, install the JSON processor (jq) on the primary head node. The commands that follow use it to extract host names from the JSON returned by the cluster’s REST API:

sudo apt -y install jq

Next, set the Kafka cluster name environment variable:

read -p "Enter the Kafka on HDInsight cluster name: " CLUSTERNAME

Then, set the KAFKAZKHOSTS environment variable to the ZooKeeper host list:

export KAFKAZKHOSTS=`curl -sS -u admin -G https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`

When curl prompts for it, enter the admin password created during cluster deployment.

Next, set the KAFKABROKERS environment variable to the broker host list:

export KAFKABROKERS=`curl -sS -u admin -G https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`

Then, get the broker list.

echo '$KAFKABROKERS='$KAFKABROKERS

This should return something similar to the following:

<brokername1>.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092,<brokername2>.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092

Next, paste these values into the producer.py file where required.
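Since the broker list is pasted by hand, it can help to validate it before handing it to the Kafka client. The following is a minimal sketch; parse_broker_list is a hypothetical helper and is not part of the article's producer.py. It splits the comma-separated string and checks that each entry looks like host:port.

```python
from typing import List


def parse_broker_list(raw: str) -> List[str]:
    """Split a comma-separated broker string into validated host:port entries."""
    brokers = []
    for entry in raw.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate trailing commas and stray whitespace
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        brokers.append(entry)
    return brokers
```

The resulting list can be passed wherever the producer expects its broker addresses.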

This Quickstart article details how to create a topic in our Kafka cluster using the information provided above. Use the following command to do so:

/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic SensorReadings --zookeeper $KAFKAZKHOSTS

Upload the Application

While there are various ways to upload our Python file, it’s generally most sensible to follow the path of least resistance. If not yet present, download the necessary files from GitHub.

In the shell, create the file using nano or another preferred editor. Then, copy and paste the contents of producer.py. Next, change the kafkaBrokers value to the value returned above. Save the file.

The application uses the kafka-python library. Install this as follows:

pip3 install kafka-python
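For orientation, here is a minimal sketch of how a script might send one JSON reading with kafka-python. It is not the article's producer.py: the helper names (serialize, build_reading, make_producer, send_reading) are hypothetical, and the message layout is assumed from the sample output in this article. The kafka import is deferred so the pure helpers can be tried without the library installed.

```python
import json
import time


def serialize(reading: dict) -> bytes:
    """Encode a reading as UTF-8 JSON, the form Kafka stores as the message value."""
    return json.dumps(reading).encode("utf-8")


def build_reading(sensor: str, machine: str, units: str, value: float) -> dict:
    """Build one reading; 'time' is milliseconds since the Unix epoch."""
    return {
        "sensor": sensor,
        "machine": machine,
        "units": units,
        "time": int(time.time() * 1000),
        "value": round(value, 3),
    }


def make_producer(brokers):
    """Create a KafkaProducer; brokers is a host:port string or a list of them."""
    # Imported lazily so the helpers above work without kafka-python installed.
    from kafka import KafkaProducer
    return KafkaProducer(bootstrap_servers=brokers, value_serializer=serialize)


def send_reading(producer, topic: str, reading: dict) -> None:
    """Queue one reading on the topic and block until it has been sent."""
    producer.send(topic, value=reading)
    producer.flush()
```

On the head node, usage would look something like send_reading(make_producer(brokers), "SensorReadings", build_reading("mass", "mixer", "kg", 144.174)), where brokers holds the list obtained earlier.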

Test the Producer

Run a quick test to confirm that everything is functioning properly by initiating the Kafka command-line consumer script.

<PATH>/kafka-console-consumer.sh --topic SensorReadings --from-beginning --bootstrap-server <paste your broker list here>

The path at the time of writing was /usr/hdp/2.6.5.3033-1/kafka/bin/.

The consumer then waits for Kafka to send data.

Now, open a second SSH console and execute the producer:

python3 producer.py

If all goes well, the producer sends a short burst of data to Kafka. This data is visible in the consumer console.

{"sensor": "mass", "machine": "mixer", "units": "kg", "time": 1646861863942, "value": 144.174}
{"sensor": "rotation", "machine": "mixer", "units": "rpm", "time": 1646861863942, "value": 65.009}
{"sensor": "fillpressure", "machine": "mixer", "units": "kPa", "time": 1646861863942, "value": 575.347}
{"sensor": "bowltemp", "machine": "mixer", "units": "DegC", "time": 1646861863942, "value": 25.452}
{"sensor": "ovenspeed", "machine": "oven", "units": "m/s", "time": 1646861863942, "value": 0.203}
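Each line printed by the consumer is a self-contained JSON document, and the time field appears to be milliseconds since the Unix epoch. As a small sketch of how such output could be inspected (parse_readings and group_by_machine are hypothetical helpers, and SAMPLE copies two lines from the output above):

```python
import json
from collections import defaultdict
from datetime import datetime, timezone

# Two lines copied from the consumer output shown in this article.
SAMPLE = [
    '{"sensor": "mass", "machine": "mixer", "units": "kg", "time": 1646861863942, "value": 144.174}',
    '{"sensor": "ovenspeed", "machine": "oven", "units": "m/s", "time": 1646861863942, "value": 0.203}',
]


def parse_readings(lines):
    """Parse JSON lines and attach a timezone-aware 'timestamp' to each record."""
    readings = []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        record = json.loads(line)
        # Assumes 'time' is in milliseconds, as the sample values suggest.
        record["timestamp"] = datetime.fromtimestamp(
            record["time"] / 1000, tz=timezone.utc
        )
        readings.append(record)
    return readings


def group_by_machine(readings):
    """Map each machine name to the list of sensor names seen for it."""
    grouped = defaultdict(list)
    for record in readings:
        grouped[record["machine"]].append(record["sensor"])
    return dict(grouped)
```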

There are five command-line options used for generating the data in the desired form:

  • -l or --loop_count followed by an integer specifies the number of cycles to generate. The actual number of records generated depends on the other selected options.
  • -s or --sensor followed by a sensor name specifies which sensor to simulate. If using this option, there’s no need to use the machine name option, as the sensor is unique. The machine name is purely informational.
  • -r or --rate followed by a decimal specifies the number of readings to send per second. It’s implemented simply as a delay before sending the data on each cycle, so the actual rate isn’t extremely accurate, but it’s sufficient for its purpose.
  • -m or --machine followed by one of the machine names specifies the machine to use. If used without the --sensor option, then all the sensors for that machine fire simultaneously.
  • -t or --time followed by an integer specifies the number of seconds to run. When used, the process ignores the loop count option.
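The five options listed above could be declared with argparse roughly as follows. This is a sketch under assumptions rather than the actual producer.py: the default values, the seconds attribute name, and the delay_for helper are hypothetical, and only the flag names come from the list above.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Declare the five options described in the article (defaults are assumed)."""
    parser = argparse.ArgumentParser(description="Simulated sensor producer (sketch)")
    parser.add_argument("-l", "--loop_count", type=int, default=1,
                        help="number of cycles to generate")
    parser.add_argument("-s", "--sensor", help="name of the sensor to simulate")
    parser.add_argument("-r", "--rate", type=float, default=1.0,
                        help="readings to send per second")
    parser.add_argument("-m", "--machine", help="name of the machine to simulate")
    parser.add_argument("-t", "--time", type=int, dest="seconds",
                        help="seconds to run; takes precedence over --loop_count")
    return parser


def delay_for(rate: float) -> float:
    """Convert a readings-per-second rate into the pause between cycles."""
    if rate <= 0:
        raise ValueError("rate must be positive")
    return 1.0 / rate
```

For example, parsing ["-t", "25", "-r", "0.2", "-s", "mass"] yields a 25-second run for the mass sensor with a pause of about five seconds between readings.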

These options can be combined to run the simulation, using the run-scenario.sh shell script as a starting point. This script executes the application many times in background mode, simulating the continual delivery of data as if from a working environment.

In the script, set the command-line options to provide the inputs as required by the scenario.

Executing the script as below gives a burst of requirement-specified data for 25 seconds:

./run-scenario.sh 25

It does this by executing the following:

python3 producer.py -t 25 -r 0.2 -s mass &
python3 producer.py -t 25 -s rotation &
python3 producer.py -t 25 -r 10 -s FillPressure &
python3 producer.py -t 25 -r 10 -s BowlTemp &
python3 producer.py -t 25 -s OvenSpeed &
python3 producer.py -t 25 -r 1 -s ProveTemp &
python3 producer.py -t 25 -r 5 -s OvenTemp1 &
python3 producer.py -t 25 -r 5 -s OvenTemp2 &
python3 producer.py -t 25 -r 10 -s OvenTemp3 &
python3 producer.py -t 25 -r 2 -s CoolTemp1 &
python3 producer.py -t 25 -r 1 -s CoolTemp2 &
python3 producer.py -t 25 -s PackSpeed &
python3 producer.py -t 25 -r 10 -s PackCounter &

Running the script without any command-line parameters gives a 15-second burst. Any positive integer works as the duration argument.
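The same scenario can be expressed as data, which may be easier to adjust than a list of shell lines. The sketch below is a hypothetical Python counterpart to run-scenario.sh, not something shipped with the article: SCENARIO copies the sensor and rate pairs from the listing above, and parse_duration, build_commands, and launch are illustrative names.

```python
import subprocess
from typing import List

# (sensor, rate) pairs copied from the script listing; None means no -r flag.
SCENARIO = [
    ("mass", 0.2), ("rotation", None), ("FillPressure", 10), ("BowlTemp", 10),
    ("OvenSpeed", None), ("ProveTemp", 1), ("OvenTemp1", 5), ("OvenTemp2", 5),
    ("OvenTemp3", 10), ("CoolTemp1", 2), ("CoolTemp2", 1), ("PackSpeed", None),
    ("PackCounter", 10),
]


def parse_duration(argv: List[str], default: int = 15) -> int:
    """Return the run time in seconds, falling back to the 15-second default."""
    if not argv:
        return default
    seconds = int(argv[0])
    if seconds <= 0:
        raise ValueError("duration must be a positive integer")
    return seconds


def build_commands(seconds: int) -> List[List[str]]:
    """Build one producer.py command line per sensor in the scenario."""
    commands = []
    for sensor, rate in SCENARIO:
        cmd = ["python3", "producer.py", "-t", str(seconds)]
        if rate is not None:
            cmd += ["-r", str(rate)]
        cmd += ["-s", sensor]
        commands.append(cmd)
    return commands


def launch(seconds: int) -> None:
    """Start every producer in the background, then wait for all of them to exit."""
    processes = [subprocess.Popen(cmd) for cmd in build_commands(seconds)]
    for process in processes:
        process.wait()
```

Calling launch(parse_duration(sys.argv[1:])) from a small script, with sys imported, would mirror ./run-scenario.sh 25.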

Conclusion

By now, it should be clear that using Kafka on HDInsight is essentially identical to using Kafka in-house. Whether Kafka runs standalone on an old Ubuntu PC, on a multi-server cluster in our server room, or on Azure HDInsight, the approach doesn’t change.

Additionally, there’s no difference between using this producer or streaming from a database. The process is the same for both on-site and Azure Kafka implementations.

Now, all that remains is to stream and analyze the data. To explore this process, continue to the final article of this series, which will use PySpark to do just that.

To learn more about streaming at scale in HDInsight, and Apache Kafka in Azure HDInsight, check out these Microsoft Azure Developer Community resources.


License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)
