package nl.wldelft.timeseriesparsers;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import nl.wldelft.util.FastDateFormat;
import nl.wldelft.util.Period;
import nl.wldelft.util.PeriodConsumer;
import nl.wldelft.util.io.ServerParser;
import nl.wldelft.util.timeseries.DefaultTimeSeriesHeader;
import nl.wldelft.util.timeseries.TimeSeriesContentHandler;
import nl.wldelft.util.timeseries.TimeSeriesHeader;
import nl.wldelft.util.timeseries.TimeSeriesHeadersConsumer;
import org.apache.http.util.TextUtils;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.io.InputStream;
import java.net.ConnectException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.util.Date;
import java.util.Locale;
import java.util.Objects;

/**
 * DDSC parser developed by HKV. See https://api.ddsc.nl/api/v1/timeseries
 * <p>
 * The parser first requests the time series resource using the following URL, where the name consists of
 * locationname_parametername (in this example the location name is GREB_BK_A1 and the parameter is SH):
 * <p>
 * https://api.ddsc.nl/api/v1/timeseries?name=GREB_BK_A1_SH&format=json
 * <p>
 * The response contains the URL of the events, which is then requested for the wanted period:
 * <p>
 * https://api.ddsc.nl/api/v1/events/157a6c92-cf4b-422f-8d5a-6aeceb01b43c?format=json&start=2015-01-09T18:00:00.000Z&end=2015-01-19T18:00:00.000Z
 * <p>
 * An events response may contain a "next" URL; the parser keeps following it until no next URL is returned.
 */
public class DdscTimeSeriesServerParser implements ServerParser<TimeSeriesContentHandler>, TimeSeriesHeadersConsumer, PeriodConsumer {
    private static final Logger log = Logger.getLogger(DdscTimeSeriesServerParser.class);

    public static final String EVENTS = "events";
    public static final String RESULTS = "results";
    public static final String DATETIME = "datetime";
    public static final String VALUE = "value";
    public static final String FLAG = "flag";
    public static final String NEXT = "next";
    public static final int MISSING_VALUE = -999;

    /** Timeout, in milliseconds, used both for connecting to the server and for reading from it. */
    private static final int TIMEOUT_MILLIS = 15 * 1000;

    private FastDateFormat dateFormat = null;
    private TimeSeriesHeader[] headers = null;
    private Period period = null;

    /**
     * Stores the headers of the time series that have to be requested by {@link #parse}.
     *
     * @param timeSeriesHeaders headers to request; the array is stored as is, not copied
     */
    @SuppressWarnings({"AssignmentToCollectionOrArrayFieldFromParameter"})
    @Override
    public void setTimeSeriesHeaders(TimeSeriesHeader[] timeSeriesHeaders) {
        this.headers = timeSeriesHeaders;
    }

    /**
     * Stores the period for which events are requested by {@link #parse}.
     *
     * @param period period to request
     */
    @Override
    public void setPeriod(Period period) {
        this.period = period;
    }

