You are viewing an old version of this page. View the current version.
Compare with Current
View Page History
Version 1
Next »
package nl.wldelft.timeseriesparsers;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import nl.wldelft.util.FastDateFormat;
import nl.wldelft.util.Period;
import nl.wldelft.util.PeriodConsumer;
import nl.wldelft.util.io.ServerParser;
import nl.wldelft.util.timeseries.DefaultTimeSeriesHeader;
import nl.wldelft.util.timeseries.TimeSeriesContentHandler;
import nl.wldelft.util.timeseries.TimeSeriesHeader;
import nl.wldelft.util.timeseries.TimeSeriesHeadersConsumer;
import org.apache.http.util.TextUtils;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.io.InputStream;
import java.net.ConnectException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Date;
import java.util.Locale;
/**
 * Server parser that downloads time series events as JSON over HTTP and forwards them to a
 * {@link TimeSeriesContentHandler}.
 *
 * <p>For every header supplied through {@link #setTimeSeriesHeaders(TimeSeriesHeader[])} the parser
 * requests {@code <url>?name=<locationId>_<parameterId>&format=json}, takes the {@code events} URL from the
 * {@code results} of that response, appends the period supplied through {@link #setPeriod(Period)} as
 * {@code start}/{@code end} query parameters and then reads the events page by page, following the
 * {@code next} URL of each page until there is none.
 *
 * <p>Instances keep the headers, the period and the date format in fields; nothing in this class
 * synchronizes access to them.
 */
public class DdscTimeSeriesServerParser implements ServerParser<TimeSeriesContentHandler>, TimeSeriesHeadersConsumer, PeriodConsumer {
    private static final Logger log = Logger.getLogger(DdscTimeSeriesServerParser.class);

    /** JSON field of a time series result that holds the URL of its events. */
    public static final String EVENTS = "events";
    /** JSON field that holds the result array in both the time series and the events responses. */
    public static final String RESULTS = "results";
    /** JSON field of an event that holds its time stamp. */
    public static final String DATETIME = "datetime";
    /** JSON field of an event that holds its numeric value. */
    public static final String VALUE = "value";
    /** JSON field of an event that holds its flag. */
    public static final String FLAG = "flag";
    /** JSON field of an events response that holds the URL of the next page. */
    public static final String NEXT = "next";
    /**
     * Value registered as missing at the content handler; also used while reading an event as the marker
     * for "no value read yet".
     */
    public static final int MISSING_VALUE = -999;

    /** Timeout in milliseconds used both for connecting to the server and for reading from it. */
    private static final int TIMEOUT_MILLIS = 15 * 1000;

    /** Shared parser factory, so that no new factory has to be created for every response. */
    private static final JsonFactory JSON_FACTORY = new JsonFactory();

    private FastDateFormat dateFormat = null;
    private TimeSeriesHeader[] headers = null;
    private Period period = null;

    /**
     * Sets the headers of the time series to download; the array is stored as is, not copied.
     *
     * @param timeSeriesHeaders headers whose location id and parameter id identify the series to request
     */
    @SuppressWarnings({"AssignmentToCollectionOrArrayFieldFromParameter"})
    @Override
    public void setTimeSeriesHeaders(TimeSeriesHeader[] timeSeriesHeaders) {
        this.headers = timeSeriesHeaders;
    }

    /**
     * Sets the period for which events are requested.
     *
     * @param period period whose start and end date are sent to the server; must not be
     *               {@link Period#ANY_TIME} when {@code parse} is called
     */
    @Override
    public void setPeriod(Period period) {
        this.period = period;
    }

