using System;
using System.Data;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Timers;
using System.Diagnostics;
using MapCommon;
/// <summary>
/// Package for all related classes that handle the parsing of data and saving it to
/// the main database.
/// The engine for MapParser.
/// </summary>
namespace ParserEngine
{
/// <summary>
/// Handles parsing of device data to MapDest Application data.
/// Singleton: one worker thread is started per configured device model and each
/// worker repeatedly fetches unparsed tracker rows, parses them and stores the result.
/// </summary>
public sealed class ParserEngine
{
    /// <summary>
    /// How long <see cref="Stop"/> waits for a worker to finish before aborting it.
    /// </summary>
    private const int ThreadJoinTimeoutMs = 10 * 1000;

    /// <summary>
    /// Number of insert attempts on the worker's own connection before falling back
    /// to <see cref="StandaloneInsertPosData"/>.
    /// </summary>
    private const int MaxInsertAttempts = 3;

    private static readonly ParserEngine _instance = new ParserEngine();

    /// <summary>
    /// Explicit static constructor to tell C# compiler not to mark type as beforefieldinit.
    /// </summary>
    static ParserEngine() { }

    private ParserEngine() { }

    /// <summary>
    /// The single shared engine instance.
    /// </summary>
    public static ParserEngine Instance { get { return _instance; } }

    /// <summary>
    /// Run flag shared by all workers; volatile so that a write from one thread
    /// is observed by the loops running on the others.
    /// </summary>
    private volatile bool _running = false;

    /// <summary>
    /// Thread array for parser multi-threading (one entry per model).
    /// </summary>
    private Thread[] thr_parsers = null;

    /// <summary>
    /// The actual start of the parsing process.
    /// Creates one background thread per model; the model list is the comma-separated
    /// value of <see cref="ParserCfg.Models"/> (entries are trimmed, empty entries are skipped).
    /// Each worker is started only after the previous one has finished its initialisation.
    /// <para>Is called by <see cref="MapParser.ParserService.OnStart"/>.</para>
    /// </summary>
    public void Start()
    {
        try
        {
            // Trim so that a list written as "A, B" does not yield a model named " B".
            string[] model_list = ParserCfg.Models
                .Split(',')
                .Select(m => m.Trim())
                .Where(m => m.Length > 0)
                .ToArray();

            thr_parsers = new Thread[model_list.Length];
            _running = true;

            for (int i = 0; i < model_list.Length; i++)
            {
                // Per-iteration copy: the lambda must not capture the shared loop variable.
                string model = model_list[i];

                using (ManualResetEvent manReset = new ManualResetEvent(false))
                {
                    Thread worker = new Thread(() => Start_Parse_Loop(manReset, model));
                    worker.Name = model;
                    worker.IsBackground = true;
                    thr_parsers[i] = worker;
                    worker.Start();

                    // Wait for the worker to signal that its initialisation is over
                    // (successfully or not) before starting the next one. The worker
                    // signals exactly once, so disposing the event afterwards is safe.
                    manReset.WaitOne();
                }
            }
        }
        catch (Exception ex)
        {
            _running = false;
            MapLogger.LogErrorLine("ParserEngine.Start - Exception: {0}", ex.Message);
        }
    }

    /// <summary>
    /// Requests all workers to stop, waits up to 10 seconds for each one and aborts
    /// those that did not finish in time. Clears the thread array.
    /// Safe to call when <see cref="Start"/> was never called or failed part-way.
    /// </summary>
    public void Stop()
    {
        _running = false;

        // Detach the array first so it is cleared even if a thread misbehaves.
        Thread[] workers = thr_parsers;
        thr_parsers = null;

        if (workers != null)
        {
            bool failed = false;
            foreach (Thread thr in workers)
            {
                // Slots stay null when Start failed before reaching them.
                if (thr == null)
                {
                    continue;
                }

                try
                {
                    // abort if fail to finish in 10 secs
                    // NOTE(review): Thread.Abort is only supported on .NET Framework.
                    if (!thr.Join(ThreadJoinTimeoutMs))
                    {
                        thr.Abort();
                    }
                }
                catch (Exception ex)
                {
                    // Keep going: one failing thread must not prevent stopping the others.
                    failed = true;
                    MapLogger.LogErrorLine("ParserEngine.Stop - Exception: {0}", thr.Name + ": " + ex.Message);
                }
            }

            if (failed)
            {
                MapLogger.LogErrorLine("ParserEngine.Stop - Failed to terminate all threads.");
            }
        }

        MapLogger.LogInfoLine("ParserEngine - All Stop.");
    }

