Active Object and "message pump"

This time I'll reference Herb Sutter article http://drdobbs.com/high-performance-computing/225700095  about implementation of higher-level abstractions and idioms using Active Object. In this example active object acts as a message pump. It enqueues all calls/messages from other thread to a container and process one by one. So from a caller/client view, there is no need to wait for return result. The caller can process other tasks. To achieve concurrency semantics we make following requirements for such active object:

  • active object should wait for incoming calls/messages as long as needed. Client decides when to stop message pump explicitly by calling appropriate method, or when active object is leaving the scope.
  • active object's thread should not waste resources waiting for messages. The solution would be to wait for an event triggered by another thread. Such mechanism can be implemented using condition variable.
Implementation of such active object is shown bellow. Based on mentioned requirements source code differs from code proposed in the article.
Use c++ compiler which support C++0x standard. (g++ -o test test.cpp -std=c++0x -lpthread)

#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 

using std::shared_ptr;
using std::string;
using std::vector;

template< typename T >
class Queue 
{
    private:
        mutable std::mutex mutex_;
        std::queue< T > queue_;
        std::condition_variable condition_;
        
    public:
        void push(T value)
        {
            std::lock_guard lk(mutex_);
            queue_.push(value);
            condition_.notify_one();
        }
        
        void pop(T& value)
        {
            std::unique_lock lk(mutex_);
            condition_.wait(lk, [&](){ return !this->queue_.empty(); });
            value = queue_.front();
            queue_.pop();
        }
};

class ActiveObject 
{
    public:
        class Message
        {
            public:
                virtual ~Message() {}
                virtual bool execute() {  return false; }
        };

        explicit ActiveObject()
        {
            thread_ = shared_ptr(
new std::thread(std::bind(&ActiveObject::run, this)));
        }

        ~ActiveObject()
        {
            send( shared_ptr< Message >(new Message) );
            thread_->join();
        }
        
        ActiveObject(const ActiveObject& obj) = delete;
        ActiveObject& operator=(const ActiveObject& rhs) = delete;      

        void send( shared_ptr< Message > msg )
        {
            mq_.push(msg);
        }

    private:
        Queue< shared_ptr< Message > > mq_;
        shared_ptr thread_;

        void run()
        {
            shared_ptr< Message > msg;
            while(true)
            {
                mq_.pop(msg);
                if (!msg->execute())
                     break;
            }
        }
};

class Backgrounder
{
    private:
        class MessagePrint : public ActiveObject::Message
        {
            Backgrounder* this_;
            const vector& data_;
            public:
                MessagePrint(Backgrounder* b, const vector& d) : this_(b), data_(d) {}
                bool execute()
                {
                    std::ostream_iterator it_out(std::cout, " ");
                    std::copy(data_.begin(), data_.end(), it_out);
                    std::cout << std::endl;
                    return true;
                }
        };
        class MessageSave : public ActiveObject::Message
        {
            Backgrounder* this_;
            string filename_;
            const vector& data_;
            public:
                MessageSave(Backgrounder* b, const string& f, const vector& d) : this_(b), filename_(f), data_(d) {}
                bool execute()
                {
                    std::ofstream out(filename_.c_str(), std::ios::out);  
                    std::ostream_iterator it_out(out, " ");
                    std::copy(data_.begin(), data_.end(), it_out);
                    out.close();
                    return true;
                }
        };

        ActiveObject active_;

    public:
        void save(const string& filename, const vector& data)
        {
            shared_ptr< MessageSave > msgSave(new MessageSave(this, filename, data));
            active_.send(msgSave);
        }
        void print(const vector& data)
        {
            shared_ptr< MessagePrint > msgPrint(new MessagePrint(this, data));
            active_.send(msgPrint);
        }
        void done()
        {
            shared_ptr< ActiveObject::Message > msgDone(new ActiveObject::Message);
            active_.send(msgDone);
        }
};

int main()
{
    double data[10] = { 1,2,3,4,5,6,7,8,9,10 };
    std::vector coll(data, data+10);

    Backgrounder bwork;
    
    bwork.save("test.txt", coll);
    bwork.print(coll);    
    bwork.done(); 
    bwork.print(coll);

    std::cout << "main thread!" << std::endl;

    return 0;
}

thread wakes up from its slumber when queue in not empty.

2 Responses so far.

  1. Unknown says:
    This comment has been removed by the author.
  2. Unknown says:

    May I give a small enhancement of this code snippet.
    Instead of using Class backgrounder with message object such as MessageSave to insert into the queue. It might be better to use lambda functions which captures the neccessary objects and returns bool for the while loop. Example:

    The kill message would look something like:
    mq_.push( []() { return false; } );

    The queue would be of type:


    typedef std::function< bool () > FunT;
    typedef Util::Queue< FunT > FunctionMsgQueueT;




    Another might be
    auto saveMsg = [this]()-> bool
    {
    std::ostream ..........
    .
    .
    return true;
    }


    The while loop can the extract the function directly and execute it.
    This way you dont need to create new shared pointers and such and keeping down the amount of Classes.

    Just a suggestion since you were kind enough to share this quite good snippet of code.

    Cheers