package nl.wldelft.fews.system.plugin.dataImport;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import nl.wldelft.util.Clasz;
import nl.wldelft.util.FastDateFormat;
import nl.wldelft.util.Period;
import nl.wldelft.util.PeriodConsumer;
import nl.wldelft.util.Properties;
import nl.wldelft.util.PropertiesConsumer;
import nl.wldelft.util.TextUtils;
import nl.wldelft.util.TimeUnit;
import nl.wldelft.util.TimeZoneUtils;
import nl.wldelft.util.geodatum.GeoPoint;
import nl.wldelft.util.io.BackupServerUrlConsumer;
import nl.wldelft.util.io.ServerParser;
import nl.wldelft.util.timeseries.DefaultTimeSeriesHeader;
import nl.wldelft.util.timeseries.TimeSeriesContentHandler;
import org.apache.http.HttpEntity;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/**
 * TimeSeries reader for the Akvo Flow service.
 * <p>
 * The service queries all locations using the site registration form survey. The survey id is used to get all instances of this survey.
 */
public class AkvoTimeSeriesServerParser implements ServerParser<TimeSeriesContentHandler>, PropertiesConsumer, PeriodConsumer, BackupServerUrlConsumer {
    // Parser for timestamps of the form yyyy-MM-dd'T'HH:mm:ss'Z' (GMT); (re)created at the start of every parse() call.
    FastDateFormat fastDateFormat = null;
    private static final Logger log = LogManager.getLogger();
    // Timeout in milliseconds. Static, so shared by all parser instances; overwritten by setConnectionTimeout().
    // Within this file the value is only read by getHttpURLConnection().
    private static long connectionTimeOut = 30 * TimeUnit.SECOND_MILLIS;
    // Period supplied through setPeriod(); used as beginDate/endDate filter of the monitoring form query.
    private Period period = null;
    // Property keys. The property values are inserted into the Akvo Flow query URLs.
    final static String SITE_REGISTRATION_SURVEY_ID = "siteRegistrationSurveyId"; // for example 7210020 for the Bishnondi registration form
    final static String SITE_MONITORING_SURVEY_ID = "siteMonitoringSurveyId"; // for example 7280001 for the Bishnondi monitoring form
    final static String SITE_SURVEY_ID = "siteSurveyId"; // id of the survey that contains both forms, for example 9290008 for Bishnondi
    // Property key of the OpenID Connect token URL that is used to log in.
    public final static String OPEN_ID_CONNECT_URL = "openIdConnectUrl";
    // Properties supplied through setProperties(); must be set before parse() is called.
    private Properties properties = null;
    // Question id -> {question name, question type}; rebuilt from the survey definition on every parse() call.
    private Map<JsonNode,String[]> questionsMap = new HashMap<>();

    /**
     * Creates a parser that has no import period yet; the period is supplied afterwards through
     * {@link #setPeriod(Period)}.
     */
    public AkvoTimeSeriesServerParser() {
        this.period = null;
    }

    /**
     * Logs in, then reads the survey data below {@code baseUrl} and passes it to the content handler.
     * The properties must have been set through {@link #setProperties(Properties)} before this method is called.
     *
     * @param baseUrl        base URL of the Akvo Flow API
     * @param username       user name used to request the login tokens
     * @param password       password used to request the login tokens
     * @param contentHandler receiver of the parsed time series values
     */
    @Override
    public void parse(URL baseUrl, String username, String password, TimeSeriesContentHandler contentHandler) throws Exception {
        // All submission dates are parsed with this format, interpreted as GMT.
        fastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss'Z'", TimeZoneUtils.GMT, Locale.US, null);
        String accessToken = AkvoFlowApi.getApiAccess(username, password, properties.getString(OPEN_ID_CONNECT_URL));
        parseJsonStream(contentHandler, baseUrl, accessToken, this.period);
    }

