Хакер -Зоркий глаз. Собираем и программируем на Python устройство для видеонаблюдения

Хакер -Зоркий глаз. Собираем и программируем на Python устройство для видеонаблюдения

hacker_frei

https://t.me/hacker_frei

Виктор Паперно

Содержание статьи

  • Настройка инфраструктуры
  • ESP32 Cam
  • Telegram-бот

Для мно­гих раз­работ­чиков Python — это язык, на котором были написа­ны их пер­вые прог­раммы: «Hello, world!» или каль­кулятор. Сей­час начина­ющие прог­раммис­ты во вре­мя обу­чения час­то пишут телег­рам‑ботов, бла­го на Python это сде­лать отно­ситель­но нес­ложно. Давай соз­дадим бота, но не прос­того, а вза­имо­дей­ству­юще­го с нашим собс­твен­ным устрой­ством для умно­го дома, которое мы самос­тоятель­но соберем и зап­рограм­миру­ем.

На мой взгляд, основное дос­тоинс­тво Python — уни­вер­саль­ность: с его помощью мож­но решить прак­тичес­ки любую задачу бла­года­ря огромно­му количес­тву сбо­рок. Это прив­носящий магию C++ Cython, бра­узер­ный Brython, поз­воля­ющий работать с Java Jython и мно­жес­тво биб­лиотек на все слу­чаи жиз­ни.

Но до срав­нитель­но недав­него вре­мени, если Python-прог­раммист хотел сде­лать девайс для умно­го дома или машин­ку на ради­оуп­равле­нии, он был вынуж­ден либо исполь­зовать Arduino и волей‑неволей учить C++, либо доволь­ство­вать­ся мик­рокомпь­юте­ром вро­де Raspberry Pi и искать (а порой и писать свои) биб­лиоте­ки для управле­ния теми или ины­ми китай­ски­ми модуля­ми. Иног­да вооб­ще при­ходи­лось соз­давать хто­ничес­ких чудовищ, под­клю­чая про­водом Arduino к Raspberry и опра­шивая дат­чики через serial port, но все рав­но при этом нуж­но было писать про­шив­ку для Arduino или дру­гого кон­трол­лера на C++.

Но уже при­мер­но пять лет, если судить по дате моей статьи про MicroPython, мож­но занимать­ся DIY-про­екта­ми, не перек­люча­ясь с одно­го язы­ка прог­рамми­рова­ния на дру­гой. За это вре­мя мно­гое в мире MicroPython поменя­лось, выш­ли новые сбор­ки, появи­лись одни биб­лиоте­ки и пол­ностью перес­тали под­держи­вать­ся дру­гие, а Arduino офи­циаль­но при­няла MicroPython в свою эко­сис­тему.

С каж­дым годом чис­ло под­держи­вающих MicroPython плат рас­тет, но зна­читель­ный их про­цент все еще базиру­ется на ESP32. И дей­стви­тель­но, у это­го кон­трол­лера мас­са дос­тоинств, таких как встро­енные Wi-Fi и BLE, а еще, если говорить чес­тно, отсутс­твие кон­курен­тов в том же ценовом диапа­зоне. Но вот что каса­ется плат, базиру­ющих­ся на этом кон­трол­лере, — тут гла­за раз­бега­ются. Это и базовые вер­сии от WeMos, целые наборы и модули от M5Stack и LILYGO. Не отста­ют, кста­ти, и рос­сий­ские инже­неры: магазин iArduino не так дав­но выпус­тил свою отла­доч­ную пла­ту Piranha.

К тому же нель­зя забывать про Великий Китай­ский Ноунейм, которым завален «Али­экс­пресс». Кста­ти, в нашем про­екте мы будем исполь­зовать одну из таких плат — ESP32 Cam (я так и не смог разоб­рать­ся, кто же ее офи­циаль­ный про­изво­дитель). Как нес­ложно догадать­ся по наз­ванию, это модуль на осно­ве ESP32 с под­клю­чен­ной камерой, чаще все­го это OV2640, но если ты хочешь добить­ся луч­шего качес­тва, то мож­но под­клю­чить и дру­гие.

Pinout Map

Вот спи­сок необ­ходимо­го ПО:

  • твоя любимая IDE для раз­работ­ки — я исполь­зую PyCharm;
  • ESP32 MPY-Jama — кросс‑плат­формен­ная IDE для MicroPython;
  • Telegram.

