]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/indexer/indexer.py
61971497f957f15aeda81480f68e0d25cba32593
[nominatim.git] / nominatim / indexer / indexer.py
1 """
2 Main work horse for indexing (computing addresses) the database.
3 """
4 # pylint: disable=C0111
5 import logging
6 import select
7
8 import psycopg2
9
10 from .progress import ProgressLogger
11 from ..db.async_connection import DBConnection
12
13 LOG = logging.getLogger()
14
15 class RankRunner:
16     """ Returns SQL commands for indexing one rank within the placex table.
17     """
18
19     def __init__(self, rank):
20         self.rank = rank
21
22     def name(self):
23         return "rank {}".format(self.rank)
24
25     def sql_count_objects(self):
26         return """SELECT count(*) FROM placex
27                   WHERE rank_address = {} and indexed_status > 0
28                """.format(self.rank)
29
30     def sql_get_objects(self):
31         return """SELECT place_id FROM placex
32                   WHERE indexed_status > 0 and rank_address = {}
33                   ORDER BY geometry_sector""".format(self.rank)
34
35     @staticmethod
36     def sql_index_place(ids):
37         return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\
38                .format(','.join((str(i) for i in ids)))
39
40
41 class InterpolationRunner:
42     """ Returns SQL commands for indexing the address interpolation table
43         location_property_osmline.
44     """
45
46     @staticmethod
47     def name():
48         return "interpolation lines (location_property_osmline)"
49
50     @staticmethod
51     def sql_count_objects():
52         return """SELECT count(*) FROM location_property_osmline
53                   WHERE indexed_status > 0"""
54
55     @staticmethod
56     def sql_get_objects():
57         return """SELECT place_id FROM location_property_osmline
58                   WHERE indexed_status > 0
59                   ORDER BY geometry_sector"""
60
61     @staticmethod
62     def sql_index_place(ids):
63         return """UPDATE location_property_osmline
64                   SET indexed_status = 0 WHERE place_id IN ({})
65                """.format(','.join((str(i) for i in ids)))
66
67 class BoundaryRunner:
68     """ Returns SQL commands for indexing the administrative boundaries
69         of a certain rank.
70     """
71
72     def __init__(self, rank):
73         self.rank = rank
74
75     def name(self):
76         return "boundaries rank {}".format(self.rank)
77
78     def sql_count_objects(self):
79         return """SELECT count(*) FROM placex
80                   WHERE indexed_status > 0
81                     AND rank_search = {}
82                     AND class = 'boundary' and type = 'administrative'
83                """.format(self.rank)
84
85     def sql_get_objects(self):
86         return """SELECT place_id FROM placex
87                   WHERE indexed_status > 0 and rank_search = {}
88                         and class = 'boundary' and type = 'administrative'
89                   ORDER BY partition, admin_level
90                """.format(self.rank)
91
92     @staticmethod
93     def sql_index_place(ids):
94         return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\
95                .format(','.join((str(i) for i in ids)))
96
97
98 class PostcodeRunner:
99     """ Provides the SQL commands for indexing the location_postcode table.
100     """
101
102     @staticmethod
103     def name():
104         return "postcodes (location_postcode)"
105
106     @staticmethod
107     def sql_count_objects():
108         return 'SELECT count(*) FROM location_postcode WHERE indexed_status > 0'
109
110     @staticmethod
111     def sql_get_objects():
112         return """SELECT place_id FROM location_postcode
113                   WHERE indexed_status > 0
114                   ORDER BY country_code, postcode"""
115
116     @staticmethod
117     def sql_index_place(ids):
118         return """UPDATE location_postcode SET indexed_status = 0
119                   WHERE place_id IN ({})
120                """.format(','.join((str(i) for i in ids)))
121
122 class Indexer:
123     """ Main indexing routine.
124     """
125
126     def __init__(self, dsn, num_threads):
127         self.conn = psycopg2.connect(dsn)
128         self.threads = [DBConnection(dsn) for _ in range(num_threads)]
129
130
131     def index_full(self, analyse=True):
132         """ Index the complete database. This will first index boudnaries
133             followed by all other objects. When `analyse` is True, then the
134             database will be analysed at the appropriate places to
135             ensure that database statistics are updated.
136         """
137         self.index_by_rank(0, 4)
138         self._analyse_db_if(analyse)
139
140         self.index_boundaries(0, 30)
141         self._analyse_db_if(analyse)
142
143         self.index_by_rank(5, 25)
144         self._analyse_db_if(analyse)
145
146         self.index_by_rank(26, 30)
147         self._analyse_db_if(analyse)
148
149         self.index_postcodes()
150         self._analyse_db_if(analyse)
151
152     def _analyse_db_if(self, condition):
153         if condition:
154             with self.conn.cursor() as cur:
155                 cur.execute('ANALYSE')
156
157     def index_boundaries(self, minrank, maxrank):
158         """ Index only administrative boundaries within the given rank range.
159         """
160         LOG.warning("Starting indexing boundaries using %s threads",
161                     len(self.threads))
162
163         for rank in range(max(minrank, 4), min(maxrank, 26)):
164             self.index(BoundaryRunner(rank))
165
166     def index_by_rank(self, minrank, maxrank):
167         """ Index all entries of placex in the given rank range (inclusive)
168             in order of their address rank.
169
170             When rank 30 is requested then also interpolations and
171             places with address rank 0 will be indexed.
172         """
173         maxrank = min(maxrank, 30)
174         LOG.warning("Starting indexing rank (%i to %i) using %i threads",
175                     minrank, maxrank, len(self.threads))
176
177         for rank in range(max(1, minrank), maxrank):
178             self.index(RankRunner(rank))
179
180         if maxrank == 30:
181             self.index(RankRunner(0))
182             self.index(InterpolationRunner(), 20)
183             self.index(RankRunner(30), 20)
184         else:
185             self.index(RankRunner(maxrank))
186
187
188     def index_postcodes(self):
189         """Index the entries ofthe location_postcode table.
190         """
191         self.index(PostcodeRunner(), 20)
192
193     def update_status_table(self):
194         """ Update the status in the status table to 'indexed'.
195         """
196         with self.conn.cursor() as cur:
197             cur.execute('UPDATE import_status SET indexed = true')
198         self.conn.commit()
199
200     def index(self, obj, batch=1):
201         """ Index a single rank or table. `obj` describes the SQL to use
202             for indexing. `batch` describes the number of objects that
203             should be processed with a single SQL statement
204         """
205         LOG.warning("Starting %s (using batch size %s)", obj.name(), batch)
206
207         cur = self.conn.cursor()
208         cur.execute(obj.sql_count_objects())
209
210         total_tuples = cur.fetchone()[0]
211         LOG.debug("Total number of rows: %i", total_tuples)
212
213         cur.close()
214
215         progress = ProgressLogger(obj.name(), total_tuples)
216
217         if total_tuples > 0:
218             cur = self.conn.cursor(name='places')
219             cur.execute(obj.sql_get_objects())
220
221             next_thread = self.find_free_thread()
222             while True:
223                 places = [p[0] for p in cur.fetchmany(batch)]
224                 if not places:
225                     break
226
227                 LOG.debug("Processing places: %s", str(places))
228                 thread = next(next_thread)
229
230                 thread.perform(obj.sql_index_place(places))
231                 progress.add(len(places))
232
233             cur.close()
234
235             for thread in self.threads:
236                 thread.wait()
237
238         progress.done()
239
240     def find_free_thread(self):
241         """ Generator that returns the next connection that is free for
242             sending a query.
243         """
244         ready = self.threads
245         command_stat = 0
246
247         while True:
248             for thread in ready:
249                 if thread.is_done():
250                     command_stat += 1
251                     yield thread
252
253             # refresh the connections occasionaly to avoid potential
254             # memory leaks in Postgresql.
255             if command_stat > 100000:
256                 for thread in self.threads:
257                     while not thread.is_done():
258                         thread.wait()
259                     thread.connect()
260                 command_stat = 0
261                 ready = self.threads
262             else:
263                 ready, _, _ = select.select(self.threads, [], [])
264
265         assert False, "Unreachable code"