
What: ImportAmalgamate
Required: no
Description: Amalgamates external historical data
Schema location: http://fews.wldelft.nl/schemas/version1.0/importAmalgamate.xsd

Description

Workflows that import or process external historical data may produce small time series objects, stored as binary objects (blobs) in the time series database table. These small blobs may cover only a few time steps. If the time series table contains many of these small blobs, the number of records in the database can grow considerably and become very large (more than 1 million records). In those cases it is advised to run a database maintenance workflow that amalgamates the small records in the time series table into larger records that cover longer time periods and store the time series data in larger blobs. In Delft-FEWS this can be done with the ImportAmalgamate module, which amalgamates all time series records that were created by specific workflows. After the amalgamate task has completed, the original workflow runs, with all their imported external data, are scheduled for deletion. The amalgamated time series are stored in the database as time series created by the database maintenance workflow.

Note: Large grids, external forecasts and samples are not handled through this module.

Configuration of workflow duration

When the database maintenance workflow (with the amalgamate module) is executed for the first time, it can take a long time to complete. In those cases it is advised to adjust the allowable run time of the workflow run. The fews.master.mc.conf file contains a chaser grace time property that defines the maximum time a workflow is allowed to run. Consider whether this value needs to be increased temporarily.

Configuration

It is advised to use a separate database maintenance workflow that includes the ImportAmalgamate module instance. The configuration of the ImportAmalgamate module instance contains the following elements; see the config example below.

workflowId

One or more ids of workflows that import or create external historical time series over a short time span (scheduled frequently).

importRunMinimalAge

Workflow runs younger than the specified age are skipped; only workflow runs (TaskRuns) that exist in the TaskRuns database table and are older than the importRunMinimalAge are included in the ImportAmalgamate module.

Note: After the amalgamate has run it is no longer possible to create an archive with the exact original data that was available during the run. Therefore, make sure the ImportAmalgamate workflow schedule and the importRunMinimalAge are aligned with the archive runs.
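For example, when archive exports run weekly, the minimal age can be chosen so that workflow runs are only amalgamated after they have been archived. A minimal sketch (the unit and multiplier values here are illustrative assumptions, not prescriptive):

```xml
<!-- Illustrative only: skip workflow runs younger than 8 days,
     so a weekly archive export always sees the original data -->
<importRunMinimalAge unit="day" multiplier="8"/>
```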

amalgamateOrphans

For systems that did not previously have an amalgamate module running, it may be required to amalgamate the complete database. This should only be done once, or when new workflows are added to the ImportAmalgamate module, and is possible by setting amalgamateOrphans=true. As this task may take very long (regular FSS tasks have a time-out of 3 hours), it should be handled with care. In that case it may be useful to run the task at an FSS that has no localDataStore but has direct access to the central database. Only in that particular case the amalgamate runs (hard coded in the software) for just 1 hour. The task is aborted after one hour and the remaining imports are not handled in that task run anymore. This is done on purpose: it makes it possible to schedule this task with an interval of, for example, 3 hours, allowing the MC to perform all the required cleaning tasks such as the rollingBarrel and the markedRecordManager in between. Over a few hours (to days, or in some rare situations even weeks) all the task runs are handled and the complete database is amalgamated. Do not forget to stop this scheduled task after the complete database has been amalgamated.
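A one-off orphan amalgamation could then be configured along these lines (a sketch only; the workflow id is a placeholder for the workflows whose orphaned records must be amalgamated):

```xml
<?xml version="1.0" encoding="UTF-8"?>
<importAmalgamate xmlns="http://www.wldelft.nl/fews" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.wldelft.nl/fews http://fews.wldelft.nl/schemas/version1.0/importAmalgamate.xsd">
  <!-- Placeholder workflow id -->
  <workflowId>Import_Data</workflowId>
  <importRunMinimalAge unit="hour"/>
  <!-- One-off: amalgamate existing orphaned records as well; set back to false afterwards -->
  <amalgamateOrphans>true</amalgamateOrphans>
</importAmalgamate>
```

Remember to reschedule or reconfigure the task with amalgamateOrphans=false once the backlog has been processed.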

Example

ModuleDescriptors.xml
<moduleDescriptor id="ImportAmalgamate">
  <className>nl.wldelft.fews.system.plugin.amalgamate.ImportAmalgamate</className>
</moduleDescriptor>
ModuleInstanceDescriptors.xml
<moduleInstanceDescriptor id="Amalgamate">
  <moduleId>ImportAmalgamate</moduleId>
</moduleInstanceDescriptor>

 

In the module instance configuration individual workflowIds can be specified, or a workflowIdPattern can be used when many workflows need to be amalgamated.

Amalgamate.xml
<?xml version="1.0" encoding="UTF-8"?>
<importAmalgamate xmlns="http://www.wldelft.nl/fews" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.wldelft.nl/fews http://fews.wldelft.nl/schemas/version1.0/importAmalgamate.xsd">
  <workflowId>Import_Data</workflowId>
  <workflowId>Procesoverzicht_Update</workflowId>
  <workflowIdPattern>*_Process_Observations</workflowIdPattern>
  <importRunMinimalAge unit="hour"/>
  <amalgamateOrphans>false</amalgamateOrphans>
</importAmalgamate>

Best Practices

The original task runs are deleted from the TaskRuns database table after a successful run of the ImportAmalgamate module. To make sure the configured workflows are successfully amalgamated and not removed by the rolling barrel, it is best to configure a large expiry time for the amalgamated workflows (import, process) in the workflowDescriptors.xml. The expiry time of the amalgamation task itself should be set (via the Admin Interface) to the largest expiry time of the workflows in the amalgamation task. Furthermore, do not forget to configure an explicit expiry time for the time series sets in the workflows that are amalgamated. The expiry time of the individual time series is preserved during amalgamation.
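An explicit expiry time on a time series set in an import module could look like the sketch below. All ids, the location set, the time step and the 10-year expiry are illustrative assumptions; only the expiryTime element is the point of the example:

```xml
<timeSeriesSet>
  <moduleInstanceId>Import_Data</moduleInstanceId>
  <valueType>scalar</valueType>
  <parameterId>H.obs</parameterId>
  <locationSetId>Gauges</locationSetId>
  <timeSeriesType>external historical</timeSeriesType>
  <timeStep unit="minute" multiplier="15"/>
  <readWriteMode>add originals</readWriteMode>
  <!-- Illustrative: keep these series for ~10 years; this expiry time survives amalgamation -->
  <expiryTime unit="day" multiplier="3650"/>
</timeSeriesSet>
```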

When the Database Maintenance (ImportAmalgamate) workflow runs frequently (every 12 hours or every day), the small time series blobs generated in workflows that run with a high frequency (every 5-15 minutes) are amalgamated into larger blobs of 12 hours or 1 day. A second ImportAmalgamate workflow can be scheduled to run every week or month, amalgamating the already amalgamated workflows into even larger blobs. To do so, schedule a new Database Maintenance workflow with an interval of 30 days that runs an ImportAmalgamate module instance amalgamating all time series blobs generated in the Database Maintenance workflows. This second ImportAmalgamate step can reduce the number of records in the time series table considerably.

 

Amalgamate_Month.xml
<?xml version="1.0" encoding="UTF-8"?>
<importAmalgamate xmlns="http://www.wldelft.nl/fews" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.wldelft.nl/fews http://fews.wldelft.nl/schemas/version1.0/importAmalgamate.xsd">
  <workflowId>Database_Maintenance</workflowId>
  <importRunMinimalAge unit="day" multiplier="30"/>
  <amalgamateOrphans>false</amalgamateOrphans>
</importAmalgamate>

 

Steps

  1. Identify which external historicals need to be amalgamated.
  2. Pick a suitable expiry time T for the taskrun metadata. The taskrun expiry time for amalgamation timeseries should never be set to expire before the timeseries do. By default a TaskRun expires after 10 days, after which its timeseries become orphaned; for amalgamation timeseries a much longer timeseries expiry time and taskrun expiry time are recommended. It is essential that the taskrun metadata of the imported external historical timeseries that are to be amalgamated does not expire before the amalgamation takes place. When using amalgamate, the taskrun metadata should only be deleted by the amalgamate module.
  3. When import runs for external historical timeseries need to run regularly, set up a task and workflow for importing external historical timeseries. The timeseries expiry time is preserved by the amalgamate module and is not influenced by the taskrun metadata expiry time. By default the timeseries expiry time is the same as that of the import run, but it is strongly recommended to overrule this by setting an explicit timeseries expiry time in the import module. Use the taskrun metadata expiry time T determined under step 2 for the task; for instance, set the expiry time of the task in the Admin Interface to 50 years.
  4. Optionally set up an Archive export task and workflow. Obviously this must run before the taskrun metadata expires and before a daily amalgamate has acted on the data.
  5. Set up a task and workflow for running a daily amalgamate (after the archive export and before the taskrun of the import expires). Use the taskrun metadata expiry time T for the task, set for instance via the Admin Interface.
  6. When preserving external historical timeseries much longer than a week, set up a separate task and workflow for a weekly amalgamate. This workflow should be set up to process the output of the daily amalgamate, and must therefore be run before the daily amalgamate taskrun metadata expires. Use the taskrun metadata expiry time T for the task, set for instance via the Admin Interface.
  7. When preserving external historical timeseries much longer than a month, set up a separate task and workflow for a monthly amalgamate. This workflow should be set up to process the output of the weekly amalgamate, and must therefore be run before the weekly amalgamate taskrun metadata expires.

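The cascade of steps 5-7 can be sketched as a chain of amalgamate module instances, each consuming the output of the previous level. The workflow ids and minimal ages below are illustrative assumptions; each importAmalgamate element belongs in its own module instance configuration file:

```xml
<!-- Daily amalgamate: picks up the frequent import workflows (placeholder id) -->
<importAmalgamate xmlns="http://www.wldelft.nl/fews">
  <workflowId>Import_Data</workflowId>
  <importRunMinimalAge unit="day"/>
  <amalgamateOrphans>false</amalgamateOrphans>
</importAmalgamate>

<!-- Weekly amalgamate: processes the output of the daily amalgamate workflow,
     and must run before the daily amalgamate taskrun metadata expires -->
<importAmalgamate xmlns="http://www.wldelft.nl/fews">
  <workflowId>Database_Maintenance_Daily</workflowId>
  <importRunMinimalAge unit="day" multiplier="7"/>
  <amalgamateOrphans>false</amalgamateOrphans>
</importAmalgamate>
```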
Amalgamate with multiple MC's

Special attention must be given to Delft-FEWS systems that have a duty-standby set-up. The amalgamate module only amalgamates workflows that were run on the MC on which the amalgamate module itself is scheduled. In standard duty-standby systems the import and data processing tasks run on both MCs, so the amalgamate modules (database maintenance workflows) must also be scheduled to run on both MCs. This is especially relevant when some of the imported data is synchronised between the MCs. Import, processing and database maintenance workflows may never be run in failover mode.

This check on MCs was introduced in the Delft-FEWS code in April 2016.

Tuning the amalgamate using SQL

While tuning the ImportAmalgamate the following SQL can be used. Enable the appropriate 7-day condition on creationTime in the query below for your database flavour to see which tasks with external historical timeseries have been produced over the last week. The database trend information page in the Admin Interface can also be very useful for this purpose.

SELECT t1.workflowId, t1.maxTaskExpiry, t1.taskCnt, t1.taskRunCnt, t1.maxTaskRunExpiryTime, t2.timeSeriesCnt, t2.maxTimeSeriesExpirytime FROM (
 SELECT t3.workflowId, t3.taskId, t3.maxTaskExpiry, t3.taskCnt, t4.taskRunCnt, t4.maxTaskRunExpiryTime FROM (
 SELECT workflowId, taskId, MAX(expiryTime) AS maxTaskExpiry, count(*) AS taskCnt FROM Tasks WHERE
 
----postgresql ----
-- creationTime > current_date - interval '7 days' AND
 
---- sqlserver ----
-- creationTime > DATEADD(DAY,-7, GETDATE()) AND
 
---- oracle ----
-- creationTime > TRUNC(sysdate) - INTERVAL '7' DAY AND
 
workflowId IN (
SELECT workflowId FROM Tasks WHERE taskId IN (SELECT taskId FROM TaskRuns WHERE taskRunId IN (SELECT creatorTaskRunId FROM TimeSeries WHERE timeSeriesType=0))
---- or only already amalgamated workflowIds, e.g. : ----
--SELECT workflowid FROM WorkflowFiles WHERE workflowId='Import_Amalgamate'
)
 GROUP BY taskId, workflowId
 ) t3 LEFT JOIN (
 SELECT workflowId, t.taskId, count(tr.taskRunId) AS taskRunCnt, max(tr.expiryTime) AS maxTaskRunExpiryTime
 FROM Tasks t, TaskRuns tr
 WHERE t.taskId=tr.taskId
 GROUP BY t.taskId, workflowId
 ) t4 ON t3.taskId=t4.taskId AND t3.workflowId=t4.workflowId
) t1 LEFT JOIN (
 SELECT t.taskId, workflowId, count(1) AS timeSeriesCnt, MAX(ts.expiryTime) AS maxTimeSeriesExpirytime
 FROM Tasks t, TaskRuns tr, TimeSeries ts
 WHERE t.taskId=tr.taskId AND ts.creatorTaskRunId=tr.taskRunId
 GROUP BY workflowId, t.taskId
) t2 ON t1.taskId=t2.taskId AND t1.workflowId=t2.workflowId
ORDER BY t1.workflowId, t1.taskId