Статьи

Параллелизм Java: блокировки чтения / записи

Якоб сделал большую серию по Java Concurrency — ознакомьтесь с первыми 14 статьями в его блоге . В дальнейшем мы рады объявить, что вы также сможете следить за серией здесь на JavaLobby.

Блокировка чтения / записи является более сложной блокировкой, чем Lockреализации, показанные в тексте Блокировки в Java . Представьте, что у вас есть приложение, которое читает и пишет какой-то ресурс, но написание его не так много, как чтение. Два потока, читающие один и тот же ресурс, не создают проблем друг для друга, поэтому нескольким потокам, которые хотят прочитать ресурс, предоставляется доступ одновременно, перекрывая друг друга. Но если один поток хочет выполнить запись в ресурс, никакие другие операции чтения и записи не должны выполняться одновременно. Чтобы решить эту проблему, допускающую использование нескольких читателей, но только одного писателя, вам потребуется блокировка чтения / записи.

Java 5 поставляется с реализациями блокировки чтения / записи в java.util.concurrentпакете. Тем не менее, может быть полезно знать теорию их реализации.

Вот список тем, затронутых в этом тексте:

  1. Реализация блокировки чтения / записи в Java
  2. Чтение / запись блокировки входа
  3. Читать реентранс
  4. Написать реентранс
  5. Читать, чтобы написать вход
  6. Написать, чтобы прочитать вход
  7. Полностью реентерабельная реализация Java
  8. Вызов unlock () из оператора finally

Реализация блокировки чтения / записи в Java

Для начала давайте суммируем условия получения доступа на чтение и запись к ресурсу:

Доступ для чтения Если никакие потоки не пишут, и никакие потоки не запросили доступ на запись.
Доступ для записи Если нет тем, которые читают или пишут.

Если поток хочет прочитать ресурс, это нормально, если нет потоков, которые пишут в него, и ни один поток не запросил доступ на запись к ресурсу. При повышении приоритета запросов на запись мы предполагаем, что запросы на запись важнее запросов на чтение. Кроме того, если чтение — это то, что происходит чаще всего, и мы не приоритезируем записи, может возникнуть голодание . Потоки, запрашивающие доступ для записи, будут заблокированы, пока все читатели не разблокируют ReadWriteLock. Если новым потокам постоянно предоставляется доступ на чтение, поток, ожидающий доступа на запись, будет заблокирован на неопределенный срок, что приведет к голоданию . Поэтому поток может быть предоставлен только для чтения, если ни один из потоков в настоящее время не заблокировал ReadWriteLockзапись и не запросил блокировку записи.

Поток, которому нужен доступ на запись к ресурсу, может быть предоставлен, если потоки не читают и не пишут в ресурс. Не имеет значения, сколько потоков запросило доступ на запись или в какой последовательности, если только вы не хотите гарантировать справедливость между потоками, запрашивающими доступ на запись.

Имея в виду эти простые правила, мы можем реализовать, ReadWriteLockкак показано ниже:

public class ReadWriteLock{

private int readers = 0;
private int writers = 0;
private int writeRequests = 0;

public synchronized void lockRead() throws InterruptedException{
while(writers > 0 || writeRequests > 0){
wait();
}
readers++;
}

public synchronized void unlockRead(){
readers--;
notifyAll();
}

public synchronized void lockWrite() throws InterruptedException{
writeRequests++;

while(readers > 0 || writers > 0){
wait();
}
writeRequests--;
writers++;
}

public synchronized void unlockWrite() throws InterruptedException{
writers--;
notifyAll();
}
}

ReadWriteLockИмеет два метода блокировки и два метода разблокировки. Один метод блокировки и разблокировки для доступа на чтение и один блокировка и разблокировка для доступа на запись.

Правила доступа для чтения реализованы в lockRead()методе. Все потоки получают доступ на чтение, если нет потока с правом записи или один или несколько потоков запросили доступ на запись.

