Skip to content

Process API

Road Network

clear_network_cache(cache_dir=None, older_than_days=None)

Clear cached road networks.

Parameters:

Name Type Description Default
cache_dir str

Cache directory to clear. If None, uses default.

None
older_than_days int

Only clear networks older than this many days.

None
Source code in landlensdb/process/road_network.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
def clear_network_cache(cache_dir=None, older_than_days=None):
    """Clear cached road networks.

    Args:
        cache_dir (str, optional): Cache directory to clear. If None, uses default.
        older_than_days (int, optional): Only clear networks older than this many days.
    """
    if cache_dir is None:
        cache_dir = create_network_cache_dir()

    if not os.path.exists(cache_dir):
        return

    for filename in os.listdir(cache_dir):
        if not filename.endswith('.gpkg'):
            continue

        filepath = os.path.join(cache_dir, filename)
        if older_than_days is not None:
            file_age = (datetime.now() - datetime.fromtimestamp(os.path.getmtime(filepath))).days
            if file_age <= older_than_days:
                continue

        try:
            os.remove(filepath)
        except Exception as e:
            warnings.warn(f"Failed to remove cached network {filename}: {str(e)}")

create_network_cache_dir()

Create a cache directory for storing downloaded road networks.

Returns:

Name Type Description
str

Path to the cache directory

Source code in landlensdb/process/road_network.py
186
187
188
189
190
191
192
193
194
def create_network_cache_dir():
    """Create a cache directory for storing downloaded road networks.

    Returns:
        str: Path to the cache directory
    """
    cache_dir = os.path.join(os.path.expanduser("~"), ".landlensdb", "network_cache")
    os.makedirs(cache_dir, exist_ok=True)
    return cache_dir

get_osm_lines(bbox, network_type='drive', cache_dir=None, retries=3)

Get road network from OpenStreetMap for a given bounding box.

Parameters:

Name Type Description Default
bbox list

Bounding box coordinates [minx, miny, maxx, maxy]. Can be in any CRS.

required
network_type str

Type of network to fetch. Defaults to 'drive' Options are: {"all", "all_public", "bike", "drive", "drive_service", "walk"}.

'drive'
cache_dir str

Directory to cache downloaded networks. Defaults to None.

None
retries int

Number of times to retry fetching network. Defaults to 3.

3

Returns:

Name Type Description
GeoDataFrame

Road network as a GeoDataFrame.

Raises:

Type Description
ConnectionError

If network cannot be fetched after retries.

ValueError

If bbox coordinates are invalid.

Source code in landlensdb/process/road_network.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
def get_osm_lines(bbox, network_type='drive', cache_dir=None, retries=3):
    """Get road network from OpenStreetMap for a given bounding box.

    Args:
        bbox (list): Bounding box coordinates [minx, miny, maxx, maxy].
            Can be in any CRS.
        network_type (str, optional): Type of network to fetch.
            Defaults to 'drive' Options are: {"all", "all_public", "bike", "drive", "drive_service", "walk"}.
        cache_dir (str, optional): Directory to cache downloaded networks.
            Defaults to None.
        retries (int, optional): Number of times to retry fetching network.
            Defaults to 3.

    Returns:
        GeoDataFrame: Road network as a GeoDataFrame.

    Raises:
        ConnectionError: If network cannot be fetched after retries.
        ValueError: If bbox coordinates are invalid.
    """
    # Input validation
    if len(bbox) != 4:
        raise ValueError("bbox must contain exactly 4 coordinates")

    # Convert bbox to GeoDataFrame to handle CRS conversion
    bbox_geom = box(bbox[0], bbox[1], bbox[2], bbox[3])
    bbox_gdf = gpd.GeoDataFrame(geometry=[bbox_geom], crs="EPSG:4326")

    # Get bounds in WGS84
    bbox_wgs84 = bbox_gdf.geometry.total_bounds

    # Validate coordinates are within valid ranges
    if (bbox_wgs84[0] < -180 or bbox_wgs84[2] > 180 or
        bbox_wgs84[1] < -90 or bbox_wgs84[3] > 90):
        raise ValueError(
            "Invalid coordinates. Longitude must be between -180 and 180, "
            "latitude between -90 and 90"
        )

    # Create bbox tuple for OSMnx (left, bottom, right, top)
    west, south = min(bbox_wgs84[0], bbox_wgs84[2]), min(bbox_wgs84[1], bbox_wgs84[3])
    east, north = max(bbox_wgs84[0], bbox_wgs84[2]), max(bbox_wgs84[1], bbox_wgs84[3])
    bbox_tuple = (west, south, east, north)

    # Set up cache directory
    if cache_dir:
        os.makedirs(cache_dir, exist_ok=True)
        ox.settings.cache_folder = cache_dir

    # Try to fetch network with retries
    for attempt in range(retries):
        try:
            # Pass bbox as a tuple
            graph = ox.graph_from_bbox(
                bbox=bbox_tuple,
                network_type=network_type,
                truncate_by_edge=True
            )
            network = ox.graph_to_gdfs(graph, nodes=False)
            return network
        except Exception as e:
            if attempt == retries - 1:
                msg = (
                    f"Failed to fetch OSM network after {retries} attempts: {str(e)}"
                )
                raise ConnectionError(msg)
            print(f"Attempt {attempt + 1} failed, retrying in 1 second...")
            time.sleep(1)

    return None

