]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/index.c
use generator for thread choice
[nominatim.git] / nominatim / index.c
1 /*
2  * triggers indexing (reparenting etc.) through setting resetting indexed_status: update placex/osmline set indexed_status = 0 where indexed_status > 0
3  * triggers placex_update and osmline_update
4 */
5
6 #include <stdio.h>
7 #include <unistd.h>
8 #include <stdlib.h>
9 #include <string.h>
10 #include <assert.h>
11 #include <pthread.h>
12 #include <time.h>
13 #include <stdint.h>
14
15 #include <libpq-fe.h>
16
17 #include "nominatim.h"
18 #include "index.h"
19 #include "export.h"
20 #include "postgresql.h"
21
22 extern int verbose;
23
24 void run_indexing(int rank, int interpolation, PGconn *conn, int num_threads, 
25 struct index_thread_data * thread_data, const char *structuredoutputfile)
26 {
27     int tuples, count, sleepcount;
28     pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER;
29     
30     time_t rankStartTime;
31     int rankTotalTuples;
32     int rankCountTuples;
33     float rankPerSecond;
34     
35     PGresult * resSectors;
36     PGresult * resPlaces;
37     PGresult * resNULL;
38     
39     int i;
40     int iSector;
41     int iResult;
42     
43     const char *paramValues[2];
44     int         paramLengths[2];
45     int         paramFormats[2];
46     uint32_t    paramRank;
47     uint32_t    paramSector;
48     uint32_t    sector;
49     
50     xmlTextWriterPtr writer;
51     pthread_mutex_t writer_mutex = PTHREAD_MUTEX_INITIALIZER;
52     
53     // Create the output file
54     writer = NULL;
55     if (structuredoutputfile)
56     {
57         writer = nominatim_exportXMLStart(structuredoutputfile);
58     }
59     
60     if (interpolation)
61     {
62         fprintf(stderr, "Starting interpolation lines (location_property_osmline)\n");
63     }
64     else
65     {
66         fprintf(stderr, "Starting rank %d\n", rank);
67     }
68     
69     rankCountTuples = 0;
70     rankPerSecond = 0;
71
72     paramRank = PGint32(rank);
73     paramValues[0] = (char *)&paramRank;
74     paramLengths[0] = sizeof(paramRank);
75     paramFormats[0] = 1;
76     
77     if (interpolation)
78     {
79         resSectors = PQexecPrepared(conn, "index_sectors_osmline", 0, NULL, 0, NULL, 1);
80     }
81     else
82     {
83         resSectors = PQexecPrepared(conn, "index_sectors", 1, paramValues, paramLengths, paramFormats, 1);
84     }
85     if (PQresultStatus(resSectors) != PGRES_TUPLES_OK)
86     {
87         fprintf(stderr, "index_sectors: SELECT failed: %s", PQerrorMessage(conn));
88         PQclear(resSectors);
89         exit(EXIT_FAILURE);
90     }
91     if (PQftype(resSectors, 0) != PG_OID_INT4)
92     {
93         fprintf(stderr, "Sector value has unexpected type\n");
94         PQclear(resSectors);
95         exit(EXIT_FAILURE);
96     }
97     if (PQftype(resSectors, 1) != PG_OID_INT8)
98     {
99         fprintf(stderr, "Sector value has unexpected type\n");
100         PQclear(resSectors);
101         exit(EXIT_FAILURE);
102     }
103     
104     rankTotalTuples = 0;
105     for (iSector = 0; iSector < PQntuples(resSectors); iSector++)
106     {
107         rankTotalTuples += PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1)));
108     }
109
110     rankStartTime = time(0);
111     for (iSector = 0; iSector <= PQntuples(resSectors); iSector++)
112     {
113         if (iSector > 0)
114         {
115             resPlaces = PQgetResult(conn);
116             if (PQresultStatus(resPlaces) != PGRES_TUPLES_OK)
117             {
118                 fprintf(stderr, "index_sector_places: SELECT failed: %s", PQerrorMessage(conn));
119                 PQclear(resPlaces);
120                 exit(EXIT_FAILURE);
121             }
122             if (PQftype(resPlaces, 0) != PG_OID_INT8)
123             {
124                 fprintf(stderr, "Place_id value has unexpected type\n");
125                 PQclear(resPlaces);
126                 exit(EXIT_FAILURE);
127             }
128             resNULL = PQgetResult(conn);
129             if (resNULL != NULL)
130             {
131                 fprintf(stderr, "Unexpected non-null response\n");
132                 exit(EXIT_FAILURE);
133             }
134         }
135
136         if (iSector < PQntuples(resSectors))
137         {
138             sector = PGint32(*((uint32_t *)PQgetvalue(resSectors, iSector, 0)));
139 //                fprintf(stderr, "\n Starting sector %d size %ld\n", sector, PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1))));
140
141             // Get all the place_id's for this sector
142             paramRank = PGint32(rank);
143             paramSector = PGint32(sector);
144             if (rankTotalTuples-rankCountTuples < num_threads*1000)
145             {
146                 // no sectors
147                 if (interpolation)
148                 {
149                     iResult = PQsendQueryPrepared(conn, "index_nosector_places_osmline", 0, NULL, 0, NULL, 1);
150                 }
151                 else
152                 {
153                     paramValues[0] = (char *)&paramRank;
154                     paramLengths[0] = sizeof(paramRank);
155                     paramFormats[0] = 1;
156                     iResult = PQsendQueryPrepared(conn, "index_nosector_places", 1, paramValues, paramLengths, paramFormats, 1);
157                 }
158             }
159             else
160             {
161                 if (interpolation)
162                 {
163                     iResult = PQsendQueryPrepared(conn, "index_sector_places_osmline", 1, paramValues, paramLengths, paramFormats, 1);
164                     paramValues[0] = (char *)&paramSector;
165                     paramLengths[0] = sizeof(paramSector);
166                     paramFormats[0] = 1;
167                 }
168                 else
169                 {
170                     paramValues[0] = (char *)&paramRank;
171                     paramLengths[0] = sizeof(paramRank);
172                     paramFormats[0] = 1;
173                     paramValues[1] = (char *)&paramSector;
174                     paramLengths[1] = sizeof(paramSector);
175                     paramFormats[1] = 1;
176                     iResult = PQsendQueryPrepared(conn, "index_sector_places", 2, paramValues, paramLengths, paramFormats, 1);
177                 }
178             }
179             if (!iResult)
180             {
181                 fprintf(stderr, "index_sector_places: SELECT failed: %s", PQerrorMessage(conn));
182                 PQclear(resPlaces);
183                 exit(EXIT_FAILURE);
184             }
185         }
186         if (iSector > 0)
187         {
188             count = 0;
189             rankPerSecond = 0;
190             tuples = PQntuples(resPlaces);
191
192             if (tuples > 0)
193             {
194                 // Spawn threads
195                 for (i = 0; i < num_threads; i++)
196                 {
197                     thread_data[i].res = resPlaces;
198                     thread_data[i].tuples = tuples;
199                     thread_data[i].count = &count;
200                     thread_data[i].count_mutex = &count_mutex;
201                     thread_data[i].writer = writer;
202                     thread_data[i].writer_mutex = &writer_mutex;
203                     if (interpolation)
204                     {
205                         thread_data[i].table = 0;  // use interpolations table
206                     }
207                     else
208                     {
209                         thread_data[i].table = 1;  // use placex table
210                     }
211                     pthread_create(&thread_data[i].thread, NULL, &nominatim_indexThread, (void *)&thread_data[i]);
212                 }
213
214                 // Monitor threads to give user feedback
215                 sleepcount = 0;
216                 while (count < tuples)
217                 {
218                     usleep(1000);
219
220                     // Aim for one update per second
221                     if (sleepcount++ > 1000)
222                     {
223                         rankPerSecond = ((float)rankCountTuples + (float)count) / MAX(difftime(time(0), rankStartTime),1);
224                         if(interpolation)
225                         {
226                             fprintf(stderr, "  Done %i in %i @ %f per second - Interpolation lines ETA (seconds): %f\n", (rankCountTuples + count), (int)(difftime(time(0), rankStartTime)), rankPerSecond, ((float)(rankTotalTuples - (rankCountTuples + count)))/rankPerSecond);
227                         }
228                         else
229                         {
230                             fprintf(stderr, "  Done %i in %i @ %f per second - Rank %i ETA (seconds): %f\n", (rankCountTuples + count), (int)(difftime(time(0), rankStartTime)), rankPerSecond, rank, ((float)(rankTotalTuples - (rankCountTuples + count)))/rankPerSecond);
231                         }
232                         
233                         sleepcount = 0;
234                     }
235                 }
236
237                 // Wait for everything to finish
238                 for (i = 0; i < num_threads; i++)
239                 {
240                     pthread_join(thread_data[i].thread, NULL);
241                 }
242
243                 rankCountTuples += tuples;
244             }
245
246             // Finished sector
247             rankPerSecond = (float)rankCountTuples / MAX(difftime(time(0), rankStartTime),1);
248             fprintf(stderr, "  Done %i in %i @ %f per second - ETA (seconds): %f\n", rankCountTuples, (int)(difftime(time(0), rankStartTime)), rankPerSecond, ((float)(rankTotalTuples - rankCountTuples))/rankPerSecond);
249
250             PQclear(resPlaces);
251         }
252         if (rankTotalTuples-rankCountTuples < num_threads*20 && iSector < PQntuples(resSectors))
253         {
254             iSector = PQntuples(resSectors) - 1;
255         }
256     }
257     // Finished rank
258     fprintf(stderr, "\r  Done %i in %i @ %f per second - FINISHED\n\n", rankCountTuples, (int)(difftime(time(0), rankStartTime)), rankPerSecond);
259
260     PQclear(resSectors);
261 }
262
263 void nominatim_index(int rank_min, int rank_max, int num_threads, const char *conninfo, const char *structuredoutputfile)
264 {
265     struct index_thread_data *thread_data;
266
267     PGconn *conn;
268     PGresult *res;
269     int num_rows = 0, status_code = 0;
270     int db_has_locale = 0;
271     char *result_string = NULL;
272
273     int rank;
274
275     int i;
276
277     xmlTextWriterPtr writer;
278     pthread_mutex_t writer_mutex = PTHREAD_MUTEX_INITIALIZER;
279
280     Oid pg_prepare_params[2];
281
282     conn = PQconnectdb(conninfo);
283     if (PQstatus(conn) != CONNECTION_OK)
284     {
285         fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn));
286         exit(EXIT_FAILURE);
287     }
288
289     res = PQexec(conn, "SHOW lc_messages");
290     status_code = PQresultStatus(res);
291     if (status_code != PGRES_TUPLES_OK && status_code != PGRES_SINGLE_TUPLE) {
292         fprintf(stderr, "Failed determining database locale: %s\n", PQerrorMessage(conn));
293         exit(EXIT_FAILURE);
294     }
295     num_rows = PQntuples(res);
296     if (num_rows > 0)
297     {
298         result_string = PQgetvalue(res, 0, 0);
299         if (result_string && (strlen(result_string) > 0) && (strcasecmp(result_string, "C") != 0))
300         {
301             // non-default locale if the result exists, is non-empty, and is not "C"
302             db_has_locale = 1;
303         }
304     }
305
306     pg_prepare_params[0] = PG_OID_INT4;
307     res = PQprepare(conn, "index_sectors",
308                     "select geometry_sector,count(*) from placex where rank_search = $1 and indexed_status > 0 group by geometry_sector order by geometry_sector",
309                     1, pg_prepare_params);
310     if (PQresultStatus(res) != PGRES_COMMAND_OK)
311     {
312         fprintf(stderr, "Failed preparing index_sectors: %s\n", PQerrorMessage(conn));
313         exit(EXIT_FAILURE);
314     }
315     PQclear(res);
316     
317     res = PQprepare(conn, "index_sectors_osmline",
318                     "select geometry_sector,count(*) from location_property_osmline where indexed_status > 0 group by geometry_sector order by geometry_sector",
319                     0, NULL);
320     if (PQresultStatus(res) != PGRES_COMMAND_OK)
321     {
322         fprintf(stderr, "Failed preparing index_sectors: %s\n", PQerrorMessage(conn));
323         exit(EXIT_FAILURE);
324     }
325     PQclear(res);
326
327     pg_prepare_params[0] = PG_OID_INT4;
328     res = PQprepare(conn, "index_nosectors",
329                     "select 0::integer,count(*) from placex where rank_search = $1 and indexed_status > 0",
330                     1, pg_prepare_params);
331     if (PQresultStatus(res) != PGRES_COMMAND_OK)
332     {
333         fprintf(stderr, "Failed preparing index_sectors: %s\n", PQerrorMessage(conn));
334         exit(EXIT_FAILURE);
335     }
336     PQclear(res);
337
338     pg_prepare_params[0] = PG_OID_INT4;
339     pg_prepare_params[1] = PG_OID_INT4;
340     res = PQprepare(conn, "index_sector_places",
341                     "select place_id from placex where rank_search = $1 and geometry_sector = $2 and indexed_status > 0",
342                     2, pg_prepare_params);
343     if (PQresultStatus(res) != PGRES_COMMAND_OK)
344     {
345         fprintf(stderr, "Failed preparing index_sector_places: %s\n", PQerrorMessage(conn));
346         exit(EXIT_FAILURE);
347     }
348     PQclear(res);
349
350     pg_prepare_params[0] = PG_OID_INT4;
351     res = PQprepare(conn, "index_nosector_places",
352                     "select place_id from placex where rank_search = $1 and indexed_status > 0 order by geometry_sector",
353                     1, pg_prepare_params);
354     if (PQresultStatus(res) != PGRES_COMMAND_OK)
355     {
356         fprintf(stderr, "Failed preparing index_nosector_places: %s\n", PQerrorMessage(conn));
357         exit(EXIT_FAILURE);
358     }
359     PQclear(res);
360     
361     pg_prepare_params[0] = PG_OID_INT4;
362     res = PQprepare(conn, "index_sector_places_osmline",
363                     "select place_id from location_property_osmline where geometry_sector = $1 and indexed_status > 0",
364                     1, pg_prepare_params);
365     if (PQresultStatus(res) != PGRES_COMMAND_OK)
366     {
367         fprintf(stderr, "Failed preparing index_sector_places: %s\n", PQerrorMessage(conn));
368         exit(EXIT_FAILURE);
369     }
370     PQclear(res);
371     
372     res = PQprepare(conn, "index_nosector_places_osmline",
373                     "select place_id from location_property_osmline where indexed_status > 0 order by geometry_sector",
374                     0, NULL);
375     if (PQresultStatus(res) != PGRES_COMMAND_OK)
376     {
377         fprintf(stderr, "Failed preparing index_nosector_places: %s\n", PQerrorMessage(conn));
378         exit(EXIT_FAILURE);
379     }
380     PQclear(res);
381     
382     // Build the data for each thread
383     thread_data = (struct index_thread_data *)malloc(sizeof(struct index_thread_data)*num_threads);
384     for (i = 0; i < num_threads; i++)
385     {
386         thread_data[i].conn = PQconnectdb(conninfo);
387         if (PQstatus(thread_data[i].conn) != CONNECTION_OK)
388         {
389             fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(thread_data[i].conn));
390             exit(EXIT_FAILURE);
391         }
392
393         pg_prepare_params[0] = PG_OID_INT8;
394         res = PQprepare(thread_data[i].conn, "index_placex",
395                         "update placex set indexed_status = 0 where place_id = $1",
396                         1, pg_prepare_params);
397         if (PQresultStatus(res) != PGRES_COMMAND_OK)
398         {
399             fprintf(stderr, "Failed preparing index_placex: %s\n", PQerrorMessage(thread_data[i].conn));
400             exit(EXIT_FAILURE);
401         }
402         PQclear(res);
403         
404         pg_prepare_params[0] = PG_OID_INT8;
405         res = PQprepare(thread_data[i].conn, "index_osmline",
406                         "update location_property_osmline set indexed_status = 0 where place_id = $1",
407                         1, pg_prepare_params);
408         if (PQresultStatus(res) != PGRES_COMMAND_OK)
409         {
410             fprintf(stderr, "Failed preparing index_osmline: %s\n", PQerrorMessage(thread_data[i].conn));
411             exit(EXIT_FAILURE);
412         }
413         PQclear(res);
414
415         if (db_has_locale)
416         {
417             // Make sure the error message is not localized as we parse it later.
418             res = PQexec(thread_data[i].conn, "SET lc_messages TO 'C'");
419             if (PQresultStatus(res) != PGRES_COMMAND_OK)
420             {
421                 fprintf(stderr, "Failed to set langauge: %s\n", PQerrorMessage(thread_data[i].conn));
422                 exit(EXIT_FAILURE);
423             }
424             PQclear(res);
425         }
426         nominatim_exportCreatePreparedQueries(thread_data[i].conn);
427     }
428
429     fprintf(stderr, "Starting indexing rank (%i to %i) using %i threads\n", rank_min, rank_max, num_threads);
430
431     for (rank = rank_min; rank <= rank_max; rank++)
432     {
433         // OSMLINE: do reindexing (=> reparenting) for interpolation lines at rank 30, but before all other objects of rank 30
434         // reason: houses (rank 30) depend on the updated interpolation line, when reparenting (see placex_update in functions.sql)
435         if (rank == 30)
436         {
437             run_indexing(rank, 1, conn, num_threads, thread_data, structuredoutputfile);
438         }
439         run_indexing(rank, 0, conn, num_threads, thread_data, structuredoutputfile);
440     }
441         // Close all connections
442         for (i = 0; i < num_threads; i++)
443         {
444                 PQfinish(thread_data[i].conn);
445         }
446         PQfinish(conn);
447 }
448
449 void *nominatim_indexThread(void * thread_data_in)
450 {
451     struct index_thread_data * thread_data = (struct index_thread_data * )thread_data_in;
452     struct export_data  querySet;
453
454     PGresult   *res;
455
456     const char  *paramValues[1];
457     int         paramLengths[1];
458     int         paramFormats[1];
459     uint64_t    paramPlaceID;
460     uint64_t    place_id;
461     time_t      updateStartTime;
462     unsigned    table;
463     
464     table = thread_data->table;
465
466     while (1)
467     {
468         pthread_mutex_lock( thread_data->count_mutex );
469         if (*(thread_data->count) >= thread_data->tuples)
470         {
471             pthread_mutex_unlock( thread_data->count_mutex );
472             break;
473         }
474
475         place_id = PGint64(*((uint64_t *)PQgetvalue(thread_data->res, *thread_data->count, 0)));
476         (*thread_data->count)++;
477
478         pthread_mutex_unlock( thread_data->count_mutex );
479
480         if (verbose) fprintf(stderr, "  Processing place_id %ld\n", place_id);
481
482         updateStartTime = time(0);
483         int done = 0;
484
485         if (thread_data->writer)
486         {
487              nominatim_exportPlaceQueries(place_id, thread_data->conn, &querySet);
488         }
489
490         while(!done)
491         {
492             paramPlaceID = PGint64(place_id);
493             paramValues[0] = (char *)&paramPlaceID;
494             paramLengths[0] = sizeof(paramPlaceID);
495             paramFormats[0] = 1;
496             if (table == 1) // table=1 for placex
497             {
498                 res = PQexecPrepared(thread_data->conn, "index_placex", 1, paramValues, paramLengths, paramFormats, 1);
499             }
500             else // table=0 for osmline
501             {
502                 res = PQexecPrepared(thread_data->conn, "index_osmline", 1, paramValues, paramLengths, paramFormats, 1);
503             }
504             if (PQresultStatus(res) == PGRES_COMMAND_OK)
505                 done = 1;
506             else
507             {
508                 if (!strncmp(PQerrorMessage(thread_data->conn), "ERROR:  deadlock detected", 25))
509                 {
510                     if (table == 1)
511                     {
512                         fprintf(stderr, "index_placex: UPDATE failed - deadlock, retrying (%ld)\n", place_id);
513                     }
514                     else
515                     {
516                         fprintf(stderr, "index_osmline: UPDATE failed - deadlock, retrying (%ld)\n", place_id);
517                     }
518                     PQclear(res);
519                     sleep(rand() % 10);
520                 }
521                 else
522                 {
523                     if (table == 1)
524                     {
525                         fprintf(stderr, "index_placex: UPDATE failed: %s", PQerrorMessage(thread_data->conn));
526                     }
527                     else
528                     {
529                         fprintf(stderr, "index_osmline: UPDATE failed: %s", PQerrorMessage(thread_data->conn));
530                     }
531                     PQclear(res);
532                     exit(EXIT_FAILURE);
533                 }
534             }
535         }
536         PQclear(res);
537         if (difftime(time(0), updateStartTime) > 1) fprintf(stderr, "  Slow place_id %ld\n", place_id);
538
539         if (thread_data->writer)
540         {
541             nominatim_exportPlace(place_id, thread_data->conn, thread_data->writer, thread_data->writer_mutex, &querySet);
542             nominatim_exportFreeQueries(&querySet);
543         }
544     }
545
546     return NULL;
547 }