Итак, что же мы будем делать? Как ты уже, навер­ное, догадал­ся, судя по тому, что я выб­рал пла­ту с камерой, — сис­тему виде­онаб­людения. Но не прос­тую, с пос­тоян­ной тран­сля­цией, а с воз­можностью мгно­вен­но получить фотог­рафию. Управлять ею мы будем через Telegram-бота, а что­бы не приш­лось покупать ста­тичес­кий IP-адрес, для обще­ния бота и девай­са исполь­зуем про­токол MQTT.

INFO

MQTT (Message Queuing Telemetry Transport) — это лег­ковес­ный про­токол переда­чи сооб­щений, который исполь­зует­ся для обме­на дан­ными меж­ду устрой­ства­ми в интерне­те вещей (IoT). Он приз­ван обес­печивать надеж­ную связь меж­ду мно­жес­твом устрой­ств, которые работа­ют с огра­ничен­ными ресур­сами и могут быть под­клю­чены к интерне­ту толь­ко вре­мен­но или с пери­оди­чес­кими задер­жка­ми.

MQTT работа­ет по прин­ципу «изда­тель — под­писчик», где одни устрой­ства могут пуб­ликовать сооб­щения в топики (topics), а дру­гие под­писывать­ся на них, что­бы получать сооб­щения, которые им нуж­ны. Про­токол пре­дос­тавля­ет гиб­кий и прос­той спо­соб переда­чи сооб­щений, что дела­ет его иде­аль­ным для исполь­зования в сис­темах IoT, где устрой­ства могут иметь раз­ные фун­кци­ональ­ные воз­можнос­ти и огра­ниче­ния.

НАСТРОЙКА ИНФРАСТРУКТУРЫ

Пе­ред тем как непос­редс­твен­но перей­ти к прог­рамми­рова­нию, необ­ходимо нас­тро­ить так называ­емый MQTT-бро­кер. Это сер­вер, на котором рас­полага­ются топики и к которо­му под­клю­чают­ся под­писчи­ки и изда­тели.

Ра­зуме­ется, мож­но нас­тро­ить его самос­тоятель­но, нап­ример исполь­зуя Mosquitto. Но сто­ит толь­ко подумать, что, кро­ме самого бро­кера, при­дет­ся нас­тра­ивать сер­вер, бес­поко­ить­ся о его безопас­ности и так далее, как желание занимать­ся про­ектом куда‑то про­пада­ет.

Так что в качес­тве бро­кера мы будем исполь­зовать сер­вис shiftr.io. Его бес­плат­ного тарифа нам хва­тит с головой. Сре­ди дос­тоинств это­го сер­виса — инту­итив­но понят­ный интерфейс, под­робная докумен­тация и визу­али­зация всей сис­темы. На глав­ной стра­нице тво­его про­екта мож­но уви­деть, какие топики соз­даны и какое в них было пос­леднее сооб­щение, какие устрой­ства под­клю­чены, а ког­да нач­нутся рас­сылки пакетов — их тоже будет вид­но!

При­мер, как может все отоб­ражать­ся

Что­бы наши устрой­ства — телег­рам‑бот и ESP32 Cam — мог­ли вза­имо­дей­ство­вать с нашим MQTT-бро­кером, необ­ходимо выпус­тить токен. Это мож­но сде­лать в нас­трой­ках тво­его прос­транс­тва.

Пос­ле это­го в спис­ке токенов появит­ся такой эле­мент: mqtt://xakep:utBmi6LTWS7b4l5@xakep.cloud.shiftr.io, где xakep — это логин, а utBmi6LTWS7b4l5 — пароль. Запом­ни их, они нам еще при­годят­ся. Инфраструк­тура нас­тро­ена, мож­но прис­тупать к прог­рамми­рова­нию.

ESP32 CAM

Для прог­рамми­рова­ния ESP32 на MicroPython сущес­тву­ет нес­коль­ко IDE. Я оста­новил свой выбор на ESP32 MPY-Jama — опен­сор­сном решении со мно­жес­твом удоб­ных фун­кций.

Од­на из них — гра­фичес­кий интерфейс для ути­литы esptool. Которую, впро­чем, все рав­но нуж­но уста­новить через pip: pip install esptool.

Так выг­лядит заг­рузчик

Мы будем исполь­зовать спе­циаль­ную про­шив­ку для ESP32 Cam, она дос­тупна в этом GitHub-репози­тории. Ска­чива­ем файл про­шив­ки и, исполь­зуя IDE, про­шива­ем, не забыв перед этим отформа­тиро­вать Flash-память.

