using Serilog;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.NetworkInformation;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;
namespace CoinbaseEngine
{
/// <summary>
/// Singleton TCP server that accepts client connections on the configured local
/// port, answers a login message, and then periodically pushes price strings for
/// every configured trading pair to each logged-in client.
/// </summary>
public sealed class RouterServer
{
    private static readonly Lazy<RouterServer> lazy = new Lazy<RouterServer>(() => new RouterServer());

    /// <summary>Gets the single, lazily created server instance.</summary>
    public static RouterServer Instance { get { return lazy.Value; } }

    /// <summary>Gets a value indicating whether the accept loop is (supposed to be) running.</summary>
    public bool RouterRunning { get; private set; } = false;

    private readonly List<string> tradingPairs = CoinbaseConfig.TradingPairs;
    private readonly int LocalPort = CoinbaseConfig.LocalPort;
    private readonly string PriceProt = CoinbaseConfig.PriceProt;

    // Number of currently connected clients. Only modified through Interlocked,
    // because several client handlers update it concurrently.
    private int m_nConnections = 0;

    private TcpListener m_server { get; set; }

    private RouterServer()
    {
    }

    /// <summary>
    /// Starts listening on <c>IPAddress.Any</c> at the configured port and accepts
    /// clients until <see cref="Stop"/> is called. Each accepted client is serviced
    /// by its own asynchronous handler. Calling this while the server is already
    /// running is a no-op.
    /// </summary>
    /// <remarks>
    /// The method stays <c>async void</c> to keep the existing signature, so every
    /// failure is caught and logged here instead of being allowed to escape.
    /// </remarks>
    public async void Start()
    {
        if (RouterRunning)
        {
            Log.Warning("Server already running.");
            return;
        }

        TcpListener listener;
        try
        {
            listener = new TcpListener(IPAddress.Any, LocalPort);
            listener.Start();
        }
        catch (SocketException ex)
        {
            // e.g. the port is already in use; leave RouterRunning false.
            Log.Error("SocketException Start: {0}", ex.Message);
            return;
        }

        this.m_server = listener;
        RouterRunning = true;
        Log.Information("Server start: {0}", listener.LocalEndpoint.ToString());

        // start listening to socket port and accept incoming device messages
        while (RouterRunning)
        {
            try
            {
                TcpClient client = await listener.AcceptTcpClientAsync();

                // Fire-and-forget: HandleClientAsync catches and logs its own
                // exceptions, so the task never needs to be observed.
                Task clientTask = Task.Run(() => HandleClientAsync(client));
            }
            catch (InvalidOperationException ex)
            {
                // The listener has been stopped/disposed (ObjectDisposedException
                // derives from InvalidOperationException); it can never accept
                // again, so leave the loop instead of spinning on the error.
                if (RouterRunning)
                {
                    Log.Error("Exception Start: {0}", ex.Message);
                }
                break;
            }
            catch (SocketException ex)
            {
                if (!RouterRunning) break;
                Log.Error("SocketException Start: {0}", ex.Message);
            }
            catch (Exception ex)
            {
                if (!RouterRunning) break;
                Log.Error("Exception Start: {0}", ex.Message);
            }
        }

        // listening at port ended, socket server stopped running
        Log.Information("Server stop: {0}", listener.LocalEndpoint.ToString());
    }

    /// <summary>
    /// Signals the accept loop to end and stops the listener. Safe to call even if
    /// <see cref="Start"/> was never invoked.
    /// </summary>
    public void Stop()
    {
        RouterRunning = false;
        this.m_server?.Stop();
        Log.Information("Server stop.");
    }

