package nl.wldelft.aquarius.timeseriesparsers;

import nl.wldelft.aquarius.CsvDataSetParser;
import nl.wldelft.aquarius.CsvTimeSeriesDataParser;
import nl.wldelft.aquarius.DataSetContentHandler;
import nl.wldelft.aquarius.util.DateTimeUtils;
import nl.wldelft.util.Period;
import nl.wldelft.util.PeriodConsumer;
import nl.wldelft.util.Properties;
import nl.wldelft.util.PropertiesConsumer;
import nl.wldelft.util.io.LineReader;
import nl.wldelft.util.io.ServerParser;
import nl.wldelft.util.timeseries.TimeSeriesContentHandler;
import nl.wldelft.util.timeseries.TimeSeriesHeader;
import nl.wldelft.util.timeseries.TimeSeriesHeadersConsumer;

import javax.naming.AuthenticationException;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.StringReader;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashSet;
import java.util.TimeZone;

/**
 * Created by IntelliJ IDEA.
 * User: rooij_e
 * Date: 2/25/13
 * Time: 4:20 PM
 *
 * This class imports timeseries data from an Aquarius REST server
 */
public class AquariusTimeSeriesRestServerParser implements ServerParser<TimeSeriesContentHandler>, TimeSeriesHeadersConsumer, PeriodConsumer, PropertiesConsumer {

    /* period can be given to filter data */
    private Period period = null;

    private TimeSeriesHeader[] headers = null;

    /**
     * Loop over the locations when retrieving the DataSetList values. If false then all datasets will be retrieve
     * for all locations in one single call.
     */
    private boolean loopOverLocations = false;
    /**
     * Pre configured value for the PublishView. (Optional)
     */
    private String publishView = null;
    /** Option to skip the DataSetList call. This is only possible if the location ids of the timeseries headers
     *  already contain the dataId values
     *  **/
    private boolean skipDataSetList = false;
    private String subscriptionKey;

    private TimeZone defaultTimeZone = TimeZone.getTimeZone("GMT");


    @Override
    public void parse(URL url, String username, String password, TimeSeriesContentHandler timeSeriesContentHandler) throws Exception {

        if (headers == null || headers.length == 0) {
            throw new Exception("No wanted parameters and locations found. Configure these in the id map of"
                    + " your import.");
        }

        if (period == null) {
            throw new Exception("No time period defined. Configure this in your import.");
        }
        if (timeSeriesContentHandler.getDefaultTimeZone() != null) {
            defaultTimeZone = timeSeriesContentHandler.getDefaultTimeZone();
        }

        String requestUrl = subscriptionKey == null ? String.format("%s/GetAuthToken?user=%s&encPwd=%s", url, username, password) : url.toString();
        String authToken = null;
        if (subscriptionKey == null) {
            try {
                authToken = httpGet(requestUrl, null);
            } catch (Exception e) {
                throw new AuthenticationException("Error getting authentication code: " + e.getMessage());
            }
        }

        if (skipDataSetList){
            //location id has already been mapped to datasetid
            for (TimeSeriesHeader header : headers) {
                String dataSetId = header.getLocationId();
                Period dataSetPeriod = period;
                retrieveTimeSeriesData(timeSeriesContentHandler, url, authToken, header, dataSetId, dataSetPeriod);
            }
        } else {
            //get dataset id for each header.
            DataSetContentHandler contentHandler = new DataSetContentHandler(period, headers);
            if (loopOverLocations){
                HashSet<String> processedLocationIds = new HashSet<>();
                //in case of very large databases it might be better to retrieve data per location instead of
                //all data in one go.
                for (TimeSeriesHeader header : headers) {
                    String locationId = header.getLocationId();
                    if (!processedLocationIds.add(locationId)) continue;
                    requestUrl = String.format("%s/GetDataSetsList?locationId=%s", url, locationId);
                    String csvDataSet = httpGet(requestUrl, authToken);
                    CsvDataSetParser parser = new CsvDataSetParser();
                    parser.parse(new LineReader(new StringReader(csvDataSet), "dataset.csv"), contentHandler);
                }
            } else {
                requestUrl = String.format("%s/GetDataSetsList", url);
                String csvDataSet = httpGet(requestUrl, authToken);
                CsvDataSetParser parser = new CsvDataSetParser();
                parser.parse(new LineReader(new StringReader(csvDataSet), "dataset.csv"), contentHandler);
            }

            int dataSetCount = contentHandler.getDataSetCount();
            for (int i = 0; i < dataSetCount; i++) {
                contentHandler.setDataSetIndex(i);
                if (!contentHandler.isDataSetAvailable()) continue;
                String dataSetId = contentHandler.getDataSetId();
                Period dataSetPeriod = contentHandler.getDataSetPeriod();
                TimeSeriesHeader timeSeriesHeader = contentHandler.getTimeSeriesHeader();
                retrieveTimeSeriesData(timeSeriesContentHandler, url, authToken, timeSeriesHeader, dataSetId, dataSetPeriod);
            }

        }

    }

