package nl.wldelft.fews.pi;
import nl.wldelft.util.DateUtils;
import nl.wldelft.util.ExceptionUtils;
import nl.wldelft.util.FastDateFormat;
import nl.wldelft.util.FileUtils;
import nl.wldelft.util.Period;
import nl.wldelft.util.TextUtils;
import nl.wldelft.util.TimeUnit;
import nl.wldelft.util.TimeZoneUtils;
import nl.wldelft.util.NumberType;
import nl.wldelft.util.BinaryUtils;
import nl.wldelft.util.io.VirtualInputDir;
import nl.wldelft.util.io.VirtualInputDirConsumer;
import nl.wldelft.util.io.XmlParser;
import nl.wldelft.util.timeseries.IrregularTimeStep;
import nl.wldelft.util.timeseries.ParameterType;
import nl.wldelft.util.timeseries.SimpleEquidistantTimeStep;
import nl.wldelft.util.timeseries.TimeSeriesContentHandler;
import nl.wldelft.util.timeseries.TimeStep;
import org.apache.log4j.Logger;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.EOFException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.TimeZone;
import java.nio.ByteOrder;
/**
 * Parser for Delft-FEWS PI time series XML files (root element {@code TimeSeries}).
 * <p>
 * For every {@code series} element the parsed header is passed to
 * {@link TimeSeriesContentHandler#setNewTimeSeriesHeader}, followed by one
 * {@code applyCurrentFields()} call per value. Values are read from the {@code event} elements in the XML,
 * or — when the {@link VirtualInputDir} contains a file with the same name but extension {@code bin} —
 * from that bin file as little-endian floats.
 * <p>
 * Parse state is kept in mutable, unsynchronized instance fields, so an instance should not be shared
 * between threads while parsing.
 */
public class PiTimeSeriesParser implements XmlParser<TimeSeriesContentHandler>, VirtualInputDirConsumer {
    private static final Logger log = Logger.getLogger(PiTimeSeriesParser.class);

    /** Number of float values read from the bin file per buffer fill. */
    private static final int BUFFER_SIZE = 2048;

    /**
     * The header child elements, declared in the order in which they must appear.
     * The declaration order is significant: {@link #detectHeaderElement()} uses the ordinals to validate
     * the element order and to detect skipped required elements.
     */
    private enum HeaderElement {
        type(F.R), locationId(F.R),
        parameterId(F.R), qualifierId(F.M), ensembleId, ensembleMemberIndex,
        timeStep(F.R | F.A), startDate(F.R | F.A), endDate(F.R | F.A), forecastDate(F.A),
        missVal, longName, stationName, units,
        sourceOrganisation, sourceSystem, fileDescription,
        creationDate, creationTime, region, thresholds;

        /** Bit flags describing a header element. */
        interface F {
            int A = 1 << 0; // attributes are allowed
            int R = 1 << 1; // required
            int M = 1 << 2; // may occur multiple times in succession
        }

        private final int flags;

        HeaderElement() {
            this(0);
        }

        HeaderElement(int flags) {
            this.flags = flags;
        }

        public boolean isRequired() {
            return (flags & F.R) != 0;
        }

        public boolean hasAttributes() {
            return (flags & F.A) != 0;
        }

        public boolean isMultipleAllowed() {
            return (flags & F.M) != 0;
        }
    }

    private static final HeaderElement[] HEADER_ELEMENTS = HeaderElement.values();

    private static boolean lenient = false;

    // fastDateFormat also keeps track of the current time zone (default or from a timeZone element) and the lenient setting
    private final FastDateFormat fastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd", "HH:mm:ss", DateUtils.GMT, Locale.US, null);
    // makes sure the "not rounded minutes" debug message is logged at most once per parser instance
    private boolean invalidHeaderTimeDetected = false;
    // last header element seen in the current header, null before the first one
    private HeaderElement currentHeaderElement = null;
    // reused for every series; cleared in initHeader()
    private final PiTimeSeriesHeader header = new PiTimeSeriesHeader();
    private final List<String> qualifiers = new ArrayList<String>();
    private long timeStepMillis = 0;
    private TimeStep timeStep = null;
    private long startTime = Long.MIN_VALUE;
    private long endTime = Long.MIN_VALUE;
    private float missingValue = Float.NaN;
    private String creationDateText = null;
    private String creationTimeText = null;
    private TimeSeriesContentHandler timeSeriesContentHandler = null;

