-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathInteraction_example.py
More file actions
160 lines (127 loc) · 5.58 KB
/
Copy pathInteraction_example.py
File metadata and controls
160 lines (127 loc) · 5.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Dec 7 08:46:49 2018
@author: daiwei.lin
Diagram of structure:

    ------------------------------------------------------------------
    |                        ML_LAS_Interface                        |
    |              ------------------------------------              |
    |              |           Env_Example            |              |
    |              ------------------------------------              |
    ------------------------------------------------------------------
          /\                   |
          | action,flag        | observation
          |                    |
    ------|--------------------|--------------------------------------
    |     |                    |          LASBaselineAgent           |
    |     |                    |                                     |
    |     |                    |                                     |
    |     |                    |                                     |
    |     |                    \/                                    |
    | -------------------------------                                |
    | |    Internal Environment     |                                |
    | -------------------------------                                |
    |     /\                   | Flt observation, reward, flag       |
    |     | action             \/                                    |
    | ---------------------------                                    |
    | |     Baseline agent      |                                    |
    | ---------------------------                                    |
    |                                                                |
    ------------------------------------------------------------------
"""
from Environment.LASROMEnv import LASROMEnv
from LASAgent.LASBaselineAgent import LASBaselineAgent
import numpy as np
from gym import spaces
import logging
class Env_Example():
    """
    Example environment standing in for the LAS system.

    It exposes an observation space and an action space and answers every
    ``reset``/``step`` call with a random sample of the observation space,
    so the interaction loop can be exercised without a simulator.
    """

    def __init__(self, action_dimension, sensors_dimension):
        """
        Create the observation and action spaces.

        Args:
            action_dimension (int): number of values in one action.
            sensors_dimension (int): number of values in one observation.

        Attributes created:
            self.observation_space (gym.spaces.Box): all observation values
                are within range [0, 1].
            self.action_space (gym.spaces.Box): all action values are within
                range [-1, 1].
        """
        obs_min = np.zeros(sensors_dimension, dtype=np.float32)
        obs_max = np.ones(sensors_dimension, dtype=np.float32)
        self.observation_space = spaces.Box(obs_min, obs_max, dtype=np.float32)

        act_min = np.full(action_dimension, -1.0, dtype=np.float32)
        act_max = np.full(action_dimension, 1.0, dtype=np.float32)
        # Box takes (low, high) in that order. The bounds used to be passed
        # reversed, which produced a space whose low was +1 and high was -1.
        self.action_space = spaces.Box(act_min, act_max, dtype=np.float32)

    def reset(self):
        """
        Reset the environment and return an observation.

        The returned observation is a random sample of the observation space.
        """
        return self.observation_space.sample()

    def step(self, action):
        """
        Take one step using the given action.

        Args:
            action: the action to apply; this example environment ignores it.

        Returns:
            tuple: ``(observation, done, info)`` where ``observation`` is a
            random sample of the observation space, ``done`` is always
            ``False`` (not used) and ``info`` is always ``''`` (not used).
        """
        return self.observation_space.sample(), False, ''
class ML_LAS_Interface():
    """
    Interface between the ML agent and the LAS system.

    It wraps an environment object and caches the most recent observation
    and action so the agent side can poll them.
    """

    def __init__(self, env):
        """
        Wrap ``env`` and record its initial state.

        Args:
            env: environment object providing ``reset()``, ``step(action)``
                and an ``action_space`` with a ``sample()`` method.
        """
        self.env = env
        # The observation returned by reset() is the first reading.
        self.observation = env.reset()
        # Placeholder action until take_action() is called.
        self.action = env.action_space.sample()

    def get_observation(self):
        """Return the most recently recorded observation."""
        return self.observation

    def reset(self):
        """
        Reset the wrapped environment and record the observation it returns.

        Previously the value returned by ``env.reset()`` was discarded, so
        ``get_observation()`` kept reporting the pre-reset reading.
        """
        self.observation = self.env.reset()

    def take_action(self, action):
        """
        Apply ``action`` to the environment and record the new observation.

        Args:
            action: action forwarded unchanged to ``env.step``.
        """
        self.action = action
        # env.step is expected to return (observation, done, info);
        # done and info are not used here.
        observation, _done, _info = self.env.step(action)
        self.observation = observation
if __name__ == '__main__':
    # Example script: connect an environment to a LASBaselineAgent through
    # ML_LAS_Interface and let them interact for a fixed number of steps.

    #================#
    # initialization #
    #================#
    logger = logging.getLogger(__name__)

    # V-REP simulator
    ROMenv = LASROMEnv(IP='127.0.0.1',
                       Port=19997,
                       reward_function_type='ir')
    # Example environment
    # env = Env_Example(action_dimension=5, sensors_dimension=10)
    print("env Initialized")

    interface = ML_LAS_Interface(ROMenv)
    print("interface Initialized")

    # Constants
    agent_name = 'LAS_Baseline_Agent'
    x_order_sensor_reading = 2
    load_pretrained_agent_flag = False

    # The agent is sized from the environment's observation/action spaces.
    agent = LASBaselineAgent(agent_name,
                             interface.env.observation_space.shape[0],
                             interface.env.action_space.shape[0],
                             num_observation=x_order_sensor_reading,
                             load_pretrained_agent_flag=load_pretrained_agent_flag)
    print("Agent's observation dimension = {}".format(
        agent.baseline_agent.observation_space.shape[0]))

    #===========#
    # Main loop #
    #===========#
    # Run 1000 steps
    interface.reset()
    for _ in range(1000):
        observation = interface.get_observation()
        # print(observation)
        # feed_observation returns (flag, action); the action is only
        # forwarded to the environment when the flag is set.
        take_action_flag, action = agent.feed_observation(observation)
        # Plain truthiness test rather than comparing with `== True`.
        # NOTE(review): assumes the flag is a boolean - confirm against
        # LASBaselineAgent.feed_observation.
        if take_action_flag:
            interface.take_action(action)
    print("Training complete")

    # Save learned model
    # logger.info('{}: Interaction is done. Saving learned models...'.format(agent.name))
    # agent.stop()
    # logger.info('{}: Saving learned models done.'.format(agent.name))