+ conn.set_query_timeout(self.query_timeout)
+ geocoder = ForwardGeocoder(conn, ntyp.SearchDetails.from_kwargs(params),
+ self.config.get_int('REQUEST_TIMEOUT') \
+ if self.config.REQUEST_TIMEOUT else None)
+ phrases = [Phrase(PhraseType.NONE, p.strip()) for p in query.split(',')]
+ return await geocoder.lookup(phrases)
+
+
+ # pylint: disable=too-many-arguments,too-many-branches
async def search_address(self, amenity: Optional[str] = None,
                         street: Optional[str] = None,
                         city: Optional[str] = None,
                         county: Optional[str] = None,
                         state: Optional[str] = None,
                         country: Optional[str] = None,
                         postalcode: Optional[str] = None,
                         **params: Any) -> SearchResults:
    """ Run a structured search where each part of the address is
        handed in as a separate field.

        Every non-empty field is turned into a phrase of the matching
        phrase type. All remaining keyword arguments are used to build
        the search details. Raises a UsageError when none of the address
        fields contains anything.
    """
    async with self.begin() as conn:
        conn.set_query_timeout(self.query_timeout)
        details = ntyp.SearchDetails.from_kwargs(params)

        # Phrases are created in exactly this order; note that the
        # postcode comes before the country.
        typed_terms = (
            (PhraseType.AMENITY, amenity),
            (PhraseType.STREET, street),
            (PhraseType.CITY, city),
            (PhraseType.COUNTY, county),
            (PhraseType.STATE, state),
            (PhraseType.POSTCODE, postalcode),
            (PhraseType.COUNTRY, country),
        )
        phrases: List[Phrase] = [Phrase(ptype, term)
                                 for ptype, term in typed_terms if term]

        if not phrases:
            raise UsageError('Nothing to search for.')

        # The first field present in this chain decides which rank range
        # the results are restricted to. With only a country given, the
        # range collapses to the single rank 4.
        if amenity or street:
            rank_range = (26, 30)
        elif city:
            rank_range = (13, 25)
        elif county:
            rank_range = (10, 12)
        elif state:
            rank_range = (5, 9)
        elif postalcode:
            rank_range = (5, 11)
        else:
            rank_range = (4, 4)
        details.restrict_min_max_rank(*rank_range)

        # Only choose layers when the caller did not name any: addresses
        # always, POIs in addition when an amenity was asked for.
        if 'layers' not in params:
            default_layers = ntyp.DataLayer.ADDRESS
            if amenity:
                default_layers |= ntyp.DataLayer.POI
            details.layers = default_layers

        # An unset (falsy) REQUEST_TIMEOUT means no timeout is passed on.
        timeout = None
        if self.config.REQUEST_TIMEOUT:
            timeout = self.config.get_int('REQUEST_TIMEOUT')

        geocoder = ForwardGeocoder(conn, details, timeout)
        return await geocoder.lookup(phrases)
+
+
async def search_category(self, categories: List[Tuple[str, str]],
                          near_query: Optional[str] = None,
                          **params: Any) -> SearchResults:
    """ Look up objects of the given categories in the vicinity of
        another place.

        The place to search around is either described by the
        unstructured query `near_query` or given as coordinates.
        An empty category list yields an empty result without touching
        the database.
    """
    if not categories:
        return SearchResults()

    details = ntyp.SearchDetails.from_kwargs(params)
    async with self.begin() as conn:
        conn.set_query_timeout(self.query_timeout)

        phrases: List[Phrase] = []
        if near_query:
            # One untyped phrase per comma-separated part, taken verbatim.
            phrases = [Phrase(PhraseType.NONE, part)
                       for part in near_query.split(',')]
        elif details.keywords:
            # Without a near query, the analyzer is requested explicitly
            # when keywords are wanted. The return value is unused, so
            # this is presumably done only to have it loaded - confirm.
            await make_query_analyzer(conn)

        # An unset (falsy) REQUEST_TIMEOUT means no timeout is passed on.
        timeout = None
        if self.config.REQUEST_TIMEOUT:
            timeout = self.config.get_int('REQUEST_TIMEOUT')

        geocoder = ForwardGeocoder(conn, details, timeout)
        return await geocoder.lookup_pois(categories, phrases)
+