LOG.warning("Starting indexing boundaries using %s threads",
self.num_threads)
-        for rank in range(max(minrank, 4), min(maxrank, 26)):
-            self._index(runners.BoundaryRunner(rank))
+        with self.tokenizer.name_analyzer() as analyzer:
+            for rank in range(max(minrank, 4), min(maxrank, 26)):
+                self._index(runners.BoundaryRunner(rank, analyzer))
    def index_by_rank(self, minrank, maxrank):
        """ Index all entries of placex in the given rank range (inclusive
LOG.warning("Starting indexing rank (%i to %i) using %i threads",
minrank, maxrank, self.num_threads)
-        for rank in range(max(1, minrank), maxrank):
-            self._index(runners.RankRunner(rank))
+        with self.tokenizer.name_analyzer() as analyzer:
+            for rank in range(max(1, minrank), maxrank):
+                self._index(runners.RankRunner(rank, analyzer))
-        if maxrank == 30:
-            self._index(runners.RankRunner(0))
-            self._index(runners.InterpolationRunner(), 20)
-            self._index(runners.RankRunner(30), 20)
-        else:
-            self._index(runners.RankRunner(maxrank))
+            if maxrank == 30:
+                self._index(runners.RankRunner(0, analyzer))
+                self._index(runners.InterpolationRunner(), 20)
+                self._index(runners.RankRunner(30, analyzer), 20)
+            else:
+                self._index(runners.RankRunner(maxrank, analyzer))
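The `with` block is the important part of this change: every rank runner shares a single analyzer instance, and the analyzer's database connection is released once the whole rank range has been processed. A hand-written equivalent of what the context manager provides, mirroring the loop above (a sketch, not additional code in this change):

```python
# Rough equivalent of the `with` block: one analyzer is shared by all
# runners and reliably closed when indexing finishes, even on error.
analyzer = self.tokenizer.name_analyzer()
try:
    for rank in range(max(1, minrank), maxrank):
        self._index(runners.RankRunner(rank, analyzer))
finally:
    analyzer.close()
```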
    def index_postcodes(self):
Mix-ins that provide the actual commands for the indexer for various indexing
tasks.
"""
+import functools
+
+import psycopg2.extras
+
# pylint: disable=C0111
class AbstractPlacexRunner:
    """ Returns SQL commands for indexing of the placex table.
    """
    SELECT_SQL = 'SELECT place_id, (placex_prepare_update(placex)).* FROM placex'
-    def __init__(self, rank):
+    def __init__(self, rank, analyzer):
        self.rank = rank
-        self._sql_terms = 0
-        self._cached_index_sql = None
+        self.analyzer = analyzer
-    def _index_sql(self, num_places):
-        if num_places != self._sql_terms:
-            self._cached_index_sql = \
-                """ UPDATE placex
-                    SET indexed_status = 0, address = v.addr
-                    FROM (VALUES {}) as v(id, addr)
-                    WHERE place_id = v.id
-                """.format(','.join(["(%s, %s::hstore)"] * num_places))
-            self._sql_terms = num_places
-        return self._cached_index_sql
+    @staticmethod
+    @functools.lru_cache(maxsize=1)
+    def _index_sql(num_places):
+        return """ UPDATE placex
+                   SET indexed_status = 0, address = v.addr, token_info = v.ti
+                   FROM (VALUES {}) as v(id, addr, ti)
+                   WHERE place_id = v.id
+               """.format(','.join(["(%s, %s::hstore, %s::json)"] * num_places))
    def index_places(self, worker, places):
        values = []
        for place in places:
            values.extend((place[x] for x in ('place_id', 'address')))
+            values.append(psycopg2.extras.Json(self.analyzer.process_place(place)))
        worker.perform(self._index_sql(len(places)), values)
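The flat `values` list has to line up with the placeholder triples produced by `_index_sql()`: for each place, `place_id`, the `address` hstore and the JSON-wrapped token info appear in exactly that order. A minimal sketch of the layout, using hypothetical place dicts in place of rows from `SELECT_SQL`:

```python
import psycopg2.extras

# Hypothetical batch; real place dicts come from the SELECT_SQL query.
places = [{'place_id': 1, 'address': {'city': 'Berlin'}},
          {'place_id': 2, 'address': {'city': 'Paris'}}]

values = []
for place in places:
    values.extend(place[x] for x in ('place_id', 'address'))
    values.append(psycopg2.extras.Json({}))  # stands in for process_place()

# One (id, addr, ti) triple per place, matching "(%s, %s::hstore, %s::json)".
assert len(values) == 3 * len(places)
```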
import shutil
import psycopg2
+import psycopg2.extras
from nominatim.db.connection import connect
from nominatim.db import properties
        self._save_config(conn, config)
+    def name_analyzer(self):
+        """ Create a new analyzer for tokenizing names and queries
+            using this tokenizer. Analyzers are context managers and should
+            be used accordingly:
+
+            ```
+            with tokenizer.name_analyzer() as analyzer:
+                analyzer.tokenize()
+            ```
+
+            When used outside the 'with' construct, the caller must make
+            sure to call close() before the analyzer is discarded.
+
+            Analyzers are not thread-safe. You need to instantiate one per thread.
+        """
+        return LegacyNameAnalyzer(self.dsn)
+
+
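Since analyzers are not thread-safe, a multi-threaded indexer has to create one instance per worker thread. A minimal sketch of that pattern; `index_chunk` and `index_parallel` are hypothetical helpers, not part of the tokenizer API:

```python
import threading

def index_chunk(tokenizer, chunk):
    # One analyzer, and therefore one database connection, per thread.
    with tokenizer.name_analyzer() as analyzer:
        for place in chunk:
            analyzer.process_place(place)

def index_parallel(tokenizer, chunks):
    threads = [threading.Thread(target=index_chunk, args=(tokenizer, chunk))
               for chunk in chunks]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
```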
    def _init_db_tables(self, config):
        """ Set up the word table and fill it with pre-computed word
            frequencies.
        """
        properties.set_property(conn, DBCFG_NORMALIZATION, self.normalization)
        properties.set_property(conn, DBCFG_MAXWORDFREQ, config.MAX_WORD_FREQUENCY)
+
+
+class LegacyNameAnalyzer:
+    """ The legacy analyzer uses the special PostgreSQL module for
+        splitting names.
+
+        Each instance opens a connection to the database to request the
+        normalization.
+    """
+
+    def __init__(self, dsn):
+        self.conn = connect(dsn).connection
+        self.conn.autocommit = True
+        psycopg2.extras.register_hstore(self.conn)
+
+
+    def __enter__(self):
+        return self
+
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.close()
+
+
+    def close(self):
+        """ Free all resources used by the analyzer.
+        """
+        if self.conn:
+            self.conn.close()
+            self.conn = None
+
+
+    def process_place(self, place):
+        """ Determine tokenizer information about the given place.
+
+            Returns a JSON-serialisable structure that will be handed into
+            the database via the token_info field.
+        """
+        return {}
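`process_place()` is still a stub, but the per-instance connection already hints at its use: the legacy PostgreSQL module exposes normalization as SQL functions. A hypothetical analyzer method, not part of this change, might query it roughly like this (assuming the legacy module's `make_standard_name()` function):

```python
def normalize(self, name):
    # Hypothetical helper: ask the legacy PostgreSQL module for the
    # normalized form of a name via its make_standard_name() function.
    with self.conn.cursor() as cur:
        cur.execute("SELECT make_standard_name(%s)", (name,))
        return cur.fetchone()[0]
```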
    def init_from_project(self):
        assert self.init_state is None
        self.init_state = "loaded"
+
+
+    def name_analyzer(self):
+        return DummyNameAnalyzer()
+
+
+class DummyNameAnalyzer:
+    """ Analyzer stub that mirrors the analyzer interface of the real
+        tokenizers: context manager, close() and process_place().
+    """
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.close()
+
+
+    def close(self):
+        """ Free all resources used by the analyzer.
+        """
+
+    def process_place(self, place):
+        """ Determine tokenizer information about the given place.
+
+            Returns a JSON-serialisable structure that will be handed into
+            the database via the token_info field.
+        """
+        return {}
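The dummy mirrors the real analyzer interface so indexer tests can run without a tokenizer database. A minimal check of that contract (hypothetical test, not part of this change):

```python
def test_dummy_analyzer_contract():
    with DummyNameAnalyzer() as analyzer:
        # The dummy ignores the place contents and returns empty token info.
        assert analyzer.process_place({'place_id': 1}) == {}
```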
                           partition SMALLINT,
                           admin_level SMALLINT,
                           address HSTORE,
+                          token_info JSONB,
                           geometry_sector INTEGER)""")
        cur.execute("""CREATE TABLE location_property_osmline (
                           place_id BIGINT,