LOG.warning("Initialising replication updates")
with connect(args.config.get_libpq_dsn()) as conn:
- replication.init_replication(conn, base_url=args.config.REPLICATION_URL)
+ replication.init_replication(conn, base_url=args.config.REPLICATION_URL,
+ socket_timeout=args.socket_timeout)
if args.update_functions:
LOG.warning("Create functions")
refresh.create_functions(conn, args.config, True, False)
from ..tools import replication
with connect(args.config.get_libpq_dsn()) as conn:
- return replication.check_for_updates(conn, base_url=args.config.REPLICATION_URL)
+ return replication.check_for_updates(conn, base_url=args.config.REPLICATION_URL,
+ socket_timeout=args.socket_timeout)
def _report_update(self, batchdate: dt.datetime,
if not args.do_index:
LOG.fatal("Indexing cannot be disabled when running updates continuously.")
raise UsageError("Bad argument '--no-index'.")
- recheck_interval = args.config.get_int('REPLICATION_RECHECK_INTERVAL')
+ recheck_interval = args.config.get_int('REPLICATION_RECHECK_INTERVAL')
tokenizer = tokenizer_factory.get_tokenizer_for_db(args.config)
indexer = Indexer(args.config.get_libpq_dsn(), tokenizer, args.threads or 1)
+ dsn = args.config.get_libpq_dsn()
+
while True:
- with connect(args.config.get_libpq_dsn()) as conn:
- start = dt.datetime.now(dt.timezone.utc)
- state = replication.update(conn, params)
+ start = dt.datetime.now(dt.timezone.utc)
+ state = replication.update(dsn, params, socket_timeout=args.socket_timeout)
+
+ with connect(dsn) as conn:
if state is not replication.UpdateState.NO_CHANGES:
status.log_status(conn, start, 'import')
batchdate, _, _ = status.get_status(conn)
index_start = dt.datetime.now(dt.timezone.utc)
indexer.index_full(analyse=False)
- with connect(args.config.get_libpq_dsn()) as conn:
+ with connect(dsn) as conn:
status.set_indexed(conn, True)
status.log_status(conn, index_start, 'index')
conn.commit()