    /**
     * Reads the survey definition, the registration form instances and the monitoring form instances, in that order.
     *
     * @param contentHandler receiver of the parsed time series values
     * @param baseUrl        base URL of the Akvo Flow API
     * @param sessionKey     access token that is sent with every request
     * @param period         period passed on to the monitoring form query
     * @throws IOException when the survey definition can not be retrieved
     */
    private void parseJsonStream(TimeSeriesContentHandler contentHandler, URL baseUrl, String sessionKey, Period period) throws IOException, NoSuchAlgorithmException, InvalidKeyException, ParseException {
        String baseQueryUrl = baseUrl.toString();

        // The form instances only contain question ids, so the survey definition is needed to translate
        // these ids to question names and types. SITE_SURVEY_ID is, for example, for Bishnondi: 9290008
        String surveyDefinitionUrl = baseQueryUrl + "/surveys/" + properties.getString(SITE_SURVEY_ID);
        JsonNode surveyDefinitionJson = AkvoFlowApi.getJsonResponseFromFlow(surveyDefinitionUrl, sessionKey);
        if (surveyDefinitionJson == null) {
            // getJsonResponseFromFlow reports failures by returning null; without the definition nothing can be parsed.
            throw new IOException("Could not retrieve survey definition from " + surveyDefinitionUrl);
        }
        questionsMap = createQuestionsMap(surveyDefinitionJson);

        // Registration data, indexed by identifier so every monitoring form instance can find its site.
        List<FormInstance> registrationSites = parseRegistrationForm(baseQueryUrl, sessionKey);
        Map<String, FormInstance> registrationIdMap = new HashMap<>();
        for (FormInstance registrationSite : registrationSites) {
            registrationIdMap.put(registrationSite.surveyedLocaleIdentifier, registrationSite);
        }

        // Monitoring data
        parseMonitoringSurvey(baseQueryUrl, sessionKey, contentHandler, registrationIdMap, period);
    }

    /**
     * Creates a map of question ids to the name and type of the question.
     * This is necessary, as the raw responses only contain the question id.
     * <p>
     * All questions of all question groups of all forms in the survey definition are collected, in document order.
     * Missing "forms", "questionGroups" or "questions" nodes are treated as empty, and questions without an id
     * are skipped, so an incomplete definition yields a smaller map instead of an exception.
     *
     * @param surveyDefinition survey definition as returned by the surveys query, not null
     * @return map of question id to a two element array: [0] question name, [1] question type
     */
    private static Map<JsonNode,String[]> createQuestionsMap(JsonNode surveyDefinition) {
        Map<JsonNode,String[]> questionsMap = new LinkedHashMap<>();
        // path() never returns null and a missing node iterates as empty, so no casts or null checks are needed.
        for (JsonNode form : surveyDefinition.path("forms")) {
            for (JsonNode questionGroup : form.path("questionGroups")) {
                for (JsonNode question : questionGroup.path("questions")) {
                    JsonNode id = question.get("id");
                    // Without an id the question can never be matched to a response.
                    if (id == null || id.isNull()) continue;
                    String[] nameAndType = {question.path("name").asText(), question.path("type").asText()};
                    questionsMap.put(id, nameAndType);
                }
            }
        }
        return questionsMap;
    }

    /**
     * Retrieves all instances of the registration form and fills in the site name and geolocation of every instance.
     * <p>
     * The name is taken from the answer to the question named "Name site", the coordinates from the answer to the
     * question named "Geolocation". Instances that lack one of these answers keep a null name or null coordinates.
     *
     * @param baseUrl    base URL of the Akvo Flow API
     * @param sessionKey access token that is sent with every request
     * @return all registration form instances
     */
    private List<FormInstance> parseRegistrationForm(String baseUrl, String sessionKey) throws IOException, InvalidKeyException, NoSuchAlgorithmException {
        String formInstanceUrl = baseUrl + "/form_instances?survey_id=" + properties.getString(SITE_SURVEY_ID) +
                "&form_id=" + properties.getString(SITE_REGISTRATION_SURVEY_ID);

        List<FormInstance> registrationSurveyList = getAllFormInstances(formInstanceUrl, sessionKey);

        // Look up the ids of the two questions of interest. When several questions share a name the last one wins.
        JsonNode nameQuestionId = null;
        JsonNode geolocationQuestionId = null;
        for (Map.Entry<JsonNode, String[]> question : questionsMap.entrySet()) {
            String questionName = question.getValue()[0];
            if ("Name site".equals(questionName)) {
                nameQuestionId = question.getKey();
            } else if ("Geolocation".equals(questionName)) {
                geolocationQuestionId = question.getKey();
            }
        }

        for (FormInstance fi : registrationSurveyList) {
            // An instance without responses has neither a name nor a location.
            if (fi.responses != null) {
                if (nameQuestionId != null) {
                    JsonNode name = fi.responses.findValue(nameQuestionId.asText());
                    if (name != null) {
                        fi.name = name.asText();
                    }
                }
                if (geolocationQuestionId != null) {
                    JsonNode geolocation = fi.responses.findValue(geolocationQuestionId.asText());
                    if (geolocation != null) {
                        JsonNode lat = geolocation.get("lat");
                        JsonNode lon = geolocation.get("long");
                        // Only a complete coordinate pair is usable.
                        if (lat != null && lon != null) {
                            fi.lat = lat.asText();
                            fi.lon = lon.asText();
                        }
                    }
                }
            }
            log.info("Registration Survey Name: '{}' id: '{}' Lat: '{}' Lon: '{}'",
                    fi.name, fi.surveyedLocaleIdentifier, fi.lat, fi.lon);
        }
        return registrationSurveyList;
    }

