]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/index.c
253b4f20667c8ff22852f4c271f779c381548196
[nominatim.git] / nominatim / index.c
/*
 * Triggers indexing (reparenting etc.) by resetting indexed_status:
 *   update placex/osmline set indexed_status = 0 where indexed_status > 0
 * The UPDATE fires the placex_update and osmline_update triggers.
 */
5
6 #include <stdio.h>
7 #include <unistd.h>
8 #include <stdlib.h>
9 #include <string.h>
10 #include <assert.h>
11 #include <pthread.h>
12 #include <time.h>
13 #include <stdint.h>
14
15 #include <libpq-fe.h>
16
17 #include "nominatim.h"
18 #include "index.h"
19 #include "export.h"
20 #include "postgresql.h"
21
22 extern int verbose;
23
24 void nominatim_index(int rank_min, int rank_max, int num_threads, const char *conninfo, const char *structuredoutputfile)
25 {
26     struct index_thread_data * thread_data;
27     pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER;
28     int tuples, count, sleepcount;
29
30     time_t rankStartTime;
31     int rankTotalTuples;
32     int rankCountTuples;
33     float rankPerSecond;
34
35     PGconn *conn;
36     PGresult * res;
37     PGresult * resSectors;
38     PGresult * resPlaces;
39     PGresult * resNULL;
40
41     int rank;
42     int i;
43     int iSector;
44     int iResult;
45
46     const char *paramValues[2];
47     int         paramLengths[2];
48     int         paramFormats[2];
49     uint32_t    paramRank;
50     uint32_t    paramSector;
51     uint32_t    sector;
52
53     xmlTextWriterPtr writer;
54     pthread_mutex_t writer_mutex = PTHREAD_MUTEX_INITIALIZER;
55
56     Oid pg_prepare_params[2];
57
58     conn = PQconnectdb(conninfo);
59     if (PQstatus(conn) != CONNECTION_OK)
60     {
61         fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn));
62         exit(EXIT_FAILURE);
63     }
64
65     pg_prepare_params[0] = PG_OID_INT4;
66     res = PQprepare(conn, "index_sectors",
67                     "select geometry_sector,count(*) from placex where rank_search = $1 and indexed_status > 0 group by geometry_sector order by geometry_sector",
68                     1, pg_prepare_params);
69     if (PQresultStatus(res) != PGRES_COMMAND_OK)
70     {
71         fprintf(stderr, "Failed preparing index_sectors: %s\n", PQerrorMessage(conn));
72         exit(EXIT_FAILURE);
73     }
74     PQclear(res);
75     
76     res = PQprepare(conn, "index_sectors_osmline",
77                     "select geometry_sector,count(*) from location_property_osmline where indexed_status > 0 group by geometry_sector order by geometry_sector",
78                     0, NULL);
79     if (PQresultStatus(res) != PGRES_COMMAND_OK)
80     {
81         fprintf(stderr, "Failed preparing index_sectors: %s\n", PQerrorMessage(conn));
82         exit(EXIT_FAILURE);
83     }
84     PQclear(res);
85
86     pg_prepare_params[0] = PG_OID_INT4;
87     res = PQprepare(conn, "index_nosectors",
88                     "select 0::integer,count(*) from placex where rank_search = $1 and indexed_status > 0",
89                     1, pg_prepare_params);
90     if (PQresultStatus(res) != PGRES_COMMAND_OK)
91     {
92         fprintf(stderr, "Failed preparing index_sectors: %s\n", PQerrorMessage(conn));
93         exit(EXIT_FAILURE);
94     }
95     PQclear(res);
96
97     pg_prepare_params[0] = PG_OID_INT4;
98     pg_prepare_params[1] = PG_OID_INT4;
99     res = PQprepare(conn, "index_sector_places",
100                     "select place_id from placex where rank_search = $1 and geometry_sector = $2 and indexed_status > 0",
101                     2, pg_prepare_params);
102     if (PQresultStatus(res) != PGRES_COMMAND_OK)
103     {
104         fprintf(stderr, "Failed preparing index_sector_places: %s\n", PQerrorMessage(conn));
105         exit(EXIT_FAILURE);
106     }
107     PQclear(res);
108
109     pg_prepare_params[0] = PG_OID_INT4;
110     res = PQprepare(conn, "index_nosector_places",
111                     "select place_id from placex where rank_search = $1 and indexed_status > 0 order by geometry_sector",
112                     1, pg_prepare_params);
113     if (PQresultStatus(res) != PGRES_COMMAND_OK)
114     {
115         fprintf(stderr, "Failed preparing index_nosector_places: %s\n", PQerrorMessage(conn));
116         exit(EXIT_FAILURE);
117     }
118     PQclear(res);
119     
120     pg_prepare_params[0] = PG_OID_INT4;
121     res = PQprepare(conn, "index_sector_places_osmline",
122                     "select place_id from location_property_osmline where geometry_sector = $1 and indexed_status > 0",
123                     1, pg_prepare_params);
124     if (PQresultStatus(res) != PGRES_COMMAND_OK)
125     {
126         fprintf(stderr, "Failed preparing index_sector_places: %s\n", PQerrorMessage(conn));
127         exit(EXIT_FAILURE);
128     }
129     PQclear(res);
130     
131     res = PQprepare(conn, "index_nosector_places_osmline",
132                     "select place_id from location_property_osmline where indexed_status > 0 order by geometry_sector",
133                     0, NULL);
134     if (PQresultStatus(res) != PGRES_COMMAND_OK)
135     {
136         fprintf(stderr, "Failed preparing index_nosector_places: %s\n", PQerrorMessage(conn));
137         exit(EXIT_FAILURE);
138     }
139     PQclear(res);
140     
141     // Build the data for each thread
142     thread_data = (struct index_thread_data *)malloc(sizeof(struct index_thread_data)*num_threads);
143     for (i = 0; i < num_threads; i++)
144     {
145         thread_data[i].conn = PQconnectdb(conninfo);
146         if (PQstatus(thread_data[i].conn) != CONNECTION_OK)
147         {
148             fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(thread_data[i].conn));
149             exit(EXIT_FAILURE);
150         }
151
152         pg_prepare_params[0] = PG_OID_INT8;
153         res = PQprepare(thread_data[i].conn, "index_placex",
154                         "update placex set indexed_status = 0 where place_id = $1",
155                         1, pg_prepare_params);
156         if (PQresultStatus(res) != PGRES_COMMAND_OK)
157         {
158             fprintf(stderr, "Failed preparing index_placex: %s\n", PQerrorMessage(conn));
159             exit(EXIT_FAILURE);
160         }
161         PQclear(res);
162         
163         pg_prepare_params[0] = PG_OID_INT8;
164         res = PQprepare(thread_data[i].conn, "index_osmline",
165                         "update location_property_osmline set indexed_status = 0 where place_id = $1",
166                         1, pg_prepare_params);
167         if (PQresultStatus(res) != PGRES_COMMAND_OK)
168         {
169             fprintf(stderr, "Failed preparing index_osmline: %s\n", PQerrorMessage(conn));
170             exit(EXIT_FAILURE);
171         }
172         PQclear(res);
173
174         /*res = PQexec(thread_data[i].conn, "set enable_seqscan = false");
175         if (PQresultStatus(res) != PGRES_COMMAND_OK)
176         {
177             fprintf(stderr, "Failed disabling sequential scan: %s\n", PQerrorMessage(conn));
178             exit(EXIT_FAILURE);
179         }
180         PQclear(res);*/
181
182         nominatim_exportCreatePreparedQueries(thread_data[i].conn);
183     }
184
185     // Create the output file
186     writer = NULL;
187     if (structuredoutputfile)
188     {
189         writer = nominatim_exportXMLStart(structuredoutputfile);
190     }
191
192     fprintf(stderr, "Starting indexing rank (%i to %i) using %i threads\n", rank_min, rank_max, num_threads);
193
194     // first for the placex table
195     for (rank = rank_min; rank <= rank_max; rank++)
196     {
197         // OSMLINE: do reindexing (=> reparenting) for interpolation lines at rank 30, but before all other objects of rank 30
198         // reason: houses (rank 30) depend on the updated interpolation line, when reparenting (see placex_update in functions.sql)
199         if (rank == 30)
200         {
201             fprintf(stderr, "Starting indexing interpolation lines (location_property_osmline)\n");
202             rankCountTuples = 0;
203             rankTotalTuples = 0;
204             resSectors = PQexecPrepared(conn, "index_sectors_osmline", 0, NULL, 0, NULL, 1);
205             if (PQresultStatus(resSectors) != PGRES_TUPLES_OK)
206             {
207                 fprintf(stderr, "index_sectors_osmline: SELECT failed: %s", PQerrorMessage(conn));
208                 PQclear(resSectors);
209                 exit(EXIT_FAILURE);
210             }
211             if (PQftype(resSectors, 0) != PG_OID_INT4)
212             {
213                 fprintf(stderr, "Sector value has unexpected type\n");
214                 PQclear(resSectors);
215                 exit(EXIT_FAILURE);
216             }
217             if (PQftype(resSectors, 1) != PG_OID_INT8)
218             {
219                 fprintf(stderr, "Sector value has unexpected type\n");
220                 PQclear(resSectors);
221                 exit(EXIT_FAILURE);
222             }
223             rankStartTime = time(0);
224             for (iSector = 0; iSector < PQntuples(resSectors); iSector++)
225             {
226                 rankTotalTuples += PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1)));
227             }
228             // do it only if tuples with indexed_status > 0 were found in osmline
229             int nTuples = PQntuples(resSectors);
230             if (nTuples > 0)
231             {
232                 for (iSector = 0; iSector <= nTuples; iSector++)
233                 {
234                     if (iSector > 0)
235                     {
236                         resPlaces = PQgetResult(conn);
237                         if (PQresultStatus(resPlaces) != PGRES_TUPLES_OK)
238                         {
239                             fprintf(stderr, "index_sector_places: SELECT failed: %s\n", PQerrorMessage(conn));
240                             PQclear(resPlaces);
241                             exit(EXIT_FAILURE);
242                         }
243                         if (PQftype(resPlaces, 0) != PG_OID_INT8)
244                         {
245                             fprintf(stderr, "Place_id value has unexpected type\n");
246                             PQclear(resPlaces);
247                             exit(EXIT_FAILURE);
248                         }
249                         resNULL = PQgetResult(conn);
250                         if (resNULL != NULL)
251                         {
252                             fprintf(stderr, "Unexpected non-null response\n");
253                             exit(EXIT_FAILURE);
254                         }
255                     }
256
257                     if (iSector < nTuples)
258                     {
259                         sector = PGint32(*((uint32_t *)PQgetvalue(resSectors, iSector, 0)));
260             //                fprintf(stderr, "\n Starting sector %d size %ld\n", sector, PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1))));
261
262                         // Get all the place_id's for this sector
263                         paramSector = PGint32(sector);
264                         paramValues[0] = (char *)&paramSector;
265                         paramLengths[0] = sizeof(paramSector);
266                         paramFormats[0] = 1;
267                         if (rankTotalTuples-rankCountTuples < num_threads*1000)
268                         {
269                             // no sectors
270                             iResult = PQsendQueryPrepared(conn, "index_nosector_places_osmline", 0, NULL, 0, NULL, 1);
271                         }
272                         else
273                         {
274                             iResult = PQsendQueryPrepared(conn, "index_sector_places_osmline", 1, paramValues, paramLengths, paramFormats, 1);
275                         }
276                         if (!iResult)
277                         {
278                             fprintf(stderr, "index_sector_places_osmline: SELECT failed: %s", PQerrorMessage(conn));
279                             PQclear(resPlaces);
280                             exit(EXIT_FAILURE);
281                         }
282                     }
283                     if (iSector > 0)
284                     {
285                         count = 0;
286                         rankPerSecond = 0;
287                         tuples = PQntuples(resPlaces);
288
289                         if (tuples > 0)
290                         {
291                             // Spawn threads
292                             for (i = 0; i < num_threads; i++)
293                             {
294                                 thread_data[i].res = resPlaces;
295                                 thread_data[i].tuples = tuples;
296                                 thread_data[i].count = &count;
297                                 thread_data[i].count_mutex = &count_mutex;
298                                 thread_data[i].writer = writer;
299                                 thread_data[i].writer_mutex = &writer_mutex;
300                                 thread_data[i].table = 0; // use osmline table
301                                 pthread_create(&thread_data[i].thread, NULL, &nominatim_indexThread, (void *)&thread_data[i]);
302                             }
303                             // Monitor threads to give user feedback
304                             sleepcount = 0;
305                             while (count < tuples)
306                             {
307                                 usleep(1000);
308
309                                 // Aim for one update per second
310                                 if (sleepcount++ > 500)
311                                 {
312                                     rankPerSecond = ((float)rankCountTuples + (float)count) / MAX(difftime(time(0), rankStartTime),1);
313                                     fprintf(stderr, "  Done %i in %i @ %f per second - Interpolation Lines ETA (seconds): %f\n", (rankCountTuples + count), (int)(difftime(time(0), rankStartTime)), rankPerSecond, ((float)(rankTotalTuples - (rankCountTuples + count)))/(float)rankPerSecond);
314                                     sleepcount = 0;
315                                 }
316                             }
317
318                             // Wait for everything to finish
319                             for (i = 0; i < num_threads; i++)
320                             {
321                                 pthread_join(thread_data[i].thread, NULL);
322                             }
323                             rankCountTuples += tuples;
324                         }
325                         // Finished sector
326                         rankPerSecond = (float)rankCountTuples / MAX(difftime(time(0), rankStartTime),1);
327                         fprintf(stderr, "  Done %i in %i @ %f per second - ETA (seconds): %f\n", rankCountTuples, (int)(difftime(time(0), rankStartTime)), rankPerSecond, ((float)(rankTotalTuples - rankCountTuples))/rankPerSecond);
328                         PQclear(resPlaces);
329                     }
330                     if (rankTotalTuples-rankCountTuples < num_threads*20 && iSector < nTuples)
331                     {
332                         iSector = nTuples - 1;
333                     }
334                 }
335                 PQclear(resSectors);
336             }
337             // Finished rank
338             fprintf(stderr, "\r  Done %i tuples in %i seconds- FINISHED\n", rankCountTuples,(int)(difftime(time(0), rankStartTime)));
339             if (writer)
340             {
341                 nominatim_exportXMLEnd(writer);
342             }
343         }
344         fprintf(stderr, "Starting rank %d\n", rank);
345         rankCountTuples = 0;
346         rankPerSecond = 0;
347
348         paramRank = PGint32(rank);
349         paramValues[0] = (char *)&paramRank;
350         paramLengths[0] = sizeof(paramRank);
351         paramFormats[0] = 1;
352 //        if (rank < 16)
353 //            resSectors = PQexecPrepared(conn, "index_nosectors", 1, paramValues, paramLengths, paramFormats, 1);
354 //        else
355         resSectors = PQexecPrepared(conn, "index_sectors", 1, paramValues, paramLengths, paramFormats, 1);
356
357         if (PQresultStatus(resSectors) != PGRES_TUPLES_OK)
358         {
359             fprintf(stderr, "index_sectors: SELECT failed: %s", PQerrorMessage(conn));
360             PQclear(resSectors);
361             exit(EXIT_FAILURE);
362         }
363         if (PQftype(resSectors, 0) != PG_OID_INT4)
364         {
365             fprintf(stderr, "Sector value has unexpected type\n");
366             PQclear(resSectors);
367             exit(EXIT_FAILURE);
368         }
369         if (PQftype(resSectors, 1) != PG_OID_INT8)
370         {
371             fprintf(stderr, "Sector value has unexpected type\n");
372             PQclear(resSectors);
373             exit(EXIT_FAILURE);
374         }
375         
376         rankTotalTuples = 0;
377         for (iSector = 0; iSector < PQntuples(resSectors); iSector++)
378         {
379             rankTotalTuples += PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1)));
380         }
381
382         rankStartTime = time(0);
383
384         for (iSector = 0; iSector <= PQntuples(resSectors); iSector++)
385         {
386             if (iSector > 0)
387             {
388                 resPlaces = PQgetResult(conn);
389                 if (PQresultStatus(resPlaces) != PGRES_TUPLES_OK)
390                 {
391                     fprintf(stderr, "index_sector_places: SELECT failed: %s", PQerrorMessage(conn));
392                     PQclear(resPlaces);
393                     exit(EXIT_FAILURE);
394                 }
395                 if (PQftype(resPlaces, 0) != PG_OID_INT8)
396                 {
397                     fprintf(stderr, "Place_id value has unexpected type\n");
398                     PQclear(resPlaces);
399                     exit(EXIT_FAILURE);
400                 }
401                 resNULL = PQgetResult(conn);
402                 if (resNULL != NULL)
403                 {
404                     fprintf(stderr, "Unexpected non-null response\n");
405                     exit(EXIT_FAILURE);
406                 }
407             }
408
409             if (iSector < PQntuples(resSectors))
410             {
411                 sector = PGint32(*((uint32_t *)PQgetvalue(resSectors, iSector, 0)));
412 //                fprintf(stderr, "\n Starting sector %d size %ld\n", sector, PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1))));
413
414                 // Get all the place_id's for this sector
415                 paramRank = PGint32(rank);
416                 paramValues[0] = (char *)&paramRank;
417                 paramLengths[0] = sizeof(paramRank);
418                 paramFormats[0] = 1;
419                 paramSector = PGint32(sector);
420                 paramValues[1] = (char *)&paramSector;
421                 paramLengths[1] = sizeof(paramSector);
422                 paramFormats[1] = 1;
423                 if (rankTotalTuples-rankCountTuples < num_threads*1000)
424                 {
425                     iResult = PQsendQueryPrepared(conn, "index_nosector_places", 1, paramValues, paramLengths, paramFormats, 1);
426                 }
427                 else
428                 {
429                     iResult = PQsendQueryPrepared(conn, "index_sector_places", 2, paramValues, paramLengths, paramFormats, 1);
430                 }
431                 if (!iResult)
432                 {
433                     fprintf(stderr, "index_sector_places: SELECT failed: %s", PQerrorMessage(conn));
434                     PQclear(resPlaces);
435                     exit(EXIT_FAILURE);
436                 }
437             }
438
439             if (iSector > 0)
440             {
441                 count = 0;
442                 rankPerSecond = 0;
443                 tuples = PQntuples(resPlaces);
444
445                 if (tuples > 0)
446                 {
447                     // Spawn threads
448                     for (i = 0; i < num_threads; i++)
449                     {
450                         thread_data[i].res = resPlaces;
451                         thread_data[i].tuples = tuples;
452                         thread_data[i].count = &count;
453                         thread_data[i].count_mutex = &count_mutex;
454                         thread_data[i].writer = writer;
455                         thread_data[i].writer_mutex = &writer_mutex;
456                         thread_data[i].table = 1;  // use placex table
457                         pthread_create(&thread_data[i].thread, NULL, &nominatim_indexThread, (void *)&thread_data[i]);
458                     }
459
460                     // Monitor threads to give user feedback
461                     sleepcount = 0;
462                     while (count < tuples)
463                     {
464                         usleep(1000);
465
466                         // Aim for one update per second
467                         if (sleepcount++ > 500)
468                         {
469                             rankPerSecond = ((float)rankCountTuples + (float)count) / MAX(difftime(time(0), rankStartTime),1);
470                             fprintf(stderr, "  Done %i in %i @ %f per second - Rank %i ETA (seconds): %f\n", (rankCountTuples + count), (int)(difftime(time(0), rankStartTime)), rankPerSecond, rank, ((float)(rankTotalTuples - (rankCountTuples + count)))/rankPerSecond);
471                             sleepcount = 0;
472                         }
473                     }
474
475                     // Wait for everything to finish
476                     for (i = 0; i < num_threads; i++)
477                     {
478                         pthread_join(thread_data[i].thread, NULL);
479                     }
480
481                     rankCountTuples += tuples;
482                 }
483
484                 // Finished sector
485                 rankPerSecond = (float)rankCountTuples / MAX(difftime(time(0), rankStartTime),1);
486                 fprintf(stderr, "  Done %i in %i @ %f per second - ETA (seconds): %f\n", rankCountTuples, (int)(difftime(time(0), rankStartTime)), rankPerSecond, ((float)(rankTotalTuples - rankCountTuples))/rankPerSecond);
487
488                 PQclear(resPlaces);
489             }
490             if (rankTotalTuples-rankCountTuples < num_threads*20 && iSector < PQntuples(resSectors))
491             {
492                 iSector = PQntuples(resSectors) - 1;
493             }
494         }
495         // Finished rank
496         fprintf(stderr, "\r  Done %i in %i @ %f per second - FINISHED                      \n\n", rankCountTuples, (int)(difftime(time(0), rankStartTime)), rankPerSecond);
497
498         PQclear(resSectors);
499     }
500     
501
502     if (rank_max == 30)
503     {
504         // Close all connections
505         for (i = 0; i < num_threads; i++)
506         {
507             PQfinish(thread_data[i].conn);
508         }
509         PQfinish(conn);
510     }
511 }
512
513 void *nominatim_indexThread(void * thread_data_in)
514 {
515     struct index_thread_data * thread_data = (struct index_thread_data * )thread_data_in;
516     struct export_data  querySet;
517
518     PGresult   *res;
519
520     const char  *paramValues[1];
521     int         paramLengths[1];
522     int         paramFormats[1];
523     uint64_t    paramPlaceID;
524     uint64_t    place_id;
525     time_t      updateStartTime;
526     uint        table;
527     
528     table = (uint)(thread_data->table);
529
530     while (1)
531     {
532         pthread_mutex_lock( thread_data->count_mutex );
533         if (*(thread_data->count) >= thread_data->tuples)
534         {
535             pthread_mutex_unlock( thread_data->count_mutex );
536             break;
537         }
538
539         place_id = PGint64(*((uint64_t *)PQgetvalue(thread_data->res, *thread_data->count, 0)));
540         (*thread_data->count)++;
541
542         pthread_mutex_unlock( thread_data->count_mutex );
543
544         if (verbose) fprintf(stderr, "  Processing place_id %ld\n", place_id);
545
546         updateStartTime = time(0);
547         int done = 0;
548
549         if (thread_data->writer)
550         {
551              nominatim_exportPlaceQueries(place_id, thread_data->conn, &querySet);
552         }
553
554         while(!done)
555         {
556             paramPlaceID = PGint64(place_id);
557             paramValues[0] = (char *)&paramPlaceID;
558             paramLengths[0] = sizeof(paramPlaceID);
559             paramFormats[0] = 1;
560             if (table == 1) // table=1 for placex
561             {
562                 res = PQexecPrepared(thread_data->conn, "index_placex", 1, paramValues, paramLengths, paramFormats, 1);
563             }
564             else // table=0 for osmline
565             {
566                 res = PQexecPrepared(thread_data->conn, "index_osmline", 1, paramValues, paramLengths, paramFormats, 1);
567             }
568             if (PQresultStatus(res) == PGRES_COMMAND_OK)
569                 done = 1;
570             else
571             {
572                 if (!strncmp(PQerrorMessage(thread_data->conn), "ERROR:  deadlock detected", 25))
573                 {
574                     if (table == 1)
575                     {
576                         fprintf(stderr, "index_placex: UPDATE failed - deadlock, retrying (%ld)\n", place_id);
577                     }
578                     else
579                     {
580                         fprintf(stderr, "index_osmline: UPDATE failed - deadlock, retrying (%ld)\n", place_id);
581                     }
582                     PQclear(res);
583                     sleep(rand() % 10);
584                 }
585                 else
586                 {
587                     if (table == 1)
588                     {
589                         fprintf(stderr, "index_placex: UPDATE failed: %s", PQerrorMessage(thread_data->conn));
590                     }
591                     else
592                     {
593                         fprintf(stderr, "index_osmline: UPDATE failed: %s", PQerrorMessage(thread_data->conn));
594                     }
595                     PQclear(res);
596                     exit(EXIT_FAILURE);
597                 }
598             }
599         }
600         PQclear(res);
601         if (difftime(time(0), updateStartTime) > 1) fprintf(stderr, "  Slow place_id %ld\n", place_id);
602
603         if (thread_data->writer)
604         {
605             nominatim_exportPlace(place_id, thread_data->conn, thread_data->writer, thread_data->writer_mutex, &querySet);
606             nominatim_exportFreeQueries(&querySet);
607         }
608     }
609
610     return NULL;
611 }