]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/nominatim.py
Merge remote-tracking branch 'upstream/master'
[nominatim.git] / nominatim / nominatim.py
1 #! /usr/bin/env python3
2 #-----------------------------------------------------------------------------
3 # nominatim - Nominatim database indexer
4 #-----------------------------------------------------------------------------
5 #
6 # Indexing tool for the Nominatim database.
7 #
8 # Based on C version by Brian Quinion
9 #
10 # This program is free software; you can redistribute it and/or
11 # modify it under the terms of the GNU General Public License
12 # as published by the Free Software Foundation; either version 2
13 # of the License, or (at your option) any later version.
14 #
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18 # GNU General Public License for more details.
19 #
20 # You should have received a copy of the GNU General Public License
21 # along with this program; if not, write to the Free Software
22 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
23 #-----------------------------------------------------------------------------
24
25 from argparse import ArgumentParser, RawDescriptionHelpFormatter, ArgumentTypeError
26 import logging
27 import sys
28 import re
29 import getpass
30 from datetime import datetime
31 import psycopg2
32 from psycopg2.extras import wait_select
33 import select
34
35 log = logging.getLogger()
36
def make_connection(options, asynchronous=False):
    """ Open a psycopg2 connection using the settings found in `options`.

        `options` must provide the attributes dbname, user, password, host
        and port. When `asynchronous` is true the connection is opened in
        psycopg2's asynchronous mode (passed through as `async_`).
    """
    connect_args = dict(dbname=options.dbname,
                        user=options.user,
                        password=options.password,
                        host=options.host,
                        port=options.port)
    return psycopg2.connect(async_=asynchronous, **connect_args)
41
42
class RankRunner(object):
    """ Supplies the SQL statements used to index one rank of placex.

        Every query is restricted to rows whose rank_search equals the
        configured rank; the place queries additionally require
        indexed_status > 0.
    """

    def __init__(self, rank):
        # Substituted into the rank_search condition of each query.
        self.rank = rank

    def name(self):
        """ Label for this unit of work, used in log messages. """
        return "rank {}".format(self.rank)

    def sql_index_sectors(self):
        """ Query returning (geometry_sector, count) pairs for rows of this
            rank with indexed_status > 0, ordered by sector.
        """
        template = """SELECT geometry_sector, count(*) FROM placex
                  WHERE rank_search = {} and indexed_status > 0
                  GROUP BY geometry_sector
                  ORDER BY geometry_sector"""
        return template.format(self.rank)

    def sql_nosector_places(self):
        """ Query returning the place_id of every row of this rank with
            indexed_status > 0, regardless of sector.
        """
        template = """SELECT place_id FROM placex
                  WHERE indexed_status > 0 and rank_search = {}
                  ORDER BY geometry_sector"""
        return template.format(self.rank)

    def sql_sector_places(self):
        """ Query returning the place_ids of this rank with indexed_status > 0
            in one sector; the sector is bound to the `%s` placeholder.
        """
        template = """SELECT place_id FROM placex
                  WHERE indexed_status > 0 and rank_search = {}
                        and geometry_sector = %s"""
        return template.format(self.rank)

    def sql_index_place(self):
        """ Statement resetting indexed_status to 0 for the place_id bound
            to the `%s` placeholder.
        """
        statement = "UPDATE placex SET indexed_status = 0 WHERE place_id = %s"
        return statement
71
72
class InterpolationRunner(object):
    """ Supplies the SQL statements used to index the address interpolation
        table location_property_osmline.

        Unlike RankRunner there is no rank filter: every row with
        indexed_status > 0 is selected.
    """

    # Every statement below operates on this table.
    _TABLE = "location_property_osmline"

    def name(self):
        """ Label for this unit of work, used in log messages. """
        return "interpolation lines ({})".format(self._TABLE)

    def sql_index_sectors(self):
        """ Query returning (geometry_sector, count) pairs for rows with
            indexed_status > 0, ordered by sector.
        """
        return """SELECT geometry_sector, count(*) FROM {}
                  WHERE indexed_status > 0
                  GROUP BY geometry_sector
                  ORDER BY geometry_sector""".format(self._TABLE)

    def sql_nosector_places(self):
        """ Query returning the place_id of every row with indexed_status > 0. """
        return """SELECT place_id FROM {}
                  WHERE indexed_status > 0
                  ORDER BY geometry_sector""".format(self._TABLE)

    def sql_sector_places(self):
        """ Query returning the place_ids with indexed_status > 0 in one
            sector; the sector is bound to the `%s` placeholder.
        """
        return """SELECT place_id FROM {}
                  WHERE indexed_status > 0 and geometry_sector = %s
                  ORDER BY geometry_sector""".format(self._TABLE)

    def sql_index_place(self):
        """ Statement resetting indexed_status to 0 for the place_id bound
            to the `%s` placeholder.
        """
        return """UPDATE {}
                  SET indexed_status = 0 WHERE place_id = %s""".format(self._TABLE)
