package nl.wldelft.lib.hymos.transferdb;

/**
 * Compatible with the HYMOS TransferDb database.
 * This database can be filled with data from any other data source by defining external connections and SQL queries
 * that fetch the data from the external database.
 *
 */


import nl.wldelft.util.ByteArrayUtils;
import nl.wldelft.util.FloatArrayUtils;
import nl.wldelft.util.NumberType;
import nl.wldelft.util.Period;
import nl.wldelft.util.ShortArrayUtils;
import nl.wldelft.util.TextUtils;
import nl.wldelft.util.coverage.Geometry;
import nl.wldelft.util.io.AsciiGridReader;
import nl.wldelft.util.io.DatabaseParser;
import nl.wldelft.util.io.MosaicGridFileReader;
import nl.wldelft.util.io.UnsyncBufferedInputStream;
import nl.wldelft.util.sql.SqlUtils;
import nl.wldelft.util.timeseries.DefaultTimeSeriesHeader;
import nl.wldelft.util.timeseries.TimeSeriesContentHandler;
import org.apache.log4j.Logger;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
import java.util.zip.GZIPInputStream;


/**
 * Reads time series (scalar values or grids) from a HYMOS TransferDb database.
 * <p>
 * The {@code Series} table lists one row per series together with the name of the table that holds its values;
 * units are looked up in the {@code Parameter} table. Measurement tables are read via {@code MEASDATE}/{@code MEASVALUE},
 * forecast tables via {@code FORECASTDATE}/{@code VALUEDATE}/{@code FORECASTVALUE}. When a value table has a
 * {@code LABEL} column its integer value is passed to the handler as the flag.
 * <p>
 * Instances keep per-parse state (connection, handler, reusable grid buffers) in fields.
 */
public class HymosTransferDbTimeSeriesParser implements DatabaseParser<TimeSeriesContentHandler> {
    private static final Logger log = Logger.getLogger(HymosTransferDbTimeSeriesParser.class);

    /** Buffer size used both for gzip decompression and for buffering grid blobs. */
    private static final int GRID_STREAM_BUFFER_SIZE = 100000;

    /** Number of leading bytes inspected when deciding whether a grid blob is an ASCII grid. */
    private static final int ASCII_GRID_PROBE_SIZE = 99;

    private Connection connection = null;
    private TimeSeriesContentHandler handler = null;
    private Map<String, String> unitMap = null;
    private long timeZoneOffset = 0L;

    // Grid buffers reused between rows; reallocated only when the grid size changes.
    private float[] values = FloatArrayUtils.EMPTY_ARRAY;
    private byte[] byteBuffer = ByteArrayUtils.EMPTY_ARRAY;
    private short[] shortBuffer = ShortArrayUtils.EMPTY_ARRAY;

    /** One row of the {@code Series} table. */
    private static class SeriesInfo {
        int seriesNr = 0;
        String locationID = null;
        String dataType = null;
        /** Unit from the {@code Parameter} table; null when the parameter is not listed there. */
        String unit = null;
        String tableName = null;
        /** Not assigned by this parser. */
        TimeZone timeZone = null;
        /** Value that marks a missing measurement; NaN when the missing-value column is NULL. */
        float missVal = -999F;
        /** Value that marks a trace measurement; such values are passed on as 0. */
        float traceVal = 0.0F;
        /** True when the value table holds forecasts (FORECASTDATE/VALUEDATE/FORECASTVALUE columns). */
        boolean forecast = false;

        @Override
        public String toString() {
            return seriesNr + ":" + dataType + ':' + locationID + ':' + tableName;
        }
    }

    public HymosTransferDbTimeSeriesParser() {
    }

