云天 发表于 2022-5-22 20:32:14

【行空板】视频车


【行空板】
行空板有一个USB TYPE-A接口用于外接USB外设,可用于连接摄像头,在CV2进行调用,通过socket将视频传给电脑。






【行空板服务端】

#!/usr/bin/env python
# -*- coding=utf-8 -*-

import socket
import numpy as np
import urllib
import cv2 as cv
import threading
import time


print('this is Server')

cap = cv.VideoCapture(0)
cap.set(cv.CAP_PROP_FRAME_WIDTH, 320)
cap.set(cv.CAP_PROP_FRAME_HEIGHT, 240)

def socket_service():
    try:
      s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      # 防止socket server重启后端口被占用(socket.error: Address already in use)
      s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
      #s.bind(('127.0.0.1', 6666))
      s.bind(('192.168.31.25', 6666))#这个是服务端机器的ip
      s.listen(10)
    except socket.error as msg:
      print (msg)
      sys.exit(1)
    print ('Waiting connection...')

    while True:
      conn, addr = s.accept()
      t = threading.Thread(target=deal_data, args=(conn, addr))
      t.start()

def deal_data(conn, addr):
    print ('Accept new connection from {0}'.format(addr))
    while True:
      # get a frame
      ret, frame = cap.read()
      # '.jpg'表示把当前图片img按照jpg格式编码,按照不同格式编码的结果不一样
      img_encode = cv.imencode('.jpg', frame)
      data_encode = np.array(img_encode)
      str_encode = data_encode.tostring()
      encode_len = str(len(str_encode))
      print('img size : %s'%encode_len)
      try:
            conn.send(str_encode)#发送图片的encode码
      except Exception as e:
            print(e)
      time.sleep(0.1)
    conn.close()

socket_service()

【电脑接收端程序】

#!/usr/bin/env python
# -*- coding=utf-8 -*-

import socket
import numpy as np
import urllib
import cv2
import threading
import time,sys

def socket_client():
    try:
      s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      s.connect(('192.168.31.25', 6666))#连接服务端
    except socket.error as msg:
      print (msg)
      #sys.exit(1)
    #c_sock, c_addr =s.accept()
    print('this is Client')
    while True:
      try:
            receive_encode = s.recv(77777)#接收的字节数 最大值 2147483647 (31位的二进制)
            nparr = np.fromstring(receive_encode, dtype='uint8')
            img_decode = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
         
            #产生标签文本背景透明效果

            cv2.imshow("Client_show", img_decode)

            cv2.waitKey(1)
            
      except Exception as e:
            print(e)
      

socket_client()


【物联网——行空板】


# -*- coding: utf-8 -*-
import time

import numpy as np
from pinpong.board import Board
from microbit_motor import Microbit_Motor #导入Microbit_Motor库
import siot
from pinpong.extension.unihiker import *

from unihiker import GUI

import socket

import urllib
import cv2 as cv
import threading



print('this is Server')

cap = cv.VideoCapture(0)
cap.set(cv.CAP_PROP_FRAME_WIDTH, 320)
cap.set(cv.CAP_PROP_FRAME_HEIGHT, 240)
u_gui=GUI()
xianshi=u_gui.draw_text(text="开始",x=50,y=100,font_size=50, color="#0000FF")
# 事件回调函数
def on_message_callback(client, userdata, msg):

if (msg.payload.decode("utf-8")== 'G'):
      forward(200)
      xianshi.config(text="前进")
if (msg.payload.decode("utf-8")== 'S'):
      stop()
      xianshi.config(text="停止")   
      
if (msg.payload.decode("utf-8") == 'B'):
      back(200)
      xianshi.config(text="后退")
      
if (msg.payload.decode("utf-8") == 'L'):
      left(200)
      xianshi.config(text="向左")
      
if (msg.payload.decode("utf-8") == 'R'):
      right(200)
      xianshi.config(text="向右")
      

siot.init(client_id="siot_192",server="192.168.31.25",port=1883,user="siot",password="dfrobot")
siot.connect()
siot.loop()
siot.set_callback(on_message_callback)
siot.getsubscribe(topic="car/control")

Board("microbit").begin()
motorbit = Microbit_Motor()
def forward(speed):
#前进
#电机有M1,M2,M3,M4, CW代表正转,CCW代表反转,255是速度,范围0-255
if speed>255:
      speed=255
motorbit.motor_run(motorbit.M1, motorbit.CW, speed)
motorbit.motor_run(motorbit.M2, motorbit.CW, speed)
motorbit.motor_run(motorbit.M3, motorbit.CCW, speed)
motorbit.motor_run(motorbit.M4, motorbit.CCW, speed)
def back(speed):
#后退
#电机有M1,M2,M3,M4, CW代表正转,CCW代表反转,255是速度,范围0-255
if speed>255:
      speed=255
motorbit.motor_run(motorbit.M1, motorbit.CCW, speed)
motorbit.motor_run(motorbit.M2, motorbit.CCW, speed)
motorbit.motor_run(motorbit.M3, motorbit.CW, speed)
motorbit.motor_run(motorbit.M4, motorbit.CW, speed)
def left_turn(speed):
#向左转
#电机有M1,M2,M3,M4, CW代表正转,CCW代表反转,255是速度,范围0-255
if speed>255:
      speed=255
motorbit.motor_run(motorbit.M1, motorbit.CW, speed)
motorbit.motor_run(motorbit.M2, motorbit.CW, speed)
motorbit.motor_run(motorbit.M3, motorbit.CW, speed)
motorbit.motor_run(motorbit.M4, motorbit.CW, speed)
def right_turn(speed):
#向右转
#电机有M1,M2,M3,M4, CW代表正转,CCW代表反转,255是速度,范围0-255
if speed>255:
      speed=255
