1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 A custom type that implements a simple key-value store of strings.
10 from typing import Any
12 import sqlalchemy as sa
13 from sqlalchemy.ext.compiler import compiles
14 from sqlalchemy.dialects.postgresql import HSTORE
15 from sqlalchemy.dialects.sqlite import JSON as sqlite_json
17 from ...typing import SaDialect, SaColumn
class KeyValueStore(sa.types.TypeDecorator[Any]):
    """ Dialect-independent type of a simple key-value store of strings.

        The store is backed by HSTORE on PostgreSQL and by the SQLite
        JSON type on every other dialect.
    """
    # TypeDecorator refuses to instantiate without a class-level 'impl'.
    # The type actually used per dialect is chosen in load_dialect_impl().
    impl = HSTORE
    # The type takes no constructor arguments, so it can always be used
    # as part of a statement cache key.
    cache_ok = True

    def load_dialect_impl(self, dialect: SaDialect) -> sa.types.TypeEngine[Any]:
        """ Return the type object that implements the store for the
            given dialect.
        """
        if dialect.name == 'postgresql':
            return HSTORE()  # type: ignore[no-untyped-call]

        # none_as_null: persist Python None as SQL NULL instead of JSON 'null'.
        return sqlite_json(none_as_null=True)

    class comparator_factory(sa.types.UserDefinedType.Comparator):  # type: ignore[type-arg]
        """ Additional SQL expression methods for KeyValueStore expressions.
        """

        def merge(self, other: SaColumn) -> 'sa.Operators':
            """ Merge the values from the given KeyValueStore into this
                one, overwriting values where necessary. When the argument
                is null, nothing happens.
            """
            return KeyValueConcat(self.expr, other)
class KeyValueConcat(sa.sql.expression.FunctionElement[Any]):
    """ Return the merged key-value store from the input parameters.

        The element expects exactly two arguments. The SQL is rendered
        by the compile hooks registered for this class.
    """
    type = KeyValueStore()
    # Custom constructs have to opt in to SQLAlchemy's compiled-statement
    # cache. That is safe here because the element has no state besides
    # its clauses, which already take part in the cache key.
    inherit_cache = True
@compiles(KeyValueConcat)  # type: ignore[no-untyped-call, misc]
def default_json_concat(element: KeyValueConcat, compiler: 'sa.Compiled', **kw: Any) -> str:
    """ Render the merge with the '||' operator on hstore values.

        This hook is registered without a dialect name. A NULL second
        argument is replaced with an empty hstore, so the first
        argument is returned unchanged in that case.
    """
    base, update = element.clauses
    base_sql = compiler.process(base, **kw)
    update_sql = compiler.process(update, **kw)
    return "(%s || coalesce(%s, ''::hstore))" % (base_sql, update_sql)
@compiles(KeyValueConcat, 'sqlite')  # type: ignore[no-untyped-call, misc]
def sqlite_json_concat(element: KeyValueConcat, compiler: 'sa.Compiled', **kw: Any) -> str:
    """ Render the merge for the 'sqlite' dialect with json_patch().

        A NULL second argument is replaced with an empty JSON object,
        so the first argument is returned unchanged in that case.
    """
    base, update = element.clauses
    base_sql = compiler.process(base, **kw)
    update_sql = compiler.process(update, **kw)
    return "json_patch(%s, coalesce(%s, '{}'))" % (base_sql, update_sql)