【行空板】视频车
【行空板】
行空板有一个USB Type-A接口,用于外接USB外设,可以连接摄像头;程序通过OpenCV(cv2)读取摄像头画面,再通过socket将视频传给电脑。
【行空板服务端】
#!/usr/bin/env python
# -*- coding=utf-8 -*-
import socket
import numpy as np
import urllib
import cv2 as cv
import threading
import time
# Announce on the console that this script is the server side.
print('this is Server')
# Open video capture device index 0; deal_data() reads frames from this shared object.
cap = cv.VideoCapture(0)
# Request a 320x240 capture size.
cap.set(cv.CAP_PROP_FRAME_WIDTH, 320)
cap.set(cv.CAP_PROP_FRAME_HEIGHT, 240)
def socket_service(host='192.168.31.25', port=6666):
    """Listen for TCP clients and stream video to each one on its own thread.

    Args:
        host: Local IP address to bind to (the address of this server machine).
        port: TCP port to listen on.

    Exits the process with status 1 if the listening socket cannot be set up.
    Otherwise this function does not return: it accepts connections forever
    and hands each one to deal_data() in a new thread.
    """
    # Imported locally because this script's import list does not include sys;
    # without it the error path below raised NameError instead of exiting.
    import sys

    s = None
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Avoid "Address already in use" when the server is restarted quickly.
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((host, port))
        s.listen(10)
    except socket.error as msg:
        print(msg)
        if s is not None:
            s.close()
        sys.exit(1)
    print('Waiting connection...')
    while True:
        conn, addr = s.accept()
        t = threading.Thread(target=deal_data, args=(conn, addr))
        t.start()
def deal_data(conn, addr):
    """Stream JPEG-encoded camera frames to one client until sending fails.

    Args:
        conn: Connected socket returned by accept().
        addr: Address of the client, used only for the log line.

    Each frame is read from the module-level ``cap``, encoded as JPEG and
    written to the socket as raw bytes with no extra framing. The connection
    is always closed when the loop ends.
    """
    print('Accept new connection from {0}'.format(addr))
    try:
        while True:
            ret, frame = cap.read()
            encoded_ok = False
            if ret:
                # imencode returns a (success, buffer) pair; only the buffer
                # holds the JPEG bytes, so the pair must not be serialised.
                encoded_ok, jpeg = cv.imencode('.jpg', frame)
            if encoded_ok:
                str_encode = jpeg.tobytes()
                print('img size : %s' % len(str_encode))
                try:
                    # sendall keeps writing until the whole frame is sent;
                    # send() may stop after a partial write.
                    conn.sendall(str_encode)
                except socket.error as e:
                    # The client is gone: stop instead of retrying forever.
                    print(e)
                    break
            # Pace the stream (and the retry when no frame was available).
            time.sleep(0.1)
    finally:
        conn.close()
# Start the server; socket_service() loops on accept() and does not return.
socket_service()
【电脑接收端程序】
#!/usr/bin/env python
# -*- coding=utf-8 -*-
import socket
import numpy as np
import urllib
import cv2
import threading
import time,sys
def _split_jpeg_frames(buffer):
    """Split complete JPEG images off the front of a byte buffer.

    Returns a ``(frames, remainder)`` pair: ``frames`` is a list of byte
    strings, each running from a start-of-image marker (FF D8) to the next
    end-of-image marker (FF D9); ``remainder`` is the unfinished tail to keep
    for the next call.
    """
    frames = []
    while True:
        start = buffer.find(b'\xff\xd8')
        if start == -1:
            # No image start yet; keep a trailing FF that may be half a marker.
            tail = buffer[-1:] if buffer.endswith(b'\xff') else b''
            return frames, tail
        end = buffer.find(b'\xff\xd9', start + 2)
        if end == -1:
            return frames, buffer[start:]
        frames.append(buffer[start:end + 2])
        buffer = buffer[end + 2:]


