]> git.openstreetmap.org Git - nominatim.git/blob - test/bdd/steps/steps_osm_data.py
e9c8ebe42f70f47aaaf39f24b5234f119acdaf2e
[nominatim.git] / test / bdd / steps / steps_osm_data.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 import tempfile
8 import random
9 import os
10 from pathlib import Path
11
12 from nominatim_db.tools.exec_utils import run_osm2pgsql
13 from nominatim_db.tools.replication import run_osm2pgsql_updates
14
15 from geometry_alias import ALIASES
16
17 def get_osm2pgsql_options(nominatim_env, fname, append):
18     return dict(import_file=fname,
19                 osm2pgsql='osm2pgsql',
20                 osm2pgsql_cache=50,
21                 osm2pgsql_style=str(nominatim_env.get_test_config().get_import_style_file()),
22                 osm2pgsql_style_path=nominatim_env.get_test_config().lib_dir.lua,
23                 threads=1,
24                 dsn=nominatim_env.get_libpq_dsn(),
25                 flatnode_file='',
26                 tablespaces=dict(slim_data='', slim_index='',
27                                  main_data='', main_index=''),
28                 append=append
29                )
30
31
32 def write_opl_file(opl, grid):
33     """ Create a temporary OSM file from OPL and return the file name. It is
34         the responsibility of the caller to delete the file again.
35
36         Node with missing coordinates, can retrieve their coordinates from
37         a supplied grid. Failing that a random coordinate is assigned.
38     """
39     with tempfile.NamedTemporaryFile(suffix='.opl', delete=False) as fd:
40         for line in opl.splitlines():
41             if line.startswith('n') and line.find(' x') < 0:
42                 coord = grid.grid_node(int(line[1:].split(' ')[0]))
43                 if coord is None:
44                     coord = (random.uniform(-180, 180), random.uniform(-90, 90))
45                 line += " x%f y%f" % coord
46             fd.write(line.encode('utf-8'))
47             fd.write(b'\n')
48
49         return fd.name
50
51 @given('the lua style file')
52 def lua_style_file(context):
53     """ Define a custom style file to use for the import.
54     """
55     style = Path(context.nominatim.website_dir.name) / 'custom.lua'
56     style.write_text(context.text)
57     context.nominatim.test_env['NOMINATIM_IMPORT_STYLE'] = str(style)
58
59
60 @given(u'the ([0-9.]+ )?grid(?: with origin (?P<origin>.*))?')
61 def define_node_grid(context, grid_step, origin):
62     """
63     Define a grid of node positions.
64     Use a table to define the grid. The nodes must be integer ids. Optionally
65     you can give the grid distance. The default is 0.00001 degrees.
66     """
67     if grid_step is not None:
68         grid_step = float(grid_step.strip())
69     else:
70         grid_step = 0.00001
71
72     if origin:
73         if ',' in origin:
74             # TODO coordinate
75             coords = origin.split(',')
76             if len(coords) != 2:
77                 raise RuntimeError('Grid origin expects origin with x,y coordinates.')
78             origin = (float(coords[0]), float(coords[1]))
79         elif origin in ALIASES:
80             origin = ALIASES[origin]
81         else:
82             raise RuntimeError('Grid origin must be either coordinate or alias.')
83     else:
84         origin = (0.0, 0.0)
85
86     context.osm.set_grid([context.table.headings] + [list(h) for h in context.table],
87                          grid_step, origin)
88
89
90 @when(u'loading osm data')
91 def load_osm_file(context):
92     """
93     Load the given data into a freshly created test data using osm2pgsql.
94     No further indexing is done.
95
96     The data is expected as attached text in OPL format.
97     """
98     # create an OSM file and import it
99     fname = write_opl_file(context.text, context.osm)
100     try:
101         run_osm2pgsql(get_osm2pgsql_options(context.nominatim, fname, append=False))
102     finally:
103         os.remove(fname)
104
105     ### reintroduce the triggers/indexes we've lost by having osm2pgsql set up place again
106     cur = context.db.cursor()
107     cur.execute("""CREATE TRIGGER place_before_delete BEFORE DELETE ON place
108                     FOR EACH ROW EXECUTE PROCEDURE place_delete()""")
109     cur.execute("""CREATE TRIGGER place_before_insert BEFORE INSERT ON place
110                    FOR EACH ROW EXECUTE PROCEDURE place_insert()""")
111     cur.execute("""CREATE UNIQUE INDEX idx_place_osm_unique on place using btree(osm_id,osm_type,class,type)""")
112     context.db.commit()
113
114
115 @when(u'updating osm data')
116 def update_from_osm_file(context):
117     """
118     Update a database previously populated with 'loading osm data'.
119     Needs to run indexing on the existing data first to yield the correct result.
120
121     The data is expected as attached text in OPL format.
122     """
123     context.nominatim.copy_from_place(context.db)
124     context.nominatim.run_nominatim('index')
125     context.nominatim.run_nominatim('refresh', '--functions')
126
127     # create an OSM file and import it
128     fname = write_opl_file(context.text, context.osm)
129     try:
130         run_osm2pgsql_updates(context.db,
131                               get_osm2pgsql_options(context.nominatim, fname, append=True))
132     finally:
133         os.remove(fname)
134
135 @when('indexing')
136 def index_database(context):
137     """
138     Run the Nominatim indexing step. This will process data previously
139     loaded with 'updating osm data'
140     """
141     context.nominatim.run_nominatim('index')