...
Code Block |
---|
// NOTE(review): this listing was reconstructed from a revision diff in which old and new text
// were merged together. The package name in particular is ambiguous in that diff - confirm it
// against the repository before relying on it.
package nl.wldelft.timeseriesparsers;

import nl.wldelft.util.TextUtils;
import nl.wldelft.util.TimeZoneUtils;
import nl.wldelft.util.io.LineReader;
import nl.wldelft.util.io.TextParser;
import nl.wldelft.util.timeseries.DefaultTimeSeriesHeader;
import nl.wldelft.util.timeseries.TimeSeriesContentHandler;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

import java.io.IOException;
import java.util.TimeZone;

/**
 * This parser supports two formats: WISKI7 and the previous format of WISKI (number unknown).
 * Recognizing the formats:
 * a) If the header contains the keyword TSPATH, then it is WISKI7.
 *    The event rows will be parsed according to the format as specified with the keyword LAYOUT.
 *    If no LAYOUT is specified, the defaults will be used.
 * b) Otherwise it is the older Wiski format.
 *    The event rows will be parsed as follows: the first column is timestamp, the second column is value.
 */
public class WiskiTimeSeriesParser implements TextParser<TimeSeriesContentHandler> {
    private static final Logger log = LogManager.getLogger(WiskiTimeSeriesParser.class);

    /** Character that encloses a remark in an event row. */
    private static final char remarkSepar = '"';

    /** Set when event rows were skipped because the preceding header was not valid. */
    private boolean anyHeaderInvalid = false;

    private LineReader reader = null;
    private TimeSeriesContentHandler contentHandler = null;
    private DefaultTimeSeriesHeader header = new DefaultTimeSeriesHeader();
    private TimeZone headerTimeZone; //timeZone read from the file header
    private String virtualFileName;

    //Fields <name>Index specify the column of the timestamp, value, status, ... in the event row.
    //With the keyword LAYOUT another sequence of the data can be specified.
    //A value of -1 means that the item is not present in the event row.
    private int timestampIndex;
    private int valueIndex;
    private int statusIndex;
    private int interpolation_typeIndex;
    private int remarkIndex;
    private boolean defaultWiski7LayoutUsed = true;