100
101
class DBConnection(object):
    """ A single non-blocking database connection.

        Queries are sent with perform(). Completion is polled with is_done()
        or awaited with wait(). A query that fails with a deadlock error
        (SQLSTATE 40P01) is sent again by either method; any other
        TransactionRollbackError is propagated.
    """

    def __init__(self, options):
        # Set up the bookkeeping before the first wait() so the deadlock
        # handler can always inspect it.
        self.current_query = None
        self.current_params = None

        self.conn = make_connection(options, asynchronous=True)
        self.wait()

        self.cursor = self.conn.cursor()

    def wait(self):
        """ Block until any pending operation is done.

            If the pending query deadlocks, it is re-sent and waited for
            again, mirroring the behaviour of is_done().
        """
        while True:
            try:
                wait_select(self.conn)
                self.current_query = None
                return
            except psycopg2.extensions.TransactionRollbackError as e:
                self._retry_on_deadlock(e)

    def perform(self, sql, args=None):
        """ Send SQL query to the server. Returns immediately without
            blocking.
        """
        self.current_query = sql
        self.current_params = args
        self.cursor.execute(sql, args)

    def fileno(self):
        """ File descriptor to wait for. (Makes this class select()able.)
        """
        return self.conn.fileno()

    def is_done(self):
        """ Check if the connection is available for a new query.

            Also checks if the previous query has run into a deadlock.
            If so, then the previous query is repeated and False is returned.
        """
        if self.current_query is None:
            return True

        try:
            if self.conn.poll() == psycopg2.extensions.POLL_OK:
                self.current_query = None
                return True
        except psycopg2.extensions.TransactionRollbackError as e:
            self._retry_on_deadlock(e)

        return False

    def _retry_on_deadlock(self, error):
        """ Re-send the current query if `error` is a deadlock; otherwise
            re-raise `error`.

            Without a query in flight there is nothing to repeat, so the
            error is re-raised in that case as well.
        """
        if error.pgcode == '40P01' and self.current_query is not None:
            log.info("Deadlock detected (params = {}), retry.".format(self.current_params))
            self.cursor.execute(self.current_query, self.current_params)
        else:
            raise error
155
156
class Indexer(object):
    """ Main indexing routine.

        Uses one blocking connection to enumerate the places to index and a
        pool of asynchronous DBConnection objects (`self.threads`) that run
        the per-place UPDATE statements concurrently.
    """

    def __init__(self, options):
        # Clamp the requested rank range to 0..30.
        self.minrank = max(0, options.minrank)
        self.maxrank = min(30, options.maxrank)
        self.conn = make_connection(options)
        self.threads = [DBConnection(options) for _ in range(options.threads)]

    def run(self):
        """ Run indexing over the entire database.
        """
        log.warning("Starting indexing rank ({} to {}) using {} threads".format(
                 self.minrank, self.maxrank, len(self.threads)))

        for rank in range(self.minrank, self.maxrank):
            self.index(RankRunner(rank))

        # Interpolation lines are processed after all lower ranks and
        # before the final rank.
        if self.maxrank == 30:
            self.index(InterpolationRunner())

        self.index(RankRunner(self.maxrank))

    def index(self, obj):
        """ Index a single rank or table. `obj` describes the SQL to use
            for indexing (see RankRunner / InterpolationRunner).

            Places are fetched sector by sector and their UPDATE statements
            are handed to whichever connection is free next. Returns once
            every connection has finished its last query.
        """
        log.warning("Starting {}".format(obj.name()))

        with self.conn.cursor(name='main') as cur:
            cur.execute(obj.sql_index_sectors())

            # First pass over the sector list: count the places to do.
            total_tuples = sum(r[1] for r in cur)
            log.debug("Total number of rows; {}".format(total_tuples))

            # Rewind for the second pass, which does the actual work.
            cur.scroll(0, mode='absolute')

            next_thread = self.find_free_thread()
            done_tuples = 0
            rank_start_time = datetime.now()

            sector_sql = obj.sql_sector_places()
            index_sql = obj.sql_index_place()
            # Once fewer than len(threads) * 1000 places remain, fetch all of
            # them in a single query instead of sector by sector.
            min_grouped_tuples = total_tuples - len(self.threads) * 1000

            next_info = 100 if log.isEnabledFor(logging.INFO) else total_tuples + 1

            for r in cur:
                sector = r[0]

                # Should we do the remaining ones together?
                do_all = done_tuples > min_grouped_tuples

                with self.conn.cursor(name='places') as pcur:
                    if do_all:
                        pcur.execute(obj.sql_nosector_places())
                    else:
                        pcur.execute(sector_sql, (sector, ))

                    for place in pcur:
                        place_id = place[0]
                        # Lazy %-args: no string formatting unless DEBUG is on.
                        log.debug("Processing place %s", place_id)
                        thread = next(next_thread)

                        thread.perform(index_sql, (place_id,))
                        done_tuples += 1

                        if done_tuples >= next_info:
                            next_info = self._log_progress(obj, done_tuples, total_tuples,
                                                           rank_start_time, next_info)

                if do_all:
                    break

        for t in self.threads:
            t.wait()

        diff_seconds = (datetime.now() - rank_start_time).total_seconds()
        # The elapsed time may be 0 for an empty rank on a coarse clock.
        rate = done_tuples / diff_seconds if diff_seconds > 0 else 0.0

        log.warning("Done {}/{} in {} @ {:.3f} per second - FINISHED {}\n".format(
                 done_tuples, total_tuples, int(diff_seconds),
                 rate, obj.name()))

    @staticmethod
    def _log_progress(obj, done_tuples, total_tuples, start_time, next_info):
        """ Log throughput and ETA at INFO level.

            Returns the done-tuple count at which the next report is due.
            That count is advanced by roughly one second's worth of work.
        """
        done_time = (datetime.now() - start_time).total_seconds()
        if done_time <= 0:
            # The clock has not advanced yet; retry on the next place instead
            # of dividing by zero.
            return next_info

        tuples_per_sec = done_tuples / done_time
        log.info("Done {} in {} @ {:.3f} per second - {} ETA (seconds): {:.2f}"
                 .format(done_tuples, int(done_time),
                         tuples_per_sec, obj.name(),
                         (total_tuples - done_tuples)/tuples_per_sec))
        return next_info + int(tuples_per_sec)

    def find_free_thread(self):
        """ Generator that returns the next connection that is free for
            sending a query.

            It never terminates; callers pull connections from it with next().
        """
        ready = self.threads

        while True:
            for thread in ready:
                if thread.is_done():
                    yield thread

            # Block until at least one connection's descriptor is readable,
            # then re-check only those connections.
            ready, _, _ = select.select(self.threads, [], [])
