platform/include/pion/platform/ReactionScheduler.hpp

00001 // ------------------------------------------------------------------------
00002 // Pion is a development platform for building Reactors that process Events
00003 // ------------------------------------------------------------------------
00004 // Copyright (C) 2007-2008 Atomic Labs, Inc.  (http://www.atomiclabs.com)
00005 //
00006 // Pion is free software: you can redistribute it and/or modify it under the
00007 // terms of the GNU Affero General Public License as published by the Free
00008 // Software Foundation, either version 3 of the License, or (at your option)
00009 // any later version.
00010 //
00011 // Pion is distributed in the hope that it will be useful, but WITHOUT ANY
00012 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00013 // FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for
00014 // more details.
00015 //
00016 // You should have received a copy of the GNU Affero General Public License
00017 // along with Pion.  If not, see <http://www.gnu.org/licenses/>.
00018 //
00019 
00020 #ifndef __PION_REACTIONSCHEDULER_HEADER__
00021 #define __PION_REACTIONSCHEDULER_HEADER__
00022 
00023 #include <vector>
00024 #include <boost/shared_ptr.hpp>
00025 #include <boost/function/function0.hpp>
00026 #include <boost/thread/mutex.hpp>
00027 #include <pion/PionConfig.hpp>
00028 #include <pion/PionScheduler.hpp>
00029 #include <pion/PionException.hpp>
00030 #include <pion/PionLockedQueue.hpp>
00031 
00032 
00033 namespace pion {        // begin namespace pion
00034 namespace platform {    // begin namespace platform (Pion Platform Library)
00035 
00039 class ReactionScheduler :
00040     public PionSingleServiceScheduler
00041 {
00042 public:
00043     
00045     class NoServiceException : public std::exception {
00046     public:
00047         virtual const char* what() const throw() {
00048             return "ReactionScheduler has no io_service available";
00049         }
00050     };
00051 
00052     
00054     ReactionScheduler(void) {}
00055     
00057     virtual ~ReactionScheduler() { shutdown(); }
00058     
00060     virtual void startup(void) {
00061         // lock mutex for thread safety
00062         boost::mutex::scoped_lock scheduler_lock(m_mutex);
00063         
00064         if (! m_is_running) {
00065             PION_LOG_INFO(m_logger, "Starting thread scheduler");
00066             m_is_running = true;
00067             
00068             // schedule a work item to make sure that the service doesn't complete
00069             m_service.reset();
00070             keepRunning(m_service, m_timer);
00071 
00072             // start a thread that will be used to handle io_service requests
00073             m_service_thread.reset(new boost::thread( boost::bind(&PionScheduler::processServiceWork,
00074                                                                   this, boost::ref(m_service)) ));
00075             
00076             // start multiple threads to handle async tasks
00077             for (boost::uint32_t n = 0; n < m_num_threads; ++n) {
00078                 boost::shared_ptr<boost::thread> new_thread(new boost::thread(
00079                     boost::bind(&ReactionScheduler::processReactionQueue, this) ));
00080                 m_thread_pool.push_back(new_thread);
00081             }
00082         }
00083     }       
00084     
00086     virtual void stopThreads(void) {
00087         stopThreadInfo();
00088         PionSingleServiceScheduler::stopThreads();
00089         if (m_service_thread)
00090             m_service_thread->join();
00091         boost::mutex::scoped_lock thread_info_lock(m_thread_info_mutex);
00092         m_thread_info.clear();
00093     }
00094 
00096     virtual void finishThreads(void) {
00097         PionSingleServiceScheduler::finishThreads();
00098         m_thread_pool.clear();
00099         m_service_thread.reset();
00100     }
00101 
00107     virtual void post(boost::function0<void> work_func) {
00108         m_reaction_queue.push(work_func);
00109     }
00110     
00112     inline std::size_t getQueueSize(void) const { return m_reaction_queue.size(); }
00113 
00114     
00115 protected:
00116     
00118     typedef boost::function0<void>      Reaction;
00119 
00121     typedef PionLockedQueue<Reaction>   ReactionQueue;
00122 
00124     typedef boost::shared_ptr<ReactionQueue::ConsumerThread>    ThreadInfoPtr;
00125 
00127     typedef std::vector<ThreadInfoPtr>  ThreadInfoVector;
00128     
00129     
00131     ThreadInfoPtr getThreadInfo(void) {
00132         ThreadInfoPtr info_ptr(new ReactionQueue::ConsumerThread);
00133         boost::mutex::scoped_lock thread_info_lock(m_thread_info_mutex);
00134         m_thread_info.push_back(info_ptr);
00135         return info_ptr;
00136     }
00137 
00139     void stopThreadInfo(void) {
00140         boost::mutex::scoped_lock thread_info_lock(m_thread_info_mutex);
00141         for (ThreadInfoVector::iterator i = m_thread_info.begin();
00142             i != m_thread_info.end(); ++i)
00143         {
00144             (*i)->stop();
00145         }
00146     }
00147 
00149     void processReactionQueue(void) {
00150         Reaction r;
00151         ThreadInfoPtr info_ptr(getThreadInfo());
00152         while (m_is_running) {
00153             try {
00154                 while (m_is_running) {
00155                     if (m_reaction_queue.pop(r, *info_ptr) && m_is_running)
00156                         r();
00157                 }
00158             } catch (std::exception& e) {
00159                 PION_LOG_ERROR(m_logger, e.what());
00160             } catch (...) {
00161                 PION_LOG_ERROR(m_logger, "caught unrecognized exception");
00162             }
00163         }
00164     }       
00165     
00166     
00168     ReactionQueue                       m_reaction_queue;
00169 
00171     boost::shared_ptr<boost::thread>    m_service_thread;
00172 
00174     ThreadInfoVector                    m_thread_info;
00175 
00177     boost::mutex                        m_thread_info_mutex;
00178 };
00179     
00180     
00181 }   // end namespace platform
00182 }   // end namespace pion
00183 
00184 #endif

Generated on Tue Aug 10 12:20:32 2010 for pion-platform by  doxygen 1.4.7