    /**
     * Reads the whole file: every block of #-records is handed to {@link #parseHeader()}, every
     * other record is treated as an event row belonging to the most recently parsed header.
     * Event rows that follow an invalid header are skipped; once the end of the input is reached
     * an {@link IOException} is thrown if that happened at least once.
     *
     * @param reader          source of the records; '?' is configured as comment prefix and empty lines are skipped
     * @param virtualFileName file name, used in log messages only
     * @param contentHandler  receiver of headers, times, flags, values and comments
     * @throws Exception if the header contains neither a location nor a parameter, if a header is
     *                   malformed, or if one or more headers were invalid
     */
    @Override
    public void parse(LineReader reader, String virtualFileName, TimeSeriesContentHandler contentHandler) throws Exception {
        this.virtualFileName = virtualFileName;
        this.contentHandler = contentHandler;
        this.contentHandler.addMissingValue(-777.0f);

        this.reader = reader;
        this.reader.setCommentLinePrefix('?');
        this.reader.setSkipEmptyLines(true);

        this.header.clear();
        this.headerTimeZone = null;

        //Initialize the columns to read data from the event rows.
        initializeWiski7RowLayout();

        anyHeaderInvalid = false;
        boolean headerValid = false;

        //The mark is refreshed after every record, so reset() always returns to the start of the record just read.
        reader.mark(500);
        String[] buffer = new String[5];
        for (String line; (line = reader.readLine()) != null; reader.mark(500)) {
            line = line.trim();
            //A whitespace-only record is empty after trim(); skip it so charAt(0) below cannot fail.
            if (line.isEmpty()) continue;

            // NOTE(review): returning here bypasses the anyHeaderInvalid check after the loop - confirm this is intended.
            if (line.equals("ENDOFFILE")) return;

            if (line.charAt(0) == '#') {
                //Step back so that parseHeader() sees the first #-record as well.
                reader.reset();
                headerValid = parseHeader();
                continue;
            }

            if (!headerValid) {
                //Event row without a usable header: skip it, but remember that the file was not read completely.
                anyHeaderInvalid = true;
                continue;
            }

            if (this.header.getLocationId() == null && this.header.getParameterId() == null)
                throw new Exception("Not a valid wiski file, REXCHANGE, CNAME, SANR tags are all missing in the file header");

            if (this.contentHandler.isCurrentTimeSeriesHeaderForAllTimesRejected()) continue;

            //If default layout used, the remarks (if any) must be specified last in the line and start with "-token
            String remarksDefaultLayout;
            if (this.defaultWiski7LayoutUsed && line.indexOf(remarkSepar) != -1) {
                //extract remarks first (if any), so that they cannot be confused with the status or interp.type
                String leftStr = TextUtils.leftFrom(line, remarkSepar);
                remarksDefaultLayout = TextUtils.rightFrom(line, remarkSepar).replace(remarkSepar, ' ');
                TextUtils.split(leftStr, ' ', buffer);
            } else {
                remarksDefaultLayout = null;
                TextUtils.split(line, ' ', buffer);
            }

            //The time zone from the header wins; otherwise fall back to the default of the content handler.
            if (this.headerTimeZone != null) {
                contentHandler.setTime(this.headerTimeZone, "yyyyMMddHHmmss", buffer[this.timestampIndex]);
            } else {
                contentHandler.setTime(contentHandler.getDefaultTimeZone(), "yyyyMMddHHmmss", buffer[this.timestampIndex]);
            }

            int flag = getFlag(this.defaultWiski7LayoutUsed, this.statusIndex, this.interpolation_typeIndex, buffer, line);
            if (flag != Integer.MIN_VALUE) {
                contentHandler.setFlag(flag);
            }

            contentHandler.setValue('.', buffer[this.valueIndex]);

            contentHandler.setComment(null); //reset
            if (remarksDefaultLayout != null) {
                if (!remarksDefaultLayout.isEmpty()) contentHandler.setComment(remarksDefaultLayout);
            } else {
                if (this.remarkIndex != -1) {
                    String remark = buffer[this.remarkIndex].replace(remarkSepar, ' ');
                    if (!remark.isEmpty()) contentHandler.setComment(remark);
                }
            }
            contentHandler.applyCurrentFields();
        }

        if (anyHeaderInvalid)
            throw new IOException(" the file has one or more invalid headers"); // throw exception since the file should be marked as not fully successful
    }