269
270
def nominatim_arg_parser():
    """ Setup the command-line parser for the tool.

        Returns an ArgumentParser. Its parsed namespace provides dbname,
        user, password_prompt, host, port, minrank, maxrank, threads and
        loglevel.
    """
    def positive_int(value):
        """ argparse `type` callable: accept only integers >= 1. """
        number = int(value)
        if number < 1:
            raise ArgumentTypeError("must be at least 1 (got {})".format(number))
        return number

    p = ArgumentParser(description="Indexing tool for Nominatim.",
                       formatter_class=RawDescriptionHelpFormatter)

    p.add_argument('-d', '--database',
                   dest='dbname', action='store', default='nominatim',
                   help='Name of the PostgreSQL database to connect to.')
    p.add_argument('-U', '--username',
                   dest='user', action='store',
                   help='PostgreSQL user name.')
    p.add_argument('-W', '--password',
                   dest='password_prompt', action='store_true',
                   help='Force password prompt.')
    p.add_argument('-H', '--host',
                   dest='host', action='store',
                   help='PostgreSQL server hostname or socket location.')
    p.add_argument('-P', '--port',
                   dest='port', action='store',
                   help='PostgreSQL server port')
    p.add_argument('-r', '--minrank',
                   dest='minrank', type=int, metavar='RANK', default=0,
                   help='Minimum/starting rank.')
    p.add_argument('-R', '--maxrank',
                   dest='maxrank', type=int, metavar='RANK', default=30,
                   help='Maximum/finishing rank.')
    # At least one connection is required: with zero connections the
    # indexer would wait on select() with nothing to wait for.
    p.add_argument('-t', '--threads',
                   dest='threads', type=positive_int, metavar='NUM', default=1,
                   help='Number of threads to create for indexing.')
    p.add_argument('-v', '--verbose',
                   dest='loglevel', action='count', default=0,
                   help='Increase verbosity')

    return p
309
if __name__ == '__main__':
    # Plain "LEVEL: message" lines on stderr.
    logging.basicConfig(stream=sys.stderr, format='%(levelname)s: %(message)s')

    options = nominatim_arg_parser().parse_args(sys.argv[1:])

    # Each -v lowers the threshold by one step of 10:
    # none -> 30 (WARNING), -v -> 20 (INFO), -vv -> 10 (DEBUG), more -> 0.
    steps_above_notset = max(3 - options.loglevel, 0)
    log.setLevel(steps_above_notset * 10)

    # Only ask for a password when -W was given; otherwise pass None.
    options.password = (getpass.getpass("Database password: ")
                        if options.password_prompt else None)

    Indexer(options).run()