def socket_client(host='192.168.31.25', port=6666):
    """Connect to the video server and display the JPEG frames it streams.

    Args:
        host: IP address of the video server.
        port: TCP port of the video server.

    Returns None when the connection cannot be made, when the server closes
    the connection, or when a socket error ends the stream.
    """
    s = None
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.connect((host, port))  # connect to the server
    except socket.error as msg:
        print(msg)
        if s is not None:
            s.close()
        # Without a connection there is nothing to receive.
        return
    print('this is Client')
    pending = b''
    try:
        while True:
            chunk = s.recv(77777)
            if not chunk:
                # An empty read means the server closed the connection.
                break
            # TCP is a byte stream: one recv() may hold part of a frame or
            # several frames, so reassemble on the JPEG markers.
            frames, pending = _split_jpeg_frames(pending + chunk)
            for jpg in frames:
                try:
                    nparr = np.frombuffer(jpg, dtype='uint8')
                    img_decode = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
                    if img_decode is None:
                        continue
                    cv2.imshow("Client_show", img_decode)
                    cv2.waitKey(1)
                except cv2.error as e:
                    # Skip a frame that cannot be shown; keep the stream alive.
                    print(e)
    except socket.error as e:
        print(e)
    finally:
        s.close()
# Run the viewer: connect to the video server and show the received frames.
socket_client()
【物联网——行空板】
# -*- coding: utf-8 -*-
import time
import numpy as np
from pinpong.board import Board
from microbit_motor import Microbit_Motor #导入Microbit_Motor库
import siot
from pinpong.extension.unihiker import *
from unihiker import GUI
import socket
import urllib
import cv2 as cv
import threading
# Announce on the console that this script is the server side.
print('this is Server')
# Open video capture device index 0; deal_data() reads frames from this shared object.
cap = cv.VideoCapture(0)
# Request a 320x240 capture size.
cap.set(cv.CAP_PROP_FRAME_WIDTH, 320)
cap.set(cv.CAP_PROP_FRAME_HEIGHT, 240)
# Create the GUI and the text label that on_message_callback() updates with the current action.
u_gui=GUI()
xianshi=u_gui.draw_text(text="开始",x=50,y=100,font_size=50, color="#0000FF")
# Message event callback
def on_message_callback(client, userdata, msg):
    """Run the motion that matches a received command and show it on the label.

    The payload is decoded as UTF-8 and compared with the single-letter
    commands 'G', 'S', 'B', 'L' and 'R'; any other payload is ignored.
    """
    command = msg.payload.decode("utf-8")
    # command -> (motion to run, text shown on the screen label)
    actions = {
        'G': (lambda: forward(200), "前进"),
        'S': (lambda: stop(), "停止"),
        'B': (lambda: back(200), "后退"),
        'L': (lambda: left(200), "向左"),
        'R': (lambda: right(200), "向右"),
    }
    selected = actions.get(command)
    if selected is None:
        return
    run_motion, label = selected
    run_motion()
    xianshi.config(text=label)
# Configure the SIoT broker connection (port 1883), connect, and start the client loop.
siot.init(client_id="siot_192",server="192.168.31.25",port=1883,user="siot",password="dfrobot")
siot.connect()
siot.loop()
# Deliver incoming messages to on_message_callback and subscribe to the control topic.
siot.set_callback(on_message_callback)
siot.getsubscribe(topic="car/control")
# NOTE(review): the subscription is active before motorbit and the motion functions
# are defined; a command arriving in that window would presumably fail inside the
# callback — consider initialising the motor driver first. TODO confirm.
Board("microbit").begin()
motorbit = Microbit_Motor()
def forward(speed):
    """Drive the car forward.

    Runs M1 and M2 clockwise (CW) and M3 and M4 counter-clockwise (CCW).

    Args:
        speed: Motor speed. The valid range is 0-255; values outside it are
            clamped (the original only limited the upper bound).
    """
    speed = max(0, min(255, speed))
    motorbit.motor_run(motorbit.M1, motorbit.CW, speed)
    motorbit.motor_run(motorbit.M2, motorbit.CW, speed)
    motorbit.motor_run(motorbit.M3, motorbit.CCW, speed)
    motorbit.motor_run(motorbit.M4, motorbit.CCW, speed)
