Source code for bw2ui.bin.bw2_browser

#!/usr/bin/env python
# encoding: utf-8
"""Brightway2 database and activity browser.
Developed by Bernhard Steubing and Chris Mutel, 2013

Usage:
  bw2-browser
  bw2-browser <project>
  bw2-browser <project> <database>
  bw2-browser <project> <database> <activity-id>

Options:
  -h --help     Show this screen.
  --version     Show version.

"""

from __future__ import print_function, unicode_literals

import cmd
import codecs
import itertools
import math
import os
import pprint
import re
import shlex
import textwrap
import time
import traceback
import uuid
import warnings

import bw2analyzer as bwa
import bw2calc as bc
import bw2data as bd
from bw2data import __version__ as bd_version
from bw2data import (
    Database,
    Method,
    calculation_setups,
    config,
    databases,
    get_activity,
    methods,
    projects,
)
from bw2data.errors import UnknownObject
from packaging import version

if (
    bc.__version__
    and isinstance(bc.__version__, str)
    and version.parse(bc.__version__) >= version.parse("2.0.DEV10")
):
    from bw2data import get_multilca_data_objs

from bw2data.parameters import (
    ActivityParameter,
    DatabaseParameter,
    Group,
    ProjectParameter,
)
from docopt import docopt
from tabulate import tabulate


from bw2io import backup_project_directory, restore_project_directory

warnings.filterwarnings("ignore", ".*Read only project.*")

