2 Main work horse for indexing (computing addresses) the database.
4 # pylint: disable=C0111
10 from .progress import ProgressLogger
11 from ..db.async_connection import DBConnection
13 LOG = logging.getLogger()
16 """ Returns SQL commands for indexing one rank within the placex table.
19 def __init__(self, rank):
23 return "rank {}".format(self.rank)
25 def sql_count_objects(self):
26 return """SELECT count(*) FROM placex
27 WHERE rank_address = {} and indexed_status > 0
def sql_get_objects(self):
    """ Return the SQL query that selects the place_id of every placex
        row with `indexed_status > 0` whose rank_address equals
        `self.rank`, ordered by geometry_sector.
    """
    query = """SELECT place_id FROM placex
                  WHERE indexed_status > 0 and rank_address = {}
                  ORDER BY geometry_sector"""
    return query.format(self.rank)
def sql_index_place(ids):
    """ Return the UPDATE statement that sets indexed_status to 0 for the
        placex rows whose place_id is listed in `ids`.

        `ids` may be any iterable; each element is converted with str()
        and the results are joined with commas inside the IN clause.
    """
    template = "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"
    return template.format(','.join(map(str, ids)))
41 class InterpolationRunner:
42 """ Returns SQL commands for indexing the address interpolation table
43 location_property_osmline.
48 return "interpolation lines (location_property_osmline)"
def sql_count_objects():
    """ Return the SQL query that counts the rows of
        location_property_osmline with `indexed_status > 0`.
    """
    query = """SELECT count(*) FROM location_property_osmline
                  WHERE indexed_status > 0"""
    return query
def sql_get_objects():
    """ Return the SQL query that selects the place_id of every
        location_property_osmline row with `indexed_status > 0`,
        ordered by geometry_sector.
    """
    query = """SELECT place_id FROM location_property_osmline
                  WHERE indexed_status > 0
                  ORDER BY geometry_sector"""
    return query
def sql_index_place(ids):
    """ Return the UPDATE statement that sets indexed_status to 0 for the
        location_property_osmline rows whose place_id is listed in `ids`.

        `ids` may be any iterable; each element is converted with str()
        and the results are joined with commas inside the IN clause.
    """
    id_list = ','.join(map(str, ids))
    return """UPDATE location_property_osmline
                  SET indexed_status = 0 WHERE place_id IN ({})
               """.format(id_list)
68 """ Returns SQL commands for indexing the administrative boundaries
72 def __init__(self, rank):
76 return "boundaries rank {}".format(self.rank)
78 def sql_count_objects(self):
79 return """SELECT count(*) FROM placex
80 WHERE indexed_status > 0
82 AND class = 'boundary' and type = 'administrative'
85 def sql_get_objects(self):
86 return """SELECT place_id FROM placex
87 WHERE indexed_status > 0 and rank_search = {}
88 and class = 'boundary' and type = 'administrative'
89 ORDER BY partition, admin_level
def sql_index_place(ids):
    """ Return the UPDATE statement that sets indexed_status to 0 for the
        placex rows whose place_id is listed in `ids`.

        Every element of the iterable `ids` is passed through str() and
        the pieces are comma-joined to form the IN list.
    """
    id_list = ','.join(map(str, ids))
    statement = "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"
    return statement.format(id_list)
99 """ Provides the SQL commands for indexing the location_postcode table.
104 return "postcodes (location_postcode)"
def sql_count_objects():
    """ Return the SQL query that counts the rows of location_postcode
        with `indexed_status > 0`.
    """
    query = "SELECT count(*) FROM location_postcode WHERE indexed_status > 0"
    return query
def sql_get_objects():
    """ Return the SQL query that selects the place_id of every
        location_postcode row with `indexed_status > 0`, ordered by
        country_code and then postcode.
    """
    query = """SELECT place_id FROM location_postcode
                  WHERE indexed_status > 0
                  ORDER BY country_code, postcode"""
    return query
def sql_index_place(ids):
    """ Return the UPDATE statement that sets indexed_status to 0 for the
        location_postcode rows whose place_id is listed in `ids`.

        `ids` may be any iterable; each element is converted with str()
        and the results are joined with commas inside the IN clause.
    """
    id_list = ','.join(map(str, ids))
    return """UPDATE location_postcode SET indexed_status = 0
                  WHERE place_id IN ({})
               """.format(id_list)
123 def _analyse_db_if(conn, condition):
125 with conn.cursor() as cur:
126 cur.execute('ANALYSE')
130 """ Main indexing routine.
133 def __init__(self, dsn, num_threads):
135 self.num_threads = num_threads
def _setup_connections(self):
    """ Open the database connections used for indexing.

        Stores one psycopg2 connection in `self.conn` and a list of
        `self.num_threads` DBConnection objects in `self.threads`, all
        created from `self.dsn`.
    """
    dsn = self.dsn
    self.conn = psycopg2.connect(dsn)
    self.threads = [DBConnection(dsn) for _ in range(self.num_threads)]
