platform/include/pion/platform/DatabaseInserter.hpp

00001 // ------------------------------------------------------------------------
00002 // Pion is a development platform for building Reactors that process Events
00003 // ------------------------------------------------------------------------
00004 // Copyright (C) 2007-2009 Atomic Labs, Inc.  (http://www.atomiclabs.com)
00005 //
00006 // Pion is free software: you can redistribute it and/or modify it under the
00007 // terms of the GNU Affero General Public License as published by the Free
00008 // Software Foundation, either version 3 of the License, or (at your option)
00009 // any later version.
00010 //
00011 // Pion is distributed in the hope that it will be useful, but WITHOUT ANY
00012 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00013 // FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for
00014 // more details.
00015 //
00016 // You should have received a copy of the GNU Affero General Public License
00017 // along with Pion.  If not, see <http://www.gnu.org/licenses/>.
00018 //
00019 
00020 #ifndef __PION_DATABASINSERTER_HEADER__
00021 #define __PION_DATABASINSERTER_HEADER__
00022 
00023 #include <vector>
00024 #include <boost/scoped_ptr.hpp>
00025 #include <boost/thread/thread.hpp>
00026 #include <boost/thread/condition.hpp>
00027 #include <pion/PionConfig.hpp>
00028 #include <pion/PionException.hpp>
00029 #include <pion/PionLogger.hpp>
00030 #include <pion/platform/Event.hpp>
00031 #include <pion/platform/RuleChain.hpp>
00032 #include <pion/platform/Query.hpp>
00033 #include <pion/platform/Database.hpp>
00034 #include <pion/platform/DatabaseManager.hpp>
00035 
00036 
00037 namespace pion {        // begin namespace pion
00038 namespace platform {    // begin namespace platform (Pion Platform Library)
00039 
00040 
00044 class PION_PLATFORM_API DatabaseInserter
00045 {
00046 public:
00047 
00050     class MissingDatabaseManagerException : public std::exception {
00051     public:
00052         virtual const char* what() const throw() {
00053             return "DatabaseInserter is missing the DatabaseManager";
00054         }
00055     };
00056 
00058     class EmptyDatabaseException : public std::exception {
00059     public:
00060         virtual const char* what() const throw() {
00061             return "DatabaseInserter configuration is missing a required Database parameter";
00062         }
00063     };
00064 
00066     class EmptyTableException : public std::exception {
00067     public:
00068         virtual const char* what() const throw() {
00069             return "DatabaseInserter configuration is missing a required Table parameter";
00070         }
00071     };
00072 
00074     class NoFieldsException : public std::exception {
00075     public:
00076         virtual const char* what() const throw() {
00077             return "DatabaseInserter configuration must contain at least one field mapping";
00078         }
00079     };
00080 
00082     class EmptyFieldException : public std::exception {
00083     public:
00084         virtual const char* what() const throw() {
00085             return "DatabaseInserter configuration includes an empty field name";
00086         }
00087     };
00088 
00090     class EmptyTermException : public std::exception {
00091     public:
00092         virtual const char* what() const throw() {
00093             return "DatabaseInserter configuration is missing a term identifier";
00094         }
00095     };
00096 
00098     class UnknownTermException : public PionException {
00099     public:
00100         UnknownTermException(const std::string& term_id)
00101             : PionException("DatabaseInserter configuration maps field to an unknown term: ", term_id) {}
00102     };
00103 
00105     class IllegalCharactersException: public PionException {
00106     public:
00107         IllegalCharactersException(const std::string& field_name)
00108             : PionException("DatabaseInserter configuration has a field name with illegal characters: ", field_name) {}
00109     };
00110 
00112     class NoUniqueKeyFound: public PionException {
00113     public:
00114         NoUniqueKeyFound(void)
00115             : PionException("DatabaseInserter configuration has MaxAge, but there is no Unique indexed column") {}
00116     };
00117 
00119     class MissingEventTime: public PionException {
00120     public:
00121         MissingEventTime(const std::string& element )
00122             : PionException("DatabaseInserter configuration has MaxAge, but the age term is missing: " + element) {}
00123     };
00124 
00126     DatabaseInserter(void) :
00127         m_logger(PION_GET_LOGGER("pion.platform.DatabaseInserter")),
00128         m_database_mgr_ptr(NULL),
00129         m_event_queue_ptr(new EventQueue), 
00130         m_queue_max(DEFAULT_QUEUE_SIZE), m_queue_timeout(DEFAULT_QUEUE_TIMEOUT),
00131         m_is_running(false), m_partition(0), m_wipe(false), m_max_age(0), m_last_time(0), m_table_size(0)
00132     {}
00133 
00135     virtual ~DatabaseInserter() { stop(); }
00136 
00138     inline void setDatabaseManager(DatabaseManager& mgr) { m_database_mgr_ptr = & mgr; }
00139     
00147     void setConfig(const Vocabulary& v, const xmlNodePtr config_ptr);
00148 
00155     void updateVocabulary(const Vocabulary& v);
00156 
00161     void updateDatabases(void);
00162 
00168     void insert(const EventPtr& e);
00169 
00172     void start(void);
00173 
00175     void stop(void);
00176 
00178     std::size_t getEventsQueued(void) const;
00179 
00181     std::size_t getKeyCacheSize(void) const;
00182 
00184     const std::string& getDatabaseId(void) const { return m_database_id; }
00185 
00187     DatabasePtr getDatabasePtr(void) { return m_database_ptr; }
00188 
00190     const std::string& getTableName(void) const { return m_table_name; }
00191 
00193     bool setTableName(const std::string& name)
00194     {
00195         if (m_is_running) return false;
00196         m_table_name = name;
00197         return true;
00198     }
00199 
00201     bool setPartition(unsigned partition)
00202     {
00203         if (m_is_running) return false;
00204         m_partition = partition;
00205         return true;
00206     }
00207 
00209     bool setWipe(bool wipe)
00210     {
00211         if (m_is_running) return false;
00212         m_wipe = wipe;
00213         return true;
00214     }
00215 
00216     boost::uint32_t getRotate(void) const
00217     {
00218         return m_cache_size ? (m_cache_consumption * 100UL / m_cache_size) : 0;
00219     }
00220 
00221     boost::uint64_t getTableSize(void) const { return m_table_size; }
00222 
00224     Query::FieldMap getFieldMap(void) const { return m_field_map; }
00225 
00227     Query::IndexMap getIndexMap(void) const { return m_index_map; }
00228 
00230     inline bool isRunning(void) const { return m_is_running; }
00231     
00233     inline void setLogger(PionLogger log_ptr) { m_logger = log_ptr; }
00234 
00236     inline PionLogger getLogger(void) { return m_logger; }
00237 
00239     inline bool tableExists(void)
00240     {
00241         if (!m_database_ptr)
00242             m_database_ptr = getDatabaseManager().getDatabase(m_database_id);
00243         return m_database_ptr->tableExists(m_table_name, m_partition);
00244     }
00245 
00246 
00247 private:
00248 
00250     typedef std::vector<EventPtr>   EventQueue;
00251 
00252 
00254     DatabaseManager& getDatabaseManager(void);
00255 
00257     void insertEvents(void);
00258 
00266     bool checkEventQueue(boost::scoped_ptr<EventQueue>& insert_queue_ptr);
00267 
00268 
00270     static const boost::uint32_t            DEFAULT_QUEUE_SIZE;
00271 
00273     static const boost::uint32_t            DEFAULT_QUEUE_TIMEOUT;
00274 
00276     static const std::string                DATABASE_ELEMENT_NAME;
00277 
00279     static const std::string                TABLE_ELEMENT_NAME;
00280 
00282     static const std::string                FIELD_ELEMENT_NAME;
00283 
00285     static const std::string                QUEUE_SIZE_ELEMENT_NAME;
00286 
00288     static const std::string                QUEUE_TIMEOUT_ELEMENT_NAME;
00289 
00291     static const std::string                TERM_ATTRIBUTE_NAME;
00292 
00294     static const char *                     CHARSET_FOR_TABLES;
00295 
00297     static const std::string                INDEX_ATTRIBUTE_NAME;
00298 
00300     static const std::string                SQL_ATTRIBUTE_NAME;
00301 
00303     static const std::string                IGNORE_INSERT_ELEMENT_NAME;
00304 
00306     static const std::string                DEFAULT_IGNORE;
00307 
00309     static const std::string                MAX_KEY_AGE_ELEMENT_NAME;
00310 
00312     static const std::string                EVENT_AGE_ELEMENT_NAME;
00313 
00315     static const boost::uint32_t            DEFAULT_MAX_AGE;
00316 
00318     PionLogger                              m_logger;
00319 
00321     DatabaseManager *                       m_database_mgr_ptr;
00322 
00324     std::string                             m_database_id;
00325 
00327     std::string                             m_table_name;
00328 
00330     Query::FieldMap                         m_field_map;
00331 
00333     Query::IndexMap                         m_index_map;
00334 
00336     DatabasePtr                             m_database_ptr;
00337     
00339     QueryPtr                                m_insert_query_ptr;
00340 
00342     QueryPtr                                m_begin_transaction_ptr;
00343 
00345     QueryPtr                                m_commit_transaction_ptr;
00346 
00348     boost::scoped_ptr<EventQueue>           m_event_queue_ptr;
00349 
00351     boost::uint32_t                         m_queue_max;
00352 
00354     boost::uint32_t                         m_queue_timeout;
00355 
00357     mutable boost::mutex                    m_queue_mutex;
00358 
00360     boost::condition                        m_wakeup_worker;
00361 
00363     boost::condition                        m_swapped_queue;
00364 
00366     boost::scoped_ptr<boost::thread>        m_thread;
00367 
00369     bool                                    m_ignore_insert;
00370     
00372     volatile bool                           m_is_running;
00373 
00375     unsigned                                m_partition;
00376 
00378     bool                                    m_wipe;
00379 
00381     pion::platform::RuleChain               m_rules;
00382 
00384     typedef PION_HASH_MAP<pion::platform::Event::BlobType, boost::uint32_t, PION_HASH(pion::platform::Event::BlobType)> KeyHash;
00385     KeyHash                                 m_keys;
00386 
00388     Vocabulary::TermRef                     m_key_term_ref;
00389 
00391     boost::uint32_t                         m_max_age;
00392 
00394     Vocabulary::TermRef                     m_timestamp_term_ref;
00395 
00397     boost::uint32_t                         m_last_time;
00398 
00400     boost::uint64_t                         m_cache_consumption;
00401 
00403     boost::uint32_t                         m_cache_overhead;
00404 
00406     std::vector<Vocabulary::Term>           m_cache_terms;
00407 
00409     boost::uint64_t                         m_cache_rows;
00410 
00412     boost::uint64_t                         m_cache_size;
00413 
00415     boost::uint64_t                         m_table_size;
00416 };
00417 
00418 
00419 }   // end namespace platform
00420 }   // end namespace pion
00421 
00422 #endif

Generated on Tue Aug 10 12:20:32 2010 for pion-platform by  doxygen 1.4.7