// Copyright (c) 2003-2021 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Heavy modification by Tim Hsu, Sharp Point Ltd. 2022.

#include "SPSocketClient.h"

#include <iostream>

namespace SPSocket
{
	void SPSocketClient::Connect(const std::string& host, int port)
	{
		status = ConnectionStatus::S_CLOSED;
		start(resolver_.resolve(host, std::to_string(port)));
	}

	void SPSocketClient::start(tcp::resolver::results_type endpoints)
	{
		// Start the connect actor.
		endpoints_ = endpoints;
		start_connect(endpoints_.begin());

		// Start the deadline actor. You will note that we're not setting any
		// particular deadline here. Instead, the connect and input actors will
		// update the deadline prior to each asynchronous operation.
		deadline_.async_wait(std::bind(&SPSocketClient::check_deadline, this, _1));
	}

	void SPSocketClient::UseReadUntil(char terminator)
	{
		use_read_until = true;
		read_terminator = terminator;
	}

	void SPSocketClient::UseSendHeartBeat(int sec_interval, const std::string& heartbeat)
	{
		hb_interval = sec_interval;
		heartbeat_str_ = heartbeat;
	}

	void SPSocketClient::Disconnect()
	{
		if (running_)
		{
			status = ConnectionStatus::S_CLOSED;
			boost::system::error_code ignored_error;
			socket_.close(ignored_error);
			deadline_.cancel();
			heartbeat_timer_.cancel();

			OnDisconnected();
			running_ = false;
		}
	}

	void SPSocketClient::start_async_reading()
	{
		if (!IsConnected())
			return;

		if (!running_)
		{
			running_ = true;

			if (use_read_until)
				start_read_until();
			else
				start_read();

			if (heartbeat_str_.length() > 0)
				send_heartbeat();
		}
	}

	void SPSocketClient::start_connect(tcp::resolver::results_type::iterator endpoint_iter)
	{
		if (endpoint_iter != endpoints_.end())
		{
			status = ConnectionStatus::S_CONNECTING;
			OnConnecting(endpoint_iter->endpoint());

			// Set a deadline for the connect operation.
			if (read_timeout > 0)
			{
				deadline_.expires_after(std::chrono::seconds(read_timeout));
			}

			// Start the asynchronous connect operation.
			socket_.async_connect(endpoint_iter->endpoint(),
				std::bind(&SPSocketClient::handle_connect,
					this, _1, endpoint_iter));
		}
		else
		{
			// There are no more endpoints to try. Shut down the client.
			Disconnect();
		}
	}

	void SPSocketClient::handle_connect(const boost::system::error_code& error,
		tcp::resolver::results_type::iterator endpoint_iter)
	{
		// The async_connect() function automatically opens the socket at the start
		// of the asynchronous operation. If the socket is closed at this time then
		// the timeout handler must have run first.
		if (!socket_.is_open())
		{
			status = ConnectionStatus::S_CONNECT_ERROR;
			OnConnectTimedOut(endpoint_iter->endpoint());			
			return;

			// Try the next available endpoint.
			//start_connect(++endpoint_iter);
		}

		// Check if the connect operation failed before the deadline expired.
		else if (error)
		{
			status = ConnectionStatus::S_CONNECT_ERROR;
			OnConnectionError(error.message());

			// We need to close the socket used in the previous connection attempt
			// before starting a new one.
			socket_.close();

			return;
			// Try the next available endpoint.
			//start_connect(++endpoint_iter);
		}

		// Otherwise we have successfully established a connection.
		else
		{
			status = ConnectionStatus::S_CONNECTED;
			OnConnected(endpoint_iter->endpoint());

			// Start the input actor.
			start_async_reading();
		}
	}

	void SPSocketClient::start_read()
	{
		if (!IsConnected())
			return;
		
		if (read_timeout > 0)
		{
			// Set a deadline for the read operation.
			deadline_.expires_after(std::chrono::seconds(read_timeout));
		}

		// simulate memset 0
		recv_buffer_.consume(recv_buffer_.size() + 1);

		boost::asio::async_read(socket_, recv_buffer_,
			boost::asio::transfer_at_least(1),
			std::bind(&SPSocketClient::handle_read, this, _1, _2));
	}