    /**
     * Requests, for every configured header, the time series resource and all pages of its events, and passes the
     * events to the content handler.
     * <p>
     * A failure for a single header is logged as a warning and the next header is tried. When the server can not be
     * connected at all the remaining headers are skipped.
     *
     * @param url            base url of the time series resource
     * @param username       sent as "username" request header
     * @param password       sent as "password" request header
     * @param contentHandler receives the parsed events
     * @throws IOException              declared by the interface
     * @throws IllegalArgumentException when the period is {@link Period#ANY_TIME}
     * @throws NullPointerException     when the period or the headers have not been set
     */
    @Override
    public void parse(URL url, String username, String password, TimeSeriesContentHandler contentHandler) throws IOException {
        Objects.requireNonNull(period, "period is not set, setPeriod has to be called before parse");
        Objects.requireNonNull(headers, "headers are not set, setTimeSeriesHeaders has to be called before parse");
        if (period == Period.ANY_TIME) throw new IllegalArgumentException("contentHandler.getWantedPeriod() == Period.ANY_TIME");

        dateFormat = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", contentHandler.getDefaultTimeZone(), Locale.US, dateFormat);
        Date startDate = period.getStartDate();
        Date endDate = period.getEndDate();
        String periodQuery = "?start=" + dateFormat.format(startDate) + "&end=" + dateFormat.format(endDate);
        log.debug("periodQuery:" + periodQuery);

        for (int i = 0; i < headers.length; i++) {
            TimeSeriesHeader header = headers[i];
            try {
                DefaultTimeSeriesHeader defaultTimeSeriesHeader = new DefaultTimeSeriesHeader();
                defaultTimeSeriesHeader.setLocationId(header.getLocationId());
                defaultTimeSeriesHeader.setParameterId(header.getParameterId());
                contentHandler.addMissingValue(MISSING_VALUE);
                contentHandler.createTimeSeriesHeaderAlias(i, defaultTimeSeriesHeader);
                contentHandler.setTimeSeriesHeader(i);

                // The name ends up in a query string, so encode it to keep reserved characters from breaking the url.
                String timeSeriesName = header.getLocationId() + "_" + header.getParameterId();
                URL timeSeriesUrl = new URL(url.toExternalForm() + "?name=" + URLEncoder.encode(timeSeriesName, "UTF-8") + "&format=json");
                log.debug("timeseries url: " + timeSeriesUrl);

                // The time series response is only needed to find the events url, close it before paging the events.
                String events;
                try (InputStream timeSeriesInputStream = openHttpInputStream(timeSeriesUrl.toExternalForm(), username, password)) {
                    events = parseEventsUrlFromTimeseries(timeSeriesInputStream, periodQuery);
                }
                log.info("events url: " + events);

                // Every page returns the url of the next page, or nothing when it was the last page.
                while (!TextUtils.isBlank(events)) {
                    try (InputStream eventsStream = openHttpInputStream(events, username, password)) {
                        events = parseEvents(contentHandler, eventsStream);
                        log.debug("next: " + events);
                    }
                }
            } catch (ConnectException e) {
                log.warn("Can not connect to " + url + ' ' + e.getMessage(), e);
                return;
            } catch (Exception e) {
                log.warn("Can not parse data for " + header + ' ' + e.getMessage(), e);
            }
        }
    }

    /**
     * Opens an HTTP GET connection and returns the stream of the response body.
     *
     * @param restUrl  url to request
     * @param username sent as "username" request header
     * @param password sent as "password" request header
     * @return response body, has to be closed by the caller
     * @throws IOException when the url is malformed, or connecting or reading fails or times out
     */
    private static InputStream openHttpInputStream(String restUrl, String username, String password) throws IOException {
        URL url = new URL(restUrl);
        log.debug("url:" + url);
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        // just want to do an HTTP GET here
        connection.setRequestMethod("GET");
        // give it 15 seconds to connect and 15 seconds to respond, without a connect timeout an unreachable host can block for a long time
        connection.setConnectTimeout(TIMEOUT_MILLIS);
        connection.setReadTimeout(TIMEOUT_MILLIS);
        connection.setRequestProperty("username", username);
        connection.setRequestProperty("password", password);
        connection.connect();
        return connection.getInputStream();
    }

    /**
     * Parses one page of an events response. The events found in the "results" array are passed to the content
     * handler.
     *
     * @param contentHandler receives the parsed events
     * @param inputStream    events response, may be {@code null}
     * @return the url of the next page, or {@code null} when there is no next page or no input stream
     * @throws IOException when the response can not be read or is not valid json
     */
    static String parseEvents(TimeSeriesContentHandler contentHandler, InputStream inputStream) throws IOException {
        if (inputStream == null) return null;
        String next = null;
        try (JsonParser jsonParser = new JsonFactory().createParser(inputStream)) {
            JsonToken token;
            // nextToken returns null at the end of the input, without that check an empty response would loop forever
            while ((token = jsonParser.nextToken()) != null && token != JsonToken.END_OBJECT) {
                if (token != JsonToken.FIELD_NAME) continue;
                String fieldName = jsonParser.getCurrentName();
                if (NEXT.equals(fieldName)) {
                    // Read the url of the next page, if any; a json null is reported as the text "null"
                    jsonParser.nextToken();
                    String text = jsonParser.getText();
                    if (text != null && !"null".equals(text)) next = text;
                } else if (RESULTS.equals(fieldName)) {
                    writeTimeSeriesEvent(contentHandler, jsonParser);
                } else {
                    // Skip the complete value, a nested object would otherwise end this loop at its END_OBJECT
                    jsonParser.nextToken();
                    jsonParser.skipChildren();
                }
            }
        }
        return next;
    }