    /**
     * Reads every series listed in the {@code Series} table and passes its values to {@code contentHandler}.
     * A failure while reading one series is logged and does not stop the remaining series.
     *
     * @param connection     open connection to the transfer database
     * @param contentHandler receiver of the parsed time series; also supplies the default time zone
     * @throws Exception when the {@code Parameter} or {@code Series} table cannot be read
     */
    @Override
    public void parse(Connection connection, TimeSeriesContentHandler contentHandler) throws Exception {
        this.connection = connection;
        // Must be assigned before anything reads the handler field (read() uses it for every row).
        this.handler = contentHandler;
        // Database times are local to the handler's default time zone; only its raw (non-DST) offset is applied.
        this.timeZoneOffset = contentHandler.getDefaultTimeZone().getRawOffset();

        unitMap = readUnitMap();

        for (SeriesInfo seriesInfo : readSeriesInfos()) {
            try {
                read(seriesInfo);
            } catch (Exception e) {
                log.error("Reading time serie failed " + seriesInfo, e);
            }
        }
    }

    /**
     * Reads the series definitions from the {@code Series} table. Both the HYMOS column names and their
     * alternatives ({@code LocationId}, {@code ParameterId}, {@code MissingValue}) are accepted;
     * {@code TRACEVAL} and {@code FORECAST} are optional.
     *
     * @throws SQLException when a required column is missing or a series has no table name
     */
    private SeriesInfo[] readSeriesInfos() throws SQLException {
        ArrayList<SeriesInfo> list = new ArrayList<SeriesInfo>();
        Statement statement = connection.createStatement();
        try {
            ResultSet resultSet = statement.executeQuery("SELECT * FROM Series");
            try {
                int seriesNrColumnIndex = resultSet.findColumn("ID");
                int realStatColumnIndex = findColumn(resultSet, "REALSTAT", "LocationId");
                int dataTypeColumnIndex = findColumn(resultSet, "DATATYPE", "ParameterId");
                int tableNameColumnIndex = resultSet.findColumn("TABLENAME");
                int missValColumnIndex = findColumn(resultSet, "MISSVAL", "MissingValue");
                int traceValColumnIndex = findOptionalColumn(resultSet, "TRACEVAL");
                int forecastColumnIndex = findOptionalColumn(resultSet, "FORECAST");

                while (resultSet.next()) {
                    SeriesInfo seriesInfo = new SeriesInfo();
                    seriesInfo.seriesNr = resultSet.getInt(seriesNrColumnIndex);
                    if (log.isDebugEnabled()) log.debug("Parse series info for " + seriesInfo.seriesNr);

                    seriesInfo.locationID = getTrimmedString(resultSet, realStatColumnIndex);
                    seriesInfo.dataType = getTrimmedString(resultSet, dataTypeColumnIndex);
                    // Stays null when the parameter is not listed in the Parameter table.
                    seriesInfo.unit = unitMap.get(seriesInfo.dataType);

                    seriesInfo.tableName = getTrimmedString(resultSet, tableNameColumnIndex);
                    if (seriesInfo.tableName == null || seriesInfo.tableName.length() == 0) {
                        throw new SQLException("Table name is not specified for series:" + seriesInfo.seriesNr);
                    }

                    seriesInfo.missVal = resultSet.getFloat(missValColumnIndex);
                    if (resultSet.wasNull()) seriesInfo.missVal = Float.NaN;

                    if (traceValColumnIndex != -1) {
                        seriesInfo.traceVal = resultSet.getFloat(traceValColumnIndex);
                        if (resultSet.wasNull()) seriesInfo.traceVal = 0;
                    }

                    if (forecastColumnIndex != -1) {
                        seriesInfo.forecast = resultSet.getBoolean(forecastColumnIndex);
                    }

                    if (log.isDebugEnabled()) log.debug("series info parsed " + seriesInfo);
                    list.add(seriesInfo);
                }
            } finally {
                resultSet.close();
            }
        } finally {
            statement.close();
        }
        return list.toArray(new SeriesInfo[list.size()]);
    }

