Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.


...

What

ImportAmalgamate

Required

no

Description

Amalgamates external historical data

schema location

http

https://

fews

fewsdocs.

wldelft

deltares.nl/schemas/version1.0/importAmalgamate.xsd


Excerpt
hiddentrue

Amalgamates import time series based on workflows

...

One or more workflow ids that import or created create external historical time series over a short time span (scheduled frequently). Note that this should be the id of the main workflow, i.e. the one that is actually scheduled and leads to a task run. Referring to workflows that are part of the main workflow will not work. 

importRunMinimalAge

Workflow runs younger than the specified age are skipped; only workflow runs (TaskRuns) that exist in the TaskRuns database table and are older than the importRunMinimalAge will be included in the ImportAmalgamate module.

Note: After the amalgamate has run it is no longer possible to create an archive with the exact original data available during the run. In those cases make sure the inportAmalgamate importAmalgamate workflow schedule and minimalAge are streamlined with the archive runs.

afterAmalgamateImportRunMetaDataExpiryTime

Since 2023.01. By default the import/update run is deleted from the database after amalgamation. It is possible to keep the metadata for a while after amalgamation. This metadata can be still seen in the time series lister as an entry without time series. NB. Only use for workflows that you need to keep, e.g. not for a frequently running SystemMetrics workflow since otherwise the SystemActivities table may grow unexpectedly.

amalgamateOrphans

For systems that did not have an amalgamate module running it maybe required to amalgamate the complete database. This should be only be done once or when new workflows are added to the ImportAmalgamate module. It is possible to do this with the amalgamateOrphans=true. As this task may take too long (regular FSS tasks have a time out of 3 hours) this should be handled with care. In that case it maybe may be useful to have the task run at an FSS that has no localDataStore, but has direct access to the central database. Only in that particular case the amalgamate runs (hard coded in the software) only for only 1 hour. The task will be aborted after one hour and the remaining imports are not handled in that taskrun anymore. This is done on purpose, as it is now possible to schedule this task with an interval of e.g. 3 hours (making it possible to the MC to do all the required cleaning stuff like rollingBarrel and markedRecordManager). In a few hours (to even days or in some rare situations evens weeks) all the taskruns are handled and the complete database has been amalgamated. Do not forget to stop this scheduled task after the complete database is amalgamated.

Example

Code Block
xml
xml
titleModuleDescriptors.xml
<moduleDescriptor id="ImportAmalgamate">
  <className>nl.wldelft.fews.system.plugin.amalgamate.ImportAmalgamate</className>
</moduleDescriptor>
Code Block
xmlxmltitleModuleInstanceDescriptors.xml
<moduleInstanceDescriptor id="Amalgamate">
  <moduleId>ImportAmalgamate</moduleId>
</moduleInstanceDescriptor>

...


In the module instance configuration the workflowId's can be specified or a workflow pattern can be used when many workflows need to be amalgamated.

Code Block
xml
xml
titleAmalgamate.xml
<?xml version="1.0" encoding="UTF-8"?>
<importAmalgamate xmlns="http://www.wldelft.nl/fews" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.wldelft.nl/fews httphttps://fewsfewsdocs.wldelftdeltares.nl/schemas/version1.0/importAmalgamate.xsd">
  <workflowId>Import_Data</workflowId>
  <workflowId>Procesoverzicht_Update</workflowId>
  <workflowIdPattern>*_Process_Observations</workflowIdPattern>
  <importRunMinimalAge unit="hour"/>
  <amalgamateOrphans>false</amalgamateOrphans>
</importAmalgamate>

Best Practices

The original Taskruns are deleted from the TaskRuns database table after successful run of the ImportAmalgamate module. To make sure the configured workflows are successfully amalgamated and not removed by the rolling barrel it is best to configure a large expiry time for the amalgamated workflows (import, process) in the workflowDescriptors.xml. Also the amalgamation task expiry time should be set (via the AdminInterface) to the largest expiry time for the workflows in the amalgamation task. Furthermore, do not forget to configure an explicit expiry time for the time series sets in the workflows that are amalgamated. The expiry time of the individual time series is preserved while amalgamated.


