2 Main workhorse for indexing the database (computing addresses).
4 # pylint: disable=C0111
8 from .progress import ProgressLogger
9 from db.async_connection import DBConnection, make_connection
11 LOG = logging.getLogger()
14 """ Returns SQL commands for indexing one rank within the placex table.
17 def __init__(self, rank):
21 return "rank {}".format(self.rank)
23 def sql_count_objects(self):
24 return """SELECT count(*) FROM placex
25 WHERE rank_address = {} and indexed_status > 0
def sql_get_objects(self):
    """ Return the query that selects the place_id of every placex row
        with rank_address equal to this runner's rank that is still
        marked for indexing (indexed_status > 0), ordered by
        geometry_sector.
    """
    clauses = (
        "SELECT place_id FROM placex",
        "WHERE indexed_status > 0 and rank_address = {}".format(self.rank),
        "ORDER BY geometry_sector",
    )
    return "\n".join(clauses)
def sql_index_place(ids):
    """ Return an UPDATE statement that sets indexed_status to 0 for
        every placex row whose place_id is contained in `ids`.

        Each id is rendered with str() and the ids are comma-separated.
        An empty `ids` produces an empty ``IN ()`` list, which PostgreSQL
        rejects, so callers should only pass non-empty batches.
    """
    template = "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"
    return template.format(','.join(map(str, ids)))
39 class InterpolationRunner:
40 """ Returns SQL commands for indexing the address interpolation table
41 location_property_osmline.
46 return "interpolation lines (location_property_osmline)"
def sql_count_objects():
    """ Return the query counting the rows of location_property_osmline
        that are still marked for indexing (indexed_status > 0).
    """
    return "\n".join(("SELECT count(*) FROM location_property_osmline",
                      "WHERE indexed_status > 0"))
def sql_get_objects():
    """ Return the query that selects the place_id of every
        location_property_osmline row still marked for indexing
        (indexed_status > 0), ordered by geometry_sector.
    """
    lines = [
        "SELECT place_id FROM location_property_osmline",
        "WHERE indexed_status > 0",
        "ORDER BY geometry_sector",
    ]
    return "\n".join(lines)
def sql_index_place(ids):
    """ Return an UPDATE statement that sets indexed_status to 0 for
        every location_property_osmline row whose place_id is in `ids`.

        Each id is rendered with str() and the ids are comma-separated.
        An empty `ids` produces an empty ``IN ()`` list, which PostgreSQL
        rejects.
    """
    id_list = ','.join(map(str, ids))
    head = "UPDATE location_property_osmline"
    tail = "SET indexed_status = 0 WHERE place_id IN ({})".format(id_list)
    return head + "\n" + tail
66 """ Returns SQL commands for indexing the administrative boundaries
70 def __init__(self, rank):
74 return "boundaries rank {}".format(self.rank)
76 def sql_count_objects(self):
77 return """SELECT count(*) FROM placex
78 WHERE indexed_status > 0
80 AND class = 'boundary' and type = 'administrative'""".format(self.rank)
def sql_get_objects(self):
    """ Return the query that selects the place_id of every administrative
        boundary in placex with rank_search equal to this runner's rank
        that is still marked for indexing (indexed_status > 0), ordered
        by partition and then admin_level.
    """
    parts = (
        "SELECT place_id FROM placex",
        "WHERE indexed_status > 0 and rank_search = {}".format(self.rank),
        "and class = 'boundary' and type = 'administrative'",
        "ORDER BY partition, admin_level",
    )
    return "\n".join(parts)
def sql_index_place(ids):
    """ Return an UPDATE statement that sets indexed_status to 0 for
        every placex row whose place_id is contained in `ids`.

        Each id is rendered with str() and the ids are comma-separated.
        An empty `ids` produces an empty ``IN ()`` list, which PostgreSQL
        rejects.
    """
    joined = ','.join(map(str, ids))
    return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})".format(joined)
94 """ Main indexing routine.
def __init__(self, opts):
    """ Store the rank limits and open the database connections.

        `opts` must provide `minrank`, `maxrank` and `threads`; it is also
        passed unchanged to make_connection() and DBConnection().
    """
    # Rank range: the lower bound is raised to at least 1, the upper
    # bound is capped at 30.
    self.minrank = max(1, opts.minrank)
    self.maxrank = min(30, opts.maxrank)

    # Main connection (used for the count/select queries in index()) ...
    self.conn = make_connection(opts)
    # ... plus one DBConnection per worker; index() sends the UPDATE
    # statements through these.
    worker_count = range(opts.threads)
    self.threads = [DBConnection(opts) for _worker in worker_count]
103 def index_boundaries(self):
104 LOG.warning("Starting indexing boundaries using %s threads",
107 for rank in range(max(self.minrank, 5), min(self.maxrank, 26)):
108 self.index(BoundaryRunner(rank))
110 def index_by_rank(self):
111 """ Run classic indexing by rank.
113 LOG.warning("Starting indexing rank (%i to %i) using %i threads",
114 self.minrank, self.maxrank, len(self.threads))
116 for rank in range(max(1, self.minrank), self.maxrank):
117 self.index(RankRunner(rank))
119 if self.maxrank == 30:
120 self.index(RankRunner(0))
121 self.index(InterpolationRunner(), 20)
122 self.index(RankRunner(self.maxrank), 20)
124 self.index(RankRunner(self.maxrank))
126 def index(self, obj, batch=1):
127 """ Index a single rank or table. `obj` describes the SQL to use
128 for indexing. `batch` describes the number of objects that
129 should be processed with a single SQL statement
131 LOG.warning("Starting %s (using batch size %s)", obj.name(), batch)
133 cur = self.conn.cursor()
134 cur.execute(obj.sql_count_objects())
136 total_tuples = cur.fetchone()[0]
137 LOG.debug("Total number of rows: %i", total_tuples)
141 progress = ProgressLogger(obj.name(), total_tuples)
144 cur = self.conn.cursor(name='places')
145 cur.execute(obj.sql_get_objects())
147 next_thread = self.find_free_thread()
149 places = [p[0] for p in cur.fetchmany(batch)]
153 LOG.debug("Processing places: %s", str(places))
154 thread = next(next_thread)
156 thread.perform(obj.sql_index_place(places))
157 progress.add(len(places))
161 for thread in self.threads:
166 def find_free_thread(self):
167 """ Generator that returns the next connection that is free for
179 # refresh the connections occasionally to avoid potential
180 # memory leaks in PostgreSQL.
181 if command_stat > 100000:
182 for thread in self.threads:
183 while not thread.is_done():
189 ready, _, _ = select.select(self.threads, [], [])
191 assert False, "Unreachable code"