]> git.openstreetmap.org Git - nominatim.git/blob - test/python/conftest.py
Merge remote-tracking branch 'upstream/master'
[nominatim.git] / test / python / conftest.py
1 import itertools
2 import sys
3 from pathlib import Path
4
5 import psycopg2
6 import psycopg2.extras
7 import pytest
8
9 SRC_DIR = Path(__file__) / '..' / '..' / '..'
10
11 # always test against the source
12 sys.path.insert(0, str(SRC_DIR.resolve()))
13
14 from nominatim.config import Configuration
15 from nominatim.db import connection
16
17 class _TestingCursor(psycopg2.extras.DictCursor):
18     """ Extension to the DictCursor class that provides execution
19         short-cuts that simplify writing assertions.
20     """
21
22     def scalar(self, sql, params=None):
23         """ Execute a query with a single return value and return this value.
24             Raises an assertion when not exactly one row is returned.
25         """
26         self.execute(sql, params)
27         assert self.rowcount == 1
28         return self.fetchone()[0]
29
30     def row_set(self, sql, params=None):
31         """ Execute a query and return the result as a set of tuples.
32         """
33         self.execute(sql, params)
34         if self.rowcount == 1:
35             return set(tuple(self.fetchone()))
36
37         return set((tuple(row) for row in self))
38
39 @pytest.fixture
40 def temp_db(monkeypatch):
41     """ Create an empty database for the test. The database name is also
42         exported into NOMINATIM_DATABASE_DSN.
43     """
44     name = 'test_nominatim_python_unittest'
45     conn = psycopg2.connect(database='postgres')
46
47     conn.set_isolation_level(0)
48     with conn.cursor() as cur:
49         cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
50         cur.execute('CREATE DATABASE {}'.format(name))
51
52     conn.close()
53
54     monkeypatch.setenv('NOMINATIM_DATABASE_DSN' , 'dbname=' + name)
55
56     yield name
57
58     conn = psycopg2.connect(database='postgres')
59
60     conn.set_isolation_level(0)
61     with conn.cursor() as cur:
62         cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
63
64     conn.close()
65
66 @pytest.fixture
67 def temp_db_with_extensions(temp_db):
68     conn = psycopg2.connect(database=temp_db)
69     with conn.cursor() as cur:
70         cur.execute('CREATE EXTENSION hstore; CREATE EXTENSION postgis;')
71     conn.commit()
72     conn.close()
73
74     return temp_db
75
76 @pytest.fixture
77 def temp_db_conn(temp_db):
78     """ Connection to the test database.
79     """
80     conn = connection.connect('dbname=' + temp_db)
81     yield conn
82     conn.close()
83
84
85 @pytest.fixture
86 def temp_db_cursor(temp_db):
87     """ Connection and cursor towards the test database. The connection will
88         be in auto-commit mode.
89     """
90     conn = psycopg2.connect('dbname=' + temp_db)
91     conn.set_isolation_level(0)
92     with conn.cursor(cursor_factory=_TestingCursor) as cur:
93         yield cur
94     conn.close()
95
96
97 @pytest.fixture
98 def def_config():
99     return Configuration(None, SRC_DIR.resolve() / 'settings')
100
101
102 @pytest.fixture
103 def status_table(temp_db_conn):
104     """ Create an empty version of the status table and
105         the status logging table.
106     """
107     with temp_db_conn.cursor() as cur:
108         cur.execute("""CREATE TABLE import_status (
109                            lastimportdate timestamp with time zone NOT NULL,
110                            sequence_id integer,
111                            indexed boolean
112                        )""")
113         cur.execute("""CREATE TABLE import_osmosis_log (
114                            batchend timestamp,
115                            batchseq integer,
116                            batchsize bigint,
117                            starttime timestamp,
118                            endtime timestamp,
119                            event text
120                            )""")
121     temp_db_conn.commit()
122
123
124 @pytest.fixture
125 def place_table(temp_db_with_extensions, temp_db_conn):
126     """ Create an empty version of the place table.
127     """
128     with temp_db_conn.cursor() as cur:
129         cur.execute("""CREATE TABLE place (
130                            osm_id int8 NOT NULL,
131                            osm_type char(1) NOT NULL,
132                            class text NOT NULL,
133                            type text NOT NULL,
134                            name hstore,
135                            admin_level smallint,
136                            address hstore,
137                            extratags hstore,
138                            geometry Geometry(Geometry,4326) NOT NULL)""")
139     temp_db_conn.commit()
140
141
142 @pytest.fixture
143 def place_row(place_table, temp_db_cursor):
144     """ A factory for rows in the place table. The table is created as a
145         prerequisite to the fixture.
146     """
147     idseq = itertools.count(1001)
148     def _insert(osm_type='N', osm_id=None, cls='amenity', typ='cafe', names=None,
149                 admin_level=None, address=None, extratags=None, geom=None):
150         temp_db_cursor.execute("INSERT INTO place VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
151                                (osm_id or next(idseq), osm_type, cls, typ, names,
152                                 admin_level, address, extratags,
153                                 geom or 'SRID=4326;POINT(0 0 )'))
154
155     return _insert