def back(speed):
    """Drive the car backward.

    Runs M1 and M2 counter-clockwise (CCW) and M3 and M4 clockwise (CW).

    Args:
        speed: Motor speed. The valid range is 0-255; values outside it are
            clamped (the original only limited the upper bound).
    """
    speed = max(0, min(255, speed))
    motorbit.motor_run(motorbit.M1, motorbit.CCW, speed)
    motorbit.motor_run(motorbit.M2, motorbit.CCW, speed)
    motorbit.motor_run(motorbit.M3, motorbit.CW, speed)
    motorbit.motor_run(motorbit.M4, motorbit.CW, speed)
def left_turn(speed):
    """Turn the car to the left.

    Runs all four motors (M1-M4) clockwise (CW).

    Args:
        speed: Motor speed. The valid range is 0-255; values outside it are
            clamped (the original only limited the upper bound).
    """
    speed = max(0, min(255, speed))
    motorbit.motor_run(motorbit.M1, motorbit.CW, speed)
    motorbit.motor_run(motorbit.M2, motorbit.CW, speed)
    motorbit.motor_run(motorbit.M3, motorbit.CW, speed)
    motorbit.motor_run(motorbit.M4, motorbit.CW, speed)
def right_turn(speed):
    """Turn the car to the right.

    Runs all four motors (M1-M4) counter-clockwise (CCW).

    Args:
        speed: Motor speed. The valid range is 0-255; values outside it are
            clamped (the original only limited the upper bound).
    """
    speed = max(0, min(255, speed))
    motorbit.motor_run(motorbit.M1, motorbit.CCW, speed)
    motorbit.motor_run(motorbit.M2, motorbit.CCW, speed)
    motorbit.motor_run(motorbit.M3, motorbit.CCW, speed)
    motorbit.motor_run(motorbit.M4, motorbit.CCW, speed)
def left(speed):
    """Move the car to the left.

    Runs M1 and M3 clockwise (CW) and M2 and M4 counter-clockwise (CCW).
    NOTE(review): presumably a sideways move, as opposed to left_turn() —
    confirm against the wheel layout.

    Args:
        speed: Motor speed. The valid range is 0-255; values outside it are
            clamped (the original only limited the upper bound).
    """
    speed = max(0, min(255, speed))
    motorbit.motor_run(motorbit.M1, motorbit.CW, speed)
    motorbit.motor_run(motorbit.M2, motorbit.CCW, speed)
    motorbit.motor_run(motorbit.M3, motorbit.CW, speed)
    motorbit.motor_run(motorbit.M4, motorbit.CCW, speed)
def right(speed):
    """Move the car to the right.

    Runs M1 and M3 counter-clockwise (CCW) and M2 and M4 clockwise (CW).
    NOTE(review): presumably a sideways move, as opposed to right_turn() —
    confirm against the wheel layout.

    Args:
        speed: Motor speed. The valid range is 0-255; values outside it are
            clamped (the original only limited the upper bound).
    """
    speed = max(0, min(255, speed))
    motorbit.motor_run(motorbit.M1, motorbit.CCW, speed)
    motorbit.motor_run(motorbit.M2, motorbit.CW, speed)
    motorbit.motor_run(motorbit.M3, motorbit.CCW, speed)
    motorbit.motor_run(motorbit.M4, motorbit.CW, speed)
def stop():
    """Stop all four motors, M1 through M4, in that order."""
    for motor in (motorbit.M1, motorbit.M2, motorbit.M3, motorbit.M4):
        motorbit.motor_stop(motor)
