package nl.wldelft.fews.system.plugin.dataImport; import nl.wldelft.util.io.DatabaseParser; import nl.wldelft.util.sql.SqlUtils; import nl.wldelft.util.timeseries.TimeSeriesContentHandler; import nl.wldelft.util.timeseries.TimeSeriesHeader; import nl.wldelft.util.timeseries.TimeSeriesHeadersConsumer; import org.apache.log4j.Logger; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Types; import java.sql.ResultSetMetaData; import java.io.IOException; /** * Each TMX time series is contained in a single table. The table contains 3 columns; * <p/> * Column 1: Status (byte) * Column 2: TimeStamp (date/time) * Column 3: Value (single) * <p/> * Each table contains a unique parameter/location combination. In other words in the Id Mapping the location id is * and parameter id is identical (see also WISKI import). * <p/> * When importing, consider only those tables included in the IdMapping used. There are additional tables in the * database. These should be ignored. If defined to be read, then an error can be generated indicating the format * of requested table to read is wrong. * <p/> * The Status column can be translated using the flag mapping functionality. * <p/> * Note: The time at midnight is sometimes offset by a few seconds. This may then not be imported. * The import module can apply the tolerance functionality to import this to the cardinal time step. * <p/> * Example (columns truncated) * <p/> * Loc063Di18 * -------------------------------- * Status TimeStamp Value * -------------------------------- * 1 7-4-2005 0:00:04 0 * 1 7-4-2005 0:15:00 0 * 1 7-4-2005 0:30:00 0 * 1 7-4-2005 0:45:00 0 * 1 7-4-2005 1:00:00 0 * 1 7-4-2005 1:15:00 0 * 1 7-4-2005 1:30:00 0 * 1 7-4-2005 1:45:00 0 * 1 7-4-2005 2:00:00 0 * 1 7-4-2005 2:15:00 0 * 1 7-4-2005 2:30:00 0 * 1 7-4-2005 2:45:00 0 * <p/> * <p/> * Please see also http://wiki/confluence/display/FEWS/Tmx%2C+Time+Series for more documentation. */ public class TmxTimeSeriesParser implements DatabaseParser<TimeSeriesContentHandler>, TimeSeriesHeadersConsumer { private static final Logger log = Logger.getLogger(TmxTimeSeriesParser.class); private TimeSeriesHeader[] headers = null; private TimeSeriesContentHandler contentHandler = null; private Connection connection = null; @SuppressWarnings({"AssignmentToCollectionOrArrayFieldFromParameter"}) @Override public void setTimeSeriesHeaders(TimeSeriesHeader[] timeSeriesHeaders) { this.headers = timeSeriesHeaders; } @Override public void parse(Connection connection, TimeSeriesContentHandler contentHandler) throws Exception { this.connection = connection; this.contentHandler = contentHandler; for (TimeSeriesHeader header : headers) { try { parse(header); } catch (Exception e) { String msg = "TimeSeriesImport.Tmx: time series parameter = " + header.getParameterId() + ", location: " + header.getLocationId() + " can not be imported, table name = " + getTableName(header) + '\n' + e.getMessage(); log.info(msg, log.isDebugEnabled() ? e : null); } } } private String getSql(TimeSeriesHeader header) { String tableName = getTableName(header); if (!SqlUtils.isTableExists(connection, tableName)) return null; boolean digital = header.getQualifierCount() > 0; if (digital) return "SELECT ReportDate, " + getValueColumnName(header) + " FROM " + tableName + " WHERE Channel=? AND LocCode=?"; return "SELECT TimeStamp, " + getValueColumnName(header) + ", Status FROM " + tableName; } private static String getTableName(TimeSeriesHeader header) { if (header.getQualifierCount() == 0) return header.getLocationId() + header.getParameterId(); String qualifier = header.getQualifierId(0); if (qualifier.equalsIgnoreCase("ReportDi_Open")) return "ReportDi"; if (qualifier.equalsIgnoreCase("ReportDi_Closed")) return "ReportDi"; return qualifier; } private static String getValueColumnName(TimeSeriesHeader header) { if (header.getQualifierCount() == 0) return "Value"; String qualifier = header.getQualifierId(0); if (qualifier.equalsIgnoreCase("ReportDi_Open")) return "MinutesOpen"; if (qualifier.equalsIgnoreCase("ReportDi_Closed")) return "MinutesClosed"; return "ActualValue"; } private void parse(TimeSeriesHeader header) throws SQLException, IOException { String sql = getSql(header); if (sql == null) return; contentHandler.setNewTimeSeriesHeader(header); assert !contentHandler.isCurrentTimeSeriesHeaderForAllTimesRejected(); if (log.isDebugEnabled()) log.debug("Parse " + sql); PreparedStatement statement = connection.prepareStatement(sql); try { boolean digital = header.getQualifierCount() > 0; if (digital) { statement.setString(1, header.getParameterId()); statement.setString(2, header.getLocationId()); if (log.isDebugEnabled()) log.debug("par=" + header.getParameterId() + " loc=" + header.getLocationId()); } ResultSet resultSet = statement.executeQuery(); try { parseResultSet(resultSet); } finally { resultSet.close(); } } finally { statement.close(); } } private void parseResultSet(ResultSet resultSet) throws SQLException, IOException { long timeZoneOffset = contentHandler.getDefaultTimeZone().getRawOffset(); ResultSetMetaData metaData = resultSet.getMetaData(); int type = metaData.getColumnType(2); boolean hasFlag = metaData.getColumnCount() == 3; boolean bool = type == Types.BOOLEAN || type == Types.BIT; while (resultSet.next()) { contentHandler.setTime(resultSet.getTimestamp(1).getTime() - timeZoneOffset); if (contentHandler.isCurrentTimeSeriesHeaderForCurrentTimeRejected()) continue; contentHandler.setFlag(hasFlag ? resultSet.getByte(3) : 0); if (bool) { contentHandler.setValue(resultSet.getBoolean(2) ? -1f : 0f); } else { contentHandler.setValue(resultSet.getFloat(2)); } contentHandler.applyCurrentFields(); } } }