Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
package nl.wldelft.fews.system.plugin.dataImporttimeseriesparsers;

import comnl.fasterxmlwldelft.jacksonutil.core.JsonFactoryFastDateFormat;
import comnl.fasterxmlwldelft.jacksonutil.core.JsonParserPeriod;
import com.fasterxml.jackson.core.JsonToken;
import nl.wldelft.util.PeriodPeriodConsumer;
import nl.wldelft.util.PeriodConsumerTextUtils;
import nl.wldelft.util.PropertiesTimeUnit;
import nl.wldelft.util.PropertiesConsumerio.BackupServerUrlConsumer;
import nl.wldelft.util.io.TextUtilsServerParser;
import nl.wldelft.util.io.ServerParserXmlParser;
import nl.wldelft.util.timeseries.DefaultTimeSeriesHeader;
import nl.wldelft.util.timeseries.TimeSeriesContentHandler;
import nl.wldelft.util.timeseries.TimeSeriesHeader;
import nl.wldelft.util.timeseries.TimeSeriesHeadersConsumer;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

import javajavax.xml.iostream.IOExceptionXMLInputFactory;
import javajavax.xml.iostream.InputStreamXMLStreamConstants;
import javajavax.xml.netstream.ConnectExceptionXMLStreamReader;
import java.netio.HttpURLConnectionIOException;
import java.netio.URLInputStream;
import java.utilnet.DateHttpURLConnection;
import java.utilnet.HashMapURL;
import java.util.MapLocale;

/**
 * DDSCTime series parser for developedthe bydataware HKVservices. See https://api.ddsc.nl/api/v1/timeseries
 *
 * The Upgradedservice tocan supportbe v2called by Deltares. See See https://api.ddsc.nl/api/v2/timeseries
 * <p>as follows:
 *
 * http://servername:portnumber/rest/read/data/<point ID>/<start-date>/<end-date>/<stationname>
 *
 * ServiceThe callsexact theURL timeseriesused serviceto usingrequest the followingattached URL,file where the uuid defines the unique timeserie. Using properties a mapping has to be made from parameter,location to uuid. :
 * <p>
 * https://ddsc.lizard.net/api/v2/timeseries/cc0a8831-7758-4dd6-999c-f607c026d176/?start=1421816000000&end=1454595236646&format=json
 * <p>
 * The response will contain a json file with events in the following format.
 * <p>
 * events: [
 {
 timestamp: 1449366444000,
 max: 1013.6,
 min: 1013.6
 },
 {
 timestamp: 1449367344000,
 max: 1013.58,
 min: 1013.58
 },

 The Importer currently uses the max value to store a value.
 */
public class DdscTimeSeriesServerParser implements ServerParser<TimeSeriesContentHandler>, TimeSeriesHeadersConsumer, PeriodConsumer, PropertiesConsumer {

    private static final Logger log = Logger.getLogger(DdscTimeSeriesServerParser.class);
    public static final String DATETIME = "timestamp";
    public static final String VALUE = "max";
    public static final int MISSING_VALUE = -999;
    public static final String EVENTS = "events";
    private TimeSeriesHeader[] headers = null;
    private Period period = null;
    private final Map uuidMap = new HashMap<>();

    @SuppressWarnings({"AssignmentToCollectionOrArrayFieldFromParameter"})
    @Override
    public void setTimeSeriesHeaders(TimeSeriesHeader[] timeSeriesHeaders) {
        this.headers = timeSeriesHeaders;
    }

    @Override
    public void setPeriod(Period period) {
        this.period = period;
    }

