summaryrefslogtreecommitdiffstats
path: root/src/db.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/db.cpp')
-rw-r--r--src/db.cpp579
1 files changed, 387 insertions, 192 deletions
diff --git a/src/db.cpp b/src/db.cpp
index aba9db5..d9ba4b9 100644
--- a/src/db.cpp
+++ b/src/db.cpp
@@ -1,272 +1,467 @@
#include "ocelot.h"
#include "db.h"
+#include "misc_functions.h"
#include <string>
#include <iostream>
+#include <queue>
+#include <unistd.h>
+#include <time.h>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/locks.hpp>
+#include <boost/lexical_cast.hpp>
+
+#define DB_LOCK_TIMEOUT 50
mysql::mysql(std::string mysql_db, std::string mysql_host, std::string username, std::string password) {
- if(!conn.connect(mysql_db.c_str(), mysql_host.c_str(), username.c_str(), password.c_str(), 0)) {
- std::cout << "Could not connect to MySQL" << std::endl;
- return;
- }
-
- update_user_buffer = "";
- update_torrent_buffer = "";
- update_peer_buffer = "";
- update_snatch_buffer = "";
+ if(!conn.connect(mysql_db.c_str(), mysql_host.c_str(), username.c_str(), password.c_str(), 0)) {
+ std::cout << "Could not connect to MySQL" << std::endl;
+ return;
+ }
+
+ db = mysql_db, server = mysql_host, db_user = username, pw = password;
+ u_active = false; t_active = false; p_active = false; s_active = false; tok_active = false;
+
+ std::cout << "Connected to MySQL" << std::endl;
+ update_user_buffer = "";
+ update_torrent_buffer = "";
+ update_peer_buffer = "";
+ update_snatch_buffer = "";
+
+ logger_ptr = logger::get_instance();
}
void mysql::load_torrents(std::unordered_map<std::string, torrent> &torrents) {
- mysqlpp::Query query = conn.query("SELECT ID, info_hash, freetorrent, Snatched FROM torrents ORDER BY ID;");
- if(mysqlpp::StoreQueryResult res = query.store()) {
- mysqlpp::String one("1"); // Hack to get around bug in mysql++3.0.0
- size_t num_rows = res.num_rows();
- for(size_t i = 0; i < num_rows; i++) {
- std::string info_hash;
- res[i][1].to_string(info_hash);
-
- torrent t;
- t.id = res[i][0];
- if(res[i][2].compare(one) == 0) {
- t.free_torrent = true;
- } else {
- t.free_torrent = false;
- }
- t.balance = 0;
- t.completed = res[i][3];
- t.last_selected_seeder = "";
- torrents[info_hash] = t;
- }
- }
+ mysqlpp::Query query = conn.query("SELECT ID, info_hash, freetorrent, Snatched FROM torrents ORDER BY ID;");
+ if(mysqlpp::StoreQueryResult res = query.store()) {
+ mysqlpp::String one("1"); // Hack to get around bug in mysql++3.0.0
+ mysqlpp::String two("2");
+ size_t num_rows = res.num_rows();
+ for(size_t i = 0; i < num_rows; i++) {
+ std::string info_hash;
+ res[i][1].to_string(info_hash);
+
+ torrent t;
+ t.id = res[i][0];
+ if(res[i][2].compare(one) == 0) {
+ t.free_torrent = FREE;
+ } else if(res[i][2].compare(two) == 0) {
+ t.free_torrent = NEUTRAL;
+ } else {
+ t.free_torrent = NORMAL;
+ }
+ t.balance = 0;
+ t.completed = res[i][3];
+ t.last_selected_seeder = "";
+ torrents[info_hash] = t;
+ }
+ }
}
void mysql::load_users(std::unordered_map<std::string, user> &users) {
- mysqlpp::Query query = conn.query("SELECT ID, can_leech, torrent_pass FROM users_main WHERE Enabled='1';");
- if(mysqlpp::StoreQueryResult res = query.store()) {
- size_t num_rows = res.num_rows();
- for(size_t i = 0; i < num_rows; i++) {
- std::string passkey;
- res[i][2].to_string(passkey);
-
- user u;
- u.id = res[i][0];
- u.can_leech = res[i][1];
- users[passkey] = u;
- }
- }
+ mysqlpp::Query query = conn.query("SELECT ID, can_leech, torrent_pass FROM users_main WHERE Enabled='1';");
+ if(mysqlpp::StoreQueryResult res = query.store()) {
+ size_t num_rows = res.num_rows();
+ for(size_t i = 0; i < num_rows; i++) {
+ std::string passkey;
+ res[i][2].to_string(passkey);
+
+ user u;
+ u.id = res[i][0];
+ u.can_leech = res[i][1];
+ users[passkey] = u;
+ }
+ }
}
-void mysql::load_whitelist(std::vector<std::string> &whitelist) {
- mysqlpp::Query query = conn.query("SELECT peer_id FROM xbt_client_whitelist;");
- if(mysqlpp::StoreQueryResult res = query.store()) {
- size_t num_rows = res.num_rows();
- for(size_t i = 0; i<num_rows; i++) {
- whitelist.push_back(res[i][0].c_str());
- }
- }
+void mysql::load_tokens(std::unordered_map<std::string, torrent> &torrents) {
+ mysqlpp::Query query = conn.query("SELECT uf.UserID, t.info_hash FROM users_freeleeches AS uf JOIN torrents AS t ON t.ID = uf.TorrentID WHERE uf.Expired = '0';");
+ if (mysqlpp::StoreQueryResult res = query.store()) {
+ size_t num_rows = res.num_rows();
+ for (size_t i = 0; i < num_rows; i++) {
+ std::string info_hash;
+ res[i][1].to_string(info_hash);
+ std::unordered_map<std::string, torrent>::iterator it = torrents.find(info_hash);
+ if (it != torrents.end()) {
+ torrent &tor = it->second;
+ tor.tokened_users.insert(res[i][0]);
+ }
+ }
+ }
}
+void mysql::load_whitelist(std::vector<std::string> &whitelist) {
+ mysqlpp::Query query = conn.query("SELECT peer_id FROM xbt_client_whitelist;");
+ if(mysqlpp::StoreQueryResult res = query.store()) {
+ size_t num_rows = res.num_rows();
+ for(size_t i = 0; i<num_rows; i++) {
+ whitelist.push_back(res[i][0].c_str());
+ }
+ }
+}
+void mysql::record_token(std::string &record) {
+ boost::mutex::scoped_lock lock(user_token_lock);
+ if (update_token_buffer != "") {
+ update_token_buffer += ",";
+ }
+ update_token_buffer += record;
+}
void mysql::record_user(std::string &record) {
- boost::mutex::scoped_lock lock(user_buffer_lock);
- if(update_user_buffer != "") {
- update_user_buffer += ",";
- }
- update_user_buffer += record;
+ boost::mutex::scoped_lock lock(user_buffer_lock);
+ if(update_user_buffer != "") {
+ update_user_buffer += ",";
+ }
+ update_user_buffer += record;
}
void mysql::record_torrent(std::string &record) {
- boost::mutex::scoped_lock lock(torrent_buffer_lock);
- if(update_torrent_buffer != "") {
- update_torrent_buffer += ",";
- }
- update_torrent_buffer += record;
+ boost::mutex::scoped_lock lock(torrent_buffer_lock);
+ if(update_torrent_buffer != "") {
+ update_torrent_buffer += ",";
+ }
+ update_torrent_buffer += record;
}
void mysql::record_peer(std::string &record, std::string &ip, std::string &peer_id, std::string &useragent) {
- boost::mutex::scoped_lock lock(peer_buffer_lock);
- if(update_peer_buffer != "") {
- update_peer_buffer += ",";
+ boost::mutex::scoped_lock lock(peer_buffer_lock);
+ if(update_peer_buffer != "") {
+ update_peer_buffer += ",";
+ }
+ mysqlpp::Query q = conn.query();
+ q << record << mysqlpp::quote << ip << ',' << mysqlpp::quote << peer_id << ',' << mysqlpp::quote << useragent << "," << time(NULL) << ')';
+
+ update_peer_buffer += q.str();
+}
+
+void mysql::record_peer_hist(std::string &record, std::string &peer_id, int tid) {
+ boost::mutex::scoped_lock (peer_hist_buffer_lock);
+ if (update_peer_hist_buffer != "") {
+ update_peer_hist_buffer += ",";
}
mysqlpp::Query q = conn.query();
- q << record << mysqlpp::quote << ip << ',' << mysqlpp::quote << peer_id << ',' << mysqlpp::quote << useragent << ')';
-
- update_peer_buffer += q.str();
+ q << record << ',' << mysqlpp::quote << peer_id << ',' << tid << ',' << time(NULL) << ')';
+ update_peer_hist_buffer += q.str();
}
+
void mysql::record_snatch(std::string &record) {
- boost::mutex::scoped_lock lock(snatch_buffer_lock);
- if(update_snatch_buffer != "") {
- update_snatch_buffer += ",";
- }
- update_snatch_buffer += record;
+ boost::mutex::scoped_lock lock(mysql::snatch_buffer_lock);
+ if(update_snatch_buffer != "") {
+ update_snatch_buffer += ",";
+ }
+ update_snatch_buffer += record;
}
+bool mysql::all_clear() {
+ return (user_queue.size() == 0 && torrent_queue.size() == 0 && peer_queue.size() == 0 && snatch_queue.size() == 0 && token_queue.size() == 0 && peer_hist_queue.size() == 0);
+}
-
+void mysql::flush() {
+ flush_users();
+ flush_torrents();
+ flush_snatches();
+ flush_peers();
+ flush_peer_hist();
+ flush_tokens();
+}
void mysql::flush_users() {
- boost::thread thread(&mysql::do_flush_users, this);
+ std::string sql;
+ boost::mutex::scoped_lock lock(user_buffer_lock);
+ if (update_user_buffer == "") {
+ return;
+ }
+ sql = "INSERT INTO users_main (ID, Uploaded, Downloaded) VALUES " + update_user_buffer +
+ " ON DUPLICATE KEY UPDATE Uploaded = Uploaded + VALUES(Uploaded), Downloaded = Downloaded + VALUES(Downloaded)";
+ user_queue.push(sql);
+ update_user_buffer.clear();
+ if (user_queue.size() == 1 && u_active == false) {
+ boost::thread thread(&mysql::do_flush_users, this);
+ }
}
-void mysql::do_flush_users() {
- std::cout << "flushing users" << std::endl;
+void mysql::flush_torrents() {
std::string sql;
- { // Lock mutex
- boost::mutex::scoped_lock lock(user_buffer_lock);
- if(update_user_buffer == "") {
- return;
- } else {
- sql = "INSERT INTO users_main(ID, Uploaded, Downloaded) VALUES ";
- sql += update_user_buffer;
- sql += " ON DUPLICATE KEY UPDATE Uploaded=Uploaded+VALUES(Uploaded), Downloaded=Downloaded+VALUES(Downloaded)";
- update_user_buffer.clear();
- }
+ boost::mutex::scoped_lock lock(torrent_buffer_lock);
+ if (update_torrent_buffer == "") {
+ return;
+ }
+ sql = "INSERT INTO torrents (ID,Seeders,Leechers,Snatched,Balance) VALUES " + update_torrent_buffer +
+ " ON DUPLICATE KEY UPDATE Seeders=VALUES(Seeders), Leechers=VALUES(Leechers), " +
+ "Snatched=Snatched+VALUES(Snatched), Balance=VALUES(Balance), last_action = " +
+ "IF(VALUES(Seeders) > 0, NOW(), last_action)";
+ torrent_queue.push(sql);
+ update_torrent_buffer.clear();
+ sql.clear();
+ sql = "DELETE FROM torrents WHERE info_hash = ''";
+ torrent_queue.push(sql);
+ if (torrent_queue.size() == 2 && t_active == false) {
+ boost::thread thread(&mysql::do_flush_torrents, this);
+ }
+}
+
+void mysql::flush_snatches() {
+ std::string sql;
+ boost::mutex::scoped_lock lock(snatch_buffer_lock);
+ if (update_snatch_buffer == "" ) {
+ return;
+ }
+ sql = "INSERT INTO xbt_snatched (uid, fid, tstamp, IP) VALUES " + update_snatch_buffer;
+ snatch_queue.push(sql);
+ update_snatch_buffer.clear();
+ if (snatch_queue.size() == 1 && s_active == false) {
+ boost::thread thread(&mysql::do_flush_snatches, this);
+ }
+}
+
+void mysql::flush_peers() {
+ std::string sql;
+ boost::mutex::scoped_lock lock(peer_buffer_lock);
+ // because xfu inserts are slow and ram is not infinite we need to
+ // limit this queue's size
+ if (peer_queue.size() >= 1000) {
+ peer_queue.pop();
+ }
+ if (update_peer_buffer == "") {
+ return;
}
- boost::mutex::scoped_lock db_lock(db_mutex);
-
+ if (peer_queue.size() == 0) {
+ sql = "SET session sql_log_bin = 0";
+ peer_queue.push(sql);
+ sql.clear();
+ }
- mysqlpp::Query query = conn.query(sql);
- for(int i = 0; i < 3; i++) {
- try {
- query.execute();
- break;
- } catch(const mysqlpp::BadQuery& er) { // deadlock
- std::cout << "Query error: " << er.what() << std::endl;
- sleep(3);
- } catch (const mysqlpp::Exception& er) { // Weird unpredictable shit
- std::cout << "Query error: " << er.what() << std::endl;
- sleep(3);
- }
+ sql = "INSERT INTO xbt_files_users (uid,fid,active,uploaded,downloaded,upspeed,downspeed,remaining," +
+ std::string("timespent,announced,ip,peer_id,useragent,mtime) VALUES ") + update_peer_buffer +
+ " ON DUPLICATE KEY UPDATE active=VALUES(active), uploaded=VALUES(uploaded), " +
+ "downloaded=VALUES(downloaded), upspeed=VALUES(upspeed), " +
+ "downspeed=VALUES(downspeed), remaining=VALUES(remaining), " +
+ "timespent=VALUES(timespent), announced=VALUES(announced), " +
+ "mtime=VALUES(mtime)";
+ peer_queue.push(sql);
+ update_peer_buffer.clear();
+ if (peer_queue.size() == 2 && p_active == false) {
+ boost::thread thread(&mysql::do_flush_peers, this);
}
- std::cout << "flushed users" << std::endl;
}
+void mysql::flush_peer_hist() {
+ std::string sql;
+ boost::mutex::scoped_lock lock(peer_hist_buffer_lock);
+ if (update_peer_hist_buffer == "") {
+ return;
+ }
-void mysql::flush_torrents() {
- boost::thread thread(&mysql::do_flush_torrents, this);
+ if (peer_hist_queue.size() == 0) {
+ sql = "SET session sql_log_bin = 0";
+ peer_hist_queue.push(sql);
+ sql.clear();
+ }
+
+ sql = "INSERT IGNORE INTO xbt_peers_history (uid, downloaded, remaining, uploaded, upspeed, downspeed, timespent, peer_id, fid, mtime) VALUES " + update_peer_hist_buffer;
+ peer_hist_queue.push(sql);
+ update_peer_hist_buffer.clear();
+ if (peer_hist_queue.size() == 2 && hist_active == false) {
+ boost::thread thread(&mysql::do_flush_peer_hist, this);
+ }
}
-void mysql::do_flush_torrents() {
- std::cout << "flushing torrents" << std::endl;
+void mysql::flush_tokens() {
std::string sql;
- { // Lock mutex
- boost::mutex::scoped_lock lock(torrent_buffer_lock);
- if(update_torrent_buffer == "") {
- return;
- } else {
- sql = "INSERT INTO torrents(ID,Seeders,Leechers,Snatched,Balance) VALUES ";
- sql += update_torrent_buffer;
- sql += " ON DUPLICATE KEY UPDATE Seeders=VALUES(Seeders), Leechers=VALUES(Leechers), Snatched=Snatched+VALUES(Snatched), Balance=VALUES(Balance), last_action = IF(VALUES(Seeders) > 0, NOW(), last_action)";
- update_torrent_buffer.clear();
- }
+ boost::mutex::scoped_lock lock(user_token_lock);
+ if (update_token_buffer == "") {
+ return;
}
-
- boost::mutex::scoped_lock db_lock(db_mutex);
-
- mysqlpp::Query query = conn.query(sql);
- for(int i = 0; i < 3; i++) {
+ sql = "INSERT INTO users_freeleeches (UserID, TorrentID, Downloaded) VALUES " + update_token_buffer +
+ " ON DUPLICATE KEY UPDATE Downloaded = Downloaded + VALUES(Downloaded)";
+ token_queue.push(sql);
+ update_token_buffer.clear();
+ if (token_queue.size() == 1 && tok_active == false) {
+ boost::thread(&mysql::do_flush_tokens, this);
+ }
+}
+
+void mysql::do_flush_users() {
+ u_active = true;
+ mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0);
+ while (user_queue.size() > 0) {
try {
- query.execute();
- break;
- } catch(const mysqlpp::BadQuery& er) { // deadlock
- std::cout << "Query error: " << er.what() << std::endl;
+ std::string sql = user_queue.front();
+ mysqlpp::Query query = c.query(sql);
+ if (!query.exec()) {
+ std::cout << "User flush failed (" << user_queue.size() << " remain)" << std::endl;
+ sleep(3);
+ continue;
+ } else {
+ boost::mutex::scoped_lock lock(user_buffer_lock);
+ user_queue.pop();
+ std::cout << "Users flushed (" << user_queue.size() << " remain)" << std::endl;
+ }
+ }
+ catch (const mysqlpp::BadQuery &er) {
+ std::cerr << "Query error: " << er.what() << " in flush users with a qlength: " << user_queue.front().size() << " queue size: " << user_queue.size() << std::endl;
sleep(3);
- } catch (const mysqlpp::Exception& er) { // Weird unpredictable shit
- std::cout << "Query error: " << er.what() << std::endl;
+ continue;
+ } catch (const mysqlpp::Exception &er) {
+ std::cerr << "Query error: " << er.what() << " in flush users with a qlength: " << user_queue.front().size() << " queue size: " << user_queue.size() << std::endl;
sleep(3);
+ continue;
}
}
- std::cout << "flushed torrents" << std::endl;
+ u_active = false;
+}
- sql = "DELETE FROM torrents WHERE info_hash = ''";
- mysqlpp::Query dquery = conn.query(sql);
- for (int i = 0; i < 3; i++) {
+void mysql::do_flush_torrents() {
+ t_active = true;
+ mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0);
+ while (torrent_queue.size() > 0) {
try {
- dquery.execute();
- break;
- } catch (const mysqlpp::BadQuery& er) {
- std::cout << "Query error: " << er.what() << std::endl;
+ std::string sql = torrent_queue.front();
+ if (sql == "") {
+ torrent_queue.pop();
+ continue;
+ }
+ mysqlpp::Query query = c.query(sql);
+ if (!query.exec()) {
+ std::cout << "Torrent flush failed (" << torrent_queue.size() << " remain)" << std::endl;
+ sleep(3);
+ continue;
+ } else {
+ boost::mutex::scoped_lock lock(torrent_buffer_lock);
+ torrent_queue.pop();
+ std::cout << "Torrents flushed (" << torrent_queue.size() << " remain)" << std::endl;
+ }
+ }
+ catch (const mysqlpp::BadQuery &er) {
+ std::cerr << "Query error: " << er.what() << " in flush torrents with a qlength: " << torrent_queue.front().size() << " queue size: " << torrent_queue.size() << std::endl;
sleep(3);
- } catch (const mysqlpp::Exception& er) {
- std::cout << "Query error: " << er.what() << std::endl;
+ continue;
+ } catch (const mysqlpp::Exception &er) {
+ std::cerr << "Query error: " << er.what() << " in flush torrents with a qlength: " << torrent_queue.front().size() << " queue size: " << torrent_queue.size() << std::endl;
sleep(3);
+ continue;
}
}
+ t_active = false;
}
-void mysql::flush_snatches() {
- boost::thread thread(&mysql::do_flush_snatches, this);
-}
-
-void mysql::do_flush_snatches() {
- std::cout << "flushing snatches" << std::endl;
- std::string sql;
- { // lock mutex
- boost::mutex::scoped_lock lock(snatch_buffer_lock);
- if(update_snatch_buffer == "") {
- return;
- } else {
- sql = "INSERT INTO xbt_snatched(uid,fid,tstamp,IP) VALUES ";
- sql += update_snatch_buffer;
- update_snatch_buffer.clear();
- }
- }
-
- boost::mutex::scoped_lock db_lock(db_mutex);
-
- mysqlpp::Query query = conn.query(sql);
- for(int i = 0; i < 3; i++) {
+void mysql::do_flush_peers() {
+ p_active = true;
+ mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0);
+ while (peer_queue.size() > 0) {
try {
- query.execute();
- break;
- } catch(const mysqlpp::BadQuery& er) { // deadlock
- std::cout << "Query error: " << er.what() << std::endl;
+ std::string sql = peer_queue.front();
+ mysqlpp::Query query = c.query(sql);
+ if (!query.exec()) {
+ std::cout << "Peer flush failed (" << peer_queue.size() << " remain)" << std::endl;
+ sleep(3);
+ continue;
+ } else {
+ boost::mutex::scoped_lock lock(peer_buffer_lock);
+ peer_queue.pop();
+ std::cout << "Peers flushed (" << peer_queue.size() << " remain)" << std::endl;
+ }
+ }
+ catch (const mysqlpp::BadQuery &er) {
+ std::cerr << "Query error: " << er.what() << " in flush peers with a qlength: " << peer_queue.front().size() << " queue size: " << peer_queue.size() << std::endl;
sleep(3);
- } catch (const mysqlpp::Exception& er) { // Weird unpredictable shit
- std::cout << "Query error: " << er.what() << std::endl;
+ continue;
+ } catch (const mysqlpp::Exception &er) {
+ std::cerr << "Query error: " << er.what() << " in flush peers with a qlength: " << peer_queue.front().size() << " queue size: " << peer_queue.size() << std::endl;
sleep(3);
+ continue;
}
}
- std::cout << "flushed snatches" << std::endl;
+ p_active = false;
}
-void mysql::flush_peers() {
- boost::thread thread(&mysql::do_flush_peers, this);
+void mysql::do_flush_peer_hist() {
+ hist_active = true;
+ mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0);
+ while (peer_hist_queue.size() > 0) {
+ try {
+ std::string sql = peer_hist_queue.front();
+ mysqlpp::Query query = c.query(sql);
+ if (!query.exec()) {
+ std::cout << "Peer history flush failed (" << peer_hist_queue.size() << " remain)" << std::endl;
+ sleep(3);
+ continue;
+ } else {
+ boost::mutex::scoped_lock lock(peer_hist_buffer_lock);
+ peer_hist_queue.pop();
+ std::cout << "Peer history flushed (" << peer_hist_queue.size() << " remain)" << std::endl;
+ }
+ }
+ catch (const mysqlpp::BadQuery &er) {
+ std::cerr << "Query error: " << er.what() << " in flush peer history with a qlength: " << peer_hist_queue.front().size() << " queue size: " << peer_hist_queue.size() << std::endl;
+ sleep(3);
+ continue;
+ } catch (const mysqlpp::Exception &er) {
+ std::cerr << "Query error: " << er.what() << " in flush peer history with a qlength: " << peer_hist_queue.front().size() << " queue size: " << peer_hist_queue.size() << std::endl;
+ sleep(3);
+ continue;
+ }
+ }
+ hist_active = false;
}
-void mysql::do_flush_peers() {
- std::cout << "flushing peers" << std::endl;
- std::string sql;
- { // lock mutex
- boost::mutex::scoped_lock lock(peer_buffer_lock);
- if(update_peer_buffer == "") {
- return;
- } else {
- sql = "INSERT INTO xbt_files_users(uid,fid,active,uploaded,downloaded,upspeed,downspeed,remaining,timespent,announced,ip,peer_id,useragent) VALUES ";
- sql += update_peer_buffer;
- sql += " ON DUPLICATE KEY UPDATE active=VALUES(active), uploaded=VALUES(uploaded), downloaded=VALUES(downloaded), upspeed=VALUES(upspeed), downspeed=VALUES(downspeed), ";
- sql += "remaining=VALUES(remaining), timespent=VALUES(timespent), announced=VALUES(announced), peer_id=VALUES(peer_id), useragent=VALUES(useragent),mtime=UNIX_TIMESTAMP(NOW())";
- update_peer_buffer.clear();
+void mysql::do_flush_snatches() {
+ s_active = true;
+ mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0);
+ while (snatch_queue.size() > 0) {
+ try {
+ std::string sql = snatch_queue.front();
+ mysqlpp::Query query = c.query(sql);
+ if (!query.exec()) {
+ std::cout << "Snatch flush failed (" << snatch_queue.size() << " remain)" << std::endl;
+ sleep(3);
+ continue;
+ } else {
+ boost::mutex::scoped_lock lock(snatch_buffer_lock);
+ snatch_queue.pop();
+ std::cout << "Snatches flushed (" << snatch_queue.size() << " remain)" << std::endl;
+ }
+ }
+ catch (const mysqlpp::BadQuery &er) {
+ std::cerr << "Query error: " << er.what() << " in flush snatches with a qlength: " << snatch_queue.front().size() << " queue size: " << snatch_queue.size() << std::endl;
+ sleep(3);
+ continue;
+ } catch (const mysqlpp::Exception &er) {
+ std::cerr << "Query error: " << er.what() << " in flush snatches with a qlength: " << snatch_queue.front().size() << " queue size: " << snatch_queue.size() << std::endl;
+ sleep(3);
+ continue;
}
}
-
- boost::mutex::scoped_lock db_lock(db_mutex);
-
- mysqlpp::Query query = conn.query(sql);
- for(int i = 0; i < 3; i++) {
+ s_active = false;
+}
+
+void mysql::do_flush_tokens() {
+ tok_active = true;
+ mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0);
+ while (token_queue.size() > 0) {
try {
- query.execute();
- break;
- } catch(const mysqlpp::BadQuery& er) { // deadlock
- std::cout << "Query error: " << er.what() << std::endl;
+ std::string sql = token_queue.front();
+ mysqlpp::Query query = c.query(sql);
+ if (!query.exec()) {
+ std::cout << "Token flush failed (" << token_queue.size() << " remain)" << std::endl;
+ sleep(3);
+ continue;
+ } else {
+ boost::mutex::scoped_lock lock(user_token_lock);
+ token_queue.pop();
+ std::cout << "Tokens flushed (" << token_queue.size() << " remain)" << std::endl;
+ }
+ }
+ catch (const mysqlpp::BadQuery &er) {
+ std::cerr << "Query error: " << er.what() << " in flush tokens with a qlength: " << token_queue.front().size() << " queue size: " << token_queue.size() << std::endl;
sleep(3);
- } catch (const mysqlpp::Exception& er) { // Weird unpredictable shit
- std::cout << "Query error: " << er.what() << std::endl;
+ continue;
+ } catch (const mysqlpp::Exception &er) {
+ std::cerr << "Query error: " << er.what() << " in flush tokens with a qlength: " << token_queue.front().size() << " queue size: " << token_queue.size() << std::endl;
sleep(3);
+ continue;
}
}
- std::cout << "flushed peers" << std::endl;
+ tok_active = false;
}