    /**
     * Retrieves all instances of the monitoring form and passes the answers to the content handler.
     * <p>
     * Per form instance:
     * <ul>
     * <li>every FREE_TEXT answer becomes a value, with the question name as parameter id</li>
     * <li>every result of a CADDISFLY answer becomes a value, with the result name as parameter id</li>
     * <li>every OPTION answer becomes a qualifier "&lt;question name&gt;_&lt;selected options&gt;" on all values of the instance</li>
     * </ul>
     * All values of a form instance get the submission date of that instance as time, and the identifier of the
     * instance as location id. When a registration form instance with the same identifier has coordinates,
     * these are set as geometry.
     *
     * @param baseUrl           base URL of the Akvo Flow API
     * @param sessionKey        access token that is sent with every request
     * @param contentHandler    receiver of the parsed time series values
     * @param registrationIdMap registration form instances by identifier
     * @param viewPeriod        period used as beginDate/endDate filter of the query, null for no filter
     * @throws ParseException when the submission date of a form instance with values can not be parsed
     */
    private void parseMonitoringSurvey(String baseUrl, String sessionKey,
                                       TimeSeriesContentHandler contentHandler, Map<String, FormInstance> registrationIdMap,
                                       Period viewPeriod) throws IOException, InvalidKeyException, NoSuchAlgorithmException, ParseException {
        String formInstanceUrl = baseUrl + "/form_instances?survey_id=" + properties.getString(SITE_SURVEY_ID) +
                "&form_id=" + properties.getString(SITE_MONITORING_SURVEY_ID);
        if (viewPeriod != null) {
            formInstanceUrl += "&beginDate=" + viewPeriod.getStartTime() + "&endDate=" + viewPeriod.getEndTime();
        }
        List<FormInstance> monitoringSurveyList = getAllFormInstances(formInstanceUrl, sessionKey);

        for (FormInstance fi : monitoringSurveyList) {
            // Nothing was answered in this form instance.
            if (fi.responses == null) continue;

            Map<String, String> parameterValues = new LinkedHashMap<>();
            List<String> qualifiers = new ArrayList<>();

            for (Map.Entry<JsonNode, String[]> question : questionsMap.entrySet()) {
                String questionName = question.getValue()[0];
                String questionType = question.getValue()[1];
                // Answers are stored per question group, so the responses object is two levels deep.
                // It is easier to just search the whole tree for the question id.
                JsonNode answer = fi.responses.findValue(question.getKey().asText());
                if (answer == null) continue;

                if ("FREE_TEXT".equals(questionType)) {
                    parameterValues.put(questionName, answer.asText());
                } else if ("OPTION".equals(questionType)) {
                    qualifiers.add(questionName + "_" + handleOptionQuestionAnswer(answer));
                } else if ("CADDISFLY".equals(questionType)) {
                    JsonNode results = answer.findValue("result");
                    if (results == null) continue;
                    for (JsonNode result : results) {
                        JsonNode name = result.get("name");
                        JsonNode value = result.get("value");
                        // A result without a name or value can not be stored as a time series value.
                        if (name == null || value == null) continue;
                        parameterValues.put(name.asText(), value.asText());
                    }
                }
            }
            if (parameterValues.isEmpty()) continue;

            // The submission date is a property of the form instance itself, it is not part of the responses.
            // It applies to every value of the instance, so it is parsed only once.
            long submissionTime = fastDateFormat.parseToMillis(fi.submissionDate);

            String locationId = fi.surveyedLocaleIdentifier;
            // Corresponding registration form instance, which holds the geolocation of the site.
            FormInstance surveySite = registrationIdMap.get(locationId);
            GeoPoint geometry = null;
            if (surveySite != null && surveySite.lat != null && surveySite.lon != null) {
                geometry = determineGeometry(contentHandler, surveySite.lat, surveySite.lon);
            }
            // NOTE(review): when no geometry is available the content handler keeps whatever geometry was set
            // for a previous form instance - confirm whether it should be reset here.

            for (Map.Entry<String, String> parameterValue : parameterValues.entrySet()) {
                if (geometry != null) {
                    contentHandler.setGeometry(geometry);
                }
                DefaultTimeSeriesHeader header = new DefaultTimeSeriesHeader();
                header.setLocationId(locationId);
                header.setParameterId(parameterValue.getKey());
                header.setQualifierIds(qualifiers.toArray(Clasz.strings.emptyArray()));
                contentHandler.setTimeSeriesHeader(header);
                contentHandler.setValue('.', parameterValue.getValue());
                contentHandler.setTime(submissionTime);
                contentHandler.applyCurrentFields();
            }
        }
    }

