summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorErik Andersson <erik@packy.se>2016-11-22 23:43:26 +0100
committerErik Andersson <erik@packy.se>2016-11-22 23:43:26 +0100
commit25be47d53ab3decef58859992010848deda5279c (patch)
treefbf0a8935d87a1817e078f5da55be42a4156e305
parent232f2f6e6791eb9fc58f71d1bd6d3cf1c3c96679 (diff)
downloadocelot-25be47d53ab3decef58859992010848deda5279c.zip
ocelot-25be47d53ab3decef58859992010848deda5279c.tar.gz
ocelot-25be47d53ab3decef58859992010848deda5279c.tar.bz2
Ocelot v0.3v0.3
-rw-r--r--src/COPYING.txt133
-rw-r--r--src/INSTALL.txt15
-rw-r--r--src/Makefile (renamed from src/makefile)2
-rw-r--r--src/config.cpp.template6
-rw-r--r--src/config.h10
-rw-r--r--src/db.cpp579
-rw-r--r--src/db.h56
-rw-r--r--src/events.h3
-rw-r--r--src/logger.cpp34
-rw-r--r--src/logger.h26
-rw-r--r--src/misc_functions.h3
-rw-r--r--src/ocelot.cpp43
-rw-r--r--src/ocelot.h6
-rw-r--r--src/schedule.cpp30
-rw-r--r--src/schedule.h5
-rw-r--r--src/site_comm.cpp77
-rw-r--r--src/site_comm.h22
-rw-r--r--src/worker.cpp163
-rw-r--r--src/worker.h12
19 files changed, 781 insertions, 444 deletions
diff --git a/src/COPYING.txt b/src/COPYING.txt
deleted file mode 100644
index 048ca8e..0000000
--- a/src/COPYING.txt
+++ /dev/null
@@ -1,133 +0,0 @@
-COPYRIGHTED AND PATENTED PENDING BY WHAT.CD INCORPORATED, DBA/AKA PROJECT
-GAZELLE/OCELOT
-
-END-USER LICENSE AGREEMENT
-
-The Ocelot Source Code (hereinafter referred to as the Software) is made
-publically available free of charge by Project Gazelle (sometimes a trade and
-business name for the legally consituted entity known as What.CD Incorporated,
-a Panamanian Corporation), copyright and patent pending owner (hereinafter
-referred to as the Licensor or Original Licensor) to anyone who wishes to use
-and/or share or distribute and/or change or modify it (hereinafter referred to
-as either the Licensee and/or End-User) FOR NONCOMMERCIAL PURPOSES ONLY under
-the following terms and conditions (hereinafter referred to as the "License"):
-
-Section 1. You may copy and distribute verbatim copies of the Software's
-source code as you receive it, in any medium, provided that you conspicuously
-and appropriately publish on each copy an appropriate copyright notice and
-disclaimer of warranty; keep intact all the notices that refer to this License
-and to the absence of any warranty; and give any other recipients of the
-Software a copy of this License along with the Software.
-
-Section 2: You may modify or change your copy or copies of the Software or
-any portion of it, thus forming a work based on the Software, and copy and
-distribute such modifications or work under the terms of Section 1 above,
-provided that you also meet all of these conditions:
-
- 2.1: You must cause the modified files to carry prominent notices stating
-that you changed the files and the date of any change.
-
- 2.2: You must cause any work that you distribute or publish, that in
-whole or in part contains or is derived from the Software or any part thereof,
-to be licensed as a whole at no charge to all third parties under the terms of
-this License.
-
-Section 3: You may copy and distribute the Software (or a work based on it,
-under the terms of Section 2) in object code or executable form under the
-terms of Sections 1 and 2 above provided that you also do one of the
-following:
-
- 3.1: Accompany it with the complete corresponding machine-readable source
-code, which must be distributed under the terms of Sections 1 and 2 above on
-a medium customarily used for software interchange; or,
-
- 3.2: Accompany it with a written offer, valid for at least one year, to
-give any third party, for a charge no more than your cost of physically
-performing source distribution, a complete machine-readable copy of the
-corresponding source code, to be distributed under the terms of Sections 1 and
-2 above on a medium customarily used for software interchange; or,
-
- 3.3: Accompany it with the information you received as to the offer to
-distribute corresponding source code. (This alternative is allowed only for
-noncommercial distribution and only if you received the Software in object
-code or executable form with such an offer, in accord with Subsection 3.2
-above.)
-
-Section 4: You may not copy, modify, sublicense, or distribute the Software
-except as expressly provided under this License. Any attempt otherwise to
-copy, modify, sublicense or distribute the Software is void, and will
-automatically terminate your rights under this License. Parties who have
-received copies, or rights, from you under this License, however, will not
-have their licenses terminated so long as such parties remain in full
-compliance with the terms and conditions of this License.
-
-Section 5: This License and nothing else grants you permission to use, modify
-and/or distribute the Software or its derivative works and only in accordance
-with the terms and conditions of this License. Otherwise, use, modification
-and/or distribution of this Software is prohibited by international laws and
-treaties as regards intellectual property. Therefore, by using, modifying
-and/or distributing the Software (or any work based on the Software), you
-indicate your acceptance of this License to do so, and all its terms and
-conditions for copying, distributing or modifying the software or works based
-on it.
-
-Section 6: Each time you redistribute the Software (or any work based on the
-Software), the recipient automatically receives a license from the Original
-Licensor to copy, distribute and/or modify the Software subject to these terms
-and conditions. You may not impose any further restrictions on the
-recipients' exercise of the rights granted herein. You are not responsible for
-enforcing compliance by third parties to this License.
-
-Section 7: NO WARRANTY
-
- 7.1: BECAUSE THE SOFTWARE IS LICENSED FREE OF CHARGE, THERE IS NO
-WARRANTY FOR THE SOFTWARE, TO THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT
-WHEN OTHERWISE STATED IN WRITING THE COPYRIGHT/PATENT HOLDERS AND/OR OTHER
-PARTIES PROVIDE THE SOFTWARE "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER
-EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
-MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE ENTIRE RISK AS TO
-THE QUALITY AND PERFORMANCE OF THE SOFTWARE IS WITH YOU. SHOULD THE PROGRAM
-PROVE DEFECTIVE, YOU ASSUME THE COST OF ALL NECESSARY SERVICING, REPAIR OR
-CORRECTION.
-
-
- 7.2:. IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN
-WRITING WILL ANY COPYRIGHT/PATENT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY
-AND/OR REDISTRIBUTE THE SOFTWARE AS PERMITTED ABOVE, BE LIABLE TO YOU FOR
-DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES
-ARISING OUT OF THE USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT
-LIMITED TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED
-BY YOU OR THIRD PARTIES OR A FAILURE OF THE SOFTWARE TO OPERATE WITH ANY OTHER
-SOFTWARE), EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE
-POSSIBILITY OF SUCH DAMAGES.
-
-Section 8: If, as a consequence of a court judgment or allegation of patent
-infringement or for any other reason (not limited to patent issues),
-conditions are imposed on you (whether by court order, agreement or otherwise)
-that contradict the conditions of this License, they do not excuse you from
-the conditions of this License. If you cannot distribute so as to satisfy
-simultaneously your obligations under this License and any other pertinent
-obligations, then as a consequence you may not distribute the Software at all.
-For example, if a patent license would not permit royalty-free redistribution
-of the Software by all those who receive copies directly or indirectly through
-you, then the only way you could satisfy both it and this License would be to
-refrain entirely from distribution of the Software.
-
-Section 9: If any portion or section of this License is held invalid or
-unenforceable under any particular circumstance, the balance of this License
-and pertinent section is intended to apply and the section as a whole is
-intended to apply in other circumstances.
-
-Section 10: Each version of the Software is given a distinguishing version
-number. If the Software specifies a version number of this License which
-applies to it and "any later version", you have the option of following the
-terms and conditions either of that version or of any later version. If the
-Software does not specify a version number of this License, you may choose any
-version ever published by the Licensor.
-
-Section 11: This Software is distributed pursuant to the terms and conditions
-of this License and is distributed free of charge FOR NONCOMMERCIAL PURPOSES
-ONLY. THE USE, DISTRIBUTION AND/OR MODIFICATION OF THIS SOFTWARE FOR
-COMMERCIAL PURPOSES IS EXPRESSLY PROHIBITED BY INTERNATIONAL LAWS AND TREATIES
-AS REGARDS INTELLECTUAL PROPERTY.
-
diff --git a/src/INSTALL.txt b/src/INSTALL.txt
deleted file mode 100644
index cb22528..0000000
--- a/src/INSTALL.txt
+++ /dev/null
@@ -1,15 +0,0 @@
-Ocelot requires the following:
-
-libboost_thread >= 1.44.0
-libmysqlpp >= 3.0.0
-libev >= 3.43
-g++ >= 4.3.2
-
-There is no autoconfigure file yet, so you may need to play with the makefile.
-A common tripping point is that your libboost_thread may be called libbost_thread-mt,
-or you may need to include files from strange include directories.
-
-Once everything is okay, just type 'make'.
-
-Have fun :P
-
diff --git a/src/makefile b/src/Makefile
index a1cfe15..7f0576b 100644
--- a/src/makefile
+++ b/src/Makefile
@@ -1,7 +1,7 @@
CXX=g++
CXXFLAGS=-march=native -O2 -fomit-frame-pointer -fno-ident -fvisibility-inlines-hidden -fvisibility=hidden -Wall -std=c++0x -iquote -Wl,O1 -Wl,--as-needed -I/usr/include/mysql -I/usr/include/mysql++ -I/usr/local/include/boost
LDFLAGS=-L/usr/local/lib
-LIBS=-lmysqlpp -lboost_thread -lev
+LIBS=-lmysqlpp -lboost_system -lboost_thread -lev
OCELOT=ocelot
OBJS=$(patsubst %.cpp,%.o,$(wildcard *.cpp))
all: $(OCELOT)
diff --git a/src/config.cpp.template b/src/config.cpp.template
index d22bd4c..900c24e 100644
--- a/src/config.cpp.template
+++ b/src/config.cpp.template
@@ -6,16 +6,12 @@ config::config() {
max_connections = 512;
max_read_buffer = 4096;
timeout_interval = 20;
- schedule_interval = 5;
+ schedule_interval = 3;
max_middlemen = 5000;
announce_interval = 1800;
peers_timeout = 2700; //Announce interval * 1.5
- flush_torrents_interval = 40;
- flush_users_interval = 40;
- flush_peers_interval = 40;
- flush_snatches_interval = 40;
reap_peers_interval = 1800;
mysql_db = "gazelle";
diff --git a/src/config.h b/src/config.h
index 85e2843..6f72db8 100644
--- a/src/config.h
+++ b/src/config.h
@@ -1,8 +1,12 @@
+#ifndef OCELOT_CONFIG_H
+#define OCELOT_CONFIG_H
+
#include <string>
class config {
public:
std::string host;
+ std::string site_host;
unsigned int port;
unsigned int max_connections;
unsigned int max_read_buffer;
@@ -13,10 +17,6 @@ class config {
unsigned int announce_interval;
int peers_timeout;
- unsigned int flush_torrents_interval;
- unsigned int flush_users_interval;
- unsigned int flush_peers_interval;
- unsigned int flush_snatches_interval;
unsigned int reap_peers_interval;
// MySQL
@@ -29,3 +29,5 @@ class config {
config();
};
+
+#endif
diff --git a/src/db.cpp b/src/db.cpp
index aba9db5..d9ba4b9 100644
--- a/src/db.cpp
+++ b/src/db.cpp
@@ -1,272 +1,467 @@
#include "ocelot.h"
#include "db.h"
+#include "misc_functions.h"
#include <string>
#include <iostream>
+#include <queue>
+#include <unistd.h>
+#include <time.h>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/locks.hpp>
+#include <boost/lexical_cast.hpp>
+
+#define DB_LOCK_TIMEOUT 50
mysql::mysql(std::string mysql_db, std::string mysql_host, std::string username, std::string password) {
- if(!conn.connect(mysql_db.c_str(), mysql_host.c_str(), username.c_str(), password.c_str(), 0)) {
- std::cout << "Could not connect to MySQL" << std::endl;
- return;
- }
-
- update_user_buffer = "";
- update_torrent_buffer = "";
- update_peer_buffer = "";
- update_snatch_buffer = "";
+ if(!conn.connect(mysql_db.c_str(), mysql_host.c_str(), username.c_str(), password.c_str(), 0)) {
+ std::cout << "Could not connect to MySQL" << std::endl;
+ return;
+ }
+
+ db = mysql_db, server = mysql_host, db_user = username, pw = password;
+ u_active = false; t_active = false; p_active = false; s_active = false; tok_active = false;
+
+ std::cout << "Connected to MySQL" << std::endl;
+ update_user_buffer = "";
+ update_torrent_buffer = "";
+ update_peer_buffer = "";
+ update_snatch_buffer = "";
+
+ logger_ptr = logger::get_instance();
}
void mysql::load_torrents(std::unordered_map<std::string, torrent> &torrents) {
- mysqlpp::Query query = conn.query("SELECT ID, info_hash, freetorrent, Snatched FROM torrents ORDER BY ID;");
- if(mysqlpp::StoreQueryResult res = query.store()) {
- mysqlpp::String one("1"); // Hack to get around bug in mysql++3.0.0
- size_t num_rows = res.num_rows();
- for(size_t i = 0; i < num_rows; i++) {
- std::string info_hash;
- res[i][1].to_string(info_hash);
-
- torrent t;
- t.id = res[i][0];
- if(res[i][2].compare(one) == 0) {
- t.free_torrent = true;
- } else {
- t.free_torrent = false;
- }
- t.balance = 0;
- t.completed = res[i][3];
- t.last_selected_seeder = "";
- torrents[info_hash] = t;
- }
- }
+ mysqlpp::Query query = conn.query("SELECT ID, info_hash, freetorrent, Snatched FROM torrents ORDER BY ID;");
+ if(mysqlpp::StoreQueryResult res = query.store()) {
+ mysqlpp::String one("1"); // Hack to get around bug in mysql++3.0.0
+ mysqlpp::String two("2");
+ size_t num_rows = res.num_rows();
+ for(size_t i = 0; i < num_rows; i++) {
+ std::string info_hash;
+ res[i][1].to_string(info_hash);
+
+ torrent t;
+ t.id = res[i][0];
+ if(res[i][2].compare(one) == 0) {
+ t.free_torrent = FREE;
+ } else if(res[i][2].compare(two) == 0) {
+ t.free_torrent = NEUTRAL;
+ } else {
+ t.free_torrent = NORMAL;
+ }
+ t.balance = 0;
+ t.completed = res[i][3];
+ t.last_selected_seeder = "";
+ torrents[info_hash] = t;
+ }
+ }
}
void mysql::load_users(std::unordered_map<std::string, user> &users) {
- mysqlpp::Query query = conn.query("SELECT ID, can_leech, torrent_pass FROM users_main WHERE Enabled='1';");
- if(mysqlpp::StoreQueryResult res = query.store()) {
- size_t num_rows = res.num_rows();
- for(size_t i = 0; i < num_rows; i++) {
- std::string passkey;
- res[i][2].to_string(passkey);
-
- user u;
- u.id = res[i][0];
- u.can_leech = res[i][1];
- users[passkey] = u;
- }
- }
+ mysqlpp::Query query = conn.query("SELECT ID, can_leech, torrent_pass FROM users_main WHERE Enabled='1';");
+ if(mysqlpp::StoreQueryResult res = query.store()) {
+ size_t num_rows = res.num_rows();
+ for(size_t i = 0; i < num_rows; i++) {
+ std::string passkey;
+ res[i][2].to_string(passkey);
+
+ user u;
+ u.id = res[i][0];
+ u.can_leech = res[i][1];
+ users[passkey] = u;
+ }
+ }
}
-void mysql::load_whitelist(std::vector<std::string> &whitelist) {
- mysqlpp::Query query = conn.query("SELECT peer_id FROM xbt_client_whitelist;");
- if(mysqlpp::StoreQueryResult res = query.store()) {
- size_t num_rows = res.num_rows();
- for(size_t i = 0; i<num_rows; i++) {
- whitelist.push_back(res[i][0].c_str());
- }
- }
+void mysql::load_tokens(std::unordered_map<std::string, torrent> &torrents) {
+ mysqlpp::Query query = conn.query("SELECT uf.UserID, t.info_hash FROM users_freeleeches AS uf JOIN torrents AS t ON t.ID = uf.TorrentID WHERE uf.Expired = '0';");
+ if (mysqlpp::StoreQueryResult res = query.store()) {
+ size_t num_rows = res.num_rows();
+ for (size_t i = 0; i < num_rows; i++) {
+ std::string info_hash;
+ res[i][1].to_string(info_hash);
+ std::unordered_map<std::string, torrent>::iterator it = torrents.find(info_hash);
+ if (it != torrents.end()) {
+ torrent &tor = it->second;
+ tor.tokened_users.insert(res[i][0]);
+ }
+ }
+ }
}
+void mysql::load_whitelist(std::vector<std::string> &whitelist) {
+ mysqlpp::Query query = conn.query("SELECT peer_id FROM xbt_client_whitelist;");
+ if(mysqlpp::StoreQueryResult res = query.store()) {
+ size_t num_rows = res.num_rows();
+ for(size_t i = 0; i<num_rows; i++) {
+ whitelist.push_back(res[i][0].c_str());
+ }
+ }
+}
+void mysql::record_token(std::string &record) {
+ boost::mutex::scoped_lock lock(user_token_lock);
+ if (update_token_buffer != "") {
+ update_token_buffer += ",";
+ }
+ update_token_buffer += record;
+}
void mysql::record_user(std::string &record) {
- boost::mutex::scoped_lock lock(user_buffer_lock);
- if(update_user_buffer != "") {
- update_user_buffer += ",";
- }
- update_user_buffer += record;
+ boost::mutex::scoped_lock lock(user_buffer_lock);
+ if(update_user_buffer != "") {
+ update_user_buffer += ",";
+ }
+ update_user_buffer += record;
}
void mysql::record_torrent(std::string &record) {
- boost::mutex::scoped_lock lock(torrent_buffer_lock);
- if(update_torrent_buffer != "") {
- update_torrent_buffer += ",";
- }
- update_torrent_buffer += record;
+ boost::mutex::scoped_lock lock(torrent_buffer_lock);
+ if(update_torrent_buffer != "") {
+ update_torrent_buffer += ",";
+ }
+ update_torrent_buffer += record;
}
void mysql::record_peer(std::string &record, std::string &ip, std::string &peer_id, std::string &useragent) {
- boost::mutex::scoped_lock lock(peer_buffer_lock);
- if(update_peer_buffer != "") {
- update_peer_buffer += ",";
+ boost::mutex::scoped_lock lock(peer_buffer_lock);
+ if(update_peer_buffer != "") {
+ update_peer_buffer += ",";
+ }
+ mysqlpp::Query q = conn.query();
+ q << record << mysqlpp::quote << ip << ',' << mysqlpp::quote << peer_id << ',' << mysqlpp::quote << useragent << "," << time(NULL) << ')';
+
+ update_peer_buffer += q.str();
+}
+
+void mysql::record_peer_hist(std::string &record, std::string &peer_id, int tid) {
+ boost::mutex::scoped_lock (peer_hist_buffer_lock);
+ if (update_peer_hist_buffer != "") {
+ update_peer_hist_buffer += ",";
}
mysqlpp::Query q = conn.query();
- q << record << mysqlpp::quote << ip << ',' << mysqlpp::quote << peer_id << ',' << mysqlpp::quote << useragent << ')';
-
- update_peer_buffer += q.str();
+ q << record << ',' << mysqlpp::quote << peer_id << ',' << tid << ',' << time(NULL) << ')';
+ update_peer_hist_buffer += q.str();
}
+
void mysql::record_snatch(std::string &record) {
- boost::mutex::scoped_lock lock(snatch_buffer_lock);
- if(update_snatch_buffer != "") {
- update_snatch_buffer += ",";
- }
- update_snatch_buffer += record;
+ boost::mutex::scoped_lock lock(mysql::snatch_buffer_lock);
+ if(update_snatch_buffer != "") {
+ update_snatch_buffer += ",";
+ }
+ update_snatch_buffer += record;
}
+bool mysql::all_clear() {
+ return (user_queue.size() == 0 && torrent_queue.size() == 0 && peer_queue.size() == 0 && snatch_queue.size() == 0 && token_queue.size() == 0 && peer_hist_queue.size() == 0);
+}
-
+void mysql::flush() {
+ flush_users();
+ flush_torrents();
+ flush_snatches();
+ flush_peers();
+ flush_peer_hist();
+ flush_tokens();
+}
void mysql::flush_users() {
- boost::thread thread(&mysql::do_flush_users, this);
+ std::string sql;
+ boost::mutex::scoped_lock lock(user_buffer_lock);
+ if (update_user_buffer == "") {
+ return;
+ }
+ sql = "INSERT INTO users_main (ID, Uploaded, Downloaded) VALUES " + update_user_buffer +
+ " ON DUPLICATE KEY UPDATE Uploaded = Uploaded + VALUES(Uploaded), Downloaded = Downloaded + VALUES(Downloaded)";
+ user_queue.push(sql);
+ update_user_buffer.clear();
+ if (user_queue.size() == 1 && u_active == false) {
+ boost::thread thread(&mysql::do_flush_users, this);
+ }
}
-void mysql::do_flush_users() {
- std::cout << "flushing users" << std::endl;
+void mysql::flush_torrents() {
std::string sql;
- { // Lock mutex
- boost::mutex::scoped_lock lock(user_buffer_lock);
- if(update_user_buffer == "") {
- return;
- } else {
- sql = "INSERT INTO users_main(ID, Uploaded, Downloaded) VALUES ";
- sql += update_user_buffer;
- sql += " ON DUPLICATE KEY UPDATE Uploaded=Uploaded+VALUES(Uploaded), Downloaded=Downloaded+VALUES(Downloaded)";
- update_user_buffer.clear();
- }
+ boost::mutex::scoped_lock lock(torrent_buffer_lock);
+ if (update_torrent_buffer == "") {
+ return;
+ }
+ sql = "INSERT INTO torrents (ID,Seeders,Leechers,Snatched,Balance) VALUES " + update_torrent_buffer +
+ " ON DUPLICATE KEY UPDATE Seeders=VALUES(Seeders), Leechers=VALUES(Leechers), " +
+ "Snatched=Snatched+VALUES(Snatched), Balance=VALUES(Balance), last_action = " +
+ "IF(VALUES(Seeders) > 0, NOW(), last_action)";
+ torrent_queue.push(sql);
+ update_torrent_buffer.clear();
+ sql.clear();
+ sql = "DELETE FROM torrents WHERE info_hash = ''";
+ torrent_queue.push(sql);
+ if (torrent_queue.size() == 2 && t_active == false) {
+ boost::thread thread(&mysql::do_flush_torrents, this);
+ }
+}
+
+void mysql::flush_snatches() {
+ std::string sql;
+ boost::mutex::scoped_lock lock(snatch_buffer_lock);
+ if (update_snatch_buffer == "" ) {
+ return;
+ }
+ sql = "INSERT INTO xbt_snatched (uid, fid, tstamp, IP) VALUES " + update_snatch_buffer;
+ snatch_queue.push(sql);
+ update_snatch_buffer.clear();
+ if (snatch_queue.size() == 1 && s_active == false) {
+ boost::thread thread(&mysql::do_flush_snatches, this);
+ }
+}
+
+void mysql::flush_peers() {
+ std::string sql;
+ boost::mutex::scoped_lock lock(peer_buffer_lock);
+ // because xfu inserts are slow and ram is not infinite we need to
+ // limit this queue's size
+ if (peer_queue.size() >= 1000) {
+ peer_queue.pop();
+ }
+ if (update_peer_buffer == "") {
+ return;
}
- boost::mutex::scoped_lock db_lock(db_mutex);
-
+ if (peer_queue.size() == 0) {
+ sql = "SET session sql_log_bin = 0";
+ peer_queue.push(sql);
+ sql.clear();
+ }
- mysqlpp::Query query = conn.query(sql);
- for(int i = 0; i < 3; i++) {
- try {
- query.execute();
- break;
- } catch(const mysqlpp::BadQuery& er) { // deadlock
- std::cout << "Query error: " << er.what() << std::endl;
- sleep(3);
- } catch (const mysqlpp::Exception& er) { // Weird unpredictable shit
- std::cout << "Query error: " << er.what() << std::endl;
- sleep(3);
- }
+ sql = "INSERT INTO xbt_files_users (uid,fid,active,uploaded,downloaded,upspeed,downspeed,remaining," +
+ std::string("timespent,announced,ip,peer_id,useragent,mtime) VALUES ") + update_peer_buffer +
+ " ON DUPLICATE KEY UPDATE active=VALUES(active), uploaded=VALUES(uploaded), " +
+ "downloaded=VALUES(downloaded), upspeed=VALUES(upspeed), " +
+ "downspeed=VALUES(downspeed), remaining=VALUES(remaining), " +
+ "timespent=VALUES(timespent), announced=VALUES(announced), " +
+ "mtime=VALUES(mtime)";
+ peer_queue.push(sql);
+ update_peer_buffer.clear();
+ if (peer_queue.size() == 2 && p_active == false) {
+ boost::thread thread(&mysql::do_flush_peers, this);
}
- std::cout << "flushed users" << std::endl;
}
+void mysql::flush_peer_hist() {
+ std::string sql;
+ boost::mutex::scoped_lock lock(peer_hist_buffer_lock);
+ if (update_peer_hist_buffer == "") {
+ return;
+ }
-void mysql::flush_torrents() {
- boost::thread thread(&mysql::do_flush_torrents, this);
+ if (peer_hist_queue.size() == 0) {
+ sql = "SET session sql_log_bin = 0";
+ peer_hist_queue.push(sql);
+ sql.clear();
+ }
+
+ sql = "INSERT IGNORE INTO xbt_peers_history (uid, downloaded, remaining, uploaded, upspeed, downspeed, timespent, peer_id, fid, mtime) VALUES " + update_peer_hist_buffer;
+ peer_hist_queue.push(sql);
+ update_peer_hist_buffer.clear();
+ if (peer_hist_queue.size() == 2 && hist_active == false) {
+ boost::thread thread(&mysql::do_flush_peer_hist, this);
+ }
}
-void mysql::do_flush_torrents() {
- std::cout << "flushing torrents" << std::endl;
+void mysql::flush_tokens() {
std::string sql;
- { // Lock mutex
- boost::mutex::scoped_lock lock(torrent_buffer_lock);
- if(update_torrent_buffer == "") {
- return;
- } else {
- sql = "INSERT INTO torrents(ID,Seeders,Leechers,Snatched,Balance) VALUES ";
- sql += update_torrent_buffer;
- sql += " ON DUPLICATE KEY UPDATE Seeders=VALUES(Seeders), Leechers=VALUES(Leechers), Snatched=Snatched+VALUES(Snatched), Balance=VALUES(Balance), last_action = IF(VALUES(Seeders) > 0, NOW(), last_action)";
- update_torrent_buffer.clear();
- }
+ boost::mutex::scoped_lock lock(user_token_lock);
+ if (update_token_buffer == "") {
+ return;
}
-
- boost::mutex::scoped_lock db_lock(db_mutex);
-
- mysqlpp::Query query = conn.query(sql);
- for(int i = 0; i < 3; i++) {
+ sql = "INSERT INTO users_freeleeches (UserID, TorrentID, Downloaded) VALUES " + update_token_buffer +
+ " ON DUPLICATE KEY UPDATE Downloaded = Downloaded + VALUES(Downloaded)";
+ token_queue.push(sql);
+ update_token_buffer.clear();
+ if (token_queue.size() == 1 && tok_active == false) {
+ boost::thread(&mysql::do_flush_tokens, this);
+ }
+}
+
+void mysql::do_flush_users() {
+ u_active = true;
+ mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0);
+ while (user_queue.size() > 0) {
try {
- query.execute();
- break;
- } catch(const mysqlpp::BadQuery& er) { // deadlock
- std::cout << "Query error: " << er.what() << std::endl;
+ std::string sql = user_queue.front();
+ mysqlpp::Query query = c.query(sql);
+ if (!query.exec()) {
+ std::cout << "User flush failed (" << user_queue.size() << " remain)" << std::endl;
+ sleep(3);
+ continue;
+ } else {
+ boost::mutex::scoped_lock lock(user_buffer_lock);
+ user_queue.pop();
+ std::cout << "Users flushed (" << user_queue.size() << " remain)" << std::endl;
+ }
+ }
+ catch (const mysqlpp::BadQuery &er) {
+ std::cerr << "Query error: " << er.what() << " in flush users with a qlength: " << user_queue.front().size() << " queue size: " << user_queue.size() << std::endl;
sleep(3);
- } catch (const mysqlpp::Exception& er) { // Weird unpredictable shit
- std::cout << "Query error: " << er.what() << std::endl;
+ continue;
+ } catch (const mysqlpp::Exception &er) {
+ std::cerr << "Query error: " << er.what() << " in flush users with a qlength: " << user_queue.front().size() << " queue size: " << user_queue.size() << std::endl;
sleep(3);
+ continue;
}
}
- std::cout << "flushed torrents" << std::endl;
+ u_active = false;
+}
- sql = "DELETE FROM torrents WHERE info_hash = ''";
- mysqlpp::Query dquery = conn.query(sql);
- for (int i = 0; i < 3; i++) {
+void mysql::do_flush_torrents() {
+ t_active = true;
+ mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0);
+ while (torrent_queue.size() > 0) {
try {
- dquery.execute();
- break;
- } catch (const mysqlpp::BadQuery& er) {
- std::cout << "Query error: " << er.what() << std::endl;
+ std::string sql = torrent_queue.front();
+ if (sql == "") {
+ torrent_queue.pop();
+ continue;
+ }
+ mysqlpp::Query query = c.query(sql);
+ if (!query.exec()) {
+ std::cout << "Torrent flush failed (" << torrent_queue.size() << " remain)" << std::endl;
+ sleep(3);
+ continue;
+ } else {
+ boost::mutex::scoped_lock lock(torrent_buffer_lock);
+ torrent_queue.pop();
+ std::cout << "Torrents flushed (" << torrent_queue.size() << " remain)" << std::endl;
+ }
+ }
+ catch (const mysqlpp::BadQuery &er) {
+ std::cerr << "Query error: " << er.what() << " in flush torrents with a qlength: " << torrent_queue.front().size() << " queue size: " << torrent_queue.size() << std::endl;
sleep(3);
- } catch (const mysqlpp::Exception& er) {
- std::cout << "Query error: " << er.what() << std::endl;
+ continue;
+ } catch (const mysqlpp::Exception &er) {
+ std::cerr << "Query error: " << er.what() << " in flush torrents with a qlength: " << torrent_queue.front().size() << " queue size: " << torrent_queue.size() << std::endl;
sleep(3);
+ continue;
}
}
+ t_active = false;
}
-void mysql::flush_snatches() {
- boost::thread thread(&mysql::do_flush_snatches, this);
-}
-
-void mysql::do_flush_snatches() {
- std::cout << "flushing snatches" << std::endl;
- std::string sql;
- { // lock mutex
- boost::mutex::scoped_lock lock(snatch_buffer_lock);
- if(update_snatch_buffer == "") {
- return;
- } else {
- sql = "INSERT INTO xbt_snatched(uid,fid,tstamp,IP) VALUES ";
- sql += update_snatch_buffer;
- update_snatch_buffer.clear();
- }
- }
-
- boost::mutex::scoped_lock db_lock(db_mutex);
-
- mysqlpp::Query query = conn.query(sql);
- for(int i = 0; i < 3; i++) {
+void mysql::do_flush_peers() {
+ p_active = true;
+ mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0);
+ while (peer_queue.size() > 0) {
try {
- query.execute();
- break;
- } catch(const mysqlpp::BadQuery& er) { // deadlock
- std::cout << "Query error: " << er.what() << std::endl;
+ std::string sql = peer_queue.front();
+ mysqlpp::Query query = c.query(sql);
+ if (!query.exec()) {
+ std::cout << "Peer flush failed (" << peer_queue.size() << " remain)" << std::endl;
+ sleep(3);
+ continue;
+ } else {
+ boost::mutex::scoped_lock lock(peer_buffer_lock);
+ peer_queue.pop();
+ std::cout << "Peers flushed (" << peer_queue.size() << " remain)" << std::endl;
+ }
+ }
+ catch (const mysqlpp::BadQuery &er) {
+ std::cerr << "Query error: " << er.what() << " in flush peers with a qlength: " << peer_queue.front().size() << " queue size: " << peer_queue.size() << std::endl;
sleep(3);
- } catch (const mysqlpp::Exception& er) { // Weird unpredictable shit
- std::cout << "Query error: " << er.what() << std::endl;
+ continue;
+ } catch (const mysqlpp::Exception &er) {
+ std::cerr << "Query error: " << er.what() << " in flush peers with a qlength: " << peer_queue.front().size() << " queue size: " << peer_queue.size() << std::endl;
sleep(3);
+ continue;
}
}
- std::cout << "flushed snatches" << std::endl;
+ p_active = false;
}
-void mysql::flush_peers() {
- boost::thread thread(&mysql::do_flush_peers, this);
+void mysql::do_flush_peer_hist() {
+ hist_active = true;
+ mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0);
+ while (peer_hist_queue.size() > 0) {
+ try {
+ std::string sql = peer_hist_queue.front();
+ mysqlpp::Query query = c.query(sql);
+ if (!query.exec()) {
+ std::cout << "Peer history flush failed (" << peer_hist_queue.size() << " remain)" << std::endl;
+ sleep(3);
+ continue;
+ } else {
+ boost::mutex::scoped_lock lock(peer_hist_buffer_lock);
+ peer_hist_queue.pop();
+ std::cout << "Peer history flushed (" << peer_hist_queue.size() << " remain)" << std::endl;
+ }
+ }
+ catch (const mysqlpp::BadQuery &er) {
+ std::cerr << "Query error: " << er.what() << " in flush peer history with a qlength: " << peer_hist_queue.front().size() << " queue size: " << peer_hist_queue.size() << std::endl;
+ sleep(3);
+ continue;
+ } catch (const mysqlpp::Exception &er) {
+ std::cerr << "Query error: " << er.what() << " in flush peer history with a qlength: " << peer_hist_queue.front().size() << " queue size: " << peer_hist_queue.size() << std::endl;
+ sleep(3);
+ continue;
+ }
+ }
+ hist_active = false;
}
-void mysql::do_flush_peers() {
- std::cout << "flushing peers" << std::endl;
- std::string sql;
- { // lock mutex
- boost::mutex::scoped_lock lock(peer_buffer_lock);
- if(update_peer_buffer == "") {
- return;
- } else {
- sql = "INSERT INTO xbt_files_users(uid,fid,active,uploaded,downloaded,upspeed,downspeed,remaining,timespent,announced,ip,peer_id,useragent) VALUES ";
- sql += update_peer_buffer;
- sql += " ON DUPLICATE KEY UPDATE active=VALUES(active), uploaded=VALUES(uploaded), downloaded=VALUES(downloaded), upspeed=VALUES(upspeed), downspeed=VALUES(downspeed), ";
- sql += "remaining=VALUES(remaining), timespent=VALUES(timespent), announced=VALUES(announced), peer_id=VALUES(peer_id), useragent=VALUES(useragent),mtime=UNIX_TIMESTAMP(NOW())";
- update_peer_buffer.clear();
+void mysql::do_flush_snatches() {
+ s_active = true;
+ mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0);
+ while (snatch_queue.size() > 0) {
+ try {
+ std::string sql = snatch_queue.front();
+ mysqlpp::Query query = c.query(sql);
+ if (!query.exec()) {
+ std::cout << "Snatch flush failed (" << snatch_queue.size() << " remain)" << std::endl;
+ sleep(3);
+ continue;
+ } else {
+ boost::mutex::scoped_lock lock(snatch_buffer_lock);
+ snatch_queue.pop();
+ std::cout << "Snatches flushed (" << snatch_queue.size() << " remain)" << std::endl;
+ }
+ }
+ catch (const mysqlpp::BadQuery &er) {
+ std::cerr << "Query error: " << er.what() << " in flush snatches with a qlength: " << snatch_queue.front().size() << " queue size: " << snatch_queue.size() << std::endl;
+ sleep(3);
+ continue;
+ } catch (const mysqlpp::Exception &er) {
+ std::cerr << "Query error: " << er.what() << " in flush snatches with a qlength: " << snatch_queue.front().size() << " queue size: " << snatch_queue.size() << std::endl;
+ sleep(3);
+ continue;
}
}
-
- boost::mutex::scoped_lock db_lock(db_mutex);
-
- mysqlpp::Query query = conn.query(sql);
- for(int i = 0; i < 3; i++) {
+ s_active = false;
+}
+
+void mysql::do_flush_tokens() {
+ tok_active = true;
+ mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0);
+ while (token_queue.size() > 0) {
try {
- query.execute();
- break;
- } catch(const mysqlpp::BadQuery& er) { // deadlock
- std::cout << "Query error: " << er.what() << std::endl;
+ std::string sql = token_queue.front();
+ mysqlpp::Query query = c.query(sql);
+ if (!query.exec()) {
+ std::cout << "Token flush failed (" << token_queue.size() << " remain)" << std::endl;
+ sleep(3);
+ continue;
+ } else {
+ boost::mutex::scoped_lock lock(user_token_lock);
+ token_queue.pop();
+ std::cout << "Tokens flushed (" << token_queue.size() << " remain)" << std::endl;
+ }
+ }
+ catch (const mysqlpp::BadQuery &er) {
+ std::cerr << "Query error: " << er.what() << " in flush tokens with a qlength: " << token_queue.front().size() << " queue size: " << token_queue.size() << std::endl;
sleep(3);
- } catch (const mysqlpp::Exception& er) { // Weird unpredictable shit
- std::cout << "Query error: " << er.what() << std::endl;
+ continue;
+ } catch (const mysqlpp::Exception &er) {
+ std::cerr << "Query error: " << er.what() << " in flush tokens with a qlength: " << token_queue.front().size() << " queue size: " << token_queue.size() << std::endl;
sleep(3);
+ continue;
}
}
- std::cout << "flushed peers" << std::endl;
+ tok_active = false;
}
diff --git a/src/db.h b/src/db.h
index 4c12019..4712dfc 100644
--- a/src/db.h
+++ b/src/db.h
@@ -4,8 +4,11 @@
#include <mysql++/mysql++.h>
#include <string>
#include <unordered_map>
+#include <queue>
#include <boost/thread/mutex.hpp>
+#include "logger.h"
+
class mysql {
private:
mysqlpp::Connection conn;
@@ -13,37 +16,64 @@ class mysql {
std::string update_torrent_buffer;
std::string update_peer_buffer;
std::string update_snatch_buffer;
+ std::string update_token_buffer;
+ std::string update_peer_hist_buffer;
+ std::queue<std::string> user_queue;
+ std::queue<std::string> torrent_queue;
+ std::queue<std::string> peer_queue;
+ std::queue<std::string> snatch_queue;
+ std::queue<std::string> token_queue;
+ std::queue<std::string> peer_hist_queue;
+
+ std::string db, server, db_user, pw;
+ bool u_active, t_active, p_active, s_active, tok_active, hist_active;
+
+ // These locks prevent more than one thread from reading/writing the buffers.
+ // These should be held for the minimum time possible.
boost::mutex user_buffer_lock;
boost::mutex torrent_buffer_lock;
boost::mutex peer_buffer_lock;
boost::mutex snatch_buffer_lock;
+ boost::mutex user_token_lock;
+ boost::mutex peer_hist_buffer_lock;
- boost::mutex user_list_mutex;
-
- boost::mutex db_mutex;
+ void do_flush_users();
+ void do_flush_torrents();
+ void do_flush_snatches();
+ void do_flush_peers();
+ void do_flush_tokens();
+ void do_flush_peer_hist();
+
+ void flush_users();
+ void flush_torrents();
+ void flush_snatches();
+ void flush_peers();
+ void flush_tokens();
+ void flush_peer_hist();
+
public:
mysql(std::string mysql_db, std::string mysql_host, std::string username, std::string password);
void load_torrents(std::unordered_map<std::string, torrent> &torrents);
void load_users(std::unordered_map<std::string, user> &users);
+ void load_tokens(std::unordered_map<std::string, torrent> &torrents);
void load_whitelist(std::vector<std::string> &whitelist);
void record_user(std::string &record); // (id,uploaded_change,downloaded_change)
void record_torrent(std::string &record); // (id,seeders,leechers,snatched_change,balance)
void record_snatch(std::string &record); // (uid,fid,tstamp)
void record_peer(std::string &record, std::string &ip, std::string &peer_id, std::string &useragent); // (uid,fid,active,peerid,useragent,ip,uploaded,downloaded,upspeed,downspeed,left,timespent,announces)
-
- void flush_users();
- void flush_torrents();
- void flush_snatches();
- void flush_peers();
-
- void do_flush_users();
- void do_flush_torrents();
- void do_flush_snatches();
- void do_flush_peers();
+ void record_token(std::string &record);
+ void record_peer_hist(std::string &record, std::string &peer_id, int tid);
+
+ void flush();
+ bool all_clear();
+
boost::mutex torrent_list_mutex;
+
+ logger* logger_ptr;
};
+
#pragma GCC visibility pop
#endif
diff --git a/src/events.h b/src/events.h
index 36a0866..85ed708 100644
--- a/src/events.h
+++ b/src/events.h
@@ -44,7 +44,6 @@ THE WORKER
-
// THE MOTHER - Spawns connection middlemen
class connection_mother {
private:
@@ -67,7 +66,7 @@ class connection_mother {
int get_open_connections() { return open_connections; }
int get_opened_connections() { return opened_connections; }
-
+
void handle_connect(ev::io &watcher, int events_flags);
~connection_mother();
};
diff --git a/src/logger.cpp b/src/logger.cpp
new file mode 100644
index 0000000..691c6cf
--- /dev/null
+++ b/src/logger.cpp
@@ -0,0 +1,34 @@
+#include "logger.h"
+
+logger* logger::singletonInstance_ = 0;
+
+logger::logger(std::string filename) {
+ logger::log_file_.open(filename.c_str(), std::ios::out);
+ if(logger::log_file_.is_open()) {
+ singletonInstance_ = this;
+ }
+}
+
+logger::~logger(void) {
+ if(log_file_.is_open()) {
+ log_file_.close();
+ }
+ singletonInstance_ = 0;
+}
+
+logger *logger::get_instance(void) {
+ if(singletonInstance_ != 0) {
+ return singletonInstance_;
+ }
+ return NULL;
+}
+
+bool logger::log(std::string msg) {
+ boost::mutex::scoped_lock lock(log_lock_);
+ if(log_file_.is_open()) {
+ log_file_ << msg << std::endl;
+ log_file_.flush();
+ return true;
+ }
+ return false;
+}
diff --git a/src/logger.h b/src/logger.h
new file mode 100644
index 0000000..08f7c2c
--- /dev/null
+++ b/src/logger.h
@@ -0,0 +1,26 @@
+#ifndef OCELOT_LOGGER_H
+#define OCELOT_LOGGER_H
+
+#include <string>
+#include <iostream>
+#include <fstream>
+
+#include <boost/thread/mutex.hpp>
+
+class logger {
+
+ public:
+ logger(std::string filename);
+ virtual ~logger(void);
+ bool log(std::string msg);
+ static logger* get_instance(void);
+
+ protected:
+
+ private:
+ static logger* singletonInstance_;
+ boost::mutex log_lock_;
+ std::ofstream log_file_;
+};
+
+#endif
diff --git a/src/misc_functions.h b/src/misc_functions.h
index 38e014d..1b70a64 100644
--- a/src/misc_functions.h
+++ b/src/misc_functions.h
@@ -1,10 +1,11 @@
#ifndef MISC_FUNCTIONS__H
#define MISC_FUNCTIONS__H
#include <string>
-
+#include <cstdlib>
long strtolong(const std::string& str);
long long strtolonglong(const std::string& str);
std::string inttostr(int i);
std::string hex_decode(const std::string &in);
+int timeval_subtract (timeval* result, timeval* x, timeval* y);
#endif
diff --git a/src/ocelot.cpp b/src/ocelot.cpp
index 05f1823..b7a484d 100644
--- a/src/ocelot.cpp
+++ b/src/ocelot.cpp
@@ -4,19 +4,21 @@
#include "worker.h"
#include "events.h"
#include "schedule.h"
+#include "logger.h"
+#include "site_comm.h"
static mysql *db_ptr;
static connection_mother *mother;
+static worker *work;
+static logger *log_ptr;
+static site_comm *sc_ptr;
static void sig_handler(int sig)
{
std::cout << "Caught SIGINT/SIGTERM" << std::endl;
- delete(mother);
- db_ptr->flush_torrents();
- db_ptr->flush_users();
- db_ptr->flush_snatches();
- db_ptr->flush_peers();
- exit(0);
+ if (work->signal(sig)) {
+ exit(0);
+ }
}
int main() {
@@ -24,28 +26,37 @@ int main() {
signal(SIGINT, sig_handler);
signal(SIGTERM, sig_handler);
-
+
+ log_ptr = new logger("debug.log");
+
mysql db(conf.mysql_db, conf.mysql_host, conf.mysql_username, conf.mysql_password);
db_ptr = &db;
+
+ site_comm sc(conf);
+ sc_ptr = &sc;
- std::unordered_map<std::string, torrent> torrents_list;
- db.load_torrents(torrents_list);
- std::cout << "Loaded " << torrents_list.size() << " torrents" << std::endl;
+ std::vector<std::string> whitelist;
+ db.load_whitelist(whitelist);
+ std::cout << "Loaded " << whitelist.size() << " clients into the whitelist" << std::endl;
+ if(whitelist.size() == 0) {
+ std::cout << "Assuming no whitelist desired, disabling" << std::endl;
+ }
std::unordered_map<std::string, user> users_list;
db.load_users(users_list);
std::cout << "Loaded " << users_list.size() << " users" << std::endl;
- std::vector<std::string> whitelist;
- db.load_whitelist(whitelist);
- std::cout << "Loaded " << whitelist.size() << " clients into the whitelist" << std::endl;
-
+ std::unordered_map<std::string, torrent> torrents_list;
+ db.load_torrents(torrents_list);
+ std::cout << "Loaded " << torrents_list.size() << " torrents" << std::endl;
+
+ db.load_tokens(torrents_list);
// Create worker object, which handles announces and scrapes and all that jazz
- worker work(torrents_list, users_list, whitelist, &conf, &db);
+ work = new worker(torrents_list, users_list, whitelist, &conf, &db, sc);
// Create connection mother, which binds to its socket and handles the event stuff
- mother = new connection_mother(&work, &conf, &db);
+ mother = new connection_mother(work, &conf, &db);
return 0;
}
diff --git a/src/ocelot.h b/src/ocelot.h
index 99e8d8f..0ceadea 100644
--- a/src/ocelot.h
+++ b/src/ocelot.h
@@ -2,6 +2,7 @@
#include <map>
#include <vector>
#include <unordered_map>
+#include <set>
#include <boost/thread/thread.hpp>
typedef struct {
@@ -21,15 +22,18 @@ typedef struct {
typedef std::map<std::string, peer> peer_list;
+enum freetype { NORMAL, FREE, NEUTRAL };
+
typedef struct {
int id;
time_t last_seeded;
long long balance;
int completed;
- bool free_torrent;
+ freetype free_torrent;
std::map<std::string, peer> seeders;
std::map<std::string, peer> leechers;
std::string last_selected_seeder;
+ std::set<int> tokened_users;
time_t last_flushed;
} torrent;
diff --git a/src/schedule.cpp b/src/schedule.cpp
index 622b539..b33d1c8 100644
--- a/src/schedule.cpp
+++ b/src/schedule.cpp
@@ -10,10 +10,6 @@ schedule::schedule(connection_mother * mother_obj, worker* worker_obj, config* c
counter = 0;
last_opened_connections = 0;
- next_flush_torrents = time(NULL) + conf->flush_torrents_interval;
- next_flush_users = time(NULL) + conf->flush_users_interval + 10;
- next_flush_peers = time(NULL) + conf->flush_peers_interval + 20;
- next_flush_snatches = time(NULL) + conf->flush_snatches_interval + 30;
next_reap_peers = time(NULL) + conf->reap_peers_interval + 40;
}
//---------- Schedule - gets called every schedule_interval seconds
@@ -25,28 +21,16 @@ void schedule::handle(ev::timer &watcher, int events_flags) {
<< ((mother->get_opened_connections()-last_opened_connections)/conf->schedule_interval) << "/s" << std::endl;
}
+ if ((work->get_status() == CLOSING) && db->all_clear()) {
+ std::cout << "all clear, shutting down" << std::endl;
+ exit(0);
+ }
+
last_opened_connections = mother->get_opened_connections();
+ db->flush();
+
time_t cur_time = time(NULL);
- if(cur_time > next_flush_torrents) {
- db->flush_torrents();
- next_flush_torrents = cur_time + conf->flush_torrents_interval;
- }
-
- if(cur_time > next_flush_users) {
- db->flush_users();
- next_flush_users = cur_time + conf->flush_users_interval;
- }
-
- if(cur_time > next_flush_snatches) {
- db->flush_snatches();
- next_flush_snatches = cur_time + conf->flush_snatches_interval;
- }
-
- if(cur_time > next_flush_peers) {
- db->flush_peers();
- next_flush_peers = cur_time + conf->flush_peers_interval;
- }
if(cur_time > next_reap_peers) {
work->reap_peers();
diff --git a/src/schedule.h b/src/schedule.h
index e52946a..8bc70ff 100644
--- a/src/schedule.h
+++ b/src/schedule.h
@@ -11,10 +11,7 @@ class schedule {
int last_opened_connections;
int counter;
- time_t next_flush_torrents;
- time_t next_flush_users;
- time_t next_flush_peers;
- time_t next_flush_snatches;
+ time_t next_flush;
time_t next_reap_peers;
public:
schedule(connection_mother * mother_obj, worker * worker_obj, config* conf_obj, mysql * db_obj);
diff --git a/src/site_comm.cpp b/src/site_comm.cpp
new file mode 100644
index 0000000..5294d01
--- /dev/null
+++ b/src/site_comm.cpp
@@ -0,0 +1,77 @@
+#include <iostream>
+#include <istream>
+#include <ostream>
+#include <string>
+#include <boost/asio.hpp>
+
+#include "config.h"
+#include "site_comm.h"
+
+using boost::asio::ip::tcp;
+
+site_comm::site_comm(config &config)
+{
+ conf = config;
+}
+
+bool site_comm::expire_token(int torrent, int user)
+{
+ try {
+ boost::asio::io_service io_service;
+
+ tcp::resolver resolver(io_service);
+ tcp::resolver::query query(conf.site_host, "http");
+ tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
+ tcp::resolver::iterator end;
+
+ tcp::socket socket(io_service);
+ boost::system::error_code error = boost::asio::error::host_not_found;
+ while (error && endpoint_iterator != end) {
+ socket.close();
+ socket.connect(*endpoint_iterator++, error);
+ }
+ if (error) {
+ throw boost::system::system_error(error);
+ }
+
+ boost::asio::streambuf request;
+ std::ostream request_stream(&request);
+ request_stream << "GET /tools.php?key=" << conf.site_password << "&type=expiretoken&action=ocelot&torrentid=" << torrent << "&userid=" << user << " HTTP/1.0\r\n";
+ request_stream << "Host: " << conf.site_host << "\r\n";
+ request_stream << "Accept: */*\r\n";
+ request_stream << "Connection: close\r\n\r\n";
+
+ boost::asio::write(socket, request);
+
+ boost::asio::streambuf response;
+ boost::asio::read_until(socket, response, "\r\n");
+
+ std::istream response_stream(&response);
+ std::string http_version;
+ response_stream >> http_version;
+ unsigned int status_code;
+ response_stream >> status_code;
+ std::string status_message;
+ std::getline(response_stream, status_message);
+
+ if (!response_stream || http_version.substr(0, 5) != "HTTP/") {
+ std::cout << "Invalid response" << std::endl;
+ return false;
+ }
+
+ if (status_code == 200) {
+ return true;
+ } else {
+ std::cout << "Response returned with status code " << status_code << " when trying to expire a token!" << std::endl;;
+ return false;
+ }
+ } catch (std::exception &e) {
+ std::cout << "Exception: " << e.what() << std::endl;
+ return false;
+ }
+ return true;
+}
+
+site_comm::~site_comm()
+{
+}
diff --git a/src/site_comm.h b/src/site_comm.h
new file mode 100644
index 0000000..0741059
--- /dev/null
+++ b/src/site_comm.h
@@ -0,0 +1,22 @@
+#ifndef OCELOT_SITE_COMM_H
+#define OCELOT_SITE_COMM_H
+#include <iostream>
+#include <istream>
+#include <ostream>
+#include <string>
+#include <boost/asio.hpp>
+
+#include "config.h"
+
+using boost::asio::ip::tcp;
+
+class site_comm {
+ private:
+ config conf;
+
+ public:
+ site_comm(config &conf);
+ bool expire_token(int torrent, int user);
+ ~site_comm();
+};
+#endif
diff --git a/src/worker.cpp b/src/worker.cpp
index c0e2e16..7ec604e 100644
--- a/src/worker.cpp
+++ b/src/worker.cpp
@@ -5,6 +5,8 @@
#include <sstream>
#include <list>
#include <vector>
+#include <set>
+#include <algorithm>
#include <netinet/in.h>
#include <arpa/inet.h>
@@ -14,6 +16,7 @@
#include "db.h"
#include "worker.h"
#include "misc_functions.h"
+#include "site_comm.h"
#include <boost/thread/mutex.hpp>
#include <boost/thread/thread.hpp>
@@ -21,7 +24,20 @@
#include <boost/bind.hpp>
//---------- Worker - does stuff with input
-worker::worker(torrent_list &torrents, user_list &users, std::vector<std::string> &_whitelist, config * conf_obj, mysql * db_obj) : torrents_list(torrents), users_list(users), whitelist(_whitelist), conf(conf_obj), db(db_obj) {
+worker::worker(torrent_list &torrents, user_list &users, std::vector<std::string> &_whitelist, config * conf_obj, mysql * db_obj, site_comm &sc) : torrents_list(torrents), users_list(users), whitelist(_whitelist), conf(conf_obj), db(db_obj), s_comm(sc) {
+ status = OPEN;
+}
+bool worker::signal(int sig) {
+ if (status == OPEN) {
+ status = CLOSING;
+ std::cout << "closing tracker... press Ctrl-C again to terminate" << std::endl;
+ return false;
+ } else if (status == CLOSING) {
+ std::cout << "shutting down uncleanly" << std::endl;
+ return true;
+ } else {
+ return false;
+ }
}
std::string worker::work(std::string &input, std::string &ip) {
unsigned int input_length = input.length();
@@ -69,6 +85,10 @@ std::string worker::work(std::string &input, std::string &ip) {
if(action == INVALID) {
return error("invalid action");
}
+
+ if ((status != OPEN) && (action != UPDATE)) {
+ return error("The tracker is temporarily unavailable.");
+ }
// Parse URL params
std::list<std::string> infohashes; // For scrape only
@@ -86,6 +106,7 @@ std::string worker::work(std::string &input, std::string &ip) {
if(action == SCRAPE && key == "info_hash") {
infohashes.push_back(value);
} else {
+
params[key] = value;
}
key.clear();
@@ -118,6 +139,7 @@ std::string worker::work(std::string &input, std::string &ip) {
if(found_data) {
found_data = false; // dodge for getting around \r\n or just \n
+ std::transform(key.begin(), key.end(), key.begin(), ::tolower);
headers[key] = value;
key.clear();
value.clear();
@@ -186,6 +208,7 @@ std::string worker::announce(torrent &tor, user &u, std::map<std::string, std::s
bool inserted = false; // If we insert the peer as opposed to update
bool update_torrent = false; // Whether or not we should update the torrent in the DB
+ bool expire_token = false; // Whether or not to expire a token after torrent completion
std::map<std::string, std::string>::const_iterator peer_id_iterator = params.find("peer_id");
if(peer_id_iterator == params.end()) {
@@ -194,19 +217,20 @@ std::string worker::announce(torrent &tor, user &u, std::map<std::string, std::s
std::string peer_id = peer_id_iterator->second;
peer_id = hex_decode(peer_id);
- bool found = false; // Found client in whitelist?
- for(unsigned int i = 0; i < whitelist.size(); i++) {
- if(peer_id.find(whitelist[i]) == 0) {
- found = true;
- break;
+ if(whitelist.size() > 0) {
+ bool found = false; // Found client in whitelist?
+ for(unsigned int i = 0; i < whitelist.size(); i++) {
+ if(peer_id.find(whitelist[i]) == 0) {
+ found = true;
+ break;
+ }
}
- }
- if(!found) {
- return error("Your client is not on the whitelist");
+ if(!found) {
+ return error("Your client is not on the whitelist");
+ }
}
-
peer * p;
peer_list::iterator i;
// Insert/find the peer in the torrent list
@@ -246,29 +270,33 @@ std::string worker::announce(torrent &tor, user &u, std::map<std::string, std::s
p->left = left;
long long upspeed = 0;
long long downspeed = 0;
+ long long real_uploaded_change = 0;
+ long long real_downloaded_change = 0;
if(inserted || params["event"] == "started" || uploaded < p->uploaded || downloaded < p->downloaded) {
//New peer on this torrent
update_torrent = true;
p->userid = u.id;
p->peer_id = peer_id;
- p->user_agent = headers["User-Agent"];
+ p->user_agent = headers["user-agent"];
p->first_announced = cur_time;
p->last_announced = 0;
p->uploaded = uploaded;
p->downloaded = downloaded;
p->announces = 1;
} else {
- p->announces++;
-
long long uploaded_change = 0;
long long downloaded_change = 0;
+ p->announces++;
+
if(uploaded != p->uploaded) {
uploaded_change = uploaded - p->uploaded;
+ real_uploaded_change = uploaded_change;
p->uploaded = uploaded;
}
if(downloaded != p->downloaded) {
downloaded_change = downloaded - p->downloaded;
+ real_downloaded_change = downloaded_change;
p->downloaded = downloaded;
}
if(uploaded_change || downloaded_change) {
@@ -282,8 +310,18 @@ std::string worker::announce(torrent &tor, user &u, std::map<std::string, std::s
upspeed = uploaded_change / (cur_time - p->last_announced);
downspeed = downloaded_change / (cur_time - p->last_announced);
}
-
- if(tor.free_torrent == true) {
+ std::set<int>::iterator sit = tor.tokened_users.find(u.id);
+ if (tor.free_torrent == NEUTRAL) {
+ downloaded_change = 0;
+ uploaded_change = 0;
+ } else if(tor.free_torrent == FREE || sit != tor.tokened_users.end()) {
+ if(sit != tor.tokened_users.end()) {
+ expire_token = true;
+ std::stringstream record;
+ record << '(' << u.id << ',' << tor.id << ',' << downloaded_change << ')';
+ std::string record_str = record.str();
+ db->record_token(record_str);
+ }
downloaded_change = 0;
}
@@ -371,6 +409,11 @@ std::string worker::announce(torrent &tor, user &u, std::map<std::string, std::s
// User is a seeder now!
tor.seeders.insert(std::pair<std::string, peer>(peer_id, *p));
tor.leechers.erase(peer_id);
+ if(expire_token) {
+ (&s_comm)->expire_token(tor.id, u.id);
+ tor.tokened_users.erase(u.id);
+ }
+ // do cache expire
}
std::string peers;
@@ -444,7 +487,14 @@ std::string worker::announce(torrent &tor, user &u, std::map<std::string, std::s
std::stringstream record;
record << '(' << u.id << ',' << tor.id << ',' << active << ',' << uploaded << ',' << downloaded << ',' << upspeed << ',' << downspeed << ',' << left << ',' << (cur_time - p->first_announced) << ',' << p->announces << ',';
std::string record_str = record.str();
- db->record_peer(record_str, ip, peer_id, headers["User-Agent"]);
+ db->record_peer(record_str, ip, peer_id, headers["user-agent"]);
+
+ if (real_uploaded_change > 0 || real_downloaded_change > 0) {
+ record.str("");
+ record << '(' << u.id << ',' << downloaded << ',' << left << ',' << uploaded << ',' << upspeed << ',' << downspeed << ',' << (cur_time - p->first_announced);
+ record_str = record.str();
+ db->record_peer_hist(record_str, peer_id, tor.id);
+ }
std::string response = "d8:intervali";
response.reserve(350);
@@ -499,34 +549,44 @@ std::string worker::scrape(const std::list<std::string> &infohashes) {
//TODO: Restrict to local IPs
std::string worker::update(std::map<std::string, std::string> &params) {
- std::cout << "Got update" << std::endl;
if(params["action"] == "change_passkey") {
std::string oldpasskey = params["oldpasskey"];
std::string newpasskey = params["newpasskey"];
- users_list[newpasskey] = users_list[oldpasskey];
- users_list.erase(oldpasskey);
- std::cout << "changed passkey from " << oldpasskey << " to " << newpasskey << " for user " << users_list[newpasskey].id << std::endl;
+ user_list::iterator i = users_list.find(oldpasskey);
+ if (i == users_list.end()) {
+ std::cout << "No user with passkey " << oldpasskey << " exists when attempting to change passkey to " << newpasskey << std::endl;
+ } else {
+ users_list[newpasskey] = users_list[oldpasskey];
+ users_list.erase(oldpasskey);
+ std::cout << "changed passkey from " << oldpasskey << " to " << newpasskey << " for user " << users_list[newpasskey].id << std::endl;
+ }
} else if(params["action"] == "add_torrent") {
torrent t;
t.id = strtolong(params["id"]);
std::string info_hash = params["info_hash"];
info_hash = hex_decode(info_hash);
- bool fl = false;
- if(params["freetorrent"] == "1") {
- fl = true;
+ if(params["freetorrent"] == "0") {
+ t.free_torrent = NORMAL;
+ } else if(params["freetorrent"] == "1") {
+ t.free_torrent = FREE;
+ } else {
+ t.free_torrent = NEUTRAL;
}
t.balance = 0;
t.completed = 0;
t.last_selected_seeder = "";
- t.free_torrent = fl;
torrents_list[info_hash] = t;
- std::cout << "Added torrent " << t.id << std::endl;
+ std::cout << "Added torrent " << t.id<< ". FL: " << t.free_torrent << " " << params["freetorrent"] << std::endl;
} else if(params["action"] == "update_torrent") {
std::string info_hash = params["info_hash"];
info_hash = hex_decode(info_hash);
- bool fl = false;
- if(params["freetorrent"] == "1") {
- fl = true;
+ freetype fl;
+ if(params["freetorrent"] == "0") {
+ fl = NORMAL;
+ } else if(params["freetorrent"] == "1") {
+ fl = FREE;
+ } else {
+ fl = NEUTRAL;
}
if(torrents_list.find(info_hash) != torrents_list.end()) {
torrents_list[info_hash].free_torrent = fl;
@@ -538,9 +598,13 @@ std::string worker::update(std::map<std::string, std::string> &params) {
// Each decoded infohash is exactly 20 characters long.
std::string info_hashes = params["info_hashes"];
info_hashes = hex_decode(info_hashes);
- bool fl = false;
- if(params["freetorrent"] == "1") {
- fl = true;
+ freetype fl;
+ if(params["freetorrent"] == "0") {
+ fl = NORMAL;
+ } else if(params["freetorrent"] == "1") {
+ fl = FREE;
+ } else {
+ fl = NEUTRAL;
}
for(unsigned int pos = 0; pos < info_hashes.length(); pos += 20) {
std::string info_hash = info_hashes.substr(pos, 20);
@@ -551,6 +615,22 @@ std::string worker::update(std::map<std::string, std::string> &params) {
std::cout << "Failed to find torrent " << info_hash << " to FL " << fl << std::endl;
}
}
+ } else if(params["action"] == "add_token") {
+ std::string info_hash = hex_decode(params["info_hash"]);
+ int user_id = atoi(params["userid"].c_str());
+ if(torrents_list.find(info_hash) != torrents_list.end()) {
+ torrents_list[info_hash].tokened_users.insert(user_id);
+ } else {
+ std::cout << "Failed to find torrent to add a token for user " << user_id << std::endl;
+ }
+ } else if(params["action"] == "remove_token") {
+ std::string info_hash = hex_decode(params["info_hash"]);
+ int user_id = atoi(params["userid"].c_str());
+ if(torrents_list.find(info_hash) != torrents_list.end()) {
+ torrents_list[info_hash].tokened_users.erase(user_id);
+ } else {
+ std::cout << "Failed to find torrent " << info_hash << " to remove token for user " << user_id << std::endl;
+ }
} else if(params["action"] == "delete_torrent") {
std::string info_hash = params["info_hash"];
info_hash = hex_decode(info_hash);
@@ -586,8 +666,13 @@ std::string worker::update(std::map<std::string, std::string> &params) {
if(params["can_leech"] == "0") {
can_leech = false;
}
- users_list[passkey].can_leech = can_leech;
- std::cout << "Updated user " << passkey << std::endl;
+ user_list::iterator i = users_list.find(passkey);
+ if (i == users_list.end()) {
+ std::cout << "No user with passkey " << passkey << " found when attempting to change leeching status!" << std::endl;
+ } else {
+ users_list[passkey].can_leech = can_leech;
+ std::cout << "Updated user " << passkey << std::endl;
+ }
} else if(params["action"] == "add_whitelist") {
std::string peer_id = params["peer_id"];
whitelist.push_back(peer_id);
@@ -616,6 +701,16 @@ std::string worker::update(std::map<std::string, std::string> &params) {
unsigned int interval = strtolong(params["new_announce_interval"]);
conf->announce_interval = interval;
std::cout << "Edited announce interval to " << interval << std::endl;
+ } else if(params["action"] == "info_torrent") {
+ std::string info_hash_hex = params["info_hash"];
+ std::string info_hash = hex_decode(info_hash_hex);
+ std::cout << "Info for torrent '" << info_hash_hex << "'" << std::endl;
+ if(torrents_list.find(info_hash) != torrents_list.end()) {
+ std::cout << "Torrent " << torrents_list[info_hash].id
+ << ", freetorrent = " << torrents_list[info_hash].free_torrent << std::endl;
+ } else {
+ std::cout << "Failed to find torrent " << info_hash_hex << std::endl;
+ }
}
return "success";
}
@@ -626,6 +721,7 @@ void worker::reap_peers() {
}
void worker::do_reap_peers() {
+ db->logger_ptr->log("Began worker::do_reap_peers()");
time_t cur_time = time(NULL);
unsigned int reaped = 0;
std::unordered_map<std::string, torrent>::iterator i = torrents_list.begin();
@@ -657,4 +753,5 @@ void worker::do_reap_peers() {
}
}
std::cout << "Reaped " << reaped << " peers" << std::endl;
+ db->logger_ptr->log("Completed worker::do_reap_peers()");
}
diff --git a/src/worker.h b/src/worker.h
index 431535b..5e57174 100644
--- a/src/worker.h
+++ b/src/worker.h
@@ -6,6 +6,9 @@
#include <arpa/inet.h>
#include <iostream>
#include <fstream>
+#include "site_comm.h"
+
+enum tracker_status { OPEN, PAUSED, CLOSING }; // tracker status
class worker {
private:
@@ -15,13 +18,20 @@ class worker {
config * conf;
mysql * db;
void do_reap_peers();
+ tracker_status status;
+ site_comm s_comm;
+
public:
- worker(torrent_list &torrents, user_list &users, std::vector<std::string> &_whitelist, config * conf_obj, mysql * db_obj);
+ worker(torrent_list &torrents, user_list &users, std::vector<std::string> &_whitelist, config * conf_obj, mysql * db_obj, site_comm &sc);
std::string work(std::string &input, std::string &ip);
std::string error(std::string err);
std::string announce(torrent &tor, user &u, std::map<std::string, std::string> &params, std::map<std::string, std::string> &headers, std::string &ip);
std::string scrape(const std::list<std::string> &infohashes);
std::string update(std::map<std::string, std::string> &params);
+ bool signal(int sig);
+
+ tracker_status get_status() { return status; }
+
void reap_peers();
};