# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2024 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Main workhorse for indexing (computing addresses) the database.
"""
from typing import cast, List, Any
import logging
import time

import psycopg

from ..db.connection import connect, execute_scalar
from ..db.query_pool import QueryPool
from ..tokenizer.base import AbstractTokenizer
from .progress import ProgressLogger
from . import runners

LOG = logging.getLogger()


class Indexer:
    """ Main indexing routine.
    """

    def __init__(self, dsn: str, tokenizer: AbstractTokenizer, num_threads: int):
        self.dsn = dsn
        self.tokenizer = tokenizer
        self.num_threads = num_threads
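
    # A minimal usage sketch (hypothetical DSN and thread count; a real
    # setup obtains the tokenizer from the project's tokenizer factory):
    #
    #   indexer = Indexer('dbname=nominatim', tokenizer, num_threads=4)
    #   asyncio.run(indexer.index_full())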

    def has_pending(self) -> bool:
        """ Check if any data still needs indexing.
            This function must only be used after the import has finished.
            Otherwise it will be very expensive.
        """
        with connect(self.dsn) as conn:
            with conn.cursor() as cur:
                # Only the existence of a row matters, so select a constant.
                cur.execute("SELECT 'a' FROM placex WHERE indexed_status > 0 LIMIT 1")
                return cur.rowcount > 0

    async def index_full(self, analyse: bool = True) -> None:
        """ Index the complete database. This will first index boundaries
            followed by all other objects. When `analyse` is True, then the
            database will be analysed at the appropriate places to
            ensure that database statistics are updated.
        """
        with connect(self.dsn) as conn:
            conn.autocommit = True

            def _analyze() -> None:
                if analyse:
                    with conn.cursor() as cur:
                        cur.execute('ANALYZE')

            while True:
                if await self.index_by_rank(0, 4) > 0:
                    _analyze()

                if await self.index_boundaries(0, 30) > 100:
                    _analyze()

                if await self.index_by_rank(5, 25) > 100:
                    _analyze()

                if await self.index_by_rank(26, 30) > 1000:
                    _analyze()

                if await self.index_postcodes() > 100:
                    _analyze()

                if not self.has_pending():
                    break
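
    # Note on the staging above: high-level places and boundaries are indexed
    # before the lower ranks that derive their addresses from them, with
    # house-level objects and postcodes last. The loop repeats until nothing
    # is pending, and ANALYZE only runs when a stage changed enough rows to
    # make refreshed planner statistics worthwhile.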

    async def index_boundaries(self, minrank: int, maxrank: int) -> int:
        """ Index only administrative boundaries within the given rank range.
        """
        total = 0
        LOG.warning("Starting indexing boundaries using %s threads",
                    self.num_threads)

        with self.tokenizer.name_analyzer() as analyzer:
            for rank in range(max(minrank, 4), min(maxrank, 26)):
                total += await self._index(runners.BoundaryRunner(rank, analyzer))

        return total

    async def index_by_rank(self, minrank: int, maxrank: int) -> int:
        """ Index all entries of placex in the given rank range (inclusive)
            in order of their address rank.

            When rank 30 is requested then also interpolations and
            places with address rank 0 will be indexed.
        """
        total = 0
        maxrank = min(maxrank, 30)
        LOG.warning("Starting indexing rank (%i to %i) using %i threads",
                    minrank, maxrank, self.num_threads)

        with self.tokenizer.name_analyzer() as analyzer:
            for rank in range(max(1, minrank), maxrank + 1):
                # Batch more of the numerous, cheaply indexed objects per statement.
                if rank >= 30:
                    batch = 20
                elif rank >= 26:
                    batch = 5
                else:
                    batch = 1
                total += await self._index(runners.RankRunner(rank, analyzer), batch)

            if maxrank == 30:
                total += await self._index(runners.RankRunner(0, analyzer))
                total += await self._index(runners.InterpolationRunner(analyzer), 20)

        return total

    async def index_postcodes(self) -> int:
        """ Index the entries of the location_postcode table.
        """
        LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)

        return await self._index(runners.PostcodeRunner(), 20)

    def update_status_table(self) -> None:
        """ Update the status in the status table to 'indexed'.
        """
        with connect(self.dsn) as conn:
            with conn.cursor() as cur:
                cur.execute('UPDATE import_status SET indexed = true')

            conn.commit()

    async def _index(self, runner: runners.Runner, batch: int = 1) -> int:
        """ Index a single rank or table. `runner` describes the SQL to use
            for indexing. `batch` describes the number of objects that
            should be processed with a single SQL statement.
        """
        LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)

        total_tuples = self._prepare_indexing(runner)

        progress = ProgressLogger(runner.name(), total_tuples)

        if total_tuples > 0:
            async with await psycopg.AsyncConnection.connect(
                             self.dsn, row_factory=psycopg.rows.dict_row) as aconn, \
                       QueryPool(self.dsn, self.num_threads, autocommit=True) as pool:
                fetcher_time = 0.0
                tstart = time.time()
                async with aconn.cursor(name='places') as cur:
                    query = runner.index_places_query(batch)
                    params: List[Any] = []
                    num_places = 0
                    async for place in cur.stream(runner.sql_get_objects()):
                        fetcher_time += time.time() - tstart

                        params.extend(runner.index_places_params(place))
                        num_places += 1

                        if num_places >= batch:
                            LOG.debug("Processing places: %s", str(params))
                            await pool.put_query(query, params)
                            progress.add(num_places)
                            params = []
                            num_places = 0

                        tstart = time.time()

                    # Send off the last, partially filled batch.
                    if num_places > 0:
                        await pool.put_query(runner.index_places_query(num_places), params)

            LOG.info("Wait time: fetcher: %.2fs, pool: %.2fs",
                     fetcher_time, pool.wait_time)

        return progress.done()
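
    # For orientation: `_index()` relies only on this subset of the runner
    # interface (the authoritative definitions live in the `runners` module):
    #
    #   runner.name()                    -> label used for logging and progress
    #   runner.sql_count_objects()       -> SQL counting the rows left to index
    #   runner.sql_get_objects()         -> SQL streaming those rows
    #   runner.index_places_query(n)     -> SQL statement updating n places
    #   runner.index_places_params(row)  -> parameter list for one streamed row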

    def _prepare_indexing(self, runner: runners.Runner) -> int:
        with connect(self.dsn) as conn:
            hstore_info = psycopg.types.TypeInfo.fetch(conn, "hstore")
            if hstore_info is None:
                raise RuntimeError('Hstore extension is requested but not installed.')
            psycopg.types.hstore.register_hstore(hstore_info)

            total_tuples = execute_scalar(conn, runner.sql_count_objects())
            LOG.debug("Total number of rows: %i", total_tuples)
        return cast(int, total_tuples)
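
# Note on the hstore registration in _prepare_indexing(): placex keeps name
# and address data in PostgreSQL hstore columns, and registering the type
# with psycopg lets the rows streamed in _index() deliver those columns as
# plain Python dicts (an assumption about the schema; see the SQL in the
# runners module for the columns actually selected).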