In the Flume agent's logs, I look for indications that the source, sink, and channel have started successfully. For example:
INFO nodemanager.DefaultLogicalNodeManager: Starting Channel memoryChannel
INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: memoryChannel started
INFO nodemanager.DefaultLogicalNodeManager: Starting Sink hdfs-write
INFO nodemanager.DefaultLogicalNodeManager: Starting Source netcat-collect
INFO source.NetcatSource: Source starting
In a separate terminal, connect to the agent via netcat and enter a series of messages:
> nc localhost 11111
> testing
> 1
> 2
> 3
In the agent logs, an HDFS file will be created and committed every 30 seconds. If I print the contents of the files to standard out using HDFS's cat command, I'll find that the messages from netcat have been stored.
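For example, a check like the following would list the messages. The path shown here is only an illustration and should match whatever hdfs.path was set on the agent's HDFS sink; plain-text output also assumes the sink was configured with hdfs.fileType=DataStream.

> hadoop fs -cat /user/flume/events/FlumeData.*
testing
1
2
3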
More Advanced Deployments
Regardless of the source, a single agent writing directly to HDFS is too simple an arrangement for many deployments: application servers may reside in the cloud while clusters are on-premise, many streams of data may need to be consolidated, or events may need to be filtered during transmission. Fortunately, Flume makes reliable multi-hop event transmission easy. Fan-in and fan-out patterns are readily supported via multiple sources and channel options. Additionally, Flume provides the notion of interceptors, which allow events to be decorated and filtered in flight.
Multi-Hop Topologies
Flume provides multi-hop deployments via Apache Avro-serialized RPC calls. For a given hop, the sending agent implements an Avro sink directed to a host and port where the receiving agent is listening. The receiver implements an Avro source bound to the designated host-port combination. Reliability is ensured by Flume's transaction model. The sink on the sending agent does not close its transaction until receipt is acknowledged by the receiver. Similarly, the receiver does not acknowledge receipt until the incoming event has been committed to its channel.
#sender configuration
avro-agent.sinks= avro-sink
avro-agent.sinks.avro-sink.type=avro
avro-agent.sinks.avro-sink.hostname=remote.host.com
avro-agent.sinks.avro-sink.port=11111

#receiver configuration on remote.host.com
hdfs-agent.sources=avro-source
hdfs-agent.sources.avro-source.type=avro
hdfs-agent.sources.avro-source.bind=0.0.0.0
hdfs-agent.sources.avro-source.port=11111
hdfs-agent.sources.avro-source.channels=memoryChannel
Figure 2: Multi-hop event flows are constructed using RPCs between Avro sources and sinks.
Fan-In and Fan-Out
Fan-in is a common case for Flume agents. Agents may be run on many data collectors (such as application servers) in a large deployment, while only one or two writers to a remote Hadoop cluster are required to handle the total event throughput. In this case, the Flume topology is simple to configure. Each agent at a data collector implements the appropriate source and an Avro sink. All Avro sinks point to the host and port of the Flume agent charged with writing to the Hadoop cluster. The agent at the Hadoop cluster simply configures an Avro source on the designated host and port. Incoming events are consolidated automatically and are written to the configured sink.
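A sketch of a collector's configuration might look like the following; the agent, component, and host names are illustrative, and the writing agent at the cluster simply reuses the Avro source configuration shown earlier.

#each collector implements its own source, plus an Avro sink
#pointed at the writing agent (host name below is illustrative)
collector.sinks=avro-forward
collector.sinks.avro-forward.type=avro
collector.sinks.avro-forward.hostname=hadoop-writer.host.com
collector.sinks.avro-forward.port=11111
collector.sinks.avro-forward.channel=memoryChannel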
Fan-out topologies are enabled via Flume's source selectors. Selectors can be replicating (sending every event to all configured channels) or multiplexing. Multiplexing selectors partition events across channels according to mappings on event headers, which can be set by interceptors. For example, a replicating selector may be appropriate when events need to be sent both to HDFS and to flat log files or a database. Multiplexing selectors are useful when different classes of events should be directed to different writers; for example, data destined for partitioned Hive tables may be best handled via multiplexing.
hdfs-agent.channels=mchannel1 mchannel2
hdfs-agent.sources.netcat-collect.selector.type = replicating
hdfs-agent.sources.netcat-collect.channels = mchannel1 mchannel2
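A multiplexing selector instead routes each event based on a header value. A sketch might look like the following, assuming an interceptor has stamped each event with a header named table; the header name and mapping values here are illustrative.

hdfs-agent.sources.netcat-collect.selector.type = multiplexing
hdfs-agent.sources.netcat-collect.selector.header = table
hdfs-agent.sources.netcat-collect.selector.mapping.clicks = mchannel1
hdfs-agent.sources.netcat-collect.selector.mapping.orders = mchannel2
hdfs-agent.sources.netcat-collect.selector.default = mchannel1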
Interceptors
Flume provides a robust system of interceptors for in-flight modification of events. Some interceptors serve to decorate data with metadata useful in multiplexing the data or processing it after it has been written to the sink. Common decorations include timestamps, hostnames, and static headers. It's a great way to keep track of when your data arrived and from where it came.
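As a sketch, the built-in timestamp and host interceptors can be chained on a source to stamp each event's headers with its arrival time and origin; the interceptor names ts and hn below are arbitrary.

hdfs-agent.sources.netcat-collect.interceptors = ts hn
hdfs-agent.sources.netcat-collect.interceptors.ts.type = timestamp
hdfs-agent.sources.netcat-collect.interceptors.hn.type = host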
More interestingly, interceptors can be used to selectively filter or decorate events. The Regex Filtering Interceptor allows events to be dropped if they match the provided regular expression. Similarly, the Regex Extractor Interceptor decorates event headers according to a regular expression. This is useful if incoming events require multiplexing, but static definitions are too inflexible.
hdfs-agent.sources.netcat-collect.interceptors = filt_int
hdfs-agent.sources.netcat-collect.interceptors.filt_int.type=regex_filter
hdfs-agent.sources.netcat-collect.interceptors.filt_int.regex=^echo.*
hdfs-agent.sources.netcat-collect.interceptors.filt_int.excludeEvents=true
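A regex extractor can be sketched in much the same way; here a single serializer copies the first capture group into a header named table, which a multiplexing selector like the one sketched earlier could then route on. The regex and header name are illustrative.

hdfs-agent.sources.netcat-collect.interceptors = extract_int
hdfs-agent.sources.netcat-collect.interceptors.extract_int.type=regex_extractor
hdfs-agent.sources.netcat-collect.interceptors.extract_int.regex=^(\\w+):.*
hdfs-agent.sources.netcat-collect.interceptors.extract_int.serializers=s1
hdfs-agent.sources.netcat-collect.interceptors.extract_int.serializers.s1.name=table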
Conclusion
There are lots of ways to acquire Big Data with which to fill up a Hadoop cluster, but many of those data sources arrive as fast-moving streams of data. Fortunately, the Hadoop ecosystem contains a component specifically designed for transporting and writing these streams: Apache Flume. Flume provides a robust, self-contained application which ensures reliable transportation of streaming data. Flume agents are easy to configure, requiring only a property file and an agent name. Moreover, Flume's simple source-channel-sink design allows us to build complicated flows using only a set of Flume agents. So, while we don't often address the process of acquiring Big Data for our Hadoop clusters, doing so is as easy and fun as taking a log ride.
Dan McClary is the Principal Product Manager of Big Data at Oracle.