[docs] FTS5_ENABLED_BD_VERSION = "4.0.dev47"
[docs] GRUMPY = itertools.cycle( ( "This makes no damn sense: ", "My mule has more sense than this: ", "If 50 million people say a foolish thing, it is still a foolish thing: ", "I have had enough of this kind of thing: ", "What are you talking about? ", "Are you kidding me? What is this: ", ) )
[docs] QUIET = itertools.cycle( ( "You say it best when you say nothing at all...", "Let us be silent, that we may hear the whispers of the gods.", "Actions speak louder than words. But you didn't use either!", "We have ways of making you talk, Mr. Bond!", "Brevity is the soul of wit. But you can take it too far!", "Do not underestimate the determination of a quiet man.", ) )
[docs] HELP_TEXT = """ This is a simple way to browse databases and activities in Brightway2. The following commands are available: Basic commands: ?: Print this help screen. quit, q: Exit the activity browser. number: Go to option number when a list of options is present. l: List current options. n: Go to next page in paged options. p: Go to previous page in paged options. p number: Go to page number in paged options. h: List history of databases and activities viewed. wh: Write history to a text file. autosave: Toggle autosave behaviour on and off. Working with projects: lpj: List available projects. backup [directory]: Backup the current project. Optionally specify a target \ directory. If no directory is specified, the backup will be created in the \ projects directory with a timestamp. restore <archive> [options]: Restore a project from an archive. Supported \ options: --project NAME, --overwrite. Working with databases: ldb: List available databases. db name: Go to database name. No quotes needed. s [string]: Search activity names in current database with string. Without string \ the search provides no results. s -loc {LOCATION} [string]: Search activity names in current database with \ string and location LOCATION. s -cat {CAT::SUBCAT::SUBSUBCAT} [string]: Search activity names in current \ database with string and category CAT, SUBCAT, SUBCAT [useful for biosphere]. s -rp {REFERENCE PRODUCT} [string]: Search activities in current database that \ have reference product and optionnaly match string in search. Working with activities: a id: Go to activity id in current database. Complex ids in quotes. aa: List all activities in current database. aa name sorts the activities \ by name. lprods: List product activities only (product and process_with_reference_product). i: Info on current activity. ii: Extended Info on current activity. r: Choose a random activity from current database. u: List upstream activities (inputs for the current activity). up: List upstream activities with pedigree info if avail (inputs for the current \ activity). uu: List upstream activities with formula info if avail. un: display uncertainty information of upstream activitities if avail. d: List downstream activities (activities which consume current activity). b: List biosphere flows for the current activity. pe: List production exchanges for current activity. pei: show the information of the production exchange of the current activity. cfs: Show characterization factors for current activity and current method. G: if a method and activity are selected, do an lcia of the activity. ta: if an lcia of the activity has been done, list top activities. te: if an lcia of the activity has been done, list top emissions. ca: do a contribution analysis of an activity with a method. sc: print recursive supply chain of an activity. Working with methods: lm: List methods. mi: Show method metadata. (must select method/category/subcategory first) Working with parameters: lpam: List all parameters (Project, Database and Activity) showing only \ basic columns (data). lpam [-f]: List all parameters showing all columns (data) of each parameter. lpam [-f] -g {YY}: List parameters for a specific group. Use db or specific \ data. add as first option -f to show all columns. lpamg: show parameter groups ap [-f]: If an activity is selected, show activity parameters dp [-f]: if a database is selected show database parameters pp [-f]: If a project is selected show project parameters fp : Find parameters (Project, Database or Activity) by name sp : search a parameter (accepts wildcards) Misc: tsv: [filename] export latest table to tsv file (e.g.: results or cfs) GC: Start group contribution mode for comparing multiple activities. add: Add current activity to the list for group contribution/compare analysis. list: List all activities added for group contribution/compare analysis. clear: Clear (empty) the list of added activities for group contribution/compare. GCH: Show the results table from the last group contribution/compare analysis. """
[docs] def get_autosave_text(autosave): return "on" if autosave else "off"
[docs] class ActivityBrowser(cmd.Cmd): """A command line based Activity Browser for brightway2."""
[docs] def _init(self, project=None, database=None, activity=None, method=None): """Provide initial data. Can't override __init__, because this is an old style class i.e. there is no support for ``super``.""" # Have to print into here; otherwise only print during ``cmdloop`` if config.p.get("ab_activity", None): # Must be tuple, not a list config.p["ab_activity"] = tuple(config.p["ab_activity"]) print(HELP_TEXT + "\n" + self.format_defaults()) self.page_size = 20 self.search_limit = config.p.get("search_limit", 100) self.set_current_options(None) self.autosave = config.p.get("ab_autosave", False) self.history = self.reformat_history(config.p.get("ab_history", [])) self.load_project(project) self.load_database(database) self.load_activity(activity) self.load_method(method) self.temp_activities = [] self.gc_results = None # Store GC command results for GCH command self.update_prompt()
###################### # Options management # ######################
[docs] def choose_option(self, opt): """Go to option ``opt``""" try: index = int(opt) if index >= len(self.current_options.get("formatted", [])): print("There aren't this many options") elif self.current_options["type"] == "method_namespaces": self.choose_method_namespace(self.current_options["options"][index]) elif self.current_options["type"] == "methods": self.choose_method(self.current_options["options"][index]) elif self.current_options["type"] == "categories": self.choose_category(self.current_options["options"][index]) elif self.current_options["type"] == "subcategories": self.choose_subcategory(self.current_options["options"][index]) elif self.current_options["type"] == "projects": self.choose_project(self.current_options["options"][index]) elif self.current_options["type"] == "databases": self.choose_database(self.current_options["options"][index]) elif self.current_options["type"] == "activities": self.choose_activity(self.current_options["options"][index]) elif self.current_options["type"] == "groups": self.choose_group(self.current_options["options"][index]) elif self.current_options["type"] == "history": option = self.current_options["options"][index] if option[0] == "database": self.choose_database(option[1]) elif option[0] == "activity": self.choose_activity(option[1]) elif option[0] == "method": self.choose_method(option[1]) elif option[0] == "category": self.choose_category(option[1]) elif option[0] == "subcategory": self.choose_subcategory(option[1]) else: # No current options. print("No current options to choose from") except Exception: print(traceback.format_exc()) print("Can't convert %(o)s to number.\nCurrent options are:" % {"o": opt}) self.print_current_options()
[docs] def print_current_options(self, label=None): print("") if label: print(label + "\n") if not self.current_options.get("formatted", []): print("Empty list") elif self.max_page: # Paging needed begin = self.page * self.page_size end = (self.page + 1) * self.page_size for index, obj in enumerate(self.current_options["formatted"][begin:end]): print( "[%(index)i]: %(option)s" % {"option": obj, "index": index + begin} ) print( "\nPage %(page)i of %(maxp)s. Use n (next page) and p \ (previous page) to navigate." % {"page": self.page, "maxp": self.max_page} ) else: for index, obj in enumerate(self.current_options["formatted"]): print("[%(index)i]: %(option)s" % {"option": obj, "index": index}) print("")
[docs] def set_current_options(self, options): self.page = 0 if options is None: options = {"type": None} self.max_page = 0 else: self.max_page = int(math.ceil(len(options["formatted"]) / self.page_size)) self.current_options = options
#################### # Shell management # ####################
[docs] def update_prompt(self): """update prompt and upstream/downstream activity lists""" self.invite = ">> " self.prompt = "" if self.activity: allowed_length = 76 - 8 - len(self.database) activity_ = get_activity(self.activity) name = activity_.get("name", "Unknown") categories = activity_.get("categories", []) or [] if allowed_length < len(name): name = name[:allowed_length] self.prompt = "%(pj)s@(%(db)s) %(n)s %(categories)s" % { "pj": self.project, "db": self.database, "n": name, "categories": categories, } elif self.database: self.prompt = "%(pj)s@(%(name)s) " % { "pj": self.project, "name": self.database, } elif self.project: self.prompt = "%(pj)s " % {"pj": self.project} if self.method: if self.category: if self.subcategory: self.prompt += "[%(method)s/%(category)s/%(subcategory)s] " % { "method": self.method, "category": self.category, "subcategory": self.subcategory, } else: self.prompt += "[%(method)s/%(category)s] " % { "method": self.method, "category": self.category, } else: self.prompt += "[%(method)s/] " % {"method": self.method} self.prompt += self.invite
############## # Formatting # ##############
[docs] def format_activity(self, key, max_length=10000): ds = get_activity(key) kurtz = {"location": ds.get("location", ""), "name": ds.get("name", "Unknown")} if max_length < len(kurtz["name"]): max_length -= len(kurtz["location"]) + 6 kurtz["name"] = kurtz["name"][:max_length] + "..." # TODO: Can adjust string lengths with product name, but just ignore for now product = ds.get("reference product", "") categories = ds.get("categories", "") or "" if product: product += ", " % {} kurtz["product"] = product kurtz["categories"] = categories return "%(name)s (%(product)s%(location)s%(categories)s)" % kurtz
[docs] def format_defaults(self): text = """The current data directory is %(dd)s. Autosave is turned %(autosave)s.""" % { "dd": projects.dir, "autosave": get_autosave_text(config.p.get("ab_autosave", False)), } if config.p.get("ab_database", None): text += "\nDefault database: %(db)s." % {"db": config.p["ab_database"]} if config.p.get("ab_activity", None): text += "\nDefault activity: %s" % self.format_activity( config.p["ab_activity"] ) return text
[docs] def format_history(self, command): kind, obj = command if kind == "database": return "Db: %(name)s" % {"name": obj} elif kind == "activity": return "Act: %(act)s" % {"act": self.format_activity(obj)} else: return f"{kind}: {obj}"
[docs] def reformat_history(self, json_data): """Convert lists to tuples (from JSON serialization)""" return [ (x[0], tuple(x[1])) if x[0] == "activity" else tuple(x) for x in json_data ]
[docs] def print_cfs(self, current_methods, activity=None): """Print cfs for a list of methods, and optionally only for an activity""" table_lines = [] for m in current_methods: method_ = Method(m) cfs = method_.load() if activity and "biosphere" in self.database: cfs = [cf for cf in cfs if get_activity(cf[0]).key == activity] for cf in cfs: # in bw2, the first elment of the cf data is a key -> tuple('db', 'id') # in bw25, the first element is single int id of the activity # this looks hackish, but it allows to keep 1 code-base for both # versions of bw (bw2 & bw25) if isinstance(cf[0], int): flow = get_activity(cf[0]) else: flow_key = tuple((cf[0][0], cf[0][1])) flow = get_activity(flow_key) flow_cat_tup = flow.get("categories", ("",)) or ("",) flow_cat = flow_cat_tup[0] flow_subcat = None if len(flow_cat_tup) == 2: flow_subcat = flow_cat_tup[1] if has_namespaced_methods(): line = [ m[0], m[1], m[2], m[3], cf[1], flow["name"], flow_cat, flow_subcat, method_.metadata["unit"], ] else: line = [ m[0], m[1], m[2], cf[1], flow["name"], flow_cat, flow_subcat, method_.metadata["unit"], ] table_lines.append(line) if table_lines: if has_namespaced_methods(): headers = [ "namespace", "method", "category", "indicator", "cf", "flow", "flow_category", "flow_subcategory", "unit", ] else: headers = [ "method", "category", "indicator", "cf", "flow", "flow_category", "flow_subcategory", "unit", ] print("CFS") self.tabulate_data = tabulate(table_lines, headers=headers, tablefmt="tsv") print(tabulate(table_lines, headers=headers)) else: print("Not characterized by method")
####################### # Project management # #######################
[docs] def choose_project(self, project): if self.project == project: return self.project = project projects.set_current(self.project, writable=False) self.history.append(("project", project)) if self.autosave: config.p["ab_project"] = self.project config.p["ab_history"] = self.history[-10:] config.save_preferences() self.set_current_options(None) self.activity = None self.database = None self.list_databases() self.update_prompt()
[docs] def load_project(self, project): if project: if project not in projects: print("Project %(name)s not found" % {"name": project}) self.load_project(None) else: self.project = project projects.set_current(self.project, writable=False) elif config.p.get("ab_project", False): self.project = config.p["ab_project"] else: self.project = None self.list_projects()
[docs] def list_projects(self): pjs = [p.name for p in projects] self.set_current_options( { "type": "projects", "options": pjs, "formatted": ["%(name)s" % {"name": name} for name in pjs], } ) self.print_current_options("Projects")
####################### # Database management # #######################
[docs] def choose_database(self, database): if self.activity and self.activity[0] == database: pass elif config.p.get("ab_activity", [0, 0])[0] == database: self.choose_activity(config.p["ab_activity"]) else: self.unknown_activity() self.database = database self.history.append(("database", database)) if self.autosave: config.p["ab_database"] = self.database config.p["ab_history"] = self.history[-10:] config.save_preferences() self.set_current_options(None) self.update_prompt()
[docs] def load_database(self, database): """Load database, trying first""" if database: if database not in databases: print("Database %(name)s not found" % {"name": database}) self.load_database(None) else: self.database = database elif config.p.get("ab_database", False): self.database = config.p["ab_database"] else: self.database = None
[docs] def list_databases(self): dbs = sorted(databases.list) self.set_current_options( { "type": "databases", "options": dbs, "formatted": [ "%(name)s (%(number)s activities/flows)" % {"name": name, "number": databases[name].get("number", "unknown")} for name in dbs ], } ) self.print_current_options("Databases")
####################### # Activity management # #######################
[docs] def load_activity(self, activity): """Load given or default activity on start""" if isinstance(activity, str): # Input parameter self.choose_activity((self.database, activity)) elif config.p.get("ab_activity", None): self.choose_activity(config.p["ab_activity"], restored=True) else: self.unknown_activity()
[docs] def choose_activity(self, key, restored=False): self.database = key[0] self.activity = key self.history.append(("activity", key)) if self.autosave and not restored: config.p["ab_activity"] = key config.p["ab_history"] = self.history[-10:] config.save_preferences() self.set_current_options(None) self.update_prompt()
[docs] def format_exchanges_as_options( self, es, kind, unit_override=None, show_formulas=False, show_pedigree=False, show_uncertainty=False, ): objs = [] for exc in es: if exc["type"] != kind: continue ds = get_activity(exc["input"]) objs.append( { "name": ds.get("name", "Unknown"), "location": ds.get("location", config.global_location), "unit": unit_override or ds.get("unit", "unit"), "amount": exc["amount"], "formula": exc.get("formula", None), "pedigree": exc.get("pedigree", None), "loc": exc.get("loc", None), "scale": exc.get("scale", None), "uncertainty_type": exc.get("uncertainty_type", None), "key": exc["input"], } ) objs.sort(key=lambda x: x["name"]) if show_formulas: format_string = ( "%(amount).3g [=%(formula)s] %(unit)s %(name)s (%(location)s)" # NOQA: E501 ) elif show_pedigree: format_string = "%(amount).3g %(unit)s %(name)s (%(location)s)\n\t[pedigree: %(pedigree)s] " # NOQA: E501 elif show_uncertainty: format_string = "%(amount).3g %(unit)s %(name)s (%(location)s)\n\t[uncertainty type: %(uncertainty_type)s, scale: %(scale)s, loc: %(loc)s] " # NOQA: E501 else: format_string = "%(amount).3g %(unit)s %(name)s (%(location)s)" self.set_current_options( { "type": "activities", "options": [obj["key"] for obj in objs], "formatted": [format_string % obj for obj in objs], } )
[docs] def get_downstream_exchanges(self, activity): """Get the exchanges that consume this activity's product""" activity = get_activity(activity) excs = [] exchanges = activity.upstream() for exc in exchanges: if activity == exc["input"] and not activity == exc["output"]: excs.append( { "type": exc.get("type", "Unknown"), "input": exc["output"], "amount": exc["amount"], "key": exc["output"][1], "name": exc.get("name", "Unknown"), } ) excs.sort(key=lambda x: x["name"]) return excs
[docs] def unknown_activity(self): self.activity = None
######################## # Method management # ########################
[docs] def load_method(self, method): if method: if method not in methods: print("Method %(name)s not found" % {"name": method}) self.load_method(None) else: self.method = method[0] self.category = method[1] elif config.p.get("ab_method", False): self.method = config.p["ab_method"] else: self.method = None self.category = None self.subcategory = None
[docs] def list_methods(self): if self.project: m_names = set([]) methods_ = sorted(methods) for m in methods_: m_names.add(m[0]) m_names = sorted(m_names) if len(methods_) > 0 and has_namespaced_methods(): self.set_current_options( { "type": "method_namespaces", "options": list(m_names), "formatted": ["%(name)s" % {"name": name} for name in m_names], } ) methods_label = "Method namespaces" else: self.set_current_options( { "type": "methods", "options": list(m_names), "formatted": ["%(name)s" % {"name": name} for name in m_names], } ) methods_label = "Methods" self.print_current_options(methods_label) else: self.set_current_options(None) self.update_prompt()
[docs] def choose_method_namespace(self, method_namespace): self.method_namespace = method_namespace self.method = self.category = self.subcategory = None self.history.append(("method_namespace", method_namespace)) if self.autosave: config.p["ab_method_namespace"] = self.method config.p["ab_history"] = self.history[-10:] config.save_preferences() c_names = set([]) methods_ = sorted(methods) for m in [m for m in methods_ if m[0] == method_namespace]: c_names.add(m[1]) c_names = sorted(c_names) self.set_current_options( { "type": "methods", "options": list(c_names), "formatted": ["%(name)s" % {"name": name} for name in c_names], } ) self.print_current_options("Methods") self.update_prompt()
[docs] def choose_method(self, method): self.method = method self.category = self.subcategory = None self.history.append(("method", method)) if self.autosave: config.p["ab_method"] = self.method config.p["ab_history"] = self.history[-10:] config.save_preferences() c_names = set([]) methods_ = sorted(methods) if has_namespaced_methods(): for m in [ m for m in methods_ if m[0] == self.method_namespace and m[1] == method ]: c_names.add(m[2]) else: for m in [m for m in methods_ if m[0] == method]: c_names.add(m[1]) c_names = sorted(c_names) self.set_current_options( { "type": "categories", "options": list(c_names), "formatted": ["%(name)s" % {"name": name} for name in c_names], } ) self.print_current_options("Categories") self.update_prompt()
[docs] def choose_category(self, category): self.category = category self.history.append(("category", category)) if self.autosave: config.p["ab_category"] = self.category config.p["ab_history"] = self.history[-10:] config.save_preferences() c_names = set([]) methods_ = sorted(methods) if has_namespaced_methods(): for m in [ m for m in methods_ if m[0] == self.method_namespace and m[1] == self.method and m[2] == category ]: c_names.add(m[3]) else: for m in [m for m in methods_ if m[0] == self.method and m[1] == category]: c_names.add(m[2]) self.set_current_options( { "type": "subcategories", "options": list(c_names), "formatted": ["%(name)s" % {"name": name} for name in c_names], } ) self.print_current_options("Subcategories") self.update_prompt()
[docs] def choose_subcategory(self, subcategory): self.subcategory = subcategory self.history.append(("subcategory", subcategory)) # using ecoinvent_interface creates biosphere dbs that are not only called # "biosphere3" so we test now only against a substring, not the exact name if ( self.activity and "biosphere" in self.database ): # TODO: recover generic name instead of hard coded one if self.method_namespace: mkey = ( self.method_namespace, self.method, self.category, self.subcategory, ) else: mkey = (self.method, self.category, self.subcategory) self.print_cfs([mkey], self.activity) self.update_prompt()
################################# # GROUP / Parameters Management # #################################
[docs] def dehydrate_params(self, params, fields): """Remove keys of each param dictionnary, and only keep fields.""" return [{k: v for k, v in p.dict.items() if k in fields} for p in params]
[docs] def acquire_params(self, full_cols, the_group): if full_cols: pparams = [p.dict for p in ProjectParameter.select()] dparams = [p.dict for p in DatabaseParameter.select()] aparams = [p.dict for p in ActivityParameter.select()] else: pparams = self.dehydrate_params( ProjectParameter.select(), ["name", "formula", "amount"] ) dparams = self.dehydrate_params( DatabaseParameter.select(), ["database", "name", "formula", "amount"] ) aparams = self.dehydrate_params( ActivityParameter.select(), ["database", "code", "group", "name", "formula", "amount"], ) if the_group: if the_group.lower() == "project": dparams = [] aparams = [] else: pparams = [] dparams = [p for p in dparams if p["database"] == the_group] aparams = [p for p in aparams if p["group"] == the_group] return pparams, dparams, aparams
[docs] def choose_group(self, group_id): g = Group.get_by_id(group_id) pparams, dparams, aparams = self.acquire_params(False, g.name) if len(pparams) > 0: print("Project Parameters") print(tabulate(pparams, headers="keys")) if len(dparams) > 0: print("Database Parameters") print(tabulate(dparams, headers="keys")) if len(aparams) > 0: print("Activity Parameters") print(tabulate(aparams, headers="keys")) self.set_current_options(None)
######################## # Default user actions # ########################
[docs] def default(self, line): """No ``do_foo`` command - try to select from options.""" if self.current_options["type"]: try: self.choose_option(int(line)) except Exception: print(next(GRUMPY) + line) else: print(next(GRUMPY) + line)
[docs] def emptyline(self): """No command entered!""" print(next(QUIET) + "\n(? for help)")
####################### # Custom user actions # #######################
[docs] def do_a(self, arg): """Go to activity id ``arg``""" key = (self.database, arg) if not self.database: print("Please choose a database first") # Support the use of int ids (used in bw25) activity_ref = None try: activity_ref = int(arg) except ValueError: activity_ref = key try: activity = get_activity(activity_ref) self.choose_activity(activity.key) except UnknownObject: print(f"Invalid activity id {key[1]}")
[docs] def do_autosave(self, arg): """Toggle autosave behaviour. If autosave is on, the current database or activity is written to config.p each time it changes. """ self.autosave = not self.autosave config.p["ab_autosave"] = self.autosave config.save_preferences() print("Autosave is now %s" % get_autosave_text(self.autosave))
[docs] def do_b(self, arg): """List biosphere flows""" if not self.activity: print("Need to choose an activity first") else: es = get_activity(self.activity).exchanges() self.format_exchanges_as_options(es, "biosphere") self.print_current_options("Biosphere flows")
[docs] def do_cfs(self, arg): """Print cfs of biosphere flows or method.""" # Support multiple biosphere databases in one project if ( ( self.activity and "biosphere" in self.database # show the cfs for the given flow ) or self.method or self.method_namespace ): # show the cfs of a given method if has_namespaced_methods(): namespace_shift = 1 else: namespace_shift = 0 if has_namespaced_methods() and self.method_namespace: current_methods = [m for m in methods if m[0] == self.method_namespace] if self.method: current_methods = [ m for m in methods if m[0 + namespace_shift] == self.method ] # print(f"Current namespace {self.method_namespace}") # print(f"Current methods {current_methods}") # print(f"Current method {self.method}") if self.category: # show cfs for current cat, current act current_methods = [ m for m in current_methods if m[1 + namespace_shift] == self.category ] if self.subcategory: current_methods = [ m for m in current_methods if m[2 + namespace_shift] == self.subcategory ] else: print("No method currently selected") # Alternative: cfs for all methods? return False self.print_cfs(current_methods, self.activity) self.update_prompt()
[docs] def do_tsv(self, arg): """write the latest table created as tsv file.""" output_filename = "output.tsv" if arg: output_filename = arg if self.tabulate_data: with open(output_filename, "w") as f: f.write(self.tabulate_data)
[docs] def do_cp(self, arg): """Clear preferences. Only for development.""" self.autosave = False if config.p["ab_autosave"]: del config.p["ab_autosave"] del config.p["ab_project"] del config.p["ab_method"] del config.p["ab_database"] del config.p["ab_activity"] del config.p["ab_history"] config.save_preferences() self.project = self.database = self.activity = None self.method = self.category = self.subcategory = None self.update_prompt()
[docs] def do_d(self, arg): """Load downstream activities""" if not self.activity: print("Need to choose an activity first") else: ds = get_activity(self.activity) unit = ds.get("unit", "") excs = self.get_downstream_exchanges(self.activity) self.format_exchanges_as_options(excs, "technosphere", unit) self.print_current_options("Downstream consumers")
[docs] def do_db(self, arg): """Switch to a different database""" print(arg) if arg not in databases: print("'%(db)s' not a valid database" % {"db": arg}) else: self.choose_database(arg)
[docs] def do_h(self, arg): """Pretty print history of databases & activities""" self.set_current_options( { "type": "history", "options": self.history[::-1], "formatted": [self.format_history(o) for o in self.history[::-1]], } ) self.print_current_options("Browser history")
[docs] def do_help(self, args): print(HELP_TEXT)
[docs] def do_i(self, arg): """Info on current activity. TODO: Colors could be improved.""" if not self.activity: print("No current activity") else: ds = get_activity(self.activity) prod = [x for x in ds.exchanges() if x["input"] == self.activity] if "production amount" in ds and ds["production amount"]: amount = ds["production amount"] elif len(prod) == 1: amount = prod[0]["amount"] else: amount = 1.0 print( """\n%(name)s Database: %(database)s ID: %(id)s numerical_id: %(n_id)s Product: %(product)s Production amount: %(amount).2g %(unit)s Location: %(location)s Classifications: %(classifications)s Technosphere inputs: %(tech)s Biosphere flows: %(bio)s Reference flow used by: %(consumers)s\n""" % { "name": ds.get("name", "Unknown"), "product": ds.get("reference product") or ds.get("name", "Unknown"), "database": self.activity[0], "id": self.activity[1], # numerical ids are a feature of bw25 "n_id": ds.get("id") or "NA", "amount": amount, "unit": ds.get("unit", ""), "classifications": "\n\t\t\t".join( [ "{}: {}".format(c[0], c[1]) for c in ds.get("classifications", []) ] ), "location": ds.get("location", config.global_location), "tech": len( [x for x in ds.exchanges() if x["type"] == "technosphere"] ), "bio": len([x for x in ds.exchanges() if x["type"] == "biosphere"]), "consumers": len(self.get_downstream_exchanges(self.activity)), } )
[docs] def do_ii(self, arg): """Extended Info on current activity. TODO: Colors could be improved.""" if not self.activity: print("No current activity") else: ds = get_activity(self.activity) prod = [x for x in ds.exchanges() if x["input"] == self.activity] if "production amount" in ds and ds["production amount"]: amount = ds["production amount"] elif len(prod) == 1: amount = prod[0]["amount"] else: amount = 1.0 print( """Extended info \n%(name)s Database: %(database)s ID: %(id)s numerical_id: %(n_id)s Product: %(product)s Production amount: %(amount).2g %(unit)s Location: %(location)s Classifications: %(classifications)s Technosphere inputs: %(tech)s Biosphere flows: %(bio)s Reference flow used by: %(consumers)s\n""" % { "name": ds.get("name", "Unknown"), "product": ds.get("reference product") or ds.get("name", "Unknown"), "database": self.activity[0], "id": self.activity[1], # numerical ids are a feature of bw25 "n_id": ds.get("id") or "NA", "amount": amount, "unit": ds.get("unit", ""), "classifications": "\n\t\t\t".join( [ "{}: {}".format(c[0], c[1]) for c in ds.get("classifications", []) ] ), "location": ds.get("location", config.global_location), "tech": len( [x for x in ds.exchanges() if x["type"] == "technosphere"] ), "bio": len([x for x in ds.exchanges() if x["type"] == "biosphere"]), "consumers": len(self.get_downstream_exchanges(self.activity)), } ) indentation_char = " " * 4 line_length = 50 # TODO: use dynamic line lenght or take from prefs t_wrapper = textwrap.TextWrapper() t_wrapper.width = line_length for field in [ k for k in ds.keys() if k not in [ "name", "product", "database", "location", "unit", "classifications", "production amount", "code", ] ]: if field.casefold() == "comment".casefold() and ds.get(field): t_wrapper.replace_whitespace = False contents = "\n".join( [ "\n".join(t_wrapper.wrap(line)) for line in ds[field].splitlines() if line.strip() != "" ] ) print( "%(tab)s%(field)s:" % {"tab": indentation_char, "field": field} ) for line in contents.splitlines(): print( "%(tab)s%(line)s" % {"tab": indentation_char * 2, "line": line} ) else: if isinstance(ds[field], str): t_wrapper.replace_whitespace = False field_contents = t_wrapper.wrap(ds[field]) else: t_wrapper.break_long_words = False field_contents = t_wrapper.wrap(repr(ds[field])) print( "%(tab)s%(field)s:" % {"tab": indentation_char, "field": field} ) for line in field_contents: print( "%(tab)s%(line)s" % {"tab": indentation_char * 2, "line": line} )
[docs] def do_l(self, arg): """List current options""" if self.current_options["type"]: self.print_current_options() else: print("No current options")
[docs] def do_lm(self, arg): """List methods""" self.list_methods()
[docs] def do_lpj(self, arg): """List available projects""" self.list_projects()
[docs] def do_backup(self, target_dir=None): """Backup the current project. Optionally specify a target directory.""" if not self.project: print("Please select a project first") return dir_backup = None if target_dir and target_dir.strip(): dir_backup = os.path.expanduser(target_dir.strip()) # Check if directory exists before calling backup function if not os.path.exists(dir_backup): print("Backup directory does not exist: %s" % dir_backup) return if not os.path.isdir(dir_backup): print("Backup path is not a directory: %s" % dir_backup) return try: backup_path = backup_project_directory(self.project, dir_backup=dir_backup) print( "Project '%s' backed up successfully to: %s" % (self.project, backup_path) ) except ValueError as e: print("Error: Project does not exist or invalid project name: %s" % e) except FileNotFoundError as e: print("Error: Backup directory not found: %s" % e) except PermissionError as e: print("Error: Permission denied. Cannot write to backup directory: %s" % e) except Exception as e: print("Error during backup: %s" % e) traceback.print_exc()
[docs] def do_restore(self, arg): """Restore a project from a backup archive.""" try: tokens = shlex.split(arg) except ValueError as exc: print("Error parsing arguments: %s" % exc) return if not tokens: print("Usage: restore <archive> [--project NAME] [--overwrite]") return archive_path = None project_name = None overwrite = False extra_args = [] i = 0 while i < len(tokens): token = tokens[i] if token in ("--project", "-p"): i += 1 if i >= len(tokens): print("Missing value for %s" % token) return project_name = tokens[i] elif token in ("--overwrite", "-o"): overwrite = True else: if archive_path is None: archive_path = token elif project_name is None: project_name = token else: extra_args.append(token) i += 1 if extra_args: print("Unrecognized arguments: %s" % ", ".join(extra_args)) print("Usage: restore <archive> [--project NAME] [--overwrite]") return if not archive_path: print("Usage: restore <archive> [--project NAME] [--overwrite]") return archive_path = os.path.expanduser(archive_path) if not os.path.exists(archive_path): print("Archive not found: %s" % archive_path) return if not os.path.isfile(archive_path): print("Archive path is not a file: %s" % archive_path) return if project_name and project_name in projects and not overwrite: print( "Project '%s' already exists. Use --overwrite to replace it." % project_name ) return if overwrite: confirm_name = project_name or "the project stored in the archive" answer = ( input("You are about to overwrite %s. Continue? [y/N]: " % confirm_name) .strip() .lower() ) if answer not in ("y", "yes"): print("Restore cancelled.") return try: restored_project = restore_project_directory( archive_path, project_name=project_name, overwrite_existing=overwrite, switch=True, ) print( "Project '%s' restored successfully from %s" % (restored_project, archive_path) ) self.choose_project(restored_project) except FileNotFoundError as exc: print("Project archive not found, Error: %s" % exc) except ValueError as exc: print("Project archive is invalid, Error: %s" % exc) except Exception as exc: print("Error during restore: %s" % exc) traceback.print_exc()
[docs] def do_ldb(self, arg): """List available databases""" self.list_databases()
[docs] def do_mi(self, arg): """Show method information""" if self.method and self.category and self.subcategory: if has_namespaced_methods() and self.method_namespace: m_key = ( self.method_namespace, self.method, self.category, self.subcategory, ) else: m_key = (self.method, self.category, self.subcategory) try: m = Method(m_key) pp = pprint.PrettyPrinter(indent=4) pp.pprint(m.metadata) except UnknownObject: print(f"Method {m_key} not found") else: print("No current method selected")
[docs] def do_n(self, arg): """Go to next page in paged options""" if not self.current_options["type"]: print("Not in page mode") elif self.page == self.max_page: print("No next page") else: self.page += 1 self.print_current_options()
[docs] def do_p(self, arg): """Go to previous page in paged options""" if not self.current_options["type"]: print("Not in page mode") elif arg: try: page = int(arg) if page < 0 or page > self.max_page: print("Invalid page number") else: self.page = page self.print_current_options() except Exception: print("Can't convert page number %(page)s" % {"page": arg}) elif self.page == 0: print("Already page 0") else: self.page -= 1 self.print_current_options()
[docs] def do_q(self, args): """Exit the activity browser.""" return True
[docs] def do_quit(self, args): """Exit the activity browser.""" return True
[docs] def do_r(self, arg): """Choose an activity at random""" if not self.database: print("Please choose a database first") else: key = Database(self.database).random() self.choose_activity(key)
[docs] def do_s(self, arg): """Search activity names.""" if not self.database: print("Please choose a database first") else: re1a = r"." search_criterion = None criterion_value = None if "-loc" in arg: re1a = r"(-loc\s)" # Any Single Whitespace Character 1 search_criterion = "location" elif "-cat" in arg: re1a = r"(-cat\s)" # Any Single Whitespace Character 1 search_criterion = "category" elif "-cas" in arg: re1a = r"(-cas\s)" # Any Single Whitespace Character 1 search_criterion = "CAS number" elif "-rp" in arg: re1a = r"(-rp\s)" # Any Single Whitespace Character 1 search_criterion = "reference product" re1b = r"(\{.*\})" # Curly Braces 1 re2 = ( r"(?:\s(.+))?" # at least a space, and then 1 to n chars, but optional ) rg = re.compile(re1a + re1b + re2, re.IGNORECASE | re.DOTALL) m = rg.search(arg) needle = arg # Find the needle in the haystack if m is None and "-loc" in arg: print("Missing location in curly braces in command: -loc {MX} ...") return elif m is None and "-cat" in arg: print("Missing category in curly braces in command: -cat {water} ...") return elif m is None and "-cas" in arg: print( "Missing CAS Number in curly braces in command: -cas {000095-50-1} \ ..." ) return elif "-cas" in arg and "biosphere" not in self.database: print("CAS Number search only for biosphere dbs.") return elif m is None and "-rp" in arg: print( "Missing reference product in curly braces in command: -rp " "{electricity high voltage} ..." ) return elif m: c2 = m.group(2) criterion_value = c2.strip("{}") needle = m.group(3) print( "Filtering for {} {} after search".format( search_criterion, criterion_value ) ) results = search_bw2( search_criterion, criterion_value, self.database, needle, self.search_limit, ) results_keys = [r.key for r in results] self.set_current_options( { "type": "activities", "options": results_keys, "formatted": [self.format_activity(key) for key in results], } ) self.print_current_options( "Search results for %(query)s" % {"query": needle} )
[docs] def do_u(self, arg): """List upstream processes""" if not self.activity: print("Need to choose an activity first") else: es = get_activity(self.activity).technosphere() self.format_exchanges_as_options(es, "technosphere") self.print_current_options("Upstream inputs")
[docs] def do_up(self, arg): """List upstream processes""" if not self.activity: print("Need to choose an activity first") else: es = get_activity(self.activity).technosphere() self.format_exchanges_as_options(es, "technosphere", show_pedigree=True) self.print_current_options("Upstream inputs")
[docs] def do_uu(self, arg): """List upstream processes extra info (formulas)""" if not self.activity: print("Need to choose an activity first") else: es = get_activity(self.activity).technosphere() self.format_exchanges_as_options(es, "technosphere", show_formulas=True) self.print_current_options("Upstream inputs")
[docs] def do_un(self, arg): """Display uncertainty infor of upstream activities if available""" if not self.activity: print("Need to choose an activity first") else: es = get_activity(self.activity).technosphere() self.format_exchanges_as_options(es, "technosphere", show_uncertainty=True) self.print_current_options("Upstream inputs")
[docs] def do_wh(self, arg): output_dir = projects.request_directory("export") fp = os.path.join(output_dir, "browser history.%s.txt" % time.ctime()) with codecs.open(fp, "w", encoding="utf-8") as f: for line in self.history: f.write(line + "\n") print("History exported to %(fp)s" % {"fp": fp})
[docs] def build_method_key_list(self): method_key_list = [] if has_namespaced_methods(): if ( self.method_namespace and self.method and self.category and self.subcategory ): method_id = ( self.method_namespace, self.method, self.category, self.subcategory, ) method_key_list.append(method_id) elif self.method_namespace and self.method and self.category is None: for m in methods: if m[0] == self.method_namespace and m[1] == self.method: method_key_list.append(m) elif self.method_namespace and self.method is None: for m in methods: if m[0] == self.method_namespace: method_key_list.append(m) else: if self.method and self.category and self.subcategory: method_id = (self.method, self.category, self.subcategory) method_key_list.append(method_id) elif self.method and self.category and self.subcategory is None: for m in methods: if m[0] == self.method and m[1] == self.category: method_key_list.append(m) elif self.method and self.category is None: for m in methods: if m[0] == self.method: method_key_list.append(m) return sorted(method_key_list)
[docs] def do_G(self, arg): """Do an LCIA of the selected activity + method[s]""" if self.activity and self.method: selected_activity = get_activity(self.activity) def label_values(*attrs): values = set() for attr in attrs: value = getattr(bd.labels, attr, None) if value is None: continue if isinstance(value, (set, tuple, list)): values.update(value) else: values.add(value) return values product_like_types = label_values( "product_node_type", "product_node_types", "chimaera_node_default", "chimaera_node_defaults", ) plain_process_types = label_values( "process_node_type", "process_node_types", "process_node_default", "process_node_defaults", ) # Compatibility fallback if labels are unavailable in current bw2data version if not plain_process_types: plain_process_types = {"process"} selected_type = selected_activity.get("type") if ( selected_type in plain_process_types and selected_type not in product_like_types ): production_exchanges = [ exc for exc in selected_activity.exchanges() if exc.get("type") == "production" ] print( "Selected node is a plain process (%s). LCIA can only be performed on products." % selected_activity.id ) if production_exchanges: self.format_exchanges_as_options(production_exchanges, "production") self.print_current_options( "Products produced by current process (choose one, then run `G` again)" ) else: print( "No production exchanges found for this process. Use `lprods` to list product nodes." ) return method_key_list = self.build_method_key_list() if has_namespaced_methods(): namespace_shift = 1 else: namespace_shift = 0 if ( bc.__version__ and isinstance(bc.__version__, str) and version.parse(bc.__version__) >= version.parse("2.0.DEV10") ): # the configuration config = {"impact_categories": method_key_list} activities = [selected_activity] func_units = {a["name"]: {a.id: 1.0} for a in activities} data_objs = get_multilca_data_objs( functional_units=func_units, method_config=config ) mlca = bc.MultiLCA( demands=func_units, method_config=config, data_objs=data_objs ) try: mlca.lci() mlca.lcia() except ValueError as exc: print( "Can't run LCIA for the selected node. Please select a product node (try `lprods`). Error: %s" % exc ) return formatted_res = [] for (method, _), score in mlca.scores.items(): method_name = method[0 + namespace_shift] category_name = method[1 + namespace_shift] if len(method) == 5: subcategory_name = method[2 + namespace_shift] indicator_name = method[3 + namespace_shift] else: indicator_name = method[2 + namespace_shift] formatted_res_item = [ method_name, category_name, indicator_name, Method(method).metadata["unit"], score, ] if len(method) == 5: formatted_res_item.insert(2, subcategory_name) if has_namespaced_methods(): formatted_res_item.insert(0, method[0]) formatted_res.append(formatted_res_item) else: bw2browser_cs = { "inv": [{selected_activity: 1}], "ia": method_key_list, } tmp_cs_id = uuid.uuid1() calculation_setups[str(tmp_cs_id)] = bw2browser_cs mlca = bc.MultiLCA(str(tmp_cs_id)) formatted_res = [ [ mlca.methods[i][0], mlca.methods[i][1], mlca.methods[i][2], Method(mlca.methods[i]).metadata["unit"], score.pop(), ] for i, score in enumerate(mlca.results.T.tolist()) ] headers = ["method", "category", "subcategory", "unit", "score"] if has_namespaced_methods(): headers.insert(0, "namespace") if len(formatted_res[0]) == 5: headers.insert(2, "subcategory") self.tabulate_data = tabulate( formatted_res, headers=headers, tablefmt="tsv", ) print(tabulate(formatted_res, headers=headers)) else: print("Select at least a method first")
[docs] def do_GC(self, arg): """Do an LCIA of all activities in the temporary list with a fully specified method.""" # Check if we have a full method specification method_namespace = getattr(self, "method_namespace", None) if has_namespaced_methods(): if not all( [method_namespace, self.method, self.category, self.subcategory] ): print( "Please select a full method specification: namespace, method, category, and subcategory" ) return method_id = ( method_namespace, self.method, self.category, self.subcategory, ) method_key_list = [method_id] else: if not all([self.method, self.category, self.subcategory]): print( "Please select a full method specification: method, category, and subcategory" ) return method_id = (self.method, self.category, self.subcategory) method_key_list = [method_id] # Check if we have activities in the temporary list if not self.temp_activities: print( "Temporary activities list is empty. Use 'add' to add activities first." ) return # Build functional units from all activities in the temporary list activities = [get_activity(key) for key in self.temp_activities] func_units = {str(a.id): {a.id: 1.0} for a in activities} if has_namespaced_methods(): namespace_shift = 1 else: namespace_shift = 0 if ( bc.__version__ and isinstance(bc.__version__, str) and version.parse(bc.__version__) >= version.parse("2.0.DEV10") ): # the configuration config = {"impact_categories": method_key_list} data_objs = get_multilca_data_objs( functional_units=func_units, method_config=config ) mlca = bc.MultiLCA( demands=func_units, method_config=config, data_objs=data_objs ) mlca.lci() mlca.lcia() # Organize scores by activity # mlca.scores is a dict with keys (method, functional_unit_name) # Get all unique methods methods_seen = set() for (method, func_unit_name), score in mlca.scores.items(): if method not in methods_seen: methods_seen.add(method) # Build results per activity headers = ["method", "category", "subcategory", "unit", "score"] if has_namespaced_methods(): headers.insert(0, "namespace") # Collect all results for export all_results_for_export = [] print( "LCA results for %d activities in temporary list:" % len(self.temp_activities) ) for activity in activities: print("\nActivity: %s" % activity) activity_results = [] for method in methods_seen: score = mlca.scores.get((method, str(activity.id)), 0) method_name = method[0 + namespace_shift] category_name = method[1 + namespace_shift] indicator_name = method[2 + namespace_shift] result_row = [ method_name, category_name, indicator_name, Method(method).metadata["unit"], score, ] if has_namespaced_methods(): result_row.insert(0, method[0]) activity_results.append(result_row) # Add to export list with activity identifier export_row = result_row + [activity] all_results_for_export.append(export_row) print(tabulate(activity_results, headers=headers)) # Calculate and show aggregated results (sum across all activities) print("\nAggregated results (sum of all activities):") aggregated_results = [] for method in methods_seen: total_score = sum( mlca.scores.get((method, str(activity.id)), 0) for activity in activities ) method_name = method[0 + namespace_shift] category_name = method[1 + namespace_shift] indicator_name = method[2 + namespace_shift] result_row = [ method_name, category_name, indicator_name, Method(method).metadata["unit"], total_score, ] if has_namespaced_methods(): result_row.insert(0, method[0]) aggregated_results.append(result_row) # Add to export list with aggregated identifier export_row = result_row + ["AGGREGATED"] all_results_for_export.append(export_row) # Create combined table with activity column for export export_headers = headers + ["activity"] self.tabulate_data = tabulate( all_results_for_export, headers=export_headers, tablefmt="tsv", ) print(tabulate(aggregated_results, headers=headers)) # Store results for GCH command self.gc_results = { "activities": activities, "activity_results": all_results_for_export, "aggregated_results": aggregated_results, "methods_seen": methods_seen, "headers": headers, "namespace_shift": namespace_shift, } else: # Legacy API # Build a list of functional units, one for each activity bw2browser_cs = { "inv": [{get_activity(key): 1} for key in self.temp_activities], "ia": method_key_list, } tmp_cs_id = uuid.uuid1() calculation_setups[str(tmp_cs_id)] = bw2browser_cs mlca = bc.MultiLCA(str(tmp_cs_id)) headers = ["method", "category", "subcategory", "unit", "score"] if has_namespaced_methods(): headers.insert(0, "namespace") # Results are organized as: mlca.results has shape (num_methods, num_activities) # Collect all results for export all_results_for_export = [] print( "LCA results for %d activities in temporary list:" % len(self.temp_activities) ) for idx, activity_key in enumerate(self.temp_activities): activity = get_activity(activity_key) activity_name = activity.get("name", "Unknown") print("\nActivity %d: %s" % (idx + 1, activity_name)) activity_results = [] for i in range(len(mlca.methods)): method = mlca.methods[i] score = mlca.results[i, idx] if mlca.results.shape[1] > idx else 0 result_row = [ method[0], method[1], method[2], Method(method).metadata["unit"], score, ] if has_namespaced_methods(): result_row.insert(0, method[0]) activity_results.append(result_row) # Add to export list with activity identifier export_row = result_row + [activity_name] all_results_for_export.append(export_row) print(tabulate(activity_results, headers=headers)) # Show aggregated results print("\nAggregated results (sum of all activities):") aggregated_results = [] for i in range(len(mlca.methods)): method = mlca.methods[i] total_score = ( mlca.results[i, :].sum() if mlca.results.shape[1] > 0 else 0 ) result_row = [ method[0], method[1], method[2], Method(method).metadata["unit"], total_score, ] if has_namespaced_methods(): result_row.insert(0, method[0]) aggregated_results.append(result_row) # Add to export list with aggregated identifier export_row = result_row + ["AGGREGATED"] all_results_for_export.append(export_row) # Create combined table with activity column for export export_headers = headers + ["activity"] self.tabulate_data = tabulate( all_results_for_export, headers=export_headers, tablefmt="tsv", ) print(tabulate(aggregated_results, headers=headers)) # Store results for GCH command activities_list = [get_activity(key) for key in self.temp_activities] self.gc_results = { "activities": activities_list, "activity_results": all_results_for_export, "aggregated_results": aggregated_results, "mlca": mlca, "headers": headers, }
[docs] def do_GCH(self, arg): """Display ASCII bar charts for GC command results.""" if not self.gc_results: print("No GC results available. Please run GC command first.") return results = self.gc_results activity_results = results["activity_results"] headers = results.get("headers", []) # Find score column index (second to last column) score_col_idx = -2 # Extract activity names and scores # Group results by activity activity_scores = {} activity_methods = {} for row in activity_results: # Last column is the activity identifier activity_id = row[-1] # Score is second to last column try: score = float(row[score_col_idx]) except (ValueError, TypeError): score = 0.0 if activity_id not in activity_scores: activity_scores[activity_id] = [] activity_methods[activity_id] = [] activity_scores[activity_id].append(score) # Extract method name for label method_col_idx = 1 if "namespace" in headers else 0 if len(row) > method_col_idx: method_name = str(row[method_col_idx]) if len(row) > method_col_idx + 1: method_name = f"{row[method_col_idx]}/{row[method_col_idx+1]}" activity_methods[activity_id].append(method_name) # Collect all scores and labels from all activities (excluding aggregated) all_scores = [] all_labels = [] # Create a mapping from activity_id to activity object/key for formatting activities_map = {} if "activities" in results: # Map activity objects to their string representation for activity in results["activities"]: activities_map[activity] = activity # Also map by activity key if it's stored as a key if hasattr(activity, "key"): activities_map[activity.key] = activity # Also check if we can map from temp_activities (these are the keys) activity_keys_map = {} for key in self.temp_activities: activity_obj = get_activity(key) # Map the activity object itself activities_map[activity_obj] = key # Map by key tuple activities_map[key] = key # Store key mapping activity_keys_map[activity_obj] = key activity_keys_map[key] = key for activity_id, scores in activity_scores.items(): if activity_id == "AGGREGATED": continue # Get the activity key for formatting activity_key = None # Check if activity_id is already a key (tuple) if isinstance(activity_id, tuple) and len(activity_id) == 2: activity_key = activity_id # Check if it's an activity object elif hasattr(activity_id, "key"): activity_key = activity_id.key # Check if we have it in our maps elif activity_id in activity_keys_map: activity_key = activity_keys_map[activity_id] elif activity_id in activities_map: mapped = activities_map[activity_id] if isinstance(mapped, tuple) and len(mapped) == 2: activity_key = mapped elif hasattr(mapped, "key"): activity_key = mapped.key # Check if it's a string (legacy API stored activity name as string) elif isinstance(activity_id, str) and activity_id != "AGGREGATED": # Try to find the activity by name in temp_activities for key in self.temp_activities: activity_obj = get_activity(key) if activity_obj.get("name") == activity_id: activity_key = key break # Get string representation of activity if activity_key: # Use format_activity to get full string representation activity_str = self.format_activity(activity_key, max_length=200) elif hasattr(activity_id, "key"): # It's an activity object, use its key activity_str = self.format_activity(activity_id.key, max_length=200) else: # Fallback: try to get activity object and format it try: # If it's already an activity object, try to get its string representation if hasattr(activity_id, "get") and hasattr(activity_id, "key"): activity_str = self.format_activity( activity_id.key, max_length=200 ) else: # Last resort: use string representation activity_str = str(activity_id) except: activity_str = str(activity_id) # Get method names for this activity method_names = activity_methods.get( activity_id, [f"Method {i+1}" for i in range(len(scores))] ) # Create labels combining activity string representation and method for i, (score, method_name) in enumerate(zip(scores, method_names)): all_scores.append(score) all_labels.append(f"{activity_str} - {method_name}") if not all_scores: print("No activity results to display.") return # Display ASCII bar chart (always use simple ASCII, no plotext) self._simple_ascii_chart(all_scores, all_labels)
[docs] def _simple_ascii_chart(self, all_scores=None, all_labels=None): """Fallback simple ASCII bar chart without external dependencies.""" if not self.gc_results: return # If scores and labels are provided, use them directly if all_scores is not None and all_labels is not None: print("\n" + "=" * 80) print("ASCII Bar Chart for GC Results (all activities)") print("=" * 80) print() # Calculate max label length for alignment max_label_length = ( max(len(label) for label in all_labels) if all_labels else 0 ) max_label_length = min( max_label_length, 70 ) # Cap at 70 characters for better display # Find max score for scaling max_score = max(all_scores) if all_scores else 1 bar_width = 50 # Width of bar in characters # Create labeled bar chart with activity/method names header_label = "Activity/Method" header_padding = " " * max(0, max_label_length - len(header_label)) print( f"{header_label}{header_padding} │ Bar Chart" + " " * (bar_width - 9) + "Score" ) print("-" * (max_label_length + bar_width + 20)) # Display all scores in a single chart for label, score in zip(all_labels, all_scores): # Truncate label if needed display_label = label if len(label) > max_label_length: display_label = label[: max_label_length - 3] + "..." # Calculate bar length bar_length = ( int((score / max_score) * bar_width) if max_score > 0 else 0 ) bar = "█" * bar_length # Print label, bar, and score print( f"{display_label:<{max_label_length}} │ {bar:<{bar_width}} {score:.4f}" ) print("-" * (max_label_length + bar_width + 20)) print(f"Max score: {max_score:.4f}") return # Fallback: extract from gc_results if not provided results = self.gc_results activity_results = results["activity_results"] # Create activity mapping similar to main function activities_map = {} activity_keys_map = {} if "activities" in results: for activity in results["activities"]: activities_map[activity] = activity if hasattr(activity, "key"): activities_map[activity.key] = activity activity_keys_map[activity] = activity.key activity_keys_map[activity.key] = activity.key # Map from temp_activities for key in self.temp_activities: activity_obj = get_activity(key) activities_map[activity_obj] = key activities_map[key] = key activity_keys_map[activity_obj] = key activity_keys_map[key] = key # Group by activity activity_scores = {} activity_methods = {} headers = results.get("headers", []) method_col_idx = 1 if "namespace" in headers else 0 for row in activity_results: activity_id = row[-1] if activity_id == "AGGREGATED": continue try: score = float(row[-2]) except (ValueError, TypeError): score = 0.0 if activity_id not in activity_scores: activity_scores[activity_id] = [] activity_methods[activity_id] = [] activity_scores[activity_id].append(score) # Extract method name if len(row) > method_col_idx: method_name = str(row[method_col_idx]) if len(row) > method_col_idx + 1: method_name = f"{row[method_col_idx]}/{row[method_col_idx+1]}" activity_methods[activity_id].append(method_name) # Combine all scores and labels combined_scores = [] combined_labels = [] for activity_id, scores in activity_scores.items(): # Get the activity key for formatting activity_key = None # Check if activity_id is already a key (tuple) if isinstance(activity_id, tuple) and len(activity_id) == 2: activity_key = activity_id # Check if it's an activity object elif hasattr(activity_id, "key"): activity_key = activity_id.key # Check if we have it in our maps elif activity_id in activity_keys_map: activity_key = activity_keys_map[activity_id] elif activity_id in activities_map: mapped = activities_map[activity_id] if isinstance(mapped, tuple) and len(mapped) == 2: activity_key = mapped elif hasattr(mapped, "key"): activity_key = mapped.key # Check if it's a string (legacy API stored activity name as string) elif isinstance(activity_id, str) and activity_id != "AGGREGATED": # Try to find the activity by name in temp_activities for key in self.temp_activities: activity_obj = get_activity(key) if activity_obj.get("name") == activity_id: activity_key = key break # Get string representation of activity if activity_key: # Use format_activity to get full string representation activity_str = self.format_activity(activity_key, max_length=200) elif hasattr(activity_id, "key"): # It's an activity object, use its key activity_str = self.format_activity(activity_id.key, max_length=200) else: # Fallback: try to get activity object and format it try: if hasattr(activity_id, "get") and hasattr(activity_id, "key"): activity_str = self.format_activity( activity_id.key, max_length=200 ) else: activity_str = str(activity_id) except: activity_str = str(activity_id) method_names = activity_methods.get( activity_id, [f"Method {i+1}" for i in range(len(scores))] ) for score, method_name in zip(scores, method_names): combined_scores.append(score) combined_labels.append(f"{activity_str} - {method_name}") if not combined_scores: return print("\n" + "=" * 80) print("ASCII Bar Chart for GC Results (all activities)") print("=" * 80) print() # Calculate max label length for alignment max_label_length = ( max(len(label) for label in combined_labels) if combined_labels else 0 ) max_label_length = min( max_label_length, 70 ) # Cap at 70 characters for better display # Find max score for scaling max_score = max(combined_scores) if combined_scores else 1 bar_width = 50 # Width of bar in characters # Create labeled bar chart with activity/method names header_label = "Activity/Method" header_padding = " " * max(0, max_label_length - len(header_label)) print( f"{header_label}{header_padding} │ Bar Chart" + " " * (bar_width - 9) + "Score" ) print("-" * (max_label_length + bar_width + 20)) # Display all scores in a single chart for label, score in zip(combined_labels, combined_scores): # Truncate label if needed display_label = label if len(label) > max_label_length: display_label = label[: max_label_length - 3] + "..." # Calculate bar length bar_length = int((score / max_score) * bar_width) if max_score > 0 else 0 bar = "█" * bar_length # Print label, bar, and score print( f"{display_label:<{max_label_length}} │ {bar:<{bar_width}} {score:.4f}" ) print("-" * (max_label_length + bar_width + 20)) print(f"Max score: {max_score:.4f}")
[docs] def do_ta(self, arg): """Display top activities if an activity + method are selected.""" if self.activity: if self.method and self.category and self.subcategory: a = get_activity(self.activity) lca = a.lca((self.method, self.category, self.subcategory)) top_a = bwa.ContributionAnalysis().annotated_top_processes(lca) print(tabulate(top_a, headers=["score", "supply", "Activity"])) else: print("Select at least a method first") else: print("Select an activity ")
[docs] def do_te(self, arg): """Display top emissions if an activity + method are selected.""" if self.activity: if self.method and self.category and self.subcategory: a = get_activity(self.activity) lca = a.lca((self.method, self.category, self.subcategory)) if is_legacy_bwa(): top_e = bw2_compat_annotated_top_emissions(lca) else: top_e = bwa.ContributionAnalysis().annotated_top_emissions(lca) print(tabulate(top_e, headers=["score", "supply", "Activity"])) else: print("Select at least a method first") else: print("Select an activity ")
[docs] def do_aa(self, arg): """List all activities in the current database.""" if not self.database: print("Please choose a database first") else: db = Database(self.database) activities = [activity for activity in db] # Sort activities by name if arg and isinstance(arg, str) and arg.lower() == "name": activities.sort(key=lambda a: a.get("name")) activity_keys = [ (self.database, activity["code"]) for activity in activities ] formatted_activities = [self.format_activity(key) for key in activity_keys] self.set_current_options( { "type": "activities", "options": activity_keys, "formatted": formatted_activities, } ) self.print_current_options("Activities in database")
[docs] def do_lprods(self, arg): """List product or chimaera activities in the current database.""" if not self.database: print("Please choose a database first") return product_types = set() for attr in ( "product_node_type", "product_node_types", "chimaera_node_default", "chimaera_node_defaults", ): value = getattr(bd.labels, attr, None) if value is None: continue if isinstance(value, (set, tuple, list)): product_types.update(value) else: product_types.add(value) # Compatibility fallback for older/newer Brightway label APIs. if not product_types: product_types = {"product", "process_with_reference_product"} db = Database(self.database) activities = [ activity for activity in db if activity.get("type") in product_types ] if arg and isinstance(arg, str) and arg.lower() == "name": activities.sort(key=lambda a: a.get("name")) activity_keys = [(self.database, activity["code"]) for activity in activities] formatted_activities = [self.format_activity(key) for key in activity_keys] self.set_current_options( { "type": "activities", "options": activity_keys, "formatted": formatted_activities, } ) self.print_current_options("Product or chimaera activities in database")
[docs] def do_lpam(self, arg): """List all (Project, Database, Activity) parameters.""" re1 = r"(-f\s)?" # -f and a single whitespace Char re2a = r"(-g\s)" # Any Single Whitespace Character 1 re2b = r"(\{.*\})" # Curly Braces 1 rg = re.compile(re1 + re2a + re2b, re.IGNORECASE | re.DOTALL) m = rg.search(arg) full_cols = False the_group = None if m is None and "-g" in arg: print("Missing group in curly braces in command: -g {DANCE} ...") return elif m: c2 = m.group(3) the_group = c2.strip("{}") print("Filtering for group {} after search".format(the_group)) full_cols = m.group(1) is not None if not self.project: print("Please choose a project first") else: pparams, dparams, aparams = self.acquire_params(full_cols, the_group) if len(pparams) > 0: print("Project Parameters") print(tabulate(pparams, headers="keys")) if len(dparams) > 0: print("Database Parameters") print(tabulate(dparams, headers="keys")) if len(aparams) > 0: print("Activity Parameters") print(tabulate(aparams, headers="keys"))
[docs] def do_lpamg(self, arg): """List parameter groups.""" groups = [g for g in Group.select()] self.set_current_options( { "type": "groups", "options": [g.id for g in groups], "formatted": [g.name for g in groups], } ) self.print_current_options("Parameter groups: ")
[docs] def do_ap(self, arg): """If an activity is selected, show its parameters.""" if self.activity: param_objects = ActivityParameter.select().where( (ActivityParameter.code == self.activity[1]) & (ActivityParameter.database == self.database) ) aparams = [] if "-f" not in arg: aparams = self.dehydrate_params( param_objects, ["database", "code", "group", "name", "formula", "amount"], ) else: aparams = [p.dict for p in param_objects] if len(aparams) > 0: print(tabulate(aparams, headers="keys")) else: print( "No Activity parameters for {}".format(get_activity(self.activity)) ) else: print("Please select an activity first")
[docs] def do_dp(self, arg): """List all database parameters.""" if self.database: param_objects = DatabaseParameter.select().where( DatabaseParameter.database == self.database ) dparams = [] if "-f" not in arg: dparams = self.dehydrate_params( param_objects, ["name", "formula", "amount"] ) else: dparams = [p.dict for p in param_objects] if len(dparams) > 0: print(tabulate(dparams, headers="keys")) else: print("No database parameters in database {}".format(self.database)) else: print("Please select a database first")
[docs] def do_pp(self, arg): """List all project parameters.""" if self.project: param_objects = ProjectParameter.select() pparams = [] if "-f" not in arg: pparams = self.dehydrate_params( param_objects, ["database", "name", "formula", "amount"] ) else: pparams = [p.dict for p in param_objects] if len(pparams) > 0: print(tabulate(pparams, headers="keys")) else: print("No project parameters in {}".format(self.project)) else: print("Please select a project first")
[docs] def do_fp(self, arg): """Find a specific parameter by name.""" if self.project: pparams, dparams, aparams = self.acquire_params(False, None) for p in pparams: p.update({"parameter type": "project"}) for p in dparams: p.update({"parameter type": "database"}) for p in aparams: p.update({"parameter type": "activity"}) p = [p for p in pparams + dparams + aparams if p["name"] == arg] if len(p) > 0: print(tabulate(p, headers="keys")) else: print("Please select a project first")
[docs] def do_sp(self, arg): """Search for a parameter by name, accepting wildcards in arg.""" if self.project: pparams_objects = ProjectParameter.select().where( ProjectParameter.name % arg ) dparams_objects = DatabaseParameter.select().where( DatabaseParameter.name % arg ) aparams_objects = ActivityParameter.select().where( ActivityParameter.name % arg ) pparams = self.dehydrate_params( pparams_objects, ["database", "name", "formula", "amount"] ) dparams = self.dehydrate_params( dparams_objects, ["name", "formula", "amount"] ) aparams = self.dehydrate_params( aparams_objects, ["database", "code", "group", "name", "formula", "amount"], ) for p in pparams: p.update({"parameter type": "project"}) for p in dparams: p.update({"parameter type": "database"}) for p in aparams: p.update({"parameter type": "activity"}) p = [p for p in pparams + dparams + aparams] if len(p) > 0: print(tabulate(p, headers="keys")) else: print("Please select a project first")
[docs] def do_add(self, arg): """Add the currently selected activity to the temporary activities list.""" if not self.activity: print("No activity currently selected") else: if self.activity not in self.temp_activities: self.temp_activities.append(self.activity) print( "Added activity to temporary list: %s" % self.format_activity(self.activity) ) print( "Temporary list now contains %d activities" % len(self.temp_activities) ) else: print( "Activity already in temporary list: %s" % self.format_activity(self.activity) )
[docs] def do_clear(self, arg): """Clear the temporary activities list.""" count = len(self.temp_activities) self.temp_activities = [] print("Cleared temporary activities list (%d activities removed)" % count)
[docs] def do_lt(self, arg): """List all activities in the temporary activities list.""" if not self.temp_activities: print("Temporary activities list is empty") else: print( "Temporary activities list (%d activities):" % len(self.temp_activities) ) for index, activity_key in enumerate(self.temp_activities): print("[%d]: %s" % (index, self.format_activity(activity_key)))
[docs] def do_ca(self, arg): """Print the recursive calculation of an LCA, accepting cutoff as arg.""" if all([self.method, self.category, self.subcategory]) and self.activity: if arg is None or arg == "": bwa.print_recursive_calculation( self.activity, (self.method, self.category, self.subcategory) ) else: bwa.print_recursive_calculation( self.activity, (self.method, self.category, self.subcategory), cutoff=float(arg), ) else: print("Please select a method and an activity first.")
[docs] def do_sc(self, arg): """Print the supply chain of an activity, accepting cutoff as arg.""" if self.activity: if arg is None or arg == "": bwa.print_recursive_supply_chain(self.activity) else: bwa.print_recursive_supply_chain( self.activity, cutoff=float(arg), ) else: print("Please select an activity first.")
[docs] def do_pe(self, arg): """show production exchanges if they exist""" if not self.activity: print("Need to choose an activity first") else: es = get_activity(self.activity).exchanges() self.format_exchanges_as_options(es, "production") self.print_current_options("production exchanges")
[docs] def do_pei(self, arg): """show production exchanges if they exist""" if not self.activity: print("Need to choose an activity first") else: prod_ex = [ e for e in get_activity(self.activity).exchanges() if e["type"] == "production" ] print("\n Production Exchange information\n") for e in prod_ex: for prop, value in e.as_dict().items(): print(f"\t {prop}: {value}") print("")
[docs] def bw2_compat_annotated_top_emissions(lca, names=True, **kwargs): """Get list of most damaging biosphere flows in an LCA, sorted by ``abs(direct impact)``. # noqa: E501 Returns a list of tuples: ``(lca score, inventory amount, activity)``. If ``names`` is False, they returns the process key as the last element. # noqa: E501 """ # This is a temporary fix, until # https://github.com/brightway-lca/brightway2-analyzer/issues/27 # gets correctly handled for bw2 branch # The only difference in the actual code is the casting of indices to ints. print("Using compat mode annotated_top_emissions") ra, rp, rb = lca.reverse_dict() results = [ (score, lca.inventory[int(index), :].sum(), rb[int(index)]) for score, index in bwa.ContributionAnalysis().top_emissions( lca.characterized_inventory, **kwargs ) ] if names: results = [(x[0], x[1], get_activity(x[2])) for x in results] return results
[docs] def is_legacy_bwa(): return bwa.__version__[0] == 0 and bwa.__version__[1] == 10
[docs] def is_legacy_bc(): return isinstance(bc.__version__, tuple)
[docs] def is_legacy_bd(): if isinstance(bd_version, tuple): return True elif isinstance(bd_version, str) and version.parse(bd_version) < version.parse( FTS5_ENABLED_BD_VERSION ): return True return False
[docs] def has_namespaced_methods(): return len(list(methods)[0]) >= 4
[docs] def search_bw2(search_criterion, criterion_value, database, needle, search_limit): """Search and then filter by criteria.""" if needle is None: needle = "" if search_criterion and criterion_value: if ( search_criterion == "location" or search_criterion == "reference product" or search_criterion == "CAS number" ): if needle: results = Database(database).search(needle, limit=search_limit) else: results = Database(database) results = [ r for r in results if r.get(search_criterion) and r.get(search_criterion, "").casefold() == criterion_value.casefold() ] elif search_criterion == "category": criterion = tuple(map(lambda x: x.casefold(), criterion_value.split("::"))) if needle: results = Database(database).search(needle, limit=search_limit) else: results = Database(database) results = [ r for r in results if r.get("categories") and tuple(map(lambda x: x.casefold(), r.get("categories"))) == criterion ] else: results = Database(database).search(needle, limit=search_limit) return results
[docs] def main(): arguments = docopt(__doc__, version="Brightway2 Activity Browser 2.0") activitybrowser = ActivityBrowser() activitybrowser._init( project=arguments["<project>"], database=arguments["<database>"], activity=arguments["<activity-id>"], ) activitybrowser.cmdloop()
if __name__ == "__main__": main()