package nl.wldelft.lib.hymos.transferdb;
/**
* Parser for the HYMOS TransferDb database.
* This database can be filled with data from any other data source by defining external connections and SQL queries
* that retrieve the data from the external database.
*
*/
import nl.wldelft.util.ByteArrayUtils;
import nl.wldelft.util.FloatArrayUtils;
import nl.wldelft.util.NumberType;
import nl.wldelft.util.Period;
import nl.wldelft.util.ShortArrayUtils;
import nl.wldelft.util.TextUtils;
import nl.wldelft.util.coverage.Geometry;
import nl.wldelft.util.io.AsciiGridReader;
import nl.wldelft.util.io.DatabaseParser;
import nl.wldelft.util.io.MosaicGridFileReader;
import nl.wldelft.util.io.UnsyncBufferedInputStream;
import nl.wldelft.util.sql.SqlUtils;
import nl.wldelft.util.timeseries.DefaultTimeSeriesHeader;
import nl.wldelft.util.timeseries.TimeSeriesContentHandler;
import org.apache.log4j.Logger;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
import java.util.zip.GZIPInputStream;
/**
 * Reads time series from a HYMOS TransferDb database and passes them to a {@link TimeSeriesContentHandler}.
 * <p>
 * The {@code Series} table lists the series to import: location, parameter (data type), the table holding the
 * values, the missing/trace values and, optionally, whether the table holds forecasts. The {@code Parameter} table
 * supplies the unit of every parameter. Value columns with a binary SQL type are read as grids. Those blobs may be
 * gzip-compressed and contain either an ASCII grid or a mosaic grid file. Other value columns are read as scalars.
 */
public class HymosTransferDbTimeSeriesParser implements DatabaseParser<TimeSeriesContentHandler> {
    private static final Logger log = Logger.getLogger(HymosTransferDbTimeSeriesParser.class);

    /** Number of leading bytes of a grid blob inspected to decide whether it is an ASCII grid. */
    private static final int ASCII_GRID_SNIFF_LENGTH = 99;

    /** Buffer size used for both the gzip inflater and the buffered stream around it. */
    private static final int GRID_STREAM_BUFFER_SIZE = 100000;

    private Connection connection = null;
    private TimeSeriesContentHandler handler = null;
    /** Unit per parameter id, read from the Parameter table. */
    private Map<String, String> unitMap = null;
    /** Raw offset (ms) subtracted from every database timestamp. */
    private long timeZoneOffset = 0L;

    // Grid buffers, reused between rows and only reallocated when the grid size changes.
    private float[] values = FloatArrayUtils.EMPTY_ARRAY;
    private byte[] byteBuffer = ByteArrayUtils.EMPTY_ARRAY;
    private short[] shortBuffer = ShortArrayUtils.EMPTY_ARRAY;

    /** One row of the Series table. */
    private static class SeriesInfo {
        int seriesNr = 0;
        String locationID = null;
        String dataType = null;
        String unit = null;
        String tableName = null;
        TimeZone timeZone = null;
        float missVal = -999F;
        float traceVal = 0.0F;
        boolean forecast = false;

        @Override
        public String toString() {
            return seriesNr + ":" + dataType + ':' + locationID + ':' + tableName;
        }
    }

    public HymosTransferDbTimeSeriesParser() {
    }

    /**
     * Reads every series listed in the Series table and passes its values to {@code contentHandler}.
     * A failure while reading one series is logged and does not stop the remaining series.
     *
     * @param connection     open connection to the TransferDb database; not closed by this method
     * @param contentHandler receives the headers, times and values
     * @throws Exception when the Parameter or Series table cannot be read
     */
    @Override
    public void parse(Connection connection, TimeSeriesContentHandler contentHandler) throws Exception {
        this.connection = connection;
        // The handler field was previously never assigned, so every use of it threw a NullPointerException.
        this.handler = contentHandler;
        // Only the raw offset is applied; daylight saving time is not taken into account.
        this.timeZoneOffset = contentHandler.getDefaultTimeZone().getRawOffset();
        unitMap = readUnitMap();
        SeriesInfo[] seriesInfos = readSeriesInfos();
        for (SeriesInfo seriesInfo : seriesInfos) {
            try {
                read(seriesInfo);
            } catch (Exception e) {
                // Best effort: one broken series must not prevent the others from being imported.
                log.error("Reading time serie failed " + seriesInfo, e);
            }
        }
    }

