package nl.wldelft.aquarius.timeseriesparsers;
import nl.wldelft.aquarius.CsvDataSetParser;
import nl.wldelft.aquarius.CsvTimeSeriesDataParser;
import nl.wldelft.aquarius.DataSetContentHandler;
import nl.wldelft.aquarius.util.DateTimeUtils;
import nl.wldelft.util.Period;
import nl.wldelft.util.PeriodConsumer;
import nl.wldelft.util.Properties;
import nl.wldelft.util.PropertiesConsumer;
import nl.wldelft.util.io.LineReader;
import nl.wldelft.util.io.ServerParser;
import nl.wldelft.util.timeseries.TimeSeriesContentHandler;
import nl.wldelft.util.timeseries.TimeSeriesHeader;
import nl.wldelft.util.timeseries.TimeSeriesHeadersConsumer;
import javax.naming.AuthenticationException;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.StringReader;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.TimeZone;
/**
 * Imports time series data from an Aquarius REST server.
 * <p>
 * Created by rooij_e on 2/25/13.
 */
public class AquariusTimeSeriesRestServerParser implements ServerParser<TimeSeriesContentHandler>, TimeSeriesHeadersConsumer, PeriodConsumer, PropertiesConsumer {
    /** Period used to limit the requested data; must be set (via {@link #setPeriod}) before {@link #parse}. */
    private Period period = null;
    /** Wanted time series headers; must be set (via {@link #setTimeSeriesHeaders}) before {@link #parse}. */
    private TimeSeriesHeader[] headers = null;
    /**
     * Loop over the locations when retrieving the DataSetList values. If false then all datasets will be retrieved
     * for all locations in one single call.
     */
    private boolean loopOverLocations = false;
    /**
     * Pre configured value for the PublishView. (Optional)
     */
    private String publishView = null;
    /**
     * Option to skip the DataSetList call. This is only possible if the location ids of the timeseries headers
     * already contain the dataId values.
     */
    private boolean skipDataSetList = false;
    /** Optional subscription key; when configured it is sent as Ocp-Apim-Subscription-Key and token auth is skipped. */
    private String subscriptionKey;
    /** Time zone used to format the query period; replaced by the content handler's default time zone when available. */
    private TimeZone defaultTimeZone = TimeZone.getTimeZone("GMT");

    /**
     * Retrieves time series data from the Aquarius REST server and feeds it to the given content handler.
     * <p>
     * When no subscription key is configured an auth token is first obtained via GetAuthToken; the token is then
     * passed along with every subsequent request. Depending on the configuration the data set ids are either taken
     * directly from the header location ids ({@code skipDataSetList}) or resolved through the GetDataSetsList service.
     *
     * @param url base URL of the Aquarius REST server
     * @param username user name for GetAuthToken (ignored when a subscription key is configured)
     * @param password (encrypted) password for GetAuthToken (ignored when a subscription key is configured)
     * @param timeSeriesContentHandler destination for the parsed time series values
     * @throws IllegalStateException when no headers or no period have been configured
     * @throws AuthenticationException when obtaining the auth token fails
     * @throws Exception when a request or the parsing of a response fails
     */
    @Override
    public void parse(URL url, String username, String password, TimeSeriesContentHandler timeSeriesContentHandler) throws Exception {
        if (headers == null || headers.length == 0) {
            throw new IllegalStateException("No wanted parameters and locations found. Configure these in the id map of"
                    + " your import.");
        }
        if (period == null) {
            throw new IllegalStateException("No time period defined. Configure this in your import.");
        }
        if (timeSeriesContentHandler.getDefaultTimeZone() != null) {
            defaultTimeZone = timeSeriesContentHandler.getDefaultTimeZone();
        }
        String authToken = null;
        if (subscriptionKey == null) {
            // Token based authentication. Encode the credentials so characters like '+', '=' and '&' survive the query string.
            String requestUrl = String.format("%s/GetAuthToken?user=%s&encPwd=%s", url,
                    URLEncoder.encode(username, StandardCharsets.UTF_8.name()),
                    URLEncoder.encode(password, StandardCharsets.UTF_8.name()));
            try {
                authToken = httpGet(requestUrl, null);
            } catch (Exception e) {
                AuthenticationException authenticationException =
                        new AuthenticationException("Error getting authentication code: " + e.getMessage());
                authenticationException.initCause(e); // keep the original stack trace for troubleshooting
                throw authenticationException;
            }
        }
        if (skipDataSetList) {
            // The location id has already been mapped to the data set id, so the data can be requested directly.
            for (TimeSeriesHeader header : headers) {
                retrieveTimeSeriesData(timeSeriesContentHandler, url, authToken, header, header.getLocationId(), period);
            }
            return;
        }
        // Resolve the data set id for each wanted header via the GetDataSetsList service.
        DataSetContentHandler contentHandler = new DataSetContentHandler(period, headers);
        if (loopOverLocations) {
            // In case of very large databases it might be better to retrieve data per location instead of all in one go.
            HashSet<String> processedLocationIds = new HashSet<>();
            for (TimeSeriesHeader header : headers) {
                String locationId = header.getLocationId();
                if (!processedLocationIds.add(locationId)) continue; // location already requested
                String requestUrl = String.format("%s/GetDataSetsList?locationId=%s", url, locationId);
                parseDataSetList(httpGet(requestUrl, authToken), contentHandler);
            }
        } else {
            parseDataSetList(httpGet(String.format("%s/GetDataSetsList", url), authToken), contentHandler);
        }
        int dataSetCount = contentHandler.getDataSetCount();
        for (int i = 0; i < dataSetCount; i++) {
            contentHandler.setDataSetIndex(i);
            if (!contentHandler.isDataSetAvailable()) continue; // data set is not wanted or has no data in the period
            retrieveTimeSeriesData(timeSeriesContentHandler, url, authToken, contentHandler.getTimeSeriesHeader(),
                    contentHandler.getDataSetId(), contentHandler.getDataSetPeriod());
        }
    }

