git.openstreetmap.org Git - nominatim.git/blobdiff - nominatim/indexer/runners.py
move add-data subcommand into a separate file
[nominatim.git] / nominatim / indexer / runners.py
index 3c853cd0a19130b8bda5ab82ed539193bf75fa40..068d7d0fa63edba9a45fe7aef9195d06d164996a 100644 (file)
@@ -2,63 +2,93 @@
 Mix-ins that provide the actual commands for the indexer for various indexing
 tasks.
 """
+import functools
+
+import psycopg2.extras
+from psycopg2 import sql as pysql
+
 # pylint: disable=C0111
 
-class RankRunner:
-    """ Returns SQL commands for indexing one rank within the placex table.
+def _mk_valuelist(template, num):
+    return pysql.SQL(',').join([pysql.SQL(template)] * num)
+
+class AbstractPlacexRunner:
+    """ Returns SQL commands for indexing of the placex table.
     """
+    SELECT_SQL = pysql.SQL('SELECT place_id FROM placex ')
 
-    def __init__(self, rank):
+    def __init__(self, rank, analyzer):
         self.rank = rank
+        self.analyzer = analyzer
+
+
+    @staticmethod
+    @functools.lru_cache(maxsize=1)
+    def _index_sql(num_places):
+        return pysql.SQL(
+            """ UPDATE placex
+                SET indexed_status = 0, address = v.addr, token_info = v.ti
+                FROM (VALUES {}) as v(id, addr, ti)
+                WHERE place_id = v.id
+            """).format(_mk_valuelist("(%s, %s::hstore, %s::jsonb)", num_places))
+
+
+    @staticmethod
+    def get_place_details(worker, ids):
+        worker.perform("""SELECT place_id, (placex_prepare_update(placex)).*
+                          FROM placex WHERE place_id IN %s""",
+                       (tuple((p[0] for p in ids)), ))
+
+
+    def index_places(self, worker, places):
+        values = []
+        for place in places:
+            values.extend((place[x] for x in ('place_id', 'address')))
+            values.append(psycopg2.extras.Json(self.analyzer.process_place(place)))
+
+        worker.perform(self._index_sql(len(places)), values)
+
+
+class RankRunner(AbstractPlacexRunner):
+    """ Returns SQL commands for indexing one rank within the placex table.
+    """
 
     def name(self):
         return "rank {}".format(self.rank)
 
     def sql_count_objects(self):
-        return """SELECT count(*) FROM placex
-                  WHERE rank_address = {} and indexed_status > 0
-               """.format(self.rank)
+        return pysql.SQL("""SELECT count(*) FROM placex
+                            WHERE rank_address = {} and indexed_status > 0
+                         """).format(pysql.Literal(self.rank))
 
     def sql_get_objects(self):
-        return """SELECT place_id FROM placex
-                  WHERE indexed_status > 0 and rank_address = {}
-                  ORDER BY geometry_sector""".format(self.rank)
-
-    @staticmethod
-    def sql_index_place(ids):
-        return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\
-               .format(','.join((str(i) for i in ids)))
+        return self.SELECT_SQL + pysql.SQL(
+            """WHERE indexed_status > 0 and rank_address = {}
+               ORDER BY geometry_sector
+            """).format(pysql.Literal(self.rank))
 
 
-class BoundaryRunner:
+class BoundaryRunner(AbstractPlacexRunner):
     """ Returns SQL commands for indexing the administrative boundaries
         of a certain rank.
     """
 
-    def __init__(self, rank):
-        self.rank = rank
-
     def name(self):
         return "boundaries rank {}".format(self.rank)
 
     def sql_count_objects(self):
-        return """SELECT count(*) FROM placex
-                  WHERE indexed_status > 0
-                    AND rank_search = {}
-                    AND class = 'boundary' and type = 'administrative'
-               """.format(self.rank)
+        return pysql.SQL("""SELECT count(*) FROM placex
+                            WHERE indexed_status > 0
+                              AND rank_search = {}
+                              AND class = 'boundary' and type = 'administrative'
+                         """).format(pysql.Literal(self.rank))
 
     def sql_get_objects(self):
-        return """SELECT place_id FROM placex
-                  WHERE indexed_status > 0 and rank_search = {}
-                        and class = 'boundary' and type = 'administrative'
-                  ORDER BY partition, admin_level
-               """.format(self.rank)
-
-    @staticmethod
-    def sql_index_place(ids):
-        return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\
-               .format(','.join((str(i) for i in ids)))
+        return self.SELECT_SQL + pysql.SQL(
+            """WHERE indexed_status > 0 and rank_search = {}
+                     and class = 'boundary' and type = 'administrative'
+               ORDER BY partition, admin_level
+            """).format(pysql.Literal(self.rank))
 
 
 class InterpolationRunner:
@@ -66,6 +96,10 @@ class InterpolationRunner:
         location_property_osmline.
     """
 
+    def __init__(self, analyzer):
+        self.analyzer = analyzer
+
+
     @staticmethod
     def name():
         return "interpolation lines (location_property_osmline)"
@@ -77,15 +111,37 @@ class InterpolationRunner:
 
     @staticmethod
     def sql_get_objects():
-        return """SELECT place_id FROM location_property_osmline
+        return """SELECT place_id
+                  FROM location_property_osmline
                   WHERE indexed_status > 0
                   ORDER BY geometry_sector"""
 
+
+    @staticmethod
+    def get_place_details(worker, ids):
+        worker.perform("""SELECT place_id, get_interpolation_address(address, osm_id) as address
+                          FROM location_property_osmline WHERE place_id IN %s""",
+                       (tuple((p[0] for p in ids)), ))
+
+
     @staticmethod
-    def sql_index_place(ids):
-        return """UPDATE location_property_osmline
-                  SET indexed_status = 0 WHERE place_id IN ({})
-               """.format(','.join((str(i) for i in ids)))
+    @functools.lru_cache(maxsize=1)
+    def _index_sql(num_places):
+        return pysql.SQL("""UPDATE location_property_osmline
+                            SET indexed_status = 0, address = v.addr, token_info = v.ti
+                            FROM (VALUES {}) as v(id, addr, ti)
+                            WHERE place_id = v.id
+                         """).format(_mk_valuelist("(%s, %s::hstore, %s::jsonb)", num_places))
+
+
+    def index_places(self, worker, places):
+        values = []
+        for place in places:
+            values.extend((place[x] for x in ('place_id', 'address')))
+            values.append(psycopg2.extras.Json(self.analyzer.process_place(place)))
+
+        worker.perform(self._index_sql(len(places)), values)
+
 
 
 class PostcodeRunner:
@@ -107,7 +163,7 @@ class PostcodeRunner:
                   ORDER BY country_code, postcode"""
 
     @staticmethod
-    def sql_index_place(ids):
-        return """UPDATE location_postcode SET indexed_status = 0
-                  WHERE place_id IN ({})
-               """.format(','.join((str(i) for i in ids)))
+    def index_places(worker, ids):
+        worker.perform(pysql.SQL("""UPDATE location_postcode SET indexed_status = 0
+                                    WHERE place_id IN ({})""")
+                       .format(pysql.SQL(',').join((pysql.Literal(i[0]) for i in ids))))