Dr. Dobb's is part of the Informa Tech Division of Informa PLC

This site is operated by a business or businesses owned by Informa PLC and all copyright resides with them. Informa PLC's registered office is 5 Howick Place, London SW1P 1WG. Registered in England and Wales. Number 8860726.


Channels ▼
RSS

Database

Distributing Data Using TLT30G


Sep98: Distributing Data Using TLT30G

Oleg is a computer scientist with CSC. He can be contacted at [email protected].


TLT30G is a software system for distributing data from a central location to a number of clients over unidirectional, noisy, and generally slow communication links -- the delivery of up-to-date weather information to a ship, for instance. The information -- gridded data, observations, storm warnings, and so on -- is prepared at a central site, which broadcasts over appropriate channels. Any user equipped with the right satellite receiver and a PC can pick up the broadcast, unpack, and view the data.

A broadcasting server at a central location may not receive feedback from clients. Besides, when data is being sent to a multitude of recipients, feedback is infeasible; otherwise, the single server would be overwhelmed with retransmission requests. To maximize the probability that any client receives the complete data set, the broadcaster fragments data into segments and repeatedly sends them. A receiver must do its best to reassemble a file as soon as it obtains all segments. That means the receiver should pick up transmission starting from any segment, handle segments received in any order, and listen through as many loops of broadcast as necessary to fully assemble the transmitted file. After the file is successfully received, all further segments belonging to the same broadcast session should be disregarded. The client also should be smart enough to handle interrupted broadcasts (which are purged after a certain time-to-live), outdated broadcasting sessions, as well as several simultaneous transmissions from possibly different servers.

The broadcasting service should also be as transport-independent as possible, being able to function with any communication link that is capable of sending and receiving an eight-bit byte. The byte transport therefore is handled by plug-ins; currently, TLT30G (available electronically; see "Resource Center," page 3) supports UNIX pipes, UNIX and Win32 asynchronous serial lines, and UDP datagrams, on SunSparc/Solaris, HP-UX, and Windows 95/NT.

Communication Protocol

TLT30G's communication model is similar to the familiar radio/TV broadcast, with several transmitters showering data that a number of recipients pick up. Thus, TLT30G is made of transmitter (broadcaster) and receiver modules, which are separately compiled executables, running as separate processes on (generally) separate hosts connected via an appropriate communication link. The broadcaster module exists in several flavors: The simplest one sends out a single file, while the advanced versions simultaneously distribute a number of files, taking into account a possibility that each file may be concurrently updated.

The broadcaster and receiver applications speak (and thus implement) a common TLT30G session protocol, which can run on top of several different transports. The TLT30G protocol's scope is segmenting and reassembling data within a transmission session. The protocol itself does not care how segments are passed from one end to another, only how segments are specified and interpreted. To support such an abstraction, the transport layer is accessed exclusively through a pair of abstract classes LAPIn/LAPOut, which define methods to send/receive bytes and mark packets' boundaries. Currently, TLT30G provides implementations of these interfaces for a stream (file/pipe/serial) and datagram (TCP/IP) communication links.

A segment is a logical unit of transmission for the TLT30G protocol. A broadcasting module splits the file being distributed into fragments, and builds segments around them with the help of a TLT30G protocol API. A transport layer is called upon to transmit segments, either in packets/frames or in a byte/bit stream depending on a particular transport. A receiving module listens for incoming segments and places their payload data according to a file transmission session they belong to, and their position in the file's assembly. As Figure 1 shows, every segment starts with a fixed header, followed by payload data, and is terminated by a CRC field. All segments have the same size (specified in the 0th segment). The 0th segment (the segment with seq_number = 0) defines a transmission session and carries its parameters (Figure 2). Because of its importance, the 0th segment is repeated twice during every loop of a broadcast. Still, the receiver can start assembling a file even if it missed the 0th segment.

