package nl.wldelft.timeseriesparsers;

import nl.wldelft.util.TextUtils;
import nl.wldelft.util.TimeZoneUtils;
import nl.wldelft.util.io.LineReader;
import nl.wldelft.util.io.TextParser;
import nl.wldelft.util.timeseries.DefaultTimeSeriesHeader;
import nl.wldelft.util.timeseries.TimeSeriesContentHandler;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

import java.io.IOException;
import java.util.TimeZone;

/**
 * Parser for WISKI time series text files.
 * <p>
 * Two formats are supported: WISKI7 and the previous WISKI format (version number unknown).
 * The format is recognized from the header:
 * <ul>
 * <li>If the header contains the keyword TSPATH, the file is WISKI7. The event rows are parsed according to
 * the column order specified with the keyword LAYOUT. If no LAYOUT is specified, the default order is used.</li>
 * <li>Otherwise the file is in the older WISKI format. The event rows are parsed as follows:
 * the first column is the timestamp, the second column is the value.</li>
 * </ul>
 */
public class WiskiTimeSeriesParser implements TextParser<TimeSeriesContentHandler> {

    private static final Logger log = LogManager.getLogger();

    /** Character that starts (and delimits) a remark in an event row. */
    private static final char REMARK_SEPARATOR = '"';

    /** Read-ahead limit passed to {@code reader.mark(..)} so that one line can be pushed back with {@code reset()}. */
    private static final int MARK_READ_AHEAD_LIMIT = 500;

    /** Maximum number of columns of an event row (and of keywords in a LAYOUT line) that are evaluated. */
    private static final int MAX_COLUMNS = 5;

    /** Pattern of the timestamp column in an event row. */
    private static final String TIMESTAMP_PATTERN = "yyyyMMddHHmmss";

    private boolean anyHeaderInvalid = false;
    private LineReader reader;
    private TimeSeriesContentHandler contentHandler;
    private final DefaultTimeSeriesHeader header = new DefaultTimeSeriesHeader();
    private TimeZone headerTimeZone; //time zone read from the file header, null if none (or an unrecognized one) was specified
    private String virtualFileName;

    //Fields <name>Index specify the column of the timestamp, value, status, ... in the event row.
    //A value of -1 means that the column is not present / not read.
    //With the keyword LAYOUT another sequence of the columns can be specified.
    private int timestampIndex;
    private int valueIndex;
    private int statusIndex;
    private int interpolation_typeIndex;
    private int remarkIndex;

    //true while the default WISKI7 column order is in effect (no LAYOUT line seen and not the old format)
    private boolean defaultWiski7LayoutUsed = true;

