package nl.wldelft.timeseriesparsers;

import nl.wldelft.util.FastDateFormat;
import nl.wldelft.util.Period;
import nl.wldelft.util.PeriodConsumer;
import nl.wldelft.util.TextUtils;
import nl.wldelft.util.TimeUnit;
import nl.wldelft.util.io.BackupServerUrlConsumer;
import nl.wldelft.util.io.ServerParser;
import nl.wldelft.util.io.XmlParser;
import nl.wldelft.util.timeseries.DefaultTimeSeriesHeader;
import nl.wldelft.util.timeseries.TimeSeriesContentHandler;
import nl.wldelft.util.timeseries.TimeSeriesHeader;
import nl.wldelft.util.timeseries.TimeSeriesHeadersConsumer;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamReader;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Locale;

/**
 * Time series parser for the DataWare REST services.
 *
 * <p>The service is called as follows:
 *
 * <pre>{@code
 * http://servername:portnumber/rest/read/data/<point ID>/<start-date>/<end-date>/<stationname>
 * }</pre>
 *
 * <p>Example request:
 * {@code http://chaddataware01:10003/rest/read/data/19839/2014-11-01T18:00/2014-11-02T03:00/1ha}.
 * Depending on the data point to access, the server name ({@code chaddataware01}), the port
 * ({@code 10003}) and/or the point ID ({@code 19839}) will change.
 *
 * <p>When the request URL is built, the parameter id of the wanted header is placed in the
 * point ID position and its location id in the station name position.
 */
public class DataWareTimeSeriesParser implements ServerParser<TimeSeriesContentHandler>, XmlParser<TimeSeriesContentHandler>, TimeSeriesHeadersConsumer, PeriodConsumer, BackupServerUrlConsumer {

    public static final String ARRAY_OF_DATA_ELEMENT = "ArrayOfData";
    public static final String DATA_ELEMENT = "Data";
    public static final String KEY_ELEMENT = "Key";
    public static final String TIME_ELEMENT = "Time";
    public static final String VALUE_ELEMENT = "Value";
    public static final String QUALITY_ELEMENT = "Quality";

    private static final Logger log = LogManager.getLogger();

    /** Connect/read timeout used when no positive timeout has been configured. */
    private static final int DEFAULT_TIMEOUT_MILLIS = 10 * (int) TimeUnit.SECOND_MILLIS;

    /** Wanted headers (one REST request per header); set through {@link #setTimeSeriesHeaders}. */
    private TimeSeriesHeader[] headers = null;
    /** Header that is reused for every value handed to the content handler. */
    private final DefaultTimeSeriesHeader header = new DefaultTimeSeriesHeader();
    private Period period = null;
    private FastDateFormat gmtDateFormat = null;
    private FastDateFormat gmtTimeFormat = null;
    private int timeoutMillis = 0;

    /** Reader most recently passed to {@link #parse(XMLStreamReader, String, TimeSeriesContentHandler)}. */
    XMLStreamReader reader = null;

    /**
     * Reads the {@code Data} entries below the {@code ArrayOfData} element and passes the time, value
     * and quality flag of each entry to the content handler, using the header that is currently set up
     * in this parser.
     *
     * @param reader          XML stream positioned at the start of the document
     * @param virtualFileName not used by this parser
     * @param contentHandler  receives the parsed time steps
     * @throws Exception when the document does not start at {@code START_DOCUMENT} or reading fails
     */
    @Override
    public void parse(XMLStreamReader reader, String virtualFileName, TimeSeriesContentHandler contentHandler) throws Exception {

        this.reader = reader;

        contentHandler.addMissingValueRange(-999.9999f, -999f);
        reader.require(XMLStreamConstants.START_DOCUMENT, null, null);
        XmlStreamReaderUtils.goTo(reader, ARRAY_OF_DATA_ELEMENT);
        do {
            // go to the beginning of the array. But don't go past the end array. That's the break condition.
            XmlStreamReaderUtils.goTo(reader, DATA_ELEMENT, ARRAY_OF_DATA_ELEMENT);
            if (TextUtils.equals(reader.getLocalName(), ARRAY_OF_DATA_ELEMENT)) break;
            XmlStreamReaderUtils.goTo(reader, KEY_ELEMENT);

            // parameter was already set. We could do a sanity check here...
            // todo check if we need to compare the element text with the header parameter that was set.
            // assumption is that the param name in the resturl should be leading.
            // header.setParameterId(reader.getElementText());

            XmlStreamReaderUtils.goTo(reader, TIME_ELEMENT);
            contentHandler.setTime(contentHandler.getDefaultTimeZone(), "yyyy-MM-dd HH:mm:ss.SSS", reader.getElementText());
            XmlStreamReaderUtils.goTo(reader, VALUE_ELEMENT);
            contentHandler.setValue('.', reader.getElementText());
            XmlStreamReaderUtils.goTo(reader, QUALITY_ELEMENT);
            contentHandler.setFlag(reader.getElementText());
            contentHandler.setTimeSeriesHeader(header);
            contentHandler.applyCurrentFields();
        }
        while (reader.hasNext());
    }

