diff options
Diffstat (limited to 'src/worker.cpp')
-rw-r--r-- | src/worker.cpp | 303 |
1 files changed, 182 insertions, 121 deletions
diff --git a/src/worker.cpp b/src/worker.cpp index eee1c7e..49dbd2c 100644 --- a/src/worker.cpp +++ b/src/worker.cpp @@ -1,7 +1,4 @@ -#include <cmath> -#include <cstdlib> #include <iostream> -#include <fstream> #include <string> #include <map> #include <sstream> @@ -12,9 +9,6 @@ #include <mutex> #include <thread> -#include <netinet/in.h> -#include <arpa/inet.h> - #include "ocelot.h" #include "config.h" #include "db.h" @@ -26,11 +20,35 @@ #include "user.h" //---------- Worker - does stuff with input -worker::worker(torrent_list &torrents, user_list &users, std::vector<std::string> &_whitelist, config * conf_obj, mysql * db_obj, site_comm * sc) : torrents_list(torrents), users_list(users), whitelist(_whitelist), conf(conf_obj), db(db_obj), s_comm(sc) +worker::worker(config * conf_obj, torrent_list &torrents, user_list &users, std::vector<std::string> &_whitelist, mysql * db_obj, site_comm * sc) : + conf(conf_obj), db(db_obj), s_comm(sc), torrents_list(torrents), users_list(users), whitelist(_whitelist), status(OPEN), reaper_active(false) { + load_config(conf); +} + +void worker::load_config(config * conf) { + announce_interval = conf->get_uint("announce_interval"); + del_reason_lifetime = conf->get_uint("del_reason_lifetime"); + peers_timeout = conf->get_uint("peers_timeout"); + numwant_limit = conf->get_uint("numwant_limit"); + keepalive_enabled = conf->get_uint("keepalive_timeout") != 0; + site_password = conf->get_str("site_password"); + report_password = conf->get_str("report_password"); +} + +void worker::reload_config(config * conf) { + load_config(conf); +} + +void worker::reload_lists() { + status = PAUSED; + db->load_torrents(torrents_list); + db->load_users(users_list); + db->load_whitelist(whitelist); status = OPEN; } -bool worker::signal(int sig) { + +bool worker::shutdown() { if (status == OPEN) { status = CLOSING; std::cout << "closing tracker... press Ctrl-C again to terminate" << std::endl; @@ -42,12 +60,13 @@ bool worker::signal(int sig) { return false; } } -std::string worker::work(std::string &input, std::string &ip) { + +std::string worker::work(const std::string &input, std::string &ip, client_opts_t &client_opts) { unsigned int input_length = input.length(); //---------- Parse request - ugly but fast. Using substr exploded. if (input_length < 60) { // Way too short to be anything useful - return error("GET string too short"); + return error("GET string too short", client_opts); } size_t pos = 5; // skip 'GET /' @@ -56,7 +75,7 @@ std::string worker::work(std::string &input, std::string &ip) { std::string passkey; passkey.reserve(32); if (input[37] != '/') { - return error("Malformed announce"); + return error("Malformed announce", client_opts); } for (; pos < 37; pos++) { @@ -71,7 +90,6 @@ std::string worker::work(std::string &input, std::string &ip) { }; action_t action = INVALID; - std::unique_lock<std::mutex> lock(stats.mutex); switch (input[pos]) { case 'a': stats.announcements++; @@ -92,19 +110,11 @@ std::string worker::work(std::string &input, std::string &ip) { pos += 6; break; } - lock.unlock(); if (input[pos] != '?') { // No parameters given. Probably means we're not talking to a torrent client - return response("Nothing to see here", false, true); - } - - if (status != OPEN && action != UPDATE) { - return error("The tracker is temporarily unavailable."); - } - - if (action == INVALID) { - return error("Invalid action"); + client_opts.html = true; + return response("Nothing to see here", client_opts); } // Parse URL params @@ -115,7 +125,7 @@ std::string worker::work(std::string &input, std::string &ip) { std::string value; bool parsing_key = true; // true = key, false = value - pos++; // Skip the '?' + ++pos; // Skip the '?' for (; pos < input_length; ++pos) { if (input[pos] == '=') { parsing_key = false; @@ -139,8 +149,19 @@ std::string worker::work(std::string &input, std::string &ip) { } } } + ++pos; + + if (input.compare(pos, 5, "HTTP/") != 0) { + return error("Malformed HTTP request", client_opts); + } - pos += 10; // skip 'HTTP/1.1' - should probably be +=11, but just in case a client doesn't send \r + std::string http_version; + pos += 5; + while (input[pos] != '\r' && input[pos] != '\n') { + http_version.push_back(input[pos]); + ++pos; + } + ++pos; // skip line break - should probably be += 2, but just in case a client doesn't send \r // Parse headers params_type headers; @@ -171,66 +192,88 @@ std::string worker::work(std::string &input, std::string &ip) { } } + if (keepalive_enabled) { + auto hdr_http_close = headers.find("connection"); + if (hdr_http_close == headers.end()) { + client_opts.http_close = (http_version == "1.0"); + } else { + client_opts.http_close = (hdr_http_close->second != "Keep-Alive"); + } + } else { + client_opts.http_close = true; + } + + if (status != OPEN) { + return error("The tracker is temporarily unavailable.", client_opts); + } + + if (action == INVALID) { + return error("Invalid action", client_opts); + } + if (action == UPDATE) { - if (passkey == conf->site_password) { - return update(params); + if (passkey == site_password) { + return update(params, client_opts); } else { - return error("Authentication failure"); + return error("Authentication failure", client_opts); } } if (action == REPORT) { - if (passkey == conf->report_password) { - return report(params, users_list); + if (passkey == report_password) { + std::lock_guard<std::mutex> ul_lock(db->user_list_mutex); + return report(params, users_list, client_opts); } else { - return error("Authentication failure"); + return error("Authentication failure", client_opts); } } // Either a scrape or an announce - user_list::iterator u = users_list.find(passkey); - if (u == users_list.end()) { - return error("Passkey not found"); + std::unique_lock<std::mutex> ul_lock(db->user_list_mutex); + auto user_it = users_list.find(passkey); + if (user_it == users_list.end()) { + return error("Passkey not found", client_opts); } + user_ptr u = user_it->second; + ul_lock.unlock(); if (action == ANNOUNCE) { - std::unique_lock<std::mutex> tl_lock(db->torrent_list_mutex); // Let's translate the infohash into something nice // info_hash is a url encoded (hex) base 20 number std::string info_hash_decoded = hex_decode(params["info_hash"]); - torrent_list::iterator tor = torrents_list.find(info_hash_decoded); + std::lock_guard<std::mutex> tl_lock(db->torrent_list_mutex); + auto tor = torrents_list.find(info_hash_decoded); if (tor == torrents_list.end()) { - std::unique_lock<std::mutex> dr_lock(del_reasons_lock); + std::lock_guard<std::mutex> dr_lock(del_reasons_lock); auto msg = del_reasons.find(info_hash_decoded); if (msg != del_reasons.end()) { if (msg->second.reason != -1) { - return error("Unregistered torrent: " + get_del_reason(msg->second.reason)); + return error("Unregistered torrent: " + get_del_reason(msg->second.reason), client_opts); } else { - return error("Unregistered torrent"); + return error("Unregistered torrent", client_opts); } } else { - return error("Unregistered torrent"); + return error("Unregistered torrent", client_opts); } } - return announce(tor->second, u->second, params, headers, ip); + return announce(input, tor->second, u, params, headers, ip, client_opts); } else { - return scrape(infohashes, headers); + return scrape(infohashes, headers, client_opts); } } -std::string worker::announce(torrent &tor, user_ptr &u, params_type ¶ms, params_type &headers, std::string &ip) { +std::string worker::announce(const std::string &input, torrent &tor, user_ptr &u, params_type ¶ms, params_type &headers, std::string &ip, client_opts_t &client_opts) { cur_time = time(NULL); if (params["compact"] != "1") { - return error("Your client does not support compact announces"); + return error("Your client does not support compact announces", client_opts); } - bool gzip = false; - int64_t left = std::max((int64_t)0, strtolonglong(params["left"])); - int64_t uploaded = std::max((int64_t)0, strtolonglong(params["uploaded"])); - int64_t downloaded = std::max((int64_t)0, strtolonglong(params["downloaded"])); - int64_t corrupt = std::max((int64_t)0, strtolonglong(params["corrupt"])); + int64_t left = std::max((int64_t)0, strtoint64(params["left"])); + int64_t uploaded = std::max((int64_t)0, strtoint64(params["uploaded"])); + int64_t downloaded = std::max((int64_t)0, strtoint64(params["downloaded"])); + int64_t corrupt = std::max((int64_t)0, strtoint64(params["corrupt"])); int snatched = 0; // This is the value that gets sent to the database on a snatch int active = 1; // This is the value that marks a peer as active/inactive in the database @@ -242,26 +285,37 @@ std::string worker::announce(torrent &tor, user_ptr &u, params_type ¶ms, par bool peer_changed = false; // Whether or not the peer is new or has changed since the last announcement bool invalid_ip = false; bool inc_l = false, inc_s = false, dec_l = false, dec_s = false; + userid_t userid = u->get_id(); params_type::const_iterator peer_id_iterator = params.find("peer_id"); if (peer_id_iterator == params.end()) { - return error("No peer ID"); + return error("No peer ID", client_opts); + } + const std::string peer_id = hex_decode(peer_id_iterator->second); + if (peer_id.length() != 20) { + return error("Invalid peer ID", client_opts); } - std::string peer_id = peer_id_iterator->second; - peer_id = hex_decode(peer_id); + std::unique_lock<std::mutex> wl_lock(db->whitelist_mutex); if (whitelist.size() > 0) { bool found = false; // Found client in whitelist? for (unsigned int i = 0; i < whitelist.size(); i++) { - if (peer_id.find(whitelist[i]) == 0) { + if (peer_id.compare(0, whitelist[i].length(), whitelist[i]) == 0) { found = true; break; } } if (!found) { - return error("Your client is not on the whitelist"); + return error("Your client is not on the whitelist", client_opts); } } + wl_lock.unlock(); + + std::stringstream peer_key_stream; + peer_key_stream << peer_id[12 + (tor.id & 7)] // "Randomize" the element order in the peer map by prefixing with a peer id byte + << userid // Include user id in the key to lower chance of peer id collisions + << peer_id; + const std::string peer_key(peer_key_stream.str()); if (params["event"] == "completed") { // Don't update <snatched> here as we may decide to use other conditions later on @@ -272,45 +326,44 @@ std::string worker::announce(torrent &tor, user_ptr &u, params_type ¶ms, par update_torrent = true; active = 0; } - int userid = u->get_id(); peer * p; peer_list::iterator peer_it; // Insert/find the peer in the torrent list if (left > 0) { - peer_it = tor.leechers.find(peer_id); + peer_it = tor.leechers.find(peer_key); if (peer_it == tor.leechers.end()) { // We could search the seed list as well, but the peer reaper will sort things out eventually - peer_it = add_peer(tor.leechers, peer_id); + peer_it = add_peer(tor.leechers, peer_key); inserted = true; inc_l = true; } } else if (completed_torrent) { - peer_it = tor.leechers.find(peer_id); + peer_it = tor.leechers.find(peer_key); if (peer_it == tor.leechers.end()) { - peer_it = tor.seeders.find(peer_id); + peer_it = tor.seeders.find(peer_key); if (peer_it == tor.seeders.end()) { - peer_it = add_peer(tor.seeders, peer_id); + peer_it = add_peer(tor.seeders, peer_key); inserted = true; inc_s = true; } else { completed_torrent = false; } - } else if (tor.seeders.find(peer_id) != tor.seeders.end()) { + } else if (tor.seeders.find(peer_key) != tor.seeders.end()) { // If the peer exists in both peer lists, just decrement the seed count. // Should be cheaper than searching the seed list in the left > 0 case dec_s = true; } } else { - peer_it = tor.seeders.find(peer_id); + peer_it = tor.seeders.find(peer_key); if (peer_it == tor.seeders.end()) { - peer_it = tor.leechers.find(peer_id); + peer_it = tor.leechers.find(peer_key); if (peer_it == tor.leechers.end()) { - peer_it = add_peer(tor.seeders, peer_id); + peer_it = add_peer(tor.seeders, peer_key); inserted = true; } else { p = &peer_it->second; std::pair<peer_list::iterator, bool> insert - = tor.seeders.insert(std::pair<std::string, peer>(peer_id, *p)); + = tor.seeders.insert(std::pair<std::string, peer>(peer_key, *p)); tor.leechers.erase(peer_it); peer_it = insert.first; peer_changed = true; @@ -373,7 +426,7 @@ std::string worker::announce(torrent &tor, user_ptr &u, params_type ¶ms, par upspeed = uploaded_change / (cur_time - p->last_announced); downspeed = downloaded_change / (cur_time - p->last_announced); } - std::set<int>::iterator sit = tor.tokened_users.find(userid); + auto sit = tor.tokened_users.find(userid); if (tor.free_torrent == NEUTRAL) { downloaded_change = 0; uploaded_change = 0; @@ -415,7 +468,7 @@ std::string worker::announce(torrent &tor, user_ptr &u, params_type ¶ms, par } } - unsigned int port = strtolong(params["port"]); + uint16_t port = strtoint32(params["port"]) & 0xFFFF; // Generate compact ip/port string if (inserted || port != p->port || ip != p->ip) { p->port = port; @@ -464,18 +517,18 @@ std::string worker::announce(torrent &tor, user_ptr &u, params_type ¶ms, par } db->record_peer(record_str, record_ip, peer_id, headers["user-agent"]); } else { - record << '(' << tor.id << ',' << (cur_time - p->first_announced) << ',' << p->announces << ','; + record << '(' << userid << ',' << tor.id << ',' << (cur_time - p->first_announced) << ',' << p->announces << ','; std::string record_str = record.str(); db->record_peer(record_str, peer_id); } // Select peers! - unsigned int numwant; + uint32_t numwant; params_type::const_iterator param_numwant = params.find("numwant"); if (param_numwant == params.end()) { - numwant = 50; + numwant = numwant_limit; } else { - numwant = std::min(50l, strtolong(param_numwant->second)); + numwant = std::min((int32_t)numwant_limit, strtoint32(param_numwant->second)); } if (stopped_torrent) { @@ -504,7 +557,7 @@ std::string worker::announce(torrent &tor, user_ptr &u, params_type ¶ms, par // User is a seeder now! if (!inserted) { std::pair<peer_list::iterator, bool> insert - = tor.seeders.insert(std::pair<std::string, peer>(peer_id, *p)); + = tor.seeders.insert(std::pair<std::string, peer>(peer_key, *p)); tor.leechers.erase(peer_it); peer_it = insert.first; p = &peer_it->second; @@ -555,7 +608,7 @@ std::string worker::announce(torrent &tor, user_ptr &u, params_type ¶ms, par i = tor.seeders.begin(); } // Don't show users themselves - if (i->second.user->get_id() == userid || !i->second.visible) { + if (i->second.user->is_deleted() || i->second.user->get_id() == userid || !i->second.visible) { ++i; continue; } @@ -569,7 +622,7 @@ std::string worker::announce(torrent &tor, user_ptr &u, params_type ¶ms, par if (found_peers < numwant && tor.leechers.size() > 1) { for (peer_list::const_iterator i = tor.leechers.begin(); i != tor.leechers.end() && found_peers < numwant; ++i) { // Don't show users themselves or leech disabled users - if (i->second.ip_port == p->ip_port || i->second.user->get_id() == userid || !i->second.visible) { + if (i->second.user->is_deleted() || i->second.ip_port == p->ip_port || i->second.user->get_id() == userid || !i->second.visible) { continue; } found_peers++; @@ -590,10 +643,8 @@ std::string worker::announce(torrent &tor, user_ptr &u, params_type ¶ms, par } // Update the stats - std::unique_lock<std::mutex> lock(stats.mutex); stats.succ_announcements++; if (dec_l || dec_s || inc_l || inc_s) { - std::unique_lock<std::mutex> us_lock(ustats_lock); if (inc_l) { p->user->incr_leeching(); stats.leechers++; @@ -611,12 +662,10 @@ std::string worker::announce(torrent &tor, user_ptr &u, params_type ¶ms, par stats.seeders--; } } - lock.unlock(); // Correct the stats for the old user if the peer's user link has changed if (p->user != u) { if (!stopped_torrent) { - std::unique_lock<std::mutex> us_lock(ustats_lock); if (left > 0) { u->incr_leeching(); p->user->decr_leeching(); @@ -648,7 +697,7 @@ std::string worker::announce(torrent &tor, user_ptr &u, params_type ¶ms, par } if (!u->can_leech() && left > 0) { - return error("Access denied, leeching forbidden"); + return error("Access denied, leeching forbidden", client_opts); } std::string output = "d8:completei"; @@ -659,9 +708,9 @@ std::string worker::announce(torrent &tor, user_ptr &u, params_type ¶ms, par output += "e10:incompletei"; output += inttostr(tor.leechers.size()); output += "e8:intervali"; - output += inttostr(conf->announce_interval+std::min((size_t)600, tor.seeders.size())); // ensure a more even distribution of announces/second + output += inttostr(announce_interval + std::min((size_t)600, tor.seeders.size())); // ensure a more even distribution of announces/second output += "e12:min intervali"; - output += inttostr(conf->announce_interval); + output += inttostr(announce_interval); output += "e5:peers"; if (peers.length() == 0) { output += "0:"; @@ -680,13 +729,12 @@ std::string worker::announce(torrent &tor, user_ptr &u, params_type ¶ms, par * possibly inflated return size */ /*if (headers["accept-encoding"].find("gzip") != std::string::npos) { - gzip = true; + client_opts.gzip = true; }*/ - return response(output, gzip, false); + return response(output, client_opts); } -std::string worker::scrape(const std::list<std::string> &infohashes, params_type &headers) { - bool gzip = false; +std::string worker::scrape(const std::list<std::string> &infohashes, params_type &headers, client_opts_t &client_opts) { std::string output = "d5:filesd"; for (std::list<std::string>::const_iterator i = infohashes.begin(); i != infohashes.end(); ++i) { std::string infohash = *i; @@ -711,16 +759,17 @@ std::string worker::scrape(const std::list<std::string> &infohashes, params_type } output += "ee"; if (headers["accept-encoding"].find("gzip") != std::string::npos) { - gzip = true; + client_opts.gzip = true; } - return response(output, gzip, false); + return response(output, client_opts); } //TODO: Restrict to local IPs -std::string worker::update(params_type ¶ms) { +std::string worker::update(params_type ¶ms, client_opts_t &client_opts) { if (params["action"] == "change_passkey") { std::string oldpasskey = params["oldpasskey"]; std::string newpasskey = params["newpasskey"]; + std::lock_guard<std::mutex> ul_lock(db->user_list_mutex); auto u = users_list.find(oldpasskey); if (u == users_list.end()) { std::cout << "No user with passkey " << oldpasskey << " exists when attempting to change passkey to " << newpasskey << std::endl; @@ -733,10 +782,11 @@ std::string worker::update(params_type ¶ms) { torrent *t; std::string info_hash = params["info_hash"]; info_hash = hex_decode(info_hash); + std::lock_guard<std::mutex> tl_lock(db->torrent_list_mutex); auto i = torrents_list.find(info_hash); if (i == torrents_list.end()) { t = &torrents_list[info_hash]; - t->id = strtolong(params["id"]); + t->id = strtoint32(params["id"]); t->balance = 0; t->completed = 0; t->last_selected_seeder = ""; @@ -762,6 +812,7 @@ std::string worker::update(params_type ¶ms) { } else { fl = NEUTRAL; } + std::lock_guard<std::mutex> tl_lock(db->torrent_list_mutex); auto torrent_it = torrents_list.find(info_hash); if (torrent_it != torrents_list.end()) { torrent_it->second.free_torrent = fl; @@ -781,6 +832,7 @@ std::string worker::update(params_type ¶ms) { } else { fl = NEUTRAL; } + std::lock_guard<std::mutex> tl_lock(db->torrent_list_mutex); for (unsigned int pos = 0; pos < info_hashes.length(); pos += 20) { std::string info_hash = info_hashes.substr(pos, 20); auto torrent_it = torrents_list.find(info_hash); @@ -794,6 +846,7 @@ std::string worker::update(params_type ¶ms) { } else if (params["action"] == "add_token") { std::string info_hash = hex_decode(params["info_hash"]); int userid = atoi(params["userid"].c_str()); + std::lock_guard<std::mutex> tl_lock(db->torrent_list_mutex); auto torrent_it = torrents_list.find(info_hash); if (torrent_it != torrents_list.end()) { torrent_it->second.tokened_users.insert(userid); @@ -803,6 +856,7 @@ std::string worker::update(params_type ¶ms) { } else if (params["action"] == "remove_token") { std::string info_hash = hex_decode(params["info_hash"]); int userid = atoi(params["userid"].c_str()); + std::lock_guard<std::mutex> tl_lock(db->torrent_list_mutex); auto torrent_it = torrents_list.find(info_hash); if (torrent_it != torrents_list.end()) { torrent_it->second.tokened_users.erase(userid); @@ -817,22 +871,19 @@ std::string worker::update(params_type ¶ms) { if (reason_it != params.end()) { reason = atoi(params["reason"].c_str()); } + std::lock_guard<std::mutex> tl_lock(db->torrent_list_mutex); auto torrent_it = torrents_list.find(info_hash); if (torrent_it != torrents_list.end()) { std::cout << "Deleting torrent " << torrent_it->second.id << " for the reason '" << get_del_reason(reason) << "'" << std::endl; - std::unique_lock<std::mutex> stats_lock(stats.mutex); stats.leechers -= torrent_it->second.leechers.size(); stats.seeders -= torrent_it->second.seeders.size(); - stats_lock.unlock(); - std::unique_lock<std::mutex> us_lock(ustats_lock); - for (auto p = torrent_it->second.leechers.begin(); p != torrent_it->second.leechers.end(); ++p) { - p->second.user->decr_leeching(); + for (auto &p: torrent_it->second.leechers) { + p.second.user->decr_leeching(); } - for (auto p = torrent_it->second.seeders.begin(); p != torrent_it->second.seeders.end(); ++p) { - p->second.user->decr_seeding(); + for (auto &p: torrent_it->second.seeders) { + p.second.user->decr_seeding(); } - us_lock.unlock(); - std::unique_lock<std::mutex> dr_lock(del_reasons_lock); + std::lock_guard<std::mutex> dr_lock(del_reasons_lock); del_message msg; msg.reason = reason; msg.time = time(NULL); @@ -843,31 +894,37 @@ std::string worker::update(params_type ¶ms) { } } else if (params["action"] == "add_user") { std::string passkey = params["passkey"]; - unsigned int userid = strtolong(params["id"]); + userid_t userid = strtoint32(params["id"]); + std::lock_guard<std::mutex> ul_lock(db->user_list_mutex); auto u = users_list.find(passkey); if (u == users_list.end()) { bool protect_ip = params["visible"] == "0"; - user_ptr u(new user(userid, true, protect_ip)); - users_list.insert(std::pair<std::string, user_ptr>(passkey, u)); + user_ptr tmp_user = std::make_shared<user>(userid, true, protect_ip); + users_list.insert(std::pair<std::string, user_ptr>(passkey, tmp_user)); std::cout << "Added user " << passkey << " with id " << userid << std::endl; } else { std::cout << "Tried to add already known user " << passkey << " with id " << userid << std::endl; + u->second->set_deleted(false); } } else if (params["action"] == "remove_user") { std::string passkey = params["passkey"]; + std::lock_guard<std::mutex> ul_lock(db->user_list_mutex); auto u = users_list.find(passkey); if (u != users_list.end()) { std::cout << "Removed user " << passkey << " with id " << u->second->get_id() << std::endl; + u->second->set_deleted(true); users_list.erase(u); } } else if (params["action"] == "remove_users") { // Each passkey is exactly 32 characters long. std::string passkeys = params["passkeys"]; + std::lock_guard<std::mutex> ul_lock(db->user_list_mutex); for (unsigned int pos = 0; pos < passkeys.length(); pos += 32) { std::string passkey = passkeys.substr(pos, 32); auto u = users_list.find(passkey); if (u != users_list.end()) { std::cout << "Removed user " << passkey << std::endl; + u->second->set_deleted(true); users_list.erase(passkey); } } @@ -881,6 +938,7 @@ std::string worker::update(params_type ¶ms) { if (params["visible"] == "0") { protect_ip = true; } + std::lock_guard<std::mutex> ul_lock(db->user_list_mutex); user_list::iterator i = users_list.find(passkey); if (i == users_list.end()) { std::cout << "No user with passkey " << passkey << " found when attempting to change leeching status!" << std::endl; @@ -891,10 +949,12 @@ std::string worker::update(params_type ¶ms) { } } else if (params["action"] == "add_whitelist") { std::string peer_id = params["peer_id"]; + std::lock_guard<std::mutex> wl_lock(db->whitelist_mutex); whitelist.push_back(peer_id); std::cout << "Whitelisted " << peer_id << std::endl; } else if (params["action"] == "remove_whitelist") { std::string peer_id = params["peer_id"]; + std::lock_guard<std::mutex> wl_lock(db->whitelist_mutex); for (unsigned int i = 0; i < whitelist.size(); i++) { if (whitelist[i].compare(peer_id) == 0) { whitelist.erase(whitelist.begin() + i); @@ -905,6 +965,7 @@ std::string worker::update(params_type ¶ms) { } else if (params["action"] == "edit_whitelist") { std::string new_peer_id = params["new_peer_id"]; std::string old_peer_id = params["old_peer_id"]; + std::lock_guard<std::mutex> wl_lock(db->whitelist_mutex); for (unsigned int i = 0; i < whitelist.size(); i++) { if (whitelist[i].compare(old_peer_id) == 0) { whitelist.erase(whitelist.begin() + i); @@ -914,13 +975,15 @@ std::string worker::update(params_type ¶ms) { whitelist.push_back(new_peer_id); std::cout << "Edited whitelist item from " << old_peer_id << " to " << new_peer_id << std::endl; } else if (params["action"] == "update_announce_interval") { - unsigned int interval = strtolong(params["new_announce_interval"]); - conf->announce_interval = interval; - std::cout << "Edited announce interval to " << interval << std::endl; + const std::string interval = params["new_announce_interval"]; + conf->set("announce_interval", interval); + announce_interval = conf->get_uint("announce_interval"); + std::cout << "Edited announce interval to " << announce_interval << std::endl; } else if (params["action"] == "info_torrent") { std::string info_hash_hex = params["info_hash"]; std::string info_hash = hex_decode(info_hash_hex); std::cout << "Info for torrent '" << info_hash_hex << "'" << std::endl; + std::lock_guard<std::mutex> tl_lock(db->torrent_list_mutex); auto torrent_it = torrents_list.find(info_hash); if (torrent_it != torrents_list.end()) { std::cout << "Torrent " << torrent_it->second.id @@ -929,24 +992,27 @@ std::string worker::update(params_type ¶ms) { std::cout << "Failed to find torrent " << info_hash_hex << std::endl; } } - return response("success", false, false); + return response("success", client_opts); } -peer_list::iterator worker::add_peer(peer_list &peer_list, std::string &peer_id) { +peer_list::iterator worker::add_peer(peer_list &peer_list, const std::string &peer_key) { peer new_peer; - std::pair<peer_list::iterator, bool> insert - = peer_list.insert(std::pair<std::string, peer>(peer_id, new_peer)); - return insert.first; + auto it = peer_list.insert(std::pair<std::string, peer>(peer_key, new_peer)); + return it.first; } void worker::start_reaper() { - std::thread thread(&worker::do_start_reaper, this); - thread.detach(); + if (!reaper_active) { + std::thread thread(&worker::do_start_reaper, this); + thread.detach(); + } } void worker::do_start_reaper() { + reaper_active = true; reap_peers(); reap_del_reasons(); + reaper_active = false; } void worker::reap_peers() { @@ -959,12 +1025,10 @@ void worker::reap_peers() { auto p = t->second.leechers.begin(); peer_list::iterator del_p; while (p != t->second.leechers.end()) { - if (p->second.last_announced + conf->peers_timeout < cur_time) { + if (p->second.last_announced + peers_timeout < cur_time) { + std::lock_guard<std::mutex> tl_lock(db->torrent_list_mutex); del_p = p++; - std::unique_lock<std::mutex> us_lock(ustats_lock); del_p->second.user->decr_leeching(); - us_lock.unlock(); - std::unique_lock<std::mutex> tl_lock(db->torrent_list_mutex); t->second.leechers.erase(del_p); reaped_this = true; reaped_l++; @@ -974,12 +1038,10 @@ void worker::reap_peers() { } p = t->second.seeders.begin(); while (p != t->second.seeders.end()) { - if (p->second.last_announced + conf->peers_timeout < cur_time) { + if (p->second.last_announced + peers_timeout < cur_time) { + std::lock_guard<std::mutex> tl_lock(db->torrent_list_mutex); del_p = p++; - std::unique_lock<std::mutex> us_lock(ustats_lock); del_p->second.user->decr_seeding(); - us_lock.unlock(); - std::unique_lock<std::mutex> tl_lock(db->torrent_list_mutex); t->second.seeders.erase(del_p); reaped_this = true; reaped_s++; @@ -996,7 +1058,6 @@ void worker::reap_peers() { } } if (reaped_l || reaped_s) { - std::unique_lock<std::mutex> lock(stats.mutex); stats.leechers -= reaped_l; stats.seeders -= reaped_s; } @@ -1006,13 +1067,13 @@ void worker::reap_peers() { void worker::reap_del_reasons() { std::cout << "Starting del reason reaper" << std::endl; - time_t max_time = time(NULL) - conf->del_reason_lifetime; + time_t max_time = time(NULL) - del_reason_lifetime; auto it = del_reasons.begin(); unsigned int reaped = 0; for (; it != del_reasons.end(); ) { if (it->second.time <= max_time) { auto del_it = it++; - std::unique_lock<std::mutex> dr_lock(del_reasons_lock); + std::lock_guard<std::mutex> dr_lock(del_reasons_lock); del_reasons.erase(del_it); reaped++; continue; |