    /**
     * Read metadata from the #-records. Metadata block is followed by the timeseries-records
     * but the timeseries-records may be also omitted. In this case the Metadata block MUST start
     * with a record that begins with ## !
     * Empty records will be ignored.
     * <p>
     * The meaning of the keys is:
     * TZ : time zone. TZ are UTC0 and UTC+/-x (e.g. UTC+1 or UTC-2).
     * TSPATH : /site id/location id/parameter id/ts shortname
     * example TSPATH/160/160_1/WATHTE/cmd.p
     * only location id, parameter id and ts shortname (as qualifier) are parsed and used
     * SANR : location id. Used only if not specified with TSPATH
     * CNAME: parameter id. Used only if not specified with TSPATH
     * CUNIT: unit
     * RINVAL: missing value
     * REXCHANGE: location-parameter. Will be used only if the metadata block does not contain keys TSPATH, SANR or CNAME.
     * The string specified by keyword REXCHANGE represents location Id and also parameter-id (so location Id and parameter Id are equal)
     *
     * @return true when a header was passed on to the content handler or no fallback was needed;
     *         false when neither TSPATH, SANR/CNAME nor REXCHANGE yielded both a location and a parameter
     * @throws IOException if the header format is incorrect
     */
    private boolean parseHeader() throws IOException {
        this.header.clear();
        this.headerTimeZone = null;

        //Initialize the columns to read data from the event rows.
        initializeWiski7RowLayout();

        String tspathPar = null;
        String tspathQual = null;
        String tspathLoc = null;
        String fallbackParLoc = null;

        for (String line; (line = this.reader.readLine()) != null; reader.mark(500)) {
            line = line.trim();
            //Empty records are ignored (see above); this also keeps charAt(0) below safe.
            if (line.isEmpty()) continue;

            if (line.charAt(0) != '#') {
                //First event row reached: give it back to the caller.
                reader.reset();
                break;
            }

            String layoutString = parseKeyValue("LAYOUT", line);
            if (layoutString != null) {
                defaultWiski7LayoutUsed = false;
                //Obtain information how to parse event rows
                parseEventRowLayout(layoutString.trim());
            }

            String tzString = parseKeyValue("|TZ", line);
            if (tzString != null) {
                this.headerTimeZone = parseTimeZone(tzString, this.virtualFileName, this.contentHandler.getDefaultTimeZone().getID());
            }

            //Parse location id and parameter specified with keyword TSPATH
            //format: TSPATH/<site id>/<station id>/<parameter shortname>/<ts shortname>
            //example: TSPATH/160/160_1/WATHTE/cmd.p (contains always all these 4 elements )
            //<ts shortname> is read as qualifier
            String tspath = parseKeyValue("TSPATH", line);
            //When the value starts directly with '/', splitting yields an empty first element: shift all indices by one.
            int index = line.contains("TSPATH/") ? 1 : 0;
            if (tspath != null && !tspath.trim().equals("/")) {
                //TSPATH available and not empty
                String[] buffer = TextUtils.split(tspath, '/');
                if (buffer.length != 4 + index || buffer[1 + index].length() < 1 || buffer[2 + index].length() < 1) {
                    throw new IOException("Not a valid wiski file, TSPATH has a incorrect format: " + tspath + " expected: TSPATH/<site id>/<station id>/<parameter shortname>/<ts shortname>");
                }
                tspathLoc = buffer[1 + index];
                tspathPar = buffer[2 + index];
                tspathQual = buffer[3 + index].replace('.', '_'); // dots are not allowed in fews as internal qualifiers, replace dots with underscores
            }

            String locationId = parseKeyValue("SANR", line);
            if (locationId != null && !locationId.isBlank()) header.setLocationId(locationId);
            if (log.isDebugEnabled() && locationId != null && locationId.isBlank()) {
                log.debug("Location id is blank, skipping current line.");
            }

            String parameterId = parseKeyValue("CNAME", line);
            if (parameterId != null && !parameterId.isBlank()) header.setParameterId(parameterId);
            if (log.isDebugEnabled() && parameterId != null && parameterId.isBlank()) {
                log.debug("Parameter id is blank, skipping current line.");
            }

            String unit = parseKeyValue("CUNIT", line);
            if (unit != null) header.setUnit(unit);

            String missingValue = parseKeyValue("RINVAL", line);
            if (missingValue != null) contentHandler.addMissingValue(missingValue);

            String parLoc = parseKeyValue("REXCHANGE", line);
            if (parLoc != null) fallbackParLoc = parLoc;
        }

        if (tspathPar != null && tspathLoc != null) {
            //If par id, qualifier id and loc are specified with TSPATH, use them, even if the keywords SANR and CNAME are also present in the file
            header.setParameterId(tspathPar);
            header.setQualifierIds(tspathQual);
            header.setLocationId(tspathLoc);
        } else {
            //The header has an OLD format (no WISKI7), if any LAYOUT is specified, it will be ignored !
            //Re-initialize the columns to read data from the event rows (no flags reading !).
            initializeRowLayout();
            if (header.getParameterId() == null || header.getLocationId() == null) {
                if (fallbackParLoc != null && !fallbackParLoc.isEmpty()) {
                    header.setParameterId(fallbackParLoc);
                    header.setLocationId(fallbackParLoc);
                } else {
                    log.warn(this.reader.getFileAndLineNumber()+" parameter/location is missing next to the keyword REXCHANGE");
                    return false;
                }
            }
        }

        if (log.isDebugEnabled()) log.debug("Reading data for " + header.toString());
        if (header.getLocationId() != null || header.getParameterId() != null) contentHandler.setTimeSeriesHeader(header);
        return true;
    }

