Watcher.cc 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. #include "Watcher.hh"
  2. #include <unordered_set>
  3. using namespace Napi;
  4. struct WatcherHash {
  5. std::size_t operator() (WatcherRef const &k) const {
  6. return std::hash<std::string>()(k->mDir);
  7. }
  8. };
  9. struct WatcherCompare {
  10. size_t operator() (WatcherRef const &a, WatcherRef const &b) const {
  11. return *a == *b;
  12. }
  13. };
  14. static std::unordered_set<WatcherRef , WatcherHash, WatcherCompare>& getSharedWatchers() {
  15. static std::unordered_set<WatcherRef , WatcherHash, WatcherCompare>* sharedWatchers =
  16. new std::unordered_set<WatcherRef , WatcherHash, WatcherCompare>();
  17. return *sharedWatchers;
  18. }
  19. WatcherRef Watcher::getShared(std::string dir, std::unordered_set<std::string> ignorePaths, std::unordered_set<Glob> ignoreGlobs) {
  20. WatcherRef watcher = std::make_shared<Watcher>(dir, ignorePaths, ignoreGlobs);
  21. auto found = getSharedWatchers().find(watcher);
  22. if (found != getSharedWatchers().end()) {
  23. return *found;
  24. }
  25. getSharedWatchers().insert(watcher);
  26. return watcher;
  27. }
  28. void removeShared(Watcher *watcher) {
  29. for (auto it = getSharedWatchers().begin(); it != getSharedWatchers().end(); it++) {
  30. if (it->get() == watcher) {
  31. getSharedWatchers().erase(it);
  32. break;
  33. }
  34. }
  35. // Free up memory.
  36. if (getSharedWatchers().size() == 0) {
  37. getSharedWatchers().rehash(0);
  38. }
  39. }
  40. Watcher::Watcher(std::string dir, std::unordered_set<std::string> ignorePaths, std::unordered_set<Glob> ignoreGlobs)
  41. : mDir(dir),
  42. mIgnorePaths(ignorePaths),
  43. mIgnoreGlobs(ignoreGlobs) {
  44. mDebounce = Debounce::getShared();
  45. mDebounce->add(this, [this] () {
  46. triggerCallbacks();
  47. });
  48. }
  49. Watcher::~Watcher() {
  50. mDebounce->remove(this);
  51. }
  52. void Watcher::wait() {
  53. std::unique_lock<std::mutex> lk(mMutex);
  54. mCond.wait(lk);
  55. }
  56. void Watcher::notify() {
  57. std::unique_lock<std::mutex> lk(mMutex);
  58. mCond.notify_all();
  59. if (mCallbacks.size() > 0 && mEvents.size() > 0) {
  60. // We must release our lock before calling into the debouncer
  61. // to avoid a deadlock: the debouncer thread itself will require
  62. // our lock from its thread when calling into `triggerCallbacks`
  63. // while holding its own debouncer lock.
  64. lk.unlock();
  65. mDebounce->trigger();
  66. }
  67. }
  68. struct CallbackData {
  69. std::string error;
  70. std::vector<Event> events;
  71. CallbackData(std::string error, std::vector<Event> events) : error(error), events(events) {}
  72. };
  73. Value callbackEventsToJS(const Env &env, std::vector<Event> &events) {
  74. EscapableHandleScope scope(env);
  75. Array arr = Array::New(env, events.size());
  76. uint32_t currentEventIndex = 0;
  77. for (auto eventIterator = events.begin(); eventIterator != events.end(); eventIterator++) {
  78. arr.Set(currentEventIndex++, eventIterator->toJS(env));
  79. }
  80. return scope.Escape(arr);
  81. }
  82. void callJSFunction(Napi::Env env, Function jsCallback, CallbackData *data) {
  83. HandleScope scope(env);
  84. auto err = data->error.size() > 0 ? Error::New(env, data->error).Value() : env.Null();
  85. auto events = callbackEventsToJS(env, data->events);
  86. jsCallback.Call({err, events});
  87. delete data;
  88. // Throw errors from the callback as fatal exceptions
  89. // If we don't handle these node segfaults...
  90. if (env.IsExceptionPending()) {
  91. Napi::Error err = env.GetAndClearPendingException();
  92. napi_fatal_exception(env, err.Value());
  93. }
  94. }
  95. void Watcher::notifyError(std::exception &err) {
  96. std::unique_lock<std::mutex> lk(mMutex);
  97. for (auto it = mCallbacks.begin(); it != mCallbacks.end(); it++) {
  98. CallbackData *data = new CallbackData(err.what(), {});
  99. it->tsfn.BlockingCall(data, callJSFunction);
  100. }
  101. clearCallbacks();
  102. }
  103. // This function is called from the debounce thread.
  104. void Watcher::triggerCallbacks() {
  105. std::unique_lock<std::mutex> lk(mMutex);
  106. if (mCallbacks.size() > 0 && (mEvents.size() > 0 || mEvents.hasError())) {
  107. auto error = mEvents.getError();
  108. auto events = mEvents.getEvents();
  109. mEvents.clear();
  110. for (auto it = mCallbacks.begin(); it != mCallbacks.end(); it++) {
  111. it->tsfn.BlockingCall(new CallbackData(error, events), callJSFunction);
  112. }
  113. }
  114. }
  115. // This should be called from the JavaScript thread.
  116. bool Watcher::watch(Function callback) {
  117. std::unique_lock<std::mutex> lk(mMutex);
  118. auto it = findCallback(callback);
  119. if (it != mCallbacks.end()) {
  120. return false;
  121. }
  122. auto tsfn = ThreadSafeFunction::New(
  123. callback.Env(),
  124. callback,
  125. "Watcher callback",
  126. 0, // Unlimited queue
  127. 1 // Initial thread count
  128. );
  129. mCallbacks.push_back(Callback {
  130. tsfn,
  131. Napi::Persistent(callback),
  132. std::this_thread::get_id()
  133. });
  134. return true;
  135. }
  136. // This should be called from the JavaScript thread.
  137. std::vector<Callback>::iterator Watcher::findCallback(Function callback) {
  138. for (auto it = mCallbacks.begin(); it != mCallbacks.end(); it++) {
  139. // Only consider callbacks created by the same thread, or V8 will panic.
  140. if (it->threadId == std::this_thread::get_id() && it->ref.Value() == callback) {
  141. return it;
  142. }
  143. }
  144. return mCallbacks.end();
  145. }
  146. // This should be called from the JavaScript thread.
  147. bool Watcher::unwatch(Function callback) {
  148. std::unique_lock<std::mutex> lk(mMutex);
  149. bool removed = false;
  150. auto it = findCallback(callback);
  151. if (it != mCallbacks.end()) {
  152. it->tsfn.Release();
  153. it->ref.Unref();
  154. mCallbacks.erase(it);
  155. removed = true;
  156. }
  157. if (removed && mCallbacks.size() == 0) {
  158. unref();
  159. return true;
  160. }
  161. return false;
  162. }
  163. void Watcher::unref() {
  164. if (mCallbacks.size() == 0) {
  165. removeShared(this);
  166. }
  167. }
  168. void Watcher::destroy() {
  169. std::unique_lock<std::mutex> lk(mMutex);
  170. clearCallbacks();
  171. }
  172. // Private because it doesn't lock.
  173. void Watcher::clearCallbacks() {
  174. for (auto it = mCallbacks.begin(); it != mCallbacks.end(); it++) {
  175. it->tsfn.Release();
  176. it->ref.Unref();
  177. }
  178. mCallbacks.clear();
  179. unref();
  180. }
  181. bool Watcher::isIgnored(std::string path) {
  182. for (auto it = mIgnorePaths.begin(); it != mIgnorePaths.end(); it++) {
  183. auto dir = *it + DIR_SEP;
  184. if (*it == path || path.compare(0, dir.size(), dir) == 0) {
  185. return true;
  186. }
  187. }
  188. auto basePath = mDir + DIR_SEP;
  189. if (path.rfind(basePath, 0) != 0) {
  190. return false;
  191. }
  192. auto relativePath = path.substr(basePath.size());
  193. for (auto it = mIgnoreGlobs.begin(); it != mIgnoreGlobs.end(); it++) {
  194. if (it->isIgnored(relativePath)) {
  195. return true;
  196. }
  197. }
  198. return false;
  199. }