Merge remote-tracking branch 'upstream/master'
nominatim/indexer/indexer.py
index 61971497f957f15aeda81480f68e0d25cba32593..06c05e1d5a49e32a7e2c44290339d19c1344b548 100644
@@ -119,13 +119,37 @@ class PostcodeRunner:
                   WHERE place_id IN ({})
                """.format(','.join((str(i) for i in ids)))
 
+
+def _analyse_db_if(conn, condition):
+    if condition:
+        with conn.cursor() as cur:
+            cur.execute('ANALYSE')
+
+
 class Indexer:
     """ Main indexing routine.
     """
 
     def __init__(self, dsn, num_threads):
-        self.conn = psycopg2.connect(dsn)
-        self.threads = [DBConnection(dsn) for _ in range(num_threads)]
+        self.dsn = dsn
+        self.num_threads = num_threads
+        self.conn = None
+        self.threads = []
+
+
+    def _setup_connections(self):
+        self.conn = psycopg2.connect(self.dsn)
+        self.threads = [DBConnection(self.dsn) for _ in range(self.num_threads)]
+
+
+    def _close_connections(self):
+        if self.conn:
+            self.conn.close()
+            self.conn = None
+
+        for thread in self.threads:
+            thread.close()
+        self.threads = []
 
 
     def index_full(self, analyse=True):
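
As a minimal, self-contained sketch of the connection-lifecycle pattern this hunk introduces (connections are no longer opened in __init__ but created per operation and always released in a finally block), the following Python runs without a database; the _FakeConnection and LazyWorker names are illustrative stand-ins, not Nominatim or psycopg2 APIs:

class _FakeConnection:
    """Stand-in for a psycopg2/DBConnection object so the sketch needs no database."""

    def __init__(self, dsn):
        self.dsn = dsn
        self.closed = False

    def close(self):
        self.closed = True


class LazyWorker:
    """Illustrative only: opens connections lazily and always tears them down."""

    def __init__(self, dsn, num_threads):
        # Cheap constructor: only the parameters are stored, nothing is opened yet.
        self.dsn = dsn
        self.num_threads = num_threads
        self.conn = None
        self.threads = []

    def _setup_connections(self):
        self.conn = _FakeConnection(self.dsn)
        self.threads = [_FakeConnection(self.dsn) for _ in range(self.num_threads)]

    def _close_connections(self):
        if self.conn:
            self.conn.close()
            self.conn = None
        for thread in self.threads:
            thread.close()
        self.threads = []

    def run(self):
        self._setup_connections()
        try:
            return len(self.threads)   # stand-in for the real indexing work
        finally:
            self._close_connections()  # released even if the work raises


if __name__ == '__main__':
    worker = LazyWorker('dbname=nominatim', num_threads=2)
    assert worker.run() == 2
    assert worker.conn is None and worker.threads == []
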
@@ -134,34 +158,41 @@ class Indexer:
             database will be analysed at the appropriate places to
             ensure that database statistics are updated.
         """
-        self.index_by_rank(0, 4)
-        self._analyse_db_if(analyse)
+        conn = psycopg2.connect(self.dsn)
+        conn.autocommit = True
 
-        self.index_boundaries(0, 30)
-        self._analyse_db_if(analyse)
+        try:
+            self.index_by_rank(0, 4)
+            _analyse_db_if(conn, analyse)
 
-        self.index_by_rank(5, 25)
-        self._analyse_db_if(analyse)
+            self.index_boundaries(0, 30)
+            _analyse_db_if(conn, analyse)
 
-        self.index_by_rank(26, 30)
-        self._analyse_db_if(analyse)
+            self.index_by_rank(5, 25)
+            _analyse_db_if(conn, analyse)
 
-        self.index_postcodes()
-        self._analyse_db_if(analyse)
+            self.index_by_rank(26, 30)
+            _analyse_db_if(conn, analyse)
+
+            self.index_postcodes()
+            _analyse_db_if(conn, analyse)
+        finally:
+            conn.close()
 
-    def _analyse_db_if(self, condition):
-        if condition:
-            with self.conn.cursor() as cur:
-                cur.execute('ANALYSE')
 
     def index_boundaries(self, minrank, maxrank):
         """ Index only administrative boundaries within the given rank range.
         """
         LOG.warning("Starting indexing boundaries using %s threads",
-                    len(self.threads))
+                    self.num_threads)
+
+        self._setup_connections()
 
-        for rank in range(max(minrank, 4), min(maxrank, 26)):
-            self.index(BoundaryRunner(rank))
+        try:
+            for rank in range(max(minrank, 4), min(maxrank, 26)):
+                self.index(BoundaryRunner(rank))
+        finally:
+            self._close_connections()
 
     def index_by_rank(self, minrank, maxrank):
         """ Index all entries of placex in the given rank range (inclusive)
@@ -172,30 +203,48 @@ class Indexer:
         """
         maxrank = min(maxrank, 30)
         LOG.warning("Starting indexing rank (%i to %i) using %i threads",
         """
         maxrank = min(maxrank, 30)
         LOG.warning("Starting indexing rank (%i to %i) using %i threads",
-                    minrank, maxrank, len(self.threads))
+                    minrank, maxrank, self.num_threads)
 
-        for rank in range(max(1, minrank), maxrank):
-            self.index(RankRunner(rank))
+        self._setup_connections()
 
-        if maxrank == 30:
-            self.index(RankRunner(0))
-            self.index(InterpolationRunner(), 20)
-            self.index(RankRunner(30), 20)
-        else:
-            self.index(RankRunner(maxrank))
+        try:
+            for rank in range(max(1, minrank), maxrank):
+                self.index(RankRunner(rank))
+
+            if maxrank == 30:
+                self.index(RankRunner(0))
+                self.index(InterpolationRunner(), 20)
+                self.index(RankRunner(30), 20)
+            else:
+                self.index(RankRunner(maxrank))
+        finally:
+            self._close_connections()
 
 
     def index_postcodes(self):
         """Index the entries ofthe location_postcode table.
         """
-        self.index(PostcodeRunner(), 20)
+        LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)
+
+        self._setup_connections()
+
+        try:
+            self.index(PostcodeRunner(), 20)
+        finally:
+            self._close_connections()
 
     def update_status_table(self):
         """ Update the status in the status table to 'indexed'.
         """
-        with self.conn.cursor() as cur:
-            cur.execute('UPDATE import_status SET indexed = true')
-        self.conn.commit()
+        conn = psycopg2.connect(self.dsn)
+
+        try:
+            with conn.cursor() as cur:
+                cur.execute('UPDATE import_status SET indexed = true')
+
+            conn.commit()
+        finally:
+            conn.close()
 
     def index(self, obj, batch=1):
         """ Index a single rank or table. `obj` describes the SQL to use