Here is the material corresponding to the SOBEK 3 scripting tutorial.

 

Code for SOBEK 3 scripting tutorial
#region STEP 1: Import libraries and set data path
from datetime import datetime, time
from Libraries.ChartFunctions import *
from Libraries.Sobek2Functions import *
from Libraries.NetworkFunctions import BranchObjectType
from Libraries.SobekWaterFlowFunctions import *
from Libraries.StandardFunctions import *
import Libraries.MapFunctions as mf
from Libraries.SOBEKTutorialLibrary import *

rootPath = r"D:\Workshop\data"
#endregion
#region STEP 2: Set up flow model
#region STEP 2, OPTION A: Import SOBEK 2.13 model (Zwolle)
model213Path = rootPath + r"\SW_max.lit\2"
hydroModel = ImportSobek2Model(model213Path + r"\NETWORK.TP",False,True,False)
AddToProject(hydroModel)
flowModel = GetModelByName("Flow1D")
# Select location for which measurements are available
dataPoint = GetComputationGridLocationByName(flowModel, "3")
dataPoint.Name = "Gauging Station"
#endregion

#region STEP 2, OPTION B: Import model from multiple source files
#region STEP 2.B.1: Set up model network
network = CreateNetworkFromBranchShapeFile(rootPath + r"\network_Branches.shp")
AddCrossSectionsToNetwork(rootPath + r"\network_Cross Sections.shp", rootPath + r"\CrossSectionProfiles.csv", network)
AddBridgesToNetwork(rootPath + r"\network_Bridges.shp", rootPath + r"\BridgeProfiles.csv", network)
AddBranchObjectsFromShapeFile(BranchObjectType.LateralSource, rootPath + r"\network_Lateral Sources.shp", network)
AddBranchObjectsFromShapeFile(BranchObjectType.Weir , rootPath + r"\network_Weirs.shp", network)
flowModel = WaterFlowModel1D()
flowModel.Network = network
flowModel.Name = "Flow model"
AddToProject(flowModel)
# Show model on map
view = OpenView(flowModel)
# Add satellite image layer 
satLayer = mf.CreateSatelliteImageLayer()
satLayer.ExcludeFromMapExtent = True
map = view.MapView.Map
map.Layers.Add(satLayer)
map.CoordinateSystem = mf.CreateCoordinateSystem(3857) # EPSG code => WGS 84 / Pseudo-Mercator
map.ZoomToExtents()
#endregion
#region STEP 2.B.2: Add Boundary and Initial Conditions
#region Import and assign BC and Laterals
ImportBoundaryConditions(rootPath + r"\boundaryConditions.csv", flowModel)
ImportLateralData(rootPath + r"\laterals.csv", flowModel)
#endregion
#region Roughness
SetDefaultRoughness(flowModel, "Main", RoughnessType.StricklerKs, 30)
sectionFloodPlain = AddNewRoughnessSection(flowModel,"FloodPlain")
crs1 = GetItemByName(network.CrossSections, "prof_SW2815-SW2870_Bo")
minY, maxY = GetMinYMaxYofCrossSectionProfile(crs1)
AddCrossSectionRoughness(flowModel, crs1, minY, maxY, sectionFloodPlain)

