// Copyright (c) 2003-2021 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Heavy modification by Tim Hsu, Sharp Point Ltd. 2022.

#include "SPSocketServer.h"

namespace SPSocket
{
    TCP_Session::TCP_Session(tcp::socket socket, Channel& ch, SPSocketServerPtr sp)
        : channel_(ch), socket_(std::move(socket)), socket_server_(sp)
    {
        input_deadline_.expires_at(steady_timer::time_point::max());
        output_deadline_.expires_at(steady_timer::time_point::max());

        // The non_empty_output_queue_ steady_timer is set to the maximum time
        // point whenever the output queue is empty. This ensures that the output
        // actor stays asleep until a message is put into the queue.
        non_empty_output_queue_.expires_at(steady_timer::time_point::max());
    }

    void TCP_Session::UseReadUntil(char terminator)
    {
        read_terminator = terminator;
    }

    void TCP_Session::Start()
    {
        channel_.Join(shared_from_this());

        std::string chost = socket_.remote_endpoint().address().to_string();
        unsigned short cport = socket_.remote_endpoint().port();

        socket_server_->OnClientConnected(chost, cport);

        read_line();
        check_deadline(input_deadline_);

        await_output();
        check_deadline(output_deadline_);
    }

    void TCP_Session::stop()
    {
        channel_.Leave(shared_from_this());

        std::string chost = socket_.remote_endpoint().address().to_string();
        unsigned short cport = socket_.remote_endpoint().port();

        socket_server_->OnClientDisconnected(chost, cport);

        boost::system::error_code ignored_error;
        socket_.close(ignored_error);
        input_deadline_.cancel();
        non_empty_output_queue_.cancel();
        output_deadline_.cancel();
    }

    bool TCP_Session::stopped() const
    {
        return !socket_.is_open();
    }

    void TCP_Session::deliver(const std::string& msg)
    {
        output_queue_.push_back(msg);

        // Signal that the output queue contains messages. Modifying the expiry
        // will wake the output actor, if it is waiting on the timer.
        non_empty_output_queue_.expires_at(steady_timer::time_point::min());
    }

    void TCP_Session::read_line()
    {
        // Set a deadline for the read operation.     
        if (rw_timeout > 0)
        {
            input_deadline_.expires_after(std::chrono::seconds(rw_timeout));
        }

        // Start an asynchronous operation to read a newline-delimited message.
        auto self(shared_from_this());
        boost::asio::async_read_until(socket_,
            boost::asio::dynamic_buffer(input_buffer_), read_terminator,
            [this, self](const boost::system::error_code& error, std::size_t n)
        {
            // Check if the session was stopped while the operation was pending.
            if (stopped())
                return;

            if (!error)
            {
                // Extract the delimited message from the buffer.
                std::string str_recv(input_buffer_.substr(0, n - 1));
                input_buffer_.erase(0, n);

                if (!str_recv.empty())
                {
                    socket_server_->OnReceive(str_recv);

                    // Send data to connecting client only
                    //Send(str_recv);
                }
                else
                {
                    // We received a heartbeat message from the client. If there's
                    // nothing else being sent or ready to be sent, send a heartbeat
                    // right back.
                    if (output_queue_.empty())
                    {
                        output_queue_.push_back("HB" + read_terminator);

                        // Signal that the output queue contains messages. Modifying
                        // the expiry will wake the output actor, if it is waiting on
                        // the timer.
                        non_empty_output_queue_.expires_at(steady_timer::time_point::min());
                    }
                }
                read_line();
            }
            else
            {
                socket_server_->OnReceiveError(error.message());
                stop();
            }
        });
    }

    void TCP_Session::await_output()
    {
        auto self(shared_from_this());
        non_empty_output_queue_.async_wait(
            [this, self](const boost::system::error_code& /*error*/)
        {
            // Check if the session was stopped while the operation was pending.
            if (stopped())
                return;

            if (output_queue_.empty())
            {
                // There are no messages that are ready to be sent. The actor goes
                // to sleep by waiting on the non_empty_output_queue_ timer. When a
                // new message is added, the timer will be modified and the actor
                // will wake.
                non_empty_output_queue_.expires_at(steady_timer::time_point::max());
                await_output();
            }
            else
            {
                write_line();
            }
        });
    }

    void TCP_Session::write_line()
    {
        // Set a deadline for the write operation.
        if (rw_timeout > 0)
        {
            output_deadline_.expires_after(std::chrono::seconds(rw_timeout));
        }

        // Start an asynchronous operation to send a message.
        auto self(shared_from_this());
        boost::asio::async_write(socket_,
            boost::asio::buffer(output_queue_.front()),
            [this, self](const boost::system::error_code& error, std::size_t /*n*/)
        {
            // Check if the session was stopped while the operation was pending.
            if (stopped())
                return;

            if (!error)
            {
                output_queue_.pop_front();
                await_output();
            }
            else
            {
                stop();
            }
        });
    }

    void TCP_Session::check_deadline(steady_timer& deadline)
    {
        auto self(shared_from_this());
        deadline.async_wait(
            [this, self, &deadline](const boost::system::error_code& /*error*/)
        {
            // Check if the session was stopped while the operation was pending.
            if (stopped())
                return;

            // Check whether the deadline has passed. We compare the deadline
            // against the current time since a new asynchronous operation may
            // have moved the deadline before this actor had a chance to run.
            if (deadline.expiry() <= steady_timer::clock_type::now())
            {
                // The deadline has passed. Stop the session. The other actors will
                // terminate as soon as possible.
                stop();
            }
            else
            {
                // Put the actor back to sleep.
                check_deadline(deadline);
            }
        });
    }

    //----------------------------------------------------------------------

    UDP_Broadcaster::UDP_Broadcaster(boost::asio::io_context& io_context,
        const udp::endpoint& broadcast_endpoint)
        : socket_(io_context)
    {
        socket_.connect(broadcast_endpoint);
        socket_.set_option(udp::socket::broadcast(true));
    }

    void UDP_Broadcaster::deliver(const std::string& msg)
    {
        boost::system::error_code ignored_error;
        socket_.send(boost::asio::buffer(msg), 0, ignored_error);
    }

    //----------------------------------------------------------------------

    SPSocketServer::SPSocketServer(boost::asio::io_context& io_context,
        const tcp::endpoint& listen_endpoint,
        const udp::endpoint& broadcast_endpoint)
        : io_context_(io_context),
        acceptor_(io_context, listen_endpoint)
    {
        channel_.Join(std::make_shared<UDP_Broadcaster>(io_context_, broadcast_endpoint));        
    }

    void SPSocketServer::accept()
    {
        acceptor_.async_accept(
            [this](const boost::system::error_code& error, tcp::socket socket)
        {
            if (!error)
            {
                auto tcp_ptr = std::make_shared<TCP_Session>(std::move(socket), channel_, this);
                tcp_ptr->UseReadUntil(read_terminator);
                tcp_ptr->UseReadWriteTimeOut(read_write_timeout);
                tcp_ptr->Start();
            }
            accept();
        });
    }

    void SPSocketServer::StopServer()
    {
        if (acceptor_.is_open())
        {
            acceptor_.cancel();
            acceptor_.close();
        }
        OnServerStopped();
    }
}