# Pi4 control 305(鍵盤key進去)
###### tags: `RVB`
```python
#!/usr/bin/env python3
import RPi.GPIO as GPIO
import time
GPIO.setmode(GPIO.BOARD)
GPIO.setup(12,GPIO.OUT)
p=GPIO.PWM(12,50)# 50hz frequency
p.start(7)
p.ChangeDutyCycle(7)
try:
while True:
x = input("Enter the strength: ")
p.ChangeDutyCycle(float(x))
except KeyboardInterrupt:
print("STOP")
finally:
GPIO.cleanup()
```
---
# Talker Node (傳送rabboni的資訊)(USB)
```python=
#!/usr/bin/env python3
import rospy
from std_msgs.msg import String
from std_msgs.msg import Float32
import traceback as tb
import math
import time
from rabboni import Rabboni
rab = Rabboni()
rab.connect()
rab.set_sensor_config(2, 500, 5, 100)
position_lr = 0
position_fb = 0
preW_fb = 0
preW_lr = 0
timer = time.time()
def usb_custom_callback(status):
global position_lr
global position_fb
global timer
global preW_fb
global preW_lr
value = 0
ratio = 0.7
if(abs(preW_fb-status['Gyr'][0]) > 0):
position_fb = (1 - ratio) *
(position_fb + (status['Gyr'][0] * math.pi/180) *
(time.time() - timer)) + ratio * status['Acc'][1]
if(abs(preW_lr-status['Gyr'][1]) > 0):
position_lr = (1 - ratio) *
(position_lr + (status['Gyr'][1] * math.pi/180) *
(time.time() - timer)) + ratio * status['Acc'][0]
#Left and Right
if position_lr < -0.35:
s = 'right'
value = status['Acc'][0]
elif position_lr > 0.35:
s = 'left'
value = status['Acc'][0]
#Forward and Backward
elif position_fb < -0.4:
s = 'forward'
value = status['Acc'][1]
elif position_fb > 0.4:
s = 'backward'
value = status['Acc'][1]
else:
s = 'stable'
pub1.publish(s)
pub2.publish(value)
print(s + ' ' + str(value))
timer = time.time()
preW_fb = position_fb
preW_lr = position_lr
if __name__ == '__main__':
rospy.init_node('talker', anonymous=True)
pub1 = rospy.Publisher('command', String, queue_size=10)
pub2 = rospy.Publisher('value', Float32, queue_size=10)
try:
rab.start_fetching_status(custom_callback=usb_custom_callback)
rab.polling_status()
except AssertionError: # 結束程式
print('Bye~!!')
except Exception:
tb.print_exc()
finally:
rab.disconnect()
```
---
# Talker Node (傳送rabboni的資訊)(BLE)
```python=
#!/usr/bin/env python3
import rospy
from std_msgs.msg import String
from std_msgs.msg import Float32
import traceback as tb
import math
import time
from rabboni import Rabboni
rab = Rabboni(mode='BLE')
rab.scan()
rab.connect(mac_address='F6:20:ED:F3:5B:F2') #對應到Rabboni
rab.read_sensor_config()
rab.set_sensor_config(2, 500, 5, 100)
rab.read_sensor_config()
position_lr = 0
position_fb = 0
timer = time.time()
preW_fb = 0
preW_lr = 0
def ble_custom_callback(status):
global position_lr
global position_fb
global timer
global preW_fb
global preW_lr
value = 0
ratio = 0.7
if(abs(preW_fb-status['Gyr'][0]) > 0):
position_fb = (1 - ratio) * (position_fb + (status['Gyr'][0] * math.pi/180) * (time.time() - timer)) + ratio * status['Acc'][1]
if(abs(preW_lr-status['Gyr'][1]) > 0):
position_lr = (1 - ratio) * (position_lr + (status['Gyr'][1] * math.pi/180) * (time.time() - timer)) + ratio * status['Acc'][0]
#Left and Right
if position_lr < -0.35:
s = 'right'
value = status['Acc'][0]
elif position_lr > 0.35:
s = 'left'
value = status['Acc'][0]
#Forward and Backward
elif position_fb < -0.4:
s = 'forward'
value = status['Acc'][1]
elif position_fb > 0.4:
s = 'backward'
value = status['Acc'][1]
else:
s = 'stable'
pub1.publish(s)
pub2.publish(value)
print(s + ' ' + str(value))
timer = time.time()
preW_fb = position_fb
preW_lr = position_lr
if __name__ == '__main__':
rospy.init_node('talker', anonymous=True)
pub1 = rospy.Publisher('command', String, queue_size=10)
pub2 = rospy.Publisher('value', Float32, queue_size=10)
try:
rab.start_fetching_status(custom_callback=ble_custom_callback)
rab.polling_status()
except AssertionError: # 結束程式
print('Bye~!!')
except Exception:
tb.print_exc()
finally:
rab.disconnect()
```
---
# Listener Node
```python=
#!/usr/bin/env python3
import rospy
from std_msgs.msg import String
from std_msgs.msg import Float32
import RPi.GPIO as GPIO
import time
import numpy as np
GPIO.setmode(GPIO.BOARD)
GPIO.setup(32,GPIO.OUT)
GPIO.setup(12,GPIO.OUT)
pR=GPIO.PWM(32,50)# 50hz frequency
pL=GPIO.PWM(12,50)
pR.start(7)
pL.start(7)
pR.ChangeDutyCycle(7)
pL.ChangeDutyCycle(7)
command = 'forward'
def control(value):
global command
lr_strength = 1.4 #左右轉幅度,越大越強
fb_strength = 1.3 #前後強度,越大越強
if command == 'right':
pR.ChangeDutyCycle(7-lr_strength)
pL.ChangeDutyCycle(7+lr_strength)
elif command == 'left':
pR.ChangeDutyCycle(7+lr_strength)
pL.ChangeDutyCycle(7-lr_strength)
else:
pR.ChangeDutyCycle(fb_strength*value.data+7)
pL.ChangeDutyCycle(fb_strength*value.data+7)
def Command(data):
global command
command = data.data
print(command)
def listener():
rospy.init_node('listener', anonymous=True)
rospy.Subscriber('command', String, Command)
rospy.Subscriber('value' , Float32 , control)
rospy.spin()
if __name__ == '__main__':
try:
listener()
except KeyboardInterrupt:
print("STOP")
finally:
GPIO.cleanup()
```