Статьи

Поток клиента Socket в Python

При создании графического интерфейса пользователя, который должен взаимодействовать с внешним миром, возникает общий камень преткновения — как объединить код графического интерфейса с вводом-выводом. Ввод-вывод, будь то HTTP-запросы, протоколы RPC, обычная связь через сокет или последовательный порт, имеет тенденцию блокировать по своей природе, что плохо сочетается с кодом GUI. Никто не хочет, чтобы его GUI «завис», пока программа блокирует вызов чтения из сокета.

Есть много решений этой проблемы, два наиболее распространенных из которых:

  1. Делать ввод / вывод в отдельном потоке
  2. Использование асинхронного ввода-вывода с обратными вызовами, интегрированными в цикл событий GUI

По моему мнению, вариант 1 является более простым из двух, и это тот, который я обычно заканчиваю. Здесь я хочу представить простой пример кода, который реализует поток клиента сокета в Python. Хотя этот класс достаточно универсален, чтобы его можно было использовать во многих сценариях, я рассматриваю его скорее как образец, чем как законченный черный ящик. Сетевой код имеет тенденцию зависеть от множества факторов, и этот пример легко модифицировать для различных сценариев. Например, в то время как это клиент, повторно внедрить подобный сервер легко. Без дальнейших церемоний, вот код:

import socket
import struct
import threading
import Queue

class ClientCommand(object):
    """ A command to the client thread.
        Each command type has its associated data:

        CONNECT:    (host, port) tuple
        SEND:       Data string
        RECEIVE:    None
        CLOSE:      None
    """
    CONNECT, SEND, RECEIVE, CLOSE = range(4)

    def __init__(self, type, data=None):
        self.type = type
        self.data = data

class ClientReply(object):
    """ A reply from the client thread.
        Each reply type has its associated data:

        ERROR:      The error string
        SUCCESS:    Depends on the command - for RECEIVE it's the received
                    data string, for others None.
    """
    ERROR, SUCCESS = range(2)

    def __init__(self, type, data=None):
        self.type = type
        self.data = data

class SocketClientThread(threading.Thread):
    """ Implements the threading.Thread interface (start, join, etc.) and
        can be controlled via the cmd_q Queue attribute. Replies are
        placed in the reply_q Queue attribute.
    """
    def __init__(self, cmd_q=None, reply_q=None):
        super(SocketClientThread, self).__init__()
        self.cmd_q = cmd_q or Queue.Queue()
        self.reply_q = reply_q or Queue.Queue()
        self.alive = threading.Event()
        self.alive.set()
        self.socket = None

        self.handlers = {
            ClientCommand.CONNECT: self._handle_CONNECT,
            ClientCommand.CLOSE: self._handle_CLOSE,
            ClientCommand.SEND: self._handle_SEND,
            ClientCommand.RECEIVE: self._handle_RECEIVE,
        }

    def run(self):
        while self.alive.isSet():
            try:
                # Queue.get with timeout to allow checking self.alive
                cmd = self.cmd_q.get(True, 0.1)
                self.handlers[cmd.type](cmd)
            except Queue.Empty as e:
                continue

    def join(self, timeout=None):
        self.alive.clear()
        threading.Thread.join(self, timeout)

    def _handle_CONNECT(self, cmd):
        try:
            self.socket = socket.socket(
                socket.AF_INET, socket.SOCK_STREAM)
            self.socket.connect((cmd.data[0], cmd.data[1]))
            self.reply_q.put(self._success_reply())
        except IOError as e:
            self.reply_q.put(self._error_reply(str(e)))

    def _handle_CLOSE(self, cmd):
        self.socket.close()
        reply = ClientReply(ClientReply.SUCCESS)
        self.reply_q.put(reply)

    def _handle_SEND(self, cmd):
        header = struct.pack('<L', len(cmd.data))
        try:
            self.socket.sendall(header + cmd.data)
            self.reply_q.put(self._success_reply())
        except IOError as e:
            self.reply_q.put(self._error_reply(str(e)))

    def _handle_RECEIVE(self, cmd):
        try:
            header_data = self._recv_n_bytes(4)
            if len(header_data) == 4:
                msg_len = struct.unpack('<L', header_data)[0]
                data = self._recv_n_bytes(msg_len)
                if len(data) == msg_len:
                    self.reply_q.put(self._success_reply(data))
                    return
            self.reply_q.put(self._error_reply('Socket closed prematurely'))
        except IOError as e:
            self.reply_q.put(self._error_reply(str(e)))

    def _recv_n_bytes(self, n):
        """ Convenience method for receiving exactly n bytes from
            self.socket (assuming it's open and connected).
        """
        data = ''
        while len(data) < n:
            chunk = self.socket.recv(n - len(data))
            if chunk == '':
                break
            data += chunk
        return data

    def _error_reply(self, errstr):
        return ClientReply(ClientReply.ERROR, errstr)

    def _success_reply(self, data=None):
        return ClientReply(ClientReply.SUCCESS, data)

