]> git.openstreetmap.org Git - nominatim.git/blob - test/bdd/steps/steps_osm_data.py
make compund decomposition pure import feature
[nominatim.git] / test / bdd / steps / steps_osm_data.py
1 import tempfile
2 import random
3 import os
4 from pathlib import Path
5
6 from nominatim.tools.exec_utils import run_osm2pgsql
7
8 def get_osm2pgsql_options(nominatim_env, fname, append):
9     return dict(import_file=fname,
10                 osm2pgsql=str(nominatim_env.build_dir / 'osm2pgsql' / 'osm2pgsql'),
11                 osm2pgsql_cache=50,
12                 osm2pgsql_style=str(nominatim_env.src_dir / 'settings' / 'import-extratags.style'),
13                 threads=1,
14                 dsn=nominatim_env.get_libpq_dsn(),
15                 flatnode_file='',
16                 tablespaces=dict(slim_data='', slim_index='',
17                                  main_data='', main_index=''),
18                 append=append
19                )
20
21
22 def write_opl_file(opl, grid):
23     """ Create a temporary OSM file from OPL and return the file name. It is
24         the responsibility of the caller to delete the file again.
25
26         Node with missing coordinates, can retrieve their coordinates from
27         a supplied grid. Failing that a random coordinate is assigned.
28     """
29     with tempfile.NamedTemporaryFile(suffix='.opl', delete=False) as fd:
30         for line in opl.splitlines():
31             if line.startswith('n') and line.find(' x') < 0:
32                 coord = grid.grid_node(int(line[1:].split(' ')[0]))
33                 if coord is None:
34                     coord = (random.random() * 360 - 180,
35                              random.random() * 180 - 90)
36                 line += " x%f y%f" % coord
37             fd.write(line.encode('utf-8'))
38             fd.write(b'\n')
39
40         return fd.name
41
42 @given(u'the scene (?P<scene>.+)')
43 def set_default_scene(context, scene):
44     context.scene = scene
45
46 @given(u'the ([0-9.]+ )?grid')
47 def define_node_grid(context, grid_step):
48     """
49     Define a grid of node positions.
50     Use a table to define the grid. The nodes must be integer ids. Optionally
51     you can give the grid distance. The default is 0.00001 degrees.
52     """
53     if grid_step is not None:
54         grid_step = float(grid_step.strip())
55     else:
56         grid_step = 0.00001
57
58     context.osm.set_grid([context.table.headings] + [list(h) for h in context.table],
59                          grid_step)
60
61
62 @when(u'loading osm data')
63 def load_osm_file(context):
64     """
65     Load the given data into a freshly created test data using osm2pgsql.
66     No further indexing is done.
67
68     The data is expected as attached text in OPL format.
69     """
70     # create an OSM file and import it
71     fname = write_opl_file(context.text, context.osm)
72     try:
73         run_osm2pgsql(get_osm2pgsql_options(context.nominatim, fname, append=False))
74     finally:
75         os.remove(fname)
76
77     ### reintroduce the triggers/indexes we've lost by having osm2pgsql set up place again
78     cur = context.db.cursor()
79     cur.execute("""CREATE TRIGGER place_before_delete BEFORE DELETE ON place
80                     FOR EACH ROW EXECUTE PROCEDURE place_delete()""")
81     cur.execute("""CREATE TRIGGER place_before_insert BEFORE INSERT ON place
82                    FOR EACH ROW EXECUTE PROCEDURE place_insert()""")
83     cur.execute("""CREATE UNIQUE INDEX idx_place_osm_unique on place using btree(osm_id,osm_type,class,type)""")
84     context.db.commit()
85
86
87 @when(u'updating osm data')
88 def update_from_osm_file(context):
89     """
90     Update a database previously populated with 'loading osm data'.
91     Needs to run indexing on the existing data first to yield the correct result.
92
93     The data is expected as attached text in OPL format.
94     """
95     context.nominatim.copy_from_place(context.db)
96     context.nominatim.run_nominatim('index')
97     context.nominatim.run_nominatim('refresh', '--functions')
98
99     # create an OSM file and import it
100     fname = write_opl_file(context.text, context.osm)
101     try:
102         run_osm2pgsql(get_osm2pgsql_options(context.nominatim, fname, append=True))
103     finally:
104         os.remove(fname)