1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Main work horse for indexing (computing addresses) the database.
10 from typing import cast, List, Any, Optional
16 from ..db.connection import connect, execute_scalar
17 from ..db.query_pool import QueryPool
18 from ..tokenizer.base import AbstractTokenizer
19 from .progress import ProgressLogger
22 LOG = logging.getLogger()
26 """ Main indexing routine.
29 def __init__(self, dsn: str, tokenizer: AbstractTokenizer, num_threads: int):
31 self.tokenizer = tokenizer
32 self.num_threads = num_threads
34 def has_pending(self) -> bool:
35 """ Check if any data still needs indexing.
36 This function must only be used after the import has finished.
37 Otherwise it will be very expensive.
39 with connect(self.dsn) as conn:
40 with conn.cursor() as cur:
41 cur.execute("SELECT 'a' FROM placex WHERE indexed_status > 0 LIMIT 1")
42 return cur.rowcount > 0
    async def index_full(self, analyse: bool = True) -> None:
        """ Index the complete database. This will first index boundaries
            followed by all other objects. When `analyse` is True, then the
            database will be analysed at the appropriate places to
            ensure that database statistics are updated.
        """
        with connect(self.dsn) as conn:
            # ANALYZE cannot run inside a transaction block, so switch the
            # connection to autocommit for the duration of the indexing run.
            conn.autocommit = True

            def _analyze() -> None:
                # NOTE(review): presumably only called when `analyse` is True;
                # the guard appears to be on lines missing from this excerpt.
                with conn.cursor() as cur:
                    cur.execute('ANALYZE')

            # Stage the work: country-level boundaries first, then all
            # boundaries, then placex by rank, finally postcodes. The return
            # value of each stage is the number of rows processed.
            # NOTE(review): the bodies of the following `if` statements are
            # missing from this excerpt — presumably each calls _analyze()
            # after a stage that touched enough rows, and the last one exits
            # an enclosing retry loop. Confirm against the full file.
            if await self.index_by_rank(0, 4) > 0:
            if await self.index_boundaries(0, 30) > 100:
            if await self.index_by_rank(5, 25) > 100:
            if await self.index_by_rank(26, 30) > 1000:
            if await self.index_postcodes() > 100:
            if not self.has_pending():
    async def index_boundaries(self, minrank: int, maxrank: int) -> int:
        """ Index only administrative boundaries within the given rank range.

            Returns the number of objects processed.
        """
        # NOTE(review): lines appear to be missing from this excerpt — the
        # LOG.warning() call below lacks its final argument (presumably
        # self.num_threads) and `total` is never initialised before use.
        LOG.warning("Starting indexing boundaries using %s threads",
        # Administrative boundaries only occur in search ranks 4..25;
        # clamp the requested range accordingly.
        minrank = max(minrank, 4)
        maxrank = min(maxrank, 25)

        # Precompute number of rows to process for all rows
        with connect(self.dsn) as conn:
            # Register hstore so that tag columns can be read as Python dicts.
            hstore_info = psycopg.types.TypeInfo.fetch(conn, "hstore")
            if hstore_info is None:
                raise RuntimeError('Hstore extension is requested but not installed.')
            psycopg.types.hstore.register_hstore(hstore_info)

            with conn.cursor() as cur:
                # NOTE(review): the FROM clause of this query and the
                # (minrank, maxrank) parameter tuple appear to be missing
                # from this excerpt.
                cur = conn.execute(""" SELECT rank_search, count(*)
                                       WHERE rank_search between %s and %s
                                       AND class = 'boundary' and type = 'administrative'
                                       AND indexed_status > 0
                                       GROUP BY rank_search""",
                # Map rank -> number of pending rows, used for progress output.
                total_tuples = {row.rank_search: row.count for row in cur}

        with self.tokenizer.name_analyzer() as analyzer:
            for rank in range(minrank, maxrank + 1):
                total += await self._index(runners.BoundaryRunner(rank, analyzer),
                                           total_tuples=total_tuples.get(rank, 0))
        # NOTE(review): a `return total` statement appears to be missing here.
    async def index_by_rank(self, minrank: int, maxrank: int) -> int:
        """ Index all entries of placex in the given rank range (inclusive)
            in order of their address rank.

            When rank 30 is requested then also interpolations and
            places with address rank 0 will be indexed.

            Returns the number of objects processed.
        """
        # NOTE(review): `total` is never initialised in this excerpt and no
        # `return total` is visible; lines appear to be missing.
        maxrank = min(maxrank, 30)
        LOG.warning("Starting indexing rank (%i to %i) using %i threads",
                    minrank, maxrank, self.num_threads)

        # Precompute number of rows to process for all rows
        with connect(self.dsn) as conn:
            # Register hstore so that tag columns come back as Python dicts.
            hstore_info = psycopg.types.TypeInfo.fetch(conn, "hstore")
            if hstore_info is None:
                raise RuntimeError('Hstore extension is requested but not installed.')
            psycopg.types.hstore.register_hstore(hstore_info)

            with conn.cursor() as cur:
                # NOTE(review): the FROM clause of this query and the
                # (minrank, maxrank) parameter tuple appear to be missing
                # from this excerpt.
                cur = conn.execute(""" SELECT rank_address, count(*)
                                       WHERE rank_address between %s and %s
                                       AND indexed_status > 0
                                       GROUP BY rank_address""",
                total_tuples = {row.rank_address: row.count for row in cur}

        with self.tokenizer.name_analyzer() as analyzer:
            for rank in range(max(1, minrank), maxrank + 1):
                # NOTE(review): the computation of `batch` (per-rank batch
                # size) appears to be on lines missing from this excerpt.
                total += await self._index(runners.RankRunner(rank, analyzer),
                                           batch=batch, total_tuples=total_tuples.get(rank, 0))

            # NOTE(review): per the docstring, the two stages below should run
            # only when rank 30 was requested — the `if maxrank == 30:` guard
            # appears to be missing from this excerpt.
            total += await self._index(runners.RankRunner(0, analyzer))
            total += await self._index(runners.InterpolationRunner(analyzer), batch=20)
156 async def index_postcodes(self) -> int:
157 """Index the entries of the location_postcode table.
159 LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)
161 return await self._index(runners.PostcodeRunner(), batch=20)
163 def update_status_table(self) -> None:
164 """ Update the status in the status table to 'indexed'.
166 with connect(self.dsn) as conn:
167 with conn.cursor() as cur:
168 cur.execute('UPDATE import_status SET indexed = true')
    async def _index(self, runner: runners.Runner, batch: int = 1,
                     total_tuples: Optional[int] = None) -> int:
        """ Index a single rank or table. `runner` describes the SQL to use
            for indexing. `batch` describes the number of objects that
            should be processed with a single SQL statement.

            `total_tuples` may contain the total number of rows to process.
            When not supplied, the value will be computed using the
            appropriate runner function.

            Returns the number of objects processed.
        """
        LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)

        if total_tuples is None:
            total_tuples = self._prepare_indexing(runner)

        progress = ProgressLogger(runner.name(), total_tuples)

        # Producer/consumer pattern: one async connection streams the rows to
        # process through a server-side cursor while a pool of worker
        # connections executes the actual update statements.
        # NOTE(review): lines appear to be missing from this excerpt — the
        # initialisation of `fetcher_time`, `tstart` and `num_places`, the
        # per-batch reset of `params`/`num_places`, and the guard around the
        # final partial-batch flush are not visible. Confirm against the
        # full file before relying on this function.
        async with await psycopg.AsyncConnection.connect(
                self.dsn, row_factory=psycopg.rows.dict_row) as aconn, \
                QueryPool(self.dsn, self.num_threads, autocommit=True) as pool:
            async with aconn.cursor(name='places') as cur:
                query = runner.index_places_query(batch)
                params: List[Any] = []
                async for place in cur.stream(runner.sql_get_objects()):
                    # Accumulate time spent waiting on the fetching cursor.
                    fetcher_time += time.time() - tstart
                    params.extend(runner.index_places_params(place))
                    if num_places >= batch:
                        LOG.debug("Processing places: %s", str(params))
                        await pool.put_query(query, params)
                        progress.add(num_places)
                # Flush the final, possibly partial, batch with a query
                # sized to the remaining number of places.
                await pool.put_query(runner.index_places_query(num_places), params)
            LOG.info("Wait time: fetcher: %.2fs, pool: %.2fs",
                     fetcher_time, pool.wait_time)

        return progress.done()
222 def _prepare_indexing(self, runner: runners.Runner) -> int:
223 with connect(self.dsn) as conn:
224 hstore_info = psycopg.types.TypeInfo.fetch(conn, "hstore")
225 if hstore_info is None:
226 raise RuntimeError('Hstore extension is requested but not installed.')
227 psycopg.types.hstore.register_hstore(hstore_info)
229 total_tuples = execute_scalar(conn, runner.sql_count_objects())
230 LOG.debug("Total number of rows: %i", total_tuples)
231 return cast(int, total_tuples)