1 """
2 Main work horse for indexing (computing addresses) the database.
3 """
import logging
import time

import psycopg2.extras

from nominatim.indexer.progress import ProgressLogger
from nominatim.indexer import runners
from nominatim.db.async_connection import DBConnection, WorkerPool
from nominatim.db.connection import connect

LOG = logging.getLogger()


class PlaceFetcher:
    """ Asynchronous connection that fetches place details for processing.
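
        A minimal usage sketch (assumes an open synchronous connection
        `setup_conn`, a cursor `cur` delivering place ids and a `runner`
        from nominatim.indexer.runners; unlike `Indexer._index` below,
        this simple loop does not overlap fetching with processing):

            with PlaceFetcher(dsn, setup_conn) as fetcher:
                while fetcher.fetch_next_batch(cur, runner):
                    places = fetcher.get_batch()
                    ...  # hand each batch to runner.index_places()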
    """
    def __init__(self, dsn, setup_conn):
        self.wait_time = 0
        self.current_ids = None
        self.conn = DBConnection(dsn, cursor_factory=psycopg2.extras.DictCursor)

        with setup_conn.cursor() as cur:
            # Fetch the hstore type OIDs on the synchronous connection;
            # register_hstore cannot look them up on the asynchronous
            # connection used below.
            hstore_oid = cur.scalar("SELECT 'hstore'::regtype::oid")
            hstore_array_oid = cur.scalar("SELECT 'hstore[]'::regtype::oid")

        psycopg2.extras.register_hstore(self.conn.conn, oid=hstore_oid,
                                        array_oid=hstore_array_oid)

    def close(self):
        """ Close the underlying asynchronous connection.
        """
        if self.conn:
            self.conn.close()
            self.conn = None


    def fetch_next_batch(self, cur, runner):
        """ Send a request for the next batch of places.
            If details for the places are required, they will be fetched
            asynchronously.

            Returns True if there is still data available.
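            The rows themselves are retrieved with a subsequent call to
            `get_batch`.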
        """
        ids = cur.fetchmany(100)

        if not ids:
            self.current_ids = None
            return False

        if hasattr(runner, 'get_place_details'):
            runner.get_place_details(self.conn, ids)
            self.current_ids = []
        else:
            self.current_ids = ids

        return True

    def get_batch(self):
        """ Get the next batch of data, previously requested with
            `fetch_next_batch`.
        """
        if self.current_ids is not None and not self.current_ids:
            tstart = time.time()
            self.conn.wait()
            self.wait_time += time.time() - tstart
            self.current_ids = self.conn.cursor.fetchall()

        return self.current_ids

    def __enter__(self):
        return self


    def __exit__(self, exc_type, exc_value, traceback):
        self.conn.wait()
        self.close()


class Indexer:
    """ Main indexing routine.
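
        A minimal usage sketch (hypothetical DSN; `tokenizer` must provide
        the `name_analyzer()` factory used below):

            indexer = Indexer('dbname=nominatim', tokenizer, num_threads=4)
            indexer.index_full()
            indexer.update_status_table()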
    """

    def __init__(self, dsn, tokenizer, num_threads):
        self.dsn = dsn
        self.tokenizer = tokenizer
        self.num_threads = num_threads


    def index_full(self, analyse=True):
        """ Index the complete database. This will first index boundaries
            followed by all other objects. When `analyse` is True, the
            database will be analysed at the appropriate places to
            ensure that database statistics are updated.
        """
        with connect(self.dsn) as conn:
            conn.autocommit = True

            if analyse:
                def _analyze():
                    with conn.cursor() as cur:
                        cur.execute('ANALYZE')
            else:
                def _analyze():
                    pass

            self.index_by_rank(0, 4)
            _analyze()

            self.index_boundaries(0, 30)
            _analyze()

            self.index_by_rank(5, 25)
            _analyze()

            self.index_by_rank(26, 30)
            _analyze()

            self.index_postcodes()
            _analyze()


    def index_boundaries(self, minrank, maxrank):
        """ Index only administrative boundaries within the given rank range.
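            Only ranks from 4 up to 25 are ever indexed here; the given
            range is clamped accordingly.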
        """
        LOG.warning("Starting indexing boundaries using %s threads",
                    self.num_threads)

        with self.tokenizer.name_analyzer() as analyzer:
            for rank in range(max(minrank, 4), min(maxrank, 26)):
                self._index(runners.BoundaryRunner(rank, analyzer))

    def index_by_rank(self, minrank, maxrank):
        """ Index all entries of placex in the given rank range (inclusive)
            in order of their address rank.

            When rank 30 is requested, interpolations and places with
            address rank 0 will also be indexed.
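
            For example, `index_by_rank(0, 30)` indexes ranks 1 to 29 in
            order and then finishes with rank 0, interpolations and
            rank 30.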
        """
        maxrank = min(maxrank, 30)
        LOG.warning("Starting indexing rank (%i to %i) using %i threads",
                    minrank, maxrank, self.num_threads)

        with self.tokenizer.name_analyzer() as analyzer:
            for rank in range(max(1, minrank), maxrank):
                self._index(runners.RankRunner(rank, analyzer))

            if maxrank == 30:
                self._index(runners.RankRunner(0, analyzer))
                self._index(runners.InterpolationRunner(analyzer), 20)
                self._index(runners.RankRunner(30, analyzer), 20)
            else:
                self._index(runners.RankRunner(maxrank, analyzer))


    def index_postcodes(self):
        """ Index the entries of the location_postcode table.
        """
        LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)

        self._index(runners.PostcodeRunner(), 20)


    def update_status_table(self):
        """ Update the status in the status table to 'indexed'.
        """
        with connect(self.dsn) as conn:
            with conn.cursor() as cur:
                cur.execute('UPDATE import_status SET indexed = true')

            conn.commit()

    def _index(self, runner, batch=1):
        """ Index a single rank or table. `runner` describes the SQL to use
            for indexing. `batch` describes the number of objects that
            should be processed with a single SQL statement.
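
            Runners (see nominatim.indexer.runners) must provide `name()`,
            `sql_count_objects()`, `sql_get_objects()` and `index_places()`;
            they may additionally implement `get_place_details()` to have
            extra place data fetched asynchronously.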
        """
        LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)

        with connect(self.dsn) as conn:
            psycopg2.extras.register_hstore(conn)
            with conn.cursor() as cur:
                total_tuples = cur.scalar(runner.sql_count_objects())
                LOG.debug("Total number of rows: %i", total_tuples)

            conn.commit()

            progress = ProgressLogger(runner.name(), total_tuples)

            if total_tuples > 0:
                with conn.cursor(name='places') as cur:
                    cur.execute(runner.sql_get_objects())

                    with PlaceFetcher(self.dsn, conn) as fetcher:
                        with WorkerPool(self.dsn, self.num_threads) as pool:
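                            # Overlap fetching and indexing: request the
                            # next batch on the asynchronous connection
                            # while the current one is being processed.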
                            has_more = fetcher.fetch_next_batch(cur, runner)
                            while has_more:
                                places = fetcher.get_batch()

                                # asynchronously get the next batch
                                has_more = fetcher.fetch_next_batch(cur, runner)

                                # and index the current batch
                                for idx in range(0, len(places), batch):
                                    part = places[idx:idx+batch]
                                    LOG.debug("Processing places: %s", str(part))
                                    runner.index_places(pool.next_free_worker(), part)
                                    progress.add(len(part))

                            LOG.info("Wait time: fetcher: %.2fs,  pool: %.2fs",
                                     fetcher.wait_time, pool.wait_time)

                conn.commit()

        progress.done()