use manifest.json instead of loading init file for plugin registration
This commit is contained in:
@@ -23,6 +23,7 @@
|
||||
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
# SOFTWARE.
|
||||
|
||||
import json
|
||||
import os
|
||||
import pkgutil
|
||||
import importlib.util
|
||||
@@ -31,6 +32,7 @@ import threading
|
||||
import traceback
|
||||
import sys
|
||||
import aiohttp
|
||||
import zipfile as zipfile_lib
|
||||
|
||||
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
|
||||
Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
|
||||
@@ -58,6 +60,7 @@ _logger = get_logger(__name__)
|
||||
plugin_loaders = {}
|
||||
hook_names = set()
|
||||
hooks = {}
|
||||
_root_permission_cache = {}
|
||||
|
||||
|
||||
class Plugins(DaemonThread):
|
||||
@@ -71,11 +74,11 @@ class Plugins(DaemonThread):
|
||||
self.cmd_only = cmd_only # type: bool
|
||||
self.internal_plugin_metadata = {}
|
||||
self.external_plugin_metadata = {}
|
||||
self.loaded_command_modules = set() # type: set[str]
|
||||
if cmd_only:
|
||||
# only import the command modules of plugins
|
||||
Logger.__init__(self)
|
||||
self.find_plugins()
|
||||
self.load_plugins()
|
||||
return
|
||||
DaemonThread.__init__(self)
|
||||
self.device_manager = DeviceMgr(config)
|
||||
@@ -101,46 +104,32 @@ class Plugins(DaemonThread):
|
||||
# we exclude the ones packaged as *code*, here:
|
||||
if loader.__class__.__qualname__ == "PyiFrozenImporter":
|
||||
continue
|
||||
if self.cmd_only and self.config.get('enable_plugin_' + name) is not True:
|
||||
module_path = os.path.join(pkg_path, name)
|
||||
if external and not self._has_recursive_root_permissions(module_path):
|
||||
self.logger.info(f"Not loading plugin {module_path}: directory has user write permissions")
|
||||
continue
|
||||
base_name = 'electrum.plugins' if not external else 'electrum_external_plugins'
|
||||
full_name = f'{base_name}.{name}'
|
||||
if external:
|
||||
module_path = os.path.join(pkg_path, name)
|
||||
if not self._has_recursive_root_permissions(module_path):
|
||||
self.logger.info(f"Not loading plugin {module_path}: directory has user write permissions")
|
||||
continue
|
||||
module_path = os.path.join(module_path, '__init__.py')
|
||||
if not os.path.exists(module_path):
|
||||
continue
|
||||
spec = importlib.util.spec_from_file_location(full_name, module_path)
|
||||
else:
|
||||
spec = importlib.util.find_spec(full_name)
|
||||
if spec is None:
|
||||
if self.cmd_only:
|
||||
continue # no commands module in this plugin
|
||||
raise Exception(f"Error pre-loading {full_name}: no spec")
|
||||
module = self.exec_module_from_spec(spec, full_name)
|
||||
if self.cmd_only:
|
||||
assert name not in self.loaded_command_modules, f"tried to load commands of {name} twice"
|
||||
self.loaded_command_modules.add(name)
|
||||
if self.cmd_only and not self.config.get('enable_plugin_' + name) is True:
|
||||
continue
|
||||
try:
|
||||
with open(os.path.join(module_path, 'manifest.json'), 'r') as f:
|
||||
d = json.load(f)
|
||||
except FileNotFoundError:
|
||||
self.logger.info(f"could not find manifest.json of plugin {name}, skipping...")
|
||||
continue
|
||||
d = module.__dict__
|
||||
if 'fullname' not in d:
|
||||
continue
|
||||
d['display_name'] = d['fullname']
|
||||
gui_good = self.gui_name in d.get('available_for', [])
|
||||
if not gui_good:
|
||||
continue
|
||||
details = d.get('registers_wallet_type')
|
||||
if details:
|
||||
self.register_wallet_type(name, gui_good, details)
|
||||
details = d.get('registers_keystore')
|
||||
if details:
|
||||
self.register_keystore(name, gui_good, details)
|
||||
if d.get('requires_wallet_type'):
|
||||
# trustedcoin will not be added to list
|
||||
continue
|
||||
d['path'] = module_path
|
||||
if not self.cmd_only:
|
||||
gui_good = self.gui_name in d.get('available_for', [])
|
||||
if not gui_good:
|
||||
continue
|
||||
details = d.get('registers_wallet_type')
|
||||
if details:
|
||||
self.register_wallet_type(name, gui_good, details)
|
||||
details = d.get('registers_keystore')
|
||||
if details:
|
||||
self.register_keystore(name, gui_good, details)
|
||||
if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
|
||||
_logger.info(f"Found the following plugin modules: {iter_modules=}")
|
||||
raise Exception(f"duplicate plugins? for {name=}")
|
||||
@@ -174,7 +163,10 @@ class Plugins(DaemonThread):
|
||||
for name, d in chain(self.internal_plugin_metadata.items(), self.external_plugin_metadata.items()):
|
||||
if not d.get('requires_wallet_type') and self.config.get('enable_plugin_' + name):
|
||||
try:
|
||||
self.load_plugin_by_name(name)
|
||||
if self.cmd_only: # only load init method to register commands
|
||||
self.maybe_load_plugin_init_method(name)
|
||||
else:
|
||||
self.load_plugin_by_name(name)
|
||||
except BaseException as e:
|
||||
self.logger.exception(f"cannot initialize plugin {name}: {e}")
|
||||
|
||||
@@ -184,12 +176,17 @@ class Plugins(DaemonThread):
|
||||
@profiler(min_threshold=0.5)
|
||||
def _has_recursive_root_permissions(self, path):
|
||||
"""Check if a directory and all its subdirectories have root permissions"""
|
||||
global _root_permission_cache
|
||||
if _root_permission_cache.get(path) is not None:
|
||||
return _root_permission_cache[path]
|
||||
_root_permission_cache[path] = False
|
||||
for root, dirs, files in os.walk(path):
|
||||
if not self._has_root_permissions(root):
|
||||
return False
|
||||
for f in files:
|
||||
if not self._has_root_permissions(os.path.join(root, f)):
|
||||
return False
|
||||
_root_permission_cache[path] = True
|
||||
return True
|
||||
|
||||
def get_external_plugin_dir(self):
|
||||
@@ -237,23 +234,25 @@ class Plugins(DaemonThread):
|
||||
raise Exception(f"duplicate plugins for name={name}")
|
||||
if self.cmd_only and not self.config.get('enable_plugin_' + name):
|
||||
continue
|
||||
module_path = f'electrum_external_plugins.{name}' if external else f'electrum.plugins.{name}'
|
||||
spec = zipfile.find_spec(name)
|
||||
module = self.exec_module_from_spec(spec, module_path)
|
||||
if self.cmd_only:
|
||||
assert name not in self.loaded_command_modules, f"tried to load commands of {name} twice"
|
||||
self.loaded_command_modules.add(name)
|
||||
continue
|
||||
d = module.__dict__
|
||||
gui_good = self.gui_name in d.get('available_for', [])
|
||||
if not gui_good:
|
||||
try:
|
||||
with zipfile_lib.ZipFile(path) as file:
|
||||
manifest_path = os.path.join(name, 'manifest.json')
|
||||
with file.open(manifest_path, 'r') as f:
|
||||
d = json.load(f)
|
||||
except Exception:
|
||||
self.logger.info(f"could not load manifest.json from zip plugin {filename}", exc_info=True)
|
||||
continue
|
||||
d['filename'] = filename
|
||||
if 'fullname' not in d:
|
||||
continue
|
||||
d['display_name'] = d['fullname']
|
||||
d['zip_hash_sha256'] = get_file_hash256(path)
|
||||
d['is_zip'] = True
|
||||
d['path'] = path
|
||||
if not self.cmd_only:
|
||||
gui_good = self.gui_name in d.get('available_for', [])
|
||||
if not gui_good:
|
||||
continue
|
||||
if 'fullname' not in d:
|
||||
continue
|
||||
d['display_name'] = d['fullname']
|
||||
d['zip_hash_sha256'] = get_file_hash256(path)
|
||||
if external:
|
||||
self.external_plugin_metadata[name] = d
|
||||
else:
|
||||
@@ -274,11 +273,35 @@ class Plugins(DaemonThread):
|
||||
else:
|
||||
raise Exception(f"could not find plugin {name!r}")
|
||||
|
||||
def load_plugin_by_name(self, name) -> 'BasePlugin':
|
||||
def maybe_load_plugin_init_method(self, name: str) -> None:
|
||||
"""Loads the __init__.py module of the plugin if it is not already loaded."""
|
||||
is_external = name in self.external_plugin_metadata
|
||||
base_name = (f'electrum_external_plugins.' if is_external else 'electrum.plugins.') + name
|
||||
if base_name not in sys.modules:
|
||||
metadata = self.get_metadata(name)
|
||||
is_zip = metadata.get('is_zip', False)
|
||||
# if the plugin was not enabled on startup the init module hasn't been loaded yet
|
||||
if not is_zip:
|
||||
if is_external:
|
||||
path = os.path.join(metadata['path'], '__init__.py')
|
||||
init_spec = importlib.util.spec_from_file_location(base_name, path)
|
||||
else:
|
||||
init_spec = importlib.util.find_spec(base_name)
|
||||
else:
|
||||
zipfile = zipimport.zipimporter(metadata['path'])
|
||||
init_spec = zipfile.find_spec(name)
|
||||
self.exec_module_from_spec(init_spec, base_name)
|
||||
if name == "trustedcoin":
|
||||
# removes trustedcoin after loading to not show it in the list of plugins
|
||||
del self.internal_plugin_metadata[name]
|
||||
|
||||
def load_plugin_by_name(self, name: str) -> 'BasePlugin':
|
||||
if name in self.plugins:
|
||||
return self.plugins[name]
|
||||
|
||||
is_zip = self.is_plugin_zip(name)
|
||||
# if the plugin was not enabled on startup the init module hasn't been loaded yet
|
||||
self.maybe_load_plugin_init_method(name)
|
||||
|
||||
is_external = name in self.external_plugin_metadata
|
||||
if not is_external:
|
||||
full_name = f'electrum.plugins.{name}.{self.gui_name}'
|
||||
@@ -289,11 +312,7 @@ class Plugins(DaemonThread):
|
||||
if spec is None:
|
||||
raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
|
||||
try:
|
||||
if is_zip:
|
||||
module = self.exec_module_from_spec(spec, full_name)
|
||||
else:
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(module)
|
||||
module = self.exec_module_from_spec(spec, full_name)
|
||||
plugin = module.Plugin(self, self.config, name)
|
||||
except Exception as e:
|
||||
raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
|
||||
@@ -493,10 +512,9 @@ class BasePlugin(Logger):
|
||||
raise NotImplementedError()
|
||||
|
||||
def read_file(self, filename: str) -> bytes:
|
||||
import zipfile
|
||||
if self.parent.is_plugin_zip(self.name):
|
||||
plugin_filename = self.parent.zip_plugin_path(self.name)
|
||||
with zipfile.ZipFile(plugin_filename) as myzip:
|
||||
with zipfile_lib.ZipFile(plugin_filename) as myzip:
|
||||
with myzip.open(os.path.join(self.name, filename)) as myfile:
|
||||
return myfile.read()
|
||||
else:
|
||||
|
||||
Reference in New Issue
Block a user