Memory-Based Indexing on HDFS
Now consider a case where the data is located in a distributed file system such as Hadoop DFS (HDFS). The code shown earlier cannot build an index directly on distributed data, so we would first have to copy the data from HDFS to a local file system, build the index there, and finally copy the index files back to HDFS. The same steps would be required for every search. This approach is time-consuming and suboptimal, so instead, let's index and search our data using the memory of the HDFS node where the data resides.
Assume that the data file "Test.txt" used earlier now resides on HDFS at "/DataFile/Test.txt" under the HDFS working directory. Create another folder called "/IndexFiles" in the same working directory, where the generated index files will be stored. The following Java code creates index files in memory for a file stored on HDFS:
// Path where the index files will be stored.
String Index_DIR="/IndexFiles/";
// Path where the data file is stored.
String File_DIR="/DataFile/Test.txt";
// Creating FileSystem object, to be able to work with HDFS
Configuration config = new Configuration();
config.set("fs.default.name","hdfs://127.0.0.1:9000/");
FileSystem dfs = FileSystem.get(config);
// Creating a RAMDirectory (memory) object, to be able to create index in memory.
RAMDirectory rdir = new RAMDirectory();
// Creating IndexWriter object for the Ram Directory
IndexWriter indexWriter = new IndexWriter (rdir, new StandardAnalyzer(), true);
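// Opening the "Test.txt" file residing on HDFS and wrapping the stream in a
// BufferedReader (java.io) so that it can be read line by line.
FSDataInputStream filereader = dfs.open(new Path(dfs.getWorkingDirectory()+ File_DIR));
BufferedReader reader = new BufferedReader(new InputStreamReader(filereader));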
String row=null;
// Reading each line present in the file.
while ((row=reader.readLine())!=null)
{
// Splitting the row into an array of fields; the file is space-delimited.
String Arow[]=row.split(" ");
// For each row, creating a document and adding data to the document
//with the associated fields.
org.apache.lucene.document.Document document = new org.apache.lucene.document.Document();
document.add(new Field("date",Arow[0],Field.Store.YES,Field.Index.ANALYZED));
document.add(new Field("time",Arow[1],Field.Store.YES,Field.Index.ANALYZED));
document.add(new Field ("cs-method",Arow[2],Field.Store.YES,Field.Index.ANALYZED));
document.add(new Field ("cs-uri",Arow[3],Field.Store.YES,Field.Index.ANALYZED));
document.add(new Field ("sc-status",Arow[4],Field.Store.YES,Field.Index.ANALYZED));
document.add(new Field ("time-taken",Arow[5],Field.Store.YES,Field.Index.ANALYZED));
// Adding document to the index file.
indexWriter.addDocument(document);
}
indexWriter.optimize();
indexWriter.close();
reader.close();
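The loop above assumes that every row has at least six space-separated columns; a shorter row would make the Arow[...] lookups fail. The following is a minimal sketch, using only the Java standard library, of how the row-to-field mapping could be checked before a document is built. The class name LogRowParser and the choice to return null for short rows are assumptions made for illustration, not part of the original code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class LogRowParser {
    // Field names in the same order as the columns used in the indexing loop.
    static final String[] FIELD_NAMES = {
        "date", "time", "cs-method", "cs-uri", "sc-status", "time-taken"
    };

    // Returns a field-name -> value map, or null if the row has too few columns.
    static Map<String, String> parse(String row) {
        String[] columns = row.trim().split(" ");
        if (columns.length < FIELD_NAMES.length) {
            return null; // assumption: malformed rows are skipped by the caller
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < FIELD_NAMES.length; i++) {
            fields.put(FIELD_NAMES[i], columns[i]);
        }
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(parse("2010-04-21 02:24:04 GET /blank 304 233"));
        System.out.println(parse("malformed line"));
    }
}
```

In the indexing loop, each map entry would correspond to one document.add(new Field(...)) call, and rows for which parse returns null could simply be skipped.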
Thus, for the "Test.txt" file that is residing on HDFS, we now have index files created in memory. To store the index files in the HDFS folder:
// Getting files present in memory into an array.
String fileList[]=rdir.list();
// Reading index files from memory and storing them to HDFS.
for (int i = 0; i < fileList.length; i++)
{
IndexInput indxfile = rdir.openInput(fileList[i].trim());
long len = indxfile.length();
int len1 = (int) len;
// Reading data from file into a byte array.
byte[] bytarr = new byte[len1];
indxfile.readBytes(bytarr, 0, len1);
// Creating file in HDFS directory with name same as that of
//index file
Path src = new Path(dfs.getWorkingDirectory()+Index_DIR+ fileList[i].trim());
dfs.createNewFile(src);
// Writing data from byte array to the file in HDFS
FSDataOutputStream fs = dfs.create(new Path(dfs.getWorkingDirectory()+Index_DIR+fileList[i].trim()),true);
fs.write(bytarr);
fs.close();
}
dfs.closeAll();
Now we have the necessary index files created and stored in the HDFS directory for the "Test.txt" data file.
Memory-Based Searching on HDFS
We can now search the indexes stored in HDFS. First, we must make the HDFS index files available in memory for searching. The following code is used for this process:
// Creating FileSystem object, to be able to work with HDFS
Configuration config = new Configuration();
config.set("fs.default.name","hdfs://127.0.0.1:9000/");
FileSystem dfs = FileSystem.get(config);
// Creating a RAMDirectory (memory) object, to hold the index files in memory.
RAMDirectory rdir = new RAMDirectory();
// Getting the list of index files present in the directory into an array.
Path pth = new Path(dfs.getWorkingDirectory()+Index_DIR);
FileSystemDirectory fsdir = new FileSystemDirectory(dfs,pth,false,config);
String filelst[] = fsdir.list();
FSDataInputStream filereader = null;
for (int i = 0; i<filelst.length; i++)
{
// Reading data from index files on HDFS directory into filereader object.
filereader = dfs.open(new Path(dfs.getWorkingDirectory()+Index_DIR+filelst[i]));
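// Getting the file length from the HDFS file status; available() is only an estimate.
int size = (int) dfs.getFileStatus(new Path(dfs.getWorkingDirectory()+Index_DIR+filelst[i])).getLen();
// Reading data from file into a byte array; readFully keeps reading until the array is full.
byte[] bytarr = new byte[size];
filereader.readFully(bytarr, 0, size);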
// Creating file in RAM directory with names same as that of
//index files present in HDFS directory.
IndexOutput indxout = rdir.createOutput(filelst[i]);
// Writing data from byte array to the file in RAM directory
indxout.writeBytes(bytarr,bytarr.length);
indxout.flush();
indxout.close();
// Closing the HDFS stream for this file before opening the next one.
filereader.close();
}
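One detail of the loading loop deserves care: in java.io, InputStream.available() is only an estimate of the bytes readable without blocking, and a single read(byte[], int, int) call may return fewer bytes than requested. The sketch below, using only the Java standard library, shows a read loop that collects the whole stream regardless of how the bytes arrive. The names StreamReadSketch, readAll, and trickle are hypothetical, and trickle is an artificial stream written only to simulate short reads.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

class StreamReadSketch {
    // Reads until end of stream, however many bytes each read() call returns.
    static byte[] readAll(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[4096];
        int n;
        while ((n = in.read(buffer)) != -1) {
            out.write(buffer, 0, n);
        }
        return out.toByteArray();
    }

    // Artificial stream for the demo: returns at most 3 bytes per read
    // and reports available() == 0, to simulate a slow remote source.
    static InputStream trickle(byte[] data) {
        return new ByteArrayInputStream(data) {
            @Override
            public synchronized int read(byte[] b, int off, int len) {
                return super.read(b, off, Math.min(len, 3));
            }

            @Override
            public synchronized int available() {
                return 0;
            }
        };
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "0123456789".getBytes(StandardCharsets.US_ASCII);
        System.out.println(readAll(trickle(data)).length); // prints 10
    }
}
```

With this demo stream, sizing the buffer from available() would allocate an empty array, and a single read call would return only three of the ten bytes; the loop recovers all of them.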
Now all the required index files are present in the RAM directory (that is, in memory), so we can search them directly. The search code is similar to that used for searching the local file system; the only change is that the Searcher object is now created from the RAM directory object (rdir) instead of the local file system directory path.
Searcher searcher = new IndexSearcher(rdir);
Analyzer analyzer = new StandardAnalyzer();
System.out.println("Total Documents = "+searcher.maxDoc()) ;
QueryParser parser = new QueryParser("time", analyzer);
Query query = parser.parse("02\\:24\\:04");
Hits hits = searcher.search(query);
System.out.println("Number of matching documents = "+ hits.length());
for (int i = 0; i < hits.length(); i++)
{
Document doc = hits.doc(i);
System.out.println(doc.get("date")+" "+ doc.get("time")+ " "+
doc.get("cs-method")+ " "+ doc.get("cs-uri")+ " "+ doc.get("sc-status")+ " "+ doc.get("time-taken"));
}
For the following output, the search is done on the "time" field, and the text searched for is "02:24:04" (written as "02\\:24\\:04" in the Java code, because the colons must be escaped in the query string). When the code is run, all the documents (or rows) whose "time" field contains "02:24:04" are shown in the output:
Total Documents = 11
Number of matching documents = 4
2010-04-21 02:24:04 GET /blank 304 233
2010-04-21 02:24:04 GET /blank 500 567
2010-04-21 02:24:04 GET /blank 200 897
2010-04-21 02:24:04 POST /blank 200 567
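The query string in the search code is written as "02\\:24\\:04" because a colon normally separates a field name from its value in the query syntax, so a literal colon has to be preceded by a backslash. The sketch below illustrates that escaping step with plain Java. The class name QueryEscapeSketch and the list of characters treated as special are assumptions made for illustration; Lucene's QueryParser class provides its own static escape method, which is the better choice in real code.

```java
class QueryEscapeSketch {
    // Characters escaped by this sketch; an assumed subset chosen for illustration.
    private static final String SPECIAL = "\\+-!(){}[]^\"~*?:";

    // Puts a backslash in front of every special character in the term.
    static String escape(String term) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < term.length(); i++) {
            char c = term.charAt(i);
            if (SPECIAL.indexOf(c) >= 0) {
                sb.append('\\');
            }
            sb.append(c);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(escape("02:24:04")); // prints 02\:24\:04
    }
}
```

The escaped string could then be passed to parser.parse(...) in place of the hand-written literal, which helps when the search text comes from user input.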
Conclusion
Distributed file systems like HDFS are a powerful tool for storing and accessing the vast amounts of data available to us today. With memory-based indexing and searching, accessing the data you really want to find amid mountains of data you don't care about gets a little bit easier.


