platform/include/pion/platform/ReactionScheduler.hpp

00001 // ------------------------------------------------------------------------
00002 // Pion is a development platform for building Reactors that process Events
00003 // ------------------------------------------------------------------------
00004 // Copyright (C) 2007-2008 Atomic Labs, Inc.  (http://www.atomiclabs.com)
00005 //
00006 // Pion is free software: you can redistribute it and/or modify it under the
00007 // terms of the GNU Affero General Public License as published by the Free
00008 // Software Foundation, either version 3 of the License, or (at your option)
00009 // any later version.
00010 //
00011 // Pion is distributed in the hope that it will be useful, but WITHOUT ANY
00012 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00013 // FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for
00014 // more details.
00015 //
00016 // You should have received a copy of the GNU Affero General Public License
00017 // along with Pion.  If not, see <http://www.gnu.org/licenses/>.
00018 //
00019 
00020 #ifndef __PION_REACTIONSCHEDULER_HEADER__
00021 #define __PION_REACTIONSCHEDULER_HEADER__
00022 
00023 #include <vector>
00024 #include <boost/shared_ptr.hpp>
00025 #include <boost/thread/mutex.hpp>
00026 #include <pion/PionConfig.hpp>
00027 #include <pion/PionScheduler.hpp>
00028 #include <pion/PionException.hpp>
00029 #include <pion/PionLockedQueue.hpp>
00030 
00031 
00032 namespace pion {        // begin namespace pion
00033 namespace platform {    // begin namespace platform (Pion Platform Library)
00034 
00038 class ReactionScheduler :
00039     public PionSingleServiceScheduler
00040 {
00041 public:
00042     
00044     class NoServiceException : public std::exception {
00045     public:
00046         virtual const char* what() const throw() {
00047             return "ReactionScheduler has no io_service available";
00048         }
00049     };
00050 
00051     
00053     ReactionScheduler(void) {}
00054     
00056     virtual ~ReactionScheduler() { shutdown(); }
00057     
00059     virtual void startup(void) {
00060         // lock mutex for thread safety
00061         boost::mutex::scoped_lock scheduler_lock(m_mutex);
00062         
00063         if (! m_is_running) {
00064             PION_LOG_INFO(m_logger, "Starting thread scheduler");
00065             m_is_running = true;
00066             
00067             // schedule a work item to make sure that the service doesn't complete
00068             m_service.reset();
00069             keepRunning(m_service, m_timer);
00070 
00071             // start a thread that will be used to handle io_service requests
00072             m_service_thread.reset(new boost::thread( boost::bind(&PionScheduler::processServiceWork,
00073                                                                   this, boost::ref(m_service)) ));
00074             
00075             // start multiple threads to handle async tasks
00076             for (boost::uint32_t n = 0; n < m_num_threads; ++n) {
00077                 boost::shared_ptr<boost::thread> new_thread(new boost::thread(
00078                     boost::bind(&ReactionScheduler::processReactionQueue, this) ));
00079                 m_thread_pool.push_back(new_thread);
00080             }
00081         }
00082     }       
00083     
00085     virtual void stopThreads(void) {
00086         stopThreadInfo();
00087         PionSingleServiceScheduler::stopThreads();
00088         if (m_service_thread)
00089             m_service_thread->join();
00090         boost::mutex::scoped_lock thread_info_lock(m_thread_info_mutex);
00091         m_thread_info.clear();
00092     }
00093 
00095     virtual void finishThreads(void) {
00096         PionSingleServiceScheduler::finishThreads();
00097         m_thread_pool.clear();
00098         m_service_thread.reset();
00099     }
00100 
00106     virtual void post(boost::function0<void> work_func) {
00107         m_reaction_queue.push(work_func);
00108     }
00109 
00110     
00111 protected:
00112     
00114     typedef boost::function0<void>      Reaction;
00115 
00117     typedef PionLockedQueue<Reaction>   ReactionQueue;
00118 
00120     typedef boost::shared_ptr<ReactionQueue::ConsumerThread>    ThreadInfoPtr;
00121 
00123     typedef std::vector<ThreadInfoPtr>  ThreadInfoVector;
00124     
00125     
00127     ThreadInfoPtr getThreadInfo(void) {
00128         ThreadInfoPtr info_ptr(new ReactionQueue::ConsumerThread);
00129         boost::mutex::scoped_lock thread_info_lock(m_thread_info_mutex);
00130         m_thread_info.push_back(info_ptr);
00131         return info_ptr;
00132     }
00133 
00135     void stopThreadInfo(void) {
00136         boost::mutex::scoped_lock thread_info_lock(m_thread_info_mutex);
00137         for (ThreadInfoVector::iterator i = m_thread_info.begin();
00138             i != m_thread_info.end(); ++i)
00139         {
00140             (*i)->stop();
00141         }
00142     }
00143 
00145     void processReactionQueue(void) {
00146         Reaction r;
00147         ThreadInfoPtr info_ptr(getThreadInfo());
00148         while (m_is_running) {
00149             try {
00150                 while (m_is_running) {
00151                     if (m_reaction_queue.pop(r, *info_ptr) && m_is_running)
00152                         r();
00153                 }
00154             } catch (std::exception& e) {
00155                 PION_LOG_ERROR(m_logger, e.what());
00156             } catch (...) {
00157                 PION_LOG_ERROR(m_logger, "caught unrecognized exception");
00158             }
00159         }
00160     }       
00161     
00162     
00164     ReactionQueue                       m_reaction_queue;
00165 
00167     boost::shared_ptr<boost::thread>    m_service_thread;
00168 
00170     ThreadInfoVector                    m_thread_info;
00171 
00173     boost::mutex                        m_thread_info_mutex;
00174 };
00175     
00176     
00177 }   // end namespace platform
00178 }   // end namespace pion
00179 
00180 #endif

Generated on Mon Jun 29 13:54:11 2009 for pion-platform by  doxygen 1.4.7