optimize_network_for_snapping(network, simplify=True, remove_isolated=True)

Optimize road network for efficient snapping operations.

Parameters:

Name Type Description Default
network GeoDataFrame

The road network to optimize

required
simplify bool

Whether to simplify geometries

True
remove_isolated bool

Whether to remove isolated segments

True

Returns:

Name Type Description
GeoDataFrame

Optimized network

Source code in landlensdb/process/road_network.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
def optimize_network_for_snapping(network, simplify=True, remove_isolated=True):
    """Optimize road network for efficient snapping operations.

    Args:
        network (GeoDataFrame): The road network to optimize
        simplify (bool): Whether to simplify geometries
        remove_isolated (bool): Whether to remove isolated segments

    Returns:
        GeoDataFrame: Optimized network
    """
    if network is None or network.empty:
        return network

    # Work on a copy
    network = network.copy()

    # Ensure proper CRS
    if network.crs is None:
        network.set_crs(epsg=4326, inplace=True)

    # Simplify geometries while preserving topology
    if simplify:
        network.geometry = network.geometry.simplify(tolerance=1e-5)

    # Remove duplicate geometries
    network = network.drop_duplicates(subset='geometry')

    # Remove isolated segments if requested
    if remove_isolated:
        # Find connected components
        G = nx.Graph()
        for idx, row in network.iterrows():
            coords = list(row.geometry.coords)
            for i in range(len(coords)-1):
                G.add_edge(coords[i], coords[i+1])

        # Keep only largest component
        largest_cc = max(nx.connected_components(G), key=len)
        network = network[network.geometry.apply(
            lambda g: any(c in largest_cc for c in g.coords)
        )]

    # Create spatial index
    network.sindex

    return network

validate_network_topology(network)

Validate and repair road network topology.

Parameters:

Name Type Description Default
network GeoDataFrame

Road network to validate

required

Returns:

Name Type Description
GeoDataFrame

Validated and repaired network

dict

Report of validation results

