Статьи

Java 5 + GPars: обработка действий регулирования

Сегодня в списке рассылки GPars возник интересный вопрос : как создать систему, которая генерирует события, как лучше всего уменьшить обработку событий до одного события в секунду? Я подумал об ответе … потом подумал еще немного … и наконец решил написать все это в этом посте. В примере используются Groovy и GPars, но он легко адаптируется к общему решению Java. Не позволяйте актерам пугать вас! (или отсутствие точек с запятой, в этом отношении).

Примером является классический » Спящий парикмахер»«проблема (я тоже об этом не слышал). В основном, там есть парикмахерская. Парикмахер спит. Клиенты периодически заходят в зал ожидания, и парикмахер просыпается, чтобы подстричь каждого из них. возвращается к своему дремоте. Это урок реакции: что-то спит, затем просыпается, чтобы выполнить какую-то работу, затем возвращается в сон.

Документы GPars предоставляют достойное решение на основе актерак этой проблеме: есть зал ожидания и парикмахерская, и оба являются актерами. Когда парикмахер свободен, клиент из зала ожидания перемещается в кресло парикмахера. Парикмахер и зал ожидания общаются посредством сообщений актера. Но что, если наш парикмахер — немного дива, и независимо от того, насколько занят магазин, он хочет делать одну стрижку каждые 15 минут и никогда больше (иначе, он может сгореть, видите ли). Это проблема регулирования: как вы убедитесь, что события обрабатываются (стрижка выполняется) не более x раз за данный период времени?

Мое решение: сохранить рабочую очередь и сделать так, чтобы запланированный исполнитель вытащил работу из очереди с заданным интервалом. Java 5 предоставляет вам все инструменты для этого, не прибегая к занятому ожиданию, опросу или написанию кода планирования. Занятия, о которых вы должны знать,ArrayBlockingQueue и ScheduledThreadPoolExecutor .

ArrayBlockingQueue — это очередь FIFO («первым пришел — первым обслужен»), которая поддерживает блокировку вместо ожидания «занято». Когда вы берете () элемент из ABQ, вызов блокируется до тех пор, пока элемент не станет доступным … нет запроса или ожидания, чтобы увидеть, готов ли элемент к доступности. Просто вызовите take (), и ваш код не будет работать, пока не будет найден элемент.

ScheduledThreadPoolExecutor поддерживает выполнение объектов Runnable и Callable с фиксированным интервалом. Если вы хотите выполнять одну и ту же задачу каждую 1 секунду, тогда STPE — это то, что вам нужно … Таймер для всех намерений и целей устарел.

Итак, вот парикмахер, который просто не выдержит переутомления … нам нужно все, чтобы найти клиентов, парикмахера и комнату ожидания:

class Customer {
String name
}

// waiting room can't hold more than 12!
def waitingRoom = new ArrayBlockingQueue(12)

def barber = Executors.newScheduledThreadPool(1)
barber.scheduleAtFixedRate({
println 'Barber: Next customer please!'
Customer customer = waitingRoom.take()
println "${customer.name} gets a haircut at ${new Date().format('H:m:s')}"
} as Runnable, 0, 15, TimeUnit.MINUTES)

Клиент — это простой боб; ничего интересного здесь. Зал ожидания — ArrayBlockingQueue, заполненный клиентами, которым нужна стрижка. И парикмахер является службой-исполнителем с запланированной задачей, чтобы подстричься. Количество потоков в запланированном пуле потоков равно 1, потому что есть только один цирюльник. Парикмахер забирает клиентов из зала ожидания и подстригает их раз в 15 минут. Вызов waitRoom.take () блокируется … если клиент готов, то он немедленно обслуживается, а если нет, то вызов блокируется, пока кто-то не станет доступен. Стоит заметить … waitRoom имеет размер 12 … если добавлен 13-й клиент, то вызывающий код будет либо блокироваться, пока не будет достаточно места, либо выдавать исключение. Существует API для любого случая.

Так как же клиенты попадают в зал ожидания? Вот где приходят актеры GPars. Парикмахерская — это «реактор» в терминологии GPars. Сообщения могут быть отправлены в парикмахерскую («Войти» в зал ожидания), и реактор добавляет клиента в зал ожидания. Метрдотель своего рода. Вот оно в действии:

def barberShop = new PooledActorGroup().reactor {message ->
switch (message) {
case Enter:
println "${message.customer.name} waits for a haircut..."
waitingRoom.add(message.customer)
break
}
}

class Enter {
Customer customer
}

barberShop << new Enter(customer: new Customer(name: 'Jerry'))
barberShop << new Enter(customer: new Customer(name: 'Phil'))
barberShop << new Enter(customer: new Customer(name: 'Bob'))
barberShop << new Enter(customer: new Customer(name: 'Ron'))

BarberShop — это PooledActorGroup, объект GPars, а «структура субъекта» просто означает добавление замыкания к методу реактора () этой группы. Закрытие или актер и отвечает на ввод сообщений, добавляя клиента в waitRoom. Внизу вы видите красивый синтаксис << для публикации событий в ActorGroup.

Так что у вас есть это. Есть много способов сделать это, но я думаю, что библиотеки Java 5 Concurrency — одни из лучших вариантов. Мне было бы интересно услышать и другие идеи. А теперь иди и дай этим хиппи прически!

С http://hamletdarcy.blogspot.com