package nl.wldelft.lib.hymos.transferdb;

import nl.wldelft.util.ByteArrayUtils;
import nl.wldelft.util.FloatArrayUtils;
import nl.wldelft.util.NumberType;
import nl.wldelft.util.Period;
import nl.wldelft.util.ShortArrayUtils;
import nl.wldelft.util.TextUtils;
import nl.wldelft.util.coverage.Geometry;
import nl.wldelft.util.io.AsciiGridReader;
import nl.wldelft.util.io.DatabaseParser;
import nl.wldelft.util.io.MosaicGridFileReader;
import nl.wldelft.util.io.UnsyncBufferedInputStream;
import nl.wldelft.util.sql.SqlUtils;
import nl.wldelft.util.timeseries.DefaultTimeSeriesHeader;
import nl.wldelft.util.timeseries.TimeSeriesContentHandler;
import org.apache.log4j.Logger;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
import java.util.zip.GZIPInputStream;

/**
 * Parser compatible with the Hymos transferdb database.
 * <p>
 * This database can be filled with data from every other data source by defining external
 * connections and SQL queries to get the data from the external database.
 * <p>
 * The parser reads the {@code Parameter} table (parameter id to unit), then the {@code Series}
 * table (one row per series, naming the table that holds its values) and finally every series
 * table, pushing the rows into the supplied {@link TimeSeriesContentHandler}.
 * <p>
 * Instances keep the connection, the handler and reusable grid buffers in fields, so a single
 * instance must not be used for two {@link #parse} calls at the same time.
 */
public class HymosTransferDbTimeSeriesParser implements DatabaseParser<TimeSeriesContentHandler> {
    private static final Logger log = Logger.getLogger(HymosTransferDbTimeSeriesParser.class);

    /** Number of bytes inspected at the start of a binary value to detect an ASCII grid. */
    private static final int GRID_HEADER_PROBE_SIZE = 99;

    private Connection connection = null;
    private TimeSeriesContentHandler handler = null;
    private Map<String, String> unitMap = null;

    /** Raw offset (milliseconds) of the handler's default time zone, subtracted from every timestamp read. */
    private long timeZoneOffset = 0L;

    // Buffers reused between grids; they are reallocated only when the grid size changes.
    private float[] values = FloatArrayUtils.EMPTY_ARRAY;
    private byte[] byteBuffer = ByteArrayUtils.EMPTY_ARRAY;
    private short[] shortBuffer = ShortArrayUtils.EMPTY_ARRAY;

    /** One row of the {@code Series} table. */
    private static class SeriesInfo {
        int seriesNr = 0;
        String locationID = null;
        String dataType = null;
        String unit = null;
        String tableName = null;
        TimeZone timeZone = null;
        float missVal = -999F;
        float traceVal = 0.0F;
        boolean forecast = false;

        @Override
        public String toString() {
            return seriesNr + ":" + dataType + ':' + locationID + ':' + tableName;
        }
    }

    public HymosTransferDbTimeSeriesParser() {
    }

    /**
     * Reads all series found in the database and feeds them to the content handler.
     * <p>
     * A failure while reading a single series table is logged and does not stop the remaining
     * series from being read; a failure while reading the {@code Parameter} or {@code Series}
     * table is propagated.
     *
     * @param connection     open connection to the transfer database
     * @param contentHandler receiver of the headers, times, values and grids that are read
     * @throws Exception when the parameter or series meta data can not be read
     */
    @Override
    public void parse(Connection connection, TimeSeriesContentHandler contentHandler) throws Exception {
        this.connection = connection;
        // The handler field is used by read(); it has to be set before anything else touches it.
        this.handler = contentHandler;
        this.timeZoneOffset = contentHandler.getDefaultTimeZone().getRawOffset();

        unitMap = readUnitMap();
        SeriesInfo[] seriesInfos = readSeriesInfos();
        for (SeriesInfo seriesInfo : seriesInfos) {
            try {
                read(seriesInfo);
            } catch (Exception e) {
                // Deliberate best effort: one broken series table must not block the others.
                log.error("Reading time serie failed " + seriesInfo, e);
            }
        }
    }

