]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/tokenizer/icu_tokenizer.py
Merge pull request #2454 from lonvia/sort-out-token-assignment-in-sql
[nominatim.git] / nominatim / tokenizer / icu_tokenizer.py
1 """
2 Tokenizer implementing normalisation as used before Nominatim 4 but using
3 libICU instead of the PostgreSQL module.
4 """
5 from collections import Counter
6 import itertools
7 import json
8 import logging
9 import re
10 from textwrap import dedent
11
12 from nominatim.db.connection import connect
13 from nominatim.db.properties import set_property, get_property
14 from nominatim.db.utils import CopyBuffer
15 from nominatim.db.sql_preprocessor import SQLPreprocessor
16 from nominatim.tokenizer.icu_rule_loader import ICURuleLoader
17 from nominatim.tokenizer.icu_name_processor import ICUNameProcessor, ICUNameProcessorRules
18 from nominatim.tokenizer.base import AbstractAnalyzer, AbstractTokenizer
19
20 DBCFG_TERM_NORMALIZATION = "tokenizer_term_normalization"
21
22 LOG = logging.getLogger()
23
24 def create(dsn, data_dir):
25     """ Create a new instance of the tokenizer provided by this module.
26     """
27     return LegacyICUTokenizer(dsn, data_dir)
28
29
30 class LegacyICUTokenizer(AbstractTokenizer):
31     """ This tokenizer uses libICU to covert names and queries to ASCII.
32         Otherwise it uses the same algorithms and data structures as the
33         normalization routines in Nominatim 3.
34     """
35
36     def __init__(self, dsn, data_dir):
37         self.dsn = dsn
38         self.data_dir = data_dir
39         self.naming_rules = None
40         self.term_normalization = None
41
42
43     def init_new_db(self, config, init_db=True):
44         """ Set up a new tokenizer for the database.
45
46             This copies all necessary data in the project directory to make
47             sure the tokenizer remains stable even over updates.
48         """
49         loader = ICURuleLoader(config.load_sub_configuration('icu_tokenizer.yaml',
50                                                              config='TOKENIZER_CONFIG'))
51         self.naming_rules = ICUNameProcessorRules(loader=loader)
52         self.term_normalization = config.TERM_NORMALIZATION
53
54         self._install_php(config.lib_dir.php)
55         self._save_config()
56
57         if init_db:
58             self.update_sql_functions(config)
59             self._init_db_tables(config)
60
61
62     def init_from_project(self):
63         """ Initialise the tokenizer from the project directory.
64         """
65         with connect(self.dsn) as conn:
66             self.naming_rules = ICUNameProcessorRules(conn=conn)
67             self.term_normalization = get_property(conn, DBCFG_TERM_NORMALIZATION)
68
69
70     def finalize_import(self, _):
71         """ Do any required postprocessing to make the tokenizer data ready
72             for use.
73         """
74
75
76     def update_sql_functions(self, config):
77         """ Reimport the SQL functions for this tokenizer.
78         """
79         with connect(self.dsn) as conn:
80             sqlp = SQLPreprocessor(conn, config)
81             sqlp.run_sql_file(conn, 'tokenizer/icu_tokenizer.sql')
82
83
84     def check_database(self):
85         """ Check that the tokenizer is set up correctly.
86         """
87         self.init_from_project()
88
89         if self.naming_rules is None:
90             return "Configuration for tokenizer 'icu' are missing."
91
92         return None
93
94
95     def name_analyzer(self):
96         """ Create a new analyzer for tokenizing names and queries
97             using this tokinzer. Analyzers are context managers and should
98             be used accordingly:
99
100             ```
101             with tokenizer.name_analyzer() as analyzer:
102                 analyser.tokenize()
103             ```
104
105             When used outside the with construct, the caller must ensure to
106             call the close() function before destructing the analyzer.
107
108             Analyzers are not thread-safe. You need to instantiate one per thread.
109         """
110         return LegacyICUNameAnalyzer(self.dsn, ICUNameProcessor(self.naming_rules))
111
112
113     def _install_php(self, phpdir):
114         """ Install the php script for the tokenizer.
115         """
116         php_file = self.data_dir / "tokenizer.php"
117         php_file.write_text(dedent(f"""\
118             <?php
119             @define('CONST_Max_Word_Frequency', 10000000);
120             @define('CONST_Term_Normalization_Rules', "{self.term_normalization}");
121             @define('CONST_Transliteration', "{self.naming_rules.search_rules}");
122             require_once('{phpdir}/tokenizer/icu_tokenizer.php');"""))
123
124
125     def _save_config(self):
126         """ Save the configuration that needs to remain stable for the given
127             database as database properties.
128         """
129         with connect(self.dsn) as conn:
130             self.naming_rules.save_rules(conn)
131
132             set_property(conn, DBCFG_TERM_NORMALIZATION, self.term_normalization)
133
134
135     def _init_db_tables(self, config):
136         """ Set up the word table and fill it with pre-computed word
137             frequencies.
138         """
139         with connect(self.dsn) as conn:
140             sqlp = SQLPreprocessor(conn, config)
141             sqlp.run_sql_file(conn, 'tokenizer/icu_tokenizer_tables.sql')
142             conn.commit()
143
144             LOG.warning("Precomputing word tokens")
145
146             # get partial words and their frequencies
147             words = self._count_partial_terms(conn)
148
149             # copy them back into the word table
150             with CopyBuffer() as copystr:
151                 for term, cnt in words.items():
152                     copystr.add('w', term, json.dumps({'count': cnt}))
153
154                 with conn.cursor() as cur:
155                     copystr.copy_out(cur, 'word',
156                                      columns=['type', 'word_token', 'info'])
157                     cur.execute("""UPDATE word SET word_id = nextval('seq_word')
158                                    WHERE word_id is null and type = 'w'""")
159
160             conn.commit()
161
162     def _count_partial_terms(self, conn):
163         """ Count the partial terms from the names in the place table.
164         """
165         words = Counter()
166         name_proc = ICUNameProcessor(self.naming_rules)
167
168         with conn.cursor(name="words") as cur:
169             cur.execute(""" SELECT v, count(*) FROM
170                               (SELECT svals(name) as v FROM place)x
171                             WHERE length(v) < 75 GROUP BY v""")
172
173             for name, cnt in cur:
174                 terms = set()
175                 for word in name_proc.get_variants_ascii(name_proc.get_normalized(name)):
176                     if ' ' in word:
177                         terms.update(word.split())
178                 for term in terms:
179                     words[term] += cnt
180
181         return words
182
183
184 class LegacyICUNameAnalyzer(AbstractAnalyzer):
185     """ The legacy analyzer uses the ICU library for splitting names.
186
187         Each instance opens a connection to the database to request the
188         normalization.
189     """
190
191     def __init__(self, dsn, name_proc):
192         self.conn = connect(dsn).connection
193         self.conn.autocommit = True
194         self.name_processor = name_proc
195
196         self._cache = _TokenCache()
197
198
199     def close(self):
200         """ Free all resources used by the analyzer.
201         """
202         if self.conn:
203             self.conn.close()
204             self.conn = None
205
206
207     def get_word_token_info(self, words):
208         """ Return token information for the given list of words.
209             If a word starts with # it is assumed to be a full name
210             otherwise is a partial name.
211
212             The function returns a list of tuples with
213             (original word, word token, word id).
214
215             The function is used for testing and debugging only
216             and not necessarily efficient.
217         """
218         full_tokens = {}
219         partial_tokens = {}
220         for word in words:
221             if word.startswith('#'):
222                 full_tokens[word] = self.name_processor.get_search_normalized(word[1:])
223             else:
224                 partial_tokens[word] = self.name_processor.get_search_normalized(word)
225
226         with self.conn.cursor() as cur:
227             cur.execute("""SELECT word_token, word_id
228                             FROM word WHERE word_token = ANY(%s) and type = 'W'
229                         """, (list(full_tokens.values()),))
230             full_ids = {r[0]: r[1] for r in cur}
231             cur.execute("""SELECT word_token, word_id
232                             FROM word WHERE word_token = ANY(%s) and type = 'w'""",
233                         (list(partial_tokens.values()),))
234             part_ids = {r[0]: r[1] for r in cur}
235
236         return [(k, v, full_ids.get(v, None)) for k, v in full_tokens.items()] \
237                + [(k, v, part_ids.get(v, None)) for k, v in partial_tokens.items()]
238
239
240     @staticmethod
241     def normalize_postcode(postcode):
242         """ Convert the postcode to a standardized form.
243
244             This function must yield exactly the same result as the SQL function
245             'token_normalized_postcode()'.
246         """
247         return postcode.strip().upper()
248
249
250     def _make_standard_hnr(self, hnr):
251         """ Create a normalised version of a housenumber.
252
253             This function takes minor shortcuts on transliteration.
254         """
255         return self.name_processor.get_search_normalized(hnr)
256
257     def update_postcodes_from_db(self):
258         """ Update postcode tokens in the word table from the location_postcode
259             table.
260         """
261         to_delete = []
262         with self.conn.cursor() as cur:
263             # This finds us the rows in location_postcode and word that are
264             # missing in the other table.
265             cur.execute("""SELECT * FROM
266                             (SELECT pc, word FROM
267                               (SELECT distinct(postcode) as pc FROM location_postcode) p
268                               FULL JOIN
269                               (SELECT word FROM word WHERE type = 'P') w
270                               ON pc = word) x
271                            WHERE pc is null or word is null""")
272
273             with CopyBuffer() as copystr:
274                 for postcode, word in cur:
275                     if postcode is None:
276                         to_delete.append(word)
277                     else:
278                         copystr.add(self.name_processor.get_search_normalized(postcode),
279                                     'P', postcode)
280
281                 if to_delete:
282                     cur.execute("""DELETE FROM WORD
283                                    WHERE type ='P' and word = any(%s)
284                                 """, (to_delete, ))
285
286                 copystr.copy_out(cur, 'word',
287                                  columns=['word_token', 'type', 'word'])
288
289
290     def update_special_phrases(self, phrases, should_replace):
291         """ Replace the search index for special phrases with the new phrases.
292             If `should_replace` is True, then the previous set of will be
293             completely replaced. Otherwise the phrases are added to the
294             already existing ones.
295         """
296         norm_phrases = set(((self.name_processor.get_normalized(p[0]), p[1], p[2], p[3])
297                             for p in phrases))
298
299         with self.conn.cursor() as cur:
300             # Get the old phrases.
301             existing_phrases = set()
302             cur.execute("SELECT word, info FROM word WHERE type = 'S'")
303             for word, info in cur:
304                 existing_phrases.add((word, info['class'], info['type'],
305                                       info.get('op') or '-'))
306
307             added = self._add_special_phrases(cur, norm_phrases, existing_phrases)
308             if should_replace:
309                 deleted = self._remove_special_phrases(cur, norm_phrases,
310                                                        existing_phrases)
311             else:
312                 deleted = 0
313
314         LOG.info("Total phrases: %s. Added: %s. Deleted: %s",
315                  len(norm_phrases), added, deleted)
316
317
318     def _add_special_phrases(self, cursor, new_phrases, existing_phrases):
319         """ Add all phrases to the database that are not yet there.
320         """
321         to_add = new_phrases - existing_phrases
322
323         added = 0
324         with CopyBuffer() as copystr:
325             for word, cls, typ, oper in to_add:
326                 term = self.name_processor.get_search_normalized(word)
327                 if term:
328                     copystr.add(term, 'S', word,
329                                 json.dumps({'class': cls, 'type': typ,
330                                             'op': oper if oper in ('in', 'near') else None}))
331                     added += 1
332
333             copystr.copy_out(cursor, 'word',
334                              columns=['word_token', 'type', 'word', 'info'])
335
336         return added
337
338
339     @staticmethod
340     def _remove_special_phrases(cursor, new_phrases, existing_phrases):
341         """ Remove all phrases from the databse that are no longer in the
342             new phrase list.
343         """
344         to_delete = existing_phrases - new_phrases
345
346         if to_delete:
347             cursor.execute_values(
348                 """ DELETE FROM word USING (VALUES %s) as v(name, in_class, in_type, op)
349                     WHERE type = 'S' and word = name
350                           and info->>'class' = in_class and info->>'type' = in_type
351                           and ((op = '-' and info->>'op' is null) or op = info->>'op')
352                 """, to_delete)
353
354         return len(to_delete)
355
356
357     def add_country_names(self, country_code, names):
358         """ Add names for the given country to the search index.
359         """
360         word_tokens = set()
361         for name in self._compute_full_names(names):
362             norm_name = self.name_processor.get_search_normalized(name)
363             if norm_name:
364                 word_tokens.add(norm_name)
365
366         with self.conn.cursor() as cur:
367             # Get existing names
368             cur.execute("""SELECT word_token FROM word
369                             WHERE type = 'C' and word = %s""",
370                         (country_code, ))
371             word_tokens.difference_update((t[0] for t in cur))
372
373             # Only add those names that are not yet in the list.
374             if word_tokens:
375                 cur.execute("""INSERT INTO word (word_token, type, word)
376                                (SELECT token, 'C', %s
377                                 FROM unnest(%s) as token)
378                             """, (country_code, list(word_tokens)))
379
380             # No names are deleted at the moment.
381             # If deletion is made possible, then the static names from the
382             # initial 'country_name' table should be kept.
383
384
385     def process_place(self, place):
386         """ Determine tokenizer information about the given place.
387
388             Returns a JSON-serialisable structure that will be handed into
389             the database via the token_info field.
390         """
391         token_info = _TokenInfo(self._cache)
392
393         names = place.get('name')
394
395         if names:
396             fulls, partials = self._compute_name_tokens(names)
397
398             token_info.add_names(fulls, partials)
399
400             country_feature = place.get('country_feature')
401             if country_feature and re.fullmatch(r'[A-Za-z][A-Za-z]', country_feature):
402                 self.add_country_names(country_feature.lower(), names)
403
404         address = place.get('address')
405         if address:
406             self._process_place_address(token_info, address)
407
408         return token_info.data
409
410
411     def _process_place_address(self, token_info, address):
412         hnrs = []
413         addr_terms = []
414         for key, value in address.items():
415             if key == 'postcode':
416                 self._add_postcode(value)
417             elif key in ('housenumber', 'streetnumber', 'conscriptionnumber'):
418                 hnrs.append(value)
419             elif key == 'street':
420                 token_info.add_street(self._compute_partial_tokens(value))
421             elif key == 'place':
422                 token_info.add_place(self._compute_partial_tokens(value))
423             elif not key.startswith('_') and \
424                  key not in ('country', 'full'):
425                 addr_terms.append((key, self._compute_partial_tokens(value)))
426
427         if hnrs:
428             hnrs = self._split_housenumbers(hnrs)
429             token_info.add_housenumbers(self.conn, [self._make_standard_hnr(n) for n in hnrs])
430
431         if addr_terms:
432             token_info.add_address_terms(addr_terms)
433
434     def _compute_partial_tokens(self, name):
435         """ Normalize the given term, split it into partial words and return
436             then token list for them.
437         """
438         norm_name = self.name_processor.get_search_normalized(name)
439
440         tokens = []
441         need_lookup = []
442         for partial in norm_name.split():
443             token = self._cache.partials.get(partial)
444             if token:
445                 tokens.append(token)
446             else:
447                 need_lookup.append(partial)
448
449         if need_lookup:
450             with self.conn.cursor() as cur:
451                 cur.execute("""SELECT word, getorcreate_partial_word(word)
452                                FROM unnest(%s) word""",
453                             (need_lookup, ))
454
455                 for partial, token in cur:
456                     tokens.append(token)
457                     self._cache.partials[partial] = token
458
459         return tokens
460
461     def _compute_name_tokens(self, names):
462         """ Computes the full name and partial name tokens for the given
463             dictionary of names.
464         """
465         full_names = self._compute_full_names(names)
466         full_tokens = set()
467         partial_tokens = set()
468
469         for name in full_names:
470             norm_name = self.name_processor.get_normalized(name)
471             full, part = self._cache.names.get(norm_name, (None, None))
472             if full is None:
473                 variants = self.name_processor.get_variants_ascii(norm_name)
474                 if not variants:
475                     continue
476
477                 with self.conn.cursor() as cur:
478                     cur.execute("SELECT (getorcreate_full_word(%s, %s)).*",
479                                 (norm_name, variants))
480                     full, part = cur.fetchone()
481
482                 self._cache.names[norm_name] = (full, part)
483
484             full_tokens.add(full)
485             partial_tokens.update(part)
486
487         return full_tokens, partial_tokens
488
489
490     @staticmethod
491     def _compute_full_names(names):
492         """ Return the set of all full name word ids to be used with the
493             given dictionary of names.
494         """
495         full_names = set()
496         for name in (n.strip() for ns in names.values() for n in re.split('[;,]', ns)):
497             if name:
498                 full_names.add(name)
499
500                 brace_idx = name.find('(')
501                 if brace_idx >= 0:
502                     full_names.add(name[:brace_idx].strip())
503
504         return full_names
505
506
507     def _add_postcode(self, postcode):
508         """ Make sure the normalized postcode is present in the word table.
509         """
510         if re.search(r'[:,;]', postcode) is None:
511             postcode = self.normalize_postcode(postcode)
512
513             if postcode not in self._cache.postcodes:
514                 term = self.name_processor.get_search_normalized(postcode)
515                 if not term:
516                     return
517
518                 with self.conn.cursor() as cur:
519                     # no word_id needed for postcodes
520                     cur.execute("""INSERT INTO word (word_token, type, word)
521                                    (SELECT %s, 'P', pc FROM (VALUES (%s)) as v(pc)
522                                     WHERE NOT EXISTS
523                                      (SELECT * FROM word
524                                       WHERE type = 'P' and word = pc))
525                                 """, (term, postcode))
526                 self._cache.postcodes.add(postcode)
527
528
529     @staticmethod
530     def _split_housenumbers(hnrs):
531         if len(hnrs) > 1 or ',' in hnrs[0] or ';' in hnrs[0]:
532             # split numbers if necessary
533             simple_list = []
534             for hnr in hnrs:
535                 simple_list.extend((x.strip() for x in re.split(r'[;,]', hnr)))
536
537             if len(simple_list) > 1:
538                 hnrs = list(set(simple_list))
539             else:
540                 hnrs = simple_list
541
542         return hnrs
543
544
545
546
547 class _TokenInfo:
548     """ Collect token information to be sent back to the database.
549     """
550     def __init__(self, cache):
551         self._cache = cache
552         self.data = {}
553
554     @staticmethod
555     def _mk_array(tokens):
556         return '{%s}' % ','.join((str(s) for s in tokens))
557
558
559     def add_names(self, fulls, partials):
560         """ Adds token information for the normalised names.
561         """
562         self.data['names'] = self._mk_array(itertools.chain(fulls, partials))
563
564
565     def add_housenumbers(self, conn, hnrs):
566         """ Extract housenumber information from a list of normalised
567             housenumbers.
568         """
569         self.data['hnr_tokens'] = self._mk_array(self._cache.get_hnr_tokens(conn, hnrs))
570         self.data['hnr'] = ';'.join(hnrs)
571
572
573     def add_street(self, tokens):
574         """ Add addr:street match terms.
575         """
576         if tokens:
577             self.data['street'] = self._mk_array(tokens)
578
579
580     def add_place(self, tokens):
581         """ Add addr:place search and match terms.
582         """
583         if tokens:
584             self.data['place'] = self._mk_array(tokens)
585
586
587     def add_address_terms(self, terms):
588         """ Add additional address terms.
589         """
590         tokens = {key: self._mk_array(partials)
591                   for key, partials in terms if partials}
592
593         if tokens:
594             self.data['addr'] = tokens
595
596
597 class _TokenCache:
598     """ Cache for token information to avoid repeated database queries.
599
600         This cache is not thread-safe and needs to be instantiated per
601         analyzer.
602     """
603     def __init__(self):
604         self.names = {}
605         self.partials = {}
606         self.postcodes = set()
607         self.housenumbers = {}
608
609
610     def get_hnr_tokens(self, conn, terms):
611         """ Get token ids for a list of housenumbers, looking them up in the
612             database if necessary. `terms` is an iterable of normalized
613             housenumbers.
614         """
615         tokens = []
616         askdb = []
617
618         for term in terms:
619             token = self.housenumbers.get(term)
620             if token is None:
621                 askdb.append(term)
622             else:
623                 tokens.append(token)
624
625         if askdb:
626             with conn.cursor() as cur:
627                 cur.execute("SELECT nr, getorcreate_hnr_id(nr) FROM unnest(%s) as nr",
628                             (askdb, ))
629                 for term, tid in cur:
630                     self.housenumbers[term] = tid
631                     tokens.append(tid)
632
633         return tokens