diff options
Diffstat (limited to 'src/db.cpp')
-rw-r--r-- | src/db.cpp | 286 |
1 files changed, 201 insertions, 85 deletions
@@ -2,32 +2,52 @@ #include "db.h" #include "user.h" #include "misc_functions.h" +#include "config.h" #include <string> #include <iostream> #include <queue> #include <unistd.h> -#include <time.h> +#include <ctime> #include <mutex> #include <thread> +#include <unordered_set> #define DB_LOCK_TIMEOUT 50 -mysql::mysql(std::string mysql_db, std::string mysql_host, std::string username, std::string password) : - db(mysql_db), server(mysql_host), db_user(username), pw(password), - u_active(false), t_active(false), p_active(false), s_active(false), tok_active(false) -{ +mysql::mysql(config * conf) : u_active(false), t_active(false), p_active(false), s_active(false), tok_active(false) { + load_config(conf); + if (mysql_db == "") { + std::cout << "No database selected" << std::endl; + return; + } + try { - conn.connect(mysql_db.c_str(), mysql_host.c_str(), username.c_str(), password.c_str(), 0); + mysqlpp::ReconnectOption reconnect(true); + conn.set_option(&reconnect); + conn.connect(mysql_db.c_str(), mysql_host.c_str(), mysql_username.c_str(), mysql_password.c_str(), 0); } catch (const mysqlpp::Exception &er) { std::cout << "Failed to connect to MySQL (" << er.what() << ')' << std::endl; return; } - std::cout << "Connected to MySQL" << std::endl; - std::cout << "Clearing xbt_files_users and resetting peer counts..."; - std::cout.flush(); - clear_peer_data(); - std::cout << "done" << std::endl; + if (!readonly) { + std::cout << "Clearing xbt_files_users and resetting peer counts..."; + std::cout.flush(); + clear_peer_data(); + std::cout << "done" << std::endl; + } +} + +void mysql::load_config(config * conf) { + mysql_db = conf->get_str("mysql_db"); + mysql_host = conf->get_str("mysql_host"); + mysql_username = conf->get_str("mysql_username"); + mysql_password = conf->get_str("mysql_password"); + readonly = conf->get_bool("readonly"); +} + +void mysql::reload_config(config * conf) { + load_config(conf); } bool mysql::connected() { @@ -45,104 +65,195 @@ void mysql::clear_peer_data() { std::cerr << "Unable to reset seeder and leecher count!" << std::endl; } } catch (const mysqlpp::BadQuery &er) { - std::cerr << "Query error: " << er.what() << " in clear_peer_data" << std::endl; + std::cerr << "Query error in clear_peer_data: " << er.what() << std::endl; } catch (const mysqlpp::Exception &er) { - std::cerr << "Query error: " << er.what() << " in clear_peer_data" << std::endl; + std::cerr << "Query error in clear_peer_data: " << er.what() << std::endl; } } void mysql::load_torrents(torrent_list &torrents) { mysqlpp::Query query = conn.query("SELECT ID, info_hash, freetorrent, Snatched FROM torrents ORDER BY ID;"); - if (mysqlpp::StoreQueryResult res = query.store()) { - mysqlpp::String one("1"); // Hack to get around bug in mysql++3.0.0 - mysqlpp::String two("2"); + try { + mysqlpp::StoreQueryResult res = query.store(); + std::unordered_set<std::string> cur_keys; size_t num_rows = res.num_rows(); + std::lock_guard<std::mutex> tl_lock(torrent_list_mutex); + if (torrents.size() == 0) { + torrents.reserve(num_rows * 1.05); // Reserve 5% extra space to prevent rehashing + } else { + // Create set with all currently known info hashes to remove nonexistent ones later + cur_keys.reserve(torrents.size()); + for (auto const &it: torrents) { + cur_keys.insert(it.first); + } + } for (size_t i = 0; i < num_rows; i++) { std::string info_hash; res[i][1].to_string(info_hash); - - torrent t; - t.id = res[i][0]; - if (res[i][2].compare(one) == 0) { - t.free_torrent = FREE; - } else if (res[i][2].compare(two) == 0) { - t.free_torrent = NEUTRAL; + if (info_hash == "") { + continue; + } + mysqlpp::sql_enum free_torrent(res[i][2]); + + torrent tmp_tor; + auto it = torrents.insert(std::pair<std::string, torrent>(info_hash, tmp_tor)); + torrent &tor = (it.first)->second; + if (it.second) { + tor.id = res[i][0]; + tor.balance = 0; + tor.completed = res[i][3]; + tor.last_selected_seeder = ""; + } else { + tor.tokened_users.clear(); + cur_keys.erase(info_hash); + } + if (free_torrent == "1") { + tor.free_torrent = FREE; + } else if (free_torrent == "2") { + tor.free_torrent = NEUTRAL; } else { - t.free_torrent = NORMAL; + tor.free_torrent = NORMAL; } - t.balance = 0; - t.completed = res[i][3]; - t.last_selected_seeder = ""; - torrents[info_hash] = t; } + for (auto const &info_hash: cur_keys) { + // Remove tracked torrents that weren't found in the database + auto it = torrents.find(info_hash); + if (it != torrents.end()) { + torrent &tor = it->second; + stats.leechers -= tor.leechers.size(); + stats.seeders -= tor.seeders.size(); + for (auto &p: tor.leechers) { + p.second.user->decr_leeching(); + } + for (auto &p: tor.seeders) { + p.second.user->decr_seeding(); + } + torrents.erase(it); + } + } + } catch (const mysqlpp::BadQuery &er) { + std::cerr << "Query error in load_torrents: " << er.what() << std::endl; + return; } + std::cout << "Loaded " << torrents.size() << " torrents" << std::endl; + load_tokens(torrents); } void mysql::load_users(user_list &users) { - mysqlpp::Query query = conn.query("SELECT ID, can_leech, torrent_pass, visible FROM users_main WHERE Enabled='1';"); - if (mysqlpp::StoreQueryResult res = query.store()) { + mysqlpp::Query query = conn.query("SELECT ID, can_leech, torrent_pass, (Visible='0' OR IP='127.0.0.1') AS Protected FROM users_main WHERE Enabled='1';"); + try { + mysqlpp::StoreQueryResult res = query.store(); size_t num_rows = res.num_rows(); + std::unordered_set<std::string> cur_keys; + std::lock_guard<std::mutex> ul_lock(user_list_mutex); + if (users.size() == 0) { + users.reserve(num_rows * 1.05); // Reserve 5% extra space to prevent rehashing + } else { + // Create set with all currently known user keys to remove nonexistent ones later + cur_keys.reserve(users.size()); + for (auto const &it: users) { + cur_keys.insert(it.first); + } + } for (size_t i = 0; i < num_rows; i++) { - std::string passkey; - res[i][2].to_string(passkey); - bool protect_ip = res[i][3].compare("1") != 0; - - user_ptr u(new user(res[i][0], res[i][1], protect_ip)); - users.insert(std::pair<std::string, user_ptr>(passkey, u)); + std::string passkey(res[i][2]); + bool protect_ip = res[i][3]; + user_ptr tmp_user = std::make_shared<user>(res[i][0], res[i][1], protect_ip); + auto it = users.insert(std::pair<std::string, user_ptr>(passkey, tmp_user)); + if (!it.second) { + user_ptr &u = (it.first)->second; + u->set_leechstatus(res[i][1]); + u->set_protected(protect_ip); + u->set_deleted(false); + cur_keys.erase(passkey); + } + } + for (auto const &passkey: cur_keys) { + // Remove users that weren't found in the database + auto it = users.find(passkey); + if (it != users.end()) { + it->second->set_deleted(true); + users.erase(it); + } } + } catch (const mysqlpp::BadQuery &er) { + std::cerr << "Query error in load_users: " << er.what() << std::endl; + return; } + std::cout << "Loaded " << users.size() << " users" << std::endl; } void mysql::load_tokens(torrent_list &torrents) { mysqlpp::Query query = conn.query("SELECT uf.UserID, t.info_hash FROM users_freeleeches AS uf JOIN torrents AS t ON t.ID = uf.TorrentID WHERE uf.Expired = '0';"); - if (mysqlpp::StoreQueryResult res = query.store()) { + int token_count = 0; + try { + mysqlpp::StoreQueryResult res = query.store(); size_t num_rows = res.num_rows(); + std::lock_guard<std::mutex> tl_lock(torrent_list_mutex); for (size_t i = 0; i < num_rows; i++) { std::string info_hash; res[i][1].to_string(info_hash); - torrent_list::iterator it = torrents.find(info_hash); + auto it = torrents.find(info_hash); if (it != torrents.end()) { torrent &tor = it->second; tor.tokened_users.insert(res[i][0]); + ++token_count; } } + } catch (const mysqlpp::BadQuery &er) { + std::cerr << "Query error in load_tokens: " << er.what() << std::endl; + return; } + std::cout << "Loaded " << token_count << " tokens" << std::endl; } void mysql::load_whitelist(std::vector<std::string> &whitelist) { mysqlpp::Query query = conn.query("SELECT peer_id FROM xbt_client_whitelist;"); - if (mysqlpp::StoreQueryResult res = query.store()) { + try { + mysqlpp::StoreQueryResult res = query.store(); size_t num_rows = res.num_rows(); + std::lock_guard<std::mutex> wl_lock(whitelist_mutex); + whitelist.clear(); for (size_t i = 0; i<num_rows; i++) { - whitelist.push_back(res[i][0].c_str()); + std::string peer_id; + res[i][0].to_string(peer_id); + whitelist.push_back(peer_id); } + } catch (const mysqlpp::BadQuery &er) { + std::cerr << "Query error in load_whitelist: " << er.what() << std::endl; + return; + } + if (whitelist.size() == 0) { + std::cout << "Assuming no whitelist desired, disabling" << std::endl; + } else { + std::cout << "Loaded " << whitelist.size() << " clients into the whitelist" << std::endl; } } -void mysql::record_token(std::string &record) { +void mysql::record_token(const std::string &record) { if (update_token_buffer != "") { update_token_buffer += ","; } update_token_buffer += record; } -void mysql::record_user(std::string &record) { +void mysql::record_user(const std::string &record) { if (update_user_buffer != "") { update_user_buffer += ","; } update_user_buffer += record; } -void mysql::record_torrent(std::string &record) { - std::unique_lock<std::mutex> tb_lock(torrent_buffer_lock); +void mysql::record_torrent(const std::string &record) { + std::lock_guard<std::mutex> tb_lock(torrent_buffer_lock); if (update_torrent_buffer != "") { update_torrent_buffer += ","; } update_torrent_buffer += record; } -void mysql::record_peer(std::string &record, std::string &ip, std::string &peer_id, std::string &useragent) { +void mysql::record_peer(const std::string &record, const std::string &ip, const std::string &peer_id, const std::string &useragent) { if (update_heavy_peer_buffer != "") { update_heavy_peer_buffer += ","; } @@ -151,7 +262,7 @@ void mysql::record_peer(std::string &record, std::string &ip, std::string &peer_ update_heavy_peer_buffer += q.str(); } -void mysql::record_peer(std::string &record, std::string &peer_id) { +void mysql::record_peer(const std::string &record, const std::string &peer_id) { if (update_light_peer_buffer != "") { update_light_peer_buffer += ","; } @@ -161,7 +272,7 @@ void mysql::record_peer(std::string &record, std::string &peer_id) { update_light_peer_buffer += q.str(); } -void mysql::record_snatch(std::string &record, std::string &ip) { +void mysql::record_snatch(const std::string &record, const std::string &ip) { if (update_snatch_buffer != "") { update_snatch_buffer += ","; } @@ -183,11 +294,15 @@ void mysql::flush() { } void mysql::flush_users() { + if (readonly) { + update_user_buffer.clear(); + return; + } std::string sql; - std::unique_lock<std::mutex> uq_lock(user_queue_lock); + std::lock_guard<std::mutex> uq_lock(user_queue_lock); size_t qsize = user_queue.size(); if (verbose_flush || qsize > 0) { - std::cout << "User flush queue size: " << qsize << std::endl; + std::cout << "User flush queue size: " << qsize << ", next query length: " << user_queue.front().size() << std::endl; } if (update_user_buffer == "") { return; @@ -203,12 +318,16 @@ void mysql::flush_users() { } void mysql::flush_torrents() { + std::lock_guard<std::mutex> tb_lock(torrent_buffer_lock); + if (readonly) { + update_torrent_buffer.clear(); + return; + } std::string sql; - std::unique_lock<std::mutex> tq_lock(torrent_queue_lock); - std::unique_lock<std::mutex> tb_lock(torrent_buffer_lock); + std::lock_guard<std::mutex> tq_lock(torrent_queue_lock); size_t qsize = torrent_queue.size(); if (verbose_flush || qsize > 0) { - std::cout << "Torrent flush queue size: " << qsize << std::endl; + std::cout << "Torrent flush queue size: " << qsize << ", next query length: " << torrent_queue.front().size() << std::endl; } if (update_torrent_buffer == "") { return; @@ -229,11 +348,15 @@ void mysql::flush_torrents() { } void mysql::flush_snatches() { + if (readonly) { + update_snatch_buffer.clear(); + return; + } std::string sql; - std::unique_lock<std::mutex> sq_lock(snatch_queue_lock); + std::lock_guard<std::mutex> sq_lock(snatch_queue_lock); size_t qsize = snatch_queue.size(); if (verbose_flush || qsize > 0) { - std::cout << "Snatch flush queue size: " << qsize << std::endl; + std::cout << "Snatch flush queue size: " << qsize << ", next query length: " << snatch_queue.front().size() << std::endl; } if (update_snatch_buffer == "" ) { return; @@ -248,11 +371,16 @@ void mysql::flush_snatches() { } void mysql::flush_peers() { + if (readonly) { + update_light_peer_buffer.clear(); + update_heavy_peer_buffer.clear(); + return; + } std::string sql; - std::unique_lock<std::mutex> pq_lock(peer_queue_lock); + std::lock_guard<std::mutex> pq_lock(peer_queue_lock); size_t qsize = peer_queue.size(); if (verbose_flush || qsize > 0) { - std::cout << "Peer flush queue size: " << qsize << std::endl; + std::cout << "Peer flush queue size: " << qsize << ", next query length: " << peer_queue.front().size() << std::endl; } // Nothing to do @@ -260,12 +388,6 @@ void mysql::flush_peers() { return; } - if (qsize == 0) { - sql = "SET session sql_log_bin = 0"; - peer_queue.push(sql); - sql.clear(); - } - if (update_heavy_peer_buffer != "") { // Because xfu inserts are slow and ram is not infinite we need to // limit this queue's size @@ -290,7 +412,7 @@ void mysql::flush_peers() { if (qsize >= 1000) { peer_queue.pop(); } - sql = "INSERT INTO xbt_files_users (fid,timespent,announced,peer_id,mtime) VALUES " + + sql = "INSERT INTO xbt_files_users (uid,fid,timespent,announced,peer_id,mtime) VALUES " + update_light_peer_buffer + " ON DUPLICATE KEY UPDATE upspeed=0, downspeed=0, timespent=VALUES(timespent), " + "announced=VALUES(announced), mtime=VALUES(mtime)"; @@ -306,11 +428,15 @@ void mysql::flush_peers() { } void mysql::flush_tokens() { + if (readonly) { + update_token_buffer.clear(); + return; + } std::string sql; - std::unique_lock<std::mutex> tq_lock(token_queue_lock); + std::lock_guard<std::mutex> tq_lock(token_queue_lock); size_t qsize = token_queue.size(); if (verbose_flush || qsize > 0) { - std::cout << "Token flush queue size: " << qsize << std::endl; + std::cout << "Token flush queue size: " << qsize << ", next query length: " << token_queue.front().size() << std::endl; } if (update_token_buffer == "") { return; @@ -328,7 +454,7 @@ void mysql::flush_tokens() { void mysql::do_flush_users() { u_active = true; try { - mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0); + mysqlpp::Connection c(mysql_db.c_str(), mysql_host.c_str(), mysql_username.c_str(), mysql_password.c_str(), 0); while (user_queue.size() > 0) { try { std::string sql = user_queue.front(); @@ -338,7 +464,7 @@ void mysql::do_flush_users() { sleep(3); continue; } else { - std::unique_lock<std::mutex> uq_lock(user_queue_lock); + std::lock_guard<std::mutex> uq_lock(user_queue_lock); user_queue.pop(); } } @@ -355,8 +481,6 @@ void mysql::do_flush_users() { } catch (const mysqlpp::Exception &er) { std::cerr << "MySQL error in flush_users: " << er.what() << std::endl; - u_active = false; - return; } u_active = false; } @@ -364,7 +488,7 @@ void mysql::do_flush_users() { void mysql::do_flush_torrents() { t_active = true; try { - mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0); + mysqlpp::Connection c(mysql_db.c_str(), mysql_host.c_str(), mysql_username.c_str(), mysql_password.c_str(), 0); while (torrent_queue.size() > 0) { try { std::string sql = torrent_queue.front(); @@ -378,7 +502,7 @@ void mysql::do_flush_torrents() { sleep(3); continue; } else { - std::unique_lock<std::mutex> tq_lock(torrent_queue_lock); + std::lock_guard<std::mutex> tq_lock(torrent_queue_lock); torrent_queue.pop(); } } @@ -395,8 +519,6 @@ void mysql::do_flush_torrents() { } catch (const mysqlpp::Exception &er) { std::cerr << "MySQL error in flush_torrents: " << er.what() << std::endl; - t_active = false; - return; } t_active = false; } @@ -404,7 +526,7 @@ void mysql::do_flush_torrents() { void mysql::do_flush_peers() { p_active = true; try { - mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0); + mysqlpp::Connection c(mysql_db.c_str(), mysql_host.c_str(), mysql_username.c_str(), mysql_password.c_str(), 0); while (peer_queue.size() > 0) { try { std::string sql = peer_queue.front(); @@ -414,7 +536,7 @@ void mysql::do_flush_peers() { sleep(3); continue; } else { - std::unique_lock<std::mutex> pq_lock(peer_queue_lock); + std::lock_guard<std::mutex> pq_lock(peer_queue_lock); peer_queue.pop(); } } @@ -431,8 +553,6 @@ void mysql::do_flush_peers() { } catch (const mysqlpp::Exception &er) { std::cerr << "MySQL error in flush_peers: " << er.what() << std::endl; - p_active = false; - return; } p_active = false; } @@ -440,7 +560,7 @@ void mysql::do_flush_peers() { void mysql::do_flush_snatches() { s_active = true; try { - mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0); + mysqlpp::Connection c(mysql_db.c_str(), mysql_host.c_str(), mysql_username.c_str(), mysql_password.c_str(), 0); while (snatch_queue.size() > 0) { try { std::string sql = snatch_queue.front(); @@ -450,7 +570,7 @@ void mysql::do_flush_snatches() { sleep(3); continue; } else { - std::unique_lock<std::mutex> sq_lock(snatch_queue_lock); + std::lock_guard<std::mutex> sq_lock(snatch_queue_lock); snatch_queue.pop(); } } @@ -467,8 +587,6 @@ void mysql::do_flush_snatches() { } catch (const mysqlpp::Exception &er) { std::cerr << "MySQL error in flush_snatches: " << er.what() << std::endl; - s_active = false; - return; } s_active = false; } @@ -476,7 +594,7 @@ void mysql::do_flush_snatches() { void mysql::do_flush_tokens() { tok_active = true; try { - mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0); + mysqlpp::Connection c(mysql_db.c_str(), mysql_host.c_str(), mysql_username.c_str(), mysql_password.c_str(), 0); while (token_queue.size() > 0) { try { std::string sql = token_queue.front(); @@ -486,7 +604,7 @@ void mysql::do_flush_tokens() { sleep(3); continue; } else { - std::unique_lock<std::mutex> tq_lock(token_queue_lock); + std::lock_guard<std::mutex> tq_lock(token_queue_lock); token_queue.pop(); } } @@ -503,8 +621,6 @@ void mysql::do_flush_tokens() { } catch (const mysqlpp::Exception &er) { std::cerr << "MySQL error in flush_tokens: " << er.what() << std::endl; - tok_active = false; - return; } tok_active = false; } |