    /**
     * Renders the answer to an option question as a single string: the "text" of every selected option,
     * concatenated with a "|" character as separator. Options without a "text" are left out.
     *
     * @param response answer to an option question, must be a JSON array
     * @return the concatenated option texts, empty when there are none
     */
    private static String handleOptionQuestionAnswer(JsonNode response) {
        StringBuilder joined = new StringBuilder();
        for (JsonNode option : (ArrayNode) response) {
            JsonNode text = option.get("text");
            if (text == null) continue;
            // No separator as long as nothing has been written yet.
            if (joined.length() > 0) joined.append('|');
            joined.append(text.asText());
        }
        return joined.toString();
    }

    /**
     * Retrieves a single page of form instances.
     *
     * @param url        URL of the page
     * @param sessionKey access token that is sent with the request
     * @return the form instances of the page, and the URL of the next page if there is one
     */
    private static FormInstanceResult getFormInstances(String url, String sessionKey) {
        return parseJsonFormInstances(AkvoFlowApi.getJsonResponseFromFlow(url, sessionKey));
    }

    /**
     * Retrieves the form instances of all pages, starting at {@code url} and following the next page URL
     * of every result until a page without next page URL is reached.
     *
     * @param url        URL of the first page
     * @param sessionKey access token that is sent with every request
     * @return the form instances of all pages, in the order they were retrieved
     */
    private static List<FormInstance> getAllFormInstances(String url, String sessionKey) {
        List<FormInstance> allInstances = new ArrayList<>();
        String pageUrl = url;
        do {
            FormInstanceResult page = getFormInstances(pageUrl, sessionKey);
            allInstances.addAll(page.formInstances);
            pageUrl = page.nextPageUrl;
        } while (pageUrl != null);
        return allInstances;
    }

    /**
     * Converts one page of the form_instances query result.
     * <p>
     * A null page (the request failed) or a page without "formInstances" yields a result with an empty list
     * and no next page URL. Fields that are missing from a form instance are left null.
     *
     * @param jsonNode page as returned by the query, may be null
     * @return the form instances of the page, and the URL of the next page if there is one
     */
    private static FormInstanceResult parseJsonFormInstances(JsonNode jsonNode) {
        FormInstanceResult formInstanceResult = new FormInstanceResult();
        List<FormInstance> formInstances = new ArrayList<>();
        formInstanceResult.formInstances = formInstances;
        if (jsonNode == null) return formInstanceResult;

        // path() returns a missing node that iterates as empty when there are no form instances.
        for (JsonNode formInstanceNode : jsonNode.path("formInstances")) {
            FormInstance formInstance = new FormInstance();
            formInstance.surveyedLocaleIdentifier = asTextOrNull(formInstanceNode.get("identifier"));
            formInstance.formInstanceId = asTextOrNull(formInstanceNode.get("id"));
            formInstance.submissionDate = asTextOrNull(formInstanceNode.get("submissionDate"));
            formInstance.responses = formInstanceNode.get("responses");
            formInstances.add(formInstance);
        }

        // If there is more data, populate the next page url. A JSON null or empty value means there is no next
        // page; converting it to text would result in a request to the url "null".
        JsonNode nextPageUrl = jsonNode.get("nextPageUrl");
        if (nextPageUrl != null && !nextPageUrl.isNull() && !nextPageUrl.asText().isEmpty()) {
            formInstanceResult.nextPageUrl = nextPageUrl.asText();
        }
        return formInstanceResult;
    }