    @Override
    public void parse(URL url, String username, String password, TimeSeriesContentHandler contentHandler) throws IOException {
        if (period == Period.ANY_TIME) throw new IllegalArgumentException("contentHandler.getWantedPeriod() == Period.ANY_TIME");
        Date startDate = period.getStartDate();
        Date endDate = period.getEndDate();

        String periodQuery = "?start=" + startDate.getTime() + "&end=" + endDate.getTime();
        for (int i = 0; i < headers.length; i++) {
            TimeSeriesHeader header = headers[i];
            try {
                DefaultTimeSeriesHeader defaultTimeSeriesHeader = new DefaultTimeSeriesHeader();
                defaultTimeSeriesHeader.setLocationId(header.getLocationId());
                defaultTimeSeriesHeader.setParameterId(header.getParameterId());
                contentHandler.addMissingValue(MISSING_VALUE);
                contentHandler.createTimeSeriesHeaderAlias(i, defaultTimeSeriesHeader);
                contentHandler.setTimeSeriesHeader(i);
                String key = header.getParameterId() + "," + header.getLocationId();
                String uuid = (String) uuidMap.get(key);
                if (uuid == null) {
                    log.warn("No uuid mapping found for '" + key + "'");
                    continue;
                }
                //noinspection StringConcatenationMissingWhitespace
                URL timeSeriesUrl = new URL(url.toExternalForm() + uuid + "/" + periodQuery +
                        "&format=json");

                log.info("Processing timeseries url: " + timeSeriesUrl);

                try (InputStream timeSeriesInputStream = openHttpInputStream(timeSeriesUrl.toExternalForm(), username, password)) {
                    if (timeSeriesInputStream != null) {
                        parseEvents(contentHandler, timeSeriesInputStream);
                    }
                }
            } catch (ConnectException e) {
                log.warn("Can not connect to " + url + ' ' + e.getMessage(), e);
  is http://chaddataware01:10003/rest/read/data/19839/2014-11-01T18:00/2014-11-02T03:00/1ha.  Depending on the data point we want to access, the server name (chaddataware01), port (10003), and/or the point ID (19839) will change
 *
 */
public class DataWareTimeSeriesParser implements ServerParser<TimeSeriesContentHandler>, XmlParser<TimeSeriesContentHandler>, TimeSeriesHeadersConsumer, PeriodConsumer, BackupServerUrlConsumer {

    public static final String ARRAY_OF_DATA_ELEMENT = "ArrayOfData";
    public static final String DATA_ELEMENT = "Data";
    public static final String KEY_ELEMENT = "Key";
    public static final String TIME_ELEMENT = "Time";
    public static final String VALUE_ELEMENT = "Value";
    public static final String QUALITY_ELEMENT = "Quality";

    private TimeSeriesHeader[] headers = null;
    private final DefaultTimeSeriesHeader header = new DefaultTimeSeriesHeader();
    private Period period = null;
    private FastDateFormat gmtDateFormat = null;
    private FastDateFormat gmtTimeFormat = null;
    private static final Logger log = LogManager.getLogger();
    private int timeoutMillis = 0;

    XMLStreamReader reader = null;
    @Override
    public void parse(XMLStreamReader reader, String virtualFileName, TimeSeriesContentHandler contentHandler) throws Exception {

        this.reader = reader;

        contentHandler.addMissingValueRange(-999.9999f, -999f);
        reader.require(XMLStreamConstants.START_DOCUMENT, null, null);
        XmlStreamReaderUtils.goTo(reader, ARRAY_OF_DATA_ELEMENT);
        do {
            // go return;
to the beginning of the array. But don't go past the end } catch (Exception e) {array. That's the break condition.
            XmlStreamReaderUtils.goTo(reader, DATA_ELEMENT, ARRAY_OF_DATA_ELEMENT);
  log.warn("Can not parse data for " + header + ' ' + e.getMessageif (TextUtils.equals(reader.getLocalName(), e)ARRAY_OF_DATA_ELEMENT)) break;
            }
        }XmlStreamReaderUtils.goTo(reader, KEY_ELEMENT);

    }

    private static InputStream openHttpInputStream(String restUrl, String username, String password) throws Exception {
     // parameter was already set. We could do a sanity check here...
    URL url = new URL(restUrl);
    // todo check if HttpURLConnectionwe connectionneed =to (HttpURLConnection) url.openConnection();
        // just want to do an HTTP GET here
compare the element text with the header parameter that was set.
            // assumption  connection.setRequestMethod("GET");
        // give it 15 seconds to respond
is that the param name in the resturl should be leading.
             connection.setReadTimeout(15 * 1000);
// header.setParameterId(reader.getElementText());

           connection XmlStreamReaderUtils.setRequestPropertygoTo("username"reader, username);
TIME_ELEMENT);
            connection.setRequestProperty("password", passwordcontentHandler.setTime(contentHandler.getDefaultTimeZone(),"yyyy-MM-dd HH:mm:ss.SSS", reader.getElementText());
        connection.connect();
        return connection.getInputStream(XmlStreamReaderUtils.goTo(reader, VALUE_ELEMENT);
     }

    static void parseEvents(TimeSeriesContentHandler contentHandler.setValue('.', InputStream inputStream) throws IOException {
reader.getElementText());
           if XmlStreamReaderUtils.goTo(inputStream != null) {
    reader, QUALITY_ELEMENT);
        try (JsonParser jsonParser = new JsonFactorycontentHandler.setFlag()reader.createParsergetElementText(inputStream)) {;
            contentHandler.setTimeSeriesHeader(header);
    boolean startParsingEvents = false;
     contentHandler.applyCurrentFields();
        }
    for (JsonToken token; (token =while jsonParser(reader.nextTokenhasNext() );
 != null; ) {}

    private void parseXmlStream(TimeSeriesContentHandler contentHandler, InputStream inputStream) throws Exception {
        ifXMLInputFactory (tokenf == JsonToken.FIELD_NAME) {
XMLInputFactory.newInstance();
        f.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, Boolean.FALSE);
        f.setProperty(XMLInputFactory.SUPPORT_DTD, Boolean.FALSE);
        XMLStreamReader  String fieldNamereader = jsonParserf.getCurrentNamecreateXMLStreamReader(inputStream);
                        if (TextUtils.equals(EVENTS, fieldName)) {parse(reader, "dummy.xml", contentHandler);
    }

    @Override
    public void parse(URL url, String username, String password, TimeSeriesContentHandler contentHandler) throws Exception {
    startParsingEvents = true; // The events object was found.
    if (headers.length <= 0) {
            throw new IllegalStateException("No wanted parameters and locations found in the   continueimport configuration.");
        }
        if (period ==      }null) {
            throw new IllegalStateException("No period defined    }for import!");
         }
        gmtDateFormat =  if (startParsingEvents) {
FastDateFormat.getInstance("yyyy-MM-dd", contentHandler.getDefaultTimeZone(), Locale.US, gmtDateFormat);
        gmtTimeFormat = FastDateFormat.getInstance("HH:mm", contentHandler.getDefaultTimeZone(), Locale.US, gmtTimeFormat);

        for (TimeSeriesHeader timeSeriesHeader :  processJsonEvents(contentHandler, jsonParser);headers) {
            String restUrl = null;
         startParsingEvents = false;
 try {
                header.setLocationId(timeSeriesHeader.getLocationId());
  }
                }header.setParameterId(timeSeriesHeader.getParameterId());

            }
    // URL looks as }
    }


    private static void processJsonEvents(TimeSeriesContentHandler contentHandler, JsonParser jsonParser) throws IOException {
follows: http://chaddataware01:10003/rest/read/data/19839/2014-11-01T18:00/2014-11-02T03:00/1ha
                float value = MISSING_VALUE;
        String date = null;
// http://servername:portnumber/rest/read/data/location/startdate/enddate/parameter
              while (jsonParser.nextToken() != JsonToken.END_ARRAY) { // process all events until the end of the events array.
 String periodQuery = "/*/*";
                if (period != Period.ANY_TIME) periodQuery  String fieldName = jsonParser.getCurrentName();
            if (VALUE.equals(fieldName)) {
= '/' + gmtDateFormat.format(period.getStartTime()) + 'T' + gmtTimeFormat.format(period.getStartTime()) + '/' + gmtDateFormat.format(period.getEndTime()) + 'T' + gmtTimeFormat.format(period.getEndTime());
                 jsonParser.nextToken();//noinspection StringConcatenationMissingWhitespace
                valuerestUrl = url + jsonParserheader.getFloatValuegetParameterId();
 + periodQuery + '/'        } else if (DATETIME.equals(fieldName)) {+ header.getLocationId();
                jsonParserlog.nextToken(info("Processing " + restUrl);
                dateif (timeoutMillis == jsonParser.getText();0) {
            }
            timeoutMillis = 10 * (int) TimeUnit.SECOND_MILLIS;
        if (date != null && value != MISSING_VALUE) {}
                contentHandler.setTime(Long.parseLong(date)HttpURLConnection urlConnection = getHttpURLConnection(new URL(restUrl), timeoutMillis);
                parseXmlStream(contentHandler, urlConnection.setValuegetInputStream(value));
            } catch (IOException ioe) {
       contentHandler.applyCurrentFields();
             log.warn("Error while processing value" = MISSING_VALUE+ restUrl);
            }
    date   = null;}
    }

    private static HttpURLConnection getHttpURLConnection(URL url, }
int connectionTimeout) throws IOException {
    }
    }


HttpURLConnection connection =  /**(HttpURLConnection) url.openConnection();
     *  Properties are used to map parameter location combinations to uuid. connection.setConnectTimeout(connectionTimeout);
        connection.setReadTimeout(connectionTimeout);
     * <properties>
   connection.setDoInput(true);
      * <string key="param1,location1" value="uuid1"/> return connection;
    }

 * <string key="param1,location2" value="uuid2"/> @Override
    public * <string key="param2,location1" value="uuid3"/>void setTimeSeriesHeaders(TimeSeriesHeader[] timeSeriesHeaders) {
     * </properties>
  if (timeSeriesHeaders  */
    @Override
== null) throw new IllegalArgumentException("timeSeriesHeaders == null");
     public  void setProperties(Properties properties) { this.headers = timeSeriesHeaders;
    }

    for@Override
 (int i = 0;public i < properties.size(); i++void setPeriod(Period period) {
            String key = properties.getKey(i)this.period = period;
    }

    @Override
    Stringpublic value = properties.getString(i);void setBackupServerUrls(URL[] backupUrls) {
        // currently not  //noinspection ResultOfMethodCallIgnoredsupported for DataWare parsers..
    }

    @Override
    uuidMap.put(key, value);
public void setConnectionTimeout(int timeoutMillis) {
        this.timeoutMillis = }timeoutMillis;

    }


}