Создание приложений MiNiFi AI IoT с новым Cloudera EFM
Трудно запустить приложение IoT, которое выполняет локальную классификацию глубокого обучения, бизнес-правила, преобразования, маршрутизацию, сжатие и эффективно отправляет эти данные в центральное облачное хранилище для обработки и хранения. С появлением современных инструментов потокового вещания с открытым исходным кодом от Cloudera все стало просто.
Edge Flow Manager позволяет мне развертывать код графического интерфейса drag-n-drop, который генерируется в веб-интерфейсе EFM, на тысячах удаленных устройств, на которых работают агенты MiNiFi. В моем примере у меня установлен агент MiNiFi Java на Raspberry Pi с датчиками Coral и Google Coral TPU. Я могу развернуть модели через EFM, а также свою логику.
Как только приложение IoT развернуто, оно запустит его на логике и классификации устройства и безопасно отправит результаты по HTTP (s) или по TCP / IP для Kafka / MQTT. Получив данные в своих центральных кластерах NiFi, я могу делать такие вещи, как преобразование типов данных, запрашивать оперативные данные и постоянно хранить их в быстром хранилище данных, таком как Apache Kudu / Impala, Apache HBase или Apache Hive.
Вам также могут понравиться: Искусственный интеллект и IoT: почему они выигрышная комбинация
С помощью EFM очень легко создать приложение EdgeAI с функцией перетаскивания, а затем отправить его всем агентам MiNiFi.
Получение NiFi Flow от Java-агента MiNiFi
В кластере в моем кластере CDP-DC я использую сообщения Kafka, отправленные с моего удаленного шлюза NiFi, для публикации предупреждений в Kafka и отправки записей в Apache HBase и Apache Kudu . Мы фильтруем наши данные с помощью потокового SQL.
Мы можем использовать SQL для маршрутизации, создания агрегатов, таких как средние значения, выбора подмножества полей и ограничения возвращаемых данных. Используя возможности Apache Calcite, потоковая передача SQL в NiFi меняет правила игры против типов записей, включая CSV, XML, Avro, Parquet, JSON и Grokable. Читайте и пишите различные форматы и конвертируйте, когда ваш SQL готов. Или просто выбрать * FROW FLOWFILE, чтобы получить все.
Мы можем увидеть этот поток из Атласа, когда проследим происхождение данных и происхождение из темы Кафки .
Мы можем искать в Атласе темы Кафки.
От темы кораллового кафки до нифи к куду.
Подробности по теме Коралловый Кафка
Изучение данных Hive Metastore в таблице Coral Kudu
Детали потока NiFi в Атласе
Подробности в теме оповещений
Статистика из Атласа
См .: https://www.datainmotion.dev/2020/02/connecting-apache-nifi-to-apache-atlas.html.
Пример изображения с веб-камеры
Пример записи JSON
[{ «Cputemp»: 59, «идентификатор»: «20200221190718_2632409e-f635-48e7-9f32-aa1333f3b8f9», «температура»: «39.44», «память»: 91,1, «score_1»: «0,29», «время_запуска»: «21.02.2020 14:07:13», «label_1»: «лак для волос», «tempf»: «102.34», «discusage»: «50373,5 МБ», «message»: «Success», «ambient_light» : «329,92», «хозяин»: «coralenv», «CPU»: 34,1, «MacAddress»: «b8: 27: ЭБ: 99: 64: 6b», «давление»: «102,76», «score_2»:» 0,14 «,» ip «:» 127.0.1.1 «,» te «:» 5.10 «,» systemtime «:» 21.02.2020 14:07:18 «,» label_2 «:» шприц «,» влажность «: «10.21» }]
Запрос результатов Kudu в Hue
Оповещения об ослаблении от NiFi
Я работаю на Apache NiFi 1.11.3 и хотел бы отметить новую функцию. Поток загрузки: загрузка выделенного потока / группы в виде JSON.
Глядя на счетчики NiFi для мониторинга прогресса:
Мы видим, как легко получать данные датчиков IoT и запускать алгоритмы искусственного интеллекта на Coral TPU.
Shell (coralrun.sh)
Оболочка
1
2
3
DATE=$(date +"%Y-%m-%d_%H%M%S")
4
5
fswebcam -q -r 1280x720 /opt/demo/images/$DATE.jpg
6
7
python3 -W ignore /opt/demo/test.py --image /opt/demo/images/$DATE.jpg 2>/dev/null
Куду Таблица DDL
Python 3 (test.py)
питон
xxxxxxxxxx
1
import time
2
import sys
3
import subprocess
4
import os
5
import base64
6
import uuid
7
import datetime
8
import traceback
9
import base64
10
import json
11
from time import gmtime, strftime
12
import math
13
import random, string
14
import time
15
import psutil
16
from getmac import get_mac_address
17
from coral.enviro.board import EnviroBoard
18
from luma.core.render import canvas
19
from PIL import Image, ImageDraw, ImageFont
20
import os
21
import argparse
22
from edgetpu.classification.engine import ClassificationEngine
23
24
# Importing socket library
25
import socket
26
27
start = time.time()
28
starttf = datetime.datetime.now().strftime('%m/%d/%Y %H:%M:%S')
29
30
def ReadLabelFile(file_path):
31
with open(file_path, 'r') as f:
32
lines = f.readlines()
33
ret = {}
34
for line in lines:
35
pair = line.strip().split(maxsplit=1)
36
ret[int(pair[0])] = pair[1].strip()
37
return ret
38
39
# Google Example Code
40
def update_display(display, msg):
41
with canvas(display) as draw:
42
draw.text((0, 0), msg, fill='white')
43
44
def getCPUtemperature():
45
res = os.popen('vcgencmd measure_temp').readline()
46
return(res.replace("temp=","").replace("'C\n",""))
47
48
49
# Get MAC address of a local interfaces
50
def psutil_iface(iface):
51
# type: (str) -> Optional[str]
52
import psutil
53
nics = psutil.net_if_addrs()
54
if iface in nics:
55
nic = nics[iface]
56
for i in nic:
57
if i.family == psutil.AF_LINK:
58
return i.address
59
60
# /opt/demo/examples-camera/all_models
61
row = { }
62
63
try:
64
65
#i = 1
66
#while i == 1:
67
parser = argparse.ArgumentParser()
68
parser.add_argument('--image', help='File path of the image to be recognized.', required=True)
69
args = parser.parse_args()
70
71
# Prepare labels.
72
labels = ReadLabelFile('/opt/demo/examples-camera/all_models/imagenet_labels.txt')
73
74
# Initialize engine.
75
engine = ClassificationEngine('/opt/demo/examples-camera/all_models/inception_v4_299_quant_edgetpu.tflite')
76
77
# Run inference.
78
img = Image.open(args.image)
79
80
scores = {}
81
kCount = 1
82
83
# Iterate Inference Results
84
for result in engine.ClassifyWithImage(img, top_k=5):
85
scores['label_' + str(kCount)] = labels[result[0]]
86
scores['score_' + str(kCount)] = "{:.2f}".format(result[1])
87
kCount = kCount + 1
88
89
enviro = EnviroBoard()
90
host_name = socket.gethostname()
91
host_ip = socket.gethostbyname(host_name)
92
cpuTemp=int(float(getCPUtemperature()))
93
uuid2 = '{0}_{1}'.format(strftime("%Y%m%d%H%M%S",gmtime()),uuid.uuid4())
94
usage = psutil.disk_usage("/")
95
end = time.time()
96
row.update(scores)
97
row['host'] = os.uname()[1]
98
row['ip'] = host_ip
99
row['macaddress'] = psutil_iface('wlan0')
100
row['cputemp'] = round(cpuTemp,2)
101
row['te'] = "{0:.2f}".format((end-start))
102
row['starttime'] = starttf
103
row['systemtime'] = datetime.datetime.now().strftime('%m/%d/%Y %H:%M:%S')
104
row['cpu'] = psutil.cpu_percent(interval=1)
105
row['diskusage'] = "{:.1f} MB".format(float(usage.free) / 1024 / 1024)
106
row['memory'] = psutil.virtual_memory().percent
107
row['id'] = str(uuid2)
108
row['message'] = "Success"
109
row['temperature'] = '{0:.2f}'.format(enviro.temperature)
110
row['humidity'] = '{0:.2f}'.format(enviro.humidity)
111
row['tempf'] = '{0:.2f}'.format((enviro.temperature * 1.8) + 32)
112
row['ambient_light'] = '{0}'.format(enviro.ambient_light)
113
row['pressure'] = '{0:.2f}'.format(enviro.pressure)
114
msg = 'Temp: {0}'.format(row['temperature'])
115
msg += 'IP: {0}'.format(row['ip'])
116
update_display(enviro.display, msg)
117
# i = 2
118
119
except:
120
row['message'] = "Error"
121
122
print(json.dumps(row))
Исходный код ( GitHub )
Датчики / Устройства / Оборудование:
- Датчик влажности Humdity-HDC2010
- Light-OPT3002 датчик внешней освещенности
- Барометрический датчик барометрического давления BMP280
- PS3 Eye Camera и микрофон USB
- Raspberry Pi 3B +
- Google Coral Датчик окружающей среды
- Google Coral USB-ускоритель ТПУ
Ссылки:
- https://coral.ai/docs/enviro-board/get-started/
- https://coral.ai/products/accelerator/
- https://coral.ai/docs/enviro-board/datasheet/
- https://github.com/tspannhw/nifi-minifi-coral-env
- https://github.com/tspannhw/nifi-minifi-coral
- https://www.datainmotion.dev/2019/08/google-coral-tpu-with-edge-devices-and.html
- https://github.com/tspannhw/minifi-grove-sensors
- https://coral.ai/docs/enviro-board/get-started/
- https://coral.ai/products/accelerator/
- https://github.com/google/mediapipe/tree/master/mediapipe/examples/coral
- https://github.com/tensorflow/examples/tree/master/lite/examples/image_classification/raspberry_pi
- https://github.com/google-coral/examples-camera
- https://github.com/google-coral/project-keyword-spotter
- https://github.com/google/mediapipe/tree/master/mediapipe/examples/coral