Bolt Implementation
The output of Spout is given to Bolt for further processing. The topology we have considered for our use case consists of two bolts as shown in Figure 3.
Figure 3: Flow of data from Spout to Bolt.
ThresholdCalculatorBolt
The tuples emitted by the spout are received by the ThresholdCalculatorBolt for threshold processing. It accepts several inputs for the threshold check. The inputs it accepts are:
- Threshold value to check
- Threshold column number to check
- Threshold column data type
- Threshold check operator
- Threshold frequency of occurrence
- Threshold time window
A class, shown in Listing Four, is defined to hold these values.
Listing Four: ThresholdInfo class.
/**
 * Serializable holder for the settings of a threshold check.
 *
 * <p>The threshold bolt's {@code execute()} method reads these values through
 * the getters below. There, {@link #getAction()} is compared against the
 * operator strings {@code "=="}, {@code "!="}, {@code "<"} and {@code ">"}.
 *
 * <p>The spelling "Occurence" is kept in the field and accessor names so that
 * existing callers of {@code getFrequencyOfOccurence()} keep compiling.
 */
public class ThresholdInfo implements Serializable
{
    /** Explicit version id so the serialized form does not depend on compiler-generated values. */
    private static final long serialVersionUID = 1L;

    /** Comparison operator used by the threshold check (e.g. "==", "!=", "<", ">"). */
    private String action;
    /** Rule associated with this threshold; not read by the threshold check itself. */
    private String rule;
    /**
     * Value the incoming column is compared with.
     * NOTE(review): must itself be serializable for this object to serialize - confirm at call sites.
     */
    private Object thresholdValue;
    /** 1-based number of the column to check (callers index with {@code thresholdColNumber - 1}). */
    private int thresholdColNumber;
    /** Time window for the check; {@code null} means no time window is configured. */
    private Integer timeWindow;
    /** Number of matches that must be exceeded before a tuple is emitted. */
    private int frequencyOfOccurence;

    /** @return the comparison operator for the threshold check */
    public String getAction()
    {
        return action;
    }

    /** @param action the comparison operator for the threshold check */
    public void setAction(String action)
    {
        this.action = action;
    }

    /** @return the rule associated with this threshold */
    public String getRule()
    {
        return rule;
    }

    /** @param rule the rule associated with this threshold */
    public void setRule(String rule)
    {
        this.rule = rule;
    }

    /** @return the value the incoming column is compared with */
    public Object getThresholdValue()
    {
        return thresholdValue;
    }

    /** @param thresholdValue the value the incoming column is compared with */
    public void setThresholdValue(Object thresholdValue)
    {
        this.thresholdValue = thresholdValue;
    }

    /** @return the 1-based number of the column to check */
    public int getThresholdColNumber()
    {
        return thresholdColNumber;
    }

    /** @param thresholdColNumber the 1-based number of the column to check */
    public void setThresholdColNumber(int thresholdColNumber)
    {
        this.thresholdColNumber = thresholdColNumber;
    }

    /** @return the configured time window, or {@code null} when none is set */
    public Integer getTimeWindow()
    {
        return timeWindow;
    }

    /** @param timeWindow the time window, or {@code null} to disable the window check */
    public void setTimeWindow(Integer timeWindow)
    {
        this.timeWindow = timeWindow;
    }

    /** @return the match count that must be exceeded before a tuple is emitted */
    public int getFrequencyOfOccurence()
    {
        return frequencyOfOccurence;
    }

    /** @param frequencyOfOccurence the match count that must be exceeded before a tuple is emitted */
    public void setFrequencyOfOccurence(int frequencyOfOccurence)
    {
        this.frequencyOfOccurence = frequencyOfOccurence;
    }
}
Based on the values provided in these fields, the threshold check is made in the execute() method, as shown in Listing Five. The code mostly consists of parsing and checking the incoming values.
Listing Five: Code for threshold check.
/**
 * Checks one incoming tuple against the configured threshold.
 *
 * <p>The column selected by {@code thresholdInfo.getThresholdColNumber()} is
 * compared with the threshold value using the operator returned by
 * {@code thresholdInfo.getAction()}. Each match increments {@code count};
 * once {@code count} is greater than the configured frequency, the tuple's
 * values are passed to {@code splitAndEmit}.
 *
 * <p>NOTE(review): this listing is abridged. It contains an elided
 * {@code . . .} body and has one more opening brace than closing braces, so
 * the else-pairings described in the comments below follow the braces as
 * written and should be confirmed against the full source.
 *
 * @param tuple     the incoming tuple; its values are read with {@code getValues()}
 * @param collector passed through unchanged to {@code splitAndEmit}
 */
