Статьи

Слюни: подробное описание внутренних очисток кода для fireAllRules, fireUntilHalt и Timers

В июне мы написали в блоге о новом внутреннем автомате управления взаимодействием потоков User, Timer и Engine. Теперь мы сделали еще одну большую внутреннюю очистку этого кода, чтобы было легче читать и понимать.

Как упоминалось ранее, все действия (вставка, обновление, удаление и т. Д.) Теперь помещаются в потокобезопасную очередь распространения. Пользовательский поток при выполнении этих действий больше не касается движка, даже альфа-сети. Это дает улучшенную безопасность потока. Вместо этого, когда двигатель запускается, он сначала сливает и оценивает эту очередь, что может привести к оценке альфа-сети, прежде чем выполнять оценку правил и запускать.

Помимо разделения потоков между пользователем и механизмом, другой целью конечного автомата была координация потока таймера. Когда таймер запускается, двигатель может быть НЕАКТИВЕН или он работает. Если механизм активен, таймер должен просто отправить запись в очередь распространения и позволить текущему исполняющему потоку обработать задание. Если механизм не активен и правило таймера асинхронно, поток таймера должен позаботиться об оценке и запуске с помощью метода executeTask. Конечный автомат предназначен для минимизации синхронизации и блокировок, чтобы минимизировать количество конфликтов.

У двигателя теперь есть 5 возможных состояний, в которых он может находиться. INACTIVE — это начальное состояние.

8dDvaLZ

Оценка двигателя и запуск правил имеют три потенциальные точки входа fireAllRules, fireUntilHalt и правила асинхронного таймера — последнее выполняется через часть executeTask. Мы объединили fireAllRules и fireUntilHalt в один метод fireLoop, который использует класс стратегии, переданный в качестве аргумента, для обработки потенциального состояния покоя цикла. Движок считается находящимся в состоянии покоя, когда нет правил запуска, когда нет больше группы для оценки и когда очередь пуста.

Затем все правила FireAllRules установят двигатель в НЕАКТИВНОЕ, и цикл завершится. FireUntilHalt заставит текущий поток ждать, пока в очередь на обработку не поступит больше работы. Здесь была проделана работа, чтобы убедиться, что во время этих переходов между состояниями не было пробелов и потерь при выполнении.

Когда поток хочет перейти к FIRE_ALL_RULES или FIRE_UNTIL_HALT или EXECUTE_TASK, он должен пройти через waitAndEnterExecutionState. Если двигатель НЕАКТИВЕН, он может сразу перейти, а если нет, перейдет в состояние ожидания, пока текущий исполняющий поток не завершит работу и не вернет двигатель обратно в НЕАКТИВНО:

01
02
03
04
05
06
07
08
09
10
private void waitAndEnterExecutionState( ExecutionState newState ) {
    if (currentState != ExecutionState.INACTIVE) {
        try {
            stateMachineLock.wait();
        } catch (InterruptedException e) {
            throw new RuntimeException( e );
        }
    }
    setCurrentState( newState );
}

Давайте посмотрим, как это использует fireAllRules (). Во-первых, обратите внимание, что если двигатель уже работает, так как fireAllRules или fireUntilHalt были ранее вызваны и продолжают работать, он просто выйдет. Второе замечание: он удерживает точку синхронизации достаточно долго, чтобы либо выйти, либо сделать желаемый переход. Как только механизм находится в состоянии FIRE_ALL_RULES, он может освободить блок синхронизации, и конечный автомат прекратит что-либо мешающее ему.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
public int fireAllRules(AgendaFilter agendaFilter,
                        int fireLimit) {
    synchronized (stateMachineLock) {
        if (currentState.isFiring()) {
            return 0;
        }
        waitAndEnterExecutionState( ExecutionState.FIRING_ALL_RULES );
    }
 
 
   int fireCount = fireLoop(agendaFilter, fireLimit, RestHandler.FIRE_ALL_RULES);
 
   return fireCount;
}

