# supervisorController.py

```python=
import numpy as np
from deepbots.supervisor.controllers.supervisor_emitter_receiver import SupervisorCSV
from PPOAgent import PPOAgent, Transition
from utilities import normalizeToRange
from ArmUtil import ToArmCoord, PSFunc
from CalObs import get_delta_r_and_theta
import random as rand
# from ikpy.chain import Chain
# from ikpy.link import OriginLink, URDFLink


class PandaSupervisor(SupervisorCSV):
    def __init__(self):
        super().__init__()
        self.observationSpace = 8  # 4 motor positions, delta_r, delta_theta, beaker orientation, L2 norm
        # self.observationSpace = 11  # without motor positions 0, 1, and 2
        # self.actionSpace = 2187  # The agent can perform 3^7 actions
        self.actionSpace = 81  # 3^4 actions for the 4 controlled motors
        self.robot = None
        self.beaker = None
        self.target = self.supervisor.getFromDef("TARGET")
        self.endEffector = None
        # self.armChain = Chain.from_urdf_file("panda_with_bound.URDF")
        self.respawnRobot()

        self.messageReceived = None  # Variable to save the messages received from the robot
        self.episodeCount = 0        # Episode counter
        self.episodeLimit = 15000    # Max number of episodes allowed
        self.stepsPerEpisode = 500   # Max number of steps per episode
        self.episodeScore = 0        # Score accumulated during an episode
        self.episodeScoreList = []   # A list of all the episode scores, used to check if the task is solved
        self.doneTotal = 0

        # For computing the distance diff:
        # targetPosition = ToArmCoord.convert(self.target.getPosition())  # transfer to the arm coordinate system
        # tmp = [0.0 for _ in range(8)]
        # tmp[3] = -0.0698
        # endEffectorPosition = self.armChain.forward_kinematics(tmp)[0:3, 3]  # This is already in arm coordinates.
        # endEffectorPosition = ToArmCoord.convert(self.endEffector.getPosition())  # transfer to the arm coordinate system
        # self.preL2norm = np.linalg.norm([targetPosition[0]-endEffectorPosition[0],
        #                                  targetPosition[1]-endEffectorPosition[1],
        #                                  targetPosition[2]-endEffectorPosition[2]])
        # ------
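    # --- Editor's note (sketch, not part of the original file) ---
    # With actionSpace = 81 = 3**4, a single integer action can encode one of
    # {decrease, hold, increase} per controlled motor via its base-3 digits.
    # The decoding presumably happens in the robot controller; a hypothetical
    # decoder would look like:
    #     def decode_action(action, num_motors=4):
    #         deltas = []
    #         for _ in range(num_motors):
    #             deltas.append(action % 3 - 1)  # map digit {0,1,2} -> {-1,0,+1}
    #             action //= 3
    #         return deltas
    #     decode_action(57)  # -> [-1, 0, -1, 1], one step direction per motor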
    def respawnRobot(self):
        if self.robot is not None:
            # Despawn the existing robot and beaker
            self.robot.remove()
            self.beaker.remove()
        # Respawn the robot in its starting position and state
        rootNode = self.supervisor.getRoot()           # This gets the root of the scene tree
        childrenField = rootNode.getField('children')  # This gets a list of all the children, i.e. objects of the scene
        childrenField.importMFNode(-2, "Solid.wbo")    # Load the beaker from file and add it in the second-to-last position
        childrenField.importMFNode(-2, "Robot.wbo")    # Load the robot from file and add it in the second-to-last position
        # Get the new nodes
        self.beaker = self.supervisor.getFromDef("Beaker")
        self.robot = self.supervisor.getFromDef("Franka_Emika_Panda")
        self.endEffector = self.supervisor.getFromDef("endEffector")
        targetPosition = ToArmCoord.convert(self.target.getPosition())            # transfer to the arm coordinate system
        endEffectorPosition = ToArmCoord.convert(self.endEffector.getPosition())  # transfer to the arm coordinate system
        self.preL2norm = np.linalg.norm([targetPosition[0]-endEffectorPosition[0],
                                         targetPosition[1]-endEffectorPosition[1],
                                         targetPosition[2]-endEffectorPosition[2]])
        # print("test:", self.endEffector.getPosition())
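    # --- Editor's note (assumption, not part of the original file) ---
    # ToArmCoord.convert is defined in the project's ArmUtil module, which is
    # not shown on this page. It is assumed to remap a Webots world-coordinate
    # triple into the arm's base frame. With Webots' legacy y-up world axes and
    # a z-up arm frame, a plausible shape is an axis permutation such as
    #     def convert(p): return [p[0], -p[2], p[1]]
    # The actual mapping depends on ArmUtil and may differ.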
    def get_observations(self):
        # Observation: [4 motor positions, delta_r, delta_theta, beaker orientation, L2 norm(target vs end effector)]
        # Update self.messageReceived with the message received from the robot, which contains the motor positions
        self.messageReceived = self.handle_receiver()
        if self.messageReceived is not None:
            # for 7 motors
            # motorPosition = [float(self.messageReceived[0]), float(self.messageReceived[1]), float(self.messageReceived[2]),
            #                  float(self.messageReceived[3]), float(self.messageReceived[4]), float(self.messageReceived[5]),
            #                  float(self.messageReceived[6])]
            # for 4 motors
            motorPosition = [float(self.messageReceived[3]), float(self.messageReceived[4]),
                             float(self.messageReceived[5]), float(self.messageReceived[6])]
        else:
            # This method is called before self.messageReceived is initialized
            # motorPosition = [0.0 for _ in range(7)]
            # motorPosition[3] = -0.0698
            motorPosition = [0.0 for _ in range(4)]
            motorPosition[0] = -0.0698

        # Get the TARGET position
        targetPosition = self.target.getPosition()
        targetPosition = ToArmCoord.convert(targetPosition)  # transfer to the arm coordinate system
        # Get the ORIGIN position
        originPosition = self.robot.getPosition()
        originPosition = ToArmCoord.convert(originPosition)  # transfer to the arm coordinate system

        if self.messageReceived is not None:
            # Get the end-effector position
            motorPosition_for_FK = motorPosition + [0.0]  # only needed for the commented-out ikpy forward kinematics below
            # endEffectorPosition = self.armChain.forward_kinematics(motorPosition_for_FK)[0:3, 3]  # This is already in arm coordinates.
            endEffectorPosition = ToArmCoord.convert(self.endEffector.getPosition())  # transfer to the arm coordinate system
            # Compute the L2 norm
            # print("[Debug targetPosition]:", targetPosition)
            # print("[Debug endEffectorPosition]:", endEffectorPosition)
            L2norm = np.linalg.norm([targetPosition[0]-endEffectorPosition[0],
                                     targetPosition[1]-endEffectorPosition[1],
                                     targetPosition[2]-endEffectorPosition[2]])
        else:
            # tmp = [0.0 for _ in range(8)]
            # tmp[3] = -0.0698
            # endEffectorPosition = self.armChain.forward_kinematics(tmp)[0:3, 3]  # This is already in arm coordinates.
            endEffectorPosition = ToArmCoord.convert(self.endEffector.getPosition())  # transfer to the arm coordinate system
            L2norm = self.preL2norm

        delta_r, delta_theta = get_delta_r_and_theta(originPosition, endEffectorPosition, targetPosition)
        # Concatenate everything into a single list: 4 + 1 + 1 + 1 + 1 values
        returnObservation = [*motorPosition, delta_r, delta_theta, self.beaker.getOrientation()[4], L2norm]
        return returnObservation

    def get_reward(self, action=None):
        # The shaped reward is computed in the training loop below (see "compute reward here")
        return 0

    def is_done(self):
        return False

    def solved(self):
        if len(self.episodeScoreList) > 1000:  # Over 1000 episodes thus far
            if np.mean(self.episodeScoreList[-100:]) > 1000:  # Average score of the last 100 episodes
                return True
        return False

    def reset(self):
        self.respawnRobot()
        self.supervisor.simulationResetPhysics()  # Reset the simulation physics to start over
        self.messageReceived = None
        return [0.0 for _ in range(self.observationSpace)]

    def get_info(self):
        return "I'm trying to reach that red ball!"


supervisor = PandaSupervisor()
agent = PPOAgent(supervisor.observationSpace, supervisor.actionSpace, use_cuda=True)  # added use_cuda
solved = False
cnt_veryClose = 0

# Run the outer loop until the episode limit is reached or the task is solved
while not solved and supervisor.episodeCount < supervisor.episodeLimit:
    observation = supervisor.reset()  # Reset the robot and get the starting observation
    total_constrain_reward = 0
    total_len_reward = 0
    total_orientation_reward = 0
    supervisor.episodeScore = 0
    cnt_veryClose = 0
    for step in range(supervisor.stepsPerEpisode):
        # print("step: ", step)
        # In training mode the agent samples from the probability distribution, naturally implementing exploration
        selectedAction, actionProb = agent.work(observation, type_="selectAction")
        # print(selectedAction, actionProb)
        # rand.seed()
        # print("ActionProb:", actionProb)
        # if(rand.randint(1, 10) == 1):
        #     rand.seed()
        #     # print("Origin Master: ", selectedAction)
        #     selectedAction = rand.randint(0, 2187-1)
        #     # print("Change Action: ", selectedAction)

        # Step the supervisor to get the current selectedAction's reward, the new observation,
        # and whether we reached the done condition
        newObservation, reward, done, info = supervisor.step([selectedAction])
        # print("L2", newObservation[-1])

        # ------ compute done here ------
        if newObservation[-1] <= 0.01:
            cnt_veryClose += 1
        if cnt_veryClose >= 50 or step == supervisor.stepsPerEpisode-1:
            done = True
            supervisor.preL2norm = 0.4  # reset the previous-distance baseline for the next episode
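        # --- Editor's note (worked example, not part of the original file) ---
        # The shaped reward below has three parts:
        #   constrain_reward:   -1 if any joint is within 0.05 rad of a limit, else 0
        #   len_reward:         10 / 5 / 1 for an L2 distance below 0.01 / 0.05 / 0.1 m,
        #                       otherwise the decrease in distance since the last step
        #   orientation_reward: 0.05 * (R[1][1] - 1.0), where R is the beaker's 3x3
        #                       rotation matrix (getOrientation() flattened, index 4),
        #                       i.e. 0 when the beaker is level and negative when it tilts
        # Example: distance 0.04 m, no joint near a limit, beaker level
        #          -> reward = 5 + 0 + 0 = 5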
        # ------ compute reward here ------
        # Do not get too close to the joint limits.
        # Panda joint ranges: [-2.897, 2.897], [-1.763, 1.763], [-2.8973, 2.8973], [-3.072, -0.0698],
        #                     [-2.8973, 2.8973], [-0.0175, 3.7525], [-2.897, 2.897]
        # The 4 controlled motors (observation indices 0-3) are the last four joints.
        if newObservation[0]-(-3.072) < 0.05 or -0.0697976-newObservation[0] < 0.05 or \
           newObservation[1]-(-2.8973) < 0.05 or 2.8973-newObservation[1] < 0.05 or \
           newObservation[2]-(-0.0175) < 0.05 or 3.7525-newObservation[2] < 0.05 or \
           newObservation[3]-(-2.897) < 0.05 or 2.897-newObservation[3] < 0.05:
            constrain_reward = -1  # -1 if any motor is within 0.05 rad of a limit
        else:
            constrain_reward = 0   # no motor near a limit

        if newObservation[-1] < 0.01:
            len_reward = 10  # *((supervisor.stepsPerEpisode - step)/supervisor.stepsPerEpisode)
        elif newObservation[-1] < 0.05:
            len_reward = 5   # *((supervisor.stepsPerEpisode - step)/supervisor.stepsPerEpisode)
        elif newObservation[-1] < 0.1:
            len_reward = 1   # *((supervisor.stepsPerEpisode - step)/supervisor.stepsPerEpisode)
        else:
            len_reward = -(newObservation[-1]-supervisor.preL2norm)  # positive when the distance to the target decreased
            supervisor.preL2norm = newObservation[-1]

        # print("Beaker: ", supervisor.beaker.getOrientation(), "=>", supervisor.beaker.getOrientation()[4])
        orientation_reward = 0.05*(supervisor.beaker.getOrientation()[4] - 1.0)  # We want the beaker to stay level.

        reward = len_reward + constrain_reward + orientation_reward
        # print("reward: ", reward)
        # print("L2norm: ", newObservation[-1])
        # print("tarPosition(trans): ", newObservation[7:10])
        # print("endPosition: ", newObservation[10:13])
        # print("endPosition(trans): ", ToArmCoord.convert(newObservation[10:13]))
        # ------ compute reward end ------

        # Save the current state transition in the agent's memory
        trans = Transition(observation, selectedAction, actionProb, reward, newObservation)
        agent.storeTransition(trans)

        if done:
            if step == 0:
                print("0 Step but done?")
                continue
            print("done gogo")
            # Save the episode's score
            supervisor.episodeScoreList.append(supervisor.episodeScore)
            agent.trainStep(batchSize=step)
            solved = supervisor.solved()  # Check whether the task is solved
            agent.save('')
            break

        total_constrain_reward += constrain_reward
        total_len_reward += len_reward
        total_orientation_reward += orientation_reward
        supervisor.episodeScore += reward  # Accumulate the episode reward
        observation = newObservation       # The observation for the next step is this step's newObservation

    # Log the per-episode reward components
    with open("Episode-constrain-score.txt", "a") as fp:
        fp.write(str(total_constrain_reward) + '\n')
    with open("Episode-len-score.txt", "a") as fp:
        fp.write(str(total_len_reward) + '\n')
    with open("Episode-orientation-score.txt", "a") as fp:
        fp.write(str(total_orientation_reward) + '\n')
    with open("Episode-score.txt", "a") as fp:
        fp.write(str(supervisor.episodeScore) + '\n')

    print("Episode #", supervisor.episodeCount, "score:", supervisor.episodeScore)
    supervisor.episodeCount += 1  # Increment the episode counter

if solved:
    print("Task is solved, deploying agent for testing...")
else:
    print("Task is not solved, deploying agent for testing...")

observation = supervisor.reset()
while True:
    selectedAction, actionProb = agent.work(observation, type_="selectActionMax")
    observation, _, _, _ = supervisor.step([selectedAction])
```
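The `delta_r` and `delta_theta` observation terms come from `get_delta_r_and_theta` in the project's `CalObs` module, which is not shown on this page. For orientation, here is a minimal sketch of what such a function might compute, assuming `delta_r` is the difference of the radial distances of the target and the end effector from the arm origin, and `delta_theta` is the angle between the two origin-relative vectors; the actual `CalObs` implementation may differ:

```python=
import numpy as np

def get_delta_r_and_theta(originPosition, endEffectorPosition, targetPosition):
    # Vectors from the arm origin to the end effector and to the target
    v_end = np.subtract(endEffectorPosition, originPosition)
    v_tar = np.subtract(targetPosition, originPosition)
    # delta_r: difference of radial distances from the origin
    delta_r = np.linalg.norm(v_tar) - np.linalg.norm(v_end)
    # delta_theta: angle between the two vectors, clipped for numerical safety
    cos_theta = np.dot(v_end, v_tar) / (np.linalg.norm(v_end) * np.linalg.norm(v_tar))
    delta_theta = np.arccos(np.clip(cos_theta, -1.0, 1.0))
    return delta_r, delta_theta
```

Both quantities go to zero as the end effector converges on the target, which makes them natural companions to the raw L2 norm in the observation vector.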