Dr. Dobb's is part of the Informa Tech Division of Informa PLC


Web-Based Video Monitoring


Jul01: Web-Based Video Monitoring

Michael is a developer working and living in San Francisco. He can be contacted at [email protected].


Real-Time Streaming Protocol


Ensuring that a web site delivers the goods can be a full-time job. What can help, of course, is an automated monitoring system that watches important components on your site and immediately alerts you (or the webmaster) to a failure.

While there are a number of available web services that monitor broken links, page delivery times, transactions, and the like for standard HTML web sites, tools are generally lacking when it comes to monitoring dynamic configurations with streaming content. I recently worked on a project that had just such a requirement — to monitor the health of a web site where a large chunk of the content was streaming video. In particular, there was a fear (and a well-founded one) that the site would lose live video feeds from time to time. With no monitoring tools in place, the broken video link(s) would at best look sloppy and at worst give the site investors cold feet.

The design of the web site called for initially supporting up to 50 concurrent, independent live video feeds while a team of content authors cranked out newsworthy content, adding (and deleting) new streams to the site day and night. Designing and delivering web pages with no errors is tough with any site. Adding video streams to a site is even tougher (just think about all the additional hardware/software components that can fail — the camera, encoder, or leased line for each signal). Consequently, what we needed was a server application designed to continuously monitor video and image streams. I wanted to deploy this monitor on a staging server running Linux, and given the nature of this being a quick burn job (what isn't?), I wanted development to take place on Windows. And since I was concerned about debugging time before an upcoming vacation (memory leaks and the like), I elected to write the application in Java. Finally, I expected that as the web site matured, this monitor would be asked to perform additional work not included in the original design such as supporting new streaming video protocols and reporting methods.

Consuming What You Produce

The monitoring tool I developed is called "StreamHealth" and is modeled after a producer/consumer object design. (The complete source code and related files for StreamHealth are available electronically; see "Resource Center," page 5.) Figure 1 is a simple producer/consumer relationship. The advantages of this design are that the producer and consumer run on separate time schedules in their own execution threads. They are loosely connected through a notification protocol. For the producer, this means querying for the video connection state and posting important results when they arrive; for the consumer, it means reporting on the state of the connection. The producer communicates its results to the consumer through a shared data object that is synchronized to prevent concurrent reading and writing of this object.

In the StreamHealth design (Figure 2), SiteState is the producer of the data objects (StreamData) describing the stream's health. The consumer is the ReportMgr that consumes the StreamData object and publishes the results. The StreamData object is used to represent the state information of a single stream. StreamData objects are represented by the structure in Example 1.
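Example 1 is not reproduced on this page. As a rough sketch only, a record of this kind might look like the following. The field names are taken from Listing Two, while the class name StreamDataSketch, the field types, and the constant values are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for StreamData. Field names follow Listing Two;
// the types and the constant values are assumptions.
class StreamDataSketch {
    static final int STATE_UNCHANGED = 0; // assumed value
    static final int STATE_UPDATED = 1;   // assumed value

    String mTimeOfLastTest;       // timestamp of the most recent probe
    int mOpStatus;                // GOOD, BAD, or UNDEFINED
    String mServerResponse;       // first line of the server's reply
    List<String> mAddConnInfo;    // auxiliary data captured by the probe
    int mState = STATE_UNCHANGED; // changed or unchanged since the last check

    StreamDataSketch() { }

    // Copy constructor: the snapshot shares no mutable state with its source,
    // so later probes cannot alter a record that has already been posted.
    StreamDataSketch(StreamDataSketch other) {
        mTimeOfLastTest = other.mTimeOfLastTest;
        mOpStatus = other.mOpStatus;
        mServerResponse = other.mServerResponse;
        mAddConnInfo = (other.mAddConnInfo == null)
                ? null : new ArrayList<>(other.mAddConnInfo);
        mState = other.mState;
    }
}
```

Copying the auxiliary list, rather than sharing the reference, is what keeps a posted snapshot stable while the original record continues to be updated.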

These StreamData objects are posted to the ReportMgr whenever one of four types of events occurs:

  • kSTATUS_HAS_STARTED indicates that a new monitor cycle has started. It also indicates that the stream configuration has been updated and is available to the reporting objects.
  • kSTATUS_IS_UPDATED is triggered when the state of a stream has changed. A change of state is of particular and immediate interest to the reporting manager, so this event is sent as soon as a stream reports the change.
  • kSTATUS_IS_COMPLETE provides a complete recap of the status of all streams (changed and unchanged).
  • kSTATUS_NO_STREAMS can be treated as an error if you expect at least one stream to be monitored at all times. The ReportMgr receives this event only if the condition is configured as an error.

Events are posted to the ReportMgr when SiteState identifies an important condition (one that fits one of the aforementioned four events). SiteState creates a new StreamData object using the constructor that copies SiteState's internal StreamData record. The StreamData objects (or object) are then inserted into a StatusAList (an extended ArrayList containing an event flag) in the SiteState.doneWithStatus() method.

The status flag is set on the list, the list is pushed onto a Vector (SiteState.mSDBufferVect), and finally the ReportMgr is notified of consumable data through the notifyAll() method in SiteState.postUpdate(); see Example 2. The notifyAll() call wakes any thread that is currently suspended in a wait() call on the same object. ReportMgr then calls SiteState.getUpdateBuf() for the event and StreamData object; see Listing One. All events are dispatched through SiteState.postUpdate(). StreamData objects are always copied when inserted into the StatusAList. This prevents concurrent reading and writing of a StreamData object and ensures that the StreamData internal state matches the state of the posted event.

One area of potential thread mischief (that is, where you want mutual exclusion) is ReportMgr and SiteState access to mSDBufferVect. However, the Vector class in Java is synchronized (ArrayList is, in effect, the unsynchronized counterpart of Vector); therefore, the only explicit thread synchronization occurs between the ReportMgr and SiteState objects with their wait/notify protocol. Finally, when the ReportMgr calls SiteState.getUpdateBuf() for new events, SiteState can insert into mSDBufferVect between the individual Vector calls made inside that method. However, as seen from getUpdateBuf(), isEmpty() can only transition from true to false (items are only inserted in this Vector outside of this method). This lets the logic in SiteState.getUpdateBuf() proceed without fear of mSDBufferVect reflecting an indeterminate state.

Notification is handled with a guarded-wait pattern that synchronizes the notification and the wait cycle of the SiteState and ReportMgr threads. The SiteState.postUpdate() method contains the notifyAll() call that wakes up ReportMgr.run(). This approach is more CPU efficient than a polling thread because the JVM is responsible for waking the waiting thread so that it can check for data from SiteState. When ReportMgr.run() calls wait(), the thread remains suspended until SiteState.postUpdate() calls notifyAll(). The ReportMgr then wakes and finds data waiting through a call to SiteState.getUpdateBuf(). The ReportMgr proceeds to consume it (that is, ReportMgr obtains a reference to the StatusAList, and this object is removed from mSDBufferVect). ReportMgr continues to consume StatusAList objects until the Vector is empty (getUpdateBuf() returns null), and then drops back into its wait state.
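The wait/notify handshake can be illustrated in isolation. The following is a minimal sketch, not StreamHealth code: UpdateMailbox and its methods are hypothetical names, and strings stand in for StatusAList objects. Unlike Listing One, which tests the buffer before entering the synchronized block, this variant keeps the emptiness check and the wait() call under one lock, a common way to make sure a notification posted between the two steps is not missed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mailbox standing in for SiteState's update buffer.
class UpdateMailbox {
    private final List<String> buffer = new ArrayList<>();

    // Producer side: queue an item and wake any waiting consumer.
    synchronized void post(String item) {
        buffer.add(item);
        notifyAll();
    }

    // Consumer side: suspend until an item is available, then remove it.
    // The loop re-checks the condition after every wake-up. Returns null
    // if the waiting thread is interrupted.
    synchronized String take() {
        while (buffer.isEmpty()) {
            try {
                wait();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // preserve the interrupt
                return null;
            }
        }
        return buffer.remove(0);
    }
}

class UpdateMailboxDemo {
    public static void main(String[] args) throws InterruptedException {
        UpdateMailbox box = new UpdateMailbox();
        Thread producer = new Thread(() -> {
            for (int i = 1; i <= 3; i++) {
                box.post("event " + i);
            }
        });
        producer.start();
        for (int i = 0; i < 3; i++) {
            System.out.println("consumed " + box.take());
        }
        producer.join();
    }
}
```

The consumer uses no CPU while suspended in wait(), which is the efficiency argument made above for preferring notification over polling.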

Within SiteState (producer) and ReportMgr (consumer), there are several helper objects that actually perform the specific tasks of reporting and watching the streams. Figure 2 is the more detailed view of the role of these objects in monitoring streams.

Acting on Events of Importance

ReportMgr is responsible for managing and coordinating Report objects. Report objects (ReportHTML and ReportDIR) encapsulate the specific reporting/publishing details and must implement the Report interface, a generic interface that declares four methods:

public void startUpdate(ArrayList al);    //init update cycle
public void update(ArrayList al);         //report important status
public void finishUpdate(ArrayList al);   //update completed
public void reportGeneralError();         //general failure to report

These methods correspond to the events that can be posted by the SiteState object and are dispatched within ReportMgr to the Report interface implementations. This interface lets additional report objects be added to the ReportMgr with minimal change. Currently in StreamHealth, the implemented Report objects are ReportDIR, a directory notification scheme, and ReportHTML, an HTML status page. Of the two, ReportDIR is designed to flag a failure immediately within a server directory. This mechanism is one way to notify another process of a failed stream. ReportHTML summarizes the status of all streams at the end of the testing cycle.

When the ReportMgr wakes up and finds data through its call to SiteState.getUpdateBuf(), it passes the data down to the individual Report objects to do their work. In the future, the webmaster (or even, heaven forbid, the marketing department) might request that results are e-mailed or faxed, or some additional analysis of failures be reported. The Report interface is designed to minimize the source changes outside of the specific reporting implementation.
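To make the fan-out concrete, here is a small sketch of a manager passing one event to several reporters. It is not the StreamHealth source: ReportSketch mirrors the four methods of the Report interface, but LogReport, ReportFanOut, and the use of List<String> in place of the ArrayList of StreamData objects are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Mirrors the four methods of the Report interface described above.
interface ReportSketch {
    void startUpdate(List<String> streams);
    void update(List<String> streams);
    void finishUpdate(List<String> streams);
    void reportGeneralError();
}

// Hypothetical reporter that simply records what it was asked to do.
class LogReport implements ReportSketch {
    final List<String> log = new ArrayList<>();
    public void startUpdate(List<String> s)  { log.add("start " + s.size()); }
    public void update(List<String> s)       { log.add("update " + s); }
    public void finishUpdate(List<String> s) { log.add("finish " + s.size()); }
    public void reportGeneralError()         { log.add("error"); }
}

// Hypothetical manager: hands each event to every registered reporter.
class ReportFanOut {
    private final List<ReportSketch> reports = new ArrayList<>();

    void register(ReportSketch r) { reports.add(r); }

    void update(List<String> streams) {
        for (ReportSketch r : reports) {
            try {
                r.update(streams);
            } catch (RuntimeException e) {
                // one failing reporter should not silence the others
                System.err.println("report failed: " + e);
            }
        }
    }
}
```

Under this arrangement, an e-mail or fax reporter would be one more class implementing the interface plus one register() call, which is the kind of minimal change the Report interface is meant to allow.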

The Producer

On the producer side, SiteState is responsible for the querying, receiving, recording, and notification of the state of streams to the ReportMgr. The SiteState object can also make decisions on the health of the stream and whether an event is important enough to notify the ReportMgr.

SiteState runs in its own thread by extending the Java Thread class. Within the SiteState.run() method, the health of the streams is checked within an infinite loop, pausing only for a sleep period between cycles. After each sleep period, SiteState.run() parses the list of active streams (StreamList.xml), reconciles the internal stream list, initializes new stream monitors, notifies of important events (the four events of interest listed previously), and finally updates the changes found in StreamList.xml against the internal data representation. Another important role of the SiteState object is to provide services to the ReportMgr through the SiteState.getUpdateBuf() method, and to the individual StreamWatch objects through the SiteState.doneWithStatus() method.

Communicating with the Streaming Server

StreamWatch is an abstract class that contains some implementation details but requires specific streaming implementations to be derived from it. Derived classes are responsible for defining the algorithm to probe a stream's health. New StreamWatch objects are instantiated for each connection by the StreamWatchFactory (see the Factory pattern from Design Patterns, by Erich Gamma et al., Addison-Wesley, 1995). The two implemented stream types are kTYPE_STREAM_VIDEO (RTSP protocol) and kTYPE_STREAM_IMAGE (HTTP streaming images). This design allows future derived implementations to add support for additional streaming protocols (why not multicasting, for instance, once ISPs decide to update all their routers to support this protocol). Implementation requires coding the new StreamWatch object and adding a new stream type identifier (maybe kTYPE_STREAM_MULTI, for example) that is recognized by the StreamWatchFactory object. StreamWatch implementations probe the state of the socket and report back three pieces of information: the server response, the status of the stream, and auxiliary response data. The status reported back is an int that signifies whether the stream is ultimately functioning or not; that is, it's GOOD, BAD, or UNDEFINED. The auxiliary data is an ArrayList that contains specific response information from the server, as captured by the specific StreamWatch implementations. This could include streaming bandwidth, packet size, and so on, and is ultimately handed over to the ReportMgr to do with as it sees fit.
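A factory of this kind might be sketched as follows. All class names here are hypothetical, and the numeric values of the type identifiers are assumptions; only the idea of an abstract watcher with one subclass per protocol comes from the description above.

```java
// Hypothetical abstract watcher: subclasses supply the protocol details.
abstract class WatchSketch {
    final String url;
    WatchSketch(String url) { this.url = url; }
    abstract String protocol();
}

class VideoWatchSketch extends WatchSketch {
    VideoWatchSketch(String url) { super(url); }
    String protocol() { return "RTSP"; }
}

class ImageWatchSketch extends WatchSketch {
    ImageWatchSketch(String url) { super(url); }
    String protocol() { return "HTTP"; }
}

class WatchFactorySketch {
    static final int TYPE_STREAM_VIDEO = 1; // assumed value
    static final int TYPE_STREAM_IMAGE = 2; // assumed value

    // Supporting a new protocol means one new subclass and one new case.
    static WatchSketch create(int type, String url) {
        switch (type) {
            case TYPE_STREAM_VIDEO: return new VideoWatchSketch(url);
            case TYPE_STREAM_IMAGE: return new ImageWatchSketch(url);
            default:
                throw new IllegalArgumentException("unknown stream type: " + type);
        }
    }
}
```

Callers only ever see the abstract type, so the code that starts and tracks watchers does not change when a new stream type is introduced.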

Within StreamWatch, the server response is more interesting: Depending on the protocol being used, the server response can be informational, warn of potential problems, or provide insight into the reason for the stream failure. In the cases of RTSP and HTTP only 2xx server codes are considered successful responses. A StreamWatch object continuously monitors the state of the stream with a predetermined delay between each check. I didn't want to continuously load or waste precious unicast streams from the server. Therefore, when a server response has been received (or an exception thrown), the StreamWatch object is then responsible for reporting back to the parent object with the results via the SiteState.doneWithStatus() (Listing Two) method, and the probing socket is closed. It is important for the implementation of the StreamWatch object to complete its status check in a timely manner. The responsiveness of the StreamWatch object determines the responsiveness of the system and the timely delivery of important results. To aid performance, the StreamWatch objects are preserved between testing cycles and referenced from the StreamData object. StreamWatch objects are threaded, mainly because I don't want a delay in one socket response to block reporting by other StreamWatch objects. With the current load on the staging server, 50 StreamWatch objects would be acceptable, but if this number increased in the future — say 200 or more — then I'll implement a StreamWatch pool within the StreamWatchFactory object to help reduce memory and performance requirements.
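The 2xx rule can be expressed as a small status-line check. This is a sketch under assumptions: StatusLineSketch is a hypothetical helper, not part of StreamHealth, and it accepts the whole 200 to 299 range, whereas Listing Three compares against three specific codes.

```java
// Hypothetical helper, not part of StreamHealth.
class StatusLineSketch {
    // Returns the numeric status code from a line such as
    // "RTSP/1.0 200 OK" or "HTTP/1.1 404 Not Found", or -1 if malformed.
    static int statusCode(String line) {
        if (line == null) return -1;
        String[] parts = line.trim().split("\\s+");
        if (parts.length < 2) return -1;
        if (!parts[0].startsWith("RTSP/") && !parts[0].startsWith("HTTP/")) return -1;
        try {
            return Integer.parseInt(parts[1]);
        } catch (NumberFormatException nfe) {
            return -1;
        }
    }

    // Only 2xx codes count as a successful response.
    static boolean isSuccess(String line) {
        int code = statusCode(line);
        return code >= 200 && code <= 299;
    }
}
```

Parsing the code as a number, rather than comparing fixed-length prefixes, also means that a short or truncated status line is treated as a failure instead of raising an exception.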

Dynamic Site Configuration via XML

While the video streams on this web site are under constant surveillance, the site undergoes changes on a daily basis. This means that video streams will be added, modified, and deleted. StreamHealth should be able to read the StreamList.xml stream configuration file, update its own internal representation, and most importantly, not fail on error. The stream configuration file had to be designed to reduce the risk of a misedited file causing the StreamHealth service to crash while being parsed. Of primary concern is a catastrophic failure in StreamHealth such that no notifications are sent. A good choice in satisfying the desire for robust parsing and providing a relatively easy file structure to edit by hand is to use XML.

Each stream is represented by the following node structure:

<Stream Type="RTSP_VIDEO">
  <Location Path="super/video.rm"></Location>
  <Server IP="127.0.0.1" Port="8080"></Server>
</Stream>

For StreamHealth, two types of errors can occur when parsing an XML file: The node for a stream entry can fail and the entry for that stream will be incomplete; and the parsing of the file can fail.

A parsing error that occurs at a <Stream>, <Server>, or <Location> node will only affect a single entry, and in this case will be identified to the ReportMgr as an undefined stream. A more general file parsing failure will prevent any <Stream> nodes from being read. This is more difficult to trap because the parser may or may not throw an exception during a failed parse. In general, a parsing error will either generate a zero-length mStreamVector in SiteState or throw any one of six exceptions listed in StreamFile.readConfigFile(). In either case, StreamFile.readConfigFile() throws a RuntimeException that is caught in the main SiteState.run() routine. This failure is reported to the ReportMgr as an event. There are occasions when a zero-length stream list is intentional, such as when all streams are temporarily removed from the site. This error can be suppressed by the mNotifyOfNoStreams flag in the configuration file (important if this condition is expected). Exceptions thrown and caught during parsing are always treated as errors and are reported as the kSTATUS_NO_STREAMS event, while a zero-stream file can be ignored.

The actual reading of the XML file is performed by recursive calls to the StreamFile.traverse() method through the nested node structure of the file. StreamFile.traverse() is called when encountering a child node(s). When the node name or attribute value matches strings in the StreamFile.traverse() method, this information is stuffed into a StreamData object. A StreamData object is created upon encountering a <Stream> node.
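A recursive walk of this kind can be sketched with the JDK's standard DOM parser. This is an illustration, not StreamFile itself: StreamListSketch is a hypothetical class, each stream is returned as a plain {type, path, ip, port} array instead of a StreamData object, and the name of the root element is an assumption because the article shows only the <Stream> node.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

class StreamListSketch {
    // Walks the tree depth-first. Each <Stream> element starts a new record,
    // and its <Location> and <Server> children fill that record in.
    static void traverse(Node node, List<String[]> out, String[] current) {
        if (node.getNodeType() == Node.ELEMENT_NODE) {
            Element e = (Element) node;
            String name = e.getTagName();
            if (name.equals("Stream")) {
                current = new String[] { e.getAttribute("Type"), "", "", "" };
                out.add(current);
            } else if (current != null && name.equals("Location")) {
                current[1] = e.getAttribute("Path");
            } else if (current != null && name.equals("Server")) {
                current[2] = e.getAttribute("IP");
                current[3] = e.getAttribute("Port");
            }
        }
        NodeList kids = node.getChildNodes();
        for (int i = 0; i < kids.getLength(); i++) {
            traverse(kids.item(i), out, current);
        }
    }

    // Parses the XML text and returns {type, path, ip, port} per stream.
    // Any parser failure is rethrown as an unchecked exception.
    static List<String[]> parse(String xml) {
        List<String[]> out = new ArrayList<>();
        try {
            Node root = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            traverse(root, out, null);
        } catch (Exception e) {
            throw new RuntimeException("could not read stream list", e);
        }
        return out;
    }
}
```

A missing attribute comes back from getAttribute() as an empty string, so an incomplete entry yields a record with blank fields that the caller can flag as undefined, while a file that is not well formed surfaces as a single RuntimeException.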

The streams listed in StreamList.xml can be updated at any time; therefore, on each clock pass (via SiteState.run()), SiteState must first check the file for its stream listing. Three things can occur to a listing on each pass:

  • A stream is removed as it is taken out of service or is no longer accessible via the web site.
  • A stream is added as a new stream to monitor.

  • A stream is modified or its location is changed.

Removed streams are deleted from the internal representation in SiteState at the end of the clock pass (after the ReportMgr receives a report that a stream has been removed from service). Newly added streams are allocated and initialized. Finally, modified streams are resolved on the Path attribute of the <Location> tag: if the Path has changed, the modification is treated as a combination of the two previous cases (the old entry is removed and a new entry is added); otherwise, only the IP and Port are updated.
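The reconciliation step amounts to comparing two collections keyed on the Path attribute. The sketch below is an assumption-laden illustration: ReconcileSketch is a hypothetical class, and each stream is reduced to a Path mapped to an "ip:port" string rather than a full StreamData record.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class ReconcileSketch {
    final Set<String> added = new TreeSet<>();
    final Set<String> removed = new TreeSet<>();
    final Set<String> moved = new TreeSet<>();   // same Path, new server address

    // Both maps go from the <Location> Path to an "ip:port" string.
    // 'current' is the internal list; 'fromFile' is what was just parsed.
    static ReconcileSketch diff(Map<String, String> current,
                                Map<String, String> fromFile) {
        ReconcileSketch r = new ReconcileSketch();
        for (String path : current.keySet()) {
            if (!fromFile.containsKey(path)) {
                r.removed.add(path);            // no longer listed in the file
            }
        }
        for (Map.Entry<String, String> e : fromFile.entrySet()) {
            String old = current.get(e.getKey());
            if (old == null) {
                r.added.add(e.getKey());        // new Path: start monitoring
            } else if (!old.equals(e.getValue())) {
                r.moved.add(e.getKey());        // known Path: update IP and Port
            }
        }
        return r;
    }
}
```

Because the Path is the key, an entry whose Path was edited appears once in the removed set and once in the added set, while an entry whose server address changed appears only in the moved set.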

Conclusion

As the site hopefully increases in popularity, I expect to be asked to add new reporting techniques, monitor new stream types, and support ever-increasing numbers of concurrent streams. With all of these, I would expect to update the report mechanism to support dynamically added reporting modules and update the StreamWatchFactory to support pooling of StreamWatch objects.

StreamHealth lets webmasters sleep a bit easier knowing that this service is keeping an eye on those important video streams. The best aspect of this design is that with the expected success (and consequently growth) of the site, the design can be augmented to allow for scalability, additional reporting techniques, and for new stream types.

DDJ

Listing One

public void run() {
	StatusAList aList;

	System.out.println(new Date().toString() + " ReportMgr::run() running");
	
	try {
		while(true) {
			while ((aList = (StatusAList)mSiteState.getUpdateBuf()) != null) {
				switch (aList.mStatus) {
					case Konstants.kSTATUS_HAS_STARTED:
						startUpdate((ArrayList)aList);
						break;
					case Konstants.kSTATUS_IS_UPDATED: 
						update((ArrayList)aList);
						break;
					case Konstants.kSTATUS_IS_COMPLETE:
						finishUpdate((ArrayList)aList);
						break;
					case Konstants.kSTATUS_NO_STREAMS:
						reportGeneralError(); 
						break;
					default:
						System.out.println(new Date().toString() + 
							" Bad Status flag in  ReportMgr:run() "); } }
			synchronized(mSiteState) {
				mSiteState.wait(); } } }  //wait for an event to be posted.
	catch (InterruptedException ie) {
		System.out.println(new Date().toString() + 
			" InterruptedException thrown in  ReportMgr:run() " + ie);  } 
	catch (ClassCastException cce) { 
		System.out.println(new Date().toString() + 
			" ClassCastException thrown in  ReportMgr:run() " + cce);  } 
}


Listing Two

public boolean doneWithStatus(Object sdobj, int status, 
				String resp, ArrayList auxData) {
	boolean bChanged = false;
	StatusAList updateAList;
	StreamData sd;
	
	sd = (StreamData)sdobj;
	sd.mTimeOfLastTest = new Date().toString();
		
	//find stream, update then notify if status|server response has changed
	if (mStreamVector.contains(sd)) {
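		//compare the response text with equals(); != would compare references
		boolean bSameResp = (resp == null) ? (sd.mServerResponse == null)
						: resp.equals(sd.mServerResponse);
		if (sd.mOpStatus != status || !bSameResp) {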
			sd.mState = Konstants.kSTATE_UPDATED; 
			bChanged = true; }
		else {
			sd.mState = Konstants.kSTATE_UNCHANGED; }

		//copy monitored data into StreamData object
		sd.mOpStatus = status;
		sd.mServerResponse = resp;
		sd.mAddConnInfo = auxData;

		//post update if status has changed
		if (bChanged == true) {		
			updateAList = new StatusAList();
			updateAList.add(new StreamData(sd));
			updateAList.mStatus = Konstants.kSTATUS_IS_UPDATED;
			postUpdate(updateAList); }
		
		return true; }		//this stream is still actively monitored
	else {
		return false;}		//didn't find match, will be deleted
}


Listing Three

public void run()
{
/*
	Checks on an RTSP streaming server for the existence of the DESCRIBE method, 
	then probes for the existence of the live stream.
*/	
	Socket sock = null;
	BufferedReader inStream = null;
	PrintStream outStream = null;
	StringBuffer response = null;
	int status = Konstants.kOPSTATUS_STREAM_UNDEFINED;
	ArrayList addStatus = null;
		
	//this indicates a faulty streamlist.xml entry		
	if (mURL == null || mURL.equals("")) {	//compare content, not references
		return; }
			
	while (true) { //monitor stream until doneWithStatus() returns false
		System.out.println(new Date().toString() + 
					" starting to probe video stream: " + mURL);
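		response = new StringBuffer();
		//start each pass from a clean slate so a stale result is never reported
		status = Konstants.kOPSTATUS_STREAM_UNDEFINED;
		addStatus = null;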
			
		try { 
			sock = new Socket(mIP, mPort);
			System.out.println(new Date().toString() + 
					" socket opened to server: " + mURL);
			inStream = new BufferedReader(new 
					InputStreamReader(sock.getInputStream()));
			outStream = new PrintStream(sock.getOutputStream()); 

			//now let's start communicating with the RTSP server		
			if (supportsDescribe(outStream, inStream)) {	
				if (isGood(outStream, inStream, response)) {						
					status = Konstants.kOPSTATUS_STREAM_OK;
					System.out.println(new Date().toString() + 
							" Stream is good: " + mURL);

					//let's see if we can capture some additional details
					addStatus = new ArrayList();						
					getAuxData(outStream, inStream, addStatus); }	
		
				else {	//stream not available from RTSP server
					status = Konstants.kOPSTATUS_STREAM_BAD;
					addStatus = null;
					System.out.println(new Date().toString() + 
							" Stream is BAD: " + mURL); } } }
		catch (IOException ioe) {
			status = Konstants.kOPSTATUS_STREAM_BAD;
			System.out.println(new Date().toString() + 
				" IO Exception thrown in RealVideoWatch.run() " + ioe); }

		try {
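			//close the streams before the socket; either stream may still be
			//null if the connection failed part way through
			if (inStream != null) {
				inStream.close(); }
			if (outStream != null) {
				outStream.close(); }
			if (sock != null) {
				sock.close(); }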
			if (!mSiteState.doneWithStatus(mUniqueID, status, 
							response.toString(), addStatus) ) { 
				return; } //I've been removed from monitoring

			sleep(Konstants.mDelayBetweenCycles.intValue()); }
		catch (InterruptedException ie) {
			System.out.println(new Date().toString() + 
				" Interrupted Exception thrown in RealVideoWatch.run() " + ie); }
		catch (IOException ioe) {
			System.out.println(new Date().toString() + 
				" IO Exception thrown in RealVideoWatch.run() " + ioe); } }
}


private boolean supportsDescribe(PrintStream ps, BufferedReader br) 
				throws IOException {
	String s;
	boolean bFlg = false;

	//let's see what the RTSP server supports
	ps.println("OPTIONS * RTSP/1.0");
	ps.println("");

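	//read every header up to the blank line (or end of stream), so nothing
	//is left unread and a missing DESCRIBE cannot leave us blocked
	while ((s = br.readLine()) != null && !s.equals("")) {
		if (s.indexOf("DESCRIBE") >= 0) {
			//we do support DESCRIBE
			bFlg = true; } }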
	
			
	return bFlg;
}

private boolean isGood(PrintStream ps, BufferedReader br, StringBuffer response) 
				   throws IOException {
	String s, testStr;
	boolean bFlg = false;
	int statusLength = 12;
	
	//probe the server with DESCRIBE
	ps.println("DESCRIBE " + mURL + " RTSP/1.0");
	ps.println(""); 

	//now let's parse the input stream for the results of the signal
	if ((s = br.readLine()) != null) {
		response.append(s); //let's save this response for reporting
		//guard against a status line shorter than expected
		testStr = (s.length() >= statusLength) ? s.substring(0, statusLength) : s;
		if (testStr.equals("RTSP/1.0 200") ||		//OK
			testStr.equals("RTSP/1.0 201") || 	//CREATED
			testStr.equals("RTSP/1.0 250")) {	//LOW ON STORAGE SPACE
			bFlg = true; } }
	
	//skip the remaining headers, stopping at the blank line or end of stream
	while ((s = br.readLine()) != null && !s.equals("")) { }

	return bFlg;
}


