"""
Main workhorse for indexing (computing addresses) the database.
"""
import logging
import time

import psycopg2.extras

from nominatim.indexer.progress import ProgressLogger
from nominatim.indexer import runners
from nominatim.db.async_connection import DBConnection, WorkerPool
from nominatim.db.connection import connect

LOG = logging.getLogger()
18 """ Asynchronous connection that fetches place details for processing.
    def __init__(self, dsn, setup_conn):
        self.wait_time = 0
        self.current_ids = None
        self.conn = DBConnection(dsn, cursor_factory=psycopg2.extras.DictCursor)

        with setup_conn.cursor() as cur:
            # The OIDs need to be fetched manually here because
            # register_hstore cannot look them up on the asynchronous
            # connection below.
            hstore_oid = cur.scalar("SELECT 'hstore'::regtype::oid")
            hstore_array_oid = cur.scalar("SELECT 'hstore[]'::regtype::oid")
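
        # Registering hstore makes psycopg2 convert hstore columns to
        # Python dicts automatically.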
        psycopg2.extras.register_hstore(self.conn.conn, oid=hstore_oid,
                                        array_oid=hstore_array_oid)
35 """ Close the underlying asynchronous connection.


    def fetch_next_batch(self, cur, runner):
        """ Send a request for the next batch of places.
            If details for the places are required, they will be fetched
            asynchronously.

            Returns True if there is still data available.
        """
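        # Pull the next chunk of ids from the server-side cursor,
        # 100 rows per round trip.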
        ids = cur.fetchmany(100)

        if not ids:
            self.current_ids = None
            return False

        if hasattr(runner, 'get_place_details'):
            # Details are fetched asynchronously; the empty list marks
            # a request in flight (see get_batch()).
            runner.get_place_details(self.conn, ids)
            self.current_ids = []
        else:
            self.current_ids = ids

        return True
64 """ Get the next batch of data, previously requested with
        if self.current_ids is not None and not self.current_ids:
            # A details request is still in flight. Wait for the
            # asynchronous query to finish and collect the results.
            tstart = time.time()
            self.conn.wait()
            self.wait_time += time.time() - tstart
            self.current_ids = self.conn.cursor.fetchall()

        return self.current_ids

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.conn.wait()
        self.close()
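

# PlaceFetcher acts as a one-slot prefetch buffer: while the worker pool
# indexes the current batch, the details for the next batch are already
# being fetched on the asynchronous connection (see Indexer._index below).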
85 """ Main indexing routine.

    def __init__(self, dsn, tokenizer, num_threads):
        self.dsn = dsn
        self.tokenizer = tokenizer
        self.num_threads = num_threads


    def index_full(self, analyse=True):
        """ Index the complete database. This will first index boundaries
            followed by all other objects. When `analyse` is True, the
            database will be analysed at the appropriate places to
            ensure that database statistics are updated.
        """
        with connect(self.dsn) as conn:
            conn.autocommit = True

            if analyse:
                def _analyze():
                    with conn.cursor() as cur:
                        cur.execute('ANALYZE')
            else:
                def _analyze():
                    pass
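
            # The stages run roughly from large areas down to detailed
            # objects, so address computation can rely on the containing
            # objects having been indexed first.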
            self.index_by_rank(0, 4)
            _analyze()

            self.index_boundaries(0, 30)
            _analyze()

            self.index_by_rank(5, 25)
            _analyze()

            self.index_by_rank(26, 30)
            _analyze()

            self.index_postcodes()
            _analyze()


    def index_boundaries(self, minrank, maxrank):
        """ Index only administrative boundaries within the given rank range.
        """
        LOG.warning("Starting indexing boundaries using %s threads",
                    self.num_threads)

        with self.tokenizer.name_analyzer() as analyzer:
            # Boundaries only occur in ranks 4 to 25, so clamp the range.
            for rank in range(max(minrank, 4), min(maxrank, 26)):
                self._index(runners.BoundaryRunner(rank, analyzer))

    def index_by_rank(self, minrank, maxrank):
        """ Index all entries of placex in the given rank range (inclusive)
            in order of their address rank.

            When rank 30 is requested, interpolations and places with
            address rank 0 will be indexed as well.
        """
        maxrank = min(maxrank, 30)
        LOG.warning("Starting indexing rank (%i to %i) using %i threads",
                    minrank, maxrank, self.num_threads)

        with self.tokenizer.name_analyzer() as analyzer:
            for rank in range(max(1, minrank), maxrank):
                self._index(runners.RankRunner(rank, analyzer))

            if maxrank == 30:
                # Rank 30 also covers objects with address rank 0 and
                # address interpolations, indexed in larger batches of 20.
                self._index(runners.RankRunner(0, analyzer))
                self._index(runners.InterpolationRunner(analyzer), 20)
                self._index(runners.RankRunner(30, analyzer), 20)
            else:
                self._index(runners.RankRunner(maxrank, analyzer))


    def index_postcodes(self):
        """ Index the entries of the location_postcode table.
        """
        LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)

        self._index(runners.PostcodeRunner(), 20)


    def update_status_table(self):
        """ Update the status in the status table to 'indexed'.
        """
        with connect(self.dsn) as conn:
            with conn.cursor() as cur:
                cur.execute('UPDATE import_status SET indexed = true')

            conn.commit()

    def _index(self, runner, batch=1):
        """ Index a single rank or table. `runner` describes the SQL to use
            for indexing. `batch` describes the number of objects that
            should be processed with a single SQL statement.
        """
        LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)

        with connect(self.dsn) as conn:
            psycopg2.extras.register_hstore(conn)
            with conn.cursor() as cur:
                total_tuples = cur.scalar(runner.sql_count_objects())
                LOG.debug("Total number of rows: %i", total_tuples)

            conn.commit()

            progress = ProgressLogger(runner.name(), total_tuples)
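
            # Stream candidate places through a named (server-side) cursor
            # so the full result set is never materialised on the client;
            # details for the next batch are pre-fetched concurrently on a
            # second, asynchronous connection.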
            if total_tuples > 0:
                with conn.cursor(name='places') as cur:
                    cur.execute(runner.sql_get_objects())

                    with PlaceFetcher(self.dsn, conn) as fetcher:
                        with WorkerPool(self.dsn, self.num_threads) as pool:
                            has_more = fetcher.fetch_next_batch(cur, runner)
                            while has_more:
                                places = fetcher.get_batch()

                                # asynchronously request the next batch
                                has_more = fetcher.fetch_next_batch(cur, runner)

                                # and index the current batch
                                for idx in range(0, len(places), batch):
                                    part = places[idx:idx+batch]
                                    LOG.debug("Processing places: %s", str(part))
                                    runner.index_places(pool.next_free_worker(), part)
                                    progress.add(len(part))

                            LOG.info("Wait time: fetcher: %.2fs, pool: %.2fs",
                                     fetcher.wait_time, pool.wait_time)

                        conn.commit()

        progress.done()
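

# A minimal usage sketch (hypothetical driver code: `dsn` and `tokenizer`
# stand in for a libpq connection string and a tokenizer instance obtained
# from the project's tokenizer factory):
#
#     indexer = Indexer(dsn, tokenizer, num_threads=4)
#     indexer.index_full()
#     indexer.update_status_table()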