execution functions.
"""
- def execute(self, query, args=None): # pylint: disable=W0221
+ # pylint: disable=arguments-renamed,arguments-differ
+ def execute(self, query, args=None):
""" Query execution that logs the SQL query when debugging is enabled.
"""
LOG.debug(self.mogrify(query, args).decode('utf-8'))
cmd.extend(('-v', 'ON_ERROR_STOP=1'))
if not LOG.isEnabledFor(logging.INFO):
cmd.append('--quiet')
- proc = subprocess.Popen(cmd, env=get_pg_env(dsn), stdin=subprocess.PIPE)
-
- try:
- if not LOG.isEnabledFor(logging.INFO):
- proc.stdin.write('set client_min_messages to WARNING;'.encode('utf-8'))
-
- if pre_code:
- proc.stdin.write((pre_code + ';').encode('utf-8'))
-
- if fname.suffix == '.gz':
- with gzip.open(str(fname), 'rb') as fdesc:
- remain = _pipe_to_proc(proc, fdesc)
- else:
- with fname.open('rb') as fdesc:
- remain = _pipe_to_proc(proc, fdesc)
-
- if remain == 0 and post_code:
- proc.stdin.write((';' + post_code).encode('utf-8'))
- finally:
- proc.stdin.close()
- ret = proc.wait()
+
+ with subprocess.Popen(cmd, env=get_pg_env(dsn), stdin=subprocess.PIPE) as proc:
+ try:
+ if not LOG.isEnabledFor(logging.INFO):
+ proc.stdin.write('set client_min_messages to WARNING;'.encode('utf-8'))
+
+ if pre_code:
+ proc.stdin.write((pre_code + ';').encode('utf-8'))
+
+ if fname.suffix == '.gz':
+ with gzip.open(str(fname), 'rb') as fdesc:
+ remain = _pipe_to_proc(proc, fdesc)
+ else:
+ with fname.open('rb') as fdesc:
+ remain = _pipe_to_proc(proc, fdesc)
+
+ if remain == 0 and post_code:
+ proc.stdin.write((';' + post_code).encode('utf-8'))
+ finally:
+ proc.stdin.close()
+ ret = proc.wait()
if ret != 0 or remain > 0:
raise UsageError("Failed to execute SQL file.")
def __init__(self, country):
self.country = country
- self.collected = dict()
+ self.collected = {}
def add(self, postcode, x, y):
if fname.is_file():
LOG.info("Using external postcode file '%s'.", fname)
- return open(fname, 'r')
+ return open(fname, 'r', encoding='utf-8')
fname = project_dir / f'{self.country}_postcodes.csv.gz'
"""
phrases = set()
- with open(self.csv_path) as file:
- reader = csv.DictReader(file, delimiter=',')
+ with open(self.csv_path, encoding='utf-8') as fd:
+ reader = csv.DictReader(fd, delimiter=',')
for row in reader:
phrases.add(
SpecialPhrase(row['phrase'], row['class'], row['type'], row['operator'])