// ==========================================================================
// Author: Yee Hsu
// Date: 6/10/2012
//
// Desc: Watch Folder Manager. Uses a library that watches a folder in
// real time for file existence, modification, deletion, etc.
// Once a file has been added, deleted or modified, it notifies your
// library through a callback. Very handy for real-time processing while
// watching a folder for file changes.
// ==========================================================================
#include "WatchFolderImporter.h"
#include <boost/algorithm/string.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/timer.hpp>
#include <boost/assign.hpp>
#ifdef WIN32
#include "CWatchFolder_Win.h"
#else
#include "CWatchFolder_Mac.h"
#endif
#include "ImportDummy.h"
namespace __ET__
{
// Fallback import backend: Init() uses it when no ImportInterface was supplied.
// Allocated during static initialization; nothing in the visible source deletes it.
ImportInterface* WatchFolderImporter::defaultImportInterface = new ImportDummy();
// Constructs an uninitialized importer: the initialized flag is false, the containers
// are empty and every pointer member (watcher backend, import backend, thread handles,
// user callback and its context) is NULL. Init() does the real setup.
WatchFolderImporter::WatchFolderImporter()
: m_bInitialized(false)
, m_watch_map()
, m_excluded_pack()
, m_pwf(NULL)
, m_pimp(NULL)
, m_pt_ScanOnce(NULL)
, m_pt_ScanWorker(NULL)
, m_pt_CacheProcessor(NULL)
, m_context(NULL)
, m_callback(NULL)
{
}
// Empty destructor: it does not call Shutdown(), so the worker threads and the
// watcher backend are only released by an explicit Shutdown() call.
// NOTE(review): confirm callers always invoke Shutdown() before process exit.
WatchFolderImporter::~WatchFolderImporter()
{
}
// Singleton accessor: returns a reference to the one WatchFolderImporter,
// which is a function-local static constructed on the first call.
WatchFolderImporter& WatchFolderImporter::Instance()
{
    static WatchFolderImporter theImporter;
    return theImporter;
}
// Initializes the importer: selects the import backend, creates the platform folder
// watcher, loads the persisted watch folders and starts the background threads.
//
// impl     - import backend to use. When NULL, a previously set backend is kept;
//            if none was ever set, defaultImportInterface is used.
// callback - optional user callback; WatchCallback() forwards every event to it.
// context  - opaque pointer handed back to `callback`.
//            (callback/context are only stored when a new watcher backend is created.)
//
// Returns the initialized flag. Calling Init() while already initialized does nothing
// and returns true.
bool WatchFolderImporter::Init(ImportInterface* impl, IWatchFolder::WatchCallback callback, void* context)
{
    ET_TRACE("WatchFolderImporter::Init");
    if (!m_bInitialized)
    {
        if (impl)
        {
            m_pimp = impl;
        }
        if (!m_pimp)
        {
            m_pimp = defaultImportInterface;
        }
        ET_ASSERT(m_pimp);
        if (!m_pwf)
        {
            m_context = context;
            m_callback = callback;
            // Select the backend with the SAME macro that selects its header in the
            // #include block at the top of this file. Testing __GNUC__ first picked
            // CWatchFolder_Mac under GCC on Windows, where only CWatchFolder_Win.h
            // had been included.
#ifdef WIN32
            m_pwf = static_cast<IWatchFolder*>(new CWatchFolder_Win());
#else
            m_pwf = static_cast<IWatchFolder*>(new CWatchFolder_Mac());
#endif
            ET_ASSERT(m_pwf);
            if (m_pwf)
            {
                // Events arrive through the WatchCallback trampoline with `this` as context.
                m_pwf->Init(WatchCallback, this);
                m_pwf->GetExcludedPackageExtensions(m_excluded_pack);
            }
        }
        m_bInitialized=true;
        // load watch folders
        if (m_pimp)
        {
            m_pimp->LoadFolderList();
            LoadWatchFolder();
        }
        // cache file processor
        if (!m_pt_CacheProcessor)
        {
            m_pt_CacheProcessor = m_tm_CacheProcessor.CreateThread(new WatchFolderImporter::CacheProcessor(this));
            m_tm_CacheProcessor.StartAll();
        }
        // folder scanner thread
        if (!m_pt_ScanWorker)
        {
            m_pt_ScanWorker = m_tm_ScanWorker.CreateThread(new WatchFolderImporter::ScanWorker(this));
            m_tm_ScanWorker.StartAll();
        }
    }
    return m_bInitialized;
}
// Shuts the importer down: persists the folder list, stops the scan-once, scan-worker
// and cache-processor threads, then shuts down and deletes the watcher backend.
// Does nothing but clear the initialized flag when Init() has not run.
// std::exception errors are logged and not propagated.
void WatchFolderImporter::Shutdown()
{
    ET_TRACE("[WatchFolderImporter::Shutdown] Entered");
    if (m_bInitialized)
    {
        // Persist first, in its own try block: a failing save must not skip the
        // teardown below, otherwise the threads keep running and m_pwf is never
        // released even though the importer reports itself as uninitialized.
        try
        {
            if (m_pimp)
            {
                m_pimp->SaveFolderList();
            }
        }
        catch (std::exception& e)
        {
            ET_ERROR( "Error in WatchFolderImporter::Shutdown: %s", e.what());
        }
        try
        {
            if (m_pt_ScanOnce)
            {
                m_tm_ScanOnce.StopAll();
                m_pt_ScanOnce = NULL;
            }
            if (m_pt_ScanWorker)
            {
                m_tm_ScanWorker.StopAll();
                m_pt_ScanWorker = NULL;
            }
            if (m_pt_CacheProcessor)
            {
                m_tm_CacheProcessor.StopAll();
                m_pt_CacheProcessor = NULL;
            }
            // The watcher goes last, after the threads that use it were asked to stop.
            if (m_pwf)
            {
                m_pwf->Shutdown();
                delete m_pwf;
                m_pwf = NULL;
            }
        }
        catch (std::exception& e)
        {
            ET_ERROR( "Error in WatchFolderImporter::Shutdown: %s", e.what());
        }
    }
    m_bInitialized=false;
}
void WatchFolderImporter::WatchCallback(int watchid, IWatchFolder::FileAction action, String rootpath, String filepath, void* context)
{
WatchFolderImporter* pwfm = static_cast<WatchFolderImporter*>(context);
if (!pwfm)
return;
#if defined(__GNUC__)
String spath = rootpath + "/" + filepath;
#elif defined(_WIN32)
String spath = rootpath + "\\" + filepath;
#endif
try
{
switch (action)
{
case IWatchFolder::FILE_ADD:
pwfm->HandleAdd(spath);
break;
case IWatchFolder::FILE_MODIFIED:
pwfm->HandleMod(spath);
break;
case IWatchFolder::FILE_RENAME_FROM:
pwfm->HandleRenameFm(spath);
break;
case IWatchFolder::FILE_RENAME_TO:
pwfm->HandleRenameTo(spath);
break;
case IWatchFolder::FILE_DELETE:
pwfm->HandleDel(spath);
break;
default:
ET_TRACE("WatchFolderImporter : Unknown [%d][%d][%s]\n", watchid, action, spath.c_str());
break;
}
if (pwfm && pwfm->m_callback)
{
pwfm->m_callback(watchid, action, rootpath, filepath, pwfm->m_context);
}
}
catch (std::exception& ex)
{
ET_ERROR( "Error in WatchFolderImporter::WatchCallback: %s", ex.what());
}
}
bool WatchFolderImporter::IsPathSubPathOfWatchFolder( const String& sDir )
{
WatchFolderInfoList folders;
if (GetWatchFolders(folders))
{
for (WatchFolderInfoList::iterator iter = folders.begin(); iter != folders.end(); ++iter)
{
if (iter->status.compare(WATCH_STATUS_DELETED) != 0)
{
if (BeginWith(sDir, iter->path))
return true;
}
}
}
return false;
}
long WatchFolderImporter::IsPathSuperPathOfWatchFolder(const String& sDir, std::vector<String>& slist)
{
WatchFolderInfoList folders;
slist.clear();
if (GetWatchFolders(folders))
{
for (WatchFolderInfoList::iterator iter = folders.begin(); iter != folders.end(); ++iter)
{
if (iter->status.compare(WATCH_STATUS_DELETED) != 0)
{
if (BeginWith(iter->path, sDir))
{
slist.push_back(iter->path);
}
}
}
}
return slist.size();
}
bool __ET__::WatchFolderImporter::AddWatchFolder( const String& sDir )
{
if (!m_pwf)
return false;
try
{
if (IsValidDirectory(sDir))
{
std::vector<String> slist;
// filter ignored list
if (IsMediaFiltered(sDir))
return false;
// super path exist, ignore request
if (IsPathSubPathOfWatchFolder(sDir))
return false;
// sub paths exist, delete old watch folders
if (IsPathSuperPathOfWatchFolder(sDir, slist) > 0)
{
for (std::vector<String>::iterator iter = slist.begin(); iter != slist.end(); ++iter)
DelWatchFolder(*iter);
}
// now add the new watch folder
if (m_pwf->GetWatchFolder(sDir) == 0)
{
m_pwf->AddWatchFolder(sDir, true);
AddWatchFolderInfo(sDir);
{
Thread::BoostScopedLock lock(m_tmutex_once);
m_path_once = sDir;
m_pt_ScanOnce = m_tm_ScanOnce.CreateThread(new WatchFolderImporter::ScanOnce(this));
m_tm_ScanOnce.StartAll();
}
SaveWatchFolder();
return true;
}
}
}
catch (std::exception& e)
{
ET_ERROR( "Error in WatchFolderImporter::AddWatchFolder: %s", e.what());
}
return false;
}
// Stops watching `sDir`: removes it from the watcher backend and from the import
// backend's folder list, then persists the list.
// Returns false when there is no watcher backend, when the backend does not know the
// folder (a warning is logged), or when a std::exception was caught and logged.
bool __ET__::WatchFolderImporter::DelWatchFolder( const String& sDir )
{
    if (!m_pwf)
        return false;
    try
    {
        // A watch id of 0 means the backend is not watching this folder.
        const int watchId = m_pwf->GetWatchFolder(sDir);
        if (watchId == 0)
        {
            ET_WARN("Warn: Watchfolder %s doesn't exist!", sDir.c_str());
            return false;
        }
        m_pwf->DelWatchFolder(watchId);
        DelWatchFolderInfo(sDir);
        SaveWatchFolder();
        return true;
    }
    catch (std::exception& e)
    {
        ET_ERROR( "Error in WatchFolderImporter::DelWatchFolder: %s", e.what());
    }
    return false;
}
// Asks the import backend to fill `folders` with its folder list and returns the
// resulting size of `folders`. Returns 0 and leaves `folders` untouched when there is
// no import backend.
int __ET__::WatchFolderImporter::GetWatchFolders( WatchFolderInfoList& folders ) const
{
    if (!m_pimp)
        return 0;
    m_pimp->GetFolderList(folders);
    return static_cast<int>(folders.size());
}
// Looks up the watch folder `sDir` in the import backend, which receives `wi` as its
// output argument. Returns true when the backend reports a match; false when it does
// not or when there is no import backend.
bool WatchFolderImporter::GetWatchFolderInfo( const String& sDir, WatchFolderInfo& wi) const
{
    if (!m_pimp)
        return false;
    if (!m_pimp->FindWatchFolder(sDir, wi))
        return false;
    return true;
}
// Re-registers every folder from the import backend's folder list: each path is added
// to the watcher backend (second argument `true`) and passed to AddWatchFolderInfo().
// Does nothing when either backend is missing. m_pwf is NULL before Init() and after
// Shutdown(), and AddWatchFolder()/DelWatchFolder() guard it the same way.
void __ET__::WatchFolderImporter::LoadWatchFolder()
{
    if (!m_pimp || !m_pwf)
        return;
    WatchFolderInfoList watch_folders;
    m_pimp->GetFolderList(watch_folders);
    for (WatchFolderInfoList::iterator iter = watch_folders.begin(); iter != watch_folders.end(); ++iter)
    {
        m_pwf->AddWatchFolder(iter->path, true);
        AddWatchFolderInfo(iter->path);
    }
}
// Asks the import backend to persist its folder list; no-op without a backend.
void __ET__::WatchFolderImporter::SaveWatchFolder()
{
    if (!m_pimp)
        return;
    m_pimp->SaveFolderList();
}
// Handles a "file added" event for the path `str`.
//  - Directory: unless it is filtered or already known to the import backend, it is
//    registered as a media folder and scanned recursively.
//  - Media file: handed to AddMediaFile().
//  - Anything else is ignored.
// No-op without an import backend. std::exception errors are logged, not propagated.
void __ET__::WatchFolderImporter::HandleAdd( const String& str )
{
    try
    {
        if (!m_pimp)
            return;
        if (!IsValidDirectory(str))
        {
            if (IsMediaFile(str))
                AddMediaFile(str);
            return;
        }
        // Directory path from here on.
        if (IsMediaFiltered(str) || m_pimp->IsMediaFolderExists(str))
            return;
        m_pimp->AddMediaFolder(str);
        ScanWatchFolder(str);
    }
    catch (std::exception& ex)
    {
        ET_ERROR("HandleAdd: %s", ex.what());
    }
}
// Handler for IWatchFolder::FILE_MODIFIED events (dispatched from WatchCallback()).
// Not implemented yet: the path in `str` is currently ignored.
void WatchFolderImporter::HandleMod( const String& str )
{
// TO DO...
}
// Handles a "file deleted" event for the path `str`: media files are removed through
// DelMediaFile(), every other path is passed to DelMediaFolder().
// No-op without an import backend.
void __ET__::WatchFolderImporter::HandleDel( const String& str )
{
    if (!m_pimp)
        return;
    if (!IsMediaFile(str))
    {
        // FIXME: there is no IsValidDirectory(str) check here, so every non-media
        // file or folder ends up in DelMediaFolder().
        m_pimp->DelMediaFolder(str);
        return;
    }
    m_pimp->DelMediaFile(str);
}
// Handles the "renamed from" half of a rename event: the old path `str` is treated
// exactly like a deletion.
void __ET__::WatchFolderImporter::HandleRenameFm( const String& str )
{
HandleDel(str);
}
// Handles the "renamed to" half of a rename event: the new path `str` is treated
// exactly like an addition.
void __ET__::WatchFolderImporter::HandleRenameTo( const String& str )
{
HandleAdd(str);
}
long __ET__::WatchFolderImporter::ScanWatchFolderPreparation( const String& sDir )
{
long ncount = 0;
try
{
if (IsValidDirectory(sDir))
{
UpdateWatchFolderStatus(sDir, WATCH_STATUS_SCANNING);
ncount = ScanWatchFolder(sDir);
UpdateWatchFolderStatus(sDir, WATCH_STATUS_SCANNED);
String scan_time = boost::lexical_cast<String>(TimeNow());
UpdateWatchFolderScanTime(sDir, scan_time);
AddWatchFolderImportedNumber(sDir, ncount);
}
else
{
UpdateWatchFolderStatus(sDir, WATCH_STATUS_DISCONNECTED);
}
SaveWatchFolder();
}
catch (std::exception& ex)
{
ET_ERROR( "Error in WatchFolderImporter::ScanWatchFolderPreparation: %s", ex.what());
}
return ncount;
}
long __ET__::WatchFolderImporter::ScanWatchFolder( const String& sDir )
{
long ncount = 0;
if(!IsValidDirectory(sDir))
return ncount;
if (IsMediaFiltered(sDir))
return ncount;
try
{
bf::directory_iterator end_iter ;
for ( bf::directory_iterator iter( sDir ) ; iter != end_iter ; ++iter )
{
bf::path dir = *iter;
if ( bf::is_directory( *iter ) )
{
if (m_pimp && !m_pimp->IsMediaFolderExists(dir.native_directory_string()))
m_pimp->AddMediaFolder(dir.native_directory_string());
ncount += ScanWatchFolder( dir.native_directory_string() );
}
else
{
if (IsMediaFile(dir.native_file_string()))
{
if (AddMediaFile(dir.native_file_string()))
ncount++;
}
}
}
}
catch (std::exception& ex)
{
ET_ERROR( "Error in WatchFolderImporter::ScanWatchFolder: %s", ex.what());
}
return ncount;
}
bool __ET__::WatchFolderImporter::ContainVideoOrAudioFiles(const Path& p)
{
if (!IsValidDirectory(p))
return false;
bf::directory_iterator end_iter;
for (bf::directory_iterator iter(p); iter != end_iter; ++iter)
{
if (IsValidFile( *iter ))
{
if (IsVideoFile( *iter ))
return true;
if (IsMusicFile( *iter ))
return true;
}
}
return false;
}
bool __ET__::WatchFolderImporter::AddMediaFile( const String& sFile )
{
try
{
if (!IsValidFile(sFile))
return false;
if (IsMediaFiltered(sFile))
return false;
#if !defined(ENABLE_PHOTO_IMPORTING)
// TEMPORARY: remove image/photo, we dont want to import these files yet
if (IsPictureFile(sFile))
return false;
// END TEMP
#endif
if (IsPictureFile(sFile))
{
Path p = sFile;
if (ContainVideoOrAudioFiles(p.parent_path()))
return false;
}
// insert file into cache, for processing into db
ET_LOG( "WatchFolderImporter::AddMediaFile: Detected File : %s", sFile.c_str());
{
Thread::BoostScopedLock lock(m_tmutex_cache);
m_watch_map[sFile] = false;
}
return true;
}
catch (std::exception& e)
{
ET_ERROR( "Error in WatchFolderImporter::AddMediaFile: %s", e.what());
}
return false;
}
void __ET__::WatchFolderImporter::AddWatchFolderInfo( const String& sDir )
{
WatchFolderInfo wfi;
wfi.id = 0;
wfi.path = sDir;
wfi.status = WATCH_STATUS_NORMAL;
wfi.type = "";
wfi.scan_time = "0";
wfi.update_time = "0";
wfi.num_imported = 0;
wfi.num_queued = 0;
wfi.num_total = 0;
wfi.num_unknown = 0;
if (m_pimp)
m_pimp->AddWatchFolder(wfi);
if (UpdateWatchFolderStatus(sDir, WATCH_STATUS_NORMAL))
return;
}
// Removes the watch folder `sDir` from the import backend; no-op without a backend.
void __ET__::WatchFolderImporter::DelWatchFolderInfo( const String& sDir )
{
    if (!m_pimp)
        return;
    m_pimp->DelWatchFolder(sDir);
}
// Forwards `nNum` to the import backend's AddWatchFolderImportedNumber() for the
// watch folder `sDir` and returns the backend's result; false without a backend.
bool WatchFolderImporter::AddWatchFolderImportedNumber( const String& sDir, int nNum )
{
    if (!m_pimp)
        return false;
    return m_pimp->AddWatchFolderImportedNumber(sDir, nNum);
}
// Writes a status report into `jobj`: key "Folders" holds an array with one object
// per watch folder, carrying "path", "status", "scan_time", "update_time",
// "num_imported" and "num_total".
// Returns true on success; a std::exception is logged and reported as false.
bool WatchFolderImporter::GetStatus( JsonObject& jobj ) const
{
    try
    {
        JsonArray folderArray;
        WatchFolderInfoList watched;
        GetWatchFolders(watched);
        WatchFolderInfoList::const_iterator it = watched.begin();
        while (it != watched.end())
        {
            JsonObject folderJson;
            folderJson.Set("path", it->path);
            folderJson.Set("status", it->status);
            folderJson.Set("scan_time", it->scan_time);
            folderJson.Set("update_time", it->update_time);
            folderJson.Set("num_imported", it->num_imported);
            folderJson.Set("num_total", it->num_total);
            folderArray.Add(folderJson);
            ++it;
        }
        jobj.Set("Folders", folderArray);
        return true;
    }
    catch (std::exception& ex)
    {
        ET_ERROR("[%s] Error: %s", __func__, ex.what());
    }
    return false;
}
// Decides whether the path `p` must be excluded from importing.
// Runs a fixed sequence of checks and returns true as soon as one matches; returns
// false when none does. If a std::exception is thrown, it is logged and the path is
// treated as filtered (returns true).
// The local `_filter` is reused by every check; the package check relies on the value
// left in it by the hidden-parent check just before.
bool WatchFolderImporter::IsMediaFiltered( const Path& p ) const
{
try
{
String _filter;
// 1) extension (lower-cased) is one of the excluded extensions
static std::set<String> _end = boost::assign::list_of
(".app")
;
_filter = p.extension();
boost::to_lower(_filter);
if (_end.find(_filter) != _end.end())
return true;
// 2) full path (lower-cased) starts with a system location
static std::set<String> _begin = boost::assign::list_of
("c:\\windows")
("c:\\program files")
("c:\\appdata")
("/usr")
("/system")
("/sys")
("/opt")
("/etc")
;
_filter = p.native_file_string();
boost::to_lower(_filter);
for (std::set<String>::iterator iter = _begin.begin(); iter != _begin.end(); ++iter)
{
if (BeginWith(_filter, *iter))
return true;
}
// 3) hidden entries: the file name, or the name of its parent directory,
//    starts with "." or "~" (compared without lower-casing)
static std::set<String> _hidden = boost::assign::list_of
(".")
("~")
;
_filter = p.filename();
for (std::set<String>::iterator iter = _hidden.begin(); iter != _hidden.end(); ++iter)
{
if (BeginWith(_filter, *iter))
return true;
}
_filter = p.relative_path().parent_path().filename();
for (std::set<String>::iterator iter = _hidden.begin(); iter != _hidden.end(); ++iter)
{
if (BeginWith(_filter, *iter))
return true;
}
// 4) parent directory name (still held in _filter) ends with one of the package
//    extensions that Init() fetched from the watcher backend into m_excluded_pack
for (std::set<String>::iterator iter = m_excluded_pack.begin(); iter != m_excluded_pack.end(); ++iter)
{
if (EndWith(_filter, *iter))
return true;
}
// 5) exact matches: an empty stem is always filtered; otherwise the lower-cased stem
//    or the lower-cased full path must not equal one of these entries
static std::set<String> _exa = boost::assign::list_of
(".")
("c:")
("c:\\")
("c:\\windows")
("c:\\program files")
("c:\\appdata")
;
_filter = p.stem();
if (_filter.empty())
return true;
boost::to_lower(_filter);
if (_exa.find(_filter) != _exa.end())
return true;
_filter = p.native_file_string();
boost::to_lower(_filter);
if (_exa.find(_filter) != _exa.end())
return true;
// 6) path string (lower-cased) contains one of these fragments anywhere
static std::set<String> _pcon = boost::assign::list_of
("imovie ")
("cache")
("sample")
("trailer")
;
_filter = p.native_directory_string();
boost::to_lower(_filter);
for (std::set<String>::iterator iter = _pcon.begin(); iter != _pcon.end(); ++iter)
{
if (_filter.find(*iter) != String::npos)
return true;
}
// 7) file name (lower-cased) contains one of these fragments
// NOTE(review): if native_directory_string() yields the whole path including the
// file name, check 6 already covers both fragments - confirm before removing.
static std::set<String> _con = boost::assign::list_of
("sample")
("trailer")
;
_filter = p.filename();
boost::to_lower(_filter);
for (std::set<String>::iterator iter = _con.begin(); iter != _con.end(); ++iter)
{
if (_filter.find(*iter) != String::npos)
return true;
}
}
catch (std::exception& ex)
{
// On error, stay on the safe side and report the path as filtered.
ET_ERROR("IsMediaFiltered: %s", ex.what());
return true;
}
return false;
}
long WatchFolderImporter::ScanFolder( const String& sDir )
{
Thread::BoostScopedLock lock(m_tmutex_once);
m_path_once = sDir;
m_pt_ScanOnce = m_tm_ScanOnce.CreateThread(new WatchFolderImporter::ScanOnce(this));
m_tm_ScanOnce.StartAll();
return 0;
}
// Forwards the new `status` of watch folder `sDir` to the import backend and returns
// the backend's result; false without a backend.
bool __ET__::WatchFolderImporter::UpdateWatchFolderStatus( const String& sDir, const String& status )
{
    if (!m_pimp)
        return false;
    return m_pimp->UpdateWatchFolderStatus(sDir, status);
}
// Forwards the scan time `stime` of watch folder `sDir` to the import backend and
// returns the backend's result; false without a backend.
bool __ET__::WatchFolderImporter::UpdateWatchFolderScanTime( const String& sDir, const String& stime )
{
    if (!m_pimp)
        return false;
    return m_pimp->UpdateWatchFolderScanTime(sDir, stime);
}
// Task body of the one-time scan thread: scans the path currently stored in the
// owner's m_path_once via ScanWatchFolderPreparation(), then calls Stop() so the task
// does not run again. std::exception errors are logged, not propagated.
// NOTE(review): m_path_once is read here without taking m_tmutex_once, although
// ScanFolder() writes it under that mutex - confirm whether this is intentional.
void WatchFolderImporter::ScanOnce::DoTask()
{
    ET_ASSERT(m_pObj);
    if (m_pObj)
    {
        try
        {
            m_pObj->ScanWatchFolderPreparation(m_pObj->m_path_once);
        }
        catch (std::exception& e)
        {
            ET_ERROR( "Error in WatchFolderImporter::ScanOnce: %s", e.what());
        }
    }
    Stop();
}
void WatchFolderImporter::ScanWorker::DoTask()
{
// Thread::Sleep(5000); // added additional delay to prevent race condition
// DO SCAN TASK EVERY 5 SECONDS
static time_t lastScanTime = 0;
time_t now = TimeNow();
if (now - lastScanTime < 5)
{
Thread::Sleep(250);
return;
}
lastScanTime = now;
try
{
ET_ASSERT(m_pObj);
if (m_pObj)
{
WatchFolderInfoList folders;
m_pObj->GetWatchFolders(folders);
for (WatchFolderInfoList::iterator iter = folders.begin(); iter != folders.end(); ++iter)
{
// FIXME: IF the Watchfolder doesn't exist,
// DON'T scan it, and if the watchfolder is a local path, remove it from watchfolder list
if (!Exists(iter->path))
{
continue;
}
if (iter->status.compare(WATCH_STATUS_SCANNING) != 0)
{
time_t t = TimeNow();
if (t - boost::lexical_cast<time_t>(iter->scan_time) > (SCAN_INTERVAL_HOURS * 60 * 60))
{
ET_LOG( "Scanning folder [%s]", iter->path.c_str() );
m_pObj->ScanWatchFolderPreparation(iter->path);
}
}
}
}
}
catch (std::exception& e)
{
ET_ERROR( "Error in WatchFolderImporter::ScanWorker: %s", e.what());
}
}
void WatchFolderImporter::CacheProcessor::DoTask()
{
static time_t lastScanTime = 0;
time_t now = TimeNow();
if (now - lastScanTime < FILE_CHECK_TIME_SEC)
{
Thread::Sleep(250);
return;
}
lastScanTime = now;
//Thread::Sleep(FILE_CHECK_TIME_SEC * 1000);
ET_ASSERT(m_pObj);
try
{
if (m_pObj && m_pObj->m_pimp && m_pObj->m_pwf)
{
for (WatchMap::iterator iter = m_pObj->m_watch_map.begin(); iter != m_pObj->m_watch_map.end(); ++iter)
{
if (!iter->second && Exists(iter->first) && (bf::file_size(iter->first) > 0))
{
// FIXME: bug exist if file was imported, it cannot be imported again if file was deleted and re-add
// check filesize does NOT work, copy/download does not change filesize
// check last_mod does NOT work, copy/download does not change last_mod
// check exclusive read/write access to file
if (m_pObj->m_pwf->HasExclusiveAccess(iter->first))
{
{
Thread::BoostScopedLock lock(m_pObj->m_tmutex_cache);
iter->second = true;
}
if (!m_pObj->m_pimp->IsMediaFileExists(iter->first))
{
ET_LOG( "WatchFolderImporter::CacheProcessor: Imported File : %s", iter->first.c_str());
m_pObj->m_pimp->AddMediaFile(iter->first);
}
}
}
}
}
}
catch (std::exception& e)
{
ET_ERROR( "Error in WatchFolderImporter::CacheProcessor: %s", e.what());
}
}
}