"""
Various ways to generate datapoints that cover the solution space well.
Uses :class:`IO_Objects.Descriptor` to determine which inputs are valid.
"""
# Python Core Libraries
import copy
import random as randlib
from typing import Any, Callable

# External Libraries
import numpy as np
import pandas as pd
import pyDOE2

# BESOS Imports
from besos.IO_Objects import Descriptor
from besos.parameters import CategoryParameter, RangeParameter
from besos.problem import Problem
########################################################################################################
########################################### Static sampling ###########################################
########################################################################################################
_sampler = Callable[[int, int, Any], np.ndarray] # the format of all sampler functions
def dist_sampler(
sampler: _sampler, problem: Problem, num_samples: int, *args, **kwargs
) -> pd.DataFrame:
"""uses the sampling function provided to generate `num_samples` sets of values
These values will be valid for the corresponding inputs
:param problem: Problem that the inputs should apply to
:param sampler: a function that is used to produce the distribution of samples
:param num_samples: number of samples to take
:param args: Arguments passed to the sampling function
:param kwargs: Arguments passed to the sampling function
:return: pandas DataFrame with one column per parameter, and `num_samples` rows
"""
samples = sampler(num_samples, problem.num_inputs, *args, **kwargs)
data = {
value_descriptor.name: _sample_param(value_descriptor, samples[:, i])
        for i, value_descriptor in enumerate(problem.value_descriptors)
}
df = pd.DataFrame(data)
    # enforce the correct order in case it was lost by the dictionary
    # (dicts preserve insertion order on Python 3.7+, so this is only defensive)
df = df[problem.names("inputs")]
return df
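# Illustrative usage sketch (not in the original module). The Parameter/Problem
# construction below is an assumption about the besos API, and the parameter
# names and bounds are hypothetical; adapt it to however your Problem is built.
def _example_dist_sampler() -> pd.DataFrame:
    from besos.parameters import Parameter

    problem = Problem(
        [
            Parameter(value_descriptors=RangeParameter(0.1, 0.9), name="thickness"),
            Parameter(
                value_descriptors=CategoryParameter(["single", "double"]),
                name="glazing",
            ),
        ],
        outputs=["Electricity:Facility"],
    )
    # 10 uniform-random samples; one named column per parameter
    return dist_sampler(random, problem, 10)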
def add_extremes(df: pd.DataFrame, problem: Problem) -> pd.DataFrame:
"""Adds two datapoints with the minimum and maximum values for each attribute.
:param df: existing data to which the extreme values should be added.
:param problem: the problem defining which values are valid.
:return: df with 2 added rows, one with maximal values, one with minimal values.
"""
new = dist_sampler(extremes, problem, 2)
return pd.concat([df, new], ignore_index=True)
def extremes(samples: int, attributes: int = None) -> np.ndarray:
"""This sampler generates datapoints corresponding to the extremes of a problem.
:param samples: the number of samples. Must be equal to 2
(this argument is available in order to implement the sampler interface)
If only one argument is provided, that argument will be assigned to attributes,
even though samples is listed first.
:param attributes: the number of attributes for which to generate samples
:return: An array containing 2 samples: one with all zeroes, one with all ones.
When transformed by a problem, these become minimal and maximal values for that problem.
"""
if attributes is None:
samples, attributes = 2, samples
assert samples == 2, "extremes can only produce two samples"
return np.array([np.zeros(attributes), np.ones(attributes)])
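# Minimal sketch (not in the original module): called directly, extremes returns
# the unit-cube corners that a Problem's descriptors later stretch to the actual
# parameter bounds.
def _example_extremes() -> np.ndarray:
    corners = extremes(2, 3)  # equivalent to extremes(3): samples defaults to 2
    assert corners.shape == (2, 3)
    assert (corners[0] == 0).all() and (corners[1] == 1).all()
    return corners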
def _sample_param(descriptor: Descriptor, values):
    try:
        # this only works if descriptor.sample is numpy compatible
        return descriptor.sample(values)
    except TypeError:
        # fall back to the non-vectorised version of descriptor.sample
        return [descriptor.sample(x) for x in values]
# all of the following return a 2d array of shape (samples, attributes)
# which contains values between 0 and 1
random: _sampler = np.random.rand
def seeded_sampler(samples: int, attributes: int, seed=0) -> np.ndarray:
"""MUST BE USED AS AN INPUT TO dist_sampler. Creates random samples from a seed
:param samples: the number of samples
:param attributes: number of parameters that are to be varied
:param seed: [in args/kwargs for dist_sampler] seed to be used
:returns: an array of floats between 0 and 1 to be transformed in dist_sampler
"""
np.random.seed(seed)
return np.random.rand(samples, attributes)
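# Minimal sketch (not in the original module): fixing the seed makes the samples
# reproducible, so repeated calls with the same arguments agree exactly.
def _example_seeded_sampler() -> bool:
    a = seeded_sampler(5, 3, seed=42)
    b = seeded_sampler(5, 3, seed=42)
    return np.array_equal(a, b)  # True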
def lhs(samples: int, attributes: int, *args, **kwargs) -> np.ndarray:
"""MUST BE USED AS AN INPUT TO dist_sampler. Uses latin hypercube sampling to create samples
:param samples: the number of samples
:param attributes: number of parameters that are to be varied
:param args: [in args/kwargs for dist_sampler] Arguments passed to pyDOE2.lhs
:param kwargs: [in args/kwargs for dist_sampler] Arguments passed to pyDOE2.lhs
:returns: an array of floats between 0 and 1 to be transformed in dist_sampler
"""
return pyDOE2.lhs(attributes, samples, *args, **kwargs)
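# Minimal sketch (not in the original module): Latin hypercube samples stratify
# every dimension, i.e. each column places exactly one point in each of the
# `samples` equal-width bins of [0, 1].
def _example_lhs() -> np.ndarray:
    pts = lhs(4, 2)
    bins = np.floor(pts * 4).astype(int)  # bin index per point, per column
    for col in bins.T:
        assert sorted(col) == [0, 1, 2, 3]  # one point in every stratum
    return pts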
def full_factorial(samples: int, attributes: int, level: int = None) -> np.ndarray:
"""MUST BE USED AS AN INPUT TO dist_sampler. Creates every possible combination of variables
:param samples: the number of samples
:param attributes: number of parameters that are to be varied
:param level: [in args/kwargs for dist_sampler]
:returns: an array of floats between 0 and 1 to be transformed in dist_sampler
"""
if level is None:
level = np.int(np.exp(np.log(samples) / attributes))
if (level**attributes) > level:
print(
f"Total number of samples ({level**attributes}) is smaller than input ({samples}) "
f"to have an even number of factor levels ({level}) for all parameters."
)
return pyDOE2.fullfact((np.ones([attributes]) * level).astype(int)) / level
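# Minimal sketch (not in the original module): with samples=8 and attributes=3
# the derived level is 2, so the result is the 2**3 = 8 points of a regular
# grid with spacing 1/2 along each axis.
def _example_full_factorial() -> np.ndarray:
    grid = full_factorial(8, 3)
    assert grid.shape == (8, 3)
    return grid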
def morris_sample(
samples: int,
attributes: int,
problem_dup: Problem,
steps=None,
random_order: bool = False,
chooser=randlib.random,
):
"""MUST BE USED AS AN INPUT TO dist_sampler. Using a set of starting points (origins), begins a random walk at each one through the parameter space.
:param samples: number of samples that are used as the origins - NOT total number of samples
:param attributes: number of parameters that are to be varied
:param problem_dup: [in args/kwargs for dist_sampler] the problem that is used
:param steps: [in args/kwargs for dist_sampler] how long the random walk should be for each point (the number of times a variable is changed)
:param random_order: [in args/kwargs for dist_sampler] boolean, -> if true: randomizes the order of the variables to be cycled through, and rerandomizes every time all variables are cycled through
-> if false: uses the original order
:chooser: [in args/kwargs for dist_sampler] function, how to choose a random number between 0 and 1. random.random default, so a uniform distribution, must take no arguments
:returns: an array of floats between 0 and 1 to be transformed in dist_sampler
"""
    if steps is None:
        # by default the walk length equals the number of parameters
        steps = len(problem_dup.parameters)
    # create the samples that are used as origins
    init_samples_np = pyDOE2.lhs(attributes, samples)
    new_samples_np = np.empty([0, attributes], dtype=float)
    # list of (parameter's column index in a row, parameter)
    params = list(enumerate(copy.deepcopy(problem_dup.parameters)))
    for row in init_samples_np:
        row = np.expand_dims(row, 0)  # expand dims to allow appending later
        temp_samples_np = copy.deepcopy(row)
        for x in range(steps):
            if random_order and x % len(params) == 0:
                # re-randomize the variable order after every full cycle
                randlib.shuffle(params)
            loc, parameter = params[x % len(params)]
            if isinstance(parameter.value_descriptors[0], CategoryParameter):
                number = chooser()
                # redraw until the value differs from the one already present in
                # the origin point, unless the parameter has only one option
                while parameter.value_descriptors[0].sample(
                    number
                ) == parameter.value_descriptors[0].sample(row[0, loc]):
                    number = chooser()
                    if len(parameter.value_descriptors[0].options) == 1:
                        break
                # unlike neighborhood_sample, the next variable is varied on the
                # point *after* this variable has been changed
                row = point = copy.deepcopy(row)
                point[0, loc] = number
                # append the new point to the temporary list
                temp_samples_np = np.append(temp_samples_np, point, axis=0)
            elif isinstance(parameter.value_descriptors[0], RangeParameter):
                number = chooser()
                # unlike neighborhood_sample, the next variable is varied on the
                # point *after* this variable has been changed
                row = point = copy.deepcopy(row)
                point[0, loc] = number
                # append the new point to the temporary list
                temp_samples_np = np.append(temp_samples_np, point, axis=0)
        # append the walk of each origin to the output
        new_samples_np = np.append(new_samples_np, temp_samples_np, axis=0)
    return new_samples_np
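# Minimal sketch (not in the original module): morris_sample only touches
# problem_dup.parameters and each parameter's value_descriptors, so two small
# hypothetical stand-in classes (not besos classes) suffice to illustrate the
# trajectory layout; RangeParameter is assumed to accept (min, max).
def _example_morris_sample() -> np.ndarray:
    class _FakeParameter:
        def __init__(self, descriptor):
            self.value_descriptors = [descriptor]

    class _FakeProblem:
        parameters = [
            _FakeParameter(RangeParameter(0, 1)),
            _FakeParameter(RangeParameter(0, 1)),
        ]

    # each origin contributes its start point plus one point per step
    walk = morris_sample(3, 2, _FakeProblem())
    assert walk.shape == (3 * (2 + 1), 2)  # samples * (steps + 1) rows
    return walk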
def neighborhood_sample(
samples: int, attributes: int, problem_dup: Problem, subdivisions: int = 2
):
"""MUST BE USED AS AN INPUT TO dist_sampler. Using a set of starting points (origins), varies each parameter one at a time to explore the 'neighorhood' of each starting point.
:param samples: number of samples that are used as the origins - NOT total number of samples
:param attributes: number of parameters that are to be varied
:param problem_dup: [in args/kwargs for dist_sampler] the problem that is used
:param subdivisions: [in args/kwargs for dist_sampler] for range parameters, this is the number of subdivisions (>= 2)
:returns: an array of floats between 0 and 1 to be transformed in dist_sampler
"""
    assert subdivisions >= 2
    # create the samples that are used as origins
    init_samples_np = pyDOE2.lhs(attributes, samples)
    new_samples_np = np.empty([0, attributes], dtype=float)
    # list of (parameter's column index in a row, parameter)
    params = list(enumerate(copy.deepcopy(problem_dup.parameters)))
    for row in init_samples_np:
        # expand dims to allow appending later
        temp_samples_np = np.expand_dims(copy.deepcopy(row), 0)
        for loc, parameter in params:
            if isinstance(parameter.value_descriptors[0], CategoryParameter):
                options = parameter.value_descriptors[0].options
                for x in range(len(options)):
                    # reverse engineers CategoryParameter.sample() to hit each option once
                    number = (x + 0.5) / len(options)
                    if parameter.value_descriptors[0].sample(
                        number
                    ) == parameter.value_descriptors[0].sample(row[loc]):
                        # skip the option already present in the origin point
                        continue
                    point = np.expand_dims(copy.deepcopy(row), 0)
                    point[0, loc] = number
                    # append the new point to the temporary list
                    temp_samples_np = np.append(temp_samples_np, point, axis=0)
            elif isinstance(parameter.value_descriptors[0], RangeParameter):
                for x in range(subdivisions):
                    # spaced so that the extremes (0 and 1) are always included
                    number = x / (subdivisions - 1)
                    point = np.expand_dims(copy.deepcopy(row), 0)
                    point[0, loc] = number
                    # append the new point to the temporary list
                    temp_samples_np = np.append(temp_samples_np, point, axis=0)
        # append the neighborhood of each origin to the output
        new_samples_np = np.append(new_samples_np, temp_samples_np, axis=0)
    return new_samples_np
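# Note (not in the original module): for range parameters each origin yields
# itself plus `subdivisions` one-at-a-time variations per parameter, so the
# output has samples * (1 + attributes * subdivisions) rows; categorical
# parameters contribute one row per option other than the origin's own value.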
########################################################################################################
########################################## Adaptive sampling ##########################################
########################################################################################################
class adaptive_sampler_lv:
"""Represents an adaptive sampler which iteratively picks simulation samples and runs them."""
def __init__(
self,
P,
P_out,
n_new,
problem,
evaluator,
reg,
test_in,
test_out,
scaler=None,
scaler_out=None,
verbose=False,
):
        if len(P[:, 0]) < 2 * len(P[0, :]):
            print(
                "The initial number of samples needs to be at least 2 times the number of inputs."
            )
self.P = P
self.P_out = P_out
self.n_new = n_new
self.problem = problem
self.evaluator = evaluator
self.reg = reg
self.scaler = scaler
self.scaler_out = scaler_out
self.verbose = verbose
self.test_in = test_in
self.test_out = test_out
    def run(self, no_iter):
        """Runs `no_iter` adaptive sampling iterations, scoring the model after each one."""
self.score = np.empty([no_iter + 1])
self.score[0] = self.reg.score(self.test_in, self.test_out)
self.N_P, self.S_P = init_Neighborhood(self.P)
self.pick_new_samples()
self.update_model()
for i in range(no_iter):
            print(f"iteration {i}")
for p_new in self.P_new:
self.N_P, self.S_P = update_Neighborhood(
self.N_P, self.P, self.S_P, p_new
)
                if self.verbose:
                    print("initialize neighborhood for new samples")
                N_P_new, S_P_new = init_Neighborhood(self.P, p_new)
self.N_P = np.append(self.N_P, N_P_new, axis=2)
self.S_P = np.append(self.S_P, S_P_new, axis=0)
# update existing set of samples
self.P = np.append(self.P, self.P_new, axis=0)
self.P_out = np.append(self.P_out, self.P_out_new, axis=0)
self.pick_new_samples()
self.update_model()
self.score[i + 1] = self.reg.score(self.test_in, self.test_out)
    def update_model(self):
        """Refits the regression model on all collected samples."""
        self.reg.fit(self.P, self.P_out)
        if self.verbose:
            print(self.reg.score(self.P, self.P_out))
    def pick_new_samples(self):
        """Scores the existing samples and generates `n_new` new samples in the most promising Voronoi cells."""
        # compute the nonlinearity measure and the Voronoi cell estimates
E = LOLA_estimate(
self.N_P, self.P, self.reg, scaler=self.scaler, scaler_out=self.scaler_out
)
V_P, samples = MC_Voronoi_estimate(self.P, self.problem)
H = hybrid_score(E, V_P)
ind_new = np.argsort(H)
P_sorted = self.P[ind_new, :]
        if self.verbose:
            print("Collecting new samples around:")
            print(P_sorted[-self.n_new :, :])
# Generate new samples in Voronoi cell taking the mean of the neighborhood
        ind = 2  # fallback offset used when a cell contains no Monte Carlo candidates
self.P_new = np.empty([self.n_new, len(self.P[0, :])])
for i in range(self.n_new):
            candidates = containedin_Voronoi(P_sorted[-i - 1, :], self.P, samples)
            while len(candidates) <= 1:
                # fall back to the next-best point if no Monte Carlo samples landed in this cell
                candidates = containedin_Voronoi(P_sorted[-i - ind, :], self.P, samples)
                ind += 1
self.P_new[i, :] = select_newSamp(
P_sorted[-i - 1, :], self.N_P[:, :, ind_new][:, :, -i - 1], candidates
)
self.P_out_new = self.evaluator.df_apply(pd.DataFrame(self.P_new))
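# Illustrative usage sketch (not in the original module); `some_regressor`,
# `evaluator`, `test_X` and `test_y` are hypothetical placeholders for a fitted
# sklearn-style model, a BESOS evaluator and a held-out test set:
#
#     P = dist_sampler(lhs, problem, 20).values          # initial design
#     P_out = evaluator.df_apply(pd.DataFrame(P)).values
#     sampler = adaptive_sampler_lv(
#         P, P_out, n_new=5, problem=problem, evaluator=evaluator,
#         reg=some_regressor, test_in=test_X, test_out=test_y,
#     )
#     sampler.run(no_iter=10)
#     sampler.score  # model score before and after each iteration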
###############################################################
def hybrid_score(E, V):
    # combines the Voronoi volume estimate V with the normalised nonlinearity measure E
    return V + E / sum(E)
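# Note (not in the original module): for example, with E = np.array([1.0, 3.0])
# and V = np.array([0.5, 0.5]) the score is [0.75, 1.25], so the second point
# wins purely on its nonlinearity measure.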
def select_newSamp(p_r, N, candidates, verbose=False):
    # picks the candidate with the largest summed distance to the reference
    # point and its neighborhood, pushing new samples away from existing ones
    N = np.append(N, [p_r], axis=0)
    max_dist = 0
    if verbose:
        print(len(candidates[1:, :]))
    # candidates[0] is the uninitialised placeholder row from containedin_Voronoi
    for c in candidates[1:, :]:
        d = 0
        for n in N:
            d = d + np.linalg.norm(c - n)
        if d > max_dist:
            max_dist = d
            max_cand = c
    return max_cand
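# Minimal sketch (not in the original module): the candidate furthest from the
# reference point and its two neighbors is selected.
def _example_select_newSamp() -> np.ndarray:
    p_r = np.array([0.5, 0.5])
    N = np.array([[0.4, 0.5], [0.6, 0.5]])
    # row 0 mimics the placeholder row that containedin_Voronoi prepends
    candidates = np.array([[0.0, 0.0], [0.5, 0.55], [0.9, 0.9]])
    return select_newSamp(p_r, N, candidates)  # -> array([0.9, 0.9])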
def MC_Voronoi_estimate(P, problem):
    # P contains the existing points
    # V_P is the vector of Voronoi volume estimates, one per existing point
    V_P = np.zeros(len(P))
    # draw 500 random samples to estimate the Voronoi cells
    S = dist_sampler(lhs, problem, 500)
    for s in S.values:
        d = np.inf
        ind = 0
        # assign each sample to the nearest existing point
        for p in P:
            if np.linalg.norm(p - s) < d:
                d = np.linalg.norm(p - s)
                ind_fin = ind
            ind = ind + 1
        V_P[ind_fin] = V_P[ind_fin] + 1 / len(S)
    return V_P, S
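# Note (not in the original module): every sample contributes 1/500 to its
# nearest existing point, so V_P is a Monte Carlo estimate of the relative
# Voronoi cell volumes and sums to 1 up to floating-point error.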
def LOLA_estimate(N_P, P, model, scaler, scaler_out):
    # N_P is a 3-D array with all neighbors for each sample (m x d x #samples)
    # P is the array containing all samples
    # returns E, a 1-D array with the nonlinearity estimate of each sample
    n = len(P)
    ind = 0
    E = np.empty(n)
    for p_r in P:
        grad = Gradient_estimation(
            N_P[:, :, ind], p_r, model, scaler=scaler, scaler_out=scaler_out
        )
        E[ind] = Nonlinearity_measure(
            grad, N_P[:, :, ind], p_r, model, scaler=scaler, scaler_out=scaler_out
        )
        ind = ind + 1
    return E
def update_Neighborhood(N_P, P, S_P, P_new):
    # tries to add each new sample in P_new to the existing neighborhoods
    # N_P holds the neighbors associated with every reference point in P
    # S_P holds the neighborhood score of every reference point
    if np.ndim(P) == 1:
        P = np.expand_dims(P, axis=0)
        N_P = np.reshape(N_P, (len(N_P[:, 0]), len(N_P[0, :]), 1))
        S_P = np.expand_dims(S_P, axis=0)
    if np.ndim(P_new) == 1:
        P_new = np.expand_dims(P_new, axis=0)
    m = 2 * len(P[0, :])
    for p_new in P_new:
        ind = 0
        for p_r in P:
            # skip any reference point the candidate coincides with
            if sum(p_r == p_new) < len(P[0, :]):
                N_temp = np.dstack([N_P[:, :, ind]] * m)
                S_temp = np.zeros(m)
                for i in range(m):
                    # replace one neighbor with the new sample,
                    # unless the sample is already in the neighborhood
                    if sum(sum(N_temp[:, :, i] == p_new)) < len(P[0, :]):
                        N_temp[i, :, i] = p_new
                    # compute the neighborhood score of every combination
                    S_temp[i] = Neighborhood_score(N_temp[:, :, i], p_r)
                # keep the combination with the minimal score,
                # which may be the unchanged neighborhood
                ind_min = np.argmin(S_temp)
                N_P[:, :, ind] = N_temp[:, :, ind_min]
                S_P[ind] = S_temp[ind_min]
            # the index must advance even when the update is skipped,
            # so the neighborhoods stay aligned with the rows of P
            ind = ind + 1
    return N_P, S_P
def init_Neighborhood(P, P_new=None, verbose=False):
    # P is the matrix with all samples
    # P_new is optional, for when the neighborhoods of new samples shall be computed
    m = 2 * len(P[0, :])  # number of samples in each neighborhood
    d = len(P[0, :])  # number of dimensions
    if P_new is None:
        # case I: first initialization after static sampling
        n = len(P)  # number of samples
        N_P = np.empty([m, d, n])
        S_P = np.empty(n)
        # every sample is treated as a reference point in the loop below
        P_ref = P * 1.0
    else:
        # case II: initialize the neighborhoods of the new samples P_new
        if np.ndim(P_new) == 1:
            if verbose:
                print("ndim")
            P_ref = np.expand_dims(P_new, axis=0)
        else:
            P_ref = P_new * 1.0
        n_new = len(P_ref)  # number of new samples
        N_P = np.empty([m, d, n_new])
        S_P = np.empty(n_new)
    for i in range(len(P_ref)):
        # seed each neighborhood with the first m samples other than the reference point
        mask = P != P_ref[i, :]
        P_NoRefPoint = P[mask[:, 0], :]
        N_P[:, :, i] = P_NoRefPoint[0:m, :]
        S_P[i] = Neighborhood_score(N_P[:, :, i], P_ref[i, :])
    for p_cand in P:
        # offer every existing sample as a candidate neighbor; update_Neighborhood
        # skips any reference point the candidate coincides with
        N_P, S_P = update_Neighborhood(N_P, P_ref, S_P, p_cand)
    if verbose:
        print("Neighborhood initiated!")
    return N_P, S_P
def Neighborhood_score(N, p_r):
    # To do: make it possible to have 3-D arrays as inputs
    # N is the neighborhood, which should have m entries
    # p_r is the reference point
    m = len(N)
    minDist = np.empty(m)
    # C is the mean distance from the neighbors to the reference point
    C = 0
    for i in range(m):
        C = C + np.linalg.norm(N[i, :] - p_r)
    C = C / m
    # minDist[i] is the distance from neighbor i to its closest other neighbor
    for i in range(m):
        minDist[i] = min(
            np.linalg.norm(N[i, :] - N[j, :]) for j in range(m) if j != i
        )
    A = 1 / m * sum(minDist)
    R = A / (np.sqrt(2) * C)
    return R / C
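# Minimal sketch (not in the original module): a symmetric cross of 2 * d
# neighbors around a reference point. By the formula above, neighborhoods that
# are well spread out and close to the reference point score higher.
def _example_neighborhood_score() -> float:
    p_r = np.array([0.5, 0.5])
    N = np.array([[0.4, 0.5], [0.6, 0.5], [0.5, 0.4], [0.5, 0.6]])
    return Neighborhood_score(N, p_r)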
def Gradient_estimation(N, p_r, model, scaler, scaler_out):
    # estimates the local gradient at p_r by least squares over the neighborhood,
    # solving (N[i] - p_r) @ grad = f(N[i]) - f(p_r) for all neighbors i
    m = len(N)
    d = len(p_r)
    P_mat = np.empty([m, d])
    F_mat = np.empty(m)
    if scaler is None:
        f_r = model.predict([p_r])
    else:
        f_r = scaler_out.inverse_transform(model.predict(scaler.transform([p_r])))
    for i in range(m):
        P_mat[i, :] = N[i, :] - p_r
        if scaler is None:
            F_mat[i] = model.predict([N[i, :]]) - f_r
        else:
            F_mat[i] = (
                scaler_out.inverse_transform(
                    model.predict(scaler.transform([N[i, :]]))
                )
                - f_r
            )
    grad = np.linalg.lstsq(P_mat, np.transpose(F_mat), rcond=None)[0].reshape((1, d))
    return [grad]
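# Minimal sketch (not in the original module): for an exactly linear model the
# least-squares estimate recovers the true gradient. The tiny predictor below
# is a hypothetical stand-in for a fitted sklearn-style regressor.
def _example_gradient_estimation():
    class _LinearModel:
        def predict(self, X):
            x = np.asarray(X)[0]
            return np.array([2.0 * x[0] + 3.0 * x[1]])

    p_r = np.array([0.5, 0.5])
    N = np.array([[0.6, 0.5], [0.5, 0.6], [0.4, 0.4]])
    grad = Gradient_estimation(N, p_r, _LinearModel(), scaler=None, scaler_out=None)
    return grad  # [array([[2., 3.]])] up to numerical precision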
def Nonlinearity_measure(grad, N, p_r, model, scaler, scaler_out):
    # accumulates the absolute deviation between the model and its local
    # linear approximation (first-order Taylor expansion) over the neighborhood
    E = 0
    for i in range(len(N)):
        if scaler is None:
            E = E + abs(
                model.predict([N[i, :]])
                - (model.predict([p_r]) + np.dot(grad, (N[i, :] - p_r)))
            )
        else:
            E = E + abs(
                scaler_out.inverse_transform(model.predict(scaler.transform([N[i, :]])))
                - (
                    scaler_out.inverse_transform(model.predict(scaler.transform([p_r])))
                    + np.dot(grad, (N[i, :] - p_r))
                )
            )
    return E
def containedin_Voronoi(p_r, P, samples):
    # collects the Monte Carlo samples that fall inside the Voronoi cell of p_r;
    # the first row of `candidates` is an uninitialised placeholder, so callers
    # iterate over candidates[1:]
    mask = P != p_r
    P_temp = P[mask[:, 0], :]
    candidates = np.empty([1, len(P[0, :])])
    for s in samples.values:
        for p_j in P_temp:
            if np.linalg.norm(s - p_j) <= np.linalg.norm(s - p_r):
                # another existing point is at least as close, so s is outside the cell
                break
        else:
            # no other point was closer: s lies in the Voronoi cell of p_r
            candidates = np.append(candidates, [s], axis=0)
    return candidates