Request and process multiple place_ids at once so that
Postgres can make better use of caching and there are less
transactions running.
self.total_places = total
self.done_places = 0
self.rank_start_time = datetime.now()
self.total_places = total
self.done_places = 0
self.rank_start_time = datetime.now()
- self.next_info = 50 if log.isEnabledFor(logging.INFO) else total + 1
+ self.next_info = 100 if log.isEnabledFor(logging.INFO) else total + 1
def add(self, num=1):
""" Mark `num` places as processed. Print a log message if the
def add(self, num=1):
""" Mark `num` places as processed. Print a log message if the
WHERE indexed_status > 0 and rank_search = {}
ORDER BY geometry_sector""".format(self.rank)
WHERE indexed_status > 0 and rank_search = {}
ORDER BY geometry_sector""".format(self.rank)
- def sql_index_place(self):
- return "UPDATE placex SET indexed_status = 0 WHERE place_id = %s"
+ def sql_index_place(self, ids):
+ return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\
+ .format(','.join((str(i) for i in ids)))
class InterpolationRunner(object):
class InterpolationRunner(object):
WHERE indexed_status > 0
ORDER BY geometry_sector"""
WHERE indexed_status > 0
ORDER BY geometry_sector"""
- def sql_index_place(self):
+ def sql_index_place(self, ids):
return """UPDATE location_property_osmline
return """UPDATE location_property_osmline
- SET indexed_status = 0 WHERE place_id = %s"""
+ SET indexed_status = 0 WHERE place_id IN ({})"""\
+ .format(','.join((str(i) for i in ids)))
class DBConnection(object):
class DBConnection(object):
self.index(RankRunner(rank))
if self.maxrank == 30:
self.index(RankRunner(rank))
if self.maxrank == 30:
- self.index(InterpolationRunner())
+ self.index(InterpolationRunner(), 20)
- self.index(RankRunner(self.maxrank))
+ self.index(RankRunner(self.maxrank), 20)
+ def index(self, obj, batch=1):
""" Index a single rank or table. `obj` describes the SQL to use
""" Index a single rank or table. `obj` describes the SQL to use
+ for indexing. `batch` describes the number of objects that
+ should be processed with a single SQL statement
"""
log.warning("Starting {}".format(obj.name()))
"""
log.warning("Starting {}".format(obj.name()))
cur = self.conn.cursor(name='places')
cur.execute(obj.sql_get_objects())
cur = self.conn.cursor(name='places')
cur.execute(obj.sql_get_objects())
- for place in cur:
- place_id = place[0]
- log.debug("Processing place {}".format(place_id))
+ while True:
+ places = [p[0] for p in cur.fetchmany(batch)]
+ if len(places) == 0:
+ break
+
+ log.debug("Processing places: {}".format(places))
thread = next(next_thread)
thread = next(next_thread)
- thread.perform(obj.sql_index_place(), (place_id,))
- progress.add()
+ thread.perform(obj.sql_index_place(places))
+ progress.add(len(places))