motorbit.motor_run(motorbit.M1, motorbit.CCW, speed)
motorbit.motor_run(motorbit.M2, motorbit.CCW, speed)
motorbit.motor_run(motorbit.M3, motorbit.CCW, speed)
motorbit.motor_run(motorbit.M4, motorbit.CCW, speed)
def left(speed):
#向左
#电机有M1,M2,M3,M4, CW代表正转,CCW代表反转,255是速度,范围0-255
if speed>255:
      speed=255
motorbit.motor_run(motorbit.M1, motorbit.CW, speed)
motorbit.motor_run(motorbit.M2, motorbit.CCW, speed)
motorbit.motor_run(motorbit.M3, motorbit.CW, speed)
motorbit.motor_run(motorbit.M4, motorbit.CCW, speed)
def right(speed):
#向右
#电机有M1,M2,M3,M4, CW代表正转,CCW代表反转,255是速度,范围0-255
if speed>255:
      speed=255
motorbit.motor_run(motorbit.M1, motorbit.CCW, speed)
motorbit.motor_run(motorbit.M2, motorbit.CW, speed)
motorbit.motor_run(motorbit.M3, motorbit.CCW, speed)
motorbit.motor_run(motorbit.M4, motorbit.CW, speed)
def stop():
motorbit.motor_stop(motorbit.M1)
motorbit.motor_stop(motorbit.M2)
motorbit.motor_stop(motorbit.M3)
motorbit.motor_stop(motorbit.M4)
def socket_service():
    try:
      s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      # 防止socket server重启后端口被占用(socket.error: Address already in use)
      s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
      #s.bind(('127.0.0.1', 6666))
      s.bind(('192.168.31.25', 6666))#这个是服务端机器的ip
      s.listen(10)
    except socket.error as msg:
      print (msg)
      sys.exit(1)
    print ('Waiting connection...')

    while True:
      conn, addr = s.accept()
      t = threading.Thread(target=deal_data, args=(conn, addr))
      t.start()

def deal_data(conn, addr):
    print ('Accept new connection from {0}'.format(addr))
    while True:
      # get a frame
      ret, frame = cap.read()
      # '.jpg'表示把当前图片img按照jpg格式编码,按照不同格式编码的结果不一样
      img_encode = cv.imencode('.jpg', frame)
      data_encode = np.array(img_encode)
      str_encode = data_encode.tostring()
      encode_len = str(len(str_encode))
      print('img size : %s'%encode_len)
      try:
            conn.send(str_encode)#发送图片的encode码
      except Exception as e:
            print(e)
      time.sleep(0.1)
    conn.close()
def trace_fun():

    while True:
      socket_service()
         


if __name__ == '__main__':
    trace_fun()
【物联网——电脑端】

#!/usr/bin/env python
# -*- coding=utf-8 -*-

import socket
import numpy as np
import urllib
import cv2
import threading
import time,sys
import tkinter as tk
from tkinter import *
from PIL import Image, ImageTk
import siot
siot.init(client_id="siot_192",server="192.168.31.25",port=1883,user="siot",password="dfrobot")
siot.connect()
siot.loop()

#生成窗体,布置相应文本框和按钮
def forward():
    siot.publish('car/control', 'G')
def left():
    siot.publish('car/control', 'L')
def right():
    siot.publish('car/control', 'R')
def stop():
    siot.publish('car/control', 'S')
def back():
    siot.publish('car/control', 'B')
top = tk.Tk()
top.title('控制窗口')
top.geometry('420x300')
image_width = 425
image_height = 300

canvas = Canvas(top,bg = 'white',width = image_width,height = image_height )#绘制画布
canvas.pack()
img = Image.open('back.jpg')
bg = ImageTk.PhotoImage(img)
bgid = canvas.create_image(0, 0, image=bg, anchor='nw')
canvas.place(x = 0,y = 0)
#产生标签文本背景透明效果
txtid=canvas.create_text(5,20, fill = 'red',font=("黑体", 15),anchor="nw")
canvas.insert(txtid,1,"行空板四驱车")
wx=5
hy=20

#生成按钮,并指定相应功能
left = tk.Button(top,text='左转',height=2,width=15,command=left)
left.place(x=wx,y=hy+100)
forward = tk.Button(top,text='前进',height=2,width=15,command=forward)
forward.place(x=wx+150,y=hy)
right = tk.Button(top,text='右转',height=2,width=15,command=right)
right.place(x=wx+300,y=hy+100)
back = tk.Button(top,text='后退',height=2,width=15,command=back)
back.place(x=wx+150,y=hy+200)
stop = tk.Button(top,text='停止',height=2,width=15,command=stop)
stop.place(x=wx+150,y=hy+100)
def socket_client():
    try:
      s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      s.connect(('192.168.31.25', 6666))#连接服务端
    except socket.error as msg:
      print (msg)
      #sys.exit(1)
    #c_sock, c_addr =s.accept()
    print('this is Client')
    while True:
      try:
            receive_encode = s.recv(77777)#接收的字节数 最大值 2147483647 (31位的二进制)
            nparr = np.fromstring(receive_encode, dtype='uint8')
            img_decode = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
         
            #产生标签文本背景透明效果

            cv2.imshow("Client_show", img_decode)

            cv2.waitKey(1)
            top.update()
            top.after(100)
      except Exception as e:
            print(e)
      

socket_client()



【演示视频】


https://www.bilibili.com/video/BV1v3411G7UQ?share_source=copy_web
页: [1]
查看完整版本: 【行空板】视频车