服务端代码:

在树莓派小车上运行

第一版

#!/usr/bin/env python
# coding=UTF-8
import RPi.GPIO as GPIO
import time
import socket
import sys
reload(sys)
sys.setdefaultencoding('utf8')

PWMA = 18
AIN1 = 22
AIN2 = 27

PWMB = 23
BIN1 = 25
BIN2 = 24

def t_up(speed):
    L_Motor.ChangeDutyCycle(speed)
    GPIO.output(AIN2, False)  # AIN2
    GPIO.output(AIN1, True)  # AIN1
    # up
    R_Motor.ChangeDutyCycle(speed)
    GPIO.output(BIN2, False)  # BIN2
    GPIO.output(BIN1, True)  # BIN1

def t_down(speed):
    L_Motor.ChangeDutyCycle(speed)
    GPIO.output(AIN2, True)  # AIN2
    GPIO.output(AIN1, False)  # AIN1

    R_Motor.ChangeDutyCycle(speed)
    GPIO.output(BIN2, True)  # BIN2
    GPIO.output(BIN1, False)  # BIN1

def t_left(speed):
    L_Motor.ChangeDutyCycle(speed)
    GPIO.output(AIN2, True)  # AIN2
    GPIO.output(AIN1, False)  # AIN1

    R_Motor.ChangeDutyCycle(speed)
    GPIO.output(BIN2, False)  # BIN2
    GPIO.output(BIN1, True)  # BIN1

def t_right(speed):
    L_Motor.ChangeDutyCycle(speed)
    GPIO.output(AIN2, False)  # AIN2
    GPIO.output(AIN1, True)  # AIN1

    R_Motor.ChangeDutyCycle(speed)
    GPIO.output(BIN2, True)  # BIN2
    GPIO.output(BIN1, False)  # BIN1

def t_stop():
    L_Motor.ChangeDutyCycle(0)
    GPIO.output(AIN2, False)  # AIN2
    GPIO.output(AIN1, False)  # AIN1
    # stop
    R_Motor.ChangeDutyCycle(0)
    GPIO.output(BIN2, False)  # BIN2
    GPIO.output(BIN1, False)  # BIN1

GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM)
GPIO.setup(AIN2, GPIO.OUT)
GPIO.setup(AIN1, GPIO.OUT)
GPIO.setup(PWMA, GPIO.OUT)

GPIO.setup(BIN1, GPIO.OUT)
GPIO.setup(BIN2, GPIO.OUT)
GPIO.setup(PWMB, GPIO.OUT)

L_Motor = GPIO.PWM(PWMA, 100)
L_Motor.start(0)

R_Motor = GPIO.PWM(PWMB, 100)
R_Motor.start(0)

# 下面是命令接收代码
ip_port = ('', 7878)  # 留空才能接收来自局域网的tcp连接
sk = socket.socket()  # 创建套接字
sk.bind(ip_port)  # 绑定服务地址
sk.listen(5)  # 监听连接请求
print('启动socket服务,等待客户端连接...')
conn, address = sk.accept()  # 等待连接,此处自动阻塞
try:
    while True:  # 一个死循环,直到客户端发送‘exit’的信号,才关闭连接
        client_data = conn.recv(1024).decode()  # 接收信息
        o = int(client_data)
        if o==0:#0停止小车
            t_stop()
        elif o==1:#1前进
            t_up(50)
        elif o==2:#2后退
            t_down(50)
        elif o==3:#3左转
            t_left(50)
        elif o==4:#4右转
            t_right(50)
        print("来自%s的客户端向你发来信息:%s" % (address, client_data))
except KeyboardInterrupt:
    GPIO.cleanup()
    conn.close()  # 关闭连接

第二版

#!/usr/bin/env python
# coding=UTF-8
import RPi.GPIO as GPIO
import time
import socket
import sys
reload(sys)
sys.setdefaultencoding('utf8')

PWMA = 18
AIN1 = 22
AIN2 = 27

PWMB = 23
BIN1 = 25
BIN2 = 24

