// ==========================================================================
// Author: Yee Hsu
// Date: 6/7/2010
//
// Desc: Simple Mass Connection Server.
// Allows muliple simulataneous connection oriented logins.
// Server can serve muliple instances of users by creating a
// thread per user and servicing them.
// ==========================================================================
// ----------------------------------------------------------------------------------------------------
// ----- includes -----
// ----------------------------------------------------------------------------------------------------
#include <sys/syscall.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <pthread.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <signal.h>
#include <fcntl.h>
#include <netdb.h>
#include <mysql.h>
#include <errno.h>
#include <stdio.h>
#include <time.h>
// ----------------------------------------------------------------------------------------------------
// ----- log level -----
// higher the log level, the more log output to file
//
// LEVEL 0 No Logging
// LEVEL 1 + data received loggings
// LEVEL 2 + more log + error logs
// LEVEL 3 + more log + signal logs
// LEVEL 4 + more log + additional error logs
// LEVEL 5 + maximum detail logs
// ----------------------------------------------------------------------------------------------------
#define LOG_LEVEL 5
// ----------------------------------------------------------------------------------------------------
// ----- constants -----
// ----------------------------------------------------------------------------------------------------
#define MYSQLLOCAL_HOST "localhost"
#define MYSQLLOCAL_USER "root"
#define MYSQLLOCAL_PASSWORD "password"
#define MYSQLLOCAL_DATABASE "dbtable"
#define REC_STATUS_ACTIVE 'A'
#define MAX_SOCKET 8192
#define MAX_BUFFER 8192
#define MAX_QUEUE 8192
#define NUM_PORT 9999 // your server port number
#define MAX_EXPECTED_TOKENS 14
#define HB_RESP ">ALIVE\r\n"
// ----------------------------------------------------------------------------------------------------
// ----- global variables -----
// ----------------------------------------------------------------------------------------------------
MYSQL *_oMySQL; // mysql handler
int _nSocketID; // socket descriptor
// ----------------------------------------------------------------------------------------------------
// ----- interfaces -----
// ----------------------------------------------------------------------------------------------------
void CleanUp(void);
void ExitSignal(int);
void KillThread(int);
void ExitError(const char*);
void Log(const char*, const int, const int, const char*);
void ReplaceSocket(const int, const char*);
long IsThreadValid(const int);
void* ServeClientThread(void*);
struct stDeviceSocket
{
int deviceid;
int socketid;
pid_t threadpd;
pthread_t threadid;
};
// ----------------------------------------------------------------------------------------------------
// ----- functions -----
// ----------------------------------------------------------------------------------------------------
/* On exit procedure. */
void CleanUp(void)
{
mysql_close(_oMySQL);
close(_nSocketID);
}
/* On exit by signals */
void ExitSignal(int sig)
{
Log("localhost", 3, NUM_PORT, "FlexTrac Device Server Stopped.\n");
CleanUp();
exit(0);
}
/* On exit procedure. */
void ExitError(const char* szMsg)
{
char strbuf[MAX_BUFFER] = {0};
sprintf(strbuf, "%s - %d - %s\n", szMsg, errno, strerror(errno));
Log("localhost", 2, NUM_PORT, strbuf);
CleanUp();
exit(-1);
}
void KillThread(int sig)
{
// do nothing,
// so when function returns, recv() becomes nonblock and exists
// pthread_exit(NULL);
}
long IsThreadValid(const int ntpid)
{
char spath[MAX_BUFFER] = {0};
memset(spath, 0, sizeof(spath));
sprintf(spath, "/proc/%d/task/%ld", getpid(), ntpid);
return access(spath, F_OK);
}
/* Write raw data to log file */
void Log(const char* szIpAddr, const int nLevel, const int nSocket, const char* szData)
{
if (LOG_LEVEL < nLevel)
return;
char buffer[MAX_BUFFER+64] = {0};
time_t curtime;
struct tm *loctime;
curtime = time(NULL);
loctime = localtime(&curtime);
strftime(buffer, sizeof(buffer), "/var/log/s6138_%Y%m%d.log", loctime);
FILE* fp = NULL;
if ((fp = fopen(buffer, "a")) != NULL)
{
memset(buffer, 0, sizeof(buffer));
strftime(buffer, sizeof(buffer), "%F %T", loctime);
sprintf(buffer, "%s\t%s\t%d\t%s", buffer, szIpAddr, nSocket, szData);
fprintf(fp, "%s", buffer);
fclose(fp);
}
}
/* Replace and close existing socket descriptor with new descriptor if it had already exist */
void ReplaceSocket(const int nSocket, const char* szData)
{
static pthread_mutex_t id_mutex = PTHREAD_MUTEX_INITIALIZER;
static struct stDeviceSocket stdev[MAX_SOCKET] = {0};
int i = 0;
if (nSocket >= MAX_SOCKET)
return;
if (strlen(szData) < MAX_EXPECTED_TOKENS)
return;
char szDevice[8] = {0};
memset(szDevice, 0, sizeof(szDevice));
strncpy(szDevice, szData+2, 6);
int nDevice = atoi(szDevice);
// entire function here is a critical section
pthread_mutex_lock(&id_mutex);
for (i = 0; i < MAX_SOCKET; i++)
{
if (stdev[i].deviceid == 0)
{
// new socket, add device
stdev[i].deviceid = nDevice;
stdev[i].socketid = nSocket;
stdev[i].threadid = pthread_self();
stdev[i].threadpd = syscall(SYS_gettid); // get thread task process id
break;
}
if (stdev[i].deviceid == nDevice)
{
if (stdev[i].socketid != nSocket)
{
// old socket exist, replace it
if (IsThreadValid(stdev[i].threadpd) == 0) // test if thread is alive
{
fcntl(stdev[i].socketid, F_SETFL, O_NONBLOCK); // make client thread recv() nonblock
pthread_kill(stdev[i].threadid, SIGALRM); // send a signal to thread
//pthread_cancel(stdev[i].threadid);
}
Log("localhost", 5, nSocket, "Socket Replaced\n");
stdev[i].socketid = nSocket;
}
stdev[i].threadid = pthread_self(); // reset thread id in case client close itself and establish using same socket
stdev[i].threadpd = syscall(SYS_gettid); // get thread task process id
break;
}
}
pthread_mutex_unlock(&id_mutex);
}
/*
Client Thread. Each client is executed on its own thread.
Continueously read and serve client until it closes the connection.
*/
void* ServeClientThread(void* pArg)
{
pthread_detach(pthread_self());
static pthread_mutex_t cs_mutex = PTHREAD_MUTEX_INITIALIZER;
struct sockaddr_in clientsock;
struct timeval tv;
int clientsd = *((int*)(pArg));
socklen_t paddrlen = (socklen_t) sizeof(clientsock);
long readsize = 0, ret;
char szClientDataBuff[MAX_BUFFER+1] = {0};
char szSqlStatement[MAX_BUFFER+1] = {0};
// for connection timeouts, set and clear file des, set timeout
fd_set rfds;
FD_ZERO(&rfds);
FD_SET(clientsd, &rfds);
tv.tv_sec = 300;
tv.tv_usec = 0;
// get client info
getpeername(clientsd, (struct sockaddr*) &clientsock, &paddrlen);
Log(inet_ntoa(clientsock.sin_addr), 2, clientsd, "Client Connected.\n");
// handle certain signals
signal(SIGALRM, KillThread); // uses an interrupt to exit curent thread
while (1)
{
memset(szClientDataBuff, 0, sizeof(szClientDataBuff));
memset(szSqlStatement, 0, sizeof(szSqlStatement));
// wait until socket stream has data to read
if ((ret = select(clientsd+1, &rfds, NULL, NULL, &tv)) <= 0)
{
if (ret == 0)
{
Log(inet_ntoa(clientsock.sin_addr), 4, clientsd, "select(0) break, timed out.\n");
}
else if (ret == -1)
{
Log(inet_ntoa(clientsock.sin_addr), 4, clientsd, "select(-1) break, socket replaced. closing.\n");
}
else
{
Log(inet_ntoa(clientsock.sin_addr), 4, clientsd, "select(?) break, error unknown.\n");
}
break;
}
// read packet from client, get flextrac string
// blocking call, will eventually catch a signal to wake up and kill thread
if ((readsize = recv(clientsd, szClientDataBuff, MAX_BUFFER, 0)) <= 0)
{
if (readsize == 0)
{
Log(inet_ntoa(clientsock.sin_addr), 4, clientsd, "recv(0) break, client closed connection.\n");
}
else if (readsize == -1)
{
Log(inet_ntoa(clientsock.sin_addr), 4, clientsd, "recv(-1) break, socket replaced. closing.\n");
}
else
{
Log(inet_ntoa(clientsock.sin_addr), 4, clientsd, "recv(?) break, error unknown.\n");
}
break;
}
// log any and all data sent to server
Log(inet_ntoa(clientsock.sin_addr), 1, clientsd, szClientDataBuff);
tv.tv_sec = 86400;
// respond to incoming heartbeat request
if (strncmp(szClientDataBuff, ">H", 2) == 0)
{
// awkknowledges heartbeat
if (send(clientsd, HB_RESP, 8, 0) <= 0)
{
Log(inet_ntoa(clientsock.sin_addr), 4, clientsd, "send(HB) error.\n");
break;
}
// check for previous connection, if so, close it
ReplaceSocket(clientsd, szClientDataBuff);
// log heartbeat request and response
Log(inet_ntoa(clientsock.sin_addr), 2, clientsd, HB_RESP);
}
// respond to incoming new Flextrac string
if (strncmp(szClientDataBuff, ">C", 2) == 0 || strncmp(szClientDataBuff, ">B", 2) == 0)
{
// check for previous connection, if so, close it
ReplaceSocket(clientsd, szClientDataBuff);
// truncate any data beyond newline
char* pp = strchr(szClientDataBuff, '\n');
pp != NULL ? *(pp+1) = 0 : 0;
// critical section, write to database
pthread_mutex_lock(&cs_mutex);
mysql_query(_oMySQL, szSqlStatement);
pthread_mutex_unlock(&cs_mutex);
}
}
Log(inet_ntoa(clientsock.sin_addr), 2, clientsd, "Client Closed.\n");
close(clientsd);
pthread_exit(NULL);
}
/*
Main Driver Routine
*/
int main(int argc, char* argv[])
{
const int reuse = 1;
pthread_t tid;
pthread_attr_t attr;
int err, clientsd;
struct sockaddr_in localsock;
struct sockaddr_in remotesock;
// handle certain signals
signal(SIGPIPE, SIG_IGN); // ignore client self term
signal(SIGINT, ExitSignal); // close on interrupts
signal(SIGTERM, ExitSignal); // close on terminates
// init mysql
if ((_oMySQL = mysql_init(NULL)) == NULL)
ExitError("Error: mysql_init()");
// select db to connect
if (mysql_real_connect(_oMySQL, MYSQLLOCAL_HOST, MYSQLLOCAL_USER, MYSQLLOCAL_PASSWORD, MYSQLLOCAL_DATABASE, MYSQL_PORT, NULL, 0) == NULL)
ExitError("Error: mysql_real_connect()");
_oMySQL->reconnect = 1;
// setup local socket
memset(&localsock, 0, sizeof(localsock));
localsock.sin_family = AF_INET;
localsock.sin_addr.s_addr = INADDR_ANY;
localsock.sin_port = htons(NUM_PORT);
// create a socket
if ((_nSocketID = socket(PF_INET, SOCK_STREAM, 0)) < 0)
ExitError("Error: socket()");
// make socket reuseable
if (setsockopt(_nSocketID, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0)
ExitError("Error: setsockopt()");
// bind to socket
if (bind(_nSocketID, (struct sockaddr*) &localsock, sizeof(localsock)) < 0)
ExitError("Error: bind()");
int paddrlen = (int) sizeof(remotesock);
// get socket name
if (getsockname(_nSocketID, (struct sockaddr*) &remotesock, &paddrlen) < 0)
ExitError("Error: getsockname()");
// listen on socket
if (listen(_nSocketID, MAX_QUEUE) < 0)
ExitError("Error: listen()");
// initialize the thread attribute
if (pthread_attr_init(&attr) != 0)
ExitError("Error: pthread_attr_init()");
// Set the stack size of the thread, the smaller this is, the larger the max threads can be created
if (pthread_attr_setstacksize(&attr, 120*1024) != 0)
ExitError("Error: pthread_attr_setstacksize()");
// Set thread to detached state. No need for pthread_join
if (pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0)
ExitError("Error: pthread_attr_setdetachstate()");
Log("localhost", 3, ntohs(remotesock.sin_port), "Server Started.\n");
while (1)
{
// acccept incomming client connection
if ((clientsd = accept(_nSocketID, (struct sockaddr*) &remotesock, &paddrlen)) < 0)
ExitError("Error: accept()");
// serve each client with its own thread
if (pthread_create(&tid, &attr, (void*) ServeClientThread, &clientsd) != 0)
ExitError("Error: pthread_create()");
// pthread_join(tid, NULL);
// close(clientsd);
}
CleanUp();
return 0;
}