]> git.openstreetmap.org Git - nominatim.git/blobdiff - nominatim/api/search/token_assignment.py
enable connection pools for sqlite
[nominatim.git] / nominatim / api / search / token_assignment.py
index 747fea6ca853e8e59c1f29bbdbcfb0f66e70fb14..d94d69039f0f602ab8c0aeda9e5c971a881353fd 100644 (file)
@@ -46,7 +46,7 @@ class TokenAssignment: # pylint: disable=too-many-instance-attributes
     housenumber: Optional[qmod.TokenRange] = None
     postcode: Optional[qmod.TokenRange] = None
     country: Optional[qmod.TokenRange] = None
     housenumber: Optional[qmod.TokenRange] = None
     postcode: Optional[qmod.TokenRange] = None
     country: Optional[qmod.TokenRange] = None
-    category: Optional[qmod.TokenRange] = None
+    near_item: Optional[qmod.TokenRange] = None
     qualifier: Optional[qmod.TokenRange] = None
 
 
     qualifier: Optional[qmod.TokenRange] = None
 
 
@@ -64,8 +64,8 @@ class TokenAssignment: # pylint: disable=too-many-instance-attributes
                 out.postcode = token.trange
             elif token.ttype == qmod.TokenType.COUNTRY:
                 out.country = token.trange
                 out.postcode = token.trange
             elif token.ttype == qmod.TokenType.COUNTRY:
                 out.country = token.trange
-            elif token.ttype == qmod.TokenType.CATEGORY:
-                out.category = token.trange
+            elif token.ttype == qmod.TokenType.NEAR_ITEM:
+                out.near_item = token.trange
             elif token.ttype == qmod.TokenType.QUALIFIER:
                 out.qualifier = token.trange
         return out
             elif token.ttype == qmod.TokenType.QUALIFIER:
                 out.qualifier = token.trange
         return out
@@ -109,7 +109,7 @@ class _TokenSequence:
         """
         # Country and category must be the final term for left-to-right
         return len(self.seq) > 1 and \
         """
         # Country and category must be the final term for left-to-right
         return len(self.seq) > 1 and \
-               self.seq[-1].ttype in (qmod.TokenType.COUNTRY, qmod.TokenType.CATEGORY)
+               self.seq[-1].ttype in (qmod.TokenType.COUNTRY, qmod.TokenType.NEAR_ITEM)
 
 
     def appendable(self, ttype: qmod.TokenType) -> Optional[int]:
 
 
     def appendable(self, ttype: qmod.TokenType) -> Optional[int]:
@@ -165,22 +165,22 @@ class _TokenSequence:
         if ttype == qmod.TokenType.COUNTRY:
             return None if self.direction == -1 else 1
 
         if ttype == qmod.TokenType.COUNTRY:
             return None if self.direction == -1 else 1
 
-        if ttype == qmod.TokenType.CATEGORY:
+        if ttype == qmod.TokenType.NEAR_ITEM:
             return self.direction
 
         if ttype == qmod.TokenType.QUALIFIER:
             if self.direction == 1:
                 if (len(self.seq) == 1
             return self.direction
 
         if ttype == qmod.TokenType.QUALIFIER:
             if self.direction == 1:
                 if (len(self.seq) == 1
-                    and self.seq[0].ttype in (qmod.TokenType.PARTIAL, qmod.TokenType.CATEGORY)) \
+                    and self.seq[0].ttype in (qmod.TokenType.PARTIAL, qmod.TokenType.NEAR_ITEM)) \
                    or (len(self.seq) == 2
                    or (len(self.seq) == 2
-                       and self.seq[0].ttype == qmod.TokenType.CATEGORY
+                       and self.seq[0].ttype == qmod.TokenType.NEAR_ITEM
                        and self.seq[1].ttype == qmod.TokenType.PARTIAL):
                     return 1
                 return None
             if self.direction == -1:
                 return -1
 
                        and self.seq[1].ttype == qmod.TokenType.PARTIAL):
                     return 1
                 return None
             if self.direction == -1:
                 return -1
 
-            tempseq = self.seq[1:] if self.seq[0].ttype == qmod.TokenType.CATEGORY else self.seq
+            tempseq = self.seq[1:] if self.seq[0].ttype == qmod.TokenType.NEAR_ITEM else self.seq
             if len(tempseq) == 0:
                 return 1
             if len(tempseq) == 1 and self.seq[0].ttype == qmod.TokenType.HOUSENUMBER:
             if len(tempseq) == 0:
                 return 1
             if len(tempseq) == 1 and self.seq[0].ttype == qmod.TokenType.HOUSENUMBER:
@@ -253,10 +253,103 @@ class _TokenSequence:
                 priors = sum(1 for t in self.seq[hnrpos+1:] if t.ttype == qmod.TokenType.PARTIAL)
                 if not self._adapt_penalty_from_priors(priors, 1):
                     return False
                 priors = sum(1 for t in self.seq[hnrpos+1:] if t.ttype == qmod.TokenType.PARTIAL)
                 if not self._adapt_penalty_from_priors(priors, 1):
                     return False
+            if any(t.ttype == qmod.TokenType.NEAR_ITEM for t in self.seq):
+                self.penalty += 1.0
 
         return True
 
 
 
         return True
 
 
+    def _get_assignments_postcode(self, base: TokenAssignment,
+                                  query_len: int)  -> Iterator[TokenAssignment]:
+        """ Yield possible assignments of Postcode searches with an
+            address component.
+        """
+        assert base.postcode is not None
+
+        if (base.postcode.start == 0 and self.direction != -1)\
+           or (base.postcode.end == query_len and self.direction != 1):
+            log().comment('postcode search')
+            # <address>,<postcode> should give preference to address search
+            if base.postcode.start == 0:
+                penalty = self.penalty
+                self.direction = -1 # name searches are only possible backwards
+            else:
+                penalty = self.penalty + 0.1
+                self.direction = 1 # name searches are only possible forwards
+            yield dataclasses.replace(base, penalty=penalty)
+
+
+    def _get_assignments_address_forward(self, base: TokenAssignment,
+                                         query: qmod.QueryStruct) -> Iterator[TokenAssignment]:
+        """ Yield possible assignments of address searches with
+            left-to-right reading.
+        """
+        first = base.address[0]
+
+        log().comment('first word = name')
+        yield dataclasses.replace(base, penalty=self.penalty,
+                                  name=first, address=base.address[1:])
+
+        # To paraphrase:
+        #  * if another name term comes after the first one and before the
+        #    housenumber
+        #  * a qualifier comes after the name
+        #  * the containing phrase is strictly typed
+        if (base.housenumber and first.end < base.housenumber.start)\
+           or (base.qualifier and base.qualifier > first)\
+           or (query.nodes[first.start].ptype != qmod.PhraseType.NONE):
+            return
+
+        penalty = self.penalty
+
+        # Penalty for:
+        #  * <name>, <street>, <housenumber> , ...
+        #  * queries that are comma-separated
+        if (base.housenumber and base.housenumber > first) or len(query.source) > 1:
+            penalty += 0.25
+
+        for i in range(first.start + 1, first.end):
+            name, addr = first.split(i)
+            log().comment(f'split first word = name ({i - first.start})')
+            yield dataclasses.replace(base, name=name, address=[addr] + base.address[1:],
+                                      penalty=penalty + PENALTY_TOKENCHANGE[query.nodes[i].btype])
+
+
+    def _get_assignments_address_backward(self, base: TokenAssignment,
+                                          query: qmod.QueryStruct) -> Iterator[TokenAssignment]:
+        """ Yield possible assignments of address searches with
+            right-to-left reading.
+        """
+        last = base.address[-1]
+
+        if self.direction == -1 or len(base.address) > 1:
+            log().comment('last word = name')
+            yield dataclasses.replace(base, penalty=self.penalty,
+                                      name=last, address=base.address[:-1])
+
+        # To paraphrase:
+        #  * if another name term comes before the last one and after the
+        #    housenumber
+        #  * a qualifier comes before the name
+        #  * the containing phrase is strictly typed
+        if (base.housenumber and last.start > base.housenumber.end)\
+           or (base.qualifier and base.qualifier < last)\
+           or (query.nodes[last.start].ptype != qmod.PhraseType.NONE):
+            return
+
+        penalty = self.penalty
+        if base.housenumber and base.housenumber < last:
+            penalty += 0.4
+        if len(query.source) > 1:
+            penalty += 0.25
+
+        for i in range(last.start + 1, last.end):
+            addr, name = last.split(i)
+            log().comment(f'split last word = name ({i - last.start})')
+            yield dataclasses.replace(base, name=name, address=base.address[:-1] + [addr],
+                                      penalty=penalty + PENALTY_TOKENCHANGE[query.nodes[i].btype])
+
+
     def get_assignments(self, query: qmod.QueryStruct) -> Iterator[TokenAssignment]:
         """ Yield possible assignments for the current sequence.
 
     def get_assignments(self, query: qmod.QueryStruct) -> Iterator[TokenAssignment]:
         """ Yield possible assignments for the current sequence.
 
