Статьи

Создание распределенной системы в 300 строк с помощью Mesos, Docker и Go

Построить распределенные системы сложно. Они должны быть масштабируемыми, отказоустойчивыми, высокодоступными, последовательными, безопасными, эластичными и эффективными. Для достижения этих свойств распределенным системам требуется много сложных компонентов для совместной работы сложным образом. Например, Apache Hadoop зависит от файловой системы с высокой отказоустойчивостью (HDFS), чтобы обеспечить высокую пропускную способность для обработки многотерабайтных наборов данных параллельно на больших кластерах.

В прошлом каждая новая распределенная система, такая как Hadoop или Cassandra, должна была создавать свою собственную структуру для обмена сообщениями, хранения, сетевого взаимодействия, отказоустойчивости и эластичности. К счастью, такие системы, как Apache Mesos (и его коммерческая версия, Mesosphere DCOS ) упрощают задачу построения распределенных систем и управления ими, предоставляя примитивы, подобные операционной системе, для ключевых строительных блоков распределенных систем. Mesos абстрагирует ЦП, память, хранилище и другие вычислительные ресурсы, чтобы разработчики могли писать распределенные приложения, как если бы их кластеры центров обработки данных были одной гигантской машиной.

Приложения для Mesos называются фреймворками и могут создаваться для решения различных задач. Apache Spark, популярный инструмент кластерных вычислений общего назначения, используемый в анализе данных, и Chronos, распределенный и отказоустойчивый cron-like планировщик, являются двумя примерами сред, построенных на основе Mesos. Фреймворки могут быть построены на многих языках, включая C ++, Go, Python, Java, Haskell и Scala.

Биткойн-майнинг является ярким примером проблемы, которая требует распределенного решения. Биткойн варьирует сложность генерации приемлемого хеша для проверки подлинности блока транзакций. Дайте или возьмите несколько десятилетий, один ноутбук потребует более 150 лет, чтобы добыть один блок. В результате теперь существуют «пулы майнинга», которые позволяют майнерам объединять свои вычислительные ресурсы вместе и быстрее майнить. Один из наших стажеров, Дерек, написал структуру майнинга биткойнов, которая использует ресурсы кластера для выполнения той же работы. В оставшейся части этой статьи мы рассмотрим его код .

Среда Mesos состоит из планировщика и исполнителя. Планировщик связывается с мастером Mesos и решает, какие задачи запускать, тогда как исполнитель работает на подчиненных, чтобы фактически выполнить намеченные задачи. Большинство фреймворков реализуют свой собственный планировщик и используют одного из стандартных исполнителей, предоставляемых Mesos. Фреймворки могут также реализовать свой собственный исполнитель. В этом случае мы напишем наш собственный планировщик и будем использовать стандартный исполнитель команд для запуска образов Docker, содержащих наши сервисы биткойнов.

Для нашего планировщика мы хотим запустить два вида задач: одну задачу майнер-сервера и несколько рабочих задач майнера. Сервер майнера связывается с пулом майнинга биткойнов и назначает блоки каждому работнику майнера. Шахтеры выполняют тяжелую работу по добыче биткойнов.

Задачи на самом деле инкапсулированы в executor фреймворка, поэтому запуск задачи означает указание мастеру Mesos запустить executor на одном из его подчиненных. Поскольку мы будем использовать стандартный исполнитель команд, мы можем указать задачи для двоичного исполняемого файла, сценария bash или другой команды. В нашем случае, поскольку Mesos изначально поддерживает Docker, мы будем использовать исполняемые образы Docker. Docker — это технология, которая позволяет упаковывать ваше приложение со всеми необходимыми зависимостями времени выполнения.

Чтобы использовать образы Docker в Mesos, вам просто нужны их имена в реестре Docker:

const (
    MinerServerDockerImage = "derekchiang/p2pool"
    MinerDaemonDockerImage = "derekchiang/cpuminer"
)

Затем мы определяем несколько констант, которые определяют требования к ресурсам каждой задачи:

const (
    MemPerDaemonTask = 128  // mining shouldn't be memory-intensive
    MemPerServerTask = 256
    CPUPerServerTask = 1    // a miner server does not use much CPU
)

Теперь мы определим фактический планировщик. Планировщик должен отслеживать состояние, необходимое для правильной работы.

type MinerScheduler struct {
    // bitcoind RPC credentials
    bitcoindAddr string
    rpcUser      string
    rpcPass      string

    // mutable state
    minerServerRunning  bool
    minerServerHostname string 
    minerServerPort     int    // the port that miner daemons 
                               // connect to

    // unique task ids
    tasksLaunched        int
    currentDaemonTaskIDs []*mesos.TaskID
}

Планировщик должен реализовывать следующий интерфейс:

type Scheduler interface {
    Registered(SchedulerDriver, *mesos.FrameworkID, *mesos.MasterInfo)
    Reregistered(SchedulerDriver, *mesos.MasterInfo)
    Disconnected(SchedulerDriver)
    ResourceOffers(SchedulerDriver, []*mesos.Offer)
    OfferRescinded(SchedulerDriver, *mesos.OfferID)
    StatusUpdate(SchedulerDriver, *mesos.TaskStatus)
    FrameworkMessage(SchedulerDriver, *mesos.ExecutorID, 
                     *mesos.SlaveID, string)
    SlaveLost(SchedulerDriver, *mesos.SlaveID)
    ExecutorLost(SchedulerDriver, *mesos.ExecutorID, *mesos.SlaveID, 
                 int)
    Error(SchedulerDriver, string)
}

Теперь давайте посмотрим на обратные вызовы:

func (s *MinerScheduler) Registered(_ sched.SchedulerDriver, 
      frameworkId *mesos.FrameworkID, masterInfo *mesos.MasterInfo) {
    log.Infoln("Framework registered with Master ", masterInfo)
}

func (s *MinerScheduler) Reregistered(_ sched.SchedulerDriver, 
      masterInfo *mesos.MasterInfo) {
    log.Infoln("Framework Re-Registered with Master ", masterInfo)
}

func (s *MinerScheduler) Disconnected(sched.SchedulerDriver) {
    log.Infoln("Framework disconnected with Master")
}

«Registered» вызывается, когда планировщик успешно зарегистрирован мастером Mesos.

«Reregistered» вызывается, когда планировщик отключается от мастера Mesos, а затем снова регистрируется, например, когда мастер перезапускается.

Disconnected вызывается, когда планировщик отключается от мастера Mesos. Это может произойти, когда мастер падает.

До сих пор мы печатали только сообщения журнала в функциях обратного вызова, потому что большинство обратных вызовов можно оставить пустыми для простой структуры, подобной этой. Однако следующий обратный вызов лежит в основе каждого фреймворка и должен быть написан с осторожностью.

ResourceOffers вызывается, когда планировщик получает предложение от мастера. Каждое предложение содержит список ресурсов, доступных для платформы для использования в кластере. Ресурсы обычно включают процессор, память, порты и диск. Фреймворк может использовать некоторые, все или ни один из предложенных ресурсов.

Для каждого предложения мы хотим собрать предлагаемые ресурсы и принять решение о том, хотим ли мы запустить задачу на сервере или рабочую задачу. Вы можете запустить столько задач, сколько вы можете вписать в каждое предложение, но, поскольку майнинг биткойнов зависит от ЦП, мы запускаем одну задачу майнера для каждого предложения, используя все доступные ресурсы ЦП.

for i, offer := range offers {
    // … Gather resource being offered and do setup
    if !s.minerServerRunning && mems >= MemPerServerTask &&
            cpus >= CPUPerServerTask && ports >= 2 {
        // … Launch a server task since no server is running and we 
        // have resources to launch it.
    } else if s.minerServerRunning && mems >= MemPerDaemonTask {
        // … Launch a miner since a server is running and we have mem 
        // to launch one.
    }
}

Для каждой задачи нам нужно создать соответствующее сообщение TaskInfo, содержащее информацию, необходимую для запуска этой задачи.

s.tasksLaunched++
taskID = &mesos.TaskID {
    Value: proto.String("miner-server-" + 
                        strconv.Itoa(s.tasksLaunched)),
}

Идентификаторы задач определяются структурой и должны быть уникальными для каждой структуры.

containerType := mesos.ContainerInfo_DOCKER
task = &mesos.TaskInfo {
    Name: proto.String("task-" + taskID.GetValue()),
    TaskId: taskID,
    SlaveId: offer.SlaveId,
    Container: &mesos.ContainerInfo {
        Type: &containerType,
        Docker: &mesos.ContainerInfo_DockerInfo {
            Image: proto.String(MinerServerDockerImage),
        },
    },
    Command: &mesos.CommandInfo {
        Shell: proto.Bool(false),
        Arguments: []string {
            // these arguments will be passed to run_p2pool.py
            "--bitcoind-address", s.bitcoindAddr,
            "--p2pool-port", strconv.Itoa(int(p2poolPort)),
            "-w", strconv.Itoa(int(workerPort)),
            s.rpcUser, s.rpcPass,
        },
    },
    Resources: []*mesos.Resource {
        util.NewScalarResource("cpus", CPUPerServerTask),
        util.NewScalarResource("mem", MemPerServerTask),
    },
}

The TaskInfo message specifies a few important pieces of metadata about the task that allow the Mesos node to launch the Docker container. In particular, we specify a name, the task ID, container information, and arguments to be passed to the container. We also specify the resources required by the task.

Now that we have constructed our TaskInfo, we can launch our task using:

driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, &mesos.Filters{RefuseSeconds: proto.Float64(1)})

And now we’re launching tasks! The last thing we need to handle in our framework is what happens when the miner server shuts down. We can do this with the StatusUpdate function.

There are a few types of status updates, corresponding to the different stages in a task’s lifetime.  For our framework, we want to make sure that if the miner server fails for any reason, we kill all the miner workers to avoid wasting resources.  Here is the relevant code:

if strings.Contains(status.GetTaskId().GetValue(), "server") &&
    (status.GetState() == mesos.TaskState_TASK_LOST ||
        status.GetState() == mesos.TaskState_TASK_KILLED ||
        status.GetState() == mesos.TaskState_TASK_FINISHED ||
        status.GetState() == mesos.TaskState_TASK_ERROR ||
        status.GetState() == mesos.TaskState_TASK_FAILED) {

    s.minerServerRunning = false

    // kill all tasks
    for _, taskID := range s.currentDaemonTaskIDs {
        _, err := driver.KillTask(taskID)
        if err != nil {
            log.Errorf("Failed to kill task %s", taskID)
        }
    }
    s.currentDaemonTaskIDs = make([]*mesos.TaskID, 0)
}

And that’s it! We have a working distributed bitcoin mining framework on Apache Mesos in (roughly) 300 lines of Go. This demonstrates how quick and straightforward writing distributed systems can be using the Mesos framework API. We encourage you to try writing your own framework. If you’re looking for inspiration, check out RENDLER, a distributed web-crawler, and ANAGRAMMER, an anagram finder.

Mesosphere engineers who contributed to this report include Cody Roseborough, Lily Chen, Neeral Dodhia, Derek Chiang, Luke Leslie, and Brendan Chang.