Source code in landlensdb/process/road_network.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
def validate_network_topology(network):
    """Validate and repair road network topology.

    Args:
        network (GeoDataFrame): Road network to validate

    Returns:
        GeoDataFrame: Validated and repaired network
        dict: Report of validation results
    """
    if network is None or network.empty:
        return network, {'status': 'empty'}

    report = {
        'original_size': len(network),
        'issues': [],
        'repairs': []
    }

    # Check for invalid geometries
    invalid_mask = ~network.geometry.is_valid
    if invalid_mask.any():
        report['issues'].append(f"Found {invalid_mask.sum()} invalid geometries")
        network.geometry = network.geometry.buffer(0)
        report['repairs'].append("Applied buffer(0) to fix invalid geometries")

    # Check for duplicate geometries
    duplicates = network.geometry.duplicated()
    if duplicates.any():
        report['issues'].append(f"Found {duplicates.sum()} duplicate geometries")
        network = network[~duplicates]
        report['repairs'].append("Removed duplicate geometries")

    # Check for null geometries
    null_geoms = network.geometry.isna()
    if null_geoms.any():
        report['issues'].append(f"Found {null_geoms.sum()} null geometries")
        network = network[~null_geoms]
        report['repairs'].append("Removed null geometries")

    # Check connectivity
    G = nx.Graph()
    for idx, row in network.iterrows():
        coords = list(row.geometry.coords)
        for i in range(len(coords)-1):
            G.add_edge(coords[i], coords[i+1])

    components = list(nx.connected_components(G))
    if len(components) > 1:
        report['issues'].append(f"Found {len(components)} disconnected components")
        report['repairs'].append("Consider using optimize_network_for_snapping() to clean")

    report['final_size'] = len(network)
    return network, report

Snap

_calculate_bearing(point1, point2)

Calculate the bearing between two points.

Parameters:

Name Type Description Default
point1 Point

Starting point.

required
point2 Point

Ending point.

required

Returns:

Name Type Description
float

The bearing in degrees.

Source code in landlensdb/process/snap.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def _calculate_bearing(point1, point2):
    """Calculate the bearing between two points.

    Args:
        point1 (shapely.geometry.Point): Starting point.
        point2 (shapely.geometry.Point): Ending point.

    Returns:
        float: The bearing in degrees.
    """
    lon1, lat1 = math.radians(point1.x), math.radians(point1.y)
    lon2, lat2 = math.radians(point2.x), math.radians(point2.y)
    dlon = lon2 - lon1
    x = math.atan2(
        math.sin(dlon) * math.cos(lat2),
        math.cos(lat1) * math.sin(lat2)
        - math.sin(lat1) * math.cos(lat2) * math.cos(dlon),
    )
    bearing = (math.degrees(x) + 360) % 360
    return bearing

_create_spatial_index(network)

Create a spatial index for the given network using Rtree.

Parameters:

Name Type Description Default
network GeoDataFrame

A GeoDataFrame containing the network lines.

required

Returns:

Type Description

rtree.index.Index: A spatial index for efficient spatial querying.

Source code in landlensdb/process/snap.py
19
20
21
22
23
24
25
26
27
28
29
30
31
def _create_spatial_index(network):
    """Create a spatial index for the given network using Rtree.

    Args:
        network (GeoDataFrame): A GeoDataFrame containing the network lines.

    Returns:
        rtree.index.Index: A spatial index for efficient spatial querying.
    """
    idx = index.Index()
    for i, line in enumerate(network):
        idx.insert(i, line.bounds)
    return idx

_get_nearest_segment(point, network, idx)

Find the nearest segment to a given point in a spatially indexed network.

Parameters:

Name Type Description Default
point Point

The point to find the nearest segment to.

required
network GeoDataFrame

A GeoDataFrame containing the network lines.

required
idx Index

The spatial index of the network.

required

Returns:

Type Description

shapely.geometry.LineString: The nearest line segment to the point.

Source code in landlensdb/process/snap.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def _get_nearest_segment(point, network, idx):
    """Find the nearest segment to a given point in a spatially indexed network.

    Args:
        point (shapely.geometry.Point): The point to find the nearest segment to.
        network (GeoDataFrame): A GeoDataFrame containing the network lines.
        idx (rtree.index.Index): The spatial index of the network.

    Returns:
        shapely.geometry.LineString: The nearest line segment to the point.
    """
    nearest_lines = list(idx.nearest(point.bounds, 1))
    nearest = min(
        [
            (line, point.distance(line))
            for line in [network.iloc[i] for i in nearest_lines]
        ],
        key=lambda x: x[1],
    )[0]
    return nearest

align_compass_with_road(points, network)

Aligns the compass angle of points with the bearing of the nearest road segment.