    /**
     * Downloads the events of every configured time series and passes them to {@code contentHandler}.
     *
     * <p>A failure for a single header is logged as a warning and the next header is tried. A
     * {@link ConnectException} is logged as well but ends the whole import, because the remaining headers
     * use the same server.
     *
     * @param url            base URL of the time series resource
     * @param username       sent to the server in the {@code username} request header
     * @param password       sent to the server in the {@code password} request header
     * @param contentHandler receives the headers, times, values and flags that were read
     * @throws IllegalStateException    when the period or the headers have not been set
     * @throws IllegalArgumentException when the period is {@link Period#ANY_TIME}
     * @throws IOException              declared by the interface; I/O failures are logged here, not rethrown
     */
    @Override
    public void parse(URL url, String username, String password, TimeSeriesContentHandler contentHandler) throws IOException {
        if (period == null) {
            throw new IllegalStateException("period == null, setPeriod has to be called before parse");
        }
        if (period == Period.ANY_TIME) {
            throw new IllegalArgumentException("contentHandler.getWantedPeriod() == Period.ANY_TIME");
        }
        if (headers == null) {
            throw new IllegalStateException("headers == null, setTimeSeriesHeaders has to be called before parse");
        }

        // NOTE(review): the pattern appends a literal 'Z' while the dates are formatted in the default time
        // zone of the content handler; presumably that time zone is expected to be UTC - confirm.
        dateFormat = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", contentHandler.getDefaultTimeZone(), Locale.US, dateFormat);
        Date startDate = period.getStartDate();
        Date endDate = period.getEndDate();
        String periodQuery = "?start=" + dateFormat.format(startDate) + "&end=" + dateFormat.format(endDate);
        log.debug("periodQuery:" + periodQuery);

        for (int i = 0; i < headers.length; i++) {
            TimeSeriesHeader header = headers[i];
            try {
                DefaultTimeSeriesHeader defaultTimeSeriesHeader = new DefaultTimeSeriesHeader();
                defaultTimeSeriesHeader.setLocationId(header.getLocationId());
                defaultTimeSeriesHeader.setParameterId(header.getParameterId());
                contentHandler.addMissingValue(MISSING_VALUE);
                contentHandler.createTimeSeriesHeaderAlias(i, defaultTimeSeriesHeader);
                contentHandler.setTimeSeriesHeader(i);

                URL timeSeriesUrl = new URL(url.toExternalForm() +
                        "?name=" + header.getLocationId() + "_" + header.getParameterId() +
                        "&format=json");
                log.debug("timeseries url: " + timeSeriesUrl);

                try (InputStream timeSeriesInputStream = openHttpInputStream(timeSeriesUrl.toExternalForm(), username, password)) {
                    String events = parseEventsUrlFromTimeseries(timeSeriesInputStream, periodQuery);
                    log.info("events url: " + events);
                    // Every page returns the URL of the next page; a null or blank URL ends the paging.
                    while (!TextUtils.isBlank(events)) {
                        try (InputStream eventsStream = openHttpInputStream(events, username, password)) {
                            events = parseEvents(contentHandler, eventsStream);
                            log.debug("next: " + events);
                        }
                    }
                }
            } catch (ConnectException e) {
                log.warn("Can not connect to " + url + ' ' + e.getMessage(), e);
                return;
            } catch (Exception e) {
                log.warn("Can not parse data for " + header + ' ' + e.getMessage(), e);
            }
        }
    }

    /**
     * Opens an HTTP GET connection and returns the stream of the response body.
     *
     * @param restUrl  URL to request
     * @param username sent in the {@code username} request header
     * @param password sent in the {@code password} request header
     * @return stream of the response body; the caller has to close it
     * @throws IOException when the URL is malformed, the connection fails or a timeout expires
     */
    private static InputStream openHttpInputStream(String restUrl, String username, String password) throws IOException {
        URL url = new URL(restUrl);
        log.debug("url:" + url);
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        // just want to do an HTTP GET here
        connection.setRequestMethod("GET");
        // give it 15 seconds to accept the connection and 15 seconds to respond; without the connect
        // timeout an unreachable server could block the import indefinitely
        connection.setConnectTimeout(TIMEOUT_MILLIS);
        connection.setReadTimeout(TIMEOUT_MILLIS);
        connection.setRequestProperty("username", username);
        connection.setRequestProperty("password", password);
        connection.connect();
        return connection.getInputStream();
    }

