# Defining a new task

In order to use new tasks in our experiments, we need to define two classes:

* A [Task](/api/allenact/base_abstractions/task#task), including, among others, a `step` implementation providing a
[RLStepResult](/api/allenact/base_abstractions/misc#rlstepresult), a `metrics` method providing quantitative performance measurements 
for agents and, optionally, a `query_expert` method that can be used e.g. with an
[imitation loss](/api/allenact/algorithms/onpolicy_sync/losses/imitation#imitation) during training.
* A [TaskSampler](/api/allenact/base_abstractions/task#tasksampler), that allows instantiating new Tasks for the agents to solve during
training, validation and testing.

## Task

Let's define a semantic navigation task, where agents have to navigate from a starting point in an environment to an
object of a specific class using a minimal amount of steps and deciding when the goal has been reached.

We need to define the methods `action_space`, `render`, `_step`, `reached_terminal_state`, `class_action_names`, `close`,
`metrics`, and `query_expert` from the base `Task` definition.


### Initialization, action space and termination
Let's start with the definition of the action space and task initialization:

```python
...
from allenact_plugins.ithor_plugin.ithor_constants import (
    MOVE_AHEAD,
    ROTATE_LEFT,
    ROTATE_RIGHT,
    LOOK_DOWN,
    LOOK_UP,
    END,
)

...


class ObjectNaviThorGridTask(Task[IThorEnvironment]):
    _actions = (MOVE_AHEAD, ROTATE_LEFT, ROTATE_RIGHT, LOOK_DOWN, LOOK_UP, END)

    def __init__(
            self,
            env: IThorEnvironment,
            sensors: List[Sensor],
            task_info: Dict[str, Any],
            max_steps: int,
            **kwargs
    ) -> None:
        super().__init__(
            env=env,
            sensors=sensors,
            task_info=task_info,
            max_steps=max_steps, **kwargs
        )
        self._took_end_action: bool = False
        self._success: Optional[bool] = False

    @property
    def action_space(self):
        return gym.spaces.Discrete(len(self._actions))

    @classmethod
    def class_action_names(cls) -> Tuple[str, ...]:
        return cls._actions

    def reached_terminal_state(self) -> bool:
        return self._took_end_action

    def close(self) -> None:
        self.env.stop()

    ...
```

### Step method
Next, we define the main method `_step` that will be called every time the agent produces a new action: 
```python
class ObjectNaviThorGridTask(Task[IThorEnvironment]):
    ...
    def _step(self, action: Union[int, Sequence[int]]) -> RLStepResult:
        assert isinstance(action, int)
        action = cast(int, action)

        action_str = self.class_action_names()[action]

        if action_str == END:
            self._took_end_action = True
            self._success = self.is_goal_object_visible()
            self.last_action_success = self._success
        else:
            self.env.step({"action": action_str})
            self.last_action_success = self.env.last_action_success

        step_result = RLStepResult(
            observation=self.get_observations(),
            reward=self.judge(),
            done=self.is_done(),
            info={"last_action_success": self.last_action_success},
        )
        return step_result
    
    ...

    def is_goal_object_visible(self) -> bool:
        return any(
            o["objectType"] == self.task_info["object_type"]
            for o in self.env.visible_objects()
        )

    def judge(self) -> float:
        reward = -0.01

        if not self.last_action_success:
            reward += -0.03

        if self._took_end_action:
            reward += 1.0 if self._success else -1.0

        return float(reward)
```

###  Metrics, rendering and expert actions

Finally, we define methods to render and evaluate the current task, and optionally generate expert actions to be used
e.g. for DAgger training.
```python
    def render(self, mode: str = "rgb", *args, **kwargs) -> numpy.ndarray:
        assert mode == "rgb", "only rgb rendering is implemented"
        return self.env.current_frame


    def metrics(self) -> Dict[str, Any]:
        if not self.is_done():
            return {}
        else:
            return {"success": self._success, "ep_length": self.num_steps_taken()}

    def query_expert(self, **kwargs) -> Tuple[int, bool]:
        return my_objnav_expert_implementation(self)
```

## TaskSampler

We also need to define the corresponding TaskSampler, which must contain implementations for methods `__len__`,
`total_unique`, `last_sampled_task`, `next_task`, `close`, `reset`, and `set_seed`. Currently,
an additional method `all_observation_spaces_equal` is used to ensure compatibility with the current
[RolloutBlockStorage](/api/allenact/algorithms/onpolicy_sync/storage#rolloutblockstorage).

Let's define a tasks sampler able to provide an infinite number of object navigation tasks for AI2-THOR.

### Initialization and termination 

```python
class ObjectNavTaskSampler(TaskSampler):
    def __init__(
        self,
        scenes: List[str],
        object_types: str,
        sensors: List[Sensor],
        max_steps: int,
        env_args: Dict[str, Any],
        action_space: gym.Space,
        seed: Optional[int] = None,
        deterministic_cudnn: bool = False,
        *args,
        **kwargs
    ) -> None:
        self.env_args = env_args
        self.scenes = scenes
        self.object_types = object_types
        self.grid_size = 0.25
        self.env: Optional[IThorEnvironment] = None
        self.sensors = sensors
        self.max_steps = max_steps
        self._action_sapce = action_space

        self.scene_id: Optional[int] = None

        self._last_sampled_task: Optional[ObjectNaviThorGridTask] = None

        set_seed(seed)

        self.reset()

    def close(self) -> None:
        if self.env is not None:
            self.env.stop()

    def reset(self):
        self.scene_id = 0
    
    def _create_environment(self) -> IThorEnvironment:
        env = IThorEnvironment(
            make_agents_visible=False,
            object_open_speed=0.05,
            restrict_to_initially_reachable_points=True,
            **self.env_args,
        )
        return env
```

### Task sampling

Finally, we need to define methods to determine the number of available tasks (possibly infinite) and sample tasks:
```python

    @property
    def length(self) -> Union[int, float]:
        return float("inf")

    @property
    def total_unique(self) -> Optional[Union[int, float]]:
        return None

    @property
    def last_sampled_task(self) -> Optional[ObjectNaviThorGridTask]:
        return self._last_sampled_task

    @property
    def all_observation_spaces_equal(self) -> bool:
        return True

    def next_task(self) -> Optional[ObjectNaviThorGridTask]:
        self.scene_id = random.randint(0, len(self.scenes) - 1)
        self.scene = self.scenes[self.scene_id]

        if self.env is not None:
            if scene != self.env.scene_name:
                self.env.reset(scene)
        else:
            self.env = self._create_environment()
            self.env.reset(scene_name=scene)

        self.env.randomize_agent_location()

        task_info = {"object_type": random.sample(self.object_types, 1)}

        self._last_sampled_task = ObjectNaviThorGridTask(
            env=self.env,
            sensors=self.sensors,
            task_info=task_info,
            max_steps=self.max_steps,
            action_space=self._action_sapce,
        )
        return self._last_sampled_task
```