Parameters:

Name Type Description Default
points GeoDataFrame

A GeoDataFrame containing points with compass angles.

required
network GeoDataFrame

A GeoDataFrame containing the road network.

required

Returns:

Name Type Description
GeoDataFrame

A GeoDataFrame with updated snapped_angle field.

Source code in landlensdb/process/snap.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
def align_compass_with_road(points, network):
    """Aligns the compass angle of points with the bearing of the nearest road segment.

    Args:
        points (GeoDataFrame): A GeoDataFrame containing points with compass angles.
        network (GeoDataFrame): A GeoDataFrame containing the road network.

    Returns:
        GeoDataFrame: A GeoDataFrame with updated snapped_angle field.
    """
    if points["snapped_geometry"].isnull().any():
        warnings.warn(
            """
            GeodataImageFrame contains rows with empty snapped_geometry. Non-snapped images will be skipped.
            To snap all images, try increasing the threshold or changing the road network.
            """
        )
    idx = _create_spatial_index(network.geometry)
    for row_idx, point in points.iterrows():
        if point.snapped_geometry is None:
            continue
        nearest_segment = _get_nearest_segment(
            point.snapped_geometry, network.geometry, idx
        )
        segment_coords = nearest_segment.coords[:]
        segment_bearing = _calculate_bearing(
            Point(segment_coords[0]), Point(segment_coords[1])
        )

        difference_0 = abs(segment_bearing - point.compass_angle)
        difference_180 = abs((segment_bearing + 180) % 360 - point.compass_angle)

        if difference_0 < difference_180:
            points.at[row_idx, "snapped_angle"] = segment_bearing
        else:
            points.at[row_idx, "snapped_angle"] = (segment_bearing + 180) % 360
    return points

create_bbox(point, x_distance_meters, y_distance_meters)

Create a bounding box around a point.

Parameters:

Name Type Description Default
point Point

The center point for the bounding box.

required
x_distance_meters float

The horizontal distance in meters.

required
y_distance_meters float

The vertical distance in meters.

required

Returns:

Name Type Description
list

Bounding box coordinates [minx, miny, maxx, maxy] in EPSG:4326.

Raises:

Type Description
ValueError

If the input is not a Shapely Point.

Source code in landlensdb/process/snap.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def create_bbox(point, x_distance_meters, y_distance_meters):
    """Create a bounding box around a point.

    Args:
        point (shapely.geometry.Point): The center point for the bounding box.
        x_distance_meters (float): The horizontal distance in meters.
        y_distance_meters (float): The vertical distance in meters.

    Returns:
        list: Bounding box coordinates [minx, miny, maxx, maxy] in EPSG:4326.

    Raises:
        ValueError: If the input is not a Shapely Point.
    """
    if not isinstance(point, Point):
        raise ValueError("Input must be a Shapely Point.")

    geoseries = gpd.GeoSeries([point], crs="EPSG:4326")
    point_mercator = geoseries.to_crs("EPSG:3857").iloc[0]

    minx = point_mercator.x - x_distance_meters / 2
    maxx = point_mercator.x + x_distance_meters / 2
    miny = point_mercator.y - y_distance_meters / 2
    maxy = point_mercator.y + y_distance_meters / 2

    bbox_geoseries = gpd.GeoSeries(
        [Point(coord) for coord in [(minx, miny), (maxx, maxy)]], crs="EPSG:3857"
    )
    transformed_coords = bbox_geoseries.to_crs("EPSG:4326").tolist()

    bbox = [
        transformed_coords[0].x,
        transformed_coords[0].y,
        transformed_coords[1].x,
        transformed_coords[1].y,
    ]

    return bbox

snap_to_road_network(gif, tolerance, network=None, bbox=None, network_type='all_private', realign_camera=True, cache_dir=None)

Enhanced function to snap points to road network with automatic network fetching.

Parameters:

Name Type Description Default
gif GeoDataFrame

A GeoDataFrame containing images and their geometries

required
tolerance float

The snapping distance in meters

