...
Code Block |
---|
package nl.wldelft.timeseriesparsersfews.system.plugin.dataImport; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonToken; import nl.wldelft.util.FastDateFormat; import nl.wldelft.util.Period; import nl.wldelft.util.PeriodConsumer; import nl.wldelft.util.io.ServerParser; import nl.wldelft.util.timeseries.DefaultTimeSeriesHeader; import nl.wldelft.util.timeseries.TimeSeriesContentHandler; import nl.wldelft.util.timeseries.TimeSeriesHeader; import nl.wldelft.util.timeseries.TimeSeriesHeadersConsumer; import org.apache.http.util.TextUtils; import org.apache.log4j.Logger; import java.io.IOException; import java.io.InputStream; import java.net.ConnectException; import java.net.HttpURLConnection; import java.net.URL; import java.net.URLEncoder; import java.util.Date; import java.util.Locale; /** * DDSC parser developed by HKV. See https://api.ddsc.nl/api/v1/timeseries * <p> Upgraded *to Servicesupport firstv2 callsby theDeltares. existingSee timeseries using the following See https://api.ddsc.nl/api/v2/timeseries * <p> * Service calls the timeseries service using the following URL, where name exist of locationname_,parametername (in this example is the location name GREBOMZD_BK_A129 and the parameter SH9_BARO) : * <p> * https://api.ddsc.nl/api/v1v2/timeseries/?name=GREB_BK_A1_SHstart=2015-12-03T08:00:00.000Z&end=2015-12-10T08:00:00.000Z&name=OMZD_29,9_BARO&format=json * <p> * The response will contain the urls to the events: a json file with events in the following format. * <p> * https://api.ddsc.nl/api/v1/events/157a6c92-cf4b-422f-8d5a-6aeceb01b43c?format=json&start=2015-01-09T18:00:00.000Z&end=2015-01-19T18:00:00.000Z */ public class DdscTimeSeriesServerParser implements ServerParser<TimeSeriesContentHandler>, TimeSeriesHeadersConsumer, PeriodConsumer { private static final Logger log = Logger.getLogger(DdscTimeSeriesServerParser.class); public static final String EVENTS = "events"; public static final String RESULTS = "results"; public static final String DATETIME = "datetime"; public static final String VALUE = "value"; public static final String FLAG = "flag"; public static final String NEXT = "next"; public static final int MISSING_VALUE = -999; private FastDateFormat dateFormat = null; private TimeSeriesHeader[] headers = null; private Period period = null; @SuppressWarnings({"AssignmentToCollectionOrArrayFieldFromParameter"}) @Override public void setTimeSeriesHeaders(TimeSeriesHeader[] timeSeriesHeaders) { this.headers = timeSeriesHeaders; } @Override public void setPeriod(Period period) { this.period = period; } @Override public void parse(URL url, String username, String password, TimeSeriesContentHandler contentHandler) throws IOException { if (period == Period.ANY_TIME) throw new IllegalArgumentException("contentHandler.getWantedPeriod() == Period.ANY_TIME"); dateFormat = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", contentHandler.getDefaultTimeZone(), Locale.US, dateFormat); Date startDate = period.getStartDate(); Date endDate = period.getEndDate(); String periodQuery = "?start=" + dateFormat.format(startDate) + "&end=" + dateFormat.format(endDate); log.debug("periodQuery:" + periodQuery); for (int i = 0; i < headers.length; i++) { TimeSeriesHeader header = headers[i]; try { DefaultTimeSeriesHeader defaultTimeSeriesHeader = new DefaultTimeSeriesHeader(); defaultTimeSeriesHeader.setLocationId(header.getLocationId()); defaultTimeSeriesHeader.setParameterId(header.getParameterId()); contentHandler.addMissingValue(MISSING_VALUE); contentHandler.createTimeSeriesHeaderAlias(i, defaultTimeSeriesHeader); contentHandler.setTimeSeriesHeader(i); URL timeSeriesUrl = new URL(url.toExternalForm() + "?name=" + header.getLocationId() + "_" + header.getParameterId() + "&format=json"); log.debug("timeseries url: " + timeSeriesUrl); try (InputStream timeSeriesInputStream = openHttpInputStream(timeSeriesUrl.toExternalForm(), username, password)) { if (timeSeriesInputStream != null) { String events = parseEventsUrlFromTimeseries(timeSeriesInputStream, periodQuery); log.info("events url: " + events); while (!TextUtils.isEmpty(events) && !TextUtils.isBlank(events)) { try (InputStream eventsStream = openHttpInputStream(events,username,password)) { events = parseEvents(contentHandler, eventsStream); log.debug("next: " + events); } } } } } catch (ConnectException e) { log.warn("Can not connect to " + url + ' ' + e.getMessage(), e);events: [ { timestamp: 1449366444000, max: 1013.6, min: 1013.6 }, { timestamp: 1449367344000, max: 1013.58, min: 1013.58 }, The Importer currently uses the max value to store a value. */ public class DdscTimeSeriesServerParser implements ServerParser<TimeSeriesContentHandler>, TimeSeriesHeadersConsumer, PeriodConsumer { private static final Logger log = Logger.getLogger(DdscTimeSeriesServerParser.class); public static final String RESULTS = "results"; public static final String DATETIME = "timestamp"; public static final String VALUE = "max"; public static final int MISSING_VALUE = -999; public static final String EVENTS = "events"; private FastDateFormat dateFormat = null; private TimeSeriesHeader[] headers = null; private Period period = null; @SuppressWarnings({"AssignmentToCollectionOrArrayFieldFromParameter"}) @Override public void setTimeSeriesHeaders(TimeSeriesHeader[] timeSeriesHeaders) { this.headers = timeSeriesHeaders; } return; @Override public void setPeriod(Period period) { } catch (Exception e) { this.period = period; } @Override public void log.warn("Can not parse data for " + header + ' ' + e.getMessage(), e); parse(URL url, String username, String password, TimeSeriesContentHandler contentHandler) throws IOException { if (period == Period.ANY_TIME) throw new IllegalArgumentException("contentHandler.getWantedPeriod() } }== Period.ANY_TIME"); } privatedateFormat static InputStream openHttpInputStream(String restUrl, String username, String password) throws Exception {= FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", contentHandler.getDefaultTimeZone(), Locale.US, dateFormat); URLDate urlstartDate = new URL(restUrlperiod.getStartDate(); Date log.debug("url:" + urlendDate = period.getEndDate(); String periodQuery = "?start=" HttpURLConnection connection = (HttpURLConnection) url.openConnection(+ dateFormat.format(startDate) + "&end=" + dateFormat.format(endDate); for //(int justi want= to0; doi an HTTP GET here< headers.length; i++) { connection.setRequestMethod("GET"); TimeSeriesHeader header = // give it 15 seconds to respond headers[i]; try { connection.setReadTimeout(15 * 1000); connection.setRequestProperty("username", username); DefaultTimeSeriesHeader defaultTimeSeriesHeader = new connection.setRequestProperty("password", passwordDefaultTimeSeriesHeader(); connection.connect(); return connection.getInputStream(defaultTimeSeriesHeader.setLocationId(header.getLocationId()); } static String parseEvents(TimeSeriesContentHandler contentHandler, InputStream inputStream) throws IOException { defaultTimeSeriesHeader.setParameterId(header.getParameterId()); String next = null; if (inputStream != null) { contentHandler.addMissingValue(MISSING_VALUE); try (JsonParser jsonParser = new JsonFactory()contentHandler.createParser(inputStream)) {createTimeSeriesHeaderAlias(i, defaultTimeSeriesHeader); while (jsonParser.nextToken() != JsonToken.END_OBJECT) { contentHandler.setTimeSeriesHeader(i); //noinspection StringConcatenationMissingWhitespace String fieldName = jsonParser.getCurrentName(); URL timeSeriesUrl = new URL(url.toExternalForm() + periodQuery + // Lees eventuele next url "&name=" + URLEncoder.encode(header.getLocationId(), "utf-8") + "," if+ URLEncoder.encode(NEXTheader.equalsgetParameterId(fieldName), "utf-8") {+ jsonParser.nextToken("&format=json"); if (!jsonParser.getText().equals("null")) { log.debug("timeseries url: " + timeSeriesUrl); try (InputStream nexttimeSeriesInputStream = jsonParseropenHttpInputStream(timeSeriesUrl.getText();toExternalForm(), username, password)) { if (timeSeriesInputStream != null) }{ } parseEvents(contentHandler, timeSeriesInputStream); if (RESULTS.equals(fieldName)) writeTimeSeriesEvent(contentHandler, jsonParser); } } } catch (ConnectException e) { } return next; } private static void writeTimeSeriesEvent(TimeSeriesContentHandler contentHandler, JsonParser jsonParser) throws IOException { log.warn("Can not connect to " + url + ' ' + e.getMessage(), e); String flag = nullreturn; float value = MISSING_VALUE; } catch (Exception e) { String date = null; while (jsonParserlog.nextToken() != JsonToken.END_ARRAY) { String fieldName = jsonParser.getCurrentName()warn("Can not parse data for " + header + ' ' + e.getMessage(), e); if (FLAG.equals(fieldName)) { } } } private static InputStream jsonParser.nextToken(); openHttpInputStream(String restUrl, String username, String password) throws Exception { URL flagurl = jsonParser.getText(new URL(restUrl); HttpURLConnection connection = } else if (VALUE.equals(fieldName)) {(HttpURLConnection) url.openConnection(); jsonParser.nextToken(); // just want to do an HTTP GET here value = jsonParserconnection.getFloatValuesetRequestMethod("GET"); // give it 15 }seconds elseto if (DATETIME.equals(fieldName)) { respond connection.setReadTimeout(15 * 1000); jsonParser.nextToken(connection.setRequestProperty("username", username); connection.setRequestProperty("password", password); date = jsonParserconnection.getTextconnect(); return connection.getInputStream(); } static void parseEvents(TimeSeriesContentHandler contentHandler, InputStream inputStream) throws IOException if{ (flag != null && date != null &&if value(inputStream != MISSING_VALUEnull) { try (JsonParser jsonParser contentHandler.setTime(contentHandler.getDefaultTimeZone(), "yyyy-MM-dd'T'HH:mm:ss.S", date); = new JsonFactory().createParser(inputStream)) { while (jsonParser.nextToken() != JsonToken.END_OBJECT) { contentHandler.setValue(value); String fieldName = contentHandlerjsonParser.setFlaggetCurrentName(flag); contentHandler.applyCurrentFields(); // Lees eventuele next url flag = null; value = MISSING_VALUEif (RESULTS.equals(fieldName)) writeTimeSeriesEvent(contentHandler, jsonParser); date = null;} } } } private static Stringvoid parseEventsUrlFromTimeserieswriteTimeSeriesEvent(InputStreamTimeSeriesContentHandler timeSeriesDataInputStreamcontentHandler, StringJsonParser periodQueryjsonParser) throws IOException { ifwhile (timeSeriesDataInputStreamjsonParser.nextToken() != null) { JsonToken.END_ARRAY) { //noinspection resource caller has to close the input stream.if (EVENTS.equals(jsonParser.getCurrentName())) { JsonParser jsonParser = new JsonFactoryprocessJsonEvents().createParser(timeSeriesDataInputStream)contentHandler, jsonParser); while (jsonParser.nextToken() != JsonToken.END_OBJECT) { } } } Stringprivate fieldNamestatic =void jsonParser.getCurrentName(); processJsonEvents(TimeSeriesContentHandler contentHandler, JsonParser jsonParser) throws IOException { float value if (RESULTS.equals(fieldName)) {= MISSING_VALUE; String date = null; while (jsonParser.nextToken() != JsonToken.END_ARRAY) { // process all events until the end of the events array. String fieldName = jsonParser.getCurrentName(); if (VALUE.equals(fieldName)) { log.debug("fieldname = " + fieldName); jsonParser.nextToken(); value = jsonParser.getFloatValue(); } else if (EVENTSDATETIME.equals(fieldName)) { jsonParser.nextToken(); date = jsonParser.nextTokengetText(); } if (date != null //noinspection StringConcatenationMissingWhitespace && value != MISSING_VALUE) { return jsonParser.getText() + periodQuery + "&format=json"contentHandler.setTime(Long.parseLong(date)); contentHandler.setValue(value); } contentHandler.applyCurrentFields(); } value = MISSING_VALUE; } date = null; } } return null;} } } |