OriginBot家庭助理计划之宝宝看护助手

本文首发于古月居

这篇博客主要讲述了如何通过OriginBot来看护宝宝,当宝宝的脸不在摄像头的范围之内时,发送消息到钉钉群组,通知家人及时查看。


前言

我在上个月有了宝宝,为了方便照看宝宝,就买了一个带有宝宝看护功能的摄像头,但是产品做的不怎么样,最重要的脸部遮挡检测功能用不了,后来就退货了。退货后就萌生了自己用OriginBot做一个类似功能的想法,于是就有了这篇博客~


功能流程图(架构图)

具体的流程或者说架构如下:

originbot face detection

其实整体也不复杂,OriginBot上有一个MIPI摄像头,然后利用地平线TogetheROS.Bot的人体检测和跟踪来实时检测摄像头中有没有脸部,如果没有脸部,就向后端发送一条数据,然后后端会向钉钉群组发消息告诉家人。钉钉群组里面需要事先创建一个webhook。

下面会分为人体检测、判断有无脸部、后端操作三个部分来记录。


人体检测

这一部分用的是地平线TogetheROS.Bot现成的功能,启动OriginBot之后,在命令行中运行如下命令:

1
2
3
4
5
6
7
8
9
10
11
# 配置tros.b环境
source /opt/tros/setup.bash

# 从tros.b的安装路径中拷贝出运行示例需要的配置文件。
cp -r /opt/tros/lib/mono2d_body_detection/config/ .

# 配置MIPI摄像头
export CAM_TYPE=mipi

# 启动launch文件
ros2 launch mono2d_body_detection mono2d_body_detection.launch.py

此时可以通过http://IP:8000查看检测效果,这个模块检测了人体、人头、人脸、人手检测框、检测框类型和目标跟踪ID以及人体关键点等,我只取其中的人脸部分,当然了,以后也可以增加如人体检测等。

上面的命令运行之后,在OriginBot上执行ros2 topic list,应该会有一个topic叫做hobot_mono2d_body_detection, 这个就是我们需要的,我们会订阅这个话题,然后分析其中发送数据来判断有没有脸部


判断摄像头中有没有脸部

按照TogetheROS.Bot的文档说明,hobot_mono2d_body_detection的消息类型是ai_msgs.msg.PerceptionTargets, 具体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 感知结果

# 消息头
std_msgs/Header header

# 感知结果的处理帧率
# fps val is invalid if fps is less than 0
int16 fps

# 性能统计信息,比如记录每个模型推理的耗时
Perf[] perfs

# 感知目标集合
Target[] targets

# 消失目标集合
Target[] disappeared_targets

其中的disappeared_targets是我们关注的重点,如果face出现在disappeared_targets中,就说明之前是有脸部的,但是现在没有了,这个时候就要向后端发出数据进一步处理。

我为了判断有无脸部,写了一个ROS2 Node,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import rclpy
from rclpy.node import Node
from ai_msgs.msg import PerceptionTargets
from cv_bridge import CvBridge
import time

from api_connection import APIConnection

BabyMonitorMapping = {
# 这里的k-v要根据后端Django中的值来确定
"face": "看不到脸",
"body": "不在摄像头范围内",
}


class FaceDetectionListener(Node):
"""
检测宝宝的脸是不是在摄像头中
"""
def __init__(self):
super().__init__("face_detection")
self.bridge = CvBridge()
self.subscription = self.create_subscription(
PerceptionTargets, "hobot_mono2d_body_detection", self.listener_callback, 10
)
self.conn = APIConnection()
self.timer = time.time()
self.counter = 0

def listener_callback(self, msg):
targets = msg.targets
disappeared_targets = msg.disappeared_targets
targets_list = []
disappeared_targets_list = []
if disappeared_targets:
for item in disappeared_targets:
disappeared_targets_list.append(item.rois[0].type)
if targets:
for item in targets:
targets_list.append(item.rois[0].type)
print(f"检测到的对象如下:{targets_list}")
print(f"消失的对象如下:{disappeared_targets_list}")
if disappeared_targets_list:
self.sending_notification(disappeared_targets_list)

