Distributing Data Using TLT30G

TLT30G is a software system for distributing data from a central location to a number of clients over unidirectional, noisy, and generally slow communication links.


September 01, 1998
URL:http://www.drdobbs.com/database/distributing-data-using-tlt30g/184410660

Sep98: Distributing Data Using TLT30G

Oleg is a computer scientist with CSC. He can be contacted at [email protected].


TLT30G is a software system for distributing data from a central location to a number of clients over unidirectional, noisy, and generally slow communication links -- the delivery of up-to-date weather information to a ship, for instance. The information -- gridded data, observations, storm warnings, and so on -- is prepared at a central site, which broadcasts over appropriate channels. Any user equipped with the right satellite receiver and a PC can pick up the broadcast, unpack, and view the data.

Because the links are unidirectional, a broadcasting server at a central location generally cannot receive feedback from its clients. Moreover, when data is sent to a multitude of recipients, feedback would be infeasible anyway: the single server would be overwhelmed with retransmission requests. To maximize the probability that every client receives the complete data set, the broadcaster fragments the data into segments and sends them repeatedly. A receiver must do its best to reassemble a file as soon as it has obtained all the segments. That means the receiver should be able to pick up a transmission starting from any segment, handle segments received in any order, and listen through as many loops of the broadcast as necessary to fully assemble the transmitted file. After the file has been successfully received, all further segments belonging to the same broadcast session should be disregarded. The client should also handle interrupted broadcasts (which are purged after a certain time-to-live), outdated broadcasting sessions, and several simultaneous transmissions from possibly different servers.
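The looping requirement can be illustrated with a small simulation. This is not TLT30G code. `loops_until_complete` is a hypothetical function with the following model: the sender cycles through segments 0 to max_seq_no over and over, the receiver tunes in at an arbitrary segment, and each segment is lost independently with a given probability (an assumed loss model). One flag per segment records what has arrived.

```cpp
#include <cassert>
#include <random>
#include <vector>

// Hypothetical carousel simulation (not TLT30G code). The sender cycles
// through segments 0..max_seq_no forever; the receiver starts listening at
// segment join_at, and each segment is lost independently with probability
// loss. Returns the number of broadcast loops the receiver listens to,
// counting the first, partial one, before it holds every segment.
unsigned loops_until_complete(unsigned max_seq_no, unsigned join_at,
                              double loss, unsigned seed)
{
  assert(join_at <= max_seq_no && loss >= 0.0 && loss < 1.0);
  std::mt19937 rng(seed);
  std::bernoulli_distribution lost(loss);

  std::vector<bool> have(max_seq_no + 1, false);  // one flag per segment
  unsigned missing = max_seq_no + 1;
  unsigned loops = 1;
  for (unsigned seq = join_at; ; ) {
    if (!lost(rng) && !have[seq]) {   // repeats of a held segment are ignored
      have[seq] = true;
      if (--missing == 0) return loops;
    }
    if (++seq > max_seq_no) {         // end of this loop: wrap around
      seq = 0;
      ++loops;
    }
  }
}
```

With no losses, a receiver that tunes in at segment 5 of a carousel of segments 0 through 10 needs exactly two loops: the rest of the current one and the start of the next. Losses can only add loops, which is the price of having no return channel.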

The broadcasting service should also be as transport-independent as possible and able to function over any communication link that can send and receive eight-bit bytes. The byte transport is therefore handled by plug-ins; currently, TLT30G (available electronically; see "Resource Center," page 3) supports UNIX pipes, UNIX and Win32 asynchronous serial lines, and UDP datagrams, on Sun SPARC/Solaris, HP-UX, and Windows 95/NT.

Communication Protocol

TLT30G's communication model is similar to the familiar radio/TV broadcast, with several transmitters showering data that a number of recipients pick up. Thus, TLT30G consists of transmitter (broadcaster) and receiver modules, which are separately compiled executables running as separate processes on (generally) separate hosts connected via an appropriate communication link. The broadcaster module comes in several flavors: the simplest one sends out a single file, while the advanced versions simultaneously distribute a number of files, taking into account the possibility that each file may be concurrently updated.

The broadcaster and receiver applications speak (and thus implement) a common TLT30G session protocol, which can run on top of several different transports. The scope of the TLT30G protocol is segmenting and reassembling data within a transmission session. The protocol itself does not care how segments are passed from one end to the other, only how segments are specified and interpreted. To support this abstraction, the transport layer is accessed exclusively through a pair of abstract classes, LAPIn and LAPOut, which define methods to send/receive bytes and to mark packet boundaries. Currently, TLT30G provides implementations of these interfaces for stream (file/pipe/serial) and datagram (UDP/IP) communication links.
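On a stream link the transport itself must mark where one packet ends and the next begins. The article does not describe TLT30G's stream framing. The following sketch therefore uses one well-known technique, SLIP byte stuffing from RFC 1055, placed behind the LAPOut interface of Example 3 (with a virtual destructor added). `LAPStreamOut` and `frame_packet` are hypothetical names, and a byte vector stands in for the pipe or serial port.

```cpp
#include <cassert>
#include <vector>

// Outbound transport interface from Example 3 (virtual destructor added).
class LAPOut
{
public:
  virtual ~LAPOut() = default;
  virtual void write_byte(const unsigned char a_byte) = 0;
  virtual void begin_packet(void) = 0;
  virtual void finish_packet(void) = 0;
};

// Hypothetical stream transport using SLIP-style framing (RFC 1055):
// END delimits packets, and END/ESC bytes inside a packet are escaped, so a
// receiver can resynchronize on the next END after line noise.
class LAPStreamOut : public LAPOut
{
  enum : unsigned char { END = 0xC0, ESC = 0xDB, ESC_END = 0xDC, ESC_ESC = 0xDD };
  std::vector<unsigned char>& sink;   // stands in for a pipe or serial port
public:
  explicit LAPStreamOut(std::vector<unsigned char>& _sink) : sink(_sink) {}
  void begin_packet(void) override { sink.push_back(END); }
  void finish_packet(void) override { sink.push_back(END); }
  void write_byte(const unsigned char b) override
  {
    if (b == END)      { sink.push_back(ESC); sink.push_back(ESC_END); }
    else if (b == ESC) { sink.push_back(ESC); sink.push_back(ESC_ESC); }
    else                 sink.push_back(b);
  }
};

// Frames one packet through the LAPOut interface and returns the wire bytes.
std::vector<unsigned char> frame_packet(const std::vector<unsigned char>& payload)
{
  std::vector<unsigned char> wire;
  LAPStreamOut link(wire);
  LAPOut& out = link;               // callers see only the abstract interface
  out.begin_packet();
  for (unsigned char b : payload) out.write_byte(b);
  out.finish_packet();
  return wire;
}
```

END never appears inside a framed packet. A receiver that starts listening mid-stream, or that loses bytes to line noise, can therefore resynchronize at the next END, which suits the pick-up-anywhere requirement described earlier.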

A segment is the logical unit of transmission for the TLT30G protocol. A broadcasting module splits the file being distributed into fragments and builds segments around them with the help of the TLT30G protocol API. The transport layer is then called upon to transmit the segments, either in packets/frames or as a byte/bit stream, depending on the particular transport. A receiving module listens for incoming segments and places their payload data according to the transmission session they belong to and their position in the file's assembly. As Figure 1 shows, every segment starts with a fixed header, followed by payload data, and is terminated by a CRC field. All segments have the same size (specified in the 0th segment). The 0th segment (the segment with seq_number = 0) defines a transmission session and carries its parameters (Figure 2). Because of its importance, the 0th segment is sent twice during every loop of a broadcast. Still, the receiver can start assembling a file even if it missed the 0th segment.
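The layout of Figure 1 can be sketched as a standalone encoder. Field widths follow Listing Two: a sequence number (assumed to be 4 bytes, as an unsigned long was on the 32-bit platforms of the time), a 2-byte session ID, and a 1-byte data length, all big-endian. The sketch computes the CRC over header and payload, because SegmentOutStream::write_byte in Listing Two folds every byte it writes into the CRC. The article does not say which CRC-16 variant the CRC16 class uses, so CRC-16/CCITT-FALSE (polynomial 0x1021, initial value 0xFFFF) is an assumption, as are the helper names `crc16` and `encode_segment`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Assumed checksum: CRC-16/CCITT-FALSE (poly 0x1021, init 0xFFFF, no
// reflection). The article does not say which variant TLT30G's CRC16 uses.
uint16_t crc16(const uint8_t* data, std::size_t len)
{
  uint16_t crc = 0xFFFF;
  for (std::size_t i = 0; i < len; ++i) {
    crc ^= static_cast<uint16_t>(data[i] << 8);
    for (int bit = 0; bit < 8; ++bit)
      crc = (crc & 0x8000) ? static_cast<uint16_t>((crc << 1) ^ 0x1021)
                           : static_cast<uint16_t>(crc << 1);
  }
  return crc;
}

// Builds one segment laid out as in Figure 1: seq_number (4 bytes),
// session_id (2), data_length (1), payload, CRC (2). Every multibyte field
// is written most-significant byte first. The CRC covers header and payload.
std::vector<uint8_t> encode_segment(uint32_t seq_number, uint16_t session_id,
                                    const std::vector<uint8_t>& payload)
{
  assert(!payload.empty() && payload.size() <= 255);  // one-byte length field
  std::vector<uint8_t> seg;
  auto put = [&seg](uint32_t value, int nbytes) {
    for (int i = nbytes - 1; i >= 0; --i)
      seg.push_back(static_cast<uint8_t>(value >> (8 * i)));
  };
  put(seq_number, 4);
  put(session_id, 2);
  put(static_cast<uint32_t>(payload.size()), 1);
  seg.insert(seg.end(), payload.begin(), payload.end());
  put(crc16(seg.data(), seg.size()), 2);   // checksum of everything so far
  return seg;
}
```

Under these assumptions a segment carries 9 bytes beyond its payload (a 7-byte header and a 2-byte CRC). At a full 255-byte payload the overhead is therefore under 4 percent.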

Since only one byte is allocated for the segment data length field, the segment's body cannot exceed 255 bytes. This seems appropriate for slow communication channels. A particular value for the segment size is chosen by a transmitter upon start-up, and remains fixed for all sessions of a given transmitter. A receiver, however, can handle segments of any legal size (within a 4-254 range). Moreover, segment sizes may differ among different reception sessions handled concurrently by a receiver.

TLT30G's Communication Links

TLT30G currently runs over the following data transports:

TLT30G Walk-Through

To see how the system actually works, I'll follow the transmission of a single file across a particular (UDP) communication link.

On the broadcaster side, the entire transmission occurs within the method FileBeingSent::do_broadcast(LAPOut& out_link) (see Listing One). This method implements the outbound part of the TLT30G session protocol, fragmenting the file and sending out the segments; it is usually called repeatedly. The FileBeingSent class is derived, through protected inheritance, from the class Segment0 (see Listing Two), which represents a single file-transmission session. Whenever a Segment0 object is constructed (for example, as part of a FileBeingSent object), a new session is created: a session ID is picked (as a random number), the name of the file is remembered, its size is determined, and the total number of segments in the session is computed. These are all the parameters needed to compose Segment 0 of the TLT30G protocol; see Figure 2. The method Segment0::send(out_link) sends out this segment (see Listing One and Example 2).
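The bookkeeping done by the Segment0 constructor can be sketched without any file I/O. `SessionParams` and `make_session` are hypothetical names. The 20-byte name field mirrors `file_name[20]` in Listing Two. Computing max_seq_no as file_size / MTU rounded up is an inference from Listing One, which reads MTU bytes for each of segments 1 through max_seq_no. The original picks the session ID at random; here the caller passes it in, so the result is deterministic.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>

// Hypothetical mirror of the session parameters that Segment0 carries.
struct SessionParams
{
  uint16_t session_id;
  char file_name[20];        // base name only, NUL-terminated, cut to 19 chars
  unsigned long file_size;
  unsigned char MTU;
  unsigned long max_seq_no;  // data segments are numbered 1..max_seq_no
};

// The original picks session_id at random; here the caller passes it in.
SessionParams make_session(const std::string& full_file_name,
                           unsigned long file_size, unsigned MTU,
                           uint16_t session_id)
{
  assert(MTU >= 1 && MTU <= 255);          // must fit the one-byte field
  SessionParams s{};                       // zero-fills file_name
  s.session_id = session_id;
  // Strip the path: keep what follows the last '/' or '\' separator.
  const std::string::size_type sep = full_file_name.find_last_of("/\\");
  const std::string base = (sep == std::string::npos)
                               ? full_file_name
                               : full_file_name.substr(sep + 1);
  std::strncpy(s.file_name, base.c_str(), sizeof(s.file_name) - 1);
  s.file_size = file_size;
  s.MTU = static_cast<unsigned char>(MTU);
  s.max_seq_no = (file_size + MTU - 1) / MTU;   // divide, rounding up
  return s;
}
```

For example, a 10,000-byte file sent with a 200-byte MTU needs 50 data segments, and one more byte pushes it to 51, the last one mostly padding.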

This method, like the do_broadcast() method in Listing One, relies on a SegmentOutStream, which assembles a segment, packs items of various kinds into it, and interfaces with the raw byte transport. This class provides a higher-level abstraction of the communication link LAPOut. Upon construction, a SegmentOutStream object informs the link that a new segment is about to begin. The object offers a family of operator << methods you can use to send data of particular types: short or long integers, a segment's header, or simply an array of bytes, much as with a regular C++ output stream. The SegmentOutStream object takes care of marshaling the data in platform-independent network byte order and of accumulating a CRC. When the object is destroyed, it writes out the CRC and tells the output link that the segment is finished. For more information on the streaming paradigm, see my article "Speaking in Iostreams-ese" (C/C++ Users Journal, May 1997).
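The key design point is that a SegmentOutStream brackets a segment with its own lifetime. The constructor opens a packet, and the destructor appends the CRC and closes it, so a temporary, as in Example 2, sends a complete segment in one statement. What follows is a reduced, hypothetical re-creation, not the original class. It handles only bytes and 16/32-bit integers, writes them big-endian, and uses an assumed CRC-16/CCITT-FALSE. `LAPMemOut` and `demo_packet` are hypothetical helpers that record what reaches the link.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Outbound transport interface from Example 3 (virtual destructor added).
class LAPOut
{
public:
  virtual ~LAPOut() = default;
  virtual void write_byte(const unsigned char a_byte) = 0;
  virtual void begin_packet(void) = 0;
  virtual void finish_packet(void) = 0;
};

// Hypothetical in-memory link that keeps each packet separately.
class LAPMemOut : public LAPOut
{
public:
  std::vector<std::vector<unsigned char>> packets;
  void begin_packet(void) override { packets.emplace_back(); }
  void write_byte(const unsigned char b) override { packets.back().push_back(b); }
  void finish_packet(void) override {}
};

// Reduced, hypothetical SegmentOutStream: its lifetime brackets one segment.
// Multibyte values go out big-endian; every byte is folded into an assumed
// CRC-16/CCITT-FALSE, which the destructor appends before closing the packet.
class SegmentOutStream
{
  LAPOut& out_link;
  uint16_t crc = 0xFFFF;
public:
  explicit SegmentOutStream(LAPOut& link) : out_link(link) { out_link.begin_packet(); }
  ~SegmentOutStream()
  {
    out_link.write_byte(static_cast<unsigned char>(crc >> 8));  // not CRC'd
    out_link.write_byte(static_cast<unsigned char>(crc & 0xFF));
    out_link.finish_packet();
  }
  SegmentOutStream(const SegmentOutStream&) = delete;
  SegmentOutStream& operator=(const SegmentOutStream&) = delete;

  SegmentOutStream& operator<<(unsigned char b)
  {
    out_link.write_byte(b);
    crc ^= static_cast<uint16_t>(b << 8);
    for (int bit = 0; bit < 8; ++bit)
      crc = (crc & 0x8000) ? static_cast<uint16_t>((crc << 1) ^ 0x1021)
                           : static_cast<uint16_t>(crc << 1);
    return *this;
  }
  SegmentOutStream& operator<<(uint16_t v)   // most-significant byte first
  {
    return *this << static_cast<unsigned char>(v >> 8)
                 << static_cast<unsigned char>(v & 0xFF);
  }
  SegmentOutStream& operator<<(uint32_t v)
  {
    return *this << static_cast<uint16_t>(v >> 16)
                 << static_cast<uint16_t>(v & 0xFFFF);
  }
};

// Sends a header-like sequence through a temporary stream, as Example 2 does,
// and returns the packet the link saw.
std::vector<unsigned char> demo_packet()
{
  LAPMemOut link;
  SegmentOutStream(link) << uint32_t{1} << uint16_t{0x1234}
                         << static_cast<unsigned char>(0xAA);
  return link.packets.at(0);   // the temporary's destructor appended the CRC
}
```

Because the CRC is written in the destructor, forgetting to "close" a segment is impossible: the packet is finished at the end of the full expression that created the temporary.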

A byte transport is represented in the code by an instance of a LAPOut class. While SegmentOutStream can handle a variety of data types, the transport class is concerned only with moving raw bytes and marking packets (Example 3). LAPOut is a pure abstract class (an interface) that is implemented in a number of derived classes that support specific communication links. The main transmitter module creates an instance of a particular concrete transport class, and passes it -- as a LAPOut& reference -- to all other communication objects. Since all these objects see a transport class only through the LAPOut& interface, the higher-level protocol services they provide are independent of the actual byte transport. For illustration, Listing Three (available electronically) presents a set of concrete classes LAPNetOut/LAPNetIn, which implement the LAPOut/LAPIn interface for a UDP communication link. The full UDP transport code is available electronically; see "Resource Center," page 3.
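Listing Three is not reproduced here, so the following is only a guess at the general shape of a datagram LAPOut. This hypothetical POSIX-sockets sketch buffers the bytes of each begin_packet()/finish_packet() pair and sends them as exactly one UDP datagram, so the datagram boundary doubles as the segment boundary. The class name `UdpLAPOut`, the buffering strategy, the error handling, and the `loopback_roundtrip` test helper are assumptions; the real LAPNetOut may differ.

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <unistd.h>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <stdexcept>
#include <vector>

// Outbound transport interface from Example 3 (virtual destructor added).
class LAPOut
{
public:
  virtual ~LAPOut() = default;
  virtual void write_byte(const unsigned char a_byte) = 0;
  virtual void begin_packet(void) = 0;
  virtual void finish_packet(void) = 0;
};

// Hypothetical datagram transport: the bytes of one packet are buffered and
// leave as a single UDP datagram in finish_packet().
class UdpLAPOut : public LAPOut
{
  int sock;
  sockaddr_in dest{};
  std::vector<unsigned char> packet;
public:
  UdpLAPOut(const char* ipv4_address, uint16_t port)
    : sock(socket(AF_INET, SOCK_DGRAM, 0))
  {
    if (sock < 0) throw std::runtime_error("socket() failed");
    dest.sin_family = AF_INET;
    dest.sin_port = htons(port);
    if (inet_pton(AF_INET, ipv4_address, &dest.sin_addr) != 1) {
      close(sock);
      throw std::runtime_error("not an IPv4 address");
    }
  }
  ~UdpLAPOut() override { close(sock); }
  UdpLAPOut(const UdpLAPOut&) = delete;
  UdpLAPOut& operator=(const UdpLAPOut&) = delete;

  void begin_packet(void) override { packet.clear(); }
  void write_byte(const unsigned char b) override { packet.push_back(b); }
  void finish_packet(void) override
  {
    // A one-way broadcast cannot ask for a resend: report and move on.
    if (sendto(sock, packet.data(), packet.size(), 0,
               reinterpret_cast<const sockaddr*>(&dest), sizeof(dest)) < 0)
      std::perror("sendto");
  }
};

// Test helper: sends one packet to a socket bound on 127.0.0.1 and returns
// the datagram that arrives (empty on failure or after a 2-second timeout).
std::vector<unsigned char> loopback_roundtrip(const std::vector<unsigned char>& bytes)
{
  int rx = socket(AF_INET, SOCK_DGRAM, 0);
  if (rx < 0) return {};
  sockaddr_in addr{};
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  addr.sin_port = 0;                                 // any free port
  socklen_t len = sizeof(addr);
  timeval tv{};
  tv.tv_sec = 2;                                     // never block forever
  if (bind(rx, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) != 0 ||
      getsockname(rx, reinterpret_cast<sockaddr*>(&addr), &len) != 0 ||
      setsockopt(rx, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) != 0) {
    close(rx);
    return {};
  }
  {
    UdpLAPOut link("127.0.0.1", ntohs(addr.sin_port));
    LAPOut& out = link;                              // use only the interface
    out.begin_packet();
    for (unsigned char b : bytes) out.write_byte(b);
    out.finish_packet();
  }
  std::vector<unsigned char> buf(512);
  ssize_t n = recv(rx, buf.data(), buf.size(), 0);
  close(rx);
  buf.resize(n > 0 ? static_cast<std::size_t>(n) : 0);
  return buf;
}
```

Since UDP already preserves message boundaries, this transport needs no framing characters, in contrast to a stream link.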

A TLT30G receiver can handle several simultaneous broadcasting sessions from different transmitters. Each reception session is in one of the following states: started, when Segment 0 of the session has just been received; opened, when some other segment of the session has been spotted first; active, that is, accumulating segments; and zombie. A session becomes a zombie when all needed segments have been received and the corresponding file has been successfully reassembled; the receiver can then ignore subsequent broadcast loops for that session. The receiver manages sessions with the help of a Sessions object, which is instantiated in the main receiver module. Every time the receiver's main loop acquires a new segment, it passes the corresponding SegmentInStream object to the Sessions object (Listing Four, available electronically). That object is merely a list of Session objects, each representing a separate reception session (Listing Five, available electronically). The Sessions::digest() method (Example 4) takes a newly received segment and decides what to do with it: it extracts the session ID from the segment's header and tries to locate the corresponding Session object. If it succeeds, that Session consumes the segment; otherwise, a new Session object is created and chained into the list.
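The dispatch in Sessions::digest() amounts to "find by session ID, else create." The following sketch implements that logic with a std::map instead of the article's hand-linked list, and uses the state names from the text. `SessionState`, `SessionTable`, `dispatch`, and `demo_trace` are hypothetical names. The transitions among started, opened, and active are simplified here, and payload handling is omitted.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>

enum class SessionState { started, opened, active, zombie };

// Hypothetical session table: one entry per session ID, created on first
// sight, as Sessions::digest() does with its linked list.
class SessionTable
{
  std::map<uint16_t, SessionState> sessions;
public:
  // Routes a segment to its session and returns that session's new state.
  SessionState dispatch(uint16_t session_id, uint32_t seq_number)
  {
    auto it = sessions.find(session_id);
    if (it == sessions.end()) {                  // first segment of a session
      const SessionState s = (seq_number == 0) ? SessionState::started
                                               : SessionState::opened;
      sessions.emplace(session_id, s);
      return s;
    }
    if (it->second != SessionState::zombie)      // zombies ignore all segments
      it->second = SessionState::active;         // (payload handling omitted)
    return it->second;
  }
  void mark_complete(uint16_t session_id) { sessions.at(session_id) = SessionState::zombie; }
  std::size_t size() const { return sessions.size(); }
};

// Replays a short trace: two transmitters, one receiver joining mid-loop.
bool demo_trace()
{
  SessionTable t;
  bool ok = t.dispatch(7, 3) == SessionState::opened    // joined mid-loop
         && t.dispatch(7, 4) == SessionState::active
         && t.dispatch(9, 0) == SessionState::started;  // another session
  t.mark_complete(7);                                   // file reassembled
  return ok && t.dispatch(7, 1) == SessionState::zombie && t.size() == 2;
}
```

A map gives logarithmic lookup by ID. The article's linked list is just as adequate when only a handful of sessions are alive at once.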

All nontrivial processing of a new segment takes place within a Session object. If the session is active, the segment's data is put into the appropriate place in the file's assembly, unless the segment is a duplicate. A bitstring, Session::fragments_received, keeps track of successfully processed segments; this lets a Session handle segments received in any order and detect duplicates. Once the complete set of segments has been received, the corresponding data file is passed to a user-supplied helper, and the Session becomes a zombie (freeing all resources that are no longer necessary).
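A flag per segment is enough to accept segments in any order, reject duplicates, and notice completion. The following sketch is hypothetical (`Assembly`, `ingest`, and `demo_reassembly` are invented names). It places segment i, numbered from 1 as in Listing One, at byte offset (i-1)*MTU. Because Listing One always sends a full MTU of data, the sketch trims the padded last segment to the file size announced in Segment 0. That trimming step is an inference, not something the article shows.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical reassembly buffer for one reception session. Segment i
// (numbered from 1) lands at byte offset (i-1)*MTU; one flag per segment,
// in the spirit of Session::fragments_received, tracks what has arrived.
class Assembly
{
  unsigned long file_size, MTU, max_seq_no;
  std::vector<unsigned char> data;
  std::vector<bool> fragments_received;
  unsigned long still_missing;
public:
  Assembly(unsigned long _file_size, unsigned long _MTU)   // _MTU must be > 0
    : file_size(_file_size), MTU(_MTU),
      max_seq_no((_file_size + _MTU - 1) / _MTU),
      data(max_seq_no * _MTU), fragments_received(max_seq_no, false),
      still_missing(max_seq_no) {}

  // Stores a segment's payload; returns false for duplicates and for
  // segments that do not fit this session.
  bool ingest(unsigned long seq_number, const std::vector<unsigned char>& payload)
  {
    if (seq_number < 1 || seq_number > max_seq_no || payload.size() != MTU)
      return false;
    if (fragments_received[seq_number - 1])
      return false;                                  // duplicate: ignore it
    std::copy(payload.begin(), payload.end(),
              data.begin() + static_cast<std::ptrdiff_t>((seq_number - 1) * MTU));
    fragments_received[seq_number - 1] = true;
    --still_missing;
    return true;
  }
  bool complete() const { return still_missing == 0; }
  // The reassembled file with the last segment's padding trimmed off.
  std::vector<unsigned char> file() const
  {
    return std::vector<unsigned char>(
        data.begin(), data.begin() + static_cast<std::ptrdiff_t>(file_size));
  }
};

// Delivers a 5-byte file in 2-byte segments, out of order, with a duplicate.
std::vector<unsigned char> demo_reassembly()
{
  Assembly a(5, 2);                              // 3 segments, last one padded
  a.ingest(3, {'o', 0});
  a.ingest(1, {'h', 'e'});
  const bool dup_rejected = !a.ingest(1, {'X', 'X'});
  a.ingest(2, {'l', 'l'});
  if (!a.complete() || !dup_rejected) return {};
  return a.file();
}
```

Once complete() turns true, a real receiver would hand the file to the user-supplied helper and drop the buffer, keeping only enough state to recognize the session as a zombie.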

Waiting for a new segment, receiving its header and body, and validating the segment (including a CRC check) are all performed by a SegmentInStream object when it is constructed. The object can then be used to take items of various types from the segment's body; it unmarshals the data and converts it from network to native byte order transparently. Like SegmentOutStream, a SegmentInStream object relies on the byte transport layer (the LAPIn interface) to detect a new packet and to receive the bytes making up the segment (see Listing Two).
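The receiving side reverses the encoding: it checks the length and the CRC before trusting any field, then converts big-endian fields to native values. The following is a hypothetical decoder for the Figure 1 layout, assuming a 4-byte sequence number and CRC-16/CCITT-FALSE. The real CRC16 class may use another variant. Unlike the original, which throws on a bad packet, this sketch returns false. `decode_segment`, `DecodedSegment`, and `with_crc` are invented names.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Assumed checksum: CRC-16/CCITT-FALSE; TLT30G's CRC16 variant is not stated.
uint16_t crc16(const uint8_t* p, std::size_t n)
{
  uint16_t crc = 0xFFFF;
  for (std::size_t i = 0; i < n; ++i) {
    crc ^= static_cast<uint16_t>(p[i] << 8);
    for (int bit = 0; bit < 8; ++bit)
      crc = (crc & 0x8000) ? static_cast<uint16_t>((crc << 1) ^ 0x1021)
                           : static_cast<uint16_t>(crc << 1);
  }
  return crc;
}

struct DecodedSegment
{
  uint32_t seq_number = 0;
  uint16_t session_id = 0;
  std::vector<uint8_t> payload;
};

// Hypothetical decoder: validates size and CRC first, then unmarshals the
// big-endian header fields and copies out the payload.
bool decode_segment(const std::vector<uint8_t>& raw, DecodedSegment& out)
{
  const std::size_t header = 4 + 2 + 1, trailer = 2;
  if (raw.size() < header + trailer) return false;
  auto get = [&raw](std::size_t pos, int nbytes) {   // big-endian read
    uint32_t v = 0;
    for (int i = 0; i < nbytes; ++i) v = (v << 8) | raw[pos + i];
    return v;
  };
  const std::size_t length = get(6, 1);
  if (raw.size() != header + length + trailer) return false;   // truncated
  if (crc16(raw.data(), header + length) != get(header + length, 2))
    return false;                                  // corrupted on the link
  out.seq_number = get(0, 4);
  out.session_id = static_cast<uint16_t>(get(4, 2));
  out.payload.assign(raw.begin() + static_cast<std::ptrdiff_t>(header),
                     raw.begin() + static_cast<std::ptrdiff_t>(header + length));
  return true;
}

// Test helper: appends a big-endian CRC so well-formed input can be built.
std::vector<uint8_t> with_crc(std::vector<uint8_t> frame)
{
  const uint16_t crc = crc16(frame.data(), frame.size());
  frame.push_back(static_cast<uint8_t>(crc >> 8));
  frame.push_back(static_cast<uint8_t>(crc & 0xFF));
  return frame;
}
```

On a noisy one-way link a bad segment is simply dropped. The same segment will come around again on the next broadcast loop.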

DDJ

Listing One

                   // Broadcast the file, return FALSE if the broadcast was
                   // interrupted (because file was changed in the meantime)
bool FileBeingSent::do_broadcast(LAPOut& out_link)
{
  Segment0::log("Starting");
  Segment0::send(out_link);
  Segment0::send(out_link);  // and do it again, 0th segment is important

  char buffer[256];
  assert( MTU < sizeof(buffer) );
  rewind(fp);
  for(register unsigned long i=1; i<= max_seq_no; i++)
  {
    if( was_modified() )
      return Logger::log("Session aborted: the file was changed"), false;
    if( fread(buffer,sizeof(char),MTU,fp) <= 0 )
      perror("reading input file"),
      _error("EOF or error reading an input file");

    SegmentOutStream(out_link) << SegmentHeader(i,session_id,MTU)
                               << byte_array(buffer,MTU);
  }
  Logger::log("Finished broadcast in session %d",session_id);
  return true;
}

Listing Two

#ifndef __GNUC__
#pragma once
#else
#pragma interface
#endif

#ifndef __Segment_h
#define __Segment_h

#include "LAP.h"
#include "myenv.h"

class SegmentOutStream;
class SegmentInStream;

typedef unsigned short ID;

                        // All segments start with this....
struct SegmentHeader
{
  const unsigned long seq_number;    // Sequence number
  const ID session_id;               // ID of the current session
  const unsigned char data_length;   // Length of the data only
                                     // Must be within (1,255)
  inline int q_header_size(void) const  // True header size, w/o any padding
    { return sizeof(seq_number) + sizeof(session_id) + sizeof(data_length); }
  SegmentHeader(const unsigned long _seq_number, const short _session_id,
                const int _data_length)
    : seq_number(_seq_number), session_id(_session_id),
      data_length(_data_length)
    { assert( _data_length > 1 && _data_length < 255 ); }
  void write(SegmentOutStream& out_packet) const;
  void read(SegmentInStream& in_packet);
};

class Segment0 : public SegmentHeader
{
protected:
  char file_name[20];            // only the basic portion of name, w/o path
  const unsigned long file_size;
  unsigned long max_seq_no;      // That is, how many data packets
  const unsigned char MTU;       // Max transmission unit: data_length
                                 // in all the packets to follow
  inline int q_data_size(void) const
    { return sizeof(file_size) + sizeof(MTU) + sizeof(max_seq_no) +
             sizeof(file_name); }
                                 // _Partially_ fill out Segment0 based on
                                 // what found in some_packet
  Segment0(const SegmentHeader& some_packet);
  static unsigned char default_MTU;

public:
  enum { max_MTU = 255 };        // None of the segments can be bigger than that
  Segment0(const char * full_file_name);
  void send(LAPOut& out_link) const;
  void log(const char * title) const;  // Log the contents of Segment0,
                                       // which represents a session.
                                       // Complete a partially finished
                                       // Segment0 from a packet
  void complete(SegmentInStream& packet) throw (rc_bad_packet);
  unsigned int q_MTU(void) const { return MTU; }
  unsigned int q_max_seq_no(void) const { return max_seq_no; }
  static void set_default_MTU(const int new_MTU);
  static int get_default_MTU(void);
};

                        // A stream to help compose a packet
class SegmentOutStream
{
  LAPOut& out_link;
  CRC16 crc;
  void write_byte(const unsigned char a_byte)
    { out_link.write_byte(a_byte); crc << a_byte; }

public:
  SegmentOutStream(LAPOut& _out_link);
  ~SegmentOutStream(void);       // write the CRC and flush the packet

  SegmentOutStream& operator << (const unsigned char a_byte)
    { write_byte(a_byte); return *this; }
  SegmentOutStream& operator << (const unsigned short a_short);
  SegmentOutStream& operator << (const unsigned long a_long);
  SegmentOutStream& operator << (const SegmentHeader& seg_header);

                        // Write an array of bytes
                        // Array: a representation for an array. Note, for
                        // safety an Array object cannot be constructed
                        // explicitly, either on stack or on heap. An Array
                        // object can only be constructed via a friend
                        // function byte_array, and is always transient.
  class Array
  {
    friend SegmentOutStream;
    friend SegmentInStream;
    const char * const ptr;
    const unsigned long size;
    Array(const char * byte_array, const unsigned long _size) // private constructor
      : ptr(byte_array), size(_size) {}
    Array& operator = (const Array&);  // unimplemented and forbidden
    Array(const Array&);               // unimplemented and forbidden
  public:
    friend inline Array byte_array(const char * ptr, const unsigned long size)
      { return Array(ptr,size); }
  };
  SegmentOutStream& operator << (const Array& array);
};

inline SegmentOutStream&
SegmentOutStream::operator << (const SegmentHeader& seg_header)
{
  seg_header.write(*this);
  return *this;
}

                        // Stream to help de-compose a packet.
                        // When stream is fully constructed, it contains
                        // complete packet, with CRC and other checks performed
class SegmentInStream
{
  unsigned char buffer[Segment0::max_MTU+1];  // The buffer where the packet
                                              // is received.
                        // Two ptrs in buffer for the current reading
                        // and writing position
  const unsigned char * read_ptr;
  unsigned char * write_ptr;
  CRC16 curr_crc;       // CRC accumulated while the packet is being received
                        // The packet header is being read as a part
                        // of the receiving process
  SegmentHeader header;
                        // Take the current byte from the buffer
  unsigned char take_byte(void)
    { assert( write_ptr > read_ptr ); return *read_ptr++; }
                        // Receive a byte from the link and into the buffer
  void receive_byte(LAPIn& in_link);

public:
                        // Receive a packet. It throws if there was
                        // an input error
  SegmentInStream(LAPIn& in_link);
  const SegmentHeader& q_header(void) const { return header; }
  SegmentInStream& operator >> (unsigned char& a_byte)
    { a_byte = take_byte(); return *this; }
  SegmentInStream& operator >> (unsigned short& a_short);
  SegmentInStream& operator >> (unsigned long& a_long);
  SegmentInStream& operator >> (const SegmentOutStream::Array& array);
  void write(FILE * fp);         // Dump the rest of the stream into a file
};

#endif


Copyright © 1998, Dr. Dobb's Journal
Sep98: Distributing Data Using TLT30G

Distributing Data Using TLT30G

By Oleg Kiselyov

Dr. Dobb's Journal September 1998

(a)
declare -x LAPFILENAME=test_output.dat
receiver cp @f /tmp/@n.1
(b)
set LAPFILENAME=test_output.dat
receiver copy @f \temp\@n
(c)
~/croot/Frag-Assem> /etc/mknod /tmp/mypipe p
~/croot/Frag-Assem> declare -x LAPFILENAME=/tmp/mypipe
~/croot/Frag-Assem> ( declare -x SESSION_TTL=1;
declare -x LAPFILEERR=1000; declare -x BC_LOG_FILE=/tmp/rec.log;
receiver cp @f /tmp/@n.1 )&
~/croot/Frag-Assem> broadcaster -s20 broadcaster.cc
  ~> diff /tmp/broadcaster.cc.1 croot/Frag-Assem/broadcaster.cc
[no differences found]
(d)
~/croot/Frag-Assem> declare -x LAPNETPORT=4000
~/croot/Frag-Assem> ( declare -x SESSION_TTL=1;
declare -x LAPNETERR=400; declare -x BC_LOG_FILE=/tmp/rec.log;
receiver cp @f /tmp/@n.1 )&
~/croot/Frag-Assem> declare -x LAPNETHOST=localhost
~/croot/Frag-Assem> broadcaster -s60 broadcaster.cc

Example 1: (a) Testing the receiver: Decoding a file of previously recorded segments (dumped by a broadcaster through a file access link) on UNIX; (b) on Windows 95/NT, open an MS-DOS window and enter this at the prompt; (c) running TLT30G through a UNIX pipe; (d) running TLT30G through a UDP link on the same host.


void Segment0::send(LAPOut& out_link) const
{
  SegmentOutStream(out_link) << (SegmentHeader)(*this)
        << file_size << MTU << max_seq_no
        << byte_array(file_name,sizeof(file_name));
}

Example 2: Sending the complete Segment0.


class LAPOut
{
public:
  virtual void write_byte(const unsigned char a_byte) = 0;
  virtual void begin_packet(void) = 0;
  virtual void finish_packet(void) = 0;
};

Example 3: Outbound transport interface.


void Sessions::digest(SegmentInStream& packet)
{
  SessionLink * sp;
  if( (sp = find(packet.q_header().session_id)) != 0 )
    sp->ingest(packet);
  else
  {
    sp = new SessionLink(sh_command,packet);
    sp->next = sessions;        // that won't be run if an exception was
    sessions = sp;             // thrown from the constructor... good!
  }
}

Example 4: Handling a newly received packet.


Figure 1: Segments, transmission units of the TLT30G protocol. The start-of-segment character(s) are determined by a link access layer and are not a part of the logical segment. All multibyte quantities (including integers of various sizes: ulong, ushort, short) are transmitted in the standard big-endian network order: most-significant byte first.


Figure 2: The 0th segment: transmission session parameters. MTU gives the size of the payload data in all the segments of this session.

