Files
pallectrum/electrum/sql_db.py

58 lines
1.8 KiB
Python
Raw Normal View History

2019-03-06 09:56:22 +01:00
import asyncio
import concurrent
import functools
import os
import queue
import threading
2019-06-27 09:03:34 +02:00
import sqlite3
2019-03-06 09:56:22 +01:00
from .logging import Logger
2019-03-06 09:56:22 +01:00
def sql(func):
    """Decorate an ``SqlDB`` method so that it is executed on the SQL thread.

    Calling the decorated method does not run ``func``. It puts
    ``(future, func, args, kwargs)`` on ``self.db_requests`` and returns the
    future immediately; the thread running ``SqlDB.run_sql`` takes the request
    from the queue, calls ``func(self, *args, **kwargs)`` and completes the
    future with the outcome.

    Args:
        func: the method to run on the SQL thread.

    Returns:
        A wrapper with the metadata of ``func`` (name, docstring) whose call
        returns an ``asyncio.Future`` created on ``self.network.asyncio_loop``.

    Raises:
        AssertionError: raised by the wrapper when it is invoked from the SQL
            thread itself.
    """
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        # The SQL thread is the consumer of db_requests, so a request queued
        # from that thread cannot be served before the current one returns.
        assert threading.current_thread() != self.sql_thread
        # Bind the future to the network loop explicitly instead of relying
        # on whichever event loop happens to be current in the calling thread.
        f = self.network.asyncio_loop.create_future()
        self.db_requests.put((f, func, args, kwargs))
        return f
    return wrapper
class SqlDB(Logger):
    """Serialise access to a SQLite database through one dedicated thread.

    The constructor starts a thread running :meth:`run_sql`. That thread opens
    the connection, calls ``self.create_database()`` and then executes the
    requests found on ``self.db_requests``: tuples of
    ``(future, func, args, kwargs)``, as queued by the ``sql`` decorator.

    NOTE(review): ``create_database`` is not defined in this class; presumably
    subclasses provide it -- confirm.
    """

    def __init__(self, network, path, commit_interval=None):
        """Store the settings and start the SQL thread.

        Args:
            network: object exposing ``asyncio_loop``; requests are served for
                as long as that loop is running.
            path: database location handed to ``sqlite3.connect``.
            commit_interval: when truthy, ``run_sql`` commits after every
                ``commit_interval`` successfully executed requests. When
                falsy, ``run_sql`` itself only commits once, on shutdown.
        """
        Logger.__init__(self)
        self.network = network
        self.path = path
        self.commit_interval = commit_interval
        self.db_requests = queue.Queue()
        self.sql_thread = threading.Thread(target=self.run_sql)
        # The thread is started right away, so run_sql may already be
        # executing when the constructor returns.
        self.sql_thread.start()

    @staticmethod
    def _settle(future, result, error):
        """Complete ``future`` unless it is already done (e.g. cancelled).

        Meant to run on the event loop thread, where checking and completing
        the future cannot race with a cancellation.
        """
        if future.done():
            return
        if error is not None:
            future.set_exception(error)
        else:
            future.set_result(result)

    def _deliver(self, future, result=None, error=None):
        """Hand the outcome of a request over to the event loop thread.

        asyncio futures are not thread-safe, so the SQL thread must not
        complete them directly; the completion is scheduled on the loop.
        """
        try:
            self.network.asyncio_loop.call_soon_threadsafe(
                self._settle, future, result, error)
        except RuntimeError:
            # call_soon_threadsafe raises RuntimeError once the loop has been
            # closed. Nobody can await the future any more; keep the SQL
            # thread alive so that the final commit still happens.
            self.logger.info("Event loop closed, dropping SQL result")

    def run_sql(self):
        """Thread target: serve queued requests while the event loop runs.

        Opens the connection on this thread, creates the database, executes
        requests until ``self.network.asyncio_loop`` stops running, then
        commits. The connection is closed on every exit path.
        """
        self.logger.info("SQL thread started")
        self.conn = sqlite3.connect(self.path)
        try:
            self.logger.info("Creating database")
            self.create_database()
            i = 0  # successful requests since the last periodic commit
            while self.network.asyncio_loop.is_running():
                try:
                    # The short timeout lets the loop condition be
                    # re-evaluated even when no request arrives.
                    future, func, args, kwargs = self.db_requests.get(timeout=0.1)
                except queue.Empty:
                    continue
                try:
                    result = func(self, *args, **kwargs)
                except BaseException as e:
                    # Failures are reported through the future; they do not
                    # count towards the commit interval.
                    self._deliver(future, error=e)
                    continue
                self._deliver(future, result=result)
                # note: in sweepstore session.commit() is called inside
                # the sql-decorated methods, so committing to disk is awaited
                if self.commit_interval:
                    i = (i + 1) % self.commit_interval
                    if i == 0:
                        self.conn.commit()
            # write
            self.conn.commit()
        finally:
            self.conn.close()
        self.logger.info("SQL thread terminated")