SocketClientThread является основным классом здесь. Это поток Python, который можно запускать и завершать (присоединять), а также связываться с ним, передавая ему команды и возвращая ответы. ClientCommand и ClientReply — это простые классы данных для инкапсуляции этих команд и ответов.

Этот код, хотя и простой, демонстрирует множество шаблонов в потоке Python и сетевом коде. Вот краткое описание некоторых интересных мест в произвольном порядке:

  • Стандартный Queue.Queue используется для передачи данных между потоком и кодом пользователя. Очередь — отличный инструмент в наборе инструментов для программистов на Python — я использую его все время для отделения многопоточного кода. Самая большая трудность при написании многопоточных программ — защита общих данных. Очередь делает это без проблем, по сути, трансформируя модель совместного использования в передачу сообщений, которую намного проще безопасно использовать.

    Вы заметите, что SocketClientThread использует две очереди: одну для получения команд из основного потока, а другую для передачи ответов. Это распространенная идиома, которая хорошо подходит для большинства сценариев.

  • В общем, вы не можете заставить поток умереть в Python. Если вам нужно вручную прекратить темы, они должны согласиться умереть. Активный атрибут SocketClientThread демонстрирует один общий и безопасный способ его достижения. alive — это threading.Event — потокобезопасный флаг, который можно очистить в главном потоке с помощью вызова alive.clear () (что делается в методе join). Поток связи иногда проверяет, установлен ли этот флаг, и если нет, он корректно завершается.

    Здесь есть очень важная деталь реализации. Обратите внимание, как реализован метод запуска потока. Цикл выполняется, пока установлен живой, но для того, чтобы действительно выполнить этот тест, цикл не может блокироваться. Таким образом, извлечение команд из очереди команд выполняется с помощью get (True, 0.1), что означает, что действие блокируется, но с таймаутом в 100 миллисекунд. Это имеет два преимущества: с одной стороны, оно не блокируется бесконечно, и не более 100 мс пройдет, пока поток не заметит, что его флаг активности очищен. С другой стороны, поскольку это блокирует на 100 мс, поток не просто крутится на процессоре в ожидании команд. На самом деле его загрузка процессора незначительна. Обратите внимание, что поток все еще может блокироваться и отказываться от смерти, если он ожидает на recv сокета без поступления данных.

  • SocketClientThread использует TCP-сокет, который будет верно передавать все данные, но может делать это порциями непредсказуемого размера. Для этого необходимо как-то разграничить сообщения, чтобы другая сторона знала, когда сообщение начинается и заканчивается. Я использую технику длины префикса здесь. Перед отправкой сообщения его длина отправляется как упакованное 4-байтовое целое число с прямым порядком байтов. Когда сообщение получено, сначала получаются 4 байта для распаковки длины, а затем может быть получено реальное сообщение, так как теперь мы знаем, как долго это происходит.
  • По той же причине, что и в предыдущем пункте, необходимо соблюдать осторожность при отправке и получении данных через сокет TCP. При загрузке сети не гарантируется, что он действительно отправит или получит все ожидаемые байты за одну попытку. Чтобы решить эту потенциальную проблему при отправке, Python предоставляет функцию socket.sendall. При получении, это немного сложнее, требуя зацикливания на recv, пока не будет получено правильное количество байтов.

Чтобы показать пример использования SocketClientThread, этот архив кода также содержит пример графического интерфейса, реализованного с помощью PyQt. Этот графический интерфейс использует клиентский поток для подключения к серверу (по умолчанию localhost: 50007), отправки «привет» и ожидания ответа. В то же время, GUI продолжает рисовать симпатичные круги, чтобы продемонстрировать, что он не блокируется операциями с сокетами. Для достижения этого эффекта графический интерфейс использует еще одну интересную идиому — таймер, который используется для периодической проверки, помещает ли SocketClientThread новые данные в свою очередь ответов, вызывая reply_q.get (block = False). Эта комбинация get таймер + неблокирование позволяет эффективно взаимодействовать между потоком и GUI.

Я надеюсь, что этот пример кода будет полезен для других. Если у вас есть какие-либо вопросы по этому поводу, не стесняйтесь спрашивать в комментариях или отправьте мне письмо.

PS Как практически все примеры размещены здесь, этот код находится в свободном доступе.

Источник: http://eli.thegreenplace.net/2011/05/18/code-sample-socket-client-thread-in-python/