// used by AbstractWatchKey to enqueue key finalvoidenqueueKey(WatchKey key) { pendingKeys.offer(key); }
/** * Throws ClosedWatchServiceException if watch service is closed */ privatevoidcheckOpen() { if (closed) thrownewClosedWatchServiceException(); }
/** * Checks the key isn't the special CLOSE_KEY used to unblock threads when * the watch service is closed. */ privatevoidcheckKey(WatchKey key) { if (key == CLOSE_KEY) { // 重新入队,防止有线程阻塞,直到不再有线程阻塞。 enqueueKey(key); } checkOpen(); }
/** * Tells whether or not this watch service is open. */ finalbooleanisOpen() { return !closed; }
/**
 * Returns the lock object that the close method synchronizes on.
 *
 * @return the close lock
 */
final Object closeLock() {
    return this.closeLock;
}
/** * Closes this watch service. This method is invoked by the close * method to perform the actual work of closing the watch service. */ abstractvoidimplClose()throws IOException;
// clear pending keys and queue special key to ensure that any // threads blocked in take/poll wakeup pendingKeys.clear(); // 插入一条标志结束的key,防止调用线程一直阻塞 pendingKeys.offer(CLOSE_KEY); } } }
final Set<WatchEvent.Kind<?>> eventSet = newHashSet<WatchEvent.Kind<?>>(events.length); for (WatchEvent.Kind<?> event: events) { // standard events if (event == StandardWatchEventKinds.ENTRY_CREATE || event == StandardWatchEventKinds.ENTRY_MODIFY || event == StandardWatchEventKinds.ENTRY_DELETE){ eventSet.add(event); continue; // 跳过无法识别的 event } // OVERFLOW is ignored if (event == StandardWatchEventKinds.OVERFLOW) { continue; }
// null/unsupported if (event == null) thrownewNullPointerException("An element in event set is 'null'"); thrownewUnsupportedOperationException(event.name()); } if (eventSet.isEmpty()) thrownewIllegalArgumentException("No events to register");
// 改变 polling 的时间间隔,敏感度调节(如果支持) SensitivityWatchEventModifiersensivity= SensitivityWatchEventModifier.MEDIUM; if (modifiers.length > 0) { for (WatchEvent.Modifier modifier: modifiers) { if (modifier == null) thrownewNullPointerException(); if (modifier instanceof SensitivityWatchEventModifier) { sensivity = (SensitivityWatchEventModifier)modifier; continue; } thrownewUnsupportedOperationException("Modifier not supported"); } }
// check if watch service is closed if (!isOpen()) thrownewClosedWatchServiceException();
// registration is done in privileged block as it requires the // attributes of the entries in the directory. try { finalSensitivityWatchEventModifiers= sensivity; return AccessController.doPrivileged( newPrivilegedExceptionAction<PollingWatchKey>() { @Override public PollingWatchKey run()throws IOException { return doPrivilegedRegister(path, eventSet, s); } }); } catch (PrivilegedActionException pae) { Throwablecause= pae.getCause(); if (cause != null && cause instanceof IOException) throw (IOException)cause; thrownewAssertionError(pae); } }
// registers directory returning a new key if not already registered or // existing key if already registered private PollingWatchKey doPrivilegedRegister(Path path, Set<? extends WatchEvent.Kind<?>> events, SensitivityWatchEventModifier sensivity) throws IOException { // check file is a directory and get its file key if possible BasicFileAttributesattrs= Files.readAttributes(path, BasicFileAttributes.class); if (!attrs.isDirectory()) { thrownewNotDirectoryException(path.toString()); } ObjectfileKey= attrs.fileKey(); if (fileKey == null) thrownewAssertionError("File keys must be supported");
// grab close lock to ensure that watch service cannot be closed synchronized (closeLock()) { if (!isOpen()) thrownewClosedWatchServiceException();
// maps a context to the last event for the context (iff the last queued // event for the context is an ENTRY_MODIFY event). private Map<Object,WatchEvent<?>> lastModifyEvents;
/**
 * Returns the watch service that this key enqueues itself to when it is
 * signalled.
 *
 * @return the value of the {@code watcher} field
 */
final AbstractWatchService watcher() {
    return this.watcher;
}
/**
 * Returns the original watchable (a Path) that this key refers to.
 *
 * @return the value of the {@code dir} field
 */
@Override
public Path watchable() {
    return this.dir;
}
/** * Enqueues this key to the watch service */ finalvoidsignal() { synchronized (this) { if (state == State.READY) { // 避免重复入队 state = State.SIGNALLED; watcher.enqueueKey(this); } } }
/** * 发送事件 */ @SuppressWarnings("unchecked") finalvoidsignalEvent(WatchEvent.Kind<?> kind, Object context) { booleanisModify= (kind == StandardWatchEventKinds.ENTRY_MODIFY); synchronized (this) { intsize= events.size(); if (size > 0) { // if the previous event is an OVERFLOW event or this is a // repeated event then we simply increment the counter // 合并相同事件 WatchEvent<?> prev = events.get(size-1); if ((prev.kind() == StandardWatchEventKinds.OVERFLOW) || ((kind == prev.kind() && Objects.equals(context, prev.context())))) { ((Event<?>)prev).increment(); return; }
// if this is a modify event and the last entry for the context // is a modify event then we simply increment the count // 合并修改事件 if (!lastModifyEvents.isEmpty()) { if (isModify) { WatchEvent<?> ev = lastModifyEvents.get(context); if (ev != null) { assert ev.kind() == StandardWatchEventKinds.ENTRY_MODIFY; ((Event<?>)ev).increment(); return; } } else { // not a modify event so remove from the map as the // last event will no longer be a modify event. // 如果不是修改事件,删除最近修改记录。 lastModifyEvents.remove(context); } }
// if the list has reached the limit then drop pending events // and queue an OVERFLOW event if (size >= MAX_EVENT_LIST_SIZE) { kind = StandardWatchEventKinds.OVERFLOW; isModify = false; context = null; } }
// non-repeated event Event<Object> ev = newEvent<Object>((WatchEvent.Kind<Object>)kind, context); if (isModify) { lastModifyEvents.put(context, ev); } elseif (kind == StandardWatchEventKinds.OVERFLOW) { // drop all pending events events.clear(); lastModifyEvents.clear(); } events.add(ev); signal(); // 此处watchkey入队 } }
/** * Polls the directory to detect for new files, modified files, or * deleted files. */ synchronizedvoidpoll() { if (!valid) { return; }
// update tick, 轮询次数 tickCount++;
// open directory DirectoryStream<Path> stream = null; try { stream = Files.newDirectoryStream(watchable()); } catch (IOException x) { // directory is no longer accessible so cancel key cancel(); signal(); return; }
// iterate over all entries in directory try { for (Path entry: stream) { longlastModified=0L; try { lastModified = Files.getLastModifiedTime(entry, LinkOption.NOFOLLOW_LINKS).toMillis(); } catch (IOException x) { // unable to get attributes of entry. If file has just // been deleted then we'll report it as deleted on the // next poll continue; }
// lookup cache, 根据上次快照,产生文件变化事件 CacheEntrye= entries.get(entry.getFileName()); if (e == null) { // new file found entries.put(entry.getFileName(), newCacheEntry(lastModified, tickCount));
// queue ENTRY_CREATE if event enabled if (events.contains(StandardWatchEventKinds.ENTRY_CREATE)) { signalEvent(StandardWatchEventKinds.ENTRY_CREATE, entry.getFileName()); continue; } else { // if ENTRY_CREATE is not enabled and ENTRY_MODIFY is // enabled then queue event to avoid missing out on // modifications to the file immediately after it is // created. if (events.contains(StandardWatchEventKinds.ENTRY_MODIFY)) { signalEvent(StandardWatchEventKinds.ENTRY_MODIFY, entry.getFileName()); } } continue; }
// check if file has changed if (e.lastModified != lastModified) { if (events.contains(StandardWatchEventKinds.ENTRY_MODIFY)) { signalEvent(StandardWatchEventKinds.ENTRY_MODIFY, entry.getFileName()); } } // entry in cache so update poll time e.update(lastModified, tickCount);
} } catch (DirectoryIteratorException e) { // ignore for now; if the directory is no longer accessible // then the key will be cancelled on the next poll } finally {
// iterate over cache to detect entries that have been deleted Iterator<Map.Entry<Path,CacheEntry>> i = entries.entrySet().iterator(); while (i.hasNext()) { Map.Entry<Path,CacheEntry> mapEntry = i.next(); CacheEntryentry= mapEntry.getValue(); if (entry.lastTickCount() != tickCount) { Pathname= mapEntry.getKey(); // remove from map and queue delete event (if enabled) i.remove(); if (events.contains(StandardWatchEventKinds.ENTRY_DELETE)) { signalEvent(StandardWatchEventKinds.ENTRY_DELETE, name); } } } } }
/** * Entry in directory cache to record file last-modified-time and tick-count */ privatestaticclassCacheEntry { privatelong lastModified; privateint lastTickCount;