package nl.wldelft.fews.system.plugin.dataImport;
import nl.wldelft.util.io.DatabaseParser;
import nl.wldelft.util.sql.SqlUtils;
import nl.wldelft.util.timeseries.TimeSeriesContentHandler;
import nl.wldelft.util.timeseries.TimeSeriesHeader;
import nl.wldelft.util.timeseries.TimeSeriesHeadersConsumer;
import org.apache.log4j.Logger;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.sql.ResultSetMetaData;
import java.io.IOException;
/**
* Each TMX time series is contained in a single table. The table contains three columns:
* <p/>
* Column 1: Status (byte)
* Column 2: TimeStamp (date/time)
* Column 3: Value (single)
* <p/>
* Each table contains a unique parameter/location combination. In other words, in the Id Mapping the location id
* and parameter id are identical (see also the WISKI import).
* <p/>
* When importing, only the tables included in the IdMapping that is used are considered. The database contains
* additional tables; these are ignored. If such a table is nevertheless configured to be read, an error can be
* generated indicating that the format of the requested table is wrong.
* <p/>
* The Status column can be translated using the flag mapping functionality.
* <p/>
* Note: the time at midnight is sometimes offset by a few seconds, so that value may not be imported as-is.
* The import module can apply the tolerance functionality to map such times to the cardinal time step.
* <p/>
* Example (columns truncated)
* <p/>
* <pre>
* Loc063Di18
* --------------------------------
* Status  TimeStamp          Value
* --------------------------------
* 1       7-4-2005 0:00:04   0
* 1       7-4-2005 0:15:00   0
* 1       7-4-2005 0:30:00   0
* 1       7-4-2005 0:45:00   0
* 1       7-4-2005 1:00:00   0
* 1       7-4-2005 1:15:00   0
* 1       7-4-2005 1:30:00   0
* 1       7-4-2005 1:45:00   0
* 1       7-4-2005 2:00:00   0
* 1       7-4-2005 2:15:00   0
* 1       7-4-2005 2:30:00   0
* 1       7-4-2005 2:45:00   0
* </pre>
* <p/>
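* For an ordinary series (no qualifiers) the parser reads such a table with a query of the following form
* (illustrative, using the example table above):
* <pre>
* SELECT TimeStamp, Value, Status FROM Loc063Di18
* </pre>
* <p/>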
* Please see also http://wiki/confluence/display/FEWS/Tmx%2C+Time+Series for more documentation.
*/
public class TmxTimeSeriesParser implements DatabaseParser<TimeSeriesContentHandler>, TimeSeriesHeadersConsumer {
private static final Logger log = Logger.getLogger(TmxTimeSeriesParser.class);
private TimeSeriesHeader[] headers = null;
private TimeSeriesContentHandler contentHandler = null;
private Connection connection = null;
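/**
 * Stores the headers of the time series to import. One table is read per header; tables in the database that are
 * not referenced by any header are ignored.
 */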
@SuppressWarnings({"AssignmentToCollectionOrArrayFieldFromParameter"})
@Override
public void setTimeSeriesHeaders(TimeSeriesHeader[] timeSeriesHeaders) {
this.headers = timeSeriesHeaders;
}
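/**
 * Parses one table per configured time series header. A failure for an individual series is logged at info level
 * and does not abort the import of the remaining series.
 */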
@Override
public void parse(Connection connection, TimeSeriesContentHandler contentHandler) throws Exception {
this.connection = connection;
this.contentHandler = contentHandler;
for (TimeSeriesHeader header : headers) {
try {
parse(header);
} catch (Exception e) {
String msg = "TimeSeriesImport.Tmx: time series parameter = " + header.getParameterId()
+ ", location: " + header.getLocationId()
+ " can not be imported, table name = " + getTableName(header) + '\n' + e.getMessage();
log.info(msg, log.isDebugEnabled() ? e : null);
}
}
}
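/**
 * Builds the SELECT statement for the given header, or returns null when the table does not exist so that the
 * series is skipped. Series with a qualifier ("digital") are read from a shared table filtered by Channel and
 * LocCode; ordinary series are read completely from their own table.
 */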
private String getSql(TimeSeriesHeader header) {
String tableName = getTableName(header);
if (!SqlUtils.isTableExists(connection, tableName)) return null;
boolean digital = header.getQualifierCount() > 0;
if (digital) return "SELECT ReportDate, " + getValueColumnName(header) + " FROM " + tableName + " WHERE Channel=? AND LocCode=?";
return "SELECT TimeStamp, " + getValueColumnName(header) + ", Status FROM " + tableName;
}
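/**
 * Determines the table to read for a header. For ordinary series the table name is the concatenation of the
 * location id and the parameter id (for example a location id Loc063 and parameter id Di18 give the table
 * Loc063Di18). For qualified series the qualifier determines the table: ReportDi_Open and ReportDi_Closed both
 * map to the ReportDi table, any other qualifier is used as the table name itself.
 */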
private static String getTableName(TimeSeriesHeader header) {
if (header.getQualifierCount() == 0) return header.getLocationId() + header.getParameterId();
String qualifier = header.getQualifierId(0);
if (qualifier.equalsIgnoreCase("ReportDi_Open"))
return "ReportDi";
if (qualifier.equalsIgnoreCase("ReportDi_Closed"))
return "ReportDi";
return qualifier;
}
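/**
 * Determines the column that holds the value: Value for ordinary series, MinutesOpen/MinutesClosed for the
 * ReportDi_Open/ReportDi_Closed qualifiers and ActualValue for any other qualifier.
 */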
private static String getValueColumnName(TimeSeriesHeader header) {
if (header.getQualifierCount() == 0) return "Value";
String qualifier = header.getQualifierId(0);
if (qualifier.equalsIgnoreCase("ReportDi_Open")) return "MinutesOpen";
if (qualifier.equalsIgnoreCase("ReportDi_Closed")) return "MinutesClosed";
return "ActualValue";
}
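/**
 * Imports a single series: announces the header to the content handler, prepares the statement built by getSql
 * and streams the result set to the content handler. For series with a qualifier the parameter id is bound to
 * the Channel column and the location id to the LocCode column.
 */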
private void parse(TimeSeriesHeader header) throws SQLException, IOException {
String sql = getSql(header);
if (sql == null) return;
contentHandler.setNewTimeSeriesHeader(header);
assert !contentHandler.isCurrentTimeSeriesHeaderForAllTimesRejected();
if (log.isDebugEnabled()) log.debug("Parse " + sql);
PreparedStatement statement = connection.prepareStatement(sql);
try {
boolean digital = header.getQualifierCount() > 0;
if (digital) {
statement.setString(1, header.getParameterId());
statement.setString(2, header.getLocationId());
if (log.isDebugEnabled()) log.debug("par=" + header.getParameterId() + " loc=" + header.getLocationId());
}
ResultSet resultSet = statement.executeQuery();
try {
parseResultSet(resultSet);
} finally {
resultSet.close();
}
} finally {
statement.close();
}
}
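/**
 * Converts every row into a time/flag/value triple for the content handler. Timestamps are shifted by the raw
 * offset of the configured default time zone, the Status byte is used as flag when a third column is selected
 * and boolean/bit value columns are mapped to -1 (true) / 0 (false).
 */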
private void parseResultSet(ResultSet resultSet) throws SQLException, IOException {
long timeZoneOffset = contentHandler.getDefaultTimeZone().getRawOffset();
ResultSetMetaData metaData = resultSet.getMetaData();
int type = metaData.getColumnType(2);
boolean hasFlag = metaData.getColumnCount() == 3;
boolean bool = type == Types.BOOLEAN || type == Types.BIT;
while (resultSet.next()) {
contentHandler.setTime(resultSet.getTimestamp(1).getTime() - timeZoneOffset);
if (contentHandler.isCurrentTimeSeriesHeaderForCurrentTimeRejected()) continue;
contentHandler.setFlag(hasFlag ? resultSet.getByte(3) : 0);
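// map a boolean/bit value column to -1 (true) / 0 (false)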
if (bool) {
contentHandler.setValue(resultSet.getBoolean(2) ? -1f : 0f);
} else {
contentHandler.setValue(resultSet.getFloat(2));
}
contentHandler.applyCurrentFields();
}
}
}