Source code for materials_commons.cli.functions

import datetime
import dateutil
import hashlib
import json
import os
import requests
import sys
import time

import materials_commons.api as mcapi
import materials_commons.api.models as models
from tabulate import tabulate

from materials_commons.cli.exceptions import MCCLIException, MissingRemoteException, \
    MultipleRemoteException, NoDefaultRemoteException
from materials_commons.cli.print_formatter import PrintFormatter, trunc
from materials_commons.cli.sqltable import SqlTable
from materials_commons.cli.user_config import Config, RemoteConfig

# TODO: mcapi.Config, mcapi.Remote, mcapi.RemoteConfig

[docs]def mkdir_if(path): """Convenience function for making a directory, if it does not exist. """ if not os.path.exists(path): os.mkdir(path)
[docs]def remove_if(path): """Convenience function for removing a file, if it exists. """ if os.path.exists(path): os.remove(path)
[docs]def rmdir_if(path): """Convenience function for removing a directory, if it exists. """ if os.path.exists(path): os.rmdir(path)
[docs]def make_file(path, text): """Convenience function for writing "text" to a file at "path". """ with open(path, 'w') as f: f.write(text)
[docs]def remove_hidden_project_files(project_path): """Removes a local project's configuration files and directory""" remove_if(os.path.join(project_path, ".mc", "config.json")) remove_if(os.path.join(project_path, ".mc", "project.db")) rmdir_if(os.path.join(project_path, ".mc"))
[docs]def getit(obj, name, default=None): """Returns the "name" attribute (or default value) whether "obj" is a dict or an object.""" if isinstance(obj, dict): return obj.get(name, default) else: return getattr(obj, name, default)
[docs]def as_is(value): """Returns value without any changes. A placeholder for when a function is needed.""" return value
[docs]def epoch_time(time_value): """Attempts to convert various time representations into s since the epoch Args: time_value: A representation of time. Returns: An integer s since the epoch. Uses the following: +-------------------+------------------------------------------------------------+ | If this type | Then do this conversion | +-------------------+------------------------------------------------------------+ | str | time.mktime(dateutil.parser.parse(time_value).timetuple()) | +-------------------+------------------------------------------------------------+ | float, int | time_value | +-------------------+------------------------------------------------------------+ | datetime.datetime | time.mktime(time_value.timetuple()) | +-------------------+------------------------------------------------------------+ | dict | time_value['epoch_time'] | +-------------------+------------------------------------------------------------+ | None | None | +-------------------+------------------------------------------------------------+ | Else | str(type(time_value)) | +-------------------+------------------------------------------------------------+ """ if isinstance(time_value, str): # expect ISO 8601 str return time.mktime(dateutil.parser.parse(time_value).timetuple()) elif isinstance(time_value, (float, int)): return time_value elif isinstance(time_value, datetime.datetime): return time.mktime(time_value.timetuple()) elif isinstance(time_value, dict) and ('epoch_time' in time_value): return time_value['epoch_time'] elif time_value is None: return None else: return str(type(time_value))
[docs]def format_time(time_value, fmt="%Y %b %d %H:%M:%S"): """Attempts to put various time representations into specified format for printing Args: time_value: A representation of time. fmt (str): Format to use for the return value. Returns: A string representation using the specified format. Uses the following: +-------------------+-------------------------------------------------+ | If this type | Then do this conversion | +-------------------+-------------------------------------------------+ | str | dateutil.parser.parse(time_value).strftime(fmt) | +-------------------+-------------------------------------------------+ | float, int | time.strftime(fmt, time.localtime(time_value)) | +-------------------+-------------------------------------------------+ | datetime.datetime | time.strftime(fmt, time.localtime(time_value)) | +-------------------+-------------------------------------------------+ | dict | format_time(time_value['epoch_time']) | +-------------------+-------------------------------------------------+ | None | "-" | +-------------------+-------------------------------------------------+ | Else | str(type(time_value)) | +-------------------+-------------------------------------------------+ """ if isinstance(time_value, str): # expect ISO 8601 str return dateutil.parser.parse(time_value).strftime(fmt) elif isinstance(time_value, (float, int)): return time.strftime(fmt, time.localtime(time_value)) elif isinstance(time_value, datetime.datetime): return time_value.strftime(fmt) elif isinstance(time_value, dict) and ('epoch_time' in time_value): return format_time(time_value['epoch_time']) elif time_value is None: return '-' else: return str(type(time_value))
[docs]def checksum(path): """Generate MD5 checksum for the file at "path" """ with open(path, 'rb') as f: return hashlib.md5(f.read()).hexdigest()
[docs]def random_name(n=3, max_letters=6, sep='-'): """Generates a random name for "n" words of max "max_letters" length, joined by "sep" """ import random word_file = "/usr/share/dict/words" if os.path.exists(word_file): WORDS = open(word_file).read().splitlines() else: import requests word_site = "http://svnweb.freebsd.org/csrg/share/dict/words?view=co&content-type=text/plain" response = requests.get(word_site) WORDS = response.content.decode("utf-8").splitlines() results=[] count=0 while count<n: word = WORDS[random.randint(0, len(WORDS)-1)] if word[0].isupper(): continue if len(word) > max_letters: continue results.append(word) count += 1 return sep.join(results)
[docs]def add_remote_option(parser, help): """Add the "--remote <email> <url>" cli option to an ArgumentParser - This standardizes the use of the "--remote <email> <url>" cli option which is needed in some contexts, but not others Args: parser (:class:`argparse.ArgumentParser`): The initial ArgumentParser Returns: :class:`argparse.ArgumentParser`: The ArgumentParser, with the added option "--remote <email> <url>". """ parser.add_argument('--remote', nargs=2, metavar=('EMAIL', 'URL'), help=help)
[docs]def optional_remote_config(args): """Return remote configuration parameters specified by cli option "--remote", or the user's default remote Args: args (argparse.Namespace): The result of argparse's "parse_args()" method. Checks for "args.remote". Returns: :class:`user_config.RemoteConfig`: The remote configuration parameters for the appropriate client: either the value specified using the "--remote <email> <url>" option, or else the user's configured default. """ config = Config() if args.remote: email = args.remote[0] url = args.remote[1] remote_config = RemoteConfig(mcurl=url, email=email) if remote_config not in config.remotes: raise MissingRemoteException("Could not find remote: {0} {1}".format(remote_config.email, remote_config.mcurl)) return config.remotes[config.remotes.index(remote_config)] else: if not config.default_remote.mcurl or not config.default_remote.mcapikey: raise NoDefaultRemoteException("Default remote not set") return config.default_remote
[docs]def optional_remote(args, default_client=None): """Return remote specified by cli option "--remote", or the user's default remote Args: args (argparse args): The result of argparse's "parse_args()" method. Checks for "args.remote". default_client (:class:`materials_commons.api.Client`): The default client to use. If None, uses the default remote specified in the user's configuration file: :: `materials_commons.cli.user_config.Config().default_remote.make_client()`. Returns: :class:`materials_commons.api.Client`: The appropriate client: the value specified using the "--remote <email> <url>" option, or else the user's configured default. """ config = Config() if args.remote: email = args.remote[0] url = args.remote[1] remote_config = RemoteConfig(mcurl=url, email=email) if remote_config not in config.remotes: raise MissingRemoteException("Could not find remote: {0} {1}".format(remote_config.email, remote_config.mcurl)) return config.remotes[config.remotes.index(remote_config)].make_client() else: if default_client is None: if not config.default_remote.mcurl or not config.default_remote.mcapikey: raise NoDefaultRemoteException("Default remote not set") return config.default_remote.make_client() else: return default_client
[docs]def make_local_project_client(path): """Construct a client to access project data from the local project configuration Args: path (str): Path to anywhere inside a local project directory Returns: :class:`materials_commons.api.Client`: A client for the instance of Materials Commons that is storing the project """ project_config = read_project_config(path) if not project_config: return None config = Config() remote_config = project_config.remote if remote_config == config.default_remote: return config.default_remote.make_client() elif remote_config not in config.remotes: raise MissingRemoteException("Could not make project Client, failed to find remote config: {0} {1}".format(remote_config.email, remote_config.mcurl)) remote_config_with_apikey = config.remotes[config.remotes.index(remote_config)] return remote_config_with_apikey.make_client()
[docs]class ProjectTable(SqlTable): """The ProjectTable creates a sqlite "project" table to cache some basic project data"""
[docs] @staticmethod def default_print_fmt(): """Returns project table print formatting parameters""" return [ ("name", "name", "<", 24, as_is), ("id", "id", "<", 36, as_is), ("checktime", "checktime", "<", 24, format_time) ]
[docs] @staticmethod def tablecolumns(): """Returns sqlite project table creating parameters Returns: A dict, with column name as key, and list of column creation args for value. """ return { "id": ["integer", "UNIQUE"], "uuid": ["text"], "name": ["text"], "data": ["text"], "checktime": ["real"] # last time the remote data was queried (s since epoch) }
[docs] @staticmethod def tablename(): """Returns sqlite project table name "project" """ return "project"
def __init__(self, proj_local_path): """ Args: proj_local_path (str): Path to local project directory """ super(ProjectTable, self).__init__(proj_local_path)
[docs] def select_all(self): """Select record by id Returns: sqlite3.Row or None """ self.curs.execute("SELECT * FROM " + self.tablename()) return self.curs.fetchall()
[docs]def make_local_project(path, data=None): """Read local project config file and use to construct materials_commons.api.models.Project - Checks if "path" is a path located inside a local project directory (by looking for the ".mc" directory). - Use local project configuration to: - Construct a "remote" instance (:class:`materials_commons.api.Client`) - Call Materials Commons and construct a project instance (:class:`materials_commons.api.Project`) - Add attributes to the project: - "local_path" (str) providing the absolute path to the local project directory - "remote" (:class:`materials_commons.api.Client`) project specific client instance Args: path (str): Path inside a local project directory data (dict): Optional, project data. If stored in cache this avoids an extra API call. Notes: Caching behavior is currently disabled while updating `materials_commons.cli` for MC2.0. It allows setting a "fetch lock" so that data that is not cached or older than the time the lock was set will be queried from the remote, otherwise the local cache data is used. """ proj_path = project_path(path) if not proj_path: raise MCCLIException("No Materials Commons project found at " + str(path)) project_config = read_project_config(path) # check for project data cached in sqlite ".mc/project.db" project_table = ProjectTable(proj_path) project_table.connect() results = project_table.select_all() project_table.close() if len(results) > 1: raise MCCLIException("Project db error: Found >1 project") client = make_local_project_client(path) if not results or not project_config.remote_updatetime or results[0]['checktime'] < project_config.remote_updatetime: checktime = time.time() try: if data is None: proj = client.get_project(project_config.project_id) else: proj = models.Project(data=data) except requests.exceptions.ConnectionError as e: raise MCCLIException("Could not connect to " + remote.config.mcurl) except requests.exceptions.HTTPError as e: raise MCCLIException("HTTPError: " + str(e)) record = { 'id': project_config.project_id, 'uuid': project_config.project_uuid, 'name': proj.name, 'data': json.dumps(proj._data), 'checktime': checktime } project_table.connect() project_table.insert_or_replace(record) project_table.close() else: record = results[0] proj = models.Project(data=json.loads(record['data'])) proj.local_path = proj_path proj.remote = client return proj
[docs]def make_local_expt(proj): """Read local project configuration to construct "current experiment" object - The "current experiment" is a default experiment for putting newly created samples or processes Args: proj (:class:`materials_commons.api.models.Project`): A Project instance, including the `materials_commons.cli` added attribute "local_path". Returns: Returns the local project's "current experiment" (:class:`materials_commons.api.models.Experiment`), or None if not set. """ project_config = read_project_config(proj.local_path) if project_config: expt = proj.remote.get_experiment(project_config.experiment_id) expt.project = proj return expt return None
[docs]def humanize(file_size_bytes): """Get a nice string representation of file size Args: file_size_bytes (int): File size in bytes Returns: str: File size as human readable string (ex: "10B", "8K", "5M", "2G", etc.) """ abbrev = [("B", 0), ("K", 10), ("M", 20), ("G", 30), ("T", 40)] for key, val in abbrev: _size = (file_size_bytes >> val) if _size < 1000 or key == "T": return str(_size) + key
[docs]def request_confirmation(msg, force=False): """Request user confirmation Args: msg (str): For example, the value "Are you sure you want to permanently delete these?", will prompt user with: :: "Are you sure you want to permanently delete these? ('Yes'/'No'): " force (bool): Proceed without user confirmation Returns: bool: True if confirmed or forced, False if not confirmed. """ if not force: msg = msg + " ('Yes'/'No'): " while True: input_str = input(msg) if input_str == 'No': return False elif input_str == 'Yes': return True print("Invalid input") else: return True
def _proj_path(path): """Returns the path to a local project directory if it contains "path", else None""" # if not os.path.isdir(path): # raise Exception("Error, no directory named: " + path) curr = path cont = True while cont is True: test_path = os.path.join(curr, '.mc') if os.path.isdir(test_path): return curr elif curr == os.path.dirname(curr): return None else: curr = os.path.dirname(curr) return None
[docs]def project_path(path): """Returns the path to a local project directory if it contains "path", else None""" return _proj_path(path)
[docs]def project_exists(path): """Returns True if a local project directory exists containing "path" """ if _proj_path(path): return True return False
def _mcdir(path): """Find project .mc directory path if it already exists, else return None""" dirpath = _proj_path(path) if dirpath is None: return None return os.path.join(dirpath, '.mc') def _proj_config(path): """Find project config path if .mc directory already exists, else return None""" dirpath = _proj_path(path) if dirpath is None: return None return os.path.join(dirpath, '.mc', 'config.json')
[docs]class ProjectConfig(object): """Facilitates reading and writing a JSON file storing local project configuration values Project ``.mc/config.json`` file format: :: { "remote": { "mcurl": <url>, "email": <email> }, "project_id": <id>, "project_uuid": <uuid>, "experiment_id": <id>, "experiment_uuid": <uuid>, "remote_updatetime": <number>, "globus_upload_id": <id>, "globus_download_id": <id> } Attributes: project_path (str): Absolute path to local project directory. config_dir (str): Absolute path to local project configuration directory (".mc"). config_path (str): Absolute path to local project configuration file (".mc/config.json"). project_id (int or None): Project ID project_uuid (str or None): Project UUID experiment_id (int or None): Current experiment ID experiment_uuid (str or None): Current experiment UUID remote (user_config.RemoteConfig): Holds configuration variables (email, url, apikey) for the remote instance of Materials Commons where the project is stored. remote_updatetime (number or None): For use with optional caching, holds the last time local cache data was updated from the remote. globus_upload_id (int or None): ID specifying which Globus upload directory should be used for Globus uploads. globus_download_id (int or None): ID specifying which Globus download directory should be used for Globus downloads. """ def __init__(self, project_path): """Construct by reading local project configuration file if it exists Args: project_path (str): Absolute path to local project directory. Will read `<project_path>/.mc/config.json`. All attributes will be None if the configuration file does not exist. """ self.project_path = project_path self.config_dir = os.path.join(self.project_path, ".mc") self.config_path = os.path.join(self.config_dir, "config.json") data = {} if os.path.exists(self.config_path): with open(self.config_path) as f: data = json.load(f) # handle deprecated 'remote_url' if 'remote_url' in data and 'remote' not in data: config = Config() matching = [remote_config for remote_config in config.remotes if remote_config.mcurl == data['remote_url']] if len(matching) == 0: raise MissingRemoteException("Could not get project data. Failed to find remote with url: {0}".format(data['remote_url'])) elif len(matching) > 1: raise MultipleRemoteException("Could not get project data. Found multiple remote accounts for url: {0}".format(data['remote_url'])) else: data['remote'] = dict() data['remote']['mcurl'] = matching[0].mcurl data['remote']['email'] = matching[0].email self.remote = RemoteConfig(**data.get('remote', {})) self.project_id = data.get('project_id', None) self.project_uuid = data.get('project_uuid', None) self.experiment_id = data.get('experiment_id', None) self.experiment_uuid = data.get('experiment_uuid', None) self.remote_updatetime = data.get('remote_updatetime', None) self.globus_upload_id = data.get('globus_upload_id', None) self.globus_download_id = data.get('globus_download_id', None)
[docs] def to_dict(self): """Returns the project configuration as a dict""" return { 'remote': { 'mcurl': self.remote.mcurl, 'email': self.remote.email }, 'project_id': self.project_id, 'project_uuid': self.project_uuid, 'experiment_id': self.experiment_id, 'experiment_uuid': self.experiment_uuid, 'remote_updatetime': self.remote_updatetime, 'globus_upload_id': self.globus_upload_id, 'globus_download_id': self.globus_download_id, }
[docs] def save(self): """Save project configuration as a JSON file""" if not os.path.exists(self.config_dir): os.mkdir(self.config_dir) with open(self.config_path, 'w') as f: json.dump(self.to_dict(), f) return
[docs]def read_project_config(path): """Read local project configuration Returns: If the project configuration file ("<project>/.mc/config.json") exists, returns a :class:`ProjectConfig` instance. Else, returns None. """ # get config proj_config_path = _proj_config(path) if proj_config_path and os.path.exists(proj_config_path): return ProjectConfig(project_path(path)) else: return None
[docs]def clone_project(remote_config, project_id, parent_dest, name=None): """Clone a remote project to a local directory - Will create the local project directory "<parent_dest>/<project_name>" - Will save local project configuration "<parent_dest>/.mc/config.json" with project info Args: remote_config (:class:`user_config.RemoteConfig`): Holds configuration variables (email, url, apikey) for the remote instance of Materials Commons where the project to be cloned is stored. project_id (int): ID of the project to clone parent_dest (str): Absolute path to a local directory which will become the parent of the cloned project directory. name (str): Name of created project directory. Default is remote project name. Returns: :class:`materials_commons.api.models.Project`: Object representing the cloned project """ # get Project client = remote_config.make_client() proj = client.get_project(project_id) if name is None: name = proj.name # check if project already exists dest = os.path.join(parent_dest, name) if project_path(dest): raise MCCLIException("Project already exists at:", project_path(dest)) # create project directory if not os.path.exists(dest): os.mkdir(dest) project_config = ProjectConfig(dest) project_config.remote = remote_config project_config.project_id = proj.id project_config.project_uuid = proj.uuid project_config.save() return make_local_project(dest, proj._data)