Since only one byte is allocated for the segment data length field, the segment's body cannot exceed 255 bytes. This seems appropriate for slow communication channels. A particular value for the segment size is chosen by a transmitter upon start-up, and remains fixed for all sessions of a given transmitter. A receiver, however, can handle segments of any legal size (within a 4-254 range). Moreover, segment sizes may differ among different reception sessions handled concurrently by a receiver.

TLT30G's Communication Links

TLT30G currently runs over the following data transports:

  • A file link. A particular type of communication through a UNIX file. This can be either an asynchronous or synchronous channel. If the file link is a regular file, a broadcaster module dumps segments into this file one by one. The resulting dump (trace) can be fed into a receiver module, which reassembles the "transmitted" data sets. This scenario is useful in debugging the system. A "file" used for communication can also be a UNIX/Windows NT pipe, or character device (a serial port, for instance). In this case, the file link is an asynchronous communication channel-linking broadcaster and receiver modules/sites. Example 1 shows examples of using "file" links.
  • The file transport layer transmits a segment byte-by-byte in a continuous stream. Symbol A is arbitrarily chosen to mark the beginning of a segment in the stream. There is no special symbol/condition denoting a segment end.
  • Reading from a regular file is mostly an error-free operation. To check behavior of the receiver in the presence of channel noise, you may want to deliberately introduce errors into the input stream. You can do this by setting an environment variable LAPFILEERR to a desired error rate before launching the receiver; for example, if LAPFILEERR=10, every 10th byte read from a file (on average) would be deliberately mangled.
  • A Win32 serial communication link. Under UNIX, opening and reading a serial port is identical to handling a regular file; serial port-specific parameters (baud rate, parity, time-outs) can be set via an stty command prior to running the receiver. The same is true for NT. A COM1 port can be opened with a regular fopen() function; the port's parameters can be specified with a mode command. Unfortunately, the Windows 95 mode command is crippled, making it necessary to use Win32s-specific functions (CreateFile, ReadFile, CloseHandle, SetCommState, and SetCommTimeouts) to access a COM port and set its parameters. A separate Win32 serial transport plug-in was therefore needed. This plug-in (a concrete class implementing the LAPIn interface) supports the same file stream transport protocol as described earlier. In fact, a file sent from a UNIX host through a file link (a serial port, /dev/tty00, for instance) can be received on a Windows 95/NT box; see Example 1.
  • A network (UDP) transport. This type of communication link transmits segments as UDP datagrams over an IP network. Of all the Internet protocols, UDP is best suited for a unidirectional packet-oriented broadcast. Unlike the file link, this is a packet-oriented transport. Currently, one UDP packet carries a single segment. Yet, it is not difficult to pack several segments into a datagram to better utilize bandwidth.
  • The network transport lets a central site broadcast several "channels" of information. A channel corresponds to a separate UDP port, which must be specified by an environment variable LAPNETPORT. There may be several broadcasters transmitting different files concurrently over the same channel; any receiver tuned to this channel will pick up each file.
  • Adding a new link. To run TLT30G over a new transport, you must provide an implementation of abstract classes LAPIn and LAPOut. I suggest using the following naming convention: If the new transport link is to be named Foo, the derived LAP classes should be named as LAPFooIn (derived from LAPIn) and LAPFooOut (derived from LAPOut). The interface code (class definitions) has to be put in a file named LAPFoo.h. The broadcaster and receiver modules have to be recompiled with DDEFLAP=LAPFoo flag. Source-code modifications are not necessary.

TLT30G Walk-Through

To see how the system actually works, I'll follow the transmission of a single file across a particular (UDP) communication link.

