def define_processing_area(
*args,
window_rule: str = "intersection",
pixel_size_rule: str = "minimal",
interpolator: str = "nn",
reference_window_input: dict = None,
reference_pixel_size_input: str = None,
) -> list[App]:
"""Given several inputs, this function handles the potential resampling and cropping to same extent.
WARNING: Not fully implemented / tested
Args:
*args: list of raster inputs. Can be str (filepath) or pyotb objects
window_rule: Can be 'intersection', 'union', 'same_as_input', 'specify' (Default value = 'intersection')
pixel_size_rule: Can be 'minimal', 'maximal', 'same_as_input', 'specify' (Default value = 'minimal')
interpolator: Can be 'bco', 'nn', 'linear' (Default value = 'nn')
reference_window_input: Required if window_rule = 'same_as_input' (Default value = None)
reference_pixel_size_input: Required if pixel_size_rule = 'same_as_input' (Default value = None)
Returns:
list of in-memory pyotb objects with all the same resolution, shape and extent
"""
# Flatten all args into one list
inputs = []
for arg in args:
if isinstance(arg, (list, tuple)):
inputs.extend(arg)
else:
inputs.append(arg)
# Getting metadatas of inputs
metadatas = {}
for inp in inputs:
if isinstance(inp, str): # this is for filepaths
metadata = Input(inp).app.GetImageMetaData("out")
elif isinstance(inp, App):
metadata = inp.app.GetImageMetaData(inp.output_param)
else:
raise TypeError(f"Wrong input : {inp}")
metadatas[inp] = metadata
# Get a metadata of an arbitrary image. This is just to compare later with other images
any_metadata = next(iter(metadatas.values()))
# Checking if all images have the same projection
if not all(
metadata["ProjectionRef"] == any_metadata["ProjectionRef"]
for metadata in metadatas.values()
):
logger.warning(
"All images may not have the same CRS, which might cause unpredictable results"
)
# Handling different spatial footprints
# TODO: find possible bug - ImageMetaData is not updated when running an app
# cf https://gitlab.orfeo-toolbox.org/orfeotoolbox/otb/-/issues/2234. Should we use ImageOrigin instead?
if not all(
md["UpperLeftCorner"] == any_metadata["UpperLeftCorner"]
and md["LowerRightCorner"] == any_metadata["LowerRightCorner"]
for md in metadatas.values()
):
# Retrieving the bounding box that will be common for all inputs
if window_rule == "intersection":
# The coordinates depend on the orientation of the axis of projection
if any_metadata["GeoTransform"][1] >= 0:
ulx = max(md["UpperLeftCorner"][0] for md in metadatas.values())
lrx = min(md["LowerRightCorner"][0] for md in metadatas.values())
else:
ulx = min(md["UpperLeftCorner"][0] for md in metadatas.values())
lrx = max(md["LowerRightCorner"][0] for md in metadatas.values())
if any_metadata["GeoTransform"][-1] >= 0:
lry = min(md["LowerRightCorner"][1] for md in metadatas.values())
uly = max(md["UpperLeftCorner"][1] for md in metadatas.values())
else:
lry = max(md["LowerRightCorner"][1] for md in metadatas.values())
uly = min(md["UpperLeftCorner"][1] for md in metadatas.values())
elif window_rule == "same_as_input":
ulx = metadatas[reference_window_input]["UpperLeftCorner"][0]
lrx = metadatas[reference_window_input]["LowerRightCorner"][0]
lry = metadatas[reference_window_input]["LowerRightCorner"][1]
uly = metadatas[reference_window_input]["UpperLeftCorner"][1]
elif window_rule == "specify":
# When the user explicitly specifies the bounding box -> add some arguments in the function
raise NotImplementedError(window_rule)
elif window_rule == "union":
# When the user wants the final bounding box to be the union of all bounding box
# It should replace any 'outside' pixel by some NoData -> add `fillvalue` argument in the function
raise NotImplementedError(window_rule)
else:
raise ValueError(f'Unknown window_rule "{window_rule}"')
# Applying this bounding box to all inputs
bounds = (ulx, uly, lrx, lry)
logger.info(
"Cropping all images to extent Upper Left (%s, %s), Lower Right (%s, %s)",
*bounds,
)
new_inputs = []
for inp in inputs:
try:
params = {
"in": inp,
"mode": "extent",
"mode.extent.unit": "phy",
"mode.extent.ulx": ulx,
"mode.extent.uly": uly,
"mode.extent.lrx": lrx,
"mode.extent.lry": lry,
}
new_input = App("ExtractROI", params, quiet=True)
new_inputs.append(new_input)
# Potentially update the reference inputs for later resampling
if str(inp) == str(reference_pixel_size_input):
# We use comparison of string because calling '=='
# on pyotb objects implicitly calls BandMathX application, which is not desirable
reference_pixel_size_input = new_input
except RuntimeError as err:
raise ValueError(
f"Cannot define the processing area for input {inp}"
) from err
inputs = new_inputs
# Update metadatas
metadatas = {input: input.app.GetImageMetaData("out") for input in inputs}
# Get a metadata of an arbitrary image. This is just to compare later with other images
any_metadata = next(iter(metadatas.values()))
# Handling different pixel sizes
if not all(
md["GeoTransform"][1] == any_metadata["GeoTransform"][1]
and md["GeoTransform"][5] == any_metadata["GeoTransform"][5]
for md in metadatas.values()
):
# Retrieving the pixel size that will be common for all inputs
if pixel_size_rule == "minimal":
# selecting the input with the smallest x pixel size
reference_input = min(
metadatas, key=lambda x: metadatas[x]["GeoTransform"][1]
)
if pixel_size_rule == "maximal":
# selecting the input with the highest x pixel size
reference_input = max(
metadatas, key=lambda x: metadatas[x]["GeoTransform"][1]
)
elif pixel_size_rule == "same_as_input":
reference_input = reference_pixel_size_input
elif pixel_size_rule == "specify":
# When the user explicitly specify the pixel size -> add argument inside the function
raise NotImplementedError(pixel_size_rule)
else:
raise ValueError(f'Unknown pixel_size_rule "{pixel_size_rule}"')
pixel_size = metadatas[reference_input]["GeoTransform"][1]
# Perform resampling on inputs that do not comply with the target pixel size
logger.info("Resampling all inputs to resolution: %s", pixel_size)
new_inputs = []
for inp in inputs:
if metadatas[inp]["GeoTransform"][1] != pixel_size:
superimposed = App(
"Superimpose",
inr=reference_input,
inm=inp,
interpolator=interpolator,
)
new_inputs.append(superimposed)
else:
new_inputs.append(inp)
inputs = new_inputs
metadatas = {inp: inp.app.GetImageMetaData("out") for inp in inputs}
# Final superimposition to be sure to have the exact same image sizes
image_sizes = {}
for inp in inputs:
if isinstance(inp, str):
inp = Input(inp)
image_sizes[inp] = inp.shape[:2]
# Selecting the most frequent image size. It will be used as reference.
most_common_image_size, _ = Counter(image_sizes.values()).most_common(1)[0]
same_size_images = [
inp
for inp, image_size in image_sizes.items()
if image_size == most_common_image_size
]
# Superimposition for images that do not have the same size as the others
new_inputs = []
for inp in inputs:
if image_sizes[inp] != most_common_image_size:
superimposed = App(
"Superimpose",
inr=same_size_images[0],
inm=inp,
interpolator=interpolator,
)
new_inputs.append(superimposed)
else:
new_inputs.append(inp)
return new_inputs