    /**
     * Parses a WISKI file: every block of '#'-lines is read as a header, every other line as an event row
     * (timestamp, value and optional status / interpolation type / remark) of the most recent header.
     * Reading stops at the end of the input or at a line that equals {@code ENDOFFILE}.
     * <p>
     * Event rows that follow an invalid header are skipped; once all input is processed an
     * {@link IOException} is thrown so that the file is reported as not fully successful.
     *
     * @param reader          line reader positioned at the start of the file
     * @param virtualFileName file name, used in log messages only
     * @param contentHandler  receiver of the parsed headers and events
     * @throws IOException if one or more headers were invalid, or a header line has an incorrect format
     * @throws Exception   if an event row is found for a header without location id and parameter id
     */
    @Override
    public void parse(LineReader reader, String virtualFileName, TimeSeriesContentHandler contentHandler) throws Exception {
        this.virtualFileName = virtualFileName;
        this.contentHandler = contentHandler;
        //-777 is registered as missing value regardless of any RINVAL header key
        this.contentHandler.addMissingValue(-777.0f);

        this.reader = reader;
        this.reader.setCommentLinePrefix('?');
        this.reader.setSkipEmptyLines(true);

        this.header.clear();
        this.headerTimeZone = null;
        //Initialize the columns to read data from the event rows.
        initializeWiski7RowLayout();

        anyHeaderInvalid = false;
        boolean headerValid = false;

        //The mark is renewed after every line, so reset() pushes back exactly the line that was read last.
        reader.mark(MARK_READ_AHEAD_LIMIT);
        //getFlag treats an empty string as "column absent", so this presumably relies on TextUtils.split
        //padding the unused trailing slots with empty strings - TODO confirm.
        String[] buffer = new String[MAX_COLUMNS];
        for (String line; (line = reader.readLine()) != null; reader.mark(MARK_READ_AHEAD_LIMIT)) {
            line = line.trim();
            //A whitespace-only line is empty after trimming; skip it instead of failing on charAt(0).
            if (line.isEmpty()) {
                continue;
            }
            //Leave the loop (instead of returning) so that the invalid header check below is still performed.
            if (line.equals("ENDOFFILE")) {
                break;
            }

            if (line.charAt(0) == '#') {
                //Push the line back, parseHeader() reads the complete block of '#'-lines itself.
                reader.reset();
                headerValid = parseHeader();
                continue;
            }
            if (!headerValid) {
                anyHeaderInvalid = true;
                continue;
            }

            if (this.header.getLocationId() == null && this.header.getParameterId() == null) {
                throw new Exception("Not a valid wiski file, REXCHANGE, CNAME, SANR tags are all missing in the file header");
            }

            if (this.contentHandler.isCurrentTimeSeriesHeaderForAllTimesRejected()) {
                continue;
            }

            //If the default layout is used, the remarks (if any) must be specified last in the line and start with "-token
            String remarksDefaultLayout = null;
            if (this.defaultWiski7LayoutUsed && line.indexOf(REMARK_SEPARATOR) != -1) {
                //extract remarks first (if any), so that they cannot be confused with the status or interp.type
                String leftStr = TextUtils.leftFrom(line, REMARK_SEPARATOR);
                remarksDefaultLayout = TextUtils.rightFrom(line, REMARK_SEPARATOR).replace(REMARK_SEPARATOR, ' ');
                TextUtils.split(leftStr, ' ', buffer);
            } else {
                TextUtils.split(line, ' ', buffer);
            }

            //The time zone from the header takes precedence over the default time zone of the content handler.
            if (this.headerTimeZone != null) {
                contentHandler.setTime(this.headerTimeZone, TIMESTAMP_PATTERN, buffer[this.timestampIndex]);
            } else {
                contentHandler.setTime(contentHandler.getDefaultTimeZone(), TIMESTAMP_PATTERN, buffer[this.timestampIndex]);
            }

            int flag = getFlag(this.defaultWiski7LayoutUsed, this.statusIndex, this.interpolation_typeIndex, buffer, line);
            if (flag != Integer.MIN_VALUE) {
                contentHandler.setFlag(flag);
            }

            contentHandler.setValue('.', buffer[this.valueIndex]);

            contentHandler.setComment(null); //reset, so that a remark of a previous row is not applied again
            if (remarksDefaultLayout != null) {
                if (!remarksDefaultLayout.isEmpty()) {
                    contentHandler.setComment(remarksDefaultLayout);
                }
            } else if (this.remarkIndex != -1) {
                String remark = buffer[this.remarkIndex].replace(REMARK_SEPARATOR, ' ');
                if (!remark.isEmpty()) {
                    contentHandler.setComment(remark);
                }
            }
            contentHandler.applyCurrentFields();
        }

        // throw exception since the file should be marked as not fully successful
        if (anyHeaderInvalid) {
            throw new IOException(" the file has one or more invalid headers");
        }
    }

