]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/index.c
pull in the special phrases from the wiki
[nominatim.git] / nominatim / index.c
1 /*
2 */
3
4 #include <stdio.h>
5 #include <unistd.h>
6 #include <stdlib.h>
7 #include <string.h>
8 #include <assert.h>
9 #include <pthread.h>
10 #include <time.h>
11 #include <stdint.h>
12
13 #include <libpq-fe.h>
14
15 #include "nominatim.h"
16 #include "index.h"
17 #include "export.h"
18 #include "postgresql.h"
19
20 extern int verbose;
21
22 void nominatim_index(int rank_min, int rank_max, int num_threads, const char *conninfo, const char *structuredoutputfile)
23 {
24     struct index_thread_data * thread_data;
25     pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER;
26     int tuples, count, sleepcount;
27
28     time_t rankStartTime;
29     int rankTotalTuples;
30     int rankCountTuples;
31     float rankPerSecond;
32
33     PGconn *conn;
34     PGresult * res;
35     PGresult * resSectors;
36     PGresult * resPlaces;
37     PGresult * resNULL;
38
39     int rank;
40     int i;
41     int iSector;
42     int iResult;
43
44     const char *paramValues[2];
45     int         paramLengths[2];
46     int         paramFormats[2];
47     uint32_t    paramRank;
48     uint32_t    paramSector;
49     uint32_t    sector;
50
51     xmlTextWriterPtr writer;
52     pthread_mutex_t writer_mutex = PTHREAD_MUTEX_INITIALIZER;
53
54     Oid pg_prepare_params[2];
55
56     conn = PQconnectdb(conninfo);
57     if (PQstatus(conn) != CONNECTION_OK)
58     {
59         fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn));
60         exit(EXIT_FAILURE);
61     }
62
63     pg_prepare_params[0] = PG_OID_INT4;
64     res = PQprepare(conn, "index_sectors",
65                     "select geometry_sector,count(*) from placex where rank_search = $1 and indexed_status > 0 group by geometry_sector order by geometry_sector",
66                     1, pg_prepare_params);
67     if (PQresultStatus(res) != PGRES_COMMAND_OK)
68     {
69         fprintf(stderr, "Failed preparing index_sectors: %s\n", PQerrorMessage(conn));
70         exit(EXIT_FAILURE);
71     }
72     PQclear(res);
73
74     pg_prepare_params[0] = PG_OID_INT4;
75     res = PQprepare(conn, "index_nosectors",
76                     "select 0::integer,count(*) from placex where rank_search = $1 and indexed_status > 0",
77                     1, pg_prepare_params);
78     if (PQresultStatus(res) != PGRES_COMMAND_OK)
79     {
80         fprintf(stderr, "Failed preparing index_sectors: %s\n", PQerrorMessage(conn));
81         exit(EXIT_FAILURE);
82     }
83     PQclear(res);
84
85     pg_prepare_params[0] = PG_OID_INT4;
86     pg_prepare_params[1] = PG_OID_INT4;
87     res = PQprepare(conn, "index_sector_places",
88                     "select place_id from placex where rank_search = $1 and geometry_sector = $2 and indexed_status > 0",
89                     2, pg_prepare_params);
90     if (PQresultStatus(res) != PGRES_COMMAND_OK)
91     {
92         fprintf(stderr, "Failed preparing index_sector_places: %s\n", PQerrorMessage(conn));
93         exit(EXIT_FAILURE);
94     }
95     PQclear(res);
96
97     pg_prepare_params[0] = PG_OID_INT4;
98     res = PQprepare(conn, "index_nosector_places",
99                     "select place_id from placex where rank_search = $1 and indexed_status > 0 order by geometry_sector",
100                     1, pg_prepare_params);
101     if (PQresultStatus(res) != PGRES_COMMAND_OK)
102     {
103         fprintf(stderr, "Failed preparing index_nosector_places: %s\n", PQerrorMessage(conn));
104         exit(EXIT_FAILURE);
105     }
106     PQclear(res);
107
108     // Build the data for each thread
109     thread_data = (struct index_thread_data *)malloc(sizeof(struct index_thread_data)*num_threads);
110     for (i = 0; i < num_threads; i++)
111     {
112         thread_data[i].conn = PQconnectdb(conninfo);
113         if (PQstatus(thread_data[i].conn) != CONNECTION_OK)
114         {
115             fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(thread_data[i].conn));
116             exit(EXIT_FAILURE);
117         }
118
119         pg_prepare_params[0] = PG_OID_INT4;
120         res = PQprepare(thread_data[i].conn, "index_placex",
121                         "update placex set indexed_status = 0 where place_id = $1",
122                         1, pg_prepare_params);
123         if (PQresultStatus(res) != PGRES_COMMAND_OK)
124         {
125             fprintf(stderr, "Failed preparing index_placex: %s\n", PQerrorMessage(conn));
126             exit(EXIT_FAILURE);
127         }
128         PQclear(res);
129
130         res = PQexec(thread_data[i].conn, "set enable_seqscan = false");
131         if (PQresultStatus(res) != PGRES_COMMAND_OK)
132         {
133             fprintf(stderr, "Failed disabling sequential scan: %s\n", PQerrorMessage(conn));
134             exit(EXIT_FAILURE);
135         }
136         PQclear(res);
137
138         nominatim_exportCreatePreparedQueries(thread_data[i].conn);
139     }
140
141     // Create the output file
142     writer = NULL;
143     if (structuredoutputfile)
144     {
145         writer = nominatim_exportXMLStart(structuredoutputfile);
146     }
147
148     fprintf(stderr, "Starting indexing rank (%i to %i) using %i treads\n", rank_min, rank_max, num_threads);
149
150     for (rank = rank_min; rank <= rank_max; rank++)
151     {
152         printf("Starting rank %d\n", rank);
153         rankCountTuples = 0;
154         rankPerSecond = 0;
155
156         paramRank = PGint32(rank);
157         paramValues[0] = (char *)&paramRank;
158         paramLengths[0] = sizeof(paramRank);
159         paramFormats[0] = 1;
160         if (rank < 16)
161             resSectors = PQexecPrepared(conn, "index_nosectors", 1, paramValues, paramLengths, paramFormats, 1);
162         else
163             resSectors = PQexecPrepared(conn, "index_sectors", 1, paramValues, paramLengths, paramFormats, 1);
164         if (PQresultStatus(resSectors) != PGRES_TUPLES_OK)
165         {
166             fprintf(stderr, "index_sectors: SELECT failed: %s", PQerrorMessage(conn));
167             PQclear(resSectors);
168             exit(EXIT_FAILURE);
169         }
170         if (PQftype(resSectors, 0) != PG_OID_INT4)
171         {
172             fprintf(stderr, "Sector value has unexpected type\n");
173             PQclear(resSectors);
174             exit(EXIT_FAILURE);
175         }
176         if (PQftype(resSectors, 1) != PG_OID_INT8)
177         {
178             fprintf(stderr, "Sector value has unexpected type\n");
179             PQclear(resSectors);
180             exit(EXIT_FAILURE);
181         }
182
183         rankTotalTuples = 0;
184         for (iSector = 0; iSector < PQntuples(resSectors); iSector++)
185         {
186             rankTotalTuples += PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1)));
187         }
188
189         rankStartTime = time(0);
190         for (iSector = 0; iSector <= PQntuples(resSectors); iSector++)
191         {
192             if (iSector > 0)
193             {
194                 resPlaces = PQgetResult(conn);
195                 if (PQresultStatus(resPlaces) != PGRES_TUPLES_OK)
196                 {
197                     fprintf(stderr, "index_sector_places: SELECT failed: %s", PQerrorMessage(conn));
198                     PQclear(resPlaces);
199                     exit(EXIT_FAILURE);
200                 }
201                 if (PQftype(resPlaces, 0) != PG_OID_INT4)
202                 {
203                     fprintf(stderr, "Place_id value has unexpected type\n");
204                     PQclear(resPlaces);
205                     exit(EXIT_FAILURE);
206                 }
207                 resNULL = PQgetResult(conn);
208                 if (resNULL != NULL)
209                 {
210                     fprintf(stderr, "Unexpected non-null response\n");
211                     exit(EXIT_FAILURE);
212                 }
213             }
214
215             if (iSector < PQntuples(resSectors))
216             {
217                 sector = PGint32(*((uint32_t *)PQgetvalue(resSectors, iSector, 0)));
218                 //printf("\n Starting sector %d size %ld\n", sector, PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1))));
219
220                 // Get all the place_id's for this sector
221                 paramRank = PGint32(rank);
222                 paramValues[0] = (char *)&paramRank;
223                 paramLengths[0] = sizeof(paramRank);
224                 paramFormats[0] = 1;
225                 paramSector = PGint32(sector);
226                 paramValues[1] = (char *)&paramSector;
227                 paramLengths[1] = sizeof(paramSector);
228                 paramFormats[1] = 1;
229                 if (rank < 16)
230                     iResult = PQsendQueryPrepared(conn, "index_nosector_places", 1, paramValues, paramLengths, paramFormats, 1);
231                 else
232                     iResult = PQsendQueryPrepared(conn, "index_sector_places", 2, paramValues, paramLengths, paramFormats, 1);
233                 if (!iResult)
234                 {
235                     fprintf(stderr, "index_sector_places: SELECT failed: %s", PQerrorMessage(conn));
236                     PQclear(resPlaces);
237                     exit(EXIT_FAILURE);
238                 }
239             }
240
241             if (iSector > 0)
242             {
243                 count = 0;
244                 rankPerSecond = 0;
245                 tuples = PQntuples(resPlaces);
246
247                 if (tuples > 0)
248                 {
249                     // Spawn threads
250                     for (i = 0; i < num_threads; i++)
251                     {
252                         thread_data[i].res = resPlaces;
253                         thread_data[i].tuples = tuples;
254                         thread_data[i].count = &count;
255                         thread_data[i].count_mutex = &count_mutex;
256                         thread_data[i].writer = writer;
257                         thread_data[i].writer_mutex = &writer_mutex;
258                         pthread_create(&thread_data[i].thread, NULL, &nominatim_indexThread, (void *)&thread_data[i]);
259                     }
260
261                     // Monitor threads to give user feedback
262                     sleepcount = 0;
263                     while (count < tuples)
264                     {
265                         usleep(1000);
266
267                         // Aim for one update per second
268                         if (sleepcount++ > 500)
269                         {
270                             rankPerSecond = ((float)rankCountTuples + (float)count) / MAX(difftime(time(0), rankStartTime),1);
271                             printf("  Done %i in %i @ %f per second - Rank %i ETA (seconds): %f\n", (rankCountTuples + count), (int)(difftime(time(0), rankStartTime)), rankPerSecond, rank, ((float)(rankTotalTuples - (rankCountTuples + count)))/rankPerSecond);
272                             sleepcount = 0;
273                         }
274                     }
275
276                     // Wait for everything to finish
277                     for (i = 0; i < num_threads; i++)
278                     {
279                         pthread_join(thread_data[i].thread, NULL);
280                     }
281
282                     rankCountTuples += tuples;
283                 }
284
285                 // Finished sector
286                 rankPerSecond = (float)rankCountTuples / MAX(difftime(time(0), rankStartTime),1);
287                 printf("  Done %i in %i @ %f per second - ETA (seconds): %f\n", rankCountTuples, (int)(difftime(time(0), rankStartTime)), rankPerSecond, ((float)(rankTotalTuples - rankCountTuples))/rankPerSecond);
288
289                 PQclear(resPlaces);
290             }
291         }
292         // Finished rank
293         printf("\r  Done %i in %i @ %f per second - FINISHED                      \n\n", rankCountTuples, (int)(difftime(time(0), rankStartTime)), rankPerSecond);
294
295         PQclear(resSectors);
296     }
297
298     if (writer)
299     {
300         nominatim_exportXMLEnd(writer);
301     }
302 }
303
304 void *nominatim_indexThread(void * thread_data_in)
305 {
306     struct index_thread_data * thread_data = (struct index_thread_data * )thread_data_in;
307
308     PGresult   *res;
309
310     const char *paramValues[1];
311     int         paramLengths[1];
312     int         paramFormats[1];
313     uint32_t    paramPlaceID;
314     uint32_t    place_id;
315     time_t              updateStartTime;
316
317     while (1)
318     {
319         pthread_mutex_lock( thread_data->count_mutex );
320         if (*(thread_data->count) >= thread_data->tuples)
321         {
322             pthread_mutex_unlock( thread_data->count_mutex );
323             break;
324         }
325
326         place_id = PGint32(*((uint32_t *)PQgetvalue(thread_data->res, *thread_data->count, 0)));
327         (*thread_data->count)++;
328
329         pthread_mutex_unlock( thread_data->count_mutex );
330
331         if (verbose) printf("  Processing place_id %d\n", place_id);
332
333         updateStartTime = time(0);
334         paramPlaceID = PGint32(place_id);
335         paramValues[0] = (char *)&paramPlaceID;
336         paramLengths[0] = sizeof(paramPlaceID);
337         paramFormats[0] = 1;
338         res = PQexecPrepared(thread_data->conn, "index_placex", 1, paramValues, paramLengths, paramFormats, 1);
339         if (PQresultStatus(res) != PGRES_COMMAND_OK)
340         {
341             fprintf(stderr, "index_placex: UPDATE failed: %s", PQerrorMessage(thread_data->conn));
342             PQclear(res);
343             exit(EXIT_FAILURE);
344         }
345         PQclear(res);
346         if (difftime(time(0), updateStartTime) > 1) printf("  Slow place_id %d\n", place_id);
347
348         if (thread_data->writer)
349         {
350             nominatim_exportPlace(place_id, thread_data->conn, thread_data->writer, thread_data->writer_mutex);
351         }
352     }
353
354     return NULL;
355 }