    /**
     * Reads one page of an events response and forwards the events in its {@code results} array to
     * {@code contentHandler}. The parser created on the stream is closed before returning.
     *
     * @param contentHandler receives the events
     * @param inputStream    JSON response to read; {@code null} is accepted and yields {@code null}
     * @return URL of the next page, or {@code null} when the response has no {@code next} field or its
     *         text is {@code "null"}
     * @throws IOException when the response can not be read or is not valid JSON
     */
    static String parseEvents(TimeSeriesContentHandler contentHandler, InputStream inputStream) throws IOException {
        if (inputStream == null) {
            return null;
        }
        String next = null;
        try (JsonParser jsonParser = JSON_FACTORY.createParser(inputStream)) {
            JsonToken token;
            // nextToken() returns null at the end of the input; stopping there prevents an endless loop
            // on an empty response or on a response that is not a JSON object.
            while ((token = jsonParser.nextToken()) != null && token != JsonToken.END_OBJECT) {
                if (token != JsonToken.FIELD_NAME) {
                    continue;
                }
                String fieldName = jsonParser.getCurrentName();
                if (NEXT.equals(fieldName)) {
                    // Read the next url, if any; a JSON null is read here as the text "null".
                    jsonParser.nextToken();
                    String text = jsonParser.getText();
                    if (text != null && !text.equals("null")) {
                        next = text;
                    }
                } else if (RESULTS.equals(fieldName)) {
                    writeTimeSeriesEvent(contentHandler, jsonParser);
                } else {
                    // Skip the whole value, so that the end of a nested object is not mistaken for the
                    // end of the response.
                    jsonParser.nextToken();
                    jsonParser.skipChildren();
                }
            }
        }
        return next;
    }

    /**
     * Reads the {@code results} array of an events response and forwards every complete event.
     *
     * <p>Expects the parser to be positioned on the {@code results} field name. An event is forwarded when
     * its enclosing object ends and a flag, a time stamp and a value different from
     * {@link #MISSING_VALUE} were read for it; incomplete events are dropped. The collected fields are
     * cleared at the end of every event, so that they can not end up in the next one.
     *
     * @param contentHandler receives time, value and flag of every complete event
     * @param jsonParser     parser positioned on the {@code results} field name
     * @throws IOException when the response can not be read or is not valid JSON
     */
    private static void writeTimeSeriesEvent(TimeSeriesContentHandler contentHandler, JsonParser jsonParser) throws IOException {
        if (jsonParser.nextToken() != JsonToken.START_ARRAY) {
            // "results" is not an array (e.g. null): there are no events; skip the value when structured.
            jsonParser.skipChildren();
            return;
        }

        String flag = null;
        String date = null;
        float value = MISSING_VALUE;
        int arrayDepth = 1;
        int objectDepth = 0;
        JsonToken token;
        while ((token = jsonParser.nextToken()) != null) {
            if (token == JsonToken.START_ARRAY) {
                arrayDepth++;
            } else if (token == JsonToken.END_ARRAY) {
                // Only the end of the results array itself ends the loop, not the end of a nested array.
                if (--arrayDepth == 0) {
                    return;
                }
            } else if (token == JsonToken.START_OBJECT) {
                objectDepth++;
            } else if (token == JsonToken.END_OBJECT) {
                if (--objectDepth == 0) {
                    // End of one event object.
                    if (flag != null && date != null && value != MISSING_VALUE) {
                        contentHandler.setTime(contentHandler.getDefaultTimeZone(), "yyyy-MM-dd'T'HH:mm:ss.S", date);
                        contentHandler.setValue(value);
                        contentHandler.setFlag(flag);
                        contentHandler.applyCurrentFields();
                    }
                    flag = null;
                    date = null;
                    value = MISSING_VALUE;
                }
            } else if (token == JsonToken.FIELD_NAME) {
                String fieldName = jsonParser.getCurrentName();
                if (FLAG.equals(fieldName)) {
                    if (nextScalarValue(jsonParser) != null) {
                        flag = jsonParser.getText();
                    }
                } else if (VALUE.equals(fieldName)) {
                    JsonToken valueToken = nextScalarValue(jsonParser);
                    // A JSON null can not be read as a float; leave the value missing so that only this
                    // event is skipped instead of failing the whole series.
                    if (valueToken != null && valueToken != JsonToken.VALUE_NULL) {
                        value = jsonParser.getFloatValue();
                    }
                } else if (DATETIME.equals(fieldName)) {
                    if (nextScalarValue(jsonParser) != null) {
                        date = jsonParser.getText();
                    }
                }
            }
        }
    }

