#include "running_mainloop.h"

#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <poll.h>
#include <assert.h>
#include <map>
#include <queue>
#include <utility>
#include "thread_synchronization.h"
#include "os_utils.h"
#include "http_structures/client_request_parse.h"
#include "http_structures/response_gen.h"
#include "baza_inter.h"

namespace een9 {
    struct QElementHttpConnections {
        SlaveTask task;
        QElementHttpConnections* nxt = NULL;

        explicit QElementHttpConnections(SlaveTask task): task(std::move(task)) {}
    };

    struct WorkersTaskQueue {
        QElementHttpConnections* first = NULL;
        QElementHttpConnections** afterLastPtr;
        size_t sz = 0;

        WorkersTaskQueue() {
            afterLastPtr = &first;
        }

        bool empty() const {
            return sz == 0;
        }

        size_t size() const {
            return sz;
        }

        void push_back(SlaveTask task) {
            /* Throws a goddamn execption. Because why not. Ofcourse everything has to throw an exception */
            /* CLion says. Allocated memory is leaking. YOUR MOTHER IS LEAKING YOU FOOL!! MY CODE IS FINE!! */
            QElementHttpConnections* el = new QElementHttpConnections(std::move(task));
            /* Exception does not leave queue in incorrect state */
            *afterLastPtr = el;
            afterLastPtr = &(el->nxt);
            sz++;
        }

        void pop_first(SlaveTask& ret_task) {
            assert(!empty());
            ret_task = std::move(first->task);
            if (sz == 1) {
                delete first;
                first = NULL;
                afterLastPtr = &first;
                sz = 0;
            } else {
                /* Before I popped the first, this element was second, but now it took place of the first */
                QElementHttpConnections* old_deut = first->nxt;
                delete first;
                first = old_deut;
                sz--;
            }
        }
    };

    /* State shared between the accepting loop and all worker threads */
    struct WorkersEnvCommon {
        /* Lock + wake-up signal: workers sleep on it until a task arrives or termination is requested.
         * Threads are asked to stop through the flag, they are never cancelled */
        CondVarBedObj corvee_bed;
        /* Accepted connections waiting for a worker */
        WorkersTaskQueue queue;
        /* Externally owned stop flag */
        bool& termination;
        /* Callback that turns a parsed client request into the response text */
        guest_core_t guest_core;

        /* Parser programs, handed to every per-worker parser buffer and parser context */
        ClientRequestParser_CommonPrograms parser_programs;

        WorkersEnvCommon(bool& stop_flag, guest_core_t core)
            : termination(stop_flag),
              guest_core(std::move(core)) {
        }
    };

    /* Per-worker state: a link to the shared environment, the worker id and a private parser buffer */
    struct WorkersEnv {
        WorkersEnvCommon& wtec;
        worker_id_t id;
        /* Built from the shared parser programs; each worker gets its own instance */
        ClientRequestParser_WorkerBuffers personal_parser_buffer;

        explicit WorkersEnv(WorkersEnvCommon& shared_env, worker_id_t worker_id)
            : wtec(shared_env),
              id(worker_id),
              personal_parser_buffer(shared_env.parser_programs) {
        }
    };

    // todo: add timeout for multiple bytes, add more settings
    /*
     * Reads bytes from the connection `fd` and feeds them, one character at a time,
     * to an HTTP request parser until the parser status becomes positive or
     * recv() stops delivering data.
     *
     * Returns the parsed request. The parser throws ServerError on bad input.
     * ASSERT_on_iret / ASSERT_pl are project macros — presumably they throw when
     * recv() failed or when the parser did not end in status 1; verify against
     * their definitions.
     * `s_tips` is currently unused.
     */
    ClientRequest process_connection_input(int fd, const EEN9_ServerTips& s_tips, WorkersEnv& wte) {
        ClientRequest request;
        ClientHttpRequestParser_Ctx parser(request, wte.personal_parser_buffer, wte.wtec.parser_programs);
        char buf[2048];
        int ret;
        ASSERT_pl(parser.status == 0);
        for (;;) {
            ret = (int)recv(fd, buf, sizeof(buf), 0);
            if (ret <= 0)
                break;
            const char* const chunk_end = buf + ret;
            for (const char* cur = buf; cur != chunk_end; ++cur) {
                /* Throws ServerError on bad input */
                if (parser.feedCharacter(*cur) > 0)
                    break;
            }
            /* A positive status means the parser needs no more input */
            if (parser.status > 0)
                break;
        }
        ASSERT_on_iret(ret, "recv");
        ASSERT_pl(parser.status == 1);
        return request;
    }