crs2 = GetItemByName(network.CrossSections, "prof_D20060515-DP-295")
minY, maxY = GetMinYMaxYofCrossSectionProfile(crs2)
AddCrossSectionRoughness(flowModel, crs2, minY, maxY, sectionFloodPlain)
#endregion
# set initial conditions
SetInitialConditionType(flowModel, InitialConditionType.Depth)
flowModel.DefaultInitialDepth = 3
#endregion
#region STEP 2.B.3: Space and time discretization
# create computational grid with calculation points at every 0.5 m
CreateComputationalGrid(flowModel, gridAtFixedLength = True, fixedLength = 200)
# Select location for which measurements are available
dataPoint = GetComputationGridLocationByName(flowModel, "stadsgrachten Zwolle Midden_0.000")
dataPoint.Name = "Gauging Station"
SetModelTimes(flowModel, datetime(2007, 1, 15, 1, 0, 0), datetime(2007, 1, 17, 1, 0, 0), time(0,10,0,))
#endregion
#endregion
#endregion
#region STEP 3: Run model
RunModel(flowModel, showDialog=True)
#endregion
#region STEP 4: Get Water Level measurements and model results at Gauging Station
measuredTimeSeries = GetTimeSeriesFromCSVFile(rootPath + "\waterLevel_at_GaugingStation.csv", dateTimeFormat ="%d-%b-%y %H:%M:%S")
# Get timeseries at calculation Gauging Station (begin Zwollen center channels)
calculationPoint = GetComputationGridLocationByName(flowModel, "Gauging Station")
waterlevelResults = GetTimeSeriesFromWaterFlowModel(flowModel, calculationPoint, "Water level")
#endregion
#region STEP 5: Plot model results and compare with measurement data
dataSeries = CreatePointSeries(measuredTimeSeries)
lineResults = CreateLineSeries(waterlevelResults)
chart = CreateChart([dataSeries, lineResults])
chart.Name = "Water level at center Zwolle channels" 
# Show the chart
view = OpenView(chart)
#endregion
#region STEP 6: Make chart nicer
lineResults.Title = "Model results"
dataSeries.Title = "Measurements"
dataSeries.Color = Color.Red
dataSeries.Size = 4
dataSeries.LineVisible = True
dataSeries.LineColor = Color.Black
chart.BottomAxis.Title = "Time"
chart.LeftAxis.Title = "Waterlevel [m]"
chart.LeftAxis.Automatic = False
chart.LeftAxis.Maximum = 2
chart.LeftAxis.Minimum = 0
chart.Legend.Visible = True
chart.Legend.Alignment = LegendAlignment.Bottom
chart.Title = "Comparison measurements vs results"
chart.TitleVisible = True
#endregion
#region STEP 7: Main loop calibration
# Save measurements and initial model results in a list of time series to compare 
timeSeriesToCompare = [measuredTimeSeries, waterlevelResults]
# Selection of parameters used for calibration and their range of variation
bcCalibration = GetBoundaryDataByName(flowModel, "Boxbergen")
rangeBC = [8, 10, 12]
roughnessMain = GetItemByName(flowModel.RoughnessSections, "Main").RoughnessNetworkCoverage
rangeRoughness = [24, 26, 28, 30, 32, 34]
# Deviation of current model results with respect to measurements
roughnessInitialValue = roughnessMain.DefaultValue
bcInitialValue = bcCalibration.Flow
warmUpTimeSteps = 23
dev = GetAverageDeviation(measuredTimeSeries, waterlevelResults , startAt = warmUpTimeSteps)
listCalibration = [[bcInitialValue, roughnessInitialValue, dev]]
# Main calibration loop
for bcValue in rangeBC:
    
    # Adjust boundary condition value
    bcCalibration.Flow = bcValue
    
    for roughnessValue in rangeRoughness:
        
        # Adjust default roughness value for roughness section Main     
        roughnessMain.DefaultValue = roughnessValue
        
        RunModel(flowModel, showDialog=True)
        
        # Add deviation of results of current run with respect to measurements
        calibResults = GetTimeSeriesFromWaterFlowModel(flowModel, calculationPoint, "Water level")
        deviation = GetAverageDeviation(measuredTimeSeries, calibResults, startAt = warmUpTimeSteps)
        listCalibration.append([bcValue, roughnessValue, deviation])
        
        # Add timeseries to timeSeriesToCompare list
        timeSeriesToCompare.append(calibResults)
#endregion
#region STEP 8: show calibration results in chart
# create line series for timeseries
lineFirstResults = CreateLineSeries(timeSeriesToCompare[1])
lineFirstResults.Title = "Initial model"
lineFirstResults.ShowInLegend = True
lineFirstResults.PointerVisible = False
lineFirstResults.Color = Color.LightGreen
series = [lineFirstResults]
for timeseries in timeSeriesToCompare[2:]:
    lineResults = CreateLineSeries(timeseries)
    lineResults.ShowInLegend = False
    lineResults.Color = Color.Orange
    lineResults.PointerVisible = False
    series.append(lineResults)
lineResults.ShowInLegend = True
lineResults.Title = "Calibration results"
# create series for measurements
measurementsSeries = CreatePointSeries(measuredTimeSeries)
measurementsSeries.Title = "Measurements"
measurementsSeries.Color = Color.Red
measurementsSeries.Size = 4
measurementsSeries.LineVisible = True
measurementsSeries.LineColor = Color.Black
series.append(measurementsSeries)
chartCalibration = CreateChart(series)
chartCalibration.Name = "Calibration using water level at center Zwolle channels"
chartCalibration.Title = "Comparison measurements vs results"
chartCalibration.TitleVisible = True
chartCalibration.Legend.Visible = True
chartCalibration.BottomAxis.Title = "Time"
chartCalibration.LeftAxis.Title = "Waterlevel [m]"
chartCalibration.LeftAxis.Automatic = False
chartCalibration.LeftAxis.Maximum = 2
chartCalibration.LeftAxis.Minimum = -0.5
# Show the chart
viewCalibration = OpenView(chartCalibration)
#endregion
#region STEP 9: calculate goodness and select best fit
indexBestFit = -1
bcBestFit = 1e100
roughnessBestFit = 1e100
devBestFit = 1e100
for i in range(len(listCalibration)): # case 0 is data so don't use it.
    case = listCalibration[i]
    if case[2] < devBestFit:
        indexBestFit = i
        bcBestFit = case[0]
        roughnessBestFit = case[1]
        devBestFit = case[2]

