序
本文主要研究一下Elasticsearch的ReleasableLock
ReleasableLock
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/ReleasableLock.java
public class ReleasableLock implements Releasable { private final Lock lock; // a per-thread count indicating how many times the thread has entered the lock; only works if assertions are enabled private final ThreadLocalholdingThreads; public ReleasableLock(Lock lock) { this.lock = lock; if (Assertions.ENABLED) { holdingThreads = new ThreadLocal<>(); } else { holdingThreads = null; } } @Override public void close() { lock.unlock(); assert removeCurrentThread(); } public ReleasableLock acquire() throws EngineException { lock.lock(); assert addCurrentThread(); return this; } private boolean addCurrentThread() { final Integer current = holdingThreads.get(); holdingThreads.set(current == null ? 1 : current + 1); return true; } private boolean removeCurrentThread() { final Integer count = holdingThreads.get(); assert count != null && count > 0; if (count == 1) { holdingThreads.remove(); } else { holdingThreads.set(count - 1); } return true; } public boolean isHeldByCurrentThread() { if (holdingThreads == null) { throw new UnsupportedOperationException("asserts must be enabled"); } final Integer count = holdingThreads.get(); return count != null && count > 0; }}复制代码
- ReleasableLock实现了Releasable接口(close方法);它的构造器要求输入Lock参数,只有在开启了assertions的条件下才会初始化holdingThreads;isHeldByCurrentThread方法判断调用线程是否正在使用lock(未开启assertions时holdingThreads为null,该方法会抛出UnsupportedOperationException)
- acquire方法首先调用lock的lock方法,然后利用assert来断言addCurrentThread方法,该方法会增加调用线程正在使用lock的次数
- close方法首先调用lock的unlock方法,然后利用assert来断言removeCurrentThread方法,该方法会减少调用线程正在使用lock的次数
ReleasableLockTests
elasticsearch-7.0.1/server/src/test/java/org/elasticsearch/common/util/concurrent/ReleasableLockTests.java
public class ReleasableLockTests extends ESTestCase { /** * Test that accounting on whether or not a thread holds a releasable lock is correct. Previously we had a bug where on a re-entrant * lock that if a thread entered the lock twice we would declare that it does not hold the lock after it exits its first entrance but * not its second entrance. * * @throws BrokenBarrierException if awaiting on the synchronization barrier breaks * @throws InterruptedException if awaiting on the synchronization barrier is interrupted */ public void testIsHeldByCurrentThread() throws BrokenBarrierException, InterruptedException { final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); final ReleasableLock readLock = new ReleasableLock(readWriteLock.readLock()); final ReleasableLock writeLock = new ReleasableLock(readWriteLock.writeLock()); final int numberOfThreads = scaledRandomIntBetween(1, 32); final int iterations = scaledRandomIntBetween(1, 32); final CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads); final Listthreads = new ArrayList<>(); for (int i = 0; i < numberOfThreads; i++) { final Thread thread = new Thread(() -> { try { barrier.await(); } catch (final BrokenBarrierException | InterruptedException e) { throw new RuntimeException(e); } for (int j = 0; j < iterations; j++) { if (randomBoolean()) { acquire(readLock, writeLock); } else { acquire(writeLock, readLock); } } try { barrier.await(); } catch (final BrokenBarrierException | InterruptedException e) { throw new RuntimeException(e); } }); threads.add(thread); thread.start(); } barrier.await(); barrier.await(); for (final Thread thread : threads) { thread.join(); } } private void acquire(final ReleasableLock lockToAcquire, final ReleasableLock otherLock) { try (@SuppressWarnings("unused") Releasable outer = lockToAcquire.acquire()) { assertTrue(lockToAcquire.isHeldByCurrentThread()); assertFalse(otherLock.isHeldByCurrentThread()); try (@SuppressWarnings("unused") Releasable inner = 
lockToAcquire.acquire()) { assertTrue(lockToAcquire.isHeldByCurrentThread()); assertFalse(otherLock.isHeldByCurrentThread()); } // previously there was a bug here and this would return false assertTrue(lockToAcquire.isHeldByCurrentThread()); assertFalse(otherLock.isHeldByCurrentThread()); } assertFalse(lockToAcquire.isHeldByCurrentThread()); assertFalse(otherLock.isHeldByCurrentThread()); }}复制代码
- ReleasableLockTests使用多线程随机执行acquire,该方法断言lockToAcquire被当前线程持有,而otherLock不被当前线程持有
Cache.CacheSegment
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/cache/Cache.java
private static class CacheSegment{ // read/write lock protecting mutations to the segment ReadWriteLock segmentLock = new ReentrantReadWriteLock(); ReleasableLock readLock = new ReleasableLock(segmentLock.readLock()); ReleasableLock writeLock = new ReleasableLock(segmentLock.writeLock()); Map >> map = new HashMap<>(); SegmentStats segmentStats = new SegmentStats(); /** * get an entry from the segment; expired entries will be returned as null but not removed from the cache until the LRU list is * pruned or a manual {@link Cache#refresh()} is performed however a caller can take action using the provided callback * * @param key the key of the entry to get from the cache * @param now the access time of this entry * @param isExpired test if the entry is expired * @param onExpiration a callback if the entry associated to the key is expired * @return the entry if there was one, otherwise null */ Entry get(K key, long now, Predicate > isExpired, Consumer > onExpiration) { CompletableFuture > future; try (ReleasableLock ignored = readLock.acquire()) { future = map.get(key); } if (future != null) { Entry entry; try { entry = future.get(); } catch (ExecutionException e) { assert future.isCompletedExceptionally(); segmentStats.miss(); return null; } catch (InterruptedException e) { throw new IllegalStateException(e); } if (isExpired.test(entry)) { segmentStats.miss(); onExpiration.accept(entry); return null; } else { segmentStats.hit(); entry.accessTime = now; return entry; } } else { segmentStats.miss(); return null; } } /** * put an entry into the segment * * @param key the key of the entry to add to the cache * @param value the value of the entry to add to the cache * @param now the access time of this entry * @return a tuple of the new entry and the existing entry, if there was one otherwise null */ Tuple , Entry > put(K key, V value, long now) { Entry entry = new Entry<>(key, value, now); Entry existing = null; try (ReleasableLock ignored = writeLock.acquire()) { try { 
CompletableFuture > future = map.put(key, CompletableFuture.completedFuture(entry)); if (future != null) { existing = future.handle((ok, ex) -> { if (ok != null) { return ok; } else { return null; } }).get(); } } catch (ExecutionException | InterruptedException e) { throw new IllegalStateException(e); } } return Tuple.tuple(entry, existing); } /** * remove an entry from the segment * * @param key the key of the entry to remove from the cache * @param onRemoval a callback for the removed entry */ void remove(K key, Consumer >> onRemoval) { CompletableFuture > future; try (ReleasableLock ignored = writeLock.acquire()) { future = map.remove(key); } if (future != null) { segmentStats.eviction(); onRemoval.accept(future); } } /** * remove an entry from the segment iff the future is done and the value is equal to the * expected value * * @param key the key of the entry to remove from the cache * @param value the value expected to be associated with the key * @param onRemoval a callback for the removed entry */ void remove(K key, V value, Consumer >> onRemoval) { CompletableFuture > future; boolean removed = false; try (ReleasableLock ignored = writeLock.acquire()) { future = map.get(key); try { if (future != null) { if (future.isDone()) { Entry entry = future.get(); if (Objects.equals(value, entry.value)) { removed = map.remove(key, future); } } } } catch (ExecutionException | InterruptedException e) { throw new IllegalStateException(e); } } if (future != null && removed) { segmentStats.eviction(); onRemoval.accept(future); } } private static class SegmentStats { private final LongAdder hits = new LongAdder(); private final LongAdder misses = new LongAdder(); private final LongAdder evictions = new LongAdder(); void hit() { hits.increment(); } void miss() { misses.increment(); } void eviction() { evictions.increment(); } } }复制代码
- CacheSegment使用ReentrantReadWriteLock的readLock及writeLock创建了两个ReleasableLock,一个为readLock,一个为writeLock;由于ReleasableLock实现了Releasable接口(close方法),而该接口继承了java.lang.AutoCloseable接口,因而可以直接利用try with resources语法来自动close,从而释放锁
小结
- ReleasableLock实现了Releasable接口(close方法);它的构造器要求输入Lock参数,只有在开启了assertions的条件下才会初始化holdingThreads;isHeldByCurrentThread方法判断调用线程是否正在使用lock(未开启assertions时holdingThreads为null,该方法会抛出UnsupportedOperationException)
- acquire方法首先调用lock的lock方法,然后利用assert来断言addCurrentThread方法,该方法会增加调用线程正在使用lock的次数
- close方法首先调用lock的unlock方法,然后利用assert来断言removeCurrentThread方法,该方法会减少调用线程正在使用lock的次数
- ReleasableLock实现了Releasable接口(close方法),而该接口继承了java.lang.AutoCloseable接口,因而可以直接利用try with resources语法来自动close,从而释放锁