def t_up(speed):
    L_Motor.ChangeDutyCycle(speed)
    GPIO.output(AIN2, False)  # AIN2
    GPIO.output(AIN1, True)  # AIN1
    # up
    R_Motor.ChangeDutyCycle(speed)
    GPIO.output(BIN2, False)  # BIN2
    GPIO.output(BIN1, True)  # BIN1

def t_down(speed):
    L_Motor.ChangeDutyCycle(speed)
    GPIO.output(AIN2, True)  # AIN2
    GPIO.output(AIN1, False)  # AIN1

    R_Motor.ChangeDutyCycle(speed)
    GPIO.output(BIN2, True)  # BIN2
    GPIO.output(BIN1, False)  # BIN1

def t_left(speed):
    L_Motor.ChangeDutyCycle(speed)
    GPIO.output(AIN2, True)  # AIN2
    GPIO.output(AIN1, False)  # AIN1

    R_Motor.ChangeDutyCycle(speed)
    GPIO.output(BIN2, False)  # BIN2
    GPIO.output(BIN1, True)  # BIN1

def t_right(speed):
    L_Motor.ChangeDutyCycle(speed)
    GPIO.output(AIN2, False)  # AIN2
    GPIO.output(AIN1, True)  # AIN1

    R_Motor.ChangeDutyCycle(speed)
    GPIO.output(BIN2, True)  # BIN2
    GPIO.output(BIN1, False)  # BIN1

def t_stop():
    L_Motor.ChangeDutyCycle(0)
    GPIO.output(AIN2, False)  # AIN2
    GPIO.output(AIN1, False)  # AIN1
    # stop
    R_Motor.ChangeDutyCycle(0)
    GPIO.output(BIN2, False)  # BIN2
    GPIO.output(BIN1, False)  # BIN1

GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM)
GPIO.setup(AIN2, GPIO.OUT)
GPIO.setup(AIN1, GPIO.OUT)
GPIO.setup(PWMA, GPIO.OUT)

GPIO.setup(BIN1, GPIO.OUT)
GPIO.setup(BIN2, GPIO.OUT)
GPIO.setup(PWMB, GPIO.OUT)

L_Motor = GPIO.PWM(PWMA, 100)
L_Motor.start(0)

R_Motor = GPIO.PWM(PWMB, 100)
R_Motor.start(0)

# 下面是命令接收代码
ip_port = ('', 7878)  # 留空才能接收来自局域网的tcp连接
sk = socket.socket()  # 创建套接字
sk.bind(ip_port)  # 绑定服务地址
sk.listen(5)  # 监听连接请求
print('启动socket服务,等待客户端连接...')
conn, address = sk.accept()  # 等待连接,此处自动阻塞
try:
    while True:  # 一个死循环,直到客户端发送‘exit’的信号,才关闭连接
        client_data = conn.recv(1024).decode()  # 接收遥控命令,使用字符串分割来接收多个命令参数
        print("来自%s的客户端向你发来信息:%s" % (address, client_data))
        order_list=client_data.split(',')#将接收到的命令通过逗号分割
        o = int(order_list[0])#第一个为小车的运行方向
        speed = int(order_list[1])#第二个为小车的运行速度
        if speed==0:#如果没有收到speed参数,或speed为0,则调整为50
            speed=50
        if o==0:#0停止小车
            t_stop()
        elif o==1:#1前进
            t_up(speed)
        elif o==2:#2后退
            t_down(speed)
        elif o==3:#3左转
            t_left(speed)
        elif o==4:#4右转
            t_right(speed)
except KeyboardInterrupt:
    GPIO.cleanup()
    conn.close()  # 关闭连接

第三版

改用UDP,解决了TCP连接断开时,会导致程序关闭的问题

#!/usr/bin/env python
# coding=UTF-8
import RPi.GPIO as GPIO
import time
import socket
import sys
reload(sys)
sys.setdefaultencoding('utf8')

PWMA = 18
AIN1 = 22
AIN2 = 27

PWMB = 23
BIN1 = 25
BIN2 = 24

def t_up(speed):
    L_Motor.ChangeDutyCycle(speed)
    GPIO.output(AIN2, False)  # AIN2
    GPIO.output(AIN1, True)  # AIN1
    # up
    R_Motor.ChangeDutyCycle(speed)
    GPIO.output(BIN2, False)  # BIN2
    GPIO.output(BIN1, True)  # BIN1

