...
Code Block |
---|
package nl.wldelft.fews.system.plugin.dataImporttimeseriesparsers; import comnl.fasterxmlwldelft.jacksonutil.core.JsonFactoryFastDateFormat; import comnl.fasterxmlwldelft.jacksonutil.core.JsonParserPeriod; import com.fasterxml.jackson.core.JsonToken; import nl.wldelft.util.PeriodPeriodConsumer; import nl.wldelft.util.PeriodConsumerTextUtils; import nl.wldelft.util.PropertiesTimeUnit; import nl.wldelft.util.PropertiesConsumerio.BackupServerUrlConsumer; import nl.wldelft.util.io.TextUtilsServerParser; import nl.wldelft.util.io.ServerParserXmlParser; import nl.wldelft.util.timeseries.DefaultTimeSeriesHeader; import nl.wldelft.util.timeseries.TimeSeriesContentHandler; import nl.wldelft.util.timeseries.TimeSeriesHeader; import nl.wldelft.util.timeseries.TimeSeriesHeadersConsumer; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; import javajavax.xml.iostream.IOExceptionXMLInputFactory; import javajavax.xml.iostream.InputStreamXMLStreamConstants; import javajavax.xml.netstream.ConnectExceptionXMLStreamReader; import java.netio.HttpURLConnectionIOException; import java.netio.URLInputStream; import java.utilnet.DateHttpURLConnection; import java.utilnet.HashMapURL; import java.util.MapLocale; /** * DDSCTime series parser for developedthe bydataware HKVservices. See https://api.ddsc.nl/api/v1/timeseries * * The Upgradedservice tocan supportbe v2called by Deltares. See See https://api.ddsc.nl/api/v2/timeseries * <p>as follows: * * http://servername:portnumber/rest/read/data/<point ID>/<start-date>/<end-date>/<stationname> * * ServiceThe callsexact theURL timeseriesused serviceto usingrequest the followingattached URL,file where the uuid defines the unique timeserie. Using properties a mapping has to be made from parameter,location to uuid. : * <p> * https://ddsc.lizard.net/api/v2/timeseries/cc0a8831-7758-4dd6-999c-f607c026d176/?start=1421816000000&end=1454595236646&format=json * <p> * The response will contain a json file with events in the following format. * <p> * events: [ { timestamp: 1449366444000, max: 1013.6, min: 1013.6 }, { timestamp: 1449367344000, max: 1013.58, min: 1013.58 }, The Importer currently uses the max value to store a value. */ public class DdscTimeSeriesServerParser implements ServerParser<TimeSeriesContentHandler>, TimeSeriesHeadersConsumer, PeriodConsumer, PropertiesConsumer { private static final Logger log = Logger.getLogger(DdscTimeSeriesServerParser.class); public static final String DATETIME = "timestamp"; public static final String VALUE = "max"; public static final int MISSING_VALUE = -999; public static final String EVENTS = "events"; private TimeSeriesHeader[] headers = null; private Period period = null; private final Map uuidMap = new HashMap<>(); @SuppressWarnings({"AssignmentToCollectionOrArrayFieldFromParameter"}) @Override public void setTimeSeriesHeaders(TimeSeriesHeader[] timeSeriesHeaders) { this.headers = timeSeriesHeaders; } @Override public void setPeriod(Period period) { this.period = period; } @Override public void parse(URL url, String username, String password, TimeSeriesContentHandler contentHandler) throws IOException { if (period == Period.ANY_TIME) throw new IllegalArgumentException("contentHandler.getWantedPeriod() == Period.ANY_TIME"); Date startDate = period.getStartDate(); Date endDate = period.getEndDate(); String periodQuery = "?start=" + startDate.getTime() + "&end=" + endDate.getTime(); for (int i = 0; i < headers.length; i++) { TimeSeriesHeader header = headers[i]; try { DefaultTimeSeriesHeader defaultTimeSeriesHeader = new DefaultTimeSeriesHeader(); defaultTimeSeriesHeader.setLocationId(header.getLocationId()); defaultTimeSeriesHeader.setParameterId(header.getParameterId()); contentHandler.addMissingValue(MISSING_VALUE); contentHandler.createTimeSeriesHeaderAlias(i, defaultTimeSeriesHeader); contentHandler.setTimeSeriesHeader(i); String key = header.getParameterId() + "," + header.getLocationId(); String uuid = (String) uuidMap.get(key); if (uuid == null) { log.warn("No uuid mapping found for '" + key + "'"); continue; } //noinspection StringConcatenationMissingWhitespace URL timeSeriesUrl = new URL(url.toExternalForm() + uuid + "/" + periodQuery + "&format=json"); log.info("Processing timeseries url: " + timeSeriesUrl); try (InputStream timeSeriesInputStream = openHttpInputStream(timeSeriesUrl.toExternalForm(), username, password)) { if (timeSeriesInputStream != null) { parseEvents(contentHandler, timeSeriesInputStream); } } } catch (ConnectException e) { log.warn("Can not connect to " + url + ' ' + e.getMessage(), e); is http://chaddataware01:10003/rest/read/data/19839/2014-11-01T18:00/2014-11-02T03:00/1ha. Depending on the data point we want to access, the server name (chaddataware01), port (10003), and/or the point ID (19839) will change * */ public class DataWareTimeSeriesParser implements ServerParser<TimeSeriesContentHandler>, XmlParser<TimeSeriesContentHandler>, TimeSeriesHeadersConsumer, PeriodConsumer, BackupServerUrlConsumer { public static final String ARRAY_OF_DATA_ELEMENT = "ArrayOfData"; public static final String DATA_ELEMENT = "Data"; public static final String KEY_ELEMENT = "Key"; public static final String TIME_ELEMENT = "Time"; public static final String VALUE_ELEMENT = "Value"; public static final String QUALITY_ELEMENT = "Quality"; private TimeSeriesHeader[] headers = null; private final DefaultTimeSeriesHeader header = new DefaultTimeSeriesHeader(); private Period period = null; private FastDateFormat gmtDateFormat = null; private FastDateFormat gmtTimeFormat = null; private static final Logger log = LogManager.getLogger(); private int timeoutMillis = 0; XMLStreamReader reader = null; @Override public void parse(XMLStreamReader reader, String virtualFileName, TimeSeriesContentHandler contentHandler) throws Exception { this.reader = reader; contentHandler.addMissingValueRange(-999.9999f, -999f); reader.require(XMLStreamConstants.START_DOCUMENT, null, null); XmlStreamReaderUtils.goTo(reader, ARRAY_OF_DATA_ELEMENT); do { // go return; to the beginning of the array. But don't go past the end } catch (Exception e) {array. That's the break condition. XmlStreamReaderUtils.goTo(reader, DATA_ELEMENT, ARRAY_OF_DATA_ELEMENT); log.warn("Can not parse data for " + header + ' ' + e.getMessageif (TextUtils.equals(reader.getLocalName(), e)ARRAY_OF_DATA_ELEMENT)) break; } }XmlStreamReaderUtils.goTo(reader, KEY_ELEMENT); } private static InputStream openHttpInputStream(String restUrl, String username, String password) throws Exception { // parameter was already set. We could do a sanity check here... URL url = new URL(restUrl); // todo check if HttpURLConnectionwe connectionneed =to (HttpURLConnection) url.openConnection(); // just want to do an HTTP GET here compare the element text with the header parameter that was set. // assumption connection.setRequestMethod("GET"); // give it 15 seconds to respond is that the param name in the resturl should be leading. connection.setReadTimeout(15 * 1000); // header.setParameterId(reader.getElementText()); connection XmlStreamReaderUtils.setRequestPropertygoTo("username"reader, username); TIME_ELEMENT); connection.setRequestProperty("password", passwordcontentHandler.setTime(contentHandler.getDefaultTimeZone(),"yyyy-MM-dd HH:mm:ss.SSS", reader.getElementText()); connection.connect(); return connection.getInputStream(XmlStreamReaderUtils.goTo(reader, VALUE_ELEMENT); } static void parseEvents(TimeSeriesContentHandler contentHandler.setValue('.', InputStream inputStream) throws IOException { reader.getElementText()); if XmlStreamReaderUtils.goTo(inputStream != null) { reader, QUALITY_ELEMENT); try (JsonParser jsonParser = new JsonFactorycontentHandler.setFlag()reader.createParsergetElementText(inputStream)) {; contentHandler.setTimeSeriesHeader(header); boolean startParsingEvents = false; contentHandler.applyCurrentFields(); } for (JsonToken token; (token =while jsonParser(reader.nextTokenhasNext() ); != null; ) {} private void parseXmlStream(TimeSeriesContentHandler contentHandler, InputStream inputStream) throws Exception { ifXMLInputFactory (tokenf == JsonToken.FIELD_NAME) { XMLInputFactory.newInstance(); f.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, Boolean.FALSE); f.setProperty(XMLInputFactory.SUPPORT_DTD, Boolean.FALSE); XMLStreamReader String fieldNamereader = jsonParserf.getCurrentNamecreateXMLStreamReader(inputStream); if (TextUtils.equals(EVENTS, fieldName)) {parse(reader, "dummy.xml", contentHandler); } @Override public void parse(URL url, String username, String password, TimeSeriesContentHandler contentHandler) throws Exception { startParsingEvents = true; // The events object was found. if (headers.length <= 0) { throw new IllegalStateException("No wanted parameters and locations found in the continueimport configuration."); } if (period == }null) { throw new IllegalStateException("No period defined }for import!"); } gmtDateFormat = if (startParsingEvents) { FastDateFormat.getInstance("yyyy-MM-dd", contentHandler.getDefaultTimeZone(), Locale.US, gmtDateFormat); gmtTimeFormat = FastDateFormat.getInstance("HH:mm", contentHandler.getDefaultTimeZone(), Locale.US, gmtTimeFormat); for (TimeSeriesHeader timeSeriesHeader : processJsonEvents(contentHandler, jsonParser);headers) { String restUrl = null; startParsingEvents = false; try { header.setLocationId(timeSeriesHeader.getLocationId()); } }header.setParameterId(timeSeriesHeader.getParameterId()); } // URL looks as } } private static void processJsonEvents(TimeSeriesContentHandler contentHandler, JsonParser jsonParser) throws IOException { follows: http://chaddataware01:10003/rest/read/data/19839/2014-11-01T18:00/2014-11-02T03:00/1ha float value = MISSING_VALUE; String date = null; // http://servername:portnumber/rest/read/data/location/startdate/enddate/parameter while (jsonParser.nextToken() != JsonToken.END_ARRAY) { // process all events until the end of the events array. String periodQuery = "/*/*"; if (period != Period.ANY_TIME) periodQuery String fieldName = jsonParser.getCurrentName(); if (VALUE.equals(fieldName)) { = '/' + gmtDateFormat.format(period.getStartTime()) + 'T' + gmtTimeFormat.format(period.getStartTime()) + '/' + gmtDateFormat.format(period.getEndTime()) + 'T' + gmtTimeFormat.format(period.getEndTime()); jsonParser.nextToken();//noinspection StringConcatenationMissingWhitespace valuerestUrl = url + jsonParserheader.getFloatValuegetParameterId(); + periodQuery + '/' } else if (DATETIME.equals(fieldName)) {+ header.getLocationId(); jsonParserlog.nextToken(info("Processing " + restUrl); dateif (timeoutMillis == jsonParser.getText();0) { } timeoutMillis = 10 * (int) TimeUnit.SECOND_MILLIS; if (date != null && value != MISSING_VALUE) {} contentHandler.setTime(Long.parseLong(date)HttpURLConnection urlConnection = getHttpURLConnection(new URL(restUrl), timeoutMillis); parseXmlStream(contentHandler, urlConnection.setValuegetInputStream(value)); } catch (IOException ioe) { contentHandler.applyCurrentFields(); log.warn("Error while processing value" = MISSING_VALUE+ restUrl); } date = null;} } private static HttpURLConnection getHttpURLConnection(URL url, } int connectionTimeout) throws IOException { } } HttpURLConnection connection = /**(HttpURLConnection) url.openConnection(); * Properties are used to map parameter location combinations to uuid. connection.setConnectTimeout(connectionTimeout); connection.setReadTimeout(connectionTimeout); * <properties> connection.setDoInput(true); * <string key="param1,location1" value="uuid1"/> return connection; } * <string key="param1,location2" value="uuid2"/> @Override public * <string key="param2,location1" value="uuid3"/>void setTimeSeriesHeaders(TimeSeriesHeader[] timeSeriesHeaders) { * </properties> if (timeSeriesHeaders */ @Override == null) throw new IllegalArgumentException("timeSeriesHeaders == null"); public void setProperties(Properties properties) { this.headers = timeSeriesHeaders; } for@Override (int i = 0;public i < properties.size(); i++void setPeriod(Period period) { String key = properties.getKey(i)this.period = period; } @Override Stringpublic value = properties.getString(i);void setBackupServerUrls(URL[] backupUrls) { // currently not //noinspection ResultOfMethodCallIgnoredsupported for DataWare parsers.. } @Override uuidMap.put(key, value); public void setConnectionTimeout(int timeoutMillis) { this.timeoutMillis = }timeoutMillis; } } |