- cwd = self.website_dir.name
- else:
- cwd = self.build_dir
-
- proc = subprocess.Popen(cmd, cwd=cwd, env=self.test_env,
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- (outp, outerr) = proc.communicate()
- outerr = outerr.decode('utf-8').replace('\\n', '\n')
- LOG.debug("run_nominatim_script: %s\n%s\n%s", cmd, outp, outerr)
- assert (proc.returncode == 0), "Script '%s' failed:\n%s\n%s\n" % (script, outp, outerr)
+ cmdline = list(cmdline) + ['--project-dir', self.website_dir.name]
+
+ cli.nominatim(module_dir='',
+ osm2pgsql_path=str(self.build_dir / 'osm2pgsql' / 'osm2pgsql'),
+ cli_args=cmdline,
+ environ=self.test_env)
+
+
def copy_from_place(self, db):
    """ Fill the placex and location_property_osmline tables with the
        content of the place table, invoking the appropriate triggers.

        The SQL functions are refreshed first by running
        `refresh --functions --no-diff-updates` through run_nominatim().
        `db` must provide a `cursor()` usable as a context manager.
    """
    # Everything except house-number interpolation ways ends up in placex.
    placex_insert = ("INSERT INTO placex (osm_type, osm_id, class, type,\n"
                     "name, admin_level, address,\n"
                     "extratags, geometry)\n"
                     "SELECT osm_type, osm_id, class, type,\n"
                     "name, admin_level, address,\n"
                     "extratags, geometry\n"
                     "FROM place\n"
                     "WHERE not (class='place' and type='houses' and osm_type='W')")
    # The complementary set of ways, restricted to line strings, goes into
    # location_property_osmline.
    osmline_insert = ("INSERT INTO location_property_osmline (osm_id, address, linegeo)\n"
                      "SELECT osm_id, address, geometry\n"
                      "FROM place\n"
                      "WHERE class='place' and type='houses'\n"
                      "and osm_type='W'\n"
                      "and ST_GeometryType(geometry) = 'ST_LineString'")

    self.run_nominatim('refresh', '--functions', '--no-diff-updates')

    with db.cursor() as cur:
        for statement in (placex_insert, osmline_insert):
            cur.execute(statement)
+
+
def create_api_request_func_starlette(self):
    """ Return a coroutine function that sends a GET request to the
        Starlette variant of the API server without going over the network.

        The returned function has the signature
        `_request(endpoint, params, project_dir, environ, http_headers)`.
        It builds a new application from `project_dir` and `environ` on
        every call, requests `/<endpoint>` with the given query parameters
        and headers and returns a tuple of (response text, HTTP status code).
    """
    import nominatim.server.starlette.server
    from asgi_lifespan import LifespanManager
    import httpx

    async def _request(endpoint, params, project_dir, environ, http_headers):
        app = nominatim.server.starlette.server.get_application(project_dir, environ)

        # LifespanManager drives the application's ASGI lifespan
        # (startup/shutdown) around the request.
        async with LifespanManager(app):
            # Use an explicit ASGI transport: the `app=` shortcut of
            # AsyncClient is deprecated since httpx 0.27 and no longer
            # accepted from httpx 0.28 on.
            transport = httpx.ASGITransport(app=app)
            async with httpx.AsyncClient(transport=transport,
                                         base_url="http://nominatim.test") as client:
                response = await client.get(f"/{endpoint}", params=params,
                                            headers=http_headers)

        return response.text, response.status_code

    return _request
+
+
def create_api_request_func_falcon(self):
    """ Return a coroutine function that sends a GET request to the
        Falcon variant of the API server through falcon's ASGI conductor.

        The returned function has the signature
        `_request(endpoint, params, project_dir, environ, http_headers)`
        and yields a tuple of (response text, HTTP status code). A new
        application is created from `project_dir` and `environ` for each
        call.
    """
    import nominatim.server.falcon.server
    import falcon.testing

    async def _request(endpoint, params, project_dir, environ, http_headers):
        application = nominatim.server.falcon.server.get_application(project_dir,
                                                                     environ)

        async with falcon.testing.ASGIConductor(application) as simulator:
            reply = await simulator.get(f"/{endpoint}",
                                        params=params,
                                        headers=http_headers)

        body = reply.text
        status = reply.status_code
        return body, status

    return _request
+
+
+