diff options
Diffstat (limited to 'src/events.cpp')
-rw-r--r-- | src/events.cpp | 171 |
1 files changed, 106 insertions, 65 deletions
diff --git a/src/events.cpp b/src/events.cpp index 9d87536..fb3e61e 100644 --- a/src/events.cpp +++ b/src/events.cpp @@ -1,12 +1,11 @@ +#include <cerrno> #include "ocelot.h" #include "config.h" #include "db.h" #include "worker.h" -#include "events.h" #include "schedule.h" #include "response.h" -#include <cerrno> -#include <mutex> +#include "events.h" // Define the connection mother (first half) and connection middlemen (second half) @@ -14,68 +13,104 @@ //---------- Connection mother - spawns middlemen and lets them deal with the connection -connection_mother::connection_mother(worker * worker_obj, config * config_obj, mysql * db_obj, site_comm * sc_obj) : work(worker_obj), conf(config_obj), db(db_obj), sc(sc_obj) { - memset(&address, 0, sizeof(address)); - addr_len = sizeof(address); +connection_mother::connection_mother(config * conf, worker * worker_obj, mysql * db_obj, site_comm * sc_obj, schedule * sched) : work(worker_obj), db(db_obj) { + // Handle config stuff first + load_config(conf); - listen_socket = socket(AF_INET, SOCK_STREAM, 0); + listen_socket = create_listen_socket(); - // Stop old sockets from hogging the port - int yes = 1; - if (setsockopt(listen_socket, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) { - std::cout << "Could not reuse socket" << std::endl; + listen_event.set<connection_mother, &connection_mother::handle_connect>(this); + listen_event.start(listen_socket, ev::READ); + // Create libev timer + schedule_event.set<schedule, &schedule::handle>(sched); + schedule_event.start(sched->schedule_interval, sched->schedule_interval); // After interval, every interval +} + +void connection_mother::load_config(config * conf) { + listen_port = conf->get_uint("listen_port"); + max_connections = conf->get_uint("max_connections"); + max_middlemen = conf->get_uint("max_middlemen"); + connection_timeout = conf->get_uint("connection_timeout"); + keepalive_timeout = conf->get_uint("keepalive_timeout"); + max_read_buffer = conf->get_uint("max_read_buffer"); + max_request_size = conf->get_uint("max_request_size"); +} + +void connection_mother::reload_config(config * conf) { + unsigned int old_listen_port = listen_port; + unsigned int old_max_connections = max_connections; + load_config(conf); + if (old_listen_port != listen_port) { + std::cout << "Changing listen port from " << old_listen_port << " to " << listen_port << std::endl; + int new_listen_socket = create_listen_socket(); + if (new_listen_socket != 0) { + listen_event.stop(); + listen_event.start(new_listen_socket, ev::READ); + close(listen_socket); + listen_socket = new_listen_socket; + } else { + std::cout << "Couldn't create new listen socket when reloading config" << std::endl; + } + } else if (old_max_connections != max_connections) { + listen(listen_socket, max_connections); } +} - // Create libev event loop - ev::io event_loop_watcher; +int connection_mother::create_listen_socket() { + sockaddr_in address; + memset(&address, 0, sizeof(address)); + int new_listen_socket = socket(AF_INET, SOCK_STREAM, 0); - event_loop_watcher.set<connection_mother, &connection_mother::handle_connect>(this); - event_loop_watcher.start(listen_socket, ev::READ); + // Stop old sockets from hogging the port + int yes = 1; + if (setsockopt(new_listen_socket, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) { + std::cout << "Could not reuse socket: " << strerror(errno) << std::endl; + return 0; + } // Get ready to bind address.sin_family = AF_INET; //address.sin_addr.s_addr = inet_addr(conf->host.c_str()); // htonl(INADDR_ANY) address.sin_addr.s_addr = htonl(INADDR_ANY); - address.sin_port = htons(conf->port); + address.sin_port = htons(listen_port); // Bind - if (bind(listen_socket, (sockaddr *) &address, sizeof(address)) == -1) { - std::cout << "Bind failed " << errno << std::endl; + if (bind(new_listen_socket, (sockaddr *) &address, sizeof(address)) == -1) { + std::cout << "Bind failed: " << strerror(errno) << std::endl; + return 0; } // Listen - if (listen(listen_socket, conf->max_connections) == -1) { - std::cout << "Listen failed" << std::endl; + if (listen(new_listen_socket, max_connections) == -1) { + std::cout << "Listen failed: " << strerror(errno) << std::endl; + return 0; } // Set non-blocking - int flags = fcntl(listen_socket, F_GETFL); + int flags = fcntl(new_listen_socket, F_GETFL); if (flags == -1) { - std::cout << "Could not get socket flags" << std::endl; + std::cout << "Could not get socket flags: " << strerror(errno) << std::endl; + return 0; } - if (fcntl(listen_socket, F_SETFL, flags | O_NONBLOCK) == -1) { - std::cout << "Could not set non-blocking" << std::endl; + if (fcntl(new_listen_socket, F_SETFL, flags | O_NONBLOCK) == -1) { + std::cout << "Could not set non-blocking: " << strerror(errno) << std::endl; + return 0; } - // Create libev timer - schedule timer(this, worker_obj, conf, db, sc); - - schedule_event.set<schedule, &schedule::handle>(&timer); - schedule_event.set(conf->schedule_interval, conf->schedule_interval); // After interval, every interval - schedule_event.start(); + return new_listen_socket; +} - std::cout << "Sockets up, starting event loop!" << std::endl; +void connection_mother::run() { + std::cout << "Sockets up on port " << listen_port << ", starting event loop!" << std::endl; ev_loop(ev_default_loop(0), 0); } - void connection_mother::handle_connect(ev::io &watcher, int events_flags) { // Spawn a new middleman - if (stats.open_connections < conf->max_middlemen) { - std::unique_lock<std::mutex> lock(stats.mutex); + if (stats.open_connections < max_middlemen) { stats.opened_connections++; - lock.unlock(); - new connection_middleman(listen_socket, address, addr_len, work, this, conf); + stats.open_connections++; + new connection_middleman(listen_socket, work, this); } } @@ -92,15 +127,13 @@ connection_mother::~connection_mother() //---------- Connection middlemen - these little guys live until their connection is closed -connection_middleman::connection_middleman(int &listen_socket, sockaddr_in &address, socklen_t &addr_len, worker * new_work, connection_mother * mother_arg, config * config_obj) : - conf(config_obj), mother (mother_arg), work(new_work) { - - connect_sock = accept(listen_socket, (sockaddr *) &address, &addr_len); +connection_middleman::connection_middleman(int &listen_socket, worker * new_work, connection_mother * mother_arg) : + written(0), mother(mother_arg), work(new_work) +{ + connect_sock = accept(listen_socket, NULL, NULL); if (connect_sock == -1) { std::cout << "Accept failed, errno " << errno << ": " << strerror(errno) << std::endl; delete this; - std::unique_lock<std::mutex> lock(stats.mutex); - stats.open_connections++; // destructor decrements open connections return; } @@ -114,10 +147,7 @@ connection_middleman::connection_middleman(int &listen_socket, sockaddr_in &addr } // Get their info - if (getpeername(connect_sock, (sockaddr *) &client_addr, &addr_len) == -1) { - //std::cout << "Could not get client info" << std::endl; - } - request.reserve(conf->max_read_buffer); + request.reserve(mother->max_read_buffer); written = 0; read_event.set<connection_middleman, &connection_middleman::handle_read>(this); @@ -125,47 +155,50 @@ connection_middleman::connection_middleman(int &listen_socket, sockaddr_in &addr // Let the socket timeout in timeout_interval seconds timeout_event.set<connection_middleman, &connection_middleman::handle_timeout>(this); - timeout_event.set(conf->timeout_interval, 0); + timeout_event.set(mother->connection_timeout, mother->keepalive_timeout); timeout_event.start(); - - std::unique_lock<std::mutex> lock(stats.mutex); - stats.open_connections++; } connection_middleman::~connection_middleman() { close(connect_sock); - std::unique_lock<std::mutex> lock(stats.mutex); stats.open_connections--; } // Handler to read data from the socket, called by event loop when socket is readable void connection_middleman::handle_read(ev::io &watcher, int events_flags) { - char buffer[conf->max_read_buffer + 1]; - memset(buffer, 0, conf->max_read_buffer + 1); - int ret = recv(connect_sock, &buffer, conf->max_read_buffer, 0); + char buffer[mother->max_read_buffer + 1]; + memset(buffer, 0, mother->max_read_buffer + 1); + int ret = recv(connect_sock, &buffer, mother->max_read_buffer, 0); if (ret <= 0) { delete this; return; } - std::unique_lock<std::mutex> lock(stats.mutex); stats.bytes_read += ret; - lock.unlock(); request.append(buffer, ret); size_t request_size = request.size(); - if (request_size > conf->max_request_size || (request_size >= 4 && request.compare(request_size - 4, std::string::npos, "\r\n\r\n") == 0)) { + if (request_size > mother->max_request_size || (request_size >= 4 && request.compare(request_size - 4, std::string::npos, "\r\n\r\n") == 0)) { + stats.requests++; read_event.stop(); + client_opts.gzip = false; + client_opts.html = false; + client_opts.http_close = true; - if (request_size > conf->max_request_size) { + if (request_size > mother->max_request_size) { shutdown(connect_sock, SHUT_RD); - response = error("GET string too long"); + response = error("GET string too long", client_opts); } else { char ip[INET_ADDRSTRLEN]; + sockaddr_in client_addr; + socklen_t addr_len = sizeof(client_addr); + getpeername(connect_sock, (sockaddr *) &client_addr, &addr_len); inet_ntop(AF_INET, &(client_addr.sin_addr), ip, INET_ADDRSTRLEN); std::string ip_str = ip; //--- CALL WORKER - response = work->work(request, ip_str); + response = work->work(request, ip_str, client_opts); + request.clear(); + request_size = 0; } // Find out when the socket is writeable. @@ -178,14 +211,22 @@ void connection_middleman::handle_read(ev::io &watcher, int events_flags) { // Handler to write data to the socket, called by event loop when socket is writeable void connection_middleman::handle_write(ev::io &watcher, int events_flags) { int ret = send(connect_sock, response.c_str()+written, response.size()-written, MSG_NOSIGNAL); - written += ret; - std::unique_lock<std::mutex> lock(stats.mutex); + if (ret == -1) { + return; + } stats.bytes_written += ret; - lock.unlock(); + written += ret; if (written == response.size()) { write_event.stop(); - timeout_event.stop(); - delete this; + if (client_opts.http_close) { + timeout_event.stop(); + delete this; + return; + } + timeout_event.again(); + read_event.start(); + response.clear(); + written = 0; } } |