145 def _close_connections(self):
150 for thread in self.threads:
155 def index_full(self, analyse=True):
156 """ Index the complete database. This will first index boundaries
157 followed by all other objects. When `analyse` is True, then the
158 database will be analysed at the appropriate places to
159 ensure that database statistics are updated.
161 conn = psycopg2.connect(self.dsn)
164 self.index_by_rank(0, 4)
165 _analyse_db_if(conn, analyse)
167 self.index_boundaries(0, 30)
168 _analyse_db_if(conn, analyse)
170 self.index_by_rank(5, 25)
171 _analyse_db_if(conn, analyse)
173 self.index_by_rank(26, 30)
174 _analyse_db_if(conn, analyse)
176 self.index_postcodes()
177 _analyse_db_if(conn, analyse)
182 def index_boundaries(self, minrank, maxrank):
183 """ Index only administrative boundaries within the given rank range.
185 LOG.warning("Starting indexing boundaries using %s threads",
188 self._setup_connections()
191 for rank in range(max(minrank, 4), min(maxrank, 26)):
192 self.index(BoundaryRunner(rank))
194 self._close_connections()
196 def index_by_rank(self, minrank, maxrank):
197 """ Index all entries of placex in the given rank range (inclusive)
198 in order of their address rank.
200 When rank 30 is requested then also interpolations and
201 places with address rank 0 will be indexed.
203 maxrank = min(maxrank, 30)
204 LOG.warning("Starting indexing rank (%i to %i) using %i threads",
205 minrank, maxrank, self.num_threads)
207 self._setup_connections()
210 for rank in range(max(1, minrank), maxrank):
211 self.index(RankRunner(rank))
214 self.index(RankRunner(0))
215 self.index(InterpolationRunner(), 20)
216 self.index(RankRunner(30), 20)
218 self.index(RankRunner(maxrank))
220 self._close_connections()
223 def index_postcodes(self):
224 """Index the entries of the location_postcode table.
226 LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)
228 self._setup_connections()
231 self.index(PostcodeRunner(), 20)
233 self._close_connections()
235 def update_status_table(self):
236 """ Update the status in the status table to 'indexed'.
238 conn = psycopg2.connect(self.dsn)
241 with conn.cursor() as cur:
242 cur.execute('UPDATE import_status SET indexed = true')
248 def index(self, obj, batch=1):
249 """ Index a single rank or table. `obj` describes the SQL to use
250 for indexing. `batch` describes the number of objects that
251 should be processed with a single SQL statement
253 LOG.warning("Starting %s (using batch size %s)", obj.name(), batch)
255 cur = self.conn.cursor()
256 cur.execute(obj.sql_count_objects())
258 total_tuples = cur.fetchone()[0]
259 LOG.debug("Total number of rows: %i", total_tuples)
263 progress = ProgressLogger(obj.name(), total_tuples)
266 cur = self.conn.cursor(name='places')
267 cur.execute(obj.sql_get_objects())
269 next_thread = self.find_free_thread()
271 places = [p[0] for p in cur.fetchmany(batch)]
275 LOG.debug("Processing places: %s", str(places))
276 thread = next(next_thread)
278 thread.perform(obj.sql_index_place(places))
279 progress.add(len(places))
283 for thread in self.threads:
288 def find_free_thread(self):
289 """ Generator that returns the next connection that is free for
301 # refresh the connections occasionally to avoid potential
302 # memory leaks in Postgresql.
303 if command_stat > 100000:
304 for thread in self.threads:
305 while not thread.is_done():
311 ready, _, _ = select.select(self.threads, [], [])
313 assert False, "Unreachable code"