# Pi4 control 305(鍵盤key進去) ###### tags: `RVB` ```python #!/usr/bin/env python3 import RPi.GPIO as GPIO import time GPIO.setmode(GPIO.BOARD) GPIO.setup(12,GPIO.OUT) p=GPIO.PWM(12,50)# 50hz frequency p.start(7) p.ChangeDutyCycle(7) try: while True: x = input("Enter the strength: ") p.ChangeDutyCycle(float(x)) except KeyboardInterrupt: print("STOP") finally: GPIO.cleanup() ``` --- # Talker Node (傳送rabboni的資訊)(USB) ```python= #!/usr/bin/env python3 import rospy from std_msgs.msg import String from std_msgs.msg import Float32 import traceback as tb import math import time from rabboni import Rabboni rab = Rabboni() rab.connect() rab.set_sensor_config(2, 500, 5, 100) position_lr = 0 position_fb = 0 preW_fb = 0 preW_lr = 0 timer = time.time() def usb_custom_callback(status): global position_lr global position_fb global timer global preW_fb global preW_lr value = 0 ratio = 0.7 if(abs(preW_fb-status['Gyr'][0]) > 0): position_fb = (1 - ratio) * (position_fb + (status['Gyr'][0] * math.pi/180) * (time.time() - timer)) + ratio * status['Acc'][1] if(abs(preW_lr-status['Gyr'][1]) > 0): position_lr = (1 - ratio) * (position_lr + (status['Gyr'][1] * math.pi/180) * (time.time() - timer)) + ratio * status['Acc'][0] #Left and Right if position_lr < -0.35: s = 'right' value = status['Acc'][0] elif position_lr > 0.35: s = 'left' value = status['Acc'][0] #Forward and Backward elif position_fb < -0.4: s = 'forward' value = status['Acc'][1] elif position_fb > 0.4: s = 'backward' value = status['Acc'][1] else: s = 'stable' pub1.publish(s) pub2.publish(value) print(s + ' ' + str(value)) timer = time.time() preW_fb = position_fb preW_lr = position_lr if __name__ == '__main__': rospy.init_node('talker', anonymous=True) pub1 = rospy.Publisher('command', String, queue_size=10) pub2 = rospy.Publisher('value', Float32, queue_size=10) try: rab.start_fetching_status(custom_callback=usb_custom_callback) rab.polling_status() except AssertionError: # 結束程式 print('Bye~!!') except Exception: tb.print_exc() finally: rab.disconnect() ``` --- # Talker Node (傳送rabboni的資訊)(BLE) ```python= #!/usr/bin/env python3 import rospy from std_msgs.msg import String from std_msgs.msg import Float32 import traceback as tb import math import time from rabboni import Rabboni rab = Rabboni(mode='BLE') rab.scan() rab.connect(mac_address='F6:20:ED:F3:5B:F2') #對應到Rabboni rab.read_sensor_config() rab.set_sensor_config(2, 500, 5, 100) rab.read_sensor_config() position_lr = 0 position_fb = 0 timer = time.time() preW_fb = 0 preW_lr = 0 def ble_custom_callback(status): global position_lr global position_fb global timer global preW_fb global preW_lr value = 0 ratio = 0.7 if(abs(preW_fb-status['Gyr'][0]) > 0): position_fb = (1 - ratio) * (position_fb + (status['Gyr'][0] * math.pi/180) * (time.time() - timer)) + ratio * status['Acc'][1] if(abs(preW_lr-status['Gyr'][1]) > 0): position_lr = (1 - ratio) * (position_lr + (status['Gyr'][1] * math.pi/180) * (time.time() - timer)) + ratio * status['Acc'][0] #Left and Right if position_lr < -0.35: s = 'right' value = status['Acc'][0] elif position_lr > 0.35: s = 'left' value = status['Acc'][0] #Forward and Backward elif position_fb < -0.4: s = 'forward' value = status['Acc'][1] elif position_fb > 0.4: s = 'backward' value = status['Acc'][1] else: s = 'stable' pub1.publish(s) pub2.publish(value) print(s + ' ' + str(value)) timer = time.time() preW_fb = position_fb preW_lr = position_lr if __name__ == '__main__': rospy.init_node('talker', anonymous=True) pub1 = rospy.Publisher('command', String, queue_size=10) pub2 = rospy.Publisher('value', Float32, queue_size=10) try: rab.start_fetching_status(custom_callback=ble_custom_callback) rab.polling_status() except AssertionError: # 結束程式 print('Bye~!!') except Exception: tb.print_exc() finally: rab.disconnect() ``` --- # Listener Node ```python= #!/usr/bin/env python3 import rospy from std_msgs.msg import String from std_msgs.msg import Float32 import RPi.GPIO as GPIO import time import numpy as np GPIO.setmode(GPIO.BOARD) GPIO.setup(32,GPIO.OUT) GPIO.setup(12,GPIO.OUT) pR=GPIO.PWM(32,50)# 50hz frequency pL=GPIO.PWM(12,50) pR.start(7) pL.start(7) pR.ChangeDutyCycle(7) pL.ChangeDutyCycle(7) command = 'forward' def control(value): global command lr_strength = 1.4 #左右轉幅度,越大越強 fb_strength = 1.3 #前後強度,越大越強 if command == 'right': pR.ChangeDutyCycle(7-lr_strength) pL.ChangeDutyCycle(7+lr_strength) elif command == 'left': pR.ChangeDutyCycle(7+lr_strength) pL.ChangeDutyCycle(7-lr_strength) else: pR.ChangeDutyCycle(fb_strength*value.data+7) pL.ChangeDutyCycle(fb_strength*value.data+7) def Command(data): global command command = data.data print(command) def listener(): rospy.init_node('listener', anonymous=True) rospy.Subscriber('command', String, Command) rospy.Subscriber('value' , Float32 , control) rospy.spin() if __name__ == '__main__': try: listener() except KeyboardInterrupt: print("STOP") finally: GPIO.cleanup() ```