print "selected bc: " + str(bcBestFit) + "m3/s"
print "selected roughness " + str(roughnessBestFit) + "m^(1/3) s^-1"
print indexBestFit
print listCalibration
lineResultsBestFit = CreateLineSeries(timeSeriesToCompare[indexBestFit+1])
lineResultsBestFit.Color = Color.Blue
lineResultsBestFit.Width = 2
lineResultsBestFit.Title = "Best fit"
lineResultsBestFit.ShowInLegend = True
lineResultsBestFit.PointerVisible = False
chartCalibration.Series.Add(lineResultsBestFit)
#endregion
#region STEP 10: export results
ExportListToCsvFile(rootPath + r"\calibration.csv", listCalibration, ["BC value", "Roughness", "Average deviation"], delimiterChar = ",")
ExportListToCsvFile(rootPath + r"\bestFit.csv", timeSeriesToCompare[indexBestFit], ["Time", "Water level at Gauging station"])
ExportListToCsvFile(rootPath + r"\allTimeSeries.csv", timeSeriesToCompare[indexBestFit], ["Time", "Water level at Gauging station"])
values = []
for i in range(len(timeSeriesToCompare[1])):
    tempValues = []
    for j in range(1,len(timeSeriesToCompare)):
        tempValues.append(timeSeriesToCompare[j][i][1])
    values.append(tempValues)
index = 0
for ts in timeSeriesToCompare:
    if index == 0:
        flag = " (initial set up) "
    elif index == indexBestFit:
        flag = " (best fit) "
    else:
        flag = " "
        
    fileName = rootPath + r"\Water Level at Gauging Station case " + str(index) + flag + ".csv"
    ExportListToCsvFile(fileName, ts, ["Time", "Water level at Gauging station"], ",")
    index += 1
chartCalibration.ExportAsImage(rootPath + r"\Calibration.png",1250,650)
#endregion
  • Script corresponding to the library for this tutorial. The folder structure assumed in this script can be different from the one in your computer. So the script should be adapted before being run.

Code for the SOBEK 3 scripting Library
 import csv as _csv
from datetime import datetime
import Libraries.ChartFunctions as _cf 
import Libraries.SobekWaterFlowFunctions as _wff
import Libraries.StandardFunctions as _st
import Libraries.NetworkFunctions as _nf
import Libraries.MapFunctions as _mf
from Libraries.NetworkFunctions import *
from DelftTools.Hydro.Structures import BridgeFrictionType

def CreateNetworkFromBranchShapeFile(branchShapeFile):
	"""Creates a network using the provided shapefile (with line geometries)."""
	# create network
	network = _nf.HydroNetwork(Name = "Network")
	network.CoordinateSystem = _mf.GetShapeFileCoordinateSystem(branchShapeFile)
	
	number = 1
	
	for feature in _mf.GetShapeFileFeatures(branchShapeFile):
		firstCoordinate = feature.Geometry.Coordinates[0]
		lastCoordinate = feature.Geometry.Coordinates[feature.Geometry.Coordinates.Length -1]
		
		# create nodes
		startNode = _nf.HydroNode(Name= feature.Attributes["Source"] ,Geometry = _mf.CreatePointGeometry(firstCoordinate.X, firstCoordinate.Y))
		endNode = _nf.HydroNode(Name= feature.Attributes["Target"], Geometry = _mf.CreatePointGeometry(lastCoordinate.X, lastCoordinate.Y))
		
		# create channel
		channel = _nf.Channel(startNode, endNode, Name = feature.Attributes["Name"])
		channel.Geometry = feature.Geometry
		
		# add channel and nodes to network
		network.Nodes.AddRange([startNode, endNode])
		network.Branches.Add(channel)
		
		number += 1
	_nf.MergeNodesWithSameGeometry(network)
	return network
def _CreateCsvLookup(profileCsvFile):
	lookup = {}
	with open(profileCsvFile) as csvfile:
		lines = _csv.reader(csvfile, delimiter=',') 
		dataRows = None
		name = None
		for line in lines:
			if (len(line) == 0): # empty row
				continue
			if (len(line) == 1): # new definition
				if (name != None and dataRows != None):
					lookup[name] = dataRows # add definition to lookup
					
				name = line[0]
				dataRows = []
				continue
				
			dataRows.append([float(s) for s in line])
			
		# commit last row
		lookup[name] = dataRows
	return lookup