Те­перь мы можем под­клю­чить­ся и заг­рузить необ­ходимые фай­лы на нашу пла­ту. Для работы, кро­ме встро­енных биб­лиотек, нам понадо­бят­ся биб­лиоте­ки micropython-umqtt.robust и micropython-umqtt.simple. Их мож­но уста­новить с помощью встро­енно­го менед­жера пакетов upip. Для это­го под­клю­чись к интерне­ту через Wi-Fi, а затем выпол­ни сле­дующие коман­ды:

import network

import upip

# Не забудь вставить имя своей Wi-Fi-сети и пароль от нее

def wifi_setup():

global sta_if

sta_if.active(True)

sta_if.scan()

sta_if.connect('SSID', 'PASSWORD')

print("Waiting for Wifi connection")

while not sta_if.isconnected(): time.sleep(1)

print("Connected to WiFi")

wifi_setup()

upip.install("micropython-umqtt.robust")

upip.install("micropython-umqtt.simple")

Те­перь у нас все готово для работы. Что­бы облегчить задачу, я написал неболь­шую биб­лиотеч­ку — со все­ми необ­ходимы­ми фун­кци­ями. Давай пос­мотрим, что в нее вхо­дит.

Вна­чале импорти­руем необ­ходимые биб­лиоте­ки:

  • network — для под­клю­чения к интерне­ту;
  • umqtt.robust — для вза­имо­дей­ствия с MQTT-бро­кером;
  • camera - в этой биб­лиоте­ке находят­ся фун­кции для работы с камерой;
  • time — похож на одно­имен­ную биб­лиоте­ку из обыч­ного Python;
  • machine — это базовый модуль MicroPython, поз­воля­ющий управлять пинами.

# Импорт библиотек

import network

from umqtt.robust import MQTTClient

import camera

import time

import machine

# Инициализация глобальных переменных

sta_if = network.WLAN(network.STA_IF); # Wi-Fi-модуль

camera.init(0, format=camera.JPEG, fb_location=camera.PSRAM) # Непосредственно камера

mqtt_client = None # Заготовка для MQTT-клиента

led = machine.Pin(4, machine.Pin.OUT) # Встроенный светодиод

С одной из фун­кций биб­лиоте­ки мы поз­накоми­лись рань­ше, ког­да под­клю­чались к Wi-Fi, теперь рас­смот­рим осталь­ные.

Пер­вая называ­ется take_photo. В этой фун­кции выпол­няет­ся фотог­рафиро­вание и отправ­ка в фотог­рафии (которая сей­час выг­лядит как строч­ка бай­тов) в MQTT-топик 'image'. Что­бы очис­тить топик, мы отпра­вим строч­ку b'None' перед тем, как отсы­лать непос­редс­твен­но изоб­ражение.

def take_photo():

global mqtt_client

mqtt_client.publish('image', b'None', qos=0)

capture = camera.capture()

mqtt_client.publish('image', capture, qos=0)

Сле­дующая фун­кция — mqtt_callback, это обра­бот­чик сооб­щений, при­ходя­щих в топики. В том слу­чае, если в топик command при­ходит коман­да take_photo, мы вызыва­ем уже зна­комую нам фун­кцию, а если коман­да enable_led или disable_led — то, соот­ветс­твен­но, вклю­чаем или вык­люча­ем встро­енный све­тоди­од.

def mqtt_callback(topic, data_bytes):

global led

topic = topic.decode()

data = data_bytes.decode()

if topic == "command":

if data == "take_photo":

take_photo()

elif data == "enable_led":

led.value(1)

elif data == "disable_led":

led.value(0)

Идем даль­ше: mqtt_setup — в этой фун­кции про­изво­дит­ся под­клю­чение к MQTT-бро­керу с исполь­зовани­ем логина и пароля, которые мы нас­тро­или рань­ше, под­клю­чает­ся кол­бэк (обра­бот­чик) и запус­кает­ся бес­конеч­ный цикл ожи­дания команд от бро­кера.

def mqtt_setup():

global mqtt_client

mqtt_client = MQTTClient(

"umqtt_xakep_client",

server='xakep.cloud.shiftr.io',

port=1883,

user='xakep',

password='utBmi6LTWS7b4l5')

mqtt_client.set_callback(mqtt_callback)

mqtt_client.connect()

if mqtt_client:

mqtt_client.subscribe('command')

print("Connected to MQTT")

while True:

mqtt_client.check_msg()