def socket_service(host='192.168.31.25', port=6666):
    """Listen for TCP clients and stream video to each one on its own thread.

    Args:
        host: Local IP address to bind to (the address of this server machine).
        port: TCP port to listen on.

    Exits the process with status 1 if the listening socket cannot be set up.
    Otherwise this function does not return: it accepts connections forever
    and hands each one to deal_data() in a new thread.
    """
    # Imported locally because this script's import list does not include sys;
    # without it the error path below raised NameError instead of exiting.
    import sys

    s = None
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Avoid "Address already in use" when the server is restarted quickly.
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((host, port))
        s.listen(10)
    except socket.error as msg:
        print(msg)
        if s is not None:
            s.close()
        sys.exit(1)
    print('Waiting connection...')
    while True:
        conn, addr = s.accept()
        t = threading.Thread(target=deal_data, args=(conn, addr))
        t.start()
def deal_data(conn, addr):
    """Stream JPEG-encoded camera frames to one client until sending fails.

    Args:
        conn: Connected socket returned by accept().
        addr: Address of the client, used only for the log line.

    Each frame is read from the module-level ``cap``, encoded as JPEG and
    written to the socket as raw bytes with no extra framing. The connection
    is always closed when the loop ends.
    """
    print('Accept new connection from {0}'.format(addr))
    try:
        while True:
            ret, frame = cap.read()
            encoded_ok = False
            if ret:
                # imencode returns a (success, buffer) pair; only the buffer
                # holds the JPEG bytes, so the pair must not be serialised.
                encoded_ok, jpeg = cv.imencode('.jpg', frame)
            if encoded_ok:
                str_encode = jpeg.tobytes()
                print('img size : %s' % len(str_encode))
                try:
                    # sendall keeps writing until the whole frame is sent;
                    # send() may stop after a partial write.
                    conn.sendall(str_encode)
                except socket.error as e:
                    # The client is gone: stop instead of retrying forever.
                    print(e)
                    break
            # Pace the stream (and the retry when no frame was available).
            time.sleep(0.1)
    finally:
        conn.close()
def trace_fun():
    """Run the video server, calling socket_service() again if it ever returns."""
    while True:
        socket_service()


# Start serving only when this file is executed as a script.
if __name__ == '__main__':
    trace_fun()
【物联网——电脑端】
#!/usr/bin/env python
# -*- coding=utf-8 -*-
import socket
import numpy as np
import urllib
import cv2
import threading
import time,sys
import tkinter as tk
from tkinter import *
from PIL import Image, ImageTk
import siot
# Configure the SIoT broker connection (port 1883), connect, and start the client loop
# so the command functions below can publish.
# NOTE(review): the car-side script uses the same client_id "siot_192"; brokers
# commonly allow only one connection per client ID — confirm both can stay connected.
siot.init(client_id="siot_192",server="192.168.31.25",port=1883,user="siot",password="dfrobot")
siot.connect()
siot.loop()
# Build the window and lay out its text and buttons.
def forward():
    """Publish the 'G' command on the car/control topic."""
    topic, command = 'car/control', 'G'
    siot.publish(topic, command)
def left():
    """Publish the 'L' command on the car/control topic."""
    topic, command = 'car/control', 'L'
    siot.publish(topic, command)
def right():
    """Publish the 'R' command on the car/control topic."""
    topic, command = 'car/control', 'R'
    siot.publish(topic, command)
def stop():
    """Publish the 'S' command on the car/control topic."""
    topic, command = 'car/control', 'S'
    siot.publish(topic, command)
def back():
    """Publish the 'B' command on the car/control topic."""
    topic, command = 'car/control', 'B'
    siot.publish(topic, command)
