00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifndef __PION_DATABASINSERTER_HEADER__
00021 #define __PION_DATABASINSERTER_HEADER__
00022
00023 #include <vector>
00024 #include <boost/scoped_ptr.hpp>
00025 #include <boost/thread/thread.hpp>
00026 #include <boost/thread/condition.hpp>
00027 #include <pion/PionConfig.hpp>
00028 #include <pion/PionException.hpp>
00029 #include <pion/PionLogger.hpp>
00030 #include <pion/platform/Event.hpp>
00031 #include <pion/platform/RuleChain.hpp>
00032 #include <pion/platform/Query.hpp>
00033 #include <pion/platform/Database.hpp>
00034 #include <pion/platform/DatabaseManager.hpp>
00035
00036
00037 namespace pion {
00038 namespace platform {
00039
00040
00044 class PION_PLATFORM_API DatabaseInserter
00045 {
00046 public:
00047
00050 class MissingDatabaseManagerException : public std::exception {
00051 public:
00052 virtual const char* what() const throw() {
00053 return "DatabaseInserter is missing the DatabaseManager";
00054 }
00055 };
00056
00058 class EmptyDatabaseException : public std::exception {
00059 public:
00060 virtual const char* what() const throw() {
00061 return "DatabaseInserter configuration is missing a required Database parameter";
00062 }
00063 };
00064
00066 class EmptyTableException : public std::exception {
00067 public:
00068 virtual const char* what() const throw() {
00069 return "DatabaseInserter configuration is missing a required Table parameter";
00070 }
00071 };
00072
00074 class NoFieldsException : public std::exception {
00075 public:
00076 virtual const char* what() const throw() {
00077 return "DatabaseInserter configuration must contain at least one field mapping";
00078 }
00079 };
00080
00082 class EmptyFieldException : public std::exception {
00083 public:
00084 virtual const char* what() const throw() {
00085 return "DatabaseInserter configuration includes an empty field name";
00086 }
00087 };
00088
00090 class EmptyTermException : public std::exception {
00091 public:
00092 virtual const char* what() const throw() {
00093 return "DatabaseInserter configuration is missing a term identifier";
00094 }
00095 };
00096
00098 class UnknownTermException : public PionException {
00099 public:
00100 UnknownTermException(const std::string& term_id)
00101 : PionException("DatabaseInserter configuration maps field to an unknown term: ", term_id) {}
00102 };
00103
00105 class IllegalCharactersException: public PionException {
00106 public:
00107 IllegalCharactersException(const std::string& field_name)
00108 : PionException("DatabaseInserter configuration has a field name with illegal characters: ", field_name) {}
00109 };
00110
00112 class NoUniqueKeyFound: public PionException {
00113 public:
00114 NoUniqueKeyFound(void)
00115 : PionException("DatabaseInserter configuration has MaxAge, but there is no Unique indexed column") {}
00116 };
00117
00119 class MissingEventTime: public PionException {
00120 public:
00121 MissingEventTime(const std::string& element )
00122 : PionException("DatabaseInserter configuration has MaxAge, but the age term is missing: " + element) {}
00123 };
00124
00126 DatabaseInserter(void) :
00127 m_logger(PION_GET_LOGGER("pion.platform.DatabaseInserter")),
00128 m_database_mgr_ptr(NULL),
00129 m_event_queue_ptr(new EventQueue),
00130 m_queue_max(DEFAULT_QUEUE_SIZE), m_queue_timeout(DEFAULT_QUEUE_TIMEOUT),
00131 m_is_running(false), m_partition(0), m_wipe(false), m_max_age(0), m_last_time(0), m_table_size(0)
00132 {}
00133
00135 virtual ~DatabaseInserter() { stop(); }
00136
00138 inline void setDatabaseManager(DatabaseManager& mgr) { m_database_mgr_ptr = & mgr; }
00139
00147 void setConfig(const Vocabulary& v, const xmlNodePtr config_ptr);
00148
00155 void updateVocabulary(const Vocabulary& v);
00156
00161 void updateDatabases(void);
00162
00168 void insert(const EventPtr& e);
00169
00172 void start(void);
00173
00175 void stop(void);
00176
00178 std::size_t getEventsQueued(void) const;
00179
00181 std::size_t getKeyCacheSize(void) const;
00182
00184 const std::string& getDatabaseId(void) const { return m_database_id; }
00185
00187 DatabasePtr getDatabasePtr(void) { return m_database_ptr; }
00188
00190 const std::string& getTableName(void) const { return m_table_name; }
00191
00193 bool setTableName(const std::string& name)
00194 {
00195 if (m_is_running) return false;
00196 m_table_name = name;
00197 return true;
00198 }
00199
00201 bool setPartition(unsigned partition)
00202 {
00203 if (m_is_running) return false;
00204 m_partition = partition;
00205 return true;
00206 }
00207
00209 bool setWipe(bool wipe)
00210 {
00211 if (m_is_running) return false;
00212 m_wipe = wipe;
00213 return true;
00214 }
00215
00216 boost::uint32_t getRotate(void) const
00217 {
00218 return m_cache_size ? (m_cache_consumption * 100UL / m_cache_size) : 0;
00219 }
00220
00221 boost::uint64_t getTableSize(void) const { return m_table_size; }
00222
00224 Query::FieldMap getFieldMap(void) const { return m_field_map; }
00225
00227 Query::IndexMap getIndexMap(void) const { return m_index_map; }
00228
00230 inline bool isRunning(void) const { return m_is_running; }
00231
00233 inline void setLogger(PionLogger log_ptr) { m_logger = log_ptr; }
00234
00236 inline PionLogger getLogger(void) { return m_logger; }
00237
00239 inline bool tableExists(void)
00240 {
00241 if (!m_database_ptr)
00242 m_database_ptr = getDatabaseManager().getDatabase(m_database_id);
00243 return m_database_ptr->tableExists(m_table_name, m_partition);
00244 }
00245
00246
00247 private:
00248
00250 typedef std::vector<EventPtr> EventQueue;
00251
00252
00254 DatabaseManager& getDatabaseManager(void);
00255
00257 void insertEvents(void);
00258
00266 bool checkEventQueue(boost::scoped_ptr<EventQueue>& insert_queue_ptr);
00267
00268
00270 static const boost::uint32_t DEFAULT_QUEUE_SIZE;
00271
00273 static const boost::uint32_t DEFAULT_QUEUE_TIMEOUT;
00274
00276 static const std::string DATABASE_ELEMENT_NAME;
00277
00279 static const std::string TABLE_ELEMENT_NAME;
00280
00282 static const std::string FIELD_ELEMENT_NAME;
00283
00285 static const std::string QUEUE_SIZE_ELEMENT_NAME;
00286
00288 static const std::string QUEUE_TIMEOUT_ELEMENT_NAME;
00289
00291 static const std::string TERM_ATTRIBUTE_NAME;
00292
00294 static const char * CHARSET_FOR_TABLES;
00295
00297 static const std::string INDEX_ATTRIBUTE_NAME;
00298
00300 static const std::string SQL_ATTRIBUTE_NAME;
00301
00303 static const std::string IGNORE_INSERT_ELEMENT_NAME;
00304
00306 static const std::string DEFAULT_IGNORE;
00307
00309 static const std::string MAX_KEY_AGE_ELEMENT_NAME;
00310
00312 static const std::string EVENT_AGE_ELEMENT_NAME;
00313
00315 static const boost::uint32_t DEFAULT_MAX_AGE;
00316
00318 PionLogger m_logger;
00319
00321 DatabaseManager * m_database_mgr_ptr;
00322
00324 std::string m_database_id;
00325
00327 std::string m_table_name;
00328
00330 Query::FieldMap m_field_map;
00331
00333 Query::IndexMap m_index_map;
00334
00336 DatabasePtr m_database_ptr;
00337
00339 QueryPtr m_insert_query_ptr;
00340
00342 QueryPtr m_begin_transaction_ptr;
00343
00345 QueryPtr m_commit_transaction_ptr;
00346
00348 boost::scoped_ptr<EventQueue> m_event_queue_ptr;
00349
00351 boost::uint32_t m_queue_max;
00352
00354 boost::uint32_t m_queue_timeout;
00355
00357 mutable boost::mutex m_queue_mutex;
00358
00360 boost::condition m_wakeup_worker;
00361
00363 boost::condition m_swapped_queue;
00364
00366 boost::scoped_ptr<boost::thread> m_thread;
00367
00369 bool m_ignore_insert;
00370
00372 volatile bool m_is_running;
00373
00375 unsigned m_partition;
00376
00378 bool m_wipe;
00379
00381 pion::platform::RuleChain m_rules;
00382
00384 typedef PION_HASH_MAP<pion::platform::Event::BlobType, boost::uint32_t, PION_HASH(pion::platform::Event::BlobType)> KeyHash;
00385 KeyHash m_keys;
00386
00388 Vocabulary::TermRef m_key_term_ref;
00389
00391 boost::uint32_t m_max_age;
00392
00394 Vocabulary::TermRef m_timestamp_term_ref;
00395
00397 boost::uint32_t m_last_time;
00398
00400 boost::uint64_t m_cache_consumption;
00401
00403 boost::uint32_t m_cache_overhead;
00404
00406 std::vector<Vocabulary::Term> m_cache_terms;
00407
00409 boost::uint64_t m_cache_rows;
00410
00412 boost::uint64_t m_cache_size;
00413
00415 boost::uint64_t m_table_size;
00416 };
00417
00418
00419 }
00420 }
00421
00422 #endif