Active Object and "message pump"

This time I'll refer to Herb Sutter's article http://drdobbs.com/high-performance-computing/225700095 about implementing higher-level abstractions and idioms with an Active Object. In this example the active object acts as a message pump: it enqueues all calls/messages from other threads in a container and processes them one by one. From the caller's point of view there is no need to wait for a result, so the caller is free to process other tasks. To achieve these concurrency semantics, the active object must meet the following requirements:

  • The active object should wait for incoming calls/messages as long as needed. The client decides when to stop the message pump, either explicitly by calling the appropriate method or implicitly when the active object goes out of scope.
  • The active object's thread should not waste resources while waiting for messages. The solution is to wait for an event triggered by another thread; such a mechanism can be implemented with a condition variable.
The implementation of such an active object is shown below. Because of the requirements above, the source code differs from the code proposed in the article.
Use a C++ compiler that supports the C++0x standard (g++ -o test test.cpp -std=c++0x -lpthread).

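#include <algorithm>
#include <condition_variable>
#include <fstream>
#include <functional>
#include <iostream>
#include <iterator>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>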

using std::shared_ptr;
using std::string;
using std::vector;

template< typename T >
class Queue 
{
    private:
        mutable std::mutex mutex_;
        std::queue< T > queue_;
        std::condition_variable condition_;
        
    public:
        void push(T value)
        {
            std::lock_guard<std::mutex> lk(mutex_);
            queue_.push(value);
            condition_.notify_one();
        }
        
        void pop(T& value)
        {
            std::unique_lock<std::mutex> lk(mutex_);
            condition_.wait(lk, [&](){ return !this->queue_.empty(); });
            value = queue_.front();
            queue_.pop();
        }
};

class ActiveObject 
{
    public:
        class Message
        {
            public:
                virtual ~Message() {}
                virtual bool execute() {  return false; }
        };

        explicit ActiveObject()
        {
            thread_ = shared_ptr<std::thread>(
                new std::thread(std::bind(&ActiveObject::run, this)));
        }

        ~ActiveObject()
        {
            send( shared_ptr< Message >(new Message) );
            thread_->join();
        }
        
        ActiveObject(const ActiveObject& obj) = delete;
        ActiveObject& operator=(const ActiveObject& rhs) = delete;      

        void send( shared_ptr< Message > msg )
        {
            mq_.push(msg);
        }

    private:
        Queue< shared_ptr< Message > > mq_;
        shared_ptr<std::thread> thread_;

        void run()
        {
            shared_ptr< Message > msg;
            while(true)
            {
                mq_.pop(msg);
                if (!msg->execute())
                     break;
            }
        }
};

class Backgrounder
{
    private:
        class MessagePrint : public ActiveObject::Message
        {
            Backgrounder* this_;
            const vector<double>& data_;
            public:
                MessagePrint(Backgrounder* b, const vector<double>& d) : this_(b), data_(d) {}
                bool execute()
                {
                    std::ostream_iterator<double> it_out(std::cout, " ");
                    std::copy(data_.begin(), data_.end(), it_out);
                    std::cout << std::endl;
                    return true;
                }
        };
        class MessageSave : public ActiveObject::Message
        {
            Backgrounder* this_;
            string filename_;
            const vector<double>& data_;
            public:
                MessageSave(Backgrounder* b, const string& f, const vector<double>& d) : this_(b), filename_(f), data_(d) {}
                bool execute()
                {
                    std::ofstream out(filename_.c_str(), std::ios::out);  
                    std::ostream_iterator<double> it_out(out, " ");
                    std::copy(data_.begin(), data_.end(), it_out);
                    out.close();
                    return true;
                }
        };

        ActiveObject active_;

    public:
        void save(const string& filename, const vector<double>& data)
        {
            shared_ptr< MessageSave > msgSave(new MessageSave(this, filename, data));
            active_.send(msgSave);
        }
        void print(const vector<double>& data)
        {
            shared_ptr< MessagePrint > msgPrint(new MessagePrint(this, data));
            active_.send(msgPrint);
        }
        void done()
        {
            shared_ptr< ActiveObject::Message > msgDone(new ActiveObject::Message);
            active_.send(msgDone);
        }
};

int main()
{
    double data[10] = { 1,2,3,4,5,6,7,8,9,10 };
    std::vector<double> coll(data, data+10);

    Backgrounder bwork;
    
    bwork.save("test.txt", coll);
    bwork.print(coll);    
    bwork.done();       // stops the message pump
    bwork.print(coll);  // enqueued after done(), so it is never processed

    std::cout << "main thread!" << std::endl;

    return 0;
}

The active object's thread wakes up from its slumber only when the queue is not empty.
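
The article referenced above passes messages as callable objects rather than as classes derived from a Message base. Here is a minimal sketch of that variant, assuming std::function<void()> as the message type. The class name FunctionActive and the Task typedef are my own hypothetical names, and this is an illustration of the idea, not the code from the article. The stop request is just another message, so everything sent before the object is destroyed still runs, in order.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical name; a sketch of an active object with callable messages.
class FunctionActive
{
    public:
        typedef std::function<void()> Task;

        FunctionActive() : done_(false), thread_(&FunctionActive::run, this) {}

        ~FunctionActive()
        {
            // The stop request travels through the queue, so it runs only
            // after every task that was sent before it.
            send([this]() { done_ = true; });
            thread_.join();
        }

        FunctionActive(const FunctionActive&) = delete;
        FunctionActive& operator=(const FunctionActive&) = delete;

        // Non-blocking for the caller: enqueue the task and wake the worker.
        void send(Task task)
        {
            std::lock_guard<std::mutex> lk(mutex_);
            queue_.push(std::move(task));
            condition_.notify_one();
        }

    private:
        bool done_;                          // written only by the worker thread
        std::mutex mutex_;
        std::condition_variable condition_;
        std::queue<Task> queue_;
        std::thread thread_;                 // declared last: starts after the other members exist

        void run()
        {
            while (!done_)
            {
                Task task;
                {
                    std::unique_lock<std::mutex> lk(mutex_);
                    // Sleep until a sender signals that the queue is not empty.
                    condition_.wait(lk, [this]() { return !queue_.empty(); });
                    task = std::move(queue_.front());
                    queue_.pop();
                }
                task();                      // run outside the lock so senders are not blocked
            }
        }
};
```

A client would create a FunctionActive object and call send() with a lambda for each piece of background work; leaving the scope drains the queue and joins the thread.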

Active Object, part 2

The active object is a good example of wrapping a "naked thread". From the caller's point of view, it provides asynchronous functionality.
This time I will try to implement an active object using the Boost futures library. As in the previous example (part 1), the client is free to handle other tasks (let's say processing GUI events) while the active object performs a heavy-duty task in the background. Boost futures can be understood as one-off events: the client can poll the future to see whether the event has occurred. Another important feature is that a future may have data associated with it.
Below is a C++ code snippet that implements an active object with a future:

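#include <iostream>
#include <string>
#include <boost/bind.hpp>
#include <boost/noncopyable.hpp>
#include <boost/thread.hpp>
#include <boost/thread/future.hpp>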

class Message
{
    std::string message_; 
    public:
        explicit Message(const std::string& name) : message_(name) {} 
        std::string report() const
        {
            return message_;
        }
            
};

class ActiveObject : boost::noncopyable
{
    public:
        ActiveObject() 
        {
        }

        virtual ~ActiveObject()
        {
            std::cout << "destroying active object" << std::endl;
        }
        
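        boost::shared_future<Message> getMessageFromRemote(int messageId)
        {
            boost::packaged_task<Message> task(boost::bind(&ActiveObject::someMesssage, this, messageId));            
            boost::shared_future<Message> res(task.get_future());

            // run the task on its own thread; detach explicitly since the thread is never joined
            boost::thread(boost::move(task)).detach();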
            return res;
        }

    private:
        Message someMesssage(int messageId)
        {
            if (messageId == 1) 
                return Message("first");
            else if (messageId == 2)
                return Message("second");
            else
                return Message("empty");
        }
};

int main()
{
    ActiveObject ao;
    
    int id(1);
    boost::shared_future<Message> fmessage = ao.getMessageFromRemote(id);
    while (!fmessage.is_ready())
    {
        std::cout << "+";
    }

    Message message = fmessage.get();
    std::cout << message.report() << std::endl;

    return 0;
}
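
The loop in main() polls is_ready() at full speed, so the client burns CPU while it waits. Here is a minimal sketch of a gentler polling loop. Note that it swaps in the standard C++11 std::future and std::async for the Boost classes used above, and that fetchMessage and waitForMessage are hypothetical helper names of mine. The client waits on the future with a short timeout and gets a chance to do other work after each timeout.

```cpp
#include <chrono>
#include <future>
#include <string>

// Hypothetical helper: runs the lookup on a separate thread and returns a future.
std::future<std::string> fetchMessage(int messageId)
{
    return std::async(std::launch::async, [messageId]() -> std::string {
        if (messageId == 1) return "first";
        if (messageId == 2) return "second";
        return "empty";
    });
}

// Hypothetical helper: waits in 10 ms slices instead of spinning.
// 'polls' receives the number of waits that timed out before the result was ready.
std::string waitForMessage(std::future<std::string>& f, int& polls)
{
    polls = 0;
    while (f.wait_for(std::chrono::milliseconds(10)) != std::future_status::ready)
    {
        ++polls;   // the client would process GUI events (or other tasks) here
    }
    return f.get();   // the data associated with the future
}
```

The 10 ms slice is an arbitrary choice for the sketch; a real client would pick a timeout that matches how often it needs to get back to its own work.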

happy coding!

Active Object pattern using boost::asio

Today I was searching the Internet for Active Object pattern examples, and I found a blog post by Dean Berris:
http://cplusplus-soup.com/2006/12/06/boost-asio-and-patterns/

So I decided to make some modifications and release the code as a working example.

To put it simply: an active object owns a private thread and runs all of its work on that thread.
What do we gain from constructing such an object? First, a caller can invoke the object's methods without blocking: the calls return immediately. In other words, the calls are asynchronous. The main thread can proceed at the same time, for example with GUI event handling. When the active object's thread, working in the background, is done with its heavy-duty task, it is simply joined by the main thread before the object leaves scope.
A C++ snippet of the working example is shown below (tested with g++ 4.4):

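#include <iostream>
#include <string>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/noncopyable.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/locks.hpp>
#include <boost/thread/mutex.hpp>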
 
class ActiveObject : boost::noncopyable
{
    public:
        ActiveObject() : work_(service_)
        {
            executionThread_.reset( new boost::thread(boost::bind(&boost::asio::io_service::run, &service_)) );
        }
 
        virtual ~ActiveObject()
        { 
            service_.poll();
            service_.stop();
            executionThread_->join();
            std::cout << "destroying active object" << std::endl;
        }
 
        void doSomething(const std::string& workName)
        {
            service_.post( boost::bind(&ActiveObject::someImpl, this, workName) );
        }
        
    protected:
        boost::asio::io_service service_;
    
    private:
        boost::asio::io_service::work work_;
        boost::shared_ptr<boost::thread> executionThread_;
        boost::mutex mutex_;
 
        void someImpl(const std::string& name)
        {
            boost::lock_guard<boost::mutex> lk(mutex_);
            std::cout << "Doing work: " << name << std::endl; 
        }
};
 
int main()
{
    boost::shared_ptr<ActiveObject> pobj = boost::shared_ptr< ActiveObject >(new ActiveObject());
    
    pobj->doSomething("first");
    pobj->doSomething("second");
    
    std::cout << "end of main thread: " << std::endl;
    return 0;
}
The code is also available at http://codepaste.net/n12fd4
happy coding!