    /*
     * Writes the whole `server_response` to the socket `fd`, at most 2048 bytes
     * per send() call, and logs a line once everything has been sent.
     * ASSERT_on_iret / ASSERT_pl are project macros — presumably they throw when
     * send() fails or makes no progress; verify against their definitions.
     */
    void process_connection_output(int fd, const std::string& server_response) {
        const size_t total = server_response.size();
        size_t sent = 0;
        while (sent < total) {
            const size_t remaining = total - sent;
            const size_t chunk = remaining < 2048lu ? remaining : 2048lu;
            /* MSG_NOSIGNAL set to prevent SIGPIPE */
            int written = (int)send(fd, server_response.data() + sent, chunk, MSG_NOSIGNAL);
            ASSERT_on_iret(written, "sending");
            ASSERT_pl(written > 0);
            sent += written;
        }
        printf("Log: worker: succesfully asnwered with response\n");
    }

    void process_connection(const SlaveTask& task, WorkersEnv& wte) {
        ClientRequest client_request = process_connection_input(task.fd(), task.s_tips, wte);
        std::string server_response = wte.wtec.guest_core(task, client_request, wte.id);
        process_connection_output(task.fd(), server_response);
    }

    void* worker_func(void* wte_ptr) {
        WorkersEnv& wte = *((WorkersEnv*)wte_ptr);
        WorkersEnvCommon& wtec = wte.wtec;
        printf("Worker started\n");
        while (true) {
            try {
                MutexLockGuard cb_lg(wtec.corvee_bed, __func__);
                woke:
                if (wtec.termination)
                    break;
                if (wtec.queue.empty()) {
                    wtec.corvee_bed.sleep(__func__);
                    goto woke;
                }
                SlaveTask task;
                wtec.queue.pop_first(task);
                process_connection(task, wte);
            } catch (const std::exception& e) {
                printf("Client request procession failure in worker\n");
                printf("%s\n", e.what());
                /* Under mysterious some circumstances, in this place destructor of string in SystemError causes segfault. I can't fix that */
            }
        }
        printf("Worker finished\n");
        return NULL;
    }

    // todo: retrieve address of connected client