Что­бы запус­тить наше устрой­ство, необ­ходимо импорти­ровать из биб­лиоте­ки фун­кции wifi_setup и mqtt_setup, а затем по оче­реди их выпол­нить. Это мож­но сде­лать пря­мо из тер­минала. Если все нас­тро­ено пра­виль­но, то в сво­ем инста­се shiftr.io ты уви­дишь под­клю­чен­ное устрой­ство с име­нем umqtt_xakep_client.

Убе­дим­ся, что все нас­тро­ено пра­виль­но, попытав­шись вклю­чить све­тоди­од. Для это­го нам нуж­но записать в топик с име­нем command строч­ку enable_led. Мож­но вос­поль­зовать­ся любым MQTT-кли­ентом: на телефо­не, на компь­юте­ре или в интерне­те. Я выб­рал мобиль­ное при­ложе­ние EasyMQTT, и в моем инста­се появи­лось новое устрой­ство. На экра­не мож­но отсле­дить переме­щение пакета (малень­кой чер­ной точ­ки).

Ес­ли у тебя все работа­ет, то све­тоди­од будет пос­лушно вклю­чать­ся и вык­лючать­ся по коман­де. Оста­лось нас­тро­ить авто­мати­чес­кое под­клю­чение к Wi-Fi и MQTT при вклю­чении девай­са. Для это­го необ­ходимо добавить вызовы наших фун­кций в файл boot.py, который выпол­няет­ся при запус­ке устрой­ства.

TELEGRAM-БОТ

Что дол­жен делать наш бот? При­нимать коман­ду от поль­зовате­ля — при­чем желатель­но в челове­чес­ком фор­мате — и отправ­лять ее на наш MQTT-бро­кер, а затем дожидать­ся фотог­рафии с устрой­ства и переда­вать ее поль­зовате­лю.

Прог­рамма будет сос­тоять из двух час­тей: непос­редс­твен­но телег­рам‑бота, реали­зующе­го вза­имо­дей­ствие с поль­зовате­лем, и «MQTT-кли­ента», который будет отправ­лять коман­ды на наш гад­жет и при­нимать фотог­рафии. Для начала напишем имен­но его с исполь­зовани­ем биб­лиоте­ки pahomqtt.

Вспом­ним, что нуж­но было сде­лать для нас­трой­ки «MQTT-кли­ента» на ESP:

  1. Нас­тро­ить под­клю­чение к сер­веру.
  2. Нас­тро­ить обра­бот­чик событий.

Здесь все то же самое, толь­ко нем­ного отли­чает­ся син­таксис. Для начала соз­дадим файл config.py, где будет хра­нить­ся вся кон­фигура­ция:

MQTT_HOST = "xakep.cloud.shiftr.io"

MQTT_PORT = 1883

MQTT_KEEPALIVE = 60

MQTT_LOGIN = "xakep"

MQTT_PASSWORD = "utBmi6LTWS7b4l5"

Мож­но было соз­дать отдель­ный токен, но мы вос­поль­зовались сущес­тву­ющим. Импорти­руем биб­лиоте­ки:

import paho.mqtt.client as mqtt

from config import MQTT_LOGIN, MQTT_KEEPALIVE, MQTT_PORT, MQTT_HOST, MQTT_PASSWORD

Соз­даем фун­кции — обра­бот­чики событий: под­клю­чения и нового сооб­щения.

def on_connect(client, userdata, flags, rc):

client.subscribe("image")

def on_message(client, userdata, msg):

if msg.topic == "image":

if msg.payload != b'None':

with open(f"images/photo.jpeg", "wb") as file:

file.write(msg.payload)

Пос­ле под­клю­чения наша прог­рамма под­писыва­ется на канал image и ждет сооб­щений. А ког­да дожида­ется, про­веря­ет, что это кор­рек­тное сооб­щение, и сох­раня­ет его как кар­тинку.

Те­перь оста­лось толь­ко под­клю­чить­ся к сер­веру и при­вязать наши фун­кции к соот­ветс­тву­ющим событи­ям:

client = mqtt.Client(client_id="TelegramBot", clean_session=True)

client.username_pw_set(MQTT_LOGIN, MQTT_PASSWORD)

client.on_connect = on_connect

client.on_message = on_message

client.connect(MQTT_HOST, MQTT_PORT, MQTT_KEEPALIVE)

Те­перь зай­мем­ся непос­редс­твен­но ботом. Для реали­зации логики ботов на Python сущес­тву­ет мно­жес­тво биб­лиотек. Нап­ример, в статье «Пи­тоном по телег­раму! Пишем пять прос­тых Telegram-ботов на Python» исполь­зовалась telebot, но я выб­рал асин­хрон­ную биб­лиоте­ку aiogram.

