服务端代码:
在树莓派小车上运行
第一版
#!/usr/bin/env python
# coding=UTF-8
import RPi.GPIO as GPIO
import time
import socket
import sys
reload(sys)
sys.setdefaultencoding('utf8')
PWMA = 18
AIN1 = 22
AIN2 = 27
PWMB = 23
BIN1 = 25
BIN2 = 24
def t_up(speed):
L_Motor.ChangeDutyCycle(speed)
GPIO.output(AIN2, False) # AIN2
GPIO.output(AIN1, True) # AIN1
# up
R_Motor.ChangeDutyCycle(speed)
GPIO.output(BIN2, False) # BIN2
GPIO.output(BIN1, True) # BIN1
def t_down(speed):
L_Motor.ChangeDutyCycle(speed)
GPIO.output(AIN2, True) # AIN2
GPIO.output(AIN1, False) # AIN1
R_Motor.ChangeDutyCycle(speed)
GPIO.output(BIN2, True) # BIN2
GPIO.output(BIN1, False) # BIN1
def t_left(speed):
L_Motor.ChangeDutyCycle(speed)
GPIO.output(AIN2, True) # AIN2
GPIO.output(AIN1, False) # AIN1
R_Motor.ChangeDutyCycle(speed)
GPIO.output(BIN2, False) # BIN2
GPIO.output(BIN1, True) # BIN1
def t_right(speed):
L_Motor.ChangeDutyCycle(speed)
GPIO.output(AIN2, False) # AIN2
GPIO.output(AIN1, True) # AIN1
R_Motor.ChangeDutyCycle(speed)
GPIO.output(BIN2, True) # BIN2
GPIO.output(BIN1, False) # BIN1
def t_stop():
L_Motor.ChangeDutyCycle(0)
GPIO.output(AIN2, False) # AIN2
GPIO.output(AIN1, False) # AIN1
# stop
R_Motor.ChangeDutyCycle(0)
GPIO.output(BIN2, False) # BIN2
GPIO.output(BIN1, False) # BIN1
GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM)
GPIO.setup(AIN2, GPIO.OUT)
GPIO.setup(AIN1, GPIO.OUT)
GPIO.setup(PWMA, GPIO.OUT)
GPIO.setup(BIN1, GPIO.OUT)
GPIO.setup(BIN2, GPIO.OUT)
GPIO.setup(PWMB, GPIO.OUT)
L_Motor = GPIO.PWM(PWMA, 100)
L_Motor.start(0)
R_Motor = GPIO.PWM(PWMB, 100)
R_Motor.start(0)
# 下面是命令接收代码
ip_port = ('', 7878) # 留空才能接收来自局域网的tcp连接
sk = socket.socket() # 创建套接字
sk.bind(ip_port) # 绑定服务地址
sk.listen(5) # 监听连接请求
print('启动socket服务,等待客户端连接...')
conn, address = sk.accept() # 等待连接,此处自动阻塞
try:
while True: # 一个死循环,直到客户端发送‘exit’的信号,才关闭连接
client_data = conn.recv(1024).decode() # 接收信息
o = int(client_data)
if o==0:#0停止小车
t_stop()
elif o==1:#1前进
t_up(50)
elif o==2:#2后退
t_down(50)
elif o==3:#3左转
t_left(50)
elif o==4:#4右转
t_right(50)
print("来自%s的客户端向你发来信息:%s" % (address, client_data))
except KeyboardInterrupt:
GPIO.cleanup()
conn.close() # 关闭连接
第二版
#!/usr/bin/env python
# coding=UTF-8
import RPi.GPIO as GPIO
import time
import socket
import sys
reload(sys)
sys.setdefaultencoding('utf8')
PWMA = 18
AIN1 = 22
AIN2 = 27
PWMB = 23
BIN1 = 25
BIN2 = 24
def t_up(speed):
L_Motor.ChangeDutyCycle(speed)
GPIO.output(AIN2, False) # AIN2
GPIO.output(AIN1, True) # AIN1
# up
R_Motor.ChangeDutyCycle(speed)
GPIO.output(BIN2, False) # BIN2
GPIO.output(BIN1, True) # BIN1
def t_down(speed):
L_Motor.ChangeDutyCycle(speed)
GPIO.output(AIN2, True) # AIN2
GPIO.output(AIN1, False) # AIN1
R_Motor.ChangeDutyCycle(speed)
GPIO.output(BIN2, True) # BIN2
GPIO.output(BIN1, False) # BIN1
def t_left(speed):
L_Motor.ChangeDutyCycle(speed)
GPIO.output(AIN2, True) # AIN2
GPIO.output(AIN1, False) # AIN1
R_Motor.ChangeDutyCycle(speed)
GPIO.output(BIN2, False) # BIN2
GPIO.output(BIN1, True) # BIN1
def t_right(speed):
L_Motor.ChangeDutyCycle(speed)
GPIO.output(AIN2, False) # AIN2
GPIO.output(AIN1, True) # AIN1
R_Motor.ChangeDutyCycle(speed)
GPIO.output(BIN2, True) # BIN2
GPIO.output(BIN1, False) # BIN1
def t_stop():
L_Motor.ChangeDutyCycle(0)
GPIO.output(AIN2, False) # AIN2
GPIO.output(AIN1, False) # AIN1
# stop
R_Motor.ChangeDutyCycle(0)
GPIO.output(BIN2, False) # BIN2
GPIO.output(BIN1, False) # BIN1
GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM)
GPIO.setup(AIN2, GPIO.OUT)
GPIO.setup(AIN1, GPIO.OUT)
GPIO.setup(PWMA, GPIO.OUT)
GPIO.setup(BIN1, GPIO.OUT)
GPIO.setup(BIN2, GPIO.OUT)
GPIO.setup(PWMB, GPIO.OUT)
L_Motor = GPIO.PWM(PWMA, 100)
L_Motor.start(0)
R_Motor = GPIO.PWM(PWMB, 100)
R_Motor.start(0)
# 下面是命令接收代码
ip_port = ('', 7878) # 留空才能接收来自局域网的tcp连接
sk = socket.socket() # 创建套接字
sk.bind(ip_port) # 绑定服务地址
sk.listen(5) # 监听连接请求
print('启动socket服务,等待客户端连接...')
conn, address = sk.accept() # 等待连接,此处自动阻塞
try:
while True: # 一个死循环,直到客户端发送‘exit’的信号,才关闭连接
client_data = conn.recv(1024).decode() # 接收遥控命令,使用字符串分割来接收多个命令参数
print("来自%s的客户端向你发来信息:%s" % (address, client_data))
order_list=client_data.split(',')#将接收到的命令通过逗号分割
o = int(order_list[0])#第一个为小车的运行方向
speed = int(order_list[1])#第二个为小车的运行速度
if speed==0:#如果没有收到speed参数,或speed为0,则调整为50
speed=50
if o==0:#0停止小车
t_stop()
elif o==1:#1前进
t_up(speed)
elif o==2:#2后退
t_down(speed)
elif o==3:#3左转
t_left(speed)
elif o==4:#4右转
t_right(speed)
except KeyboardInterrupt:
GPIO.cleanup()
conn.close() # 关闭连接
第三版
改用UDP,解决了TCP连接断开时,会导致程序关闭的问题
#!/usr/bin/env python
# coding=UTF-8
import RPi.GPIO as GPIO
import time
import socket
import sys
reload(sys)
sys.setdefaultencoding('utf8')
PWMA = 18
AIN1 = 22
AIN2 = 27
PWMB = 23
BIN1 = 25
BIN2 = 24
def t_up(speed):
L_Motor.ChangeDutyCycle(speed)
GPIO.output(AIN2, False) # AIN2
GPIO.output(AIN1, True) # AIN1
# up
R_Motor.ChangeDutyCycle(speed)
GPIO.output(BIN2, False) # BIN2
GPIO.output(BIN1, True) # BIN1
def t_down(speed):
L_Motor.ChangeDutyCycle(speed)
GPIO.output(AIN2, True) # AIN2
GPIO.output(AIN1, False) # AIN1
R_Motor.ChangeDutyCycle(speed)
GPIO.output(BIN2, True) # BIN2
GPIO.output(BIN1, False) # BIN1
def t_left(speed):
L_Motor.ChangeDutyCycle(speed)
GPIO.output(AIN2, True) # AIN2
GPIO.output(AIN1, False) # AIN1
R_Motor.ChangeDutyCycle(speed)
GPIO.output(BIN2, False) # BIN2
GPIO.output(BIN1, True) # BIN1
def t_right(speed):
L_Motor.ChangeDutyCycle(speed)
GPIO.output(AIN2, False) # AIN2
GPIO.output(AIN1, True) # AIN1
R_Motor.ChangeDutyCycle(speed)
GPIO.output(BIN2, True) # BIN2
GPIO.output(BIN1, False) # BIN1
def t_stop():
L_Motor.ChangeDutyCycle(0)
GPIO.output(AIN2, False) # AIN2
GPIO.output(AIN1, False) # AIN1
# stop
R_Motor.ChangeDutyCycle(0)
GPIO.output(BIN2, False) # BIN2
GPIO.output(BIN1, False) # BIN1
GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM)
GPIO.setup(AIN2, GPIO.OUT)
GPIO.setup(AIN1, GPIO.OUT)
GPIO.setup(PWMA, GPIO.OUT)
GPIO.setup(BIN1, GPIO.OUT)
GPIO.setup(BIN2, GPIO.OUT)
GPIO.setup(PWMB, GPIO.OUT)
L_Motor = GPIO.PWM(PWMA, 100)
L_Motor.start(0)
R_Motor = GPIO.PWM(PWMB, 100)
R_Motor.start(0)
# 下面是命令接收代码
ip_port = ('', 7878) # 留空才能接收来自局域网的UDP连接
sk = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) # 创建套接字
sk.bind(ip_port) # 绑定服务地址
print('启动socket服务,等待客户端连接...')
try:
while True:
client_data = sk.recv(1024).strip().decode() # 接收遥控命令,使用字符串分割来接收多个命令参数
print("客户端向你发来信息:%s" % ( client_data))
order_list=client_data.split(',')#将接收到的命令通过逗号分割
o = int(order_list[0])#第一个为小车的运行方向
speed = int(order_list[1])#第二个为小车的运行速度
if speed==0:#如果没有收到speed参数,或speed为0,则调整为50
speed=50
if o==0:#0停止小车
t_stop()
elif o==1:#1前进
t_up(speed)
elif o==2:#2后退
t_down(speed)
elif o==3:#3左转
t_left(speed)
elif o==4:#4右转
t_right(speed)
finally:
GPIO.cleanup()
sk.close()
第四版
添加了舵机摇头控制功能
#!/usr/bin/env python
# coding=UTF-8
import Adafruit_PCA9685
import RPi.GPIO as GPIO
import time
import socket
import sys
reload(sys)
sys.setdefaultencoding('utf8')
PWMA = 18
AIN1 = 22
AIN2 = 27
PWMB = 23
BIN1 = 25
BIN2 = 24
# 舵机摇头代码
class CarServo:
def __init__(self):
# 2个摄像头舵机,1个超声波舵机
self.pwm_pca9685 = Adafruit_PCA9685.PCA9685()
self.pwm_pca9685.set_pwm_freq(50)
self.servo = {}
self.set_servo_angle(0, 110)
self.set_servo_angle(1, 100)
self.set_servo_angle(2, 20)
# 输入角度转换成12^精度的数值
def set_servo_angle(self, channel, angle):
if (channel >= 0) and (channel <= 2):
new_angle = angle
if angle < 0:
new_angle = 0
elif angle > 180:
new_angle = 180
else:
new_angle = angle
print("channel={0}, angle={1}".format(channel, new_angle))
# date=4096*((new_angle*11)+500)/20000#进行四舍五入运算 date=int(4096*((angle*11)+500)/(20000)+0.5)
date = int(4096 * ((new_angle * 11) + 500) / (20000) + 0.5)
self.pwm_pca9685.set_pwm(channel, 0, date)
self.servo[channel] = new_angle
else:
print("set_servo_angle error. servo[{0}] = [{1}]".format(channel, angle))
def inc_servo_angle(self, channel, v):
self.set_servo_angle(channel, self.servo[channel] + v)
def dec_servo_angle(self, channel, v):
self.set_servo_angle(channel, self.servo[channel] - v)
def t_up(speed):
L_Motor.ChangeDutyCycle(speed)
GPIO.output(AIN2, False) # AIN2
GPIO.output(AIN1, True) # AIN1
# up
R_Motor.ChangeDutyCycle(speed)
GPIO.output(BIN2, False) # BIN2
GPIO.output(BIN1, True) # BIN1
def t_down(speed):
L_Motor.ChangeDutyCycle(speed)
GPIO.output(AIN2, True) # AIN2
GPIO.output(AIN1, False) # AIN1
R_Motor.ChangeDutyCycle(speed)
GPIO.output(BIN2, True) # BIN2
GPIO.output(BIN1, False) # BIN1
def t_left(speed):
L_Motor.ChangeDutyCycle(speed)
GPIO.output(AIN2, True) # AIN2
GPIO.output(AIN1, False) # AIN1
R_Motor.ChangeDutyCycle(speed)
GPIO.output(BIN2, False) # BIN2
GPIO.output(BIN1, True) # BIN1
def t_right(speed):
L_Motor.ChangeDutyCycle(speed)
GPIO.output(AIN2, False) # AIN2
GPIO.output(AIN1, True) # AIN1
R_Motor.ChangeDutyCycle(speed)
GPIO.output(BIN2, True) # BIN2
GPIO.output(BIN1, False) # BIN1
def t_stop():
L_Motor.ChangeDutyCycle(0)
GPIO.output(AIN2, False) # AIN2
GPIO.output(AIN1, False) # AIN1
# stop
R_Motor.ChangeDutyCycle(0)
GPIO.output(BIN2, False) # BIN2
GPIO.output(BIN1, False) # BIN1
GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM)
GPIO.setup(AIN2, GPIO.OUT)
GPIO.setup(AIN1, GPIO.OUT)
GPIO.setup(PWMA, GPIO.OUT)
GPIO.setup(BIN1, GPIO.OUT)
GPIO.setup(BIN2, GPIO.OUT)
GPIO.setup(PWMB, GPIO.OUT)
L_Motor = GPIO.PWM(PWMA, 100)
L_Motor.start(0)
R_Motor = GPIO.PWM(PWMB, 100)
R_Motor.start(0)
cs = CarServo()
# 下面是命令接收代码
ip_port = ('', 7878) # 留空才能接收来自局域网的UDP连接
sk = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) # 创建套接字
sk.bind(ip_port) # 绑定服务地址
print('启动socket服务,等待客户端连接...')
try:
while True:
client_data = sk.recv(1024).strip().decode() # 接收遥控命令,使用字符串分割来接收多个命令参数
print("客户端向你发来信息:%s" % (client_data))
order_list = client_data.split(',') # 将接收到的命令通过逗号分割
o = int(order_list[0]) # 第一个为小车的运行方向
speed = int(order_list[1]) # 第二个为小车的运行速度
iod = int(order_list[2]) # 第三个参数为判断increase_channel(0)或decrease_channel(1)
channel = int(order_list[3]) # 第四个参数为了获得channel的值
angel = int(order_list[4]) # 第五个参数为获得angel的值
type = int(order_list[5]) # 第六个参数用于判断是接受到了马达命令还是舵机命令,0马达,1舵机,这样就不怕马达和舵机命令相互干扰了
# 马达命令示例:1,50,0,0,0,1 以50的速度前进
# 舵机命令示例:0,0,0,1,20,2 舵机向左转动20度
if type == 1: # 如果收到了马达命令
if speed == 0: # 如果没有收到speed参数,或speed为0,则调整为50
speed = 50
if o == 0: # 0停止小车
t_stop()
elif o == 1: # 1前进
t_up(speed)
elif o == 2: # 2后退
t_down(speed)
elif o == 3: # 3左转
t_left(speed)
elif o == 4: # 4右转
t_right(speed)
if type == 2: # 如果收到了舵机命令
if iod == 0: # increase_channel:1左2下
cs.inc_servo_angle(channel, angel)
elif iod == 1: # decrease_channel:1右2上
cs.dec_servo_angle(channel, angel)
finally:
GPIO.cleanup()
sk.close()
控制端测试代码:
放在电脑端上运行测试,看能否通过网络远程控制小车
第一版
import socket
import time
# ip_port = ('192.168.1.180', 7878)
ip_port = ('192.168.1.180', 7878)#这里写树莓派ip
s = socket.socket() # 创建套接字
s.connect(ip_port) # 连接服务器
s.sendall("1".encode())
time.sleep(1)#前进一秒
s.sendall("0".encode())
time.sleep(2)#停两秒
s.sendall("2".encode())
time.sleep(1)#前进一秒
s.sendall("0".encode())#停止
s.close() # 关闭连接
第二版
import socket
import time
ip_port = ('192.168.1.180', 7878)#这里写树莓派ip
s = socket.socket() # 创建套接字
s.connect(ip_port) # 连接服务器
try:
while True:
order = input("请输入要发送的指令(o,v)")
s.sendall(order.encode())
time.sleep(1)#执行一秒这个命令
s.sendall("0,0".encode())#然后发送停止命令
print("执行完毕")
except KeyboardInterrupt:
s.close() # 关闭连接
第三版(UDP)
Python
import socket
import time
ip_port = ('192.168.1.180', 7878)#这里写树莓派ip
sk = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
try:
while True:
order = input("请输入要发送的指令(o,v)")
print("sent"+order)
sk.sendto(order.encode(),ip_port)
time.sleep(1)#执行一秒这个命令
sk.sendto("0,0".encode(),ip_port)
print("执行完毕")
except KeyboardInterrupt:
sk.close() # 关闭连接
Java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketException;
public class UDPCli {
private static DatagramSocket socket;
public static void main(String[] args) {
try {
socket=new DatagramSocket();
InetSocketAddress addr = new InetSocketAddress("192.168.1.180", 7878);
byte[] sendData = ("1,30").getBytes();
DatagramPacket datagramPacket = new DatagramPacket(sendData, sendData.length, addr);
socket.send(datagramPacket);
Thread.sleep(1000);
sendData = ("0,0").getBytes();
datagramPacket = new DatagramPacket(sendData, sendData.length, addr);
socket.send(datagramPacket);
} catch (SocketException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
第四版
import socket
import time
ip_port = ('192.168.1.180', 7878)#这里写树莓派ip
sk = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
try:
while True:
order = input("请输入要发送的指令(o,v)")
print("sent"+order)
sk.sendto(order.encode(),ip_port)
time.sleep(1)#执行一秒这个命令
sk.sendto("0,0,0,0,0,0".encode(),ip_port)
print("执行完毕")
except KeyboardInterrupt:
sk.close() # 关闭连接
报错1:无法通过局域网连接(已解决)
解决方案
看我的这篇文章
https://hash070.top/python-connectionrefusederror.html
报错2:字典方法问题(暂时已改用if else)
解决方案:python暂时学得不够深入,暂时使用if else