    /**
     * Reads all rows of the Series table. The old (REALSTAT, DATATYPE, MISSVAL) and new
     * (LocationId, ParameterId, MissingValue) column names are both supported. TRACEVAL and FORECAST are optional.
     *
     * @throws SQLException when a required column is missing or a series has no table name
     */
    @SuppressWarnings({"OverlyLongMethod"})
    private SeriesInfo[] readSeriesInfos() throws SQLException {
        ArrayList<SeriesInfo> list = new ArrayList<SeriesInfo>();
        Statement statement = connection.createStatement();
        try {
            ResultSet resultSet = statement.executeQuery("SELECT * FROM Series");
            try {
                int seriesNrColumnIndex = resultSet.findColumn("ID");
                int realStatColumnIndex = findColumn(resultSet, "REALSTAT", "LocationId");
                int dataTypeColumnIndex = findColumn(resultSet, "DATATYPE", "ParameterId");
                int tableNameColumnIndex = resultSet.findColumn("TABLENAME");
                int missValColumnIndex = findColumn(resultSet, "MISSVAL", "MissingValue");
                int traceValColumnIndex = findOptionalColumn(resultSet, "TRACEVAL");
                int forecastColumnIndex = findOptionalColumn(resultSet, "FORECAST");
                while (resultSet.next()) {
                    SeriesInfo seriesInfo = new SeriesInfo();
                    seriesInfo.seriesNr = resultSet.getInt(seriesNrColumnIndex);
                    if (log.isDebugEnabled()) log.debug("Parse series info for " + seriesInfo.seriesNr);
                    // Trim null-safely: the values may be padded (CHAR columns) or NULL.
                    seriesInfo.locationID = getTrimmedString(resultSet, realStatColumnIndex);
                    seriesInfo.dataType = getTrimmedString(resultSet, dataTypeColumnIndex);
                    seriesInfo.unit = unitMap.get(seriesInfo.dataType);
                    seriesInfo.tableName = getTrimmedString(resultSet, tableNameColumnIndex);
                    if (seriesInfo.tableName == null || seriesInfo.tableName.length() == 0) {
                        throw new SQLException("Table name is not specified for series:" + seriesInfo.seriesNr);
                    }
                    seriesInfo.missVal = resultSet.getFloat(missValColumnIndex);
                    if (resultSet.wasNull()) seriesInfo.missVal = Float.NaN;
                    if (traceValColumnIndex != -1) {
                        seriesInfo.traceVal = resultSet.getFloat(traceValColumnIndex);
                        if (resultSet.wasNull()) seriesInfo.traceVal = 0;
                    }
                    if (forecastColumnIndex != -1) {
                        seriesInfo.forecast = resultSet.getBoolean(forecastColumnIndex);
                    }
                    if (log.isDebugEnabled()) log.debug("series info parsed " + seriesInfo);
                    list.add(seriesInfo);
                }
            } finally {
                resultSet.close();
            }
        } finally {
            statement.close();
        }
        return list.toArray(new SeriesInfo[list.size()]);
    }

    /** Returns the index of {@code columnName}, falling back to {@code alternativeColumnName} when it is absent. */
    private static int findColumn(ResultSet resultSet, String columnName, String alternativeColumnName) throws SQLException {
        try {
            return resultSet.findColumn(columnName);
        } catch (SQLException e) {
            return resultSet.findColumn(alternativeColumnName);
        }
    }

    /** Returns the index of {@code columnName}, or -1 when the result set has no such column. */
    private static int findOptionalColumn(ResultSet resultSet, String columnName) {
        try {
            return resultSet.findColumn(columnName);
        } catch (SQLException e) {
            return -1;
        }
    }

    /** Returns the trimmed string value of the column, or null when the value is SQL NULL. */
    private static String getTrimmedString(ResultSet resultSet, int columnIndex) throws SQLException {
        String value = resultSet.getString(columnIndex);
        return value == null ? null : value.trim();
    }

    /**
     * Reads the Parameter table into a map from parameter id to unit. Ids are trimmed so they match the
     * trimmed data types of the Series table.
     */
    private Map<String, String> readUnitMap() throws SQLException {
        Map<String, String> res = new HashMap<String, String>();
        Statement statement = connection.createStatement();
        try {
            ResultSet resultSet = statement.executeQuery("SELECT * FROM Parameter");
            try {
                int idColumnIndex = resultSet.findColumn("ID");
                int unitColumnIndex = resultSet.findColumn("UNIT");
                while (resultSet.next()) {
                    String parId = getTrimmedString(resultSet, idColumnIndex);
                    String unit = resultSet.getString(unitColumnIndex);
                    res.put(parId, unit);
                }
            } finally {
                resultSet.close();
            }
        } finally {
            statement.close();
        }
        return res;
    }