    /**
     * Reads the metadata from the #-records. The metadata block is followed by the time series records,
     * but the time series records may also be omitted. In this case the metadata block MUST start
     * with a record that begins with ## !
     * Empty records will be ignored.
     * <p>
     * On return the reader is positioned at the first line that does not start with '#'.
     * <p>
     * The meaning of the keys is:
     * TZ : time zone. TZ are UTC0 and UTC+/-x (e.g. UTC+1 or UTC-2). The key is looked up as "|TZ",
     * so it is only recognized when it is preceded by a '|' character.
     * TSPATH :  /site id/location id/parameter id/ts shortname
     * example   TSPATH/160/160_1/WATHTE/cmd.p
     * location id and parameter id are used, ts shortname is used as qualifier id (dots replaced by underscores)
     * SANR : location id. Used only if not specified with TSPATH
     * CNAME: parameter id. Used only if not specified with TSPATH
     * CUNIT: unit
     * RINVAL: missing value
     * LAYOUT: column order of the event rows, only applied to WISKI7 headers (headers with TSPATH)
     * REXCHANGE: location-parameter. Will be used only if the metadata block does not contain the keys TSPATH, SANR or CNAME.
     * The string specified by the keyword REXCHANGE represents the location id and also the parameter id (so location id and parameter id are equal)
     *
     * @return true if a header with parameter id and location id could be composed,
     *         false if these are missing (a warning is logged)
     * @throws IOException if the header format is incorrect
     */
    private boolean parseHeader() throws IOException {
        this.header.clear();
        this.headerTimeZone = null;

        //Initialize the columns to read data from the event rows.
        initializeWiski7RowLayout();

        String tspathPar = null;
        String tspathQual = null;
        String tspathLoc = null;
        String fallbackParLoc = null;

        for (String line; (line = this.reader.readLine()) != null; reader.mark(MARK_READ_AHEAD_LIMIT)) {
            line = line.trim();
            //A whitespace-only line is empty after trimming; empty records are ignored.
            if (line.isEmpty()) {
                continue;
            }
            if (line.charAt(0) != '#') {
                //First line after the header block: push it back for the caller.
                reader.reset();
                break;
            }

            String layoutString = parseKeyValue("LAYOUT", line);
            if (layoutString != null) {
                defaultWiski7LayoutUsed = false;
                //Obtain information how to parse event rows
                parseEventRowLayout(layoutString.trim());
            }

            String tzString = parseKeyValue("|TZ", line);
            if (tzString != null) {
                this.headerTimeZone = parseTimeZone(tzString, this.virtualFileName, this.contentHandler.getDefaultTimeZone().getID());
            }

            //Parse location id and parameter specified with keyword TSPATH
            //format: TSPATH/<site id>/<station id>/<parameter shortname>/<ts shortname>
            //example: TSPATH/160/160_1/WATHTE/cmd.p  (contains always all these 4 elements )
            //<ts shortname> is read as qualifier
            String tspath = parseKeyValue("TSPATH", line);
            //When the value starts with '/', splitting yields an extra leading empty element; index skips it.
            int index = line.contains("TSPATH/") ? 1 : 0;
            if (tspath != null && !tspath.trim().equals("/")) {
                //TSPATH available and not empty
                String[] buffer = TextUtils.split(tspath, '/');
                if (buffer.length != 4 + index || buffer[1 + index].length() < 1 || buffer[2 + index].length() < 1) {
                    throw new IOException("Not a valid wiski file, TSPATH has a incorrect format: " + tspath +
                            "   expected: TSPATH/<site id>/<station id>/<parameter shortname>/<ts shortname>");
                }
                tspathLoc = buffer[1 + index];
                tspathPar = buffer[2 + index];
                tspathQual = buffer[3 + index].replace('.', '_'); // dots are not allowed in fews as internal qualifiers, replace dots with underscores
            }

            String locationId = parseKeyValue("SANR", line);
            if (locationId != null && !locationId.isBlank()) {
                header.setLocationId(locationId);
            }
            if (log.isDebugEnabled() && locationId != null && locationId.isBlank()) {
                log.debug("Location id is blank, skipping current line.");
            }

            String parameterId = parseKeyValue("CNAME", line);
            if (parameterId != null && !parameterId.isBlank()) {
                header.setParameterId(parameterId);
            }
            if (log.isDebugEnabled() && parameterId != null && parameterId.isBlank()) {
                log.debug("Parameter id is blank, skipping current line.");
            }

            String unit = parseKeyValue("CUNIT", line);
            if (unit != null) {
                header.setUnit(unit);
            }

            String missingValue = parseKeyValue("RINVAL", line);
            if (missingValue != null) {
                contentHandler.addMissingValue(missingValue);
            }

            String parLoc = parseKeyValue("REXCHANGE", line);
            if (parLoc != null) {
                fallbackParLoc = parLoc;
            }
        }

        if (tspathPar != null && tspathLoc != null) {
            //If par id, qualifier id and loc are specified with TSPATH, use them, even if the keywords SANR and CNAME are also present in the file
            header.setParameterId(tspathPar);
            header.setQualifierIds(tspathQual);
            header.setLocationId(tspathLoc);
        } else {
            //The header has an OLD format (no WISKI7), if any LAYOUT is specified, it will be ignored !
            //Re-initialize the columns to read data from the event rows (no flags reading !).
            initializeRowLayout();
            if (header.getParameterId() == null || header.getLocationId() == null) {
                if (fallbackParLoc != null && !fallbackParLoc.isEmpty()) {
                    header.setParameterId(fallbackParLoc);
                    header.setLocationId(fallbackParLoc);
                } else {
                    log.warn(this.reader.getFileAndLineNumber() + "   parameter/location is missing next to the keyword REXCHANGE");
                    return false;
                }
            }
        }
        if (log.isDebugEnabled()) {
            log.debug("Reading data for " + header.toString());
        }
        if (header.getLocationId() != null || header.getParameterId() != null) {
            contentHandler.setTimeSeriesHeader(header);
        }
        return true;
    }

