package nl.wldelft.fews.pi; import nl.wldelft.util.*; import nl.wldelft.util.coverage.Geometry; import nl.wldelft.util.coverage.GeometryUtils; import nl.wldelft.util.geodatum.GeoPoint; import nl.wldelft.util.io.VirtualOutputDir; import nl.wldelft.util.io.VirtualOutputDirConsumer; import nl.wldelft.util.io.XmlSerializer; import nl.wldelft.util.timeseries.TimeSeriesContent; import nl.wldelft.util.timeseries.TimeSeriesHeader; import nl.wldelft.util.timeseries.TimeStep; import javax.xml.stream.XMLStreamException; import javax.xml.stream.XMLStreamWriter; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteOrder; import java.util.Arrays; import java.util.Locale; import java.util.SimpleTimeZone; import java.util.TimeZone; public class PiTimeSeriesSerializer implements XmlSerializer<TimeSeriesContent>, VirtualOutputDirConsumer { private static final int BUFFER_SIZE = 2048; private TimeZone timeZoneNoDst = null; public enum EventDestination { XML_EMBEDDED, SEPARATE_BINARY_FILE, ONLY_HEADERS } public enum EnsembleMemberFormat { INDEX, ID, HIDE } private EventDestination eventDestination = EventDestination.XML_EMBEDDED; private PiVersion version = PiVersion.VERSION_1_2; private EnsembleMemberFormat ensembleMemberFormat = EnsembleMemberFormat.INDEX; private FastDateFormat dateFormat = null; private FastDateFormat timeFormat = null; private TimeSeriesContent timeSeriesContent = null; private XMLStreamWriter writer = null; private VirtualOutputDir virtualOutputDir = null; private OutputStream binaryOutputSteam = null; private byte[] byteBuffer = null; private float[] floatBuffer = null; private int bufferPos = 0; private long[] cachedTimes = new long[4]; private String[] cachedDateStrings = new String[4]; private String[] cachedTimeStrings = new String[4]; private int cachedSeconds = -1; private String cachedSecondsString = null; public PiTimeSeriesSerializer() { } public PiTimeSeriesSerializer(PiVersion version) { this.version = version; } public EventDestination getEventDestination() { return eventDestination; } public void setEventDestination(EventDestination eventDestination) { this.eventDestination = eventDestination; } public PiVersion getVersion() { return version; } public void setVersion(PiVersion version) { if (version == null) throw new IllegalArgumentException("version == null"); this.version = version; } public EnsembleMemberFormat getEnsembleMemberFormat() { return ensembleMemberFormat; } public void setEnsembleMemberFormat(EnsembleMemberFormat ensembleMemberFormat) { if (ensembleMemberFormat == null) throw new IllegalArgumentException("ensembleMemberFormat == null"); this.ensembleMemberFormat = ensembleMemberFormat; } @Override public void setVirtualOutputDir(VirtualOutputDir virtualOutputDir) { this.virtualOutputDir = virtualOutputDir; } @Override public void serialize(TimeSeriesContent timeSeriesContent, XMLStreamWriter streamWriter, String virtualFileName) throws Exception { this.timeSeriesContent = timeSeriesContent; if (timeSeriesContent.getTimeSeriesCount() <=0) throw new IOException("No TimeSeries to be Written to " + virtualFileName); this.writer = streamWriter; if (ensembleMemberFormat == EnsembleMemberFormat.ID && version.getIntId() < PiVersion.VERSION_1_10.getIntId()) { throw new IOException("ensembleMemberId not supported for pi version " + version); } timeSeriesContent.setRequireEnsembleMemberIndices(ensembleMemberFormat == EnsembleMemberFormat.INDEX); if (!ObjectUtils.equals(timeZoneNoDst, this.timeSeriesContent.getDefaultTimeZone())) { Arrays.fill(cachedTimes, Long.MIN_VALUE); timeZoneNoDst = timeSeriesContent.getDefaultTimeZone(); } if (timeZoneNoDst.useDaylightTime()){ //Make timeZone not use day light saving time (DST) because the timeZone field does not use DST therefore // all values must also not use DST timeZoneNoDst = new SimpleTimeZone(timeZoneNoDst.getRawOffset(), timeZoneNoDst.getID(), 0, 0, 0, 0, 0, 0, 0, 0); } dateFormat = FastDateFormat.getInstance("yyyy-MM-dd", timeZoneNoDst, Locale.US, dateFormat); timeFormat = FastDateFormat.getInstance("HH:mm:ss", timeZoneNoDst, Locale.US, timeFormat); String binFileName = FileUtils.getPathWithOtherExtension(virtualFileName, "bin"); if (virtualOutputDir != null) virtualOutputDir.delete(binFileName); if (eventDestination == EventDestination.SEPARATE_BINARY_FILE) { if (virtualOutputDir == null) throw new IllegalStateException("virtualOutputDir == null"); binaryOutputSteam = virtualOutputDir.getOutputStream(binFileName); if (byteBuffer == null) { byteBuffer = new byte[BUFFER_SIZE * NumberType.FLOAT_SIZE]; floatBuffer = new float[BUFFER_SIZE]; } try { serialize(); } finally { bufferPos = 0; binaryOutputSteam.close(); } return; } binaryOutputSteam = null; serialize(); } private void serialize() throws Exception { writer.writeStartDocument(); writer.writeStartElement("TimeSeries"); writer.setDefaultNamespace("http://www.wldelft.nl/fews/PI"); writer.writeNamespace("", "http://www.wldelft.nl/fews/PI"); writer.writeAttribute("xmlns:xsi", "http://www.w3.org/2001/XMLSchema-instance"); // writer.writeAttribute("xmlns", "http://www.wldelft.nl/fews/PI"); writer.writeAttribute("xsi:schemaLocation", PiSchemaLocations.get("pi_timeseries.xsd")); writer.writeAttribute("version", version.toString()); writeElement("timeZone", String.valueOf((double) timeSeriesContent.getDefaultTimeZone().getRawOffset() / (double) TimeUnit.HOUR_MILLIS)); for (int i = 0, n = timeSeriesContent.getTimeSeriesCount(); i < n; i++) { timeSeriesContent.setTimeSeriesIndex(i); writer.writeStartElement("series"); writeHeader(); writeEvents(); writer.writeEndElement(); } writer.writeEndElement(); writer.writeEndDocument(); if (eventDestination == EventDestination.SEPARATE_BINARY_FILE) flushBinEvents(); } private void writeHeader() throws Exception { TimeSeriesHeader header = timeSeriesContent.getTimeSeriesHeader(); PiTimeSeriesHeader piHeader = header instanceof PiTimeSeriesHeader ? (PiTimeSeriesHeader) header : new PiTimeSeriesHeader(); writer.writeStartElement("header"); writeElement("type", header.getParameterType() == null ? "instantaneous" : header.getParameterType().getName()); writeElement("locationId", header.getLocationId() == null ? "unknown" : header.getLocationId()); writeElement("parameterId", header.getParameterId() == null ? "unknown" : header.getParameterId()); if (version.getIntId() >= PiVersion.VERSION_1_4.getIntId()) { for (int i = 0, n = header.getQualifierCount(); i < n; i++) { writeElement("qualifierId", header.getQualifierId(i)); } } if (version.getIntId() >= PiVersion.VERSION_1_4.getIntId() && header.getEnsembleId() != null && !header.getEnsembleId().equals("main")) { if (ensembleMemberFormat != EnsembleMemberFormat.HIDE) { writeOptionalElement("ensembleId", header.getEnsembleId()); writeOptionalElement(ensembleMemberFormat == EnsembleMemberFormat.INDEX ? "ensembleMemberIndex" : "ensembleMemberId", header.getEnsembleMemberId()); } } writeTimeStep(header); writePeriod(); if (version.getIntId() >= PiVersion.VERSION_1_5.getIntId()) writeTime("forecastDate", header.getForecastTime(), 2); writeElement("missVal", Float.toString(timeSeriesContent.getDefaultMissingValue())); writeOptionalElement("longName", piHeader.getLongName()); writeOptionalElement("stationName", header.getLocationName()); if (version.getIntId() >= PiVersion.VERSION_1_7.getIntId()) writeCoordinates(header); writeOptionalElement("units", header.getUnit()); writeOptionalElement("sourceOrganisation", piHeader.getSourceOrganisation()); writeOptionalElement("sourceSystem", piHeader.getSourceSystem()); writeOptionalElement("fileDescription", piHeader.getFileDescription()); if (header.getCreationTime() != Long.MIN_VALUE) { updateDateTimeStringCache(header.getCreationTime(), 3); writeElement("creationDate", cachedDateStrings[3]); writeElement("creationTime", cachedTimeStrings[3]); } writeOptionalElement("region", piHeader.getRegion()); if (header.getHighLevelThresholdCount() > 0) { writeHighThresholds(header); } writer.writeEndElement(); } private void writeProperties(Properties properties) throws XMLStreamException { if (version.getIntId() < PiVersion.VERSION_1_13.getIntId()) return; if (properties.isEmpty()) { writer.writeEmptyElement("properties"); // next events don't have properties return; } writer.writeStartElement("properties"); for (int i = 0, n = properties.size(); i < n; i++) { String key = properties.getKey(i); PropertyType type = properties.getType(i); switch (type) { case STRING: writer.writeEmptyElement("string"); writer.writeAttribute("key", key); writer.writeAttribute("value", properties.getString(i)); continue; case INT: writer.writeEmptyElement("int"); writer.writeAttribute("key", key); writer.writeAttribute("value", TextUtils.toString(properties.getInt(i))); continue; case FLOAT: writer.writeEmptyElement("float"); writer.writeAttribute("key", key); writer.writeAttribute("value", Float.toString(properties.getInt(i))); continue; case DOUBLE: writer.writeEmptyElement("float"); writer.writeAttribute("key", key); writer.writeAttribute("value", Double.toString(properties.getDouble(i))); continue; case BOOLEAN: writer.writeEmptyElement("bool"); writer.writeAttribute("key", key); writer.writeAttribute("value", Boolean.toString(properties.getBool(i))); continue; case DATE_TIME: writer.writeEmptyElement("dateTime"); writer.writeAttribute("key", key); long dateTime = properties.getDateTime(i); writer.writeAttribute("date", dateFormat.format(dateTime)); writer.writeAttribute("time", timeFormat.format(dateTime)); } } writer.writeEndElement(); } private void writeHighThresholds(TimeSeriesHeader header) throws XMLStreamException { boolean thresholdsAvailable = false; boolean firstThreshold = true; for (int i = 0; i < header.getHighLevelThresholdCount(); i++) { String id = header.getHighLevelThresholdId(i); String name = header.getHighLevelThresholdName(i); //Note: here value can be NaN for LevelThresholdValues that have different values for different aggregationTimeSpans configured. float value = header.getHighLevelThresholdValue(i); if (Float.isNaN(value)) { continue; } thresholdsAvailable = true; if (firstThreshold) { writer.writeStartElement("thresholds"); firstThreshold = false; } writer.writeStartElement("highLevelThreshold"); writer.writeAttribute("id", id); writer.writeAttribute("name", name); writer.writeAttribute("value", Float.toString(value)); if (version.getIntId() >= PiVersion.VERSION_1_14.getIntId()) { String groupId = header.getHighLevelThresholdGroupId(i); if (groupId != null){ writer.writeAttribute("groupId", groupId); String groupName = header.getHighLevelThresholdGroupName(i); if (groupName != null) writer.writeAttribute("groupName", groupName); } } writer.writeEndElement(); } if (thresholdsAvailable) { writer.writeEndElement(); } } private void writePeriod() throws XMLStreamException { TimeStep timeStep = timeSeriesContent.getTimeSeriesHeader().getTimeStep(); Period period = timeSeriesContent.getTimeSeriesPeriod(); Period headerPeriod; if (period == Period.NEVER) { // create a dummy period long now = timeStep.nearestTime(System.currentTimeMillis()); headerPeriod = new Period(now, now); } else { headerPeriod = period; } writeTime("startDate", headerPeriod.getStartTime(), 0); writeTime("endDate", headerPeriod.getEndTime(), 1); } private void writeCoordinates(TimeSeriesHeader header) throws XMLStreamException { Geometry geometry = header.getGeometry(); if (geometry == null) return; GeoPoint geoPoint = GeometryUtils.getPoint(geometry, 0); writeElement("lat", Double.toString(geoPoint.getLatitude())); writeElement("lon", Double.toString(geoPoint.getLongitude())); writeElement("x", Double.toString(geoPoint.getX())); writeElement("y", Double.toString(geoPoint.getY())); writeElement("z", Double.toString(geoPoint.getZ())); } private void updateDateTimeStringCache(long time, int cacheIndex) { if (cachedTimes[cacheIndex] == time) return; cachedTimes[cacheIndex] = time; cachedDateStrings[cacheIndex] = dateFormat.format(time); cachedTimeStrings[cacheIndex] = timeFormat.format(time); } private void writeTime(String name, long time, int cacheIndex) throws XMLStreamException { if (time == Long.MIN_VALUE) return; writer.writeEmptyElement(name); updateDateTimeStringCache(time, cacheIndex); writer.writeAttribute("date", cachedDateStrings[cacheIndex]); writer.writeAttribute("time", cachedTimeStrings[cacheIndex]); } private void writeTimeStep(TimeSeriesHeader header) throws XMLStreamException { writer.writeEmptyElement("timeStep"); TimeStep timeStep = header.getTimeStep(); // todo add support for month time step if (timeStep.isEquidistantMillis()) { writer.writeAttribute("unit", "second"); int seconds = (int) (timeStep.getStepMillis() / 1000); if (cachedSeconds != seconds) { cachedSecondsString = TextUtils.toString(seconds); cachedSeconds = seconds; } writer.writeAttribute("multiplier", cachedSecondsString); } else { writer.writeAttribute("unit", "nonequidistant"); } } private void writeEvents() throws Exception { switch (eventDestination) { case ONLY_HEADERS: return; case SEPARATE_BINARY_FILE: writeBinEvents(); return; case XML_EMBEDDED: writeXmlEvents(); } } private void writeBinEvents() throws Exception { for (int i = 0, n = timeSeriesContent.getContentTimeCount(); i < n; i++) { timeSeriesContent.setContentTimeIndex(i); if (!timeSeriesContent.isTimeAvailable()) continue; if (bufferPos == BUFFER_SIZE) flushBinEvents(); floatBuffer[bufferPos++] = timeSeriesContent.getValue(); } } private void flushBinEvents() throws IOException { if (bufferPos == 0) return; BinaryUtils.copy(floatBuffer, 0, bufferPos, byteBuffer, 0, bufferPos * NumberType.FLOAT_SIZE, ByteOrder.LITTLE_ENDIAN); binaryOutputSteam.write(byteBuffer, 0, bufferPos * NumberType.FLOAT_SIZE); bufferPos = 0; } private void writeXmlEvents() throws XMLStreamException { Properties properties = Properties.NONE; for (int i = 0, n = timeSeriesContent.getContentTimeCount(); i < n; i++) { timeSeriesContent.setContentTimeIndex(i); if (!timeSeriesContent.isTimeAvailable()) continue; Properties newProperties = timeSeriesContent.getProperties(); if (!newProperties.equals(properties)) { properties = newProperties; writeProperties(properties); } long time = timeSeriesContent.getTime(); writer.writeEmptyElement("event"); writer.writeAttribute("date", dateFormat.format(time)); writer.writeAttribute("time", timeFormat.format(time)); writer.writeAttribute("value", timeSeriesContent.getValue('.')); writer.writeAttribute("flag", timeSeriesContent.getStringFlag()); if (version.getIntId() >= PiVersion.VERSION_1_11.getIntId()) { String flagSource = timeSeriesContent.getFlagSource(); if (flagSource != null) writer.writeAttribute("flagSource", flagSource); } if (version.getIntId() >= PiVersion.VERSION_1_3.getIntId()) { String comment = timeSeriesContent.getComment(); if (comment != null) writer.writeAttribute("comment", comment); } if (version.getIntId() >= PiVersion.VERSION_1_10.getIntId()) { String user = timeSeriesContent.getUser(); if (user != null) writer.writeAttribute("user", user); } } } private void writeOptionalElement(String elementName, String s) throws XMLStreamException { if (s == null) return; if (s.trim().isEmpty()) return; writeElement(elementName, s); } private void writeElement(String name, String value) throws XMLStreamException { writer.writeStartElement(name); writer.writeCharacters(value); writer.writeEndElement(); } }