    /**
     * Reads all rows of the series table and passes them to the handler. A new header is started whenever the
     * forecast time changes. Reading stops as soon as the handler rejects a header for all times.
     * Scalar values equal to the missing value, SQL NULL, or in (-1000, -998.99] become NaN. Values equal to the
     * trace value become 0.
     */
    @SuppressWarnings({"OverlyLongMethod"})
    private void read(SeriesInfo seriesInfo) throws IOException, SQLException {
        log.info("Start reading table " + seriesInfo.tableName + " for " + seriesInfo.locationID + " and " + seriesInfo.dataType);
        int rowCount = 0;
        int missingValueCount = 0;
        int traceValueCount = 0;
        float minValue = Float.POSITIVE_INFINITY;
        float maxValue = Float.NEGATIVE_INFINITY;
        long minTime = Long.MAX_VALUE;
        long maxTime = Long.MIN_VALUE;
        boolean hasLabel = hasLabel(seriesInfo);
        String labelSql = hasLabel ? ", LABEL" : "";
        String table = quoteTableName(seriesInfo.tableName);
        String sql;
        if (seriesInfo.forecast) {
            sql = "SELECT FORECASTDATE, VALUEDATE, FORECASTVALUE" + labelSql + " FROM " + table;
        } else {
            sql = "SELECT MEASDATE, MEASVALUE" + labelSql + " FROM " + table;
        }
        int forecastDateColumn = seriesInfo.forecast ? 1 : -1;
        int valueDateColumn = seriesInfo.forecast ? 2 : 1;
        int valueColumn = seriesInfo.forecast ? 3 : 2;
        int labelColumn = seriesInfo.forecast ? 4 : 3;
        Statement statement = connection.createStatement();
        try {
            ResultSet rows = statement.executeQuery(sql);
            try {
                // Inside the try so the result set is closed even when the metadata cannot be read.
                boolean binary = SqlUtils.isBinaryColumn(rows.getMetaData().getColumnType(valueColumn));
                DefaultTimeSeriesHeader header = null;
                while (rows.next()) {
                    // Counted exactly once per row (it was previously incremented twice).
                    rowCount++;
                    long time = rows.getTimestamp(valueDateColumn).getTime() - timeZoneOffset;
                    handler.setTime(time);
                    if (time > maxTime) maxTime = time;
                    if (time < minTime) minTime = time;
                    long forecastTime = seriesInfo.forecast
                            ? rows.getTimestamp(forecastDateColumn).getTime() - timeZoneOffset : Long.MIN_VALUE;
                    if (header == null || header.getForecastTime() != forecastTime) {
                        header = new DefaultTimeSeriesHeader();
                        header.setLocationId(seriesInfo.locationID);
                        header.setParameterId(seriesInfo.dataType);
                        header.setUnit(seriesInfo.unit);
                        header.setForecastTime(forecastTime);
                        handler.setTimeSeriesHeader(header);
                        if (handler.isCurrentTimeSeriesHeaderForAllTimesRejected()) {
                            log.info("Table skipped " + seriesInfo.tableName + " for " + seriesInfo.locationID + " and " + seriesInfo.dataType);
                            return;
                        }
                    }
                    if (handler.isCurrentTimeSeriesHeaderForCurrentTimeRejected()) continue;
                    if (hasLabel) handler.setFlag(rows.getInt(labelColumn));
                    if (binary) {
                        readGrid(rows.getBytes(valueColumn), header);
                    } else {
                        float value = rows.getFloat(valueColumn);
                        // getFloat returns 0 for SQL NULL; a NULL must be treated as missing, not as a 0 measurement.
                        boolean missing = rows.wasNull() || value == seriesInfo.missVal
                                // every value of about -999 is also recognised as missing value (hack for Taiwan)
                                || (value > -1000 && value <= -998.99);
                        if (missing) {
                            missingValueCount++;
                            value = Float.NaN;
                        } else if (value == seriesInfo.traceVal) {
                            traceValueCount++;
                            value = 0;
                        }
                        if (!Float.isNaN(value)) {
                            if (value < minValue) minValue = value;
                            if (value > maxValue) maxValue = value;
                        }
                        handler.setValue(value);
                        handler.applyCurrentFields();
                    }
                }
            } finally {
                rows.close();
            }
        } finally {
            statement.close();
        }
        if (rowCount == 0) {
            log.info("No values found");
            return;
        }
        log.info("Period: " + new Period(minTime, maxTime));
        log.info("Row count: " + rowCount);
        log.info("Missing value count: " + missingValueCount);
        log.info("Trace value count: " + traceValueCount);
        // Only set when at least one scalar, non-missing value was read.
        if (minValue <= maxValue) log.info("Value range: " + minValue + " - " + maxValue);
    }

