Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
package nl.wldelft.timeseriesparsersfews.system.plugin.dataImport;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import nl.wldelft.util.FastDateFormat;
import nl.wldelft.util.Period;
import nl.wldelft.util.PeriodConsumer;
import nl.wldelft.util.io.ServerParser;
import nl.wldelft.util.timeseries.DefaultTimeSeriesHeader;
import nl.wldelft.util.timeseries.TimeSeriesContentHandler;
import nl.wldelft.util.timeseries.TimeSeriesHeader;
import nl.wldelft.util.timeseries.TimeSeriesHeadersConsumer;
import org.apache.http.util.TextUtils;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.io.InputStream;
import java.net.ConnectException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.util.Date;
import java.util.Locale;

/**
 * DDSC parser developed by HKV. See https://api.ddsc.nl/api/v1/timeseries
 * <p>
 * Upgraded to support v2 by Deltares. See https://api.ddsc.nl/api/v2/timeseries
 * <p>
 * Service calls the timeseries service using the following URL, where name consists of locationname,parametername (in this example the location name is OMZD_29 and the parameter is 9_BARO):
 * <p>
 * https://api.ddsc.nl/api/v2/timeseries/?start=2015-12-03T08:00:00.000Z&end=2015-12-10T08:00:00.000Z&name=OMZD_29,9_BARO&format=json
 * <p>
 * The earlier v1 call used name=locationname_parametername without a period in the timeseries URL (in that example the location name is GREB_BK_A1 and the parameter is SH):
 * <p>
 * https://api.ddsc.nl/api/v1/timeseries/?name=GREB_BK_A1_SH&format=json
 * <p>
 * The response will contain the urls to the events: a json file with events in the following format.
 * <p>
 * https://api.ddsc.nl/api/v1/events/157a6c92-cf4b-422f-8d5a-6aeceb01b43c?format=json&start=2015-01-09T18:00:00.000Z&end=2015-01-19T18:00:00.000Z
 */
public class DdscTimeSeriesServerParser implements ServerParser<TimeSeriesContentHandler>, TimeSeriesHeadersConsumer, PeriodConsumer {

    /** Logger for this parser. */
    private static final Logger log = Logger.getLogger(DdscTimeSeriesServerParser.class);

    // Names matched against the current JSON field name while reading responses.
    // NOTE(review): only part of the reading code is visible here - confirm each usage.
    public static final String RESULTS = "results";
    public static final String EVENTS = "events";
    public static final String NEXT = "next";
    public static final String DATETIME = "datetime";
    public static final String VALUE = "value";
    public static final String FLAG = "flag";

    /** Value registered as missing with the content handler in parse. */
    public static final int MISSING_VALUE = -999;

    /** Formatter for the start/end query values; obtained in parse. */
    private FastDateFormat dateFormat;
    /** Headers handed in through setTimeSeriesHeaders; parse requests one series per header. */
    private TimeSeriesHeader[] headers;
    /** Period handed in through setPeriod; parse reads its start and end date. */
    private Period period;

    /**
     * Stores the headers of the time series that a later parse call has to request.
     * The array is kept as passed in, without copying.
     *
     * @param timeSeriesHeaders headers to keep for parse
     */
    @Override
    @SuppressWarnings({"AssignmentToCollectionOrArrayFieldFromParameter"})
    public void setTimeSeriesHeaders(TimeSeriesHeader[] timeSeriesHeaders) {
        headers = timeSeriesHeaders;
    }

    /**
     * Stores the period whose start and end date parse puts into the request query.
     *
     * @param period period to request; parse rejects Period.ANY_TIME
     */
    @Override
    public void setPeriod(Period period) {
        this.period = period;
    }

