You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 5 Next »

package nl.wldelft.fews.pi;

import nl.wldelft.util.*;
import nl.wldelft.util.coverage.Geometry;
import nl.wldelft.util.coverage.GeometryUtils;
import nl.wldelft.util.geodatum.GeoPoint;
import nl.wldelft.util.io.VirtualOutputDir;
import nl.wldelft.util.io.VirtualOutputDirConsumer;
import nl.wldelft.util.io.XmlSerializer;
import nl.wldelft.util.timeseries.TimeSeriesContent;
import nl.wldelft.util.timeseries.TimeSeriesHeader;
import nl.wldelft.util.timeseries.TimeStep;

import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Locale;
import java.util.SimpleTimeZone;
import java.util.TimeZone;

public class PiTimeSeriesSerializer implements XmlSerializer<TimeSeriesContent>, VirtualOutputDirConsumer {
    private static final int BUFFER_SIZE = 2048;
    private TimeZone timeZoneNoDst = null;

    public enum EventDestination {
        XML_EMBEDDED, SEPARATE_BINARY_FILE, ONLY_HEADERS
    }

    public enum EnsembleMemberFormat {
        INDEX, ID, HIDE
    }

    private EventDestination eventDestination = EventDestination.XML_EMBEDDED;
    private PiVersion version = PiVersion.VERSION_1_2;
    private EnsembleMemberFormat ensembleMemberFormat = EnsembleMemberFormat.INDEX;

    private FastDateFormat dateFormat = null;
    private FastDateFormat timeFormat = null;
    private TimeSeriesContent timeSeriesContent = null;
    private XMLStreamWriter writer = null;
    private VirtualOutputDir virtualOutputDir = null;
    private OutputStream binaryOutputSteam = null;
    private byte[] byteBuffer = null;
    private float[] floatBuffer = null;
    private int bufferPos = 0;
    private long[] cachedTimes = new long[4];
    private String[] cachedDateStrings = new String[4];
    private String[] cachedTimeStrings = new String[4];
    private int cachedSeconds = -1;
    private String cachedSecondsString = null;

    public PiTimeSeriesSerializer() {
    }

    public PiTimeSeriesSerializer(PiVersion version) {
        this.version = version;
    }

    public EventDestination getEventDestination() {
        return eventDestination;
    }

    public void setEventDestination(EventDestination eventDestination) {
        this.eventDestination = eventDestination;
    }

    public PiVersion getVersion() {
        return version;
    }

    public void setVersion(PiVersion version) {
        if (version == null)
            throw new IllegalArgumentException("version == null");

        this.version = version;
    }

    public EnsembleMemberFormat getEnsembleMemberFormat() {
        return ensembleMemberFormat;
    }

    public void setEnsembleMemberFormat(EnsembleMemberFormat ensembleMemberFormat) {
        if (ensembleMemberFormat == null)
            throw new IllegalArgumentException("ensembleMemberFormat == null");

        this.ensembleMemberFormat = ensembleMemberFormat;
    }

    @Override
    public void setVirtualOutputDir(VirtualOutputDir virtualOutputDir) {
        this.virtualOutputDir = virtualOutputDir;
    }

    @Override
    public void serialize(TimeSeriesContent timeSeriesContent, XMLStreamWriter streamWriter, String virtualFileName) throws Exception {
        this.timeSeriesContent = timeSeriesContent;
        if (timeSeriesContent.getTimeSeriesCount() <=0) throw new IOException("No TimeSeries to be Written to " + virtualFileName);
        this.writer = streamWriter;

        if (ensembleMemberFormat == EnsembleMemberFormat.ID && version.getIntId() < PiVersion.VERSION_1_10.getIntId()) {
            throw new IOException("ensembleMemberId not supported for pi version " + version);
        }

        timeSeriesContent.setRequireEnsembleMemberIndices(ensembleMemberFormat == EnsembleMemberFormat.INDEX);

        if (!ObjectUtils.equals(timeZoneNoDst, this.timeSeriesContent.getDefaultTimeZone())) {
            Arrays.fill(cachedTimes, Long.MIN_VALUE);
            timeZoneNoDst = timeSeriesContent.getDefaultTimeZone();
        }
        if (timeZoneNoDst.useDaylightTime()){
            //Make timeZone not use day light saving time (DST) because the timeZone field does not use DST therefore
            // all values must also not use DST
            timeZoneNoDst = new SimpleTimeZone(timeZoneNoDst.getRawOffset(), timeZoneNoDst.getID(), 0, 0, 0, 0, 0, 0, 0, 0);
        }
        dateFormat = FastDateFormat.getInstance("yyyy-MM-dd", timeZoneNoDst, Locale.US, dateFormat);
        timeFormat = FastDateFormat.getInstance("HH:mm:ss", timeZoneNoDst, Locale.US, timeFormat);
        String binFileName = FileUtils.getPathWithOtherExtension(virtualFileName, "bin");

        if (virtualOutputDir != null) virtualOutputDir.delete(binFileName);

        if (eventDestination == EventDestination.SEPARATE_BINARY_FILE) {
            if (virtualOutputDir == null)
                throw new IllegalStateException("virtualOutputDir == null");
            binaryOutputSteam = virtualOutputDir.getOutputStream(binFileName);
            if (byteBuffer == null) {
                byteBuffer = new byte[BUFFER_SIZE * NumberType.FLOAT_SIZE];
                floatBuffer = new float[BUFFER_SIZE];
            }
            try {
                serialize();
            } finally {
                bufferPos = 0;
                binaryOutputSteam.close();
            }
            return;
        }

        binaryOutputSteam = null;
        serialize();
    }

