Статьи

Groovy Goodness: обмен данными в параллельной среде с переменными потока данных

Работать с данными в параллельной среде может быть сложно. Groovy включает GPars, да, нам не нужно загружать какие-либо зависимости, чтобы предоставить некоторые модели для простой работы с данными в параллельной среде. В этом посте мы рассмотрим пример, в котором мы используем переменные потока данных для обмена данными между параллельными задачами. В алгоритме потока данных мы определяем определенные функции или задачи, которые имеют вход и выход. Задача запускается, когда для нее доступны входные данные. Таким образом, вместо определения обязательной последовательности задач, которые должны быть выполнены, мы определяем серию задач, которые начнут выполняться, когда будет доступен их ввод. И хорошо то, что каждая из этих задач является независимой и может выполняться параллельно при необходимости.

Данные, которые совместно используются задачами, хранятся в переменных потока данных. Значение переменной потока данных может быть установлено только один раз, но оно может быть прочитано несколько раз. Когда задача хочет прочитать значение, но оно еще не доступно, задача будет ожидать значения неблокирующим способом.

В следующем примере скрипта Groovy мы используем класс Dataflows . Этот класс предоставляет простой способ установить несколько переменных потока данных и получить их значения. В сценарии мы хотим получить температуру в городе как в градусах Цельсия, так и в градусах Фаренгейта, и мы используем удаленные веб-службы для получения данных:

001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import groovyx.gpars.dataflow.Dataflows
import static groovyx.gpars.dataflow.Dataflow.task
 
// Create new Dataflows instance to hold
// dataflow variables.
final Dataflows data = new Dataflows()
 
// Convert temperature from Celcius to Fahrenheit.
task {
    println "Task 'convertTemperature' is waiting for dataflow variable 'cityWeather'"
 
    // Get dataflow variable cityWeather value from
    // Dataflows data object. The value
    // is set by findCityWeather task.
    // If the value is not set yet, wait.
    final cityWeather = data.cityWeather
    final cityTemperature = cityWeather.temperature
 
    println "Task 'convertTemperature' got dataflow variable 'cityWeather'"
 
    // Convert value with webservice at
    // www.webservicex.net.
    final params =
        [Temperature: cityTemperature,
         FromUnit: 'degreeCelsius',
         ToUnit: 'degreeFahrenheit']
    final result = downloadData(url, params)
 
    // Assign converted value to dataflow variable
    // temperature in Dataflows data object.
    data.temperature = result.text()
}
 
// Find temperature for city.
task {
    println "Task 'findCityWeather' is waiting for dataflow variable 'searchCity'"
 
    // Get value for city attribute in
    // Dataflows data object. This is
    // set in another task (startSearch)
    // at another time.
    // If the value is not set yet, wait.
    final city = data.searchCity
 
    println "Task 'findCityWeather' got dataflow variable 'searchCity'"
 
    // Get temperature for city with
    // webservice at api.openweathermap.org.
    final params =
        [q: city,
         units: 'metric',
         mode: 'xml']
    final result = downloadData(url, params)
    final temperature = result.list.item.temperature.@value
 
    // Assign map value to cityWeather dataflow
    // variable in Dataflows data object.
    data.cityWeather = [city: city, temperature: temperature]
}
 
// Get city part from search string.
task {
    println "Task 'parseCity' is waiting for dataflow variable 'searchCity'"
 
    // Get value for city attribute in
    // Dataflows data object. This is
    // set in another task (startSearch)
    // at another time.
    // If the value is not set yet, wait.
    final city = data.searchCity
     
    println "Task 'parseCity' got dataflow variable 'searchCity'"
 
    final cityName = city.split(',').first()
 
    // Assign to dataflow variable in Dataflows object.
    data.cityName = cityName
}
 
final startSearch = task {
    // Use command line argument to set
    // city dataflow variable in
    // Dataflows data object.
    // Any code that reads this value
    // was waiting, but will start now,
    // because of this assigment.
    data.searchCity = args[0
}
 
// When a variable is bound we log it.
final printValueBound = { dataflowVar, value ->
    println "Variable '$dataflowVar' bound to '$value'"
}
data.searchCity printValueBound.curry('searchCity')
data.cityName printValueBound.curry('cityName')
data.cityWeather printValueBound.curry('cityWeather')
data.temperature printValueBound.curry('temperature')
 
 
// Here we read the dataflow variables cityWeather and temperature
// from Dataflows data object. Notice once the value is
// is set it is not calculated again. For example cityWeather
// will not do a remote call again, because the value is already known
// by now.
println "Main thread is waiting for dataflow variables 'cityWeather', 'temperature' and 'cityName'"
final cityInfo =
    data.cityWeather + [tempFahrenheit: data.temperature] + [cityName: data.cityName]
 
 
println """\
 
Temperature in city $cityInfo.cityName (searched with $cityInfo.city):
$cityInfo.temperature Celcius
$cityInfo.tempFahrenheit Fahrenheit
"""
 
 
// Helper method to get XML response from URL
// and parse it using XmlSlurper. Returns GPathResult.
def downloadData(requestUrl, requestParams) {
    final params = requestParams.collect { it }.join('&')
    final url = "${requestUrl}?${params}"
 
    final response = new XmlSlurper().parseText(url.toURL().text)
    response
}

Теперь, когда мы запускаем скрипт, мы получаем следующий вывод:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
$ groovy citytemp.groovy Tilburg,NL
Task 'convertTemperature' is waiting for dataflow variable 'cityWeather'
Task 'parseCity' is waiting for dataflow variable 'searchCity'
Task 'findCityWeather' is waiting for dataflow variable 'searchCity'
Task 'findCityWeather' got dataflow variable 'searchCity'
Task 'parseCity' got dataflow variable 'searchCity'
Main thread is waiting for dataflow variables 'cityWeather', 'temperature' and 'cityName'
Variable 'searchCity' bound to 'Tilburg,NL'
Variable 'cityName' bound to 'Tilburg'
Task 'convertTemperature' got dataflow variable 'cityWeather'
Variable 'cityWeather' bound to '[city:Tilburg,NL, temperature:11.76]'
Variable 'temperature' bound to '53.167999999999985'
 
Temperature in city Tilburg (searched with Tilburg,NL):
11.76 Celcius
53.167999999999985 Fahrenheit

Обратите внимание, как задачи ожидают значений и продолжают, когда они получают свои данные. Порядок определения задач не важен, потому что они будут ждать своего ввода, чтобы начать реальную работу.

  • Написано с Groovy 2.4.3.
Ссылка: Groovy Good: делитесь данными в параллельной среде с переменными потока данных от нашего партнера JCG Хьюберта Иккинка в блоге JDriven .