summaryrefslogtreecommitdiffstats
path: root/src/worker.cpp
diff options
context:
space:
mode:
authorErik Andersson <erik@packy.se>2016-11-22 23:56:43 +0100
committerErik Andersson <erik@packy.se>2016-11-22 23:56:43 +0100
commit41954fffc10bfd230f857f57c6871b412d5f2e91 (patch)
tree010bcf28f294a58b4a22b7276cf615758648bcbe /src/worker.cpp
parented374a8dbcdaaf273964293d2805bdd61b148022 (diff)
downloadocelot-master.zip
ocelot-master.tar.gz
ocelot-master.tar.bz2
Ocelot v1.0HEADv1.0master
Diffstat (limited to 'src/worker.cpp')
-rw-r--r--src/worker.cpp303
1 files changed, 182 insertions, 121 deletions
diff --git a/src/worker.cpp b/src/worker.cpp
index eee1c7e..49dbd2c 100644
--- a/src/worker.cpp
+++ b/src/worker.cpp
@@ -1,7 +1,4 @@
-#include <cmath>
-#include <cstdlib>
#include <iostream>
-#include <fstream>
#include <string>
#include <map>
#include <sstream>
@@ -12,9 +9,6 @@
#include <mutex>
#include <thread>
-#include <netinet/in.h>
-#include <arpa/inet.h>
-
#include "ocelot.h"
#include "config.h"
#include "db.h"
@@ -26,11 +20,35 @@
#include "user.h"
//---------- Worker - does stuff with input
-worker::worker(torrent_list &torrents, user_list &users, std::vector<std::string> &_whitelist, config * conf_obj, mysql * db_obj, site_comm * sc) : torrents_list(torrents), users_list(users), whitelist(_whitelist), conf(conf_obj), db(db_obj), s_comm(sc)
+worker::worker(config * conf_obj, torrent_list &torrents, user_list &users, std::vector<std::string> &_whitelist, mysql * db_obj, site_comm * sc) :
+ conf(conf_obj), db(db_obj), s_comm(sc), torrents_list(torrents), users_list(users), whitelist(_whitelist), status(OPEN), reaper_active(false)
{
+ load_config(conf);
+}
+
+void worker::load_config(config * conf) {
+ announce_interval = conf->get_uint("announce_interval");
+ del_reason_lifetime = conf->get_uint("del_reason_lifetime");
+ peers_timeout = conf->get_uint("peers_timeout");
+ numwant_limit = conf->get_uint("numwant_limit");
+ keepalive_enabled = conf->get_uint("keepalive_timeout") != 0;
+ site_password = conf->get_str("site_password");
+ report_password = conf->get_str("report_password");
+}
+
+void worker::reload_config(config * conf) {
+ load_config(conf);
+}
+
+void worker::reload_lists() {
+ status = PAUSED;
+ db->load_torrents(torrents_list);
+ db->load_users(users_list);
+ db->load_whitelist(whitelist);
status = OPEN;
}
-bool worker::signal(int sig) {
+
+bool worker::shutdown() {
if (status == OPEN) {
status = CLOSING;
std::cout << "closing tracker... press Ctrl-C again to terminate" << std::endl;
@@ -42,12 +60,13 @@ bool worker::signal(int sig) {
return false;
}
}
-std::string worker::work(std::string &input, std::string &ip) {
+
+std::string worker::work(const std::string &input, std::string &ip, client_opts_t &client_opts) {
unsigned int input_length = input.length();
//---------- Parse request - ugly but fast. Using substr exploded.
if (input_length < 60) { // Way too short to be anything useful
- return error("GET string too short");
+ return error("GET string too short", client_opts);
}
size_t pos = 5; // skip 'GET /'
@@ -56,7 +75,7 @@ std::string worker::work(std::string &input, std::string &ip) {
std::string passkey;
passkey.reserve(32);
if (input[37] != '/') {
- return error("Malformed announce");
+ return error("Malformed announce", client_opts);
}
for (; pos < 37; pos++) {
@@ -71,7 +90,6 @@ std::string worker::work(std::string &input, std::string &ip) {
};
action_t action = INVALID;
- std::unique_lock<std::mutex> lock(stats.mutex);
switch (input[pos]) {
case 'a':
stats.announcements++;
@@ -92,19 +110,11 @@ std::string worker::work(std::string &input, std::string &ip) {
pos += 6;
break;
}
- lock.unlock();
if (input[pos] != '?') {
// No parameters given. Probably means we're not talking to a torrent client
- return response("Nothing to see here", false, true);
- }
-
- if (status != OPEN && action != UPDATE) {
- return error("The tracker is temporarily unavailable.");
- }
-
- if (action == INVALID) {
- return error("Invalid action");
+ client_opts.html = true;
+ return response("Nothing to see here", client_opts);
}
// Parse URL params
@@ -115,7 +125,7 @@ std::string worker::work(std::string &input, std::string &ip) {
std::string value;
bool parsing_key = true; // true = key, false = value
- pos++; // Skip the '?'
+ ++pos; // Skip the '?'
for (; pos < input_length; ++pos) {
if (input[pos] == '=') {
parsing_key = false;
@@ -139,8 +149,19 @@ std::string worker::work(std::string &input, std::string &ip) {
}
}
}
+ ++pos;
+
+ if (input.compare(pos, 5, "HTTP/") != 0) {
+ return error("Malformed HTTP request", client_opts);
+ }
- pos += 10; // skip 'HTTP/1.1' - should probably be +=11, but just in case a client doesn't send \r
+ std::string http_version;
+ pos += 5;
+ while (input[pos] != '\r' && input[pos] != '\n') {
+ http_version.push_back(input[pos]);
+ ++pos;
+ }
+ ++pos; // skip line break - should probably be += 2, but just in case a client doesn't send \r
// Parse headers
params_type headers;
@@ -171,66 +192,88 @@ std::string worker::work(std::string &input, std::string &ip) {
}
}
+ if (keepalive_enabled) {
+ auto hdr_http_close = headers.find("connection");
+ if (hdr_http_close == headers.end()) {
+ client_opts.http_close = (http_version == "1.0");
+ } else {
+ client_opts.http_close = (hdr_http_close->second != "Keep-Alive");
+ }
+ } else {
+ client_opts.http_close = true;
+ }
+
+ if (status != OPEN) {
+ return error("The tracker is temporarily unavailable.", client_opts);
+ }
+
+ if (action == INVALID) {
+ return error("Invalid action", client_opts);
+ }
+
if (action == UPDATE) {
- if (passkey == conf->site_password) {
- return update(params);
+ if (passkey == site_password) {
+ return update(params, client_opts);
} else {
- return error("Authentication failure");
+ return error("Authentication failure", client_opts);
}
}
if (action == REPORT) {
- if (passkey == conf->report_password) {
- return report(params, users_list);
+ if (passkey == report_password) {
+ std::lock_guard<std::mutex> ul_lock(db->user_list_mutex);
+ return report(params, users_list, client_opts);
} else {
- return error("Authentication failure");
+ return error("Authentication failure", client_opts);
}
}
// Either a scrape or an announce
- user_list::iterator u = users_list.find(passkey);
- if (u == users_list.end()) {
- return error("Passkey not found");
+ std::unique_lock<std::mutex> ul_lock(db->user_list_mutex);
+ auto user_it = users_list.find(passkey);
+ if (user_it == users_list.end()) {
+ return error("Passkey not found", client_opts);
}
+ user_ptr u = user_it->second;
+ ul_lock.unlock();
if (action == ANNOUNCE) {
- std::unique_lock<std::mutex> tl_lock(db->torrent_list_mutex);
// Let's translate the infohash into something nice
// info_hash is a url encoded (hex) base 20 number
std::string info_hash_decoded = hex_decode(params["info_hash"]);
- torrent_list::iterator tor = torrents_list.find(info_hash_decoded);
+ std::lock_guard<std::mutex> tl_lock(db->torrent_list_mutex);
+ auto tor = torrents_list.find(info_hash_decoded);
if (tor == torrents_list.end()) {
- std::unique_lock<std::mutex> dr_lock(del_reasons_lock);
+ std::lock_guard<std::mutex> dr_lock(del_reasons_lock);
auto msg = del_reasons.find(info_hash_decoded);
if (msg != del_reasons.end()) {
if (msg->second.reason != -1) {
- return error("Unregistered torrent: " + get_del_reason(msg->second.reason));
+ return error("Unregistered torrent: " + get_del_reason(msg->second.reason), client_opts);
} else {
- return error("Unregistered torrent");
+ return error("Unregistered torrent", client_opts);
}
} else {
- return error("Unregistered torrent");
+ return error("Unregistered torrent", client_opts);
}
}
- return announce(tor->second, u->second, params, headers, ip);
+ return announce(input, tor->second, u, params, headers, ip, client_opts);
} else {
- return scrape(infohashes, headers);
+ return scrape(infohashes, headers, client_opts);
}
}
-std::string worker::announce(torrent &tor, user_ptr &u, params_type &params, params_type &headers, std::string &ip) {
+std::string worker::announce(const std::string &input, torrent &tor, user_ptr &u, params_type &params, params_type &headers, std::string &ip, client_opts_t &client_opts) {
cur_time = time(NULL);
if (params["compact"] != "1") {
- return error("Your client does not support compact announces");
+ return error("Your client does not support compact announces", client_opts);
}
- bool gzip = false;
- int64_t left = std::max((int64_t)0, strtolonglong(params["left"]));
- int64_t uploaded = std::max((int64_t)0, strtolonglong(params["uploaded"]));
- int64_t downloaded = std::max((int64_t)0, strtolonglong(params["downloaded"]));
- int64_t corrupt = std::max((int64_t)0, strtolonglong(params["corrupt"]));
+ int64_t left = std::max((int64_t)0, strtoint64(params["left"]));
+ int64_t uploaded = std::max((int64_t)0, strtoint64(params["uploaded"]));
+ int64_t downloaded = std::max((int64_t)0, strtoint64(params["downloaded"]));
+ int64_t corrupt = std::max((int64_t)0, strtoint64(params["corrupt"]));
int snatched = 0; // This is the value that gets sent to the database on a snatch
int active = 1; // This is the value that marks a peer as active/inactive in the database
@@ -242,26 +285,37 @@ std::string worker::announce(torrent &tor, user_ptr &u, params_type &params, par
bool peer_changed = false; // Whether or not the peer is new or has changed since the last announcement
bool invalid_ip = false;
bool inc_l = false, inc_s = false, dec_l = false, dec_s = false;
+ userid_t userid = u->get_id();
params_type::const_iterator peer_id_iterator = params.find("peer_id");
if (peer_id_iterator == params.end()) {
- return error("No peer ID");
+ return error("No peer ID", client_opts);
+ }
+ const std::string peer_id = hex_decode(peer_id_iterator->second);
+ if (peer_id.length() != 20) {
+ return error("Invalid peer ID", client_opts);
}
- std::string peer_id = peer_id_iterator->second;
- peer_id = hex_decode(peer_id);
+ std::unique_lock<std::mutex> wl_lock(db->whitelist_mutex);
if (whitelist.size() > 0) {
bool found = false; // Found client in whitelist?
for (unsigned int i = 0; i < whitelist.size(); i++) {
- if (peer_id.find(whitelist[i]) == 0) {
+ if (peer_id.compare(0, whitelist[i].length(), whitelist[i]) == 0) {
found = true;
break;
}
}
if (!found) {
- return error("Your client is not on the whitelist");
+ return error("Your client is not on the whitelist", client_opts);
}
}
+ wl_lock.unlock();
+
+ std::stringstream peer_key_stream;
+ peer_key_stream << peer_id[12 + (tor.id & 7)] // "Randomize" the element order in the peer map by prefixing with a peer id byte
+ << userid // Include user id in the key to lower chance of peer id collisions
+ << peer_id;
+ const std::string peer_key(peer_key_stream.str());
if (params["event"] == "completed") {
// Don't update <snatched> here as we may decide to use other conditions later on
@@ -272,45 +326,44 @@ std::string worker::announce(torrent &tor, user_ptr &u, params_type &params, par
update_torrent = true;
active = 0;
}
- int userid = u->get_id();
peer * p;
peer_list::iterator peer_it;
// Insert/find the peer in the torrent list
if (left > 0) {
- peer_it = tor.leechers.find(peer_id);
+ peer_it = tor.leechers.find(peer_key);
if (peer_it == tor.leechers.end()) {
// We could search the seed list as well, but the peer reaper will sort things out eventually
- peer_it = add_peer(tor.leechers, peer_id);
+ peer_it = add_peer(tor.leechers, peer_key);
inserted = true;
inc_l = true;
}
} else if (completed_torrent) {
- peer_it = tor.leechers.find(peer_id);
+ peer_it = tor.leechers.find(peer_key);
if (peer_it == tor.leechers.end()) {
- peer_it = tor.seeders.find(peer_id);
+ peer_it = tor.seeders.find(peer_key);
if (peer_it == tor.seeders.end()) {
- peer_it = add_peer(tor.seeders, peer_id);
+ peer_it = add_peer(tor.seeders, peer_key);
inserted = true;
inc_s = true;
} else {
completed_torrent = false;
}
- } else if (tor.seeders.find(peer_id) != tor.seeders.end()) {
+ } else if (tor.seeders.find(peer_key) != tor.seeders.end()) {
// If the peer exists in both peer lists, just decrement the seed count.
// Should be cheaper than searching the seed list in the left > 0 case
dec_s = true;
}
} else {
- peer_it = tor.seeders.find(peer_id);
+ peer_it = tor.seeders.find(peer_key);
if (peer_it == tor.seeders.end()) {
- peer_it = tor.leechers.find(peer_id);
+ peer_it = tor.leechers.find(peer_key);
if (peer_it == tor.leechers.end()) {
- peer_it = add_peer(tor.seeders, peer_id);
+ peer_it = add_peer(tor.seeders, peer_key);
inserted = true;
} else {
p = &peer_it->second;
std::pair<peer_list::iterator, bool> insert
- = tor.seeders.insert(std::pair<std::string, peer>(peer_id, *p));
+ = tor.seeders.insert(std::pair<std::string, peer>(peer_key, *p));
tor.leechers.erase(peer_it);
peer_it = insert.first;
peer_changed = true;
@@ -373,7 +426,7 @@ std::string worker::announce(torrent &tor, user_ptr &u, params_type &params, par
upspeed = uploaded_change / (cur_time - p->last_announced);
downspeed = downloaded_change / (cur_time - p->last_announced);
}
- std::set<int>::iterator sit = tor.tokened_users.find(userid);
+ auto sit = tor.tokened_users.find(userid);
if (tor.free_torrent == NEUTRAL) {
downloaded_change = 0;
uploaded_change = 0;
@@ -415,7 +468,7 @@ std::string worker::announce(torrent &tor, user_ptr &u, params_type &params, par
}
}
- unsigned int port = strtolong(params["port"]);
+ uint16_t port = strtoint32(params["port"]) & 0xFFFF;
// Generate compact ip/port string
if (inserted || port != p->port || ip != p->ip) {
p->port = port;
@@ -464,18 +517,18 @@ std::string worker::announce(torrent &tor, user_ptr &u, params_type &params, par
}
db->record_peer(record_str, record_ip, peer_id, headers["user-agent"]);
} else {
- record << '(' << tor.id << ',' << (cur_time - p->first_announced) << ',' << p->announces << ',';
+ record << '(' << userid << ',' << tor.id << ',' << (cur_time - p->first_announced) << ',' << p->announces << ',';
std::string record_str = record.str();
db->record_peer(record_str, peer_id);
}
// Select peers!
- unsigned int numwant;
+ uint32_t numwant;
params_type::const_iterator param_numwant = params.find("numwant");
if (param_numwant == params.end()) {
- numwant = 50;
+ numwant = numwant_limit;
} else {
- numwant = std::min(50l, strtolong(param_numwant->second));
+ numwant = std::min((int32_t)numwant_limit, strtoint32(param_numwant->second));
}
if (stopped_torrent) {
@@ -504,7 +557,7 @@ std::string worker::announce(torrent &tor, user_ptr &u, params_type &params, par
// User is a seeder now!
if (!inserted) {
std::pair<peer_list::iterator, bool> insert
- = tor.seeders.insert(std::pair<std::string, peer>(peer_id, *p));
+ = tor.seeders.insert(std::pair<std::string, peer>(peer_key, *p));
tor.leechers.erase(peer_it);
peer_it = insert.first;
p = &peer_it->second;
@@ -555,7 +608,7 @@ std::string worker::announce(torrent &tor, user_ptr &u, params_type &params, par
i = tor.seeders.begin();
}
// Don't show users themselves
- if (i->second.user->get_id() == userid || !i->second.visible) {
+ if (i->second.user->is_deleted() || i->second.user->get_id() == userid || !i->second.visible) {
++i;
continue;
}
@@ -569,7 +622,7 @@ std::string worker::announce(torrent &tor, user_ptr &u, params_type &params, par
if (found_peers < numwant && tor.leechers.size() > 1) {
for (peer_list::const_iterator i = tor.leechers.begin(); i != tor.leechers.end() && found_peers < numwant; ++i) {
// Don't show users themselves or leech disabled users
- if (i->second.ip_port == p->ip_port || i->second.user->get_id() == userid || !i->second.visible) {
+ if (i->second.user->is_deleted() || i->second.ip_port == p->ip_port || i->second.user->get_id() == userid || !i->second.visible) {
continue;
}
found_peers++;
@@ -590,10 +643,8 @@ std::string worker::announce(torrent &tor, user_ptr &u, params_type &params, par
}
// Update the stats
- std::unique_lock<std::mutex> lock(stats.mutex);
stats.succ_announcements++;
if (dec_l || dec_s || inc_l || inc_s) {
- std::unique_lock<std::mutex> us_lock(ustats_lock);
if (inc_l) {
p->user->incr_leeching();
stats.leechers++;
@@ -611,12 +662,10 @@ std::string worker::announce(torrent &tor, user_ptr &u, params_type &params, par
stats.seeders--;
}
}
- lock.unlock();
// Correct the stats for the old user if the peer's user link has changed
if (p->user != u) {
if (!stopped_torrent) {
- std::unique_lock<std::mutex> us_lock(ustats_lock);
if (left > 0) {
u->incr_leeching();
p->user->decr_leeching();
@@ -648,7 +697,7 @@ std::string worker::announce(torrent &tor, user_ptr &u, params_type &params, par
}
if (!u->can_leech() && left > 0) {
- return error("Access denied, leeching forbidden");
+ return error("Access denied, leeching forbidden", client_opts);
}
std::string output = "d8:completei";
@@ -659,9 +708,9 @@ std::string worker::announce(torrent &tor, user_ptr &u, params_type &params, par
output += "e10:incompletei";
output += inttostr(tor.leechers.size());
output += "e8:intervali";
- output += inttostr(conf->announce_interval+std::min((size_t)600, tor.seeders.size())); // ensure a more even distribution of announces/second
+ output += inttostr(announce_interval + std::min((size_t)600, tor.seeders.size())); // ensure a more even distribution of announces/second
output += "e12:min intervali";
- output += inttostr(conf->announce_interval);
+ output += inttostr(announce_interval);
output += "e5:peers";
if (peers.length() == 0) {
output += "0:";
@@ -680,13 +729,12 @@ std::string worker::announce(torrent &tor, user_ptr &u, params_type &params, par
* possibly inflated return size
*/
/*if (headers["accept-encoding"].find("gzip") != std::string::npos) {
- gzip = true;
+ client_opts.gzip = true;
}*/
- return response(output, gzip, false);
+ return response(output, client_opts);
}
-std::string worker::scrape(const std::list<std::string> &infohashes, params_type &headers) {
- bool gzip = false;
+std::string worker::scrape(const std::list<std::string> &infohashes, params_type &headers, client_opts_t &client_opts) {
std::string output = "d5:filesd";
for (std::list<std::string>::const_iterator i = infohashes.begin(); i != infohashes.end(); ++i) {
std::string infohash = *i;
@@ -711,16 +759,17 @@ std::string worker::scrape(const std::list<std::string> &infohashes, params_type
}
output += "ee";
if (headers["accept-encoding"].find("gzip") != std::string::npos) {
- gzip = true;
+ client_opts.gzip = true;
}
- return response(output, gzip, false);
+ return response(output, client_opts);
}
//TODO: Restrict to local IPs
-std::string worker::update(params_type &params) {
+std::string worker::update(params_type &params, client_opts_t &client_opts) {
if (params["action"] == "change_passkey") {
std::string oldpasskey = params["oldpasskey"];
std::string newpasskey = params["newpasskey"];
+ std::lock_guard<std::mutex> ul_lock(db->user_list_mutex);
auto u = users_list.find(oldpasskey);
if (u == users_list.end()) {
std::cout << "No user with passkey " << oldpasskey << " exists when attempting to change passkey to " << newpasskey << std::endl;
@@ -733,10 +782,11 @@ std::string worker::update(params_type &params) {
torrent *t;
std::string info_hash = params["info_hash"];
info_hash = hex_decode(info_hash);
+ std::lock_guard<std::mutex> tl_lock(db->torrent_list_mutex);
auto i = torrents_list.find(info_hash);
if (i == torrents_list.end()) {
t = &torrents_list[info_hash];
- t->id = strtolong(params["id"]);
+ t->id = strtoint32(params["id"]);
t->balance = 0;
t->completed = 0;
t->last_selected_seeder = "";
@@ -762,6 +812,7 @@ std::string worker::update(params_type &params) {
} else {
fl = NEUTRAL;
}
+ std::lock_guard<std::mutex> tl_lock(db->torrent_list_mutex);
auto torrent_it = torrents_list.find(info_hash);
if (torrent_it != torrents_list.end()) {
torrent_it->second.free_torrent = fl;
@@ -781,6 +832,7 @@ std::string worker::update(params_type &params) {
} else {
fl = NEUTRAL;
}
+ std::lock_guard<std::mutex> tl_lock(db->torrent_list_mutex);
for (unsigned int pos = 0; pos < info_hashes.length(); pos += 20) {
std::string info_hash = info_hashes.substr(pos, 20);
auto torrent_it = torrents_list.find(info_hash);
@@ -794,6 +846,7 @@ std::string worker::update(params_type &params) {
} else if (params["action"] == "add_token") {
std::string info_hash = hex_decode(params["info_hash"]);
int userid = atoi(params["userid"].c_str());
+ std::lock_guard<std::mutex> tl_lock(db->torrent_list_mutex);
auto torrent_it = torrents_list.find(info_hash);
if (torrent_it != torrents_list.end()) {
torrent_it->second.tokened_users.insert(userid);
@@ -803,6 +856,7 @@ std::string worker::update(params_type &params) {
} else if (params["action"] == "remove_token") {
std::string info_hash = hex_decode(params["info_hash"]);
int userid = atoi(params["userid"].c_str());
+ std::lock_guard<std::mutex> tl_lock(db->torrent_list_mutex);
auto torrent_it = torrents_list.find(info_hash);
if (torrent_it != torrents_list.end()) {
torrent_it->second.tokened_users.erase(userid);
@@ -817,22 +871,19 @@ std::string worker::update(params_type &params) {
if (reason_it != params.end()) {
reason = atoi(params["reason"].c_str());
}
+ std::lock_guard<std::mutex> tl_lock(db->torrent_list_mutex);
auto torrent_it = torrents_list.find(info_hash);
if (torrent_it != torrents_list.end()) {
std::cout << "Deleting torrent " << torrent_it->second.id << " for the reason '" << get_del_reason(reason) << "'" << std::endl;
- std::unique_lock<std::mutex> stats_lock(stats.mutex);
stats.leechers -= torrent_it->second.leechers.size();
stats.seeders -= torrent_it->second.seeders.size();
- stats_lock.unlock();
- std::unique_lock<std::mutex> us_lock(ustats_lock);
- for (auto p = torrent_it->second.leechers.begin(); p != torrent_it->second.leechers.end(); ++p) {
- p->second.user->decr_leeching();
+ for (auto &p: torrent_it->second.leechers) {
+ p.second.user->decr_leeching();
}
- for (auto p = torrent_it->second.seeders.begin(); p != torrent_it->second.seeders.end(); ++p) {
- p->second.user->decr_seeding();
+ for (auto &p: torrent_it->second.seeders) {
+ p.second.user->decr_seeding();
}
- us_lock.unlock();
- std::unique_lock<std::mutex> dr_lock(del_reasons_lock);
+ std::lock_guard<std::mutex> dr_lock(del_reasons_lock);
del_message msg;
msg.reason = reason;
msg.time = time(NULL);
@@ -843,31 +894,37 @@ std::string worker::update(params_type &params) {
}
} else if (params["action"] == "add_user") {
std::string passkey = params["passkey"];
- unsigned int userid = strtolong(params["id"]);
+ userid_t userid = strtoint32(params["id"]);
+ std::lock_guard<std::mutex> ul_lock(db->user_list_mutex);
auto u = users_list.find(passkey);
if (u == users_list.end()) {
bool protect_ip = params["visible"] == "0";
- user_ptr u(new user(userid, true, protect_ip));
- users_list.insert(std::pair<std::string, user_ptr>(passkey, u));
+ user_ptr tmp_user = std::make_shared<user>(userid, true, protect_ip);
+ users_list.insert(std::pair<std::string, user_ptr>(passkey, tmp_user));
std::cout << "Added user " << passkey << " with id " << userid << std::endl;
} else {
std::cout << "Tried to add already known user " << passkey << " with id " << userid << std::endl;
+ u->second->set_deleted(false);
}
} else if (params["action"] == "remove_user") {
std::string passkey = params["passkey"];
+ std::lock_guard<std::mutex> ul_lock(db->user_list_mutex);
auto u = users_list.find(passkey);
if (u != users_list.end()) {
std::cout << "Removed user " << passkey << " with id " << u->second->get_id() << std::endl;
+ u->second->set_deleted(true);
users_list.erase(u);
}
} else if (params["action"] == "remove_users") {
// Each passkey is exactly 32 characters long.
std::string passkeys = params["passkeys"];
+ std::lock_guard<std::mutex> ul_lock(db->user_list_mutex);
for (unsigned int pos = 0; pos < passkeys.length(); pos += 32) {
std::string passkey = passkeys.substr(pos, 32);
auto u = users_list.find(passkey);
if (u != users_list.end()) {
std::cout << "Removed user " << passkey << std::endl;
+ u->second->set_deleted(true);
users_list.erase(passkey);
}
}
@@ -881,6 +938,7 @@ std::string worker::update(params_type &params) {
if (params["visible"] == "0") {
protect_ip = true;
}
+ std::lock_guard<std::mutex> ul_lock(db->user_list_mutex);
user_list::iterator i = users_list.find(passkey);
if (i == users_list.end()) {
std::cout << "No user with passkey " << passkey << " found when attempting to change leeching status!" << std::endl;
@@ -891,10 +949,12 @@ std::string worker::update(params_type &params) {
}
} else if (params["action"] == "add_whitelist") {
std::string peer_id = params["peer_id"];
+ std::lock_guard<std::mutex> wl_lock(db->whitelist_mutex);
whitelist.push_back(peer_id);
std::cout << "Whitelisted " << peer_id << std::endl;
} else if (params["action"] == "remove_whitelist") {
std::string peer_id = params["peer_id"];
+ std::lock_guard<std::mutex> wl_lock(db->whitelist_mutex);
for (unsigned int i = 0; i < whitelist.size(); i++) {
if (whitelist[i].compare(peer_id) == 0) {
whitelist.erase(whitelist.begin() + i);
@@ -905,6 +965,7 @@ std::string worker::update(params_type &params) {
} else if (params["action"] == "edit_whitelist") {
std::string new_peer_id = params["new_peer_id"];
std::string old_peer_id = params["old_peer_id"];
+ std::lock_guard<std::mutex> wl_lock(db->whitelist_mutex);
for (unsigned int i = 0; i < whitelist.size(); i++) {
if (whitelist[i].compare(old_peer_id) == 0) {
whitelist.erase(whitelist.begin() + i);
@@ -914,13 +975,15 @@ std::string worker::update(params_type &params) {
whitelist.push_back(new_peer_id);
std::cout << "Edited whitelist item from " << old_peer_id << " to " << new_peer_id << std::endl;
} else if (params["action"] == "update_announce_interval") {
- unsigned int interval = strtolong(params["new_announce_interval"]);
- conf->announce_interval = interval;
- std::cout << "Edited announce interval to " << interval << std::endl;
+ const std::string interval = params["new_announce_interval"];
+ conf->set("announce_interval", interval);
+ announce_interval = conf->get_uint("announce_interval");
+ std::cout << "Edited announce interval to " << announce_interval << std::endl;
} else if (params["action"] == "info_torrent") {
std::string info_hash_hex = params["info_hash"];
std::string info_hash = hex_decode(info_hash_hex);
std::cout << "Info for torrent '" << info_hash_hex << "'" << std::endl;
+ std::lock_guard<std::mutex> tl_lock(db->torrent_list_mutex);
auto torrent_it = torrents_list.find(info_hash);
if (torrent_it != torrents_list.end()) {
std::cout << "Torrent " << torrent_it->second.id
@@ -929,24 +992,27 @@ std::string worker::update(params_type &params) {
std::cout << "Failed to find torrent " << info_hash_hex << std::endl;
}
}
- return response("success", false, false);
+ return response("success", client_opts);
}
-peer_list::iterator worker::add_peer(peer_list &peer_list, std::string &peer_id) {
+peer_list::iterator worker::add_peer(peer_list &peer_list, const std::string &peer_key) {
peer new_peer;
- std::pair<peer_list::iterator, bool> insert
- = peer_list.insert(std::pair<std::string, peer>(peer_id, new_peer));
- return insert.first;
+ auto it = peer_list.insert(std::pair<std::string, peer>(peer_key, new_peer));
+ return it.first;
}
void worker::start_reaper() {
- std::thread thread(&worker::do_start_reaper, this);
- thread.detach();
+ if (!reaper_active) {
+ std::thread thread(&worker::do_start_reaper, this);
+ thread.detach();
+ }
}
void worker::do_start_reaper() {
+ reaper_active = true;
reap_peers();
reap_del_reasons();
+ reaper_active = false;
}
void worker::reap_peers() {
@@ -959,12 +1025,10 @@ void worker::reap_peers() {
auto p = t->second.leechers.begin();
peer_list::iterator del_p;
while (p != t->second.leechers.end()) {
- if (p->second.last_announced + conf->peers_timeout < cur_time) {
+ if (p->second.last_announced + peers_timeout < cur_time) {
+ std::lock_guard<std::mutex> tl_lock(db->torrent_list_mutex);
del_p = p++;
- std::unique_lock<std::mutex> us_lock(ustats_lock);
del_p->second.user->decr_leeching();
- us_lock.unlock();
- std::unique_lock<std::mutex> tl_lock(db->torrent_list_mutex);
t->second.leechers.erase(del_p);
reaped_this = true;
reaped_l++;
@@ -974,12 +1038,10 @@ void worker::reap_peers() {
}
p = t->second.seeders.begin();
while (p != t->second.seeders.end()) {
- if (p->second.last_announced + conf->peers_timeout < cur_time) {
+ if (p->second.last_announced + peers_timeout < cur_time) {
+ std::lock_guard<std::mutex> tl_lock(db->torrent_list_mutex);
del_p = p++;
- std::unique_lock<std::mutex> us_lock(ustats_lock);
del_p->second.user->decr_seeding();
- us_lock.unlock();
- std::unique_lock<std::mutex> tl_lock(db->torrent_list_mutex);
t->second.seeders.erase(del_p);
reaped_this = true;
reaped_s++;
@@ -996,7 +1058,6 @@ void worker::reap_peers() {
}
}
if (reaped_l || reaped_s) {
- std::unique_lock<std::mutex> lock(stats.mutex);
stats.leechers -= reaped_l;
stats.seeders -= reaped_s;
}
@@ -1006,13 +1067,13 @@ void worker::reap_peers() {
void worker::reap_del_reasons()
{
std::cout << "Starting del reason reaper" << std::endl;
- time_t max_time = time(NULL) - conf->del_reason_lifetime;
+ time_t max_time = time(NULL) - del_reason_lifetime;
auto it = del_reasons.begin();
unsigned int reaped = 0;
for (; it != del_reasons.end(); ) {
if (it->second.time <= max_time) {
auto del_it = it++;
- std::unique_lock<std::mutex> dr_lock(del_reasons_lock);
+ std::lock_guard<std::mutex> dr_lock(del_reasons_lock);
del_reasons.erase(del_it);
reaped++;
continue;