package nl.wldelft.fews.system.plugin.dataImport; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import nl.wldelft.util.Clasz; import nl.wldelft.util.FastDateFormat; import nl.wldelft.util.Period; import nl.wldelft.util.PeriodConsumer; import nl.wldelft.util.Properties; import nl.wldelft.util.PropertiesConsumer; import nl.wldelft.util.TextUtils; import nl.wldelft.util.TimeUnit; import nl.wldelft.util.TimeZoneUtils; import nl.wldelft.util.geodatum.GeoPoint; import nl.wldelft.util.io.BackupServerUrlConsumer; import nl.wldelft.util.io.ServerParser; import nl.wldelft.util.timeseries.DefaultTimeSeriesHeader; import nl.wldelft.util.timeseries.TimeSeriesContentHandler; import org.apache.http.HttpEntity; import org.apache.http.NameValuePair; import org.apache.http.client.entity.UrlEncodedFormEntity; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.message.BasicNameValuePair; import org.apache.http.util.EntityUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.io.IOException; import java.net.HttpURLConnection; import java.net.URL; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; import java.text.ParseException; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Locale; import java.util.Map; /** * TimeSeries reader for the akvo flow service * <p/> * The service queries all locations using the site registration form survey. The survey id is used to get all instances of this survey. * * </pre> */ public class AkvoTimeSeriesServerParser implements ServerParser<TimeSeriesContentHandler>, PropertiesConsumer, PeriodConsumer, BackupServerUrlConsumer { FastDateFormat fastDateFormat = null; private static final Logger log = LogManager.getLogger(); private static long connectionTimeOut = 30 * TimeUnit.SECOND_MILLIS; private Period period = null; final static String SITE_REGISTRATION_SURVEY_ID = "siteRegistrationSurveyId"; // for example 7210020 for the Bishnondi registration form final static String SITE_MONITORING_SURVEY_ID = "siteMonitoringSurveyId"; // for example 7280001 for the Bishnondi monitoring form final static String SITE_SURVEY_ID = "siteSurveyId"; // public final static String OPEN_ID_CONNECT_URL = "openIdConnectUrl"; private Properties properties = null; private Map<JsonNode,String[]> questionsMap = new HashMap<>(); public AkvoTimeSeriesServerParser() { period = null; } @Override public void parse(URL baseUrl, String username, String password, TimeSeriesContentHandler contentHandler) throws Exception { this.fastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss'Z'", TimeZoneUtils.GMT, Locale.US, null); String openIdConnectUrl = properties.getString(OPEN_ID_CONNECT_URL); String sessionKey = AkvoFlowApi.getApiAccess(username, password, openIdConnectUrl); parseJsonStream(contentHandler, baseUrl, sessionKey, period); } private void parseJsonStream(TimeSeriesContentHandler contentHandler, URL baseUrl, String sessionKey, Period period) throws IOException, NoSuchAlgorithmException, InvalidKeyException, ParseException { // create question metadata map. SITE_SURVEY_ID is, for example, for Bishnondi: 9290008 JsonNode surveyDefinitionJson = AkvoFlowApi.getJsonResponseFromFlow(baseUrl + "/surveys/" + properties.getString(SITE_SURVEY_ID), sessionKey); questionsMap = createQuestionsMap(surveyDefinitionJson); // get registration data String baseQueryUrl = baseUrl.toString(); List<FormInstance> registrationSites = parseRegistrationForm(baseQueryUrl, sessionKey); // create map of identifier to formInstance for the registration form Map<String, FormInstance> registrationIdMap = new HashMap<>(); for (FormInstance formInstance: registrationSites) { //noinspection ResultOfMethodCallIgnored registrationIdMap.put(formInstance.surveyedLocaleIdentifier, formInstance); } // get monitoring data parseMonitoringSurvey(baseQueryUrl, sessionKey, contentHandler, registrationIdMap, period); } /** * Creates a map of question ids to the name and type of the question * This is necessary, as the raw responses only contain the question id. */ private static Map<JsonNode,String[]> createQuestionsMap(JsonNode surveyDefinition) { Map<JsonNode,String[]> questionsMap = new LinkedHashMap<>(); // get number of forms int numForms = surveyDefinition.get("forms").size(); for (int i = 0; i < numForms; i++) { JsonNode form = surveyDefinition.path("forms").get(i); // Iterate over question groups and questions for the registration form // Creates a question map which contains the id of the question, its name and type. ArrayNode questionGroups = (ArrayNode) ((JsonNode) form).path("questionGroups"); for (int j = 0; j < questionGroups.size(); j++) { ArrayNode questions = (ArrayNode) questionGroups.get(j).path("questions"); for (int k = 0; k < questions.size(); k++) { JsonNode id = questions.get(k).get("id"); String[] value = {questions.get(k).get("name").asText(), questions.get(k).get("type").asText()}; questionsMap.put(id, value); } } } return questionsMap; } private List<FormInstance> parseRegistrationForm(String baseUrl, String sessionKey) throws IOException, InvalidKeyException, NoSuchAlgorithmException { String formInstanceUrl = baseUrl +"/form_instances?survey_id="+ properties.getString(SITE_SURVEY_ID) + "&form_id=" + properties.getString(SITE_REGISTRATION_SURVEY_ID); List<FormInstance> registrationSurveyList = getAllFormInstances(formInstanceUrl, sessionKey); // get the name and geolocation for all form instances JsonNode nameQuestionId = null; JsonNode geolocationQuestionId = null; // iterate over all the question ids. for (JsonNode qID : questionsMap.keySet()) { String[] nameAndType = questionsMap.get(qID); if (nameAndType[0].equals("Name site")) { nameQuestionId = qID; } if (nameAndType[0].equals("Geolocation")) { geolocationQuestionId = qID; } } // get the name and geolocation out of the responses for (FormInstance fi : registrationSurveyList){ if (nameQuestionId != null) { JsonNode name = fi.responses.findValue(nameQuestionId.asText()); if (name != null) { fi.name = name.asText(); } } if (geolocationQuestionId != null) { JsonNode geolocation = fi.responses.findValue(geolocationQuestionId.asText()); if (geolocation != null){ fi.lat = geolocation.get("lat").asText(); fi.lon = geolocation.get("long").asText(); } } } for (FormInstance fi: registrationSurveyList) { log.info("Registration Survey Name: '" + fi.name + "' id: '" + fi.surveyedLocaleIdentifier + "' Lat: '" + fi.lat + "' Lon: '" + fi.lon + "'"); } return registrationSurveyList; } /** * Determine all registration surveys. * <p> * example call using python test script. * python flow_api_client.py --url "https://akvoflow-141.appspot.com/api/v1/survey_instances?surveyId=1100001" --key "iseNzrMd5gIH3KLYurQEG+o3t3tPGu7BpTDDkMPj3W4=" --secret "cziZu8wWQ2NFHM1t7UW96nZEa+AAh4o4pDOtjDlwM78=" */ private void parseMonitoringSurvey(String baseUrl, String sessionKey, TimeSeriesContentHandler contentHandler, Map<String, FormInstance> registrationIdMap, Period viewPeriod) throws IOException, InvalidKeyException, NoSuchAlgorithmException, ParseException { String formInstanceUrl = baseUrl +"/form_instances?survey_id="+ properties.getString(SITE_SURVEY_ID) + "&form_id=" + properties.getString(SITE_MONITORING_SURVEY_ID); formInstanceUrl += "&beginDate=" + viewPeriod.getStartTime() + "&endDate=" + viewPeriod.getEndTime(); List<FormInstance> monitoringSurveyList = getAllFormInstances(formInstanceUrl, sessionKey); for (FormInstance fi : monitoringSurveyList) { Map<String, String> parameterValues = new LinkedHashMap<>(); Map<String, Long> parameterTimes = new LinkedHashMap<>(); List<String> qualifiers = new ArrayList<>(); String locationId = fi.surveyedLocaleIdentifier; // get corresponding site formInstance (which has name and geolocation) FormInstance surveySite = registrationIdMap.get(locationId); // parse all question responses // iterate over all the question ids. Iterator<JsonNode> qItr = questionsMap.keySet().iterator(); while(qItr.hasNext()){ JsonNode qID = qItr.next(); String[] nameAndType = questionsMap.get(qID); // is there a node that corresponds to this id? As answers are stored in the question groups, // the responses object is two levels deep. So it is easier to just find questions by id. JsonNode answer = fi.responses.findValue(qID.asText()); if (answer == null) continue; // handle different types of results if ("FREE_TEXT".equals(nameAndType[1])) { //noinspection ResultOfMethodCallIgnored parameterValues.put(nameAndType[0], answer.asText()); // param name and value. //noinspection ResultOfMethodCallIgnored String submissionDate = fi.responses.get("submissionDate").asText(); parameterTimes.put(nameAndType[0], fastDateFormat.parseToMillis(submissionDate)); } if ("OPTION".equals(nameAndType[1])) { qualifiers.add(nameAndType[0] + "_" + handleOptionQuestionAnswer(answer)); } if ("CADDISFLY".equals(nameAndType[1])) { JsonNode results = answer.findValue("result"); for(int i = 0; i < results.size(); i++){ String name = results.get(i).get("name").asText(); String value = results.get(i).get("value").asText(); // add as parameter values //noinspection ResultOfMethodCallIgnored parameterValues.put(name, value); // param name and value. parameterTimes.put(name, fastDateFormat.parseToMillis(fi.submissionDate)); } } } for (Map.Entry<String, String> stringStringEntry : parameterValues.entrySet()) { DefaultTimeSeriesHeader header = new DefaultTimeSeriesHeader(); if (surveySite != null) { contentHandler.setGeometry(determineGeometry(contentHandler, surveySite.lat, surveySite.lon)); } header.setLocationId(locationId); header.setParameterId(stringStringEntry.getKey()); header.setQualifierIds(qualifiers.toArray(Clasz.strings.emptyArray())); contentHandler.setTimeSeriesHeader(header); contentHandler.setValue('.', stringStringEntry.getValue()); contentHandler.setTime(parameterTimes.get(stringStringEntry.getKey())); contentHandler.applyCurrentFields(); } // Now write the time series. } } // Option questions are rendered as a single string, in which the different responses are // concatenated with a "|" character as separator. private static String handleOptionQuestionAnswer(JsonNode response) { String responseStr = ""; ArrayNode values = (ArrayNode)response; for(int i = 0; i < values.size(); i++){ JsonNode value = values.get(i).get("text"); if(value != null){ if(responseStr.isEmpty()){ responseStr = values.get(i).get("text").asText(); }else{ responseStr = responseStr + "|" + value.asText(); } } } return responseStr; } private static FormInstanceResult getFormInstances(String url, String sessionKey) { JsonNode queryResultJson = AkvoFlowApi.getJsonResponseFromFlow(url, sessionKey); return parseJsonFormInstances(queryResultJson); } private static List<FormInstance> getAllFormInstances(String url, String sessionKey) { List<FormInstance> result = new ArrayList<>(); FormInstanceResult formInstanceResult = getFormInstances(url, sessionKey); result.addAll(formInstanceResult.formInstances); while(formInstanceResult.nextPageUrl != null){ formInstanceResult = getFormInstances(formInstanceResult.nextPageUrl, sessionKey); result.addAll(formInstanceResult.formInstances); } return result; } private static FormInstanceResult parseJsonFormInstances(JsonNode jsonNode) { FormInstanceResult formInstanceResult = new FormInstanceResult(); List<FormInstance> result = new ArrayList<>(); ArrayNode formInstanceNodes = (ArrayNode)jsonNode.path("formInstances"); for (JsonNode formInstanceNode : formInstanceNodes){ FormInstance formInstance = new FormInstance(); formInstance.surveyedLocaleIdentifier = formInstanceNode.get("identifier").asText(); formInstance.formInstanceId = formInstanceNode.get("id").asText(); formInstance.submissionDate = formInstanceNode.get("submissionDate").asText(); formInstance.responses = formInstanceNode.get("responses"); result.add(formInstance); } // add all form instances to the result object formInstanceResult.formInstances = result; // if there is more data, populate the nextPageResult field if (jsonNode != null && jsonNode.get("nextPageUrl") != null){ formInstanceResult.nextPageUrl = jsonNode.get("nextPageUrl").asText(); } return formInstanceResult; } private static GeoPoint determineGeometry(TimeSeriesContentHandler contentHandler, String latitude, String longitude) throws IOException { double x = TextUtils.tryParseDouble(longitude, Double.NaN); if (Double.isNaN(x)) throw new IOException("Number expected for y coordinate;" + longitude); double y = TextUtils.tryParseDouble(latitude, Double.NaN); if (Double.isNaN(y)) throw new IOException("Number expected for x coordinate;" + latitude); return contentHandler.getDefaultGeoDatum().createXYZ(x, y, 0d); } private static HttpURLConnection getHttpURLConnection(URL url) throws IOException { HttpURLConnection connection = (HttpURLConnection) url.openConnection(); int connectionTimeout = (int) connectionTimeOut; connection.setConnectTimeout(connectionTimeout); int readTimeout = (int) connectionTimeOut; connection.setReadTimeout(readTimeout); connection.setDoInput(true); return connection; } /** * Properties are used to filter out specific sensors for a location. */ @Override public void setProperties(Properties properties) { this.properties = properties; } @Override public void setPeriod(Period period) { this.period = period; } @Override public void setBackupServerUrls(URL[] backupUrls) { // not implemented. } @Override public void setConnectionTimeout(int timeoutMillis) { //noinspection AssignmentToStaticFieldFromInstanceMethod connectionTimeOut = timeoutMillis; } private static class FormInstance { private String surveyedLocaleIdentifier = null; // unique locale identifier private String formInstanceId = null; // substance instance id. unique identifier to the retrieve question answers for a survey instance. private JsonNode responses = null; private String name = null; String lat = null; String lon = null; private String submissionDate = null; } private static class FormInstanceResult { // If a nextUrl property was set in the meta object, there are more results. This value will be passed in the next call. String nextPageUrl = null; // The detected survey instances. List<FormInstance> formInstances = null; long time = 0L; } public static final class AkvoFlowApi { private AkvoFlowApi() { } public static String getApiAccess(String userName, String passWord, String idConnnectUrl) throws IOException { String refreshToken = getOfflineTokenFromKeyCloak(userName, passWord, idConnnectUrl); return getAccessTokenFromKeycloak(refreshToken, idConnnectUrl); } /* * A POST needs to be made to "https://login.akvo.org/auth/realms/akvo/protocol/openid-connect/token" with params * -client_id=curl * -username=<keycloakUserName> * -password=<keycloackPassword> * -grant_type=password * -scope=openid offline_access * The username and password are the keycloak user credentials. Here, the are obtained from environment variables. * This will generate a json object with the refresh_token. This token needs to be used to generate the access_token * */ public static String getOfflineTokenFromKeyCloak(String userName, String passWord, String idConnectUrl) throws IOException { try (CloseableHttpClient client = HttpClients.createDefault()) { HttpPost post = new HttpPost( idConnectUrl); List<NameValuePair> data = new ArrayList<>(); data.add(new BasicNameValuePair("client_id", "curl")); data.add(new BasicNameValuePair("username", userName)); data.add(new BasicNameValuePair("password", passWord)); data.add(new BasicNameValuePair("grant_type", "password")); data.add(new BasicNameValuePair("scope", "openid offline_access")); post.setEntity(new UrlEncodedFormEntity(data)); try (CloseableHttpResponse response = client.execute(post)) { JsonFactory jsonFactory = new JsonFactory(); ObjectMapper objectMapper = new ObjectMapper(jsonFactory); JsonNode root = objectMapper.readTree(response.getEntity().getContent()); return root.get("refresh_token").asText(); } } } /* * To make any API call to fetch data from Akvo Flow , we need to generate an access_token. * A new access_token should be generated before every API call * To generate access_token POST to https://login.akvo.org/auth/realms/akvo/protocol/openid-connect/token with * -client_id=curl * -refresh_token=<refreshToken> (see getOfflineToken method) * -grant_type=refresh_token * * The POST returns a JSON object with the access_token */ public static String getAccessTokenFromKeycloak(String refreshToken, String idConnnectUrl) throws IOException { try (CloseableHttpClient client = HttpClients.createDefault()) { HttpPost post = new HttpPost(idConnnectUrl); List<NameValuePair> data = new ArrayList<NameValuePair>(); data.add(new BasicNameValuePair("client_id", "curl")); data.add(new BasicNameValuePair("refresh_token", refreshToken)); data.add(new BasicNameValuePair("grant_type", "refresh_token")); post.setEntity(new UrlEncodedFormEntity(data)); try (CloseableHttpResponse response = client.execute(post)) { JsonFactory jsonFactory = new JsonFactory(); ObjectMapper objectMapper = new ObjectMapper(jsonFactory); JsonNode root = objectMapper.readTree(response.getEntity().getContent()); JsonNode accessTokenJson = root.get("access_token"); return accessTokenJson.asText(); } } } public static JsonNode getJsonResponseFromFlow(String url, String accessToken) { CloseableHttpResponse response = null; try { HttpGet get = new HttpGet(url); get.addHeader("Authorization", "Bearer " + accessToken); get.addHeader("Accept", "application/vnd.akvo.flow.v2+json"); get.addHeader("User-Agent", "java-flow-api-example"); try (CloseableHttpClient client = HttpClients.createDefault()) { response = client.execute(get); JsonFactory jsonFactory = new JsonFactory(); ObjectMapper objectMapper = new ObjectMapper(jsonFactory); JsonNode responseNode; try { HttpEntity entity = response.getEntity(); responseNode = objectMapper.readTree(EntityUtils.toString(entity, "UTF-8")); } catch (UnsupportedOperationException | IOException e) { log.warn("Error ", e); return null; } return responseNode; } } catch (IOException e1) { log.warn("Error ", e1); return null; } } } }