    /**
     * For performance reasons the PI time series format allows the values to be stored in
     * a separate bin file instead of embedded in the xml file.
     * The bin file should have the same name as the xml file, except that the extension is bin.
     * In this case all time series should be equidistant.
     */
    private VirtualInputDir virtualInputDir = VirtualInputDir.NONE;
    // non-null only while a bin file is being read
    private InputStream binaryInputStream = null;
    private byte[] byteBuffer = null;
    private float[] floatBuffer = null;
    // next unread value in floatBuffer, and the number of valid values in it
    private int bufferPos = 0;
    private int bufferCount = 0;
    private XMLStreamReader reader = null;
    private String virtualFileName = null;

    /**
     * For backwards compatibility. Earlier versions of the PiTimeSeriesParser were tolerant about the date/time format
     * and were case insensitive for header element names.
     * This parser should not accept files that are not valid according to pi_timeseries.xsd.
     * When old adapters are not working you can use a lenient PiTimeSeriesParser temporarily until the adapter is fixed.
     * Only affects parsers constructed after this call.
     *
     * @param lenient true to make the date/time parsing lenient
     */
    public static void setLenient(boolean lenient) {
        PiTimeSeriesParser.lenient = lenient;
    }

    public PiTimeSeriesParser() {
        fastDateFormat.setLenient(lenient);
    }

    @Override
    public void setVirtualInputDir(VirtualInputDir virtualInputDir) {
        this.virtualInputDir = virtualInputDir;
    }

    /**
     * Parses a complete PI time series document and reports every series to the content handler.
     * When a bin file with the same name exists in the virtual input dir, the values are read from it,
     * and it must contain exactly as many values as the series headers imply.
     *
     * @param reader                   reader positioned at START_DOCUMENT
     * @param virtualFileName          name of the xml file, also used to locate the bin file and as default file description
     * @param timeSeriesContentHandler receives the headers and values; also supplies the default time zone
     * @throws Exception when the document is not a valid PI time series file or the bin file does not match it
     */
    @Override
    public void parse(XMLStreamReader reader, String virtualFileName, TimeSeriesContentHandler timeSeriesContentHandler) throws Exception {
        this.reader = reader;
        this.virtualFileName = virtualFileName;
        this.timeSeriesContentHandler = timeSeriesContentHandler;
        // time zone can be overruled by one or more time zone elements in the pi file
        fastDateFormat.setTimeZone(timeSeriesContentHandler.getDefaultTimeZone());

        String virtualBinFileName = FileUtils.getPathWithOtherExtension(virtualFileName, "bin");
        if (!virtualInputDir.exists(virtualBinFileName)) {
            parse();
            return;
        }

        InputStream inputStream = virtualInputDir.getInputStream(virtualBinFileName);
        binaryInputStream = inputStream;
        try {
            if (byteBuffer == null) {
                byteBuffer = new byte[BUFFER_SIZE * NumberType.FLOAT_SIZE];
                floatBuffer = new float[BUFFER_SIZE];
            }
            parse();
            // every value in the bin file must have been consumed by one of the series
            boolean eof = bufferPos == bufferCount && inputStream.read() == -1;
            if (!eof)
                throw new IOException("More values available in bin file than expected based on time step and start and end time\n" + virtualBinFileName);
        } finally {
            // reset the state before closing, so a failing close() can not leave a stale stream or buffer behind
            binaryInputStream = null;
            bufferPos = 0;
            bufferCount = 0;
            inputStream.close();
        }
    }

