No technology is more synonymous with Big Data than Apache Hadoop. Hadoop's distributed filesystem and compute framework make possible cost-effective, linearly scalable processing of petabytes of data. Unfortunately, there are few tutorials devoted to how to get big data into Hadoop in the first place.
Some data destined for Hadoop clusters surely comes from sporadic bulk loading processes, such as database and mainframe offloads and batched data dumps from legacy systems. But what has made data really big in recent years is that most new data is contained in high-throughput streams. Application logs, GPS tracking, social media updates, and digital sensors all constitute fast-moving streams begging for storage in the Hadoop Distributed File System (HDFS). As you might expect, several technologies have been developed to address the need for collection and transport of these high-throughput streams. Facebook's Scribe and Apache/LinkedIn's Kafka both offer solutions to the problem, but Apache Flume is rapidly becoming a de facto standard for directing data streams into Hadoop.
This article describes the basics of Apache Flume and illustrates how to quickly set up Flume agents for collecting fast-moving data streams and pushing the data into Hadoop's filesystem. By the time we're finished, you should be able to configure and launch a Flume agent and understand how multi-hop and fan-out flows are easily constructed from multiple agents.
Anatomy of a Flume Agent
Flume deploys as one or more agents, each contained within its own instance of the Java Virtual Machine (JVM). Agents consist of three pluggable components: sources, sinks, and channels. An agent must have at least one of each in order to run. Sources collect incoming data as events. Sinks write events out, and channels provide a queue to connect the source and sink. (Figure 1.)
Figure 1: Flume Agents consist of sources, channels, and sinks.
Put simply, Flume sources listen for and consume events. Events can range from newline-terminated strings in stdout to HTTP POSTs and RPC calls; it all depends on which sources the agent is configured to use. Flume agents may have more than one source, but must have at least one. Sources require a name and a type; the type then dictates additional configuration parameters.
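Every component follows the same property-file naming pattern; as an illustrative sketch (the angle-bracketed names are placeholders, not real components), a source definition takes this shape:

```properties
# Generic shape of a source definition (names in <> are placeholders)
<agent>.sources = <source>
<agent>.sources.<source>.type = <source-type>
# The chosen type dictates which further parameters are required:
<agent>.sources.<source>.<type-specific-param> = <value>
```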
On consuming an event, Flume sources write the event to a channel. Importantly, sources write to their channels as transactions. By dealing in events and transactions, Flume agents maintain end-to-end flow reliability. Events are not dropped inside a Flume agent unless the channel is explicitly allowed to discard them due to a full queue.
Channels are the mechanism by which Flume agents transfer events from their sources to their sinks. Events written to the channel by a source are not removed from the channel until a sink removes that event in a transaction. This allows Flume sinks to retry writes in the event of a failure in the external repository (such as HDFS or an outgoing network connection). For example, if the network between a Flume agent and a Hadoop cluster goes down, the channel will keep all events queued until the sink can correctly write to the cluster and close its transactions with the channel.
Channels are typically of two types: in-memory queues and durable disk-backed queues. In-memory channels provide high throughput but no recovery if an agent fails. File or database-backed channels, on the other hand, are durable. They support full recovery and event replay in the case of agent failure.
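As a sketch of the durable alternative, a file-backed channel can be declared as follows; the directory paths here are placeholders chosen for illustration:

```properties
# File channel: survives agent restarts by checkpointing events to disk
hdfs-agent.channels.file-channel.type = file
hdfs-agent.channels.file-channel.checkpointDir = /var/flume/checkpoint
hdfs-agent.channels.file-channel.dataDirs = /var/flume/data
```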
Sinks provide Flume agents pluggable output capability: if you need to write to a new type of storage, just write a Java class that implements the necessary interfaces. Like sources, sinks correspond to a type of output: writes to HDFS or HBase, remote procedure calls to other agents, or any number of other external repositories. Sinks remove events from the channel in transactions and write them to output. A transaction closes only when the event is successfully written, ensuring that all events are committed to their final destination.
Setting up a Simple Agent for HDFS
A simple one-source, one-sink Flume agent can be configured with just a single configuration file. In this example, I'll create a text file named sample_agent.conf; it looks a lot like a Java properties file. At the top of the file, I configure the agent's name and the names of its source, sink, and channel.
hdfs-agent.sources = netcat-collect
hdfs-agent.sinks = hdfs-write
hdfs-agent.channels = memory-channel
This defines an agent named hdfs-agent and the names of its sources, sinks, and channels; keep the agent name in mind, because we'll need it to start the agent. Multiple sources, sinks, and channels can be defined on these lines as a whitespace-delimited list of names. In this case, the source is named netcat-collect, the sink hdfs-write, and the channel memory-channel. The names are indicative of what I'm setting up: events collected via netcat will be written to HDFS, and I will use a memory-only queue for transactions.
Next, I configure the source. I use a netcat source, as it provides a simple means of interactively testing the agent. A netcat source requires a type as well as an address and port to which it should bind. This netcat source will listen on localhost on port 11111; messages sent to netcat will be consumed by the source as events.
hdfs-agent.sources.netcat-collect.type = netcat
hdfs-agent.sources.netcat-collect.bind = 127.0.0.1
hdfs-agent.sources.netcat-collect.port = 11111
With the source defined, I'll configure the sink to write to HDFS. HDFS sinks support a number of options, but by default, the HDFS sink writes Hadoop SequenceFiles. In this example, I'll specify that the sink write raw text files to HDFS so they can be easily inspected; I'll also set a roll interval, which forces Flume to commit writes to HDFS every 30 seconds. File rolls can be configured based on time, size, or a combination of the two. File rolls are particularly important in environments where HDFS does not support appending to files.
hdfs-agent.sinks.hdfs-write.type = hdfs
hdfs-agent.sinks.hdfs-write.hdfs.path = hdfs://namenode_address:8020/path/to/flume_test
hdfs-agent.sinks.hdfs-write.hdfs.rollInterval = 30
hdfs-agent.sinks.hdfs-write.hdfs.writeFormat = Text
hdfs-agent.sinks.hdfs-write.hdfs.fileType = DataStream
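Time-, size-, and count-based rolls can also be combined; the sink rolls the file when the first trigger fires, and setting a trigger to 0 disables it. The values below are illustrative, not recommendations:

```properties
# Roll a new HDFS file every 30 seconds or every 64 MB, whichever comes first
hdfs-agent.sinks.hdfs-write.hdfs.rollInterval = 30
hdfs-agent.sinks.hdfs-write.hdfs.rollSize = 67108864
# Disable event-count-based rolling
hdfs-agent.sinks.hdfs-write.hdfs.rollCount = 0
```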
Finally, I'll configure a memory-backed channel to transfer events from source to sink, and connect the source and sink to it. Keep in mind that if I exceed the channel capacity, Flume will drop events. If I need durability, a file or JDBC channel should be used instead.
hdfs-agent.channels.memory-channel.type = memory
hdfs-agent.channels.memory-channel.capacity = 10000
hdfs-agent.sources.netcat-collect.channels = memory-channel
hdfs-agent.sinks.hdfs-write.channel = memory-channel
With the configuration complete, I start the Flume agent from a terminal:
flume-ng agent -f /path/to/sample_agent.conf -n hdfs-agent
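The same property format extends to the fan-out flows mentioned earlier. As a sketch (the agent and component names here are hypothetical), a single source can replicate each event to two channels, each drained by its own sink:

```properties
# One source fanning out to two channels; replicating is the default selector,
# so every event is written to both channels
fanout-agent.sources = collect
fanout-agent.sinks = hdfs-write backup-write
fanout-agent.channels = ch-1 ch-2
fanout-agent.sources.collect.selector.type = replicating
fanout-agent.sources.collect.channels = ch-1 ch-2
fanout-agent.sinks.hdfs-write.channel = ch-1
fanout-agent.sinks.backup-write.channel = ch-2
```

Multi-hop flows follow the same idea: one agent's sink sends events (typically via Avro RPC) to a downstream agent's source, chaining agents across hosts.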