// ------------------------------------------------------------------------
// Pion is a development platform for building Reactors that process Events
// ------------------------------------------------------------------------
// Copyright (C) 2007-2008 Atomic Labs, Inc. (http://www.atomiclabs.com)
//
// Pion is free software: you can redistribute it and/or modify it under the
// terms of the GNU Affero General Public License as published by the Free
// Software Foundation, either version 3 of the License, or (at your option)
// any later version.
//
// Pion is distributed in the hope that it will be useful, but WITHOUT ANY
// WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
// FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for
// more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with Pion. If not, see <http://www.gnu.org/licenses/>.
00018 // 00019 00020 #ifndef __PION_REACTIONSCHEDULER_HEADER__ 00021 #define __PION_REACTIONSCHEDULER_HEADER__ 00022 00023 #include <vector> 00024 #include <boost/shared_ptr.hpp> 00025 #include <boost/function/function0.hpp> 00026 #include <boost/thread/mutex.hpp> 00027 #include <pion/PionConfig.hpp> 00028 #include <pion/PionScheduler.hpp> 00029 #include <pion/PionException.hpp> 00030 #include <pion/PionLockedQueue.hpp> 00031 00032 00033 namespace pion { // begin namespace pion 00034 namespace platform { // begin namespace platform (Pion Platform Library) 00035 00039 class ReactionScheduler : 00040 public PionSingleServiceScheduler 00041 { 00042 public: 00043 00045 class NoServiceException : public std::exception { 00046 public: 00047 virtual const char* what() const throw() { 00048 return "ReactionScheduler has no io_service available"; 00049 } 00050 }; 00051 00052 00054 ReactionScheduler(void) {} 00055 00057 virtual ~ReactionScheduler() { shutdown(); } 00058 00060 virtual void startup(void) { 00061 // lock mutex for thread safety 00062 boost::mutex::scoped_lock scheduler_lock(m_mutex); 00063 00064 if (! 
m_is_running) { 00065 PION_LOG_INFO(m_logger, "Starting thread scheduler"); 00066 m_is_running = true; 00067 00068 // schedule a work item to make sure that the service doesn't complete 00069 m_service.reset(); 00070 keepRunning(m_service, m_timer); 00071 00072 // start a thread that will be used to handle io_service requests 00073 m_service_thread.reset(new boost::thread( boost::bind(&PionScheduler::processServiceWork, 00074 this, boost::ref(m_service)) )); 00075 00076 // start multiple threads to handle async tasks 00077 for (boost::uint32_t n = 0; n < m_num_threads; ++n) { 00078 boost::shared_ptr<boost::thread> new_thread(new boost::thread( 00079 boost::bind(&ReactionScheduler::processReactionQueue, this) )); 00080 m_thread_pool.push_back(new_thread); 00081 } 00082 } 00083 } 00084 00086 virtual void stopThreads(void) { 00087 stopThreadInfo(); 00088 PionSingleServiceScheduler::stopThreads(); 00089 if (m_service_thread) 00090 m_service_thread->join(); 00091 boost::mutex::scoped_lock thread_info_lock(m_thread_info_mutex); 00092 m_thread_info.clear(); 00093 } 00094 00096 virtual void finishThreads(void) { 00097 PionSingleServiceScheduler::finishThreads(); 00098 m_thread_pool.clear(); 00099 m_service_thread.reset(); 00100 } 00101 00107 virtual void post(boost::function0<void> work_func) { 00108 m_reaction_queue.push(work_func); 00109 } 00110 00112 inline std::size_t getQueueSize(void) const { return m_reaction_queue.size(); } 00113 00114 00115 protected: 00116 00118 typedef boost::function0<void> Reaction; 00119 00121 typedef PionLockedQueue<Reaction> ReactionQueue; 00122 00124 typedef boost::shared_ptr<ReactionQueue::ConsumerThread> ThreadInfoPtr; 00125 00127 typedef std::vector<ThreadInfoPtr> ThreadInfoVector; 00128 00129 00131 ThreadInfoPtr getThreadInfo(void) { 00132 ThreadInfoPtr info_ptr(new ReactionQueue::ConsumerThread); 00133 boost::mutex::scoped_lock thread_info_lock(m_thread_info_mutex); 00134 m_thread_info.push_back(info_ptr); 00135 return info_ptr; 00136 
} 00137 00139 void stopThreadInfo(void) { 00140 boost::mutex::scoped_lock thread_info_lock(m_thread_info_mutex); 00141 for (ThreadInfoVector::iterator i = m_thread_info.begin(); 00142 i != m_thread_info.end(); ++i) 00143 { 00144 (*i)->stop(); 00145 } 00146 } 00147 00149 void processReactionQueue(void) { 00150 Reaction r; 00151 ThreadInfoPtr info_ptr(getThreadInfo()); 00152 while (m_is_running) { 00153 try { 00154 while (m_is_running) { 00155 if (m_reaction_queue.pop(r, *info_ptr) && m_is_running) 00156 r(); 00157 } 00158 } catch (std::exception& e) { 00159 PION_LOG_ERROR(m_logger, e.what()); 00160 } catch (...) { 00161 PION_LOG_ERROR(m_logger, "caught unrecognized exception"); 00162 } 00163 } 00164 } 00165 00166 00168 ReactionQueue m_reaction_queue; 00169 00171 boost::shared_ptr<boost::thread> m_service_thread; 00172 00174 ThreadInfoVector m_thread_info; 00175 00177 boost::mutex m_thread_info_mutex; 00178 }; 00179 00180 00181 } // end namespace platform 00182 } // end namespace pion 00183 00184 #endif
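// 1.4.7  (NOTE(review): stray residue after #endif, presumably the version stamp of the tool that generated this listing -- not part of the header; confirm and remove)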