// ==========================================================================
// Author:  Yee Hsu
// Date:    6/7/2010
//
// Desc:    Simple Mass Connection Server.
//          Allows muliple simulataneous connection oriented logins.
//          Server can serve muliple instances of users by creating a
//          thread per user and servicing them.
// ==========================================================================


// ----------------------------------------------------------------------------------------------------
// ----- includes -----
// ----------------------------------------------------------------------------------------------------
#include <sys/syscall.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <pthread.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <signal.h>
#include <fcntl.h>
#include <netdb.h>
#include <mysql.h>
#include <errno.h>
#include <stdio.h>
#include <time.h>

// ----------------------------------------------------------------------------------------------------
// ----- log level -----
// higher the log level, the more log output to file
//
// LEVEL 0      No Logging
// LEVEL 1      + data received loggings
// LEVEL 2      + more log + error logs
// LEVEL 3      + more log + signal logs
// LEVEL 4      + more log + additional error logs
// LEVEL 5      + maximum detail logs
// ----------------------------------------------------------------------------------------------------
#define LOG_LEVEL             5

// ----------------------------------------------------------------------------------------------------
// ----- constants -----
// ----------------------------------------------------------------------------------------------------
#define MYSQLLOCAL_HOST       "localhost"
#define MYSQLLOCAL_USER       "root"
#define MYSQLLOCAL_PASSWORD   "password"
#define MYSQLLOCAL_DATABASE   "dbtable"

#define REC_STATUS_ACTIVE     'A'
#define MAX_SOCKET            8192
#define MAX_BUFFER            8192
#define MAX_QUEUE             8192
#define NUM_PORT              9999        // your server port number
#define MAX_EXPECTED_TOKENS   14
#define HB_RESP               ">ALIVE\r\n"

// ----------------------------------------------------------------------------------------------------
// ----- global variables -----
// ----------------------------------------------------------------------------------------------------
MYSQL *_oMySQL;          // mysql handler 
int   _nSocketID;        // socket descriptor   

// ----------------------------------------------------------------------------------------------------
// ----- interfaces -----
// ----------------------------------------------------------------------------------------------------
void CleanUp(void);
void ExitSignal(int);
void KillThread(int);
void ExitError(const char*);
void Log(const char*, const int, const int, const char*);
void ReplaceSocket(const int, const char*);
long IsThreadValid(const int);
void* ServeClientThread(void*);

struct stDeviceSocket
{
    int deviceid;
    int socketid;
    
    pid_t     threadpd;    
    pthread_t threadid;
};

// ----------------------------------------------------------------------------------------------------
// ----- functions -----
// ----------------------------------------------------------------------------------------------------

/* On exit procedure. */
void CleanUp(void)
{
    mysql_close(_oMySQL);
    close(_nSocketID);
}

/* On exit by signals */
void ExitSignal(int sig)
{
    Log("localhost", 3, NUM_PORT, "FlexTrac Device Server Stopped.\n");
    CleanUp();
    exit(0);
}

/* On exit procedure. */
void ExitError(const char* szMsg)
{
    char strbuf[MAX_BUFFER] = {0};
    sprintf(strbuf, "%s - %d - %s\n", szMsg, errno, strerror(errno));
    Log("localhost", 2, NUM_PORT, strbuf);    
    CleanUp();
    exit(-1);
}

void KillThread(int sig) 
{
    // do nothing, 
    // so when function returns, recv() becomes nonblock and exists
    // pthread_exit(NULL);
}

long IsThreadValid(const int ntpid)
{
    char spath[MAX_BUFFER] = {0};
    memset(spath, 0, sizeof(spath));
          
    sprintf(spath, "/proc/%d/task/%ld", getpid(), ntpid);     
    return access(spath, F_OK);
}

/* Write raw data to log file */
void Log(const char* szIpAddr, const int nLevel, const int nSocket, const char* szData)
{
    if (LOG_LEVEL < nLevel)
      return;

    char buffer[MAX_BUFFER+64] = {0};
    time_t curtime;
    struct tm *loctime;

    curtime = time(NULL);
    loctime = localtime(&curtime);
    strftime(buffer, sizeof(buffer), "/var/log/s6138_%Y%m%d.log", loctime);

    FILE* fp = NULL;

    if ((fp = fopen(buffer, "a")) != NULL)
    {
        memset(buffer, 0, sizeof(buffer));
        strftime(buffer, sizeof(buffer), "%F %T", loctime);
        sprintf(buffer, "%s\t%s\t%d\t%s", buffer, szIpAddr, nSocket, szData);
        fprintf(fp, "%s", buffer);
        fclose(fp);
    }
}