public void execute(Tuple tuple, BasicOutputCollector collector)
{
if(tuple!=null)
{
List<Object> inputTupleList = (List<Object>) tuple.getValues();
int thresholdColNum = thresholdInfo.getThresholdColNumber();
Object thresholdValue = thresholdInfo.getThresholdValue();
// The column number is treated as 1-based: every lookup uses thresholdColNum-1.
String thresholdDataType =
tupleInfo.getFieldList().get(thresholdColNum-1).getColumnType();
Integer timeWindow = thresholdInfo.getTimeWindow();
int frequency = thresholdInfo.getFrequencyOfOccurence();
// ---- String columns: only "==" and "!=" are handled, compared case-insensitively.
if(thresholdDataType.equalsIgnoreCase("string"))
{
String valueToCheck = inputTupleList.get(thresholdColNum-1).toString();
String frequencyChkOp = thresholdInfo.getAction();
if(timeWindow!=null)
{
long curTime = System.currentTimeMillis();
// NOTE(review): dividing a millisecond difference by 1000 yields seconds,
// not minutes as the variable name says - confirm the intended unit of timeWindow.
// Presumably startTime is a millisecond timestamp field set elsewhere.
long diffInMinutes = (curTime-startTime)/(1000);
// Matches are only counted once the elapsed time has reached timeWindow;
// before that, the tuple is neither counted nor emitted.
if(diffInMinutes>=timeWindow)
{
if(frequencyChkOp.equals("=="))
{
if(valueToCheck.equalsIgnoreCase(thresholdValue.toString()))
{
// NOTE(review): increment and read are two separate calls, and count is
// not reset anywhere in this listing.
count.incrementAndGet();
if(count.get() > frequency)
splitAndEmit(inputTupleList,collector);
}
}
else if(frequencyChkOp.equals("!="))
{
if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString()))
{
count.incrementAndGet();
if(count.get() > frequency)
splitAndEmit(inputTupleList,collector);
}
}
else
System.out.println("Operator not supported");
}
}
else
{
// No time window configured: same "=="/"!=" checks without the elapsed-time gate.
// Unlike the windowed path, any other operator is ignored silently here.
if(frequencyChkOp.equals("=="))
{
if(valueToCheck.equalsIgnoreCase(thresholdValue.toString()))
{
count.incrementAndGet();
if(count.get() > frequency)
splitAndEmit(inputTupleList,collector);
}
}
else if(frequencyChkOp.equals("!="))
{
if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString()))
{
count.incrementAndGet();
if(count.get() > frequency)
splitAndEmit(inputTupleList,collector);
}
}
}
}
// ---- Numeric columns: "<", ">", "==" and "!=" branches.
else if(thresholdDataType.equalsIgnoreCase("int") ||
thresholdDataType.equalsIgnoreCase("double") ||
thresholdDataType.equalsIgnoreCase("float") ||
thresholdDataType.equalsIgnoreCase("long") ||
thresholdDataType.equalsIgnoreCase("short"))
{
String frequencyChkOp = thresholdInfo.getAction();
if(timeWindow!=null)
{
// NOTE(review): the column value is parsed with Long.parseLong even for
// "double"/"float" columns; a value such as "1.5" would throw
// NumberFormatException. The threshold itself is parsed as a double.
long valueToCheck =
Long.parseLong(inputTupleList.get(thresholdColNum-1).toString());
long curTime = System.currentTimeMillis();
// Same seconds-vs-minutes naming concern as in the string branch.
long diffInMinutes = (curTime-startTime)/(1000);
System.out.println("Difference in minutes="+diffInMinutes);
if(diffInMinutes>=timeWindow)
{
if(frequencyChkOp.equals("<"))
{
if(valueToCheck < Double.parseDouble(thresholdValue.toString()))
{
count.incrementAndGet();
if(count.get() > frequency)
splitAndEmit(inputTupleList,collector);
}
}
else if(frequencyChkOp.equals(">"))
{
if(valueToCheck > Double.parseDouble(thresholdValue.toString()))
{
count.incrementAndGet();
if(count.get() > frequency)
splitAndEmit(inputTupleList,collector);
}
}
else if(frequencyChkOp.equals("=="))
{
// The long value is widened to double for this equality comparison.
if(valueToCheck == Double.parseDouble(thresholdValue.toString()))
{
count.incrementAndGet();
if(count.get() > frequency)
splitAndEmit(inputTupleList,collector);
}
}
else if(frequencyChkOp.equals("!="))
{
// Body elided in the original listing.
. . .
}
}
}
// As the braces are written, this else pairs with if(timeWindow!=null) above:
// a numeric column without a time window results in splitAndEmit(null, ...).
else
splitAndEmit(null,collector);
}
// As the braces are written, this else belongs to the data-type chain, i.e. it
// runs for column types that are neither "string" nor one of the numeric types.
// NOTE(review): the log text suggests it may have been meant for a null tuple - confirm.
else
{
System.err.println("Emitting null in bolt");
splitAndEmit(null,collector);
}
}
// NOTE(review): the method's own closing brace is missing from this listing.




