package nl.wldelft.fews.system.plugin.dataImporttimeseriesparsers;
import comnl.fasterxmlwldelft.jacksonutil.core.JsonFactoryFastDateFormat;
import comnl.fasterxmlwldelft.jacksonutil.core.JsonParserPeriod;
import com.fasterxml.jackson.core.JsonToken;
import nl.wldelft.util.PeriodPeriodConsumer;
import nl.wldelft.util.PeriodConsumerTextUtils;
import nl.wldelft.util.PropertiesTimeUnit;
import nl.wldelft.util.PropertiesConsumerio.BackupServerUrlConsumer;
import nl.wldelft.util.io.TextUtilsServerParser;
import nl.wldelft.util.io.ServerParserXmlParser;
import nl.wldelft.util.timeseries.DefaultTimeSeriesHeader;
import nl.wldelft.util.timeseries.TimeSeriesContentHandler;
import nl.wldelft.util.timeseries.TimeSeriesHeader;
import nl.wldelft.util.timeseries.TimeSeriesHeadersConsumer;
import org.apache.log4j.Logger;
import javajavax.xml.iostream.IOExceptionXMLInputFactory;
import javajavax.xml.iostream.InputStreamXMLStreamConstants;
import javajavax.xml.netstream.ConnectExceptionXMLStreamReader;
import java.netio.HttpURLConnectionIOException;
import java.netio.URLInputStream;
import java.utilnet.DateHttpURLConnection;
import java.utilnet.HashMapURL;
import java.util.MapLocale;
/**
* DDSC parser developed by HKV. Time series parser for the dataware services. See https://api.ddsc.nl/api/v1/timeseries
*
* Upgraded to support v2 by Deltares. See https://api.ddsc.nl/api/v2/timeseries
* <p>
* The service can be called as follows:
*
* http://servername:portnumber/rest/read/data/<point ID>/<start-date>/<end-date>/<stationname>
*
* The parser calls the timeseries service using the following URL, where the uuid identifies one unique time series. A mapping from "parameter,location" to uuid has to be supplied via properties:
* <p>
* https://ddsc.lizard.net/api/v2/timeseries/cc0a8831-7758-4dd6-999c-f607c026d176/?start=1421816000000&end=1454595236646&format=json
* <p>
* The response will contain a json file with events in the following format.
* <p>
* events: [
{
timestamp: 1449366444000,
max: 1013.6,
min: 1013.6
},
{
timestamp: 1449367344000,
max: 1013.58,
min: 1013.58
},
The Importer currently uses the max value to store a value.
*/
public class DdscTimeSeriesServerParser implements ServerParser<TimeSeriesContentHandler>, TimeSeriesHeadersConsumer, PeriodConsumer, PropertiesConsumer {
private static final Logger log = Logger.getLogger(DdscTimeSeriesServerParser.class);
/** Name of the JSON field of an event that holds the event time. */
public static final String DATETIME = "timestamp";
/** Name of the JSON field of an event whose number is imported as the event value. */
public static final String VALUE = "max";
/** Value registered with the content handler as "missing"; also used as the "no value read yet" sentinel. */
public static final int MISSING_VALUE = -999;
/** Name of the JSON field that holds the array of events. */
public static final String EVENTS = "events";
// Headers of the time series to request; assigned through setTimeSeriesHeaders.
private TimeSeriesHeader[] headers = null;
// Period to request; assigned through setPeriod.
private Period period = null;
// Maps "parameterId,locationId" to the uuid of the time series on the server.
// Parameterized (was a raw Map) so that lookups are type-checked by the compiler.
private final Map<String, String> uuidMap = new HashMap<>();
/**
 * Stores the headers of the time series that {@code parse} will request.
 * The array is kept by reference; it is not copied.
 *
 * @param timeSeriesHeaders headers of the wanted time series, must not be {@code null}
 * @throws IllegalArgumentException if {@code timeSeriesHeaders} is {@code null}
 */
@SuppressWarnings({"AssignmentToCollectionOrArrayFieldFromParameter"})
@Override
public void setTimeSeriesHeaders(TimeSeriesHeader[] timeSeriesHeaders) {
    // Fail fast: parse() reads headers.length and would otherwise die with a bare NullPointerException.
    if (timeSeriesHeaders == null) throw new IllegalArgumentException("timeSeriesHeaders == null");
    this.headers = timeSeriesHeaders;
}
/**
 * Remembers the period that {@code parse} uses to build the start/end part of the request.
 *
 * @param period period to request; stored as given, without validation
 */
@Override
public void setPeriod(Period period) {
    this.period = period;
}
@Override
public void parse(URL url, String username, String password, TimeSeriesContentHandler contentHandler) throws IOException {
if (period == Period.ANY_TIME) throw new IllegalArgumentException("contentHandler.getWantedPeriod() == Period.ANY_TIME");
Date startDate = period.getStartDate();
Date endDate = period.getEndDate();
String periodQuery = "?start=" + startDate.getTime() + "&end=" + endDate.getTime();
for (int i = 0; i < headers.length; i++) {
TimeSeriesHeader header = headers[i];
try {
DefaultTimeSeriesHeader defaultTimeSeriesHeader = new DefaultTimeSeriesHeader();
defaultTimeSeriesHeader.setLocationId(header.getLocationId());
defaultTimeSeriesHeader.setParameterId(header.getParameterId());
contentHandler.addMissingValue(MISSING_VALUE);
contentHandler.createTimeSeriesHeaderAlias(i, defaultTimeSeriesHeader);
contentHandler.setTimeSeriesHeader(i);
String key = header.getParameterId() + "," + header.getLocationId();
String uuid = (String) uuidMap.get(key);
if (uuid == null) {
log.warn("No uuid mapping found for '" + key + "'");
continue;
}
//noinspection StringConcatenationMissingWhitespace
URL timeSeriesUrl = new URL(url.toExternalForm() + uuid + "/" + periodQuery +
"&format=json");
log.info("Processing timeseries url: " + timeSeriesUrl);
try (InputStream timeSeriesInputStream = openHttpInputStream(timeSeriesUrl.toExternalForm(), username, password)) {
if (timeSeriesInputStream != null) {
parseEvents(contentHandler, timeSeriesInputStream);
}
}
} catch (ConnectException e) {
log.warn("Can not connect to " + url + ' ' + e.getMessage(), eThe exact URL used to request the attached file is http://chaddataware01:10003/rest/read/data/19839/2014-11-01T18:00/2014-11-02T03:00/1ha.
* Depending on the data point we want to access, the server name (chaddataware01), port (10003), and/or the point ID (19839) will change
*
*/
public class DataWareTimeSeriesParser implements ServerParser<TimeSeriesContentHandler>, XmlParser<TimeSeriesContentHandler>, TimeSeriesHeadersConsumer, PeriodConsumer, BackupServerUrlConsumer {
public static final String ARRAY_OF_DATA_ELEMENT = "ArrayOfData";
public static final String DATA_ELEMENT = "Data";
public static final String KEY_ELEMENT = "Key";
public static final String TIME_ELEMENT = "Time";
public static final String VALUE_ELEMENT = "Value";
public static final String QUALITY_ELEMENT = "Quality";
private TimeSeriesHeader[] headers = null;
private final DefaultTimeSeriesHeader header = new DefaultTimeSeriesHeader();
private Period period = null;
private FastDateFormat gmtDateFormat = null;
private FastDateFormat gmtTimeFormat = null;
private static final Logger log = Logger.getLogger(DataWareTimeSeriesParser.class);
private int timeoutMillis = 0;
XMLStreamReader reader = null;
@Override
public void parse(XMLStreamReader reader, String virtualFileName, TimeSeriesContentHandler contentHandler) throws Exception {
this.reader = reader;
contentHandler.addMissingValueRange(-999.9999f, -999f);
reader.require(XMLStreamConstants.START_DOCUMENT, null, null);
XmlStreamReaderUtils.goTo(reader, ARRAY_OF_DATA_ELEMENT);
do {
return;
// go to the beginning of the } catch (Exception e) {
array. But don't go past the end array. That's the break condition.
logXmlStreamReaderUtils.warn("Can not parse data for " + header + ' ' + e.getMessage(), e)goTo(reader, DATA_ELEMENT, ARRAY_OF_DATA_ELEMENT);
if (TextUtils.equals(reader.getLocalName(), ARRAY_OF_DATA_ELEMENT)) break;
}
}XmlStreamReaderUtils.goTo(reader, KEY_ELEMENT);
}
private static InputStream openHttpInputStream(String restUrl, String username, String password) throws Exception {
// parameter was already set. We could do a sanity check here...
URL url = new URL(restUrl);
// todo check if HttpURLConnectionwe connectionneed =to (HttpURLConnection) url.openConnection();
// just want to do an HTTP GET here
compare the element text with the header parameter that was set.
// assumption connection.setRequestMethod("GET");
// give it 15 seconds to respond
is that the param name in the resturl should be leading.
connection.setReadTimeout(15 * 1000);
// header.setParameterId(reader.getElementText());
connectionXmlStreamReaderUtils.setRequestPropertygoTo("username"reader, usernameTIME_ELEMENT);
connection.setRequestProperty("password", password);
connection.connect(contentHandler.setTime(contentHandler.getDefaultTimeZone(),"yyyy-MM-dd HH:mm:ss.SSS", reader.getElementText());
return connection.getInputStream();
}
static void parseEvents(TimeSeriesContentHandler contentHandler, InputStream inputStream) throws IOException {
XmlStreamReaderUtils.goTo(reader, VALUE_ELEMENT);
if (inputStream != null) {
contentHandler.setValue('.', reader.getElementText());
try (JsonParser jsonParser = new JsonFactory()XmlStreamReaderUtils.createParser(inputStream)) {goTo(reader, QUALITY_ELEMENT);
boolean startParsingEvents = false;
contentHandler.setFlag(reader.getElementText());
contentHandler.setTimeSeriesHeader(header);
for (JsonToken token; (token = jsonParsercontentHandler.nextTokenapplyCurrentFields()) != null; ) {
}
ifwhile (token == JsonToken.FIELD_NAME) {reader.hasNext() );
}
private void parseXmlStream(TimeSeriesContentHandler contentHandler, InputStream inputStream) throws Exception {
StringXMLInputFactory fieldNamef = jsonParserXMLInputFactory.getCurrentNamenewInstance();
XMLStreamReader reader = f.createXMLStreamReader(inputStream);
if (TextUtils.equals(EVENTS, fieldName)) {parse(reader, "dummy.xml", contentHandler);
}
@Override
public void parse(URL url, String username, String password, TimeSeriesContentHandler contentHandler) throws Exception {
startParsingEvents = true; // The events object was found.
if (headers.length <= 0) {
throw new IllegalStateException("No wanted parameters and locations found in the continueimport configuration.");
}
if (period == }null) {
throw new IllegalStateException("No period defined }for import!");
}
gmtDateFormat = if (startParsingEvents) {
FastDateFormat.getInstance("yyyy-MM-dd", contentHandler.getDefaultTimeZone(), Locale.US, gmtDateFormat);
gmtTimeFormat = FastDateFormat.getInstance("HH:mm", contentHandler.getDefaultTimeZone(), Locale.US, gmtTimeFormat);
for (TimeSeriesHeader timeSeriesHeader : processJsonEvents(contentHandler, jsonParser);headers) {
String restUrl = null;
startParsingEvents = false;
try {
header.setLocationId(timeSeriesHeader.getLocationId());
}
}header.setParameterId(timeSeriesHeader.getParameterId());
}
// URL looks as }
}
private static void processJsonEvents(TimeSeriesContentHandler contentHandler, JsonParser jsonParser) throws IOException {
follows: http://chaddataware01:10003/rest/read/data/19839/2014-11-01T18:00/2014-11-02T03:00/1ha
float value = MISSING_VALUE;
String date = null;
// http://servername:portnumber/rest/read/data/location/startdate/enddate/parameter
while (jsonParser.nextToken() != JsonToken.END_ARRAY) { // process all events until the end of the events array.
String periodQuery = "/*/*";
if (period != Period.ANY_TIME) periodQuery String fieldName = jsonParser.getCurrentName();
if (VALUE.equals(fieldName)) {
= '/' + gmtDateFormat.format(period.getStartTime()) + 'T' + gmtTimeFormat.format(period.getStartTime()) + '/' + gmtDateFormat.format(period.getEndTime()) + 'T' + gmtTimeFormat.format(period.getEndTime());
jsonParser.nextToken();//noinspection StringConcatenationMissingWhitespace
valuerestUrl = url + jsonParserheader.getFloatValuegetParameterId();
+ periodQuery + '/' } else if (DATETIME.equals(fieldName)) {+ header.getLocationId();
jsonParserlog.nextToken(info("Processing " + restUrl);
dateif (timeoutMillis == jsonParser.getText();0) {
}
timeoutMillis = 10 * (int) TimeUnit.SECOND_MILLIS;
if (date != null && value != MISSING_VALUE) {}
contentHandler.setTime(Long.parseLong(date)HttpURLConnection urlConnection = getHttpURLConnection(new URL(restUrl), timeoutMillis);
parseXmlStream(contentHandler, urlConnection.setValuegetInputStream(value));
} catch (IOException ioe) {
contentHandler.applyCurrentFields();
log.warn("Error while processing value" = MISSING_VALUE+ restUrl);
}
date = null;}
}
private static HttpURLConnection getHttpURLConnection(URL url, }
int connectionTimeout) throws IOException {
}
}
HttpURLConnection connection = /**(HttpURLConnection) url.openConnection();
* Properties are used to map parameter location combinations to uuid. connection.setConnectTimeout(connectionTimeout);
connection.setReadTimeout(connectionTimeout);
* <properties>
connection.setDoInput(true);
* <string key="param1,location1" value="uuid1"/> return connection;
}
* <string key="param1,location2" value="uuid2"/> @Override
public * <string key="param2,location1" value="uuid3"/>void setTimeSeriesHeaders(TimeSeriesHeader[] timeSeriesHeaders) {
* </properties>
if (timeSeriesHeaders */
@Override
== null) throw new IllegalArgumentException("timeSeriesHeaders == null");
public void setProperties(Properties properties) { this.headers = timeSeriesHeaders;
}
for@Override
(int i = 0;public i < properties.size(); i++void setPeriod(Period period) {
String key = properties.getKey(i)this.period = period;
}
@Override
Stringpublic value = properties.getString(i);void setBackupServerUrls(URL[] backupUrls) {
// currently not //noinspection ResultOfMethodCallIgnoredsupported for DataWare parsers..
}
@Override
uuidMap.put(key, value);
public void setConnectionTimeout(int timeoutMillis) {
this.timeoutMillis = }timeoutMillis;
}
}
|