    /**
     * Selects the default WISKI7 column order: timestamp, value, status, interpolation type, remark.
     */
    private void initializeWiski7RowLayout() {
        defaultWiski7LayoutUsed = true;

        timestampIndex = 0;           //  timestamp will be read from column 1 by default
        valueIndex = 1;               //  value will be read from column 2 by default
        statusIndex = 2;              //  status will be read from column 3 by default
        interpolation_typeIndex = 3;  //  interpolation_type will be read from column 4 by default
        remarkIndex = 4;              //  remark will be read from column 5 by default
    }

    /**
     * Selects the column order of the old WISKI format: only timestamp and value are read.
     */
    private void initializeRowLayout() {
        defaultWiski7LayoutUsed = false;

        timestampIndex = 0;            //  timestamp is always read from column 1
        valueIndex = 1;                //  value is always read from column 2
        statusIndex = -1;              //  status NOT read
        interpolation_typeIndex = -1;  //  interpolation_type NOT read
        remarkIndex = -1;              //  remark NOT read
    }

    /**
     * Derives the column indices of the event rows from the value of the header key LAYOUT.
     * Some examples of the value:
     * (timestamp,value)
     * (timestamp,value,status,interpolation_type,remark)
     * <p>
     * Only the first {@value #MAX_COLUMNS} keywords are evaluated. The keywords accepted by
     * {@link #isKnownKeyword(String)} occupy a column but their data is not read.
     *
     * @param bufferIn trimmed value of the LAYOUT key, including the surrounding brackets
     * @throws IOException if the brackets are missing, a keyword is unknown or specified more than once,
     *                     or timestamp and/or value are not specified
     */
    private void parseEventRowLayout(String bufferIn) throws IOException {

        //Check the passed buffer, remove brackets from buffer
        String buffer = null;
        if (bufferIn.length() > 5 && bufferIn.charAt(0) == '(' && bufferIn.charAt(bufferIn.length() - 1) == ')') {
            buffer = bufferIn.substring(1, bufferIn.length() - 1).trim();
        }
        if (buffer == null) {
            throw new IOException("String specified with the LAYOUT-keyword has wrong format:" + bufferIn);
        }

        String[] keywords = new String[MAX_COLUMNS]; //max. 5 keywords expected, as specified in the second example
        TextUtils.split(buffer, ',', keywords);

        //-1 = data not specified in the file (according to the header keyword LAYOUT)
        timestampIndex = -1;
        valueIndex = -1;
        statusIndex = -1;
        interpolation_typeIndex = -1;
        remarkIndex = -1;

        for (int i = 0; i < keywords.length; i++) {

            String keyword = keywords[i].trim();

            if (keyword.isEmpty()) {
                continue; //not all keywords must be specified, however at least 2: timestamp and value
            }

            switch (keyword) {
                case "timestamp":
                    requireNotYetSpecified(timestampIndex, keyword);
                    timestampIndex = i;
                    break;
                case "value":
                    requireNotYetSpecified(valueIndex, keyword);
                    valueIndex = i;
                    break;
                case "status":
                case "primary_status":
                    requireNotYetSpecified(statusIndex, keyword);
                    statusIndex = i;
                    break;
                case "interpolation_type":
                    requireNotYetSpecified(interpolation_typeIndex, keyword);
                    interpolation_typeIndex = i;
                    break;
                case "remark":
                    requireNotYetSpecified(remarkIndex, keyword);
                    remarkIndex = i;
                    break;
                default:
                    //Known keywords without a use in this parser are ignored, anything else is an error.
                    if (!isKnownKeyword(keyword)) {
                        throw new IOException("Wrong keyword '" + keyword + "' specified in the header line #LAYOUT ");
                    }
                    break;
            }
        }

        if (timestampIndex == -1 || valueIndex == -1) {
            throw new IOException("Keywords timestamp and/or value are not specified in the header line LAYOUT !");
        }
    }