    private void serialize() throws Exception {
        writer.writeStartDocument();
        writer.writeStartElement("TimeSeries");
        writer.setDefaultNamespace("http://www.wldelft.nl/fews/PI");
        writer.writeNamespace("", "http://www.wldelft.nl/fews/PI");
        writer.writeAttribute("xmlns:xsi", "http://www.w3.org/2001/XMLSchema-instance");
//        writer.writeAttribute("xmlns", "http://www.wldelft.nl/fews/PI");
        writer.writeAttribute("xsi:schemaLocation", PiSchemaLocations.get("pi_timeseries.xsd"));
        writer.writeAttribute("version", version.toString());
        writeElement("timeZone", String.valueOf((double) timeSeriesContent.getDefaultTimeZone().getRawOffset() / (double) TimeUnit.HOUR_MILLIS));
        for (int i = 0, n = timeSeriesContent.getTimeSeriesCount(); i < n; i++) {
            timeSeriesContent.setTimeSeriesIndex(i);
            writer.writeStartElement("series");
            writeHeader();
            writeEvents();
            writer.writeEndElement();
        }
        writer.writeEndElement();
        writer.writeEndDocument();
        if (eventDestination == EventDestination.SEPARATE_BINARY_FILE) flushBinEvents();
    }

    private void writeHeader() throws Exception {
        TimeSeriesHeader header = timeSeriesContent.getTimeSeriesHeader();
        PiTimeSeriesHeader piHeader = header instanceof PiTimeSeriesHeader ? (PiTimeSeriesHeader) header : new PiTimeSeriesHeader();

        writer.writeStartElement("header");
        writeElement("type", header.getParameterType() == null ? "instantaneous" : header.getParameterType().getName());
        writeElement("locationId", header.getLocationId() == null ? "unknown" : header.getLocationId());
        writeElement("parameterId", header.getParameterId() == null ? "unknown" : header.getParameterId());
        if (version.getIntId() >= PiVersion.VERSION_1_4.getIntId()) {
            for (int i = 0, n = header.getQualifierCount(); i < n; i++) {
                writeElement("qualifierId", header.getQualifierId(i));
            }
        }
        if (version.getIntId() >= PiVersion.VERSION_1_4.getIntId() && header.getEnsembleId() != null && !header.getEnsembleId().equals("main")) {
            if (ensembleMemberFormat != EnsembleMemberFormat.HIDE) {
                writeOptionalElement("ensembleId", header.getEnsembleId());
                writeOptionalElement(ensembleMemberFormat == EnsembleMemberFormat.INDEX
                        ? "ensembleMemberIndex" : "ensembleMemberId", header.getEnsembleMemberId());
            }
        }

        writeTimeStep(header);
        writePeriod();
        if (version.getIntId() >= PiVersion.VERSION_1_5.getIntId()) writeTime("forecastDate", header.getForecastTime(), 2);

        writeElement("missVal", Float.toString(timeSeriesContent.getDefaultMissingValue()));
        writeOptionalElement("longName", piHeader.getLongName());
        writeOptionalElement("stationName", header.getLocationName());

        if (version.getIntId() >= PiVersion.VERSION_1_7.getIntId()) writeCoordinates(header);
        writeOptionalElement("units", header.getUnit());
        writeOptionalElement("sourceOrganisation", piHeader.getSourceOrganisation());
        writeOptionalElement("sourceSystem", piHeader.getSourceSystem());
        writeOptionalElement("fileDescription", piHeader.getFileDescription());


        if (header.getCreationTime() != Long.MIN_VALUE) {
            updateDateTimeStringCache(header.getCreationTime(), 3);
            writeElement("creationDate", cachedDateStrings[3]);
            writeElement("creationTime", cachedTimeStrings[3]);
        }

        writeOptionalElement("region", piHeader.getRegion());

        if (header.getHighLevelThresholdCount() > 0) {
            writeHighThresholds(header);
        }

        writer.writeEndElement();
    }