    /**
     * Reads the parameter id to unit mapping from the {@code Parameter} table.
     * Ids and units are trimmed so lookups match the trimmed data types read from the {@code Series} table.
     */
    private Map<String, String> readUnitMap() throws SQLException {
        Map<String, String> res = new HashMap<String, String>();
        Statement statement = connection.createStatement();
        try {
            ResultSet resultSet = statement.executeQuery("SELECT * FROM Parameter");
            try {
                int idColumnIndex = resultSet.findColumn("ID");
                int unitColumnIndex = resultSet.findColumn("UNIT");

                while (resultSet.next()) {
                    String parId = getTrimmedString(resultSet, idColumnIndex);
                    String unit = getTrimmedString(resultSet, unitColumnIndex);
                    res.put(parId, unit);
                }
            } finally {
                resultSet.close();
            }
        } finally {
            statement.close();
        }
        return res;
    }

    /**
     * Reads all rows of the value table of one series and passes them to the handler, then logs statistics.
     * Rows with an empty date are skipped; reading stops when the handler rejects a header for all times.
     */
    @SuppressWarnings({"OverlyLongMethod"})
    private void read(SeriesInfo seriesInfo) throws IOException, SQLException {
        log.info("Start reading table " + seriesInfo.tableName + " for " + seriesInfo.locationID + " and " + seriesInfo.dataType);

        int rowCount = 0;
        int missingValueCount = 0;
        int traceValueCount = 0;
        float minValue = Float.POSITIVE_INFINITY;
        float maxValue = Float.NEGATIVE_INFINITY;
        long minTime = Long.MAX_VALUE;
        long maxTime = Long.MIN_VALUE;

        boolean hasLabel = hasLabel(seriesInfo);
        String labelSql = hasLabel ? ", LABEL" : "";
        String sql;
        if (seriesInfo.forecast) {
            sql = "SELECT FORECASTDATE, VALUEDATE, FORECASTVALUE" + labelSql + " FROM " + quoteTableName(seriesInfo.tableName);
        } else {
            sql = "SELECT MEASDATE, MEASVALUE" + labelSql + " FROM " + quoteTableName(seriesInfo.tableName);
        }

        // 1-based positions in the SELECT list above; forecastDateColumn is unused for measurement tables.
        int forecastDateColumn = seriesInfo.forecast ? 1 : -1;
        int valueDateColumn = seriesInfo.forecast ? 2 : 1;
        int valueColumn = seriesInfo.forecast ? 3 : 2;
        int labelColumn = seriesInfo.forecast ? 4 : 3;

        Statement statement = connection.createStatement();
        try {
            ResultSet rows = statement.executeQuery(sql);
            try {
                boolean binary = SqlUtils.isBinaryColumn(rows.getMetaData().getColumnType(valueColumn));
                DefaultTimeSeriesHeader header = null;
                while (rows.next()) {
                    // Columns are read left to right, once each, as the JDBC ResultSet contract advises for portability.
                    java.sql.Timestamp forecastDate = seriesInfo.forecast ? rows.getTimestamp(forecastDateColumn) : null;
                    java.sql.Timestamp valueDate = rows.getTimestamp(valueDateColumn);
                    if (valueDate == null || (seriesInfo.forecast && forecastDate == null)) {
                        log.warn("Row without date skipped in table " + seriesInfo.tableName);
                        continue;
                    }
                    rowCount++;

                    long time = valueDate.getTime() - timeZoneOffset;
                    handler.setTime(time);
                    if (time > maxTime) maxTime = time;
                    if (time < minTime) minTime = time;

                    long forecastTime = forecastDate == null ? Long.MIN_VALUE : forecastDate.getTime() - timeZoneOffset;

                    // A new header is needed for the first row and whenever the forecast time changes.
                    if (header == null || header.getForecastTime() != forecastTime) {
                        header = new DefaultTimeSeriesHeader();
                        header.setLocationId(seriesInfo.locationID);
                        header.setParameterId(seriesInfo.dataType);
                        header.setUnit(seriesInfo.unit);
                        header.setForecastTime(forecastTime);
                        handler.setTimeSeriesHeader(header);
                        if (handler.isCurrentTimeSeriesHeaderForAllTimesRejected()) {
                            log.info("Table skipped  " + seriesInfo.tableName + " for " + seriesInfo.locationID + " and " + seriesInfo.dataType);
                            return;
                        }
                    }

                    if (handler.isCurrentTimeSeriesHeaderForCurrentTimeRejected()) continue;

                    if (binary) {
                        byte[] bytes = rows.getBytes(valueColumn);
                        if (hasLabel) handler.setFlag(rows.getInt(labelColumn));
                        readGrid(bytes, header);
                        continue;
                    }

                    float value = rows.getFloat(valueColumn);
                    // wasNull() must be checked before any other column is read.
                    boolean valueIsNull = rows.wasNull();
                    if (hasLabel) handler.setFlag(rows.getInt(labelColumn));

                    // Every value that is about -999 is also recognised as a missing value (hack for Taiwan).
                    boolean missing = valueIsNull || value == seriesInfo.missVal || (value > -1000 && value <= -998.99);
                    if (missing) {
                        missingValueCount++;
                        value = Float.NaN;
                    } else if (value == seriesInfo.traceVal) {
                        traceValueCount++;
                        value = 0;
                    }

                    if (!Float.isNaN(value)) {
                        if (value < minValue) minValue = value;
                        if (value > maxValue) maxValue = value;
                    }

                    handler.setValue(value);
                    handler.applyCurrentFields();
                }
            } finally {
                rows.close();
            }
        } finally {
            statement.close();
        }

        if (rowCount == 0) {
            log.info("No values found");
        } else {
            Period period = new Period(minTime, maxTime);
            log.info("Period: " + period);
            log.info("Row count: " + rowCount);
            log.info("Missing value count: " + missingValueCount);
            log.info("Trace value count: " + traceValueCount);
            if (minValue <= maxValue) log.info("Value range: " + minValue + " - " + maxValue);
        }
    }