    private String httpGet(String requestUrl, String authToken) throws IOException {
        URL url;
        try {
            url = new URL(requestUrl);
        } catch (MalformedURLException e) {
            throw new IOException(String.format("Invalid request URL %s: %s", requestUrl, e.getMessage()));
        }

        HttpURLConnection conn = (HttpURLConnection) url.openConnection();

        if (authToken != null){
            conn.addRequestProperty("AQAuthToken", authToken);
        }
        if (subscriptionKey != null){
            conn.setRequestProperty("Ocp-Apim-Subscription-Key", subscriptionKey);
        }

        int responseCode = conn.getResponseCode();
        if (responseCode != 200) {
            throw new IOException(String.format("Invalid response code %d: %s.", responseCode, conn.getResponseMessage()));
        }

        // Buffer the result into a string
        BufferedReader rd = new BufferedReader(
                new InputStreamReader(conn.getInputStream()));
        StringBuilder sb = new StringBuilder();
        String line;
        boolean first = true;
        while ((line = rd.readLine()) != null) {
            if (!first){
                sb.append('\n');
            }
            sb.append(line);
            first = false;
        }
        rd.close();

        conn.disconnect();
        return sb.toString();
    }

    private void retrieveTimeSeriesData(TimeSeriesContentHandler timeSeriesContentHandler, URL baseUrl, String authToken,
                                        TimeSeriesHeader timeSeriesHeader, String dataSetId, Period dataSetPeriod) throws Exception {

        /** Set destination header for returned data **/
        timeSeriesContentHandler.setTimeSeriesHeader(timeSeriesHeader);
        if (timeSeriesContentHandler.isCurrentTimeSeriesHeaderForAllTimesRejected()) {
            //this should never be the case otherwise why request header.
            throw new IllegalArgumentException("Requested header is not valid! " + timeSeriesHeader.toString());
        }

        /** Read data for given dataset id **/
        String startTime = DateTimeUtils.formatDate(dataSetPeriod.getStartDate(), defaultTimeZone);
        String endTime = DateTimeUtils.formatDate(dataSetPeriod.getEndDate(), defaultTimeZone);
        String requestUrl = String.format("%s/GetTimeSeriesData?dataId=%s&view=%s&queryFrom=%s&queryTo=%s",
                baseUrl, dataSetId, publishView, startTime, endTime);

        String csvTimeSeriesData = httpGet(requestUrl, authToken);

        CsvTimeSeriesDataParser csvTimeSeriesParser = new CsvTimeSeriesDataParser();

        LineReader lineReader = new LineReader(new StringReader(csvTimeSeriesData), "timeseries.csv");
        try {
            csvTimeSeriesParser.parse(lineReader, "timeseries.csv", timeSeriesContentHandler);
        } catch (IOException e) {
            throw new IOException(e.getMessage() + csvTimeSeriesData);
        }

    }


    @Override
    public void setTimeSeriesHeaders(TimeSeriesHeader[] timeSeriesHeaders) {
        this.headers = timeSeriesHeaders;
    }

    @Override
    public void setPeriod(Period period) {
        this.period = period;
    }

    @Override
    public void setProperties(Properties properties) {
        loopOverLocations = properties.getBool("LoopOverLocations", false);
        publishView = properties.getString("PublishView", null);
        skipDataSetList = properties.getBool("SkipDataSetList", false);
        subscriptionKey = properties.getString("SubscriptionKey", null);
    }
}



  • No labels