FireLoop теперь является универсальным и используется как fireAllRules, так и fireUntilHalt, с использованием стратегии RestHandler для обработки логики, когда движок приходит в точку покоя.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
private int fireLoop(AgendaFilter agendaFilter,
                     int fireLimit,
                     RestHandler restHandler) {
 
        // The engine comes to potential rest (inside the loop) when there are no propagations and no rule firings.        // It's potentially at rest, because we cannot guarantee it is at rest.        // This is because external async actions (timer rules) can populate the queue that must be executed immediately.        // A final takeAll within the sync point determines if it can safely come to rest.        // if takeAll returns null, the engine is now safely at rest. If it returns something        // the engine is not at rest and the loop continues.        //        // When FireUntilHalt comes to a safe rest, the thread is put into a wait state,        // when the queue is populated the thread is notified and the loop begins again.        //        // When FireAllRules comes to a safe rest it will put the engine into an INACTIVE state        // and the loop can exit.        //        // When a halt() command is added to the propagation queue and that queue is flushed        // the engine is put into a HALTING state. At this point isFiring returns false and        // no more rules can fire and the loop exits.
 
 
 
 
int fireCount = 0;
    try {
        PropagationEntry head = workingMemory.takeAllPropagations();
        int returnedFireCount = 0;
 
        boolean limitReached = fireLimit == 0; // -1 or > 0 will return false. No reason for user to give 0, just handled for completeness.
        boolean loop = true;
 
        while ( isFiring()  )  {
            if ( head != null ) {
                // it is possible that there are no action propagations, but there are rules to fire.                this.workingMemory.flushPropagations(head);
                head = null;
            }
 
            // a halt may have occurred during the flushPropagations,            // which changes the isFiring state. So a second isFiring guard is needed            if (!isFiring()) {
                break;
            }
 
            evaluateEagerList();
            InternalAgendaGroup group = getNextFocus();
            if ( group != null && !limitReached ) {
                // only fire rules while the limit has not reached.
                returnedFireCount = fireNextItem( agendaFilter, fireCount, fireLimit, group );
                fireCount += returnedFireCount;
 
                limitReached = ( fireLimit > 0 && fireCount >= fireLimit );
                head = workingMemory.takeAllPropagations();
            } else {
                returnedFireCount = 0; // no rules fired this iteration, so we know this is 0                group = null; // set the group to null in case the fire limit has been reached            }
 
            if ( returnedFireCount == 0 && head == null && ( group == null || !group.isAutoDeactivate() ) ) {
                // if true, the engine is now considered potentially at rest                head = restHandler.handleRest( workingMemory, this );
            }
        }
 
        if ( this.focusStack.size() == 1 && getMainAgendaGroup().isEmpty() ) {
            // the root MAIN agenda group is empty, reset active to false, so it can receive more activations.            getMainAgendaGroup().setActive( false );
        }
    } finally {
        // makes sure the engine is inactive, if an exception is thrown.        // if it safely returns, then the engine should already be inactive
 
        // it also notifies the state machine, so that another thread can take over        immediateHalt();
    }
    return fireCount;
}

Цикл запуска проходит через единственную точку синхронизации, когда выполняет takeAll (), что является простой операцией для возврата текущего экземпляра заголовка, а также обнуляет поле заголовка элемента, так что очередь пуста. Во время этого takeAll () это означает, что любые операции пользователя или таймера будут ожидать освобождения синхронизации, прежде чем они смогут добавить в очередь. После этого остальная часть метода, оценивающая возвращенный список элементов и оценивающая сеть и правила запуска, может происходить без необходимости повторной синхронизации или блокировки.

Остальные обработчики — это две очень простые части кода:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
interface RestHandler {
    RestHandler FIRE_ALL_RULES = new FireAllRulesRestHandler();
    RestHandler FIRE_UNTIL_HALT = new FireUntilHaltRestHandler();
 
    PropagationEntry handleRest(InternalWorkingMemory wm, DefaultAgenda agenda);
 