    /// <summary>
    /// Body of a worker thread created by <see cref="Start"/>.
    /// Creates the database connections for the model, signals the starting thread
    /// and then loops over <see cref="Start_Parsing_For"/> until the engine is stopped.
    /// Any exception clears the shared run flag, which also ends the other workers' loops.
    /// </summary>
    /// <param name="manReset">Event set exactly once to let the starting thread continue.</param>
    /// <param name="model">Device model</param>
    /// <seealso cref="MapCommon.DeviceModels"/>
    private void Start_Parse_Loop(ManualResetEvent manReset, string model)
    {
        // Guarantees the starter is released exactly once, even when initialisation throws.
        bool signalled = false;
        try
        {
            // NOTE(review): Thread.Sleep takes milliseconds although the local is named
            // "wait_sec" - confirm the unit of ParserCfg.Interval.
            int wait_sec = ParserCfg.Interval;

            MapLogger.LogInfoLine("ParserEngine - Started for [{0}]", model);
            TrackerSQL trkConn = new TrackerSQL();

            using (MapDestSQL MapConn = CreateMapConnection(model))
            {
                // signal the calling thread [main thread] to continue running
                signalled = true;
                manReset.Set();

                // Main parser loop, there is where all the action happens!
                while (_running)
                {
                    Start_Parsing_For(model, trkConn, MapConn);
                    Thread.Sleep(wait_sec);
                }
            }

            MapLogger.LogInfoLine("ParserEngine - Stopped for [{0}]", model);
        }
        catch (Exception ex)
        {
            _running = false;
            MapLogger.LogErrorLine("Exception Start_Parse_Loop: {0}", ex.Message);
        }
        finally
        {
            // Initialisation failed before the signal: release Start() so it cannot hang.
            if (!signalled)
            {
                manReset.Set();
            }
        }
    }

    /// <summary>
    /// Creates the destination-database connection object used for a device model.
    /// </summary>
    /// <param name="model">Device model</param>
    /// <returns>The model specific connection, or a plain <c>MapDestSQL</c> for other models.</returns>
    private static MapDestSQL CreateMapConnection(string model)
    {
        switch (model)
        {
            case DeviceModels.RFID:
                return new RptRfidSQL();
            case DeviceModels.RCD:
                return new MapRCDSQL();
            case DeviceModels.NEW1:
            case DeviceModels.NEW2:
                return new MapNewSQL();
            case DeviceModels.SET1:
            case DeviceModels.SET2:
                return new DevSetCfgSQL();
            default:
                return new MapDestSQL();
        }
    }