def AddBranchObjectsFromShapeFile(branchObjectType, shapeFileName, network):
	for feature in _mf.GetShapeFileFeatures(shapeFileName):
		name = feature.Attributes ["Name"]
		branchName = feature.Attributes ["Branch"]
		chainage = feature.Attributes ["Chainage"]
		longName = feature.Attributes ["LongName"]
		
		branch = _st.GetItemByName(network.Branches, branchName)
		
		branchObject = _nf.CreateBranchObjectOnBranchUsingChainage(branchObjectType, name, branch, chainage)
		branchObject.LongName = longName
		
		if branchObjectType == _nf.BranchObjectType.Weir:
			branchObject.CrestWidth = feature.Attributes ["CrestWidth"]
			branchObject.CrestLevel = feature.Attributes ["CrestLevel"]
			
def AddCrossSectionsToNetwork(csShapeFile, profileCsvFile, network):
	# get crossSection definitions as lookup (name, values)
	lookup = _CreateCsvLookup(profileCsvFile)
	
	for feature in _mf.GetShapeFileFeatures(csShapeFile):
		# get attributes
		name = feature.Attributes ["Name"]
		branchName = feature.Attributes ["Branch"]
		chainage = feature.Attributes ["Chainage"]
		crossSectionType = feature.Attributes ["CrossSectio"]
		thalweg = feature.Attributes ["Thalweg"]
		definitionName = feature.Attributes ["DefName"]
		
		# search branch
		branch = _st.GetItemByName(network.Branches, branchName)
		
		if (crossSectionType == "ZW"):
			type = _nf.BranchObjectType.CrossSectionZW
		elif (crossSectionType == "YZ"):
			type = _nf.BranchObjectType.CrossSectionYZ
		else :
			continue
	
		crossSection = _nf.CreateBranchObjectOnBranchUsingChainage(type, name, branch, chainage)
		_nf.SetCrossSectionProfile(crossSection, lookup[definitionName], thalweg)
def AddBridgesToNetwork(csShapeFile, profileCsvFile, network):
    # get crossSection definitions as lookup (name, values)
    lookup = _CreateCsvLookup(profileCsvFile)
    
    for feature in _mf.GetShapeFileFeatures(csShapeFile):
        # get attributes
        name = feature.Attributes ["Name"]
        branchName = feature.Attributes ["Branch"]
        chainage = feature.Attributes ["Chainage"]
        length = feature.Attributes ["Length"]
        roughType = feature.Attributes ["RoughType"]
        roughnessValue = feature.Attributes ["Roughness"]
        inletLoss = feature.Attributes ["InLossCoef"]
        outletLoss = feature.Attributes ["OutLossCoef"]
        groundLayerRoughness = feature.Attributes ["GLRoughness"]
        pillarWidth= feature.Attributes ["PillarWidth"]
        shapeFactor= feature.Attributes ["ShapeFactor"]
        longName= feature.Attributes ["LongName"]
        
        
        # search branch
        branch = _st.GetItemByName(network.Branches, branchName)
        
        bridge = _nf.CreateBranchObjectOnBranchUsingChainage(_nf.BranchObjectType.Bridge, name, branch, chainage)
        bridge.Length = length
        bridge.InletLossCoefficient = inletLoss
        bridge.OutletLossCoefficient = outletLoss
        
        """brFrictionTypeDictionary = {"StricklerKs":BridgeFrictionType.StricklerKs, 
                "Chezy":BridgeFrictionType.Chezy,
                "Manning":BridgeFrictionType.Manning,
                "StricklerKn":BridgeFrictionType.StricklerKn,
                "WhiteColebrook":BridgeFrictionType.WhiteColebrook}"""
        
        bridge.FrictionType = _st.ParseToEnum(roughType,BridgeFrictionType)
        bridge.Friction = roughnessValue
        bridge.GroundLayerRoughness = groundLayerRoughness
        bridge.PillarWidth = pillarWidth
        bridge.ShapeFactor = shapeFactor
        bridge.LongName = longName
        if (lookup.has_key(name)):
            bridge.BridgeType = _nf.BridgeType.Tabulated
            bridge.TabulatedCrossSectionDefinition.ZWDataTable.Clear()
            for item in lookup[name]:
                bridge.TabulatedCrossSectionDefinition.ZWDataTable.AddCrossSectionZWRow(item[0], item[1], item[2])   