    /** Parses a csv GetDataSetsList response into the given data set content handler. */
    private static void parseDataSetList(String csvDataSet, DataSetContentHandler contentHandler) throws Exception {
        CsvDataSetParser parser = new CsvDataSetParser();
        parser.parse(new LineReader(new StringReader(csvDataSet), "dataset.csv"), contentHandler);
    }

    /**
     * Performs an HTTP GET on the given URL and returns the response body as a string.
     * Lines are joined with '\n' regardless of the server's line separator.
     *
     * @param requestUrl complete request URL
     * @param authToken auth token sent as AQAuthToken header, or null when not (yet) authenticated
     * @return the response body, decoded as UTF-8
     * @throws IOException when the URL is malformed, the request fails or the response code is not 200
     */
    private String httpGet(String requestUrl, String authToken) throws IOException {
        URL url;
        try {
            url = new URL(requestUrl);
        } catch (MalformedURLException e) {
            throw new IOException(String.format("Invalid request URL %s: %s", requestUrl, e.getMessage()), e);
        }
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        try {
            if (authToken != null) {
                conn.addRequestProperty("AQAuthToken", authToken);
            }
            if (subscriptionKey != null) {
                conn.setRequestProperty("Ocp-Apim-Subscription-Key", subscriptionKey);
            }
            int responseCode = conn.getResponseCode();
            if (responseCode != HttpURLConnection.HTTP_OK) {
                throw new IOException(String.format("Invalid response code %d: %s.", responseCode, conn.getResponseMessage()));
            }
            // Buffer the result into a string. Use an explicit charset so the result does not depend on the platform default.
            StringBuilder sb = new StringBuilder();
            try (BufferedReader rd = new BufferedReader(
                    new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                boolean first = true;
                while ((line = rd.readLine()) != null) {
                    if (!first) {
                        sb.append('\n');
                    }
                    sb.append(line);
                    first = false;
                }
            }
            return sb.toString();
        } finally {
            conn.disconnect(); // always release the connection, also on error paths
        }
    }

    /**
     * Requests the values of a single data set via GetTimeSeriesData and feeds them to the content handler.
     *
     * @param timeSeriesContentHandler destination for the parsed values
     * @param baseUrl base URL of the Aquarius REST server
     * @param authToken auth token, or null when a subscription key is used instead
     * @param timeSeriesHeader destination header for the returned data
     * @param dataSetId Aquarius data set id to request
     * @param dataSetPeriod period to request, formatted in {@link #defaultTimeZone}
     * @throws IllegalArgumentException when the content handler rejects the header for all times
     * @throws Exception when the request or the parsing of the response fails
     */
    private void retrieveTimeSeriesData(TimeSeriesContentHandler timeSeriesContentHandler, URL baseUrl, String authToken,
            TimeSeriesHeader timeSeriesHeader, String dataSetId, Period dataSetPeriod) throws Exception {
        // Set destination header for returned data.
        timeSeriesContentHandler.setTimeSeriesHeader(timeSeriesHeader);
        if (timeSeriesContentHandler.isCurrentTimeSeriesHeaderForAllTimesRejected()) {
            // This should never be the case, otherwise why request this header at all.
            throw new IllegalArgumentException("Requested header is not valid! " + timeSeriesHeader);
        }
        // Read data for the given data set id over the configured period.
        String startTime = DateTimeUtils.formatDate(dataSetPeriod.getStartDate(), defaultTimeZone);
        String endTime = DateTimeUtils.formatDate(dataSetPeriod.getEndDate(), defaultTimeZone);
        String requestUrl = String.format("%s/GetTimeSeriesData?dataId=%s&view=%s&queryFrom=%s&queryTo=%s",
                baseUrl, dataSetId, publishView, startTime, endTime);
        String csvTimeSeriesData = httpGet(requestUrl, authToken);
        CsvTimeSeriesDataParser csvTimeSeriesParser = new CsvTimeSeriesDataParser();
        LineReader lineReader = new LineReader(new StringReader(csvTimeSeriesData), "timeseries.csv");
        try {
            csvTimeSeriesParser.parse(lineReader, "timeseries.csv", timeSeriesContentHandler);
        } catch (IOException e) {
            // Append the raw server response to ease troubleshooting, but keep the original cause.
            throw new IOException(e.getMessage() + csvTimeSeriesData, e);
        }
    }

    /** Sets the wanted time series headers; called by the framework before {@link #parse}. */
    @Override
    public void setTimeSeriesHeaders(TimeSeriesHeader[] timeSeriesHeaders) {
        this.headers = timeSeriesHeaders;
    }

    /** Sets the period for which data is requested; called by the framework before {@link #parse}. */
    @Override
    public void setPeriod(Period period) {
        this.period = period;
    }

    /** Applies the configured import properties; missing properties keep their defaults. */
    @Override
    public void setProperties(Properties properties) {
        loopOverLocations = properties.getBool("LoopOverLocations", false);
        publishView = properties.getString("PublishView", null);
        skipDataSetList = properties.getBool("SkipDataSetList", false);
        subscriptionKey = properties.getString("SubscriptionKey", null);
    }
}