3 from pathlib import Path
9 SRC_DIR = Path(__file__) / '..' / '..' / '..'
11 # always test against the source
12 sys.path.insert(0, str(SRC_DIR.resolve()))
14 from nominatim.config import Configuration
15 from nominatim.db import connection
17 class _TestingCursor(psycopg2.extras.DictCursor):
18 """ Extension to the DictCursor class that provides execution
19 short-cuts that simplify writing assertions.
22 def scalar(self, sql, params=None):
23 """ Execute a query with a single return value and return this value.
24 Raises an assertion when not exactly one row is returned.
26 self.execute(sql, params)
27 assert self.rowcount == 1
28 return self.fetchone()[0]
30 def row_set(self, sql, params=None):
31 """ Execute a query and return the result as a set of tuples.
33 self.execute(sql, params)
34 if self.rowcount == 1:
35 return set(tuple(self.fetchone()))
37 return set((tuple(row) for row in self))
39 def table_exists(self, table):
40 """ Check that a table with the given name exists in the database.
42 num = self.scalar("""SELECT count(*) FROM pg_tables
43 WHERE tablename = %s""", (table, ))
46 def table_rows(self, table):
47 """ Return the number of rows in the given table.
49 return self.scalar('SELECT count(*) FROM ' + table)
53 def temp_db(monkeypatch):
54 """ Create an empty database for the test. The database name is also
55 exported into NOMINATIM_DATABASE_DSN.
57 name = 'test_nominatim_python_unittest'
58 conn = psycopg2.connect(database='postgres')
60 conn.set_isolation_level(0)
61 with conn.cursor() as cur:
62 cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
63 cur.execute('CREATE DATABASE {}'.format(name))
67 monkeypatch.setenv('NOMINATIM_DATABASE_DSN' , 'dbname=' + name)
71 conn = psycopg2.connect(database='postgres')
73 conn.set_isolation_level(0)
74 with conn.cursor() as cur:
75 cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
82 return 'dbname=' + temp_db
86 def temp_db_with_extensions(temp_db):
87 conn = psycopg2.connect(database=temp_db)
88 with conn.cursor() as cur:
89 cur.execute('CREATE EXTENSION hstore; CREATE EXTENSION postgis;')
96 def temp_db_conn(temp_db):
97 """ Connection to the test database.
99 with connection.connect('dbname=' + temp_db) as conn:
104 def temp_db_cursor(temp_db):
105 """ Connection and cursor towards the test database. The connection will
106 be in auto-commit mode.
108 conn = psycopg2.connect('dbname=' + temp_db)
109 conn.set_isolation_level(0)
110 with conn.cursor(cursor_factory=_TestingCursor) as cur:
116 def table_factory(temp_db_cursor):
117 def mk_table(name, definition='id INT', content=None):
118 temp_db_cursor.execute('CREATE TABLE {} ({})'.format(name, definition))
119 if content is not None:
120 if not isinstance(content, str):
121 content = '),('.join([str(x) for x in content])
122 temp_db_cursor.execute("INSERT INTO {} VALUES ({})".format(name, content))
129 return Configuration(None, SRC_DIR.resolve() / 'settings')
133 return SRC_DIR.resolve()
136 def status_table(temp_db_conn):
137 """ Create an empty version of the status table and
138 the status logging table.
140 with temp_db_conn.cursor() as cur:
141 cur.execute("""CREATE TABLE import_status (
142 lastimportdate timestamp with time zone NOT NULL,
146 cur.execute("""CREATE TABLE import_osmosis_log (
154 temp_db_conn.commit()
158 def place_table(temp_db_with_extensions, temp_db_conn):
159 """ Create an empty version of the place table.
161 with temp_db_conn.cursor() as cur:
162 cur.execute("""CREATE TABLE place (
163 osm_id int8 NOT NULL,
164 osm_type char(1) NOT NULL,
168 admin_level smallint,
171 geometry Geometry(Geometry,4326) NOT NULL)""")
172 temp_db_conn.commit()
176 def place_row(place_table, temp_db_cursor):
177 """ A factory for rows in the place table. The table is created as a
178 prerequisite to the fixture.
180 idseq = itertools.count(1001)
181 def _insert(osm_type='N', osm_id=None, cls='amenity', typ='cafe', names=None,
182 admin_level=None, address=None, extratags=None, geom=None):
183 temp_db_cursor.execute("INSERT INTO place VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
184 (osm_id or next(idseq), osm_type, cls, typ, names,
185 admin_level, address, extratags,
186 geom or 'SRID=4326;POINT(0 0)'))
191 def placex_table(temp_db_with_extensions, temp_db_conn):
192 """ Create an empty version of the place table.
194 with temp_db_conn.cursor() as cur:
195 cur.execute("""CREATE TABLE placex (
197 parent_place_id BIGINT,
198 linked_place_id BIGINT,
200 indexed_date TIMESTAMP,
201 geometry_sector INTEGER,
202 rank_address SMALLINT,
203 rank_search SMALLINT,
205 indexed_status SMALLINT,
211 admin_level smallint,
214 geometry Geometry(Geometry,4326),
216 country_code varchar(2),
219 centroid GEOMETRY(Geometry, 4326))""")
220 temp_db_conn.commit()
224 def osmline_table(temp_db_with_extensions, temp_db_conn):
225 with temp_db_conn.cursor() as cur:
226 cur.execute("""CREATE TABLE location_property_osmline (
229 parent_place_id BIGINT,
230 geometry_sector INTEGER,
231 indexed_date TIMESTAMP,
235 indexed_status SMALLINT,
237 interpolationtype TEXT,
240 country_code VARCHAR(2))""")
241 temp_db_conn.commit()
245 def word_table(temp_db, temp_db_conn):
246 with temp_db_conn.cursor() as cur:
247 cur.execute("""CREATE TABLE word (
253 country_code varchar(2),
254 search_name_count INTEGER,
256 temp_db_conn.commit()
260 def osm2pgsql_options(temp_db):
261 return dict(osm2pgsql='echo',
263 osm2pgsql_style='style.file',
265 dsn='dbname=' + temp_db,
267 tablespaces=dict(slim_data='', slim_index='',
268 main_data='', main_index=''))