def ImportBoundaryConditions(path, flowModel):
    bcDictionary = {"H": _wff.BoundaryConditionType.WaterLevelConstant, 
                    "Q": _wff.BoundaryConditionType.FlowConstant,
                    "None": _wff.BoundaryConditionType.NoBoundary}
    with open(path) as csvfile:
            lines = _csv.reader(csvfile,delimiter = ",")
            for line in lines:
            	bcName = line[0]
            	bcType = bcDictionary[line[1]] 
            	bcValue = float(line[2])
            	_wff.SetBoundaryCondition(flowModel, bcName, bcType, bcValue) 
def ImportLateralData(path, flowModel):
    latDictionary = {"FlowConstant": _wff.LateralDataType.FlowConstant}
    with open(path) as csvfile:
            lines = _csv.reader(csvfile,delimiter = ",")
            for line in lines:
            	latName = line[0]
            	latType = latDictionary[line[1]] 
            	latValue = float(line[2])
            	_wff.SetLateralData(flowModel, latName, latType, latValue)
def GetMinYMaxYofCrossSectionProfile(crossSection):
    min = crossSection.Definition.YZDataTable[0].Yq
    max = crossSection.Definition.YZDataTable[crossSection.Definition.YZDataTable.Count-1].Yq
    return min, max
def CreateOutputChart(flowModel, outputName, locationNames, colors):
	"""Creates an output chart for the provided locations."""
	chart = _cf.CreateChart([])
	
	index = 0
	for locationName in locationNames:
		location  = _wff.GetComputationGridLocationByName(flowModel, locationName)
		timeSeries  = _wff.GetTimeSeriesFromWaterFlowModel(flowModel, location, outputName)
		
		lineSeries = _cf.CreateLineSeries(timeSeries)
		
		lineSeries.Title = locationName
		lineSeries.Color = colors[index]
		lineSeries.Width = 3
		lineSeries.PointerVisible = False
		lineSeries.Transparency = 50
		
		chart.Series.Add(lineSeries)
		index += 1
	
			
	chart.Name = outputName
	chart.TitleVisible = True
	chart.Title = "Compare " + outputName + " zwolle"
	chart.Legend.Visible = True
	
	chart.BottomAxis.Title = "Time"
	chart.LeftAxis.Title = outputName
	
	_st.OpenView(chart)
	
	return chart
	
def OpenModelMapViewWithBackground(model):
	# Show model on map
	view = _st.OpenView(model)
	
	# Add satellite image layer 
	satLayer = _mf.CreateSatelliteImageLayer()
	satLayer.ExcludeFromMapExtent = True
	
	map = view.MapView.Map
	map.Layers.Add(satLayer)
	map.CoordinateSystem = _mf.CreateCoordinateSystem(3857) # EPSG code => WGS 84 / Pseudo-Mercator
	map.ZoomToExtents()
def _GetInterpolatedValue(timeLeft, valueLeft, timeRight, valueRight, timeToSearch):
	perc = (timeToSearch - timeLeft).total_seconds() / (timeRight - timeLeft).total_seconds()
	return ((valueRight - valueLeft) * perc) + valueLeft
#region definition
def GetAverageDeviation(ts1, ts2, startAt = 0):
	ts1Lookup = sorted(ts1, key=lambda tup: tup[0]) # measurements
	ts2Lookup = sorted(ts2, key=lambda tup: tup[0]) # calculated values
	
	ts1Min = ts1Lookup[0][0]
	ts1Max = ts1Lookup[-1][0]
	
	ts2Min = ts2Lookup[0][0]
	ts2Max = ts2Lookup[-1][0]
	if (ts1Min < ts2Min or ts1Max > ts2Max):
		raise Exception("Measurements out of range")
	
	totalSum = 0
	minValueIndex = 0
	
	for timeValueList in ts1Lookup[startAt:]:
		timeMeasurement = timeValueList[0]
		valueMeasurement = timeValueList[1]
				
		for i in range(minValueIndex,len(ts2Lookup)):
			timeCalc = ts2Lookup[i][0]
			valueCalc = ts2Lookup[i][1]
			
			
			if (timeCalc >= timeMeasurement):
				minValueIndex = i -1
				timeleft = ts2Lookup[i-1][0]
				valueleft = ts2Lookup[i-1][1]
				totalSum += abs(valueMeasurement - _GetInterpolatedValue(timeleft, valueleft, timeCalc, valueCalc, timeMeasurement))
				break
	return totalSum/ (len(ts1Lookup[startAt:])) # average deviation 
#endregion



 

 

 

  • No labels