# Create the main control window.
top = tk.Tk()
top.title('控制窗口')
top.geometry('420x300')
# Canvas size; note the width (425) is slightly larger than the 420-pixel window.
image_width = 425
image_height = 300
canvas = Canvas(top,bg = 'white',width = image_width,height = image_height )# draw the canvas
canvas.pack()
# Load the background picture; 'back.jpg' is opened relative to the working directory.
img = Image.open('back.jpg')
bg = ImageTk.PhotoImage(img)
bgid = canvas.create_image(0, 0, image=bg, anchor='nw')
# NOTE(review): place() is called after pack() on the same canvas — presumably place() is the one intended; confirm.
canvas.place(x = 0,y = 0)
# Draw the caption as canvas text so that it has a transparent background.
txtid=canvas.create_text(5,20, fill = 'red',font=("黑体", 15),anchor="nw")
canvas.insert(txtid,1,"行空板四驱车")
# Origin offsets used to position the buttons.
wx=5
hy=20
# Create the buttons and bind each one to its command function.
# Each assignment rebinds the function's name to the Button; the callback is
# passed as command= before the rebinding, so the button still calls the function.
left = tk.Button(top,text='左转',height=2,width=15,command=left)
left.place(x=wx,y=hy+100)
forward = tk.Button(top,text='前进',height=2,width=15,command=forward)
forward.place(x=wx+150,y=hy)
right = tk.Button(top,text='右转',height=2,width=15,command=right)
right.place(x=wx+300,y=hy+100)
back = tk.Button(top,text='后退',height=2,width=15,command=back)
back.place(x=wx+150,y=hy+200)
stop = tk.Button(top,text='停止',height=2,width=15,command=stop)
stop.place(x=wx+150,y=hy+100)
def _split_jpeg_frames(buffer):
    """Split complete JPEG images off the front of a byte buffer.

    Returns a ``(frames, remainder)`` pair: ``frames`` is a list of byte
    strings, each running from a start-of-image marker (FF D8) to the next
    end-of-image marker (FF D9); ``remainder`` is the unfinished tail to keep
    for the next call.
    """
    frames = []
    while True:
        start = buffer.find(b'\xff\xd8')
        if start == -1:
            # No image start yet; keep a trailing FF that may be half a marker.
            tail = buffer[-1:] if buffer.endswith(b'\xff') else b''
            return frames, tail
        end = buffer.find(b'\xff\xd9', start + 2)
        if end == -1:
            return frames, buffer[start:]
        frames.append(buffer[start:end + 2])
        buffer = buffer[end + 2:]


def socket_client(host='192.168.31.25', port=6666):
    """Show the video stream while keeping the Tk control window responsive.

    Args:
        host: IP address of the video server.
        port: TCP port of the video server.

    Frames are displayed in an OpenCV window and the Tk window is refreshed
    after each displayed frame. If the video connection cannot be made or
    ends, the Tk main loop takes over so the control buttons keep working.
    Returns when the control window is closed.
    """
    window_open = True
    s = None
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.connect((host, port))  # connect to the server
    except socket.error as msg:
        print(msg)
        if s is not None:
            s.close()
        s = None
    print('this is Client')
    if s is not None:
        pending = b''
        try:
            while True:
                chunk = s.recv(77777)
                if not chunk:
                    # An empty read means the server closed the connection.
                    break
                # TCP is a byte stream: one recv() may hold part of a frame
                # or several frames, so reassemble on the JPEG markers.
                frames, pending = _split_jpeg_frames(pending + chunk)
                for jpg in frames:
                    try:
                        nparr = np.frombuffer(jpg, dtype='uint8')
                        img_decode = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
                        if img_decode is None:
                            continue
                        cv2.imshow("Client_show", img_decode)
                        cv2.waitKey(1)
                    except cv2.error as e:
                        # Skip a frame that cannot be shown.
                        print(e)
                        continue
                    # Let Tk process button clicks, then pause 100 ms.
                    top.update()
                    top.after(100)
        except socket.error as e:
            print(e)
        except tk.TclError:
            # The control window was closed; stop instead of looping on errors.
            window_open = False
        finally:
            s.close()
    if window_open:
        # No video (any more): keep the control buttons usable.
        top.mainloop()
# Run the viewer loop; it also services the Tk control window created above.
socket_client()
【演示视频】
https://www.bilibili.com/video/BV1v3411G7UQ?share_source=copy_web
页:
[1]