/* Replace and close existing socket descriptor with new descriptor if it had already exist */
void ReplaceSocket(const int nSocket, const char* szData)
{    
    static pthread_mutex_t id_mutex = PTHREAD_MUTEX_INITIALIZER;
    static struct stDeviceSocket stdev[MAX_SOCKET] = {0}; 
 
    int i = 0;
    
    if (nSocket >= MAX_SOCKET)
        return;

    if (strlen(szData) < MAX_EXPECTED_TOKENS)
        return;
        
    char szDevice[8] = {0};
    memset(szDevice, 0, sizeof(szDevice));
    strncpy(szDevice, szData+2, 6);
    int nDevice = atoi(szDevice);
    
    // entire function here is a critical section
    pthread_mutex_lock(&id_mutex);    
    
    for (i = 0; i < MAX_SOCKET; i++)
    {
        if (stdev[i].deviceid == 0)
        {
            // new socket, add device
            stdev[i].deviceid = nDevice;
            stdev[i].socketid = nSocket;
            stdev[i].threadid = pthread_self();
            stdev[i].threadpd = syscall(SYS_gettid);                  // get thread task process id
            break;        
        }
    
        if (stdev[i].deviceid == nDevice)        
        {
            if (stdev[i].socketid != nSocket)
            {                      
                // old socket exist, replace it
                if (IsThreadValid(stdev[i].threadpd) == 0)            // test if thread is alive
                {                                                              
                    fcntl(stdev[i].socketid, F_SETFL, O_NONBLOCK);    // make client thread recv() nonblock              
                    pthread_kill(stdev[i].threadid, SIGALRM);         // send a signal to thread
                    //pthread_cancel(stdev[i].threadid);                
                }                                 
                Log("localhost", 5, nSocket, "Socket Replaced\n");                                
                stdev[i].socketid = nSocket;                        
            }               
            stdev[i].threadid = pthread_self();                       // reset thread id in case client close itself and establish using same socket
            stdev[i].threadpd = syscall(SYS_gettid);                  // get thread task process id               
            break;                                    
        }        
    }
    pthread_mutex_unlock(&id_mutex);    
}

/*
    Client Thread. Each client is executed on its own thread.
    Continueously read and serve client until it closes the connection.
*/
void* ServeClientThread(void* pArg)
{
    pthread_detach(pthread_self()); 

    static pthread_mutex_t cs_mutex = PTHREAD_MUTEX_INITIALIZER;
    struct sockaddr_in clientsock;
    struct timeval tv;
        
    int clientsd = *((int*)(pArg));        
    socklen_t paddrlen = (socklen_t) sizeof(clientsock);       
    long readsize = 0, ret;    

    char szClientDataBuff[MAX_BUFFER+1] = {0};
    char szSqlStatement[MAX_BUFFER+1] = {0};
    
    // for connection timeouts, set and clear file des, set timeout      
    fd_set rfds;
    FD_ZERO(&rfds);
    FD_SET(clientsd, &rfds);
    tv.tv_sec = 300;     
    tv.tv_usec = 0;                         

    // get client info
    getpeername(clientsd, (struct sockaddr*) &clientsock, &paddrlen);
    Log(inet_ntoa(clientsock.sin_addr), 2, clientsd, "Client Connected.\n");
    
    // handle certain signals    
    signal(SIGALRM, KillThread);    // uses an interrupt to exit curent thread
          
    while (1)
    {                  
        memset(szClientDataBuff, 0, sizeof(szClientDataBuff));
        memset(szSqlStatement, 0, sizeof(szSqlStatement));
        
        // wait until socket stream has data to read                       
        if ((ret = select(clientsd+1, &rfds, NULL, NULL, &tv)) <= 0)
        {
            if (ret == 0)
            {
                Log(inet_ntoa(clientsock.sin_addr), 4, clientsd, "select(0) break, timed out.\n");                      
            }          
            else if (ret == -1)
            {
                Log(inet_ntoa(clientsock.sin_addr), 4, clientsd, "select(-1) break, socket replaced. closing.\n");                             
            }
            else
            {
                Log(inet_ntoa(clientsock.sin_addr), 4, clientsd, "select(?) break, error unknown.\n");                
            }         
            break;
        }           

        // read packet from client, get flextrac string
        // blocking call, will eventually catch a signal to wake up and kill thread        
        if ((readsize = recv(clientsd, szClientDataBuff, MAX_BUFFER, 0)) <= 0)
        {
            if (readsize == 0)
            {
                Log(inet_ntoa(clientsock.sin_addr), 4, clientsd, "recv(0) break, client closed connection.\n");                                                 
            }          
            else if (readsize == -1)
            {
                Log(inet_ntoa(clientsock.sin_addr), 4, clientsd, "recv(-1) break, socket replaced. closing.\n");                                
            }
            else
            {
                Log(inet_ntoa(clientsock.sin_addr), 4, clientsd, "recv(?) break, error unknown.\n");                
            }
            break;             
        }             

        // log any and all data sent to server
        Log(inet_ntoa(clientsock.sin_addr), 1, clientsd, szClientDataBuff);
        tv.tv_sec = 86400;     
                    
        // respond to incoming heartbeat request                    
        if (strncmp(szClientDataBuff, ">H", 2) == 0)
        {        
            // awkknowledges heartbeat
            if (send(clientsd, HB_RESP, 8, 0) <= 0)
            {
                Log(inet_ntoa(clientsock.sin_addr), 4, clientsd, "send(HB) error.\n");
                break;
            }
            
            // check for previous connection, if so, close it
            ReplaceSocket(clientsd, szClientDataBuff);            
                
            // log heartbeat request and response
            Log(inet_ntoa(clientsock.sin_addr), 2, clientsd, HB_RESP);                                                         
        }                                     
                                        
        // respond to incoming new Flextrac string
        if (strncmp(szClientDataBuff, ">C", 2) == 0 || strncmp(szClientDataBuff, ">B", 2) == 0)
        {        
            // check for previous connection, if so, close it
            ReplaceSocket(clientsd, szClientDataBuff);
            
            // truncate any data beyond newline
            char* pp = strchr(szClientDataBuff, '\n');
            pp != NULL ? *(pp+1) = 0 : 0;         
    
            // critical section, write to database
            pthread_mutex_lock(&cs_mutex);
            mysql_query(_oMySQL, szSqlStatement);
            pthread_mutex_unlock(&cs_mutex);            
        }                                                       
    }
    Log(inet_ntoa(clientsock.sin_addr), 2, clientsd, "Client Closed.\n");
    close(clientsd);
    pthread_exit(NULL);
}