    /// <summary>
    /// Handles the insertion of parsed data to main database using a fresh,
    /// short-lived connection that is independent of the worker's own connection.
    /// </summary>
    /// <param name="posdata">Parsed data. MapDest Application readable data; its ID is set from the insert.</param>
    /// <param name="MapType">Which Sql connection/transaction to use. See <see cref="ParserEngine.MapDestSQL"/></param>
    /// <param name="conn_str">Connection string</param>
    /// <seealso cref="ParserEngine.ParserCfg"/>
    /// <returns>
    /// <c>NEWDATA</c> when the database cannot be reached (so the row is parsed again later),
    /// <c>PARSE_OK_NEED_REPARSE</c> when the insert fails or throws, otherwise <c>PARSE_OK</c>.
    /// </returns>
    private static ParseResult StandaloneInsertPosData(ref PosData posdata, Type MapType, string conn_str)
    {
        // MapLogger.LogInfoLine("Previous insert error. Relying on ParserEngine.StandaloneInsertPosData.");
        try
        {
            using (MapDestSQL MapConn = (MapDestSQL)Activator.CreateInstance(MapType, conn_str))
            {
                if (!MapConn.Connect())
                {
                    // It seems we are unable to connect to DB, we assume DB or network is down.
                    // Force data to be re-parse and enter DB at next loop
                    MapLogger.LogErrorLine("ParserEngine.StandaloneInsertPosData: Unable to connect to DB.");
                    return ParseResult.NEWDATA;
                }
                Thread.Sleep(10);

                if (!MapConn.IsConnected())
                {
                    // It seems we are unable to connect to DB, we assume DB or network is down.
                    // Force data to be re-parse and enter DB at next loop
                    MapLogger.LogErrorLine("ParserEngine.StandaloneInsertPosData: Unable to connect to DB.");
                    return ParseResult.NEWDATA;
                }
                Thread.Sleep(10);

                string strerr = string.Empty;
                posdata.ID = MapConn.InsertPosData(posdata, ref strerr);
                if (posdata.ID < 0)
                {
                    MapLogger.LogErrorLine("ParserEngine.StandaloneInsertPosData Bad Insert: {0}", strerr);
                    return ParseResult.PARSE_OK_NEED_REPARSE;
                }
                return ParseResult.PARSE_OK;
            }
        }
        catch (Exception ex)
        {
            MapLogger.LogErrorLine("ParserEngine.StandaloneInsertPosData Error: {0}", ex.Message);
            return ParseResult.PARSE_OK_NEED_REPARSE;
        }
    }

    /// <summary>
    /// Returns the parser singleton for a device model, or null when no parser
    /// is registered for it.
    /// </summary>
    /// <param name="devModel">Device model read from the tracker row</param>
    private static IParserBase GetParserFor(string devModel)
    {
        switch (devModel)
        {
            case DeviceModels.DEV1:
                return Parse_DEV1.Instance;
            case DeviceModels.DEV2:
                return Parse_DEV2.Instance;
            case DeviceModels.DEV3:
                return Parse_DEV3.Instance;
            default:
                return null;
        }
    }

    /// <summary>
    /// Inserts parsed data, escalating on failure: up to three attempts on the worker's
    /// connection, then a standalone connection using <c>ParserCfg.MapConnectionString</c>,
    /// then a standalone connection using <c>ParserCfg.MapAlternativeConnectionString</c>.
    /// </summary>
    /// <param name="MapConn">The worker's database connection</param>
    /// <param name="posdata">Parsed data; its ID is set from the insert</param>
    /// <returns><c>PARSE_OK</c> when a direct attempt succeeds, otherwise the result of the last standalone attempt.</returns>
    private static ParseResult InsertPosDataWithFallback(MapDestSQL MapConn, ref PosData posdata)
    {
        string strerr = string.Empty;
        for (int attempt = 0; attempt < MaxInsertAttempts; attempt++)
        {
            posdata.ID = MapConn.InsertPosData(posdata, ref strerr);
            if (posdata.ID < 0)
            {
                continue;
            }
            return ParseResult.PARSE_OK;
        }

        // insert posdata into DB in its own instance independent of previous connection
        ParseResult result = StandaloneInsertPosData(ref posdata, MapConn.GetType(), ParserCfg.MapConnectionString);

        // final try using a different network leg
        if (posdata.ID < 0)
        {
            result = StandaloneInsertPosData(ref posdata, MapConn.GetType(), ParserCfg.MapAlternativeConnectionString);
        }
        return result;
    }

