
Use multiple cores when running ensemble members in a loop

Function:

runInLoopParallelProcessorCount sets the number of cores available to Delft-FEWS when running ensemble workflows

Module Name:

runInLoopParallelProcessorCount

Where to Use?

global properties file

Why to Use?

To speed up ensemble runs on multi-core machines

Description:

The runInLoopParallelProcessorCount entry in the global properties file indicates the number of cores Delft-FEWS may use when running ensemble members in a loop.

Preconditions:

2009-02 release; multi-core CPU or multi-CPU computer

Outcome(s):

speed-up of the computations

Screendump(s):

link to attached screendump(s) for displays only

Remark(s):

The speedup that may be obtained is highly dependent on the type of module you are running

Available since:

DelftFEWS200902


Overview

Delft-FEWS can split ensemble workflows (those that have the runInLoop element set to true) over multiple cores. Based on the number of available cores, a set of queues is created, one for each core. When the activity runs, the ensemble members are distributed over these queues. An example of a workflow that can use this feature is shown below:

Code Block
xml
xml

	<activity>
		<runIndependent>true</runIndependent>
		<moduleInstanceId>MOGREPS_Spatial_Interpolation</moduleInstanceId>
		<ensemble>
			<ensembleId>MOGREPS</ensembleId>
			<runInLoop>true</runInLoop>
		</ensemble>
	</activity>
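
The queueing described above can be pictured with a small Python sketch. It is illustrative only: the page does not say how Delft-FEWS assigns members to queues, so the round-robin order, the function name `assign_members_to_queues` and the integer member indices are all assumptions.

```python
from typing import List, Sequence


def assign_members_to_queues(members: Sequence[int], processor_count: int) -> List[List[int]]:
    """Spread ensemble members over one queue per core.

    Round-robin assignment is an assumption made for this sketch,
    not the documented Delft-FEWS behaviour.
    """
    if processor_count < 1:
        raise ValueError("processor_count must be at least 1")
    # One (initially empty) queue for each available core.
    queues: List[List[int]] = [[] for _ in range(processor_count)]
    for position, member in enumerate(members):
        queues[position % processor_count].append(member)
    return queues


if __name__ == "__main__":
    # Five hypothetical ensemble members spread over two cores.
    for core, queue in enumerate(assign_members_to_queues(range(5), 2)):
        print(f"queue {core}: members {queue}")
```

With one queue per core, the longest queue bounds the run time, so the gain levels off once the number of cores approaches the number of ensemble members.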

...

| Module | Remarks |
|---|---|
| Transformation (old) | Test ok |
| Interpolation | Test ok. Interpolation via DLL not tested |
| TransformationModule (new) | Test ok. Interpolation via DLL not tested |
| pcrTransformation | Test ok |
| General Adapter | Test ok |

Sample input and output (with explanation of all options)

An example that runs three import modules in parallel and then processes the imported data in sequence:

Code Block
xml
xml

<parallel>
  <activity>
    <runIndependent>true</runIndependent>
    <moduleInstanceId>Import1</moduleInstanceId>
  </activity>
  <activity>
    <runIndependent>true</runIndependent>
    <moduleInstanceId>Import2</moduleInstanceId>
  </activity>
  <activity>
    <runIndependent>true</runIndependent>
    <moduleInstanceId>Import3</moduleInstanceId>
  </activity>
</parallel>
<activity>
  <runIndependent>true</runIndependent>
  <moduleInstanceId>CombineImports</moduleInstanceId>
</activity>
<activity>
  <runIndependent>true</runIndependent>
  <moduleInstanceId>SpatialInterpolation</moduleInstanceId>
</activity>
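
The execution order implied by this workflow can be sketched in Python as follows. This is an analogy under stated assumptions, not how Delft-FEWS is implemented: the `run_workflow` function and the `run_module` callback are hypothetical, and a thread pool merely stands in for the parallel block.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List


def run_workflow(run_module: Callable[[str], str]) -> List[str]:
    """Run the three imports concurrently, then the remaining modules in order."""
    parallel_ids = ["Import1", "Import2", "Import3"]
    sequential_ids = ["CombineImports", "SpatialInterpolation"]
    finished: List[str] = []

    # Stand-in for the <parallel> block: all imports may run at the same time.
    # Leaving the with-block waits until every import has completed.
    with ThreadPoolExecutor(max_workers=len(parallel_ids)) as pool:
        finished.extend(pool.map(run_module, parallel_ids))

    # The activities after the parallel block run one after another.
    for module_id in sequential_ids:
        finished.append(run_module(module_id))
    return finished


if __name__ == "__main__":
    print(run_workflow(lambda module_id: module_id))
```

The point of the sketch is the barrier: CombineImports starts only after all three imports have finished.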

...

The workflow shows:

Code Block
xml
xml

<activity>
  <runIndependent>false</runIndependent>
  <moduleInstanceId>Sobek_Prep</moduleInstanceId>
  <ensemble>
    <ensembleId>EPS</ensembleId>
    <runInLoop>true</runInLoop>
  </ensemble>
</activity>
<activity>
  <runIndependent>false</runIndependent>
  <moduleInstanceId>Sobek_GA</moduleInstanceId>
  <ensemble>
    <ensembleId>EPS</ensembleId>
    <runInLoop>true</runInLoop>
  </ensemble>
</activity>
<activity>
  <runIndependent>true</runIndependent>
  <moduleInstanceId>Sobek_Post</moduleInstanceId>
</activity>

...

The General Adapter configuration of Sobek_GA should look like this:

Code Block
xml
xml

<general>
  <rootDir>%TEMP_DIR%</rootDir>
  <workDir>%ROOT_DIR%/work</workDir>
  ...
</general>
<activities>
  <exportActivities>

    <exportDataSetActivity>
      <moduleInstanceId>Sobek_GA</moduleInstanceId>
    </exportDataSetActivity>
  </exportActivities>

  <executeActivities>
    <executeActivity>
      <command>
        <className>nl.wldelft.fews.adapter.sobek.PreSobekModelAdapter</className>
      </command>
      <arguments>
        <argument>%ROOT_DIR%</argument>
        <argument>Config/sobekConfig.xml</argument>
      </arguments>
      <timeOut>60000</timeOut>
      <waitForOtherRun>true</waitForOtherRun>
      <overrulingDiagnosticFile>%ROOT_DIR%/diagnostics/presobekmodeladapter.xml</overrulingDiagnosticFile>
    </executeActivity>
    <executeActivity>
      <command>
        <executable>%ROOT_DIR%/bin/sobeksim.exe</executable>
      </command>
      <arguments>
        <argument>%ROOT_DIR%/bin/sobeksim.fnm</argument>
      </arguments>
      <timeOut>600000</timeOut>
      <ignoreDiagnostics>true</ignoreDiagnostics>
    </executeActivity>
    <executeActivity>
      <command>
        <className>nl.wldelft.fews.adapter.sobek.PostSobekModelAdapter</className>
      </command>
      <arguments>
        <argument>%ROOT_DIR%</argument>
        <argument>Config/sobekConfig.xml</argument>
      </arguments>
      <timeOut>60000</timeOut>
      <overrulingDiagnosticFile>%ROOT_DIR%/diagnostics/postsobekmodeladapter.xml</overrulingDiagnosticFile>
    </executeActivity>
  </executeActivities>
  <importActivities>
    ...

  </importActivities>
</activities>

...

For this example, the following entries should be defined in global.properties:

No Format

runInLoopParallelProcessorCount=2
tempDir=d:/temp
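
As a quick sanity check outside Delft-FEWS, the requested core count can be compared with what the machine offers. The sketch below is a hypothetical helper, not part of Delft-FEWS; the function name `read_processor_count` and the fallback value of 1 when the key is absent are assumptions of this example.

```python
import os


def read_processor_count(properties_text: str, default: int = 1) -> int:
    """Return runInLoopParallelProcessorCount from key=value properties text.

    The default of 1 is an assumption of this sketch, not documented behaviour.
    """
    for raw_line in properties_text.splitlines():
        line = raw_line.strip()
        # Skip blank lines, comments and lines without a key=value pair.
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        if key.strip() == "runInLoopParallelProcessorCount":
            return int(value.strip())
    return default


if __name__ == "__main__":
    text = "runInLoopParallelProcessorCount=2\ntempDir=d:/temp\n"
    requested = read_processor_count(text)
    available = os.cpu_count() or 1
    if requested > available:
        print(f"Requested {requested} cores, but only {available} are available")
    else:
        print(f"Requested {requested} of {available} available cores")
```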

...

A configuration example:

Code Block
xml
xml

<errorModelSet>
  <inputVariable variableId="observation">
    <timeSeriesSet>
      <moduleInstanceId>Import</moduleInstanceId>
      <valueType>scalar</valueType>
      <parameterId>Q.obs</parameterId>
      <locationId>NA_Mastenbroek</locationId>
      <timeSeriesType>external historical</timeSeriesType>
      <timeStep unit="hour"/>
      <relativeViewPeriod unit="hour" start="-96" end="0" startOverrulable="true" endOverrulable="false"/>
      <readWriteMode>read only</readWriteMode>
      <ensembleId>main</ensembleId>
    </timeSeriesSet>
  </inputVariable>
  <inputVariable variableId="update_run">
    <timeSeriesSet>
      <moduleInstanceId>Sobek_Update</moduleInstanceId>
      <valueType>scalar</valueType>
      <parameterId>Q.sim.hist</parameterId>
      <locationId>NA_Mastenbroek</locationId>
      <timeSeriesType>simulated historical</timeSeriesType>
      <timeStep unit="hour"/>
      <relativeViewPeriod unit="hour" start="-96" end="0" startOverrulable="true" endOverrulable="false"/>
      <readWriteMode>read only</readWriteMode>
      <ensembleId>main</ensembleId>
    </timeSeriesSet>
  </inputVariable>
  <inputVariable variableId="forecast_run">
    <timeSeriesSet>
      <moduleInstanceId>Sobek_Forecast</moduleInstanceId>
      <valueType>scalar</valueType>
      <parameterId>Q.sim.for</parameterId>
      <locationId>NA_Mastenbroek</locationId>
      <timeSeriesType>simulated forecasting</timeSeriesType>
      <timeStep unit="hour"/>
      <relativeViewPeriod unit="hour" start="-96" end="120" startOverrulable="true" endOverrulable="true"/>
      <readWriteMode>read only</readWriteMode>
      <ensembleId>EPS</ensembleId>
    </timeSeriesSet>
  </inputVariable>
  <autoOrderMethod>
    <orderSelection>true</orderSelection>
    <order_ar>3</order_ar>
    <order_ma>1</order_ma>
    <subtractMean>true</subtractMean>
    <boxcoxTransformation>false</boxcoxTransformation>
    <lambda>0</lambda>
    <observedTimeSeriesId>observation</observedTimeSeriesId>
    <simulatedTimeSeriesId>forecast_run</simulatedTimeSeriesId>
    <simulatedTimeSeriesId>update_run</simulatedTimeSeriesId>
    <outputTimeSeriesId>corrected</outputTimeSeriesId>
  </autoOrderMethod>
  <interpolationOptions>
    <interpolationType>linear</interpolationType>
    <gapLength>6</gapLength>
  </interpolationOptions>
  <minResult>0</minResult>
  <ignoreDoubtful>true</ignoreDoubtful>
  <outputVariable variableId="corrected">
    <timeSeriesSet>
      <moduleInstanceId>Sobek_Forecast</moduleInstanceId>
      <valueType>scalar</valueType>
      <parameterId>Q.updated.for</parameterId>
      <locationId>NA_Mastenbroek</locationId>
      <timeSeriesType>simulated forecasting</timeSeriesType>
      <timeStep unit="hour"/>
      <relativeViewPeriod unit="hour" start="-96" end="120" startOverrulable="true" endOverrulable="true"/>
      <readWriteMode>add originals</readWriteMode>
      <ensembleId>EPS</ensembleId>
    </timeSeriesSet>
  </outputVariable>
</errorModelSet>

...