Code Block
xml
xml
titleAmalgamate.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml version="1.0" encoding="UTF-8"?>
	<importAmalgamate xmlns="http://www.wldelft.nl/fews"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.wldelft.nl/fews http://fews.wldelft.nl/schemas/version1.0/importAmalgamate.xsd">
	<workflowId>Import</workflowId>
	<workflowIdPattern>Import*</workflowIdPattern>
    <importRunMinimalAge unit="hour"/> 
	<afterAmalgamateImportRunMetaDataExpiryTime multiplier="365" unit="day"/>
</importAmalgamate>

Best Practices

The Database Maintenance (ImportAmalgamate) workflow should When the Database Maintenance (ImportAmalgamate) workflow is run frequently (every 12 hours or 1 day) then in order to amalgamate the small time series blobs generated in workflows that run with high frequency (5-15 minutes) will be amalgamated to larger blobs of 12 hours or 1 day. A second ImportAmalgamate workflow can should be scheduled that will run for example every week or every month that amalgamates the already amalgamated workflows in even larger blobs. To do so, schedule a new Database Maintenance workflow with an interval of 30 days that runs an ImportAmalgamate module instance that amalgamates all time series blobs generated in the Database Maintenance workflows. This second ImportAmalgamate step can reduce the the number of records in the time series table considerably.  The expiry time of the individual time series is preserved while amalgamated. Therefore it is important to configure an expiry time for the time series sets in the workflows that are amalgamated. 


Code Block
titleAmalgamate_Month.xml
<?xml version="1.0" encoding="UTF-8"?>
<importAmalgamate xmlns="http://www.wldelft.nl/fews" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.wldelft.nl/fews httphttps://fewsfewsdocs.wldelftdeltares.nl/schemas/version1.0/importAmalgamate.xsd">
  <workflowId>Database_Maintenance</workflowId>
  <importRunMinimalAge unit="day" multiplier="30"/>
  <amalgamateOrphans>false</amalgamateOrphans>
</importAmalgamate>

 Steps

  1. Identify which external historicals need to be amalgamated.
  2. Pick a suitable expiry time T for the taskrun metadata. The taskrun expiry time for amalgamation timeseries should never be set to expire before the timeseries do. By default a TaskRun expires after 10 days and the timeseries become orphaned, but for amalgamation timeseries a much longer expiry time and taskrun expiry time is recommended. It is essential that the taskrun meta data for the "to be amalgamated imported external historical timeseries" does not expire before the amalgamation takes place. When using amalgamate, the taskrun meta data should only be deleted by the amalgamate module.
  3. When import runs for external historical timeseries need to run regularly, set up a task and workflow for importing external historical timeseries. The timeseries expiry time is preserved by the amalgamate module and not influenced by the taskrun metadata expiry time. By default the timeseries expiry time is the same as that of the import run, but it is strongly recommended to overrule this by setting an explicit timeseries expiry time in the import module. Use the taskrun metadata expiry time T for the task determined under step @1. Set for instance the expiry time of the task in the Admin Interface to 50 years.
  4. Optionally set up an Archive export task and workflow. Obviously this must be before the taskrun metadata expires and before a daily amalgamate has acted on the data.
  5. Set up a task and workflow for running a daily amalgamate (after the archive export before taskrun of the import expires). Use the taskrun metadata expiry time T for the task. Set for instance the expiry time of the task in the Admin Interface.
  6. When preserving external historical timeseries much longer than a week, setup a separate task and workflow for a weekly amalgamate. This workflow should be setup to process the output of the daily amalgamate. Obviously this workflow must therefore be run before the daily amalgamate taskrun metadata expires. Use the taskrun metadata expiry time T for the task. Set for instance the expiry time of the task in the Admin Interface..
  7. When preserving external historical timeseries much longer than a month, setup a separate task and workflow for a monthly amalgamate. This workflow should be setup to process the output of the weekly amalgamate. Obviously this workflow must therefore be run before the weekly amalgamate taskrun metadata expires.