    /**
     * Decodes one grid blob (gzip compressed or not) and passes it to the handler.
     * Failures are logged and the grid is skipped, so one corrupt grid does not stop the series.
     */
    private void readGrid(byte[] bytes, DefaultTimeSeriesHeader header) {
        if (bytes == null) {
            log.warn("HymosTransferDbPare: Can not read grid " + header + ", value is empty");
            return;
        }
        try {
            InputStream inputStream = openGridStream(bytes);
            if (isAsciiGrid(inputStream)) {
                readAsciiGrid(inputStream);
            } else {
                readMosaicGrid(inputStream);
            }
        } catch (Exception e) {
            log.warn("HymosTransferDbPare: Can not read grid " + header, e);
        }
    }

    /** Returns a decompressing stream when the blob is gzipped, otherwise a stream over the raw bytes. */
    private static InputStream openGridStream(byte[] bytes) {
        try {
            return new UnsyncBufferedInputStream(
                    new GZIPInputStream(new ByteArrayInputStream(bytes), GRID_STREAM_BUFFER_SIZE), GRID_STREAM_BUFFER_SIZE);
        } catch (IOException notGzipped) {
            // GZIPInputStream rejects data without a gzip header; fall back to the uncompressed bytes.
            return new ByteArrayInputStream(bytes);
        }
    }

    /** Reads an ASCII grid from the stream and applies it to the handler. */
    private void readAsciiGrid(InputStream inputStream) throws Exception {
        AsciiGridReader reader = new AsciiGridReader(inputStream, "hymostransferdb", handler.getDefaultGeoDatum());
        try {
            Geometry geometry = reader.getGeometry();
            ensureGridBufferSize(geometry.size());
            reader.readCoverage(values);
            handler.setGeometry(geometry);
            handler.setCoverageValues(values);
            handler.applyCurrentFields();
        } finally {
            reader.close();
        }
    }

