При написании поточно-ориентированных классов основной проблемой является разделение данных на несколько независимых частей — и выбор правильного размера для этих частей. Если часть слишком мала, наш класс не является потокобезопасным. Если часть слишком большая, класс не масштабируется.
Вам также может понравиться:
7 методов для поточно-безопасных классов
Давайте посмотрим на пример, который дополнительно иллюстрирует этот сценарий:
Пример
Предположим, мы хотим отследить, сколько людей живет в городе. Мы хотим поддержать два метода: один, чтобы узнать текущее количество людей, живущих в городе, и один, чтобы переместить человека из одного города в другой. Итак, у нас есть следующий интерфейс:
Джава
1
public interface CityToCount {
2
static final String[] ALL_CITIES =
3
new String[] { "Springfield" , "South Park" };
4
static final int POPULATION_COUNT = 1000000;
5
void move( String from, String to );
6
int count(String name);
7
}
Вы можете скачать исходный код всех примеров с GitHub здесь .
Поскольку мы хотим использовать этот интерфейс из нескольких потоков параллельно, у нас есть варианты для реализации этого интерфейса. Либо используйте класс, java.util.concurrent. ConcurrentHashMap
либо используйте класс java.util.HashMap
и одну блокировку. Вот реализация с использованием класса java.util.concurrent.ConcurrentHashMap
:
Джава
xxxxxxxxxx
1
public class CityToCountUsingConcurrentHashMap
2
implements CityToCount {
3
private ConcurrentHashMap<String, Integer> map =
4
new ConcurrentHashMap<String, Integer>();
5
public CityToCountUsingConcurrentHashMap() {
6
for (String city : ALL_CITIES) {
7
map.put(city, POPULATION_COUNT);
8
}
9
}
10
public void move(String from, String to) {
11
map.compute(from, (key, value) -> {
12
if (value == null) {
13
return POPULATION_COUNT - 1;
14
}
15
return value - 1;
16
});
17
map.compute(to, (key, value) -> {
18
if (value == null) {
19
return POPULATION_COUNT + 1;
20
}
21
22
return value + 1;
23
});
24
}
25
public int count(String name) {
26
return map.get(name);
27
}
28
}
Метод move использует потокобезопасный метод compute для уменьшения числа в исходном городе. Затем вычисление используется для увеличения счетчика в целевом городе. Метод count использует потокобезопасный метод get
.
А вот реализация с использованием класса java.util.HashMap
:
Джава
xxxxxxxxxx
1
public class CityToCountUsingSynchronizedHashMap
2
implements CityToCount {
3
private HashMap<String, Integer> map =
4
new HashMap<String, Integer>();
5
private Object lock = new Object();
6
public CityToCountUsingSynchronizedHashMap() {
7
for (String city : ALL_CITIES) {
8
map.put(city, POPULATION_COUNT);
9
}
10
}
11
public void move(String from, String to) {
12
synchronized (lock) {
13
map.compute(from, (key, value) -> {
14
if (value == null) {
15
return POPULATION_COUNT - 1;
16
}
17
return value - 1;
18
});
19
map.compute(to, (key, value) -> {
20
if (value == null) {
21
return POPULATION_COUNT + 1;
22
}
23
return value + 1;
24
});
25
}
26
}
27
public int count(String name) {
28
synchronized (lock) {
29
return map.get(name);
30
}
31
}
32
}
Метод move
также использует этот метод compute
для увеличения и уменьшения числа в исходном и целевом городе. Только на этот раз, поскольку compute
метод не является потокобезопасным, оба метода окружены синхронизированным блоком. count
Метод использует get
метод снова окруженный синхронизированный блок.
Оба решения являются поточно-ориентированными.
Но при использовании решения ConcurrentHashMap
несколько городов могут обновляться параллельно из разных потоков. А в решении, использующем a HashMap
, поскольку блокировка завершена HashMap
, только один поток может обновить ее HashMap
в данный момент времени. Таким образом, использование решения ConcurrentHashMap
должно быть более масштабируемым. Покажи нам.
Слишком большие средства не масштабируются
Чтобы сравнить масштабируемость двух реализаций, я использую следующий тест:
Джава
xxxxxxxxxx
1
import org.openjdk.jmh.annotations.Benchmark;
2
import org.openjdk.jmh.annotations.State;
3
import org.openjdk.jmh.annotations.Scope;
4
Scope.Benchmark) (
5
public class CityToCountBenchmark {
6
public CityToCount cityToCountUsingSynchronizedHashMap
7
= new CityToCountUsingSynchronizedHashMap();
8
public CityToCount cityToCountUsingConcurrentHashMap
9
= new CityToCountUsingConcurrentHashMap();
10
11
public void synchronizedHashMap() {
12
String name = Thread.currentThread().getName();
13
cityToCountUsingSynchronizedHashMap.move(name, name + "2");
14
15
}
16
17
public void concurrentHashMap() {
18
String name = Thread.currentThread().getName();
19
cityToCountUsingConcurrentHashMap.move(name, name + "2");
20
21
}
22
23
}
В тесте используется jmh , платформа OpenJDK для микро-тестов. В тесте я перемещаю людей из одного города в другой. Каждый рабочий поток обновляет разные города. Название исходного города — это просто идентификатор потока, а целевой город — идентификатор потока плюс два. Я запустил эталонный тест на четырехъядерном процессоре Intel i5 со следующими результатами:
Как видим, решение с использованием ConcurrentHashMap
весов лучше. Начиная с двух потоков, он работает лучше, чем решение с использованием одной блокировки.
Слишком маленькие средства не потокобезопасны
Теперь я хочу дополнительный метод, чтобы получить полный подсчет городов в целом. Вот этот метод для реализации с использованием класса ConcurrentHashMap
:
Джава
xxxxxxxxxx
1
public int completeCount() {
2
int completeCount = 0;
3
for (Integer value : map.values()) {
4
completeCount += value;
5
}
6
return completeCount;
7
}
Чтобы увидеть, является ли это решение поточно-ориентированным, я использую следующий тест:
Джава
xxxxxxxxxx
1
import com.vmlens.api.AllInterleavings;
2
public class TestCompleteCountConcurrentHashMap {
3
4
public void test() throws InterruptedException {
5
try (AllInterleavings allInterleavings =
6
new AllInterleavings("TestCompleteCountConcurrentHashMap");) {
7
while (allInterleavings.hasNext()) {
8
CityToCount cityToCount =
9
new CityToCountUsingConcurrentHashMap();
10
Thread first = new Thread(() -> {
11
cityToCount.move("Springfield", "South Park");
12
});
13
14
first.start();
15
assertEquals(2 * CityToCount.POPULATION_COUNT,
16
cityToCount.completeCount());
17
first.join();
18
19
}
20
}
21
}
22
23
}
Мне нужно два потока, чтобы проверить, является ли метод completeCount
потокобезопасным. Одним потоком я перевожу одного человека из Спрингфилда в Саус Парк. В другом потоке я получаю completeCount
и проверяю, равен ли результат ожидаемому результату.
Чтобы проверить все чередования потоков, мы помещаем полный тест в цикл while, итерируя по всем чередованиям потоков, используя класс AllInterleavings
из vmlens , строка 7. Выполняя тест, я вижу следующую ошибку:
Джава
xxxxxxxxxx
1
expected:<2000000> but was:<1999999>
Отчет vmlens показывает, что пошло не так:
Как мы видим, проблема в том, что вычисление полного счета выполняется, пока другой поток все еще перемещает человека из Спрингфилда в Саут-Парк. Уменьшение для Спрингфилда уже было выполнено, но не для увеличения в Южном парке.
Допуская параллельное обновление разных городов, комбинация между completeCount
и move
приводит к неверным результатам. Если у нас есть методы, которые работают во всех городах, нам нужно заблокировать все города во время этого метода. Итак, для поддержки такого метода нам нужно использовать второе решение с использованием единой блокировки. Для этого решения мы можем реализовать потокобезопасный countComplete
метод, как показано ниже:
Джава
xxxxxxxxxx
1
public int completeCount() {
2
synchronized (lock) {
3
int completeCount = 0;
4
for (Integer value : map.values()) {
5
completeCount += value;
6
}
7
return completeCount;
8
}
9
}
Заключение
Этот простой пример, безусловно, не отражает сложность вашей структуры данных. Но то, что верно в примере, верно и в реальном мире. Нет способа обновить несколько зависимых полей потокобезопасным способом, кроме как обновить их один поток за другим.
Нет способа обновить несколько зависимых полей потокобезопасным способом, кроме как обновить их один поток за другим.
Таким образом, единственный способ достичь масштабируемости и безопасности потоков — найти независимые части в ваших данных. Затем обновите их параллельно из нескольких потоков.
Дальнейшие чтения
7 методов для многопоточных классов
Как проверить, является ли класс поточно-ориентированным в Java