def t_down(speed):
    L_Motor.ChangeDutyCycle(speed)
    GPIO.output(AIN2, True)  # AIN2
    GPIO.output(AIN1, False)  # AIN1

    R_Motor.ChangeDutyCycle(speed)
    GPIO.output(BIN2, True)  # BIN2
    GPIO.output(BIN1, False)  # BIN1

def t_left(speed):
    L_Motor.ChangeDutyCycle(speed)
    GPIO.output(AIN2, True)  # AIN2
    GPIO.output(AIN1, False)  # AIN1

    R_Motor.ChangeDutyCycle(speed)
    GPIO.output(BIN2, False)  # BIN2
    GPIO.output(BIN1, True)  # BIN1

def t_right(speed):
    L_Motor.ChangeDutyCycle(speed)
    GPIO.output(AIN2, False)  # AIN2
    GPIO.output(AIN1, True)  # AIN1

    R_Motor.ChangeDutyCycle(speed)
    GPIO.output(BIN2, True)  # BIN2
    GPIO.output(BIN1, False)  # BIN1

def t_stop():
    L_Motor.ChangeDutyCycle(0)
    GPIO.output(AIN2, False)  # AIN2
    GPIO.output(AIN1, False)  # AIN1
    # stop
    R_Motor.ChangeDutyCycle(0)
    GPIO.output(BIN2, False)  # BIN2
    GPIO.output(BIN1, False)  # BIN1

GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM)
GPIO.setup(AIN2, GPIO.OUT)
GPIO.setup(AIN1, GPIO.OUT)
GPIO.setup(PWMA, GPIO.OUT)

GPIO.setup(BIN1, GPIO.OUT)
GPIO.setup(BIN2, GPIO.OUT)
GPIO.setup(PWMB, GPIO.OUT)

L_Motor = GPIO.PWM(PWMA, 100)
L_Motor.start(0)

R_Motor = GPIO.PWM(PWMB, 100)
R_Motor.start(0)

# 下面是命令接收代码
ip_port = ('', 7878)  # 留空才能接收来自局域网的UDP连接
sk = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)  # 创建套接字
sk.bind(ip_port)  # 绑定服务地址
print('启动socket服务,等待客户端连接...')
try:
    while True:
        client_data = sk.recv(1024).strip().decode()  # 接收遥控命令,使用字符串分割来接收多个命令参数
        print("客户端向你发来信息:%s" % ( client_data))
        order_list=client_data.split(',')#将接收到的命令通过逗号分割
        o = int(order_list[0])#第一个为小车的运行方向
        speed = int(order_list[1])#第二个为小车的运行速度
        if speed==0:#如果没有收到speed参数,或speed为0,则调整为50
            speed=50
        if o==0:#0停止小车
            t_stop()
        elif o==1:#1前进
            t_up(speed)
        elif o==2:#2后退
            t_down(speed)
        elif o==3:#3左转
            t_left(speed)
        elif o==4:#4右转
            t_right(speed)
finally:
    GPIO.cleanup()
    sk.close()

第四版

添加了舵机摇头控制功能

#!/usr/bin/env python
# coding=UTF-8
import Adafruit_PCA9685
import RPi.GPIO as GPIO
import time
import socket
import sys

reload(sys)
sys.setdefaultencoding('utf8')

PWMA = 18
AIN1 = 22
AIN2 = 27

PWMB = 23
BIN1 = 25
BIN2 = 24


