]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/index.c
Merge pull request #629 from melvyn-sopacua/release-2.6.0a1
[nominatim.git] / nominatim / index.c
/*
 * Indexing (reparenting etc.) is triggered by resetting indexed_status:
 *   update placex/osmline set indexed_status = 0 where indexed_status > 0
 * That update triggers placex_update and osmline_update.
 */
5
6 #include <stdio.h>
7 #include <unistd.h>
8 #include <stdlib.h>
9 #include <string.h>
10 #include <assert.h>
11 #include <pthread.h>
12 #include <time.h>
13 #include <stdint.h>
14
15 #include <libpq-fe.h>
16
17 #include "nominatim.h"
18 #include "index.h"
19 #include "export.h"
20 #include "postgresql.h"
21
22 extern int verbose;
23
24 void run_indexing(int rank, int interpolation, PGconn *conn, int num_threads, 
25 struct index_thread_data * thread_data, const char *structuredoutputfile)
26 {
27     int tuples, count, sleepcount;
28     pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER;
29     
30     time_t rankStartTime;
31     int rankTotalTuples;
32     int rankCountTuples;
33     float rankPerSecond;
34     
35     PGresult * resSectors;
36     PGresult * resPlaces;
37     PGresult * resNULL;
38     
39     int i;
40     int iSector;
41     int iResult;
42     
43     const char *paramValues[2];
44     int         paramLengths[2];
45     int         paramFormats[2];
46     uint32_t    paramRank;
47     uint32_t    paramSector;
48     uint32_t    sector;
49     
50     xmlTextWriterPtr writer;
51     pthread_mutex_t writer_mutex = PTHREAD_MUTEX_INITIALIZER;
52     
53     // Create the output file
54     writer = NULL;
55     if (structuredoutputfile)
56     {
57         writer = nominatim_exportXMLStart(structuredoutputfile);
58     }
59     
60     if (interpolation)
61     {
62         fprintf(stderr, "Starting interpolation lines (location_property_osmline)\n");
63     }
64     else
65     {
66         fprintf(stderr, "Starting rank %d\n", rank);
67     }
68     
69     rankCountTuples = 0;
70     rankPerSecond = 0;
71
72     paramRank = PGint32(rank);
73     paramValues[0] = (char *)&paramRank;
74     paramLengths[0] = sizeof(paramRank);
75     paramFormats[0] = 1;
76     
77     if (interpolation)
78     {
79         resSectors = PQexecPrepared(conn, "index_sectors_osmline", 0, NULL, 0, NULL, 1);
80     }
81     else
82     {
83         resSectors = PQexecPrepared(conn, "index_sectors", 1, paramValues, paramLengths, paramFormats, 1);
84     }
85     if (PQresultStatus(resSectors) != PGRES_TUPLES_OK)
86     {
87         fprintf(stderr, "index_sectors: SELECT failed: %s", PQerrorMessage(conn));
88         PQclear(resSectors);
89         exit(EXIT_FAILURE);
90     }
91     if (PQftype(resSectors, 0) != PG_OID_INT4)
92     {
93         fprintf(stderr, "Sector value has unexpected type\n");
94         PQclear(resSectors);
95         exit(EXIT_FAILURE);
96     }
97     if (PQftype(resSectors, 1) != PG_OID_INT8)
98     {
99         fprintf(stderr, "Sector value has unexpected type\n");
100         PQclear(resSectors);
101         exit(EXIT_FAILURE);
102     }
103     
104     rankTotalTuples = 0;
105     for (iSector = 0; iSector < PQntuples(resSectors); iSector++)
106     {
107         rankTotalTuples += PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1)));
108     }
109
110     rankStartTime = time(0);
111     for (iSector = 0; iSector <= PQntuples(resSectors); iSector++)
112     {
113         if (iSector > 0)
114         {
115             resPlaces = PQgetResult(conn);
116             if (PQresultStatus(resPlaces) != PGRES_TUPLES_OK)
117             {
118                 fprintf(stderr, "index_sector_places: SELECT failed: %s", PQerrorMessage(conn));
119                 PQclear(resPlaces);
120                 exit(EXIT_FAILURE);
121             }
122             if (PQftype(resPlaces, 0) != PG_OID_INT8)
123             {
124                 fprintf(stderr, "Place_id value has unexpected type\n");
125                 PQclear(resPlaces);
126                 exit(EXIT_FAILURE);
127             }
128             resNULL = PQgetResult(conn);
129             if (resNULL != NULL)
130             {
131                 fprintf(stderr, "Unexpected non-null response\n");
132                 exit(EXIT_FAILURE);
133             }
134         }
135
136         if (iSector < PQntuples(resSectors))
137         {
138             sector = PGint32(*((uint32_t *)PQgetvalue(resSectors, iSector, 0)));
139 //                fprintf(stderr, "\n Starting sector %d size %ld\n", sector, PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1))));
140
141             // Get all the place_id's for this sector
142             paramRank = PGint32(rank);
143             paramSector = PGint32(sector);
144             if (rankTotalTuples-rankCountTuples < num_threads*1000)
145             {
146                 // no sectors
147                 if (interpolation)
148                 {
149                     iResult = PQsendQueryPrepared(conn, "index_nosector_places_osmline", 0, NULL, 0, NULL, 1);
150                 }
151                 else
152                 {
153                     paramValues[0] = (char *)&paramRank;
154                     paramLengths[0] = sizeof(paramRank);
155                     paramFormats[0] = 1;
156                     iResult = PQsendQueryPrepared(conn, "index_nosector_places", 1, paramValues, paramLengths, paramFormats, 1);
157                 }
158             }
159             else
160             {
161                 if (interpolation)
162                 {
163                     iResult = PQsendQueryPrepared(conn, "index_sector_places_osmline", 1, paramValues, paramLengths, paramFormats, 1);
164                     paramValues[0] = (char *)&paramSector;
165                     paramLengths[0] = sizeof(paramSector);
166                     paramFormats[0] = 1;
167                 }
168                 else
169                 {
170                     paramValues[0] = (char *)&paramRank;
171                     paramLengths[0] = sizeof(paramRank);
172                     paramFormats[0] = 1;
173                     paramValues[1] = (char *)&paramSector;
174                     paramLengths[1] = sizeof(paramSector);
175                     paramFormats[1] = 1;
176                     iResult = PQsendQueryPrepared(conn, "index_sector_places", 2, paramValues, paramLengths, paramFormats, 1);
177                 }
178             }
179             if (!iResult)
180             {
181                 fprintf(stderr, "index_sector_places: SELECT failed: %s", PQerrorMessage(conn));
182                 PQclear(resPlaces);
183                 exit(EXIT_FAILURE);
184             }
185         }
186         if (iSector > 0)
187         {
188             count = 0;
189             rankPerSecond = 0;
190             tuples = PQntuples(resPlaces);
191
192             if (tuples > 0)
193             {
194                 // Spawn threads
195                 for (i = 0; i < num_threads; i++)
196                 {
197                     thread_data[i].res = resPlaces;
198                     thread_data[i].tuples = tuples;
199                     thread_data[i].count = &count;
200                     thread_data[i].count_mutex = &count_mutex;
201                     thread_data[i].writer = writer;
202                     thread_data[i].writer_mutex = &writer_mutex;
203                     if (interpolation)
204                     {
205                         thread_data[i].table = 0;  // use interpolations table
206                     }
207                     else
208                     {
209                         thread_data[i].table = 1;  // use placex table
210                     }
211                     pthread_create(&thread_data[i].thread, NULL, &nominatim_indexThread, (void *)&thread_data[i]);
212                 }
213
214                 // Monitor threads to give user feedback
215                 sleepcount = 0;
216                 while (count < tuples)
217                 {
218                     usleep(1000);
219
220                     // Aim for one update per second
221                     if (sleepcount++ > 500)
222                     {
223                         rankPerSecond = ((float)rankCountTuples + (float)count) / MAX(difftime(time(0), rankStartTime),1);
224                         if(interpolation)
225                         {
226                             fprintf(stderr, "  Done %i in %i @ %f per second - Interpolation lines ETA (seconds): %f\n", (rankCountTuples + count), (int)(difftime(time(0), rankStartTime)), rankPerSecond, ((float)(rankTotalTuples - (rankCountTuples + count)))/rankPerSecond);
227                         }
228                         else
229                         {
230                             fprintf(stderr, "  Done %i in %i @ %f per second - Rank %i ETA (seconds): %f\n", (rankCountTuples + count), (int)(difftime(time(0), rankStartTime)), rankPerSecond, rank, ((float)(rankTotalTuples - (rankCountTuples + count)))/rankPerSecond);
231                         }
232                         
233                         sleepcount = 0;
234                     }
235                 }
236
237                 // Wait for everything to finish
238                 for (i = 0; i < num_threads; i++)
239                 {
240                     pthread_join(thread_data[i].thread, NULL);
241                 }
242
243                 rankCountTuples += tuples;
244             }
245
246             // Finished sector
247             rankPerSecond = (float)rankCountTuples / MAX(difftime(time(0), rankStartTime),1);
248             fprintf(stderr, "  Done %i in %i @ %f per second - ETA (seconds): %f\n", rankCountTuples, (int)(difftime(time(0), rankStartTime)), rankPerSecond, ((float)(rankTotalTuples - rankCountTuples))/rankPerSecond);
249
250             PQclear(resPlaces);
251         }
252         if (rankTotalTuples-rankCountTuples < num_threads*20 && iSector < PQntuples(resSectors))
253         {
254             iSector = PQntuples(resSectors) - 1;
255         }
256     }
257     // Finished rank
258     fprintf(stderr, "\r  Done %i in %i @ %f per second - FINISHED\n\n", rankCountTuples, (int)(difftime(time(0), rankStartTime)), rankPerSecond);
259
260     PQclear(resSectors);
261 }
262
263 void nominatim_index(int rank_min, int rank_max, int num_threads, const char *conninfo, const char *structuredoutputfile)
264 {
265     struct index_thread_data * thread_data;
266
267     PGconn *conn;
268     PGresult * res;
269
270     int rank;
271     
272     int i;
273
274     xmlTextWriterPtr writer;
275     pthread_mutex_t writer_mutex = PTHREAD_MUTEX_INITIALIZER;
276
277     Oid pg_prepare_params[2];
278
279     conn = PQconnectdb(conninfo);
280     if (PQstatus(conn) != CONNECTION_OK)
281     {
282         fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn));
283         exit(EXIT_FAILURE);
284     }
285
286     pg_prepare_params[0] = PG_OID_INT4;
287     res = PQprepare(conn, "index_sectors",
288                     "select geometry_sector,count(*) from placex where rank_search = $1 and indexed_status > 0 group by geometry_sector order by geometry_sector",
289                     1, pg_prepare_params);
290     if (PQresultStatus(res) != PGRES_COMMAND_OK)
291     {
292         fprintf(stderr, "Failed preparing index_sectors: %s\n", PQerrorMessage(conn));
293         exit(EXIT_FAILURE);
294     }
295     PQclear(res);
296     
297     res = PQprepare(conn, "index_sectors_osmline",
298                     "select geometry_sector,count(*) from location_property_osmline where indexed_status > 0 group by geometry_sector order by geometry_sector",
299                     0, NULL);
300     if (PQresultStatus(res) != PGRES_COMMAND_OK)
301     {
302         fprintf(stderr, "Failed preparing index_sectors: %s\n", PQerrorMessage(conn));
303         exit(EXIT_FAILURE);
304     }
305     PQclear(res);
306
307     pg_prepare_params[0] = PG_OID_INT4;
308     res = PQprepare(conn, "index_nosectors",
309                     "select 0::integer,count(*) from placex where rank_search = $1 and indexed_status > 0",
310                     1, pg_prepare_params);
311     if (PQresultStatus(res) != PGRES_COMMAND_OK)
312     {
313         fprintf(stderr, "Failed preparing index_sectors: %s\n", PQerrorMessage(conn));
314         exit(EXIT_FAILURE);
315     }
316     PQclear(res);
317
318     pg_prepare_params[0] = PG_OID_INT4;
319     pg_prepare_params[1] = PG_OID_INT4;
320     res = PQprepare(conn, "index_sector_places",
321                     "select place_id from placex where rank_search = $1 and geometry_sector = $2 and indexed_status > 0",
322                     2, pg_prepare_params);
323     if (PQresultStatus(res) != PGRES_COMMAND_OK)
324     {
325         fprintf(stderr, "Failed preparing index_sector_places: %s\n", PQerrorMessage(conn));
326         exit(EXIT_FAILURE);
327     }
328     PQclear(res);
329
330     pg_prepare_params[0] = PG_OID_INT4;
331     res = PQprepare(conn, "index_nosector_places",
332                     "select place_id from placex where rank_search = $1 and indexed_status > 0 order by geometry_sector",
333                     1, pg_prepare_params);
334     if (PQresultStatus(res) != PGRES_COMMAND_OK)
335     {
336         fprintf(stderr, "Failed preparing index_nosector_places: %s\n", PQerrorMessage(conn));
337         exit(EXIT_FAILURE);
338     }
339     PQclear(res);
340     
341     pg_prepare_params[0] = PG_OID_INT4;
342     res = PQprepare(conn, "index_sector_places_osmline",
343                     "select place_id from location_property_osmline where geometry_sector = $1 and indexed_status > 0",
344                     1, pg_prepare_params);
345     if (PQresultStatus(res) != PGRES_COMMAND_OK)
346     {
347         fprintf(stderr, "Failed preparing index_sector_places: %s\n", PQerrorMessage(conn));
348         exit(EXIT_FAILURE);
349     }
350     PQclear(res);
351     
352     res = PQprepare(conn, "index_nosector_places_osmline",
353                     "select place_id from location_property_osmline where indexed_status > 0 order by geometry_sector",
354                     0, NULL);
355     if (PQresultStatus(res) != PGRES_COMMAND_OK)
356     {
357         fprintf(stderr, "Failed preparing index_nosector_places: %s\n", PQerrorMessage(conn));
358         exit(EXIT_FAILURE);
359     }
360     PQclear(res);
361     
362     // Build the data for each thread
363     thread_data = (struct index_thread_data *)malloc(sizeof(struct index_thread_data)*num_threads);
364     for (i = 0; i < num_threads; i++)
365     {
366         thread_data[i].conn = PQconnectdb(conninfo);
367         if (PQstatus(thread_data[i].conn) != CONNECTION_OK)
368         {
369             fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(thread_data[i].conn));
370             exit(EXIT_FAILURE);
371         }
372
373         pg_prepare_params[0] = PG_OID_INT8;
374         res = PQprepare(thread_data[i].conn, "index_placex",
375                         "update placex set indexed_status = 0 where place_id = $1",
376                         1, pg_prepare_params);
377         if (PQresultStatus(res) != PGRES_COMMAND_OK)
378         {
379             fprintf(stderr, "Failed preparing index_placex: %s\n", PQerrorMessage(thread_data[i].conn));
380             exit(EXIT_FAILURE);
381         }
382         PQclear(res);
383         
384         pg_prepare_params[0] = PG_OID_INT8;
385         res = PQprepare(thread_data[i].conn, "index_osmline",
386                         "update location_property_osmline set indexed_status = 0 where place_id = $1",
387                         1, pg_prepare_params);
388         if (PQresultStatus(res) != PGRES_COMMAND_OK)
389         {
390             fprintf(stderr, "Failed preparing index_osmline: %s\n", PQerrorMessage(thread_data[i].conn));
391             exit(EXIT_FAILURE);
392         }
393         PQclear(res);
394
395         // Make sure the error message is not localized as we parse it later.
396         res = PQexec(thread_data[i].conn, "SET lc_messages TO 'C'");
397         if (PQresultStatus(res) != PGRES_COMMAND_OK)
398         {
399             fprintf(stderr, "Failed to set langauge: %s\n", PQerrorMessage(thread_data[i].conn));
400             exit(EXIT_FAILURE);
401         }
402         PQclear(res);
403
404         nominatim_exportCreatePreparedQueries(thread_data[i].conn);
405     }
406
407
408     fprintf(stderr, "Starting indexing rank (%i to %i) using %i threads\n", rank_min, rank_max, num_threads);
409
410     for (rank = rank_min; rank <= rank_max; rank++)
411     {
412         // OSMLINE: do reindexing (=> reparenting) for interpolation lines at rank 30, but before all other objects of rank 30
413         // reason: houses (rank 30) depend on the updated interpolation line, when reparenting (see placex_update in functions.sql)
414         if (rank == 30)
415         {
416             run_indexing(rank, 1, conn, num_threads, thread_data, structuredoutputfile);
417         }
418         run_indexing(rank, 0, conn, num_threads, thread_data, structuredoutputfile);
419     }
420         // Close all connections
421         for (i = 0; i < num_threads; i++)
422         {
423                 PQfinish(thread_data[i].conn);
424         }
425         PQfinish(conn);
426 }
427
428 void *nominatim_indexThread(void * thread_data_in)
429 {
430     struct index_thread_data * thread_data = (struct index_thread_data * )thread_data_in;
431     struct export_data  querySet;
432
433     PGresult   *res;
434
435     const char  *paramValues[1];
436     int         paramLengths[1];
437     int         paramFormats[1];
438     uint64_t    paramPlaceID;
439     uint64_t    place_id;
440     time_t      updateStartTime;
441     uint        table;
442     
443     table = (uint)(thread_data->table);
444
445     while (1)
446     {
447         pthread_mutex_lock( thread_data->count_mutex );
448         if (*(thread_data->count) >= thread_data->tuples)
449         {
450             pthread_mutex_unlock( thread_data->count_mutex );
451             break;
452         }
453
454         place_id = PGint64(*((uint64_t *)PQgetvalue(thread_data->res, *thread_data->count, 0)));
455         (*thread_data->count)++;
456
457         pthread_mutex_unlock( thread_data->count_mutex );
458
459         if (verbose) fprintf(stderr, "  Processing place_id %ld\n", place_id);
460
461         updateStartTime = time(0);
462         int done = 0;
463
464         if (thread_data->writer)
465         {
466              nominatim_exportPlaceQueries(place_id, thread_data->conn, &querySet);
467         }
468
469         while(!done)
470         {
471             paramPlaceID = PGint64(place_id);
472             paramValues[0] = (char *)&paramPlaceID;
473             paramLengths[0] = sizeof(paramPlaceID);
474             paramFormats[0] = 1;
475             if (table == 1) // table=1 for placex
476             {
477                 res = PQexecPrepared(thread_data->conn, "index_placex", 1, paramValues, paramLengths, paramFormats, 1);
478             }
479             else // table=0 for osmline
480             {
481                 res = PQexecPrepared(thread_data->conn, "index_osmline", 1, paramValues, paramLengths, paramFormats, 1);
482             }
483             if (PQresultStatus(res) == PGRES_COMMAND_OK)
484                 done = 1;
485             else
486             {
487                 if (!strncmp(PQerrorMessage(thread_data->conn), "ERROR:  deadlock detected", 25))
488                 {
489                     if (table == 1)
490                     {
491                         fprintf(stderr, "index_placex: UPDATE failed - deadlock, retrying (%ld)\n", place_id);
492                     }
493                     else
494                     {
495                         fprintf(stderr, "index_osmline: UPDATE failed - deadlock, retrying (%ld)\n", place_id);
496                     }
497                     PQclear(res);
498                     sleep(rand() % 10);
499                 }
500                 else
501                 {
502                     if (table == 1)
503                     {
504                         fprintf(stderr, "index_placex: UPDATE failed: %s", PQerrorMessage(thread_data->conn));
505                     }
506                     else
507                     {
508                         fprintf(stderr, "index_osmline: UPDATE failed: %s", PQerrorMessage(thread_data->conn));
509                     }
510                     PQclear(res);
511                     exit(EXIT_FAILURE);
512                 }
513             }
514         }
515         PQclear(res);
516         if (difftime(time(0), updateStartTime) > 1) fprintf(stderr, "  Slow place_id %ld\n", place_id);
517
518         if (thread_data->writer)
519         {
520             nominatim_exportPlace(place_id, thread_data->conn, thread_data->writer, thread_data->writer_mutex, &querySet);
521             nominatim_exportFreeQueries(&querySet);
522         }
523     }
524
525     return NULL;
526 }