说明 我之前实现了简单UI来跟OriginBot交互,但是由于我不是专业的前端开发,写UI还是比较耗时的,所以最近想修改一下这部分。
还有一个原因是,自己开发前端,如果想实现远程交互(不在同一wifi下),就一定需要一个云服务器来中转一下,这个是比较麻烦的。
我期望的交互效果是可以双向发送文字、图片和语音,找了一圈发现钉钉的单聊机器人 可以满足所有要求,所以这篇博客记录一下如何为OriginBot接入钉钉单聊机器人。
创建钉钉单聊机器人 创建单聊机器人的步骤其实不复杂,可以参考:https://open.dingtalk.com/document/robots/overview-of-single-chat-robots
唯一需要注意的是消息接收模式要选择Stream模式,不要选择http模式即可。
如何实现收发消息 在钉钉开放平台上创建好单聊机器人后,还需要有一个服务来收发消息。
在OriginBot上创建一个文件dingtalk_runtime.py
, 内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 """ 用于钉钉单聊机器人收发消息 """ from dingtalk_stream import AckMessageimport dingtalk_streamimport osfrom prompts import base_promptfrom llms import azure_gpt4oclass DingtalkMsgHandler (dingtalk_stream.ChatbotHandler): def __init__ (self ): super (dingtalk_stream.ChatbotHandler, self).__init__() async def process (self, callback: dingtalk_stream.CallbackMessage ): incoming_message = dingtalk_stream.ChatbotMessage.from_dict(callback.data) message_type = incoming_message.message_type if message_type not in ("text" ): self.reply_text( "您发送的消息类型不合法,目前只支持文本。" , incoming_message ) return AckMessage.STATUS_OK, "OK" if message_type == "text" : text = incoming_message.text.content.strip() self.reply_text(text, incoming_message) return AckMessage.STATUS_OK, "OK" def main (): credential = dingtalk_stream.Credential( os.getenv("DINGTALK_CLIENTID" ), os.getenv("DINGTALK_CLIENTSECRET" ), ) client = dingtalk_stream.DingTalkStreamClient(credential) client.register_callback_handler( dingtalk_stream.chatbot.ChatbotMessage.TOPIC, DingtalkMsgHandler() ) client.start_forever() if __name__ == "__main__" : main()
运行这个脚本后,在钉钉中给“originbot_home_assistant”这个机器人发送消息后,它会给你回复一样的内容。
到这里就已经实现了最基础的交互功能了。
大家在代码中可以看到,我目前限定了只能接收文本格式的消息,其他类型暂时都不允许,这主要是为了降低一开始的开发难度,不用考虑所有可能。
但实际上,钉钉的单聊机器人支持很丰富的消息类型,可以看下面的说明:https://open.dingtalk.com/document/orgapp/the-application-robot-in-the-enterprise-sends-a-single-chat
我会在后面需要的时候添加其他消息类型。
集成GPT4o 上面给出的通过钉钉单聊机器人跟智能小车交互的代码是非常简单的,它只能把你发给小车的消息原样返回,但实际使用过程中肯定不会这样使用。我在这里是希望通过在交互过程中集成GPT4o来让其更加智能化。
具体可以看下面的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 """ 大模型相关的封装和调用, 文件名是llms.py """ import osimport requestsimport jsonimport base64from logger import loggerdef encode_image_to_base64 (image_path=None , image_bytes=None ): if image_path and image_bytes: raise ValueError("image_path and image_bytes cannot be both provided." ) if image_path: with open (image_path, "rb" ) as image_file: encoded_string = base64.b64encode(image_file.read()).decode("utf-8" ) if image_bytes: encoded_string = base64.b64encode(image_bytes).decode("utf-8" ) return encoded_string def azure_gpt4o (message ): api_key = os.getenv("API_KEY" ) headers = {"Content-Type" : "application/json" , "api-key" : api_key} data = { "messages" : message, "max_tokens" : 4096 , "temperature" : 0.8 , "frequency_penalty" : 0 , "presence_penalty" : 0 , "top_p" : 0.95 , "stop" : None , } url = os.environ.get("GPT4O_ENDPOINT" ) try : response = requests.post(url, headers=headers, data=json.dumps(data)) if response.status_code == 200 : return response.json()["choices" ][0 ]["message" ]["content" ] else : logger.info( f"LLM 调用失败,状态码:{response.status_code} ,错误信息:{response.text} " ) except requests.RequestException as e: logger.error(f"请求发生错误:{e} " ) except Exception as e: logger.error(f"发生未知错误:{e} " )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 """ 用于钉钉单聊机器人收发消息 """ from dingtalk_stream import AckMessageimport dingtalk_streamimport osfrom prompts import base_promptfrom llms import azure_gpt4oclass DingtalkMsgHandler (dingtalk_stream.ChatbotHandler): prompt = base_prompt def __init__ (self ): super (dingtalk_stream.ChatbotHandler, self).__init__() def format_prompt (self, msg ): """ 整个运行期间的对话都会被添加到prompt中 TODO: 优化prompt动态管理方法 """ DingtalkMsgHandler.prompt.append( { "role" : "user" , "content" : msg, } ) return DingtalkMsgHandler.prompt async def process (self, callback: dingtalk_stream.CallbackMessage ): incoming_message = dingtalk_stream.ChatbotMessage.from_dict(callback.data) message_type = incoming_message.message_type if message_type not in ("text" ): self.reply_text( "您发送的消息类型不合法,目前只支持文本。" , incoming_message ) return AckMessage.STATUS_OK, "OK" if message_type == "text" : text = incoming_message.text.content.strip() self.format_prompt(text) res = azure_gpt4o(DingtalkMsgHandler.prompt) self.reply_text(str (res), incoming_message) self.format_prompt(res) return AckMessage.STATUS_OK, "OK" def main (): credential = dingtalk_stream.Credential( os.getenv("DINGTALK_CLIENTID" ), os.getenv("DINGTALK_CLIENTSECRET" ), ) client = dingtalk_stream.DingTalkStreamClient(credential) client.register_callback_handler( dingtalk_stream.chatbot.ChatbotMessage.TOPIC, DingtalkMsgHandler() ) client.start_forever() if __name__ == "__main__" : main()
结合这两段代码可以看出来我是在钉钉的后台收到用户输入之后,把用户输入拼接到Prompt中,然后发给GPT4o,再把GPT4o的返回发送给钉钉的前端。
在手机上通过钉钉跟小车的交互效果如下截图:
它之所以会知道自己叫做“OriginBot”,并且角色一个家庭助理,跟我使用的Prompt有关:
1 2 3 4 5 6 base_prompt = [ { "role" : "system" , "content" : "你叫OriginBot,是我的智能家庭助理" , }, ]
目前使用的Prompt还比较简单,由于GPT4o支持128K的上下文,可以考虑多预设一些信息,这样这个家庭助理才会更加人性化。
如何进一步控制小车 这一块目前还在开发中,有一个思路是自己实现元动作,然后让大模型根据输入判断应该调用哪个动作来完成任务。
比如下面这个代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 """ OriginBot的元动作,以便让大模型决定应该调用哪个动作 """ import rclpyfrom geometry_msgs.msg import Twistfrom rclpy.node import Nodeimport mathimport timemeta_action_description = { "originbot_turning" : { "description" : "控制originbot原地转弯" , "params" : { "angular_speed" : { "type" : "float" , "description" : "角速度(弧度/秒),假设原地左转,正数表示左转,复数表示右转" , "default" : 0.5 , }, "target_angle" : { "type" : "float" , "description" : "需要转弯的总角度" , "default" : 5 , }, }, }, } class OriginBotTruning (Node ): def __init__ (self, angular_speed=1 , target_angle=5 ): super ().__init__("originbot" ) self.publisher_ = self.create_publisher(Twist, "/cmd_vel" , 10 ) self.timer = self.create_timer(1.0 , self.left_turn) def left_turn (self ): target_angle = math.radians(self.target_angle) duration = target_angle / self.angular_speed twist = Twist() twist.linear.x = 0.0 twist.angular.z = self.angular_speed end_time = time.time() + duration while time.time() < end_time: self.publisher_.publish(twist) twist.angular.z = 0.0 self.publisher_.publish(twist) self.timer.cancel() def originbot_turning (angular_speed=1.0 , target_angle=5.0 , args=None ): rclpy.init(args=args) originbot = OriginBotTruning(angular_speed, target_angle) rclpy.spin(originbot) originbot.destroy_node() rclpy.shutdown()
我提前写好了一个可以让小车转弯的函数,并且将其详细信息封装到meta_action_description中,将来就可以把meta_action_description喂给大模型,让大模型决定应该调用哪个函数,并给出合适的参数值。
这一步还没有做完,目前只是一个思路,欢迎大家交流。