    @Override
    public void parse(URL url, String username, String password, TimeSeriesContentHandler contentHandler) throws IOException {
        if (period == Period.ANY_TIME) throw new IllegalArgumentException("contentHandler.getWantedPeriod() == Period.ANY_TIME");

        dateFormat = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", contentHandler.getDefaultTimeZone(), Locale.US, dateFormat);
        Date startDate = period.getStartDate();
        Date endDate = period.getEndDate();

        String periodQuery = "?start=" + dateFormat.format(startDate) + "&end=" + dateFormat.format(endDate);
        log.debug("periodQuery:" + periodQuery);

        for (int i = 0; i < headers.length; i++) {
            TimeSeriesHeader header = headers[i];
            try {
                DefaultTimeSeriesHeader defaultTimeSeriesHeader = new DefaultTimeSeriesHeader();
                defaultTimeSeriesHeader.setLocationId(header.getLocationId());
                defaultTimeSeriesHeader.setParameterId(header.getParameterId());
                contentHandler.addMissingValue(MISSING_VALUE);
                contentHandler.createTimeSeriesHeaderAlias(i, defaultTimeSeriesHeader);
                contentHandler.setTimeSeriesHeader(i);

                URL timeSeriesUrl = new URL(url.toExternalForm() +
                        "?name=" + header.getLocationId() + "_" + header.getParameterId() +
                        "&format=json");
                log.debug("timeseries url: " + timeSeriesUrl);

                try (InputStream timeSeriesInputStream = openHttpInputStream(timeSeriesUrl.toExternalForm(), username, password)) {
                    if (timeSeriesInputStream != null) {
                        String events = parseEventsUrlFromTimeseries(timeSeriesInputStream, periodQuery);
                        log.info("events url: " + events);
                        while (!TextUtils.isEmpty(events) && !TextUtils.isBlank(events)) {
                            try (InputStream eventsStream = openHttpInputStream(events,username,password)) {
                                events = parseEvents(contentHandler, eventsStream);
                                log.debug("next: " + events);
                            }
                        }
                    }
                }
            } catch (ConnectException e) {
                log.warn("Can not connect to " + url + ' ' + e.getMessage(), e);events: [
 {
 timestamp: 1449366444000,
 max: 1013.6,
 min: 1013.6
 },
 {
 timestamp: 1449367344000,
 max: 1013.58,
 min: 1013.58
 },

 The Importer currently uses the max value to store a value.
 */
public class DdscTimeSeriesServerParser implements ServerParser<TimeSeriesContentHandler>, TimeSeriesHeadersConsumer, PeriodConsumer {

    /** Logger for this parser. */
    private static final Logger log = Logger.getLogger(DdscTimeSeriesServerParser.class);

    // Names matched against the current JSON field name while reading responses.
    // NOTE(review): only part of the reading code is visible here - confirm each usage.
    public static final String RESULTS = "results";
    public static final String EVENTS = "events";
    /** Field of an event entry that carries the time stamp. */
    public static final String DATETIME = "timestamp";
    /** Field of an event entry that is stored as the value: the "max" entry, not "min". */
    public static final String VALUE = "max";

    /** Value registered as missing with the content handler in parse. */
    public static final int MISSING_VALUE = -999;

    /** Formatter for the start/end query values; obtained in parse. */
    private FastDateFormat dateFormat;
    /** Headers handed in through setTimeSeriesHeaders; parse requests one series per header. */
    private TimeSeriesHeader[] headers;
    /** Period handed in through setPeriod; parse reads its start and end date. */
    private Period period;

    /**
     * Stores the headers of the time series that a later parse call has to request.
     * The array is kept as passed in, without copying.
     *
     * @param timeSeriesHeaders headers to keep for parse
     */
    @Override
    @SuppressWarnings({"AssignmentToCollectionOrArrayFieldFromParameter"})
    public void setTimeSeriesHeaders(TimeSeriesHeader[] timeSeriesHeaders) {
        headers = timeSeriesHeaders;
    }

   return; @Override
    public void setPeriod(Period period) {
    } catch (Exception e) {
   this.period = period;
    }

    @Override
    public void log.warn("Can not parse data for " + header + ' ' + e.getMessage(), e);
parse(URL url, String username, String password, TimeSeriesContentHandler contentHandler) throws IOException {
        if (period == Period.ANY_TIME) throw new IllegalArgumentException("contentHandler.getWantedPeriod()  }
        }== Period.ANY_TIME");

    }

    privatedateFormat static InputStream openHttpInputStream(String restUrl, String username, String password) throws Exception {= FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", contentHandler.getDefaultTimeZone(), Locale.US, dateFormat);
        URLDate urlstartDate = new URL(restUrlperiod.getStartDate();
        Date log.debug("url:" + urlendDate = period.getEndDate();

        String periodQuery = "?start="  HttpURLConnection connection = (HttpURLConnection) url.openConnection(+ dateFormat.format(startDate) + "&end=" + dateFormat.format(endDate);
        for //(int justi want= to0; doi an HTTP GET here< headers.length; i++) {
        connection.setRequestMethod("GET");
    TimeSeriesHeader header =  // give it 15 seconds to respond
headers[i];
            try {
  connection.setReadTimeout(15 * 1000);
        connection.setRequestProperty("username", username);
    DefaultTimeSeriesHeader defaultTimeSeriesHeader = new connection.setRequestProperty("password", passwordDefaultTimeSeriesHeader();
        connection.connect();
           return connection.getInputStream(defaultTimeSeriesHeader.setLocationId(header.getLocationId());
    }

    static String parseEvents(TimeSeriesContentHandler contentHandler, InputStream inputStream) throws IOException {
     defaultTimeSeriesHeader.setParameterId(header.getParameterId());
    String next = null;
        if (inputStream != null) { contentHandler.addMissingValue(MISSING_VALUE);
            try (JsonParser jsonParser = new JsonFactory()contentHandler.createParser(inputStream)) {createTimeSeriesHeaderAlias(i, defaultTimeSeriesHeader);
                while (jsonParser.nextToken() != JsonToken.END_OBJECT) {
contentHandler.setTimeSeriesHeader(i);

                //noinspection StringConcatenationMissingWhitespace
       String fieldName = jsonParser.getCurrentName();
      URL timeSeriesUrl = new URL(url.toExternalForm() + periodQuery +
       // Lees eventuele next url
              "&name=" + URLEncoder.encode(header.getLocationId(), "utf-8") + "," if+ URLEncoder.encode(NEXTheader.equalsgetParameterId(fieldName), "utf-8") {+
                        jsonParser.nextToken("&format=json");
                        if (!jsonParser.getText().equals("null")) {
         log.debug("timeseries url: " + timeSeriesUrl);

                try (InputStream  nexttimeSeriesInputStream = jsonParseropenHttpInputStream(timeSeriesUrl.getText();toExternalForm(), username, password)) {
                    if (timeSeriesInputStream != null) }{
                    }
    parseEvents(contentHandler, timeSeriesInputStream);
                if (RESULTS.equals(fieldName)) writeTimeSeriesEvent(contentHandler, jsonParser);    }
                }
            } catch (ConnectException e) {
        }
        return next;
    }

    private static void writeTimeSeriesEvent(TimeSeriesContentHandler contentHandler, JsonParser jsonParser) throws IOException {
log.warn("Can not connect to " + url + ' ' + e.getMessage(), e);
             String flag = nullreturn;
         float value = MISSING_VALUE;
} catch (Exception e) {
    String date = null;

        while (jsonParserlog.nextToken() != JsonToken.END_ARRAY) {
            String fieldName = jsonParser.getCurrentName()warn("Can not parse data for " + header + ' ' + e.getMessage(), e);
            if (FLAG.equals(fieldName)) {
}
        }
    }

    private static InputStream jsonParser.nextToken();
       openHttpInputStream(String restUrl, String username, String password) throws Exception {
        URL flagurl = jsonParser.getText(new URL(restUrl);
        HttpURLConnection connection =  } else if (VALUE.equals(fieldName)) {(HttpURLConnection) url.openConnection();
                jsonParser.nextToken();
          // just want to do an HTTP GET here
      value = jsonParserconnection.getFloatValuesetRequestMethod("GET");
        // give it 15 }seconds elseto if (DATETIME.equals(fieldName)) {
respond
        connection.setReadTimeout(15 * 1000);
        jsonParser.nextToken(connection.setRequestProperty("username", username);
          connection.setRequestProperty("password", password);
      date = jsonParserconnection.getTextconnect();
        return connection.getInputStream();
    }

    static void parseEvents(TimeSeriesContentHandler contentHandler, InputStream inputStream) throws IOException if{
 (flag != null && date != null &&if value(inputStream != MISSING_VALUEnull) {
            try (JsonParser jsonParser  contentHandler.setTime(contentHandler.getDefaultTimeZone(), "yyyy-MM-dd'T'HH:mm:ss.S", date);
= new JsonFactory().createParser(inputStream)) {
                while (jsonParser.nextToken() != JsonToken.END_OBJECT) {
       contentHandler.setValue(value);
             String fieldName = contentHandlerjsonParser.setFlaggetCurrentName(flag);
                contentHandler.applyCurrentFields();
    // Lees eventuele next url
        flag = null;
                value = MISSING_VALUEif (RESULTS.equals(fieldName)) writeTimeSeriesEvent(contentHandler, jsonParser);
                date = null;}
            }
        }
    }

    private static Stringvoid parseEventsUrlFromTimeserieswriteTimeSeriesEvent(InputStreamTimeSeriesContentHandler timeSeriesDataInputStreamcontentHandler, StringJsonParser periodQueryjsonParser) throws IOException {
        ifwhile (timeSeriesDataInputStreamjsonParser.nextToken() != null) {
       JsonToken.END_ARRAY) {
     //noinspection resource caller has to close the input stream.if (EVENTS.equals(jsonParser.getCurrentName())) {
            JsonParser jsonParser = new JsonFactoryprocessJsonEvents().createParser(timeSeriesDataInputStream)contentHandler, jsonParser);
            while (jsonParser.nextToken() != JsonToken.END_OBJECT) {    }
        }
    }

    Stringprivate fieldNamestatic =void jsonParser.getCurrentName();
      processJsonEvents(TimeSeriesContentHandler contentHandler, JsonParser jsonParser) throws IOException {
        float value if (RESULTS.equals(fieldName)) {= MISSING_VALUE;
        String date =  null;
        while (jsonParser.nextToken() != JsonToken.END_ARRAY) {
 // process all events until the end of the events array.
            String fieldName = jsonParser.getCurrentName();
            if (VALUE.equals(fieldName)) {
          log.debug("fieldname = " + fieldName);
      jsonParser.nextToken();
                value = jsonParser.getFloatValue();
            } else if (EVENTSDATETIME.equals(fieldName)) {
                jsonParser.nextToken();
                date = jsonParser.nextTokengetText();
            }
            if (date != null //noinspection StringConcatenationMissingWhitespace
   && value != MISSING_VALUE) {
                         return jsonParser.getText() + periodQuery + "&format=json"contentHandler.setTime(Long.parseLong(date));
                contentHandler.setValue(value);
        }
        contentHandler.applyCurrentFields();
            }
    value = MISSING_VALUE;
          }
      date = null;
    }
        }
        return null;}
    }
}