package nl.wldelft.fews.system.plugin.dataImport;

import nl.wldelft.util.io.DatabaseParser;
import nl.wldelft.util.sql.SqlUtils;
import nl.wldelft.util.timeseries.TimeSeriesContentHandler;
import nl.wldelft.util.timeseries.TimeSeriesHeader;
import nl.wldelft.util.timeseries.TimeSeriesHeadersConsumer;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.sql.Types;


/**
 * Each TMX time series is contained in a single table. The table contains 3 columns:
 * <p/>
 * Column 1: Status (byte)
 * Column 2: TimeStamp (date/time)
 * Column 3: Value (single)
 * <p/>
 * Each table contains a unique parameter/location combination. In other words, in the Id Mapping the location id
 * and the parameter id are identical (see also WISKI import).
 * <p/>
 * When importing, consider only those tables included in the IdMapping used. There are additional tables in the
 * database. These should be ignored. If defined to be read, then an error can be generated indicating the format
 * of requested table to read is wrong.
 * <p/>
 * The Status column can be translated using the flag mapping functionality.
 * <p/>
 * Note: The time at midnight is sometimes offset by a few seconds. This may then not be imported.
 * The import module can apply the tolerance functionality to import this to the cardinal time step.
 * <p/>
 * Example (columns truncated)
 * <p/>
 * Loc063Di18
 * --------------------------------
 * Status  TimeStamp         Value
 * --------------------------------
 * 1       7-4-2005 0:00:04  0
 * 1       7-4-2005 0:15:00  0
 * 1       7-4-2005 0:30:00  0
 * 1       7-4-2005 0:45:00  0
 * 1       7-4-2005 1:00:00  0
 * 1       7-4-2005 1:15:00  0
 * 1       7-4-2005 1:30:00  0
 * 1       7-4-2005 1:45:00  0
 * 1       7-4-2005 2:00:00  0
 * 1       7-4-2005 2:15:00  0
 * 1       7-4-2005 2:30:00  0
 * 1       7-4-2005 2:45:00  0
 * <p/>
 * <p/>
 * Please see also http://wiki/confluence/display/FEWS/Tmx%2C+Time+Series for more documentation.
 */

public class TmxTimeSeriesParser implements DatabaseParser<TimeSeriesContentHandler>, TimeSeriesHeadersConsumer {
    private static final Logger log = Logger.getLogger(TmxTimeSeriesParser.class);

    // Qualifier ids (matched case-insensitively) that both select the shared report table
    // but read a different value column from it.
    private static final String QUALIFIER_REPORT_OPEN = "ReportDi_Open";
    private static final String QUALIFIER_REPORT_CLOSED = "ReportDi_Closed";
    private static final String REPORT_TABLE_NAME = "ReportDi";

    // Headers of the time series to import; supplied through setTimeSeriesHeaders before parse is called.
    private TimeSeriesHeader[] headers = null;
    // Handler and connection of the parse call that is currently running.
    private TimeSeriesContentHandler contentHandler = null;
    private Connection connection = null;

    /**
     * Stores the headers of the time series that have to be read by the next {@code parse} call.
     * The array is kept by reference, it is not copied.
     *
     * @param timeSeriesHeaders headers to import
     */
    @SuppressWarnings({"AssignmentToCollectionOrArrayFieldFromParameter"})
    @Override
    public void setTimeSeriesHeaders(TimeSeriesHeader[] timeSeriesHeaders) {
        this.headers = timeSeriesHeaders;
    }

    /**
     * Reads every header passed to {@link #setTimeSeriesHeaders} from the database and feeds the rows to the
     * content handler. A failure while reading one time series is logged and does not stop the import of the
     * remaining time series.
     *
     * @param connection     open connection to the TMX database
     * @param contentHandler receiver of the parsed times, flags and values
     * @throws IllegalStateException when no headers have been set
     */
    @Override
    public void parse(Connection connection, TimeSeriesContentHandler contentHandler) throws Exception {
        if (headers == null) {
            // Fail with a clear message instead of a bare NullPointerException from the loop below.
            throw new IllegalStateException("TimeSeriesImport.Tmx: setTimeSeriesHeaders must be called before parse");
        }

        this.connection = connection;
        this.contentHandler = contentHandler;

        for (TimeSeriesHeader header : headers) {
            try {
                parse(header);
            } catch (Exception e) {
                String msg = "TimeSeriesImport.Tmx: time series parameter = " + header.getParameterId()
                        + ", location: " + header.getLocationId()
                        + " can not be imported, table name = " + getTableName(header) + '\n' + e.getMessage();

                // The stack trace is only attached when debug logging is switched on.
                log.info(msg, log.isDebugEnabled() ? e : null);
            }
        }
    }

    /**
     * Builds the select statement for a header.
     * <p/>
     * Headers with at least one qualifier ("digital") are read with a statement that filters on
     * {@code Channel} and {@code LocCode} and returns two columns (time, value). All other headers are read
     * from their own table and return three columns (time, value, status).
     *
     * @return the statement text, or {@code null} when {@code SqlUtils.isTableExists} reports that the table
     *         is not available
     */
    private String getSql(TimeSeriesHeader header) {
        String tableName = getTableName(header);
        if (!SqlUtils.isTableExists(connection, tableName)) return null;

        // Table and column names can not be bound as statement parameters, so they are concatenated.
        // They are derived from the header ids, and the table name has been checked for existence above.
        String valueColumn = getValueColumnName(header);
        boolean digital = header.getQualifierCount() > 0;
        if (digital) return "SELECT ReportDate, "  + valueColumn + " FROM " + tableName + " WHERE Channel=? AND LocCode=?";

        return "SELECT TimeStamp, " + valueColumn + ", Status FROM " + tableName;
    }

