summaryrefslogtreecommitdiffstats
path: root/src/events.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/events.cpp')
-rw-r--r--src/events.cpp171
1 files changed, 106 insertions, 65 deletions
diff --git a/src/events.cpp b/src/events.cpp
index 9d87536..fb3e61e 100644
--- a/src/events.cpp
+++ b/src/events.cpp
@@ -1,12 +1,11 @@
+#include <cerrno>
#include "ocelot.h"
#include "config.h"
#include "db.h"
#include "worker.h"
-#include "events.h"
#include "schedule.h"
#include "response.h"
-#include <cerrno>
-#include <mutex>
+#include "events.h"
// Define the connection mother (first half) and connection middlemen (second half)
@@ -14,68 +13,104 @@
//---------- Connection mother - spawns middlemen and lets them deal with the connection
-connection_mother::connection_mother(worker * worker_obj, config * config_obj, mysql * db_obj, site_comm * sc_obj) : work(worker_obj), conf(config_obj), db(db_obj), sc(sc_obj) {
- memset(&address, 0, sizeof(address));
- addr_len = sizeof(address);
+connection_mother::connection_mother(config * conf, worker * worker_obj, mysql * db_obj, site_comm * sc_obj, schedule * sched) : work(worker_obj), db(db_obj) {
+ // Handle config stuff first
+ load_config(conf);
- listen_socket = socket(AF_INET, SOCK_STREAM, 0);
+ listen_socket = create_listen_socket();
- // Stop old sockets from hogging the port
- int yes = 1;
- if (setsockopt(listen_socket, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) {
- std::cout << "Could not reuse socket" << std::endl;
+ listen_event.set<connection_mother, &connection_mother::handle_connect>(this);
+ listen_event.start(listen_socket, ev::READ);
+ // Create libev timer
+ schedule_event.set<schedule, &schedule::handle>(sched);
+ schedule_event.start(sched->schedule_interval, sched->schedule_interval); // After interval, every interval
+}
+
+void connection_mother::load_config(config * conf) {
+ listen_port = conf->get_uint("listen_port");
+ max_connections = conf->get_uint("max_connections");
+ max_middlemen = conf->get_uint("max_middlemen");
+ connection_timeout = conf->get_uint("connection_timeout");
+ keepalive_timeout = conf->get_uint("keepalive_timeout");
+ max_read_buffer = conf->get_uint("max_read_buffer");
+ max_request_size = conf->get_uint("max_request_size");
+}
+
+void connection_mother::reload_config(config * conf) {
+ unsigned int old_listen_port = listen_port;
+ unsigned int old_max_connections = max_connections;
+ load_config(conf);
+ if (old_listen_port != listen_port) {
+ std::cout << "Changing listen port from " << old_listen_port << " to " << listen_port << std::endl;
+ int new_listen_socket = create_listen_socket();
+ if (new_listen_socket != 0) {
+ listen_event.stop();
+ listen_event.start(new_listen_socket, ev::READ);
+ close(listen_socket);
+ listen_socket = new_listen_socket;
+ } else {
+ std::cout << "Couldn't create new listen socket when reloading config" << std::endl;
+ }
+ } else if (old_max_connections != max_connections) {
+ listen(listen_socket, max_connections);
}
+}
- // Create libev event loop
- ev::io event_loop_watcher;
+int connection_mother::create_listen_socket() {
+ sockaddr_in address;
+ memset(&address, 0, sizeof(address));
+ int new_listen_socket = socket(AF_INET, SOCK_STREAM, 0);
- event_loop_watcher.set<connection_mother, &connection_mother::handle_connect>(this);
- event_loop_watcher.start(listen_socket, ev::READ);
+ // Stop old sockets from hogging the port
+ int yes = 1;
+ if (setsockopt(new_listen_socket, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) {
+ std::cout << "Could not reuse socket: " << strerror(errno) << std::endl;
+ return 0;
+ }
// Get ready to bind
address.sin_family = AF_INET;
//address.sin_addr.s_addr = inet_addr(conf->host.c_str()); // htonl(INADDR_ANY)
address.sin_addr.s_addr = htonl(INADDR_ANY);
- address.sin_port = htons(conf->port);
+ address.sin_port = htons(listen_port);
// Bind
- if (bind(listen_socket, (sockaddr *) &address, sizeof(address)) == -1) {
- std::cout << "Bind failed " << errno << std::endl;
+ if (bind(new_listen_socket, (sockaddr *) &address, sizeof(address)) == -1) {
+ std::cout << "Bind failed: " << strerror(errno) << std::endl;
+ return 0;
}
// Listen
- if (listen(listen_socket, conf->max_connections) == -1) {
- std::cout << "Listen failed" << std::endl;
+ if (listen(new_listen_socket, max_connections) == -1) {
+ std::cout << "Listen failed: " << strerror(errno) << std::endl;
+ return 0;
}
// Set non-blocking
- int flags = fcntl(listen_socket, F_GETFL);
+ int flags = fcntl(new_listen_socket, F_GETFL);
if (flags == -1) {
- std::cout << "Could not get socket flags" << std::endl;
+ std::cout << "Could not get socket flags: " << strerror(errno) << std::endl;
+ return 0;
}
- if (fcntl(listen_socket, F_SETFL, flags | O_NONBLOCK) == -1) {
- std::cout << "Could not set non-blocking" << std::endl;
+ if (fcntl(new_listen_socket, F_SETFL, flags | O_NONBLOCK) == -1) {
+ std::cout << "Could not set non-blocking: " << strerror(errno) << std::endl;
+ return 0;
}
- // Create libev timer
- schedule timer(this, worker_obj, conf, db, sc);
-
- schedule_event.set<schedule, &schedule::handle>(&timer);
- schedule_event.set(conf->schedule_interval, conf->schedule_interval); // After interval, every interval
- schedule_event.start();
+ return new_listen_socket;
+}
- std::cout << "Sockets up, starting event loop!" << std::endl;
+void connection_mother::run() {
+ std::cout << "Sockets up on port " << listen_port << ", starting event loop!" << std::endl;
ev_loop(ev_default_loop(0), 0);
}
-
void connection_mother::handle_connect(ev::io &watcher, int events_flags) {
// Spawn a new middleman
- if (stats.open_connections < conf->max_middlemen) {
- std::unique_lock<std::mutex> lock(stats.mutex);
+ if (stats.open_connections < max_middlemen) {
stats.opened_connections++;
- lock.unlock();
- new connection_middleman(listen_socket, address, addr_len, work, this, conf);
+ stats.open_connections++;
+ new connection_middleman(listen_socket, work, this);
}
}
@@ -92,15 +127,13 @@ connection_mother::~connection_mother()
//---------- Connection middlemen - these little guys live until their connection is closed
-connection_middleman::connection_middleman(int &listen_socket, sockaddr_in &address, socklen_t &addr_len, worker * new_work, connection_mother * mother_arg, config * config_obj) :
- conf(config_obj), mother (mother_arg), work(new_work) {
-
- connect_sock = accept(listen_socket, (sockaddr *) &address, &addr_len);
+connection_middleman::connection_middleman(int &listen_socket, worker * new_work, connection_mother * mother_arg) :
+ written(0), mother(mother_arg), work(new_work)
+{
+ connect_sock = accept(listen_socket, NULL, NULL);
if (connect_sock == -1) {
std::cout << "Accept failed, errno " << errno << ": " << strerror(errno) << std::endl;
delete this;
- std::unique_lock<std::mutex> lock(stats.mutex);
- stats.open_connections++; // destructor decrements open connections
return;
}
@@ -114,10 +147,7 @@ connection_middleman::connection_middleman(int &listen_socket, sockaddr_in &addr
}
// Get their info
- if (getpeername(connect_sock, (sockaddr *) &client_addr, &addr_len) == -1) {
- //std::cout << "Could not get client info" << std::endl;
- }
- request.reserve(conf->max_read_buffer);
+ request.reserve(mother->max_read_buffer);
written = 0;
read_event.set<connection_middleman, &connection_middleman::handle_read>(this);
@@ -125,47 +155,50 @@ connection_middleman::connection_middleman(int &listen_socket, sockaddr_in &addr
// Let the socket timeout in timeout_interval seconds
timeout_event.set<connection_middleman, &connection_middleman::handle_timeout>(this);
- timeout_event.set(conf->timeout_interval, 0);
+ timeout_event.set(mother->connection_timeout, mother->keepalive_timeout);
timeout_event.start();
-
- std::unique_lock<std::mutex> lock(stats.mutex);
- stats.open_connections++;
}
connection_middleman::~connection_middleman() {
close(connect_sock);
- std::unique_lock<std::mutex> lock(stats.mutex);
stats.open_connections--;
}
// Handler to read data from the socket, called by event loop when socket is readable
void connection_middleman::handle_read(ev::io &watcher, int events_flags) {
- char buffer[conf->max_read_buffer + 1];
- memset(buffer, 0, conf->max_read_buffer + 1);
- int ret = recv(connect_sock, &buffer, conf->max_read_buffer, 0);
+ char buffer[mother->max_read_buffer + 1];
+ memset(buffer, 0, mother->max_read_buffer + 1);
+ int ret = recv(connect_sock, &buffer, mother->max_read_buffer, 0);
if (ret <= 0) {
delete this;
return;
}
- std::unique_lock<std::mutex> lock(stats.mutex);
stats.bytes_read += ret;
- lock.unlock();
request.append(buffer, ret);
size_t request_size = request.size();
- if (request_size > conf->max_request_size || (request_size >= 4 && request.compare(request_size - 4, std::string::npos, "\r\n\r\n") == 0)) {
+ if (request_size > mother->max_request_size || (request_size >= 4 && request.compare(request_size - 4, std::string::npos, "\r\n\r\n") == 0)) {
+ stats.requests++;
read_event.stop();
+ client_opts.gzip = false;
+ client_opts.html = false;
+ client_opts.http_close = true;
- if (request_size > conf->max_request_size) {
+ if (request_size > mother->max_request_size) {
shutdown(connect_sock, SHUT_RD);
- response = error("GET string too long");
+ response = error("GET string too long", client_opts);
} else {
char ip[INET_ADDRSTRLEN];
+ sockaddr_in client_addr;
+ socklen_t addr_len = sizeof(client_addr);
+ getpeername(connect_sock, (sockaddr *) &client_addr, &addr_len);
inet_ntop(AF_INET, &(client_addr.sin_addr), ip, INET_ADDRSTRLEN);
std::string ip_str = ip;
//--- CALL WORKER
- response = work->work(request, ip_str);
+ response = work->work(request, ip_str, client_opts);
+ request.clear();
+ request_size = 0;
}
// Find out when the socket is writeable.
@@ -178,14 +211,22 @@ void connection_middleman::handle_read(ev::io &watcher, int events_flags) {
// Handler to write data to the socket, called by event loop when socket is writeable
void connection_middleman::handle_write(ev::io &watcher, int events_flags) {
int ret = send(connect_sock, response.c_str()+written, response.size()-written, MSG_NOSIGNAL);
- written += ret;
- std::unique_lock<std::mutex> lock(stats.mutex);
+ if (ret == -1) {
+ return;
+ }
stats.bytes_written += ret;
- lock.unlock();
+ written += ret;
if (written == response.size()) {
write_event.stop();
- timeout_event.stop();
- delete this;
+ if (client_opts.http_close) {
+ timeout_event.stop();
+ delete this;
+ return;
+ }
+ timeout_event.again();
+ read_event.start();
+ response.clear();
+ written = 0;
}
}