    /**
     * Reads the {@code Series} table.
     * <p>
     * Two column naming schemes are accepted: {@code REALSTAT}/{@code DATATYPE}/{@code MISSVAL}
     * with {@code LocationId}/{@code ParameterId}/{@code MissingValue} as fallback. The columns
     * {@code TRACEVAL} and {@code FORECAST} are optional.
     *
     * @return one entry per row of the {@code Series} table, in query order
     * @throws SQLException when the table or a mandatory column is missing, or when a row has no table name
     */
    @SuppressWarnings({"OverlyLongMethod"})
    private SeriesInfo[] readSeriesInfos() throws SQLException {
        ArrayList<SeriesInfo> list = new ArrayList<SeriesInfo>();
        Statement statement = connection.createStatement();
        try {
            ResultSet resultSet = statement.executeQuery("SELECT * FROM Series");
            try {
                int seriesNrColumnIndex = resultSet.findColumn("ID");
                int realStatColumnIndex = findColumn(resultSet, "REALSTAT", "LocationId");
                int dataTypeColumnIndex = findColumn(resultSet, "DATATYPE", "ParameterId");
                int tableNameColumnIndex = resultSet.findColumn("TABLENAME");
                int missValColumnIndex = findColumn(resultSet, "MISSVAL", "MissingValue");
                int traceValColumnIndex = findOptionalColumn(resultSet, "TRACEVAL");
                int forecastColumnIndex = findOptionalColumn(resultSet, "FORECAST");

                while (resultSet.next()) {
                    SeriesInfo seriesInfo = new SeriesInfo();
                    seriesInfo.seriesNr = resultSet.getInt(seriesNrColumnIndex);
                    if (log.isDebugEnabled()) log.debug("Parse series info for " + seriesInfo.seriesNr);

                    seriesInfo.locationID = resultSet.getString(realStatColumnIndex).trim();
                    seriesInfo.dataType = resultSet.getString(dataTypeColumnIndex).trim();
                    // The unit stays null when the data type is not listed in the Parameter table.
                    seriesInfo.unit = unitMap.get(seriesInfo.dataType);

                    // Check for SQL NULL before trimming, otherwise the check below can never trigger.
                    String tableName = resultSet.getString(tableNameColumnIndex);
                    if (tableName == null) {
                        throw new SQLException("Table name is not specified for series:" + seriesInfo.seriesNr);
                    }
                    seriesInfo.tableName = tableName.trim();

                    seriesInfo.missVal = resultSet.getFloat(missValColumnIndex);
                    // A NaN missing value never equals a read value, so no value is flagged as missing by it.
                    if (resultSet.wasNull()) seriesInfo.missVal = Float.NaN;

                    if (traceValColumnIndex != -1) {
                        seriesInfo.traceVal = resultSet.getFloat(traceValColumnIndex);
                        if (resultSet.wasNull()) seriesInfo.traceVal = 0;
                    }
                    if (forecastColumnIndex != -1) {
                        seriesInfo.forecast = resultSet.getBoolean(forecastColumnIndex);
                    }

                    if (log.isDebugEnabled()) log.debug("series info parsed " + seriesInfo);
                    list.add(seriesInfo);
                }
            } finally {
                resultSet.close();
            }
        } finally {
            statement.close();
        }
        return list.toArray(new SeriesInfo[list.size()]);
    }

    /**
     * Returns the index of column {@code name}, or of {@code fallbackName} when {@code name} does not exist.
     *
     * @throws SQLException when neither column exists
     */
    private static int findColumn(ResultSet resultSet, String name, String fallbackName) throws SQLException {
        try {
            return resultSet.findColumn(name);
        } catch (SQLException e) {
            return resultSet.findColumn(fallbackName);
        }
    }

    /** Returns the index of column {@code name}, or -1 when the lookup fails. */
    private static int findOptionalColumn(ResultSet resultSet, String name) {
        try {
            return resultSet.findColumn(name);
        } catch (SQLException e) {
            return -1;
        }
    }

    /**
     * Reads the {@code Parameter} table.
     *
     * @return map from the {@code ID} column to the {@code UNIT} column
     * @throws SQLException when the table or one of the two columns is missing
     */
    private Map<String, String> readUnitMap() throws SQLException {
        Map<String, String> res = new HashMap<String, String>();
        Statement statement = connection.createStatement();
        try {
            ResultSet resultSet = statement.executeQuery("SELECT * FROM Parameter");
            try {
                int idColumnIndex = resultSet.findColumn("ID");
                int unitColumnIndex = resultSet.findColumn("UNIT");
                while (resultSet.next()) {
                    String parId = resultSet.getString(idColumnIndex);
                    String unit = resultSet.getString(unitColumnIndex);
                    res.put(parId, unit);
                }
            } finally {
                resultSet.close();
            }
        } finally {
            statement.close();
        }
        return res;
    }