    /** Selects the default WISKI7 column layout: timestamp, value, status, interpolation type, remark. */
    private void initializeWiski7RowLayout() {
        defaultWiski7LayoutUsed = true;
        timestampIndex = 0;           // timestamp will be read from column 1 by default
        valueIndex = 1;               // value will be read from column 2 by default
        statusIndex = 2;              // status will be read from column 3 by default
        interpolation_typeIndex = 3;  // interpolation_type will be read from column 4 by default
        remarkIndex = 4;              // remark will be read from column 5 by default
    }

    /** Selects the layout of the older format: only timestamp and value are read. */
    private void initializeRowLayout() {
        defaultWiski7LayoutUsed = false;
        timestampIndex = 0;            // timestamp is always read from column 1
        valueIndex = 1;                // value is always read from column 2
        statusIndex = -1;              // status NOT read
        interpolation_typeIndex = -1;  // interpolation_type NOT read
        remarkIndex = -1;              // remark NOT read
    }

    /**
     * Derives the column indices from the value of the header keyword LAYOUT.
     * Some examples of the buffer, that is parsed from the header line LAYOUT:
     * (timestamp,value)
     * (timestamp,value,status,interpolation_type,remark)
     *
     * @param bufferIn the LAYOUT value, expected to be enclosed in round brackets
     * @throws IOException if the brackets are missing, a keyword is unknown or specified twice,
     *                     or timestamp and/or value are not specified
     */
    private void parseEventRowLayout(String bufferIn) throws IOException {
        //Check the passed buffer, remove brackets from buffer
        String buffer = null;
        if (bufferIn.length() > 5) {
            if (bufferIn.charAt(0) == '(' && bufferIn.charAt(bufferIn.length() - 1) == ')') {
                buffer = bufferIn.substring(1, bufferIn.length() - 1).trim();
            }
        }
        if (buffer == null)
            throw new IOException("String specified with the LAYOUT-keyword has wrong format:" + bufferIn);

        // NOTE(review): only 5 slots are available although more keywords are accepted below - confirm how TextUtils.split handles surplus columns.
        String[] keywords = new String[5]; //max. 5 keywords expected, as specified in the second example
        TextUtils.split(buffer, ',', keywords);

        //-1 = data not specified in the file (according to the header keyword LAYOUT)
        timestampIndex = -1;
        valueIndex = -1;
        statusIndex = -1;
        interpolation_typeIndex = -1;
        remarkIndex = -1;

        for (int i = 0; i < 5; i++) {
            String keyword = keywords[i].trim();
            if (keyword.isEmpty())
                continue; //not all keywords must be specified, however at least 2: timestamp and value

            if (keyword.equals("timestamp")) {
                if (timestampIndex != -1)
                    throw new IOException("Keyword '" + keyword + "' specified more than once in the header line LAYOUT");
                timestampIndex = i;
            } else if (keyword.equals("value")) {
                if (valueIndex != -1)
                    throw new IOException("Keyword '" + keyword + "' specified more than once in the header line LAYOUT");
                valueIndex = i;
            } else if (keyword.equals("status") || keyword.equals("primary_status")) {
                if (statusIndex != -1)
                    throw new IOException("Keyword '" + keyword + "' specified more than once in the header line LAYOUT");
                statusIndex = i;
            } else if (keyword.equals("interpolation_type")) {
                if (interpolation_typeIndex != -1)
                    throw new IOException("Keyword '" + keyword + "' specified more than once in the header line LAYOUT");
                interpolation_typeIndex = i;
            } else if (keyword.equals("remark")) {
                if (remarkIndex != -1)
                    throw new IOException("Keyword '" + keyword + "' specified more than once in the header line LAYOUT");
                remarkIndex = i;
            } else if (isKnownKeyword(keyword)) {
                // Ignore
            } else {
                throw new IOException("Wrong keyword '" + keyword + "' specified in the header line #LAYOUT ");
            }
        }

        if (timestampIndex == -1 || valueIndex == -1) {
            throw new IOException("Keywords timestamp and/or value are not specified in the header line LAYOUT !");
        }
    }