    /** Parses the TimeSeries root element: optional timeZone elements interleaved with series elements. */
    private void parse() throws Exception {
        reader.require(XMLStreamConstants.START_DOCUMENT, null, null);
        reader.nextTag();
        reader.require(XMLStreamConstants.START_ELEMENT, null, "TimeSeries");
        reader.nextTag();
        while (reader.getEventType() != XMLStreamConstants.END_ELEMENT) {
            parseTimeZone();
            // a timeZone element may be the last child; do not demand a series after it
            if (reader.getEventType() == XMLStreamConstants.END_ELEMENT) break;
            readTimeSeries();
        }
        reader.require(XMLStreamConstants.END_ELEMENT, null, "TimeSeries");
        reader.next();
        reader.require(XMLStreamConstants.END_DOCUMENT, null, null);
    }

    /** Parses one series element (header, values, optional comment) and advances past its end tag. */
    private void readTimeSeries() throws Exception {
        reader.require(XMLStreamConstants.START_ELEMENT, null, "series");
        reader.nextTag();
        parseHeader();
        if (binaryInputStream == null) {
            while (reader.getEventType() == XMLStreamConstants.START_ELEMENT && TextUtils.equals(reader.getLocalName(), "event")) {
                parseEvent();
            }
        } else {
            readValuesFromBinFile();
        }
        skipOptionalComment();
        reader.require(XMLStreamConstants.END_ELEMENT, null, "series");
        reader.nextTag();
    }

    /** Skips a comment element when the reader is positioned on a start tag; any other start tag is an error. */
    private void skipOptionalComment() throws XMLStreamException {
        if (reader.getEventType() != XMLStreamConstants.START_ELEMENT) return;
        reader.require(XMLStreamConstants.START_ELEMENT, null, "comment");
        reader.getElementText();
        reader.nextTag();
    }

    /**
     * Parses the header element of a series, validates the element order and required elements,
     * and passes the resulting header to the content handler.
     */
    private void parseHeader() throws Exception {
        reader.require(XMLStreamConstants.START_ELEMENT, null, "header");
        if (reader.getAttributeCount() > 0) {
            throw new Exception("Attributes are not allowed for header element ");
        }
        reader.nextTag();
        initHeader();
        do {
            detectHeaderElement();
            parseHeaderElement();
        } while (reader.getEventType() != XMLStreamConstants.END_ELEMENT);
        // required elements after the last element that was present are missing too
        checkNoRequiredHeaderElementSkipped(HEADER_ELEMENTS.length);

        if (header.getForecastTime() == Long.MIN_VALUE) header.setForecastTime(startTime);
        initiateTimeStep();
        header.setTimeStep(timeStep);
        if (!qualifiers.isEmpty()) header.setQualifierIds(qualifiers.toArray(new String[qualifiers.size()]));
        if (creationDateText != null) {
            try {
                long creationTime = fastDateFormat.parseToMillis(creationDateText, creationTimeText);
                header.setCreationTime(creationTime);
            } catch (ParseException e) {
                throw new Exception("Can not parse creation date/time " + creationDateText + ' ' + creationTimeText, e);
            }
        }
        timeSeriesContentHandler.setNewTimeSeriesHeader(header);
        if (startTime != Long.MIN_VALUE && endTime != Long.MIN_VALUE) {
            timeSeriesContentHandler.setEstimatedPeriod(new Period(startTime, endTime));
        }
        reader.require(XMLStreamConstants.END_ELEMENT, null, "header");
        reader.nextTag();
    }