    /*
     * Runs the server: starts the worker threads, opens one listening TCP socket
     * per configured port (bound to 127.0.0.1), then polls those sockets and hands
     * every accepted connection to the workers through the shared task queue.
     *
     * params              - worker count, ports, poll timeout, server limits and the guest core.
     * termination_trigger - stop flag shared with the workers; the loop ends when it
     *                       becomes true. It is always set to true before returning.
     *
     * The loop also ends when poll() is interrupted by a signal or when a
     * std::exception escapes the setup / polling code (it is logged, not rethrown).
     * Before returning, all started workers are woken up and joined.
     * The two ASSERTs on slave_number and the port list run before any thread is
     * started — presumably they throw out of this function; verify against the macro.
     */
    void electric_boogaloo(const MainloopParameters& params, bool& termination_trigger) {
        WorkersEnvCommon wtec(termination_trigger, params.guest_core);
        ASSERT(params.slave_number > 0, "No workers spawned");
        size_t Nip = params.ports_to_listen.size();
        ASSERT(Nip > 0, "No open listeting addresses");

        std::vector<pthread_t> workers(params.slave_number);
        std::vector<uptr<WorkersEnv>> wtes(params.slave_number);
        for (size_t i = 0; i < params.slave_number; i++) {
            wtes[i] = std::make_unique<WorkersEnv>(wtec, (worker_id_t)i);
        }
        /* Number of threads that were really started. Only these hold a valid pthread_t and may be joined */
        size_t spawned = 0;

        try {
            for (; spawned < params.slave_number; spawned++) {
                /* pthread_create reports failure through its return value, not through errno */
                int rc = pthread_create(&workers[spawned], NULL, worker_func, wtes[spawned].get());
                ASSERT(rc == 0, "Failed to spawn worker thread");
            }

            int ret;
            std::vector<UniqueFdWrapper> listening_socks(Nip);
            for (size_t i = 0; i < Nip; i++) {
                printf("Creating listening socket\n");
                uint16_t port = params.ports_to_listen[i];
                int listening_socket_fd = socket(AF_INET, SOCK_STREAM, 0);
                ASSERT_on_iret(listening_socket_fd, "Listening socket creation");
                UniqueFdWrapper listening_socket(listening_socket_fd);
                printf("Listening socket created\n");
                /* Value-initialised so that no field (padding included) is left with garbage */
                sockaddr_in listening_address{};
                listening_address.sin_family = AF_INET;
                listening_address.sin_port = htons(port);
                /* 127.0.0.1 in host byte order */
                uint32_t loopback_addr = (127u << 24) | 1;
                listening_address.sin_addr.s_addr = htonl(loopback_addr);
                int reuseaddr_nozero_option_value = 1;
                ret = setsockopt(listening_socket_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr_nozero_option_value, sizeof(int));
                ASSERT_on_iret(ret, "setting SO_REUSEADDR befire binding to address");
                ret = bind(listening_socket(), (const sockaddr*)&listening_address, sizeof(listening_address));
                ASSERT_on_iret(ret, "binding to 127.0.0.1:" + std::to_string(port));
                printf("Binded socket to address\n");
                ret = listen(listening_socket(), 128);
                ASSERT_on_iret(ret, "listening for connections");
                printf("Listening socket succesfully started listening\n");
                listening_socks[i] = std::move(listening_socket);
            }
            std::vector<pollfd> pollfds(Nip);
            for (size_t i = 0; i < Nip; i++) {
                pollfds[i].fd = listening_socks[i]();
                pollfds[i].events = POLLRDNORM;
            }
            printf("Entering mainloop\n");
            ASSERT(params.mainloop_recheck_interval_us > 0, "Incorrect poll timeout");
            while (true) {
                MutexLockGuard lg1(wtec.corvee_bed, "poller termination check");
                if (wtec.termination)
                    break;
                lg1.unlock();
                for (size_t i = 0; i < Nip; i++) {
                    pollfds[i].revents = 0;
                }
                /* NOTE(review): poll() takes its timeout in milliseconds while the parameter
                 * name ends in _us — confirm which unit the callers really pass */
                ret = poll(pollfds.data(), Nip, params.mainloop_recheck_interval_us);
                /* Being interrupted by a signal ends the mainloop */
                if (ret < 0 && errno == EINTR)
                    break;
                ASSERT_on_iret(ret, "polling");
                for (size_t i = 0; i < Nip; i++) {
                    if ((pollfds[i].revents & POLLRDNORM)) {
                        /* A failure while accepting one connection must not bring the server down */
                        try {
                            sockaddr client_address;
                            socklen_t client_addr_len = sizeof(client_address);
                            int session_sock = accept(pollfds[i].fd, &client_address, &client_addr_len);
                            ASSERT_on_iret(session_sock, "Failed to accept incoming connection");
                            printf("Log: successful connection\n");
                            UniqueFdWrapper session_sock_fdw(session_sock);
                            configure_socket_rcvsndtimeo(session_sock_fdw(), params.s_conf.request_timeout);
                            { MutexLockGuard lg2(wtec.corvee_bed, "poller adds connection");
                                SlaveTask task{ConnectionInfo{}, std::move(session_sock_fdw),
                                    EEN9_ServerTips{wtec.queue.size(),
                                        params.s_conf.critical_load_1, params.s_conf.request_timeout}};
                                /* When the queue already holds critical_load_2 tasks the new task is
                                 * not enqueued and is destroyed at the end of this scope */
                                if (wtec.queue.size() < params.s_conf.critical_load_2)
                                    wtec.queue.push_back(std::move(task));
                            }
                            wtec.corvee_bed.din_don();
                        } catch (const std::exception& e) {
                            printf("Error aceepting connection\n");
                            printf("%s\n", e.what());
                        }
                    }
                }
            }
        } catch (const std::exception& e) {
            printf("System failure 2\n");
            printf("%s\n", e.what());
        }
        /* Shutdown, common to the normal and the failure path.
         * The flag is written under the same lock the workers hold between checking it and
         * going to sleep. Otherwise the write races with their reads, and a worker that has
         * just seen `false` could miss the wake-up below and sleep forever, hanging pthread_join.
         * The lock is released before waking, like the poller does around din_don() */
        { MutexLockGuard lg3(wtec.corvee_bed, "poller requests termination");
            wtec.termination = true;
        }
        wtec.corvee_bed.wake_them_all();
        for (size_t i = 0; i < spawned; i++) {
            pthread_join(workers[i], NULL);
        }
    }
}