1 # SPDX-License-Identifier: GPL-2.0-only
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
"""
Helper functions for executing external programs.
"""
from typing import Any, Union, Optional, Mapping, IO
from pathlib import Path
import logging
import subprocess
import urllib.request as urlrequest
from urllib.parse import urlencode

from nominatim.typing import StrPath
from nominatim.version import version_str
from nominatim.db.connection import get_pg_env
# Module-wide logger; getLogger() without a name returns the root logger.
LOG = logging.getLogger()
def run_legacy_script(script: StrPath, *args: Union[int, str],
                      nominatim_env: Any,
                      throw_on_fail: bool = False) -> int:
    """ Run a Nominatim PHP script with the given arguments.

        `script` is looked up in the `admin` directory below
        `nominatim_env.phplib_dir`. All positional `args` are converted
        to strings and appended to the PHP command line.

        The script runs in `nominatim_env.project_dir` with the
        environment returned by `nominatim_env.config.get_os_env()`,
        extended by the NOMINATIM_* directory settings taken from
        `nominatim_env`.

        Returns the exit code of the script. If `throw_on_fail` is True
        then throw a `CalledProcessError` on a non-zero exit.
    """
    cmd = ['/usr/bin/env', 'php', '-Cq',
           str(nominatim_env.phplib_dir / 'admin' / script)]
    cmd.extend(str(a) for a in args)

    env = nominatim_env.config.get_os_env()
    env['NOMINATIM_DATADIR'] = str(nominatim_env.data_dir)
    env['NOMINATIM_SQLDIR'] = str(nominatim_env.sqllib_dir)
    env['NOMINATIM_CONFIGDIR'] = str(nominatim_env.config_dir)
    env['NOMINATIM_DATABASE_MODULE_SRC_PATH'] = str(nominatim_env.module_dir)
    # Only fall back to the bundled osm2pgsql when the configuration does
    # not name a binary. A missing key is treated like an empty value.
    if not env.get('NOMINATIM_OSM2PGSQL_BINARY'):
        env['NOMINATIM_OSM2PGSQL_BINARY'] = str(nominatim_env.osm2pgsql_path)

    proc = subprocess.run(cmd, cwd=str(nominatim_env.project_dir), env=env,
                          check=throw_on_fail)

    return proc.returncode
def run_api_script(endpoint: str, project_dir: Path,
                   extra_env: Optional[Mapping[str, str]] = None,
                   phpcgi_bin: Optional[Path] = None,
                   params: Optional[Mapping[str, Any]] = None) -> int:
    """ Execute a Nominatim API function.

        The function needs a project directory that contains the website
        directory with the scripts to be executed. The scripts will be run
        using php_cgi. Query parameters can be added as named arguments.

        `endpoint` is the name of the script without the `.php` suffix.
        `extra_env` may hold additional environment variables; they are
        applied on top of the CGI variables set up here. `phpcgi_bin`
        names the CGI binary to use; when None, `php-cgi` is looked up
        through `/usr/bin/env`. `params` is URL-encoded into the query
        string.

        On success the response body (everything after the CGI headers)
        is printed to stdout. On failure the error output is logged.

        Returns the exit code of the script. A run that exits with 0 but
        writes to stderr is reported as 1.
    """
    log = logging.getLogger()
    webdir = str(project_dir / 'website')
    query_string = urlencode(params or {})

    # The subprocess gets exactly this environment (nothing is inherited).
    env = dict(QUERY_STRING=query_string,
               SCRIPT_NAME=f'/{endpoint}.php',
               REQUEST_URI=f'/{endpoint}.php?{query_string}',
               CONTEXT_DOCUMENT_ROOT=webdir,
               SCRIPT_FILENAME=f'{webdir}/{endpoint}.php',
               HTTP_HOST='localhost',
               HTTP_USER_AGENT='nominatim-tool',
               REMOTE_ADDR='0.0.0.0',
               DOCUMENT_ROOT=webdir,
               REQUEST_METHOD='GET',
               SERVER_PROTOCOL='HTTP/1.1',
               GATEWAY_INTERFACE='CGI/1.1',
               REDIRECT_STATUS='CGI')

    if extra_env:
        env.update(extra_env)

    if phpcgi_bin is None:
        cmd = ['/usr/bin/env', 'php-cgi']
    else:
        cmd = [str(phpcgi_bin)]

    proc = subprocess.run(cmd, cwd=str(project_dir), env=env,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE,
                          check=False)

    if proc.returncode != 0 or proc.stderr:
        # Prefer stderr for the error report, fall back to whatever the
        # script printed. Literal '\n' sequences become real line breaks.
        failure_output = proc.stderr or proc.stdout
        log.error(failure_output.decode('utf-8').replace('\\n', '\n'))
        return proc.returncode or 1

    result = proc.stdout.decode('utf-8')
    # Strip the CGI header block. When there is no header separator,
    # the complete output is taken as the body.
    _, separator, body = result.partition('\r\n\r\n')
    if not separator:
        body = result

    print(body.replace('\\n', '\n'))

    return 0