# 舵机摇头代码
class CarServo:
    def __init__(self):
        # 2个摄像头舵机,1个超声波舵机
        self.pwm_pca9685 = Adafruit_PCA9685.PCA9685()
        self.pwm_pca9685.set_pwm_freq(50)

        self.servo = {}

        self.set_servo_angle(0, 110)
        self.set_servo_angle(1, 100)
        self.set_servo_angle(2, 20)

    # 输入角度转换成12^精度的数值
    def set_servo_angle(self, channel, angle):
        if (channel >= 0) and (channel <= 2):
            new_angle = angle
            if angle < 0:
                new_angle = 0
            elif angle > 180:
                new_angle = 180
            else:
                new_angle = angle
            print("channel={0}, angle={1}".format(channel, new_angle))
            # date=4096*((new_angle*11)+500)/20000#进行四舍五入运算 date=int(4096*((angle*11)+500)/(20000)+0.5)
            date = int(4096 * ((new_angle * 11) + 500) / (20000) + 0.5)
            self.pwm_pca9685.set_pwm(channel, 0, date)
            self.servo[channel] = new_angle
        else:
            print("set_servo_angle error. servo[{0}] = [{1}]".format(channel, angle))

    def inc_servo_angle(self, channel, v):
        self.set_servo_angle(channel, self.servo[channel] + v)

    def dec_servo_angle(self, channel, v):
        self.set_servo_angle(channel, self.servo[channel] - v)


def t_up(speed):
    L_Motor.ChangeDutyCycle(speed)
    GPIO.output(AIN2, False)  # AIN2
    GPIO.output(AIN1, True)  # AIN1
    # up
    R_Motor.ChangeDutyCycle(speed)
    GPIO.output(BIN2, False)  # BIN2
    GPIO.output(BIN1, True)  # BIN1


def t_down(speed):
    L_Motor.ChangeDutyCycle(speed)
    GPIO.output(AIN2, True)  # AIN2
    GPIO.output(AIN1, False)  # AIN1

    R_Motor.ChangeDutyCycle(speed)
    GPIO.output(BIN2, True)  # BIN2
    GPIO.output(BIN1, False)  # BIN1


def t_left(speed):
    L_Motor.ChangeDutyCycle(speed)
    GPIO.output(AIN2, True)  # AIN2
    GPIO.output(AIN1, False)  # AIN1

    R_Motor.ChangeDutyCycle(speed)
    GPIO.output(BIN2, False)  # BIN2
    GPIO.output(BIN1, True)  # BIN1


def t_right(speed):
    L_Motor.ChangeDutyCycle(speed)
    GPIO.output(AIN2, False)  # AIN2
    GPIO.output(AIN1, True)  # AIN1

    R_Motor.ChangeDutyCycle(speed)
    GPIO.output(BIN2, True)  # BIN2
    GPIO.output(BIN1, False)  # BIN1


def t_stop():
    L_Motor.ChangeDutyCycle(0)
    GPIO.output(AIN2, False)  # AIN2
    GPIO.output(AIN1, False)  # AIN1
    # stop
    R_Motor.ChangeDutyCycle(0)
    GPIO.output(BIN2, False)  # BIN2
    GPIO.output(BIN1, False)  # BIN1


GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM)
GPIO.setup(AIN2, GPIO.OUT)
GPIO.setup(AIN1, GPIO.OUT)
GPIO.setup(PWMA, GPIO.OUT)

GPIO.setup(BIN1, GPIO.OUT)
GPIO.setup(BIN2, GPIO.OUT)
GPIO.setup(PWMB, GPIO.OUT)

L_Motor = GPIO.PWM(PWMA, 100)
L_Motor.start(0)

R_Motor = GPIO.PWM(PWMB, 100)
R_Motor.start(0)

cs = CarServo()