	void SPSocketClient::start_read_until()
	{
		if (!IsConnected())
			return;

		if (read_timeout > 0)
		{
			// Set a deadline for the read operation.
			deadline_.expires_after(std::chrono::seconds(read_timeout));
		}	

		// Start an asynchronous operation to read a newline-delimited message.
		boost::asio::async_read_until(socket_,
			boost::asio::dynamic_buffer(input_buffer_), read_terminator,
			std::bind(&SPSocketClient::handle_read_until, this, _1, _2));
	}

	void SPSocketClient::handle_read(const boost::system::error_code& error, std::size_t n)
	{
		if (!error)
		{
			// Empty messages are heartbeats and so ignored.
			if (n > 0)
			{
				boost::asio::streambuf::const_buffers_type bufs = recv_buffer_.data();
				std::string str(boost::asio::buffers_begin(bufs),
					boost::asio::buffers_begin(bufs) + recv_buffer_.size());

				// Extract the delimited message from the buffer.
				std::string str_recv(str.substr(0, n - 1));

				if (use_recv_polling)
					push(str_recv);
				else
					OnReceive(str_recv);
			}
			start_read();
		}
		else
		{
			OnReceiveError(error.message());
			Disconnect();
		}
	}

	void SPSocketClient::handle_read_until(const boost::system::error_code& error, std::size_t n)
	{
		if (!error)
		{
			// Extract the delimited message from the buffer.
			std::string str_recv(input_buffer_.substr(0, n - 1));
			input_buffer_.erase(0, n);

			// Empty messages are heartbeats and so ignored.
			if (!str_recv.empty())
			{
				//std::thread([=] { OnReceive(str_recv); }).detach();		// use async callback instead, maybe dangerous
				if (use_recv_polling)
					push(str_recv);
				else
					OnReceive(str_recv);
			}
			start_read_until();
		}
		else
		{
			OnReceiveError(error.message());
			Disconnect();
		}
	}

	void SPSocketClient::send(const std::string& content)
	{
		if (!IsConnected())
			return;

		// Start an asynchronous operation to send a heartbeat message.
		boost::asio::async_write(socket_, boost::asio::buffer(content, content.length()),
			std::bind(&SPSocketClient::handle_send, this, _1));
	}

	void SPSocketClient::handle_send(const boost::system::error_code& error)
	{
		if (error)
		{
			OnSendError(error.message());
			Disconnect();
		}
	}

	void SPSocketClient::send_heartbeat()
	{
		if (!IsConnected())
			return;

		// Start an asynchronous operation to send a heartbeat message.
		boost::asio::async_write(socket_, boost::asio::buffer(heartbeat_str_, heartbeat_str_.length()),
			std::bind(&SPSocketClient::handle_send_heartbeat, this, _1));
	}

	void SPSocketClient::handle_send_heartbeat(const boost::system::error_code& error)
	{
		if (!error)
		{
			// Wait X seconds before sending the next heartbeat.
			heartbeat_timer_.expires_after(std::chrono::seconds(hb_interval));
			heartbeat_timer_.async_wait(std::bind(&SPSocketClient::send_heartbeat, this));
		}
		else
		{
			OnHeartBeatError(error.message());
			Disconnect();
		}
	}

	void SPSocketClient::check_deadline(const boost::system::error_code& error)
	{
		// Check whether the deadline has passed. We compare the deadline against
		// the current time since a new asynchronous operation may have moved the
		// deadline before this actor had a chance to run.
		if (deadline_.expiry() <= steady_timer::clock_type::now())
		{
			if (error)
			{
				OnReceiveTimeOut(error.message());

				// The deadline has passed. The socket is closed so that any outstanding
				// asynchronous operations are cancelled.
				socket_.close();
			}

			// There is no longer an active deadline. The expiry is set to the
			// maximum time point so that the actor takes no action until a new
			// deadline is set.
			deadline_.expires_at(steady_timer::time_point::max());
		}

		// Put the actor back to sleep.
		deadline_.async_wait(std::bind(&SPSocketClient::check_deadline, this, _1));
	}

	void SPSocketClient::push(const std::string& data)
	{
		std::lock_guard<std::mutex> lock(mtx_);
		recv_queue_.push(data);
	}

	void SPSocketClient::pop()
	{
		std::string data = "";
		{	// RAII
			std::lock_guard<std::mutex> lock(mtx_);
			if (recv_queue_.empty()) return;
			data = recv_queue_.front();
			recv_queue_.pop();
		}
		OnReceive(data);			
	}
}