On the broadcaster side, the entire transmission occurs within a method FileBeingSent::do_broadcast(LAPOut& out_link) (see Listing OneThis is the method that implements the outbound part of the TLT30G session protocol, fragmenting the file and sending the segments out. The method is usually repeatedly called. The FileBeingSent class is derived from a protected class Segment0 (see Listing Two), which represents a single file transmission session. Whenever a Segment0 object is constructed (for example, as a part of a FileBeingSent object), a new session is created: A session ID is picked (as a random number), the name of the file is remembered, its size is determined, and the total number of segments in this session is computed. These are all parameters necessary to compose the Segment0 of the TLT30G protocol; see Figure 2. A method Segment0::send(out_link) is used to send out this segment (see Listing One and Example 2).

This method, as well as the do_broadcast() method in Listing One, rely on a SegmentOutStream, which assembles a segment, packs items of various kinds in it, and interfaces a raw byte transport. This class provides a higher-level abstraction of the communication link LAPOut. Upon construction, a SegmentOutStream object informs the link that a new segment is about to begin. The object offers a family of operator << methods you can use to send data of particular types: short or long integers, a segment's header, or simply an array of bytes. This is similar to the regular C++ output stream. The SegmentOutStream object takes care of marshaling the data in a platform-independent network byte order, and accumulating a CRC. When the object is destroyed, it writes the CRC down and tells the output link that the segment is finished. For more information on the streaming paradigm, see my article "Speaking in Iostreams-ese" (C/C++ Users Journal, May 1997).

A byte transport is represented in the code by an instance of a LAPOut class. While SegmentOutStream can handle a variety of data types, the transport class is concerned only with moving raw bytes and marking packets (Example 3). LAPOut is a pure abstract class (an interface) that is implemented in a number of derived classes that support specific communication links. The main transmitter module creates an instance of a particular concrete transport class, and passes it -- as a LAPOut& reference -- to all other communication objects. Since all these objects see a transport class only through the LAPOut& interface, the higher-level protocol services they provide are independent of the actual byte transport. For illustration, Listing Three (available electronically) presents a set of concrete classes LAPNetOut/LAPNetIn, which implement the LAPOut/LAPIn interface for a UDP communication link. The full UDP transport code is available electronically; see "Resource Center," page 3.

A TLT30G receiver can handle several simultaneous broadcasting sessions from different transmitters. Each reception session exists in one of the following states: started, because Segment 0 of the session was just received; opened, when some other segment of the session was first spotted; active, that is, accumulating segments; and zombie. A session becomes a zombie when all needed segments have been received, and the corresponding file successfully reassembled. Therefore, the receiver may ignore subsequent broadcast loops for that session. The receiver manages sessions with the help of a Sessions object. This object is instantiated in the main receiver module. Every time a receiver in its main loop acquires a new segment, it passes the corresponding Segment Input Stream object to the Sessions object (Listing Four, available electronically). The object itself is merely a list of Session objects, each representing a separate reception session (Listing Five, available electronically). The Sessions::digest() method, Example 4, takes a newly received segment and decides what to do with it. The method extracts the session ID from the segment's header, and tries to locate the corresponding Session object. If successful, the found Session consumes the segment. Otherwise, a new Session object is created and chained into the list.

All nontrivial processing of a new segment takes place within a Session object. If the corresponding session's status is active, the segment's data would be put into the appropriate place into the file's assembly, unless the segment is a duplicate. A bitstring Session::fragments_received keeps track of successfully processed segments. This lets a Session handle segments received in any order, and detect duplicates. Once the complete set of segments is received, the corresponding data file is passed to a user-supplied helper, and the Session becomes a zombie (freeing all resources that are no longer necessary).

Waiting for a new segment, receiving its header and the body, and validating that segment (including a CRC check) are performed by a SegmentInStream object when it is constructed. The object can then be used to take items of various types from the segment's body; the object unmarshals data and converts from the network to the native byte order transparently. Similarly to SegmentOutStream, the SegmentInStream object relies on the byte transport layer (LAPIn interface) for detection of a new packet and receiving bytes making up the segment (see Listing Two).

DDJ

Listing One

                  // Broadcast the file, return FALSE if the broadcast was
                   // interrupted (because file was changed in the meantime)
bool FileBeingSent::do_broadcast(LAPOut& out_link)
{
  Segment0::log("Starting");
  Segment0::send(out_link);
  Segment0::send(out_link);  // and do it again, 0th segment is important


</p>
  char buffer[256];
  assert( MTU < sizeof(buffer) );
  rewind(fp);
  for(register unsigned long i=1; i<= max_seq_no; i++)
  {
    if( was_modified() )
      return Logger::log("Session aborted: the file was changed"),
      false;
    if( fread(buffer,sizeof(char),MTU,fp) <= 0 )
      perror("reading input file"),
      _error("EOF or error reading an input file");


</p>
    SegmentOutStream(out_link) << SegmentHeader(i,session_id,MTU)
                                             << byte_array(buffer,MTU);
  }
  Logger::log("Finished broadcast in session %d",session_id);
  return true;
}

Back to Article

Listing Two

#ifndef __GNUC__#pragma once
#else
#pragma interface
#endif


</p>
#ifndef __Segment_h
#define __Segment_h


</p>
#include "LAP.h"
#include "myenv.h"


</p>
class SegmentOutStream;
class SegmentInStream;


</p>
typedef unsigned short ID;


</p>
            // All segments start with this....
struct SegmentHeader
{
  const unsigned long seq_number;   // Sequence number
  const ID session_id;          // ID of the current session
  const unsigned char data_length;  // Length of the data only
                    // Must be within (1,255)
  inline int q_header_size(void) const  // True header size, w/o any padding
    { return sizeof(seq_number) + sizeof(session_id) + sizeof(data_length); }
  SegmentHeader(const unsigned long _seq_number, const short _session_id,
                                                      const int _data_length)
    : seq_number(_seq_number), session_id(_session_id),
      data_length(_data_length)
    { assert( _data_length > 1 && _data_length < 255 ); }
  void write(SegmentOutStream& out_packet) const;
  void read(SegmentInStream& in_packet);
};
class Segment0 : public SegmentHeader
{
protected:
  char file_name[20];           // only the basic portion of name, w/o path
  const unsigned long file_size;
  unsigned long max_seq_no;     // That is, how many data packets
  const unsigned char MTU;      // Max transmission unit: data_length
                                // in all the packets to follow
  inline int q_data_size(void) const
    { return sizeof(file_size) + sizeof(MTU) +
             sizeof(max_seq_no) + sizeof(file_name); }
         // _Partially_ fill out Segment0 based on what found in some_packet
  Segment0(const SegmentHeader& some_packet);
    static unsigned char default_MTU;
  public:
  enum { max_MTU = 255 };  // None of the segments can be bigger than that
  Segment0(const char * full_file_name);
  void send(LAPOut& out_link) const;
  void log(const char * title) const;   // Log the contents of Segment0,
                                        // which represents a session.
                                        // Complete a partially finished
                                        // Segment0 from a packet
  void complete(SegmentInStream& packet) throw (rc_bad_packet);
  unsigned int q_MTU(void) const        { return MTU; }
  unsigned int q_max_seq_no(void) const     { return max_seq_no; }
  static void set_default_MTU(const int new_MTU);
  static int  get_default_MTU(void);
};
            // A stream to help compose a packet
class SegmentOutStream
{
  LAPOut& out_link;
  CRC16 crc;
  void write_byte(const unsigned char a_byte)
  { out_link.write_byte(a_byte); crc << a_byte; }
public:
  SegmentOutStream(LAPOut& _out_link);
  ~SegmentOutStream(void);      // write the CRC and flush the packet
  SegmentOutStream& operator << (const unsigned char a_byte)
        { write_byte(a_byte); return *this; }
  SegmentOutStream& operator << (const unsigned short a_short);
  SegmentOutStream& operator << (const unsigned long a_long);
  SegmentOutStream& operator << (const SegmentHeader& seg_header);
        // Write an array of bytes
        // Array: a representation for an array. Note, for safety an Array 
        // object cannot be constructed explicitly, either on stack or on 
        // heap. An Array object can only be constructed via a friend 
        // function byte_array, and is always transient.
  class Array
  {
    friend SegmentOutStream;
    friend SegmentInStream;
    const char * const ptr;
    const unsigned long size;
    Array(const char * byte_array, const unsigned long _size) 
                                                 // private constructor
    : ptr(byte_array), size(_size) {}
    Array& operator = (const Array&); // unimplemented and forbidden
    Array(const Array&);              // unimplemented and forbidden
  public:
    friend inline Array byte_array(const char * ptr, const unsigned long size)
        { return Array(ptr,size); }
  };
  SegmentOutStream& operator << (const Array& array);
};
inline SegmentOutStream&
SegmentOutStream::operator << (const SegmentHeader& seg_header)
{
  seg_header.write(*this);
  return *this;
}
      // Stream to help de-compose a packet. When stream is fully constructed,
      //  it contains complete packet, with CRC and other checks performed
class SegmentInStream
{
  unsigned char buffer[Segment0::max_MTU+1];    // The buffer where the packet 
                // is received. Two ptrs in buffer for the curent reading
                // and writing position
  const unsigned char * read_ptr;
  unsigned char * write_ptr;
  CRC16 curr_crc;       // CRC accumulated while the packet is being received
                // The packet header is being read as a part
                // of the receiving process
  SegmentHeader header;
                // Take the current byte from the buffer
  unsigned char take_byte(void)
    { assert( write_ptr > read_ptr ); return *read_ptr++; }
                // Receive a byte from the link and into the buffer
  void receive_byte(LAPIn& in_link);
public:
                // Receive a packet. It throws up if there was an input error
  SegmentInStream(LAPIn& in_link);
  const SegmentHeader& q_header(void) const { return header; }
  SegmentInStream& operator >> (unsigned char& a_byte)
        { a_byte = take_byte(); return *this; }
  SegmentInStream& operator >> (unsigned short& a_short);
  SegmentInStream& operator >> (unsigned long& a_long);
  SegmentInStream& operator >> (const SegmentOutStream::Array& array);
  void write(FILE * fp);    // Dump the rest of the stream into a file
};
#endif

Back to Article


Copyright © 1998, Dr. Dobb's Journal

Related Reading


More Insights






Currently we allow the following HTML tags in comments:

Single tags

These tags can be used alone and don't need an ending tag.

<br> Defines a single line break

<hr> Defines a horizontal line

Matching tags

These require an ending tag - e.g. <i>italic text</i>

<a> Defines an anchor

<b> Defines bold text

<big> Defines big text

<blockquote> Defines a long quotation

<caption> Defines a table caption

<cite> Defines a citation

<code> Defines computer code text

<em> Defines emphasized text

<fieldset> Defines a border around elements in a form

<h1> This is heading 1

<h2> This is heading 2

<h3> This is heading 3

<h4> This is heading 4

<h5> This is heading 5

<h6> This is heading 6

<i> Defines italic text

<p> Defines a paragraph

<pre> Defines preformatted text

<q> Defines a short quotation

<samp> Defines sample computer code text

<small> Defines small text

<span> Defines a section in a document

<s> Defines strikethrough text

<strike> Defines strikethrough text

<strong> Defines strong text

<sub> Defines subscripted text

<sup> Defines superscripted text

<u> Defines underlined text

Dr. Dobb's encourages readers to engage in spirited, healthy debate, including taking us to task. However, Dr. Dobb's moderates all comments posted to our site, and reserves the right to modify or remove any content that it determines to be derogatory, offensive, inflammatory, vulgar, irrelevant/off-topic, racist or obvious marketing or spam. Dr. Dobb's further reserves the right to disable the profile of any commenter participating in said activities.

 
Disqus Tips To upload an avatar photo, first complete your Disqus profile. | View the list of supported HTML tags you can use to style comments. | Please read our commenting policy.