# 下面是命令接收代码
ip_port = ('', 7878)  # 留空才能接收来自局域网的UDP连接
sk = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)  # 创建套接字
sk.bind(ip_port)  # 绑定服务地址
print('启动socket服务,等待客户端连接...')
try:
    while True:
        client_data = sk.recv(1024).strip().decode()  # 接收遥控命令,使用字符串分割来接收多个命令参数
        print("客户端向你发来信息:%s" % (client_data))
        order_list = client_data.split(',')  # 将接收到的命令通过逗号分割
        o = int(order_list[0])  # 第一个为小车的运行方向
        speed = int(order_list[1])  # 第二个为小车的运行速度
        iod = int(order_list[2])  # 第三个参数为判断increase_channel(0)或decrease_channel(1)
        channel = int(order_list[3])  # 第四个参数为了获得channel的值
        angel = int(order_list[4])  # 第五个参数为获得angel的值
        type = int(order_list[5])  # 第六个参数用于判断是接受到了马达命令还是舵机命令,0马达,1舵机,这样就不怕马达和舵机命令相互干扰了
        # 马达命令示例:1,50,0,0,0,1 以50的速度前进
        # 舵机命令示例:0,0,0,1,20,2 舵机向左转动20度
        if type == 1:  # 如果收到了马达命令
            if speed == 0:  # 如果没有收到speed参数,或speed为0,则调整为50
                speed = 50
            if o == 0:  # 0停止小车
                t_stop()
            elif o == 1:  # 1前进
                t_up(speed)
            elif o == 2:  # 2后退
                t_down(speed)
            elif o == 3:  # 3左转
                t_left(speed)
            elif o == 4:  # 4右转
                t_right(speed)
        if type == 2:  # 如果收到了舵机命令
            if iod == 0:  # increase_channel:1左2下
                cs.inc_servo_angle(channel, angel)
            elif iod == 1:  # decrease_channel:1右2上
                cs.dec_servo_angle(channel, angel)

finally:
    GPIO.cleanup()
    sk.close()

控制端测试代码:

放在电脑端上运行测试,看能否通过网络远程控制小车

第一版

import socket
import time

# ip_port = ('192.168.1.180', 7878)
ip_port = ('192.168.1.180', 7878)#这里写树莓派ip

s = socket.socket()  # 创建套接字

s.connect(ip_port)  # 连接服务器

s.sendall("1".encode())
time.sleep(1)#前进一秒
s.sendall("0".encode())
time.sleep(2)#停两秒
s.sendall("2".encode())
time.sleep(1)#前进一秒
s.sendall("0".encode())#停止

s.close()  # 关闭连接

第二版

import socket
import time
ip_port = ('192.168.1.180', 7878)#这里写树莓派ip
s = socket.socket()  # 创建套接字
s.connect(ip_port)  # 连接服务器
try:
    while True:
        order = input("请输入要发送的指令(o,v)")
        s.sendall(order.encode())
        time.sleep(1)#执行一秒这个命令
        s.sendall("0,0".encode())#然后发送停止命令
        print("执行完毕")
except KeyboardInterrupt:
    s.close()  # 关闭连接

第三版(UDP)

Python

import socket
import time
ip_port = ('192.168.1.180', 7878)#这里写树莓派ip
sk = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
try:
    while True:
        order = input("请输入要发送的指令(o,v)")
        print("sent"+order)
        sk.sendto(order.encode(),ip_port)
        time.sleep(1)#执行一秒这个命令
        sk.sendto("0,0".encode(),ip_port)
        print("执行完毕")
except KeyboardInterrupt:
    sk.close()  # 关闭连接

Java

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketException;

public class UDPCli {
    private static DatagramSocket socket;
    public static void main(String[] args) {
        try {
            socket=new DatagramSocket();
            InetSocketAddress addr = new InetSocketAddress("192.168.1.180", 7878);
            byte[] sendData = ("1,30").getBytes();
            DatagramPacket datagramPacket = new DatagramPacket(sendData, sendData.length, addr);
            socket.send(datagramPacket);
            Thread.sleep(1000);
            sendData = ("0,0").getBytes();
            datagramPacket = new DatagramPacket(sendData, sendData.length, addr);
            socket.send(datagramPacket);
        } catch (SocketException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

第四版

import socket
import time
ip_port = ('192.168.1.180', 7878)#这里写树莓派ip
sk = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
try:
    while True:
        order = input("请输入要发送的指令(o,v)")
        print("sent"+order)
        sk.sendto(order.encode(),ip_port)
        time.sleep(1)#执行一秒这个命令
        sk.sendto("0,0,0,0,0,0".encode(),ip_port)
        print("执行完毕")
except KeyboardInterrupt:
    sk.close()  # 关闭连接

报错1:无法通过局域网连接(已解决)

1670036741475.webp

解决方案

看我的这篇文章

https://hash070.top/python-connectionrefusederror.html

报错2:字典方法问题(暂时已改用if else)

1670036732026.webp

解决方案:python暂时学得不够深入,暂时使用if else

Q.E.D.