package nl.wldelft.aquarius.timeseriesparsers; import nl.wldelft.aquarius.CsvDataSetParser; import nl.wldelft.aquarius.CsvTimeSeriesDataParser; import nl.wldelft.aquarius.DataSetContentHandler; import nl.wldelft.aquarius.util.DateTimeUtils; import nl.wldelft.util.Period; import nl.wldelft.util.PeriodConsumer; import nl.wldelft.util.Properties; import nl.wldelft.util.PropertiesConsumer; import nl.wldelft.util.io.LineReader; import nl.wldelft.util.io.ServerParser; import nl.wldelft.util.timeseries.TimeSeriesContentHandler; import nl.wldelft.util.timeseries.TimeSeriesHeader; import nl.wldelft.util.timeseries.TimeSeriesHeadersConsumer; import javax.naming.AuthenticationException; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.StringReader; import java.net.HttpURLConnection; import java.net.MalformedURLException; import java.net.URL; import java.util.HashSet; import java.util.TimeZone; /** * Created by IntelliJ IDEA. * User: rooij_e * Date: 2/25/13 * Time: 4:20 PM * * This class imports timeseries data from an Aquarius REST server */ public class AquariusTimeSeriesRestServerParser implements ServerParser<TimeSeriesContentHandler>, TimeSeriesHeadersConsumer, PeriodConsumer, PropertiesConsumer { /* period can be given to filter data */ private Period period = null; private TimeSeriesHeader[] headers = null; /** * Loop over the locations when retrieving the DataSetList values. If false then all datasets will be retrieve * for all locations in one single call. */ private boolean loopOverLocations = false; /** * Pre configured value for the PublishView. (Optional) */ private String publishView = null; /** Option to skip the DataSetList call. This is only possible if the location ids of the timeseries headers * already contain the dataId values * **/ private boolean skipDataSetList = false; private String subscriptionKey; private TimeZone defaultTimeZone = TimeZone.getTimeZone("GMT"); @Override public void parse(URL url, String username, String password, TimeSeriesContentHandler timeSeriesContentHandler) throws Exception { if (headers == null || headers.length == 0) { throw new Exception("No wanted parameters and locations found. Configure these in the id map of" + " your import."); } if (period == null) { throw new Exception("No time period defined. Configure this in your import."); } if (timeSeriesContentHandler.getDefaultTimeZone() != null) { defaultTimeZone = timeSeriesContentHandler.getDefaultTimeZone(); } String requestUrl = subscriptionKey == null ? String.format("%s/GetAuthToken?user=%s&encPwd=%s", url, username, password) : url.toString(); String authToken = null; if (subscriptionKey == null) { try { authToken = httpGet(requestUrl, null); } catch (Exception e) { throw new AuthenticationException("Error getting authentication code: " + e.getMessage()); } } if (skipDataSetList){ //location id has already been mapped to datasetid for (TimeSeriesHeader header : headers) { String dataSetId = header.getLocationId(); Period dataSetPeriod = period; retrieveTimeSeriesData(timeSeriesContentHandler, url, authToken, header, dataSetId, dataSetPeriod); } } else { //get dataset id for each header. DataSetContentHandler contentHandler = new DataSetContentHandler(period, headers); if (loopOverLocations){ HashSet<String> processedLocationIds = new HashSet<>(); //in case of very large databases it might be better to retrieve data per location instead of //all data in one go. for (TimeSeriesHeader header : headers) { String locationId = header.getLocationId(); if (!processedLocationIds.add(locationId)) continue; requestUrl = String.format("%s/GetDataSetsList?locationId=%s", url, locationId); String csvDataSet = httpGet(requestUrl, authToken); CsvDataSetParser parser = new CsvDataSetParser(); parser.parse(new LineReader(new StringReader(csvDataSet), "dataset.csv"), contentHandler); } } else { requestUrl = String.format("%s/GetDataSetsList", url); String csvDataSet = httpGet(requestUrl, authToken); CsvDataSetParser parser = new CsvDataSetParser(); parser.parse(new LineReader(new StringReader(csvDataSet), "dataset.csv"), contentHandler); } int dataSetCount = contentHandler.getDataSetCount(); for (int i = 0; i < dataSetCount; i++) { contentHandler.setDataSetIndex(i); if (!contentHandler.isDataSetAvailable()) continue; String dataSetId = contentHandler.getDataSetId(); Period dataSetPeriod = contentHandler.getDataSetPeriod(); TimeSeriesHeader timeSeriesHeader = contentHandler.getTimeSeriesHeader(); retrieveTimeSeriesData(timeSeriesContentHandler, url, authToken, timeSeriesHeader, dataSetId, dataSetPeriod); } } } private String httpGet(String requestUrl, String authToken) throws IOException { URL url; try { url = new URL(requestUrl); } catch (MalformedURLException e) { throw new IOException(String.format("Invalid request URL %s: %s", requestUrl, e.getMessage())); } HttpURLConnection conn = (HttpURLConnection) url.openConnection(); if (authToken != null){ conn.addRequestProperty("AQAuthToken", authToken); } if (subscriptionKey != null){ conn.setRequestProperty("Ocp-Apim-Subscription-Key", subscriptionKey); } int responseCode = conn.getResponseCode(); if (responseCode != 200) { throw new IOException(String.format("Invalid response code %d: %s.", responseCode, conn.getResponseMessage())); } // Buffer the result into a string BufferedReader rd = new BufferedReader( new InputStreamReader(conn.getInputStream())); StringBuilder sb = new StringBuilder(); String line; boolean first = true; while ((line = rd.readLine()) != null) { if (!first){ sb.append('\n'); } sb.append(line); first = false; } rd.close(); conn.disconnect(); return sb.toString(); } private void retrieveTimeSeriesData(TimeSeriesContentHandler timeSeriesContentHandler, URL baseUrl, String authToken, TimeSeriesHeader timeSeriesHeader, String dataSetId, Period dataSetPeriod) throws Exception { /** Set destination header for returned data **/ timeSeriesContentHandler.setTimeSeriesHeader(timeSeriesHeader); if (timeSeriesContentHandler.isCurrentTimeSeriesHeaderForAllTimesRejected()) { //this should never be the case otherwise why request header. throw new IllegalArgumentException("Requested header is not valid! " + timeSeriesHeader.toString()); } /** Read data for given dataset id **/ String startTime = DateTimeUtils.formatDate(dataSetPeriod.getStartDate(), defaultTimeZone); String endTime = DateTimeUtils.formatDate(dataSetPeriod.getEndDate(), defaultTimeZone); String requestUrl = String.format("%s/GetTimeSeriesData?dataId=%s&view=%s&queryFrom=%s&queryTo=%s", baseUrl, dataSetId, publishView, startTime, endTime); String csvTimeSeriesData = httpGet(requestUrl, authToken); CsvTimeSeriesDataParser csvTimeSeriesParser = new CsvTimeSeriesDataParser(); LineReader lineReader = new LineReader(new StringReader(csvTimeSeriesData), "timeseries.csv"); try { csvTimeSeriesParser.parse(lineReader, "timeseries.csv", timeSeriesContentHandler); } catch (IOException e) { throw new IOException(e.getMessage() + csvTimeSeriesData); } } @Override public void setTimeSeriesHeaders(TimeSeriesHeader[] timeSeriesHeaders) { this.headers = timeSeriesHeaders; } @Override public void setPeriod(Period period) { this.period = period; } @Override public void setProperties(Properties properties) { loopOverLocations = properties.getBool("LoopOverLocations", false); publishView = properties.getString("PublishView", null); skipDataSetList = properties.getBool("SkipDataSetList", false); subscriptionKey = properties.getString("SubscriptionKey", null); } }