    /**
     * Parses one event element (attributes time, date, value and optional flag and comment) and applies it
     * to the content handler. A value equal to the header missVal is reported as NaN.
     */
    private void parseEvent() throws Exception {
        assert binaryInputStream == null;
        reader.require(XMLStreamConstants.START_ELEMENT, null, "event");
        String timeText = reader.getAttributeValue(null, "time");
        String dateText = reader.getAttributeValue(null, "date");
        String valueText = reader.getAttributeValue(null, "value");
        String flagText = reader.getAttributeValue(null, "flag");
        String commentText = reader.getAttributeValue(null, "comment");
        if (timeText == null)
            throw new Exception("Attribute time is missing");
        if (dateText == null)
            throw new Exception("Attribute date is missing");
        if (valueText == null)
            throw new Exception("Attribute value is missing");
        try {
            timeSeriesContentHandler.setTime(fastDateFormat.parseToMillis(dateText, timeText));
        } catch (ParseException e) {
            throw new Exception("Can not parse " + dateText + ' ' + timeText, e);
        }
        if (flagText == null) {
            timeSeriesContentHandler.setFlag(0);
        } else {
            try {
                timeSeriesContentHandler.setFlag(TextUtils.parseInt(flagText));
            } catch (NumberFormatException e) {
                throw new Exception("Flag should be an integer " + flagText, e);
            }
        }
        timeSeriesContentHandler.setComment(commentText);
        try {
            float value = TextUtils.parseFloat(valueText);
            // we can not use the automatic missing value detection of the content handler because the missing value is different for each time series
            if (value == missingValue) {
                value = Float.NaN;
            } else {
                timeSeriesContentHandler.setValueResolution(TextUtils.getValueResolution(valueText, '.'));
            }
            timeSeriesContentHandler.setValue(value);
            timeSeriesContentHandler.applyCurrentFields();
        } catch (NumberFormatException e) {
            throw new Exception("Value should be a float " + valueText, e);
        }
        reader.nextTag();
        reader.require(XMLStreamConstants.END_ELEMENT, null, "event");
        reader.nextTag();
    }

    /**
     * Parses the date and time attributes of the current header element and advances to its end tag.
     *
     * @return the time in millis, interpreted in the current time zone of {@link #fastDateFormat}
     */
    private long parseTime() throws Exception {
        String dateText = reader.getAttributeValue(null, "date");
        if (dateText == null) {
            throw new Exception("Attribute " + currentHeaderElement + "-date is missing");
        }
        String timeText = reader.getAttributeValue(null, "time");
        if (timeText == null) {
            throw new Exception("Attribute " + currentHeaderElement + "-time is missing");
        }
        long time;
        try {
            time = fastDateFormat.parseToMillis(dateText, timeText);
        } catch (ParseException e) {
            throw new Exception("Not a valid data time for " + currentHeaderElement + ' ' + dateText + ' ' + timeText, e);
        }
        reader.nextTag();
        return time;
    }

    /**
     * Parses the unit, multiplier and divider attributes of the timeStep element and advances to its end tag.
     *
     * @return unit millis * multiplier / divider, or 0 when the unit is not recognized by {@code TimeUnit.get}
     */
    private long parseTimeStep() throws Exception {
        String unit = reader.getAttributeValue(null, "unit");
        if (unit == null) {
            throw new Exception("Attribute unit is missing in " + currentHeaderElement);
        }
        TimeUnit timeUnit = TimeUnit.get(unit);
        if (timeUnit == null) {
            // 0 makes initiateTimeStep() choose the irregular time step
            reader.nextTag();
            return 0;
        }
        int multiplier = parseOptionalNonZeroIntAttribute("multiplier", "Multiplier is 0");
        int divider = parseOptionalNonZeroIntAttribute("divider", "Divider is 0");
        reader.nextTag();
        return timeUnit.getMillis() * multiplier / divider;
    }

    /**
     * Parses an optional integer attribute of the current element.
     *
     * @return 1 when the attribute is absent, otherwise its (non-zero) value
     * @throws Exception when the attribute is not an integer, or with {@code zeroMessage} when it is 0
     */
    private int parseOptionalNonZeroIntAttribute(String attributeName, String zeroMessage) throws Exception {
        String text = reader.getAttributeValue(null, attributeName);
        if (text == null) return 1;
        int value;
        try {
            value = Integer.parseInt(text);
        } catch (NumberFormatException e) {
            throw new Exception(ExceptionUtils.getMessage(e), e);
        }
        if (value == 0) throw new Exception(zeroMessage);
        return value;
    }