Что­бы вза­имо­дей­ство­вать с Telegram, необ­ходимо зарегис­три­ровать бот. О том, как это сде­лать, написа­но мно­го раз. Так что не буду пов­торять­ся, ска­жу лишь, что TELEGRAM_TOKEN, получен­ный от BotFather, я тоже добавил в файл кон­фигура­ции.

Соз­дадим шаб­лон для нашего бота, что­бы отве­чать на все фра­зы сооб­щени­ем о том, что этот бот уме­ет.

from aiogram import Bot, Dispatcher, executor

from aiogram.types import Message, KeyboardButton, ReplyKeyboardMarkup

from config import TELEGRAM_TOKEN

# Создаем красивую клавиатуру с кнопками

keyboard = ReplyKeyboardMarkup(resize_keyboard=True)

keyboard.add(

KeyboardButton(text="Включи светодиод"),

KeyboardButton("Выключи светодиод"),

KeyboardButton("Сделай фото")

)

# Объект бота

bot = Bot(token=TELEGRAM_TOKEN)

# Диспетчер для бота

dp = Dispatcher(bot)

# Обработчик всех сообщений

@dp.message_handler()

async def any_text_message(message: Message):

await message.answer(

"Привет! Я умею включать и выключать светодиод и делать фотографии. Нажми на нужную кнопку!",

reply_markup=keyboard

)

if __name__ == "__main__":

# Запуск бота

executor.start_polling(dp, skip_updates=True)

Ес­ли теперь ты выпол­нишь прог­рамму и поп­робу­ешь запус­тить бота на телефо­не, то уви­дишь такую кар­тину.

Шаб­лон для бота

Те­перь необ­ходимо добавить работу нашего MQTT-кли­ента и отправ­ку команд. Что­бы одновре­мен­но работал и бот, и MQTT-кли­ент, мы запус­тим кли­ент в парал­лель­ном потоке.

import threading

tr = threading.Thread(target=client.loop_forever)

tr.start()

А теперь напишем обра­бот­чики для команд. С вклю­чени­ем и вык­лючени­ем све­тоди­ода все прос­то — надо отпра­вить соот­ветс­тву­ющую коман­ду в нуж­ный топик и отве­тить поль­зовате­лю, не забыв передать исполь­зуемую кла­виату­ру.

@dp.message_handler(lambda message: message.text == "Включи светодиод")

async def enable_led(message: Message):

client.publish("command", "enable_led")

await message.answer("Команда на включение светодиода отправлена", reply_markup=keyboard)

@dp.message_handler(lambda message: message.text == "Выключи светодиод")

async def disable_led(message: Message):

client.publish("command", "disable_led")

await message.answer("Команда на выключение светодиода отправлена", reply_markup=keyboard)

Но что делать, ког­да мы хотим снять фотог­рафию? Ведь нам нуж­но получить ответ от камеры. Вос­поль­зуем­ся неболь­шим лай­фха­ком.

Пос­тро­им алго­ритм так: отправ­ляем коман­ду, ждем какое‑то вре­мя, если за это вре­мя наш MQTT-кли­ент не ска­чал фотог­рафию, то счи­таем, что про­изош­ла ошиб­ка, и уве­дом­ляем об этом поль­зовате­ля. Вре­мя опять же зада­ется в кон­фигура­цион­ном фай­ле — в перемен­ной TIME_WAIT_PHOTO.

from asyncio import sleep

@dp.message_handler(lambda message: message.text == "Сделай фото")

async def take_photo(message: Message):

client.publish("image", "None")

client.publish("command", "take_screenshot")

start = time.time()

while not os.listdir("images"):

if time.time() - start < TIME_WAIT_PHOTO:

await sleep(1)

else:

await message.answer("Что-то пошло не так — повторите попозже!", reply_markup=keyboard)

return

with open("images/photo.jpeg", "rb") as new_image:

await message.answer_photo(photo=new_image, reply_markup=keyboard)

os.remove("images/photo.jpeg")

Да­вай теперь запус­тим все вмес­те и пос­мотрим, как оно работа­ет.

Наш девайс готов!

Не забудь под­клю­чить к устрой­ству пос­тоян­ное питание — и можешь наб­людать за сво­ими домаш­ними питом­цами или 3D-прин­тером.

Читайте ещё больше платных статей бесплатно: https://t.me/hacker_frei



Report Page