Troubleshooting

  1. Verify that the workflow id's in the ImportAmalgamate module refer to the main workflows (i.e. the ones that are actually scheduled) and not to any workflows that are run as part of a main workflow.
  2. Verify that the import workflows are inserting expected amounts of external historical TimeSeries with the correct period - e.g. the begin time must start after the end time of the previous run. Check in DatabaseLister!
  3. Check that the processing steps that operate on the external historical data do not contain transformations that produce warnings about "Overwriting with missing" (evil!).
  4. Verify the history popup (Ctrl-H) in the TimeSeriesDialog does not contain multiple entries.
  5. Verify that the external historical TimeSeries records are created with a healthy expiryTime.
  6. Verify that the TaskRuns for external historical TimeSeries records created with a healthy expiryTime.
  7. Verify that the Tasks for external historical TimeSeries records created with a healthy expiryTime.
  8. Does the ImportAmalgamate configuration include all the workflowIds of interest.
  9. Does the ImportAmalgamate run without errors.

Amalgamate with multiple MC's

...

While tuning the ImportAmalgamate the following sql can be used. Please enable the appropriate  appropriate 7 day condition on creationTime in below query for your database flavour to see what Tasks have been produced over the last week with external historical timeseries. Also the AdminInterface database trend information page can be very useful for this purpose.

Code Block
languagesql
SELECT t1.workflowId, t1.maxTaskExpiry, t1.taskCnt, t1.taskRunCnt, t1.maxTaskRunExpiryTime, t2.timeSeriesCnt, t2.maxTimeSeriesExpirytime FROM (
 SELECT t3.workflowId, t3.taskId, t3.maxTaskExpiry, t3.taskCnt, t4.taskRunCnt, t4.maxTaskRunExpiryTime FROM (
 SELECT workflowId, taskId, MAX(expiryTime) AS maxTaskExpiry, count(*) AS taskCnt FROM Tasks WHERE
 
 ----WHERE creationTime > TRUNC(sysdate) - INTERVAL '7' DAY --oracle
 --WHEREpostgresql ----
-- creationTime > current_date - interval '7 days' AND
 
---- sqlserver ----postgresql
 --WHERE creationTime > DATEADD(DAY,-7, GETDATE()) AND
 
-- sqlserver
 
 WHERE -- oracle ----
-- creationTime > TRUNC(sysdate) - INTERVAL '7' DAY AND
 
workflowId IN (
SELECT workflowId FROM Tasks WHERE taskId IN (SELECT taskId FROM TaskRuns WHERE taskRunId IN (SELECT creatorTaskRunId FROM TimeSeries WHERE timeSeriesType IN (2,3))))=0))
---- or only already amalgamated workflowIds, e.g. : ----
--SELECT workflowid FROM WorkflowFiles WHERE workflowId='Import_Amalgamate'
)
 GROUP BY taskId, workflowId
 ) t3 LEFT JOIN (
 SELECT workflowId, t.taskId, count(tr.taskRunId) AS taskRunCnt, max(tr.expiryTime) AS maxTaskRunExpiryTime
 FROM Tasks t, TaskRuns tr
 WHERE t.taskId=tr.taskId
 GROUP BY t.taskId, workflowId
 ) t4 ON t3.taskId=t4.taskId AND t3.workflowId=t4.workflowId
) t1 LEFT JOIN (
 SELECT t.taskId, workflowId, count(1) AS timeSeriesCnt, MAX(ts.expiryTime) AS maxTimeSeriesExpirytime
 FROM Tasks t, TaskRuns tr, TimeSeries ts
 WHERE t.taskId=tr.taskId AND ts.creatorTaskRunId=tr.taskRunId
 GROUP BY workflowId, t.taskId
) t2 ON t1.taskId=t2.taskId AND t1.workflowId=t2.workflowId
ORDER BY t1.workflowId, t1.taskId