    /** Resets all per-series header state before a new header is parsed. */
    private void initHeader() {
        header.clear();
        header.setFileDescription(virtualFileName);
        currentHeaderElement = null;
        timeStep = null;
        timeStepMillis = 0;
        startTime = Long.MIN_VALUE;
        endTime = Long.MIN_VALUE;
        missingValue = Float.NaN;
        creationDateText = null;
        creationTimeText = "00:00:00";
        qualifiers.clear();
    }

    /**
     * Reads one value from the bin file for every time step from startTime up to and including endTime.
     * Only regular time steps are supported.
     */
    private void readValuesFromBinFile() throws Exception {
        TimeStep step = header.getTimeStep();
        if (!step.isRegular()) {
            throw new Exception("Only equidistant time step supported when pi events are stored in bin file instead of xml");
        }
        // fast path for a constant step in millis; otherwise the time step computes the next time
        boolean equidistantMillis = step.isEquidistantMillis();
        long stepMillis = equidistantMillis ? step.getStepMillis() : Long.MIN_VALUE;
        try {
            for (long time = startTime; time <= endTime;) {
                timeSeriesContentHandler.setTime(time);
                if (bufferPos == bufferCount) fillBuffer();
                float value = floatBuffer[bufferPos++];
                // we can not use the automatic missing value detection of the content handler because the missing value is different for each time series
                if (value == missingValue) value = Float.NaN;
                timeSeriesContentHandler.setValue(value);
                timeSeriesContentHandler.applyCurrentFields();
                if (equidistantMillis) {
                    time += stepMillis;
                } else {
                    time = step.nextTime(time);
                }
            }
        } catch (IOException e) {
            throw new Exception(ExceptionUtils.getMessage(e), e);
        }
    }

    /**
     * Refills floatBuffer from the bin file with as many whole values as are available (at least one).
     *
     * @throws EOFException when the bin file ends before a complete value could be read
     */
    private void fillBuffer() throws IOException {
        int byteBufferCount = 0;
        // keep reading until at least one value is available and no value is split over two reads
        while (byteBufferCount == 0 || byteBufferCount % NumberType.FLOAT_SIZE != 0) {
            int count = binaryInputStream.read(byteBuffer, byteBufferCount, BUFFER_SIZE * NumberType.FLOAT_SIZE - byteBufferCount);
            assert count != 0; // see read javadoc
            if (count == -1) throw new EOFException("Bin file is too short");
            byteBufferCount += count;
        }
        bufferCount = byteBufferCount / NumberType.FLOAT_SIZE;
        BinaryUtils.copy(byteBuffer, 0, byteBufferCount, floatBuffer, 0, bufferCount, ByteOrder.LITTLE_ENDIAN);
        bufferPos = 0;
    }

    /**
     * Chooses the time step for the current header: irregular when no usable step is known, otherwise an
     * equidistant step whose offset is derived from the start time. Steps or offsets that are not whole
     * minutes fall back to irregular.
     */
    private void initiateTimeStep() {
        timeStep = IrregularTimeStep.INSTANCE; // default time step
        if (timeStepMillis == 0) {
            return;
        }
        if (timeStepMillis % TimeUnit.MINUTE_MILLIS != 0) {
            fallBackToIrregularTimeStep();
            return;
        }
        long timeZoneOffsetMillis = -startTime % timeStepMillis;
        if (timeZoneOffsetMillis % TimeUnit.MINUTE_MILLIS != 0) {
            fallBackToIrregularTimeStep();
            return;
        }
        timeStep = SimpleEquidistantTimeStep.getInstance(timeStepMillis, timeZoneOffsetMillis);
    }