    /**
     * Reads all rows of the table of a single series and passes them to the handler.
     * <p>
     * Forecast series are read from the columns {@code FORECASTDATE, VALUEDATE, FORECASTVALUE},
     * other series from {@code MEASDATE, MEASVALUE}; the {@code LABEL} column is read as flag
     * when the table has one. When the value column is binary every value is decoded as a grid,
     * otherwise it is read as a float.
     *
     * @param seriesInfo description of the series, as read from the {@code Series} table
     * @throws IOException  declared for callers; grid decoding problems are logged per row instead
     * @throws SQLException when the table can not be queried or a row can not be read
     */
    @SuppressWarnings({"OverlyLongMethod"})
    private void read(SeriesInfo seriesInfo) throws IOException, SQLException {
        log.info("Start reading table " + seriesInfo.tableName + " for " + seriesInfo.locationID + " and " + seriesInfo.dataType);
        int rowCount = 0;
        int missingValueCount = 0;
        int traceValueCount = 0;
        long minTime = Long.MAX_VALUE;
        long maxTime = Long.MIN_VALUE;

        boolean hasLabel = hasLabel(seriesInfo);
        String labelSql = hasLabel ? ", LABEL" : "";
        // The table name comes from the Series table; identifiers can not be bound as statement parameters.
        String sql;
        if (seriesInfo.forecast) {
            sql = "SELECT FORECASTDATE, VALUEDATE, FORECASTVALUE" + labelSql + " FROM [" + seriesInfo.tableName + ']';
        } else {
            sql = "SELECT MEASDATE, MEASVALUE" + labelSql + " FROM [" + seriesInfo.tableName + ']';
        }

        // Column positions follow the select lists above.
        int forecastDateColumn = seriesInfo.forecast ? 1 : -1;
        int valueDateColumn = seriesInfo.forecast ? 2 : 1;
        int valueColumn = seriesInfo.forecast ? 3 : 2;
        int labelColumn = seriesInfo.forecast ? 4 : 3;

        Statement statement = connection.createStatement();
        try {
            ResultSet rows = statement.executeQuery(sql);
            try {
                int columnType = rows.getMetaData().getColumnType(valueColumn);
                boolean binary = SqlUtils.isBinaryColumn(columnType);
                DefaultTimeSeriesHeader header = null;

                while (rows.next()) {
                    // Counted exactly once per row, including rows the handler rejects below.
                    rowCount++;
                    long time = rows.getTimestamp(valueDateColumn).getTime() - timeZoneOffset;
                    handler.setTime(time);
                    if (time > maxTime) maxTime = time;
                    if (time < minTime) minTime = time;

                    long forecastTime = seriesInfo.forecast
                            ? rows.getTimestamp(forecastDateColumn).getTime() - timeZoneOffset
                            : Long.MIN_VALUE;

                    // A new header is started for the first row and whenever the forecast time changes.
                    if (header == null || header.getForecastTime() != forecastTime) {
                        header = new DefaultTimeSeriesHeader();
                        header.setLocationId(seriesInfo.locationID);
                        header.setParameterId(seriesInfo.dataType);
                        header.setUnit(seriesInfo.unit);
                        header.setForecastTime(forecastTime);
                        handler.setTimeSeriesHeader(header);
                        if (handler.isCurrentTimeSeriesHeaderForAllTimesRejected()) {
                            log.info("Table skipped " + seriesInfo.tableName + " for " + seriesInfo.locationID + " and " + seriesInfo.dataType);
                            return;
                        }
                    }

                    if (handler.isCurrentTimeSeriesHeaderForCurrentTimeRejected()) continue;
                    if (hasLabel) handler.setFlag(rows.getInt(labelColumn));

                    if (binary) {
                        byte[] bytes = rows.getBytes(valueColumn);
                        try {
                            readGrid(bytes);
                        } catch (Exception e) {
                            // Best effort: an unreadable grid is skipped, the cause is kept in the log.
                            log.warn("HymosTransferDbPare: Can not read grid " + header, e);
                        }
                        continue;
                    }

                    float value = rows.getFloat(valueColumn);
                    if (value == seriesInfo.missVal) {
                        missingValueCount++;
                        value = Float.NaN;
                    }
                    // every value that is about -999 is also recognised as missing value
                    // hack for taiwan
                    if (value > -1000 && value <= -998.99) {
                        missingValueCount++;
                        value = Float.NaN;
                    }
                    if (value == seriesInfo.traceVal) {
                        traceValueCount++;
                        value = 0;
                    }
                    handler.setValue(value);
                    handler.applyCurrentFields();
                }
            } finally {
                rows.close();
            }
        } finally {
            statement.close();
        }

        if (rowCount == 0) {
            log.info("No values found");
        } else {
            Period period = new Period(minTime, maxTime);
            log.info("Period: " + period);
            log.info("Row count: " + rowCount);
            log.info("Missing value count: " + missingValueCount);
            log.info("Trace value count: " + traceValueCount);
        }
    }