def sending_notification(self, disappeared_targets_list):
for item in disappeared_targets_list:
if BabyMonitorMapping.get(item):
event = BabyMonitorMapping.get(item)
if self.counter == 0:
# 这里baby的ID是模拟的,应该去数据库中查
data = {"event": event, "baby": "6b56979a-b2b9-11ee-920d-f12e14f97477"}
self.conn.post_data(item=data, api="api/monitor/face-detection/")
self.counter += 1
self.timer = time.time()
else:
if time.time() - self.timer >= 60.0:
# 60秒不重复发消钉钉消息
data = {"event": event, "baby": "6b56979a-b2b9-11ee-920d-f12e14f97477"}
self.conn.post_data(item=data, api="api/monitor/face-detection/")
self.timer = time.time()
self.counter += 1


def main(args=None):
rclpy.init(args=args)
try:
face_detection_listener = FaceDetectionListener()

rclpy.spin(face_detection_listener)
except KeyboardInterrupt:
print("终止运行")
finally:
face_detection_listener.destroy_node()
rclpy.shutdown()


if __name__ == "__main__":
main()

代码整体不难,但还是做几点必要的说明:
1. BabyMonitorMapping
这个字典的作用是把TogetheROS.Bot里面的字段跟后端的字段做一个映射关系,方便后面使用

2. API调用
代码中有两行如下:

1
2
data = {"event": event, "baby": "6b56979a-b2b9-11ee-920d-f12e14f97477"} 
self.conn.post_data(item=data, api="api/monitor/face-detection/")

这里的uri和data的格式由后端决定,不必细究,想了解细节的可以看后端代码。

3.APIConnection
APIConnection是用于请求的API的一个封装类,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
"""
API CONNECTION FOR IMPORTING WRAPPER
"""
import json
import logging
import requests
import envs


logging.basicConfig(
format="%(asctime)s %(levelname)-8s %(message)s",
level=logging.INFO,
datefmt="%Y-%m-%d %H:%M:%S",
)


class APIConnection:
"""
Api Connection
"""

def __init__(self):
self.api_url = envs.API_URL

self.token = None
self.headers = {
"Content-Type": "application/json",
"Cache-Control": "no-cache",
}
self.request_jwt()

def request_jwt(self):
"""
Request JWT token.
"""
logging.info("Requesting JWT..")

api_url = f"{self.api_url}api/token/"
data = {
"username": envs.SCRIPT_USER,
"password": envs.SCRIPT_PASSWORD,
}

res = requests.post(api_url, data=json.dumps(data), headers=self.headers)

if res.status_code == 200:
data = res.json()
self.token = data["access"]
self.headers["Authorization"] = f"Bearer {self.token}"
else:
logging.error(
f"Failed to obtain JWT. Status code: {res.status_code}, Message: {res.text}"
)

def upload_video(self, api, file):
"""
post data
:param item: items to be posted in json format
:param api: path of endpoint
"""
api_url = f"{self.api_url}{api}"

try:
res = requests.post(api_url, files=file, headers=self.headers, timeout=1)

if res.status_code == 401:
self.request_jwt()
self.post_data(api, file)

elif res.status_code in [200, 201]:
logging.info(f"{res.status_code} - video uploaded successfully")
return res.status_code

else:
logging.error(
f"{res.status_code} - {res.json()}- unable to upload video"
)
return res.status_code

except Exception as err:
logging.error(err)
return 500

def post_data(self, item, api):
"""
新建一条数据
"""
api_url = f"{self.api_url}{api}"

try:
response = requests.post(
api_url, data=json.dumps(item), headers=self.headers, timeout=1
)

if response.status_code == 403:
self.request_jwt()
self.post_data(item, api)