/*
    Main Driver Routine
*/
int main(int argc, char* argv[])
{
    const int reuse = 1;

    pthread_t tid;
    pthread_attr_t attr;

    int err, clientsd;        
    struct sockaddr_in localsock;
    struct sockaddr_in remotesock;      

    // handle certain signals
    signal(SIGPIPE, SIG_IGN);     // ignore client self term
    signal(SIGINT,  ExitSignal);     // close on interrupts
    signal(SIGTERM, ExitSignal);     // close on terminates

    // init mysql
    if ((_oMySQL = mysql_init(NULL)) == NULL)
        ExitError("Error: mysql_init()");              

    // select db to connect
    if (mysql_real_connect(_oMySQL, MYSQLLOCAL_HOST, MYSQLLOCAL_USER, MYSQLLOCAL_PASSWORD, MYSQLLOCAL_DATABASE, MYSQL_PORT, NULL, 0) == NULL)
        ExitError("Error: mysql_real_connect()");

    _oMySQL->reconnect = 1;

    // setup local socket
    memset(&localsock, 0, sizeof(localsock));
    localsock.sin_family = AF_INET;
    localsock.sin_addr.s_addr = INADDR_ANY;
    localsock.sin_port = htons(NUM_PORT);

    // create a socket
    if ((_nSocketID = socket(PF_INET, SOCK_STREAM, 0)) < 0)
        ExitError("Error: socket()");

    // make socket reuseable
    if (setsockopt(_nSocketID, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0)
        ExitError("Error: setsockopt()");

    // bind to socket
    if (bind(_nSocketID, (struct sockaddr*) &localsock, sizeof(localsock)) < 0)
        ExitError("Error: bind()");

    int paddrlen = (int) sizeof(remotesock);

    // get socket name
    if (getsockname(_nSocketID, (struct sockaddr*) &remotesock, &paddrlen) < 0)
        ExitError("Error: getsockname()");

    // listen on socket
    if (listen(_nSocketID, MAX_QUEUE) < 0)
        ExitError("Error: listen()");
    
    // initialize the thread attribute
    if (pthread_attr_init(&attr) != 0)
        ExitError("Error: pthread_attr_init()");
    
    // Set the stack size of the thread, the smaller this is, the larger the max threads can be created
    if (pthread_attr_setstacksize(&attr, 120*1024) != 0)
        ExitError("Error: pthread_attr_setstacksize()");
    
    // Set thread to detached state. No need for pthread_join
    if (pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0)
        ExitError("Error: pthread_attr_setdetachstate()");

    Log("localhost", 3, ntohs(remotesock.sin_port), "Server Started.\n");

    while (1)
    {
        // acccept incomming client connection
        if ((clientsd = accept(_nSocketID, (struct sockaddr*) &remotesock, &paddrlen)) < 0)
            ExitError("Error: accept()");

        // serve each client with its own thread
        if (pthread_create(&tid, &attr, (void*) ServeClientThread, &clientsd) != 0)
            ExitError("Error: pthread_create()");
        
        // pthread_join(tid, NULL);
        // close(clientsd);
    }
    CleanUp();

    return 0;
}