import numpy as np
import time
from collections import OrderedDict
from digideep.environment import MakeEnvironment
from .data_helpers import flatten_dict, update_dict_of_lists, complete_dict_of_list, convert_time_to_batch_major, extract_keywise
# from mujoco_py import MujocoException
# from dm_control.rl.control import PhysicsError
from digideep.utility.logging import logger
from digideep.utility.profiling import KeepTime
from digideep.utility.monitoring import monitor
class Explorer:
"""A class which runs environments in parallel and returns the resulting trajectories in a unified structure.
It supports multiple agents in an environment.
Note:
The entrypoint of this class is the :func:`update` function, in which the :func:`step` function will be
called for ``n_steps`` times. In the :func:`step` function, the :func:`prestep` function is called first to get the
actions from the agents. Then the ``env.step`` function is called to execute those actions in the environments.
After the loop in :func:`update` is done, we call :func:`prestep` once more to save the ``observations``/``actions``
of the last step. This captures the final action that the agent would take, without actually executing it. This
information is useful in some algorithms.
Args:
session (:obj:`~digideep.pipeline.session.Session`): The running session object.
agents (dict): A dictionary of the agents and their corresponding agent objects.
mode (str): The mode of the Explorer, which is any of the three: ``train`` | ``test`` | ``eval``
env (:obj:`env`): The parameters of the environment.
do_reset (bool): A flag indicating whether to reset the environment at the update start.
final_action (bool): A flag indicating whether in the final call of :func:`prestep` the action should also be generated or not.
num_workers (int): Number of workers to work in parallel.
deterministic (bool): Whether to choose the optimal action or to mix some noise with the action (i.e. for exploration).
n_steps (int): Number of steps to take in the :func:`update`.
render (bool): A flag used to indicate whether environment should be rendered at each step.
render_delay (float): The number of seconds to wait after calling ``env.render``. Used when the environment is too fast for
visualization, typically in ``eval`` mode.
seed (int): The environment seed.
n_episodes (int): Number of episodes to run in :func:`update`; an alternative stopping criterion to ``n_steps``.
warm_start (int): Number of initial steps during which the agents take random actions.
win_size (int): Window size for the moving average of episodic rewards reported to the monitor.
extra_env_kwargs (dict): Extra keyword arguments passed to the environment constructor.
Attributes:
steps (int): Number of times the :func:`step` function is called.
n_episode (int): Number of episodes (a full round of simulation) generated so far.
timesteps (int): Number of total timesteps of experience generated so far.
was_reset (bool): A flag indicating whether the Explorer has been just reset or not.
observations: A tracker of environment observations used to produce the actions for the next step.
masks: A tracker of environment ``done`` flag indicating the start of a new episode.
hidden_states: A tracker of hidden_states of the agents for producing the next step action in recurrent policies.
Caution:
Use ``do_reset`` with caution; only when you know what the consequences are.
Generally, there are few occasions on which this flag needs to be ``True``.
Tip:
This class is partially serializable. It only saves the state of environment wrappers and not the environment per se.
See Also:
:ref:`ref-data-structure`
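Example:
A minimal sketch of the typical call pattern (the ``session`` and ``agents`` objects
are assumed to be built elsewhere by the pipeline; the parameter values here are
illustrative, not defaults)::

explorer = Explorer(session, agents=agents, mode="train", env=env_params, seed=0,
num_workers=4, n_steps=128, n_episodes=None, warm_start=0,
deterministic=False, final_action=True, do_reset=False,
win_size=10, render=False, render_delay=0)
chunk = explorer.update()  # A dictionary of batch-major trajectory data.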
"""
def __init__(self, session, agents=None, **params):
self.agents = agents
self.params = params
self.session = session
# Create the environments
extra_env_kwargs = self.params.get("extra_env_kwargs", {})
menv = MakeEnvironment(session, mode=self.params["mode"], seed=self.params["seed"], **self.params["env"])
self.envs = menv.create_envs(num_workers=self.params["num_workers"], extra_env_kwargs=extra_env_kwargs)
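# ``self.envs`` behaves like a single vectorized environment: ``reset``/``step``
# act on all ``num_workers`` instances at once and return stacked results.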
self.state = {}
self.state["steps"] = 0
self.state["n_episode"] = 0
self.state["timesteps"] = 0
self.state["was_reset"] = False
self.local = {}
self.local["steps"] = 0
self.local["n_episode"] = 0
self.monitor_n_episode()
self.monitor_timesteps()
# We only reset once. Later environments will be reset automatically.
self.reset()
# Will the results be reported when using ``do_reset``?
def monitor_n_episode(self):
if self.params["mode"] == "train":
monitor.set_meta_key("episode", self.state["n_episode"])
def monitor_timesteps(self):
if self.params["mode"] == "train":
monitor.set_meta_key("frame", self.state["timesteps"])
def state_dict(self):
# TODO: Should we make a deepcopy?
return {"state":self.state, "envs":self.envs.state_dict()}
def load_state_dict(self, state_dict):
self.state.update(state_dict["state"])
self.envs.load_state_dict(state_dict["envs"])
self.monitor_n_episode()
self.monitor_timesteps()
# if self.params["mode"] in ["test", "eval"]:
# # We reset the explorer in case of test/eval to clear the history of observations/masks/hidden_state.
# # Because this part does not make sense to be transferred.
# self.reset()
def report_rewards(self, infos):
"""Extracts episode information from ``infos`` and sends it to the
:class:`~digideep.utility.monitoring.Monitor` class.
"""
# This episode keyword only exists if we use a Monitor wrapper.
# This keyword will only appear at the "reset" times.
# TODO: If this is a true multi-agent system, then the rewards
# must be separated as well!
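# Illustrative example (values are hypothetical): with 3 workers,
#   infos['/episode/r'] == [12.5, None, nan]
# i.e. one entry per worker; a non-None, non-NaN entry marks a worker
# whose episode has just finished at this step.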
if '/episode/r' in infos:
rewards = infos['/episode/r']
for rew in rewards:
if (rew is not None) and (not np.isnan(rew)):
self.local["n_episode"] += 1
self.state["n_episode"] += 1
self.monitor_n_episode()
monitor("/reward/"+self.params["mode"]+"/episodic", rew, window=self.params["win_size"])
self.session.writer.add_scalar('reward/'+self.params["mode"], rew, self.state["n_episode"])
def close(self):
"""It closes all environments.
"""
self.envs.close()
def reset(self):
"""Resets the Explorer and all of its states. Sets ``was_reset`` to ``True`` to prevent an immediate second reset.
"""
self.state["observations"] = self.envs.reset()
self.state["masks"] = np.array([[0]]*self.params["num_workers"], dtype=np.float32)
# The initial hidden_state is not saved in the memory. The only use for it is
# getting passed to the action_generator.
# So if there is a size mismatch between this and the next hidden_states, no
# conflicts/errors would happen.
self.state["hidden_state"] = {}
for agent_name in self.agents:
self.state["hidden_state"][agent_name] = self.agents[agent_name].reset_hidden_state(self.params["num_workers"])
self.state["was_reset"] = True
def prestep(self, final_step=False):
"""
Function to produce actions for all of the agents. This function does not execute the actions in the environment.
Args:
final_step (bool): A flag indicating whether this is the last call of this function.
Returns:
dict: The pre-transition dictionary containing observations, masks, and agent information. The format is:
``{"observations":..., "masks":..., "agents":...}``
"""
with KeepTime("to_numpy"):
# TODO: Is converting the observations necessary?
# NOTE: The np conversion will not work if the observation is a dictionary.
# observations = np.array(self.state["observations"], dtype=np.float32)
observations = self.state["observations"]
masks = self.state["masks"]
hidden_state = self.state["hidden_state"]
with KeepTime("gen_action"):
publish_agents = True
agents = {}
# TODO: We are assuming a one-level action space.
if (not final_step) or (self.params["final_action"]):
if self.state["steps"] < self.params["warm_start"]:
# Take RANDOM actions if warm-starting
for agent_name in self.agents:
agents[agent_name] = self.agents[agent_name].random_action_generator(self.envs, self.params["num_workers"])
else:
# Take REAL actions if not warm-starting
for agent_name in self.agents:
action_generator = self.agents[agent_name].action_generator
agents[agent_name] = action_generator(observations, hidden_state[agent_name], masks, deterministic=self.params["deterministic"])
else:
publish_agents = False
# We are saving the "new" hidden_state now.
with KeepTime("form_dictionary"):
if publish_agents:
pre_transition = dict(observations=observations,
masks=masks,
agents=agents)
else:
pre_transition = dict(observations=observations,
masks=masks)
return pre_transition
def step(self):
"""Runs :func:`prestep` and then the actual ``env.step`` function.
It also reshapes the transition data into the appropriate format.
Returns:
dict: The full transition information, including the pre-transition (actions, last observations, etc.) and the
results of executing the actions in the environments, i.e. rewards and infos. The format is:
``{"observations":..., "masks":..., "rewards":..., "infos":..., "agents":...}``
See Also:
:ref:`ref-data-structure`
"""
# We are saving old versions of observations, hidden_state, and masks.
with KeepTime("prestep"):
pre_transition = self.prestep()
# TODO: For true multi-agent systems, rewards must be a dictionary as well,
# i.e. one reward for each agent. However, if the agents are pursuing
# a single goal, the reward can still be a single scalar!
# Updating observations and masks: These two are one step old in the trajectory.
# hidden_state is the newest.
with KeepTime("envstep"):
# Prepare actions
actions = extract_keywise(pre_transition["agents"], "actions")
# Step
self.state["observations"], rewards, dones, infos = self.envs.step(actions)
# Post-step
self.state["hidden_state"] = extract_keywise(pre_transition["agents"], "hidden_state")
self.state["masks"] = np.array([0.0 if done_ else 1.0 for done_ in dones], dtype=np.float32).reshape((-1,1))
# NOTE: Uncomment if you find useful information in the continuous rewards ...
# monitor("/reward/"+self.params["mode"]+"/continuous", np.mean(rewards))
with KeepTime("render"):
if self.params["render"]:
self.envs.render()
if self.params["render_delay"] > 0:
time.sleep(self.params["render_delay"])
# except MujocoException as e:
# logger.error("We got a MuJoCo exception!")
# raise
# ## Retry??
# # return self.run()
with KeepTime("poststep"):
# TODO: Sometimes the type of observations is "dict" when it shouldn't be. Investigate the reason.
if isinstance(self.state["observations"], (OrderedDict, dict)):
for key in self.state["observations"]:
if np.isnan(self.state["observations"][key]).any():
logger.warn('NaN caught in observations during rollout generation.', 'step =', self.state["steps"])
raise ValueError("NaN in observations.")
else:
if np.isnan(self.state["observations"]).any():
logger.warn('NaN caught in observations during rollout generation.', 'step =', self.state["steps"])
raise ValueError("NaN in observations.")
## Retry??
# return self.run()
self.state["steps"] += 1
self.state["timesteps"] += self.params["num_workers"]
self.monitor_timesteps()
# TODO: Adapt with the new dict_of_lists data structure.
with KeepTime("report_reward"):
self.report_rewards(infos)
transition = dict(**pre_transition,
rewards=rewards,
infos=infos)
return transition
def update(self):
"""Runs :func:`step` for ``n_steps`` times.
Returns:
dict: A dictionary with Unix-style file-system keys including all information generated by the simulation.
See Also:
:ref:`ref-data-structure`
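For example (key names and shapes are illustrative; the exact keys depend on the
agents and on :func:`flatten_dict`), with ``num_workers=4`` and 128 steps taken,
the entries are batch-major::

result["/observations"]           # e.g. shape (4, 129, ...)
result["/agents/agent1/actions"]  # e.g. shape (4, 129, ...)
result["/rewards"]                # padded to length 129 by complete_dict_of_list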
"""
# trajectory is a dictionary of lists
trajectory = {}
if not self.state["was_reset"] and self.params["do_reset"]:
self.reset()
self.state["was_reset"] = False
# Run T (n-step) steps.
self.local["steps"] = 0
self.local["n_episode"] = 0
while (self.params["n_steps"] and self.local["steps"] < self.params["n_steps"]) or \
(self.params["n_episodes"] and self.local["n_episode"] < self.params["n_episodes"]):
with KeepTime("step"):
# print("one exploration step ...")
transition = self.step()
with KeepTime("append"):
# Data is flattened here, inside the Explorer itself.
transition = flatten_dict(transition)
# Update the trajectory with the current list of data.
# Put nones if the key is absent.
update_dict_of_lists(trajectory, transition, index=self.local["steps"])
self.local["steps"] += 1
with KeepTime("poststep"):
# Take one prestep so we have the next observation/hidden_state/masks/action/value/ ...
transition = self.prestep(final_step=True)
transition = flatten_dict(transition)
update_dict_of_lists(trajectory, transition, index=self.local["steps"])
# Complete the trajectory for keys that appeared in some transitions but
# are absent from later ones. "length=steps+1" accounts for the final
# out-of-loop prestep.
complete_dict_of_list(trajectory, length=self.local["steps"]+1)
result = convert_time_to_batch_major(trajectory)
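# Convert the step-indexed lists into batch-major arrays, i.e. arrays whose
# leading dimensions are (num_workers, steps+1, ...).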
# We discard the rest of the monitored episodes in test mode to prevent them from affecting the next test.
monitor.discard_key("/reward/test/episodic")
return result
### Data Structure:
# Pre-step:
# observations
# masks
#
# Agent (policies):
# actions
# hidden_state
# artifacts:
# action_log_p
# value
#
# Step:
# rewards
# infos
######################
##### Statistics #####
######################
# Stats: Wall-time