    /**
     * Advances the parser from a field name to its value.
     *
     * @param jsonParser parser positioned on a field name
     * @return the value token when it is a scalar; {@code null} when the input ended or when the value is
     *         an object or array, which is then skipped completely
     * @throws IOException when the response can not be read or is not valid JSON
     */
    private static JsonToken nextScalarValue(JsonParser jsonParser) throws IOException {
        JsonToken valueToken = jsonParser.nextToken();
        if (valueToken != null && !valueToken.isScalarValue()) {
            jsonParser.skipChildren();
            return null;
        }
        return valueToken;
    }

    /**
     * Extracts the URL of the events from a time series response and completes it with the period query.
     *
     * @param timeSeriesDataInputStream JSON response to read; {@code null} is accepted and yields
     *                                  {@code null}. The stream is not closed here.
     * @param periodQuery               query string that is appended to the events URL
     * @return the text of the first {@code events} field found in a {@code results} array, followed by
     *         {@code periodQuery} and {@code "&format=json"}; {@code null} when there is no such field
     * @throws IOException when the response can not be read or is not valid JSON
     */
    static String parseEventsUrlFromTimeseries(InputStream timeSeriesDataInputStream, String periodQuery) throws IOException {
        if (timeSeriesDataInputStream == null) {
            return null;
        }
        //noinspection resource caller has to close the input stream.
        JsonParser jsonParser = JSON_FACTORY.createParser(timeSeriesDataInputStream);
        JsonToken token;
        // nextToken() returns null at the end of the input; stopping there prevents an endless loop on an
        // empty response or on a response that is not a JSON object.
        while ((token = jsonParser.nextToken()) != null && token != JsonToken.END_OBJECT) {
            if (token != JsonToken.FIELD_NAME) {
                continue;
            }
            if (RESULTS.equals(jsonParser.getCurrentName())) {
                String eventsUrl = findEventsUrl(jsonParser);
                if (eventsUrl != null) {
                    //noinspection StringConcatenationMissingWhitespace
                    return eventsUrl + periodQuery + "&format=json";
                }
            } else {
                // Skip the whole value, so that the end of a nested object is not mistaken for the end
                // of the response.
                jsonParser.nextToken();
                jsonParser.skipChildren();
            }
        }
        return null;
    }

    /**
     * Searches the {@code results} array of a time series response for the first {@code events} field.
     *
     * @param jsonParser parser positioned on the {@code results} field name
     * @return text of the value of the first {@code events} field, or {@code null} when the array has none
     *         or {@code results} is not an array
     * @throws IOException when the response can not be read or is not valid JSON
     */
    private static String findEventsUrl(JsonParser jsonParser) throws IOException {
        if (jsonParser.nextToken() != JsonToken.START_ARRAY) {
            jsonParser.skipChildren();
            return null;
        }
        int arrayDepth = 1;
        JsonToken token;
        while ((token = jsonParser.nextToken()) != null) {
            if (token == JsonToken.START_ARRAY) {
                arrayDepth++;
            } else if (token == JsonToken.END_ARRAY) {
                // Only the end of the results array itself ends the search, not the end of a nested array.
                if (--arrayDepth == 0) {
                    return null;
                }
            } else if (token == JsonToken.FIELD_NAME) {
                String fieldName = jsonParser.getCurrentName();
                log.debug("fieldname = " + fieldName);
                if (EVENTS.equals(fieldName)) {
                    jsonParser.nextToken();
                    return jsonParser.getText();
                }
            }
        }
        return null;
    }
}