    /** Returns the text of the node, or null when the node is absent. */
    private static String asTextOrNull(JsonNode node) {
        return node == null ? null : node.asText();
    }

    /**
     * Creates a point in the default geo datum of the content handler, with the longitude as x, the latitude
     * as y and 0 as z.
     *
     * @param contentHandler supplies the default geo datum
     * @param latitude       text of the latitude, used as y coordinate
     * @param longitude      text of the longitude, used as x coordinate
     * @return the created point
     * @throws IOException when the latitude or longitude can not be parsed as number
     */
    private static GeoPoint determineGeometry(TimeSeriesContentHandler contentHandler, String latitude, String longitude) throws IOException {
        double x = TextUtils.tryParseDouble(longitude, Double.NaN);
        if (Double.isNaN(x)) throw new IOException("Number expected for x coordinate (longitude);" + longitude);
        double y = TextUtils.tryParseDouble(latitude, Double.NaN);
        if (Double.isNaN(y)) throw new IOException("Number expected for y coordinate (latitude);" + latitude);
        return contentHandler.getDefaultGeoDatum().createXYZ(x, y, 0d);
    }

    /**
     * Opens a connection for reading, with the configured timeout as both connect timeout and read timeout.
     *
     * @param url URL to connect to, must be an http(s) URL
     * @return the opened connection
     * @throws IOException when the connection can not be opened
     */
    private static HttpURLConnection getHttpURLConnection(URL url) throws IOException {
        HttpURLConnection httpConnection = (HttpURLConnection) url.openConnection();
        // The timeout setters take an int, the configured timeout is kept as long.
        httpConnection.setConnectTimeout((int) connectionTimeOut);
        httpConnection.setReadTimeout((int) connectionTimeOut);
        httpConnection.setDoInput(true);
        return httpConnection;
    }



    /**
     * Stores the properties of the import. The parser reads the OpenID Connect token URL
     * ({@link #OPEN_ID_CONNECT_URL}) and the survey and form ids for the Akvo Flow queries from these properties,
     * so they have to be set before parse is called.
     *
     * @param properties properties to read the configuration from
     */
    @Override
    public void setProperties(Properties properties) {
        this.properties = properties;
    }

    /**
     * Stores the period of the import. Its start and end time are added as beginDate and endDate
     * to the query for the monitoring form instances.
     *
     * @param period period to import
     */
    @Override
    public void setPeriod(Period period) {
        this.period = period;
    }

    /**
     * Backup servers are not supported by this parser; the urls are ignored.
     *
     * @param backupUrls ignored
     */
    @Override
    public void setBackupServerUrls(URL[] backupUrls) {
        // not implemented.
    }

    /**
     * Stores the connection timeout. The timeout is kept in a static field, so it applies to all instances
     * of this parser. Within this file the value is only read by getHttpURLConnection.
     *
     * @param timeoutMillis timeout in milliseconds
     */
    @Override
    public void setConnectionTimeout(int timeoutMillis) {
        //noinspection AssignmentToStaticFieldFromInstanceMethod
        connectionTimeOut = timeoutMillis;
    }

    /**
     * A single submitted form, either a registration form or a monitoring form.
     * All fields are null until they are filled in.
     */
    private static class FormInstance {
        // Value of "identifier"; registration and monitoring form instances are matched on it, and it is used as location id.
        private String surveyedLocaleIdentifier;
        // Value of "id" of the form instance.
        private String formInstanceId;
        // Value of "submissionDate" of the form instance, as text.
        private String submissionDate;
        // The "responses" node with the answers; searched by question id.
        private JsonNode responses;
        // Site name, latitude and longitude; only filled in for registration form instances.
        private String name;
        private String lat;
        private String lon;
    }

    /**
     * One page of the result of a form_instances query.
     */
    private static class FormInstanceResult {
        // The form instances of this page.
        List<FormInstance> formInstances;
        // URL of the next page, null when this is the last page. This value is used for the next call.
        String nextPageUrl;
        // Not used within this file.
        long time;
    }

    public static final class AkvoFlowApi {

        private AkvoFlowApi() {
        }