    private void writeProperties(Properties properties) throws XMLStreamException {
        if (version.getIntId() < PiVersion.VERSION_1_13.getIntId()) return;
        if (properties.isEmpty()) {
            writer.writeEmptyElement("properties");
            // next events don't have properties
            return;
        }
        writer.writeStartElement("properties");
        for (int i = 0, n = properties.size(); i < n; i++) {
            String key = properties.getKey(i);
            PropertyType type = properties.getType(i);
            switch (type) {
                case STRING:
                    writer.writeEmptyElement("string");
                    writer.writeAttribute("key", key);
                    writer.writeAttribute("value", properties.getString(i));
                    continue;
                case INT:
                    writer.writeEmptyElement("int");
                    writer.writeAttribute("key", key);
                    writer.writeAttribute("value", TextUtils.toString(properties.getInt(i)));
                    continue;
                case FLOAT:
                    writer.writeEmptyElement("float");
                    writer.writeAttribute("key", key);
                    writer.writeAttribute("value", Float.toString(properties.getInt(i)));
                    continue;
                case DOUBLE:
                    writer.writeEmptyElement("float");
                    writer.writeAttribute("key", key);
                    writer.writeAttribute("value", Double.toString(properties.getDouble(i)));
                    continue;
                case BOOLEAN:
                    writer.writeEmptyElement("bool");
                    writer.writeAttribute("key", key);
                    writer.writeAttribute("value", Boolean.toString(properties.getBool(i)));
                    continue;
                case DATE_TIME:
                    writer.writeEmptyElement("dateTime");
                    writer.writeAttribute("key", key);
                    long dateTime = properties.getDateTime(i);
                    writer.writeAttribute("date", dateFormat.format(dateTime));
                    writer.writeAttribute("time", timeFormat.format(dateTime));
            }
        }
        writer.writeEndElement();
    }

    private void writeHighThresholds(TimeSeriesHeader header) throws XMLStreamException {
        boolean thresholdsAvailable = false;
        boolean firstThreshold = true;

        for (int i = 0; i < header.getHighLevelThresholdCount(); i++) {
            String id = header.getHighLevelThresholdId(i);
            String name = header.getHighLevelThresholdName(i);
            //Note: here value can be NaN for LevelThresholdValues that have different values for different aggregationTimeSpans configured.
            float value = header.getHighLevelThresholdValue(i);
            if (Float.isNaN(value)) {
                continue;
            }
            thresholdsAvailable = true;
            if (firstThreshold) {
                writer.writeStartElement("thresholds");
                firstThreshold = false;
            }
            writer.writeStartElement("highLevelThreshold");
            writer.writeAttribute("id", id);
            writer.writeAttribute("name", name);
            writer.writeAttribute("value", Float.toString(value));
            if (version.getIntId() >= PiVersion.VERSION_1_14.getIntId()) {
                String groupId = header.getHighLevelThresholdGroupId(i);
                if (groupId != null){
                    writer.writeAttribute("groupId", groupId);
                    String groupName = header.getHighLevelThresholdGroupName(i);
                    if (groupName != null) writer.writeAttribute("groupName", groupName);
                }
            }
            writer.writeEndElement();
        }

        if (thresholdsAvailable) {
            writer.writeEndElement();
        }
    }

    private void writePeriod() throws XMLStreamException {
        TimeStep timeStep = timeSeriesContent.getTimeSeriesHeader().getTimeStep();
        Period period = timeSeriesContent.getTimeSeriesPeriod();
        Period headerPeriod;
        if (period == Period.NEVER) {
            // create a dummy period
            long now = timeStep.nearestTime(System.currentTimeMillis());
            headerPeriod = new Period(now, now);
        } else {
            headerPeriod = period;
        }

        writeTime("startDate", headerPeriod.getStartTime(), 0);
        writeTime("endDate", headerPeriod.getEndTime(), 1);
    }

    private void writeCoordinates(TimeSeriesHeader header) throws XMLStreamException {
        Geometry geometry = header.getGeometry();
        if (geometry == null) return;
        GeoPoint geoPoint = GeometryUtils.getPoint(geometry, 0);
        writeElement("lat", Double.toString(geoPoint.getLatitude()));
        writeElement("lon", Double.toString(geoPoint.getLongitude()));
        writeElement("x", Double.toString(geoPoint.getX()));
        writeElement("y", Double.toString(geoPoint.getY()));
        writeElement("z", Double.toString(geoPoint.getZ()));
    }

