"""Objectives generate the output of evaluators.
For energyplus, outputs are calculated by reading the simulation output files,
using either MeterReaders or VariableReaders.
The optimisation direction (minimize or maximize) and if the objectives are used to put
constraints on a solution (such as electricity use being less than a certain amount)
are both specified by arguments to the Problem containing the objectives.
If the model used does not generate the needed output, the objective will modify the model
to add the corresponding output. For example, if a building does not include an
Ouptut:Meter object for Electricity:Facility, one will be added if the corresponding
:class:`MeterReader` is used with that building.
"""
# Python Core Libraries
import re
from abc import ABC, abstractmethod
from dataclasses import dataclass
from itertools import chain
from pathlib import Path
from typing import List, Dict, Tuple, Union, Callable
from warnings import warn
# External Libraries
import pandas as pd
from packaging.version import Version
# BESOS Imports
from besos import config
from besos import eppy_funcs as ef
from besos.IO_Objects import Objective
from besos.errors import ModeError
from besos.besostypes import PathLike
import logging
from logging import debug as lgd
import logging
log = logging.getLogger(__name__)
log.addHandler(logging.NullHandler())
def _get_data_dict_regex():
"""Generates a regex expression that can parse a line of the data dictionary in the .mtr file
Various assumptions are made about the exact composition of names."""
report_code = r"(?P<report_code>\d+)" # report code of digits
num_items = r"(?P<num_items>\d+)" # number of items composed of digits
name = r"(?P<name>[^\[\]]+?)" # assumes that name can be anything without '[]' characters, may be wrong
units = r"\[(?P<units>[^\]]*?)\]" # units are enclosed in square brackets.
frequency = r"!(?P<frequency>[\w|,]+)" # frequency preceded by a '!'
remainder = r"(?: ?\[(?P<remainder>[^\]]+)\])?" # can be preceded by 1-2 spaces based on experience
return re.compile(
rf"^{report_code},{num_items},{name} {units} {frequency}{remainder}$"
)
_values_collection = List[List[float]]
[docs]@dataclass
class EPResults:
"""Represents the information about a Meter or Variable read from a .eso file."""
name: str
frequency: str
units: str
data: pd.DataFrame
# TODO: Consider improving the index for the DataFrame
# index derived from the section headers (i.e. Design day winter hour=1) converted to a timestamp
# this would simplify manipulating time series
# Also consider nicer handling of duplicate names than just numbering
@classmethod
def from_pieces(
cls, name, frequency, units, items, data, prefix=None
) -> "EPResults":
if prefix is True:
prefix = name
unq_names = {}
for i, original in enumerate(items):
unq_names[original] = unq_names.get(original, []) + [i]
new_names = items[:]
for unq_name, values in unq_names.items():
if len(values) == 1:
continue # not a duplicate, leave the name alone
for count, index in enumerate(values):
new_names[index] = f"{unq_name}_{count}"
if prefix is not None:
names = [f"{prefix}_{base_name}" for base_name in new_names]
else:
names = new_names
df = pd.DataFrame({name: values for name, values in zip(names, zip(*data))})
return cls(name, frequency, units, df)
_results_format = Dict[Tuple[str, str], EPResults]
[docs]def get_eso_version(eso: PathLike) -> Version:
"""Gets the version of energyplus used to generate an eso file.
:param eso: the path to the eso file
:return: the version of energyplus used to generate the eso file.
"""
# a regular expression to extract information from the first line of an eso file
first_line_regex = re.compile(
r"^Program Version,EnergyPlus, Version " # constant value at the start of the line
r"(?P<version_number>[\d.]+)-(?P<hash>[0-9a-z]{10}), "
r"YMD=(?P<date>\d\d\d\d\.\d\d\.\d\d+) (?P<time>\d?\d:\d\d)$",
)
with open(eso, "r") as f:
first_line = f.readline()
match = first_line_regex.match(first_line)
if match is None:
raise ValueError(
f"First line of file:'{first_line}' did not match the expected format."
)
return Version(match.group("version_number"))
[docs]def read_eso(
out_dir: PathLike = config.out_dir,
file_name="eplusout.eso",
version=None,
) -> _results_format:
"""Retrieve information from an Energy-Plus simulation, with outputs in out_dir
:param out_dir: the directory that contains the output files
:param file_name: the name of the output file to read from. Should be a .eso file.
:param version: Deprecated argument. Do not use.
:return: a dictionary with keys of the form {(MeterName, Reporting_Frequency or None):EPResults dataclass}
"""
# output format described here:
# https://energyplus.net/sites/default/files/pdfs_v8.3.0/OutputDetailsAndExamples.pdf
eso_path = Path(out_dir, file_name)
eso_version = get_eso_version(eso_path)
with open(eso_path, "r") as file:
f = file.readlines() # may cause memory errors on very large files
# Backwards compatibility warning.
if version:
warn(
"The version number is now auto-detected."
"The version argument is deprecated "
"and will be removed in the future.",
FutureWarning,
)
if Version(version) != eso_version:
raise ValueError(
f"Version {version} provided, "
f"but eso file is from version {eso_version}"
)
del version
# TODO: Find a way to avoid hardcoding these values to make things portable between versions
# or move them to a version-dependant config of some kind
if eso_version >= Version("9.0"):
# header is always 5 lines for E+ 8.8 and 6 lines for E+ 9.0
data_dictionary_header = 6
else:
data_dictionary_header = 5 # pragma: nocover
data_dictionary_start = 1 # first line has already been processed
data_dictionary_end = f.index("End of Data Dictionary\n")
data_end = -2 # omit the End of Data line and the Number of Records Written line
header = f[data_dictionary_start:data_dictionary_header] # header currently ignored
requested_vars = f[data_dictionary_header + 1 : data_dictionary_end]
data = f[data_dictionary_end + 1 : data_end]
data_dict_line = _get_data_dict_regex()
codes = {}
keys = set()
for line in requested_vars:
match = data_dict_line.search(line)
if match is None:
raise ValueError(f"could not match: {line}")
name = match.group("name")
frequency = match.group("frequency")
items = match.group("remainder")
items = ["Value"] if items is None else items.split(",")
report_code = match.group("report_code")
key = (name, frequency)
assert report_code not in codes, f"already found report code {report_code}"
assert key not in keys, f"already found item with {key}"
result = dict(
name=name,
frequency=frequency,
units=match.group("units"),
items=items,
data=[],
)
codes[report_code] = result
keys.add(key)
for line in data:
report_code, *values = line.split(",")
if report_code in codes:
values = [float(value) for value in values]
codes[report_code]["data"].append(values)
return {
(result["name"], result["frequency"]): EPResults.from_pieces(**result)
for result in codes.values()
}
[docs]def time_series_values(results: EPResults):
"""Returns the entire value column from the results.
:param results: the results object
:return: the values from the results object.
"""
# TODO: Finish
# Already is a pandas DF [to_dict() can be used to convert it to the
# timeseries dict now or later in a different function?(maybe)]
# possibly unit conversions as well? (although that should maybe
# be done in a different function at a different time - would be easy if its a pandas dataframe)
warn(
"`time_series_values` is incomplete, and returns raw values, not time series values."
)
return results.data["Value"]
[docs]def sum_values(results: EPResults) -> float:
"""Returns the sum over the Value column from some results
:param results: the results object to sum over
:return: the sum of the first value for each entry in the collection.
"""
return results.data["Value"].sum()
# TODO make "all" clear all optional output, currently incomplete (i.e. csv files)
[docs]def clear_outputs(building, outputs: Union[str, List[str]] = "all") -> None:
"""Disable certain types of output that appear in the .eso file.
:param building: the building to modify
:param outputs: Can be the class_name of the output to clear,
a shortcut for different types of output,
or a list that combines the above.
:return: None
"""
# `empty` is used so that different fields do not reference each the same container
mode = ef.get_mode(building)
if mode == "idf":
building_items = building.idfobjects
empty = list
elif mode == "json":
building_items = building
empty = dict
else:
raise ModeError(mode)
def find(prefix):
return {
x
for x in building_items
if x.startswith(ef.convert_format(prefix, "class", mode))
}
# These lists are probably incomplete!
class_names = dict()
# ENVIRONMENTALIMPACTFACTORS adds a bunch of meters, but is not a meter itself
class_names["output_meters"] = find("Output:Meter:") | {
"Output:Meter",
"Output:EnvironmentalImpactFactors",
}
class_names["internal_meters"] = {"Meter:Custom", "Meter:CustomDecrement"}
class_names["tables"] = find("Output:Table:")
class_names["output"] = find("Output:")
class_names["outputcontrol"] = find("OutputControl:")
class_names["meters"] = (
class_names["output_meters"] | class_names["internal_meters"]
)
if outputs == "all":
outputs = chain(*class_names.values())
else:
if isinstance(outputs, str): # make outputs a list
outputs = [outputs]
# convert each element in outputs via the class_names shortcuts
outputs = set(chain(*(class_names.get(output, [output]) for output in outputs)))
# clear the selected outputs
for output in outputs:
building_items[output] = empty()
[docs]class EPReader(Objective, ABC):
field_pairs: tuple = NotImplemented
def __init__(self, class_name, frequency: str = None, func=sum_values, name=""):
super().__init__(name=name)
self.class_name = class_name
self.frequency = frequency
# class name has a nonzero default, so this may be adding noise to the repr
# but it does provide a more complete picture
self._add_reprs(["class_name", "frequency"], check=True)
self._process = func
self._add_repr("func", "_process")
def check_all(self, objective, mode):
if mode == "idf":
def get(attribute):
return getattr(objective, attribute)
elif mode == "json":
def get(attribute):
return objective[attribute]
else:
raise ModeError(mode)
for self_attr, objective_attr in self.field_pairs:
self_value = getattr(self, self_attr)
objective_value = get(ef.convert_format(objective_attr, "field", mode))
if self_value is not None and objective_value != self_value:
return False
return True
[docs] def get_objective(self, building):
"""Retrieves output object in the building corresponding to this Objective.
:param building: The building to search for the object.
:return: The part of the building's data corresponding to this Objective.
The return type varies between json and idf buildings
"""
mode = ef.get_mode(building)
if mode == "idf":
objectives = building.idfobjects[
ef.convert_format(self.class_name, "class", mode)
]
elif mode == "json":
objectives = building[
ef.convert_format(self.class_name, "class", mode)
].values()
else:
raise ModeError(mode)
for objective in objectives:
if self.check_all(objective, mode):
return objective
raise ValueError(f"Cannot find the objective for {repr(self)}")
[docs] def add_objective(self, building):
"""Creates and adds the meter needed by this objective to the building.
:param building: the building to modify
:return: None
"""
try:
self.get_objective(building)
except ValueError:
pass # the objective is not present
else:
raise ValueError(f"Objective for {repr(self)} already exists")
mode = ef.get_mode(building)
new_object_dict = {
ef.convert_format(field, "field", mode): getattr(self, attr)
for attr, field in self.field_pairs
if getattr(self, attr)
}
if self.frequency is None:
new_object_dict[
ef.convert_format("Reporting_Frequency", "field", mode)
] = "Hourly"
if mode == "idf":
building.newidfobject(
key=ef.convert_format(self.class_name, "class", mode), **new_object_dict
)
elif mode == "json":
# this is equivalent to appending, but e+ uses a dictionary instead of a list
objectives = building[self.class_name]
num = len(objectives)
new_key = f"{self.class_name} {num}"
assert (
new_key not in building
), f"The building has incorrectly numbered {self.class_name} entries"
objectives[new_key] = new_object_dict
def validate(self, building):
self.get_objective(building)
[docs] def setup(self, building) -> None:
"""Prepares an idf so that it's output can be read by this meter.
:param building: the idf to modify
:return: None
"""
try:
self.get_objective(building)
except ValueError:
self.add_objective(building)
self.validate(building)
@abstractmethod
def results_name(self):
pass
def __call__(self, results: _results_format) -> float:
results_name = self.results_name()
if self.frequency:
meter_results = results[(results_name, self.frequency)]
else:
for (name, _), v in results.items():
if name == results_name:
meter_results = v
break
else:
raise ValueError(f"No meter with name {results_name} found")
return self._process(meter_results)
[docs]class MeterReader(EPReader):
field_pairs = (("key_name", "Key_Name"), ("frequency", "Reporting_Frequency"))
def __init__(
self,
key_name: str,
class_name: str = config.objective_meter_type,
frequency: str = None,
func: Callable = sum_values,
name: str = "",
):
"""
:param key_name: What to measure (e.g. Electricity:Facility)
:param class_name: The kind of meter to use (e.g. Output:Meter)
:param frequency: the reporting frequency of the meter. (e.g. Timestep, Monthly)
:param func: a function used to aggregate the results read from the file.
:param name: the name of the output column corresponding to this objective
"""
super().__init__(
class_name=class_name, frequency=frequency, func=func, name=name
)
self.key_name = key_name
self._add_repr("key_name", check=True)
def results_name(self):
return self.key_name
@property
def _default_name(self):
return self.key_name
[docs]class VariableReader(EPReader):
field_pairs = (
("key_value", "Key_Value"),
("variable_name", "Variable_Name"),
("frequency", "Reporting_Frequency"),
)
# TODO check if class_name must allays be Output:Variable
def __init__(
self,
key_value,
variable_name="*",
class_name=config.objective_variable_type,
frequency: str = None,
func: Callable = sum_values,
name="",
):
"""Reads the values of Variables from the .eso file
Energyplus docs:
https://bigladdersoftware.com/epx/docs/8-9/input-output-reference/input-for-output.html#outputvariable
:param key_value: The designation or location where to read the variable (e.g. Zone name, or Wall name, or * for all matches with variable_name available).
:param variable_name: The variable name you want to measure (e.g. "Site Outdoor Air Drybulb").
:param frequency: The reporting frequency of the variable.
:param func: A function used to aggregate the results read from the file.
:param name: The name of the output column corresponding to this objective.
"""
super().__init__(
class_name=class_name, frequency=frequency, func=func, name=name
)
self.key_value = key_value
self._add_repr("key_value")
self.variable_name = variable_name
if self.variable_name != "*":
self._add_repr("variable_name")
def results_name(self):
# E+ forces key_values (except for "Environment") to upper case in the eso file
# To ensure matching, the input key_value is forced to uppercase.
if self.key_value.capitalize() == "Environment":
return f"{self.key_value.capitalize()},{self.variable_name}"
else:
return f"{self.key_value.upper()},{self.variable_name}"
@property
def _default_name(self):
return self.variable_name
# FIXME: The return types for this function are inconsistent
def __call__(self, results: _results_format) -> list:
results_name = self.results_name()
results_list = []
if self.frequency:
meter_results = results[(results_name, self.frequency)]
return self._process(meter_results)
for (name, _), v in results.items():
if self.key_value != "*":
if name == results_name:
meter_results = v
return self._process(meter_results)
else:
if re.match(rf"\w+,{self.variable_name}", name):
meter_results = v
results_list.append(self._process(meter_results))
if results_list:
return results_list
raise ValueError(f"No meter with name {results_name} found")
# TODO: Remove objectives from idf output automatically