- def __init__(self, dsn, num_threads):
- self.conn = psycopg2.connect(dsn)
- self.threads = [DBConnection(dsn) for _ in range(num_threads)]
+ def __init__(self, dsn: str, tokenizer: AbstractTokenizer, num_threads: int):
+ """ Set up the indexer.
+
+ dsn: libpq connection string of the database to index.
+ tokenizer: tokenizer to use for computing search terms.
+ num_threads: number of parallel threads/connections to use
+ for indexing.
+
+ Unlike the previous implementation, no database connection is
+ opened here; connections are created on demand by the
+ individual indexing functions (see `has_pending`, `index_full`).
+ """
+ self.dsn = dsn
+ self.tokenizer = tokenizer
+ self.num_threads = num_threads
+
+
+ def has_pending(self) -> bool:
+ """ Check if any data still needs indexing.
+ This function must only be used after the import has finished.
+ Otherwise it will be very expensive.
+
+ Returns True when at least one row in `placex` still has
+ `indexed_status > 0`, i.e. is waiting to be indexed.
+ """
+ with connect(self.dsn) as conn:
+ with conn.cursor() as cur:
+ # Only row *existence* matters, so select a constant and
+ # stop at the first hit; LIMIT 1 keeps this cheap once
+ # most rows are indexed.
+ cur.execute("SELECT 'a' FROM placex WHERE indexed_status > 0 LIMIT 1")
+ return cur.rowcount > 0
+
+
+ def index_full(self, analyse: bool = True) -> None:
+ """ Index the complete database. This will first index boundaries
+ followed by all other objects. When `analyse` is True, then the
+ database will be analysed at the appropriate places to
+ ensure that database statistics are updated.
+ """
+ with connect(self.dsn) as conn:
+ # NOTE(review): autocommit presumably so that ANALYZE and the
+ # indexing helpers run outside one long transaction — confirm.
+ conn.autocommit = True
+
+ # Closure over `conn`/`analyse`: refresh planner statistics
+ # only when the caller asked for it.
+ def _analyze() -> None:
+ if analyse:
+ with conn.cursor() as cur:
+ cur.execute('ANALYZE')
+
+ # Each stage returns the number of objects processed; ANALYZE is
+ # skipped when too few rows changed to be worth the cost. The
+ # thresholds (0/100/1000) appear to be heuristic — TODO confirm.
+ if self.index_by_rank(0, 4) > 0:
+ _analyze()
+
+ if self.index_boundaries(0, 30) > 100:
+ _analyze()
+
+ if self.index_by_rank(5, 25) > 100:
+ _analyze()
+
+ if self.index_by_rank(26, 30) > 1000:
+ _analyze()