    /// <summary>
    /// Services one connected client: polls for incoming data, logs every received
    /// message, and on the first message starting with <c>"1\r\n"</c> sends the
    /// login acknowledgement and starts the price sender for that client.
    /// The stream and client are always closed and the connection counter is
    /// always decremented when the handler ends.
    /// </summary>
    /// <param name="remclient">The accepted client connection.</param>
    private async Task HandleClientAsync(TcpClient remclient)
    {
        int thrId = Interlocked.Increment(ref m_nConnections);
        Log.Information("Client Connected. Active Connections = {0}", thrId);

        NetworkStream netstream = null;
        bool senderStarted = false;
        try
        {
            netstream = remclient.GetStream();
            byte[] data = new byte[128];

            // main loop to service the client
            while (netstream.CanRead)
            {
                if (remclient.GetState() != TcpState.Established)
                {
                    Log.Information("Remote Client Disconnected");
                    break;
                }

                if (!netstream.DataAvailable)
                {
                    await Task.Delay(500);
                    continue;
                }

                int recv = await netstream.ReadAsync(data, 0, data.Length);

                // handle login and acknowledge client
                string str = Encoding.ASCII.GetString(data, 0, recv);
                Log.Information($"RECV[{thrId}]: {str}");

                if (!senderStarted && str.StartsWith("1\r\n", StringComparison.Ordinal))
                {
                    byte[] reply = Encoding.ASCII.GetBytes($"1,1,0\r\n{PriceProt}\r\n");
                    await netstream.WriteAsync(reply, 0, reply.Length);

                    senderStarted = true;
                    NetworkStream sendStream = netstream;
                    string senderName = $"Price Read/Write Thread [{thrId}]";

                    // Fire-and-forget: SendPricesAsync handles its own exceptions.
                    Task senderTask = Task.Run(() => SendPricesAsync(sendStream, senderName));
                }
            }
        }
        catch (SocketException ex)
        {
            Log.Error("HandleClientThread - SocketException: {0}", ex.Message);
        }
        catch (Exception ex)
        {
            // Includes IOException from the stream and ObjectDisposedException when
            // the sender closed the stream while this loop was still polling it.
            Log.Error("HandleClientThread - Exception: {0}", ex.Message);
        }
        finally
        {
            if (netstream != null)
            {
                netstream.Close();
            }
            remclient.Close();
            Log.Information("Client Disconnected. Active Connections = {0}", Interlocked.Decrement(ref m_nConnections));
        }
    }

    /// <summary>
    /// Repeatedly writes the current price string of every configured trading pair
    /// to the client, waiting <c>CoinbaseConfig.PriceInterval</c> between rounds.
    /// Ends when the stream is no longer writable or a write/lookup fails, and
    /// closes the stream on exit (which also ends the client's read loop).
    /// </summary>
    /// <param name="netstream">The client's network stream.</param>
    /// <param name="workerName">Descriptive name of this sender, written to the log once at start.</param>
    private async Task SendPricesAsync(NetworkStream netstream, string workerName)
    {
        Log.Information(workerName);
        try
        {
            // CanWrite turns false once the read side has closed the stream, which
            // lets the sender finish without logging a spurious error.
            while (netstream.CanWrite)
            {
                foreach (string pair in tradingPairs)
                {
                    var cbProd = CommonUtil.GetCoinbasePairBySPTraderCode(pair);
                    var str = CoinbasePrice.Instance.GetProductPrice(cbProd);

                    // NOTE(review): skips a pair when no price text is returned so a
                    // single missing quote does not end the whole feed - confirm
                    // this matches the intended GetProductPrice contract.
                    if (str == null || str.Length == 0)
                    {
                        continue;
                    }

                    byte[] payload = Encoding.ASCII.GetBytes(str);

                    // Awaited so writes never overlap and failures are caught below.
                    await netstream.WriteAsync(payload, 0, payload.Length);
                }

                await Task.Delay(CoinbaseConfig.PriceInterval);
            }
        }
        catch (SocketException e)
        {
            Log.Error($"RouterServer.HandleSendThread.SocketException: {e.Message}");
        }
        catch (Exception e)
        {
            Log.Error($"RouterServer.HandleSendThread.Exception: {e.Message}");
        }
        finally
        {
            netstream.Close();
        }
    }
}
}