    /**
     * Returns the name of the table that holds the data for a header.
     * <p/>
     * Without qualifiers the name is the location id directly followed by the parameter id. With qualifiers
     * the first qualifier id is the table name, except for the two report qualifiers which share one table.
     */
    private static String getTableName(TimeSeriesHeader header) {
        if (header.getQualifierCount() == 0) return header.getLocationId() + header.getParameterId();
        String qualifier = header.getQualifierId(0);
        if (qualifier.equalsIgnoreCase(QUALIFIER_REPORT_OPEN) || qualifier.equalsIgnoreCase(QUALIFIER_REPORT_CLOSED)) {
            return REPORT_TABLE_NAME;
        }
        return qualifier;
    }

    /**
     * Returns the name of the column that holds the value for a header: {@code Value} without qualifiers,
     * {@code MinutesOpen} / {@code MinutesClosed} for the two report qualifiers and {@code ActualValue} for
     * every other qualifier.
     */
    private static String getValueColumnName(TimeSeriesHeader header) {
        if (header.getQualifierCount() == 0) return "Value";
        String qualifier = header.getQualifierId(0);
        if (qualifier.equalsIgnoreCase(QUALIFIER_REPORT_OPEN)) return "MinutesOpen";
        if (qualifier.equalsIgnoreCase(QUALIFIER_REPORT_CLOSED)) return "MinutesClosed";
        return "ActualValue";
    }

    /**
     * Reads a single time series. Nothing is read, and the content handler is not touched, when the table
     * for the header is not available.
     *
     * @throws SQLException when the statement can not be prepared or executed, or a row can not be read
     * @throws IOException  when the content handler fails
     */
    private void parse(TimeSeriesHeader header) throws SQLException, IOException {
        String sql = getSql(header);
        if (sql == null) {
            if (log.isDebugEnabled()) log.debug("Skip " + getTableName(header) + ", table not available");
            return;
        }

        contentHandler.setNewTimeSeriesHeader(header);
        assert !contentHandler.isCurrentTimeSeriesHeaderForAllTimesRejected();

        if (log.isDebugEnabled()) log.debug("Parse " + sql);

        PreparedStatement statement = connection.prepareStatement(sql);
        try {
            boolean digital = header.getQualifierCount() > 0;
            if (digital) {
                // Placeholder 1 = Channel, placeholder 2 = LocCode, see getSql.
                statement.setString(1, header.getParameterId());
                statement.setString(2, header.getLocationId());
                if (log.isDebugEnabled()) log.debug("par=" + header.getParameterId() + " loc=" + header.getLocationId());
            }
            ResultSet resultSet = statement.executeQuery();
            try {
                parseResultSet(resultSet);
            } finally {
                resultSet.close();
            }
        } finally {
            statement.close();
        }
    }

    /**
     * Passes every row of the result set to the content handler.
     * <p/>
     * Column 1 is the time, column 2 the value and the optional column 3 the flag. Rows without a time are
     * skipped. Boolean/bit values are imported as -1 (true) or 0 (false).
     *
     * @throws SQLException when a row can not be read
     * @throws IOException  when the content handler fails
     */
    private void parseResultSet(ResultSet resultSet) throws SQLException, IOException {
        // Raw offset only: daylight saving time is not part of TimeZone.getRawOffset().
        long timeZoneOffset = contentHandler.getDefaultTimeZone().getRawOffset();
        ResultSetMetaData metaData = resultSet.getMetaData();
        int type = metaData.getColumnType(2);
        boolean hasFlag = metaData.getColumnCount() == 3;
        boolean bool = type == Types.BOOLEAN || type == Types.BIT;
        int rowsWithoutTime = 0;
        while (resultSet.next()) {
            Timestamp timestamp = resultSet.getTimestamp(1);
            if (timestamp == null) {
                // A row without time can not be placed in the series; skip it instead of failing on a
                // NullPointerException that would abort the remainder of this time series.
                rowsWithoutTime++;
                continue;
            }
            contentHandler.setTime(timestamp.getTime() - timeZoneOffset);
            if (contentHandler.isCurrentTimeSeriesHeaderForCurrentTimeRejected()) continue;
            contentHandler.setFlag(hasFlag ? resultSet.getByte(3) : 0);

            float value;
            if (bool) {
                value = resultSet.getBoolean(2) ? -1f : 0f;
            } else {
                value = resultSet.getFloat(2);
            }
            // JDBC reports SQL NULL as 0 / false; wasNull() refers to the value column read just above.
            // NOTE(review): assumes the content handler treats NaN as a missing value - confirm.
            if (resultSet.wasNull()) value = Float.NaN;

            contentHandler.setValue(value);
            contentHandler.applyCurrentFields();
        }
        if (rowsWithoutTime > 0) {
            log.warn("TimeSeriesImport.Tmx: " + rowsWithoutTime + " row(s) without time stamp skipped");
        }
    }
}
  • No labels