package nl.wldelft.fews.pi;
import nl.wldelft.util.*;
import nl.wldelft.util.coverage.Geometry;
import nl.wldelft.util.coverage.GeometryUtils;
import nl.wldelft.util.geodatum.GeoPoint;
import nl.wldelft.util.io.VirtualOutputDir;
import nl.wldelft.util.io.VirtualOutputDirConsumer;
import nl.wldelft.util.io.XmlSerializer;
import nl.wldelft.util.timeseries.TimeSeriesContent;
import nl.wldelft.util.timeseries.TimeSeriesHeader;
import nl.wldelft.util.timeseries.TimeStep;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Locale;
import java.util.SimpleTimeZone;
import java.util.TimeZone;
public class PiTimeSeriesSerializer implements XmlSerializer<TimeSeriesContent>, VirtualOutputDirConsumer {
private static final int BUFFER_SIZE = 2048;
private TimeZone timeZoneNoDst = null;
public enum EventDestination {
XML_EMBEDDED, SEPARATE_BINARY_FILE, ONLY_HEADERS
}
public enum EnsembleMemberFormat {
INDEX, ID, HIDE
}
private EventDestination eventDestination = EventDestination.XML_EMBEDDED;
private PiVersion version = PiVersion.VERSION_1_2;
private EnsembleMemberFormat ensembleMemberFormat = EnsembleMemberFormat.INDEX;
private FastDateFormat dateFormat = null;
private FastDateFormat timeFormat = null;
private TimeSeriesContent timeSeriesContent = null;
private XMLStreamWriter writer = null;
private VirtualOutputDir virtualOutputDir = null;
private OutputStream binaryOutputSteam = null;
private byte[] byteBuffer = null;
private float[] floatBuffer = null;
private int bufferPos = 0;
private long[] cachedTimes = new long[4];
private String[] cachedDateStrings = new String[4];
private String[] cachedTimeStrings = new String[4];
private int cachedSeconds = -1;
private String cachedSecondsString = null;
public PiTimeSeriesSerializer() {
}
public PiTimeSeriesSerializer(PiVersion version) {
this.version = version;
}
public EventDestination getEventDestination() {
return eventDestination;
}
public void setEventDestination(EventDestination eventDestination) {
this.eventDestination = eventDestination;
}
public PiVersion getVersion() {
return version;
}
public void setVersion(PiVersion version) {
if (version == null)
throw new IllegalArgumentException("version == null");
this.version = version;
}
public EnsembleMemberFormat getEnsembleMemberFormat() {
return ensembleMemberFormat;
}
public void setEnsembleMemberFormat(EnsembleMemberFormat ensembleMemberFormat) {
if (ensembleMemberFormat == null)
throw new IllegalArgumentException("ensembleMemberFormat == null");
this.ensembleMemberFormat = ensembleMemberFormat;
}
@Override
public void setVirtualOutputDir(VirtualOutputDir virtualOutputDir) {
this.virtualOutputDir = virtualOutputDir;
}
@Override
public void serialize(TimeSeriesContent timeSeriesContent, XMLStreamWriter streamWriter, String virtualFileName) throws Exception {
this.timeSeriesContent = timeSeriesContent;
if (timeSeriesContent.getTimeSeriesCount() <=0) throw new IOException("No TimeSeries to be Written to " + virtualFileName);
this.writer = streamWriter;
if (ensembleMemberFormat == EnsembleMemberFormat.ID && version.getIntId() < PiVersion.VERSION_1_10.getIntId()) {
throw new IOException("ensembleMemberId not supported for pi version " + version);
}
timeSeriesContent.setRequireEnsembleMemberIndices(ensembleMemberFormat == EnsembleMemberFormat.INDEX);
if (!ObjectUtils.equals(timeZoneNoDst, this.timeSeriesContent.getDefaultTimeZone())) {
Arrays.fill(cachedTimes, Long.MIN_VALUE);
timeZoneNoDst = timeSeriesContent.getDefaultTimeZone();
}
if (timeZoneNoDst.useDaylightTime()){
//Make timeZone not use day light saving time (DST) because the timeZone field does not use DST therefore
// all values must also not use DST
timeZoneNoDst = new SimpleTimeZone(timeZoneNoDst.getRawOffset(), timeZoneNoDst.getID(), 0, 0, 0, 0, 0, 0, 0, 0);
}
dateFormat = FastDateFormat.getInstance("yyyy-MM-dd", timeZoneNoDst, Locale.US, dateFormat);
timeFormat = FastDateFormat.getInstance("HH:mm:ss", timeZoneNoDst, Locale.US, timeFormat);
String binFileName = FileUtils.getPathWithOtherExtension(virtualFileName, "bin");
if (virtualOutputDir != null) virtualOutputDir.delete(binFileName);
if (eventDestination == EventDestination.SEPARATE_BINARY_FILE) {
if (virtualOutputDir == null)
throw new IllegalStateException("virtualOutputDir == null");
binaryOutputSteam = virtualOutputDir.getOutputStream(binFileName);
if (byteBuffer == null) {
byteBuffer = new byte[BUFFER_SIZE * NumberType.FLOAT_SIZE];
floatBuffer = new float[BUFFER_SIZE];
}
try {
serialize();
} finally {
bufferPos = 0;
binaryOutputSteam.close();
}
return;
}
binaryOutputSteam = null;
serialize();
}
private void serialize() throws Exception {
writer.writeStartDocument();
writer.writeStartElement("TimeSeries");
writer.setDefaultNamespace("http://www.wldelft.nl/fews/PI");
writer.writeNamespace("", "http://www.wldelft.nl/fews/PI");
writer.writeAttribute("xmlns:xsi", "http://www.w3.org/2001/XMLSchema-instance");
// writer.writeAttribute("xmlns", "http://www.wldelft.nl/fews/PI");
writer.writeAttribute("xsi:schemaLocation", PiSchemaLocations.get("pi_timeseries.xsd"));
writer.writeAttribute("version", version.toString());
writeElement("timeZone", String.valueOf((double) timeSeriesContent.getDefaultTimeZone().getRawOffset() / (double) TimeUnit.HOUR_MILLIS));
for (int i = 0, n = timeSeriesContent.getTimeSeriesCount(); i < n; i++) {
timeSeriesContent.setTimeSeriesIndex(i);
writer.writeStartElement("series");
writeHeader();
writeEvents();
writer.writeEndElement();
}
writer.writeEndElement();
writer.writeEndDocument();
if (eventDestination == EventDestination.SEPARATE_BINARY_FILE) flushBinEvents();
}
private void writeHeader() throws Exception {
TimeSeriesHeader header = timeSeriesContent.getTimeSeriesHeader();
PiTimeSeriesHeader piHeader = header instanceof PiTimeSeriesHeader ? (PiTimeSeriesHeader) header : new PiTimeSeriesHeader();
writer.writeStartElement("header");
writeElement("type", header.getParameterType() == null ? "instantaneous" : header.getParameterType().getName());
writeElement("locationId", header.getLocationId() == null ? "unknown" : header.getLocationId());
writeElement("parameterId", header.getParameterId() == null ? "unknown" : header.getParameterId());
if (version.getIntId() >= PiVersion.VERSION_1_4.getIntId()) {
for (int i = 0, n = header.getQualifierCount(); i < n; i++) {
writeElement("qualifierId", header.getQualifierId(i));
}
}
if (version.getIntId() >= PiVersion.VERSION_1_4.getIntId() && header.getEnsembleId() != null && !header.getEnsembleId().equals("main")) {
if (ensembleMemberFormat != EnsembleMemberFormat.HIDE) {
writeOptionalElement("ensembleId", header.getEnsembleId());
writeOptionalElement(ensembleMemberFormat == EnsembleMemberFormat.INDEX
? "ensembleMemberIndex" : "ensembleMemberId", header.getEnsembleMemberId());
}
}
writeTimeStep(header);
writePeriod();
if (version.getIntId() >= PiVersion.VERSION_1_5.getIntId()) writeTime("forecastDate", header.getForecastTime(), 2);
writeElement("missVal", Float.toString(timeSeriesContent.getDefaultMissingValue()));
writeOptionalElement("longName", piHeader.getLongName());
writeOptionalElement("stationName", header.getLocationName());
if (version.getIntId() >= PiVersion.VERSION_1_7.getIntId()) writeCoordinates(header);
writeOptionalElement("units", header.getUnit());
writeOptionalElement("sourceOrganisation", piHeader.getSourceOrganisation());
writeOptionalElement("sourceSystem", piHeader.getSourceSystem());
writeOptionalElement("fileDescription", piHeader.getFileDescription());
if (header.getCreationTime() != Long.MIN_VALUE) {
updateDateTimeStringCache(header.getCreationTime(), 3);
writeElement("creationDate", cachedDateStrings[3]);
writeElement("creationTime", cachedTimeStrings[3]);
}
writeOptionalElement("region", piHeader.getRegion());
if (header.getHighLevelThresholdCount() > 0) {
writeHighThresholds(header);
}
writer.writeEndElement();
}
private void writeProperties(Properties properties) throws XMLStreamException {
if (version.getIntId() < PiVersion.VERSION_1_13.getIntId()) return;
if (properties.isEmpty()) {
writer.writeEmptyElement("properties");
// next events don't have properties
return;
}
writer.writeStartElement("properties");
for (int i = 0, n = properties.size(); i < n; i++) {
String key = properties.getKey(i);
PropertyType type = properties.getType(i);
switch (type) {
case STRING:
writer.writeEmptyElement("string");
writer.writeAttribute("key", key);
writer.writeAttribute("value", properties.getString(i));
continue;
case INT:
writer.writeEmptyElement("int");
writer.writeAttribute("key", key);
writer.writeAttribute("value", TextUtils.toString(properties.getInt(i)));
continue;
case FLOAT:
writer.writeEmptyElement("float");
writer.writeAttribute("key", key);
writer.writeAttribute("value", Float.toString(properties.getInt(i)));
continue;
case DOUBLE:
writer.writeEmptyElement("float");
writer.writeAttribute("key", key);
writer.writeAttribute("value", Double.toString(properties.getDouble(i)));
continue;
case BOOLEAN:
writer.writeEmptyElement("bool");
writer.writeAttribute("key", key);
writer.writeAttribute("value", Boolean.toString(properties.getBool(i)));
continue;
case DATE_TIME:
writer.writeEmptyElement("dateTime");
writer.writeAttribute("key", key);
long dateTime = properties.getDateTime(i);
writer.writeAttribute("date", dateFormat.format(dateTime));
writer.writeAttribute("time", timeFormat.format(dateTime));
}
}
writer.writeEndElement();
}
private void writeHighThresholds(TimeSeriesHeader header) throws XMLStreamException {
boolean thresholdsAvailable = false;
boolean firstThreshold = true;
for (int i = 0; i < header.getHighLevelThresholdCount(); i++) {
String id = header.getHighLevelThresholdId(i);
String name = header.getHighLevelThresholdName(i);
//Note: here value can be NaN for LevelThresholdValues that have different values for different aggregationTimeSpans configured.
float value = header.getHighLevelThresholdValue(i);
if (Float.isNaN(value)) {
continue;
}
thresholdsAvailable = true;
if (firstThreshold) {
writer.writeStartElement("thresholds");
firstThreshold = false;
}
writer.writeStartElement("highLevelThreshold");
writer.writeAttribute("id", id);
writer.writeAttribute("name", name);
writer.writeAttribute("value", Float.toString(value));
if (version.getIntId() >= PiVersion.VERSION_1_14.getIntId()) {
String groupId = header.getHighLevelThresholdGroupId(i);
if (groupId != null){
writer.writeAttribute("groupId", groupId);
String groupName = header.getHighLevelThresholdGroupName(i);
if (groupName != null) writer.writeAttribute("groupName", groupName);
}
}
writer.writeEndElement();
}
if (thresholdsAvailable) {
writer.writeEndElement();
}
}
private void writePeriod() throws XMLStreamException {
TimeStep timeStep = timeSeriesContent.getTimeSeriesHeader().getTimeStep();
Period period = timeSeriesContent.getTimeSeriesPeriod();
Period headerPeriod;
if (period == Period.NEVER) {
// create a dummy period
long now = timeStep.nearestTime(System.currentTimeMillis());
headerPeriod = new Period(now, now);
} else {
headerPeriod = period;
}
writeTime("startDate", headerPeriod.getStartTime(), 0);
writeTime("endDate", headerPeriod.getEndTime(), 1);
}
private void writeCoordinates(TimeSeriesHeader header) throws XMLStreamException {
Geometry geometry = header.getGeometry();
if (geometry == null) return;
GeoPoint geoPoint = GeometryUtils.getPoint(geometry, 0);
writeElement("lat", Double.toString(geoPoint.getLatitude()));
writeElement("lon", Double.toString(geoPoint.getLongitude()));
writeElement("x", Double.toString(geoPoint.getX()));
writeElement("y", Double.toString(geoPoint.getY()));
writeElement("z", Double.toString(geoPoint.getZ()));
}
private void updateDateTimeStringCache(long time, int cacheIndex) {
if (cachedTimes[cacheIndex] == time) return;
cachedTimes[cacheIndex] = time;
cachedDateStrings[cacheIndex] = dateFormat.format(time);
cachedTimeStrings[cacheIndex] = timeFormat.format(time);
}
private void writeTime(String name, long time, int cacheIndex) throws XMLStreamException {
if (time == Long.MIN_VALUE) return;
writer.writeEmptyElement(name);
updateDateTimeStringCache(time, cacheIndex);
writer.writeAttribute("date", cachedDateStrings[cacheIndex]);
writer.writeAttribute("time", cachedTimeStrings[cacheIndex]);
}
private void writeTimeStep(TimeSeriesHeader header) throws XMLStreamException {
writer.writeEmptyElement("timeStep");
TimeStep timeStep = header.getTimeStep();
// todo add support for month time step
if (timeStep.isEquidistantMillis()) {
writer.writeAttribute("unit", "second");
int seconds = (int) (timeStep.getStepMillis() / 1000);
if (cachedSeconds != seconds) {
cachedSecondsString = TextUtils.toString(seconds);
cachedSeconds = seconds;
}
writer.writeAttribute("multiplier", cachedSecondsString);
} else {
writer.writeAttribute("unit", "nonequidistant");
}
}
private void writeEvents() throws Exception {
switch (eventDestination) {
case ONLY_HEADERS:
return;
case SEPARATE_BINARY_FILE:
writeBinEvents();
return;
case XML_EMBEDDED:
writeXmlEvents();
}
}
private void writeBinEvents() throws Exception {
for (int i = 0, n = timeSeriesContent.getContentTimeCount(); i < n; i++) {
timeSeriesContent.setContentTimeIndex(i);
if (!timeSeriesContent.isTimeAvailable()) continue;
if (bufferPos == BUFFER_SIZE) flushBinEvents();
floatBuffer[bufferPos++] = timeSeriesContent.getValue();
}
}
private void flushBinEvents() throws IOException {
if (bufferPos == 0) return;
BinaryUtils.copy(floatBuffer, 0, bufferPos, byteBuffer, 0, bufferPos * NumberType.FLOAT_SIZE, ByteOrder.LITTLE_ENDIAN);
binaryOutputSteam.write(byteBuffer, 0, bufferPos * NumberType.FLOAT_SIZE);
bufferPos = 0;
}
private void writeXmlEvents() throws XMLStreamException {
Properties properties = Properties.NONE;
for (int i = 0, n = timeSeriesContent.getContentTimeCount(); i < n; i++) {
timeSeriesContent.setContentTimeIndex(i);
if (!timeSeriesContent.isTimeAvailable()) continue;
Properties newProperties = timeSeriesContent.getProperties();
if (!newProperties.equals(properties)) {
properties = newProperties;
writeProperties(properties);
}
long time = timeSeriesContent.getTime();
writer.writeEmptyElement("event");
writer.writeAttribute("date", dateFormat.format(time));
writer.writeAttribute("time", timeFormat.format(time));
writer.writeAttribute("value", timeSeriesContent.getValue('.'));
writer.writeAttribute("flag", timeSeriesContent.getStringFlag());
if (version.getIntId() >= PiVersion.VERSION_1_11.getIntId()) {
String flagSource = timeSeriesContent.getFlagSource();
if (flagSource != null) writer.writeAttribute("flagSource", flagSource);
}
if (version.getIntId() >= PiVersion.VERSION_1_3.getIntId()) {
String comment = timeSeriesContent.getComment();
if (comment != null) writer.writeAttribute("comment", comment);
}
if (version.getIntId() >= PiVersion.VERSION_1_10.getIntId()) {
String user = timeSeriesContent.getUser();
if (user != null) writer.writeAttribute("user", user);
}
}
}
private void writeOptionalElement(String elementName, String s) throws XMLStreamException {
if (s == null) return;
if (s.trim().isEmpty()) return;
writeElement(elementName, s);
}
private void writeElement(String name, String value) throws XMLStreamException {
writer.writeStartElement(name);
writer.writeCharacters(value);
writer.writeEndElement();
}
}
|