// ------------------------------------------------------------------------
// Pion is a development platform for building Reactors that process Events
// ------------------------------------------------------------------------
// Copyright (C) 2007-2008 Atomic Labs, Inc.  (http://www.atomiclabs.com)
//
// Pion is free software: you can redistribute it and/or modify it under the
// terms of the GNU Affero General Public License as published by the Free
// Software Foundation, either version 3 of the License, or (at your option)
// any later version.
//
// Pion is distributed in the hope that it will be useful, but WITHOUT ANY
// WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
// FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for
// more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with Pion.  If not, see <http://www.gnu.org/licenses/>.
00018 // 00019 00020 #ifndef __PION_REACTIONSCHEDULER_HEADER__ 00021 #define __PION_REACTIONSCHEDULER_HEADER__ 00022 00023 #include <vector> 00024 #include <boost/shared_ptr.hpp> 00025 #include <boost/thread/mutex.hpp> 00026 #include <pion/PionConfig.hpp> 00027 #include <pion/PionScheduler.hpp> 00028 #include <pion/PionException.hpp> 00029 #include <pion/PionLockedQueue.hpp> 00030 00031 00032 namespace pion { // begin namespace pion 00033 namespace platform { // begin namespace platform (Pion Platform Library) 00034 00038 class ReactionScheduler : 00039 public PionSingleServiceScheduler 00040 { 00041 public: 00042 00044 class NoServiceException : public std::exception { 00045 public: 00046 virtual const char* what() const throw() { 00047 return "ReactionScheduler has no io_service available"; 00048 } 00049 }; 00050 00051 00053 ReactionScheduler(void) {} 00054 00056 virtual ~ReactionScheduler() { shutdown(); } 00057 00059 virtual void startup(void) { 00060 // lock mutex for thread safety 00061 boost::mutex::scoped_lock scheduler_lock(m_mutex); 00062 00063 if (! 
m_is_running) { 00064 PION_LOG_INFO(m_logger, "Starting thread scheduler"); 00065 m_is_running = true; 00066 00067 // schedule a work item to make sure that the service doesn't complete 00068 m_service.reset(); 00069 keepRunning(m_service, m_timer); 00070 00071 // start a thread that will be used to handle io_service requests 00072 m_service_thread.reset(new boost::thread( boost::bind(&PionScheduler::processServiceWork, 00073 this, boost::ref(m_service)) )); 00074 00075 // start multiple threads to handle async tasks 00076 for (boost::uint32_t n = 0; n < m_num_threads; ++n) { 00077 boost::shared_ptr<boost::thread> new_thread(new boost::thread( 00078 boost::bind(&ReactionScheduler::processReactionQueue, this) )); 00079 m_thread_pool.push_back(new_thread); 00080 } 00081 } 00082 } 00083 00085 virtual void stopThreads(void) { 00086 stopThreadInfo(); 00087 PionSingleServiceScheduler::stopThreads(); 00088 if (m_service_thread) 00089 m_service_thread->join(); 00090 boost::mutex::scoped_lock thread_info_lock(m_thread_info_mutex); 00091 m_thread_info.clear(); 00092 } 00093 00095 virtual void finishThreads(void) { 00096 PionSingleServiceScheduler::finishThreads(); 00097 m_thread_pool.clear(); 00098 m_service_thread.reset(); 00099 } 00100 00106 virtual void post(boost::function0<void> work_func) { 00107 m_reaction_queue.push(work_func); 00108 } 00109 00110 00111 protected: 00112 00114 typedef boost::function0<void> Reaction; 00115 00117 typedef PionLockedQueue<Reaction> ReactionQueue; 00118 00120 typedef boost::shared_ptr<ReactionQueue::ConsumerThread> ThreadInfoPtr; 00121 00123 typedef std::vector<ThreadInfoPtr> ThreadInfoVector; 00124 00125 00127 ThreadInfoPtr getThreadInfo(void) { 00128 ThreadInfoPtr info_ptr(new ReactionQueue::ConsumerThread); 00129 boost::mutex::scoped_lock thread_info_lock(m_thread_info_mutex); 00130 m_thread_info.push_back(info_ptr); 00131 return info_ptr; 00132 } 00133 00135 void stopThreadInfo(void) { 00136 boost::mutex::scoped_lock 
thread_info_lock(m_thread_info_mutex); 00137 for (ThreadInfoVector::iterator i = m_thread_info.begin(); 00138 i != m_thread_info.end(); ++i) 00139 { 00140 (*i)->stop(); 00141 } 00142 } 00143 00145 void processReactionQueue(void) { 00146 Reaction r; 00147 ThreadInfoPtr info_ptr(getThreadInfo()); 00148 while (m_is_running) { 00149 try { 00150 while (m_is_running) { 00151 if (m_reaction_queue.pop(r, *info_ptr) && m_is_running) 00152 r(); 00153 } 00154 } catch (std::exception& e) { 00155 PION_LOG_ERROR(m_logger, e.what()); 00156 } catch (...) { 00157 PION_LOG_ERROR(m_logger, "caught unrecognized exception"); 00158 } 00159 } 00160 } 00161 00162 00164 ReactionQueue m_reaction_queue; 00165 00167 boost::shared_ptr<boost::thread> m_service_thread; 00168 00170 ThreadInfoVector m_thread_info; 00171 00173 boost::mutex m_thread_info_mutex; 00174 }; 00175 00176 00177 } // end namespace platform 00178 } // end namespace pion 00179 00180 #endif
1.4.7