00001 // ------------------------------------------------------------------------ 00002 // Pion is a development platform for building Reactors that process Events 00003 // ------------------------------------------------------------------------ 00004 // Copyright (C) 2007-2008 Atomic Labs, Inc. (http://www.atomiclabs.com) 00005 // 00006 // Pion is free software: you can redistribute it and/or modify it under the 00007 // terms of the GNU Affero General Public License as published by the Free 00008 // Software Foundation, either version 3 of the License, or (at your option) 00009 // any later version. 00010 // 00011 // Pion is distributed in the hope that it will be useful, but WITHOUT ANY 00012 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 00013 // FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for 00014 // more details. 00015 // 00016 // You should have received a copy of the GNU Affero General Public License 00017 // along with Pion. If not, see <http://www.gnu.org/licenses/>. 00018 // 00019 00020 #ifndef __PION_REACTIONSCHEDULER_HEADER__ 00021 #define __PION_REACTIONSCHEDULER_HEADER__ 00022 00023 #include <vector> 00024 #include <boost/shared_ptr.hpp> 00025 #include <boost/function/function0.hpp> 00026 #include <boost/thread/mutex.hpp> 00027 #include <pion/PionConfig.hpp> 00028 #include <pion/PionScheduler.hpp> 00029 #include <pion/PionException.hpp> 00030 #include <pion/PionLockedQueue.hpp> 00031 00032 00033 namespace pion { // begin namespace pion 00034 namespace platform { // begin namespace platform (Pion Platform Library) 00035 00039 class ReactionScheduler : 00040 public PionSingleServiceScheduler 00041 { 00042 public: 00043 00045 class NoServiceException : public std::exception { 00046 public: 00047 virtual const char* what() const throw() { 00048 return "ReactionScheduler has no io_service available"; 00049 } 00050 }; 00051 00052 00054 ReactionScheduler(void) {} 00055 00057 virtual ~ReactionScheduler() { shutdown(); } 00058 00060 virtual void startup(void) { 00061 // lock mutex for thread safety 00062 boost::mutex::scoped_lock scheduler_lock(m_mutex); 00063 00064 if (! m_is_running) { 00065 PION_LOG_INFO(m_logger, "Starting thread scheduler"); 00066 m_is_running = true; 00067 00068 // schedule a work item to make sure that the service doesn't complete 00069 m_service.reset(); 00070 keepRunning(m_service, m_timer); 00071 00072 // start a thread that will be used to handle io_service requests 00073 m_service_thread.reset(new boost::thread( boost::bind(&PionScheduler::processServiceWork, 00074 this, boost::ref(m_service)) )); 00075 00076 // start multiple threads to handle async tasks 00077 for (boost::uint32_t n = 0; n < m_num_threads; ++n) { 00078 boost::shared_ptr<boost::thread> new_thread(new boost::thread( 00079 boost::bind(&ReactionScheduler::processReactionQueue, this) )); 00080 m_thread_pool.push_back(new_thread); 00081 } 00082 } 00083 } 00084 00086 virtual void stopThreads(void) { 00087 stopThreadInfo(); 00088 PionSingleServiceScheduler::stopThreads(); 00089 if (m_service_thread) 00090 m_service_thread->join(); 00091 boost::mutex::scoped_lock thread_info_lock(m_thread_info_mutex); 00092 m_thread_info.clear(); 00093 } 00094 00096 virtual void finishThreads(void) { 00097 PionSingleServiceScheduler::finishThreads(); 00098 m_thread_pool.clear(); 00099 m_service_thread.reset(); 00100 } 00101 00107 virtual void post(boost::function0<void> work_func) { 00108 m_reaction_queue.push(work_func); 00109 } 00110 00112 inline std::size_t getQueueSize(void) const { return m_reaction_queue.size(); } 00113 00114 00115 protected: 00116 00118 typedef boost::function0<void> Reaction; 00119 00121 typedef PionLockedQueue<Reaction> ReactionQueue; 00122 00124 typedef boost::shared_ptr<ReactionQueue::ConsumerThread> ThreadInfoPtr; 00125 00127 typedef std::vector<ThreadInfoPtr> ThreadInfoVector; 00128 00129 00131 ThreadInfoPtr getThreadInfo(void) { 00132 // configure consumer thread to wakeup every second 00133 ThreadInfoPtr info_ptr(new ReactionQueue::ConsumerThread(boost::posix_time::seconds(1))); 00134 boost::mutex::scoped_lock thread_info_lock(m_thread_info_mutex); 00135 m_thread_info.push_back(info_ptr); 00136 return info_ptr; 00137 } 00138 00140 void stopThreadInfo(void) { 00141 boost::mutex::scoped_lock thread_info_lock(m_thread_info_mutex); 00142 for (ThreadInfoVector::iterator i = m_thread_info.begin(); 00143 i != m_thread_info.end(); ++i) 00144 { 00145 (*i)->stop(); 00146 } 00147 } 00148 00150 void processReactionQueue(void) { 00151 // retrieve reference to the thread's Event allocator 00152 EventFactory event_factory; 00153 EventAllocator& event_alloc(event_factory.getAllocator()); 00154 00155 // initialize consumer thread object 00156 ThreadInfoPtr info_ptr(getThreadInfo()); 00157 00158 // used to pop work off the queue 00159 Reaction r; 00160 00161 while (m_is_running) { 00162 try { 00163 while (m_is_running) { 00164 if (m_reaction_queue.pop(r, *info_ptr) && m_is_running) { 00165 // got new work for the queue 00166 r(); 00167 } else { 00168 // thread slept for 1 seconds with no new work 00169 // release any ununsed memory in Event allocator 00170 #ifdef PION_EVENT_USE_POOL_ALLOCATORS 00171 PION_LOG_DEBUG(m_logger, "Releasing unused memory for idle ReactionEngine thread"); 00172 event_alloc.release_memory(); 00173 #endif 00174 } 00175 } 00176 } catch (std::exception& e) { 00177 PION_LOG_ERROR(m_logger, e.what()); 00178 } catch (...) { 00179 PION_LOG_ERROR(m_logger, "caught unrecognized exception"); 00180 } 00181 } 00182 } 00183 00184 00186 ReactionQueue m_reaction_queue; 00187 00189 boost::shared_ptr<boost::thread> m_service_thread; 00190 00192 ThreadInfoVector m_thread_info; 00193 00195 boost::mutex m_thread_info_mutex; 00196 }; 00197 00198 00199 } // end namespace platform 00200 } // end namespace pion 00201 00202 #endif
1.4.7