]> git.openstreetmap.org Git - nominatim.git/blob - test/bdd/steps/steps_osm_data.py
Merge remote-tracking branch 'upstream/master'
[nominatim.git] / test / bdd / steps / steps_osm_data.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 import tempfile
8 import random
9 import os
10 from pathlib import Path
11
12 from nominatim_db.tools.exec_utils import run_osm2pgsql
13 from nominatim_db.tools.replication import run_osm2pgsql_updates
14
15 from geometry_alias import ALIASES
16
17
18 def get_osm2pgsql_options(nominatim_env, fname, append):
19     return dict(import_file=fname,
20                 osm2pgsql='osm2pgsql',
21                 osm2pgsql_cache=50,
22                 osm2pgsql_style=str(nominatim_env.get_test_config().get_import_style_file()),
23                 osm2pgsql_style_path=nominatim_env.get_test_config().lib_dir.lua,
24                 threads=1,
25                 dsn=nominatim_env.get_libpq_dsn(),
26                 flatnode_file='',
27                 tablespaces=dict(slim_data='', slim_index='',
28                                  main_data='', main_index=''),
29                 append=append)
30
31
32 def write_opl_file(opl, grid):
33     """ Create a temporary OSM file from OPL and return the file name. It is
34         the responsibility of the caller to delete the file again.
35
36         Node with missing coordinates, can retrieve their coordinates from
37         a supplied grid. Failing that a random coordinate is assigned.
38     """
39     with tempfile.NamedTemporaryFile(suffix='.opl', delete=False) as fd:
40         for line in opl.splitlines():
41             if line.startswith('n') and line.find(' x') < 0:
42                 coord = grid.grid_node(int(line[1:].split(' ')[0]))
43                 if coord is None:
44                     coord = (random.uniform(-180, 180), random.uniform(-90, 90))
45                 line += " x%f y%f" % coord
46             fd.write(line.encode('utf-8'))
47             fd.write(b'\n')
48
49         return fd.name
50
51
52 @given('the lua style file')
53 def lua_style_file(context):
54     """ Define a custom style file to use for the import.
55     """
56     style = Path(context.nominatim.website_dir.name) / 'custom.lua'
57     style.write_text(context.text)
58     context.nominatim.test_env['NOMINATIM_IMPORT_STYLE'] = str(style)
59
60
61 @given(u'the ([0-9.]+ )?grid(?: with origin (?P<origin>.*))?')
62 def define_node_grid(context, grid_step, origin):
63     """
64     Define a grid of node positions.
65     Use a table to define the grid. The nodes must be integer ids. Optionally
66     you can give the grid distance. The default is 0.00001 degrees.
67     """
68     if grid_step is not None:
69         grid_step = float(grid_step.strip())
70     else:
71         grid_step = 0.00001
72
73     if origin:
74         if ',' in origin:
75             # TODO coordinate
76             coords = origin.split(',')
77             if len(coords) != 2:
78                 raise RuntimeError('Grid origin expects origin with x,y coordinates.')
79             origin = (float(coords[0]), float(coords[1]))
80         elif origin in ALIASES:
81             origin = ALIASES[origin]
82         else:
83             raise RuntimeError('Grid origin must be either coordinate or alias.')
84     else:
85         origin = (0.0, 0.0)
86
87     context.osm.set_grid([context.table.headings] + [list(h) for h in context.table],
88                          grid_step, origin)
89
90
91 @when(u'loading osm data')
92 def load_osm_file(context):
93     """
94     Load the given data into a freshly created test database using osm2pgsql.
95     No further indexing is done.
96
97     The data is expected as attached text in OPL format.
98     """
99     # create an OSM file and import it
100     fname = write_opl_file(context.text, context.osm)
101     try:
102         run_osm2pgsql(get_osm2pgsql_options(context.nominatim, fname, append=False))
103     finally:
104         os.remove(fname)
105
106     # reintroduce the triggers/indexes we've lost by having osm2pgsql set up place again
107     cur = context.db.cursor()
108     cur.execute("""CREATE TRIGGER place_before_delete BEFORE DELETE ON place
109                     FOR EACH ROW EXECUTE PROCEDURE place_delete()""")
110     cur.execute("""CREATE TRIGGER place_before_insert BEFORE INSERT ON place
111                    FOR EACH ROW EXECUTE PROCEDURE place_insert()""")
112     cur.execute("""CREATE UNIQUE INDEX idx_place_osm_unique ON place
113                    USING btree(osm_id,osm_type,class,type)""")
114     context.db.commit()
115
116
117 @when(u'updating osm data')
118 def update_from_osm_file(context):
119     """
120     Update a database previously populated with 'loading osm data'.
121     Needs to run indexing on the existing data first to yield the correct result.
122
123     The data is expected as attached text in OPL format.
124     """
125     context.nominatim.copy_from_place(context.db)
126     context.nominatim.run_nominatim('index')
127     context.nominatim.run_nominatim('refresh', '--functions')
128
129     # create an OSM file and import it
130     fname = write_opl_file(context.text, context.osm)
131     try:
132         run_osm2pgsql_updates(context.db,
133                               get_osm2pgsql_options(context.nominatim, fname, append=True))
134     finally:
135         os.remove(fname)
136
137
138 @when('indexing')
139 def index_database(context):
140     """
141     Run the Nominatim indexing step. This will process data previously
142     loaded with 'updating osm data'
143     """
144     context.nominatim.run_nominatim('index')