package nl.deltares.hydrotel;
import nl.wldelft.netcdf.NetcdfUtils;
import nl.wldelft.util.Box;
import nl.wldelft.util.FastDateFormat;
import nl.wldelft.util.FileUtils;
import nl.wldelft.util.TextUtils;
import nl.wldelft.util.io.LineReader;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import ucar.ma2.Array;
import ucar.ma2.ArrayDouble;
import ucar.ma2.ArrayFloat;
import ucar.ma2.ArrayObject;
import ucar.ma2.DataType;
import ucar.ma2.InvalidRangeException;
import ucar.nc2.Attribute;
import ucar.nc2.Dimension;
import ucar.nc2.NetcdfFile;
import ucar.nc2.NetcdfFileWriter;
import ucar.nc2.Variable;
import ucar.nc2.constants.CDM;
import ucar.nc2.constants.CF;
import ucar.nc2.units.DateUnit;
import ucar.nc2.units.TimeUnit;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Path;
import java.text.ParseException;
import java.util.*;
import static nl.wldelft.netcdf.NetcdfUtils.*;
public class HydrotelPostAdapter {
private static final Logger log = Logger.getLogger(HydrotelPostAdapter.class);
private static final String PROPERTIES = "properties";
private static final String SORTIES_HYDROTEL = "sortiesHydrotel";
private static final String SAUVEGARDE_ETATS = "sauvegardeEtats";
private static final String RESULTS_OFFSET = "resultsOffset";
private static final String IMPORTER_VALEURS_EXUTOIRE_SEULEMENT = "importerValeursExutoireSeulement";
private static final String TIME_VARIABLE_NAME = CF.TIME;
private static final String STATION_DIMENSION_NAME = "stations";
private static final String STATION_ID_VARNAME = "station_id";
private static final String ID_CHAR_LENGTH_DIMENSION_NAME = "char_leng_id";
private static final int ID_CHAR_LENGTH = 64;
private static final String STANDARD_NAME_ATTRIBUTE = CF.STANDARD_NAME;
private static final String LONG_NAME_ATTRIBUTE = CDM.LONG_NAME;
private static final String UNITS_ATTRIBUTE = CDM.UNITS;
private static final String AXIS_ATTRIBUTE = CF.AXIS;
public static final String DUMMY_LOC = "LOC";
private final File runFile;
private Path runPath = null;
private NetcdfFile netcdfRunFile = null;
private String logLevel = "INFO";
private static final String LOG_LEVEL = "log_level";
private static final String WORK_DIR = "work_dir";
private static final String START_TIME = "start_time";
private String workDirString = null;
private static final String OUTPUT_NETCDF_FILES_VARIABLE_NAME = "output_netcdf_files";
private File workDirFile = null;
private String[] outputTimeSeriesFiles = null;
private String[] outputValuesHydrotel = null;
private double startDateTime = 0.0;
private static final TimeZone EST = TimeZone.getTimeZone("EST");
private static final FastDateFormat YYYY_MM_DDHHMM = FastDateFormat.getInstance("yyyy-MM-dd HH:mm", EST, Locale.CANADA, null);
private static final FastDateFormat YYYYMMDDHH = FastDateFormat.getInstance("yyyyMMddHH", EST, Locale.CANADA, null);
private boolean sauvegardeEtatsBool = false;
private int outputOffsetMinutes = 0;
private boolean importerValeursExutoireSeulementBool = true;
private Variable stationVariable = null;
private HydrotelPostAdapter(File runFile) {
this.runFile = runFile;
}
public static void main(String[] args) throws Exception {
if (args.length != 1)
throw new IllegalArgumentException("Specify either one argument that is the path to the run file, or 4 arguments that are 'work dir', input file, output file and 'log HZ reeks'.");
File runFile = new File(args[0]);
if (!runFile.exists()) throw new Exception("Can not find run file specified as argument " + runFile);
HydrotelPostAdapter adapter = new HydrotelPostAdapter(runFile);
try {
adapter.run();
if (log.isInfoEnabled()) log.info("HydrotelAdapter run finished without exception");
} catch (Exception e) {
log.error(e.getMessage(), e);
throw e;
} finally {
LogManager.shutdown();
}
}
private void run() throws Exception {
try {
runPath = runFile.getParentFile().toPath();
String absolutePath = runFile.getAbsolutePath();
netcdfRunFile = NetcdfFile.open(absolutePath);
readRunInfoFile();
} finally {
netcdfRunFile.close();
}
convertTimeSeries();
if (sauvegardeEtatsBool) copyOutputStateFiles();
}
private void copyOutputStateFiles() throws IOException {
File etatDir = new File(workDirFile, "etat");
File outputStatesDir = new File(workDirFile, "output/states");
File[] stateFiles = etatDir.listFiles((dir, name) -> {
try {
int length = name.length();
if (length < 14) return false;
String substring = name.substring(length - 14, length - 4);
long stateTime = YYYYMMDDHH.parseToMillis(substring);
return stateTime > startDateTime;
} catch (ParseException e) {
return false;
}
});
if (stateFiles == null) return;
for (File stateFile : stateFiles) {
String name = stateFile.getName();
String substring = name.substring(0, name.length() - 15);
File targetFile = new File(outputStatesDir, substring + ".csv");
FileUtils.copy(stateFile, targetFile);
}
}
private void readRunInfoFile() throws Exception {
logLevel = netcdfRunFile.findVariable(LOG_LEVEL).readScalarString();
String relativeWorkDirString = String.valueOf((char[]) netcdfRunFile.findVariable(WORK_DIR).read().copyToNDJavaArray()).trim();
if (TextUtils.equals(relativeWorkDirString, ".")) {
workDirString = runPath.toString();
} else {
workDirString = runPath.resolve(new File(relativeWorkDirString).toPath()).toString();
}
workDirFile = new File(workDirString);
if (!workDirFile.exists()) {
throw new Exception("Work dir not found: " + workDirString);
}
configureLogging();
if (log.isDebugEnabled()) log.debug("HydrotelAdapter started");
getOutputFiles();
getProperties();
readStartEndTime();
if (log.isInfoEnabled()) log.info("Work dir: " + workDirString);
}
private void readStartEndTime() throws Exception {
Variable startTimeVar = netcdfRunFile.findVariable(START_TIME);
double relativeStartDateTime = startTimeVar.readScalarDouble();
Attribute units = startTimeVar.findAttribute("units");
String tunitsString = units.getStringValue();
if (log.isInfoEnabled()) log.info("INFO: Start time: " + relativeStartDateTime + ' ' + tunitsString);
DateUnit referenceUnit = new DateUnit(units.getStringValue());
long currentReferenceTime = referenceUnit.getDateOrigin().getTime();
TimeUnit timeUnit = referenceUnit.getTimeUnit();
startDateTime = timeUnit.getValueInSeconds(relativeStartDateTime) * 1000 + currentReferenceTime;
}
private void getProperties() throws Exception {
Variable var = netcdfRunFile.findVariable(PROPERTIES);
String sortiesHydrotel = getPropertyStringValue(var, SORTIES_HYDROTEL);
outputValuesHydrotel = sortiesHydrotel.split(";");
String sauvegardeEtats = getPropertyStringValue(var, SAUVEGARDE_ETATS);
sauvegardeEtatsBool = sauvegardeEtats != null ? Boolean.valueOf(sauvegardeEtats) : false;
Integer resultsOffsetPropertyIntValue = getPropertyIntValue(var, RESULTS_OFFSET);
outputOffsetMinutes = resultsOffsetPropertyIntValue != null ? resultsOffsetPropertyIntValue : 0;
String importerValeursExutoireSeulement = getPropertyStringValue(var, IMPORTER_VALEURS_EXUTOIRE_SEULEMENT);
importerValeursExutoireSeulementBool = importerValeursExutoireSeulement != null ? Boolean.valueOf(importerValeursExutoireSeulement) : true; // TODO EP: change to default true
}
private void getOutputFiles() throws IOException {
Variable outputNetcdfVar = netcdfRunFile.findVariable(OUTPUT_NETCDF_FILES_VARIABLE_NAME);
Array outputNetcdfArray = outputNetcdfVar.read();
Object outputTimeSeriesFilesNDArrayObject = outputNetcdfArray.copyToNDJavaArray();
char[][] outputTimeSeriesFilesChar = (char[][]) outputTimeSeriesFilesNDArrayObject;
List<String> outputTimeSeriesFilesList = new ArrayList<>();
for (char[] chArray : outputTimeSeriesFilesChar) {
File outputFileRelativePath = new File(String.valueOf(chArray).trim());
String absoluteFilePath = runPath.resolve(outputFileRelativePath.toPath()).toString();
outputTimeSeriesFilesList.add(absoluteFilePath);
}
outputTimeSeriesFiles = new String[outputTimeSeriesFilesList.size()];
outputTimeSeriesFilesList.toArray(outputTimeSeriesFiles);
}
private void configureLogging() {
File logFile = new File(workDirFile, "hydrotel_post_adapter.log");
if (logFile.exists()) {
//noinspection ResultOfMethodCallIgnored
logFile.delete();
}
Properties props = new Properties();
props.setProperty("log4j.rootLogger", logLevel + ", hydrotel");
props.setProperty("log4j.appender.hydrotel.File", logFile.getAbsolutePath());
props.setProperty("log4j.appender.hydrotel", "org.apache.log4j.RollingFileAppender");
props.setProperty("log4j.appender.hydrotel.layout", "org.apache.log4j.PatternLayout");
props.setProperty("log4j.appender.hydrotel.layout.ConversionPattern", "%p - %m \n");
LogManager.resetConfiguration();
PropertyConfigurator.configure(props);
if (log.isDebugEnabled()) log.debug("Debug log level enabled");
if (log.isInfoEnabled()) log.info("Info log level enabled");
}
private void convertTimeSeries() throws Exception {
for (String outputTimeSeriesFile : outputTimeSeriesFiles) {
if (outputTimeSeriesFile.endsWith("resultats.nc")) {
writeResults(outputTimeSeriesFile);
}
}
}
private void writeResults(String outputTimeSeriesFile) throws IOException, InvalidRangeException, ParseException {
File file = new File(outputTimeSeriesFile);
//noinspection ResultOfMethodCallIgnored
file.getParentFile().mkdirs();
NetcdfFileWriter netcdfFileWriter = null;
Map<String, Float> weights = getWeights();
try {
netcdfFileWriter = NetcdfFileWriter.createNew(NetcdfFileWriter.Version.netcdf3, outputTimeSeriesFile);
File resultDir = new File(workDirString, "simulation/simulation/resultat");
Set<Long> times = new TreeSet<>();
Set<String> stationIds = new LinkedHashSet<>();
List<Box<String, Map<String, List<Float>>>> values = new ArrayList<>();
for (int j = 0; j < outputValuesHydrotel.length; j++) {
String outputValue = outputValuesHydrotel[j];
if (outputValue.equals("DEBITS AVAL")) outputValue = "DEBIT_AVAL";
String outputFileName = outputValue.toLowerCase().replaceAll(" ", "_");
File resultFile = new File(resultDir, outputFileName + ".csv");
if (!resultFile.exists()) {
log.error("Result File: " + resultFile + " not found.");
continue;
}
if (importerValeursExutoireSeulementBool) {
readValuesFromCsvFileIntoOneLocation(weights, times, values, resultFile, outputValue);
} else {
readValuesFromCsvFileIntoSeparateLocations(times, values, resultFile, outputValue, stationIds);
}
}
String timeUnitString = NetcdfUtils.createTimeUnitString(EST);
//create time dimension.
Dimension timeDimension = netcdfFileWriter.addDimension(null, TIME_VARIABLE_NAME, times.size());
//create time variable.
ArrayList<Dimension> dimensionList = new ArrayList<>();
dimensionList.add(timeDimension);
Variable timeVariable = createTimeVariable(netcdfFileWriter, timeUnitString, dimensionList);
Variable[] variables = createVariables(netcdfFileWriter, timeDimension, values, stationIds);
netcdfFileWriter.create();
writeValues(netcdfFileWriter, times, values, variables, timeVariable, stationIds);
} finally {
if (netcdfFileWriter != null) netcdfFileWriter.close();
}
}
private static Variable createTimeVariable(NetcdfFileWriter netcdfFileWriter, String timeUnitString, ArrayList<Dimension> timeDimensionList) {
Variable timeVariable = netcdfFileWriter.addVariable(null, TIME_VARIABLE_NAME, DataType.DOUBLE, timeDimensionList);
netcdfFileWriter.addVariableAttribute(timeVariable, new Attribute(STANDARD_NAME_ATTRIBUTE, TIME_VARIABLE_NAME));
netcdfFileWriter.addVariableAttribute(timeVariable, new Attribute(LONG_NAME_ATTRIBUTE, TIME_VARIABLE_NAME));
netcdfFileWriter.addVariableAttribute(timeVariable, new Attribute(UNITS_ATTRIBUTE, timeUnitString));
netcdfFileWriter.addVariableAttribute(timeVariable, new Attribute(AXIS_ATTRIBUTE, T_AXIS));
return timeVariable;
}
private void writeValues(NetcdfFileWriter netcdfFileWriter, Set<Long> times, List<Box<String, Map<String, List<Float>>>> values, Variable[] variables, Variable timeVariable, Set<String> stationIds) throws IOException, InvalidRangeException {
writeTimeValues(netcdfFileWriter, times, timeVariable);
if (!importerValeursExutoireSeulementBool) writeStationIds(netcdfFileWriter, stationIds);
for (int j = 0, size = values.size(); j < size; j++) {
Map<String, List<Float>> locationValuesMap = values.get(j).getObject1();
if (importerValeursExutoireSeulementBool) {
List<Float> valuesForTimes = locationValuesMap.get(DUMMY_LOC);
ArrayFloat.D1 array = new ArrayFloat.D1(times.size());
for (int i = 0; i < times.size(); i++) {
array.set(i, valuesForTimes.get(i));
}
netcdfFileWriter.write(variables[j], array);
} else {
int stationsLength = stationIds.size();
ArrayFloat.D2 array = new ArrayFloat.D2(times.size(), stationsLength);
int i = 0;
for (String stationId : stationIds) {
List<Float> valuesForTimes = locationValuesMap.get(stationId);
for (int t = 0; t < times.size(); t++) {
float value = valuesForTimes != null && !valuesForTimes.isEmpty() ? valuesForTimes.get(t) : Float.NaN;
array.set(t, i, value);
}
i++;
}
netcdfFileWriter.write(variables[j], array);
}
}
}
private void writeStationIds(NetcdfFileWriter netcdfFileWriter, Set<String> stationIds) throws IOException, InvalidRangeException {
ArrayObject.D1 stationIdArray = new ArrayObject.D1(String.class, stationIds.size());
int i = 0;
for (String stationId : stationIds) {
stationIdArray.set(i, stationId);
i++;
}
netcdfFileWriter.writeStringData(stationVariable, stationIdArray);
}
private void writeTimeValues(NetcdfFileWriter netcdfFileWriter, Set<Long> times, Variable timeVariable) throws IOException, InvalidRangeException {
ArrayDouble.D1 timeArray = new ArrayDouble.D1(times.size());
int i = 0;
for (Long time : times) {
long minutes = time / nl.wldelft.util.TimeUnit.MINUTE_MILLIS + outputOffsetMinutes;
timeArray.set(i++, minutes);
}
netcdfFileWriter.write(timeVariable, timeArray);
}
private Variable[] createVariables(NetcdfFileWriter netcdfFileWriter, Dimension timeDimension, List<Box<String, Map<String, List<Float>>>> values, Set<String> stationIds) {
List<Dimension> dims = new ArrayList<>();
dims.add(timeDimension);
if (!importerValeursExutoireSeulementBool) {
Dimension stationDimension = netcdfFileWriter.addDimension(null, STATION_DIMENSION_NAME, stationIds.size());
dims.add(stationDimension);
List<Dimension> stationIdDims = new ArrayList<>();
Dimension stationIdCharLengthDimension = netcdfFileWriter.addDimension(null, ID_CHAR_LENGTH_DIMENSION_NAME, ID_CHAR_LENGTH);
stationIdDims.add(stationDimension);
stationIdDims.add(stationIdCharLengthDimension);
stationVariable = netcdfFileWriter.addVariable(null, STATION_ID_VARNAME, DataType.CHAR, stationIdDims);
netcdfFileWriter.addVariableAttribute(stationVariable, new Attribute(LONG_NAME_ATTRIBUTE, "station identification code"));
netcdfFileWriter.addVariableAttribute(stationVariable, new Attribute(CF_ROLE_ATTRIBUTE, TIME_SERIES_ID_CF_ROLE));
}
int size = values.size();
Variable[] variables = new Variable[size];
for (int i = 0; i < size; i++) {
String outputVarName = values.get(i).getObject0();
variables[i] = netcdfFileWriter.addVariable(null, outputVarName.toLowerCase(), DataType.FLOAT, dims);
}
return variables;
}
private static void readValuesFromCsvFileIntoOneLocation(Map<String, Float> map, Set<Long> times, List<Box<String, Map<String, List<Float>>>> values, File resultFile, String outputValue) throws IOException, ParseException {
try (LineReader lineReader = new LineReader(resultFile, Charset.defaultCharset())) {
lineReader.skipLines(1);
String line = lineReader.readLine();
String[] ids = line.split(";");
line = lineReader.readLine();
List<Float> floats = new ArrayList<>();
while (line != null) {
String[] columns = line.split(";");
long parse = YYYY_MM_DDHHMM.parseToMillis(columns[0]) + 3 * nl.wldelft.util.TimeUnit.HOUR_MILLIS;
times.add(parse);
float value = resultFile.getName().equals("debit_aval.csv") ? Float.valueOf(columns[1]) : getWeightedValue(map, ids, columns);
floats.add(value);
line = lineReader.readLine();
}
Map<String, List<Float>> oneLocMap = new HashMap<>();
oneLocMap.put(DUMMY_LOC, floats);
values.add(new Box(outputValue, oneLocMap));
}
}
private static void readValuesFromCsvFileIntoSeparateLocations(Set<Long> times, List<Box<String, Map<String, List<Float>>>> values, File resultFile, String outputValue, Set<String> stationIds) throws IOException, ParseException {
try (LineReader lineReader = new LineReader(resultFile, Charset.defaultCharset())) {
lineReader.skipLines(1);
String line = lineReader.readLine();
String[] ids = line.split(";");
for (int i = 1; i < ids.length; i++) {
stationIds.add(ids[i]);
}
Map<String, List<Float>> locationValuesMap = new LinkedHashMap<>();
Arrays.stream(ids).forEach(key -> locationValuesMap.computeIfAbsent(key, k-> new ArrayList<Float>()));
line = lineReader.readLine();
while (line != null) {
String[] columns = line.split(";");
long parse = YYYY_MM_DDHHMM.parseToMillis(columns[0]) + 3 * nl.wldelft.util.TimeUnit.HOUR_MILLIS;
times.add(parse);
for (int i = 1; i < ids.length; i++) {
String id = ids[i];
float value = Float.valueOf(columns[i]);
locationValuesMap.get(id).add(value);
}
line = lineReader.readLine();
}
values.add(new Box(outputValue, locationValuesMap));
}
}
private static float getWeightedValue(Map<String, Float> map, String[] ids, String[] columns) {
float sum = 0.0f;
float totalWeight = 0.0f;
for (int i = 1; i < ids.length; i++) {
String id = ids[i];
Float weight = map.get(id);
if (weight == null || weight == 0.0) continue;
totalWeight += weight;
sum += weight * Float.valueOf(columns[i]);
}
return sum / totalWeight;
}
private Map<String, Float> getWeights() throws IOException {
File weightFile = new File(workDirFile, "info/uhrh_amont.txt");
Map<String, Float> map = new HashMap<>();
try (LineReader lineReader = new LineReader(weightFile, Charset.defaultCharset())) {
String line = lineReader.readLine();
while (line != null) {
String[] columns = line.split(";");
String id = columns[0];
float weight = Float.valueOf(columns[1]);
map.put(id, weight);
line = lineReader.readLine();
}
}
return map;
}
private static String getPropertyStringValue(Variable propertiesVariable, String propertyName) throws Exception {
if (propertiesVariable == null) return null;
Attribute attribute = propertiesVariable.findAttribute(propertyName);
if (attribute == null) return null;
String value = attribute.getStringValue();
if (value == null || value.trim().isEmpty()) {
throw new Exception("Property '" + propertyName + "' in netcdf run file is empty or is not of type String.");
}
String trimmedValue = value.trim();
if (log.isDebugEnabled())
log.debug("Read property " + propertyName + " with value '" + trimmedValue + "' from netcdf run file.");
return trimmedValue;
}
private static Integer getPropertyIntValue(Variable propertiesVariable, String propertyName) throws Exception {
if (propertiesVariable == null) return null;
Attribute attribute = propertiesVariable.findAttribute(propertyName);
if (attribute == null) return null;
Number value = attribute.getNumericValue();
if (value == null) {
throw new Exception("Property '" + propertyName + "' in netcdf run file is empty or is not of type String.");
}
return value.intValue();
}
}
|