"""
Main work horse for indexing (computing addresses) the database.
"""
import logging
import time

import psycopg2.extras

from nominatim.indexer.progress import ProgressLogger
from nominatim.indexer import runners
from nominatim.db.async_connection import DBConnection, WorkerPool
from nominatim.db.connection import connect

LOG = logging.getLogger()


class PlaceFetcher:
    """ Asynchronous connection that fetches place details for processing.
    """
    def __init__(self, dsn, setup_conn):
        self.wait_time = 0
        self.current_ids = None
        self.conn = DBConnection(dsn, cursor_factory=psycopg2.extras.DictCursor)

        with setup_conn.cursor() as cur:
            # need to fetch those manually because register_hstore cannot
            # fetch them on an asynchronous connection below.
            hstore_oid = cur.scalar("SELECT 'hstore'::regtype::oid")
            hstore_array_oid = cur.scalar("SELECT 'hstore[]'::regtype::oid")

        psycopg2.extras.register_hstore(self.conn.conn, oid=hstore_oid,
                                        array_oid=hstore_array_oid)

    def close(self):
        """ Close the underlying asynchronous connection.
        """
        if self.conn:
            self.conn.close()
            self.conn = None

    def fetch_next_batch(self, cur, runner):
        """ Send a request for the next batch of places.
            If details for the places are required, they will be fetched
            asynchronously.

            Returns true if there is still data available.
        """
        ids = cur.fetchmany(100)

        if not ids:
            self.current_ids = None
            return False

        if hasattr(runner, 'get_place_details'):
            runner.get_place_details(self.conn, ids)
            self.current_ids = []
        else:
            self.current_ids = ids

        return True
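
    # Note on the state kept in `self.current_ids`: None means the cursor is
    # exhausted, an empty list means a detail query is still running on the
    # asynchronous connection, and a non-empty list is a batch that is ready
    # for processing by get_batch() below.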

    def get_batch(self):
        """ Get the next batch of data, previously requested with
            `fetch_next_batch`.
        """
        if self.current_ids is not None and not self.current_ids:
            tstart = time.time()
            self.conn.wait()
            self.wait_time += time.time() - tstart
            self.current_ids = self.conn.cursor.fetchall()

        return self.current_ids

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.conn.wait()
        self.close()
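

# A minimal usage sketch for PlaceFetcher, assuming a libpq DSN string `dsn`,
# an open synchronous connection `conn` and a runner from
# nominatim.indexer.runners (Indexer._index below is the real call site):
#
#     with conn.cursor(name='places') as cur:
#         cur.execute(runner.sql_get_objects())
#         with PlaceFetcher(dsn, conn) as fetcher:
#             while fetcher.fetch_next_batch(cur, runner):
#                 for place in fetcher.get_batch():
#                     ...  # hand the place over for indexing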


class Indexer:
    """ Main indexing routine.
    """

    def __init__(self, dsn, tokenizer, num_threads):
        self.dsn = dsn
        self.tokenizer = tokenizer
        self.num_threads = num_threads


    def index_full(self, analyse=True):
        """ Index the complete database. This will first index boundaries
            followed by all other objects. When `analyse` is True, then the
            database will be analysed at the appropriate places to
            ensure that database statistics are updated.
        """
        with connect(self.dsn) as conn:
            conn.autocommit = True

            def _analyze():
                if analyse:
                    with conn.cursor() as cur:
                        cur.execute('ANALYZE')

            self.index_by_rank(0, 4)
            _analyze()
            self.index_boundaries(0, 30)
            _analyze()
            self.index_by_rank(5, 25)
            _analyze()
            self.index_by_rank(26, 30)
            _analyze()
            self.index_postcodes()
            _analyze()
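
    # A hypothetical driver, assuming a libpq DSN and a tokenizer instance
    # obtained from the project's tokenizer setup:
    #
    #     indexer = Indexer('dbname=nominatim', tokenizer, num_threads=4)
    #     indexer.index_full()
    #     indexer.update_status_table()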


    def index_boundaries(self, minrank, maxrank):
        """ Index only administrative boundaries within the given rank range.
        """
        LOG.warning("Starting indexing boundaries using %s threads",
                    self.num_threads)

        with self.tokenizer.name_analyzer() as analyzer:
            for rank in range(max(minrank, 4), min(maxrank, 26)):
                self._index(runners.BoundaryRunner(rank, analyzer))


    def index_by_rank(self, minrank, maxrank):
        """ Index all entries of placex in the given rank range (inclusive)
            in order of their address rank.

            When rank 30 is requested then also interpolations and
            places with address rank 0 will be indexed.
        """
        maxrank = min(maxrank, 30)
        LOG.warning("Starting indexing rank (%i to %i) using %i threads",
                    minrank, maxrank, self.num_threads)

        with self.tokenizer.name_analyzer() as analyzer:
            for rank in range(max(1, minrank), maxrank):
                self._index(runners.RankRunner(rank, analyzer))

            if maxrank == 30:
                self._index(runners.RankRunner(0, analyzer))
                self._index(runners.InterpolationRunner(analyzer), 20)
                self._index(runners.RankRunner(30, analyzer), 20)
            else:
                self._index(runners.RankRunner(maxrank, analyzer))
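
    # Note: the rank loop above deliberately stops before maxrank; the final
    # rank is dispatched separately because, as the docstring says, rank 30
    # additionally pulls in interpolation lines and address-rank-0 places,
    # both indexed with the larger batch size of 20.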


    def index_postcodes(self):
        """ Index the entries of the location_postcode table.
        """
        LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)

        self._index(runners.PostcodeRunner(), 20)


    def update_status_table(self):
        """ Update the status in the status table to 'indexed'.
        """
        with connect(self.dsn) as conn:
            with conn.cursor() as cur:
                cur.execute('UPDATE import_status SET indexed = true')

            conn.commit()


    def _index(self, runner, batch=1):
        """ Index a single rank or table. `runner` describes the SQL to use
            for indexing. `batch` describes the number of objects that
            should be processed with a single SQL statement.
        """
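        # Runner protocol assumed here (see nominatim.indexer.runners):
        # name(), sql_count_objects(), sql_get_objects() and
        # index_places(worker, places) are required; get_place_details()
        # is optional and enables the asynchronous detail fetch in
        # PlaceFetcher above.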
        LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)

        with connect(self.dsn) as conn:
            psycopg2.extras.register_hstore(conn)
            with conn.cursor() as cur:
                total_tuples = cur.scalar(runner.sql_count_objects())
                LOG.debug("Total number of rows: %i", total_tuples)

            progress = ProgressLogger(runner.name(), total_tuples)

            if total_tuples > 0:
                with conn.cursor(name='places') as cur:
                    cur.execute(runner.sql_get_objects())

                    with PlaceFetcher(self.dsn, conn) as fetcher:
                        with WorkerPool(self.dsn, self.num_threads) as pool:
                            has_more = fetcher.fetch_next_batch(cur, runner)
                            while has_more:
                                places = fetcher.get_batch()

                                # asynchronously get the next batch while the
                                # workers process the current one
                                has_more = fetcher.fetch_next_batch(cur, runner)

                                # And insert the current batch
                                for idx in range(0, len(places), batch):
                                    part = places[idx:idx + batch]
                                    LOG.debug("Processing places: %s", str(part))
                                    runner.index_places(pool.next_free_worker(), part)
                                    progress.add(len(part))

                            LOG.info("Wait time: fetcher: %.2fs, pool: %.2fs",
                                     fetcher.wait_time, pool.wait_time)

                conn.commit()

        progress.done()