"""
def __init__(self, options):
+ self.current_query = None
+ self.current_params = None
+
+ self.conn = None
+ self.connect()
+
+ def connect(self):
+ if self.conn is not None:
+ self.cursor.close()
+ self.conn.close()
+
self.conn = make_connection(options, asynchronous=True)
self.wait()
self.cursor = self.conn.cursor()
- self.current_query = None
- self.current_params = None
-
def wait(self):
""" Block until any pending operation is done.
"""
sending a query.
"""
ready = self.threads
+ command_stat = 0
while True:
for thread in ready:
if thread.is_done():
+ command_stat += 1
yield thread
- ready, _, _ = select.select(self.threads, [], [])
+ # refresh the connections occasionaly to avoid potential
+ # memory leaks in Postgresql.
+ if command_stat > 100000:
+ for t in self.threads:
+ while not t.is_done():
+ wait_select(t.conn)
+ t.connect()
+ command_stat = 0
+ ready = self.threads
+ else:
+ ready, _, _ = select.select(self.threads, [], [])
assert(False, "Unreachable code")