September 26, 2008
URL:http://www.drdobbs.com/parallel/weak-references-as-object-accessors/210604244
Sergey Babkin is a senior developer for Aleri. He can be contacted at [email protected].
Complex multi-threaded programs often end up a tangle of interconnected objects or threads. If some parts need to be stopped (such as when a user disconnects), things become complicated. Even the orderly stopping of the entire application becomes difficult: Objects keep referencing other objects that don't exist anymore. That's why you sometimes see programs that crash on an attempt to exit.
Java programmers are bound to chime in at this point and say: "Java programs don't crash!" Right. They just throw the unhandled exceptions.
The issue usually lies in the circular connections: Two objects, each with its thread, are referencing each other. If you try to destroy one, another one might reference it and blow up. It's not always that symmetrical, but that's the gist of the problem.
But Java programmers will likely chime in again and say: "The garbage collection takes care of it!" For one thing, the garbage collection means that there are no proper destructors. The finalize() methods may get delayed for an unknown length of time, until the garbage collector decides to do its sweep. That is not good for expensive resources like file descriptors, which still have to be freed manually. For another thing, one object's finalize() method might find another object already finalized, and might not like it. So even though things are simpler, careful consideration is still needed, and the approach I present in this article might come in useful.
Java has another useful concept -- weak references. A weak reference refers to an object but doesn't prevent this object from being garbage collected once all the normal strong references to it are gone. (Holding on to the object until the system starts running short of memory is the behavior of Java's soft references, not of the weak ones.) Once the object has been collected, the weak reference contains null. Copying a weak reference to a strong one works as an atomic act of getting a hold on the object. Deleting the strong reference releases the hold.
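For comparison, standard C++ (since C++11) has a similar concept in std::weak_ptr. It is not what the implementation in this article uses, but it shows the "copy the weak reference to a strong one" step compactly. The sketch below is only an illustration: the Worker type, the callWorker() and demoWeakPtr() functions, and the WORKER_GONE value are made up for the example.

```cpp
#include <cassert>
#include <memory>

// Hypothetical worker type, used only for this illustration.
struct Worker {
    int call(int x) { return x * 2; }
};

// Error code chosen for the sketch.
const int WORKER_GONE = -1;

// Promote the weak reference to a strong one for the duration of one call.
int callWorker(const std::weak_ptr<Worker> &weak, int arg)
{
    // lock() atomically yields either a strong reference or an empty pointer.
    std::shared_ptr<Worker> strong = weak.lock();
    if (!strong)
        return WORKER_GONE;
    // 'strong' is destroyed when the function returns, releasing the hold.
    return strong->call(arg);
}

// Exercises both the live and the expired case.
bool demoWeakPtr()
{
    std::shared_ptr<Worker> owner = std::make_shared<Worker>();
    std::weak_ptr<Worker> weak = owner;
    bool live = (callWorker(weak, 21) == 42);
    owner.reset();  // last strong reference gone: the worker is destroyed
    bool gone = (callWorker(weak, 21) == WORKER_GONE);
    return live && gone;
}
```

One difference is worth keeping in mind: with std::weak_ptr the object lives until the last strong reference is dropped, so nobody ever waits. In the design described in this article the owner decides when the object dies and waits for the outstanding holds to be released.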
Consider a small practical example: a system consisting of a worker object, an RPC listener, and the RPC client objects that the listener creates for the incoming connections, each with its own thread.
The point here is that the worker object might be "pulled from under" the rest of the objects. And the weak references seem to be a convenient way to implement it.
When the RPC listener gets created, it would be given a weak reference to the worker object. As it creates the client threads, they will be given copies of that weak reference (also weak). Whenever they get a call to process, they would convert the weak reference to a strong one, thus getting a hold on the worker, check if the reference is still valid, and proceed with the call. If the reference is not valid any more, they would return an error code right away. After the call they would release the strong reference, thus releasing the hold. If the worker object gets deleted, it would first go and invalidate all the weak references. If some references are being held, it would have to wait for them to be released first. After that the worker object becomes inaccessible and may be safely destroyed.
This works only as long as the calls are reasonably short. If some call is unbounded in time (such as "wait until the worker enters a certain state"), the RPC client must hold the worker object in some other way and release the reference back to weak before waiting. Otherwise a deadlock would occur.
Let's devise a C++ implementation, using POSIX threads. The worker class would inherit from the base class RefTarget that would embody the functionality we're interested in. The class WeakRef would implement a weak reference. Listing One is a reasonable class interface. Templates could be used to control the pointer types better, but it's easier to show the example without them, with simple inheritance.
class RefTarget
{
    friend class WeakRef;
public:
    RefTarget(void *owner); // this pointer will be held in the weak references
    ~RefTarget();

    WeakRef *newWeakRef(); // create a new weak reference

protected:
    void invalidate(); // called from the subclass destructor
    void freeWeakRef(WeakRef *wr); // notification from a WeakRef that it gets destroyed

private:
    // prevent the default and copy constructors and assignments
    RefTarget();
    RefTarget(const RefTarget&);
    void operator=(const RefTarget&);
};

class WeakRef
{
    friend class RefTarget;
public:
    ~WeakRef();

    WeakRef *copy(); // create a copy of this reference
    void *grab(); // make the reference strong, return its current value
    void release(); // release the reference back to weak

protected:
    WeakRef(RefTarget *target); // called from copy() and from RefTarget
    bool invalidate1(); // called from RefTarget invalidation
    void invalidate2(); // called if invalidate1() returned true

private:
    // prevent the default and copy constructors and assignments
    // (they may be added but the code is simpler without them)
    WeakRef();
    WeakRef(const WeakRef&);
    void operator=(const WeakRef&);
};
Listing Two shows how you use it. The listing includes only the parts related to the weak references, skipping most of the thread control and such. Note that the Worker invalidates its RefTarget at the start of the destructor. This must be done before the object starts being taken apart. Leaving things until the RefTarget destructor would be too late, since the superclass destructor gets called only after the subclass destructor. Invalidation ensures two things: that new weak references stop being given out (or are given out as already invalid) and that all the existing weak references get invalidated. The weak references are invalidated in two steps, for reasons that I discuss later.
class Worker : public RefTarget
{
public:
    Worker() : RefTarget(this)
    { ... }
    ~Worker()
    {
        RefTarget::invalidate(); // first, before anything gets taken apart
        ....
    }
    ...
};

class Listener
{
public:
    Listener(WeakRef *ref) : ref_(ref), done_(false)
    { ... }
    ~Listener()
    {
        delete ref_;
        ...
    }
    void execute()
    {
        ...
        while(!done_) {
            ... accept connection ...
            // each client gets its own copy of the weak reference
            Client *cl = new Client(connection, ref_->copy());
            ...
            cl->start(); // starts the thread with execute() method
        }
        ...
    }
protected:
    WeakRef *ref_;
    bool done_;
};

class Client
{
public:
    Client(Connection *conn, WeakRef *ref) : conn_(conn), ref_(ref)
    { ... }
    ~Client()
    {
        delete ref_;
        ...
    }
    void execute()
    {
        while(!conn_->eof()) {
            ... accept request ...
            error = 0;
            Worker *w = (Worker *)ref_->grab();
            if (w == 0)
                error = WORKER_GONE;
            else
                error = w->call(...);
            ref_->release(); // always paired with grab(), even on failure
            ... return result ...
        }
        ...
    }
protected:
    Connection *conn_;
    WeakRef *ref_;
};

// hypothetical main()
...
Worker *w = new Worker;
Listener *l = new Listener(w->newWeakRef());
w->start();
l->start();
...
The WeakRef constructor is protected. It's never ever called directly. Instead all the creation happens either through the factory method newWeakRef() of the target object, or through the copy() method. This makes sure that nobody will create a reference to an object that is being or has been destroyed.
The WeakRef methods grab() and release() are always called in pairs, which makes both the usage and the implementation simpler. If the reference has become invalid, grab() returns a NULL pointer. This should always be checked before performing any actions. But if it's not NULL, then the worker object is guaranteed to stay put until release() is called.
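One way to make sure that grab() and release() stay paired, even on an early return, is a small scope guard. This is not part of the listings in this article, just a sketch of the idea. Grabbed is a hypothetical helper, FakeRef is a single-threaded stand-in for WeakRef that keeps the example self-contained, and Job and readValue() exist only for the demonstration.

```cpp
#include <cassert>
#include <cstddef>

// Stand-in for WeakRef, for this sketch only: it counts outstanding grabs
// and hands out whatever pointer it was given (NULL when "invalid").
class FakeRef {
public:
    explicit FakeRef(void *owner) : owner_(owner), grabs_(0) { }
    void *grab() { ++grabs_; return owner_; }
    void release() { --grabs_; }
    int grabs() const { return grabs_; }
private:
    void *owner_;
    int grabs_;
};

// Hypothetical helper: grabs in the constructor, releases in the destructor,
// so that every grab() is paired with a release() on every exit path.
template <class Ref, class T>
class Grabbed {
public:
    explicit Grabbed(Ref *ref)
        : ref_(ref), ptr_(static_cast<T *>(ref->grab())) { }
    ~Grabbed() { ref_->release(); }
    T *get() const { return ptr_; }  // NULL if the reference is invalid
private:
    Ref *ref_;
    T *ptr_;
    // not copyable: one guard, one grab
    Grabbed(const Grabbed &);
    void operator=(const Grabbed &);
};

// Made-up payload type for the demonstration.
struct Job { int value; };

// Returns the job's value, or -1 if the target is gone.
int readValue(FakeRef *ref)
{
    Grabbed<FakeRef, Job> g(ref);
    if (g.get() == NULL)
        return -1;  // release() still runs on this path
    return g.get()->value;
}
```

With the real WeakRef the guard would be instantiated as Grabbed<WeakRef, Worker>, and it inherits the same restriction as the manual calls: the guard must not live across an unbounded wait.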
An important assumption in this code is that one weak reference is used by one thread only. This thread must take care to always release the reference after grabbing it, and to release the reference before destroying it. If the reference needs to be passed to another thread, it should be copied first, and then the copy given to that other thread. Of course, other means may be used to implement a consistent access from multiple threads to the same reference, but making extra copies is easier.
Now let's look at the implementation. The repeated motif here will be the reference counting, wrapped in the class Hold, whose interface is in Listing Three. It takes care of all its internal synchronization.
class Hold
{
public:
    Hold();
    ~Hold();

    void getHold(); // increase the reference count
    void releaseHold(); // decrease the reference count
    void wait(); // wait until the reference count drops to 0
};
The WeakRef can use a Hold to keep track of the grabs on it. It would nicely provide the recursive grabbing. Later, when the RefTarget invalidates the WeakRef and needs to make sure that nobody has any grabs any more, it would wait on that Hold.
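To make the behavior concrete, here is a minimal sketch of the same counter written with the C++11 standard library instead of POSIX threads. It is an alternative illustration, not the article's implementation. The count() accessor and the demoHold() function are additions made only for the demonstration.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Sketch of the Hold interface using C++11 primitives rather than pthreads.
class Hold {
public:
    Hold() : count_(0) { }
    void getHold() {                 // increase the reference count
        std::lock_guard<std::mutex> lock(mutex_);
        ++count_;
    }
    void releaseHold() {             // decrease the reference count
        std::lock_guard<std::mutex> lock(mutex_);
        if (--count_ == 0)
            cond_.notify_all();      // wake up everybody in wait()
    }
    void wait() {                    // block until the count drops to 0
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return count_ == 0; });
    }
    int count() {                    // added for the demonstration only
        std::lock_guard<std::mutex> lock(mutex_);
        return count_;
    }
private:
    std::mutex mutex_;               // protects count_
    std::condition_variable cond_;   // signalled when count_ drops to 0
    int count_;
};

// Two nested grabs are released on another thread; wait() returns only
// after both are gone. Returns the count observed after wait().
int demoHold()
{
    Hold grabs;
    grabs.getHold();
    grabs.getHold();                 // recursive grab: the count is now 2
    std::thread user([&grabs] {
        grabs.releaseHold();
        grabs.releaseHold();
    });
    grabs.wait();
    int seen = grabs.count();
    user.join();
    return seen;
}
```

Because the count is an integer rather than a flag, nested grabs come for free: wait() returns only after every getHold() has been matched by a releaseHold(). The example needs to be built with thread support (for instance, -pthread with GCC).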
What else is needed? Obviously, RefTarget needs to keep track of its WeakRefs, so that it can invalidate them later. This can be done with an STL list, or with just a classic old-fashioned list. RefTarget would have the head of the list and each WeakRef would have a pointer to the next WeakRef. For convenience of removal of elements from the list, a "one-and-a-half-linked list" would do well: essentially a double-linked list but with the back pointer pointing to the pointer to this element, instead of being a pointer to the previous element. It doesn't allow going back through the list easily but gives enough information to delete the elements in place.
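The list structure is easier to see in isolation. The following sketch uses a made-up Node type, with the same next_ and prevp_ fields that the WeakRef will carry; pushFront(), removeNode() and sum() are names chosen for this example only.

```cpp
#include <cassert>
#include <cstddef>

// Node of a "one-and-a-half-linked" list: next_ points forward, prevp_
// points at whichever pointer currently points at this node (the list
// head or the previous node's next_).
struct Node {
    int value;
    Node *next_;
    Node **prevp_;
    explicit Node(int v) : value(v), next_(NULL), prevp_(NULL) { }
};

// Insert at the head of the list.
void pushFront(Node *&head, Node *n)
{
    if (head)
        head->prevp_ = &n->next_;  // old first node is now reached via n
    n->next_ = head;
    n->prevp_ = &head;
    head = n;
}

// Remove in place, without walking the list and without knowing the head.
void removeNode(Node *n)
{
    if (n->prevp_ == NULL)
        return;                    // not on a list
    *n->prevp_ = n->next_;         // bypass this node
    if (n->next_)
        n->next_->prevp_ = n->prevp_;
    n->next_ = NULL;
    n->prevp_ = NULL;
}

// Sum of the values currently on the list.
int sum(const Node *head)
{
    int total = 0;
    for (const Node *p = head; p; p = p->next_)
        total += p->value;
    return total;
}
```

The removal code never needs to distinguish the first element from the others, because the head pointer and a next_ field look the same through prevp_. A NULL prevp_ doubles as the "not on the list" marker, which makes a repeated removal harmless.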
RefTarget would also need to keep a pointer to the owner object, and a flag showing whether the object is still valid. Once the object enters its destructor, the RefTarget::invalidate() method would reset this flag and prevent the creation of new references. Then it can invalidate the existing references at its leisure. Otherwise it could become a potentially endless race of new references being created while trying to collect the old ones.
The WeakRef needs to keep a pointer to its RefTarget, for two purposes. First, when it gets deleted, it must tell the RefTarget about that and be removed from the list. Second, when it gets copied, it must ask RefTarget to issue a new WeakRef.
Both RefTarget and WeakRef must have a mutex to protect their contents during concurrent access. And the last non-trivial item is the synchronization on deletion. It will be done through another instance of Hold and discussed in detail below. In the end, the class members amount to what is in Listing Four.
class RefTarget
{
    ...
protected:
    pthread_mutex_t mutex_; // controls all access
    Hold hold_;
    bool valid_; // gets reset on invalidation
    WeakRef *rlist_; // head of the list of references
    void *owner_; // value to pass to the weak refs
    ...
};

class WeakRef
{
    ...
protected:
    // a sort-of-double-linked list, access controlled by target's mutex
    WeakRef *next_;
    WeakRef **prevp_;

    pthread_mutex_t mutex_; // controls all access
    Hold hold_; // keeps track of holds from RefTarget
    Hold grabs_; // keep track of the grabs
    RefTarget *target_; // reset to 0 on invalidation
    ...
};
The creation and grabbing of references is straightforward; see Listing Five. The only important thing to remember is the avoidance of deadlocks due to circular dependencies. When RefTarget manipulates its list of references, it must hold its own mutex, and when it calls the operations on WeakRef, they may acquire the WeakRef mutex. So the lock order is always the target's mutex first, the reference's mutex second. This means that when WeakRef holds its mutex, it must never call back into RefTarget. It must release its mutex first.
RefTarget::RefTarget(void *owner) : // this pointer will be held in the weak references
    valid_(true),
    rlist_(0),
    owner_(owner)
{
    pthread_mutex_init(&mutex_, NULL);
}

WeakRef *RefTarget::newWeakRef() // create a new weak reference
{
    WeakRef *wr;

    wr = new WeakRef(this);

    pthread_mutex_lock(&mutex_);
    if (!valid_) {
        if (wr->invalidate1())
            wr->invalidate2();
    } else { // add to the list
        if (rlist_) {
            rlist_->prevp_ = &wr->next_;
        }
        wr->next_ = rlist_;
        wr->prevp_ = &rlist_;
        rlist_ = wr;
    }
    pthread_mutex_unlock(&mutex_);
    return wr;
}

WeakRef::WeakRef(RefTarget *target) : // called from copy() and from RefTarget
    next_(0),
    prevp_(0),
    target_(target)
{
    pthread_mutex_init(&mutex_, NULL);
}

WeakRef *WeakRef::copy() // create a copy of this reference
{
    WeakRef *nwr;

    grab();
    pthread_mutex_lock(&mutex_);
    RefTarget *t = target_;
    pthread_mutex_unlock(&mutex_);
    if (t)
        nwr = t->newWeakRef();
    else
        nwr = new WeakRef(0);
    release();
    return nwr;
}

void *WeakRef::grab() // make the reference strong, return its current value
{
    grabs_.getHold();
    pthread_mutex_lock(&mutex_);
    void *owner = target_? target_->owner_ : 0;
    pthread_mutex_unlock(&mutex_);
    return owner;
}

void WeakRef::release() // release the reference back to weak
{
    grabs_.releaseHold();
}
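The lock-ordering rule used by copy() can be reduced to a tiny sketch. The Target and Ref classes here are simplified stand-ins written with std::mutex, not the RefTarget and WeakRef from the listings, and all the method names are hypothetical. The lifetime protection that the real copy() gets from grab() is deliberately left out, so the sketch is safe only as long as the target outlives the call.

```cpp
#include <cassert>
#include <mutex>

class Target;

// Simplified stand-in for a weak reference (no lifetime protection).
class Ref {
public:
    explicit Ref(Target *t) : target_(t) { }
    int askTarget();
    void detach() {                  // called by the target under ITS mutex
        std::lock_guard<std::mutex> lock(mutex_);
        target_ = 0;
    }
private:
    std::mutex mutex_;
    Target *target_;
};

// Simplified stand-in for the target.
class Target {
public:
    Target() : calls_(0) { }
    int touch() {
        std::lock_guard<std::mutex> lock(mutex_);
        return ++calls_;
    }
    void detachRef(Ref *r) {
        // Lock order: target's mutex first, then the reference's mutex.
        std::lock_guard<std::mutex> lock(mutex_);
        r->detach();
    }
private:
    std::mutex mutex_;
    int calls_;
};

// The reference never calls into the target while holding its own mutex:
// it copies the pointer under the lock, unlocks, and only then calls.
int Ref::askTarget()
{
    Target *t;
    {
        std::lock_guard<std::mutex> lock(mutex_);
        t = target_;
    }                                // own mutex released here
    return t ? t->touch() : -1;      // may now take the target's mutex
}
```

If askTarget() called touch() while still holding the reference's mutex, one thread could hold the reference's mutex and wait for the target's, while another thread inside detachRef() held the target's mutex and waited for the reference's.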
The new weak references are always created by RefTarget::newWeakRef(), which keeps track of all of them on its list. Even the copy() method calls there, except when the reference has already been invalidated; then the copy simply creates and returns another invalid reference. RefTarget::newWeakRef() does the same: if the target has already been invalidated, the returned new reference is invalid too. This approach is a little trickier to implement, but from the API standpoint it's better than returning a NULL instead of the new reference. This way there is no need to check the result of creation or copying. It will always be an object. The contents of that object need to be checked when grabbing it, but that check must be done anyway: even if a valid reference had been created, it may get invalidated before it gets grabbed.
Invalidation is a more interesting task. It is shown in Listing Six. As mentioned before, the actual disconnection of the target from its references is done in the function invalidate(), which must be called at the start of the child class destructor. The target's destructor only checks that the invalidation has been done, and aborts the process if it was not.
RefTarget::~RefTarget()
{
    if (valid_) {
        fprintf(stderr, "~RefTarget: not invalidated!\n");
        abort();
    }
    pthread_mutex_destroy(&mutex_);
}

void RefTarget::invalidate() // called from the subclass destructor
{
    pthread_mutex_lock(&mutex_);
    valid_ = false;
    while (rlist_) {
        WeakRef *wr = rlist_;
        rlist_ = rlist_->next_;
        if (rlist_)
            rlist_->prevp_ = &rlist_;
        wr->prevp_ = 0;
        wr->next_ = 0;
        bool code = wr->invalidate1();
        if (code) {
            pthread_mutex_unlock(&mutex_);
            wr->invalidate2();
            pthread_mutex_lock(&mutex_);
        }
    }
    pthread_mutex_unlock(&mutex_);
    hold_.wait();
}

void RefTarget::freeWeakRef(WeakRef *wr) // notification from a WeakRef that it gets destroyed
{
    // remove the ref from the list
    pthread_mutex_lock(&mutex_);
    if (wr->prevp_) {
        *wr->prevp_ = wr->next_;
        if (wr->next_)
            wr->next_->prevp_ = wr->prevp_;
        wr->prevp_ = 0;
        wr->next_ = 0;
    }
    pthread_mutex_unlock(&mutex_);
}

WeakRef::~WeakRef()
{
    pthread_mutex_lock(&mutex_);
    RefTarget *t = target_;
    target_ = 0;
    if (t)
        t->hold_.getHold();
    pthread_mutex_unlock(&mutex_);
    hold_.wait();
    if (t) {
        t->freeWeakRef(this);
        t->hold_.releaseHold();
    }
    pthread_mutex_destroy(&mutex_);
}

bool WeakRef::invalidate1() // called from RefTarget invalidation
{
    pthread_mutex_lock(&mutex_);
    RefTarget *t = target_;
    target_ = 0;
    if (t)
        hold_.getHold();
    pthread_mutex_unlock(&mutex_);
    return (t != 0);
}

void WeakRef::invalidate2() // called if invalidate1() returned true
{
    grabs_.wait();
    hold_.releaseHold();
}
The complication on destruction is that the target may get destroyed before the reference, or the reference may be destroyed before the target, or they may be destroyed at the same time. It makes the synchronization tricky. Getting this right in production took me a few attempts. Hopefully, things stayed right when transferring from the production to the example code.
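Because of this, all the destruction in both directions is done in stages. In each direction the object being destroyed marks the link as broken, takes a hold on its peer so that the peer cannot be freed in the middle of the disconnection, and does not finish its own destruction until every hold that the peer has placed on it has been released.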
This disentangles two objects while making sure that none of the objects gets freed while still being used. Of course, the target and the reference are in an asymmetric relationship, and their destruction sequences are somewhat different.
The differences have to do mostly with the different character of the links and with the lock ordering to prevent the deadlocks. The weak reference has to release its own mutex before calling any operation that acquires the target mutex. On the other hand, the target must not get stuck for an indefinite time while holding its mutex. This last requirement leads to the WeakRef invalidation being done in two steps. invalidate1() does the invalidation as such and returns a boolean value showing whether the reference was still attached to the target, and so might have ongoing grabs. If so, invalidate2() is called to wait for any grabs to be released. The target releases its mutex while calling invalidate2().
Note that if the target object is already invalid, RefTarget::newWeakRef() invalidates the newly created reference without releasing its own mutex. This is fine because there are guaranteed to be no grabs, and no open-ended wait would be involved.
Listing Seven provides the complete implementation of the techniques presented here. This pattern was tricky to implement, but once done, all the complexity is hidden inside and the complicated multithreaded programs become less complex.
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sched.h>
#include <pthread.h>

class RefTarget;
class WeakRef;

class Hold
{
public:
    Hold() : count_(0), waiting_(0)
    {
        pthread_mutex_init(&mutex_, NULL);
        pthread_cond_init(&cond_, NULL);
    }
    ~Hold()
    {
        pthread_mutex_destroy(&mutex_);
        pthread_cond_destroy(&cond_);
    }

    void getHold()
    {
        pthread_mutex_lock(&mutex_);
        ++count_;
        pthread_mutex_unlock(&mutex_);
    }
    void releaseHold()
    {
        pthread_mutex_lock(&mutex_);
        if (--count_ == 0 && waiting_ != 0)
            pthread_cond_broadcast(&cond_);
        pthread_mutex_unlock(&mutex_);
    }
    void wait()
    {
        pthread_mutex_lock(&mutex_);
        while(count_ != 0) {
            ++waiting_;
            pthread_cond_wait(&cond_, &mutex_);
            --waiting_;
        }
        pthread_mutex_unlock(&mutex_);
    }

protected:
    pthread_mutex_t mutex_; // access to count_
    pthread_cond_t cond_; // signalled when count_ drops to 0
    int count_; // count of holds
    int waiting_; // count of threads waiting for release

private:
    // prevent the copy constructors and assignments
    Hold(const Hold&);
    void operator=(const Hold&);
};

class RefTarget
{
    friend class WeakRef;
public:
    RefTarget(void *owner); // this pointer will be held in the weak references
    ~RefTarget();

    WeakRef *newWeakRef(); // create a new weak reference

protected:
    void invalidate(); // called from the subclass destructor
    void freeWeakRef(WeakRef *wr); // notification from a WeakRef that it gets destroyed

    pthread_mutex_t mutex_; // controls all access
    Hold hold_;
    bool valid_; // gets reset on invalidation
    WeakRef *rlist_; // head of the list of references
    void *owner_; // value to pass to the weak refs

private:
    // prevent the default and copy constructors and assignments
    RefTarget();
    RefTarget(const RefTarget&);
    void operator=(const RefTarget&);
};

class WeakRef
{
    friend class RefTarget;
public:
    ~WeakRef();

    WeakRef *copy(); // create a copy of this reference
    void *grab(); // make the reference strong, return its current value
    void release(); // release the reference back to weak

protected:
    WeakRef(RefTarget *target); // called from copy() and from RefTarget
    bool invalidate1(); // called from RefTarget invalidation
    void invalidate2(); // called if invalidate1() returned true

    // a sort-of-double-linked list, access controlled by target's mutex
    WeakRef *next_;
    WeakRef **prevp_;

    pthread_mutex_t mutex_; // controls all access
    Hold hold_; // keeps track of holds from RefTarget
    Hold grabs_; // keep track of the grabs
    RefTarget *target_; // reset to 0 on invalidation

private:
    // prevent the default and copy constructors and assignments
    // (they may be added but the code is simpler without them)
    WeakRef();
    WeakRef(const WeakRef&);
    void operator=(const WeakRef&);
};

// ------------------ methods --------------------------

RefTarget::RefTarget(void *owner) : // this pointer will be held in the weak references
    valid_(true),
    rlist_(0),
    owner_(owner)
{
    pthread_mutex_init(&mutex_, NULL);
}

RefTarget::~RefTarget()
{
    if (valid_) {
        fprintf(stderr, "~RefTarget: not invalidated!\n");
        abort();
    }
    pthread_mutex_destroy(&mutex_);
}

WeakRef *RefTarget::newWeakRef() // create a new weak reference
{
    WeakRef *wr;

    wr = new WeakRef(this);

    pthread_mutex_lock(&mutex_);
    if (!valid_) {
        if (wr->invalidate1())
            wr->invalidate2();
    } else { // add to the list
        if (rlist_) {
            rlist_->prevp_ = &wr->next_;
        }
        wr->next_ = rlist_;
        wr->prevp_ = &rlist_;
        rlist_ = wr;
    }
    pthread_mutex_unlock(&mutex_);
    return wr;
}

void RefTarget::invalidate() // called from the subclass destructor
{
    pthread_mutex_lock(&mutex_);
    valid_ = false;
    while (rlist_) {
        WeakRef *wr = rlist_;
        rlist_ = rlist_->next_;
        if (rlist_)
            rlist_->prevp_ = &rlist_;
        wr->prevp_ = 0;
        wr->next_ = 0;
        bool code = wr->invalidate1();
        if (code) {
            pthread_mutex_unlock(&mutex_);
            wr->invalidate2();
            pthread_mutex_lock(&mutex_);
        }
    }
    pthread_mutex_unlock(&mutex_);
    hold_.wait();
}

void RefTarget::freeWeakRef(WeakRef *wr) // notification from a WeakRef that it gets destroyed
{
    // remove the ref from the list
    pthread_mutex_lock(&mutex_);
    if (wr->prevp_) {
        *wr->prevp_ = wr->next_;
        if (wr->next_)
            wr->next_->prevp_ = wr->prevp_;
        wr->prevp_ = 0;
        wr->next_ = 0;
    }
    pthread_mutex_unlock(&mutex_);
}

WeakRef::~WeakRef()
{
    pthread_mutex_lock(&mutex_);
    RefTarget *t = target_;
    target_ = 0;
    if (t)
        t->hold_.getHold();
    pthread_mutex_unlock(&mutex_);
    hold_.wait();
    if (t) {
        t->freeWeakRef(this);
        t->hold_.releaseHold();
    }
    pthread_mutex_destroy(&mutex_);
}

WeakRef *WeakRef::copy() // create a copy of this reference
{
    WeakRef *nwr;

    grab();
    pthread_mutex_lock(&mutex_);
    RefTarget *t = target_;
    pthread_mutex_unlock(&mutex_);
    if (t)
        nwr = t->newWeakRef();
    else
        nwr = new WeakRef(0);
    release();
    return nwr;
}

void *WeakRef::grab() // make the reference strong, return its current value
{
    grabs_.getHold();
    pthread_mutex_lock(&mutex_);
    void *owner = target_? target_->owner_ : 0;
    pthread_mutex_unlock(&mutex_);
    return owner;
}

void WeakRef::release() // release the reference back to weak
{
    grabs_.releaseHold();
}

WeakRef::WeakRef(RefTarget *target) : // called from copy() and from RefTarget
    next_(0),
    prevp_(0),
    target_(target)
{
    pthread_mutex_init(&mutex_, NULL);
}

bool WeakRef::invalidate1() // called from RefTarget invalidation
{
    pthread_mutex_lock(&mutex_);
    RefTarget *t = target_;
    target_ = 0;
    if (t)
        hold_.getHold();
    pthread_mutex_unlock(&mutex_);
    return (t != 0);
}

void WeakRef::invalidate2() // called if invalidate1() returned true
{
    grabs_.wait();
    hold_.releaseHold();
}

// ------------------------- test -------------------------

pthread_mutex_t mutex; // for deletion

class Worker : public RefTarget
{
public:
    Worker() : RefTarget(this)
    { }
    ~Worker()
    {
        RefTarget::invalidate();
        fprintf(stderr, "Worker: destroying %p\n", this);
    }

    static void *execute(void *arg)
    {
        Worker *w = (Worker *)arg;
        fprintf(stderr, "Worker: waiting %p\n", w);
        pthread_mutex_lock(&mutex);
        pthread_mutex_unlock(&mutex);
        delete w;
        return NULL; // a thread function must return a value
    }
};

void *client1(void *arg)
{
    WeakRef *ref = (WeakRef *)arg;
    fprintf(stderr, "client1: weak ref %p\n", ref);
    for (int i = 0; i < 5; i++) {
        void *w = ref->grab();
        fprintf(stderr, "client1: got %p\n", w);
        sleep(1);
        fprintf(stderr, "client1: release\n");
        ref->release();
        sched_yield(); // Linux mutexes aren't honest...
    }
    delete ref;
    return NULL;
}

void *client2(void *arg)
{
    WeakRef *ref = (WeakRef *)arg;
    fprintf(stderr, "client2: weak ref %p\n", ref);
    fprintf(stderr, "client2: waiting\n");
    pthread_mutex_lock(&mutex);
    pthread_mutex_unlock(&mutex);
    delete ref;
    fprintf(stderr, "client2: deleted ref\n");
    return NULL;
}

int main()
{
    pthread_t tw, tc1, tc2;

    pthread_mutex_init(&mutex, NULL);

    Worker *w = new Worker;
    WeakRef *ref = w->newWeakRef();

    pthread_mutex_lock(&mutex);
    pthread_create(&tw, NULL, Worker::execute, (void *)w);
    pthread_create(&tc1, NULL, client1, (void *)ref->copy());
    pthread_create(&tc2, NULL, client2, (void *)ref->copy());

    delete ref;
    sleep(3);
    pthread_mutex_unlock(&mutex);

    fprintf(stderr, "joining threads\n");
    pthread_join(tw, NULL);
    pthread_join(tc1, NULL);
    pthread_join(tc2, NULL);
    return 0;
}

/* The test output may vary depending on the scheduling order
   but in general should be like:

Worker: waiting 0x503010
client1: weak ref 0x503300
client1: got 0x503010
client2: weak ref 0x503540
client2: waiting
client1: release
client1: got 0x503010
client1: release
client1: got 0x503010
joining threads
client2: deleted ref
client1: release
Worker: destroying 0x503010
client1: got (nil)
client1: release
client1: got (nil)
client1: release
*/