import os
import gym
import tempfile
import pytest
import tensorflow as tf
import numpy as np

from baselines.common.tests.envs.mnist_env import MnistEnv
from baselines.common.vec_env.dummy_vec_env import DummyVecEnv
from baselines.run import get_learn_function
from baselines.common.tf_util import make_session, get_session

from functools import partial


learn_kwargs = {
    'deepq': {},
    'a2c': {},
    'acktr': {},
    'acer': {},
    'ppo2': {'nminibatches': 1, 'nsteps': 10},
    'trpo_mpi': {},
}

network_kwargs = {
    'mlp': {},
    'cnn': {'pad': 'SAME'},
    'lstm': {},
    'cnn_lnlstm': {'pad': 'SAME'}
}


@pytest.mark.parametrize("learn_fn", learn_kwargs.keys())
@pytest.mark.parametrize("network_fn", network_kwargs.keys())
def test_serialization(learn_fn, network_fn):
    '''
    Test if the trained model can be serialized
    '''


    if network_fn.endswith('lstm') and learn_fn in ['acer', 'acktr', 'trpo_mpi', 'deepq']:
            # TODO make acktr work with recurrent policies
            # and test
            # github issue: https://github.com/openai/baselines/issues/660
            return

    def make_env():
        env = MnistEnv(episode_len=100)
        env.seed(10)
        return env

    env = DummyVecEnv([make_env])
    ob = env.reset().copy()
    learn = get_learn_function(learn_fn)

    kwargs = {}
    kwargs.update(network_kwargs[network_fn])
    kwargs.update(learn_kwargs[learn_fn])


    learn = partial(learn, env=env, network=network_fn, seed=0, **kwargs)

    with tempfile.TemporaryDirectory() as td:
        model_path = os.path.join(td, 'serialization_test_model')

        with tf.Graph().as_default(), make_session().as_default():
            model = learn(total_timesteps=100)
            model.save(model_path)
            mean1, std1 = _get_action_stats(model, ob)
            variables_dict1 = _serialize_variables()

        with tf.Graph().as_default(), make_session().as_default():
            model = learn(total_timesteps=0, load_path=model_path)
            mean2, std2 = _get_action_stats(model, ob)
            variables_dict2 = _serialize_variables()

        for k, v in variables_dict1.items():
            np.testing.assert_allclose(v, variables_dict2[k], atol=0.01,
                err_msg='saved and loaded variable {} value mismatch'.format(k))

        np.testing.assert_allclose(mean1, mean2, atol=0.5)
        np.testing.assert_allclose(std1, std2, atol=0.5)


@pytest.mark.parametrize("learn_fn", learn_kwargs.keys())
@pytest.mark.parametrize("network_fn", ['mlp'])
def test_coexistence(learn_fn, network_fn):
    '''
    Test if more than one model can exist at a time
    '''

    if learn_fn == 'deepq':
            # TODO enable multiple DQN models to be useable at the same time
            # github issue https://github.com/openai/baselines/issues/656
            return

    if network_fn.endswith('lstm') and learn_fn in ['acktr', 'trpo_mpi', 'deepq']:
            # TODO make acktr work with recurrent policies
            # and test
            # github issue: https://github.com/openai/baselines/issues/660
            return

    env = DummyVecEnv([lambda: gym.make('CartPole-v0')])
    learn = get_learn_function(learn_fn)

    kwargs = {}
    kwargs.update(network_kwargs[network_fn])
    kwargs.update(learn_kwargs[learn_fn])

    learn =  partial(learn, env=env, network=network_fn, total_timesteps=0, **kwargs)
    make_session(make_default=True, graph=tf.Graph())
    model1 = learn(seed=1)
    make_session(make_default=True, graph=tf.Graph())
    model2 = learn(seed=2)

    model1.step(env.observation_space.sample())
    model2.step(env.observation_space.sample())



def _serialize_variables():
    sess = get_session()
    variables = tf.trainable_variables()
    values = sess.run(variables)
    return {var.name: value for var, value in zip(variables, values)}


def _get_action_stats(model, ob):
    ntrials = 1000
    if model.initial_state is None or model.initial_state == []:
        actions = np.array([model.step(ob)[0] for _ in range(ntrials)])
    else:
        actions = np.array([model.step(ob, S=model.initial_state, M=[False])[0] for _ in range(ntrials)])

    mean = np.mean(actions, axis=0)
    std = np.std(actions, axis=0)

    return mean, std

