Today, companies regularly generate terabytes of data in their daily operations. The sources range from network sensor data and the Web to social media, transactional business data, and data created in other business contexts. Given the volume of data being generated, real-time computation has become a major challenge for many organizations. A scalable real-time computation system that we have used effectively is the open-source Storm tool, which was developed at Twitter and is sometimes referred to as "real-time Hadoop." However, Storm is far simpler to use than Hadoop in that it does not require mastering an alternate universe of new technologies simply to handle big data jobs.
This article explains how to use Storm. The example project, called "Speeding Alert System," analyzes real-time data, raising a trigger and writing the relevant data to a database when the speed of a vehicle exceeds a predefined threshold.
Whereas Hadoop relies on batch processing, Storm is a real-time, distributed, fault-tolerant computation system. Like Hadoop, it can process huge amounts of data, but it does so in real time and with guaranteed reliability; that is, every message will be processed. Storm also offers fault tolerance and distributed computation, which make it suitable for processing huge amounts of data across different machines. It has these features as well:
- It is simple to scale. To scale, you add machines and change the parallelism settings of the topology. Storm's use of Zookeeper for cluster coordination makes it scalable to large cluster sizes.
- It guarantees processing of every message.
- Storm clusters are easy to manage.
- Storm is fault tolerant: Once a topology is submitted, Storm runs the topology until it is killed or the cluster is shut down. Also, if there are faults during execution, reassignment of tasks is handled by Storm.
- Topologies in Storm can be defined in any language, although typically Java is used.
To follow the rest of the article, you first need to install and set up Storm. The steps are straightforward:
- Download the Storm archive from the official Storm website.
- Unpack the archive, add its bin/ directory to your PATH, and make sure the bin/storm script is executable.
A Storm cluster mainly consists of master and worker nodes, with coordination handled by Zookeeper.
The master node runs a daemon, Nimbus, which is responsible for distributing the code around the cluster, assigning the tasks, and monitoring failures. It is similar to the Job Tracker in Hadoop.
The worker node runs a daemon, Supervisor, which listens to the work assigned and runs the worker process based on requirements. Each worker node executes a subset of a topology. The coordination between Nimbus and several supervisors is managed by a Zookeeper system or cluster.
Zookeeper is responsible for maintaining the coordination service between the supervisor and master. The logic for a real-time application is packaged into a Storm "topology." A topology consists of a graph of spouts (data sources) and bolts (data operations) that are connected with stream groupings (coordination). Let's look at these terms in greater depth.
In simple terms, a spout reads the data from a source for use in the topology. A spout can either be reliable or unreliable. A reliable spout makes sure to resend a tuple (which is an ordered list of data items) if Storm fails to process it. An unreliable spout does not track the tuple once it's emitted. The main method in a spout is
nextTuple(). This method either emits a new tuple to the topology or it returns if there is nothing to emit.
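The nextTuple() contract can be illustrated with a simplified, self-contained sketch. The LineSpout class below is hypothetical and does not implement Storm's actual spout interface; it only mimics the "emit one tuple per call, or do nothing" behavior, collecting emitted lines in a list where a real spout would hand them to Storm's output collector.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified stand-in for a Storm spout: it does NOT
// implement Storm's IRichSpout interface, it only mimics the
// nextTuple() behavior described above.
class LineSpout {
    private final Iterator<String> source;
    private final List<String> emitted = new ArrayList<>();

    LineSpout(List<String> lines) {
        this.source = lines.iterator();
    }

    // Called repeatedly by the framework; emits at most one tuple per call.
    void nextTuple() {
        if (source.hasNext()) {
            // A real spout would call collector.emit(new Values(...)) here.
            emitted.add(source.next());
        }
        // Otherwise: return without emitting; Storm simply calls again later.
    }

    List<String> emitted() {
        return emitted;
    }
}
```

A real spout would also implement methods such as ack() and fail() (which is how a reliable spout learns that a tuple must be resent), but those are omitted here for brevity.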
A bolt is responsible for all the processing that happens in a topology. Bolts can do anything from filtering to joins, aggregations, talking to files/databases, and so on. Bolts receive the data from a spout for processing, which may further emit tuples to another bolt in case of complex stream transformations. The main method in a bolt is
execute(), which accepts a tuple as input. In both spouts and bolts, to emit tuples to more than one stream, the streams can be declared and specified in declareStream().
A stream grouping defines how a stream should be partitioned among the bolt's tasks. Storm provides several built-in stream groupings: shuffle grouping, fields grouping, all grouping, global grouping, none grouping, direct grouping, and local-or-shuffle grouping. Custom groupings can also be added by implementing the
CustomStreamGrouping interface.
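The idea behind fields grouping can be shown with a small sketch: tuples that carry the same value for the grouping field must always be routed to the same bolt task. The hash-based routing below is illustrative only (Storm's internal partitioning scheme may differ); the class and method names are ours.

```java
// Illustrative sketch of fields grouping: route each tuple to a task
// based on a hash of the grouping field, so equal field values always
// land on the same task. Storm's internal scheme may differ.
class FieldsGroupingSketch {
    static int chooseTask(String fieldValue, int numTasks) {
        // floorMod keeps the result in [0, numTasks) even for
        // negative hash codes.
        return Math.floorMod(fieldValue.hashCode(), numTasks);
    }
}
```

Shuffle grouping, by contrast, distributes tuples randomly but evenly across tasks, and all grouping replicates every tuple to every task.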
For our use case, we designed a topology of one spout and one bolt that can process a huge amount of data (log files) and trigger an alarm when a specific value crosses a predefined threshold. Using a Storm topology, the log file is read line by line and the incoming data is monitored. In terms of Storm components, the spout reads the incoming data. It not only reads the data from existing files, but also monitors for new files. As soon as a file is modified, the spout reads the new entry and, after converting it to tuples (a format that can be read by a bolt), emits the tuples to the bolt to perform threshold analysis, which finds any record that has exceeded the threshold.
The next section explains the use case in detail.
In this article, we will be mainly concentrating on two types of threshold analysis: instant threshold and time series threshold.
- Instant threshold checks if the value of a field has exceeded the threshold value at that instant and raises a trigger if the condition is satisfied. For example, it raises a trigger if the speed of a vehicle exceeds 80 km/h.
- Time series threshold checks if the value of a field has exceeded the threshold value for a given time window and raises a trigger if the same is satisfied. For example, it raises a trigger if the speed of a vehicle exceeds 80 km/h more than once in the last five minutes.
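The two checks can be sketched in a few lines of plain Java. These classes are our own illustration, not the article's actual bolt code: the instant check is a single comparison, while the time-series check keeps the timestamps of over-limit readings in a deque and evicts those that fall outside the sliding window (timestamps are assumed to arrive in non-decreasing order).

```java
import java.util.ArrayDeque;

// Sketch of the two threshold checks described above (class and method
// names are ours, not from the article's code).
class ThresholdChecks {
    // Instant threshold: fire as soon as a single reading exceeds the limit.
    static boolean instantExceeds(double speed, double limit) {
        return speed > limit;
    }
}

// Time-series threshold: fire when more than maxHits readings exceed the
// limit inside a sliding time window. Assumes timestamps are non-decreasing.
class WindowedThreshold {
    private final double limit;
    private final long windowMillis;
    private final int maxHits;
    private final ArrayDeque<Long> hits = new ArrayDeque<>();

    WindowedThreshold(double limit, long windowMillis, int maxHits) {
        this.limit = limit;
        this.windowMillis = windowMillis;
        this.maxHits = maxHits;
    }

    boolean record(double speed, long timestampMillis) {
        if (speed > limit) {
            hits.addLast(timestampMillis);
        }
        // Drop over-limit readings that have fallen out of the window.
        while (!hits.isEmpty() && hits.peekFirst() < timestampMillis - windowMillis) {
            hits.removeFirst();
        }
        return hits.size() > maxHits;
    }
}
```

With an 80 km/h limit, a five-minute window, and maxHits of 1, a second over-limit reading inside the window raises the trigger, but a later over-limit reading arriving after the window has slid past the earlier ones does not.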
Listing One shows a log file of the type we'll use, which contains vehicle data information such as vehicle number, speed at which the vehicle is traveling, and location in which the information is captured.
Listing One: A log file with entries of vehicles passing through the checkpoint.
AB 123, 60, North city
BC 123, 70, South city
CD 234, 40, South city
DE 123, 40, East city
EF 123, 90, South city
GH 123, 50, West city
A corresponding XML file is created, which consists of the schema for the incoming data. It is used for parsing the log file. The schema XML and its corresponding description are shown in the table.
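Parsing amounts to splitting each comma-separated line and pairing the values with the field names from the schema. The sketch below hard-codes assumed field names (vehicle_number, speed, location, inferred from the article's description) where the real implementation would read them from the XML file.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of turning one log line into a named-field tuple. The field
// names here are assumptions based on the article's description; the
// real implementation reads them from the schema XML file.
class LogLineParser {
    static final String[] FIELDS = {"vehicle_number", "speed", "location"};

    static Map<String, String> parse(String line) {
        String[] parts = line.split(",");
        Map<String, String> tuple = new HashMap<>();
        for (int i = 0; i < FIELDS.length && i < parts.length; i++) {
            tuple.put(FIELDS[i], parts[i].trim());
        }
        return tuple;
    }
}
```

Keeping the field names in an external schema means the same spout can be reused for a differently-shaped log file by editing only the XML, not the code.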
The XML file and the log file are in a directory that is monitored by the spout constantly for real-time changes. The topology we use for this example is shown in Figure 1.
Figure 1: Topology created in Storm to process real-time data.
As shown in Figure 1, the
FileListenerSpout accepts the input log file, reads the data line by line, and emits the data to the
ThresholdCalculatorBolt for further threshold processing. Once the processing is done, the contents of the line for which the threshold is calculated are emitted to the
DBWriterBolt, where it is persisted in the database (or an alert is raised). The detailed implementation for this process is explained next.
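The three-stage flow in Figure 1 can be sketched end to end with plain method calls standing in for Storm's tuple passing. The names follow the figure, but this is a simplified simulation, not the article's actual implementation: the real classes implement Storm's spout and bolt interfaces, and the final stage would write to a database rather than collect rows in a list.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified end-to-end simulation of the topology in Figure 1.
// Method calls stand in for Storm's tuple passing; the 80 km/h
// threshold is taken from the earlier example.
class SpeedingAlertPipeline {
    static final double LIMIT_KMH = 80.0;

    // Stage 2 (ThresholdCalculatorBolt): keep only records whose
    // speed field exceeds the threshold.
    static List<String> thresholdCalculator(List<String> lines) {
        List<String> exceeded = new ArrayList<>();
        for (String line : lines) {
            String[] fields = line.split(",");
            double speed = Double.parseDouble(fields[1].trim());
            if (speed > LIMIT_KMH) {
                exceeded.add(line);
            }
        }
        return exceeded;
    }

    // Stage 3 (DBWriterBolt): stand-in for persistence -- here we just
    // collect the rows that would be written to the database.
    static List<String> dbWriter(List<String> rows) {
        return new ArrayList<>(rows);
    }
}
```

Running the log entries from Listing One through these two stages leaves only the EF 123 record, the single vehicle travelling above 80 km/h.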
The spout takes the log file and the XML descriptor file as input. The XML file contains the schema corresponding to the log file. Consider an example log file with vehicle data: the vehicle number, the speed at which the vehicle is travelling, and the location at which the information is captured. (See Figure 2.)
Figure 2: Flow of data from log files to Spout.