Channels ▼
RSS

C/C++

Easy, Real-Time Big Data Analysis Using Storm


Bolt Implementation

The output of Spout is given to Bolt for further processing. The topology we have considered for our use case consists of two bolts as shown in Figure 3.


Figure 3: Flow of data from Spout to Bolt.

ThresholdCalculatorBolt

The tuples emitted by spout is received by the ThresholdCalculatorBolt for threshold processing. It accepts several inputs for threshold check. The inputs it accepts are:

  • Threshold value to check
  • Threshold column number to check
  • Threshold column data type
  • Threshold check operator
  • Threshold frequency of occurrence
  • Threshold time window

A class, shown Listing Four, is defined to hold these values.

Listing Four: ThresholdInfo class.

public class ThresholdInfo implements Serializable
{
	private String action;
	private String rule;
	private Object thresholdValue;
	private int thresholdColNumber;
	private Integer timeWindow;
	private int frequencyOfOccurence;
}

Based on the values provided in fields, the threshold check is made in the execute() method as shown in Listing Five. The code mostly consists of parsing and checking the incoming values.

Listing Five: Code for threshold check.

public void execute(Tuple tuple, BasicOutputCollector collector) 
{
    if(tuple!=null)
    {
        List<Object> inputTupleList = (List<Object>) tuple.getValues();
        int thresholdColNum = thresholdInfo.getThresholdColNumber();
        Object thresholdValue = thresholdInfo.getThresholdValue();
        String thresholdDataType = 
            tupleInfo.getFieldList().get(thresholdColNum-1).getColumnType();
        Integer timeWindow = thresholdInfo.getTimeWindow();
        int frequency = thresholdInfo.getFrequencyOfOccurence();

        if(thresholdDataType.equalsIgnoreCase("string"))
        {
            String valueToCheck = inputTupleList.get(thresholdColNum-1).toString();
            String frequencyChkOp = thresholdInfo.getAction();
            if(timeWindow!=null)
            {
                long curTime = System.currentTimeMillis();
                long diffInMinutes = (curTime-startTime)/(1000);
                if(diffInMinutes>=timeWindow)
                {
                    if(frequencyChkOp.equals("=="))
                    {
                         if(valueToCheck.equalsIgnoreCase(thresholdValue.toString()))
                         {
                             count.incrementAndGet();
                             if(count.get() > frequency)
                                 splitAndEmit(inputTupleList,collector);
                         }
                    }
                    else if(frequencyChkOp.equals("!="))
                    {
                        if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString()))
                        {
                             count.incrementAndGet();
                             if(count.get() > frequency)
                                 splitAndEmit(inputTupleList,collector);
                         }
                     }
                     else
                         System.out.println("Operator not supported");
                 }
             }
             else
             {
                 if(frequencyChkOp.equals("=="))
                 {
                     if(valueToCheck.equalsIgnoreCase(thresholdValue.toString()))
                     {
                         count.incrementAndGet();
                         if(count.get() > frequency)
                             splitAndEmit(inputTupleList,collector);	
                     }
                 }
                 else if(frequencyChkOp.equals("!="))
                 {
                      if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString()))
                      {
                          count.incrementAndGet();
                          if(count.get() > frequency)
                              splitAndEmit(inputTupleList,collector);	
                      }
                  }
              }
           }
           else if(thresholdDataType.equalsIgnoreCase("int") || 
                   thresholdDataType.equalsIgnoreCase("double") || 
                   thresholdDataType.equalsIgnoreCase("float") || 
                   thresholdDataType.equalsIgnoreCase("long") || 
                   thresholdDataType.equalsIgnoreCase("short"))
           {
               String frequencyChkOp = thresholdInfo.getAction();
               if(timeWindow!=null)
               {
                    long valueToCheck = 
                        Long.parseLong(inputTupleList.get(thresholdColNum-1).toString());
                    long curTime = System.currentTimeMillis();
                    long diffInMinutes = (curTime-startTime)/(1000);
                    System.out.println("Difference in minutes="+diffInMinutes);
                    if(diffInMinutes>=timeWindow)
                    {
                         if(frequencyChkOp.equals("<"))
                         {
                             if(valueToCheck < Double.parseDouble(thresholdValue.toString()))
                             {
                                  count.incrementAndGet();
                                  if(count.get() > frequency)
                                      splitAndEmit(inputTupleList,collector);
                             }
                         }
                         else if(frequencyChkOp.equals(">"))
                         {
                              if(valueToCheck > Double.parseDouble(thresholdValue.toString())) 
                              {
                                  count.incrementAndGet();
                                  if(count.get() > frequency)
                                      splitAndEmit(inputTupleList,collector);
                              }
                          }
                          else if(frequencyChkOp.equals("=="))
                          {
                             if(valueToCheck == Double.parseDouble(thresholdValue.toString()))
                             {
                                 count.incrementAndGet();
                                 if(count.get() > frequency)
                                     splitAndEmit(inputTupleList,collector);
                              }
                          }
                          else if(frequencyChkOp.equals("!="))
                          {
   . . . 
                          }
                      }
                  
             }
     else
          splitAndEmit(null,collector);
     }
     else
     {
          System.err.println("Emitting null in bolt");
          splitAndEmit(null,collector);
     }
}
  


Related Reading


More Insights






Currently we allow the following HTML tags in comments:

Single tags

These tags can be used alone and don't need an ending tag.

<br> Defines a single line break

<hr> Defines a horizontal line

Matching tags

These require an ending tag - e.g. <i>italic text</i>

<a> Defines an anchor

<b> Defines bold text

<big> Defines big text

<blockquote> Defines a long quotation

<caption> Defines a table caption

<cite> Defines a citation

<code> Defines computer code text

<em> Defines emphasized text

<fieldset> Defines a border around elements in a form

<h1> This is heading 1

<h2> This is heading 2

<h3> This is heading 3

<h4> This is heading 4

<h5> This is heading 5

<h6> This is heading 6

<i> Defines italic text

<p> Defines a paragraph

<pre> Defines preformatted text

<q> Defines a short quotation

<samp> Defines sample computer code text

<small> Defines small text

<span> Defines a section in a document

<s> Defines strikethrough text

<strike> Defines strikethrough text

<strong> Defines strong text

<sub> Defines subscripted text

<sup> Defines superscripted text

<u> Defines underlined text

Dr. Dobb's encourages readers to engage in spirited, healthy debate, including taking us to task. However, Dr. Dobb's moderates all comments posted to our site, and reserves the right to modify or remove any content that it determines to be derogatory, offensive, inflammatory, vulgar, irrelevant/off-topic, racist or obvious marketing or spam. Dr. Dobb's further reserves the right to disable the profile of any commenter participating in said activities.

 
Disqus Tips To upload an avatar photo, first complete your Disqus profile. | View the list of supported HTML tags you can use to style comments. | Please read our commenting policy.
 

Video