elif response.status_code not in [200, 201]:
logging.error(
f"post data to backend failed, \
status code is {response.status_code}, \
error message is:\n \
{response.text}"
)
except Exception as err:
logging.error(
f"post data to backend failed, \
error message is:\n \
{err}"
)

后端操作

经过前面两个部分,如果发现了脸部数据从OriginBot的摄像头中消失的话,已经会把记录发送到后端了,现在来讲一下后端的部分。

后端的操作说起来其实也很简单,就是接收到数据后,现在数据库中存一份,然后再向钉钉发消息提醒家人。

OriginBot家庭助理计划里面提到过,后端是基于Django + Django rest framework 开发的,下面的内容需要一点django的基础才能看懂。

首先是创建了两个django model来存储数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class Baby(models.Model):
"""
记录的宝宝的数据
"""

id = models.UUIDField(primary_key=True, default=uuid.uuid1, editable=False)
name = models.CharField(max_length=256)
birth_date = models.DateField()
gender = models.CharField(max_length=1, choices=(("男", "男"), ("女", "女")))

def __str__(self):
return self.name


class BabyMonitorData(models.Model):
"""
记录宝宝监控相关的数据
"""

event_choices = (
("看不到脸", "看不到脸"),
("哭", "哭"),
("翻身", "翻身"),
("不在摄像头范围内", "不在摄像头范围内"),
)

baby = models.ForeignKey(Baby, on_delete=models.PROTECT)
event = models.CharField(max_length=128, choices=event_choices)
timestamp = models.DateTimeField(auto_now_add=True)

def __str__(self):
return f"{self.baby.name} {self.event} {self.timestamp}"

class Meta:
ordering = ["-timestamp"]

需要注意的是,Baby和BabyMonitorData两个类之间存在外键关系。

在FaceDetectionListener中请求的api/monitor/face-detection/这个uri,最终在后端执行的代码是下面这一段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class BabyMonitorView(viewsets.ModelViewSet):
queryset = BabyMonitorData.objects.all().order_by("-timestamp")
serializer_class = BabyMonitorSerializer

def create(self, request, *args, **kwargs):
response = super().create(request, *args, **kwargs)

message = request.data
try:
event = message.get("event")
baby = Baby.objects.filter(id=message.get("baby"))[0].name
except IndexError:
print("找不到对应的婴儿数据")
return response

send_msg_to_dingtalk(f"{baby} {event} 啦!")

# 返回原始的 response
return response

这里面做的事情就是向BabyMonitorData中存一条数据,然后通过send_msg_to_dingtalksend_msg_to_dingtalk给钉钉群组发消息。

send_msg_to_dingtalk的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import json
import requests
from utils import envs

from datetime import datetime


def send_msg_to_dingtalk(msg):
webhook = envs.DING_TALK_URL
headers = {"Content-Type": "application/json;charset=utf-8"}
data = {
"msgtype": "text",
"text": {
"content": msg
+ f"[来自originbot,{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}]"
},
}

response = requests.post(webhook, headers=headers, data=json.dumps(data))
return response.text


if __name__ == "__main__":
message = "Hello from my Python script!"
send_msg_to_dingtalk(message)

代码中的webhook = envs.DING_TALK_URL其实是获取环境变量中钉钉群组机器人链接,至于如何创建钉钉群组机器人网上有很多教程,不再赘述。

到这里,如果一切顺利,应该就可以在钉钉里面看到消息啦,效果如下:

OriginBotFaceDetectionDingTalk


源代码

源码地址:https://github.com/yexia553/originbot_home_assistant


待优化

虽然一个最基础的demo完成了,但是还有很多需要优化的地方:

  1. 小车只能在地上跑,但是宝宝一般在床上,只有把小车放在特定的位置,才能实现上述功能,这限制了实际应用,需要解决。
  2. 续航问题,目前小车的电池只能使用2个小时左右
  3. 目前只是检测摄像头中有没有脸,而不是检测宝宝的脸,可以考虑优化
  4. 可以增加类似哭声检测、翻身等动作识别,并加上数据分析