    /**
     * Decodes one binary value as a grid and applies it to the handler.
     * <p>
     * The bytes are first opened as GZIP stream; when that fails they are read as they are.
     * The content is read with {@link AsciiGridReader} when {@link #isAsciiGrid} recognises it,
     * otherwise with {@link MosaicGridFileReader}.
     *
     * @param bytes content of the binary value column of the current row
     * @throws Exception when the grid can not be decoded
     */
    private void readGrid(byte[] bytes) throws Exception {
        InputStream inputStream;
        try {
            inputStream = new UnsyncBufferedInputStream(new GZIPInputStream(new ByteArrayInputStream(bytes), 100000), 100000);
        } catch (IOException e) {
            // Not GZIP compressed, fall back to the plain bytes.
            inputStream = new ByteArrayInputStream(bytes);
        }

        if (isAsciiGrid(inputStream)) {
            AsciiGridReader reader = new AsciiGridReader(inputStream, "hymostransferdb", handler.getDefaultGeoDatum());
            try {
                Geometry geometry = reader.getGeometry();
                ensureBufferSize(geometry.size());
                reader.readCoverage(values);
                handler.setGeometry(geometry);
                handler.setCoverageValues(values);
                handler.applyCurrentFields();
            } finally {
                reader.close();
            }
        } else {
            MosaicGridFileReader reader = new MosaicGridFileReader(inputStream);
            try {
                Geometry geometry = reader.getGridGeometry();
                ensureBufferSize(geometry.size());
                reader.read(values, byteBuffer, shortBuffer);
                handler.setGeometry(geometry);
                handler.setValueResolution(reader.getValueResolution());
                handler.setCoverageValues(values);
                handler.applyCurrentFields();
            } finally {
                reader.close();
            }
        }
    }

    /** Reallocates the reusable grid buffers when the number of grid cells differs from the previous grid. */
    private void ensureBufferSize(int size) {
        if (values.length == size) return;
        values = new float[size];
        byteBuffer = new byte[size * NumberType.INT16_SIZE];
        shortBuffer = new short[size];
    }

    /**
     * Tests whether the first bytes of the stream contain the text "cols" (case insensitive).
     * The stream is reset to its initial position afterwards.
     * <p>
     * The probe reads raw bytes, never more than the mark limit, and decodes them as US-ASCII so
     * that the result does not depend on the platform charset.
     *
     * @param inputStream stream that supports mark and reset
     * @return {@code true} when the header text was found, {@code false} when it was not found
     *         or when reading or resetting the stream failed
     */
    private static boolean isAsciiGrid(InputStream inputStream) {
        inputStream.mark(100);
        try {
            try {
                byte[] probe = new byte[GRID_HEADER_PROBE_SIZE];
                int length = 0;
                // A single read may return fewer bytes than requested, e.g. from a GZIP stream.
                while (length < probe.length) {
                    int count = inputStream.read(probe, length, probe.length - length);
                    if (count < 0) break;
                    length += count;
                }
                String header = new String(probe, 0, length, "US-ASCII");
                return TextUtils.containsIgoreCase(header, "cols");
            } finally {
                inputStream.reset();
            }
        } catch (Exception e) {
            return false;
        }
    }

    /**
     * Tests whether the table of the series has a column named {@code LABEL} (case insensitive).
     *
     * @throws SQLException when the table can not be queried
     */
    private boolean hasLabel(SeriesInfo seriesInfo) throws SQLException {
        Statement statement = connection.createStatement();
        try {
            // Only the meta data is needed, so at most one row is requested.
            statement.setMaxRows(1);
            ResultSet resultSet = statement.executeQuery("SELECT * FROM [" + seriesInfo.tableName + ']');
            try {
                return SqlUtils.columnExistsIgnoreCase(resultSet.getMetaData(), "LABEL");
            } finally {
                resultSet.close();
            }
        } finally {
            statement.close();
        }
    }
}