+++ /dev/null
--- SPDX-License-Identifier: GPL-2.0-only
--- This file is part of Nominatim. (https://nominatim.org)
--- Copyright (C) 2022 by the Nominatim developer community.
--- For a full list of authors see the git log.
--- Get tokens used for searching the given place.
--- These are the tokens that will be saved in the search_name table.
-CREATE OR REPLACE FUNCTION token_get_name_search_tokens(info JSONB)
-AS $$
--- The 'names' entry is extracted as text and cast to an integer array.
- SELECT (info->>'names')::INTEGER[]
--- Get tokens for matching the place name against others.
--- This should usually be restricted to full name tokens.
-CREATE OR REPLACE FUNCTION token_get_name_match_tokens(info JSONB)
-AS $$
--- Reads the same 'names' entry as token_get_name_search_tokens().
- SELECT (info->>'names')::INTEGER[]
--- Return the housenumber tokens applicable for the place.
-CREATE OR REPLACE FUNCTION token_get_housenumber_search_tokens(info JSONB)
-AS $$
--- The 'hnr_tokens' entry is extracted as text and cast to an integer array.
- SELECT (info->>'hnr_tokens')::INTEGER[]
--- Return the housenumber in the form that it can be matched during search.
-CREATE OR REPLACE FUNCTION token_normalized_housenumber(info JSONB)
-AS $$
--- The 'hnr' entry is returned as text without further processing.
- SELECT info->>'hnr';
--- True when the token info has a 'street' entry or has no 'place_search' entry.
-CREATE OR REPLACE FUNCTION token_is_street_address(info JSONB)
-AS $$
- SELECT info->>'street' is not null or info->>'place_search' is null;
--- True when a 'street' entry exists and its text form is not the empty
--- array literal '{}'.
-CREATE OR REPLACE FUNCTION token_has_addr_street(info JSONB)
-AS $$
- SELECT info->>'street' is not null and info->>'street' != '{}';
--- True when the token info has a 'place_match' entry.
-CREATE OR REPLACE FUNCTION token_has_addr_place(info JSONB)
-AS $$
- SELECT info->>'place_match' is not null;
--- True when the 'street' token array shares at least one element with
--- street_tokens (array overlap operator &&).
-CREATE OR REPLACE FUNCTION token_matches_street(info JSONB, street_tokens INTEGER[])
-AS $$
- SELECT (info->>'street')::INTEGER[] && street_tokens
--- True when the 'place_match' token array shares at least one element with
--- place_tokens (array overlap operator &&).
-CREATE OR REPLACE FUNCTION token_matches_place(info JSONB, place_tokens INTEGER[])
-AS $$
- SELECT (info->>'place_match')::INTEGER[] && place_tokens
--- Return the 'place_search' entry of the token info as an integer array.
-CREATE OR REPLACE FUNCTION token_addr_place_search_tokens(info JSONB)
-AS $$
- SELECT (info->>'place_search')::INTEGER[]
--- Return the keys of the 'addr' object of the token info, one per row.
-CREATE OR REPLACE FUNCTION token_get_address_keys(info JSONB)
-AS $$
- SELECT * FROM jsonb_object_keys(info->'addr');
--- Return element 0 of the entry stored under `key` in the 'addr' object,
--- cast to an integer array.
-CREATE OR REPLACE FUNCTION token_get_address_search_tokens(info JSONB, key TEXT)
-AS $$
- SELECT (info->'addr'->key->>0)::INTEGER[];
--- True when element 1 of the entry stored under `key` in the 'addr' object,
--- read as an integer array, overlaps with `tokens`.
-CREATE OR REPLACE FUNCTION token_matches_address(info JSONB, key TEXT, tokens INTEGER[])
-AS $$
- SELECT (info->'addr'->key->>1)::INTEGER[] && tokens;
--- Return the 'postcode' entry of the token info as text.
-CREATE OR REPLACE FUNCTION token_get_postcode(info JSONB)
-AS $$
- SELECT info->>'postcode';
--- Return token info that should be saved permanently in the database.
-CREATE OR REPLACE FUNCTION token_strip_info(info JSONB)
-AS $$
---------------- private functions ----------------------------------------------
--- Functions for term normalisation and access to the 'word' table.
--- Binds the symbol 'transliteration' from nominatim.so, located under the
--- templated module path, as an SQL function taking and returning text.
-CREATE OR REPLACE FUNCTION transliteration(text) RETURNS text
- AS '{{ modulepath }}/nominatim.so', 'transliteration'
--- Binds the symbol 'gettokenstring' from nominatim.so, located under the
--- templated module path, as an SQL function taking and returning text.
-CREATE OR REPLACE FUNCTION gettokenstring(text) RETURNS text
- AS '{{ modulepath }}/nominatim.so', 'gettokenstring'
- AS $$
- o TEXT;
- o := public.gettokenstring(public.transliteration(name));
- RETURN trim(substr(o,1,length(o)));
--- returns NULL if the word is too common
--- Look up the word id for the trimmed `lookup_word` among word rows that
--- have neither class nor type; a new row is inserted when none is found.
--- NOTE(review): RETURNS/DECLARE/BEGIN/END lines are not visible in this
--- excerpt, so the nesting of the statements below cannot be confirmed here.
-CREATE OR REPLACE FUNCTION getorcreate_word_id(lookup_word TEXT)
- AS $$
- lookup_token TEXT;
- return_word_id INTEGER;
- count INTEGER;
- lookup_token := trim(lookup_word);
- SELECT min(word_id), max(search_name_count) FROM word
- WHERE word_token = lookup_token and class is null and type is null
- INTO return_word_id, count;
- IF return_word_id IS NULL THEN
- return_word_id := nextval('seq_word');
--- Positional insert of seven values; presumably word_id, word_token, word,
--- class, type, country_code, search_name_count - verify against the word table.
- INSERT INTO word VALUES (return_word_id, lookup_token, null, null, null, null, 0);
--- A count above the templated maximum word frequency turns the result into NULL.
- IF count > {{ max_word_freq }} THEN
- return_word_id := NULL;
- RETURN return_word_id;
-LANGUAGE plpgsql;
--- Create housenumber tokens from an OSM addr:housenumber.
--- The housenumber is split at comma and semicolon as necessary.
--- The function returns the normalized form of the housenumber suitable
--- for comparison.
--- Each element of `housenumbers` is passed through make_standard_name() and
--- a token id is fetched for it with getorcreate_housenumber_id().
--- `normtext` receives the standardized forms joined with ';', `tokens` the
--- text representation of the array of token ids.
-CREATE OR REPLACE FUNCTION create_housenumbers(housenumbers TEXT[],
- OUT tokens TEXT,
- OUT normtext TEXT)
- AS $$
- SELECT array_to_string(array_agg(trans), ';'), array_agg(tid)::TEXT
- INTO normtext, tokens
- FROM (SELECT lookup_word as trans, getorcreate_housenumber_id(lookup_word) as tid
- FROM (SELECT make_standard_name(h) as lookup_word
- FROM unnest(housenumbers) h) x) y;
--- Return the id of the housenumber token for `lookup_word`, inserting a new
--- word row with class 'place' and type 'house' when none exists.
--- NOTE(review): RETURNS/DECLARE/BEGIN/END lines are not visible in this excerpt.
-CREATE OR REPLACE FUNCTION getorcreate_housenumber_id(lookup_word TEXT)
- AS $$
- lookup_token TEXT;
- return_word_id INTEGER;
--- The stored token is the trimmed word prefixed with a single space.
- lookup_token := ' ' || trim(lookup_word);
- SELECT min(word_id) FROM word
- WHERE word_token = lookup_token and class='place' and type='house'
- INTO return_word_id;
- IF return_word_id IS NULL THEN
- return_word_id := nextval('seq_word');
- INSERT INTO word VALUES (return_word_id, lookup_token, null,
- 'place', 'house', null, 0);
- RETURN return_word_id;
-LANGUAGE plpgsql;
--- Make sure a postcode token exists for `postcode`.
--- Returns false when a matching row (class 'place', type 'postcode') is
--- already present; otherwise a new row is inserted and true is returned.
--- NOTE(review): RETURNS/DECLARE/BEGIN/LOOP/END lines are not visible in this excerpt.
-CREATE OR REPLACE FUNCTION create_postcode_id(postcode TEXT)
- AS $$
- lookup_token TEXT;
- return_word_id INTEGER;
--- The token is the standardized postcode prefixed with a single space.
- lookup_token := ' ' || make_standard_name(postcode);
- FOR r IN
- SELECT word_id FROM word
- WHERE word_token = lookup_token and word = postcode
- and class='place' and type='postcode'
- RETURN false;
- INSERT INTO word VALUES (nextval('seq_word'), lookup_token, postcode,
- 'place', 'postcode', null, 0);
- RETURN true;
-LANGUAGE plpgsql;
--- Return the id of the name token for `lookup_word`, inserting a new word row
--- without class and type when none exists; `src_word` is stored in the third
--- position of the inserted row.
--- NOTE(review): RETURNS/DECLARE/BEGIN/END lines are not visible in this excerpt.
-CREATE OR REPLACE FUNCTION getorcreate_name_id(lookup_word TEXT, src_word TEXT)
- AS $$
- lookup_token TEXT;
--- NOTE(review): nospace_lookup_token is declared but never used below.
- nospace_lookup_token TEXT;
- return_word_id INTEGER;
--- The stored token is the trimmed word prefixed with a single space.
- lookup_token := ' '||trim(lookup_word);
- SELECT min(word_id) FROM word
- WHERE word_token = lookup_token and class is null and type is null
- INTO return_word_id;
- IF return_word_id IS NULL THEN
- return_word_id := nextval('seq_word');
- INSERT INTO word VALUES (return_word_id, lookup_token, src_word,
- null, null, null, 0);
- RETURN return_word_id;
-LANGUAGE plpgsql;
--- Normalize a string and lookup its word ids (partial words).
--- The standardized form of `lookup_word` is split at spaces and every
--- non-empty part is looked up in the word table (rows without class and type).
--- Parts without a row get a freshly inserted one; all ids are collected in
--- the returned array.
--- NOTE(review): RETURNS/DECLARE/BEGIN/ELSE/END lines are not visible in this
--- excerpt, so the branch structure below cannot be confirmed here.
-CREATE OR REPLACE FUNCTION addr_ids_from_name(lookup_word TEXT)
- AS $$
- words TEXT[];
- return_word_id INTEGER[];
- word_ids INTEGER[];
- words := string_to_array(make_standard_name(lookup_word), ' ');
- IF array_upper(words, 1) IS NOT NULL THEN
- FOR j IN 1..array_upper(words, 1) LOOP
- IF (words[j] != '') THEN
- SELECT array_agg(word_id) INTO word_ids
- FROM word
- WHERE word_token = words[j] and class is null and type is null;
- IF word_ids IS NULL THEN
- id := nextval('seq_word');
- INSERT INTO word VALUES (id, words[j], null, null, null, null, 0);
- return_word_id := return_word_id || id;
- return_word_id := array_merge(return_word_id, word_ids);
- RETURN return_word_id;
-LANGUAGE plpgsql;
--- Normalize a string and look up its name ids (full words).
--- Lookup only: collects the ids of all word rows (without class and type)
--- whose token equals the standardized `lookup_word` prefixed with a space.
--- Nothing is inserted; the result is NULL when no row matches.
-CREATE OR REPLACE FUNCTION word_ids_from_name(lookup_word TEXT)
- AS $$
- lookup_token TEXT;
- return_word_ids INTEGER[];
- lookup_token := ' '|| make_standard_name(lookup_word);
- SELECT array_agg(word_id) FROM word
- WHERE word_token = lookup_token and class is null and type is null
- INTO return_word_ids;
- RETURN return_word_ids;
- AS $$
- result INTEGER[];
- s TEXT;
- words TEXT[];
- value TEXT;
- result := '{}'::INTEGER[];
- FOR value IN SELECT unnest(regexp_split_to_array(svals(src), E'[,;]')) LOOP
- -- full name
- s := make_standard_name(value);
- w := getorcreate_name_id(s, value);
- IF not(ARRAY[w] <@ result) THEN
- result := result || w;
- -- partial single-word terms
- words := string_to_array(s, ' ');
- IF array_upper(words, 1) IS NOT NULL THEN
- FOR j IN 1..array_upper(words, 1) LOOP
- IF (words[j] != '') THEN
- w = getorcreate_word_id(words[j]);
- IF w IS NOT NULL AND NOT (ARRAY[w] <@ result) THEN
- result := result || w;
- -- consider parts before an opening bracket a full word as well
- words := regexp_split_to_array(value, E'[(]');
- IF array_upper(words, 1) > 1 THEN
- s := make_standard_name(words[1]);
- IF s != '' THEN
- w := getorcreate_name_id(s, words[1]);
- IF w IS NOT NULL AND NOT (ARRAY[w] <@ result) THEN
- result := result || w;
- s := regexp_replace(value, '市$', '');
- IF s != value THEN
- s := make_standard_name(s);
- IF s != '' THEN
- w := getorcreate_name_id(s, value);
- IF NOT (ARRAY[w] <@ result) THEN
- result := result || w;
- RETURN result;
-LANGUAGE plpgsql;
--- Create word table entries for `src` ahead of time: a name token and a word
--- token for the whole standardized string, word tokens for each
--- space-separated part and for each part split at ',', ';', '(' and ')'.
--- NOTE(review): RETURNS/DECLARE/BEGIN/END lines and the return statement are
--- not visible in this excerpt.
-CREATE OR REPLACE FUNCTION precompute_words(src TEXT)
- AS $$
- s TEXT;
- words TEXT[];
- s := make_standard_name(src);
- w := getorcreate_name_id(s, src);
- w := getorcreate_word_id(s);
- words := string_to_array(s, ' ');
- IF array_upper(words, 1) IS NOT NULL THEN
- FOR j IN 1..array_upper(words, 1) LOOP
- IF (words[j] != '') THEN
- w := getorcreate_word_id(words[j]);
- words := regexp_split_to_array(src, E'[,;()]');
- IF array_upper(words, 1) != 1 THEN
- FOR j IN 1..array_upper(words, 1) LOOP
- s := make_standard_name(words[j]);
- IF s != '' THEN
- w := getorcreate_word_id(s);
--- A trailing '市' is stripped and the remainder registered as an additional
--- name token when that changes the string.
- s := regexp_replace(src, '市$', '');
- IF s != src THEN
- s := make_standard_name(s);
- IF s != '' THEN
- w := getorcreate_name_id(s, src);
-LANGUAGE plpgsql;
+++ /dev/null
--- SPDX-License-Identifier: GPL-2.0-only
--- This file is part of Nominatim. (https://nominatim.org)
--- Copyright (C) 2022 by the Nominatim developer community.
--- For a full list of authors see the git log.
--- Required for details lookup.
--- B-tree index on word.word_id, placed in the templated search index tablespace.
-CREATE INDEX IF NOT EXISTS idx_word_word_id
- ON word USING BTREE (word_id) {{db.tablespace.search_index}};
+++ /dev/null
--- SPDX-License-Identifier: GPL-2.0-only
--- This file is part of Nominatim. (https://nominatim.org)
--- Copyright (C) 2022 by the Nominatim developer community.
--- For a full list of authors see the git log.
- word_id INTEGER,
- word_token text NOT NULL,
- word text,
- class text,
- type text,
- country_code varchar(2),
- search_name_count INTEGER,
- operator TEXT
-) {{db.tablespace.search_data}};
--- Index for looking up rows by their token.
-CREATE INDEX idx_word_word_token ON word
- USING BTREE (word_token) {{db.tablespace.search_index}};
--- Partial index: only rows that carry a word are indexed.
-CREATE INDEX idx_word_word ON word
- USING BTREE (word) {{db.tablespace.search_index}} WHERE word is not null;
--- Sequence that supplies the word ids; the web user gets read access to it.
-CREATE SEQUENCE seq_word start 1;
-GRANT SELECT ON seq_word to "{{config.DATABASE_WEBUSER}}";
+++ /dev/null
-# SPDX-License-Identifier: GPL-3.0-or-later
-# This file is part of Nominatim. (https://nominatim.org)
-# Copyright (C) 2024 by the Nominatim developer community.
-# For a full list of authors see the git log.
-Implementation of query analysis for the legacy tokenizer.
-from typing import Tuple, Dict, List, Optional, Iterator, Any, cast
-from copy import copy
-from collections import defaultdict
-import dataclasses
-import sqlalchemy as sa
-from ..typing import SaRow
-from ..connection import SearchConnection
-from ..logging import log
-from . import query as qmod
-from .query_analyzer_factory import AbstractQueryAnalyzer
-def yield_words(terms: List[str], start: int) -> Iterator[Tuple[str, qmod.TokenRange]]:
- """ Return all combinations of words in the terms list after the
- given position.
-
- Each yielded item is the space-joined run of consecutive terms
- together with the TokenRange(first, last + 1) it covers.
- """
- total = len(terms)
- for first in range(start, total):
- word = terms[first]
- yield word, qmod.TokenRange(first, first + 1)
- # Runs are capped at 20 consecutive terms.
- for last in range(first + 1, min(first + 20, total)):
- word = ' '.join((word, terms[last]))
- yield word, qmod.TokenRange(first, last + 1)
-class LegacyToken(qmod.Token):
- """ Specialised token for legacy tokenizer.
- """
- word_token: str
- category: Optional[Tuple[str, str]]
- country: Optional[str]
- operator: Optional[str]
- @property
- def info(self) -> Dict[str, Any]:
- """ Dictionary of additional properties of the token.
- Should only be used for debugging purposes.
- """
- return {'category': self.category,
- 'country': self.country,
- 'operator': self.operator}
- def get_category(self) -> Tuple[str, str]:
- assert self.category
- return self.category
-class LegacyQueryAnalyzer(AbstractQueryAnalyzer):
- """ Converter for query strings into a tokenized query
- using the tokens created by a legacy tokenizer.
- """
- def __init__(self, conn: SearchConnection) -> None:
- # Connection used by setup() and by the database lookups during analysis.
- self.conn = conn
- async def setup(self) -> None:
- """ Set up static data structures needed for the analysis.
-
- Reads the 'tokenizer_maxwordfreq' property and registers the
- layout of the word table with the connection's metadata.
- """
- self.max_word_freq = int(await self.conn.get_property('tokenizer_maxwordfreq'))
- # The table is only declared when the metadata does not know it yet.
- if 'word' not in self.conn.t.meta.tables:
- sa.Table('word', self.conn.t.meta,
- sa.Column('word_id', sa.Integer),
- sa.Column('word_token', sa.Text, nullable=False),
- sa.Column('word', sa.Text),
- sa.Column('class', sa.Text),
- sa.Column('type', sa.Text),
- sa.Column('country_code', sa.Text),
- sa.Column('search_name_count', sa.Integer),
- sa.Column('operator', sa.Text))
- async def analyze_query(self, phrases: List[qmod.Phrase]) -> qmod.QueryStruct:
- """ Analyze the given list of phrases and return the
- tokenized query.
-
- Normalization is done by the database function
- make_standard_name(). When nothing remains after normalization,
- the query structure is returned without any tokens.
- """
- log().section('Analyze query (using Legacy tokenizer)')
- normalized = []
- if phrases:
- # One SELECT normalizes all phrases; only its first row is used and
- # phrases with an empty normalized text are dropped.
- for row in await self.conn.execute(sa.select(*(sa.func.make_standard_name(p.text)
- for p in phrases))):
- normalized = [qmod.Phrase(p.ptype, r) for r, p in zip(row, phrases) if r]
- break
- query = qmod.QueryStruct(normalized)
- log().var_dump('Normalized query', query.source)
- if not query.source:
- return query
- parts, words = self.split_query(query)
- lookup_words = list(words.keys())
- log().var_dump('Split query', parts)
- log().var_dump('Extracted words', lookup_words)
- for row in await self.lookup_in_db(lookup_words):
- # Rows are matched back to the query positions via the stripped token.
- for trange in words[row.word_token.strip()]:
- token, ttype = self.make_token(row)
- if ttype == qmod.TokenType.NEAR_ITEM:
- if trange.start == 0:
- query.add_token(trange, qmod.TokenType.NEAR_ITEM, token)
- elif ttype == qmod.TokenType.QUALIFIER:
- query.add_token(trange, qmod.TokenType.QUALIFIER, token)
- if trange.start == 0 or trange.end == query.num_token_slots():
- token = copy(token)
- token.penalty += 0.1 * (query.num_token_slots())
- query.add_token(trange, qmod.TokenType.NEAR_ITEM, token)
- # Partial tokens are only added for ranges that span a single term.
- elif ttype != qmod.TokenType.PARTIAL or trange.start + 1 == trange.end:
- query.add_token(trange, ttype, token)
- self.add_extra_tokens(query, parts)
- self.rerank_tokens(query)
- log().table_dump('Word tokens', _dump_word_tokens(query))
- return query
- def normalize_text(self, text: str) -> str:
- """ Bring the given text into a normalized form.
- This only removes case, so some difference with the normalization
- in the phrase remains.
-
- Returns the lower-cased text.
- """
- return text.lower()
- def split_query(self, query: qmod.QueryStruct) -> Tuple[List[str],
- Dict[str, List[qmod.TokenRange]]]:
- """ Transliterate the phrases and split them into tokens.
- Returns a list of transliterated tokens and a dictionary
- of words for lookup together with their position.
-
- As a side effect, nodes are added to the query and their break
- types are set.
- """
- parts: List[str] = []
- phrase_start = 0
- words = defaultdict(list)
- for phrase in query.source:
- query.nodes[-1].ptype = phrase.ptype
- for trans in phrase.text.split(' '):
- if trans:
- # NOTE(review): `trans` comes from split(' ') and so contains no
- # space; the inner split yields only `trans` itself, which is
- # why appending `trans` instead of `term` makes no difference.
- for term in trans.split(' '):
- if term:
- parts.append(trans)
- query.add_node(qmod.BreakType.TOKEN, phrase.ptype)
- query.nodes[-1].btype = qmod.BreakType.WORD
- query.nodes[-1].btype = qmod.BreakType.PHRASE
- # Word combinations never reach across a phrase boundary because
- # they start at the first part of the current phrase.
- for word, wrange in yield_words(parts, phrase_start):
- words[word].append(wrange)
- phrase_start = len(parts)
- query.nodes[-1].btype = qmod.BreakType.END
- return parts, words
- async def lookup_in_db(self, words: List[str]) -> 'sa.Result[Any]':
- """ Return the token information from the database for the
- given word tokens.
- """
- t = self.conn.t.meta.tables['word']
- # Every word is looked up as given and with a leading space.
- sql = t.select().where(t.c.word_token.in_(words + [' ' + w for w in words]))
- return await self.conn.execute(sql)
- def make_token(self, row: SaRow) -> Tuple[LegacyToken, qmod.TokenType]:
- """ Create a LegacyToken from the row of the word table.
- Also determines the type of token.
- """
- penalty = 0.0
- is_indexed = True
- rowclass = getattr(row, 'class')
- if row.country_code is not None:
- ttype = qmod.TokenType.COUNTRY
- lookup_word = row.country_code
- elif rowclass is not None:
- if rowclass == 'place' and row.type == 'house':
- ttype = qmod.TokenType.HOUSENUMBER
- lookup_word = row.word_token[1:]
- elif rowclass == 'place' and row.type == 'postcode':
- ttype = qmod.TokenType.POSTCODE
- lookup_word = row.word
- else:
- ttype = qmod.TokenType.NEAR_ITEM if row.operator in ('in', 'near')\
- else qmod.TokenType.QUALIFIER
- lookup_word = row.word
- elif row.word_token.startswith(' '):
- ttype = qmod.TokenType.WORD
- lookup_word = row.word or row.word_token[1:]
- else:
- ttype = qmod.TokenType.PARTIAL
- lookup_word = row.word_token
- penalty = 0.21
- if row.search_name_count > self.max_word_freq:
- is_indexed = False
- return LegacyToken(penalty=penalty, token=row.word_id,
- count=max(1, row.search_name_count or 1),
- addr_count=1, # not supported
- lookup_word=lookup_word,
- word_token=row.word_token.strip(),
- category=(rowclass, row.type) if rowclass is not None else None,
- country=row.country_code,
- operator=row.operator,
- is_indexed=is_indexed),\
- ttype
- def add_extra_tokens(self, query: qmod.QueryStruct, parts: List[str]) -> None:
- """ Add tokens to query that are not saved in the database.
- """
- for part, node, i in zip(parts, query.nodes, range(1000)):
- if len(part) <= 4 and part.isdigit()\
- and not node.has_tokens(i+1, qmod.TokenType.HOUSENUMBER):
- query.add_token(qmod.TokenRange(i, i+1), qmod.TokenType.HOUSENUMBER,
- LegacyToken(penalty=0.5, token=0, count=1, addr_count=1,
- lookup_word=part, word_token=part,
- category=None, country=None,
- operator=None, is_indexed=True))
- def rerank_tokens(self, query: qmod.QueryStruct) -> None:
- """ Add penalties to tokens that depend on presence of other token.
- """
- for _, node, tlist in query.iter_token_lists():
- if tlist.ttype == qmod.TokenType.POSTCODE:
- # Other token lists over the same range as a postcode get a penalty;
- # housenumbers only when the postcode has more than 4 characters.
- for repl in node.starting:
- if repl.end == tlist.end and repl.ttype != qmod.TokenType.POSTCODE \
- and (repl.ttype != qmod.TokenType.HOUSENUMBER
- or len(tlist.tokens[0].lookup_word) > 4):
- repl.add_penalty(0.39)
- elif tlist.ttype == qmod.TokenType.HOUSENUMBER \
- and len(tlist.tokens[0].lookup_word) <= 3:
- # For short housenumbers containing a digit, the other token lists
- # over the same range are penalized.
- if any(c.isdigit() for c in tlist.tokens[0].lookup_word):
- for repl in node.starting:
- if repl.end == tlist.end and repl.ttype != qmod.TokenType.HOUSENUMBER:
- repl.add_penalty(0.5 - tlist.tokens[0].penalty)
-def _dump_word_tokens(query: qmod.QueryStruct) -> Iterator[List[Any]]:
- # Yields a header row followed by one row per token of the query,
- # in the form expected by the table dump of the debug log.
- yield ['type', 'token', 'word_token', 'lookup_word', 'penalty', 'count', 'info', 'indexed']
- for node in query.nodes:
- for tlist in node.starting:
- for token in tlist.tokens:
- t = cast(LegacyToken, token)
- yield [tlist.ttype.name, t.token, t.word_token or '',
- t.lookup_word or '', t.penalty, t.count, t.info,
- 'Y' if t.is_indexed else 'N']
-async def create_query_analyzer(conn: SearchConnection) -> AbstractQueryAnalyzer:
- """ Create and set up a new query analyzer for a database based
- on the legacy tokenizer.
- """
- out = LegacyQueryAnalyzer(conn)
- await out.setup()
- return out
+++ /dev/null
-# SPDX-License-Identifier: GPL-3.0-or-later
-# This file is part of Nominatim. (https://nominatim.org)
-# Copyright (C) 2024 by the Nominatim developer community.
-# For a full list of authors see the git log.
-Tokenizer implementing normalisation as used before Nominatim 4.
-from typing import Optional, Sequence, List, Tuple, Mapping, Any, Callable, \
- cast, Dict, Set, Iterable
-from collections import OrderedDict
-import logging
-from pathlib import Path
-import re
-import shutil
-from icu import Transliterator
-import psycopg
-from psycopg import sql as pysql
-from ..errors import UsageError
-from ..db.connection import connect, Connection, drop_tables, table_exists,\
- execute_scalar, register_hstore
-from ..config import Configuration
-from ..db import properties
-from ..db import utils as db_utils
-from ..db.sql_preprocessor import SQLPreprocessor
-from ..data.place_info import PlaceInfo
-from .base import AbstractAnalyzer, AbstractTokenizer
-DBCFG_NORMALIZATION = "tokenizer_normalization"
-DBCFG_MAXWORDFREQ = "tokenizer_maxwordfreq"
-LOG = logging.getLogger()
-def create(dsn: str, data_dir: Path) -> 'LegacyTokenizer':
- """ Create a new instance of the tokenizer provided by this module.
-
- A deprecation warning is logged on every call.
- """
- LOG.warning('WARNING: the legacy tokenizer is deprecated '
- 'and will be removed in Nominatim 5.0.')
- return LegacyTokenizer(dsn, data_dir)
-def _install_module(config_module_path: str, src_dir: Optional[Path], module_dir: Path) -> str:
- """ Copies the PostgreSQL normalisation module into the project
- directory if necessary. For historical reasons the module is
- saved in the '/module' subdirectory and not with the other tokenizer
- data.
- The function detects when the installation is run from the
- build directory. It doesn't touch the module in that case.
-
- Returns the directory the module is to be loaded from. Raises a
- UsageError when neither a custom path nor a source directory
- is available.
- """
- # Custom module locations are simply used as is.
- if config_module_path:
- LOG.info("Using custom path for database module at '%s'", config_module_path)
- return config_module_path
- # Otherwise a source dir must be given.
- if src_dir is None:
- raise UsageError("The legacy tokenizer cannot be used with the Nominatim pip module.")
- # Compatibility mode for builddir installations.
- if module_dir.exists() and src_dir.samefile(module_dir):
- LOG.info('Running from build directory. Leaving database module as is.')
- return str(module_dir)
- # In any other case install the module in the project directory.
- if not module_dir.exists():
- module_dir.mkdir()
- destfile = module_dir / 'nominatim.so'
- shutil.copy(str(src_dir / 'nominatim.so'), str(destfile))
- # The copied library is made readable and executable for everybody.
- destfile.chmod(0o755)
- LOG.info('Database module installed at %s', str(destfile))
- return str(module_dir)
-def _check_module(module_dir: str, conn: Connection) -> None:
- """ Try to use the PostgreSQL module to confirm that it is correctly
- installed and accessible from PostgreSQL.
- """
- with conn.cursor() as cur:
- try:
- cur.execute(pysql.SQL("""CREATE FUNCTION nominatim_test_import_func(text)
- RETURNS text AS {}, 'transliteration'
- DROP FUNCTION nominatim_test_import_func(text)
- """).format(pysql.Literal(f'{module_dir}/nominatim.so')))
- except psycopg.DatabaseError as err:
- LOG.fatal("Error accessing database module: %s", err)
- raise UsageError("Database module cannot be accessed.") from err
-class LegacyTokenizer(AbstractTokenizer):
- """ The legacy tokenizer uses a special PostgreSQL module to normalize
- names and queries. The tokenizer thus implements normalization through
- calls to the database.
- """
- def __init__(self, dsn: str, data_dir: Path) -> None:
- self.dsn = dsn
- self.data_dir = data_dir
- # Normalization rules; set by the init_*/migrate functions.
- self.normalization: Optional[str] = None
- def init_new_db(self, config: Configuration, init_db: bool = True) -> None:
- """ Set up a new tokenizer for the database.
- This copies all necessary data in the project directory to make
- sure the tokenizer remains stable even over updates.
-
- With `init_db` set, the SQL functions and the tokenizer tables
- are created as well.
- """
- assert config.project_dir is not None
- module_dir = _install_module(config.DATABASE_MODULE_PATH,
- config.lib_dir.module,
- config.project_dir / 'module')
- self.normalization = config.TERM_NORMALIZATION
- with connect(self.dsn) as conn:
- # The module is verified before the configuration is persisted.
- _check_module(module_dir, conn)
- self._save_config(conn, config)
- conn.commit()
- if init_db:
- self.update_sql_functions(config)
- self._init_db_tables(config)
- def init_from_project(self, config: Configuration) -> None:
- """ Initialise the tokenizer from the project directory.
-
- The normalization rules are read from the database properties.
- """
- assert config.project_dir is not None
- with connect(self.dsn) as conn:
- self.normalization = properties.get_property(conn, DBCFG_NORMALIZATION)
- # Reinstall the database module when it is missing from the project directory.
- if not (config.project_dir / 'module' / 'nominatim.so').exists():
- _install_module(config.DATABASE_MODULE_PATH,
- config.lib_dir.module,
- config.project_dir / 'module')
- def finalize_import(self, config: Configuration) -> None:
- """ Do any required postprocessing to make the tokenizer data ready
- for use.
-
- Runs the SQL file with the index definitions of the tokenizer.
- """
- with connect(self.dsn) as conn:
- sqlp = SQLPreprocessor(conn, config)
- sqlp.run_sql_file(conn, 'tokenizer/legacy_tokenizer_indices.sql')
- def update_sql_functions(self, config: Configuration) -> None:
- """ Reimport the SQL functions for this tokenizer.
-
- The maximum word frequency stored in the database properties and
- the module path are handed to the SQL template.
- """
- assert config.project_dir is not None
- with connect(self.dsn) as conn:
- max_word_freq = properties.get_property(conn, DBCFG_MAXWORDFREQ)
- # A configured module path takes precedence over the project directory.
- modulepath = config.DATABASE_MODULE_PATH or \
- str((config.project_dir / 'module').resolve())
- sqlp = SQLPreprocessor(conn, config)
- sqlp.run_sql_file(conn, 'tokenizer/legacy_tokenizer.sql',
- max_word_freq=max_word_freq,
- modulepath=modulepath)
- def check_database(self, _: Configuration) -> Optional[str]:
- """ Check that the tokenizer is set up correctly.
-
- Returns None when make_standard_name('a') yields 'a' and
- a hint text describing the problem otherwise.
- """
- hint = """\
- The Postgresql extension nominatim.so was not correctly loaded.
- Error: {error}
- Hints:
- * Check the output of the CMmake/make installation step
- * Does nominatim.so exist?
- * Does nominatim.so exist on the database server?
- * Can nominatim.so be accessed by the database user?
- """
- with connect(self.dsn) as conn:
- try:
- out = execute_scalar(conn, "SELECT make_standard_name('a')")
- except psycopg.Error as err:
- return hint.format(error=str(err))
- if out != 'a':
- return hint.format(error='Unexpected result for make_standard_name()')
- return None
- def migrate_database(self, config: Configuration) -> None:
- """ Initialise the project directory of an existing database for
- use with this tokenizer.
- This is a special migration function for updating existing databases
- to new software versions.
-
- Installs and checks the database module and saves the tokenizer
- configuration in the database.
- """
- assert config.project_dir is not None
- self.normalization = config.TERM_NORMALIZATION
- module_dir = _install_module(config.DATABASE_MODULE_PATH,
- config.lib_dir.module,
- config.project_dir / 'module')
- with connect(self.dsn) as conn:
- _check_module(module_dir, conn)
- self._save_config(conn, config)
- def update_statistics(self, config: Configuration, threads: int = 1) -> None:
- """ Recompute the frequency of full words.
-
- Nothing is computed when the search_name table does not exist.
- The `config` and `threads` parameters are not used here.
- """
- with connect(self.dsn) as conn:
- if table_exists(conn, 'search_name'):
- drop_tables(conn, "word_frequencies")
- with conn.cursor() as cur:
- LOG.info("Computing word frequencies")
- cur.execute("""CREATE TEMP TABLE word_frequencies AS
- SELECT unnest(name_vector) as id, count(*)
- FROM search_name GROUP BY id""")
- cur.execute("CREATE INDEX ON word_frequencies(id)")
- LOG.info("Update word table with recomputed frequencies")
- # Only tokens starting with a space are updated.
- cur.execute("""UPDATE word SET search_name_count = count
- FROM word_frequencies
- WHERE word_token like ' %' and word_id = id""")
- drop_tables(conn, "word_frequencies")
- conn.commit()
- def update_word_tokens(self) -> None:
- """ No house-keeping implemented for the legacy tokenizer.
- The function only logs that fact.
- """
- LOG.info("No tokenizer clean-up available.")
- def name_analyzer(self) -> 'LegacyNameAnalyzer':
- """ Create a new analyzer for tokenizing names and queries
- using this tokenizer. Analyzers are context managers and should
- be used accordingly:
- ```
- with tokenizer.name_analyzer() as analyzer:
- analyzer.tokenize()
- ```
- When used outside the with construct, the caller must ensure to
- call the close() function before destructing the analyzer.
- Analyzers are not thread-safe. You need to instantiate one per thread.
- """
- # The ICU transliterator is built from the stored normalization rules.
- normalizer = Transliterator.createFromRules("phrase normalizer",
- self.normalization)
- return LegacyNameAnalyzer(self.dsn, normalizer)
- def most_frequent_words(self, conn: Connection, num: int) -> List[str]:
- """ Return a list of the `num` most frequent full words
- in the database.
-
- Only rows with a non-null word are considered; they are ordered
- by descending search_name_count.
- """
- with conn.cursor() as cur:
- cur.execute(""" SELECT word FROM word WHERE word is not null
- ORDER BY search_name_count DESC LIMIT %s""", (num,))
- return list(s[0] for s in cur)
- def _init_db_tables(self, config: Configuration) -> None:
- """ Set up the word table and fill it with pre-computed word
- frequencies.
- """
- with connect(self.dsn) as conn:
- sqlp = SQLPreprocessor(conn, config)
- sqlp.run_sql_file(conn, 'tokenizer/legacy_tokenizer_tables.sql')
- conn.commit()
- LOG.warning("Precomputing word tokens")
- # The precomputed words are loaded from words.sql in the data directory.
- db_utils.execute_file(self.dsn, config.lib_dir.data / 'words.sql')
- def _save_config(self, conn: Connection, config: Configuration) -> None:
- """ Save the configuration that needs to remain stable for the given
- database as database properties.
-
- Stored are the normalization rules and the maximum word frequency.
- The normalization rules must have been set before.
- """
- assert self.normalization is not None
- properties.set_property(conn, DBCFG_NORMALIZATION, self.normalization)
- properties.set_property(conn, DBCFG_MAXWORDFREQ, config.MAX_WORD_FREQUENCY)
-class LegacyNameAnalyzer(AbstractAnalyzer):
- """ The legacy analyzer uses the special Postgresql module for
- splitting names.
- Each instance opens a connection to the database to request the
- normalization.
- """
- def __init__(self, dsn: str, normalizer: Any):
- # The analyzer owns its own connection, which runs in autocommit mode.
- self.conn: Optional[Connection] = connect(dsn)
- self.conn.autocommit = True
- self.normalizer = normalizer
- register_hstore(self.conn)
- self._cache = _TokenCache(self.conn)
- def close(self) -> None:
- """ Free all resources used by the analyzer.
-
- Closes the database connection. Calling the function again
- afterwards does nothing.
- """
- if self.conn:
- self.conn.close()
- self.conn = None
- def get_word_token_info(self, words: Sequence[str]) -> List[Tuple[str, str, int]]:
- """ Return token information for the given list of words.
- If a word starts with # it is assumed to be a full name
- otherwise is a partial name.
- The function returns a list of tuples with
- (original word, word token, word id).
- The function is used for testing and debugging only
- and not necessarily efficient.
- """
- assert self.conn is not None
- with self.conn.cursor() as cur:
- cur.execute("""SELECT t.term, word_token, word_id
- FROM word, (SELECT unnest(%s::TEXT[]) as term) t
- WHERE word_token = (CASE
- WHEN left(t.term, 1) = '#' THEN
- ' ' || make_standard_name(substring(t.term from 2))
- make_standard_name(t.term)
- END)
- and class is null and country_code is null""",
- (words, ))
- return [(r[0], r[1], r[2]) for r in cur]
- def normalize(self, phrase: str) -> str:
- """ Normalize the given phrase, i.e. remove all properties that
- are irrelevant for search.
-
- The work is done by the transliterator handed in on construction.
- """
- return cast(str, self.normalizer.transliterate(phrase))
- def normalize_postcode(self, postcode: str) -> str:
- """ Convert the postcode to a standardized form.
- This function must yield exactly the same result as the SQL function
- 'token_normalized_postcode()'.
-
- Surrounding whitespace is removed and the result upper-cased.
- """
- return postcode.strip().upper()
- def update_postcodes_from_db(self) -> None:
- """ Update postcode tokens in the word table from the location_postcode
- table.
- """
- assert self.conn is not None
- with self.conn.cursor() as cur:
- # This finds us the rows in location_postcode and word that are
- # missing in the other table.
- cur.execute("""SELECT * FROM
- (SELECT pc, word FROM
- (SELECT distinct(postcode) as pc FROM location_postcode) p
- (SELECT word FROM word
- WHERE class ='place' and type = 'postcode') w
- ON pc = word) x
- WHERE pc is null or word is null""")
- to_delete = []
- to_add = []
- for postcode, word in cur:
- if postcode is None:
- to_delete.append(word)
- else:
- to_add.append(postcode)
- if to_delete:
- cur.execute("""DELETE FROM WORD
- WHERE class ='place' and type = 'postcode'
- and word = any(%s)
- """, (to_delete, ))
- if to_add:
- cur.execute("""SELECT count(create_postcode_id(pc))
- FROM unnest(%s::text[]) as pc
- """, (to_add, ))
- def update_special_phrases(self, phrases: Iterable[Tuple[str, str, str, str]],
- should_replace: bool) -> None:
- """ Replace the search index for special phrases with the new phrases.
-
- Each phrase is a tuple whose first element is normalized before
- comparison; the remaining three are matched against class, type
- and operator of the word table. Existing phrases that are not
- part of `phrases` are only deleted when `should_replace` is set.
- """
- assert self.conn is not None
- norm_phrases = set(((self.normalize(p[0]), p[1], p[2], p[3])
- for p in phrases))
- with self.conn.cursor() as cur:
- # Get the old phrases.
- existing_phrases = set()
- cur.execute("""SELECT word, class as cls, type, operator FROM word
- WHERE class != 'place'
- OR (type != 'house' AND type != 'postcode')""")
- for label, cls, typ, oper in cur:
- # A missing operator is represented by '-' for the comparison.
- existing_phrases.add((label, cls, typ, oper or '-'))
- to_add = norm_phrases - existing_phrases
- to_delete = existing_phrases - norm_phrases
- if to_add:
- cur.executemany(
- """ INSERT INTO word (word_id, word_token, word, class, type,
- search_name_count, operator)
- (SELECT nextval('seq_word'), ' ' || make_standard_name(name), name,
- class, type, 0,
- CASE WHEN op in ('in', 'near') THEN op ELSE null END
- FROM (VALUES (%s, %s, %s, %s)) as v(name, class, type, op))""",
- to_add)
- if to_delete and should_replace:
- cur.executemany(
- """ DELETE FROM word
- USING (VALUES (%s, %s, %s, %s)) as v(name, in_class, in_type, op)
- WHERE word = name and class = in_class and type = in_type
- and ((op = '-' and operator is null) or op = operator)""",
- to_delete)
- # NOTE(review): the deletion count is logged even when should_replace
- # is false and nothing was deleted.
- LOG.info("Total phrases: %s. Added: %s. Deleted: %s",
- len(norm_phrases), len(to_add), len(to_delete))
- def add_country_names(self, country_code: str, names: Mapping[str, str]) -> None:
- """ Add names for the given country to the search index.
- """
- assert self.conn is not None
- with self.conn.cursor() as cur:
- cur.execute(
- """INSERT INTO word (word_id, word_token, country_code)
- (SELECT nextval('seq_word'), lookup_token, %s
- FROM (SELECT DISTINCT ' ' || make_standard_name(n) as lookup_token
- FROM unnest(%s::TEXT[])n) y
- WHERE word_token = lookup_token and country_code = %s))
- """, (country_code, list(names.values()), country_code))
- def process_place(self, place: PlaceInfo) -> Mapping[str, Any]:
- """ Determine tokenizer information about the given place.
- Returns a JSON-serialisable structure that will be handed into
- the database via the token_info field.
- """
- assert self.conn is not None
- token_info = _TokenInfo(self._cache)
- names = place.name
- if names:
- token_info.add_names(self.conn, names)
- if place.is_country():
- assert place.country_code is not None
- self.add_country_names(place.country_code, names)
- address = place.address
- if address:
- self._process_place_address(token_info, address)
- return token_info.data
- def _process_place_address(self, token_info: '_TokenInfo', address: Mapping[str, str]) -> None:
- assert self.conn is not None
- hnrs = []
- addr_terms = []
- for key, value in address.items():
- if key == 'postcode':
- # Make sure the normalized postcode is present in the word table.
- if re.search(r'[:,;]', value) is None:
- norm_pc = self.normalize_postcode(value)
- token_info.set_postcode(norm_pc)
- self._cache.add_postcode(self.conn, norm_pc)
- elif key in ('housenumber', 'streetnumber', 'conscriptionnumber'):
- hnrs.append(value)
- elif key == 'street':
- token_info.add_street(self.conn, value)
- elif key == 'place':
- token_info.add_place(self.conn, value)
- elif not key.startswith('_') \
- and key not in ('country', 'full', 'inclusion'):
- addr_terms.append((key, value))
- if hnrs:
- token_info.add_housenumbers(self.conn, hnrs)
- if addr_terms:
- token_info.add_address_terms(self.conn, addr_terms)
-class _TokenInfo:
- """ Collect token information to be sent back to the database.
- """
- def __init__(self, cache: '_TokenCache') -> None:
- self.cache = cache
- self.data: Dict[str, Any] = {}
- def add_names(self, conn: Connection, names: Mapping[str, str]) -> None:
- """ Add token information for the names of the place.
- """
- # Create the token IDs for all names.
- self.data['names'] = execute_scalar(conn, "SELECT make_keywords(%s)::text",
- (names, ))
- def add_housenumbers(self, conn: Connection, hnrs: Sequence[str]) -> None:
- """ Extract housenumber information from the address.
- """
- if len(hnrs) == 1:
- token = self.cache.get_housenumber(hnrs[0])
- if token is not None:
- self.data['hnr_tokens'] = token
- self.data['hnr'] = hnrs[0]
- return
- # split numbers if necessary
- simple_list: List[str] = []
- for hnr in hnrs:
- simple_list.extend((x.strip() for x in re.split(r'[;,]', hnr)))
- if len(simple_list) > 1:
- simple_list = list(set(simple_list))
- with conn.cursor() as cur:
- cur.execute("SELECT * FROM create_housenumbers(%s)", (simple_list, ))
- result = cur.fetchone()
- assert result is not None
- self.data['hnr_tokens'], self.data['hnr'] = result
- def set_postcode(self, postcode: str) -> None:
- """ Set or replace the postcode token with the given value.
- """
- self.data['postcode'] = postcode
- def add_street(self, conn: Connection, street: str) -> None:
- """ Add addr:street match terms.
- """
- def _get_street(name: str) -> Optional[str]:
- return cast(Optional[str],
- execute_scalar(conn, "SELECT word_ids_from_name(%s)::text", (name, )))
- tokens = self.cache.streets.get(street, _get_street)
- self.data['street'] = tokens or '{}'
- def add_place(self, conn: Connection, place: str) -> None:
- """ Add addr:place search and match terms.
- """
- def _get_place(name: str) -> Tuple[List[int], List[int]]:
- with conn.cursor() as cur:
- cur.execute("""SELECT make_keywords(hstore('name' , %s))::text,
- word_ids_from_name(%s)::text""",
- (name, name))
- return cast(Tuple[List[int], List[int]], cur.fetchone())
- self.data['place_search'], self.data['place_match'] = \
- self.cache.places.get(place, _get_place)
- def add_address_terms(self, conn: Connection, terms: Sequence[Tuple[str, str]]) -> None:
- """ Add additional address terms.
- """
- def _get_address_term(name: str) -> Tuple[List[int], List[int]]:
- with conn.cursor() as cur:
- cur.execute("""SELECT addr_ids_from_name(%s)::text,
- word_ids_from_name(%s)::text""",
- (name, name))
- return cast(Tuple[List[int], List[int]], cur.fetchone())
- tokens = {}
- for key, value in terms:
- items = self.cache.address_terms.get(value, _get_address_term)
- if items[0] or items[1]:
- tokens[key] = items
- if tokens:
- self.data['addr'] = tokens
-class _LRU:
- """ Least recently used cache that accepts a generator function to
- produce the item when there is a cache miss.
- """
- def __init__(self, maxsize: int = 128):
- self.data: 'OrderedDict[str, Any]' = OrderedDict()
- self.maxsize = maxsize
- def get(self, key: str, generator: Callable[[str], Any]) -> Any:
- """ Get the item with the given key from the cache. If nothing
- is found in the cache, generate the value through the
- generator function and store it in the cache.
- """
- value = self.data.get(key)
- if value is not None:
- self.data.move_to_end(key)
- else:
- value = generator(key)
- if len(self.data) >= self.maxsize:
- self.data.popitem(last=False)
- self.data[key] = value
- return value
-class _TokenCache:
- """ Cache for token information to avoid repeated database queries.
- This cache is not thread-safe and needs to be instantiated per
- analyzer.
- """
- def __init__(self, conn: Connection):
- # various LRU caches
- self.streets = _LRU(maxsize=256)
- self.places = _LRU(maxsize=128)
- self.address_terms = _LRU(maxsize=1024)
- # Lookup houseunumbers up to 100 and cache them
- with conn.cursor() as cur:
- cur.execute("""SELECT i, ARRAY[getorcreate_housenumber_id(i::text)]::text
- FROM generate_series(1, 100) as i""")
- self._cached_housenumbers: Dict[str, str] = {str(r[0]): r[1] for r in cur}
- # For postcodes remember the ones that have already been added
- self.postcodes: Set[str] = set()
- def get_housenumber(self, number: str) -> Optional[str]:
- """ Get a housenumber token from the cache.
- """
- return self._cached_housenumbers.get(number)
- def add_postcode(self, conn: Connection, postcode: str) -> None:
- """ Make sure the given postcode is in the database.
- """
- if postcode not in self.postcodes:
- with conn.cursor() as cur:
- cur.execute('SELECT create_postcode_id(%s)', (postcode, ))
- self.postcodes.add(postcode)
+++ /dev/null
-# SPDX-License-Identifier: GPL-3.0-or-later
-# This file is part of Nominatim. (https://nominatim.org)
-# Copyright (C) 2024 by the Nominatim developer community.
-# For a full list of authors see the git log.
-Tests for query analyzer for legacy tokenizer.
-import pytest
-import pytest_asyncio
-from nominatim_api import NominatimAPIAsync
-from nominatim_api.search.query import Phrase, PhraseType, TokenType, BreakType
-import nominatim_api.search.legacy_tokenizer as tok
-from nominatim_api.logging import set_log_output, get_and_disable
-async def add_word(conn, word_id, word_token, word, count):
- t = conn.t.meta.tables['word']
- await conn.execute(t.insert(), {'word_id': word_id,
- 'word_token': word_token,
- 'search_name_count': count,
- 'word': word})
-async def add_housenumber(conn, word_id, hnr):
- t = conn.t.meta.tables['word']
- await conn.execute(t.insert(), {'word_id': word_id,
- 'word_token': ' ' + hnr,
- 'word': hnr,
- 'class': 'place',
- 'type': 'house'})
-async def add_postcode(conn, word_id, postcode):
- t = conn.t.meta.tables['word']
- await conn.execute(t.insert(), {'word_id': word_id,
- 'word_token': ' ' + postcode,
- 'word': postcode,
- 'class': 'place',
- 'type': 'postcode'})
-async def add_special_term(conn, word_id, word_token, cls, typ, op):
- t = conn.t.meta.tables['word']
- await conn.execute(t.insert(), {'word_id': word_id,
- 'word_token': word_token,
- 'word': word_token,
- 'class': cls,
- 'type': typ,
- 'operator': op})
-def make_phrase(query):
- return [Phrase(PhraseType.NONE, s) for s in query.split(',')]
-async def conn(table_factory, temp_db_cursor):
- """ Create an asynchronous SQLAlchemy engine for the test DB.
- """
- table_factory('nominatim_properties',
- definition='property TEXT, value TEXT',
- content=(('tokenizer_maxwordfreq', '10000'), ))
- table_factory('word',
- definition="""word_id INT, word_token TEXT, word TEXT,
- class TEXT, type TEXT, country_code TEXT,
- search_name_count INT, operator TEXT
- """)
- temp_db_cursor.execute("""CREATE OR REPLACE FUNCTION make_standard_name(name TEXT)
- RETURNS TEXT AS $$ SELECT lower(name); $$ LANGUAGE SQL;""")
- async with NominatimAPIAsync() as api:
- async with api.begin() as conn:
- yield conn
-async def test_empty_phrase(conn):
- ana = await tok.create_query_analyzer(conn)
- query = await ana.analyze_query([])
- assert len(query.source) == 0
- assert query.num_token_slots() == 0
-async def test_single_phrase_with_unknown_terms(conn):
- ana = await tok.create_query_analyzer(conn)
- await add_word(conn, 1, 'foo', 'FOO', 3)
- query = await ana.analyze_query(make_phrase('foo BAR'))
- assert len(query.source) == 1
- assert query.source[0].ptype == PhraseType.NONE
- assert query.source[0].text == 'foo bar'
- assert query.num_token_slots() == 2
- assert len(query.nodes[0].starting) == 1
- assert not query.nodes[1].starting
-async def test_multiple_phrases(conn):
- ana = await tok.create_query_analyzer(conn)
- await add_word(conn, 1, 'one', 'one', 13)
- await add_word(conn, 2, 'two', 'two', 45)
- await add_word(conn, 100, 'one two', 'one two', 3)
- await add_word(conn, 3, 'three', 'three', 4584)
- query = await ana.analyze_query(make_phrase('one two,three'))
- assert len(query.source) == 2
-async def test_housenumber_token(conn):
- ana = await tok.create_query_analyzer(conn)
- await add_housenumber(conn, 556, '45 a')
- query = await ana.analyze_query(make_phrase('45 A'))
- assert query.num_token_slots() == 2
- assert len(query.nodes[0].starting) == 2
- query.nodes[0].starting.sort(key=lambda tl: tl.end)
- hn1 = query.nodes[0].starting[0]
- assert hn1.ttype == TokenType.HOUSENUMBER
- assert hn1.end == 1
- assert hn1.tokens[0].token == 0
- hn2 = query.nodes[0].starting[1]
- assert hn2.ttype == TokenType.HOUSENUMBER
- assert hn2.end == 2
- assert hn2.tokens[0].token == 556
-async def test_postcode_token(conn):
- ana = await tok.create_query_analyzer(conn)
- await add_postcode(conn, 34, '45ax')
- query = await ana.analyze_query(make_phrase('45AX'))
- assert query.num_token_slots() == 1
- assert [tl.ttype for tl in query.nodes[0].starting] == [TokenType.POSTCODE]
-async def test_partial_tokens(conn):
- ana = await tok.create_query_analyzer(conn)
- await add_word(conn, 1, ' foo', 'foo', 99)
- await add_word(conn, 1, 'foo', 'FOO', 99)
- await add_word(conn, 1, 'bar', 'FOO', 990000)
- query = await ana.analyze_query(make_phrase('foo bar'))
- assert query.num_token_slots() == 2
- first = query.nodes[0].starting
- first.sort(key=lambda tl: tl.tokens[0].penalty)
- assert [tl.ttype for tl in first] == [TokenType.WORD, TokenType.PARTIAL]
- assert all(tl.tokens[0].lookup_word == 'foo' for tl in first)
- second = query.nodes[1].starting
- assert [tl.ttype for tl in second] == [TokenType.PARTIAL]
- assert not second[0].tokens[0].is_indexed
-@pytest.mark.parametrize('term,order', [('23456', ['POSTCODE', 'HOUSENUMBER', 'WORD', 'PARTIAL']),
- ])
-async def test_penalty_postcodes_and_housenumbers(conn, term, order):
- ana = await tok.create_query_analyzer(conn)
- await add_postcode(conn, 1, term)
- await add_housenumber(conn, 2, term)
- await add_word(conn, 3, term, term, 5)
- await add_word(conn, 4, ' ' + term, term, 1)
- query = await ana.analyze_query(make_phrase(term))
- assert query.num_token_slots() == 1
- torder = [(tl.tokens[0].penalty, tl.ttype.name) for tl in query.nodes[0].starting]
- torder.sort()
- assert [t[1] for t in torder] == order
-async def test_category_words_only_at_beginning(conn):
- ana = await tok.create_query_analyzer(conn)
- await add_special_term(conn, 1, 'foo', 'amenity', 'restaurant', 'in')
- await add_word(conn, 2, ' bar', 'BAR', 1)
- query = await ana.analyze_query(make_phrase('foo BAR foo'))
- assert query.num_token_slots() == 3
- assert len(query.nodes[0].starting) == 1
- assert query.nodes[0].starting[0].ttype == TokenType.NEAR_ITEM
- assert not query.nodes[2].starting
-async def test_qualifier_words(conn):
- ana = await tok.create_query_analyzer(conn)
- await add_special_term(conn, 1, 'foo', 'amenity', 'restaurant', '-')
- await add_word(conn, 2, ' bar', 'w', None)
- query = await ana.analyze_query(make_phrase('foo BAR foo BAR foo'))
- assert query.num_token_slots() == 5
- assert set(t.ttype for t in query.nodes[0].starting) == {TokenType.NEAR_ITEM, TokenType.QUALIFIER}
- assert set(t.ttype for t in query.nodes[2].starting) == {TokenType.QUALIFIER}
- assert set(t.ttype for t in query.nodes[4].starting) == {TokenType.NEAR_ITEM, TokenType.QUALIFIER}
-@pytest.mark.parametrize('logtype', ['text', 'html'])
-async def test_log_output(conn, logtype):
- ana = await tok.create_query_analyzer(conn)
- await add_word(conn, 1, 'foo', 'FOO', 99)
- set_log_output(logtype)
- await ana.analyze_query(make_phrase('foo'))
- assert get_and_disable()
+++ /dev/null
-# SPDX-License-Identifier: GPL-3.0-or-later
-# This file is part of Nominatim. (https://nominatim.org)
-# Copyright (C) 2024 by the Nominatim developer community.
-# For a full list of authors see the git log.
-Legacy word table for testing with functions to prefil and test contents
-of the table.
-from nominatim_db.db.connection import execute_scalar
-class MockLegacyWordTable:
- """ A word table for testing using legacy word table structure.
- """
- def __init__(self, conn):
- self.conn = conn
- with conn.cursor() as cur:
- cur.execute("""CREATE TABLE word (word_id INTEGER,
- word_token text,
- word text,
- class text,
- type text,
- country_code varchar(2),
- search_name_count INTEGER,
- operator TEXT)""")
- conn.commit()
- def add_full_word(self, word_id, word, word_token=None):
- with self.conn.cursor() as cur:
- cur.execute("""INSERT INTO word (word_id, word_token, word)
- VALUES (%s, %s, %s)
- """, (word_id, ' ' + (word_token or word), word))
- self.conn.commit()
- def add_special(self, word_token, word, cls, typ, oper):
- with self.conn.cursor() as cur:
- cur.execute("""INSERT INTO word (word_token, word, class, type, operator)
- VALUES (%s, %s, %s, %s, %s)
- """, (word_token, word, cls, typ, oper))
- self.conn.commit()
- def add_country(self, country_code, word_token):
- with self.conn.cursor() as cur:
- cur.execute("INSERT INTO word (word_token, country_code) VALUES(%s, %s)",
- (word_token, country_code))
- self.conn.commit()
- def add_postcode(self, word_token, postcode):
- with self.conn.cursor() as cur:
- cur.execute("""INSERT INTO word (word_token, word, class, type)
- VALUES (%s, %s, 'place', 'postcode')
- """, (word_token, postcode))
- self.conn.commit()
- def count(self):
- return execute_scalar(self.conn, "SELECT count(*) FROM word")
- def count_special(self):
- return execute_scalar(self.conn, "SELECT count(*) FROM word WHERE class != 'place'")
- def get_special(self):
- with self.conn.cursor() as cur:
- cur.execute("""SELECT word_token, word, class as cls, type, operator
- FROM word WHERE class != 'place'""")
- result = set((tuple(row) for row in cur))
- assert len(result) == cur.rowcount, "Word table has duplicates."
- return result
- def get_country(self):
- with self.conn.cursor() as cur:
- cur.execute("""SELECT country_code, word_token
- FROM word WHERE country_code is not null""")
- result = set((tuple(row) for row in cur))
- assert len(result) == cur.rowcount, "Word table has duplicates."
- return result
- def get_postcodes(self):
- with self.conn.cursor() as cur:
- cur.execute("""SELECT word FROM word
- WHERE class = 'place' and type = 'postcode'""")
- return set((row[0] for row in cur))
- def get_partial_words(self):
- with self.conn.cursor() as cur:
- cur.execute("""SELECT word_token, search_name_count FROM word
- WHERE class is null and country_code is null
- and not word_token like ' %'""")
- return set((tuple(row) for row in cur))
+++ /dev/null
-# SPDX-License-Identifier: GPL-3.0-or-later
-# This file is part of Nominatim. (https://nominatim.org)
-# Copyright (C) 2024 by the Nominatim developer community.
-# For a full list of authors see the git log.
-Test for legacy tokenizer.
-import shutil
-import re
-import pytest
-from nominatim_db.data.place_info import PlaceInfo
-from nominatim_db.tokenizer import legacy_tokenizer
-from nominatim_db.db import properties
-from nominatim_db.errors import UsageError
-from mock_legacy_word_table import MockLegacyWordTable
-# Force use of legacy word table
-def word_table(temp_db_conn):
- return MockLegacyWordTable(temp_db_conn)
-def test_config(project_env, tmp_path):
- module_dir = tmp_path / 'module_src'
- module_dir.mkdir()
- (module_dir / 'nominatim.so').write_text('TEST nominatim.so')
- project_env.lib_dir.module = module_dir
- sqldir = tmp_path / 'sql'
- sqldir.mkdir()
- (sqldir / 'tokenizer').mkdir()
- # Get the original SQL but replace make_standard_name to avoid module use.
- init_sql = (project_env.lib_dir.sql / 'tokenizer' / 'legacy_tokenizer.sql').read_text()
- for fn in ('transliteration', 'gettokenstring'):
- init_sql = re.sub(f'CREATE OR REPLACE FUNCTION {fn}[^;]*;',
- '', init_sql, re.DOTALL)
- init_sql += """
- CREATE OR REPLACE FUNCTION make_standard_name(name TEXT)
- """
- # Also load util functions. Some are needed by the tokenizer.
- init_sql += (project_env.lib_dir.sql / 'functions' / 'utils.sql').read_text()
- (sqldir / 'tokenizer' / 'legacy_tokenizer.sql').write_text(init_sql)
- (sqldir / 'words.sql').write_text("SELECT 'a'")
- shutil.copy(str(project_env.lib_dir.sql / 'tokenizer' / 'legacy_tokenizer_tables.sql'),
- str(sqldir / 'tokenizer' / 'legacy_tokenizer_tables.sql'))
- project_env.lib_dir.sql = sqldir
- project_env.lib_dir.data = sqldir
- return project_env
-def tokenizer_factory(dsn, tmp_path, property_table):
- (tmp_path / 'tokenizer').mkdir()
- def _maker():
- return legacy_tokenizer.create(dsn, tmp_path / 'tokenizer')
- return _maker
-def tokenizer_setup(tokenizer_factory, test_config, monkeypatch, sql_preprocessor):
- monkeypatch.setattr(legacy_tokenizer, '_check_module', lambda m, c: None)
- tok = tokenizer_factory()
- tok.init_new_db(test_config)
-def analyzer(tokenizer_factory, test_config, monkeypatch, sql_preprocessor,
- word_table, temp_db_with_extensions, tmp_path):
- monkeypatch.setattr(legacy_tokenizer, '_check_module', lambda m, c: None)
- monkeypatch.setenv('NOMINATIM_TERM_NORMALIZATION', ':: lower();')
- tok = tokenizer_factory()
- tok.init_new_db(test_config)
- monkeypatch.undo()
- with tok.name_analyzer() as analyzer:
- yield analyzer
-def make_standard_name(temp_db_cursor):
- temp_db_cursor.execute("""CREATE OR REPLACE FUNCTION make_standard_name(name TEXT)
- RETURNS TEXT AS $$ SELECT '#' || lower(name) || '#'; $$ LANGUAGE SQL""")
-def create_postcode_id(temp_db_cursor):
- temp_db_cursor.execute("""CREATE OR REPLACE FUNCTION create_postcode_id(postcode TEXT)
- INSERT INTO word (word_token, word, class, type)
- VALUES (' ' || postcode, postcode, 'place', 'postcode')
-def test_init_new(tokenizer_factory, test_config, monkeypatch,
- temp_db_conn, sql_preprocessor):
- monkeypatch.setenv('NOMINATIM_TERM_NORMALIZATION', 'xxvv')
- monkeypatch.setattr(legacy_tokenizer, '_check_module', lambda m, c: None)
- tok = tokenizer_factory()
- tok.init_new_db(test_config)
- assert properties.get_property(temp_db_conn, legacy_tokenizer.DBCFG_NORMALIZATION) == 'xxvv'
- outfile = test_config.project_dir / 'module' / 'nominatim.so'
- assert outfile.exists()
- assert outfile.read_text() == 'TEST nominatim.so'
- assert outfile.stat().st_mode == 33261
-def test_init_module_load_failed(tokenizer_factory, test_config):
- tok = tokenizer_factory()
- with pytest.raises(UsageError):
- tok.init_new_db(test_config)
-def test_init_module_custom(tokenizer_factory, test_config,
- monkeypatch, tmp_path, sql_preprocessor):
- module_dir = (tmp_path / 'custom').resolve()
- module_dir.mkdir()
- (module_dir/ 'nominatim.so').write_text('CUSTOM nomiantim.so')
- monkeypatch.setenv('NOMINATIM_DATABASE_MODULE_PATH', str(module_dir))
- monkeypatch.setattr(legacy_tokenizer, '_check_module', lambda m, c: None)
- tok = tokenizer_factory()
- tok.init_new_db(test_config)
- assert not (test_config.project_dir / 'module').exists()
-def test_init_from_project(tokenizer_setup, tokenizer_factory, test_config):
- tok = tokenizer_factory()
- tok.init_from_project(test_config)
- assert tok.normalization is not None
-def test_update_sql_functions(sql_preprocessor, temp_db_conn,
- tokenizer_factory, test_config, table_factory,
- monkeypatch, temp_db_cursor):
- monkeypatch.setenv('NOMINATIM_MAX_WORD_FREQUENCY', '1133')
- monkeypatch.setattr(legacy_tokenizer, '_check_module', lambda m, c: None)
- tok = tokenizer_factory()
- tok.init_new_db(test_config)
- monkeypatch.undo()
- assert properties.get_property(temp_db_conn, legacy_tokenizer.DBCFG_MAXWORDFREQ) == '1133'
- table_factory('test', 'txt TEXT')
- func_file = test_config.lib_dir.sql / 'tokenizer' / 'legacy_tokenizer.sql'
- func_file.write_text("""INSERT INTO test VALUES ('{{max_word_freq}}'),
- ('{{modulepath}}')""")
- tok.update_sql_functions(test_config)
- test_content = temp_db_cursor.row_set('SELECT * FROM test')
- assert test_content == set((('1133', ), (str(test_config.project_dir / 'module'), )))
-def test_finalize_import(tokenizer_factory, temp_db_conn,
- temp_db_cursor, test_config, monkeypatch,
- sql_preprocessor_cfg):
- monkeypatch.setattr(legacy_tokenizer, '_check_module', lambda m, c: None)
- func_file = test_config.lib_dir.sql / 'tokenizer' / 'legacy_tokenizer_indices.sql'
- func_file.write_text("""CREATE FUNCTION test() RETURNS TEXT
- AS $$ SELECT 'b'::text $$ LANGUAGE SQL""")
- tok = tokenizer_factory()
- tok.init_new_db(test_config)
- tok.finalize_import(test_config)
- temp_db_cursor.scalar('SELECT test()') == 'b'
-def test_migrate_database(tokenizer_factory, test_config, temp_db_conn, monkeypatch):
- monkeypatch.setattr(legacy_tokenizer, '_check_module', lambda m, c: None)
- tok = tokenizer_factory()
- tok.migrate_database(test_config)
- assert properties.get_property(temp_db_conn, legacy_tokenizer.DBCFG_MAXWORDFREQ) is not None
- assert properties.get_property(temp_db_conn, legacy_tokenizer.DBCFG_NORMALIZATION) is not None
- outfile = test_config.project_dir / 'module' / 'nominatim.so'
- assert outfile.exists()
- assert outfile.read_text() == 'TEST nominatim.so'
- assert outfile.stat().st_mode == 33261
-def test_check_database(test_config, tokenizer_factory, monkeypatch,
- temp_db_cursor, sql_preprocessor_cfg):
- monkeypatch.setattr(legacy_tokenizer, '_check_module', lambda m, c: None)
- tok = tokenizer_factory()
- tok.init_new_db(test_config)
- assert tok.check_database(False) is None
-def test_check_database_no_tokenizer(test_config, tokenizer_factory):
- tok = tokenizer_factory()
- assert tok.check_database(False) is not None
-def test_check_database_bad_setup(test_config, tokenizer_factory, monkeypatch,
- temp_db_cursor, sql_preprocessor_cfg):
- monkeypatch.setattr(legacy_tokenizer, '_check_module', lambda m, c: None)
- tok = tokenizer_factory()
- tok.init_new_db(test_config)
- # Inject a bad transliteration.
- temp_db_cursor.execute("""CREATE OR REPLACE FUNCTION make_standard_name(name TEXT)
- RETURNS TEXT AS $$ SELECT 'garbage'::text; $$ LANGUAGE SQL""")
- assert tok.check_database(False) is not None
-def test_update_statistics_reverse_only(word_table, tokenizer_factory, test_config):
- tok = tokenizer_factory()
- tok.update_statistics(test_config)
-def test_update_statistics(word_table, table_factory, temp_db_cursor, tokenizer_factory, test_config):
- word_table.add_full_word(1000, 'hello')
- table_factory('search_name',
- 'place_id BIGINT, name_vector INT[]',
- [(12, [1000])])
- tok = tokenizer_factory()
- tok.update_statistics(test_config)
- assert temp_db_cursor.scalar("""SELECT count(*) FROM word
- WHERE word_token like ' %' and
- search_name_count > 0""") > 0
-def test_update_word_tokens(tokenizer_factory):
- tok = tokenizer_factory()
- # This is a noop and should just pass.
- tok.update_word_tokens()
-def test_normalize(analyzer):
- assert analyzer.normalize('TEsT') == 'test'
-def test_update_postcodes_from_db_empty(analyzer, table_factory, word_table,
- create_postcode_id):
- table_factory('location_postcode', 'postcode TEXT',
- content=(('1234',), ('12 34',), ('AB23',), ('1234',)))
- analyzer.update_postcodes_from_db()
- assert word_table.get_postcodes() == {'1234', '12 34', 'AB23'}
-def test_update_postcodes_from_db_add_and_remove(analyzer, table_factory, word_table,
- create_postcode_id):
- table_factory('location_postcode', 'postcode TEXT',
- content=(('1234',), ('45BC', ), ('XX45', )))
- word_table.add_postcode(' 1234', '1234')
- word_table.add_postcode(' 5678', '5678')
- analyzer.update_postcodes_from_db()
- assert word_table.get_postcodes() == {'1234', '45BC', 'XX45'}
-def test_update_special_phrase_empty_table(analyzer, word_table, make_standard_name):
- analyzer.update_special_phrases([
- ("König bei", "amenity", "royal", "near"),
- ("Könige", "amenity", "royal", "-"),
- ("könige", "amenity", "royal", "-"),
- ("strasse", "highway", "primary", "in")
- ], True)
- assert word_table.get_special() \
- == set(((' #könig bei#', 'könig bei', 'amenity', 'royal', 'near'),
- (' #könige#', 'könige', 'amenity', 'royal', None),
- (' #strasse#', 'strasse', 'highway', 'primary', 'in')))
-def test_update_special_phrase_delete_all(analyzer, word_table, make_standard_name):
- word_table.add_special(' #foo#', 'foo', 'amenity', 'prison', 'in')
- word_table.add_special(' #bar#', 'bar', 'highway', 'road', None)
- assert word_table.count_special() == 2
- analyzer.update_special_phrases([], True)
- assert word_table.count_special() == 0
-def test_update_special_phrases_no_replace(analyzer, word_table, make_standard_name):
- word_table.add_special(' #foo#', 'foo', 'amenity', 'prison', 'in')
- word_table.add_special(' #bar#', 'bar', 'highway', 'road', None)
- assert word_table.count_special() == 2
- analyzer.update_special_phrases([], False)
- assert word_table.count_special() == 2
-def test_update_special_phrase_modify(analyzer, word_table, make_standard_name):
- word_table.add_special(' #foo#', 'foo', 'amenity', 'prison', 'in')
- word_table.add_special(' #bar#', 'bar', 'highway', 'road', None)
- assert word_table.count_special() == 2
- analyzer.update_special_phrases([
- ('prison', 'amenity', 'prison', 'in'),
- ('bar', 'highway', 'road', '-'),
- ('garden', 'leisure', 'garden', 'near')
- ], True)
- assert word_table.get_special() \
- == set(((' #prison#', 'prison', 'amenity', 'prison', 'in'),
- (' #bar#', 'bar', 'highway', 'road', None),
- (' #garden#', 'garden', 'leisure', 'garden', 'near')))
-def test_add_country_names(analyzer, word_table, make_standard_name):
- analyzer.add_country_names('de', {'name': 'Germany',
- 'name:de': 'Deutschland',
- 'short_name': 'germany'})
- assert word_table.get_country() \
- == {('de', ' #germany#'),
- ('de', ' #deutschland#')}
-def test_add_more_country_names(analyzer, word_table, make_standard_name):
- word_table.add_country('fr', ' #france#')
- word_table.add_country('it', ' #italy#')
- word_table.add_country('it', ' #itala#')
- analyzer.add_country_names('it', {'name': 'Italy', 'ref': 'IT'})
- assert word_table.get_country() \
- == {('fr', ' #france#'),
- ('it', ' #italy#'),
- ('it', ' #itala#'),
- ('it', ' #it#')}
-@pytest.mark.parametrize('pcode', ['12345', 'AB 123', '34-345'])
-def test_process_place_postcode(analyzer, create_postcode_id, word_table, pcode):
- analyzer.process_place(PlaceInfo({'address': {'postcode' : pcode}}))
- assert word_table.get_postcodes() == {pcode, }
-@pytest.mark.parametrize('pcode', ['12:23', 'ab;cd;f', '123;836'])
-def test_process_place_bad_postcode(analyzer, create_postcode_id, word_table, pcode):
- analyzer.process_place(PlaceInfo({'address': {'postcode' : pcode}}))
- assert not word_table.get_postcodes()
-class TestHousenumberName:
- @staticmethod
- @pytest.fixture(autouse=True)
- def setup_create_housenumbers(temp_db_cursor):
- temp_db_cursor.execute("""CREATE OR REPLACE FUNCTION create_housenumbers(
- housenumbers TEXT[],
- OUT tokens TEXT, OUT normtext TEXT)
- AS $$
- SELECT housenumbers::TEXT, array_to_string(housenumbers, ';')
- @staticmethod
- @pytest.mark.parametrize('hnr', ['123a', '1', '101'])
- def test_process_place_housenumbers_simple(analyzer, hnr):
- info = analyzer.process_place(PlaceInfo({'address': {'housenumber' : hnr}}))
- assert info['hnr'] == hnr
- assert info['hnr_tokens'].startswith("{")
- @staticmethod
- def test_process_place_housenumbers_lists(analyzer):
- info = analyzer.process_place(PlaceInfo({'address': {'conscriptionnumber' : '1; 2;3'}}))
- assert set(info['hnr'].split(';')) == set(('1', '2', '3'))
- @staticmethod
- def test_process_place_housenumbers_duplicates(analyzer):
- info = analyzer.process_place(PlaceInfo({'address': {'housenumber' : '134',
- 'conscriptionnumber' : '134',
- 'streetnumber' : '99a'}}))
- assert set(info['hnr'].split(';')) == set(('134', '99a'))
-class TestPlaceNames:
- @pytest.fixture(autouse=True)
- def setup(self, analyzer):
- self.analyzer = analyzer
- def expect_name_terms(self, info, *expected_terms):
- tokens = self.analyzer.get_word_token_info(list(expected_terms))
- for token in tokens:
- assert token[2] is not None, "No token for {0}".format(token)
- assert eval(info['names']) == set((t[2] for t in tokens)),\
- f"Expected: {tokens}\nGot: {info['names']}"
- def process_named_place(self, names):
- return self.analyzer.process_place(PlaceInfo({'name': names}))
- def test_simple_names(self):
- info = self.process_named_place({'name': 'Soft bAr', 'ref': '34'})
- self.expect_name_terms(info, '#Soft bAr', '#34', 'Soft', 'bAr', '34')
- @pytest.mark.parametrize('sep', [',' , ';'])
- def test_names_with_separator(self, sep):
- info = self.process_named_place({'name': sep.join(('New York', 'Big Apple'))})
- self.expect_name_terms(info, '#New York', '#Big Apple',
- 'new', 'york', 'big', 'apple')
- def test_full_names_with_bracket(self):
- info = self.process_named_place({'name': 'Houseboat (left)'})
- self.expect_name_terms(info, '#Houseboat (left)', '#Houseboat',
- 'houseboat', '(left)')
- def test_country_name(self, word_table):
- place = PlaceInfo({'name' : {'name': 'Norge'},
- 'country_code': 'no',
- 'rank_address': 4,
- 'class': 'boundary',
- 'type': 'administrative'})
- info = self.analyzer.process_place(place)
- self.expect_name_terms(info, '#norge', 'norge')
- assert word_table.get_country() == {('no', ' norge')}
-class TestPlaceAddress:
- @pytest.fixture(autouse=True)
- def setup(self, analyzer):
- self.analyzer = analyzer
- @pytest.fixture
- def getorcreate_hnr_id(self, temp_db_cursor):
- temp_db_cursor.execute("""CREATE SEQUENCE seq_hnr start 1;
- CREATE OR REPLACE FUNCTION getorcreate_housenumber_id(lookup_word TEXT)
- SELECT -nextval('seq_hnr')::INTEGER; $$ LANGUAGE SQL""")
- def process_address(self, **kwargs):
- return self.analyzer.process_place(PlaceInfo({'address': kwargs}))
- def name_token_set(self, *expected_terms):
- tokens = self.analyzer.get_word_token_info(list(expected_terms))
- for token in tokens:
- assert token[2] is not None, "No token for {0}".format(token)
- return set((t[2] for t in tokens))
- @pytest.mark.parametrize('pcode', ['12345', 'AB 123', '34-345'])
- def test_process_place_postcode(self, word_table, pcode):
- self.process_address(postcode=pcode)
- assert word_table.get_postcodes() == {pcode, }
- @pytest.mark.parametrize('pcode', ['12:23', 'ab;cd;f', '123;836'])
- def test_process_place_bad_postcode(self, word_table, pcode):
- self.process_address(postcode=pcode)
- assert not word_table.get_postcodes()
- @pytest.mark.parametrize('hnr', ['123a', '0', '101'])
- def test_process_place_housenumbers_simple(self, hnr, getorcreate_hnr_id):
- info = self.process_address(housenumber=hnr)
- assert info['hnr'] == hnr.lower()
- assert info['hnr_tokens'] == "{-1}"
- def test_process_place_housenumbers_lists(self, getorcreate_hnr_id):
- info = self.process_address(conscriptionnumber='1; 2;3')
- assert set(info['hnr'].split(';')) == set(('1', '2', '3'))
- assert info['hnr_tokens'] == "{-1,-2,-3}"
- def test_process_place_housenumbers_duplicates(self, getorcreate_hnr_id):
- info = self.process_address(housenumber='134',
- conscriptionnumber='134',
- streetnumber='99A')
- assert set(info['hnr'].split(';')) == set(('134', '99a'))
- assert info['hnr_tokens'] == "{-1,-2}"
- def test_process_place_street(self):
- # legacy tokenizer only indexes known names
- self.analyzer.process_place(PlaceInfo({'name': {'name' : 'Grand Road'}}))
- info = self.process_address(street='Grand Road')
- assert eval(info['street']) == self.name_token_set('#Grand Road')
- def test_process_place_street_empty(self):
- info = self.process_address(street='🜵')
- assert info['street'] == '{}'
- def test_process_place_place(self):
- self.analyzer.process_place(PlaceInfo({'name': {'name' : 'Honu Lulu'}}))
- info = self.process_address(place='Honu Lulu')
- assert eval(info['place_search']) == self.name_token_set('#Honu Lulu',
- 'Honu', 'Lulu')
- assert eval(info['place_match']) == self.name_token_set('#Honu Lulu')
- def test_process_place_place_empty(self):
- info = self.process_address(place='🜵')
- assert 'place' not in info
- def test_process_place_address_terms(self):
- for name in ('Zwickau', 'Haupstraße', 'Sachsen'):
- self.analyzer.process_place(PlaceInfo({'name': {'name' : name}}))
- info = self.process_address(country='de', city='Zwickau', state='Sachsen',
- suburb='Zwickau', street='Hauptstr',
- full='right behind the church')
- city = self.name_token_set('ZWICKAU')
- state = self.name_token_set('SACHSEN')
- print(info)
- result = {k: eval(v[0]) for k,v in info['addr'].items()}
- assert result == {'city': city, 'suburb': city, 'state': state}
- def test_process_place_address_terms_empty(self):
- info = self.process_address(country='de', city=' ', street='Hauptstr',
- full='right behind the church')
- assert 'addr' not in info
from nominatim_db.db.connection import server_version_tuple
import nominatim_db.version
-from mock_legacy_word_table import MockLegacyWordTable
class DummyTokenizer:
def update_sql_functions(self, config):
monkeypatch.setattr(migration.tokenizer_factory, 'get_tokenizer_for_db',
lambda *args: DummyTokenizer())
-def legacy_word_table(temp_db_conn):
- return MockLegacyWordTable(temp_db_conn)
def test_no_migration_old_versions(temp_db_with_extensions, table_factory, def_config):
table_factory('country_name', 'name HSTORE, country_code TEXT')
assert temp_db_cursor.table_exists('nominatim_properties')
-def test_change_housenumber_transliteration(temp_db_conn, temp_db_cursor,
- legacy_word_table, placex_table):
- placex_table.add(housenumber='3A')
- temp_db_cursor.execute("""CREATE OR REPLACE FUNCTION make_standard_name(name TEXT)
- RETURNS TEXT AS $$ SELECT lower(name) $$ LANGUAGE SQL """)
- temp_db_cursor.execute("""CREATE OR REPLACE FUNCTION getorcreate_housenumber_id(lookup_word TEXT)
- migration.change_housenumber_transliteration(temp_db_conn)
- temp_db_conn.commit()
- assert temp_db_cursor.scalar('SELECT housenumber from placex') == '3a'
- migration.change_housenumber_transliteration(temp_db_conn)
- temp_db_conn.commit()
- assert temp_db_cursor.scalar('SELECT housenumber from placex') == '3a'
def test_switch_placenode_geometry_index(temp_db_conn, temp_db_cursor, placex_table):
temp_db_cursor.execute("""CREATE INDEX idx_placex_adminname
ON placex (place_id)""")