package nl.wldelft.fews.pi; import nl.wldelft.util.*; import nl.wldelft.util.coverage.PointGeometry; import nl.wldelft.util.geodatum.GeoDatum; import nl.wldelft.util.io.VirtualInputDir; import nl.wldelft.util.io.VirtualInputDirConsumer; import nl.wldelft.util.io.XmlParser; import nl.wldelft.util.timeseries.*; import javax.xml.stream.XMLStreamConstants; import javax.xml.stream.XMLStreamException; import javax.xml.stream.XMLStreamReader; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.nio.ByteOrder; import java.text.ParseException; import java.util.ArrayList; import java.util.List; import java.util.Locale; import java.util.TimeZone; public class PiTimeSeriesParser implements XmlParser<TimeSeriesContentHandler>, VirtualInputDirConsumer { private static final int BUFFER_SIZE = 2048; private final Properties.Builder propertyBuilder = new Properties.Builder(); @Override public void setVirtualInputDir(VirtualInputDir virtualInputDir) { this.virtualInputDir = virtualInputDir; } private enum HeaderElement { type(F.R), locationId(F.R), parameterId(F.R), qualifierId(F.M), ensembleId, ensembleMemberIndex, ensembleMemberId, timeStep(F.R | F.A), startDate(F.R | F.A), endDate(F.R | F.A), forecastDate(F.A), missVal, longName, stationName, lat, lon, x, y, z, units, sourceOrganisation, sourceSystem, fileDescription, creationDate, creationTime, region, thresholds; interface F { int A = 1 << 0; // attributes int R = 1 << 1; // required; int M = 1 << 2; // multiple; } private final int flags; HeaderElement() { this.flags = 0; } HeaderElement(int flags) { this.flags = flags; } public boolean isRequired() { return (flags & F.R) != 0; } public boolean hasAttributes() { return (flags & F.A) != 0; } public boolean isMultipleAllowed() { return (flags & F.M) != 0; } } // fastDateFormat is used to keep track of last time zone and lenient private FastDateFormat fastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd", "HH:mm:ss", TimeZoneUtils.GMT, Locale.US, null); private HeaderElement currentHeaderElement = null; private static final HeaderElement[] HEADER_ELEMENTS = HeaderElement.class.getEnumConstants(); private PiTimeSeriesHeader header = new PiTimeSeriesHeader(); private List<String> qualifiers = new ArrayList<>(); private long timeStepMillis = 0; private TimeStep timeStep = null; private long startTime = Long.MIN_VALUE; private long endTime = Long.MIN_VALUE; private float missingValue = Float.NaN; private String creationDateText = null; private String creationTimeText = null; private TimeSeriesContentHandler timeSeriesContentHandler = null; private PiTimeSeriesSerializer.EventDestination eventDestination = null; /** * For performance reasons the pi time series format allows that the values are stored in * a separate bin file instead of embedded in the xml file. * The bin file should have same name as the xml file except the extension equals bin * In this case all time series should be equidistant. */ private VirtualInputDir virtualInputDir = VirtualInputDir.NONE; private InputStream binaryInputStream = null; private byte[] byteBuffer = null; private float[] floatBuffer = null; private int bufferPos = 0; private int bufferCount = 0; private XMLStreamReader reader = null; private String virtualFileName = null; private static boolean lenient = false; private double lat = Double.NaN; private double lon = Double.NaN; private double z = Double.NaN; /** * For backwards compatibility. Earlier versions of the PiTimeSeriesParser were tolerant about the date/time format * and the case insensitive for header element names. * This parser should not accept files that are not valid according to pi_timeseries.xsd * When old adapters are not working you can UseLenientPiTimeSeriesParser temporary till the adapter is fixed * * @param lenient */ public static void setLenient(boolean lenient) { PiTimeSeriesParser.lenient = lenient; } public static boolean isLenient() { return lenient; } public PiTimeSeriesParser() { fastDateFormat.setLenient(lenient); } @Override public void parse(XMLStreamReader reader, String virtualFileName, TimeSeriesContentHandler timeSeriesContentHandler) throws Exception { this.reader = reader; this.virtualFileName = virtualFileName; this.timeSeriesContentHandler = timeSeriesContentHandler; String virtualBinFileName = FileUtils.getPathWithOtherExtension(virtualFileName, "bin"); // time zone can be overruled by one or more time zone elements in the pi file this.fastDateFormat.setTimeZone(timeSeriesContentHandler.getDefaultTimeZone()); if (!virtualInputDir.exists(virtualBinFileName)) { eventDestination = PiTimeSeriesSerializer.EventDestination.XML_EMBEDDED; parse(); return; } eventDestination = PiTimeSeriesSerializer.EventDestination.SEPARATE_BINARY_FILE; binaryInputStream = virtualInputDir.getInputStream(virtualBinFileName); try { if (byteBuffer == null) { byteBuffer = new byte[BUFFER_SIZE * NumberType.FLOAT_SIZE]; floatBuffer = new float[BUFFER_SIZE]; } parse(); boolean eof = bufferPos == bufferCount && binaryInputStream.read() == -1; if (!eof) throw new IOException("More values available in bin file than expected based on time step and start and end time\n" + FileUtils.getPathWithOtherExtension(virtualFileName, "bin")); } finally { bufferPos = 0; bufferCount = 0; binaryInputStream.close(); binaryInputStream = null; } } private void parse() throws Exception { reader.require(XMLStreamConstants.START_DOCUMENT, null, null); reader.nextTag(); reader.require(XMLStreamConstants.START_ELEMENT, null, "TimeSeries"); reader.nextTag(); while (reader.getEventType() != XMLStreamConstants.END_ELEMENT) { parseTimeZone(); readTimeSeries(); } reader.require(XMLStreamConstants.END_ELEMENT, null, "TimeSeries"); reader.next(); reader.require(XMLStreamConstants.END_DOCUMENT, null, null); } private void readTimeSeries() throws Exception { reader.require(XMLStreamConstants.START_ELEMENT, null, "series"); reader.nextTag(); parseHeader(); timeSeriesContentHandler.setProperties(Properties.NONE); if (eventDestination == PiTimeSeriesSerializer.EventDestination.XML_EMBEDDED) { while (reader.getEventType() == XMLStreamConstants.START_ELEMENT) { String localName = reader.getLocalName(); if (TextUtils.equals(localName, "event")) { parseEvent(); } else if (TextUtils.equals(localName, "properties")) { parseProperties(); } else { break; } } if (reader.getEventType() == XMLStreamConstants.START_ELEMENT) { // skip comment reader.require(XMLStreamConstants.START_ELEMENT, null, "comment"); reader.getElementText(); reader.nextTag(); } } else { assert eventDestination == PiTimeSeriesSerializer.EventDestination.SEPARATE_BINARY_FILE; readValuesFromBinFile(); } reader.require(XMLStreamConstants.END_ELEMENT, null, "series"); reader.nextTag(); } private void parseHeader() throws Exception { reader.require(XMLStreamConstants.START_ELEMENT, null, "header"); if (reader.getAttributeCount() > 0) { throw new Exception("Attributes are not allowed for header element "); } reader.nextTag(); initHeader(); do { detectHeaderElement(); parseHeaderElement(); } while (reader.getEventType() != XMLStreamConstants.END_ELEMENT); if (header.getForecastTime() == Long.MIN_VALUE) header.setForecastTime(startTime); if (!Double.isNaN(lat)) header.setGeometry(new PointGeometry(GeoDatum.WGS_1984.createLatLongZ(lat, lon, z))); initiateTimeStep(); header.setTimeStep(timeStep); if (!qualifiers.isEmpty()) header.setQualifierIds(qualifiers.toArray(new String[qualifiers.size()])); if (creationDateText != null) { try { long creationTime = fastDateFormat.parseToMillis(creationDateText, creationTimeText); header.setCreationTime(creationTime); } catch (ParseException e) { throw new Exception("Can not parse creation date/time " + creationDateText + ' ' + creationTimeText); } } if (startTime != Long.MIN_VALUE && endTime != Long.MIN_VALUE) { timeSeriesContentHandler.setEstimatedPeriod(new Period(startTime, endTime)); } timeSeriesContentHandler.setNewTimeSeriesHeader(header); reader.require(XMLStreamConstants.END_ELEMENT, null, "header"); reader.nextTag(); } @SuppressWarnings("OverlyLongMethod") private void parseEvent() throws Exception { assert binaryInputStream == null; reader.require(XMLStreamConstants.START_ELEMENT, null, "event"); String dateText = null; String timeText = null; String valueText = null; String flagText = null; String flagSource = null; String comment = null; String user = null; for (int i = 0, n = reader.getAttributeCount(); i < n; i++) { String localName = reader.getAttributeLocalName(i); String attributeValue = reader.getAttributeValue(i); if (dateText == null && TextUtils.equals(localName, "date")) { dateText = attributeValue; } else if (timeText == null && TextUtils.equals(localName, "time")) { timeText = attributeValue; } else if (valueText == null && TextUtils.equals(localName, "value")) { valueText = attributeValue; } else if (flagText == null && TextUtils.equals(localName, "flag")) { flagText = attributeValue; } else if (flagSource == null && TextUtils.equals(localName, "flagSource")) { flagSource = attributeValue; } else if (comment == null && TextUtils.equals(localName, "comment")) { comment = attributeValue; } else if (user == null && TextUtils.equals(localName, "user")) { user = attributeValue; } else { throw new Exception("Unknown attribute " + localName + " in event"); } } if (timeText == null) throw new Exception("Attribute time is missing"); if (dateText == null) throw new Exception("Attribute date is missing"); if (valueText == null) throw new Exception("Attribute value is missing"); putValue(dateText, timeText, valueText, flagText, flagSource, comment, user); reader.nextTag(); reader.require(XMLStreamConstants.END_ELEMENT, null, "event"); reader.nextTag(); } private void putValue(String dateText, String timeText, String valueText, String flagText, String flagSource, String comment, String user) throws Exception { try { timeSeriesContentHandler.setTime(fastDateFormat.parseToMillis(dateText, timeText)); } catch (ParseException e) { throw new Exception("Can not parse " + dateText + ' ' + timeText); } if (flagText == null) { timeSeriesContentHandler.setFlag(0); } else { try { timeSeriesContentHandler.setFlag(TextUtils.parseInt(flagText)); } catch (NumberFormatException e) { throw new Exception("Flag should be an integer " + flagText); } } timeSeriesContentHandler.setComment(comment); timeSeriesContentHandler.setUser(user); timeSeriesContentHandler.setFlagSource(flagSource); try { float value = TextUtils.parseFloat(valueText); // we can not use the automatic missing value detection of the content handler because the missing value is different for each time series if (value == missingValue) { value = Float.NaN; } else { timeSeriesContentHandler.setValueResolution(TextUtils.getValueResolution(valueText, '.')); } timeSeriesContentHandler.setValue(value); timeSeriesContentHandler.applyCurrentFields(); } catch (NumberFormatException e) { throw new Exception("Value should be a float " + valueText); } } private long parseTime() throws Exception { String dateText = reader.getAttributeValue(null, "date"); if (dateText == null) { throw new Exception("Attribute " + currentHeaderElement + "-date is missing"); } String timeText = reader.getAttributeValue(null, "time"); if (timeText == null) { throw new Exception("Attribute " + currentHeaderElement + "-time is missing"); } long time; try { time = fastDateFormat.parseToMillis(dateText, timeText); } catch (ParseException e) { throw new Exception("Not a valid data time for " + currentHeaderElement + ' ' + dateText + ' ' + timeText, e); } reader.nextTag(); return time; } private long parseTimeStep() throws Exception { String unit = reader.getAttributeValue(null, "unit"); if (unit == null) { throw new Exception("Attribute unit is missing in " + currentHeaderElement); } TimeUnit tu = TimeUnit.get(unit); if (tu != null) { String multiplierText = reader.getAttributeValue(null, "multiplier"); int multiplier; if (multiplierText == null) { multiplier = 1; } else { try { multiplier = Integer.parseInt(multiplierText); } catch (NumberFormatException e) { throw new Exception(ExceptionUtils.getMessage(e), e); } if (multiplier == 0) { throw new Exception("Multiplier is 0"); } } String dividerText = reader.getAttributeValue(null, "divider"); int divider; if (dividerText == null) { divider = 1; } else { try { divider = Integer.parseInt(dividerText); } catch (NumberFormatException e) { throw new Exception(ExceptionUtils.getMessage(e), e); } if (divider == 0) { throw new Exception("divider is 0"); } } reader.nextTag(); return tu.getMillis() * multiplier / divider; } else { reader.nextTag(); return 0; } } private void initHeader() { header.clear(); header.setFileDescription(virtualFileName); currentHeaderElement = null; timeStep = null; timeStepMillis = 0; startTime = Long.MIN_VALUE; endTime = Long.MIN_VALUE; missingValue = Float.NaN; creationDateText = null; creationTimeText = "00:00:00"; qualifiers.clear(); lat = Double.NaN; lon = Double.NaN; z = Double.NaN; } private void readValuesFromBinFile() throws Exception { TimeStep timeStep = header.getTimeStep(); if (!timeStep.isRegular()) { throw new Exception("Only equidistant time step supported when pi events are stored in bin file instead of xml"); } boolean equidistantMillis = timeStep.isEquidistantMillis(); long stepMillis = equidistantMillis ? timeStep.getStepMillis() : Long.MIN_VALUE; for (long time = startTime; time <= endTime;) { timeSeriesContentHandler.setTime(time); if (bufferPos == bufferCount) fillBuffer(); float value = floatBuffer[bufferPos++]; // we can not use the automatic missing value detection of the content handler because the missing value is different for each time series if (value == missingValue) value = Float.NaN; timeSeriesContentHandler.setValue(value); timeSeriesContentHandler.applyCurrentFields(); if (equidistantMillis) { time += stepMillis; continue; } time = timeStep.nextTime(time); } } private void fillBuffer() throws IOException { int byteBufferCount = 0; while (byteBufferCount % NumberType.FLOAT_SIZE != 0 || byteBufferCount == 0) { int count = binaryInputStream.read(byteBuffer, byteBufferCount, BUFFER_SIZE * NumberType.FLOAT_SIZE - byteBufferCount); assert count != 0; // see read javadoc if (count == -1) throw new EOFException("Bin file is too short"); byteBufferCount += count; } bufferCount = byteBufferCount / NumberType.FLOAT_SIZE; BinaryUtils.copy(byteBuffer, 0, byteBufferCount, floatBuffer, 0, bufferCount, ByteOrder.LITTLE_ENDIAN); bufferPos = 0; } private void initiateTimeStep() { if (timeStepMillis == 0) { timeStep = IrregularTimeStep.INSTANCE; return; } long startTime = this.startTime == Long.MIN_VALUE ? 0L : this.startTime; if (timeStepMillis % TimeUnit.SECOND_MILLIS != 0) { timeStep = RelativeEquidistantTimeStep.getInstance(timeStepMillis, startTime); return; } long timeZoneOffsetMillis = -startTime % timeStepMillis; if (timeZoneOffsetMillis % TimeUnit.MINUTE_MILLIS != 0) { timeStep = RelativeEquidistantTimeStep.getInstance(timeStepMillis, startTime); return; } timeStep = SimpleEquidistantTimeStep.getInstance(timeStepMillis, timeZoneOffsetMillis); } private void parseTimeZone() throws Exception { if (reader.getEventType() != XMLStreamConstants.START_ELEMENT) return; if (!TextUtils.equals(reader.getLocalName(), "timeZone")) return; try { String timeZoneText = reader.getElementText(); // element name="timeZone" type="fews:TimeZoneSimpleType" default="0.0" minOccurs="0"/> // when default is used in schema for element the consequence is that empty strings are allowed double offset = timeZoneText.isEmpty() ? 0d : Double.parseDouble(timeZoneText); TimeZone timeZoneFromDouble = TimeZoneUtils.createTimeZoneFromDouble(offset); this.fastDateFormat.setTimeZone(timeZoneFromDouble); } catch (NumberFormatException e) { throw new Exception("Not valid timeZone format", e); } reader.require(XMLStreamConstants.END_ELEMENT, null, "timeZone"); reader.nextTag(); } @SuppressWarnings({"OverlyLongMethod"}) private void parseHeaderElement() throws Exception { switch (currentHeaderElement) { case type: header.setParameterType(parseType(reader.getElementText())); break; case locationId: // see FEWS-9858, when there is no location id the time series are assigned to all locations // this is a flaw in the pi_timeSeries.xsd, the location element is required but is allowed empty strings header.setLocationId(TextUtils.defaultIfNull(TextUtils.trimToNull(reader.getElementText()), "none")); break; case parameterId: // see FEWS-9858, when there is no parameter id the time series are assigned to all locations // this is a flaw in the pi_timeSeries.xsd, the location element is required but is allowed empty strings header.setParameterId(TextUtils.defaultIfNull(TextUtils.trimToNull(reader.getElementText()), "none")); break; case qualifierId: qualifiers.add(reader.getElementText()); break; case ensembleId: header.setEnsembleId(reader.getElementText()); break; case ensembleMemberIndex: header.setEnsembleMemberIndex(parseEnsembleMemberIndex(reader.getElementText())); break; case ensembleMemberId: header.setEnsembleMemberId(reader.getElementText()); break; case timeStep: timeStepMillis = parseTimeStep(); break; case startDate: startTime = parseTime(); break; case endDate: endTime = parseTime(); break; case forecastDate: header.setForecastTime(parseTime()); break; case missVal: missingValue = parseValue(reader.getElementText()); break; case longName: header.setLongName(reader.getElementText()); break; case stationName: header.setLocationName(reader.getElementText()); break; case units: header.setUnit(reader.getElementText()); break; case sourceOrganisation: header.setSourceOrganisation(reader.getElementText()); break; case sourceSystem: header.setSourceSystem(reader.getElementText()); break; case fileDescription: header.setFileDescription(reader.getElementText()); break; case creationDate: creationDateText = reader.getElementText(); break; case creationTime: creationTimeText = reader.getElementText(); break; case region: header.setRegion(reader.getElementText()); break; case thresholds: parseThresholds(); break; case lat: lat = parseValue(reader.getElementText()); break; case lon: lon = parseValue(reader.getElementText()); break; case x: reader.getElementText(); break; case y: reader.getElementText(); break; case z: z = parseValue(reader.getElementText()); break; } reader.require(XMLStreamConstants.END_ELEMENT, null, currentHeaderElement.name()); reader.nextTag(); } private void parseThresholds() throws XMLStreamException { reader.nextTag(); ArrayList<String> ids = new ArrayList<>(); ArrayList<String> names = new ArrayList<>(); ArrayList<String> stringValues = new ArrayList<>(); ArrayList<String> groupIds = new ArrayList<>(); ArrayList<String> groupNames = new ArrayList<>(); do { if (reader.getEventType() == XMLStreamConstants.START_ELEMENT) { ids.add(reader.getAttributeValue(null, "id")); names.add(reader.getAttributeValue(null, "name")); stringValues.add(reader.getAttributeValue(null, "value")); groupIds.add(reader.getAttributeValue(null, "groupId")); groupNames.add(reader.getAttributeValue(null, "groupName")); } reader.nextTag(); } while (!reader.getLocalName().equals(currentHeaderElement.name())); float[] values = new float[stringValues.size()]; for (int i = 0; i < values.length; i++) { values[i] = Float.parseFloat(stringValues.get(i)); } header.setHighLevelThresholds(ids.toArray(new String[ids.size()]), names.toArray(new String[names.size()]), values, groupIds.toArray(new String[groupIds.size()]), groupNames.toArray(new String[groupNames.size()])); } private void parseProperties() throws XMLStreamException { reader.require(XMLStreamConstants.START_ELEMENT, null, "properties"); reader.nextTag(); propertyBuilder.clear(); while (!TextUtils.equals(reader.getLocalName(), "properties")){ if (reader.getEventType() != XMLStreamConstants.START_ELEMENT) { // eg <int key="a" value=12><int> reader.nextTag(); continue; } String key = reader.getAttributeValue(null, "key"); String value = reader.getAttributeValue(null, "value"); String date = reader.getAttributeValue(null, "date"); String time = reader.getAttributeValue(null, "time"); switch (reader.getLocalName()) { case "string" : propertyBuilder.addString(key, value); break; case "int": propertyBuilder.addInt(key, TextUtils.parseInt(value)); break; case "float": propertyBuilder.addFloat(key, TextUtils.parseFloat(value)); break; case "double": propertyBuilder.addDouble(key, TextUtils.parseDouble(value)); break; case "bool": propertyBuilder.addBoolean(key, Boolean.parseBoolean(value)); break; case "dateTime": try { propertyBuilder.addDateTime(key, fastDateFormat.parseToMillis(date, time)); break; } catch (ParseException e) { throw new XMLStreamException("Invalid date time "+ date + ' ' + time); } default: throw new XMLStreamException("Invalid property type " + reader.getLocalName()); } reader.nextTag(); } timeSeriesContentHandler.setProperties(propertyBuilder.build()); reader.require(XMLStreamConstants.END_ELEMENT, null, "properties"); reader.nextTag(); } private static float parseValue(String gotString) throws Exception { // <element name="missVal" type="double" default="NaN"> // when default is used in schema for element the consequence is that empty strings are allowed if (gotString.isEmpty()) return Float.NaN; try { return TextUtils.parseFloat(gotString); } catch (NumberFormatException e) { throw new Exception(ExceptionUtils.getMessage(e), e); } } private static int parseEnsembleMemberIndex(String gotString) throws Exception { int index = Integer.parseInt(gotString); if (index < 0) { throw new Exception("Negative ensemble member index not allowed " + gotString); } return index; } private static ParameterType parseType(String gotString) throws Exception { ParameterType type = ParameterType.get(gotString); if (type == null) { throw new Exception("Type in header should be instantaneous or accumulative and not " + gotString); } return type; } private void detectHeaderElement() throws Exception { if (reader.getEventType() != XMLStreamConstants.START_ELEMENT) throw new Exception("header element expected"); String localName = reader.getLocalName(); HeaderElement element; try { element = Enum.valueOf(HeaderElement.class, localName); } catch (Exception e) { throw new Exception("Unknown header element: " + localName); } if (currentHeaderElement == element && currentHeaderElement.isMultipleAllowed()) return; if (currentHeaderElement != null && element.ordinal() < currentHeaderElement.ordinal()) { throw new Exception("Header elements in wrong order: " + localName); } if (currentHeaderElement == HeaderElement.ensembleMemberIndex && element == HeaderElement.ensembleMemberId) { throw new Exception("Duplicate header element, both ensembleMemberIndex and ensembleMemberId in header"); } if (currentHeaderElement == element) { throw new Exception("Duplicate header element: " + localName); } if (reader.getAttributeCount() > 0 && !element.hasAttributes()) { throw new Exception("Attributes are not allowed for header element " + localName); } int nextOrdinal = currentHeaderElement == null ? 0 : currentHeaderElement.ordinal() + 1; // order is correct and no duplicate so currentHeaderElement can not be last header element assert nextOrdinal < HEADER_ELEMENTS.length; HeaderElement nextHeaderElement = HEADER_ELEMENTS[nextOrdinal]; if (nextHeaderElement.isRequired() && nextHeaderElement != element) { throw new Exception("Required header item missing: " + nextHeaderElement); } currentHeaderElement = element; } public TimeZone getTimeZone() { return fastDateFormat.getTimeZone(); } @SuppressWarnings("UnusedDeclaration") public PiTimeSeriesSerializer.EventDestination getEventDestination() { return eventDestination; } @SuppressWarnings("UnusedDeclaration") public float getMissingValue() { return missingValue; } }