using Serilog;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace CoinbaseEngine
{
public sealed class RouterClient
{
// Lazily constructs the single RouterClient the first time Instance is read.
private static readonly Lazy<RouterClient> lazy = new Lazy<RouterClient>(() => new RouterClient());
// Accessor for the shared RouterClient instance.
public static RouterClient Instance { get { return lazy.Value; } }
// Set to true by Reconnect once the socket and stream are created; set back to false by Stop.
public bool RouterRunning { get; private set; } = false;
// Remote endpoint, copied from CoinbaseConfig when the instance is created.
private readonly string REMOTE_HOST = CoinbaseConfig.RemoteHost;
private readonly int REMOTE_PORT = CoinbaseConfig.RemotePort;
// Thread created by Reconnect with Recv as its entry point; set to null by Stop.
private Thread _readThread;
// Connection to REMOTE_HOST:REMOTE_PORT and the stream built over its socket.
private TcpClient _tcpClient;
private NetworkStream _networkStream;
// Used by ProcessRecv to turn each received line into a ModelData.
private ModelParser modelParser;
// Set to true when ProcessRecv handles a MsgId.LoginRequest message; reset by Reconnect and Stop.
private bool _isLoggedIn = false;
// Set to true after SendLoginRequest() has sent the login message; reset by Recv whenever it creates a new reader.
private bool _sentLoginReq = false;
/// <summary>
/// Creates the client and its message parser. The constructor is private;
/// the only caller is the lazy initializer behind <see cref="Instance"/>.
/// </summary>
private RouterClient()
{
    this.modelParser = new ModelParser();
}
/// <summary>
/// Starts the client by opening the connection through <c>Reconnect</c>.
/// </summary>
public void Start() => Reconnect();
/// <summary>
/// Marks the client as stopped and logged out, waits briefly for the receive
/// thread to finish, and clears <c>PersistantCache</c>.
/// </summary>
/// <remarks>
/// The running flag is cleared <em>before</em> waiting, because the receive loop
/// uses <see cref="RouterRunning"/> as its exit condition; waiting first would
/// give the loop no reason to end. The thread is never joined from itself and is
/// never aborted: <c>Thread.Abort</c> is unsupported on current .NET runtimes and
/// the thread is a background thread that leaves its loop once the flag is false.
/// </remarks>
private void Stop()
{
    // Reset state unconditionally so a later Send() always sees "not running"
    // and attempts a reconnect, regardless of whether a thread was recorded.
    RouterRunning = false;
    _isLoggedIn = false;

    // Detach the thread reference first so Reconnect() can start a fresh one.
    var readThread = _readThread;
    _readThread = null;

    // Stop() is also reached from the receive path; joining the current thread
    // would only burn the whole timeout.
    bool calledFromReadThread = ReferenceEquals(readThread, Thread.CurrentThread);
    if (readThread != null && !calledFromReadThread && readThread.IsAlive)
    {
        if (!readThread.Join(1000))
        {
            Log.Error($"RouterClient.Stop: receive thread did not exit within 1000 ms");
        }
    }

    Log.Information($"RouterClient.Stop");
    PersistantCache.Instance.Clear();
}
/// <summary>
/// Shuts the client down: stops the receive side, then closes the network
/// stream and the TCP client.
/// </summary>
/// <remarks>
/// <c>Stop</c> runs first so that <see cref="RouterRunning"/> is already false
/// when the stream reference is cleared; the receive loop tests that flag before
/// it touches the stream. The TCP client is closed even when no stream was ever
/// created, and both references are cleared so they cannot be reused.
/// </remarks>
public void Term()
{
    Stop();

    var stream = _networkStream;
    _networkStream = null;
    stream?.Close();

    var client = _tcpClient;
    _tcpClient = null;
    client?.Close();

    Log.Information($"RouterClient.Dispose");
}
/// <summary>
/// Opens a new TCP connection to the configured host and port, starts the
/// receive thread if none is recorded, and sends the initial <c>"1\r\n"</c>
/// request. On any failure the client is put back into the stopped state and
/// the error is logged.
/// </summary>
/// <remarks>
/// The connection that was in place before the call is closed once the new one
/// has been published, so repeated reconnects do not leak sockets.
/// </remarks>
private void Reconnect()
{
    TcpClient newClient = null;
    try
    {
        newClient = new TcpClient(REMOTE_HOST, REMOTE_PORT);
        var newStream = new NetworkStream(newClient.Client);

        // Publish the new connection before closing the old one so the fields
        // never point at an already-closed object while the flag is true.
        var previousStream = _networkStream;
        var previousClient = _tcpClient;
        _tcpClient = newClient;
        _networkStream = newStream;
        previousStream?.Close();
        previousClient?.Close();

        RouterRunning = true;
        _isLoggedIn = false;

        if (_readThread == null)
        {
            _readThread = new Thread(Recv);
            _readThread.Name = "TServerRecv";
            _readThread.IsBackground = true;
            _readThread.Start();
        }

        Log.Information($"Connected to TServer [{REMOTE_HOST}:{REMOTE_PORT}]");
        // connection established, send template list request
        Send("1\r\n");
    }
    catch (Exception ex)
    {
        // A client that was created but never published would otherwise leak.
        if (newClient != null && !ReferenceEquals(newClient, _tcpClient))
        {
            newClient.Close();
        }

        // Socket, I/O and all other failures are handled the same way.
        Stop();
        Log.Error($"RouterClient.Reconnect: {ex.Message}");
    }
}
/// <summary>
/// Receive loop: while the client is running, polls the network stream and,
/// when data is available, reads it line by line, logging each line and passing
/// it to <c>ProcessRecv</c>. After each line a login request is attempted until
/// one has been sent.
/// </summary>
/// <remarks>
/// This is an <c>async void</c> method used as a thread entry point, so any
/// exception that leaves it would terminate the process. The whole body is
/// therefore guarded, including the stream checks that can throw on a closed
/// socket, and the stream is read through a local snapshot so a concurrent
/// <c>Term</c> cannot cause a null dereference.
/// NOTE(review): when the inner read loop ends because the stream reached its
/// end, the outer loop simply keeps polling; confirm whether that case should
/// call <c>Stop</c> to force a reconnect.
/// </remarks>
private async void Recv()
{
    try
    {
        while (RouterRunning)
        {
            // Snapshot: the field may be replaced or cleared by another caller.
            var stream = _networkStream;
            if (stream == null || !stream.CanRead)
            {
                break;
            }

            if (!stream.DataAvailable)
            {
                await Task.Delay(50);
                continue;
            }

            try
            {
                var reader = new StreamReader(stream);
                _sentLoginReq = false;

                // ReadLineAsync yields null at end of stream, which replaces the
                // synchronously blocking EndOfStream probe.
                string readStr;
                while ((readStr = await reader.ReadLineAsync()) != null)
                {
                    Log.Information($"RECV: [{readStr}]");
                    ProcessRecv(readStr);
                    if (!_sentLoginReq)
                    {
                        SendLoginRequest(null);
                    }
                }
            }
            catch (SocketException ex)
            {
                Stop();
                Log.Error($"RouterClient.Recv: {ex.Message}");
            }
            catch (Exception ex)
            {
                // I/O and all other failures are logged only; the loop goes on.
                Log.Error($"RouterClient.Recv: {ex.Message}");
            }
        }
    }
    catch (Exception ex)
    {
        // Raised by the stream checks themselves (for example on a disposed
        // socket). The loop is over, so reflect that in the client state to let
        // the next Send() reconnect.
        Log.Error($"RouterClient.Recv: {ex.Message}");
        Stop();
    }
}
/// <summary>
/// Sends <paramref name="sendBuffer"/> to the remote host as ASCII, reconnecting
/// first when the client is not running.
/// </summary>
/// <param name="sendBuffer">
/// Complete message text. A trailing <c>"\r\n"</c>, if present, is left out of
/// the log line but is still sent.
/// </param>
/// <remarks>
/// The write is synchronous so that a failed write reaches the catch block,
/// which stops the client and lets the next call reconnect. A null or empty
/// buffer is rejected up front instead of failing while the log line is built.
/// </remarks>
public void Send(string sendBuffer)
{
    if (string.IsNullOrEmpty(sendBuffer))
    {
        Log.Error($"RouterClient.Send: Nothing to send.");
        return;
    }

    if (!RouterRunning)
    {
        Reconnect();
    }
    if (!RouterRunning)
    {
        Log.Error($"RouterClient.Send: Unable to establish connection.");
        return;
    }

    // Strip the line terminator for logging only when it is really there.
    var logText = sendBuffer.EndsWith("\r\n", StringComparison.Ordinal)
        ? sendBuffer.Substring(0, sendBuffer.Length - 2)
        : sendBuffer;
    Log.Information($"SEND: [{logText}]");

    var writeBuf = Encoding.ASCII.GetBytes(sendBuffer);
    try
    {
        var stream = _networkStream;
        if (stream == null || !stream.CanWrite)
        {
            Log.Error($"RouterClient.Send: Stream is not writable, message dropped.");
            return;
        }

        stream.Write(writeBuf, 0, writeBuf.Length);
    }
    catch (Exception ex)
    {
        // Socket, I/O and all other failures are handled the same way.
        Stop();
        Log.Error($"RouterClient.Send: {ex.Message}");
    }
}
/// <summary>
/// Parses one received line and dispatches on its message type.
/// </summary>
/// <param name="inputStr">The raw line read from the network stream.</param>
/// <returns>
/// The parsed message, or <c>null</c> when the parser returns <c>null</c>.
/// </returns>
private ModelData ProcessRecv(string inputStr)
{
    var parsed = modelParser.InterpretData(inputStr);
    if (parsed == null)
    {
        return null;
    }

    switch (parsed.MsgType)
    {
        case MsgId.LoginRequest:
            // Receiving this message type is what marks the session as logged in.
            _isLoggedIn = true;
            Log.Information("Logged in TServer.");
            break;

        case MsgId.HeartBeatRequest:
            SendHeartBeatReply(parsed);
            break;

        case MsgId.OrderRequest:
            SendOrder(parsed);
            break;

        case MsgId.ResyncOrderRequest:
            // Order resync: start marker, then the data, then a reply carrying
            // the value the data call produced.
            // NOTE(review): .Result blocks the calling thread until the service
            // call completes - confirm this is acceptable on the receive path.
            SendOrderRequestStart(parsed);
            var orderTotal = CoinbaseProService.Instance.SendOrderRequestData(parsed).Result;
            SendOrderRequestReply(parsed, orderTotal);
            break;

        case MsgId.ResyncTradeRequest:
            // Trade resync: the reply is sent before the data call is made, and
            // the result of that call is not used here.
            SendTradeRequestReply(parsed);
            CoinbaseProService.Instance.SendTradeRequestData(parsed);
            break;

        case MsgId.DoneTrade:
            AcknowledgeDoneTrade(parsed);
            break;
    }

    return parsed;
}
/// <summary>
/// Sends the login request once the client and server message counts held by
/// <c>PersistantCache</c> match its <c>clientTempList</c> and
/// <c>serverTempList</c> values. Does nothing while both counts are zero.
/// </summary>
/// <param name="md">Not used by this method.</param>
private void SendLoginRequest(ModelData md)
{
    var clientCount = PersistantCache.Instance.GetClientMsgCount();
    var serverCount = PersistantCache.Instance.GetServerMsgCount();

    bool nothingCachedYet = clientCount == 0 && serverCount == 0;
    if (nothingCachedYet)
    {
        return;
    }

    // presumably the *TempList values are the expected template counts - verify against PersistantCache
    bool countsMatch = clientCount == PersistantCache.Instance.clientTempList
        && serverCount == PersistantCache.Instance.serverTempList;
    if (countsMatch)
    {
        // send login request
        SendLoginRequest();
    }
}
/// <summary>
/// Builds the login message (request number 1, gateway code, server id and the
/// current local time), sends it, and records that a login request was sent.
/// Only logs and returns when the client is already logged in.
/// </summary>
private void SendLoginRequest()
{
    if (_isLoggedIn)
    {
        Log.Information("Already logged in TServer");
        return;
    }

    var loginTags = new CsvMsgTag();
    loginTags.SetMsgTag(TagMsgId.ReqNo, 1);
    loginTags.SetMsgTag(TagMsgId.GatewayCode, CoinbaseConfig.GatewayCode);
    loginTags.SetMsgTag(TagMsgId.ServerId, CoinbaseConfig.ServerId);
    loginTags.SetMsgTag(TagMsgId.LoginTime, DateTime.Now);

    // The cached client message for LoginRequest drives how the tags are rendered.
    var loginTemplate = PersistantCache.Instance.GetClientMsg(MsgId.LoginRequest);
    var loginMessage = loginTags.GetCsvMsgTagString(loginTemplate);

    Send(loginMessage);
    _sentLoginReq = true;
}
/// <summary>
/// Sends a heartbeat reply made of the numeric <c>MsgId.HeartBeatReply</c> id
/// and the request number of <paramref name="md"/>.
/// </summary>
/// <param name="md">The heartbeat request being answered.</param>
private void SendHeartBeatReply(ModelData md)
{
    // The interpolated string is already the final text. Passing it through
    // string.Format would re-parse it as a format string and fail on any brace.
    var str = $"{Convert.ToInt32(MsgId.HeartBeatReply)},{md.ReqNo}\r\n";
    Send(str);
}
/// <summary>
/// Sends the order-resync start message made of the numeric
/// <c>MsgId.ResyncOrderStart</c> id and the request number of
/// <paramref name="md"/>.
/// </summary>
/// <param name="md">The resync request being answered.</param>
private void SendOrderRequestStart(ModelData md)
{
    // The interpolated string is already the final text. Passing it through
    // string.Format would re-parse it as a format string and fail on any brace.
    var str = $"{Convert.ToInt32(MsgId.ResyncOrderStart)},{md.ReqNo}\r\n";
    Send(str);
}
/// <summary>
/// Sends the order-resync reply: the numeric <c>MsgId.ResyncOrderRequest</c> id,
/// the request number of <paramref name="md"/>, a literal <c>0</c>, an empty
/// field, and <paramref name="tot"/>.
/// </summary>
/// <param name="md">The resync request being answered.</param>
/// <param name="tot">Value placed in the last field of the reply.</param>
private void SendOrderRequestReply(ModelData md, int tot)
{
    // The interpolated string is already the final text. Passing it through
    // string.Format would re-parse it as a format string and fail on any brace.
    var str = $"{Convert.ToInt32(MsgId.ResyncOrderRequest)},{md.ReqNo},0,,{tot}\r\n";
    Send(str);
}
/// <summary>
/// Sends the trade-resync reply: the numeric <c>MsgId.ResyncTradeRequest</c> id,
/// the request number of <paramref name="md"/>, a literal <c>0</c>, and an empty
/// trailing field.
/// </summary>
/// <param name="md">The resync request being answered.</param>
private void SendTradeRequestReply(ModelData md)
{
    // The interpolated string is already the final text. Passing it through
    // string.Format would re-parse it as a format string and fail on any brace.
    var str = $"{Convert.ToInt32(MsgId.ResyncTradeRequest)},{md.ReqNo},0,\r\n";
    Send(str);
}
/// <summary>
/// Routes an order message to <c>CoinbaseProService</c> according to its
/// Action tag: "1" adds, "2" changes, "3" cancels; anything else is rejected.
/// </summary>
/// <param name="md">The order message; a <c>null</c> value is ignored.</param>
private void SendOrder(ModelData md)
{
    if (md == null)
    {
        return;
    }

    var tags = md.csvMsgTag;
    var requestedAction = tags.GetMsgTag(TagMsgId.Action);

    if (requestedAction.Equals("2")) // change
    {
        CoinbaseProService.Instance.ChangeOrderById(md);
        return;
    }

    if (requestedAction.Equals("3")) // delete
    {
        CoinbaseProService.Instance.CancelOrderById(md);
        return;
    }

    if (!requestedAction.Equals("1"))
    {
        CoinbaseProService.Instance.RejectOrder(md, "Reject Unknown Order");
        return;
    }

    // add: insert ord acc_no to db for mapping, before the order type is looked at
    SQLite.Instance.InsertToOrderIdMap(md);

    var orderType = tags.GetMsgTag(TagMsgId.OrderType);
    if (orderType.Equals("6"))
    {
        CoinbaseProService.Instance.PlaceMarketOrder(md);
    }
    else if (orderType.Equals("0"))
    {
        // Order type "0" becomes a stop order when the StopType tag is "L",
        // and a limit order otherwise.
        bool isStopOrder = tags.GetMsgTag(TagMsgId.StopType).Equals("L");
        if (isStopOrder)
        {
            CoinbaseProService.Instance.PlaceStopOrder(md);
        }
        else
        {
            CoinbaseProService.Instance.PlaceLimitOrder(md);
        }
    }
    else if (orderType.Equals("2"))
    {
        CoinbaseProService.Instance.RejectOrder(md, "Auction Order Not Supported");
    }
    else
    {
        CoinbaseProService.Instance.RejectOrder(md, "Reject Unknown Order");
    }
}
/// <summary>
/// Reads the TradeNo and SeqNo tags from <paramref name="md"/>, converts both
/// to 64-bit integers, and passes them to <c>SQLite.Instance.AcknowledgeDoneTrade</c>.
/// </summary>
/// <param name="md">The done-trade message.</param>
private void AcknowledgeDoneTrade(ModelData md)
{
    var tags = md.csvMsgTag;
    var rawTradeNo = tags.GetMsgTag(TagMsgId.TradeNo);
    var rawSeqNo = tags.GetMsgTag(TagMsgId.SeqNo);

    var store = SQLite.Instance;
    store.AcknowledgeDoneTrade(Convert.ToInt64(rawTradeNo), Convert.ToInt64(rawSeqNo));
}
}
}