    /**
     * Decodes one grid blob and passes it to the handler for the current time.
     * Any failure, including a NULL blob, is logged together with its cause, and the row is skipped.
     *
     * @param bytes  the blob content, possibly gzip-compressed
     * @param header the current header, used only in the log message
     */
    private void readGrid(byte[] bytes, DefaultTimeSeriesHeader header) {
        try {
            InputStream inputStream = openGridStream(bytes);
            if (isAsciiGrid(inputStream)) {
                AsciiGridReader reader = new AsciiGridReader(inputStream, "hymostransferdb", handler.getDefaultGeoDatum());
                try {
                    Geometry geometry = reader.getGeometry();
                    ensureGridBufferSize(geometry.size());
                    reader.readCoverage(values);
                    handler.setGeometry(geometry);
                    handler.setCoverageValues(values);
                    handler.applyCurrentFields();
                } finally {
                    reader.close();
                }
            } else {
                MosaicGridFileReader reader = new MosaicGridFileReader(inputStream);
                try {
                    Geometry geometry = reader.getGridGeometry();
                    ensureGridBufferSize(geometry.size());
                    reader.read(values, byteBuffer, shortBuffer);
                    handler.setGeometry(geometry);
                    handler.setValueResolution(reader.getValueResolution());
                    handler.setCoverageValues(values);
                    handler.applyCurrentFields();
                } finally {
                    reader.close();
                }
            }
        } catch (Exception e) {
            log.warn("HymosTransferDbPare: Can not read grid " + header, e);
        }
    }

    /**
     * Opens the blob as a gzip stream. When the blob has no valid gzip header (the GZIPInputStream constructor
     * throws), the raw bytes are used instead. Both returned streams are wrapped or created so that mark/reset
     * can be used for sniffing.
     */
    private static InputStream openGridStream(byte[] bytes) {
        try {
            return new UnsyncBufferedInputStream(new GZIPInputStream(new ByteArrayInputStream(bytes), GRID_STREAM_BUFFER_SIZE), GRID_STREAM_BUFFER_SIZE);
        } catch (IOException e) {
            return new ByteArrayInputStream(bytes);
        }
    }

    /** Makes sure the reusable grid buffers have exactly {@code size} cells. */
    private void ensureGridBufferSize(int size) {
        if (values.length == size) return;
        values = new float[size];
        byteBuffer = new byte[size * NumberType.INT16_SIZE];
        shortBuffer = new short[size];
    }

    /**
     * Returns true when the first bytes of the stream contain "cols" (case insensitive), as an ASCII grid header
     * does. The stream is reset to its start afterwards. Raw bytes are read directly, never more than the mark
     * limit. A decoding Reader was used before, and it could read past the mark limit and leave the stream consumed.
     */
    private static boolean isAsciiGrid(InputStream inputStream) {
        if (!inputStream.markSupported()) return false;
        inputStream.mark(ASCII_GRID_SNIFF_LENGTH);
        try {
            try {
                byte[] head = new byte[ASCII_GRID_SNIFF_LENGTH];
                int length = 0;
                while (length < head.length) {
                    int n = inputStream.read(head, length, head.length - length);
                    if (n == -1) break;
                    length += n;
                }
                // ISO-8859-1 maps every byte to one char, so binary content decodes without errors.
                String header = new String(head, 0, length, "ISO-8859-1");
                return TextUtils.containsIgoreCase(header, "cols");
            } finally {
                inputStream.reset();
            }
        } catch (Exception e) {
            return false;
        }
    }

    /** Returns true when the series table has a LABEL column (case insensitive). */
    private boolean hasLabel(SeriesInfo seriesInfo) throws SQLException {
        Statement statement = connection.createStatement();
        try {
            statement.setMaxRows(1);
            ResultSet resultSet = statement.executeQuery("SELECT * FROM " + quoteTableName(seriesInfo.tableName));
            try {
                return SqlUtils.columnExistsIgnoreCase(resultSet.getMetaData(), "LABEL");
            } finally {
                resultSet.close();
            }
        } finally {
            statement.close();
        }
    }

    /**
     * Quotes a table name with square brackets. Closing brackets inside the name are doubled, so a name read from
     * the Series table cannot break out of the identifier.
     */
    private static String quoteTableName(String tableName) {
        return '[' + tableName.replace("]", "]]") + ']';
    }
}
|