def run_php_server(server_address: str, base_dir: StrPath) -> None:
    """ Run the built-in server from the given directory.

        Starts `php -S <server_address>` with `base_dir` as working
        directory and blocks until the process exits. Throws a
        `CalledProcessError` when the server ends with a non-zero
        exit code.
    """
    subprocess.run(['/usr/bin/env', 'php', '-S', server_address],
                   cwd=str(base_dir), check=True)
def run_osm2pgsql(options: Mapping[str, Any]) -> None:
    """ Run osm2pgsql with the given options.

        The following keys are read from `options`:

        * `dsn` - handed to `get_pg_env()` to create the environment
        * `osm2pgsql` - the binary to execute
        * `threads`, `osm2pgsql_cache`, `osm2pgsql_style` - values for
          `--number-processes`, `--cache` and `--style`
        * `append` - when true use `--append`, otherwise `--create`
        * `flatnode_file` - when set, handed over via `--flat-nodes`
        * `tablespaces` - mapping with the keys `slim_data`, `slim_index`,
          `main_data` and `main_index`; only non-empty entries are used
        * `disable_jit` (optional) - when true, set PGOPTIONS to
          `-c jit=off -c max_parallel_workers_per_gather=0`
        * `import_data` (optional) - data to feed to osm2pgsql via stdin
        * `import_file` - a file or list of files to import, only
          read when `import_data` is not present
        * `cwd` (optional) - working directory, defaults to '.'

        Throws a `CalledProcessError` when osm2pgsql exits with a
        non-zero exit code.
    """
    env = get_pg_env(options['dsn'])
    cmd = [str(options['osm2pgsql']),
           '--hstore', '--latlon', '--slim',
           '--with-forward-dependencies', 'false',
           '--log-progress', 'true',
           '--number-processes', str(options['threads']),
           '--cache', str(options['osm2pgsql_cache']),
           '--output', 'gazetteer',
           '--style', str(options['osm2pgsql_style'])]

    cmd.append('--append' if options['append'] else '--create')

    if options['flatnode_file']:
        cmd.extend(('--flat-nodes', options['flatnode_file']))

    tablespaces = options['tablespaces']
    for key, param in (('slim_data', '--tablespace-slim-data'),
                       ('slim_index', '--tablespace-slim-index'),
                       ('main_data', '--tablespace-main-data'),
                       ('main_index', '--tablespace-main-index')):
        if tablespaces[key]:
            cmd.extend((param, tablespaces[key]))

    if options.get('disable_jit', False):
        env['PGOPTIONS'] = '-c jit=off -c max_parallel_workers_per_gather=0'

    if 'import_data' in options:
        # Input comes from stdin: tell osm2pgsql the format and use '-'
        # in place of a file name.
        cmd.extend(('-r', 'xml', '-'))
    elif isinstance(options['import_file'], list):
        cmd.extend(str(fname) for fname in options['import_file'])
    else:
        cmd.append(str(options['import_file']))

    subprocess.run(cmd, cwd=options.get('cwd', '.'),
                   input=options.get('import_data'),
                   env=env, check=True)
def get_url(url: str) -> str:
    """ Get the contents from the given URL and return it as a UTF-8 string.

        The request is sent with a `User-Agent` header of the form
        `Nominatim/<version>`. Any failure while loading or decoding
        the content is logged and the exception is then passed on to
        the caller.
    """
    headers = {"User-Agent": f"Nominatim/{version_str()}"}

    try:
        request = urlrequest.Request(url, headers=headers)
        with urlrequest.urlopen(request) as response: # type: IO[bytes]
            return response.read().decode('utf-8')
    except Exception:
        # Logger.fatal() is only a legacy alias; critical() logs at the
        # same level.
        LOG.critical('Failed to load URL: %s', url)
        raise