]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/tokenizer/legacy_tokenizer.py
move WorkerPool into db module
[nominatim.git] / nominatim / tokenizer / legacy_tokenizer.py
1 """
2 Tokenizer implementing normalisation as used before Nominatim 4.
3 """
4 from collections import OrderedDict
5 import logging
6 import re
7 import shutil
8 from textwrap import dedent
9
10 from icu import Transliterator
11 import psycopg2
12 import psycopg2.extras
13
14 from nominatim.db.connection import connect
15 from nominatim.db import properties
16 from nominatim.db import utils as db_utils
17 from nominatim.db.sql_preprocessor import SQLPreprocessor
18 from nominatim.errors import UsageError
19
20 DBCFG_NORMALIZATION = "tokenizer_normalization"
21 DBCFG_MAXWORDFREQ = "tokenizer_maxwordfreq"
22
23 LOG = logging.getLogger()
24
25 def create(dsn, data_dir):
26     """ Create a new instance of the tokenizer provided by this module.
27     """
28     return LegacyTokenizer(dsn, data_dir)
29
30
31 def _install_module(config_module_path, src_dir, module_dir):
32     """ Copies the PostgreSQL normalisation module into the project
33         directory if necessary. For historical reasons the module is
34         saved in the '/module' subdirectory and not with the other tokenizer
35         data.
36
37         The function detects when the installation is run from the
38         build directory. It doesn't touch the module in that case.
39     """
40     # Custom module locations are simply used as is.
41     if config_module_path:
42         LOG.info("Using custom path for database module at '%s'", config_module_path)
43         return config_module_path
44
45     # Compatibility mode for builddir installations.
46     if module_dir.exists() and src_dir.samefile(module_dir):
47         LOG.info('Running from build directory. Leaving database module as is.')
48         return module_dir
49
50     # In any other case install the module in the project directory.
51     if not module_dir.exists():
52         module_dir.mkdir()
53
54     destfile = module_dir / 'nominatim.so'
55     shutil.copy(str(src_dir / 'nominatim.so'), str(destfile))
56     destfile.chmod(0o755)
57
58     LOG.info('Database module installed at %s', str(destfile))
59
60     return module_dir
61
62
63 def _check_module(module_dir, conn):
64     """ Try to use the PostgreSQL module to confirm that it is correctly
65         installed and accessible from PostgreSQL.
66     """
67     with conn.cursor() as cur:
68         try:
69             cur.execute("""CREATE FUNCTION nominatim_test_import_func(text)
70                            RETURNS text AS '{}/nominatim.so', 'transliteration'
71                            LANGUAGE c IMMUTABLE STRICT;
72                            DROP FUNCTION nominatim_test_import_func(text)
73                         """.format(module_dir))
74         except psycopg2.DatabaseError as err:
75             LOG.fatal("Error accessing database module: %s", err)
76             raise UsageError("Database module cannot be accessed.") from err
77
78
79 class LegacyTokenizer:
80     """ The legacy tokenizer uses a special PostgreSQL module to normalize
81         names and queries. The tokenizer thus implements normalization through
82         calls to the database.
83     """
84
85     def __init__(self, dsn, data_dir):
86         self.dsn = dsn
87         self.data_dir = data_dir
88         self.normalization = None
89
90
91     def init_new_db(self, config, init_db=True):
92         """ Set up a new tokenizer for the database.
93
94             This copies all necessary data in the project directory to make
95             sure the tokenizer remains stable even over updates.
96         """
97         module_dir = _install_module(config.DATABASE_MODULE_PATH,
98                                      config.lib_dir.module,
99                                      config.project_dir / 'module')
100
101         self.normalization = config.TERM_NORMALIZATION
102
103         self._install_php(config)
104
105         with connect(self.dsn) as conn:
106             _check_module(module_dir, conn)
107             self._save_config(conn, config)
108             conn.commit()
109
110         if init_db:
111             self.update_sql_functions(config)
112             self._init_db_tables(config)
113
114
115     def init_from_project(self):
116         """ Initialise the tokenizer from the project directory.
117         """
118         with connect(self.dsn) as conn:
119             self.normalization = properties.get_property(conn, DBCFG_NORMALIZATION)
120
121
122     def finalize_import(self, config):
123         """ Do any required postprocessing to make the tokenizer data ready
124             for use.
125         """
126         with connect(self.dsn) as conn:
127             sqlp = SQLPreprocessor(conn, config)
128             sqlp.run_sql_file(conn, 'tokenizer/legacy_tokenizer_indices.sql')
129
130
131     def update_sql_functions(self, config):
132         """ Reimport the SQL functions for this tokenizer.
133         """
134         with connect(self.dsn) as conn:
135             max_word_freq = properties.get_property(conn, DBCFG_MAXWORDFREQ)
136             modulepath = config.DATABASE_MODULE_PATH or \
137                          str((config.project_dir / 'module').resolve())
138             sqlp = SQLPreprocessor(conn, config)
139             sqlp.run_sql_file(conn, 'tokenizer/legacy_tokenizer.sql',
140                               max_word_freq=max_word_freq,
141                               modulepath=modulepath)
142
143
144     def check_database(self):
145         """ Check that the tokenizer is set up correctly.
146         """
147         hint = """\
148              The Postgresql extension nominatim.so was not correctly loaded.
149
150              Error: {error}
151
152              Hints:
153              * Check the output of the CMmake/make installation step
154              * Does nominatim.so exist?
155              * Does nominatim.so exist on the database server?
156              * Can nominatim.so be accessed by the database user?
157              """
158         with connect(self.dsn) as conn:
159             with conn.cursor() as cur:
160                 try:
161                     out = cur.scalar("SELECT make_standard_name('a')")
162                 except psycopg2.Error as err:
163                     return hint.format(error=str(err))
164
165         if out != 'a':
166             return hint.format(error='Unexpected result for make_standard_name()')
167
168         return None
169
170
171     def migrate_database(self, config):
172         """ Initialise the project directory of an existing database for
173             use with this tokenizer.
174
175             This is a special migration function for updating existing databases
176             to new software versions.
177         """
178         self.normalization = config.TERM_NORMALIZATION
179         module_dir = _install_module(config.DATABASE_MODULE_PATH,
180                                      config.lib_dir.module,
181                                      config.project_dir / 'module')
182
183         with connect(self.dsn) as conn:
184             _check_module(module_dir, conn)
185             self._save_config(conn, config)
186
187
188     def name_analyzer(self):
189         """ Create a new analyzer for tokenizing names and queries
190             using this tokinzer. Analyzers are context managers and should
191             be used accordingly:
192
193             ```
194             with tokenizer.name_analyzer() as analyzer:
195                 analyser.tokenize()
196             ```
197
198             When used outside the with construct, the caller must ensure to
199             call the close() function before destructing the analyzer.
200
201             Analyzers are not thread-safe. You need to instantiate one per thread.
202         """
203         normalizer = Transliterator.createFromRules("phrase normalizer",
204                                                     self.normalization)
205         return LegacyNameAnalyzer(self.dsn, normalizer)
206
207
208     def _install_php(self, config):
209         """ Install the php script for the tokenizer.
210         """
211         php_file = self.data_dir / "tokenizer.php"
212         php_file.write_text(dedent("""\
213             <?php
214             @define('CONST_Max_Word_Frequency', {0.MAX_WORD_FREQUENCY});
215             @define('CONST_Term_Normalization_Rules', "{0.TERM_NORMALIZATION}");
216             require_once('{0.lib_dir.php}/tokenizer/legacy_tokenizer.php');
217             """.format(config)))
218
219
220     def _init_db_tables(self, config):
221         """ Set up the word table and fill it with pre-computed word
222             frequencies.
223         """
224         with connect(self.dsn) as conn:
225             sqlp = SQLPreprocessor(conn, config)
226             sqlp.run_sql_file(conn, 'tokenizer/legacy_tokenizer_tables.sql')
227             conn.commit()
228
229         LOG.warning("Precomputing word tokens")
230         db_utils.execute_file(self.dsn, config.lib_dir.data / 'words.sql')
231
232
233     def _save_config(self, conn, config):
234         """ Save the configuration that needs to remain stable for the given
235             database as database properties.
236         """
237         properties.set_property(conn, DBCFG_NORMALIZATION, self.normalization)
238         properties.set_property(conn, DBCFG_MAXWORDFREQ, config.MAX_WORD_FREQUENCY)
239
240
241 class LegacyNameAnalyzer:
242     """ The legacy analyzer uses the special Postgresql module for
243         splitting names.
244
245         Each instance opens a connection to the database to request the
246         normalization.
247     """
248
249     def __init__(self, dsn, normalizer):
250         self.conn = connect(dsn).connection
251         self.conn.autocommit = True
252         self.normalizer = normalizer
253         psycopg2.extras.register_hstore(self.conn)
254
255         self._cache = _TokenCache(self.conn)
256
257
258     def __enter__(self):
259         return self
260
261
262     def __exit__(self, exc_type, exc_value, traceback):
263         self.close()
264
265
266     def close(self):
267         """ Free all resources used by the analyzer.
268         """
269         if self.conn:
270             self.conn.close()
271             self.conn = None
272
273
274     @staticmethod
275     def get_word_token_info(conn, words):
276         """ Return token information for the given list of words.
277             If a word starts with # it is assumed to be a full name
278             otherwise is a partial name.
279
280             The function returns a list of tuples with
281             (original word, word token, word id).
282
283             The function is used for testing and debugging only
284             and not necessarily efficient.
285         """
286         with conn.cursor() as cur:
287             cur.execute("""SELECT t.term, word_token, word_id
288                            FROM word, (SELECT unnest(%s::TEXT[]) as term) t
289                            WHERE word_token = (CASE
290                                    WHEN left(t.term, 1) = '#' THEN
291                                      ' ' || make_standard_name(substring(t.term from 2))
292                                    ELSE
293                                      make_standard_name(t.term)
294                                    END)
295                                  and class is null and country_code is null""",
296                         (words, ))
297
298             return [(r[0], r[1], r[2]) for r in cur]
299
300
301     def normalize(self, phrase):
302         """ Normalize the given phrase, i.e. remove all properties that
303             are irrelevant for search.
304         """
305         return self.normalizer.transliterate(phrase)
306
307
308     @staticmethod
309     def normalize_postcode(postcode):
310         """ Convert the postcode to a standardized form.
311
312             This function must yield exactly the same result as the SQL function
313             'token_normalized_postcode()'.
314         """
315         return postcode.strip().upper()
316
317
318     def update_postcodes_from_db(self):
319         """ Update postcode tokens in the word table from the location_postcode
320             table.
321         """
322         with self.conn.cursor() as cur:
323             # This finds us the rows in location_postcode and word that are
324             # missing in the other table.
325             cur.execute("""SELECT * FROM
326                             (SELECT pc, word FROM
327                               (SELECT distinct(postcode) as pc FROM location_postcode) p
328                               FULL JOIN
329                               (SELECT word FROM word
330                                 WHERE class ='place' and type = 'postcode') w
331                               ON pc = word) x
332                            WHERE pc is null or word is null""")
333
334             to_delete = []
335             to_add = []
336
337             for postcode, word in cur:
338                 if postcode is None:
339                     to_delete.append(word)
340                 else:
341                     to_add.append(postcode)
342
343             if to_delete:
344                 cur.execute("""DELETE FROM WORD
345                                WHERE class ='place' and type = 'postcode'
346                                      and word = any(%s)
347                             """, (to_delete, ))
348             if to_add:
349                 cur.execute("""SELECT count(create_postcode_id(pc))
350                                FROM unnest(%s) as pc
351                             """, (to_add, ))
352
353
354
355     def update_special_phrases(self, phrases):
356         """ Replace the search index for special phrases with the new phrases.
357         """
358         norm_phrases = set(((self.normalize(p[0]), p[1], p[2], p[3])
359                             for p in phrases))
360
361         with self.conn.cursor() as cur:
362             # Get the old phrases.
363             existing_phrases = set()
364             cur.execute("""SELECT word, class, type, operator FROM word
365                            WHERE class != 'place'
366                                  OR (type != 'house' AND type != 'postcode')""")
367             for label, cls, typ, oper in cur:
368                 existing_phrases.add((label, cls, typ, oper or '-'))
369
370             to_add = norm_phrases - existing_phrases
371             to_delete = existing_phrases - norm_phrases
372
373             if to_add:
374                 psycopg2.extras.execute_values(
375                     cur,
376                     """ INSERT INTO word (word_id, word_token, word, class, type,
377                                           search_name_count, operator)
378                         (SELECT nextval('seq_word'), make_standard_name(name), name,
379                                 class, type, 0,
380                                 CASE WHEN op in ('in', 'near') THEN op ELSE null END
381                            FROM (VALUES %s) as v(name, class, type, op))""",
382                     to_add)
383
384             if to_delete:
385                 psycopg2.extras.execute_values(
386                     cur,
387                     """ DELETE FROM word USING (VALUES %s) as v(name, in_class, in_type, op)
388                         WHERE word = name and class = in_class and type = in_type
389                               and ((op = '-' and operator is null) or op = operator)""",
390                     to_delete)
391
392         LOG.info("Total phrases: %s. Added: %s. Deleted: %s",
393                  len(norm_phrases), len(to_add), len(to_delete))
394
395
396     def add_country_names(self, country_code, names):
397         """ Add names for the given country to the search index.
398         """
399         with self.conn.cursor() as cur:
400             cur.execute(
401                 """INSERT INTO word (word_id, word_token, country_code)
402                    (SELECT nextval('seq_word'), lookup_token, %s
403                       FROM (SELECT ' ' || make_standard_name(n) as lookup_token
404                             FROM unnest(%s)n) y
405                       WHERE NOT EXISTS(SELECT * FROM word
406                                        WHERE word_token = lookup_token and country_code = %s))
407                 """, (country_code, names, country_code))
408
409
410     def process_place(self, place):
411         """ Determine tokenizer information about the given place.
412
413             Returns a JSON-serialisable structure that will be handed into
414             the database via the token_info field.
415         """
416         token_info = _TokenInfo(self._cache)
417
418         names = place.get('name')
419
420         if names:
421             token_info.add_names(self.conn, names)
422
423             country_feature = place.get('country_feature')
424             if country_feature and re.fullmatch(r'[A-Za-z][A-Za-z]', country_feature):
425                 self.add_country_names(country_feature.lower(), list(names.values()))
426
427         address = place.get('address')
428
429         if address:
430             hnrs = []
431             addr_terms = []
432             for key, value in address.items():
433                 if key == 'postcode':
434                     self._add_postcode(value)
435                 elif key in ('housenumber', 'streetnumber', 'conscriptionnumber'):
436                     hnrs.append(value)
437                 elif key == 'street':
438                     token_info.add_street(self.conn, value)
439                 elif key == 'place':
440                     token_info.add_place(self.conn, value)
441                 elif not key.startswith('_') and \
442                      key not in ('country', 'full'):
443                     addr_terms.append((key, value))
444
445             if hnrs:
446                 token_info.add_housenumbers(self.conn, hnrs)
447
448             if addr_terms:
449                 token_info.add_address_terms(self.conn, addr_terms)
450
451         return token_info.data
452
453
454     def _add_postcode(self, postcode):
455         """ Make sure the normalized postcode is present in the word table.
456         """
457         if re.search(r'[:,;]', postcode) is None:
458             self._cache.add_postcode(self.conn, self.normalize_postcode(postcode))
459
460
461 class _TokenInfo:
462     """ Collect token information to be sent back to the database.
463     """
464     def __init__(self, cache):
465         self.cache = cache
466         self.data = {}
467
468
469     def add_names(self, conn, names):
470         """ Add token information for the names of the place.
471         """
472         with conn.cursor() as cur:
473             # Create the token IDs for all names.
474             self.data['names'] = cur.scalar("SELECT make_keywords(%s)::text",
475                                             (names, ))
476
477
478     def add_housenumbers(self, conn, hnrs):
479         """ Extract housenumber information from the address.
480         """
481         if len(hnrs) == 1:
482             token = self.cache.get_housenumber(hnrs[0])
483             if token is not None:
484                 self.data['hnr_tokens'] = token
485                 self.data['hnr'] = hnrs[0]
486                 return
487
488         # split numbers if necessary
489         simple_list = []
490         for hnr in hnrs:
491             simple_list.extend((x.strip() for x in re.split(r'[;,]', hnr)))
492
493         if len(simple_list) > 1:
494             simple_list = list(set(simple_list))
495
496         with conn.cursor() as cur:
497             cur.execute("SELECT (create_housenumbers(%s)).* ", (simple_list, ))
498             self.data['hnr_tokens'], self.data['hnr'] = cur.fetchone()
499
500
501     def add_street(self, conn, street):
502         """ Add addr:street match terms.
503         """
504         def _get_street(name):
505             with conn.cursor() as cur:
506                 return cur.scalar("SELECT word_ids_from_name(%s)::text", (name, ))
507
508         self.data['street'] = self.cache.streets.get(street, _get_street)
509
510
511     def add_place(self, conn, place):
512         """ Add addr:place search and match terms.
513         """
514         def _get_place(name):
515             with conn.cursor() as cur:
516                 cur.execute("""SELECT (addr_ids_from_name(%s)
517                                        || getorcreate_name_id(make_standard_name(%s), ''))::text,
518                                       word_ids_from_name(%s)::text""",
519                             (name, name, name))
520                 return cur.fetchone()
521
522         self.data['place_search'], self.data['place_match'] = \
523             self.cache.places.get(place, _get_place)
524
525
526     def add_address_terms(self, conn, terms):
527         """ Add additional address terms.
528         """
529         def _get_address_term(name):
530             with conn.cursor() as cur:
531                 cur.execute("""SELECT addr_ids_from_name(%s)::text,
532                                       word_ids_from_name(%s)::text""",
533                             (name, name))
534                 return cur.fetchone()
535
536         tokens = {}
537         for key, value in terms:
538             tokens[key] = self.cache.address_terms.get(value, _get_address_term)
539
540         self.data['addr'] = tokens
541
542
543 class _LRU:
544     """ Least recently used cache that accepts a generator function to
545         produce the item when there is a cache miss.
546     """
547
548     def __init__(self, maxsize=128, init_data=None):
549         self.data = init_data or OrderedDict()
550         self.maxsize = maxsize
551         if init_data is not None and len(init_data) > maxsize:
552             self.maxsize = len(init_data)
553
554     def get(self, key, generator):
555         """ Get the item with the given key from the cache. If nothing
556             is found in the cache, generate the value through the
557             generator function and store it in the cache.
558         """
559         value = self.data.get(key)
560         if value is not None:
561             self.data.move_to_end(key)
562         else:
563             value = generator(key)
564             if len(self.data) >= self.maxsize:
565                 self.data.popitem(last=False)
566             self.data[key] = value
567
568         return value
569
570
571 class _TokenCache:
572     """ Cache for token information to avoid repeated database queries.
573
574         This cache is not thread-safe and needs to be instantiated per
575         analyzer.
576     """
577     def __init__(self, conn):
578         # various LRU caches
579         self.streets = _LRU(maxsize=256)
580         self.places = _LRU(maxsize=128)
581         self.address_terms = _LRU(maxsize=1024)
582
583         # Lookup houseunumbers up to 100 and cache them
584         with conn.cursor() as cur:
585             cur.execute("""SELECT i, ARRAY[getorcreate_housenumber_id(i::text)]::text
586                            FROM generate_series(1, 100) as i""")
587             self._cached_housenumbers = {str(r[0]) : r[1] for r in cur}
588
589         # For postcodes remember the ones that have already been added
590         self.postcodes = set()
591
592     def get_housenumber(self, number):
593         """ Get a housenumber token from the cache.
594         """
595         return self._cached_housenumbers.get(number)
596
597
598     def add_postcode(self, conn, postcode):
599         """ Make sure the given postcode is in the database.
600         """
601         if postcode not in self.postcodes:
602             with conn.cursor() as cur:
603                 cur.execute('SELECT create_postcode_id(%s)', (postcode, ))
604             self.postcodes.add(postcode)