    /**
     * Parses one server response. DTDs and external entities are disabled on the factory (XXE protection).
     * The XML reader is always closed; closing the underlying stream is left to the caller.
     */
    private void parseXmlStream(TimeSeriesContentHandler contentHandler, InputStream inputStream) throws Exception {
        XMLInputFactory factory = XMLInputFactory.newInstance();
        factory.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, Boolean.FALSE);
        factory.setProperty(XMLInputFactory.SUPPORT_DTD, Boolean.FALSE);
        XMLStreamReader xmlReader = factory.createXMLStreamReader(inputStream);
        try {
            parse(xmlReader, "dummy.xml", contentHandler);
        } finally {
            // XMLStreamReader is not AutoCloseable, so it has to be released explicitly.
            xmlReader.close();
        }
    }

    /**
     * Requests the data for every wanted header from the server and feeds each response to the content
     * handler. An {@link IOException} for one header is logged (with its cause) and the remaining
     * headers are still processed; any other exception aborts the import.
     *
     * @param url            base URL of the REST service; the parameter id is appended directly, so it
     *                       presumably has to end with {@code /} (verify against the import configuration)
     * @param username       not used by this parser
     * @param password       not used by this parser
     * @param contentHandler receives the parsed time steps
     * @throws IllegalStateException when no headers or no period have been set
     * @throws Exception             when parsing a response fails with something other than an I/O error
     */
    @Override
    public void parse(URL url, String username, String password, TimeSeriesContentHandler contentHandler) throws Exception {
        if (headers == null || headers.length == 0) {
            throw new IllegalStateException("No wanted parameters and locations found in the import configuration.");
        }
        if (period == null) {
            throw new IllegalStateException("No period defined for import!");
        }
        gmtDateFormat = FastDateFormat.getInstance("yyyy-MM-dd", contentHandler.getDefaultTimeZone(), Locale.US, gmtDateFormat);
        gmtTimeFormat = FastDateFormat.getInstance("HH:mm", contentHandler.getDefaultTimeZone(), Locale.US, gmtTimeFormat);

        // Neither the timeout nor the period part of the URL depends on the header, so compute them once.
        int effectiveTimeoutMillis = timeoutMillis > 0 ? timeoutMillis : DEFAULT_TIMEOUT_MILLIS;
        String periodQuery = buildPeriodQuery();

        for (TimeSeriesHeader timeSeriesHeader : headers) {
            header.setLocationId(timeSeriesHeader.getLocationId());
            header.setParameterId(timeSeriesHeader.getParameterId());

            // URL looks as follows: http://chaddataware01:10003/rest/read/data/19839/2014-11-01T18:00/2014-11-02T03:00/1ha
            // http://servername:portnumber/rest/read/data/parameter/startdate/enddate/location
            //noinspection StringConcatenationMissingWhitespace
            String restUrl = url + header.getParameterId() + periodQuery + '/' + header.getLocationId();
            log.info("Processing {}", restUrl);

            HttpURLConnection urlConnection = null;
            try {
                urlConnection = getHttpURLConnection(new URL(restUrl), effectiveTimeoutMillis);
                // Closing the response stream releases the connection, also when parsing fails.
                try (InputStream inputStream = urlConnection.getInputStream()) {
                    parseXmlStream(contentHandler, inputStream);
                }
            } catch (IOException ioe) {
                log.warn("Error while processing {}", restUrl, ioe);
                if (urlConnection != null) {
                    urlConnection.disconnect();
                }
            }
        }
    }

    /**
     * Builds the {@code /<start>/<end>} part of the request URL from the configured period, or
     * {@code /}{@code *}{@code /}{@code *} when the period is {@link Period#ANY_TIME}.
     */
    private String buildPeriodQuery() {
        if (period == Period.ANY_TIME) {
            return "/*/*";
        }
        return '/' + gmtDateFormat.format(period.getStartTime()) + 'T' + gmtTimeFormat.format(period.getStartTime())
                + '/' + gmtDateFormat.format(period.getEndTime()) + 'T' + gmtTimeFormat.format(period.getEndTime());
    }

    /** Opens an input-only HTTP connection that uses the given value as both connect and read timeout. */
    private static HttpURLConnection getHttpURLConnection(URL url, int connectionTimeout) throws IOException {
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.setConnectTimeout(connectionTimeout);
        connection.setReadTimeout(connectionTimeout);
        connection.setDoInput(true);
        return connection;
    }

    /**
     * Sets the headers (parameter/location combinations) to request from the server.
     *
     * @throws IllegalArgumentException when {@code timeSeriesHeaders} is {@code null}
     */
    @Override
    public void setTimeSeriesHeaders(TimeSeriesHeader[] timeSeriesHeaders) {
        if (timeSeriesHeaders == null) throw new IllegalArgumentException("timeSeriesHeaders == null");
        this.headers = timeSeriesHeaders;
    }

    /** Sets the period to request; must be set before {@link #parse(URL, String, String, TimeSeriesContentHandler)}. */
    @Override
    public void setPeriod(Period period) {
        this.period = period;
    }

    @Override
    public void setBackupServerUrls(URL[] backupUrls) {
        // currently not supported for DataWare parsers..
    }

    /**
     * Sets the connect/read timeout in milliseconds. A value that is not positive selects the
     * default of ten seconds.
     */
    @Override
    public void setConnectionTimeout(int timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }
}

// No labels