    /** Logs (once per parser) that the header time step is unusable and discards the parsed step. */
    private void fallBackToIrregularTimeStep() {
        if (!invalidHeaderTimeDetected) {
            if (log.isDebugEnabled()) log.debug("Header timestep and/or start time has not rounded minutes ! Irregular timestep wil be used.");
            invalidHeaderTimeDetected = true;
        }
        timeStepMillis = 0;
    }

    /** When the reader is on a timeZone start tag, applies its offset (in hours) to the date format and skips the element. */
    private void parseTimeZone() throws Exception {
        if (reader.getEventType() != XMLStreamConstants.START_ELEMENT) return;
        if (!TextUtils.equals(reader.getLocalName(), "timeZone")) return;
        try {
            double offset = Double.parseDouble(reader.getElementText());
            TimeZone timeZoneFromDouble = TimeZoneUtils.createTimeZoneFromDouble(offset);
            fastDateFormat.setTimeZone(timeZoneFromDouble);
        } catch (NumberFormatException e) {
            throw new Exception("Not valid timeZone format", e);
        }
        reader.require(XMLStreamConstants.END_ELEMENT, null, "timeZone");
        reader.nextTag();
    }

    /**
     * Parses the content of {@link #currentHeaderElement} into the header state and advances the reader
     * to the tag following the element's end tag.
     */
    @SuppressWarnings({"OverlyLongMethod"})
    private void parseHeaderElement() throws Exception {
        switch (currentHeaderElement) {
            case type:
                header.setParameterType(parseType(reader.getElementText()));
                break;
            case locationId:
                header.setLocationId(reader.getElementText());
                break;
            case parameterId:
                header.setParameterId(reader.getElementText());
                break;
            case qualifierId:
                qualifiers.add(reader.getElementText());
                break;
            case ensembleId:
                header.setEnsembleId(reader.getElementText());
                break;
            case ensembleMemberIndex:
                header.setEnsembleMemberIndex(parseEnsembleMemberIndex(reader.getElementText()));
                break;
            case timeStep:
                timeStepMillis = parseTimeStep();
                break;
            case startDate:
                startTime = parseTime();
                break;
            case endDate:
                endTime = parseTime();
                break;
            case forecastDate:
                header.setForecastTime(parseTime());
                break;
            case missVal:
                missingValue = parseMissingValue(reader.getElementText());
                break;
            case longName:
                header.setLongName(reader.getElementText());
                break;
            case stationName:
                header.setLocationName(reader.getElementText());
                break;
            case units:
                header.setUnit(reader.getElementText());
                break;
            case sourceOrganisation:
                header.setSourceOrganisation(reader.getElementText());
                break;
            case sourceSystem:
                header.setSourceSystem(reader.getElementText());
                break;
            case fileDescription:
                header.setFileDescription(reader.getElementText());
                break;
            case creationDate:
                creationDateText = reader.getElementText();
                break;
            case creationTime:
                creationTimeText = reader.getElementText();
                break;
            case region:
                header.setRegion(reader.getElementText());
                break;
            case thresholds:
                parseThresholds();
                break;
        }
        reader.require(XMLStreamConstants.END_ELEMENT, null, currentHeaderElement.name());
        reader.nextTag();
    }