    /** Returns true for LAYOUT keywords that are accepted but whose column is not read. */
    private static boolean isKnownKeyword(String key) {
        if (key.equals("timestampoccurence")) return true;
        if (key.equals("forecast")) return true;
        if (key.equals("member")) return true;
        if (key.equals("dispatch_info")) return true;
        return false;
    }

    /**
     * Extracts the text between the key and the next ";*;" separator (or "|*|" when ";*;" is absent).
     *
     * @return the value, or null if the key or the terminating separator is not found in the buffer
     */
    private static String parseKeyValue(String key, String buffer) {
        int keyPos = buffer.indexOf(key);
        if (keyPos == -1) return null;

        int valueStart = keyPos + key.length();
        int endValuePos = buffer.indexOf(";*;", valueStart);
        if (endValuePos == -1) endValuePos = buffer.indexOf("|*|", valueStart);
        if (endValuePos == -1) return null;
        return buffer.substring(valueStart, endValuePos);
    }

    /**
     * Parse time zone.
     * Accepted are the names MEZ, MESZ, CET and CEST, a fixed list of Europe/... ids, and
     * UTC/GMT (optionally prefixed with Etc/) followed by an offset, e.g. UTC0, UTC+1 or UTC-2.
     *
     * @param buffer          value of the TZ keyword
     * @param fileName        file name, used in the warning only
     * @param defaultTimeZone id of the time zone the caller falls back to, used in the warning only
     * @return the time zone, or null (after logging a warning) when the value is not recognized
     * @throws IOException if the offset behind UTC/GMT is not a number
     */
    private static TimeZone parseTimeZone(String buffer, String fileName, String defaultTimeZone) throws IOException {
        if (buffer.equals("MEZ")) return TimeZone.getTimeZone("GMT+1");
        if (buffer.equals("MESZ")) return TimeZone.getTimeZone("CET");
        if (buffer.equals("CET")) return TimeZone.getTimeZone("CET");
        if (buffer.equals("CEST")) return TimeZone.getTimeZone("GMT+2");
        if (buffer.equals("Europe/Amsterdam") || buffer.equals("Europe/Berlin") || buffer.equals("Europe/Brussels")
                || buffer.equals("Europe/Luxembourg") || buffer.equals("Europe/Madrid") || buffer.equals("Europe/Paris")
                || buffer.equals("Europe/Rome") || buffer.equals("Europe/Vienna") || buffer.equals("Europe/Zurich")) {
            return TimeZone.getTimeZone("CET");
        }

        String strOffset = getUtcGmtOffset(buffer);
        if (strOffset == null) {
            log.warn(fileName + ": invalid timezone specified with TZ keyword - " + buffer + " , " + defaultTimeZone + " will be used.");
            return null;
        }

        TimeZone timeZone;
        try {
            double offset = Double.parseDouble(strOffset);
            timeZone = TimeZoneUtils.createTimeZoneFromDouble(offset);
        } catch (NumberFormatException e) {
            throw new IOException("Invalid timeZone specified with TZ keyword:" + buffer, e);
        }
        return timeZone;
    }