Правила доступа для записи реализованы в lockWrite()методе. Поток, которому нужен доступ для записи, начинается с запроса доступа для записи ( writeRequests++). Затем он проверит, может ли он получить доступ к записи. Поток может получить доступ на запись, если нет потоков с доступом для чтения к ресурсу и нет потоков с доступом для записи к ресурсу. Сколько потоков запросило доступ для записи, не имеет значения.

Стоит отметить, что unlockRead()и unlockWrite()звонит, notifyAll()а не notify(). Чтобы объяснить, почему это так, представьте следующую ситуацию:

Внутри ReadWriteLock есть потоки, ожидающие доступа для чтения, и потоки, ожидающие доступа для записи. Если поток, который notify()был разбужен, был потоком доступа для чтения, он вернется в режим ожидания, поскольку существуют потоки, ожидающие доступа для записи. Однако ни один из потоков, ожидающих доступа для записи, не пробуждается, поэтому больше ничего не происходит. Никакие потоки не получают ни права чтения, ни записи. При вызове noftifyAll()все ожидающие потоки просыпаются и проверяют, могут ли они получить желаемый доступ.

Вызов notifyAll()также имеет еще одно преимущество. Если несколько потоков ожидают доступа для чтения и не имеют доступа для записи, и unlockWrite()вызывается, все потоки, ожидающие доступа для чтения, получают доступ для чтения сразу, а не один за другим.

Чтение / запись блокировки входа

ReadWriteLockКласс показано ранее не возвратный . Если поток, который имеет доступ для записи, запрашивает его снова, он блокируется, потому что уже есть один модуль записи — сам. Кроме того, рассмотрим этот случай:

  1. Поток 1 получает доступ для чтения.
  2. Поток 2 запрашивает доступ для записи, но заблокирован, потому что есть один читатель.
  3. Поток 1 повторно запрашивает доступ на чтение (повторно вводит блокировку), но блокируется, поскольку существует запрос на запись

В этой ситуации предыдущий ReadWriteLockзаперт — ситуация, похожая на тупик. Никакие потоки, не запрашивающие права на чтение и запись, не будут предоставлены.

Для ReadWriteLockповторного входа необходимо внести несколько изменений. Вход для читателей и писателей будет рассматриваться отдельно.

 

Читать реентранс

Чтобы сделать ReadWriteLockреентерацию читателям, мы сначала установим правила чтения реентера:

  • Потоку предоставляется повторное чтение, если он может получить доступ на чтение (без записи или запросов на запись) или если он уже имеет доступ на чтение (независимо от запросов на запись).

Чтобы определить, имеет ли поток доступ для чтения, ссылка на каждый поток, которому предоставлен доступ на чтение, хранится на карте вместе с тем, сколько раз он получил блокировку чтения. При определении, может ли быть предоставлен доступ для чтения, эта Карта будет проверяться на предмет ссылки на вызывающий поток. Вот как выглядят методы lockRead () и unlockRead () после этого изменения:

public class ReadWriteLock{

private Map<Thread, Integer> readingThreads =
new HashMap<Thread, Integer>();

private int writers = 0;
private int writeRequests = 0;

public synchronized void lockRead() throws InterruptedException{
Thread callingThread = Thread.currentThread();
while(! canGrantReadAccess(callingThread)){
wait();
}

readingThreads.put(callingThread,
(getAccessCount(callingThread) + 1));
}


public synchronized void unlockRead(){
Thread callingThread = Thread.currentThread();
int accessCount = getAccessCount(callingThread);
if(accessCount == 1){ readingThreads.remove(callingThread); }
else { readingThreads.put(callingThread, (accessCount -1)); }
notifyAll();
}


private boolean canGrantReadAccess(Thread callingThread){
if(writers > 0) return false;
if(isReader(callingThread) return true;
if(writeRequests > 0) return false;
return true;
}

private int getReadAccessCount(Thread callingThread){
Integer accessCount = readingThreads.get(callingThread);
if(accessCount == null) return 0;
return accessCount.intValue();
}

private boolean isReader(Thread callingThread){
return readers.get(callingThread) != null;
}

}

Как видите, повторный вход в чтение предоставляется только в том случае, если в данный момент в ресурс не записываются потоки. Как вы можете видеть, если вызывающий поток уже имеет доступ для чтения, он имеет приоритет над любыми запросами на запись.

Написать реентранс

Повторный вход в запись предоставляется, только если поток уже имеет доступ для записи. Вот как выглядят методы lockWrite()и unlockWrite()после этого небольшого изменения:

public class ReadWriteLock{

private Map&lt;Thread, Integer&gt; readingThreads =
new HashMap&lt;Thread, Integer&gt();

private int writeAccesses = 0;
private int writeRequests = 0;
private Thread writingThread = null;

public synchronized void lockWrite() throws InterruptedException{
writeRequests++;
Thread callingThread = Thread.currentThread();
while(! canGrantWriteAccess(callingThread)){
wait();
}
writeRequests--;
writeAccesses++;
writingThread = callingThread;
}

public synchronized void unlockWrite() throws InterruptedException{
writeAccesses--;
if(writeAccesses == 0){
writingThread = null;
}
notifyAll();
}

private boolean canGrantWriteAccess(Thread callingThread){
if(hasReaders()) return false;
if(writingThread == null) return true;
if(writingThread != callingThread) return false;
return true;
}

private boolean hasReaders(){
return readingThreads.size() > 0;
}

}

Обратите внимание, как поток, в настоящее время удерживающий блокировку записи, теперь учитывается при определении, может ли вызывающий поток получить доступ на запись.

Читать, чтобы написать вход

Иногда потоку, имеющему доступ для чтения, также необходимо получить доступ для записи. Чтобы это было разрешено, поток должен быть единственным читателем. Для достижения этого writeLock()метод должен быть немного изменен. Вот как это будет выглядеть:

public class ReadWriteLock{

private Map<Thread, Integer> readingThreads =
new HashMap<Thread, Integer>();

private int writeAccesses = 0;
private int writeRequests = 0;
private Thread writingThread = null;

public synchronized void lockWrite() throws InterruptedException{
writeRequests++;
Thread callingThread = Thread.currentThread();
while(! canGrantWriteAccess(callingThread)){
wait();
}
writeRequests--;
writeAccesses++;
writingThread = callingThread;
}

public synchronized void unlockWrite() throws InterruptedException{
writeAccesses--;
if(writeAccesses == 0){
writingThread = null;
}
notifyAll();
}

private boolean canGrantWriteAccess(Thread callingThread){
if(isOnlyReader(callingThread)) return true;
if(hasReaders()) return false;
if(writingThread == null) return true;
if(writingThread != callingThread) return false;
return true;
}

private boolean hasReaders(){
return readingThreads.size() > 0;
}

private boolean isOnlyReader(Thread thread){
return readers == 1 && readingThreads.get(callingThread) != null;
}

}

Теперь ReadWriteLockкласс доступен для чтения и записи.

Написать, чтобы прочитать вход

Иногда потоку, который имеет доступ на запись, тоже нужен доступ на чтение. Автор должен всегда иметь доступ для чтения, если требуется. Если у потока есть права на чтение, другие потоки не могут иметь права на чтение и запись, так что это не опасно. Вот как canGrantReadAccess()будет выглядеть метод с этим изменением:

public class ReadWriteLock{

private boolean canGrantReadAccess(Thread callingThread){
if(isWriter(callingThread)) return true;
if(writingThread != null) return false;
if(isReader(callingThread) return true;
if(writeRequests > 0) return false;
return true;
}
}

Полностью реентерабельный ReadWriteLock

Ниже приведена полностью реентранская ReadWriteLockреализация. Я сделал несколько рефакторингов для условий доступа, чтобы их было легче читать, и тем самым легче убедить себя в том, что они верны.

public class ReadWriteLock{

private Map<Thread, Integer> readingThreads =
new HashMap<Thread, Integer>();

private int writeAccesses = 0;
private int writeRequests = 0;
private Thread writingThread = null;


public synchronized void lockRead() throws InterruptedException{
Thread callingThread = Thread.currentThread();
while(! canGrantReadAccess(callingThread)){
wait();
}

readingThreads.put(callingThread,
(getReadAccessCount(callingThread) + 1));
}

private boolean canGrantReadAccess(Thread callingThread){
if( isWriter(callingThread) ) return true;
if( hasWriter() ) return false;
if( isReader(callingThread) ) return true;
if( hasWriteRequests() ) return false;
return true;
}


public synchronized void unlockRead(){
Thread callingThread = Thread.currentThread();
if(!isReader(callingThread)){
throw new IllegalMonitorStateException("Calling Thread does not" +
" hold a read lock on this ReadWriteLock");
}
int accessCount = getReadAccessCount(callingThread);
if(accessCount == 1){ readingThreads.remove(callingThread); }
else { readingThreads.put(callingThread, (accessCount -1)); }
notifyAll();
}

public synchronized void lockWrite() throws InterruptedException{
writeRequests++;
Thread callingThread = Thread.currentThread();
while(! canGrantWriteAccess(callingThread)){
wait();
}
writeRequests--;
writeAccesses++;
writingThread = callingThread;
}

public synchronized void unlockWrite() throws InterruptedException{
    if(!isWriter(Thread.currentThread()){
throw new IllegalMonitorStateException("Calling Thread does not" +
" hold the write lock on this ReadWriteLock");
}

writeAccesses--;
if(writeAccesses == 0){
writingThread = null;
}
notifyAll();
}

private boolean canGrantWriteAccess(Thread callingThread){
if(isOnlyReader(callingThread)) return true;
if(hasReaders()) return false;
if(writingThread == null) return true;
if(!isWriter(callingThread)) return false;
return true;
}


private int getReadAccessCount(Thread callingThread){
Integer accessCount = readingThreads.get(callingThread);
if(accessCount == null) return 0;
return accessCount.intValue();
}


private boolean hasReaders(){
return readingThreads.size() > 0;
}

private boolean isReader(Thread callingThread){
return readingThreads.get(callingThread) != null;
}

private boolean isOnlyReader(Thread callingThread){
return readingThreads.size() == 1 && readingThreads.get(callingThread) != null;
}

private boolean hasWriter(){
return writingThread != null;
}

private boolean isWriter(Thread callingThread){
return writingThread == callingThread;
}

private boolean hasWriteRequests(){
return this.writeRequests > 0;
}

}

Вызов unlock () из оператора finally

При защите критического раздела с помощью ReadWriteLock, и критический раздел может выдавать исключения, важно вызывать методы readUnlock()и writeUnlock()изнутри finally-clause. Это гарантирует, что ReadWriteLockон разблокирован, чтобы другие потоки могли его заблокировать. Вот пример:

lock.lockWrite();
try{
//do critical section code, which may throw exception
} finally {
lock.unlockWrite();
}

This little construct makes sure that the ReadWriteLock is unlocked in case an exception is thrown from the code in the critical section. If unlockWrite() was not called from inside a finally-clause, and an exception was thrown from the critical section, the ReadWriteLock would remain write locked forever, causing all threads calling lockRead() or lockWrite() on that ReadWriteLock instance to halt indefinately. The only thing that could unlock the ReadWriteLockagain would be if the ReadWriteLock is reentrant, and the thread that had it locked when the exception was thrown, later succeeds in locking it, executing the critical section and calling unlockWrite() again afterwards. That would unlock the ReadWriteLock again. But why wait for that to happen, if it happens? Calling unlockWrite() from a finally-clause is a much more robust solution.