You are viewing an old version of this page. View the current version.

Compare with Current View Page History

Version 1 Next »

package nl.wldelft.timeseriesparsers;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import nl.wldelft.util.FastDateFormat;
import nl.wldelft.util.Period;
import nl.wldelft.util.PeriodConsumer;
import nl.wldelft.util.io.ServerParser;
import nl.wldelft.util.timeseries.DefaultTimeSeriesHeader;
import nl.wldelft.util.timeseries.TimeSeriesContentHandler;
import nl.wldelft.util.timeseries.TimeSeriesHeader;
import nl.wldelft.util.timeseries.TimeSeriesHeadersConsumer;
import org.apache.http.util.TextUtils;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.io.InputStream;
import java.net.ConnectException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Date;
import java.util.Locale;

public class DdscTimeSeriesServerParser implements ServerParser<TimeSeriesContentHandler>, TimeSeriesHeadersConsumer, PeriodConsumer {

 private static final Logger log = Logger.getLogger(DdscTimeSeriesServerParser.class);
public static final String EVENTS = "events";
public static final String RESULTS = "results";
public static final String DATETIME = "datetime";
public static final String VALUE = "value";
public static final String FLAG = "flag";
public static final String NEXT = "next";
public static final int MISSING_VALUE = -999;
private FastDateFormat dateFormat = null;
private TimeSeriesHeader[] headers = null;
private Period period = null;

@SuppressWarnings({"AssignmentToCollectionOrArrayFieldFromParameter"})
 @Override
 public void setTimeSeriesHeaders(TimeSeriesHeader[] timeSeriesHeaders) {
 this.headers = timeSeriesHeaders;
}

 @Override
 public void setPeriod(Period period) {
 this.period = period;
}

 @Override
 public void parse(URL url, String username, String password, TimeSeriesContentHandler contentHandler) throws IOException {
 if (period == Period.ANY_TIME) throw new IllegalArgumentException("contentHandler.getWantedPeriod() == Period.ANY_TIME");

dateFormat = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", contentHandler.getDefaultTimeZone(), Locale.US, dateFormat);
Date startDate = period.getStartDate();
Date endDate = period.getEndDate();

String periodQuery = "?start=" + dateFormat.format(startDate) + "&end=" + dateFormat.format(endDate);
log.debug("periodQuery:" + periodQuery);

for (int i = 0; i < headers.length; i++) {
 TimeSeriesHeader header = headers[i];
try {
 DefaultTimeSeriesHeader defaultTimeSeriesHeader = new DefaultTimeSeriesHeader();
defaultTimeSeriesHeader.setLocationId(header.getLocationId());
defaultTimeSeriesHeader.setParameterId(header.getParameterId());
contentHandler.addMissingValue(MISSING_VALUE);
contentHandler.createTimeSeriesHeaderAlias(i, defaultTimeSeriesHeader);
contentHandler.setTimeSeriesHeader(i);

URL timeSeriesUrl = new URL(url.toExternalForm() +
 "?name=" + header.getLocationId() + "_" + header.getParameterId() +
 "&format=json");
log.debug("timeseries url: " + timeSeriesUrl);

try (InputStream timeSeriesInputStream = openHttpInputStream(timeSeriesUrl.toExternalForm(), username, password)) {
 if (timeSeriesInputStream != null) {
 String events = parseEventsUrlFromTimeseries(timeSeriesInputStream, periodQuery);
log.info("events url: " + events);
while (!TextUtils.isEmpty(events) && !TextUtils.isBlank(events)) {
 try (InputStream eventsStream = openHttpInputStream(events,username,password)) {
 events = parseEvents(contentHandler, eventsStream);
log.debug("next: " + events);
}
}
}
}
} catch (ConnectException e) {
 log.warn("Can not connect to " + url + ' ' + e.getMessage(), e);
return;
} catch (Exception e) {
 log.warn("Can not parse data for " + header + ' ' + e.getMessage(), e);
}
}
}

 private static InputStream openHttpInputStream(String restUrl, String username, String password) throws Exception {
 URL url = new URL(restUrl);
log.debug("url:" + url);
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
// just want to do an HTTP GET here
 connection.setRequestMethod("GET");
// give it 15 seconds to respond
 connection.setReadTimeout(15 * 1000);
connection.setRequestProperty("username", username);
connection.setRequestProperty("password", password);
connection.connect();
return connection.getInputStream();
}

 static String parseEvents(TimeSeriesContentHandler contentHandler, InputStream inputStream) throws IOException {
 String next = null;
if (inputStream != null) {
 try (JsonParser jsonParser = new JsonFactory().createParser(inputStream)) {
 while (jsonParser.nextToken() != JsonToken.END_OBJECT) {
 String fieldName = jsonParser.getCurrentName();
// Lees eventuele next url
 if (NEXT.equals(fieldName)) {
 jsonParser.nextToken();
if (!jsonParser.getText().equals("null")) {
 next = jsonParser.getText();
}
}
 if (RESULTS.equals(fieldName)) writeTimeSeriesEvent(contentHandler, jsonParser);
}
}
}
 return next;
}

 private static void writeTimeSeriesEvent(TimeSeriesContentHandler contentHandler, JsonParser jsonParser) throws IOException {
 String flag = null;
float value = MISSING_VALUE;
String date = null;

while (jsonParser.nextToken() != JsonToken.END_ARRAY) {
 String fieldName = jsonParser.getCurrentName();
if (FLAG.equals(fieldName)) {
 jsonParser.nextToken();
flag = jsonParser.getText();
} else if (VALUE.equals(fieldName)) {
 jsonParser.nextToken();
value = jsonParser.getFloatValue();
} else if (DATETIME.equals(fieldName)) {
 jsonParser.nextToken();
date = jsonParser.getText();
}
 if (flag != null && date != null && value != MISSING_VALUE) {
 contentHandler.setTime(contentHandler.getDefaultTimeZone(), "yyyy-MM-dd'T'HH:mm:ss.S", date);
contentHandler.setValue(value);
contentHandler.setFlag(flag);
contentHandler.applyCurrentFields();
flag = null;
value = MISSING_VALUE;
date = null;
}
}
}
 static String parseEventsUrlFromTimeseries(InputStream timeSeriesDataInputStream, String periodQuery) throws IOException {
 if (timeSeriesDataInputStream != null) {
 //noinspection resource caller has to close the input stream.
 JsonParser jsonParser = new JsonFactory().createParser(timeSeriesDataInputStream);
while (jsonParser.nextToken() != JsonToken.END_OBJECT) {
 String fieldName = jsonParser.getCurrentName();
if (RESULTS.equals(fieldName)) {
 while (jsonParser.nextToken() != JsonToken.END_ARRAY) {
 fieldName = jsonParser.getCurrentName();
log.debug("fieldname = " + fieldName);
if (EVENTS.equals(fieldName)) {
 jsonParser.nextToken();
//noinspection StringConcatenationMissingWhitespace
 return jsonParser.getText() + periodQuery + "&format=json";
}
}
}
}
}
 return null;
}
}
  • No labels