    /**
     * Reads the "results" array of an events response and passes every complete event to the content handler.
     * The parser has to be positioned at the "results" field name and is left at the end of the array.
     * <p>
     * An event is complete when it has a flag, a datetime and a value that is not the missing value. Incomplete
     * events are skipped.
     *
     * @param contentHandler receives the parsed events
     * @param jsonParser     parser positioned at the "results" field name
     * @throws IOException when the response can not be read or is not valid json
     */
    private static void writeTimeSeriesEvent(TimeSeriesContentHandler contentHandler, JsonParser jsonParser) throws IOException {
        if (jsonParser.nextToken() != JsonToken.START_ARRAY) {
            // Not an array (for example null), there are no events to read
            jsonParser.skipChildren();
            return;
        }

        String flag = null;
        float value = MISSING_VALUE;
        String date = null;
        JsonToken token;
        while ((token = jsonParser.nextToken()) != null && token != JsonToken.END_ARRAY) {
            if (token == JsonToken.START_OBJECT) {
                // New event, never combine its fields with leftovers of a previous incomplete event
                flag = null;
                value = MISSING_VALUE;
                date = null;
                continue;
            }
            if (token != JsonToken.FIELD_NAME) {
                // Only has an effect for a nested array, its END_ARRAY must not end this loop
                jsonParser.skipChildren();
                continue;
            }

            String fieldName = jsonParser.getCurrentName();
            JsonToken valueToken = jsonParser.nextToken();
            if (FLAG.equals(fieldName)) {
                flag = jsonParser.getText();
            } else if (VALUE.equals(fieldName)) {
                // getFloatValue throws for a json null, treat it as a missing value instead
                value = valueToken == JsonToken.VALUE_NULL ? MISSING_VALUE : jsonParser.getFloatValue();
            } else if (DATETIME.equals(fieldName)) {
                date = jsonParser.getText();
            } else {
                jsonParser.skipChildren();
                continue;
            }

            if (flag != null && date != null && value != MISSING_VALUE) {
                contentHandler.setTime(contentHandler.getDefaultTimeZone(), "yyyy-MM-dd'T'HH:mm:ss.S", date);
                contentHandler.setValue(value);
                contentHandler.setFlag(flag);
                contentHandler.applyCurrentFields();
                flag = null;
                value = MISSING_VALUE;
                date = null;
            }
        }
    }

    /**
     * Searches the "results" array of a time series response for the first "events" field and builds the url to
     * request the events for the wanted period.
     *
     * @param timeSeriesDataInputStream time series response, may be {@code null}; it is not closed by this method
     * @param periodQuery               query string, including the leading question mark, that selects the period
     * @return the events url followed by the period query and "&amp;format=json", or {@code null} when the response
     *         contains no events url or there is no input stream
     * @throws IOException when the response can not be read or is not valid json
     */
    static String parseEventsUrlFromTimeseries(InputStream timeSeriesDataInputStream, String periodQuery) throws IOException {
        if (timeSeriesDataInputStream == null) return null;

        //noinspection resource caller has to close the input stream.
        JsonParser jsonParser = new JsonFactory().createParser(timeSeriesDataInputStream);
        JsonToken token;
        // nextToken returns null at the end of the input, without that check an empty response would loop forever
        while ((token = jsonParser.nextToken()) != null && token != JsonToken.END_OBJECT) {
            if (token != JsonToken.FIELD_NAME) continue;
            if (!RESULTS.equals(jsonParser.getCurrentName())) {
                // Skip the complete value, a nested object would otherwise end this loop at its END_OBJECT
                jsonParser.nextToken();
                jsonParser.skipChildren();
                continue;
            }
            if (jsonParser.nextToken() != JsonToken.START_ARRAY) {
                jsonParser.skipChildren();
                continue;
            }

            // Scan everything inside the results array; the depth makes sure a nested array does not end the scan
            int arrayDepth = 1;
            while (arrayDepth > 0 && (token = jsonParser.nextToken()) != null) {
                if (token == JsonToken.START_ARRAY) {
                    arrayDepth++;
                } else if (token == JsonToken.END_ARRAY) {
                    arrayDepth--;
                } else if (token == JsonToken.FIELD_NAME) {
                    String fieldName = jsonParser.getCurrentName();
                    log.debug("fieldname = " + fieldName);
                    if (EVENTS.equals(fieldName)) {
                        jsonParser.nextToken();
                        //noinspection StringConcatenationMissingWhitespace
                        return jsonParser.getText() + periodQuery + "&format=json";
                    }
                }
            }
        }
        return null;
    }
}