1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
"""Main work horse for indexing (computing addresses) the database."""
import logging
import time
from typing import cast, List, Any, Optional

import psycopg

from ..db.connection import connect, execute_scalar
from ..db.query_pool import QueryPool
from ..tokenizer.base import AbstractTokenizer
from .progress import ProgressLogger
from . import runners
22 LOG = logging.getLogger()
25 """ Main indexing routine.
def __init__(self, dsn: str, tokenizer: AbstractTokenizer, num_threads: int):
    """ Set up the indexer.

        Parameters:
          dsn: libpq connection string of the database to index.
          tokenizer: tokenizer used to compute name and address tokens.
          num_threads: number of parallel database connections to use.
    """
    # Bug fix: 'dsn' was never saved, but every other method reads
    # self.dsn (e.g. connect(self.dsn)), raising AttributeError.
    self.dsn = dsn
    self.tokenizer = tokenizer
    self.num_threads = num_threads
def has_pending(self) -> bool:
    """ Check if any data still needs indexing.
        This function must only be used after the import has finished.
        Otherwise it will be very expensive.
    """
    # Probe for a single unindexed row; LIMIT 1 keeps the scan cheap
    # once indexing is mostly done.
    with connect(self.dsn) as conn, conn.cursor() as cur:
        cur.execute("SELECT 'a' FROM placex WHERE indexed_status > 0 LIMIT 1")
        return cur.rowcount > 0
async def index_full(self, analyse: bool = True) -> None:
    """ Index the complete database. This will first index boundaries
        followed by all other objects. When `analyse` is True, then the
        database will be analysed at the appropriate places to
        ensure that database statistics are updated.
    """
    with connect(self.dsn) as conn:
        # Each indexing step manages its own transactions; run the
        # outer connection without one.
        conn.autocommit = True

        def _analyze() -> None:
            # Refresh planner statistics between the large indexing
            # phases (presumably only invoked when `analyse` is set —
            # the guard is on lines missing from this view).
            with conn.cursor() as cur:
                cur.execute('ANALYZE')

        # NOTE(review): the bodies of the following branches are on
        # lines missing from this view — presumably each calls
        # _analyze() after substantial work and the final branch
        # handles the nothing-pending case. Confirm against the full
        # file before relying on this block.
        if await self.index_by_rank(0, 4) > 0:

        if await self.index_boundaries(0, 30) > 100:

        if await self.index_by_rank(5, 25) > 100:

        if await self.index_by_rank(26, 30) > 1000:

        if await self.index_postcodes() > 100:

        if not self.has_pending():
async def index_boundaries(self, minrank: int, maxrank: int) -> int:
    """ Index only administrative boundaries within the given rank range.

        Returns the number of objects indexed.
        (NOTE(review): the `total` initialisation, the trailing
        LOG.warning argument, the FROM clause and parameters of the
        count query and the final `return` are on lines missing from
        this view — confirm against the full file.)
    """
    LOG.warning("Starting indexing boundaries using %s threads",
    # Administrative boundaries only exist for search ranks 4..25;
    # clamp the requested range accordingly.
    minrank = max(minrank, 4)
    maxrank = min(maxrank, 25)

    # Precompute number of rows to process for all rows
    with connect(self.dsn) as conn:
        # Register hstore so name/address columns decode into dicts.
        hstore_info = psycopg.types.TypeInfo.fetch(conn, "hstore")
        if hstore_info is None:
            raise RuntimeError('Hstore extension is requested but not installed.')
        psycopg.types.hstore.register_hstore(hstore_info)

        with conn.cursor() as cur:
            cur = conn.execute(""" SELECT rank_search, count(*)
                                   WHERE rank_search between %s and %s
                                   AND class = 'boundary' and type = 'administrative'
                                   AND indexed_status > 0
                                   GROUP BY rank_search""",
            # Map rank -> pending row count for progress reporting.
            total_tuples = {row.rank_search: row.count for row in cur}

    # One indexing pass per rank, lowest (largest area) first.
    with self.tokenizer.name_analyzer() as analyzer:
        for rank in range(minrank, maxrank + 1):
            total += await self._index(runners.BoundaryRunner(rank, analyzer),
                                       total_tuples=total_tuples.get(rank, 0))
async def index_by_rank(self, minrank: int, maxrank: int) -> int:
    """ Index all entries of placex in the given rank range (inclusive)
        in order of their address rank.

        When rank 30 is requested then also interpolations and
        places with address rank 0 will be indexed.

        Returns the number of objects indexed.
        (NOTE(review): the `total` initialisation, the FROM clause and
        parameters of the count query, the per-rank batch-size
        selection and the final `return` are on lines missing from
        this view — confirm against the full file.)
    """
    maxrank = min(maxrank, 30)
    LOG.warning("Starting indexing rank (%i to %i) using %i threads",
                minrank, maxrank, self.num_threads)

    # Precompute number of rows to process for all rows
    with connect(self.dsn) as conn:
        # Register hstore so name/address columns decode into dicts.
        hstore_info = psycopg.types.TypeInfo.fetch(conn, "hstore")
        if hstore_info is None:
            raise RuntimeError('Hstore extension is requested but not installed.')
        psycopg.types.hstore.register_hstore(hstore_info)

        with conn.cursor() as cur:
            cur = conn.execute(""" SELECT rank_address, count(*)
                                   WHERE rank_address between %s and %s
                                   AND indexed_status > 0
                                   GROUP BY rank_address""",
            # Map rank -> pending row count for progress reporting.
            total_tuples = {row.rank_address: row.count for row in cur}

    with self.tokenizer.name_analyzer() as analyzer:
        # Rank 0 places are assigned rank 30 on address level, so the
        # loop starts at 1 at the earliest.
        for rank in range(max(1, minrank), maxrank + 1):
            total += await self._index(runners.RankRunner(rank, analyzer),
                                       batch=batch, total_tuples=total_tuples.get(rank, 0))

        # When rank 30 was requested, also pick up rank-0 places and
        # address interpolation lines.
        total += await self._index(runners.RankRunner(0, analyzer))
        total += await self._index(runners.InterpolationRunner(analyzer), batch=20)
async def index_postcodes(self) -> int:
    """Index the entries of the location_postcode table.

       Returns the number of postcode rows processed.
    """
    LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)

    # Postcodes are cheap per row, so process them in larger batches.
    runner = runners.PostcodeRunner()
    return await self._index(runner, batch=20)
def update_status_table(self) -> None:
    """ Update the status in the status table to 'indexed'.
    """
    with connect(self.dsn) as conn:
        with conn.cursor() as cur:
            cur.execute('UPDATE import_status SET indexed = true')

        # Commit explicitly: the visible code never committed the
        # transaction, so the status update could be rolled back when
        # the connection closes.
        conn.commit()
async def _index(self, runner: runners.Runner, batch: int = 1,
                 total_tuples: Optional[int] = None) -> int:
    """ Index a single rank or table. `runner` describes the SQL to use
        for indexing. `batch` describes the number of objects that
        should be processed with a single SQL statement.

        `total_tuples` may contain the total number of rows to process.
        When not supplied, the value will be computed using the
        appropriate runner function.

        Returns the number of places processed (as reported by the
        progress logger).

        NOTE(review): the initialisation and per-iteration reset of
        `fetcher_time`, `tstart` and `num_places` (and presumably a
        guard around the final flush) are on lines missing from this
        view — confirm against the full file.
    """
    LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)

    if total_tuples is None:
        # Count pending rows ourselves when the caller did not.
        total_tuples = self._prepare_indexing(runner)

    progress = ProgressLogger(runner.name(), total_tuples)

    # One async connection streams the places to index; the pool of
    # worker connections executes the update statements in parallel.
    async with await psycopg.AsyncConnection.connect(
            self.dsn, row_factory=psycopg.rows.dict_row) as aconn,\
            QueryPool(self.dsn, self.num_threads, autocommit=True) as pool:
        # Named (server-side) cursor so the result set is streamed
        # instead of fetched all at once.
        async with aconn.cursor(name='places') as cur:
            query = runner.index_places_query(batch)
            params: List[Any] = []
            async for place in cur.stream(runner.sql_get_objects()):
                # Track time spent waiting on the producer side.
                fetcher_time += time.time() - tstart
                params.extend(runner.index_places_params(place))
                if num_places >= batch:
                    LOG.debug("Processing places: %s", str(params))
                    await pool.put_query(query, params)
                    progress.add(num_places)
            # Flush the final, possibly partial batch.
            await pool.put_query(runner.index_places_query(num_places), params)
        LOG.info("Wait time: fetcher: %.2fs, pool: %.2fs",
                 fetcher_time, pool.wait_time)
    return progress.done()
def _prepare_indexing(self, runner: runners.Runner) -> int:
    """ Register the hstore type on a fresh connection and return the
        number of rows the given runner still has to process.
    """
    with connect(self.dsn) as conn:
        # hstore must be registered so that rows read from placex can
        # be decoded into Python dicts.
        info = psycopg.types.TypeInfo.fetch(conn, "hstore")
        if info is None:
            raise RuntimeError('Hstore extension is requested but not installed.')
        psycopg.types.hstore.register_hstore(info)

        count = execute_scalar(conn, runner.sql_count_objects())
        LOG.debug("Total number of rows: %i", count)

        return cast(int, count)