diff options
author | Erik Andersson <erik@packy.se> | 2016-11-22 23:43:26 +0100 |
---|---|---|
committer | Erik Andersson <erik@packy.se> | 2016-11-22 23:43:26 +0100 |
commit | 25be47d53ab3decef58859992010848deda5279c (patch) | |
tree | fbf0a8935d87a1817e078f5da55be42a4156e305 | |
parent | 232f2f6e6791eb9fc58f71d1bd6d3cf1c3c96679 (diff) | |
download | ocelot-25be47d53ab3decef58859992010848deda5279c.zip ocelot-25be47d53ab3decef58859992010848deda5279c.tar.gz ocelot-25be47d53ab3decef58859992010848deda5279c.tar.bz2 |
Ocelot v0.3v0.3
-rw-r--r-- | src/COPYING.txt | 133 | ||||
-rw-r--r-- | src/INSTALL.txt | 15 | ||||
-rw-r--r-- | src/Makefile (renamed from src/makefile) | 2 | ||||
-rw-r--r-- | src/config.cpp.template | 6 | ||||
-rw-r--r-- | src/config.h | 10 | ||||
-rw-r--r-- | src/db.cpp | 579 | ||||
-rw-r--r-- | src/db.h | 56 | ||||
-rw-r--r-- | src/events.h | 3 | ||||
-rw-r--r-- | src/logger.cpp | 34 | ||||
-rw-r--r-- | src/logger.h | 26 | ||||
-rw-r--r-- | src/misc_functions.h | 3 | ||||
-rw-r--r-- | src/ocelot.cpp | 43 | ||||
-rw-r--r-- | src/ocelot.h | 6 | ||||
-rw-r--r-- | src/schedule.cpp | 30 | ||||
-rw-r--r-- | src/schedule.h | 5 | ||||
-rw-r--r-- | src/site_comm.cpp | 77 | ||||
-rw-r--r-- | src/site_comm.h | 22 | ||||
-rw-r--r-- | src/worker.cpp | 163 | ||||
-rw-r--r-- | src/worker.h | 12 |
19 files changed, 781 insertions, 444 deletions
diff --git a/src/COPYING.txt b/src/COPYING.txt deleted file mode 100644 index 048ca8e..0000000 --- a/src/COPYING.txt +++ /dev/null @@ -1,133 +0,0 @@ -COPYRIGHTED AND PATENTED PENDING BY WHAT.CD INCORPORATED, DBA/AKA PROJECT -GAZELLE/OCELOT - -END-USER LICENSE AGREEMENT - -The Ocelot Source Code (hereinafter referred to as the Software) is made -publically available free of charge by Project Gazelle (sometimes a trade and -business name for the legally consituted entity known as What.CD Incorporated, -a Panamanian Corporation), copyright and patent pending owner (hereinafter -referred to as the Licensor or Original Licensor) to anyone who wishes to use -and/or share or distribute and/or change or modify it (hereinafter referred to -as either the Licensee and/or End-User) FOR NONCOMMERCIAL PURPOSES ONLY under -the following terms and conditions (hereinafter referred to as the "License"): - -Section 1. You may copy and distribute verbatim copies of the Software's -source code as you receive it, in any medium, provided that you conspicuously -and appropriately publish on each copy an appropriate copyright notice and -disclaimer of warranty; keep intact all the notices that refer to this License -and to the absence of any warranty; and give any other recipients of the -Software a copy of this License along with the Software. - -Section 2: You may modify or change your copy or copies of the Software or -any portion of it, thus forming a work based on the Software, and copy and -distribute such modifications or work under the terms of Section 1 above, -provided that you also meet all of these conditions: - - 2.1: You must cause the modified files to carry prominent notices stating -that you changed the files and the date of any change. - - 2.2: You must cause any work that you distribute or publish, that in -whole or in part contains or is derived from the Software or any part thereof, -to be licensed as a whole at no charge to all third parties under the terms of -this License. - -Section 3: You may copy and distribute the Software (or a work based on it, -under the terms of Section 2) in object code or executable form under the -terms of Sections 1 and 2 above provided that you also do one of the -following: - - 3.1: Accompany it with the complete corresponding machine-readable source -code, which must be distributed under the terms of Sections 1 and 2 above on -a medium customarily used for software interchange; or, - - 3.2: Accompany it with a written offer, valid for at least one year, to -give any third party, for a charge no more than your cost of physically -performing source distribution, a complete machine-readable copy of the -corresponding source code, to be distributed under the terms of Sections 1 and -2 above on a medium customarily used for software interchange; or, - - 3.3: Accompany it with the information you received as to the offer to -distribute corresponding source code. (This alternative is allowed only for -noncommercial distribution and only if you received the Software in object -code or executable form with such an offer, in accord with Subsection 3.2 -above.) - -Section 4: You may not copy, modify, sublicense, or distribute the Software -except as expressly provided under this License. Any attempt otherwise to -copy, modify, sublicense or distribute the Software is void, and will -automatically terminate your rights under this License. Parties who have -received copies, or rights, from you under this License, however, will not -have their licenses terminated so long as such parties remain in full -compliance with the terms and conditions of this License. - -Section 5: This License and nothing else grants you permission to use, modify -and/or distribute the Software or its derivative works and only in accordance -with the terms and conditions of this License. Otherwise, use, modification -and/or distribution of this Software is prohibited by international laws and -treaties as regards intellectual property. Therefore, by using, modifying -and/or distributing the Software (or any work based on the Software), you -indicate your acceptance of this License to do so, and all its terms and -conditions for copying, distributing or modifying the software or works based -on it. - -Section 6: Each time you redistribute the Software (or any work based on the -Software), the recipient automatically receives a license from the Original -Licensor to copy, distribute and/or modify the Software subject to these terms -and conditions. You may not impose any further restrictions on the -recipients' exercise of the rights granted herein. You are not responsible for -enforcing compliance by third parties to this License. - -Section 7: NO WARRANTY - - 7.1: BECAUSE THE SOFTWARE IS LICENSED FREE OF CHARGE, THERE IS NO -WARRANTY FOR THE SOFTWARE, TO THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT -WHEN OTHERWISE STATED IN WRITING THE COPYRIGHT/PATENT HOLDERS AND/OR OTHER -PARTIES PROVIDE THE SOFTWARE "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER -EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF -MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE ENTIRE RISK AS TO -THE QUALITY AND PERFORMANCE OF THE SOFTWARE IS WITH YOU. SHOULD THE PROGRAM -PROVE DEFECTIVE, YOU ASSUME THE COST OF ALL NECESSARY SERVICING, REPAIR OR -CORRECTION. - - - 7.2:. IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN -WRITING WILL ANY COPYRIGHT/PATENT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY -AND/OR REDISTRIBUTE THE SOFTWARE AS PERMITTED ABOVE, BE LIABLE TO YOU FOR -DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES -ARISING OUT OF THE USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT -LIMITED TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED -BY YOU OR THIRD PARTIES OR A FAILURE OF THE SOFTWARE TO OPERATE WITH ANY OTHER -SOFTWARE), EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE -POSSIBILITY OF SUCH DAMAGES. - -Section 8: If, as a consequence of a court judgment or allegation of patent -infringement or for any other reason (not limited to patent issues), -conditions are imposed on you (whether by court order, agreement or otherwise) -that contradict the conditions of this License, they do not excuse you from -the conditions of this License. If you cannot distribute so as to satisfy -simultaneously your obligations under this License and any other pertinent -obligations, then as a consequence you may not distribute the Software at all. -For example, if a patent license would not permit royalty-free redistribution -of the Software by all those who receive copies directly or indirectly through -you, then the only way you could satisfy both it and this License would be to -refrain entirely from distribution of the Software. - -Section 9: If any portion or section of this License is held invalid or -unenforceable under any particular circumstance, the balance of this License -and pertinent section is intended to apply and the section as a whole is -intended to apply in other circumstances. - -Section 10: Each version of the Software is given a distinguishing version -number. If the Software specifies a version number of this License which -applies to it and "any later version", you have the option of following the -terms and conditions either of that version or of any later version. If the -Software does not specify a version number of this License, you may choose any -version ever published by the Licensor. - -Section 11: This Software is distributed pursuant to the terms and conditions -of this License and is distributed free of charge FOR NONCOMMERCIAL PURPOSES -ONLY. THE USE, DISTRIBUTION AND/OR MODIFICATION OF THIS SOFTWARE FOR -COMMERCIAL PURPOSES IS EXPRESSLY PROHIBITED BY INTERNATIONAL LAWS AND TREATIES -AS REGARDS INTELLECTUAL PROPERTY. - diff --git a/src/INSTALL.txt b/src/INSTALL.txt deleted file mode 100644 index cb22528..0000000 --- a/src/INSTALL.txt +++ /dev/null @@ -1,15 +0,0 @@ -Ocelot requires the following: - -libboost_thread >= 1.44.0 -libmysqlpp >= 3.0.0 -libev >= 3.43 -g++ >= 4.3.2 - -There is no autoconfigure file yet, so you may need to play with the makefile. -A common tripping point is that your libboost_thread may be called libbost_thread-mt, -or you may need to include files from strange include directories. - -Once everything is okay, just type 'make'. - -Have fun :P - diff --git a/src/makefile b/src/Makefile index a1cfe15..7f0576b 100644 --- a/src/makefile +++ b/src/Makefile @@ -1,7 +1,7 @@ CXX=g++ CXXFLAGS=-march=native -O2 -fomit-frame-pointer -fno-ident -fvisibility-inlines-hidden -fvisibility=hidden -Wall -std=c++0x -iquote -Wl,O1 -Wl,--as-needed -I/usr/include/mysql -I/usr/include/mysql++ -I/usr/local/include/boost LDFLAGS=-L/usr/local/lib -LIBS=-lmysqlpp -lboost_thread -lev +LIBS=-lmysqlpp -lboost_system -lboost_thread -lev OCELOT=ocelot OBJS=$(patsubst %.cpp,%.o,$(wildcard *.cpp)) all: $(OCELOT) diff --git a/src/config.cpp.template b/src/config.cpp.template index d22bd4c..900c24e 100644 --- a/src/config.cpp.template +++ b/src/config.cpp.template @@ -6,16 +6,12 @@ config::config() { max_connections = 512; max_read_buffer = 4096; timeout_interval = 20; - schedule_interval = 5; + schedule_interval = 3; max_middlemen = 5000; announce_interval = 1800; peers_timeout = 2700; //Announce interval * 1.5 - flush_torrents_interval = 40; - flush_users_interval = 40; - flush_peers_interval = 40; - flush_snatches_interval = 40; reap_peers_interval = 1800; mysql_db = "gazelle"; diff --git a/src/config.h b/src/config.h index 85e2843..6f72db8 100644 --- a/src/config.h +++ b/src/config.h @@ -1,8 +1,12 @@ +#ifndef OCELOT_CONFIG_H +#define OCELOT_CONFIG_H + #include <string> class config { public: std::string host; + std::string site_host; unsigned int port; unsigned int max_connections; unsigned int max_read_buffer; @@ -13,10 +17,6 @@ class config { unsigned int announce_interval; int peers_timeout; - unsigned int flush_torrents_interval; - unsigned int flush_users_interval; - unsigned int flush_peers_interval; - unsigned int flush_snatches_interval; unsigned int reap_peers_interval; // MySQL @@ -29,3 +29,5 @@ class config { config(); }; + +#endif @@ -1,272 +1,467 @@ #include "ocelot.h" #include "db.h" +#include "misc_functions.h" #include <string> #include <iostream> +#include <queue> +#include <unistd.h> +#include <time.h> #include <boost/thread/thread.hpp> #include <boost/thread/mutex.hpp> #include <boost/thread/locks.hpp> +#include <boost/lexical_cast.hpp> + +#define DB_LOCK_TIMEOUT 50 mysql::mysql(std::string mysql_db, std::string mysql_host, std::string username, std::string password) { - if(!conn.connect(mysql_db.c_str(), mysql_host.c_str(), username.c_str(), password.c_str(), 0)) { - std::cout << "Could not connect to MySQL" << std::endl; - return; - } - - update_user_buffer = ""; - update_torrent_buffer = ""; - update_peer_buffer = ""; - update_snatch_buffer = ""; + if(!conn.connect(mysql_db.c_str(), mysql_host.c_str(), username.c_str(), password.c_str(), 0)) { + std::cout << "Could not connect to MySQL" << std::endl; + return; + } + + db = mysql_db, server = mysql_host, db_user = username, pw = password; + u_active = false; t_active = false; p_active = false; s_active = false; tok_active = false; + + std::cout << "Connected to MySQL" << std::endl; + update_user_buffer = ""; + update_torrent_buffer = ""; + update_peer_buffer = ""; + update_snatch_buffer = ""; + + logger_ptr = logger::get_instance(); } void mysql::load_torrents(std::unordered_map<std::string, torrent> &torrents) { - mysqlpp::Query query = conn.query("SELECT ID, info_hash, freetorrent, Snatched FROM torrents ORDER BY ID;"); - if(mysqlpp::StoreQueryResult res = query.store()) { - mysqlpp::String one("1"); // Hack to get around bug in mysql++3.0.0 - size_t num_rows = res.num_rows(); - for(size_t i = 0; i < num_rows; i++) { - std::string info_hash; - res[i][1].to_string(info_hash); - - torrent t; - t.id = res[i][0]; - if(res[i][2].compare(one) == 0) { - t.free_torrent = true; - } else { - t.free_torrent = false; - } - t.balance = 0; - t.completed = res[i][3]; - t.last_selected_seeder = ""; - torrents[info_hash] = t; - } - } + mysqlpp::Query query = conn.query("SELECT ID, info_hash, freetorrent, Snatched FROM torrents ORDER BY ID;"); + if(mysqlpp::StoreQueryResult res = query.store()) { + mysqlpp::String one("1"); // Hack to get around bug in mysql++3.0.0 + mysqlpp::String two("2"); + size_t num_rows = res.num_rows(); + for(size_t i = 0; i < num_rows; i++) { + std::string info_hash; + res[i][1].to_string(info_hash); + + torrent t; + t.id = res[i][0]; + if(res[i][2].compare(one) == 0) { + t.free_torrent = FREE; + } else if(res[i][2].compare(two) == 0) { + t.free_torrent = NEUTRAL; + } else { + t.free_torrent = NORMAL; + } + t.balance = 0; + t.completed = res[i][3]; + t.last_selected_seeder = ""; + torrents[info_hash] = t; + } + } } void mysql::load_users(std::unordered_map<std::string, user> &users) { - mysqlpp::Query query = conn.query("SELECT ID, can_leech, torrent_pass FROM users_main WHERE Enabled='1';"); - if(mysqlpp::StoreQueryResult res = query.store()) { - size_t num_rows = res.num_rows(); - for(size_t i = 0; i < num_rows; i++) { - std::string passkey; - res[i][2].to_string(passkey); - - user u; - u.id = res[i][0]; - u.can_leech = res[i][1]; - users[passkey] = u; - } - } + mysqlpp::Query query = conn.query("SELECT ID, can_leech, torrent_pass FROM users_main WHERE Enabled='1';"); + if(mysqlpp::StoreQueryResult res = query.store()) { + size_t num_rows = res.num_rows(); + for(size_t i = 0; i < num_rows; i++) { + std::string passkey; + res[i][2].to_string(passkey); + + user u; + u.id = res[i][0]; + u.can_leech = res[i][1]; + users[passkey] = u; + } + } } -void mysql::load_whitelist(std::vector<std::string> &whitelist) { - mysqlpp::Query query = conn.query("SELECT peer_id FROM xbt_client_whitelist;"); - if(mysqlpp::StoreQueryResult res = query.store()) { - size_t num_rows = res.num_rows(); - for(size_t i = 0; i<num_rows; i++) { - whitelist.push_back(res[i][0].c_str()); - } - } +void mysql::load_tokens(std::unordered_map<std::string, torrent> &torrents) { + mysqlpp::Query query = conn.query("SELECT uf.UserID, t.info_hash FROM users_freeleeches AS uf JOIN torrents AS t ON t.ID = uf.TorrentID WHERE uf.Expired = '0';"); + if (mysqlpp::StoreQueryResult res = query.store()) { + size_t num_rows = res.num_rows(); + for (size_t i = 0; i < num_rows; i++) { + std::string info_hash; + res[i][1].to_string(info_hash); + std::unordered_map<std::string, torrent>::iterator it = torrents.find(info_hash); + if (it != torrents.end()) { + torrent &tor = it->second; + tor.tokened_users.insert(res[i][0]); + } + } + } } +void mysql::load_whitelist(std::vector<std::string> &whitelist) { + mysqlpp::Query query = conn.query("SELECT peer_id FROM xbt_client_whitelist;"); + if(mysqlpp::StoreQueryResult res = query.store()) { + size_t num_rows = res.num_rows(); + for(size_t i = 0; i<num_rows; i++) { + whitelist.push_back(res[i][0].c_str()); + } + } +} +void mysql::record_token(std::string &record) { + boost::mutex::scoped_lock lock(user_token_lock); + if (update_token_buffer != "") { + update_token_buffer += ","; + } + update_token_buffer += record; +} void mysql::record_user(std::string &record) { - boost::mutex::scoped_lock lock(user_buffer_lock); - if(update_user_buffer != "") { - update_user_buffer += ","; - } - update_user_buffer += record; + boost::mutex::scoped_lock lock(user_buffer_lock); + if(update_user_buffer != "") { + update_user_buffer += ","; + } + update_user_buffer += record; } void mysql::record_torrent(std::string &record) { - boost::mutex::scoped_lock lock(torrent_buffer_lock); - if(update_torrent_buffer != "") { - update_torrent_buffer += ","; - } - update_torrent_buffer += record; + boost::mutex::scoped_lock lock(torrent_buffer_lock); + if(update_torrent_buffer != "") { + update_torrent_buffer += ","; + } + update_torrent_buffer += record; } void mysql::record_peer(std::string &record, std::string &ip, std::string &peer_id, std::string &useragent) { - boost::mutex::scoped_lock lock(peer_buffer_lock); - if(update_peer_buffer != "") { - update_peer_buffer += ","; + boost::mutex::scoped_lock lock(peer_buffer_lock); + if(update_peer_buffer != "") { + update_peer_buffer += ","; + } + mysqlpp::Query q = conn.query(); + q << record << mysqlpp::quote << ip << ',' << mysqlpp::quote << peer_id << ',' << mysqlpp::quote << useragent << "," << time(NULL) << ')'; + + update_peer_buffer += q.str(); +} + +void mysql::record_peer_hist(std::string &record, std::string &peer_id, int tid) { + boost::mutex::scoped_lock (peer_hist_buffer_lock); + if (update_peer_hist_buffer != "") { + update_peer_hist_buffer += ","; } mysqlpp::Query q = conn.query(); - q << record << mysqlpp::quote << ip << ',' << mysqlpp::quote << peer_id << ',' << mysqlpp::quote << useragent << ')'; - - update_peer_buffer += q.str(); + q << record << ',' << mysqlpp::quote << peer_id << ',' << tid << ',' << time(NULL) << ')'; + update_peer_hist_buffer += q.str(); } + void mysql::record_snatch(std::string &record) { - boost::mutex::scoped_lock lock(snatch_buffer_lock); - if(update_snatch_buffer != "") { - update_snatch_buffer += ","; - } - update_snatch_buffer += record; + boost::mutex::scoped_lock lock(mysql::snatch_buffer_lock); + if(update_snatch_buffer != "") { + update_snatch_buffer += ","; + } + update_snatch_buffer += record; } +bool mysql::all_clear() { + return (user_queue.size() == 0 && torrent_queue.size() == 0 && peer_queue.size() == 0 && snatch_queue.size() == 0 && token_queue.size() == 0 && peer_hist_queue.size() == 0); +} - +void mysql::flush() { + flush_users(); + flush_torrents(); + flush_snatches(); + flush_peers(); + flush_peer_hist(); + flush_tokens(); +} void mysql::flush_users() { - boost::thread thread(&mysql::do_flush_users, this); + std::string sql; + boost::mutex::scoped_lock lock(user_buffer_lock); + if (update_user_buffer == "") { + return; + } + sql = "INSERT INTO users_main (ID, Uploaded, Downloaded) VALUES " + update_user_buffer + + " ON DUPLICATE KEY UPDATE Uploaded = Uploaded + VALUES(Uploaded), Downloaded = Downloaded + VALUES(Downloaded)"; + user_queue.push(sql); + update_user_buffer.clear(); + if (user_queue.size() == 1 && u_active == false) { + boost::thread thread(&mysql::do_flush_users, this); + } } -void mysql::do_flush_users() { - std::cout << "flushing users" << std::endl; +void mysql::flush_torrents() { std::string sql; - { // Lock mutex - boost::mutex::scoped_lock lock(user_buffer_lock); - if(update_user_buffer == "") { - return; - } else { - sql = "INSERT INTO users_main(ID, Uploaded, Downloaded) VALUES "; - sql += update_user_buffer; - sql += " ON DUPLICATE KEY UPDATE Uploaded=Uploaded+VALUES(Uploaded), Downloaded=Downloaded+VALUES(Downloaded)"; - update_user_buffer.clear(); - } + boost::mutex::scoped_lock lock(torrent_buffer_lock); + if (update_torrent_buffer == "") { + return; + } + sql = "INSERT INTO torrents (ID,Seeders,Leechers,Snatched,Balance) VALUES " + update_torrent_buffer + + " ON DUPLICATE KEY UPDATE Seeders=VALUES(Seeders), Leechers=VALUES(Leechers), " + + "Snatched=Snatched+VALUES(Snatched), Balance=VALUES(Balance), last_action = " + + "IF(VALUES(Seeders) > 0, NOW(), last_action)"; + torrent_queue.push(sql); + update_torrent_buffer.clear(); + sql.clear(); + sql = "DELETE FROM torrents WHERE info_hash = ''"; + torrent_queue.push(sql); + if (torrent_queue.size() == 2 && t_active == false) { + boost::thread thread(&mysql::do_flush_torrents, this); + } +} + +void mysql::flush_snatches() { + std::string sql; + boost::mutex::scoped_lock lock(snatch_buffer_lock); + if (update_snatch_buffer == "" ) { + return; + } + sql = "INSERT INTO xbt_snatched (uid, fid, tstamp, IP) VALUES " + update_snatch_buffer; + snatch_queue.push(sql); + update_snatch_buffer.clear(); + if (snatch_queue.size() == 1 && s_active == false) { + boost::thread thread(&mysql::do_flush_snatches, this); + } +} + +void mysql::flush_peers() { + std::string sql; + boost::mutex::scoped_lock lock(peer_buffer_lock); + // because xfu inserts are slow and ram is not infinite we need to + // limit this queue's size + if (peer_queue.size() >= 1000) { + peer_queue.pop(); + } + if (update_peer_buffer == "") { + return; } - boost::mutex::scoped_lock db_lock(db_mutex); - + if (peer_queue.size() == 0) { + sql = "SET session sql_log_bin = 0"; + peer_queue.push(sql); + sql.clear(); + } - mysqlpp::Query query = conn.query(sql); - for(int i = 0; i < 3; i++) { - try { - query.execute(); - break; - } catch(const mysqlpp::BadQuery& er) { // deadlock - std::cout << "Query error: " << er.what() << std::endl; - sleep(3); - } catch (const mysqlpp::Exception& er) { // Weird unpredictable shit - std::cout << "Query error: " << er.what() << std::endl; - sleep(3); - } + sql = "INSERT INTO xbt_files_users (uid,fid,active,uploaded,downloaded,upspeed,downspeed,remaining," + + std::string("timespent,announced,ip,peer_id,useragent,mtime) VALUES ") + update_peer_buffer + + " ON DUPLICATE KEY UPDATE active=VALUES(active), uploaded=VALUES(uploaded), " + + "downloaded=VALUES(downloaded), upspeed=VALUES(upspeed), " + + "downspeed=VALUES(downspeed), remaining=VALUES(remaining), " + + "timespent=VALUES(timespent), announced=VALUES(announced), " + + "mtime=VALUES(mtime)"; + peer_queue.push(sql); + update_peer_buffer.clear(); + if (peer_queue.size() == 2 && p_active == false) { + boost::thread thread(&mysql::do_flush_peers, this); } - std::cout << "flushed users" << std::endl; } +void mysql::flush_peer_hist() { + std::string sql; + boost::mutex::scoped_lock lock(peer_hist_buffer_lock); + if (update_peer_hist_buffer == "") { + return; + } -void mysql::flush_torrents() { - boost::thread thread(&mysql::do_flush_torrents, this); + if (peer_hist_queue.size() == 0) { + sql = "SET session sql_log_bin = 0"; + peer_hist_queue.push(sql); + sql.clear(); + } + + sql = "INSERT IGNORE INTO xbt_peers_history (uid, downloaded, remaining, uploaded, upspeed, downspeed, timespent, peer_id, fid, mtime) VALUES " + update_peer_hist_buffer; + peer_hist_queue.push(sql); + update_peer_hist_buffer.clear(); + if (peer_hist_queue.size() == 2 && hist_active == false) { + boost::thread thread(&mysql::do_flush_peer_hist, this); + } } -void mysql::do_flush_torrents() { - std::cout << "flushing torrents" << std::endl; +void mysql::flush_tokens() { std::string sql; - { // Lock mutex - boost::mutex::scoped_lock lock(torrent_buffer_lock); - if(update_torrent_buffer == "") { - return; - } else { - sql = "INSERT INTO torrents(ID,Seeders,Leechers,Snatched,Balance) VALUES "; - sql += update_torrent_buffer; - sql += " ON DUPLICATE KEY UPDATE Seeders=VALUES(Seeders), Leechers=VALUES(Leechers), Snatched=Snatched+VALUES(Snatched), Balance=VALUES(Balance), last_action = IF(VALUES(Seeders) > 0, NOW(), last_action)"; - update_torrent_buffer.clear(); - } + boost::mutex::scoped_lock lock(user_token_lock); + if (update_token_buffer == "") { + return; } - - boost::mutex::scoped_lock db_lock(db_mutex); - - mysqlpp::Query query = conn.query(sql); - for(int i = 0; i < 3; i++) { + sql = "INSERT INTO users_freeleeches (UserID, TorrentID, Downloaded) VALUES " + update_token_buffer + + " ON DUPLICATE KEY UPDATE Downloaded = Downloaded + VALUES(Downloaded)"; + token_queue.push(sql); + update_token_buffer.clear(); + if (token_queue.size() == 1 && tok_active == false) { + boost::thread(&mysql::do_flush_tokens, this); + } +} + +void mysql::do_flush_users() { + u_active = true; + mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0); + while (user_queue.size() > 0) { try { - query.execute(); - break; - } catch(const mysqlpp::BadQuery& er) { // deadlock - std::cout << "Query error: " << er.what() << std::endl; + std::string sql = user_queue.front(); + mysqlpp::Query query = c.query(sql); + if (!query.exec()) { + std::cout << "User flush failed (" << user_queue.size() << " remain)" << std::endl; + sleep(3); + continue; + } else { + boost::mutex::scoped_lock lock(user_buffer_lock); + user_queue.pop(); + std::cout << "Users flushed (" << user_queue.size() << " remain)" << std::endl; + } + } + catch (const mysqlpp::BadQuery &er) { + std::cerr << "Query error: " << er.what() << " in flush users with a qlength: " << user_queue.front().size() << " queue size: " << user_queue.size() << std::endl; sleep(3); - } catch (const mysqlpp::Exception& er) { // Weird unpredictable shit - std::cout << "Query error: " << er.what() << std::endl; + continue; + } catch (const mysqlpp::Exception &er) { + std::cerr << "Query error: " << er.what() << " in flush users with a qlength: " << user_queue.front().size() << " queue size: " << user_queue.size() << std::endl; sleep(3); + continue; } } - std::cout << "flushed torrents" << std::endl; + u_active = false; +} - sql = "DELETE FROM torrents WHERE info_hash = ''"; - mysqlpp::Query dquery = conn.query(sql); - for (int i = 0; i < 3; i++) { +void mysql::do_flush_torrents() { + t_active = true; + mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0); + while (torrent_queue.size() > 0) { try { - dquery.execute(); - break; - } catch (const mysqlpp::BadQuery& er) { - std::cout << "Query error: " << er.what() << std::endl; + std::string sql = torrent_queue.front(); + if (sql == "") { + torrent_queue.pop(); + continue; + } + mysqlpp::Query query = c.query(sql); + if (!query.exec()) { + std::cout << "Torrent flush failed (" << torrent_queue.size() << " remain)" << std::endl; + sleep(3); + continue; + } else { + boost::mutex::scoped_lock lock(torrent_buffer_lock); + torrent_queue.pop(); + std::cout << "Torrents flushed (" << torrent_queue.size() << " remain)" << std::endl; + } + } + catch (const mysqlpp::BadQuery &er) { + std::cerr << "Query error: " << er.what() << " in flush torrents with a qlength: " << torrent_queue.front().size() << " queue size: " << torrent_queue.size() << std::endl; sleep(3); - } catch (const mysqlpp::Exception& er) { - std::cout << "Query error: " << er.what() << std::endl; + continue; + } catch (const mysqlpp::Exception &er) { + std::cerr << "Query error: " << er.what() << " in flush torrents with a qlength: " << torrent_queue.front().size() << " queue size: " << torrent_queue.size() << std::endl; sleep(3); + continue; } } + t_active = false; } -void mysql::flush_snatches() { - boost::thread thread(&mysql::do_flush_snatches, this); -} - -void mysql::do_flush_snatches() { - std::cout << "flushing snatches" << std::endl; - std::string sql; - { // lock mutex - boost::mutex::scoped_lock lock(snatch_buffer_lock); - if(update_snatch_buffer == "") { - return; - } else { - sql = "INSERT INTO xbt_snatched(uid,fid,tstamp,IP) VALUES "; - sql += update_snatch_buffer; - update_snatch_buffer.clear(); - } - } - - boost::mutex::scoped_lock db_lock(db_mutex); - - mysqlpp::Query query = conn.query(sql); - for(int i = 0; i < 3; i++) { +void mysql::do_flush_peers() { + p_active = true; + mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0); + while (peer_queue.size() > 0) { try { - query.execute(); - break; - } catch(const mysqlpp::BadQuery& er) { // deadlock - std::cout << "Query error: " << er.what() << std::endl; + std::string sql = peer_queue.front(); + mysqlpp::Query query = c.query(sql); + if (!query.exec()) { + std::cout << "Peer flush failed (" << peer_queue.size() << " remain)" << std::endl; + sleep(3); + continue; + } else { + boost::mutex::scoped_lock lock(peer_buffer_lock); + peer_queue.pop(); + std::cout << "Peers flushed (" << peer_queue.size() << " remain)" << std::endl; + } + } + catch (const mysqlpp::BadQuery &er) { + std::cerr << "Query error: " << er.what() << " in flush peers with a qlength: " << peer_queue.front().size() << " queue size: " << peer_queue.size() << std::endl; sleep(3); - } catch (const mysqlpp::Exception& er) { // Weird unpredictable shit - std::cout << "Query error: " << er.what() << std::endl; + continue; + } catch (const mysqlpp::Exception &er) { + std::cerr << "Query error: " << er.what() << " in flush peers with a qlength: " << peer_queue.front().size() << " queue size: " << peer_queue.size() << std::endl; sleep(3); + continue; } } - std::cout << "flushed snatches" << std::endl; + p_active = false; } -void mysql::flush_peers() { - boost::thread thread(&mysql::do_flush_peers, this); +void mysql::do_flush_peer_hist() { + hist_active = true; + mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0); + while (peer_hist_queue.size() > 0) { + try { + std::string sql = peer_hist_queue.front(); + mysqlpp::Query query = c.query(sql); + if (!query.exec()) { + std::cout << "Peer history flush failed (" << peer_hist_queue.size() << " remain)" << std::endl; + sleep(3); + continue; + } else { + boost::mutex::scoped_lock lock(peer_hist_buffer_lock); + peer_hist_queue.pop(); + std::cout << "Peer history flushed (" << peer_hist_queue.size() << " remain)" << std::endl; + } + } + catch (const mysqlpp::BadQuery &er) { + std::cerr << "Query error: " << er.what() << " in flush peer history with a qlength: " << peer_hist_queue.front().size() << " queue size: " << peer_hist_queue.size() << std::endl; + sleep(3); + continue; + } catch (const mysqlpp::Exception &er) { + std::cerr << "Query error: " << er.what() << " in flush peer history with a qlength: " << peer_hist_queue.front().size() << " queue size: " << peer_hist_queue.size() << std::endl; + sleep(3); + continue; + } + } + hist_active = false; } -void mysql::do_flush_peers() { - std::cout << "flushing peers" << std::endl; - std::string sql; - { // lock mutex - boost::mutex::scoped_lock lock(peer_buffer_lock); - if(update_peer_buffer == "") { - return; - } else { - sql = "INSERT INTO xbt_files_users(uid,fid,active,uploaded,downloaded,upspeed,downspeed,remaining,timespent,announced,ip,peer_id,useragent) VALUES "; - sql += update_peer_buffer; - sql += " ON DUPLICATE KEY UPDATE active=VALUES(active), uploaded=VALUES(uploaded), downloaded=VALUES(downloaded), upspeed=VALUES(upspeed), downspeed=VALUES(downspeed), "; - sql += "remaining=VALUES(remaining), timespent=VALUES(timespent), announced=VALUES(announced), peer_id=VALUES(peer_id), useragent=VALUES(useragent),mtime=UNIX_TIMESTAMP(NOW())"; - update_peer_buffer.clear(); +void mysql::do_flush_snatches() { + s_active = true; + mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0); + while (snatch_queue.size() > 0) { + try { + std::string sql = snatch_queue.front(); + mysqlpp::Query query = c.query(sql); + if (!query.exec()) { + std::cout << "Snatch flush failed (" << snatch_queue.size() << " remain)" << std::endl; + sleep(3); + continue; + } else { + boost::mutex::scoped_lock lock(snatch_buffer_lock); + snatch_queue.pop(); + std::cout << "Snatches flushed (" << snatch_queue.size() << " remain)" << std::endl; + } + } + catch (const mysqlpp::BadQuery &er) { + std::cerr << "Query error: " << er.what() << " in flush snatches with a qlength: " << snatch_queue.front().size() << " queue size: " << snatch_queue.size() << std::endl; + sleep(3); + continue; + } catch (const mysqlpp::Exception &er) { + std::cerr << "Query error: " << er.what() << " in flush snatches with a qlength: " << snatch_queue.front().size() << " queue size: " << snatch_queue.size() << std::endl; + sleep(3); + continue; } } - - boost::mutex::scoped_lock db_lock(db_mutex); - - mysqlpp::Query query = conn.query(sql); - for(int i = 0; i < 3; i++) { + s_active = false; +} + +void mysql::do_flush_tokens() { + tok_active = true; + mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0); + while (token_queue.size() > 0) { try { - query.execute(); - break; - } catch(const mysqlpp::BadQuery& er) { // deadlock - std::cout << "Query error: " << er.what() << std::endl; + std::string sql = token_queue.front(); + mysqlpp::Query query = c.query(sql); + if (!query.exec()) { + std::cout << "Token flush failed (" << token_queue.size() << " remain)" << std::endl; + sleep(3); + continue; + } else { + boost::mutex::scoped_lock lock(user_token_lock); + token_queue.pop(); + std::cout << "Tokens flushed (" << token_queue.size() << " remain)" << std::endl; + } + } + catch (const mysqlpp::BadQuery &er) { + std::cerr << "Query error: " << er.what() << " in flush tokens with a qlength: " << token_queue.front().size() << " queue size: " << token_queue.size() << std::endl; sleep(3); - } catch (const mysqlpp::Exception& er) { // Weird unpredictable shit - std::cout << "Query error: " << er.what() << std::endl; + continue; + } catch (const mysqlpp::Exception &er) { + std::cerr << "Query error: " << er.what() << " in flush tokens with a qlength: " << token_queue.front().size() << " queue size: " << token_queue.size() << std::endl; sleep(3); + continue; } } - std::cout << "flushed peers" << std::endl; + tok_active = false; } @@ -4,8 +4,11 @@ #include <mysql++/mysql++.h> #include <string> #include <unordered_map> +#include <queue> #include <boost/thread/mutex.hpp> +#include "logger.h" + class mysql { private: mysqlpp::Connection conn; @@ -13,37 +16,64 @@ class mysql { std::string update_torrent_buffer; std::string update_peer_buffer; std::string update_snatch_buffer; + std::string update_token_buffer; + std::string update_peer_hist_buffer; + std::queue<std::string> user_queue; + std::queue<std::string> torrent_queue; + std::queue<std::string> peer_queue; + std::queue<std::string> snatch_queue; + std::queue<std::string> token_queue; + std::queue<std::string> peer_hist_queue; + + std::string db, server, db_user, pw; + bool u_active, t_active, p_active, s_active, tok_active, hist_active; + + // These locks prevent more than one thread from reading/writing the buffers. + // These should be held for the minimum time possible. boost::mutex user_buffer_lock; boost::mutex torrent_buffer_lock; boost::mutex peer_buffer_lock; boost::mutex snatch_buffer_lock; + boost::mutex user_token_lock; + boost::mutex peer_hist_buffer_lock; - boost::mutex user_list_mutex; - - boost::mutex db_mutex; + void do_flush_users(); + void do_flush_torrents(); + void do_flush_snatches(); + void do_flush_peers(); + void do_flush_tokens(); + void do_flush_peer_hist(); + + void flush_users(); + void flush_torrents(); + void flush_snatches(); + void flush_peers(); + void flush_tokens(); + void flush_peer_hist(); + public: mysql(std::string mysql_db, std::string mysql_host, std::string username, std::string password); void load_torrents(std::unordered_map<std::string, torrent> &torrents); void load_users(std::unordered_map<std::string, user> &users); + void load_tokens(std::unordered_map<std::string, torrent> &torrents); void load_whitelist(std::vector<std::string> &whitelist); void record_user(std::string &record); // (id,uploaded_change,downloaded_change) void record_torrent(std::string &record); // (id,seeders,leechers,snatched_change,balance) void record_snatch(std::string &record); // (uid,fid,tstamp) void record_peer(std::string &record, std::string &ip, std::string &peer_id, std::string &useragent); // (uid,fid,active,peerid,useragent,ip,uploaded,downloaded,upspeed,downspeed,left,timespent,announces) - - void flush_users(); - void flush_torrents(); - void flush_snatches(); - void flush_peers(); - - void do_flush_users(); - void do_flush_torrents(); - void do_flush_snatches(); - void do_flush_peers(); + void record_token(std::string &record); + void record_peer_hist(std::string &record, std::string &peer_id, int tid); + + void flush(); + bool all_clear(); + boost::mutex torrent_list_mutex; + + logger* logger_ptr; }; + #pragma GCC visibility pop #endif diff --git a/src/events.h b/src/events.h index 36a0866..85ed708 100644 --- a/src/events.h +++ b/src/events.h @@ -44,7 +44,6 @@ THE WORKER - // THE MOTHER - Spawns connection middlemen class connection_mother { private: @@ -67,7 +66,7 @@ class connection_mother { int get_open_connections() { return open_connections; } int get_opened_connections() { return opened_connections; } - + void handle_connect(ev::io &watcher, int events_flags); ~connection_mother(); }; diff --git a/src/logger.cpp b/src/logger.cpp new file mode 100644 index 0000000..691c6cf --- /dev/null +++ b/src/logger.cpp @@ -0,0 +1,34 @@ +#include "logger.h" + +logger* logger::singletonInstance_ = 0; + +logger::logger(std::string filename) { + logger::log_file_.open(filename.c_str(), std::ios::out); + if(logger::log_file_.is_open()) { + singletonInstance_ = this; + } +} + +logger::~logger(void) { + if(log_file_.is_open()) { + log_file_.close(); + } + singletonInstance_ = 0; +} + +logger *logger::get_instance(void) { + if(singletonInstance_ != 0) { + return singletonInstance_; + } + return NULL; +} + +bool logger::log(std::string msg) { + boost::mutex::scoped_lock lock(log_lock_); + if(log_file_.is_open()) { + log_file_ << msg << std::endl; + log_file_.flush(); + return true; + } + return false; +} diff --git a/src/logger.h b/src/logger.h new file mode 100644 index 0000000..08f7c2c --- /dev/null +++ b/src/logger.h @@ -0,0 +1,26 @@ +#ifndef OCELOT_LOGGER_H +#define OCELOT_LOGGER_H + +#include <string> +#include <iostream> +#include <fstream> + +#include <boost/thread/mutex.hpp> + +class logger { + + public: + logger(std::string filename); + virtual ~logger(void); + bool log(std::string msg); + static logger* get_instance(void); + + protected: + + private: + static logger* singletonInstance_; + boost::mutex log_lock_; + std::ofstream log_file_; +}; + +#endif diff --git a/src/misc_functions.h b/src/misc_functions.h index 38e014d..1b70a64 100644 --- a/src/misc_functions.h +++ b/src/misc_functions.h @@ -1,10 +1,11 @@ #ifndef MISC_FUNCTIONS__H #define MISC_FUNCTIONS__H #include <string> - +#include <cstdlib> long strtolong(const std::string& str); long long strtolonglong(const std::string& str); std::string inttostr(int i); std::string hex_decode(const std::string &in); +int timeval_subtract (timeval* result, timeval* x, timeval* y); #endif diff --git a/src/ocelot.cpp b/src/ocelot.cpp index 05f1823..b7a484d 100644 --- a/src/ocelot.cpp +++ b/src/ocelot.cpp @@ -4,19 +4,21 @@ #include "worker.h" #include "events.h" #include "schedule.h" +#include "logger.h" +#include "site_comm.h" static mysql *db_ptr; static connection_mother *mother; +static worker *work; +static logger *log_ptr; +static site_comm *sc_ptr; static void sig_handler(int sig) { std::cout << "Caught SIGINT/SIGTERM" << std::endl; - delete(mother); - db_ptr->flush_torrents(); - db_ptr->flush_users(); - db_ptr->flush_snatches(); - db_ptr->flush_peers(); - exit(0); + if (work->signal(sig)) { + exit(0); + } } int main() { @@ -24,28 +26,37 @@ int main() { signal(SIGINT, sig_handler); signal(SIGTERM, sig_handler); - + + log_ptr = new logger("debug.log"); + mysql db(conf.mysql_db, conf.mysql_host, conf.mysql_username, conf.mysql_password); db_ptr = &db; + + site_comm sc(conf); + sc_ptr = ≻ - std::unordered_map<std::string, torrent> torrents_list; - db.load_torrents(torrents_list); - std::cout << "Loaded " << torrents_list.size() << " torrents" << std::endl; + std::vector<std::string> whitelist; + db.load_whitelist(whitelist); + std::cout << "Loaded " << whitelist.size() << " clients into the whitelist" << std::endl; + if(whitelist.size() == 0) { + std::cout << "Assuming no whitelist desired, disabling" << std::endl; + } std::unordered_map<std::string, user> users_list; db.load_users(users_list); std::cout << "Loaded " << users_list.size() << " users" << std::endl; - std::vector<std::string> whitelist; - db.load_whitelist(whitelist); - std::cout << "Loaded " << whitelist.size() << " clients into the whitelist" << std::endl; - + std::unordered_map<std::string, torrent> torrents_list; + db.load_torrents(torrents_list); + std::cout << "Loaded " << torrents_list.size() << " torrents" << std::endl; + + db.load_tokens(torrents_list); // Create worker object, which handles announces and scrapes and all that jazz - worker work(torrents_list, users_list, whitelist, &conf, &db); + work = new worker(torrents_list, users_list, whitelist, &conf, &db, sc); // Create connection mother, which binds to its socket and handles the event stuff - mother = new connection_mother(&work, &conf, &db); + mother = new connection_mother(work, &conf, &db); return 0; } diff --git a/src/ocelot.h b/src/ocelot.h index 99e8d8f..0ceadea 100644 --- a/src/ocelot.h +++ b/src/ocelot.h @@ -2,6 +2,7 @@ #include <map> #include <vector> #include <unordered_map> +#include <set> #include <boost/thread/thread.hpp> typedef struct { @@ -21,15 +22,18 @@ typedef struct { typedef std::map<std::string, peer> peer_list; +enum freetype { NORMAL, FREE, NEUTRAL }; + typedef struct { int id; time_t last_seeded; long long balance; int completed; - bool free_torrent; + freetype free_torrent; std::map<std::string, peer> seeders; std::map<std::string, peer> leechers; std::string last_selected_seeder; + std::set<int> tokened_users; time_t last_flushed; } torrent; diff --git a/src/schedule.cpp b/src/schedule.cpp index 622b539..b33d1c8 100644 --- a/src/schedule.cpp +++ b/src/schedule.cpp @@ -10,10 +10,6 @@ schedule::schedule(connection_mother * mother_obj, worker* worker_obj, config* c counter = 0; last_opened_connections = 0; - next_flush_torrents = time(NULL) + conf->flush_torrents_interval; - next_flush_users = time(NULL) + conf->flush_users_interval + 10; - next_flush_peers = time(NULL) + conf->flush_peers_interval + 20; - next_flush_snatches = time(NULL) + conf->flush_snatches_interval + 30; next_reap_peers = time(NULL) + conf->reap_peers_interval + 40; } //---------- Schedule - gets called every schedule_interval seconds @@ -25,28 +21,16 @@ void schedule::handle(ev::timer &watcher, int events_flags) { << ((mother->get_opened_connections()-last_opened_connections)/conf->schedule_interval) << "/s" << std::endl; } + if ((work->get_status() == CLOSING) && db->all_clear()) { + std::cout << "all clear, shutting down" << std::endl; + exit(0); + } + last_opened_connections = mother->get_opened_connections(); + db->flush(); + time_t cur_time = time(NULL); - if(cur_time > next_flush_torrents) { - db->flush_torrents(); - next_flush_torrents = cur_time + conf->flush_torrents_interval; - } - - if(cur_time > next_flush_users) { - db->flush_users(); - next_flush_users = cur_time + conf->flush_users_interval; - } - - if(cur_time > next_flush_snatches) { - db->flush_snatches(); - next_flush_snatches = cur_time + conf->flush_snatches_interval; - } - - if(cur_time > next_flush_peers) { - db->flush_peers(); - next_flush_peers = cur_time + conf->flush_peers_interval; - } if(cur_time > next_reap_peers) { work->reap_peers(); diff --git a/src/schedule.h b/src/schedule.h index e52946a..8bc70ff 100644 --- a/src/schedule.h +++ b/src/schedule.h @@ -11,10 +11,7 @@ class schedule { int last_opened_connections; int counter; - time_t next_flush_torrents; - time_t next_flush_users; - time_t next_flush_peers; - time_t next_flush_snatches; + time_t next_flush; time_t next_reap_peers; public: schedule(connection_mother * mother_obj, worker * worker_obj, config* conf_obj, mysql * db_obj); diff --git a/src/site_comm.cpp b/src/site_comm.cpp new file mode 100644 index 0000000..5294d01 --- /dev/null +++ b/src/site_comm.cpp @@ -0,0 +1,77 @@ +#include <iostream> +#include <istream> +#include <ostream> +#include <string> +#include <boost/asio.hpp> + +#include "config.h" +#include "site_comm.h" + +using boost::asio::ip::tcp; + +site_comm::site_comm(config &config) +{ + conf = config; +} + +bool site_comm::expire_token(int torrent, int user) +{ + try { + boost::asio::io_service io_service; + + tcp::resolver resolver(io_service); + tcp::resolver::query query(conf.site_host, "http"); + tcp::resolver::iterator endpoint_iterator = resolver.resolve(query); + tcp::resolver::iterator end; + + tcp::socket socket(io_service); + boost::system::error_code error = boost::asio::error::host_not_found; + while (error && endpoint_iterator != end) { + socket.close(); + socket.connect(*endpoint_iterator++, error); + } + if (error) { + throw boost::system::system_error(error); + } + + boost::asio::streambuf request; + std::ostream request_stream(&request); + request_stream << "GET /tools.php?key=" << conf.site_password << "&type=expiretoken&action=ocelot&torrentid=" << torrent << "&userid=" << user << " HTTP/1.0\r\n"; + request_stream << "Host: " << conf.site_host << "\r\n"; + request_stream << "Accept: */*\r\n"; + request_stream << "Connection: close\r\n\r\n"; + + boost::asio::write(socket, request); + + boost::asio::streambuf response; + boost::asio::read_until(socket, response, "\r\n"); + + std::istream response_stream(&response); + std::string http_version; + response_stream >> http_version; + unsigned int status_code; + response_stream >> status_code; + std::string status_message; + std::getline(response_stream, status_message); + + if (!response_stream || http_version.substr(0, 5) != "HTTP/") { + std::cout << "Invalid response" << std::endl; + return false; + } + + if (status_code == 200) { + return true; + } else { + std::cout << "Response returned with status code " << status_code << " when trying to expire a token!" << std::endl;; + return false; + } + } catch (std::exception &e) { + std::cout << "Exception: " << e.what() << std::endl; + return false; + } + return true; +} + +site_comm::~site_comm() +{ +} diff --git a/src/site_comm.h b/src/site_comm.h new file mode 100644 index 0000000..0741059 --- /dev/null +++ b/src/site_comm.h @@ -0,0 +1,22 @@ +#ifndef OCELOT_SITE_COMM_H +#define OCELOT_SITE_COMM_H +#include <iostream> +#include <istream> +#include <ostream> +#include <string> +#include <boost/asio.hpp> + +#include "config.h" + +using boost::asio::ip::tcp; + +class site_comm { + private: + config conf; + + public: + site_comm(config &conf); + bool expire_token(int torrent, int user); + ~site_comm(); +}; +#endif diff --git a/src/worker.cpp b/src/worker.cpp index c0e2e16..7ec604e 100644 --- a/src/worker.cpp +++ b/src/worker.cpp @@ -5,6 +5,8 @@ #include <sstream> #include <list> #include <vector> +#include <set> +#include <algorithm> #include <netinet/in.h> #include <arpa/inet.h> @@ -14,6 +16,7 @@ #include "db.h" #include "worker.h" #include "misc_functions.h" +#include "site_comm.h" #include <boost/thread/mutex.hpp> #include <boost/thread/thread.hpp> @@ -21,7 +24,20 @@ #include <boost/bind.hpp> //---------- Worker - does stuff with input -worker::worker(torrent_list &torrents, user_list &users, std::vector<std::string> &_whitelist, config * conf_obj, mysql * db_obj) : torrents_list(torrents), users_list(users), whitelist(_whitelist), conf(conf_obj), db(db_obj) { +worker::worker(torrent_list &torrents, user_list &users, std::vector<std::string> &_whitelist, config * conf_obj, mysql * db_obj, site_comm &sc) : torrents_list(torrents), users_list(users), whitelist(_whitelist), conf(conf_obj), db(db_obj), s_comm(sc) { + status = OPEN; +} +bool worker::signal(int sig) { + if (status == OPEN) { + status = CLOSING; + std::cout << "closing tracker... press Ctrl-C again to terminate" << std::endl; + return false; + } else if (status == CLOSING) { + std::cout << "shutting down uncleanly" << std::endl; + return true; + } else { + return false; + } } std::string worker::work(std::string &input, std::string &ip) { unsigned int input_length = input.length(); @@ -69,6 +85,10 @@ std::string worker::work(std::string &input, std::string &ip) { if(action == INVALID) { return error("invalid action"); } + + if ((status != OPEN) && (action != UPDATE)) { + return error("The tracker is temporarily unavailable."); + } // Parse URL params std::list<std::string> infohashes; // For scrape only @@ -86,6 +106,7 @@ std::string worker::work(std::string &input, std::string &ip) { if(action == SCRAPE && key == "info_hash") { infohashes.push_back(value); } else { + params[key] = value; } key.clear(); @@ -118,6 +139,7 @@ std::string worker::work(std::string &input, std::string &ip) { if(found_data) { found_data = false; // dodge for getting around \r\n or just \n + std::transform(key.begin(), key.end(), key.begin(), ::tolower); headers[key] = value; key.clear(); value.clear(); @@ -186,6 +208,7 @@ std::string worker::announce(torrent &tor, user &u, std::map<std::string, std::s bool inserted = false; // If we insert the peer as opposed to update bool update_torrent = false; // Whether or not we should update the torrent in the DB + bool expire_token = false; // Whether or not to expire a token after torrent completion std::map<std::string, std::string>::const_iterator peer_id_iterator = params.find("peer_id"); if(peer_id_iterator == params.end()) { @@ -194,19 +217,20 @@ std::string worker::announce(torrent &tor, user &u, std::map<std::string, std::s std::string peer_id = peer_id_iterator->second; peer_id = hex_decode(peer_id); - bool found = false; // Found client in whitelist? - for(unsigned int i = 0; i < whitelist.size(); i++) { - if(peer_id.find(whitelist[i]) == 0) { - found = true; - break; + if(whitelist.size() > 0) { + bool found = false; // Found client in whitelist? + for(unsigned int i = 0; i < whitelist.size(); i++) { + if(peer_id.find(whitelist[i]) == 0) { + found = true; + break; + } } - } - if(!found) { - return error("Your client is not on the whitelist"); + if(!found) { + return error("Your client is not on the whitelist"); + } } - peer * p; peer_list::iterator i; // Insert/find the peer in the torrent list @@ -246,29 +270,33 @@ std::string worker::announce(torrent &tor, user &u, std::map<std::string, std::s p->left = left; long long upspeed = 0; long long downspeed = 0; + long long real_uploaded_change = 0; + long long real_downloaded_change = 0; if(inserted || params["event"] == "started" || uploaded < p->uploaded || downloaded < p->downloaded) { //New peer on this torrent update_torrent = true; p->userid = u.id; p->peer_id = peer_id; - p->user_agent = headers["User-Agent"]; + p->user_agent = headers["user-agent"]; p->first_announced = cur_time; p->last_announced = 0; p->uploaded = uploaded; p->downloaded = downloaded; p->announces = 1; } else { - p->announces++; - long long uploaded_change = 0; long long downloaded_change = 0; + p->announces++; + if(uploaded != p->uploaded) { uploaded_change = uploaded - p->uploaded; + real_uploaded_change = uploaded_change; p->uploaded = uploaded; } if(downloaded != p->downloaded) { downloaded_change = downloaded - p->downloaded; + real_downloaded_change = downloaded_change; p->downloaded = downloaded; } if(uploaded_change || downloaded_change) { @@ -282,8 +310,18 @@ std::string worker::announce(torrent &tor, user &u, std::map<std::string, std::s upspeed = uploaded_change / (cur_time - p->last_announced); downspeed = downloaded_change / (cur_time - p->last_announced); } - - if(tor.free_torrent == true) { + std::set<int>::iterator sit = tor.tokened_users.find(u.id); + if (tor.free_torrent == NEUTRAL) { + downloaded_change = 0; + uploaded_change = 0; + } else if(tor.free_torrent == FREE || sit != tor.tokened_users.end()) { + if(sit != tor.tokened_users.end()) { + expire_token = true; + std::stringstream record; + record << '(' << u.id << ',' << tor.id << ',' << downloaded_change << ')'; + std::string record_str = record.str(); + db->record_token(record_str); + } downloaded_change = 0; } @@ -371,6 +409,11 @@ std::string worker::announce(torrent &tor, user &u, std::map<std::string, std::s // User is a seeder now! tor.seeders.insert(std::pair<std::string, peer>(peer_id, *p)); tor.leechers.erase(peer_id); + if(expire_token) { + (&s_comm)->expire_token(tor.id, u.id); + tor.tokened_users.erase(u.id); + } + // do cache expire } std::string peers; @@ -444,7 +487,14 @@ std::string worker::announce(torrent &tor, user &u, std::map<std::string, std::s std::stringstream record; record << '(' << u.id << ',' << tor.id << ',' << active << ',' << uploaded << ',' << downloaded << ',' << upspeed << ',' << downspeed << ',' << left << ',' << (cur_time - p->first_announced) << ',' << p->announces << ','; std::string record_str = record.str(); - db->record_peer(record_str, ip, peer_id, headers["User-Agent"]); + db->record_peer(record_str, ip, peer_id, headers["user-agent"]); + + if (real_uploaded_change > 0 || real_downloaded_change > 0) { + record.str(""); + record << '(' << u.id << ',' << downloaded << ',' << left << ',' << uploaded << ',' << upspeed << ',' << downspeed << ',' << (cur_time - p->first_announced); + record_str = record.str(); + db->record_peer_hist(record_str, peer_id, tor.id); + } std::string response = "d8:intervali"; response.reserve(350); @@ -499,34 +549,44 @@ std::string worker::scrape(const std::list<std::string> &infohashes) { //TODO: Restrict to local IPs std::string worker::update(std::map<std::string, std::string> ¶ms) { - std::cout << "Got update" << std::endl; if(params["action"] == "change_passkey") { std::string oldpasskey = params["oldpasskey"]; std::string newpasskey = params["newpasskey"]; - users_list[newpasskey] = users_list[oldpasskey]; - users_list.erase(oldpasskey); - std::cout << "changed passkey from " << oldpasskey << " to " << newpasskey << " for user " << users_list[newpasskey].id << std::endl; + user_list::iterator i = users_list.find(oldpasskey); + if (i == users_list.end()) { + std::cout << "No user with passkey " << oldpasskey << " exists when attempting to change passkey to " << newpasskey << std::endl; + } else { + users_list[newpasskey] = users_list[oldpasskey]; + users_list.erase(oldpasskey); + std::cout << "changed passkey from " << oldpasskey << " to " << newpasskey << " for user " << users_list[newpasskey].id << std::endl; + } } else if(params["action"] == "add_torrent") { torrent t; t.id = strtolong(params["id"]); std::string info_hash = params["info_hash"]; info_hash = hex_decode(info_hash); - bool fl = false; - if(params["freetorrent"] == "1") { - fl = true; + if(params["freetorrent"] == "0") { + t.free_torrent = NORMAL; + } else if(params["freetorrent"] == "1") { + t.free_torrent = FREE; + } else { + t.free_torrent = NEUTRAL; } t.balance = 0; t.completed = 0; t.last_selected_seeder = ""; - t.free_torrent = fl; torrents_list[info_hash] = t; - std::cout << "Added torrent " << t.id << std::endl; + std::cout << "Added torrent " << t.id<< ". FL: " << t.free_torrent << " " << params["freetorrent"] << std::endl; } else if(params["action"] == "update_torrent") { std::string info_hash = params["info_hash"]; info_hash = hex_decode(info_hash); - bool fl = false; - if(params["freetorrent"] == "1") { - fl = true; + freetype fl; + if(params["freetorrent"] == "0") { + fl = NORMAL; + } else if(params["freetorrent"] == "1") { + fl = FREE; + } else { + fl = NEUTRAL; } if(torrents_list.find(info_hash) != torrents_list.end()) { torrents_list[info_hash].free_torrent = fl; @@ -538,9 +598,13 @@ std::string worker::update(std::map<std::string, std::string> ¶ms) { // Each decoded infohash is exactly 20 characters long. std::string info_hashes = params["info_hashes"]; info_hashes = hex_decode(info_hashes); - bool fl = false; - if(params["freetorrent"] == "1") { - fl = true; + freetype fl; + if(params["freetorrent"] == "0") { + fl = NORMAL; + } else if(params["freetorrent"] == "1") { + fl = FREE; + } else { + fl = NEUTRAL; } for(unsigned int pos = 0; pos < info_hashes.length(); pos += 20) { std::string info_hash = info_hashes.substr(pos, 20); @@ -551,6 +615,22 @@ std::string worker::update(std::map<std::string, std::string> ¶ms) { std::cout << "Failed to find torrent " << info_hash << " to FL " << fl << std::endl; } } + } else if(params["action"] == "add_token") { + std::string info_hash = hex_decode(params["info_hash"]); + int user_id = atoi(params["userid"].c_str()); + if(torrents_list.find(info_hash) != torrents_list.end()) { + torrents_list[info_hash].tokened_users.insert(user_id); + } else { + std::cout << "Failed to find torrent to add a token for user " << user_id << std::endl; + } + } else if(params["action"] == "remove_token") { + std::string info_hash = hex_decode(params["info_hash"]); + int user_id = atoi(params["userid"].c_str()); + if(torrents_list.find(info_hash) != torrents_list.end()) { + torrents_list[info_hash].tokened_users.erase(user_id); + } else { + std::cout << "Failed to find torrent " << info_hash << " to remove token for user " << user_id << std::endl; + } } else if(params["action"] == "delete_torrent") { std::string info_hash = params["info_hash"]; info_hash = hex_decode(info_hash); @@ -586,8 +666,13 @@ std::string worker::update(std::map<std::string, std::string> ¶ms) { if(params["can_leech"] == "0") { can_leech = false; } - users_list[passkey].can_leech = can_leech; - std::cout << "Updated user " << passkey << std::endl; + user_list::iterator i = users_list.find(passkey); + if (i == users_list.end()) { + std::cout << "No user with passkey " << passkey << " found when attempting to change leeching status!" << std::endl; + } else { + users_list[passkey].can_leech = can_leech; + std::cout << "Updated user " << passkey << std::endl; + } } else if(params["action"] == "add_whitelist") { std::string peer_id = params["peer_id"]; whitelist.push_back(peer_id); @@ -616,6 +701,16 @@ std::string worker::update(std::map<std::string, std::string> ¶ms) { unsigned int interval = strtolong(params["new_announce_interval"]); conf->announce_interval = interval; std::cout << "Edited announce interval to " << interval << std::endl; + } else if(params["action"] == "info_torrent") { + std::string info_hash_hex = params["info_hash"]; + std::string info_hash = hex_decode(info_hash_hex); + std::cout << "Info for torrent '" << info_hash_hex << "'" << std::endl; + if(torrents_list.find(info_hash) != torrents_list.end()) { + std::cout << "Torrent " << torrents_list[info_hash].id + << ", freetorrent = " << torrents_list[info_hash].free_torrent << std::endl; + } else { + std::cout << "Failed to find torrent " << info_hash_hex << std::endl; + } } return "success"; } @@ -626,6 +721,7 @@ void worker::reap_peers() { } void worker::do_reap_peers() { + db->logger_ptr->log("Began worker::do_reap_peers()"); time_t cur_time = time(NULL); unsigned int reaped = 0; std::unordered_map<std::string, torrent>::iterator i = torrents_list.begin(); @@ -657,4 +753,5 @@ void worker::do_reap_peers() { } } std::cout << "Reaped " << reaped << " peers" << std::endl; + db->logger_ptr->log("Completed worker::do_reap_peers()"); } diff --git a/src/worker.h b/src/worker.h index 431535b..5e57174 100644 --- a/src/worker.h +++ b/src/worker.h @@ -6,6 +6,9 @@ #include <arpa/inet.h> #include <iostream> #include <fstream> +#include "site_comm.h" + +enum tracker_status { OPEN, PAUSED, CLOSING }; // tracker status class worker { private: @@ -15,13 +18,20 @@ class worker { config * conf; mysql * db; void do_reap_peers(); + tracker_status status; + site_comm s_comm; + public: - worker(torrent_list &torrents, user_list &users, std::vector<std::string> &_whitelist, config * conf_obj, mysql * db_obj); + worker(torrent_list &torrents, user_list &users, std::vector<std::string> &_whitelist, config * conf_obj, mysql * db_obj, site_comm &sc); std::string work(std::string &input, std::string &ip); std::string error(std::string err); std::string announce(torrent &tor, user &u, std::map<std::string, std::string> ¶ms, std::map<std::string, std::string> &headers, std::string &ip); std::string scrape(const std::list<std::string> &infohashes); std::string update(std::map<std::string, std::string> ¶ms); + bool signal(int sig); + + tracker_status get_status() { return status; } + void reap_peers(); }; |