        public static String getApiAccess(String userName, String passWord, String idConnnectUrl) throws
                IOException {
            String refreshToken = getOfflineTokenFromKeyCloak(userName, passWord, idConnnectUrl);
            return getAccessTokenFromKeycloak(refreshToken, idConnnectUrl);
        }

        /*
         *  A POST needs to be made to "https://login.akvo.org/auth/realms/akvo/protocol/openid-connect/token" with params
         *  -client_id=curl
         *  -username=<keycloakUserName>
         *  -password=<keycloackPassword>
         *  -grant_type=password
         *  -scope=openid offline_access
         * The username and password are the keycloak user credentials. Here, the are obtained from environment variables.
         * This will generate a json object with the refresh_token. This token needs to be used to generate the access_token
         *
         */
        public static String getOfflineTokenFromKeyCloak(String userName, String passWord, String idConnectUrl) throws IOException {
            try (CloseableHttpClient client = HttpClients.createDefault()) {
                HttpPost post = new HttpPost(
                        idConnectUrl);
                List<NameValuePair> data = new ArrayList<>();
                data.add(new BasicNameValuePair("client_id", "curl"));
                data.add(new BasicNameValuePair("username", userName));
                data.add(new BasicNameValuePair("password", passWord));
                data.add(new BasicNameValuePair("grant_type", "password"));
                data.add(new BasicNameValuePair("scope", "openid offline_access"));
                post.setEntity(new UrlEncodedFormEntity(data));
                try (CloseableHttpResponse response = client.execute(post)) {
                    JsonFactory jsonFactory = new JsonFactory();
                    ObjectMapper objectMapper = new ObjectMapper(jsonFactory);
                    JsonNode root = objectMapper.readTree(response.getEntity().getContent());
                    return root.get("refresh_token").asText();
                }
            }
        }

        /*
         * To make any API call to fetch data from Akvo Flow , we need to generate an access_token.
         * A new access_token should be generated before every API call
         * To generate access_token POST to https://login.akvo.org/auth/realms/akvo/protocol/openid-connect/token with
         * -client_id=curl
         * -refresh_token=<refreshToken> (see getOfflineToken method)
         * -grant_type=refresh_token
         *
         * The POST returns a JSON object with the access_token
         */
        public static String getAccessTokenFromKeycloak(String refreshToken, String idConnnectUrl) throws IOException {
            try (CloseableHttpClient client = HttpClients.createDefault()) {
                HttpPost post = new HttpPost(idConnnectUrl);
                List<NameValuePair> data = new ArrayList<NameValuePair>();
                data.add(new BasicNameValuePair("client_id", "curl"));
                data.add(new BasicNameValuePair("refresh_token", refreshToken));
                data.add(new BasicNameValuePair("grant_type", "refresh_token"));
                post.setEntity(new UrlEncodedFormEntity(data));
                try (CloseableHttpResponse response = client.execute(post)) {
                    JsonFactory jsonFactory = new JsonFactory();
                    ObjectMapper objectMapper = new ObjectMapper(jsonFactory);
                    JsonNode root = objectMapper.readTree(response.getEntity().getContent());
                    JsonNode accessTokenJson = root.get("access_token");
                    return accessTokenJson.asText();

                }
            }
        }

        public static JsonNode getJsonResponseFromFlow(String url, String accessToken) {
            CloseableHttpResponse response = null;
            try {
                HttpGet get = new HttpGet(url);
                get.addHeader("Authorization", "Bearer " + accessToken);
                get.addHeader("Accept", "application/vnd.akvo.flow.v2+json");
                get.addHeader("User-Agent", "java-flow-api-example");
                try (CloseableHttpClient client = HttpClients.createDefault()) {
                    response = client.execute(get);
                    JsonFactory jsonFactory = new JsonFactory();
                    ObjectMapper objectMapper = new ObjectMapper(jsonFactory);
                    JsonNode responseNode;
                    try {
                        HttpEntity entity = response.getEntity();
                        responseNode = objectMapper.readTree(EntityUtils.toString(entity, "UTF-8"));
                    } catch (UnsupportedOperationException | IOException e) {
                        log.warn("Error ", e);
                        return null;
                    }
                    return responseNode;
                }
            } catch (IOException e1) {
                log.warn("Error ", e1);
                return null;
            }
        }
    }
}

// No labels