@@ -265,62 +358,36 @@ class _TokenSequence:
         """
         base = TokenAssignment.from_ranges(self.seq)
 
         """
         base = TokenAssignment.from_ranges(self.seq)
 
+        num_addr_tokens = sum(t.end - t.start for t in base.address)
+        if num_addr_tokens > 50:
+            return
+
         # Postcode search (postcode-only search is covered in next case)
         if base.postcode is not None and base.address:
         # Postcode search (postcode-only search is covered in next case)
         if base.postcode is not None and base.address:
-            if (base.postcode.start == 0 and self.direction != -1)\
-               or (base.postcode.end == query.num_token_slots() and self.direction != 1):
-                log().comment('postcode search')
-                yield dataclasses.replace(base, penalty=self.penalty)
+            yield from self._get_assignments_postcode(base, query.num_token_slots())
 
         # Postcode or country-only search
         if not base.address:
 
         # Postcode or country-only search
         if not base.address:
-            if not base.housenumber and (base.postcode or base.country or base.category):
+            if not base.housenumber and (base.postcode or base.country or base.near_item):
                 log().comment('postcode/country search')
                 yield dataclasses.replace(base, penalty=self.penalty)
         else:
                 log().comment('postcode/country search')
                 yield dataclasses.replace(base, penalty=self.penalty)
         else:
-            # Use entire first word as name
+            # <postcode>,<address> should give preference to postcode search
+            if base.postcode and base.postcode.start == 0:
+                self.penalty += 0.1
+
+            # Left-to-right reading of the address
             if self.direction != -1:
             if self.direction != -1:
-                log().comment('first word = name')
-                yield dataclasses.replace(base, name=base.address[0],
-                                          penalty=self.penalty,
-                                          address=base.address[1:])
-
-            # Use entire last word as name
-            if self.direction == -1 or (self.direction == 0 and len(base.address) > 1):
-                log().comment('last word = name')
-                yield dataclasses.replace(base, name=base.address[-1],
-                                          penalty=self.penalty,
-                                          address=base.address[:-1])
+                yield from self._get_assignments_address_forward(base, query)
+
+            # Right-to-left reading of the address
+            if self.direction != 1:
+                yield from self._get_assignments_address_backward(base, query)
 
             # variant for special housenumber searches
             if base.housenumber:
                 yield dataclasses.replace(base, penalty=self.penalty)
 
 
             # variant for special housenumber searches
             if base.housenumber:
                 yield dataclasses.replace(base, penalty=self.penalty)
 
-            # Use beginning of first word as name
-            if self.direction != -1:
-                first = base.address[0]
-                if (not base.housenumber or first.end >= base.housenumber.start)\
-                   and (not base.qualifier or first.start >= base.qualifier.end):
-                    for i in range(first.start + 1, first.end):
-                        name, addr = first.split(i)
-                        penalty = self.penalty + PENALTY_TOKENCHANGE[query.nodes[i].btype]
-                        log().comment(f'split first word = name ({i - first.start})')
-                        yield dataclasses.replace(base, name=name, penalty=penalty,
-                                                  address=[addr] + base.address[1:])
-
-            # Use end of last word as name
-            if self.direction != 1:
-                last = base.address[-1]
-                if (not base.housenumber or last.start <= base.housenumber.end)\
-                   and (not base.qualifier or last.end <= base.qualifier.start):
-                    for i in range(last.start + 1, last.end):
-                        addr, name = last.split(i)
-                        penalty = self.penalty + PENALTY_TOKENCHANGE[query.nodes[i].btype]
-                        log().comment(f'split last word = name ({i - last.start})')
-                        yield dataclasses.replace(base, name=name, penalty=penalty,
-                                                  address=base.address[:-1] + [addr])
-
-
 
 def yield_token_assignments(query: qmod.QueryStruct) -> Iterator[TokenAssignment]:
     """ Return possible word type assignments to word positions.
 
 def yield_token_assignments(query: qmod.QueryStruct) -> Iterator[TokenAssignment]:
     """ Return possible word type assignments to word positions.