    /**
     * Verifies that a LAYOUT keyword has not been assigned to a column before.
     *
     * @param currentIndex column index assigned so far, -1 if none
     * @param keyword      keyword being processed, used in the error message
     * @throws IOException if the keyword was already assigned to a column
     */
    private static void requireNotYetSpecified(int currentIndex, String keyword) throws IOException {
        if (currentIndex != -1) {
            throw new IOException("Keyword '" + keyword + "' specified more than once in the header line LAYOUT");
        }
    }

    /**
     * Returns true for LAYOUT keywords that are accepted but whose column is not read by this parser.
     */
    private static boolean isKnownKeyword(String key) {
        switch (key) {
            case "timestampoccurence":
            case "forecast":
            case "member":
            case "dispatch_info":
                return true;
            default:
                return false;
        }
    }

    /**
     * Extracts the value of a header key from a header line. The value is the text between the first
     * occurrence of the key and the next terminator ";*;" or, if that one is not present, "|*|".
     *
     * @param key    key to look for
     * @param buffer header line
     * @return the value (not trimmed), or null if the key or a terminator after it is not found in the buffer
     */
    private static String parseKeyValue(String key, String buffer) {
        int keyPos = buffer.indexOf(key);
        if (keyPos == -1) {
            return null;
        }
        int valuePos = keyPos + key.length();
        int endValuePos = buffer.indexOf(";*;", valuePos);
        if (endValuePos == -1) {
            endValuePos = buffer.indexOf("|*|", valuePos);
        }
        if (endValuePos == -1) {
            return null;
        }
        return buffer.substring(valuePos, endValuePos);
    }

    /**
     * Converts the value of the TZ header key to a time zone.
     * Recognized are a number of names (MEZ, MESZ, CET, CEST and some European region ids) and
     * UTC / GMT / Etc/UTC / Etc/GMT, optionally followed by a numeric offset in hours.
     *
     * @param buffer          value of the TZ key
     * @param fileName        file name, used in the log message only
     * @param defaultTimeZone id of the default time zone, used in the log message only
     * @return the time zone, or null (after logging a warning) if the value is not recognized
     * @throws IOException if the offset after UTC / GMT is not a number
     */
    private static TimeZone parseTimeZone(String buffer, String fileName, String defaultTimeZone) throws IOException {
        //NOTE(review): MEZ and CEST are mapped to fixed offsets whereas MESZ and CET are mapped to the zone id "CET"
        //- presumably deliberate, confirm against the WISKI specification.
        switch (buffer) {
            case "MEZ":
                return TimeZone.getTimeZone("GMT+1");
            case "CEST":
                return TimeZone.getTimeZone("GMT+2");
            case "MESZ":
            case "CET":
            case "Europe/Amsterdam":
            case "Europe/Berlin":
            case "Europe/Brussels":
            case "Europe/Luxembourg":
            case "Europe/Madrid":
            case "Europe/Paris":
            case "Europe/Rome":
            case "Europe/Vienna":
            case "Europe/Zurich":
                return TimeZone.getTimeZone("CET");
            default:
                break;
        }

        String strOffset = getUtcGmtOffset(buffer);
        if (strOffset == null) {
            log.warn(fileName + ": invalid timezone specified with TZ keyword - " + buffer + " , " + defaultTimeZone + " will be used.");
            return null;
        }
        try {
            double offset = Double.parseDouble(strOffset);
            return TimeZoneUtils.createTimeZoneFromDouble(offset);
        } catch (NumberFormatException e) {
            throw new IOException("Invalid timeZone specified with TZ keyword:" + buffer, e);
        }
    }