required
network GeoDataFrame

Existing road network to use

None
bbox list

Bounding box to fetch network for if none provided

None
network_type str

Type of network to fetch

'all_private'
realign_camera bool

Whether to realign camera angles

True
cache_dir str

Directory to cache downloaded networks

None

Returns:

Name Type Description
GeoDataFrame

A GeoDataFrame with updated snapped geometries

Source code in landlensdb/process/snap.py
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
def snap_to_road_network(gif, tolerance, network=None, bbox=None, network_type="all_private", 
                        realign_camera=True, cache_dir=None):
    """Enhanced function to snap points to road network with automatic network fetching.

    Args:
        gif (GeoDataFrame): A GeoDataFrame containing images and their geometries
        tolerance (float): The snapping distance in meters
        network (GeoDataFrame, optional): Existing road network to use
        bbox (list, optional): Bounding box to fetch network for if none provided
        network_type (str, optional): Type of network to fetch
        realign_camera (bool, optional): Whether to realign camera angles
        cache_dir (str, optional): Directory to cache downloaded networks

    Returns:
        GeoDataFrame: A GeoDataFrame with updated snapped geometries
    """
    if network is None and bbox is None:
        bbox = gif.geometry.total_bounds

    if network is None:
        if cache_dir is None:
            cache_dir = create_network_cache_dir()
        network = get_osm_lines(bbox, network_type=network_type, cache_dir=cache_dir)

    # Optimize network for snapping
    network = optimize_network_for_snapping(network)

    # Validate network topology
    network, report = validate_network_topology(network)
    if report['issues']:
        warnings.warn(f"Network validation found issues: {report['issues']}")

    points = gif[["image_url", "geometry"]].copy()
    points = points.to_crs(3857)

    if network is None:
        raise Exception(
            "Network is missing. Please supply road network or set use_osm to True"
        )
    if "LineString" not in network.geom_type.values:
        raise Exception(
            "Invalid geometry. Network geodataframe geometry must contain LineString geometries"
        )

    lines = network.to_crs(3857)

    bbox_series = points.bounds + [-tolerance, -tolerance, tolerance, tolerance]
    hits = bbox_series.apply(lambda row: list(lines.sindex.intersection(row)), axis=1)

    tmp = pd.DataFrame(
        {
            "pt_idx": np.repeat(hits.index, hits.apply(len)),
            "line_i": np.concatenate(hits.values),
        }
    )
    tmp = tmp.join(lines.reset_index(drop=True), on="line_i")
    tmp = tmp.join(points.geometry.rename("point"), on="pt_idx")
    tmp = gpd.GeoDataFrame(tmp, geometry="geometry", crs=points.crs)

    tmp["snap_dist"] = tmp.geometry.distance(gpd.GeoSeries(tmp.point))
    tmp = tmp.loc[tmp.snap_dist <= tolerance]
    tmp = tmp.sort_values(by=["snap_dist"])

    closest = tmp.groupby("pt_idx").first()
    closest = gpd.GeoDataFrame(closest, geometry="geometry")

    pos = closest.geometry.project(gpd.GeoSeries(closest.point))
    new_pts = closest.geometry.interpolate(pos)

    new_pts.crs = "EPSG:3857"
    new_pts = new_pts.to_crs(4326)
    gif["snapped_geometry"] = new_pts.geometry

    missing = gif[gif["snapped_geometry"].isnull()].image_url.tolist()
    if len(missing) > 0:
        warnings.warn(
            f"""
        Not all images were snapped. Non-snapped images will not be added 
        to the snapped image table. To snap all images, try increasing the threshold 
        or changing the road network. The following images could not be snapped 
        to a road network: {missing}
        """
        )

    if realign_camera:
        if "compass_angle" not in gif.columns:
            warnings.warn(
                f"realign_camera requires the compass_angle field. Cannot calculate snapped camera angle."
            )
        else:
            if "snapped_angle" not in gif.columns:
                gif["snapped_angle"] = np.nan
            gif = align_compass_with_road(gif, network)

    return gif