diff options
Diffstat (limited to 'src/db.cpp')
-rw-r--r-- | src/db.cpp | 348 |
1 files changed, 158 insertions, 190 deletions
@@ -14,23 +14,23 @@ #define DB_LOCK_TIMEOUT 50 mysql::mysql(std::string mysql_db, std::string mysql_host, std::string username, std::string password) { - if(!conn.connect(mysql_db.c_str(), mysql_host.c_str(), username.c_str(), password.c_str(), 0)) { - std::cout << "Could not connect to MySQL" << std::endl; - return; - } + if (!conn.connect(mysql_db.c_str(), mysql_host.c_str(), username.c_str(), password.c_str(), 0)) { + std::cout << "Could not connect to MySQL" << std::endl; + return; + } db = mysql_db, server = mysql_host, db_user = username, pw = password; - u_active = false; t_active = false; p_active = false; s_active = false; tok_active = false; hist_active = false; - - std::cout << "Connected to MySQL" << std::endl; - update_user_buffer = ""; - update_torrent_buffer = ""; - update_peer_buffer = ""; - update_snatch_buffer = ""; + u_active = false; t_active = false; p_active = false; s_active = false; tok_active = false; + std::cout << "Connected to MySQL" << std::endl; + update_user_buffer = ""; + update_torrent_buffer = ""; + update_heavy_peer_buffer = ""; + update_light_peer_buffer = ""; + update_snatch_buffer = ""; - logger_ptr = logger::get_instance(); std::cout << "Clearing xbt_files_users and resetting peer counts..."; + std::cout.flush(); clear_peer_data(); std::cout << "done" << std::endl; } @@ -53,128 +53,125 @@ void mysql::clear_peer_data() { } void mysql::load_torrents(std::unordered_map<std::string, torrent> &torrents) { - mysqlpp::Query query = conn.query("SELECT ID, info_hash, freetorrent, Snatched FROM torrents ORDER BY ID;"); - if(mysqlpp::StoreQueryResult res = query.store()) { - mysqlpp::String one("1"); // Hack to get around bug in mysql++3.0.0 - mysqlpp::String two("2"); - size_t num_rows = res.num_rows(); - for(size_t i = 0; i < num_rows; i++) { - std::string info_hash; - res[i][1].to_string(info_hash); - - torrent t; - t.id = res[i][0]; - if(res[i][2].compare(one) == 0) { - t.free_torrent = FREE; - } else if(res[i][2].compare(two) == 0) { - t.free_torrent = NEUTRAL; - } else { - t.free_torrent = NORMAL; - } - t.balance = 0; - t.completed = res[i][3]; - t.last_selected_seeder = ""; - torrents[info_hash] = t; - } - } + mysqlpp::Query query = conn.query("SELECT ID, info_hash, freetorrent, Snatched FROM torrents ORDER BY ID;"); + if (mysqlpp::StoreQueryResult res = query.store()) { + mysqlpp::String one("1"); // Hack to get around bug in mysql++3.0.0 + mysqlpp::String two("2"); + size_t num_rows = res.num_rows(); + for (size_t i = 0; i < num_rows; i++) { + std::string info_hash; + res[i][1].to_string(info_hash); + + torrent t; + t.id = res[i][0]; + if (res[i][2].compare(one) == 0) { + t.free_torrent = FREE; + } else if (res[i][2].compare(two) == 0) { + t.free_torrent = NEUTRAL; + } else { + t.free_torrent = NORMAL; + } + t.balance = 0; + t.completed = res[i][3]; + t.last_selected_seeder = ""; + torrents[info_hash] = t; + } + } } void mysql::load_users(std::unordered_map<std::string, user> &users) { - mysqlpp::Query query = conn.query("SELECT ID, can_leech, torrent_pass FROM users_main WHERE Enabled='1';"); - if(mysqlpp::StoreQueryResult res = query.store()) { - size_t num_rows = res.num_rows(); - for(size_t i = 0; i < num_rows; i++) { - std::string passkey; - res[i][2].to_string(passkey); - - user u; - u.id = res[i][0]; - u.can_leech = res[i][1]; - users[passkey] = u; - } - } + mysqlpp::Query query = conn.query("SELECT ID, can_leech, torrent_pass, visible FROM users_main WHERE Enabled='1';"); + if (mysqlpp::StoreQueryResult res = query.store()) { + size_t num_rows = res.num_rows(); + for (size_t i = 0; i < num_rows; i++) { + std::string passkey; + res[i][2].to_string(passkey); + + user u; + u.id = res[i][0]; + u.can_leech = res[i][1]; + u.protect_ip = res[i][3].compare("1") != 0; + users[passkey] = u; + } + } } void mysql::load_tokens(std::unordered_map<std::string, torrent> &torrents) { - mysqlpp::Query query = conn.query("SELECT uf.UserID, t.info_hash FROM users_freeleeches AS uf JOIN torrents AS t ON t.ID = uf.TorrentID WHERE uf.Expired = '0';"); - if (mysqlpp::StoreQueryResult res = query.store()) { - size_t num_rows = res.num_rows(); - for (size_t i = 0; i < num_rows; i++) { - std::string info_hash; - res[i][1].to_string(info_hash); - std::unordered_map<std::string, torrent>::iterator it = torrents.find(info_hash); - if (it != torrents.end()) { - torrent &tor = it->second; - tor.tokened_users.insert(res[i][0]); - } - } - } + mysqlpp::Query query = conn.query("SELECT uf.UserID, t.info_hash FROM users_freeleeches AS uf JOIN torrents AS t ON t.ID = uf.TorrentID WHERE uf.Expired = '0';"); + if (mysqlpp::StoreQueryResult res = query.store()) { + size_t num_rows = res.num_rows(); + for (size_t i = 0; i < num_rows; i++) { + std::string info_hash; + res[i][1].to_string(info_hash); + std::unordered_map<std::string, torrent>::iterator it = torrents.find(info_hash); + if (it != torrents.end()) { + torrent &tor = it->second; + tor.tokened_users.insert(res[i][0]); + } + } + } } void mysql::load_whitelist(std::vector<std::string> &whitelist) { - mysqlpp::Query query = conn.query("SELECT peer_id FROM xbt_client_whitelist;"); - if(mysqlpp::StoreQueryResult res = query.store()) { - size_t num_rows = res.num_rows(); - for(size_t i = 0; i<num_rows; i++) { - whitelist.push_back(res[i][0].c_str()); - } - } + mysqlpp::Query query = conn.query("SELECT peer_id FROM xbt_client_whitelist;"); + if (mysqlpp::StoreQueryResult res = query.store()) { + size_t num_rows = res.num_rows(); + for (size_t i = 0; i<num_rows; i++) { + whitelist.push_back(res[i][0].c_str()); + } + } } void mysql::record_token(std::string &record) { - boost::mutex::scoped_lock lock(user_token_lock); - if (update_token_buffer != "") { - update_token_buffer += ","; - } - update_token_buffer += record; + if (update_token_buffer != "") { + update_token_buffer += ","; + } + update_token_buffer += record; } void mysql::record_user(std::string &record) { - boost::mutex::scoped_lock lock(user_buffer_lock); - if(update_user_buffer != "") { - update_user_buffer += ","; - } - update_user_buffer += record; + if (update_user_buffer != "") { + update_user_buffer += ","; + } + update_user_buffer += record; } + void mysql::record_torrent(std::string &record) { - boost::mutex::scoped_lock lock(torrent_buffer_lock); - if(update_torrent_buffer != "") { - update_torrent_buffer += ","; - } - update_torrent_buffer += record; + if (update_torrent_buffer != "") { + update_torrent_buffer += ","; + } + update_torrent_buffer += record; } + void mysql::record_peer(std::string &record, std::string &ip, std::string &peer_id, std::string &useragent) { - boost::mutex::scoped_lock lock(peer_buffer_lock); - if(update_peer_buffer != "") { - update_peer_buffer += ","; - } - mysqlpp::Query q = conn.query(); - q << record << mysqlpp::quote << ip << ',' << mysqlpp::quote << peer_id << ',' << mysqlpp::quote << useragent << "," << time(NULL) << ')'; - - update_peer_buffer += q.str(); -} + if (update_heavy_peer_buffer != "") { + update_heavy_peer_buffer += ","; + } + mysqlpp::Query q = conn.query(); + q << record << mysqlpp::quote << ip << ',' << mysqlpp::quote << peer_id << ',' << mysqlpp::quote << useragent << "," << time(NULL) << ')'; -void mysql::record_peer_hist(std::string &record, std::string &peer_id, int tid){ - boost::mutex::scoped_lock (peer_hist_buffer_lock); - if (update_peer_hist_buffer != "") { - update_peer_hist_buffer += ","; + update_heavy_peer_buffer += q.str(); +} +void mysql::record_peer(std::string &record, std::string &peer_id) { + if (update_light_peer_buffer != "") { + update_light_peer_buffer += ","; } mysqlpp::Query q = conn.query(); - q << record << ',' << mysqlpp::quote << peer_id << ',' << tid << ',' << time(NULL) << ')'; - update_peer_hist_buffer += q.str(); + q << record << mysqlpp::quote << peer_id << ',' << time(NULL) << ')'; + + update_light_peer_buffer += q.str(); } void mysql::record_snatch(std::string &record) { - boost::mutex::scoped_lock lock(mysql::snatch_buffer_lock); - if(update_snatch_buffer != "") { - update_snatch_buffer += ","; - } - update_snatch_buffer += record; + if (update_snatch_buffer != "") { + update_snatch_buffer += ","; + } + update_snatch_buffer += record; } bool mysql::all_clear() { - return (user_queue.size() == 0 && torrent_queue.size() == 0 && peer_queue.size() == 0 && snatch_queue.size() == 0 && token_queue.size() == 0 && peer_hist_queue.size() == 0); + return (user_queue.size() == 0 && torrent_queue.size() == 0 && peer_queue.size() == 0 && snatch_queue.size() == 0 && token_queue.size() == 0); } void mysql::flush() { @@ -182,13 +179,16 @@ void mysql::flush() { flush_torrents(); flush_snatches(); flush_peers(); - flush_peer_hist(); flush_tokens(); } void mysql::flush_users() { std::string sql; - boost::mutex::scoped_lock lock(user_buffer_lock); + boost::mutex::scoped_lock lock(user_queue_lock); + size_t qsize = user_queue.size(); + if (verbose_flush || qsize > 0) { + std::cout << "User flush queue size: " << qsize << std::endl; + } if (update_user_buffer == "") { return; } @@ -203,7 +203,11 @@ void mysql::flush_users() { void mysql::flush_torrents() { std::string sql; - boost::mutex::scoped_lock lock(torrent_buffer_lock); + boost::mutex::scoped_lock lock(torrent_queue_lock); + size_t qsize = torrent_queue.size(); + if (verbose_flush || qsize > 0) { + std::cout << "Torrent flush queue size: " << qsize << std::endl; + } if (update_torrent_buffer == "") { return; } @@ -223,7 +227,11 @@ void mysql::flush_torrents() { void mysql::flush_snatches() { std::string sql; - boost::mutex::scoped_lock lock(snatch_buffer_lock); + boost::mutex::scoped_lock lock(snatch_queue_lock); + size_t qsize = snatch_queue.size(); + if (verbose_flush || qsize > 0) { + std::cout << "Snatch flush queue size: " << qsize << std::endl; + } if (update_snatch_buffer == "" ) { return; } @@ -237,60 +245,62 @@ void mysql::flush_snatches() { void mysql::flush_peers() { std::string sql; - boost::mutex::scoped_lock lock(peer_buffer_lock); + boost::mutex::scoped_lock lock(peer_queue_lock); + size_t qsize = peer_queue.size(); + if (verbose_flush || qsize > 0) { + std::cout << "Peer flush queue size: " << qsize << std::endl; + } // because xfu inserts are slow and ram is not infinite we need to // limit this queue's size - if (peer_queue.size() >= 1000) { + if (qsize >= 1000) { peer_queue.pop(); } - if (update_peer_buffer == "") { + + // Nothing to do + if (update_light_peer_buffer == "" && update_heavy_peer_buffer == "") { return; } - - if (peer_queue.size() == 0) { + + if (qsize == 0) { sql = "SET session sql_log_bin = 0"; peer_queue.push(sql); sql.clear(); } - - sql = "INSERT INTO xbt_files_users (uid,fid,active,uploaded,downloaded,upspeed,downspeed,remaining,corrupt," + - std::string("timespent,announced,ip,peer_id,useragent,mtime) VALUES ") + update_peer_buffer + - " ON DUPLICATE KEY UPDATE active=VALUES(active), uploaded=VALUES(uploaded), " + - "downloaded=VALUES(downloaded), upspeed=VALUES(upspeed), " + - "downspeed=VALUES(downspeed), remaining=VALUES(remaining), " + - "corrupt=VALUES(corrupt), timespent=VALUES(timespent), " + - "announced=VALUES(announced), mtime=VALUES(mtime)"; - peer_queue.push(sql); - update_peer_buffer.clear(); - if (p_active == false) { - boost::thread thread(&mysql::do_flush_peers, this); - } -} -void mysql::flush_peer_hist() { - std::string sql; - boost::mutex::scoped_lock lock(peer_hist_buffer_lock); - if (update_peer_hist_buffer == "") { - return; + if (update_heavy_peer_buffer != "") { + sql = "INSERT INTO xbt_files_users (uid,fid,active,uploaded,downloaded,upspeed,downspeed,remaining,corrupt," + + std::string("timespent,announced,ip,peer_id,useragent,mtime) VALUES ") + update_heavy_peer_buffer + + " ON DUPLICATE KEY UPDATE active=VALUES(active), uploaded=VALUES(uploaded), " + + "downloaded=VALUES(downloaded), upspeed=VALUES(upspeed), " + + "downspeed=VALUES(downspeed), remaining=VALUES(remaining), " + + "corrupt=VALUES(corrupt), timespent=VALUES(timespent), " + + "announced=VALUES(announced), mtime=VALUES(mtime)"; + peer_queue.push(sql); + update_heavy_peer_buffer.clear(); + sql.clear(); } - - if (peer_hist_queue.size() == 0) { - sql = "SET session sql_log_bin = 0"; - peer_hist_queue.push(sql); + if (update_light_peer_buffer != "") { + sql = "INSERT INTO xbt_files_users (fid,timespent,announced,peer_id,mtime) VALUES " + + update_light_peer_buffer + + " ON DUPLICATE KEY UPDATE upspeed=0, downspeed=0, timespent=VALUES(timespent), " + + "announced=VALUES(announced), mtime=VALUES(mtime)"; + peer_queue.push(sql); + update_light_peer_buffer.clear(); sql.clear(); } - sql = "INSERT IGNORE INTO xbt_peers_history (uid, downloaded, remaining, uploaded, upspeed, downspeed, timespent, peer_id, fid, mtime) VALUES " + update_peer_hist_buffer; - peer_hist_queue.push(sql); - update_peer_hist_buffer.clear(); - if (hist_active == false) { - boost::thread thread(&mysql::do_flush_peer_hist, this); + if (p_active == false) { + boost::thread thread(&mysql::do_flush_peers, this); } } void mysql::flush_tokens() { std::string sql; - boost::mutex::scoped_lock lock(user_token_lock); + boost::mutex::scoped_lock lock(token_queue_lock); + size_t qsize = token_queue.size(); + if (verbose_flush || qsize > 0) { + std::cout << "Token flush queue size: " << qsize << std::endl; + } if (update_token_buffer == "") { return; } @@ -316,9 +326,8 @@ void mysql::do_flush_users() { sleep(3); continue; } else { - boost::mutex::scoped_lock lock(user_buffer_lock); + boost::mutex::scoped_lock lock(user_queue_lock); user_queue.pop(); - std::cout << "Users flushed (" << user_queue.size() << " remain)" << std::endl; } } catch (const mysqlpp::BadQuery &er) { @@ -357,9 +366,8 @@ void mysql::do_flush_torrents() { sleep(3); continue; } else { - boost::mutex::scoped_lock lock(torrent_buffer_lock); + boost::mutex::scoped_lock lock(torrent_queue_lock); torrent_queue.pop(); - std::cout << "Torrents flushed (" << torrent_queue.size() << " remain)" << std::endl; } } catch (const mysqlpp::BadQuery &er) { @@ -394,9 +402,8 @@ void mysql::do_flush_peers() { sleep(3); continue; } else { - boost::mutex::scoped_lock lock(peer_buffer_lock); + boost::mutex::scoped_lock lock(peer_queue_lock); peer_queue.pop(); - std::cout << "Peers flushed (" << peer_queue.size() << " remain)" << std::endl; } } catch (const mysqlpp::BadQuery &er) { @@ -418,43 +425,6 @@ void mysql::do_flush_peers() { p_active = false; } -void mysql::do_flush_peer_hist() { - hist_active = true; - try { - mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0); - while (peer_hist_queue.size() > 0) { - try { - std::string sql = peer_hist_queue.front(); - mysqlpp::Query query = c.query(sql); - if (!query.exec()) { - std::cout << "Peer history flush failed (" << peer_hist_queue.size() << " remain)" << std::endl; - sleep(3); - continue; - } else { - boost::mutex::scoped_lock lock(peer_hist_buffer_lock); - peer_hist_queue.pop(); - std::cout << "Peer history flushed (" << peer_hist_queue.size() << " remain)" << std::endl; - } - } - catch (const mysqlpp::BadQuery &er) { - std::cerr << "Query error: " << er.what() << " in flush peer history with a qlength: " << peer_hist_queue.front().size() << " queue size: " << peer_hist_queue.size() << std::endl; - sleep(3); - continue; - } catch (const mysqlpp::Exception &er) { - std::cerr << "Query error: " << er.what() << " in flush peer history with a qlength: " << peer_hist_queue.front().size() << " queue size: " << peer_hist_queue.size() << std::endl; - sleep(3); - continue; - } - } - } - catch (const mysqlpp::Exception &er) { - std::cerr << "MySQL error in flush_peer_hist: " << er.what() << std::endl; - hist_active = false; - return; - } - hist_active = false; -} - void mysql::do_flush_snatches() { s_active = true; try { @@ -468,9 +438,8 @@ void mysql::do_flush_snatches() { sleep(3); continue; } else { - boost::mutex::scoped_lock lock(snatch_buffer_lock); + boost::mutex::scoped_lock lock(snatch_queue_lock); snatch_queue.pop(); - std::cout << "Snatches flushed (" << snatch_queue.size() << " remain)" << std::endl; } } catch (const mysqlpp::BadQuery &er) { @@ -505,9 +474,8 @@ void mysql::do_flush_tokens() { sleep(3); continue; } else { - boost::mutex::scoped_lock lock(user_token_lock); + boost::mutex::scoped_lock lock(token_queue_lock); token_queue.pop(); - std::cout << "Tokens flushed (" << token_queue.size() << " remain)" << std::endl; } } catch (const mysqlpp::BadQuery &er) { |