    private void updateDateTimeStringCache(long time, int cacheIndex) {
        if (cachedTimes[cacheIndex] == time) return;
        cachedTimes[cacheIndex] = time;
        cachedDateStrings[cacheIndex] = dateFormat.format(time);
        cachedTimeStrings[cacheIndex] = timeFormat.format(time);
    }

    private void writeTime(String name, long time, int cacheIndex) throws XMLStreamException {
        if (time == Long.MIN_VALUE) return;
        writer.writeEmptyElement(name);

        updateDateTimeStringCache(time, cacheIndex);
        writer.writeAttribute("date", cachedDateStrings[cacheIndex]);
        writer.writeAttribute("time", cachedTimeStrings[cacheIndex]);
    }

    private void writeTimeStep(TimeSeriesHeader header) throws XMLStreamException {
        writer.writeEmptyElement("timeStep");
        TimeStep timeStep = header.getTimeStep();

        // todo add support for month time step
        if (timeStep.isEquidistantMillis()) {
            writer.writeAttribute("unit", "second");
            int seconds = (int) (timeStep.getStepMillis() / 1000);
            if (cachedSeconds != seconds) {
                cachedSecondsString = TextUtils.toString(seconds);
                cachedSeconds = seconds;
            }
            writer.writeAttribute("multiplier", cachedSecondsString);
        } else {
            writer.writeAttribute("unit", "nonequidistant");
        }
    }

    private void writeEvents() throws Exception {
        switch (eventDestination) {
            case ONLY_HEADERS:
                return;
            case SEPARATE_BINARY_FILE:
                writeBinEvents();
                return;
            case XML_EMBEDDED:
                writeXmlEvents();
        }
    }

    private void writeBinEvents() throws Exception {
        for (int i = 0, n = timeSeriesContent.getContentTimeCount(); i < n; i++) {
            timeSeriesContent.setContentTimeIndex(i);
            if (!timeSeriesContent.isTimeAvailable()) continue;
            if (bufferPos == BUFFER_SIZE) flushBinEvents();
            floatBuffer[bufferPos++] = timeSeriesContent.getValue();
        }
    }

    private void flushBinEvents() throws IOException {
        if (bufferPos == 0) return;
        BinaryUtils.copy(floatBuffer, 0, bufferPos, byteBuffer, 0, bufferPos * NumberType.FLOAT_SIZE, ByteOrder.LITTLE_ENDIAN);
        binaryOutputSteam.write(byteBuffer, 0, bufferPos * NumberType.FLOAT_SIZE);
        bufferPos = 0;
    }

    private void writeXmlEvents() throws XMLStreamException {
        Properties properties = Properties.NONE;
        for (int i = 0, n = timeSeriesContent.getContentTimeCount(); i < n; i++) {
            timeSeriesContent.setContentTimeIndex(i);
            if (!timeSeriesContent.isTimeAvailable()) continue;
            Properties newProperties = timeSeriesContent.getProperties();
            if (!newProperties.equals(properties)) {
                properties = newProperties;
                writeProperties(properties);
            }
            long time = timeSeriesContent.getTime();
            writer.writeEmptyElement("event");
            writer.writeAttribute("date", dateFormat.format(time));
            writer.writeAttribute("time", timeFormat.format(time));
            writer.writeAttribute("value", timeSeriesContent.getValue('.'));
            writer.writeAttribute("flag", timeSeriesContent.getStringFlag());
            if (version.getIntId() >= PiVersion.VERSION_1_11.getIntId()) {
                String flagSource = timeSeriesContent.getFlagSource();
                if (flagSource != null) writer.writeAttribute("flagSource", flagSource);
            }
            if (version.getIntId() >= PiVersion.VERSION_1_3.getIntId()) {
                String comment = timeSeriesContent.getComment();
                if (comment != null) writer.writeAttribute("comment", comment);
            }
            if (version.getIntId() >= PiVersion.VERSION_1_10.getIntId()) {
                String user = timeSeriesContent.getUser();
                if (user != null) writer.writeAttribute("user", user);
            }
        }
    }

    private void writeOptionalElement(String elementName, String s) throws XMLStreamException {
        if (s == null) return;
        if (s.trim().isEmpty()) return;
        writeElement(elementName, s);
    }

    private void writeElement(String name, String value) throws XMLStreamException {
        writer.writeStartElement(name);
        writer.writeCharacters(value);
        writer.writeEndElement();
    }
}

 

 

  • No labels