+ cmdline = list(cmdline) + ['--project-dir', self.website_dir.name]
+
+ cli.nominatim(module_dir='',
+ osm2pgsql_path=str(self.build_dir / 'osm2pgsql' / 'osm2pgsql'),
+ cli_args=cmdline,
+ phpcgi_path='',
+ environ=self.test_env)
+
+
+ def copy_from_place(self, db):
+ """ Copy data from place to the placex and location_property_osmline
+ tables invoking the appropriate triggers.
+ """
+ self.run_nominatim('refresh', '--functions', '--no-diff-updates')
+
+ with db.cursor() as cur:
+ cur.execute("""INSERT INTO placex (osm_type, osm_id, class, type,
+ name, admin_level, address,
+ extratags, geometry)
+ SELECT osm_type, osm_id, class, type,
+ name, admin_level, address,
+ extratags, geometry
+ FROM place
+ WHERE not (class='place' and type='houses' and osm_type='W')""")
+ cur.execute("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
+ SELECT osm_id, address, geometry
+ FROM place
+ WHERE class='place' and type='houses'
+ and osm_type='W'
+ and ST_GeometryType(geometry) = 'ST_LineString'""")
+
+
+ def create_api_request_func_starlette(self):
+ import nominatim.server.starlette.server
+ from asgi_lifespan import LifespanManager
+ import httpx
+
+ async def _request(endpoint, params, project_dir, environ):
+ app = nominatim.server.starlette.server.get_application(project_dir, environ)
+
+ async with LifespanManager(app):
+ async with httpx.AsyncClient(app=app, base_url="http://nominatim.test") as client:
+ response = await client.get(f"/{endpoint}", params=params)
+
+ return response.text, response.status_code
+
+ return _request
+
+
+ def create_api_request_func_sanic(self):
+ import nominatim.server.sanic.server
+
+ async def _request(endpoint, params, project_dir, environ):
+ app = nominatim.server.sanic.server.get_application(project_dir, environ)
+
+ _, response = await app.asgi_client.get(f"/{endpoint}", params=params)
+
+ return response.text, response.status_code
+
+ return _request
+
+
+ def create_api_request_func_falcon(self):
+ import nominatim.server.falcon.server
+ import falcon.testing
+
+ async def _request(endpoint, params, project_dir, environ):
+ app = nominatim.server.falcon.server.get_application(project_dir, environ)
+
+ async with falcon.testing.ASGIConductor(app) as conductor:
+ response = await conductor.get(f"/{endpoint}", params=params)
+
+ return response.text, response.status_code
+
+ return _request
+
+