Статьи

Своп Python не атомный

За последние несколько месяцев я переписал пул соединений PyMongo . Среди проблем с параллелизмом, которые мне пришлось зафиксировать, было, если поток сбрасывает пул соединений, поскольку другой поток использует пул, как мне не дать им наступить друг на друга?

Я думал, что прибил это, но конечно я не сделал. Здесь есть условие гонки:

class Pool(object):
    def __init__(self):
        self.sockets = set()

    def reset(self):
        # Close sockets before deleting them
        sockets, self.sockets = self.sockets, set()
        for sock_info in sockets: sock_info.close()

Я думал, что своп будет атомарным: первый поток, который введет reset () , заменит self.sockets пустым набором, затем закроет все старые сокеты, и все последующие потоки обнаружат, что self.sockets пуст. Оказывается, это не так.

Состояние гонки иногда обнаруживалось в прогонах огромного набора тестов PyMongo. Один из тестов раскручивает 40 одновременных потоков. Каждый поток запрашивает MongoDB, вызывает reset () и снова запрашивает MongoDB. Вот как тест не проходит:

test_disconnect (test.test_pooling.TestPooling) ... Exception in thread Thread-45:
Traceback (most recent call last):
 < ... snip ... >
 File "pymongo/pool.py", line 159, in reset
   for sock_info in sockets: sock_info.close()
RuntimeError: Set changed size during iteration

 Как я уже сказал, я думал, что обмен был атомарным, но на самом деле он требует полдюжины инструкций байт-кода. Это одна строка обмена:

 sockets, self.sockets = self.sockets, set()

 … разбирает на:

  0 LOAD_FAST                0 (self)
            3 LOAD_ATTR                0 (sockets)
            6 LOAD_GLOBAL              1 (set)
            9 CALL_FUNCTION            0
           12 ROT_TWO          <- this is the swap
           13 STORE_FAST               1 (sockets)
           16 LOAD_FAST                0 (self)
           19 STORE_ATTR               0 (sockets)

 Скажем, что поток 1 выполняет эту функцию. Поток 1 загружает self.sockets и пустой набор в свой стек и обменивает их, и прежде чем он попадает в STORE_ATTR (где фактически заменяется self.sockets), он прерывается потоком 2. Поток 2 запускает некоторую другую часть пула соединений код, например:

def return_socket(self, sock_info):
        self.sockets.add(sock_info)

Это разбирает на:

24 LOAD_FAST                0 (self)
           27 LOAD_ATTR                1 (sockets)
           30 LOAD_ATTR                3 (add)
           33 LOAD_FAST                1 (sock_info)
           36 CALL_FUNCTION            1

 Допустим, поток 2 достигает байт-кода LOAD_ATTR 1 . Теперь в его стеке есть self.sockets, и он прерывается потоком 1, который все еще находится в reset (). Поток 1 заменяет self.sockets пустым набором. Но, увы, «старый» список сокетов потока 1 и «self.sockets» потока 2 совпадают . Поток 1 начинает перебирать старый список сокетов, закрывая их:

  for sock_info in sockets: sock_info.close()

 … но он снова прерывается потоком 2, который …

self.sockets.add(sock_info), 

… увеличение размера набора на единицу. Когда поток 1 затем возобновляется, он пытается продолжить итерацию и вызывает исключение «Задать измененный размер во время итерации».

Давайте погрузимся глубже на минуту. Вы можете подумать, что на практике два потока Python не будут так часто прерывать друг друга. Действительно, интерпретатор выполняет 100 байт-кодов за раз, прежде чем он даже подумает о переключении потоков . Но в нашем случае поток 1 неоднократно вызывает socket.close () , который записывается в socketmodule.c так:

static PyObject * sock_close(PySocketSockObject *s) {
    SOCKET_T fd;

    if ((fd = s->sock_fd) != -1) {
        s->sock_fd = -1;
        Py_BEGIN_ALLOW_THREADS
        (void) SOCKETCLOSE(fd);
        Py_END_ALLOW_THREADS
    }
    Py_INCREF(Py_None);
    return Py_None;
}

Этот макрос Py_BEGIN_ALLOW_THREADS освобождает глобальную блокировку интерпретатора, а Py_END_ALLOW_THREADS ожидает ее повторного получения. В многопоточной программе Python освобождение GIL делает очень вероятным, что другой поток, ожидающий GIL, немедленно получит его. (Несмотря на выступление Дэвида Бизли о GIL — он демонстрирует, что связанные с ЦП и связанные с IO потоки, конкурирующие за GIL в многоядерной системе, слишком редко прерывают друг друга , но в этом случае я имею дело только с потоками, связанными с IO.)

Поэтому вызов socket.close () в цикле гарантирует, что этот поток будет постоянно прерываться. Вероятность того, что какой-то поток в return_socket () получит ссылку на набор и изменит его, перемежая с каким-либо другим потоком в reset (), получив ссылку на тот же набор и выполнив итерацию, достаточно высока, чтобы разорвать юнит-тест PyMongo примерно на 1% времени.

Решение было очевидно, когда я понял проблему:

class Pool(object):
    def __init__(self):
        self.sockets = set()
        self.lock = threading.Lock()

    def reset(self):
        self.lock.acquire()
        try:
            # Close sockets before deleting them
            sockets, self.sockets = self.sockets, set()
        finally:
            self.lock.release()

        # Now only this thread can have a reference to this set of sockets
        for sock_info in sockets: sock_info.close()
 
   def return_socket(self, sock_info):
        self.lock.acquire()
        try:
            self.sockets.add(sock_info)
        finally:
            self.lock.release()

Инструкции с однобайтовым кодом в Python являются атомарными, и если вы можете использовать эту атомарность, чтобы избежать мьютексов, то я считаю, что вы должны — не только ваш код быстрее и проще, но вы также избегаете риска взаимоблокировок, которые являются худшими ошибками параллелизма. Но не все, что выглядит атомно. В случае сомнений используйте модуль dis, чтобы проверить свой байт-код и узнать наверняка.