    /**
     * Returns the text behind a leading UTC/GMT or Etc/UTC / Etc/GMT prefix, or null when the
     * buffer has no such prefix or nothing follows it.
     */
    private static String getUtcGmtOffset(String buffer) {
        if ((buffer.startsWith("UTC") || buffer.startsWith("GMT")) && buffer.length() >= 4) {
            return buffer.substring(3);
        }
        // NOTE(review): the offset is taken with the sign as written; confirm this matches what the exporting system means by Etc/GMT+x.
        if ((buffer.startsWith("Etc/UTC") || buffer.startsWith("Etc/GMT")) && buffer.length() >= 8) {
            return buffer.substring(7);
        }
        return null;
    }

    /**
     * Parse flags from the line.
     * First flag is 'status', the second one is interpolation type.
     * Two optional flags are composed to one flag as follows: flag1*1000+flag2
     * Flag2 must be between 0 and 999
     * Line examples and the composed flags:
     * 20100227000709 3.0 200 103 -> 200103
     * 20100227000709 3.0 0 103   -> 103
     * 20100227000709 3.0 200 0   -> 200000
     * 20100227000709 3.0 200     -> 200000
     *
     * @param defaultLayout         true when no LAYOUT was given; a missing flag is then not reported
     * @param statusColumnIndex     column of the status, -1 when not present
     * @param interpTypeColumnIndex column of the interpolation type, -1 when not present
     * @param buffer                the columns of the event row
     * @param fileLine              the complete event row, used in log messages only
     * @return the composed flag, or Integer.MIN_VALUE when no flag can be determined
     */
    private static int getFlag(boolean defaultLayout, int statusColumnIndex, int interpTypeColumnIndex, String[] buffer, String fileLine) {
        int statusFlag = Integer.MIN_VALUE;
        if (statusColumnIndex != -1) {
            String status = buffer[statusColumnIndex];
            if (status.isEmpty()) {
                if (!defaultLayout) {
                    //status not specified according to the header, give message
                    log.error("Status expected, but is ommited in the line: " + fileLine);
                }
                return Integer.MIN_VALUE; //no status specified in defaultLayout
            }

            statusFlag = parseIntFlag(status);
            if (statusFlag == Integer.MIN_VALUE) {
                log.error("Wrong status specified in the line:" + fileLine);
                return Integer.MIN_VALUE; //flag cannot be converted to integer, so no flags will be set (for this timestep)
            }
        }

        if (statusFlag != Integer.MIN_VALUE) statusFlag *= 1000;

        int interpTypeFlag = Integer.MIN_VALUE;
        if (interpTypeColumnIndex != -1) {
            String interpType = buffer[interpTypeColumnIndex];
            if (interpType.isEmpty()) {
                if (!defaultLayout) {
                    //Interpolation type not specified according to the header, give message
                    log.error("Interpolation type expected, but is ommited in the line: " + fileLine);
                }
                return statusFlag;
            }

            interpTypeFlag = parseIntFlag(interpType);
            if (interpTypeFlag == Integer.MIN_VALUE || interpTypeFlag < 0 || interpTypeFlag > 999) {
                log.error("Wrong interpolation type specified, it should be between 0 and 999. Line: " + fileLine);
                return Integer.MIN_VALUE;
            }
        }

        if (statusFlag == Integer.MIN_VALUE) {
            return interpTypeFlag; //only the interpolation type flag specified
        }

        if (interpTypeFlag == Integer.MIN_VALUE) {
            return statusFlag; //only the status flag type specified
        }

        return statusFlag + interpTypeFlag;
    }

    /** Converts the text to an int; returns Integer.MIN_VALUE when it is not a valid integer. */
    private static int parseIntFlag(String buffer) {
        int flag;
        try {
            flag = TextUtils.parseInt(buffer);
        } catch (NumberFormatException e) {
            flag = Integer.MIN_VALUE;
        }
        return flag;
    }
}