"""
Implementation of classes for API access via libraries.
"""
-from typing import Mapping, Optional, Any, AsyncIterator, Dict, Sequence, List, Tuple
+from typing import Mapping, Optional, Any, AsyncIterator, Dict, Sequence, List, Tuple, cast
import asyncio
import sys
import contextlib
raise UsageError(f"SQlite database '{params.get('dbname')}' does not exist.")
else:
dsn = self.config.get_database_params()
- query = {k: v for k, v in dsn.items()
+ query = {k: str(v) for k, v in dsn.items()
if k not in ('user', 'password', 'dbname', 'host', 'port')}
dburl = sa.engine.URL.create(
f'postgresql+{PGCORE_LIB}',
- database=dsn.get('dbname'),
- username=dsn.get('user'),
- password=dsn.get('password'),
- host=dsn.get('host'),
- port=int(dsn['port']) if 'port' in dsn else None,
+ database=cast(str, dsn.get('dbname')),
+ username=cast(str, dsn.get('user')),
+ password=cast(str, dsn.get('password')),
+ host=cast(str, dsn.get('host')),
+ port=int(cast(str, dsn['port'])) if 'port' in dsn else None,
query=query)
engine = sa_asyncio.create_async_engine(dburl, **extra_args)