]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/index.c
fix line end
[nominatim.git] / nominatim / index.c
1 /*
2 */
3
4 #include <stdio.h>
5 #include <unistd.h>
6 #include <stdlib.h>
7 #include <string.h>
8 #include <assert.h>
9 #include <pthread.h>
10 #include <time.h>
11 #include <stdint.h>
12
13 #include <libpq-fe.h>
14
15 #include "nominatim.h"
16 #include "index.h"
17 #include "export.h"
18 #include "postgresql.h"
19
/* Global verbosity flag defined outside this file; when non-zero,
 * nominatim_indexThread prints every place_id it processes. */
20 extern int verbose;
21
22 void nominatim_index(int rank_min, int rank_max, int num_threads, const char *conninfo, const char *structuredoutputfile)
23 {
24     struct index_thread_data * thread_data;
25     pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER;
26     int tuples, count, sleepcount;
27
28     time_t rankStartTime;
29     int rankTotalTuples;
30     int rankCountTuples;
31     float rankPerSecond;
32
33     PGconn *conn;
34     PGresult * res;
35     PGresult * resSectors;
36     PGresult * resPlaces;
37     PGresult * resNULL;
38
39     int rank;
40     int i;
41     int iSector;
42     int iResult;
43
44     const char *paramValues[2];
45     int         paramLengths[2];
46     int         paramFormats[2];
47     uint32_t    paramRank;
48     uint32_t    paramSector;
49     uint32_t    sector;
50
51     xmlTextWriterPtr writer;
52     pthread_mutex_t writer_mutex = PTHREAD_MUTEX_INITIALIZER;
53
54     Oid pg_prepare_params[2];
55
56     conn = PQconnectdb(conninfo);
57     if (PQstatus(conn) != CONNECTION_OK)
58     {
59         fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn));
60         exit(EXIT_FAILURE);
61     }
62
63     pg_prepare_params[0] = PG_OID_INT4;
64     res = PQprepare(conn, "index_sectors",
65                     "select geometry_sector,count(*) from placex where rank_search = $1 and indexed_status > 0 group by geometry_sector order by geometry_sector",
66                     1, pg_prepare_params);
67     if (PQresultStatus(res) != PGRES_COMMAND_OK)
68     {
69         fprintf(stderr, "Failed preparing index_sectors: %s\n", PQerrorMessage(conn));
70         exit(EXIT_FAILURE);
71     }
72     PQclear(res);
73
74     pg_prepare_params[0] = PG_OID_INT4;
75     res = PQprepare(conn, "index_nosectors",
76                     "select 0::integer,count(*) from placex where rank_search = $1 and indexed_status > 0",
77                     1, pg_prepare_params);
78     if (PQresultStatus(res) != PGRES_COMMAND_OK)
79     {
80         fprintf(stderr, "Failed preparing index_sectors: %s\n", PQerrorMessage(conn));
81         exit(EXIT_FAILURE);
82     }
83     PQclear(res);
84
85     pg_prepare_params[0] = PG_OID_INT4;
86     pg_prepare_params[1] = PG_OID_INT4;
87     res = PQprepare(conn, "index_sector_places",
88                     "select place_id from placex where rank_search = $1 and geometry_sector = $2 and indexed_status > 0",
89                     2, pg_prepare_params);
90     if (PQresultStatus(res) != PGRES_COMMAND_OK)
91     {
92         fprintf(stderr, "Failed preparing index_sector_places: %s\n", PQerrorMessage(conn));
93         exit(EXIT_FAILURE);
94     }
95     PQclear(res);
96
97     pg_prepare_params[0] = PG_OID_INT4;
98     res = PQprepare(conn, "index_nosector_places",
99                     "select place_id from placex where rank_search = $1 and indexed_status > 0 order by geometry_sector",
100                     1, pg_prepare_params);
101     if (PQresultStatus(res) != PGRES_COMMAND_OK)
102     {
103         fprintf(stderr, "Failed preparing index_nosector_places: %s\n", PQerrorMessage(conn));
104         exit(EXIT_FAILURE);
105     }
106     PQclear(res);
107
108     // Build the data for each thread
109     thread_data = (struct index_thread_data *)malloc(sizeof(struct index_thread_data)*num_threads);
110     for (i = 0; i < num_threads; i++)
111     {
112         thread_data[i].conn = PQconnectdb(conninfo);
113         if (PQstatus(thread_data[i].conn) != CONNECTION_OK)
114         {
115             fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(thread_data[i].conn));
116             exit(EXIT_FAILURE);
117         }
118
119         pg_prepare_params[0] = PG_OID_INT4;
120         res = PQprepare(thread_data[i].conn, "index_placex",
121                         "update placex set indexed_status = 0 where place_id = $1",
122                         1, pg_prepare_params);
123         if (PQresultStatus(res) != PGRES_COMMAND_OK)
124         {
125             fprintf(stderr, "Failed preparing index_placex: %s\n", PQerrorMessage(conn));
126             exit(EXIT_FAILURE);
127         }
128         PQclear(res);
129
130         res = PQexec(thread_data[i].conn, "set enable_seqscan = false");
131         if (PQresultStatus(res) != PGRES_COMMAND_OK)
132         {
133             fprintf(stderr, "Failed disabling sequential scan: %s\n", PQerrorMessage(conn));
134             exit(EXIT_FAILURE);
135         }
136         PQclear(res);
137
138         nominatim_exportCreatePreparedQueries(thread_data[i].conn);
139     }
140
141     // Create the output file
142     writer = NULL;
143     if (structuredoutputfile)
144     {
145         writer = nominatim_exportXMLStart(structuredoutputfile);
146     }
147
148     fprintf(stderr, "Starting indexing rank (%i to %i) using %i treads\n", rank_min, rank_max, num_threads);
149
150     for (rank = rank_min; rank <= rank_max; rank++)
151     {
152         printf("Starting rank %d\n", rank);
153         rankCountTuples = 0;
154         rankPerSecond = 0;
155
156         paramRank = PGint32(rank);
157         paramValues[0] = (char *)&paramRank;
158         paramLengths[0] = sizeof(paramRank);
159         paramFormats[0] = 1;
160 //        if (rank < 16)
161 //            resSectors = PQexecPrepared(conn, "index_nosectors", 1, paramValues, paramLengths, paramFormats, 1);
162 //        else
163             resSectors = PQexecPrepared(conn, "index_sectors", 1, paramValues, paramLengths, paramFormats, 1);
164         if (PQresultStatus(resSectors) != PGRES_TUPLES_OK)
165         {
166             fprintf(stderr, "index_sectors: SELECT failed: %s", PQerrorMessage(conn));
167             PQclear(resSectors);
168             exit(EXIT_FAILURE);
169         }
170         if (PQftype(resSectors, 0) != PG_OID_INT4)
171         {
172             fprintf(stderr, "Sector value has unexpected type\n");
173             PQclear(resSectors);
174             exit(EXIT_FAILURE);
175         }
176         if (PQftype(resSectors, 1) != PG_OID_INT8)
177         {
178             fprintf(stderr, "Sector value has unexpected type\n");
179             PQclear(resSectors);
180             exit(EXIT_FAILURE);
181         }
182
183         rankTotalTuples = 0;
184         for (iSector = 0; iSector < PQntuples(resSectors); iSector++)
185         {
186             rankTotalTuples += PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1)));
187         }
188
189         rankStartTime = time(0);
190         for (iSector = 0; iSector <= PQntuples(resSectors); iSector++)
191         {
192             if (iSector > 0)
193             {
194                 resPlaces = PQgetResult(conn);
195                 if (PQresultStatus(resPlaces) != PGRES_TUPLES_OK)
196                 {
197                     fprintf(stderr, "index_sector_places: SELECT failed: %s", PQerrorMessage(conn));
198                     PQclear(resPlaces);
199                     exit(EXIT_FAILURE);
200                 }
201                 if (PQftype(resPlaces, 0) != PG_OID_INT4)
202                 {
203                     fprintf(stderr, "Place_id value has unexpected type\n");
204                     PQclear(resPlaces);
205                     exit(EXIT_FAILURE);
206                 }
207                 resNULL = PQgetResult(conn);
208                 if (resNULL != NULL)
209                 {
210                     fprintf(stderr, "Unexpected non-null response\n");
211                     exit(EXIT_FAILURE);
212                 }
213             }
214
215             if (iSector < PQntuples(resSectors))
216             {
217                 sector = PGint32(*((uint32_t *)PQgetvalue(resSectors, iSector, 0)));
218                 //printf("\n Starting sector %d size %ld\n", sector, PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1))));
219
220                 // Get all the place_id's for this sector
221                 paramRank = PGint32(rank);
222                 paramValues[0] = (char *)&paramRank;
223                 paramLengths[0] = sizeof(paramRank);
224                 paramFormats[0] = 1;
225                 paramSector = PGint32(sector);
226                 paramValues[1] = (char *)&paramSector;
227                 paramLengths[1] = sizeof(paramSector);
228                 paramFormats[1] = 1;
229                 if (rankTotalTuples-rankCountTuples < num_threads*20)
230                 {
231                         iResult = PQsendQueryPrepared(conn, "index_nosector_places", 1, paramValues, paramLengths, paramFormats, 1);
232                 }
233                 else
234                 {
235                     iResult = PQsendQueryPrepared(conn, "index_sector_places", 2, paramValues, paramLengths, paramFormats, 1);
236                 }
237                 if (!iResult)
238                 {
239                     fprintf(stderr, "index_sector_places: SELECT failed: %s", PQerrorMessage(conn));
240                     PQclear(resPlaces);
241                     exit(EXIT_FAILURE);
242                 }
243             }
244
245             if (iSector > 0)
246             {
247                 count = 0;
248                 rankPerSecond = 0;
249                 tuples = PQntuples(resPlaces);
250
251                 if (tuples > 0)
252                 {
253                     // Spawn threads
254                     for (i = 0; i < num_threads; i++)
255                     {
256                         thread_data[i].res = resPlaces;
257                         thread_data[i].tuples = tuples;
258                         thread_data[i].count = &count;
259                         thread_data[i].count_mutex = &count_mutex;
260                         thread_data[i].writer = writer;
261                         thread_data[i].writer_mutex = &writer_mutex;
262                         pthread_create(&thread_data[i].thread, NULL, &nominatim_indexThread, (void *)&thread_data[i]);
263                     }
264
265                     // Monitor threads to give user feedback
266                     sleepcount = 0;
267                     while (count < tuples)
268                     {
269                         usleep(1000);
270
271                         // Aim for one update per second
272                         if (sleepcount++ > 500)
273                         {
274                             rankPerSecond = ((float)rankCountTuples + (float)count) / MAX(difftime(time(0), rankStartTime),1);
275                             printf("  Done %i in %i @ %f per second - Rank %i ETA (seconds): %f\n", (rankCountTuples + count), (int)(difftime(time(0), rankStartTime)), rankPerSecond, rank, ((float)(rankTotalTuples - (rankCountTuples + count)))/rankPerSecond);
276                             sleepcount = 0;
277                         }
278                     }
279
280                     // Wait for everything to finish
281                     for (i = 0; i < num_threads; i++)
282                     {
283                         pthread_join(thread_data[i].thread, NULL);
284                     }
285
286                     rankCountTuples += tuples;
287                 }
288
289                 // Finished sector
290                 rankPerSecond = (float)rankCountTuples / MAX(difftime(time(0), rankStartTime),1);
291                 printf("  Done %i in %i @ %f per second - ETA (seconds): %f\n", rankCountTuples, (int)(difftime(time(0), rankStartTime)), rankPerSecond, ((float)(rankTotalTuples - rankCountTuples))/rankPerSecond);
292
293                 PQclear(resPlaces);
294             }
295             if (rankTotalTuples-rankCountTuples < num_threads*20) iSector = PQntuples(resSectors);
296         }
297         // Finished rank
298         printf("\r  Done %i in %i @ %f per second - FINISHED                      \n\n", rankCountTuples, (int)(difftime(time(0), rankStartTime)), rankPerSecond);
299
300         PQclear(resSectors);
301     }
302
303     if (writer)
304     {
305         nominatim_exportXMLEnd(writer);
306     }
307 }
308
309 void *nominatim_indexThread(void * thread_data_in)
310 {
311     struct index_thread_data * thread_data = (struct index_thread_data * )thread_data_in;
312
313     PGresult   *res;
314
315     const char *paramValues[1];
316     int         paramLengths[1];
317     int         paramFormats[1];
318     uint32_t    paramPlaceID;
319     uint32_t    place_id;
320     time_t              updateStartTime;
321
322     while (1)
323     {
324         pthread_mutex_lock( thread_data->count_mutex );
325         if (*(thread_data->count) >= thread_data->tuples)
326         {
327             pthread_mutex_unlock( thread_data->count_mutex );
328             break;
329         }
330
331         place_id = PGint32(*((uint32_t *)PQgetvalue(thread_data->res, *thread_data->count, 0)));
332         (*thread_data->count)++;
333
334         pthread_mutex_unlock( thread_data->count_mutex );
335
336         if (verbose) printf("  Processing place_id %d\n", place_id);
337
338         updateStartTime = time(0);
339         int done = 0;
340         while(!done)
341         {
342                 paramPlaceID = PGint32(place_id);
343                 paramValues[0] = (char *)&paramPlaceID;
344                 paramLengths[0] = sizeof(paramPlaceID);
345                 paramFormats[0] = 1;
346                 res = PQexecPrepared(thread_data->conn, "index_placex", 1, paramValues, paramLengths, paramFormats, 1);
347                 if (PQresultStatus(res) == PGRES_COMMAND_OK)
348                         done = 1;
349                 else
350                 {
351                         if (strncmp(PQerrorMessage(thread_data->conn), "ERROR:  deadlock detected", 25))
352                         {
353                             fprintf(stderr, "index_placex: UPDATE failed - deadlock, retrying\n");
354                         }
355                         else
356                         {
357                             fprintf(stderr, "index_placex: UPDATE failed: %s", PQerrorMessage(thread_data->conn));
358                             PQclear(res);
359                                 sleep(5);
360 //                          exit(EXIT_FAILURE);
361                         }
362                 }
363         }
364         PQclear(res);
365         if (difftime(time(0), updateStartTime) > 1) printf("  Slow place_id %d\n", place_id);
366
367         if (thread_data->writer)
368         {
369             nominatim_exportPlace(place_id, thread_data->conn, thread_data->writer, thread_data->writer_mutex);
370         }
371     }
372
373     return NULL;
374 }