package nl.wldelft.fews.pi; import nl.wldelft.util.DateUtils; import nl.wldelft.util.ExceptionUtils; import nl.wldelft.util.FastDateFormat; import nl.wldelft.util.FileUtils; import nl.wldelft.util.Period; import nl.wldelft.util.TextUtils; import nl.wldelft.util.TimeUnit; import nl.wldelft.util.TimeZoneUtils; import nl.wldelft.util.NumberType; import nl.wldelft.util.BinaryUtils; import nl.wldelft.util.io.VirtualInputDir; import nl.wldelft.util.io.VirtualInputDirConsumer; import nl.wldelft.util.io.XmlParser; import nl.wldelft.util.timeseries.IrregularTimeStep; import nl.wldelft.util.timeseries.ParameterType; import nl.wldelft.util.timeseries.SimpleEquidistantTimeStep; import nl.wldelft.util.timeseries.TimeSeriesContentHandler; import nl.wldelft.util.timeseries.TimeStep; import org.apache.log4j.Logger; import javax.xml.stream.XMLStreamConstants; import javax.xml.stream.XMLStreamException; import javax.xml.stream.XMLStreamReader; import java.io.IOException; import java.io.InputStream; import java.io.EOFException; import java.text.ParseException; import java.util.ArrayList; import java.util.List; import java.util.Locale; import java.util.TimeZone; import java.nio.ByteOrder; public class PiTimeSeriesParser implements XmlParser<TimeSeriesContentHandler>, VirtualInputDirConsumer { private static final Logger log = Logger.getLogger(PiTimeSeriesParser.class); private static final int BUFFER_SIZE = 2048; @Override public void setVirtualInputDir(VirtualInputDir virtualInputDir) { this.virtualInputDir = virtualInputDir; } private enum HeaderElement { type(F.R), locationId(F.R), parameterId(F.R), qualifierId(F.M), ensembleId, ensembleMemberIndex, timeStep(F.R | F.A), startDate(F.R | F.A), endDate(F.R | F.A), forecastDate(F.A), missVal, longName, stationName, units, sourceOrganisation, sourceSystem, fileDescription, creationDate, creationTime, region, thresholds; interface F { int A = 1 << 0; // attributes int R = 1 << 1; // required; int M = 1 << 2; // multple; } private final int flags; HeaderElement() { this.flags = 0; } HeaderElement(int flags) { this.flags = flags; } public boolean isRequired() { return (flags & F.R) != 0; } public boolean hasAttributes() { return (flags & F.A) != 0; } public boolean isMultipleAllowed() { return (flags & F.M) != 0; } } // fastDateFormat is used to keep track of last time zone and lenient private FastDateFormat fastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd", "HH:mm:ss", DateUtils.GMT, Locale.US, null); private boolean invalidHeaderTimeDetected = false; private HeaderElement currentHeaderElement = null; private static final HeaderElement[] HEADER_ELEMENTS = HeaderElement.class.getEnumConstants(); private PiTimeSeriesHeader header = new PiTimeSeriesHeader(); private List<String> qualfiers = new ArrayList<String>(); private long timeStepMillis = 0; private TimeStep timeStep = null; private long startTime = Long.MIN_VALUE; private long endTime = Long.MIN_VALUE; private float missingValue = Float.NaN; private String creationDateText = null; private String creationTimeText = null; private TimeSeriesContentHandler timeSeriesContentHandler = null; /** * For performance reasions the pi time series format alllows that the values are stored in * a separate bin file instead of embedded in the xml file. * The bin file should have same name as the xml file except the extension equals bin * In this case all time series should be equidistant. */ private VirtualInputDir virtualInputDir = VirtualInputDir.NONE; private InputStream binaryInputStream = null; private byte[] byteBuffer = null; private float[] floatBuffer = null; private int bufferPos = 0; private int bufferCount = 0; private XMLStreamReader reader = null; private String virtualFileName = null; private static boolean lenient = false; /** * For backwards compatibility. Earlier versions of the PiTimeSeriesParser were tollerant about the date/time format * and the case insensitive for header element names. * This parser should not accept files that are not valid according to pi_timeseries.xsd * When old adapters are not working you can UseLenientPiTimeSeriesParser temporaray till the adapter is fixed * * @param lenient */ public static void setLenient(boolean lenient) { PiTimeSeriesParser.lenient = lenient; } public PiTimeSeriesParser() { fastDateFormat.setLenient(lenient); } @Override public void parse(XMLStreamReader reader, String virtualFileName, TimeSeriesContentHandler timeSeriesContentHandler) throws Exception { this.reader = reader; this.virtualFileName = virtualFileName; this.timeSeriesContentHandler = timeSeriesContentHandler; String virtualBinFileName = FileUtils.getPathWithOtherExtension(virtualFileName, "bin"); // time zone can be overruled by one or more time zone elements in the pi file this.fastDateFormat.setTimeZone(timeSeriesContentHandler.getDefaultTimeZone()); if (!virtualInputDir.exists(virtualBinFileName)) { parse(); return; } binaryInputStream = virtualInputDir.getInputStream(virtualBinFileName); try { if (byteBuffer == null) { byteBuffer = new byte[BUFFER_SIZE * NumberType.FLOAT_SIZE]; floatBuffer = new float[BUFFER_SIZE]; } parse(); boolean eof = bufferPos == bufferCount && binaryInputStream.read() == -1; if (!eof) throw new IOException("More values available in bin file than expected based on time step and start and end time\n" + FileUtils.getPathWithOtherExtension(virtualFileName, "bin")); } finally { bufferPos = 0; bufferCount = 0; binaryInputStream.close(); binaryInputStream = null; } } private void parse() throws Exception { reader.require(XMLStreamConstants.START_DOCUMENT, null, null); reader.nextTag(); reader.require(XMLStreamConstants.START_ELEMENT, null, "TimeSeries"); reader.nextTag(); while (reader.getEventType() != XMLStreamConstants.END_ELEMENT) { parseTimeZone(); readTimeSeries(); } reader.require(XMLStreamConstants.END_ELEMENT, null, "TimeSeries"); reader.next(); reader.require(XMLStreamConstants.END_DOCUMENT, null, null); } private void readTimeSeries() throws Exception { reader.require(XMLStreamConstants.START_ELEMENT, null, "series"); reader.nextTag(); parseHeader(); if (binaryInputStream == null) { while (reader.getEventType() == XMLStreamConstants.START_ELEMENT && TextUtils.equals(reader.getLocalName(), "event")) { parseEvent(); } if (reader.getEventType() == XMLStreamConstants.START_ELEMENT) { // skip comment reader.require(XMLStreamConstants.START_ELEMENT, null, "comment"); reader.getElementText(); reader.nextTag(); } } else { readValuesFromBinFile(); } reader.require(XMLStreamConstants.END_ELEMENT, null, "series"); reader.nextTag(); } private void parseHeader() throws Exception { reader.require(XMLStreamConstants.START_ELEMENT, null, "header"); if (reader.getAttributeCount() > 0) { throw new Exception("Attributes are not allowed for header element "); } reader.nextTag(); initHeader(); do { detectHeaderElement(); parseHeaderElement(); } while (reader.getEventType() != XMLStreamConstants.END_ELEMENT); if (header.getForecastTime() == Long.MIN_VALUE) header.setForecastTime(startTime); initiateTimeStep(); header.setTimeStep(timeStep); if (!qualfiers.isEmpty()) header.setQualifierIds(qualfiers.toArray(new String[qualfiers.size()])); if (creationDateText != null) { try { long creationTime = fastDateFormat.parseToMillis(creationDateText, creationTimeText); header.setCreationTime(creationTime); } catch (ParseException e) { throw new Exception("Can not parse creation date/time " + creationDateText + ' ' + creationTimeText); } } timeSeriesContentHandler.setNewTimeSeriesHeader(header); if (startTime != Long.MIN_VALUE && endTime != Long.MIN_VALUE) { timeSeriesContentHandler.setEstimatedPeriod(new Period(startTime, endTime)); } reader.require(XMLStreamConstants.END_ELEMENT, null, "header"); reader.nextTag(); } private void parseEvent() throws Exception { assert binaryInputStream == null; reader.require(XMLStreamConstants.START_ELEMENT, null, "event"); String timeText = reader.getAttributeValue(null, "time"); String dateText = reader.getAttributeValue(null, "date"); String valueText = reader.getAttributeValue(null, "value"); String flagText = reader.getAttributeValue(null, "flag"); String commentText = reader.getAttributeValue(null, "comment"); if (timeText == null) throw new Exception("Attribute time is missing"); if (dateText == null) throw new Exception("Attribute date is missing"); if (valueText == null) throw new Exception("Attribute value is missing"); try { timeSeriesContentHandler.setTime(fastDateFormat.parseToMillis(dateText, timeText)); } catch (ParseException e) { throw new Exception("Can not parse " + dateText + ' ' + timeText); } if (flagText == null) { timeSeriesContentHandler.setFlag(0); } else { try { timeSeriesContentHandler.setFlag(TextUtils.parseInt(flagText)); } catch (NumberFormatException e) { throw new Exception("Flag should be an integer " + flagText); } } timeSeriesContentHandler.setComment(commentText); try { float value = TextUtils.parseFloat(valueText); // we can not use the automatic missing value detection of the content handler because the missing value is different for each time series if (value == missingValue) { value = Float.NaN; } else { timeSeriesContentHandler.setValueResolution(TextUtils.getValueResolution(valueText, '.')); } timeSeriesContentHandler.setValue(value); timeSeriesContentHandler.applyCurrentFields(); } catch (NumberFormatException e) { throw new Exception("Value should be a float " + valueText); } reader.nextTag(); reader.require(XMLStreamConstants.END_ELEMENT, null, "event"); reader.nextTag(); } private long parseTime() throws Exception { String dateText = reader.getAttributeValue(null, "date"); if (dateText == null) { throw new Exception("Attribute " + currentHeaderElement + "-date is missing"); } String timeText = reader.getAttributeValue(null, "time"); if (timeText == null) { throw new Exception("Attribute " + currentHeaderElement + "-time is missing"); } long time; try { time = fastDateFormat.parseToMillis(dateText, timeText); } catch (ParseException e) { throw new Exception("Not a valid data time for " + currentHeaderElement + ' ' + dateText + ' ' + timeText, e); } reader.nextTag(); return time; } private long parseTimeStep() throws Exception { String unit = reader.getAttributeValue(null, "unit"); if (unit == null) { throw new Exception("Attribute unit is missing in " + currentHeaderElement); } TimeUnit tu = TimeUnit.get(unit); if (tu != null) { String multiplierText = reader.getAttributeValue(null, "multiplier"); int multiplier; if (multiplierText == null) { multiplier = 1; } else { try { multiplier = Integer.parseInt(multiplierText); } catch (NumberFormatException e) { throw new Exception(ExceptionUtils.getMessage(e), e); } if (multiplier == 0) { throw new Exception("Multiplier is 0"); } } String dividerText = reader.getAttributeValue(null, "divider"); int divider; if (dividerText == null) { divider = 1; } else { try { divider = Integer.parseInt(dividerText); } catch (NumberFormatException e) { throw new Exception(ExceptionUtils.getMessage(e), e); } if (divider == 0) { throw new Exception("dividplier is 0"); } } reader.nextTag(); return tu.getMillis() * multiplier / divider; } else { reader.nextTag(); return 0; } } private void initHeader() { header.clear(); header.setFileDescription(virtualFileName); currentHeaderElement = null; timeStep = null; timeStepMillis = 0; startTime = Long.MIN_VALUE; endTime = Long.MIN_VALUE; missingValue = Float.NaN; creationDateText = null; creationTimeText = "00:00:00"; qualfiers.clear(); } private void readValuesFromBinFile() throws Exception { TimeStep timeStep = header.getTimeStep(); if (!timeStep.isRegular()) { throw new Exception("Only equidistant time step supported when pi events are stored in bin file instead of xml"); } boolean equidistantMillis = timeStep.isEquidistantMillis(); long stepMillis = equidistantMillis ? timeStep.getStepMillis() : Long.MIN_VALUE; try { for (long time = startTime; time <= endTime;) { timeSeriesContentHandler.setTime(time); if (bufferPos == bufferCount) fillBuffer(); float value = floatBuffer[bufferPos++]; // we can not use the automatic missing value detection of the content handler because the missing value is different for each time series if (value == missingValue) value = Float.NaN; timeSeriesContentHandler.setValue(value); timeSeriesContentHandler.applyCurrentFields(); if (equidistantMillis) { time += stepMillis; continue; } time = timeStep.nextTime(time); } } catch (IOException e) { throw new Exception(ExceptionUtils.getMessage(e), e); } } private void fillBuffer() throws IOException { int byteBufferCount = 0; while (byteBufferCount % NumberType.FLOAT_SIZE != 0 || byteBufferCount == 0) { int count = binaryInputStream.read(byteBuffer, byteBufferCount, BUFFER_SIZE * NumberType.FLOAT_SIZE - byteBufferCount); assert count != 0; // see read javadoc if (count == -1) throw new EOFException("Bin file is too short"); byteBufferCount += count; } bufferCount = byteBufferCount / NumberType.FLOAT_SIZE; BinaryUtils.copy(byteBuffer, 0, byteBufferCount, floatBuffer, 0, bufferCount, ByteOrder.LITTLE_ENDIAN); bufferPos = 0; } private void initiateTimeStep() { timeStep = IrregularTimeStep.INSTANCE; //default timestep if (timeStepMillis == 0) { return; } if (timeStepMillis % TimeUnit.MINUTE_MILLIS != 0) { if (!this.invalidHeaderTimeDetected) { if (log.isDebugEnabled()) log.debug("Header timestep and/or start time has not rounded minutes ! Irregular timestep wil be used."); this.invalidHeaderTimeDetected = true; } timeStepMillis = 0; return; } long timeZoneOffsetMillis = -startTime % timeStepMillis; if (timeZoneOffsetMillis % TimeUnit.MINUTE_MILLIS != 0) { if (!this.invalidHeaderTimeDetected) { if (log.isDebugEnabled()) log.debug("Header timestep and/or start time has not rounded minutes ! Irregular timestep wil be used."); this.invalidHeaderTimeDetected = true; } timeStepMillis = 0; return; } timeStep = SimpleEquidistantTimeStep.getInstance(timeStepMillis, timeZoneOffsetMillis); } private void parseTimeZone() throws Exception { if (reader.getEventType() != XMLStreamConstants.START_ELEMENT) return; if (!TextUtils.equals(reader.getLocalName(), "timeZone")) return; try { double offset = Double.parseDouble(reader.getElementText()); TimeZone timeZoneFromDouble = TimeZoneUtils.createTimeZoneFromDouble(offset); this.fastDateFormat.setTimeZone(timeZoneFromDouble); } catch (NumberFormatException e) { throw new Exception("Not valid timeZone format", e); } reader.require(XMLStreamConstants.END_ELEMENT, null, "timeZone"); reader.nextTag(); } @SuppressWarnings({"OverlyLongMethod"}) private void parseHeaderElement() throws Exception { switch (currentHeaderElement) { case type: header.setParameterType(parseType(reader.getElementText())); break; case locationId: header.setLocationId(reader.getElementText()); break; case parameterId: header.setParameterId(reader.getElementText()); break; case qualifierId: qualfiers.add(reader.getElementText()); break; case ensembleId: header.setEnsembleId(reader.getElementText()); break; case ensembleMemberIndex: header.setEnsembleMemberIndex(parseEnsembleMemberIndex(reader.getElementText())); break; case timeStep: timeStepMillis = parseTimeStep(); break; case startDate: startTime = parseTime(); break; case endDate: endTime = parseTime(); break; case forecastDate: header.setForecastTime(parseTime()); break; case missVal: missingValue = parseMissingValue(reader.getElementText()); break; case longName: header.setLongName(reader.getElementText()); break; case stationName: header.setLocationName(reader.getElementText()); break; case units: header.setUnit(reader.getElementText()); break; case sourceOrganisation: header.setSourceOrganisation(reader.getElementText()); break; case sourceSystem: header.setSourceSystem(reader.getElementText()); break; case fileDescription: header.setFileDescription(reader.getElementText()); break; case creationDate: creationDateText = reader.getElementText(); break; case creationTime: creationTimeText = reader.getElementText(); break; case region: header.setRegion(reader.getElementText()); break; case thresholds: parseThresholds(); break; } reader.require(XMLStreamConstants.END_ELEMENT, null, currentHeaderElement.name()); reader.nextTag(); } private void parseThresholds() throws XMLStreamException { reader.nextTag(); ArrayList<String> ids = new ArrayList<String>(); ArrayList<String> names = new ArrayList<String>(); ArrayList<String> stringValues = new ArrayList<String>(); do { if (reader.getEventType() == XMLStreamConstants.START_ELEMENT) { String id = reader.getAttributeValue(null, "id"); String name = reader.getAttributeValue(null, "name"); String stringValue = reader.getAttributeValue(null, "value"); ids.add(id); names.add(name); stringValues.add(stringValue); } reader.nextTag(); } while (!reader.getLocalName().equals(currentHeaderElement.name())); float[] values = new float[stringValues.size()]; for (int i = 0; i < values.length; i++) { values[i] = Float.valueOf(stringValues.get(i)); } header.setHighLevelThresholds(ids.toArray(new String[ids.size()]), names.toArray(new String[names.size()]), values); } private static float parseMissingValue(String gotString) throws Exception { try { return TextUtils.parseFloat(gotString); } catch (NumberFormatException e) { throw new Exception(ExceptionUtils.getMessage(e), e); } } private static int parseEnsembleMemberIndex(String gotString) throws Exception { int index = Integer.parseInt(gotString); if (index < 0) { throw new Exception("Negative ensemble member index not allowed " + gotString); } return index; } private static ParameterType parseType(String gotString) throws Exception { ParameterType type = ParameterType.get(gotString); if (type == null) { throw new Exception("Type in header should be instantaneous or accumulative and not " + gotString); } return type; } private void detectHeaderElement() throws Exception { if (reader.getEventType() != XMLStreamConstants.START_ELEMENT) throw new Exception("header element expected"); String localName = reader.getLocalName(); HeaderElement element; try { element = Enum.valueOf(HeaderElement.class, localName); assert element != null; // contract of valueOf } catch (Exception e) { throw new Exception("Unknown header element: " + localName); } if (currentHeaderElement == element && currentHeaderElement.isMultipleAllowed()) return; if (currentHeaderElement != null && element.ordinal() < currentHeaderElement.ordinal()) { throw new Exception("Header elements in wrong order: " + localName); } if (currentHeaderElement == element) { throw new Exception("Duplicate header element: " + localName); } if (reader.getAttributeCount() > 0 && !element.hasAttributes()) { throw new Exception("Attributes are not allowed for header element " + localName); } int nextOrdinal = currentHeaderElement == null ? 0 : currentHeaderElement.ordinal() + 1; // order is correct and no duplicate so currentHeaderElement can not be last header element assert nextOrdinal < HEADER_ELEMENTS.length; HeaderElement nextHeaderElement = HEADER_ELEMENTS[nextOrdinal]; if (nextHeaderElement.isRequired() && nextHeaderElement != element) { throw new Exception("Required header item missing: " + nextHeaderElement); } currentHeaderElement = element; } public TimeZone getTimeZone() { return fastDateFormat.getTimeZone(); } }