    /**
     * Returns the offset part of a time zone string that starts with UTC, GMT, Etc/UTC or Etc/GMT.
     * A string without an offset part (e.g. plain "UTC") yields "0".
     * <p>
     * NOTE(review): the sign is taken literally. In the IANA tz database "Etc/GMT+1" denotes UTC-1
     * (inverted sign) - confirm which convention WISKI files use.
     *
     * @param buffer value of the TZ key
     * @return the text after the prefix ("0" if there is none), or null if the string has none of the prefixes
     */
    private static String getUtcGmtOffset(String buffer) {
        String offset;
        if (buffer.startsWith("UTC") || buffer.startsWith("GMT")) {
            offset = buffer.substring(3);
        } else if (buffer.startsWith("Etc/UTC") || buffer.startsWith("Etc/GMT")) {
            offset = buffer.substring(7);
        } else {
            return null;
        }
        //A bare UTC / GMT carries no offset and means offset zero, it is not an invalid time zone.
        return offset.isEmpty() ? "0" : offset;
    }

    /**
     * Parses the flags from the columns of an event row.
     * The first flag is 'status', the second one is the interpolation type.
     * The two optional flags are composed to one flag as follows: flag1*1000+flag2
     * Flag2 must be between 0 and 999
     * Line examples and the composed flags:
     * 20100227000709 3.0 200 103   -> 200103
     * 20100227000709 3.0 0 103     -> 103
     * 20100227000709 3.0 200 0     -> 200000
     * 20100227000709 3.0 200       -> 200000
     *
     * @param defaultLayout         true if the default WISKI7 layout is used; omitted flags are then not logged as errors
     * @param statusColumnIndex     column of the status, -1 if not read
     * @param interpTypeColumnIndex column of the interpolation type, -1 if not read
     * @param buffer                columns of the event row
     * @param fileLine              complete event row, used in log messages only
     * @return the composed flag, or {@link Integer#MIN_VALUE} if no flag is to be set for this row
     */
    private static int getFlag(boolean defaultLayout, int statusColumnIndex, int interpTypeColumnIndex, String[] buffer, String fileLine) {

        int statusFlag = Integer.MIN_VALUE;

        if (statusColumnIndex != -1) {
            String status = buffer[statusColumnIndex];

            if (status.isEmpty()) {
                if (!defaultLayout) {
                    //status not specified according to the header, give message
                    log.error("Status expected, but is ommited in the line: " + fileLine);
                }
                return Integer.MIN_VALUE; //no status specified in defaultLayout
            }
            statusFlag = parseIntFlag(status);
            if (statusFlag == Integer.MIN_VALUE) {
                log.error("Wrong status specified in the line:" + fileLine);
                return Integer.MIN_VALUE;   //flag cannot be converted to integer, so no flags will be set (for this timestep)
            }
            statusFlag *= 1000;
        }

        if (interpTypeColumnIndex == -1) {
            return statusFlag;  //only the status flag specified (or no flag at all)
        }

        String interpType = buffer[interpTypeColumnIndex];
        if (interpType.isEmpty()) {
            if (!defaultLayout) {
                //Interpolation type not specified according to the header, give message
                log.error("Interpolation type expected, but is ommited in the line: " + fileLine);
            }
            return statusFlag;
        }

        //An unparsable value yields Integer.MIN_VALUE and is therefore rejected by the range check as well.
        int interpTypeFlag = parseIntFlag(interpType);
        if (interpTypeFlag < 0 || interpTypeFlag > 999) {
            log.error("Wrong interpolation type specified, it should be between 0 and 999. Line: " + fileLine);
            return Integer.MIN_VALUE;
        }

        if (statusFlag == Integer.MIN_VALUE) {
            return interpTypeFlag;  //only the interpolation type flag specified
        }
        return statusFlag + interpTypeFlag;
    }

    /**
     * Converts a flag column to an integer.
     *
     * @param buffer text of the column
     * @return the parsed value, or {@link Integer#MIN_VALUE} if the text is not an integer
     */
    private static int parseIntFlag(String buffer) {
        try {
            return TextUtils.parseInt(buffer);
        } catch (NumberFormatException e) {
            return Integer.MIN_VALUE;
        }
    }
}



  • No labels