    /// <summary>
    /// The actual parsing process. Fetches tracker rows still marked <c>NEWDATA</c> for the
    /// model, parses each datagram, inserts the result into the MapDest database and
    /// records the outcome in the tracker log. Keeps fetching while full batches are
    /// returned; exceptions are logged and end the current pass.
    /// </summary>
    /// <param name="model">Device model</param>
    /// <param name="trkConn">Socket/Parser database connection</param>
    /// <param name="MapConn">database connection</param>
    private void Start_Parsing_For(string model, TrackerSQL trkConn, MapDestSQL MapConn)
    {
        try
        {
            int nRows;
            do
            {
                // Reset per fetch: a stale count from an earlier full batch would keep this
                // loop polling every 10 ms after the backlog has been drained.
                nRows = 0;

                // get datagram
                DataTable dt = trkConn.FetchTrackerLogData(model, ParseResult.NEWDATA);
                if (dt != null && dt.Rows.Count > 0)
                {
                    Stopwatch sw_trk = new Stopwatch();
                    Stopwatch sw_Map = new Stopwatch();
                    nRows = dt.Rows.Count;

                    // loop through and parse data
                    for (int i = 0; i < nRows; i++)
                    {
                        //MapLogger.LogInfoLine("Parsing Data[{0}]... {1}", i, DateTime.Now.ToString());
                        DataRow row = dt.Rows[i];
                        long nId = Convert.ToInt64(row["ID"]);
                        string sDevModel = row["DevModel"].ToString();
                        byte[] szDatagram = (byte[]) row["Datagram"];
                        DateTime dtRecvTime = (DateTime) row["RecvTime"];

                        // Zero the insert timer for every row so a row that is never
                        // inserted does not log the previous row's insert time.
                        sw_Map.Reset();

                        PosData posdata = new PosData();
                        posdata.SvrDtTm = dtRecvTime;
                        posdata.deviceInfo.DevModel = sDevModel;

                        // parse Datagram base on device model
                        IParserBase pParserObj = GetParserFor(sDevModel);
                        ParseResult result = ParseResult.PARSE_MODEL_NOT_FOUND;
                        if (pParserObj != null)
                        {
                            result = pParserObj.ParseDataGram(szDatagram, MapConn, ref posdata);
                        }

                        // insert into MapDest DB
                        if (result == ParseResult.PARSE_OK)
                        {
                            sw_Map.Start();
                            result = InsertPosDataWithFallback(MapConn, ref posdata);
                            sw_Map.Stop();
                        }

                        // update result if needed
                        if (posdata.ID == (int) ParseResult.PARSE_OK_SET_ONLY)
                        {
                            result = ParseResult.PARSE_OK_SET_ONLY;
                        }

                        // update Tracker log table and log it
                        // NOTE(review): rows without a parser are not updated here, so
                        // PARSE_MODEL_NOT_FOUND is never written to the tracker log -
                        // confirm whether such rows are meant to be left untouched.
                        if (pParserObj != null)
                        {
                            sw_trk.Reset();
                            sw_trk.Start();
                            string strerr = string.Empty;
                            trkConn.UpdateTrackerLog(nId, posdata, result, out strerr);
                            sw_trk.Stop();

                            // log data
                            pParserObj.LogData(
                                nId,
                                posdata.ID,
                                result,                         // ParseResult
                                sw_Map.Elapsed.TotalSeconds,
                                sw_trk.Elapsed.TotalSeconds,
                                posdata.DevID,                  // Device Id
                                szDatagram);                    // Original raw datagram
                        }

                        if (!_running)
                        {
                            break;
                        }
                    }

                    if (!_running)
                    {
                        break;
                    }
                }
                Thread.Sleep(10);
            }
            while (_running && nRows >= trkConn.GetLimit()); // a full batch means more rows may be waiting
        }
        catch (Exception ex)
        {
            MapLogger.LogErrorLine("Exception Start_Parsing_For: {0}", ex.Message);
        }
    }
}
}