    class FireAllRulesRestHandler implements RestHandler {
        @Override        public PropagationEntry handleRest(InternalWorkingMemory wm, DefaultAgenda agenda) {
            synchronized (agenda.stateMachineLock) {
                PropagationEntry head = wm.takeAllPropagations();
                if (head == null) {
                    agenda.halt();
                }
                return head;
            }
        }
    }
 
    class FireUntilHaltRestHandler  implements RestHandler {
        @Override        public PropagationEntry handleRest(InternalWorkingMemory wm, DefaultAgenda agenda) {
            return wm.handleRestOnFireUntilHalt( agenda.currentState );
        }
    }
}
 
 
@Override
 
public PropagationEntry handleRestOnFireUntilHalt(DefaultAgenda.ExecutionState currentState) {
    // this must use the same sync target as takeAllPropagations, to ensure this entire block is atomic, up to the point of wait    synchronized (propagationList) {
        PropagationEntry head = takeAllPropagations();
 
        // if halt() has called, the thread should not be put into a wait state        // instead this is just a safe way to make sure the queue is flushed before exiting the loop        if (head == null && currentState == DefaultAgenda.ExecutionState.FIRING_UNTIL_HALT) {
            propagationList.waitOnRest();
            head = takeAllPropagations();
        }
        return head;
    }
}

Обратите внимание, что FireAllRulesRestHandler должен получить stateMachineLock, пока он выполняет последний takeAll, прежде чем он сможет узнать, что он действительно безопасен для возврата. Это связано с таймерами, которые могут быть помещены в очередь, которые требуют немедленного срабатывания. Если двигатель должен был вернуться, таймер не сработал бы сразу — это то, что мы называем «пробелом» в поведении, которого теперь избегают.

FireUntilHalt получает блокировку в очереди распространения, потому что, помимо выполнения takeAll, он должен выполнять проверку нуля и операцию ожидания, все атомарно. Опять же, если нулевая проверка не была в точке синхронизации, мы бы получили еще один потенциальный пробел в поведении, которого теперь избегают.

Последняя часть головоломки — executeTask. Это позволяет выполнять асинхронные операции, обычно задачу таймера, оптимальным образом. Если движок уже запущен из-за FireAllRules или FireUntilHalt, просто отправьте задачу в очередь и дайте текущему запущенному потоку обработать ее. Если нет, то войдите в состояние EXECUTING_TASK и выполните его в текущем потоке.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
@Overridepublic void executeTask( ExecutableEntry executable ) {
    synchronized (stateMachineLock) {
        // state is never changed outside of a sync block, so this is safe.        if (isFiring()) {
            executable.enqueue();
            return;
        } else if (currentState != ExecutionState.EXECUTING_TASK) {
            waitAndEnterExecutionState( ExecutionState.EXECUTING_TASK );
        }
    }
 
    try {
        executable.execute();
    } finally {
        immediateHalt();
    }
}

Я должен добавить, что halt () теперь передается как команда и оценивается как часть стандартного стока очереди. При выполнении он превращает двигатель в HALTING внутри блока синхронизации. Это позволит внешнему циклу выйти:

1
2
3
4
5
6
7
public void halt() {
    synchronized (stateMachineLock) {
        if (currentState.isFiring()) {
            setCurrentState( ExecutionState.HALTING );
        }
    }
}

Таким образом, теперь у нас есть действительно надежный код для обработки взаимодействий между потоками пользователя, таймера и движка, который имеет понятное поведение. Мы приложили немало усилий, чтобы код и поведение могли быть понятны каждому.

Есть одна последняя часть двигателя, которая все еще будет считаться небезопасной. Здесь пользователь вызывает установщик вставленного факта в одном потоке, пока работает двигатель. Это может закончиться слезами. Мы планируем разрешить пользователям отправлять задачи в эту очередь, чтобы они могли выполняться с тем же потоком, что и работающий механизм. Это позволит пользователям отправлять обновления pojo из другого потока вне движка, в качестве задач, для безопасного выполнения.