platform/include/pion/platform/ReactionScheduler.hpp

00001 // ------------------------------------------------------------------------
00002 // Pion is a development platform for building Reactors that process Events
00003 // ------------------------------------------------------------------------
00004 // Copyright (C) 2007-2008 Atomic Labs, Inc.  (http://www.atomiclabs.com)
00005 //
00006 // Pion is free software: you can redistribute it and/or modify it under the
00007 // terms of the GNU Affero General Public License as published by the Free
00008 // Software Foundation, either version 3 of the License, or (at your option)
00009 // any later version.
00010 //
00011 // Pion is distributed in the hope that it will be useful, but WITHOUT ANY
00012 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00013 // FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for
00014 // more details.
00015 //
00016 // You should have received a copy of the GNU Affero General Public License
00017 // along with Pion.  If not, see <http://www.gnu.org/licenses/>.
00018 //
00019 
00020 #ifndef __PION_REACTIONSCHEDULER_HEADER__
00021 #define __PION_REACTIONSCHEDULER_HEADER__
00022 
00023 #include <vector>
00024 #include <boost/shared_ptr.hpp>
00025 #include <boost/function/function0.hpp>
00026 #include <boost/thread/mutex.hpp>
00027 #include <pion/PionConfig.hpp>
00028 #include <pion/PionScheduler.hpp>
00029 #include <pion/PionException.hpp>
00030 #include <pion/PionLockedQueue.hpp>
00031 
00032 
00033 namespace pion {        // begin namespace pion
00034 namespace platform {    // begin namespace platform (Pion Platform Library)
00035 
///
/// ReactionScheduler: scheduler that executes posted work functions
/// ("Reactions") on a pool of worker threads fed by a PionLockedQueue, and
/// runs one additional thread that calls PionScheduler::processServiceWork()
/// on m_service.
///
/// NOTE(review): m_mutex, m_is_running, m_service, m_timer, m_num_threads,
/// m_thread_pool, m_logger, keepRunning() and shutdown() are not declared in
/// this class; presumably they are inherited from PionSingleServiceScheduler
/// or PionScheduler -- confirm against those headers.
///
00039 class ReactionScheduler :
00040     public PionSingleServiceScheduler
00041 {
00042 public:
00043     
          /// exception type whose what() reports that no io_service is available
          /// (it is not thrown anywhere within this class)
00045     class NoServiceException : public std::exception {
00046     public:
00047         virtual const char* what() const throw() {
00048             return "ReactionScheduler has no io_service available";
00049         }
00050     };
00051 
00052     
          /// constructs a new ReactionScheduler; no threads are started until startup()
00054     ReactionScheduler(void) {}
00055     
          /// virtual destructor: calls shutdown() before the members are destroyed
00057     virtual ~ReactionScheduler() { shutdown(); }
00058     
          /// Starts the scheduler if it is not already running (otherwise a no-op):
          /// spawns one thread for m_service and m_num_threads worker threads that
          /// each run processReactionQueue().
00060     virtual void startup(void) {
00061         // hold the scheduler mutex for the whole startup sequence (thread safety)
00062         boost::mutex::scoped_lock scheduler_lock(m_mutex);
00063         
00064         if (! m_is_running) {
00065             PION_LOG_INFO(m_logger, "Starting thread scheduler");
00066             m_is_running = true;
00067             
00068             // schedule a work item to make sure that the service doesn't complete
00069             m_service.reset();
00070             keepRunning(m_service, m_timer);
00071 
00072             // start a thread that will be used to handle io_service requests
00073             m_service_thread.reset(new boost::thread( boost::bind(&PionScheduler::processServiceWork,
00074                                                                   this, boost::ref(m_service)) ));
00075             
00076             // start multiple threads to handle async tasks (work posted via post())
00077             for (boost::uint32_t n = 0; n < m_num_threads; ++n) {
00078                 boost::shared_ptr<boost::thread> new_thread(new boost::thread(
00079                     boost::bind(&ReactionScheduler::processReactionQueue, this) ));
00080                 m_thread_pool.push_back(new_thread);
00081             }
00082         }
00083     }       
00084     
          /// Stops the worker and service threads: signals every registered consumer
          /// thread object to stop, delegates to the base class, joins the service
          /// thread (if one exists), and finally discards the consumer thread objects.
00086     virtual void stopThreads(void) {
00087         stopThreadInfo();
00088         PionSingleServiceScheduler::stopThreads();
00089         if (m_service_thread)
00090             m_service_thread->join();
              // m_thread_info is shared with the worker threads (see getThreadInfo),
              // so it is only modified while holding its mutex
00091         boost::mutex::scoped_lock thread_info_lock(m_thread_info_mutex);
00092         m_thread_info.clear();
00093     }
00094 
          /// Finishes thread cleanup: delegates to the base class, then releases the
          /// thread objects held by the pool and by m_service_thread.
00096     virtual void finishThreads(void) {
00097         PionSingleServiceScheduler::finishThreads();
00098         m_thread_pool.clear();
00099         m_service_thread.reset();
00100     }
00101 
          /// Queues a work function for later execution by one of the worker
          /// threads (see processReactionQueue).
          ///
          /// @param work_func the function to execute; it is pushed onto
          ///                  m_reaction_queue and is not run by the calling thread
00107     virtual void post(boost::function0<void> work_func) {
00108         m_reaction_queue.push(work_func);
00109     }
00110     
          /// returns the value reported by m_reaction_queue.size()
00112     inline std::size_t getQueueSize(void) const { return m_reaction_queue.size(); }
00113 
00114     
00115 protected:
00116     
          /// data type for a unit of work: a function that takes no arguments
00118     typedef boost::function0<void>      Reaction;
00119 
          /// data type for the locked queue of pending Reactions
00121     typedef PionLockedQueue<Reaction>   ReactionQueue;
00122 
          /// data type for a shared pointer to a queue consumer thread object
00124     typedef boost::shared_ptr<ReactionQueue::ConsumerThread>    ThreadInfoPtr;
00125 
          /// data type for a collection of consumer thread objects
00127     typedef std::vector<ThreadInfoPtr>  ThreadInfoVector;
00128     
00129     
          /// Creates a new consumer thread object, registers it in m_thread_info
          /// (under m_thread_info_mutex) so that stopThreadInfo() can reach it,
          /// and returns it.
00131     ThreadInfoPtr getThreadInfo(void) {
00132         // configure consumer thread to wakeup every second
00133         ThreadInfoPtr info_ptr(new ReactionQueue::ConsumerThread(boost::posix_time::seconds(1)));
00134         boost::mutex::scoped_lock thread_info_lock(m_thread_info_mutex);
00135         m_thread_info.push_back(info_ptr);
00136         return info_ptr;
00137     }
00138 
          /// calls stop() on every registered consumer thread object while
          /// holding m_thread_info_mutex
00140     void stopThreadInfo(void) {
00141         boost::mutex::scoped_lock thread_info_lock(m_thread_info_mutex);
00142         for (ThreadInfoVector::iterator i = m_thread_info.begin();
00143             i != m_thread_info.end(); ++i)
00144         {
00145             (*i)->stop();
00146         }
00147     }
00148 
          /// Body of each worker thread: repeatedly pops Reactions off
          /// m_reaction_queue and invokes them for as long as m_is_running is true.
          /// Exceptions thrown while popping or running a Reaction are logged and
          /// the loop is re-entered, so a failing Reaction does not end the thread.
00150     void processReactionQueue(void) {
00151         // retrieve reference to the thread's Event allocator
              // (event_alloc is only used when PION_EVENT_USE_POOL_ALLOCATORS is defined)
00152         EventFactory event_factory;
00153         EventAllocator& event_alloc(event_factory.getAllocator());
00154 
00155         // initialize consumer thread object
00156         ThreadInfoPtr info_ptr(getThreadInfo());
00157         
00158         // used to pop work off the queue
              // NOTE(review): r is never cleared after being invoked, so it keeps the
              // last Reaction (and anything bound into it) until it is overwritten --
              // presumably by the next successful pop(); confirm this is acceptable.
00159         Reaction r;
00160 
              // the outer loop re-enters the try block after an exception was logged
00161         while (m_is_running) {
00162             try {
00163                 while (m_is_running) {
00164                     if (m_reaction_queue.pop(r, *info_ptr) && m_is_running) {
00165                         // got new work from the queue
00166                         r();
00167                     } else {
00168                         // thread slept for 1 second with no new work
00169                         // release any unused memory in Event allocator
                              // (this branch is also taken when pop() succeeded but
                              // m_is_running is no longer true; the popped Reaction
                              // is then not executed)
00170 #ifdef PION_EVENT_USE_POOL_ALLOCATORS
00171                         PION_LOG_DEBUG(m_logger, "Releasing unused memory for idle ReactionEngine thread");
00172                         event_alloc.release_memory();
00173 #endif
00174                     }
00175                 }
00176             } catch (std::exception& e) {
00177                 PION_LOG_ERROR(m_logger, e.what());
00178             } catch (...) {
00179                 PION_LOG_ERROR(m_logger, "caught unrecognized exception");
00180             }
00181         }
00182     }       
00183     
00184     
          /// queue of Reactions waiting to be executed by the worker threads
00186     ReactionQueue                       m_reaction_queue;
00187 
          /// thread that runs PionScheduler::processServiceWork() for m_service
00189     boost::shared_ptr<boost::thread>    m_service_thread;
00190 
          /// consumer thread objects registered by the worker threads
00192     ThreadInfoVector                    m_thread_info;
00193 
          /// mutex that protects access to m_thread_info
00195     boost::mutex                        m_thread_info_mutex;
00196 };
00197     
00198     
00199 }   // end namespace platform
00200 }   // end namespace pion
00201 
00202 #endif

Generated on Wed Apr 13 16:38:34 2011 for pion-platform by  doxygen 1.4.7