class UnityCommunication(object):
    """
    Class to communicate with the Unity simulator and generate videos or agent behaviors

    :param str url: which url to use to communicate
    :param str port: which port to use to communicate (an int is accepted as well)
    :param str file_name: location of the Unity executable. If provided, it will open the executable,
        if `None`, it will assume that the executable is already running
    :param str x_display: if using a headless server, display to use for rendering
    :param bool no_graphics: whether to run the simulator without graphics
    :param bool logging: log simulator data
    :param int timeout_wait: how long to wait until connection with the simulator is called unsuccessful
    :param bool docker_enabled: whether the simulator is running in a docker container
    """

    def __init__(self, url='127.0.0.1', port='8080', file_name=None, x_display=None, no_graphics=False,
                 logging=True, timeout_wait=30, docker_enabled=False):
        # str() so that a port given as an int does not break the concatenation.
        self._address = 'http://' + url + ':' + str(port)
        self.port = port
        # NOTE: this attribute holds the `no_graphics` flag as given; the name is kept
        # unchanged because external code may read it.
        self.graphics = no_graphics
        self.x_display = x_display
        self.launcher = None
        self.timeout_wait = timeout_wait

        if file_name is None:
            # The executable is assumed to be running already; nothing to launch.
            return

        self.launcher = communication.UnityLauncher(port=port, file_name=file_name, x_display=x_display,
                                                    no_graphics=no_graphics, logging=logging,
                                                    docker_enabled=docker_enabled)
        if not self.launcher.batchmode:
            return

        print('Getting connection...')
        for _ in range(5):
            try:
                self.check_connection()
            except Exception:
                # Only real errors are retried; KeyboardInterrupt/SystemExit propagate.
                time.sleep(2)
            else:
                break
        else:
            # All attempts failed. Still a SystemExit, but now with an explanation.
            sys.exit('Could not connect to the Unity simulator at {0}'.format(self._address))

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _send(self, action, **params):
        """Post a command named `action`, stamped with a time-based id, plus any extra params."""
        request = {'id': str(time.time()), 'action': action}
        request.update(params)
        return self.post_command(request)

    @staticmethod
    def _vec3(values):
        """Turn a 3-element sequence into the `{'x', 'y', 'z'}` dict sent to the simulator."""
        return {'x': values[0], 'y': values[1], 'z': values[2]}

    @staticmethod
    def _parse_message(message):
        """Decode `message` as JSON, returning it untouched when it is not valid JSON."""
        try:
            return json.loads(message)
        except (TypeError, ValueError):
            return message

    @staticmethod
    def _as_index_list(camera_indexes):
        """Wrap a single camera index in a list; turn any other iterable into a list."""
        # `collections.Iterable` was removed in Python 3.10; the ABC lives in collections.abc.
        try:
            from collections.abc import Iterable
        except ImportError:  # Python 2 fallback
            from collections import Iterable
        if isinstance(camera_indexes, Iterable):
            return list(camera_indexes)
        return [camera_indexes]

    # ------------------------------------------------------------------
    # Transport
    # ------------------------------------------------------------------
    def requests_retry_session(self, retries=5, backoff_factor=2, status_forcelist=(500, 502, 504),
                               session=None):
        """
        Build (or extend) a requests session whose `http://` adapter retries failed requests

        :param int retries: value used for the total, read and connect retry counts
        :param float backoff_factor: backoff factor handed to `Retry`
        :param tuple status_forcelist: HTTP status codes handed to `Retry`
        :param session: an existing session to configure; a new one is created when `None`
        :return: the configured session
        """
        session = session or requests.Session()
        retry = Retry(
            total=retries,
            read=retries,
            connect=retries,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist,
        )
        session.mount('http://', HTTPAdapter(max_retries=retry))
        return session

    def close(self):
        """Close the launcher, if this object started one."""
        if self.launcher is not None:
            self.launcher.close()

    def post_command(self, request_dict, repeat=False):
        """
        Send `request_dict` as JSON to the simulator and return the decoded JSON response

        :param dict request_dict: the request to post
        :param bool repeat: if True, post through a retrying session (see `requests_retry_session`)
        :return: the response body decoded from JSON
        :raises UnityEngineException: if the response status is not OK
        :raises UnityCommunicationException: if the request itself fails
        """
        try:
            if repeat:
                session = self.requests_retry_session()
                try:
                    resp = session.post(self._address, json=request_dict, timeout=self.timeout_wait)
                finally:
                    # The session is created for this call only; release its connections.
                    session.close()
            else:
                resp = requests.post(self._address, json=request_dict, timeout=self.timeout_wait)

            if resp.status_code != requests.codes.ok:
                print(resp)
                try:
                    error_body = resp.json()
                except ValueError:
                    # A non-JSON error body must not hide the engine error itself.
                    error_body = {}
                raise UnityEngineException(resp.status_code, error_body)
            return resp.json()
        except requests.exceptions.RequestException as e:
            raise UnityCommunicationException(str(e))

    def check_connection(self):
        """
        Post an `idle` command (with retries) to check that the simulator answers

        :return: success (bool)
        """
        response = self.post_command({'id': str(time.time()), 'action': 'idle'}, repeat=True)
        return response['success']

    # ------------------------------------------------------------------
    # Scene queries and edits
    # ------------------------------------------------------------------
    def get_visible_objects(self, camera_index):
        """
        Obtain visible objects according to a given camera

        :param int camera_index: the camera for which you want to check the objects. Between 0 and `camera_count-1`

        :return: pair success (bool), msg: the object indices visible according to the camera
        """
        response = self._send('observation', intParams=[camera_index])
        return response['success'], self._parse_message(response['message'])

    def add_character(self, character_resource='Chars/Male1', position=None, initial_room=""):
        """
        Add a character in the scene.

        :param str character_resource: which game object to use for the character
        :param list position: the position where you want to place the character
        :param str initial_room: the room where you want to put the character,
            if position is not specified. If this is not specified, it places character in random location

        :return: success (bool)
        """
        mode = 'random'
        pos = [0, 0, 0]
        if position is not None:
            mode = 'fix_position'
            pos = position
        elif initial_room:
            assert initial_room in ["kitchen", "bedroom", "livingroom", "bathroom"], \
                'initial_room must be one of kitchen, bedroom, livingroom, bathroom'
            mode = 'fix_room'

        character = {
            'character_resource': character_resource,
            'mode': mode,
            'character_position': self._vec3(pos),
            'initial_room': initial_room,
        }
        response = self._send('add_character', stringParams=[json.dumps(character)])
        return response['success']

    def move_character(self, char_index, pos):
        """
        Move the character `char_index` to a new position

        :param int char_index: the index of the character you want to move
        :param list pos: the position where you want to place the character

        :return: success (bool)
        """
        move = {'char_index': char_index, 'character_position': self._vec3(pos)}
        response = self._send('move_character', stringParams=[json.dumps(move)])
        return response['success']

    def add_camera(self, position=[0, 1, 0], rotation=[0, 0, 0], field_view=40):
        """
        Add a new scene camera. The camera will be static in the scene.

        :param list position: the position of the camera, with respect to the agent
        :param list rotation: the rotation of the camera, with respect to the agent
        :param field_view: the field of view of the camera

        :return: pair success (bool), message
        """
        cam_dict = {
            'position': self._vec3(position),
            'rotation': self._vec3(rotation),
            'field_view': field_view,
        }
        response = self._send('add_camera', stringParams=[json.dumps(cam_dict)])
        return response['success'], response['message']

    def update_camera(self, camera_index, position=[0, 1, 0], rotation=[0, 0, 0], field_view=40):
        """
        Updates an existing camera, identified by index.

        :param int camera_index: the index of the camera you want to update
        :param list position: the position of the camera, with respect to the agent
        :param list rotation: the rotation of the camera, with respect to the agent
        :param field_view: the field of view of the camera

        :return: pair success (bool), message
        """
        cam_dict = {
            'position': self._vec3(position),
            'rotation': self._vec3(rotation),
            'field_view': field_view,
        }
        response = self._send('update_camera', intParams=[camera_index],
                              stringParams=[json.dumps(cam_dict)])
        return response['success'], response['message']

    def add_character_camera(self, position=[0, 1, 0], rotation=[0, 0, 0], name="new_camera"):
        """
        Add a new character camera. The camera will be added to every character you include in the scene,
        and it will move with the character. This must be called before adding any character.

        :param list position: the position of the camera, with respect to the agent
        :param list rotation: the rotation of the camera, with respect to the agent
        :param str name: the name of the camera, used for recording when calling render script

        :return: pair success (bool), message
        """
        cam_dict = {
            'position': self._vec3(position),
            'rotation': self._vec3(rotation),
            'camera_name': name,
        }
        response = self._send('add_character_camera', stringParams=[json.dumps(cam_dict)])
        return response['success'], response['message']

    def update_character_camera(self, position=[0, 1, 0], rotation=[0, 0, 0], name="PERSON_FRONT"):
        """
        Update character camera specified by name. This must be called before adding any character.

        :param list position: the position of the camera, with respect to the agent
        :param list rotation: the rotation of the camera, with respect to the agent
        :param str name: the name of the camera, used for recording when calling render script

        :return: pair success (bool), message
        """
        cam_dict = {
            'position': self._vec3(position),
            'rotation': self._vec3(rotation),
            'camera_name': name,
        }
        response = self._send('update_character_camera', stringParams=[json.dumps(cam_dict)])
        return response['success'], response['message']

    def reset(self, environment=None):
        """
        Reset scene. Deletes characters and scene changes, and loads the scene given by `environment`

        :param int environment: integer between 0 and 49, corresponding to the apartment we want to load
        :return: success (bool) of the final `environment` command
        """
        int_params = [] if environment is None else [environment]
        # The result of `clear` is not inspected; only the `environment` result is returned.
        self._send('clear', intParams=int_params)
        response = self._send('environment', intParams=int_params)
        return response['success']

    def fast_reset(self, environment=None):
        """
        Fast scene. Deletes characters and scene changes

        :param int environment: optional environment index, forwarded to the simulator when given
        :return: success (bool)
        """
        int_params = [] if environment is None else [environment]
        response = self._send('fast_reset', intParams=int_params)
        return response['success']

    def procedural_generation(self, seed=None):
        """
        Generates new environments through procedural generation logic.

        :param int seed: integer corresponding to the seed given during generation
        :return: pair success (bool), message (the docstring of the original names it the seed)
        """
        self._send('clear_procedural', intParams=[])
        response = self._send('procedural_generation', intParams=[] if seed is None else [seed])
        return response['success'], response['message']

    def camera_count(self):
        """
        Returns the number of cameras in the scene, including static cameras, and cameras for each character

        :return: pair success (bool), num_cameras (int)
        """
        response = self._send('camera_count')
        return response['success'], response['value']

    def character_cameras(self):
        """
        Returns the cameras defined for the characters

        :return: pair success (bool), camera_names: (list): the names of the cameras defined for the characters
        """
        response = self._send('character_cameras')
        return response['success'], response['message']

    def camera_data(self, camera_indexes):
        """
        Returns camera data for cameras given in camera_indexes list

        :param list camera_indexes: the list of cameras to return, can go from 0 to `camera_count-1`;
            a single index is accepted as well
        :return: pair success (bool), cam_data: (list): for every camera, the matrices with the camera parameters
        """
        response = self._send('camera_data', intParams=self._as_index_list(camera_indexes))
        return response['success'], json.loads(response['message'])

    def camera_image(self, camera_indexes, mode='normal', image_width=640, image_height=480):
        """
        Returns a list of renderings of cameras given in camera_indexes.

        :param list camera_indexes: the list of cameras to return, can go from 0 to `camera_count-1`;
            a single index is accepted as well
        :param str mode: what kind of camera rendering to return. Possible modes are: "normal", "seg_inst",
            "seg_class", "depth", "flow", "albedo", "illumination", "surf_normals"
        :param int image_width: width of the returned images
        :param int image_height: height of the returned images

        :return: pair success (bool), images: (list) a list of images according to the camera rendering mode
        """
        params = {'mode': mode, 'image_width': image_width, 'image_height': image_height}
        response = self._send('camera_image', intParams=self._as_index_list(camera_indexes),
                              stringParams=[json.dumps(params)])
        return response['success'], _decode_image_list(response['message_list'])

    def instance_colors(self):
        """
        Return a mapping from rgb colors, shown on `seg_inst` to object `id`, specified in the environment graph.

        :return: pair success (bool), mapping: (dictionary)
        """
        response = self._send('instance_colors')
        return response['success'], json.loads(response['message'])

    def environment_graph(self):
        """
        Returns environment graph, at the current state

        :return: pair success (bool), graph: (dictionary)
        """
        response = self._send('environment_graph')
        return response['success'], json.loads(response['message'])

    def expand_scene(self, new_graph, randomize=False, random_seed=-1, animate_character=False,
                     ignore_placing_obstacles=False, prefabs_map=None, transfer_transform=True):
        """
        Expands scene with the given graph. Given a starting scene without characters, it updates the scene
        according to new_graph, which contains a modified description of the scene. Can be used to add, move,
        or remove objects or change their state or size.

        :param dict new_graph: a dictionary corresponding to the new graph of the form `{'nodes': ..., 'edges': ...}`
        :param bool randomize: a boolean indicating if the new position/types of objects should be random
        :param int random_seed: seed to use for randomize. random_seed < 0 means that seed is not set
        :param bool animate_character: boolean indicating if the added character should be frozen or not.
        :param bool ignore_placing_obstacles: when adding new objects, if the transform is not specified,
            whether to consider if it collides with existing objects
        :param dict prefabs_map: dictionary to specify which Unity game objects should be used when creating
            new objects
        :param bool transfer_transform: boolean indicating if we should set the exact position of new added
            objects or not

        :return: pair success (bool), message: the response message, JSON-decoded when possible
        """
        config = {
            'randomize': randomize,
            'random_seed': random_seed,
            'animate_character': animate_character,
            'ignore_obstacles': ignore_placing_obstacles,
            'transfer_transform': transfer_transform,
        }
        string_params = [json.dumps(config), json.dumps(new_graph)]
        if prefabs_map is not None:
            string_params.append(json.dumps(prefabs_map))
        response = self._send('expand_scene', stringParams=string_params)
        return response['success'], self._parse_message(response['message'])

    def set_time(self, hours=0, minutes=0, seconds=0):
        """
        Set the time in the environment

        :param int hours: hours in 24-hour time
        :param int minutes: minutes in 24-hour time
        :param int seconds: seconds in 24-hour time

        :return: success (bool)
        """
        time_dict = {'hours': hours, 'minutes': minutes, 'seconds': seconds}
        response = self._send('set_time', stringParams=[json.dumps(time_dict)])
        return response['success']

    def activate_physics(self, gravity=-10):
        """
        Activates gravity and realistic collisions in the environment

        :param int gravity: gravity value experienced in the environment

        :return: success (bool)
        """
        physics_dict = {'gravity': gravity}
        response = self._send('activate_physics', stringParams=[json.dumps(physics_dict)])
        return response['success']

    def render_script(self, script, randomize_execution=False, random_seed=-1, processing_time_limit=10,
                      skip_execution=False, find_solution=False, output_folder='Output/',
                      file_name_prefix="script", frame_rate=5, image_synthesis=['normal'],
                      save_pose_data=False, image_width=640, image_height=480, recording=False,
                      save_scene_states=False, camera_mode=['AUTO'], time_scale=1.0, skip_animation=False):
        """
        Executes a script in the simulator. The script can be single or multi agent,
        and can be used to generate a video, or just to change the state of the environment

        :param list script: a list of script lines, of the form
            `['<char{id}> [{Action}] <{object_name}> ({object_id})']`
        :param bool randomize_execution: randomly choose elements
        :param int random_seed: random seed to use when randomizing execution, -1 means that the seed is not set
        :param bool find_solution: find solution (True) or use graph ids to determine object instances (False)
        :param int processing_time_limit: time limit for finding a solution in seconds
        :param bool skip_execution: skip rendering, only check if a solution exists
        :param str output_folder: folder to output renderings
        :param str file_name_prefix: prefix of created files
        :param int frame_rate: frame rate at which to generate the video
        :param list image_synthesis: what information to save. Can be multiple at the same time. Modes are:
            "normal", "seg_inst", "seg_class", "depth", "flow", "albedo", "illumination", "surf_normals".
            Leave empty if you don't want to generate anything
        :param bool save_pose_data: save pose data, a skeleton for every agent and frame
        :param int image_width: image width for the generated frames
        :param int image_height: image height for the generated frames
        :param bool recording: whether to record data with cameras
        :param bool save_scene_states: save scene states (this will be unused soon)
        :param list camera_mode: list with cameras used to render data. Can be a str(i) with i being a scene
            camera index or one of the cameras from `character_cameras`
        :param float time_scale: accelerate time at which actions happen
        :param bool skip_animation: whether agent should teleport/do actions without animation (True),
            or perform the animations (False)

        :return: pair success (bool), message: the response message, JSON-decoded when possible
        """
        params = {
            'randomize_execution': randomize_execution,
            'random_seed': random_seed,
            'processing_time_limit': processing_time_limit,
            'skip_execution': skip_execution,
            'output_folder': output_folder,
            'file_name_prefix': file_name_prefix,
            'frame_rate': frame_rate,
            'image_synthesis': image_synthesis,
            'find_solution': find_solution,
            'save_pose_data': save_pose_data,
            'save_scene_states': save_scene_states,
            'camera_mode': camera_mode,
            'recording': recording,
            'image_width': image_width,
            'image_height': image_height,
            'time_scale': time_scale,
            'skip_animation': skip_animation,
        }
        # The JSON-encoded options go first, followed by the script lines themselves.
        response = self._send('render_script', stringParams=[json.dumps(params)] + script)
        return response['success'], self._parse_message(response['message'])
def_decode_image(img_string):img_bytes=base64.b64decode(img_string)if'PNG'==img_bytes[1:4]:img_file=cv2.imdecode(np.fromstring(img_bytes,np.uint8),cv2.IMREAD_COLOR)else:img_file=cv2.imdecode(np.fromstring(img_bytes,np.uint8),cv2.IMREAD_ANYDEPTH+cv2.IMREAD_ANYCOLOR)returnimg_filedef_decode_image_list(img_string_list):image_list=[]forimg_stringinimg_string_list:image_list.append(_decode_image(img_string))returnimage_listclassUnityEngineException(Exception):""" This exception is raised when an error in communication occurs: - Unity has received invalid request More information is in the message. """def__init__(self,status_code,resp_dict):resp_msg=resp_dict['message']if'message'inresp_dictelse'Message not available'self.message='Unity returned response with status: {0} ({1}), message: {2}'.format(status_code,requests.status_codes._codes[status_code][0],resp_msg)classUnityCommunicationException(Exception):def__init__(self,message):self.message=message