summaryrefslogtreecommitdiffstats
path: root/src/db.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/db.cpp')
-rw-r--r--src/db.cpp286
1 files changed, 201 insertions, 85 deletions
diff --git a/src/db.cpp b/src/db.cpp
index ab1d6be..8cbeec8 100644
--- a/src/db.cpp
+++ b/src/db.cpp
@@ -2,32 +2,52 @@
#include "db.h"
#include "user.h"
#include "misc_functions.h"
+#include "config.h"
#include <string>
#include <iostream>
#include <queue>
#include <unistd.h>
-#include <time.h>
+#include <ctime>
#include <mutex>
#include <thread>
+#include <unordered_set>
#define DB_LOCK_TIMEOUT 50
-mysql::mysql(std::string mysql_db, std::string mysql_host, std::string username, std::string password) :
- db(mysql_db), server(mysql_host), db_user(username), pw(password),
- u_active(false), t_active(false), p_active(false), s_active(false), tok_active(false)
-{
+mysql::mysql(config * conf) : u_active(false), t_active(false), p_active(false), s_active(false), tok_active(false) {
+ load_config(conf);
+ if (mysql_db == "") {
+ std::cout << "No database selected" << std::endl;
+ return;
+ }
+
try {
- conn.connect(mysql_db.c_str(), mysql_host.c_str(), username.c_str(), password.c_str(), 0);
+ mysqlpp::ReconnectOption reconnect(true);
+ conn.set_option(&reconnect);
+ conn.connect(mysql_db.c_str(), mysql_host.c_str(), mysql_username.c_str(), mysql_password.c_str(), 0);
} catch (const mysqlpp::Exception &er) {
std::cout << "Failed to connect to MySQL (" << er.what() << ')' << std::endl;
return;
}
- std::cout << "Connected to MySQL" << std::endl;
- std::cout << "Clearing xbt_files_users and resetting peer counts...";
- std::cout.flush();
- clear_peer_data();
- std::cout << "done" << std::endl;
+ if (!readonly) {
+ std::cout << "Clearing xbt_files_users and resetting peer counts...";
+ std::cout.flush();
+ clear_peer_data();
+ std::cout << "done" << std::endl;
+ }
+}
+
+void mysql::load_config(config * conf) {
+ mysql_db = conf->get_str("mysql_db");
+ mysql_host = conf->get_str("mysql_host");
+ mysql_username = conf->get_str("mysql_username");
+ mysql_password = conf->get_str("mysql_password");
+ readonly = conf->get_bool("readonly");
+}
+
+void mysql::reload_config(config * conf) {
+ load_config(conf);
}
bool mysql::connected() {
@@ -45,104 +65,195 @@ void mysql::clear_peer_data() {
std::cerr << "Unable to reset seeder and leecher count!" << std::endl;
}
} catch (const mysqlpp::BadQuery &er) {
- std::cerr << "Query error: " << er.what() << " in clear_peer_data" << std::endl;
+ std::cerr << "Query error in clear_peer_data: " << er.what() << std::endl;
} catch (const mysqlpp::Exception &er) {
- std::cerr << "Query error: " << er.what() << " in clear_peer_data" << std::endl;
+ std::cerr << "Query error in clear_peer_data: " << er.what() << std::endl;
}
}
void mysql::load_torrents(torrent_list &torrents) {
mysqlpp::Query query = conn.query("SELECT ID, info_hash, freetorrent, Snatched FROM torrents ORDER BY ID;");
- if (mysqlpp::StoreQueryResult res = query.store()) {
- mysqlpp::String one("1"); // Hack to get around bug in mysql++3.0.0
- mysqlpp::String two("2");
+ try {
+ mysqlpp::StoreQueryResult res = query.store();
+ std::unordered_set<std::string> cur_keys;
size_t num_rows = res.num_rows();
+ std::lock_guard<std::mutex> tl_lock(torrent_list_mutex);
+ if (torrents.size() == 0) {
+ torrents.reserve(num_rows * 1.05); // Reserve 5% extra space to prevent rehashing
+ } else {
+ // Create set with all currently known info hashes to remove nonexistent ones later
+ cur_keys.reserve(torrents.size());
+ for (auto const &it: torrents) {
+ cur_keys.insert(it.first);
+ }
+ }
for (size_t i = 0; i < num_rows; i++) {
std::string info_hash;
res[i][1].to_string(info_hash);
-
- torrent t;
- t.id = res[i][0];
- if (res[i][2].compare(one) == 0) {
- t.free_torrent = FREE;
- } else if (res[i][2].compare(two) == 0) {
- t.free_torrent = NEUTRAL;
+ if (info_hash == "") {
+ continue;
+ }
+ mysqlpp::sql_enum free_torrent(res[i][2]);
+
+ torrent tmp_tor;
+ auto it = torrents.insert(std::pair<std::string, torrent>(info_hash, tmp_tor));
+ torrent &tor = (it.first)->second;
+ if (it.second) {
+ tor.id = res[i][0];
+ tor.balance = 0;
+ tor.completed = res[i][3];
+ tor.last_selected_seeder = "";
+ } else {
+ tor.tokened_users.clear();
+ cur_keys.erase(info_hash);
+ }
+ if (free_torrent == "1") {
+ tor.free_torrent = FREE;
+ } else if (free_torrent == "2") {
+ tor.free_torrent = NEUTRAL;
} else {
- t.free_torrent = NORMAL;
+ tor.free_torrent = NORMAL;
}
- t.balance = 0;
- t.completed = res[i][3];
- t.last_selected_seeder = "";
- torrents[info_hash] = t;
}
+ for (auto const &info_hash: cur_keys) {
+ // Remove tracked torrents that weren't found in the database
+ auto it = torrents.find(info_hash);
+ if (it != torrents.end()) {
+ torrent &tor = it->second;
+ stats.leechers -= tor.leechers.size();
+ stats.seeders -= tor.seeders.size();
+ for (auto &p: tor.leechers) {
+ p.second.user->decr_leeching();
+ }
+ for (auto &p: tor.seeders) {
+ p.second.user->decr_seeding();
+ }
+ torrents.erase(it);
+ }
+ }
+ } catch (const mysqlpp::BadQuery &er) {
+ std::cerr << "Query error in load_torrents: " << er.what() << std::endl;
+ return;
}
+ std::cout << "Loaded " << torrents.size() << " torrents" << std::endl;
+ load_tokens(torrents);
}
void mysql::load_users(user_list &users) {
- mysqlpp::Query query = conn.query("SELECT ID, can_leech, torrent_pass, visible FROM users_main WHERE Enabled='1';");
- if (mysqlpp::StoreQueryResult res = query.store()) {
+ mysqlpp::Query query = conn.query("SELECT ID, can_leech, torrent_pass, (Visible='0' OR IP='127.0.0.1') AS Protected FROM users_main WHERE Enabled='1';");
+ try {
+ mysqlpp::StoreQueryResult res = query.store();
size_t num_rows = res.num_rows();
+ std::unordered_set<std::string> cur_keys;
+ std::lock_guard<std::mutex> ul_lock(user_list_mutex);
+ if (users.size() == 0) {
+ users.reserve(num_rows * 1.05); // Reserve 5% extra space to prevent rehashing
+ } else {
+ // Create set with all currently known user keys to remove nonexistent ones later
+ cur_keys.reserve(users.size());
+ for (auto const &it: users) {
+ cur_keys.insert(it.first);
+ }
+ }
for (size_t i = 0; i < num_rows; i++) {
- std::string passkey;
- res[i][2].to_string(passkey);
- bool protect_ip = res[i][3].compare("1") != 0;
-
- user_ptr u(new user(res[i][0], res[i][1], protect_ip));
- users.insert(std::pair<std::string, user_ptr>(passkey, u));
+ std::string passkey(res[i][2]);
+ bool protect_ip = res[i][3];
+ user_ptr tmp_user = std::make_shared<user>(res[i][0], res[i][1], protect_ip);
+ auto it = users.insert(std::pair<std::string, user_ptr>(passkey, tmp_user));
+ if (!it.second) {
+ user_ptr &u = (it.first)->second;
+ u->set_leechstatus(res[i][1]);
+ u->set_protected(protect_ip);
+ u->set_deleted(false);
+ cur_keys.erase(passkey);
+ }
+ }
+ for (auto const &passkey: cur_keys) {
+ // Remove users that weren't found in the database
+ auto it = users.find(passkey);
+ if (it != users.end()) {
+ it->second->set_deleted(true);
+ users.erase(it);
+ }
}
+ } catch (const mysqlpp::BadQuery &er) {
+ std::cerr << "Query error in load_users: " << er.what() << std::endl;
+ return;
}
+ std::cout << "Loaded " << users.size() << " users" << std::endl;
}
void mysql::load_tokens(torrent_list &torrents) {
mysqlpp::Query query = conn.query("SELECT uf.UserID, t.info_hash FROM users_freeleeches AS uf JOIN torrents AS t ON t.ID = uf.TorrentID WHERE uf.Expired = '0';");
- if (mysqlpp::StoreQueryResult res = query.store()) {
+ int token_count = 0;
+ try {
+ mysqlpp::StoreQueryResult res = query.store();
size_t num_rows = res.num_rows();
+ std::lock_guard<std::mutex> tl_lock(torrent_list_mutex);
for (size_t i = 0; i < num_rows; i++) {
std::string info_hash;
res[i][1].to_string(info_hash);
- torrent_list::iterator it = torrents.find(info_hash);
+ auto it = torrents.find(info_hash);
if (it != torrents.end()) {
torrent &tor = it->second;
tor.tokened_users.insert(res[i][0]);
+ ++token_count;
}
}
+ } catch (const mysqlpp::BadQuery &er) {
+ std::cerr << "Query error in load_tokens: " << er.what() << std::endl;
+ return;
}
+ std::cout << "Loaded " << token_count << " tokens" << std::endl;
}
void mysql::load_whitelist(std::vector<std::string> &whitelist) {
mysqlpp::Query query = conn.query("SELECT peer_id FROM xbt_client_whitelist;");
- if (mysqlpp::StoreQueryResult res = query.store()) {
+ try {
+ mysqlpp::StoreQueryResult res = query.store();
size_t num_rows = res.num_rows();
+ std::lock_guard<std::mutex> wl_lock(whitelist_mutex);
+ whitelist.clear();
for (size_t i = 0; i<num_rows; i++) {
- whitelist.push_back(res[i][0].c_str());
+ std::string peer_id;
+ res[i][0].to_string(peer_id);
+ whitelist.push_back(peer_id);
}
+ } catch (const mysqlpp::BadQuery &er) {
+ std::cerr << "Query error in load_whitelist: " << er.what() << std::endl;
+ return;
+ }
+ if (whitelist.size() == 0) {
+ std::cout << "Assuming no whitelist desired, disabling" << std::endl;
+ } else {
+ std::cout << "Loaded " << whitelist.size() << " clients into the whitelist" << std::endl;
}
}
-void mysql::record_token(std::string &record) {
+void mysql::record_token(const std::string &record) {
if (update_token_buffer != "") {
update_token_buffer += ",";
}
update_token_buffer += record;
}
-void mysql::record_user(std::string &record) {
+void mysql::record_user(const std::string &record) {
if (update_user_buffer != "") {
update_user_buffer += ",";
}
update_user_buffer += record;
}
-void mysql::record_torrent(std::string &record) {
- std::unique_lock<std::mutex> tb_lock(torrent_buffer_lock);
+void mysql::record_torrent(const std::string &record) {
+ std::lock_guard<std::mutex> tb_lock(torrent_buffer_lock);
if (update_torrent_buffer != "") {
update_torrent_buffer += ",";
}
update_torrent_buffer += record;
}
-void mysql::record_peer(std::string &record, std::string &ip, std::string &peer_id, std::string &useragent) {
+void mysql::record_peer(const std::string &record, const std::string &ip, const std::string &peer_id, const std::string &useragent) {
if (update_heavy_peer_buffer != "") {
update_heavy_peer_buffer += ",";
}
@@ -151,7 +262,7 @@ void mysql::record_peer(std::string &record, std::string &ip, std::string &peer_
update_heavy_peer_buffer += q.str();
}
-void mysql::record_peer(std::string &record, std::string &peer_id) {
+void mysql::record_peer(const std::string &record, const std::string &peer_id) {
if (update_light_peer_buffer != "") {
update_light_peer_buffer += ",";
}
@@ -161,7 +272,7 @@ void mysql::record_peer(std::string &record, std::string &peer_id) {
update_light_peer_buffer += q.str();
}
-void mysql::record_snatch(std::string &record, std::string &ip) {
+void mysql::record_snatch(const std::string &record, const std::string &ip) {
if (update_snatch_buffer != "") {
update_snatch_buffer += ",";
}
@@ -183,11 +294,15 @@ void mysql::flush() {
}
void mysql::flush_users() {
+ if (readonly) {
+ update_user_buffer.clear();
+ return;
+ }
std::string sql;
- std::unique_lock<std::mutex> uq_lock(user_queue_lock);
+ std::lock_guard<std::mutex> uq_lock(user_queue_lock);
size_t qsize = user_queue.size();
if (verbose_flush || qsize > 0) {
- std::cout << "User flush queue size: " << qsize << std::endl;
+ std::cout << "User flush queue size: " << qsize << ", next query length: " << user_queue.front().size() << std::endl;
}
if (update_user_buffer == "") {
return;
@@ -203,12 +318,16 @@ void mysql::flush_users() {
}
void mysql::flush_torrents() {
+ std::lock_guard<std::mutex> tb_lock(torrent_buffer_lock);
+ if (readonly) {
+ update_torrent_buffer.clear();
+ return;
+ }
std::string sql;
- std::unique_lock<std::mutex> tq_lock(torrent_queue_lock);
- std::unique_lock<std::mutex> tb_lock(torrent_buffer_lock);
+ std::lock_guard<std::mutex> tq_lock(torrent_queue_lock);
size_t qsize = torrent_queue.size();
if (verbose_flush || qsize > 0) {
- std::cout << "Torrent flush queue size: " << qsize << std::endl;
+ std::cout << "Torrent flush queue size: " << qsize << ", next query length: " << torrent_queue.front().size() << std::endl;
}
if (update_torrent_buffer == "") {
return;
@@ -229,11 +348,15 @@ void mysql::flush_torrents() {
}
void mysql::flush_snatches() {
+ if (readonly) {
+ update_snatch_buffer.clear();
+ return;
+ }
std::string sql;
- std::unique_lock<std::mutex> sq_lock(snatch_queue_lock);
+ std::lock_guard<std::mutex> sq_lock(snatch_queue_lock);
size_t qsize = snatch_queue.size();
if (verbose_flush || qsize > 0) {
- std::cout << "Snatch flush queue size: " << qsize << std::endl;
+ std::cout << "Snatch flush queue size: " << qsize << ", next query length: " << snatch_queue.front().size() << std::endl;
}
if (update_snatch_buffer == "" ) {
return;
@@ -248,11 +371,16 @@ void mysql::flush_snatches() {
}
void mysql::flush_peers() {
+ if (readonly) {
+ update_light_peer_buffer.clear();
+ update_heavy_peer_buffer.clear();
+ return;
+ }
std::string sql;
- std::unique_lock<std::mutex> pq_lock(peer_queue_lock);
+ std::lock_guard<std::mutex> pq_lock(peer_queue_lock);
size_t qsize = peer_queue.size();
if (verbose_flush || qsize > 0) {
- std::cout << "Peer flush queue size: " << qsize << std::endl;
+ std::cout << "Peer flush queue size: " << qsize << ", next query length: " << peer_queue.front().size() << std::endl;
}
// Nothing to do
@@ -260,12 +388,6 @@ void mysql::flush_peers() {
return;
}
- if (qsize == 0) {
- sql = "SET session sql_log_bin = 0";
- peer_queue.push(sql);
- sql.clear();
- }
-
if (update_heavy_peer_buffer != "") {
// Because xfu inserts are slow and ram is not infinite we need to
// limit this queue's size
@@ -290,7 +412,7 @@ void mysql::flush_peers() {
if (qsize >= 1000) {
peer_queue.pop();
}
- sql = "INSERT INTO xbt_files_users (fid,timespent,announced,peer_id,mtime) VALUES " +
+ sql = "INSERT INTO xbt_files_users (uid,fid,timespent,announced,peer_id,mtime) VALUES " +
update_light_peer_buffer +
" ON DUPLICATE KEY UPDATE upspeed=0, downspeed=0, timespent=VALUES(timespent), " +
"announced=VALUES(announced), mtime=VALUES(mtime)";
@@ -306,11 +428,15 @@ void mysql::flush_peers() {
}
void mysql::flush_tokens() {
+ if (readonly) {
+ update_token_buffer.clear();
+ return;
+ }
std::string sql;
- std::unique_lock<std::mutex> tq_lock(token_queue_lock);
+ std::lock_guard<std::mutex> tq_lock(token_queue_lock);
size_t qsize = token_queue.size();
if (verbose_flush || qsize > 0) {
- std::cout << "Token flush queue size: " << qsize << std::endl;
+ std::cout << "Token flush queue size: " << qsize << ", next query length: " << token_queue.front().size() << std::endl;
}
if (update_token_buffer == "") {
return;
@@ -328,7 +454,7 @@ void mysql::flush_tokens() {
void mysql::do_flush_users() {
u_active = true;
try {
- mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0);
+ mysqlpp::Connection c(mysql_db.c_str(), mysql_host.c_str(), mysql_username.c_str(), mysql_password.c_str(), 0);
while (user_queue.size() > 0) {
try {
std::string sql = user_queue.front();
@@ -338,7 +464,7 @@ void mysql::do_flush_users() {
sleep(3);
continue;
} else {
- std::unique_lock<std::mutex> uq_lock(user_queue_lock);
+ std::lock_guard<std::mutex> uq_lock(user_queue_lock);
user_queue.pop();
}
}
@@ -355,8 +481,6 @@ void mysql::do_flush_users() {
}
catch (const mysqlpp::Exception &er) {
std::cerr << "MySQL error in flush_users: " << er.what() << std::endl;
- u_active = false;
- return;
}
u_active = false;
}
@@ -364,7 +488,7 @@ void mysql::do_flush_users() {
void mysql::do_flush_torrents() {
t_active = true;
try {
- mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0);
+ mysqlpp::Connection c(mysql_db.c_str(), mysql_host.c_str(), mysql_username.c_str(), mysql_password.c_str(), 0);
while (torrent_queue.size() > 0) {
try {
std::string sql = torrent_queue.front();
@@ -378,7 +502,7 @@ void mysql::do_flush_torrents() {
sleep(3);
continue;
} else {
- std::unique_lock<std::mutex> tq_lock(torrent_queue_lock);
+ std::lock_guard<std::mutex> tq_lock(torrent_queue_lock);
torrent_queue.pop();
}
}
@@ -395,8 +519,6 @@ void mysql::do_flush_torrents() {
}
catch (const mysqlpp::Exception &er) {
std::cerr << "MySQL error in flush_torrents: " << er.what() << std::endl;
- t_active = false;
- return;
}
t_active = false;
}
@@ -404,7 +526,7 @@ void mysql::do_flush_torrents() {
void mysql::do_flush_peers() {
p_active = true;
try {
- mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0);
+ mysqlpp::Connection c(mysql_db.c_str(), mysql_host.c_str(), mysql_username.c_str(), mysql_password.c_str(), 0);
while (peer_queue.size() > 0) {
try {
std::string sql = peer_queue.front();
@@ -414,7 +536,7 @@ void mysql::do_flush_peers() {
sleep(3);
continue;
} else {
- std::unique_lock<std::mutex> pq_lock(peer_queue_lock);
+ std::lock_guard<std::mutex> pq_lock(peer_queue_lock);
peer_queue.pop();
}
}
@@ -431,8 +553,6 @@ void mysql::do_flush_peers() {
}
catch (const mysqlpp::Exception &er) {
std::cerr << "MySQL error in flush_peers: " << er.what() << std::endl;
- p_active = false;
- return;
}
p_active = false;
}
@@ -440,7 +560,7 @@ void mysql::do_flush_peers() {
void mysql::do_flush_snatches() {
s_active = true;
try {
- mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0);
+ mysqlpp::Connection c(mysql_db.c_str(), mysql_host.c_str(), mysql_username.c_str(), mysql_password.c_str(), 0);
while (snatch_queue.size() > 0) {
try {
std::string sql = snatch_queue.front();
@@ -450,7 +570,7 @@ void mysql::do_flush_snatches() {
sleep(3);
continue;
} else {
- std::unique_lock<std::mutex> sq_lock(snatch_queue_lock);
+ std::lock_guard<std::mutex> sq_lock(snatch_queue_lock);
snatch_queue.pop();
}
}
@@ -467,8 +587,6 @@ void mysql::do_flush_snatches() {
}
catch (const mysqlpp::Exception &er) {
std::cerr << "MySQL error in flush_snatches: " << er.what() << std::endl;
- s_active = false;
- return;
}
s_active = false;
}
@@ -476,7 +594,7 @@ void mysql::do_flush_snatches() {
void mysql::do_flush_tokens() {
tok_active = true;
try {
- mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0);
+ mysqlpp::Connection c(mysql_db.c_str(), mysql_host.c_str(), mysql_username.c_str(), mysql_password.c_str(), 0);
while (token_queue.size() > 0) {
try {
std::string sql = token_queue.front();
@@ -486,7 +604,7 @@ void mysql::do_flush_tokens() {
sleep(3);
continue;
} else {
- std::unique_lock<std::mutex> tq_lock(token_queue_lock);
+ std::lock_guard<std::mutex> tq_lock(token_queue_lock);
token_queue.pop();
}
}
@@ -503,8 +621,6 @@ void mysql::do_flush_tokens() {
}
catch (const mysqlpp::Exception &er) {
std::cerr << "MySQL error in flush_tokens: " << er.what() << std::endl;
- tok_active = false;
- return;
}
tok_active = false;
}