Commit 5f5c86c9 authored by Gabriel Margiani's avatar Gabriel Margiani
Browse files

make eventhook threadsave and nonblocking. Segfaults somewhere.

parent 80f2becd
Loading
Loading
Loading
Loading
+47 −8
Original line number Diff line number Diff line
@@ -30,24 +30,53 @@ p3::eventHook::eventHook(config& c, phonebook& p) :
	if (command == "none") {
		command = "";
	}

	stop_worker = false;
	worker_thread = new std::thread(&p3::eventHook::event_worker, this);

	signal(SIGCHLD, SIG_IGN);
}

p3::eventHook::~eventHook() {
	stop_worker = true;
	worker_condition.notify_all();
	if (worker_thread->joinable()) {
		worker_thread->join();
	}
	delete worker_thread;
}

std::list<p3::eventHook::callback_type>::iterator p3::eventHook::register_callback(const callback_type& c) {
	std::lock_guard<std::mutex> g(callback_mutex);
	return callbacks.insert(callbacks.end(), c);
}

void p3::eventHook::unregister_callback(const std::list<p3::eventHook::callback_type>::iterator& id) {
	std::lock_guard<std::mutex> g(callback_mutex);
	callbacks.erase(id);
}

void p3::eventHook::run_call_hook(const std::string& event, int cid, const std::string& nr, const std::string& data) {
	run_hook(event, std::to_string(cid), book.address_by_nr(nr), data);
void p3::eventHook::event_worker() {
	while (!stop_worker) {
		std::lock_guard<std::mutex> g(queue_mutex);
		if (!event_queue.empty()) {
			auto& args = event_queue.front();
			queue_mutex.unlock();
			handle_event(args);
			queue_mutex.lock();
			event_queue.pop_front();
		}
		if (!stop_worker && event_queue.empty()) {
			queue_mutex.unlock();
			std::unique_lock<std::mutex> g(worker_mutex);
			worker_condition.wait(g);
		}
	}
}

void p3::eventHook::run_hook(const std::string& event, const std::string& a2, const std::string& a3, const std::string& a4) {
void p3::eventHook::handle_event(const std::array<std::string, 4>& args) {
	for (auto c : callbacks) {
		c(event, a2, a3, a4);
		c(args[0], args[1], args[2], args[3]);
	}

	if (command.empty()) {
@@ -59,12 +88,22 @@ void p3::eventHook::run_hook(const std::string& event, const std::string& a2, co
		execl(
				command.c_str(),
				command.c_str(),
				event.c_str(),
				a2.c_str(),
				a3.c_str(),
				a4.c_str(),
				args[0].c_str(),
				args[1].c_str(),
				args[2].c_str(),
				args[2].c_str(),
				(char *)0
			);
	}
}

void p3::eventHook::run_call_hook(const std::string& event, int cid, const std::string& nr, const std::string& data) {
	run_hook(event, std::to_string(cid), book.address_by_nr(nr), data);
}

void p3::eventHook::run_hook(const std::string& event, const std::string& a2, const std::string& a3, const std::string& a4) {
	std::lock_guard<std::mutex> g(queue_mutex);
	event_queue.push_back({event, a2, a3, a4});
	worker_condition.notify_all();
}
+20 −1
Original line number Diff line number Diff line
@@ -21,6 +21,12 @@

#include <string>
#include <list>
#include <array>

#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>

#include "phonebook.h"
#include "config.h"
@@ -53,15 +59,28 @@ namespace p3 {
			std::string command;

			std::list<callback_type> callbacks;
			std::mutex callback_mutex;

		public:
			std::list<std::array<std::string, 4> > event_queue;
			std::mutex queue_mutex;

			std::thread * worker_thread;
			std::atomic_bool stop_worker;
			std::mutex worker_mutex;
			std::condition_variable worker_condition;
		public:

			explicit eventHook(config& c, phonebook& p);
			~eventHook();

			eventHook(const eventHook &) = delete;

			void run_hook(const std::string& event, const std::string& a2 = "", const std::string& a3 = "", const std::string& a4 = "");
			void run_call_hook(const std::string& event, int call_id = 0, const std::string& nr = "", const std::string& data = "");

			void event_worker();
			void handle_event(const std::array<std::string, 4>& args);

			std::list<callback_type>::iterator register_callback(const callback_type& callback);
			void unregister_callback(const std::list<callback_type>::iterator& id);
	};
+5 −5
Original line number Diff line number Diff line
@@ -97,16 +97,16 @@ p3::mQueue::~mQueue() {
	}
}

void p3::mQueue::send(p3::protocol c, const std::string& v) {
void p3::mQueue::send(p3::protocol c, const std::string& v, int timeout) {
	p3::mQueueMessage q = p3::mQueueMessage(c, v);
	send(q);
	send(q, timeout);
}

void p3::mQueue::send(const p3::mQueueMessage & qmsg) {
void p3::mQueue::send(const p3::mQueueMessage & qmsg, int timeout) {
	std::string msg = qmsg.get_str();
	if (printConversation) std::cout << "S: " << key << ": " << msg << std::endl;
	timespec tsp;
	tsp.tv_sec = time(NULL) + DefaultSendTimeout;
	tsp.tv_sec = time(NULL) + timeout;
	tsp.tv_nsec = 0;
	if (mq_timedsend(mqOutId, msg.c_str(), msg.length()+1, 0, &tsp) != 0) {
		switch (errno) {
@@ -161,7 +161,7 @@ long p3::mQueue::get_msg_count() {

p3::mQueueMessage::mQueueMessage(const char * message) : command(static_cast<p3::protocol>(*message)), value(message+1) {};

p3::mQueueMessage::mQueueMessage(protocol c, std::string v) : command(c), value(v) {};
p3::mQueueMessage::mQueueMessage(protocol c, const std::string& v) : command(c), value(v) {};

p3::protocol p3::mQueueMessage::get_command() const {
	return command;
+3 −3
Original line number Diff line number Diff line
@@ -45,7 +45,7 @@ namespace p3 {

		public:
			explicit mQueueMessage(const char * message);
			mQueueMessage(protocol c, std::string v);
			mQueueMessage(protocol c, const std::string& v);

			protocol get_command() const;
			std::string get_value() const;
@@ -79,8 +79,8 @@ namespace p3 {
			~mQueue();


			void send(const mQueueMessage & m);
			void send(protocol c, const std::string& w);
			void send(const mQueueMessage & m, int timeout = DefaultSendTimeout);
			void send(protocol c, const std::string& w, int timeout = DefaultSendTimeout);

			// Timeout in sec. -1 for blocking. Will throw on timeout!
			mQueueMessage receive(int timeout = -1);
+25 −11
Original line number Diff line number Diff line
@@ -362,18 +362,32 @@ void p3::clientHandler::push_event(
		) {
	std::lock_guard<std::recursive_mutex> g(connection_write_mutex);

	connection.send(p3::protocol::BEGINTEXT, "Event");
	connection.send(p3::protocol::TEXT, e);
	try {
		connection.send(p3::protocol::BEGINTEXT, "Event", 1);
		connection.send(p3::protocol::TEXT, e, 1);
		if (!a2.empty()) {
		connection.send(p3::protocol::TEXT, a2);
			connection.send(p3::protocol::TEXT, a2, 1);
		}
		if (!a3.empty()) {
		connection.send(p3::protocol::TEXT, a3);
			connection.send(p3::protocol::TEXT, a3, 1);
		}
		if (!a4.empty()) {
		connection.send(p3::protocol::TEXT, a4);
			connection.send(p3::protocol::TEXT, a4, 1);
		}
		connection.send(p3::protocol::ENDTEXT, "End Event", 1);
	} catch (p3::perror & e) {
		hook.unregister_callback(push_event_callback);
		push_events_enabled = false;
		std::thread t([this, e] () {
			std::lock_guard<std::recursive_mutex> g(connection_write_mutex);
			try {
				connection.send(p3::protocol::ERROR, e.what() + std::string(" - Events disabled."));
			} catch (p3::perror & e) {
				return;
			}
		});
		t.detach();
	}
	connection.send(p3::protocol::ENDTEXT, "End Event");
}

int p3::clientHandler::get_call_id(const std::string& q, p3::callState filter) {