    /** Reads a mosaic grid from the stream and applies it, including its value resolution, to the handler. */
    private void readMosaicGrid(InputStream inputStream) throws Exception {
        MosaicGridFileReader reader = new MosaicGridFileReader(inputStream);
        try {
            Geometry geometry = reader.getGridGeometry();
            ensureGridBufferSize(geometry.size());
            reader.read(values, byteBuffer, shortBuffer);
            handler.setGeometry(geometry);
            handler.setValueResolution(reader.getValueResolution());
            handler.setCoverageValues(values);
            handler.applyCurrentFields();
        } finally {
            reader.close();
        }
    }

    /** Reallocates the reusable grid buffers when the grid size differs from the previous grid. */
    private void ensureGridBufferSize(int size) {
        if (values.length == size) return;
        values = new float[size];
        byteBuffer = new byte[size * NumberType.INT16_SIZE];
        shortBuffer = new short[size];
    }

    /**
     * Returns true when the first bytes of the stream contain "cols" (case-insensitive).
     * The stream is reset to its start afterwards; any failure is treated as "not an ASCII grid".
     * Bytes are read directly (no Reader) so that no more than the marked number of bytes is consumed.
     */
    private static boolean isAsciiGrid(InputStream inputStream) {
        inputStream.mark(ASCII_GRID_PROBE_SIZE + 1);
        try {
            try {
                byte[] probe = new byte[ASCII_GRID_PROBE_SIZE];
                int length = 0;
                // read() may return fewer bytes than requested (e.g. from a gzip stream), so fill in a loop.
                while (length < probe.length) {
                    int count = inputStream.read(probe, length, probe.length - length);
                    if (count == -1) break;
                    length += count;
                }
                StringBuilder header = new StringBuilder(length);
                for (int i = 0; i < length; i++) {
                    header.append((char) (probe[i] & 0xFF));
                }
                return TextUtils.containsIgoreCase(header.toString(), "cols");
            } finally {
                inputStream.reset();
            }
        } catch (Exception e) {
            return false;
        }
    }

    /** Returns true when the value table of the series has a LABEL column (checked case-insensitively). */
    private boolean hasLabel(SeriesInfo seriesInfo) throws SQLException {
        Statement statement = connection.createStatement();
        try {
            // Only the metadata is needed, so fetch at most one row.
            statement.setMaxRows(1);
            ResultSet resultSet = statement.executeQuery("SELECT * FROM " + quoteTableName(seriesInfo.tableName));
            try {
                return SqlUtils.columnExistsIgnoreCase(resultSet.getMetaData(), "LABEL");
            } finally {
                resultSet.close();
            }
        } finally {
            statement.close();
        }
    }

    /** Wraps a table name in square brackets so names with spaces can be used in SQL. */
    private static String quoteTableName(String tableName) {
        return "[" + tableName + ']';
    }

    /** Returns the trimmed string in the given column, or null when the column value is SQL NULL. */
    private static String getTrimmedString(ResultSet resultSet, int columnIndex) throws SQLException {
        String value = resultSet.getString(columnIndex);
        return value == null ? null : value.trim();
    }

    /**
     * Returns the index of column {@code name}, or of {@code alternativeName} when the first does not exist.
     *
     * @throws SQLException when neither column exists
     */
    private static int findColumn(ResultSet resultSet, String name, String alternativeName) throws SQLException {
        int index = findOptionalColumn(resultSet, name);
        return index != -1 ? index : resultSet.findColumn(alternativeName);
    }

    /** Returns the index of column {@code name}, or -1 when the result set has no such column. */
    private static int findOptionalColumn(ResultSet resultSet, String name) {
        try {
            return resultSet.findColumn(name);
        } catch (SQLException ignored) {
            // findColumn signals a missing column with an SQLException; absence is allowed here.
            return -1;
        }
    }
}
  • No labels