    /**
     * Collects the id, name and value attributes of every child element of the thresholds element and
     * stores them as high level thresholds. Leaves the reader on the thresholds end tag.
     */
    private void parseThresholds() throws Exception {
        List<String> ids = new ArrayList<String>();
        List<String> names = new ArrayList<String>();
        List<String> stringValues = new ArrayList<String>();
        reader.nextTag();
        // while (not do-while): an empty thresholds element must not make the reader skip past its own end tag
        while (!(reader.getEventType() == XMLStreamConstants.END_ELEMENT
                && currentHeaderElement.name().equals(reader.getLocalName()))) {
            if (reader.getEventType() == XMLStreamConstants.START_ELEMENT) {
                ids.add(reader.getAttributeValue(null, "id"));
                names.add(reader.getAttributeValue(null, "name"));
                stringValues.add(reader.getAttributeValue(null, "value"));
            }
            reader.nextTag();
        }
        float[] values = new float[stringValues.size()];
        for (int i = 0; i < values.length; i++) {
            String valueText = stringValues.get(i);
            if (valueText == null) {
                throw new Exception("Attribute value is missing for threshold " + ids.get(i));
            }
            try {
                values[i] = Float.parseFloat(valueText);
            } catch (NumberFormatException e) {
                throw new Exception("Threshold value should be a float " + valueText, e);
            }
        }
        header.setHighLevelThresholds(ids.toArray(new String[ids.size()]), names.toArray(new String[names.size()]), values);
    }

    private static float parseMissingValue(String gotString) throws Exception {
        try {
            return TextUtils.parseFloat(gotString);
        } catch (NumberFormatException e) {
            throw new Exception(ExceptionUtils.getMessage(e), e);
        }
    }

    private static int parseEnsembleMemberIndex(String gotString) throws Exception {
        int index;
        try {
            index = Integer.parseInt(gotString);
        } catch (NumberFormatException e) {
            throw new Exception("Ensemble member index should be an integer " + gotString, e);
        }
        if (index < 0) {
            throw new Exception("Negative ensemble member index not allowed " + gotString);
        }
        return index;
    }

    private static ParameterType parseType(String gotString) throws Exception {
        ParameterType type = ParameterType.get(gotString);
        if (type == null) {
            throw new Exception("Type in header should be instantaneous or accumulative and not " + gotString);
        }
        return type;
    }

    /**
     * Identifies the header element at the current start tag and validates it against the previous one:
     * known name, correct order, no forbidden duplicates or attributes, and no skipped required elements.
     * Updates {@link #currentHeaderElement}.
     */
    private void detectHeaderElement() throws Exception {
        if (reader.getEventType() != XMLStreamConstants.START_ELEMENT)
            throw new Exception("header element expected");
        String localName = reader.getLocalName();
        HeaderElement element;
        try {
            element = HeaderElement.valueOf(localName);
        } catch (IllegalArgumentException e) {
            throw new Exception("Unknown header element: " + localName, e);
        }
        if (currentHeaderElement != null && element.ordinal() < currentHeaderElement.ordinal()) {
            throw new Exception("Header elements in wrong order: " + localName);
        }
        if (element == currentHeaderElement && !element.isMultipleAllowed()) {
            throw new Exception("Duplicate header element: " + localName);
        }
        if (reader.getAttributeCount() > 0 && !element.hasAttributes()) {
            throw new Exception("Attributes are not allowed for header element " + localName);
        }
        // repetition of an element that may occur multiple times (e.g. qualifierId): nothing was skipped
        if (element == currentHeaderElement) return;
        checkNoRequiredHeaderElementSkipped(element.ordinal());
        currentHeaderElement = element;
    }

    /**
     * Verifies that no required header element lies between {@link #currentHeaderElement} (exclusive) and
     * the given ordinal (exclusive); all of those elements were skipped.
     *
     * @param endOrdinal ordinal of the next element present, or HEADER_ELEMENTS.length at the end of the header
     * @throws Exception naming the first skipped required element
     */
    private void checkNoRequiredHeaderElementSkipped(int endOrdinal) throws Exception {
        int firstSkippedOrdinal = currentHeaderElement == null ? 0 : currentHeaderElement.ordinal() + 1;
        for (int i = firstSkippedOrdinal; i < endOrdinal; i++) {
            HeaderElement skipped = HEADER_ELEMENTS[i];
            if (skipped.isRequired()) {
                throw new Exception("Required header item missing: " + skipped);
            }
        }
    }

    /** @return the time zone currently used to interpret dates (default or from the last timeZone element) */
    public TimeZone getTimeZone() {
        return fastDateFormat.getTimeZone();
    }
}
|