"""
Main workhorse for indexing (computing addresses) the database.
"""
import logging
import time

import psycopg2.extras

from nominatim.indexer.progress import ProgressLogger
from nominatim.indexer import runners
from nominatim.db.async_connection import DBConnection, WorkerPool
from nominatim.db.connection import connect

LOG = logging.getLogger()
18 """ Asynchronous connection that fetches place details for processing.
    def __init__(self, dsn, setup_conn):
        self.wait_time = 0
        self.current_ids = None
        self.conn = DBConnection(dsn, cursor_factory=psycopg2.extras.DictCursor)

        with setup_conn.cursor() as cur:
            # The hstore OIDs need to be fetched manually because
            # register_hstore cannot fetch them on the asynchronous
            # connection below.
            hstore_oid = cur.scalar("SELECT 'hstore'::regtype::oid")
            hstore_array_oid = cur.scalar("SELECT 'hstore[]'::regtype::oid")

        psycopg2.extras.register_hstore(self.conn.conn, oid=hstore_oid,
                                        array_oid=hstore_array_oid)
35 """ Close the underlying asynchronous connection.
    def fetch_next_batch(self, cur, runner):
        """ Send a request for the next batch of places.
            If details for the places are required, they will be fetched
            asynchronously.

            Returns true if there is still data available.
        """
        ids = cur.fetchmany(100)

        if not ids:
            self.current_ids = None
            return False

        if hasattr(runner, 'get_place_details'):
            # Details arrive asynchronously; the empty list tells
            # get_batch() that the results still have to be collected.
            runner.get_place_details(self.conn, ids)
            self.current_ids = []
        else:
            self.current_ids = ids

        return True
64 """ Get the next batch of data, previously requested with
67 if self.current_ids is not None and not self.current_ids:
70 self.wait_time += time.time() - tstart
71 self.current_ids = self.conn.cursor.fetchall()
73 return self.current_ids
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.conn.wait()
        self.close()
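
# A minimal usage sketch for PlaceFetcher (illustration only; `dsn`, `conn`,
# `cur` and `runner` are assumed to be set up by the caller, as done in
# Indexer._index() below). The fetcher double-buffers: while one batch is
# being processed, the details for the next one are already being fetched
# on the asynchronous connection:
#
#     with PlaceFetcher(dsn, conn) as fetcher:
#         has_more = fetcher.fetch_next_batch(cur, runner)
#         while has_more:
#             places = fetcher.get_batch()
#             has_more = fetcher.fetch_next_batch(cur, runner)
#             ...  # process `places` while the next batch is in flight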
85 """ Main indexing routine.
    def __init__(self, dsn, tokenizer, num_threads):
        self.dsn = dsn
        self.tokenizer = tokenizer
        self.num_threads = num_threads
    def has_pending(self):
        """ Check if any data still needs indexing.
            This function must only be used after the import has finished.
            Otherwise it will be very expensive.
        """
        with connect(self.dsn) as conn:
            with conn.cursor() as cur:
                cur.execute("SELECT 'a' FROM placex WHERE indexed_status > 0 LIMIT 1")
                return cur.rowcount > 0
    def index_full(self, analyse=True):
        """ Index the complete database. This will first index boundaries
            followed by all other objects. When `analyse` is True, then the
            database will be analysed at the appropriate places to
            ensure that database statistics are updated.
        """
        with connect(self.dsn) as conn:
            conn.autocommit = True

            def _analyze():
                if analyse:
                    with conn.cursor() as cur:
                        cur.execute('ANALYZE')

            self.index_by_rank(0, 4)
            _analyze()

            self.index_boundaries(0, 30)
            _analyze()

            self.index_by_rank(5, 25)
            _analyze()

            self.index_by_rank(26, 30)
            _analyze()

            self.index_postcodes()
            _analyze()
    def index_boundaries(self, minrank, maxrank):
        """ Index only administrative boundaries within the given rank range.
        """
        LOG.warning("Starting indexing boundaries using %s threads",
                    self.num_threads)

        with self.tokenizer.name_analyzer() as analyzer:
            for rank in range(max(minrank, 4), min(maxrank, 26)):
                self._index(runners.BoundaryRunner(rank, analyzer))
    def index_by_rank(self, minrank, maxrank):
        """ Index all entries of placex in the given rank range (inclusive)
            in order of their address rank.

            When rank 30 is requested then also interpolations and
            places with address rank 0 will be indexed.
        """
        maxrank = min(maxrank, 30)
        LOG.warning("Starting indexing rank (%i to %i) using %i threads",
                    minrank, maxrank, self.num_threads)

        with self.tokenizer.name_analyzer() as analyzer:
            for rank in range(max(1, minrank), maxrank):
                self._index(runners.RankRunner(rank, analyzer))

            if maxrank == 30:
                # Rank 30 additionally covers rank-0 objects and the
                # address interpolation lines.
                self._index(runners.RankRunner(0, analyzer))
                self._index(runners.InterpolationRunner(analyzer), 20)
                self._index(runners.RankRunner(30, analyzer), 20)
            else:
                self._index(runners.RankRunner(maxrank, analyzer))
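
    # Resulting rank schedule for two example calls (illustration only):
    #
    #     index_by_rank(5, 25) -> ranks 5 to 24 in the loop, then rank 25
    #     index_by_rank(0, 30) -> ranks 1 to 29 in the loop, then rank 0,
    #                             the interpolation lines and rank 30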
    def index_postcodes(self):
        """ Index the entries of the location_postcode table.
        """
        LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)

        self._index(runners.PostcodeRunner(), 20)
    def update_status_table(self):
        """ Update the status in the status table to 'indexed'.
        """
        with connect(self.dsn) as conn:
            with conn.cursor() as cur:
                cur.execute('UPDATE import_status SET indexed = true')

            conn.commit()
    def _index(self, runner, batch=1):
        """ Index a single rank or table. `runner` describes the SQL to use
            for indexing. `batch` describes the number of objects that
            should be processed with a single SQL statement.
        """
        LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)

        with connect(self.dsn) as conn:
            psycopg2.extras.register_hstore(conn)
            with conn.cursor() as cur:
                total_tuples = cur.scalar(runner.sql_count_objects())
                LOG.debug("Total number of rows: %i", total_tuples)

            conn.commit()

            progress = ProgressLogger(runner.name(), total_tuples)

            if total_tuples > 0:
                with conn.cursor(name='places') as cur:
                    cur.execute(runner.sql_get_objects())

                    with PlaceFetcher(self.dsn, conn) as fetcher:
                        with WorkerPool(self.dsn, self.num_threads) as pool:
                            has_more = fetcher.fetch_next_batch(cur, runner)
                            while has_more:
                                places = fetcher.get_batch()

                                # asynchronously get the next batch
                                has_more = fetcher.fetch_next_batch(cur, runner)

                                # and insert the current batch
                                for idx in range(0, len(places), batch):
                                    part = places[idx:idx + batch]
                                    LOG.debug("Processing places: %s", str(part))
                                    runner.index_places(pool.next_free_worker(), part)
                                    progress.add(len(part))

                            LOG.info("Wait time: fetcher: %.2fs, pool: %.2fs",
                                     fetcher.wait_time, pool.wait_time)

                conn.commit()

        progress.done()
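
# A minimal sketch of how the Indexer is typically driven (illustration
# only; the DSN, the thread count and the tokenizer setup via
# nominatim.tokenizer.factory are assumptions that depend on the local
# Nominatim configuration):
#
#     from nominatim.tokenizer import factory as tokenizer_factory
#
#     tokenizer = tokenizer_factory.get_tokenizer_for_db(config)
#     indexer = Indexer('dbname=nominatim', tokenizer, num_threads=4)
#     indexer.index_full()
#     if not indexer.has_pending():
#         indexer.update_status_table()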