mirror of
https://github.com/wassname/Run-Skeleton-Run.git
synced 2026-06-28 03:48:07 +08:00
63 lines
1.8 KiB
Python
63 lines
1.8 KiB
Python
import numpy as np
|
|
|
|
|
|
class RandomProcess(object):
    """Base class for the noise processes defined in this module."""

    def reset_states(self):
        """Reset any internal state; the base implementation does nothing."""
        return None
|
|
|
|
|
|
class AnnealedGaussianProcess(RandomProcess):
    """Random process whose noise scale follows a linear schedule.

    The scale is modelled as ``m * n_steps + c`` and is never allowed to
    drop below ``sigma_min``. Subclasses are expected to increment
    ``n_steps`` as they draw samples.

    Args:
        mu: Mean of the process (stored as ``self.mu``).
        sigma: Initial noise scale; also the intercept ``c`` of the schedule.
        sigma_min: Lower bound of the noise scale. ``None`` disables
            annealing and keeps the scale fixed at ``sigma``.
        n_steps_annealing: Number of steps over which the scale moves
            linearly from ``sigma`` to ``sigma_min``.
    """

    def __init__(self, mu, sigma, sigma_min, n_steps_annealing=int(1e5)):
        self.mu = mu
        self.sigma = sigma
        self.n_steps = 0

        if sigma_min is None:
            # No annealing requested: flat schedule pinned at ``sigma``.
            slope = 0.
            floor = sigma
        else:
            # Linear schedule going from ``sigma`` to ``sigma_min`` in
            # ``n_steps_annealing`` steps.
            slope = -float(sigma - sigma_min) / float(n_steps_annealing)
            floor = sigma_min

        self.m = slope
        self.c = sigma
        self.sigma_min = floor

    @property
    def current_sigma(self):
        """Noise scale for the current step count, clamped below at ``sigma_min``."""
        scheduled = self.m * float(self.n_steps) + self.c
        return max(self.sigma_min, scheduled)
|
|
|
|
|
|
class OrnsteinUhlenbeckProcess(AnnealedGaussianProcess):
    """Temporally correlated noise from a discretised Ornstein-Uhlenbeck process.

    Every call to ``sample`` advances the state by one step of size ``dt``::

        x <- x + theta * (mu - x) * dt + current_sigma * sqrt(dt) * N(0, 1)

    Args:
        theta: Strength of the pull of the state towards ``mu``.
        mu: Value the state is pulled towards.
        sigma: Initial noise scale.
        dt: Step size of the discretisation.
        x0: Initial state used by ``reset_states``; ``None`` means zeros
            of shape ``size``.
        size: Shape passed to ``np.random.normal`` and ``np.zeros``.
        sigma_min: Lower bound of the noise scale; ``None`` disables
            annealing.
        n_steps_annealing: Number of samples over which the noise scale
            moves linearly from ``sigma`` to ``sigma_min``.
    """

    def __init__(self, theta, mu=0., sigma=1., dt=1e-2,
                 x0=None, size=1, sigma_min=None, n_steps_annealing=int(1e5)):
        # The base initialiser stores ``mu`` and ``sigma`` and sets up the
        # annealing schedule, so ``mu`` is not assigned again here.
        super(OrnsteinUhlenbeckProcess, self).__init__(
            mu=mu, sigma=sigma, sigma_min=sigma_min, n_steps_annealing=n_steps_annealing)
        self.theta = theta
        self.dt = dt
        self.x0 = x0
        self.size = size
        self.reset_states()

    def sample(self):
        """Advance the process by one step and return the new state.

        Returns:
            A copy of the new state, so that callers may modify the returned
            noise in place (clipping, scaling, ...) without corrupting the
            state used for the next step.
        """
        drift = self.theta * (self.mu - self.x_prev) * self.dt
        diffusion = (self.current_sigma * np.sqrt(self.dt)
                     * np.random.normal(size=self.size))
        x = self.x_prev + drift + diffusion
        self.x_prev = x
        # Each sample moves the annealing schedule of the noise scale forward.
        self.n_steps += 1
        return x.copy()

    def reset_states(self):
        """Restart the state from ``x0`` (or zeros); the step counter is kept."""
        self.x_prev = self.x0 if self.x0 is not None else np.zeros(self.size)
|
|
|
|
|
|
def create_random_process(args):
    """Build the random process selected by ``args.rp_type``.

    Args:
        args: Object exposing ``rp_type`` and, for the
            ``"ornstein-uhlenbeck"`` type, ``n_action``, ``rp_theta``,
            ``rp_mu``, ``rp_sigma`` and ``rp_sigma_min``.

    Returns:
        An ``OrnsteinUhlenbeckProcess`` configured from ``args``.

    Raises:
        NotImplementedError: If ``args.rp_type`` is not a supported type.
    """
    if args.rp_type == "ornstein-uhlenbeck":
        return OrnsteinUhlenbeckProcess(
            size=args.n_action,
            theta=args.rp_theta,
            mu=args.rp_mu,
            sigma=args.rp_sigma,
            sigma_min=args.rp_sigma_min)

    # Name the offending value so a configuration typo is easy to diagnose.
    raise NotImplementedError(
        "Unsupported random process type: {!r} "
        "(supported: 'ornstein-uhlenbeck')".format(args.rp_type))
|