#!/usr/bin/env python
# encoding: utf-8
"""Brightway2 database and activity browser.
Developed by Bernhard Steubing and Chris Mutel, 2013
Usage:
bw2-browser
bw2-browser <project>
bw2-browser <project> <database>
bw2-browser <project> <database> <activity-id>
Options:
-h --help Show this screen.
--version Show version.
"""
from __future__ import print_function, unicode_literals
import cmd
import codecs
import itertools
import math
import os
import pprint
import re
import shlex
import textwrap
import time
import traceback
import uuid
import warnings
import bw2analyzer as bwa
import bw2calc as bc
import bw2data as bd
from bw2data import __version__ as bd_version
from bw2data import (
Database,
Method,
calculation_setups,
config,
databases,
get_activity,
methods,
projects,
)
from bw2data.errors import UnknownObject
from packaging import version
if (
bc.__version__
and isinstance(bc.__version__, str)
and version.parse(bc.__version__) >= version.parse("2.0.DEV10")
):
from bw2data import get_multilca_data_objs
from bw2data.parameters import (
ActivityParameter,
DatabaseParameter,
Group,
ProjectParameter,
)
from docopt import docopt
from tabulate import tabulate
from bw2io import backup_project_directory, restore_project_directory
warnings.filterwarnings("ignore", ".*Read only project.*")
[docs]
# NOTE(review): presumably the minimum bw2data version for which FTS5-based
# search is enabled; the constant is not used in the visible part of the
# file, so confirm against its consumers.
FTS5_ENABLED_BD_VERSION = "4.0.dev47"
[docs]
# Complaint prefixes, served round-robin; ``ActivityBrowser.default`` puts
# the next one in front of any input it cannot interpret.
_GRUMPY_PREFIXES = (
    "This makes no damn sense: ",
    "My mule has more sense than this: ",
    "If 50 million people say a foolish thing, it is still a foolish thing: ",
    "I have had enough of this kind of thing: ",
    "What are you talking about? ",
    "Are you kidding me? What is this: ",
)
GRUMPY = itertools.cycle(_GRUMPY_PREFIXES)
[docs]
# Remarks served round-robin by ``ActivityBrowser.emptyline`` when the user
# submits an empty line.
_QUIET_REMARKS = (
    "You say it best when you say nothing at all...",
    "Let us be silent, that we may hear the whispers of the gods.",
    "Actions speak louder than words. But you didn't use either!",
    "We have ways of making you talk, Mr. Bond!",
    "Brevity is the soul of wit. But you can take it too far!",
    "Do not underestimate the determination of a quiet man.",
)
QUIET = itertools.cycle(_QUIET_REMARKS)
[docs]
# Help screen printed on start-up (by ``_init``) and by the ``help`` command.
# Backslash-newline pairs inside the literal join long entries into one line.
HELP_TEXT = """
This is a simple way to browse databases and activities in Brightway2.
The following commands are available:
Basic commands:
?: Print this help screen.
quit, q: Exit the activity browser.
number: Go to option number when a list of options is present.
l: List current options.
n: Go to next page in paged options.
p: Go to previous page in paged options.
p number: Go to page number in paged options.
h: List history of databases and activities viewed.
wh: Write history to a text file.
autosave: Toggle autosave behaviour on and off.
Working with projects:
lpj: List available projects.
backup [directory]: Backup the current project. Optionally specify a target \
directory. If no directory is specified, the backup will be created in the \
projects directory with a timestamp.
restore <archive> [options]: Restore a project from an archive. Supported \
options: --project NAME, --overwrite.
Working with databases:
ldb: List available databases.
db name: Go to database name. No quotes needed.
s [string]: Search activity names in current database with string. Without string \
the search provides no results.
s -loc {LOCATION} [string]: Search activity names in current database with \
string and location LOCATION.
s -cat {CAT::SUBCAT::SUBSUBCAT} [string]: Search activity names in current \
database with string and category CAT, SUBCAT, SUBSUBCAT [useful for biosphere].
s -rp {REFERENCE PRODUCT} [string]: Search activities in current database that \
have reference product and optionally match string in search.
Working with activities:
a id: Go to activity id in current database. Complex ids in quotes.
aa: List all activities in current database. aa name sorts the activities \
by name.
lprods: List product activities only (product and process_with_reference_product).
i: Info on current activity.
ii: Extended Info on current activity.
r: Choose a random activity from current database.
u: List upstream activities (inputs for the current activity).
up: List upstream activities with pedigree info if avail (inputs for the current \
activity).
uu: List upstream activities with formula info if avail.
un: display uncertainty information of upstream activities if avail.
d: List downstream activities (activities which consume current activity).
b: List biosphere flows for the current activity.
pe: List production exchanges for current activity.
pei: show the information of the production exchange of the current activity.
cfs: Show characterization factors for current activity and current method.
G: if a method and activity are selected, do an lcia of the activity.
ta: if an lcia of the activity has been done, list top activities.
te: if an lcia of the activity has been done, list top emissions.
ca: do a contribution analysis of an activity with a method.
sc: print recursive supply chain of an activity.
Working with methods:
lm: List methods.
mi: Show method metadata. (must select method/category/subcategory first)
Working with parameters:
lpam: List all parameters (Project, Database and Activity) showing only \
basic columns (data).
lpam [-f]: List all parameters showing all columns (data) of each parameter.
lpam [-f] -g {YY}: List parameters for a specific group. Use db or specific \
data. add as first option -f to show all columns.
lpamg: show parameter groups
ap [-f]: If an activity is selected, show activity parameters
dp [-f]: if a database is selected show database parameters
pp [-f]: If a project is selected show project parameters
fp : Find parameters (Project, Database or Activity) by name
sp : search a parameter (accepts wildcards)
Misc:
tsv: [filename] export latest table to tsv file (e.g.: results or cfs)
GC: Start group contribution mode for comparing multiple activities.
add: Add current activity to the list for group contribution/compare analysis.
list: List all activities added for group contribution/compare analysis.
clear: Clear (empty) the list of added activities for group contribution/compare.
GCH: Show the results table from the last group contribution/compare analysis.
"""
[docs]
def get_autosave_text(autosave):
    """Return ``"on"`` for a truthy ``autosave`` flag and ``"off"`` otherwise."""
    if autosave:
        return "on"
    return "off"
[docs]
class ActivityBrowser(cmd.Cmd):
"""A command line based Activity Browser for brightway2."""
[docs]
def _init(self, project=None, database=None, activity=None, method=None):
    """Provide initial data.

    Prints the help screen, restores saved preferences from ``config.p``
    and then loads the requested (or saved) project, database, activity
    and method, in that order.

    Can't override __init__, because this is an old style class
    i.e. there is no support for ``super``."""
    # Have to print into here; otherwise only print during ``cmdloop``
    if config.p.get("ab_activity", None):
        # Must be tuple, not a list
        config.p["ab_activity"] = tuple(config.p["ab_activity"])
    print(HELP_TEXT + "\n" + self.format_defaults())
    # Number of options shown per page by ``print_current_options``.
    self.page_size = 20
    self.search_limit = config.p.get("search_limit", 100)
    # Needs ``page_size``; resets paging and installs the empty option set.
    self.set_current_options(None)
    self.autosave = config.p.get("ab_autosave", False)
    self.history = self.reformat_history(config.p.get("ab_history", []))
    # The loaders below may append to ``self.history``, so it is set first.
    self.load_project(project)
    self.load_database(database)
    self.load_activity(activity)
    self.load_method(method)
    self.temp_activities = []
    self.gc_results = None  # Store GC command results for GCH command
    self.update_prompt()
######################
# Options management #
######################
[docs]
def choose_option(self, opt):
    """Go to option ``opt`` of the currently listed options.

    ``opt`` is anything ``int()`` accepts. The matching entry of
    ``self.current_options["options"]`` is passed to the chooser that
    belongs to the option type; history entries are dispatched on the
    kind they were recorded with.

    Input that is not a number prints the conversion hint and the option
    list. Indices outside ``0 .. len - 1`` (negative ones included) are
    rejected instead of wrapping around to the end of the list. An error
    raised by a chooser is reported with its traceback only.
    """
    try:
        index = int(opt)
    except (TypeError, ValueError):
        print("Can't convert %(o)s to number.\nCurrent options are:" % {"o": opt})
        self.print_current_options()
        return

    kind = self.current_options.get("type")
    if not kind:
        # No current options.
        print("No current options to choose from")
        return
    if index < 0 or index >= len(self.current_options.get("formatted", [])):
        print("There aren't this many options")
        return

    # Option type -> name of the method that selects one such option.
    choosers = {
        "method_namespaces": "choose_method_namespace",
        "methods": "choose_method",
        "categories": "choose_category",
        "subcategories": "choose_subcategory",
        "projects": "choose_project",
        "databases": "choose_database",
        "activities": "choose_activity",
        "groups": "choose_group",
    }
    # History entry kind -> name of the method that replays it.
    history_choosers = {
        "project": "choose_project",
        "database": "choose_database",
        "activity": "choose_activity",
        "method_namespace": "choose_method_namespace",
        "method": "choose_method",
        "category": "choose_category",
        "subcategory": "choose_subcategory",
    }
    try:
        option = self.current_options["options"][index]
        if kind == "history":
            chooser = history_choosers.get(option[0])
            if chooser is not None:
                getattr(self, chooser)(option[1])
        elif kind in choosers:
            getattr(self, choosers[kind])(option)
        else:
            print("No current options to choose from")
    except Exception:
        # Keep the shell alive, but show what actually went wrong.
        print(traceback.format_exc())
[docs]
def print_current_options(self, label=None):
    """Print the current options as a numbered list.

    An optional ``label`` is printed as a heading. When paging is active
    (``self.max_page`` is non-zero) only the current page is shown,
    numbered with the options' absolute indices, followed by a paging hint.
    """
    print("")
    if label:
        print(label + "\n")
    entries = self.current_options.get("formatted", [])
    paged = bool(self.max_page)
    if not entries:
        print("Empty list")
    else:
        if paged:
            # Paging needed
            offset = self.page * self.page_size
            shown = entries[offset : offset + self.page_size]
        else:
            offset = 0
            shown = entries
        for position, text in enumerate(shown, start=offset):
            print("[%(index)i]: %(option)s" % {"option": text, "index": position})
        if paged:
            hint = (
                "\nPage %(page)i of %(maxp)s. Use n (next page) and p "
                "(previous page) to navigate."
            )
            print(hint % {"page": self.page, "maxp": self.max_page})
    print("")
[docs]
def set_current_options(self, options):
    """Install ``options`` as the current option set and rewind to page 0.

    ``None`` installs the empty set ``{"type": None}`` with no pages;
    otherwise the page count is derived from the number of formatted
    entries and ``self.page_size``.
    """
    self.page = 0
    if options is not None:
        page_count = math.ceil(len(options["formatted"]) / self.page_size)
        self.max_page = int(page_count)
    else:
        options = {"type": None}
        self.max_page = 0
    self.current_options = options
####################
# Shell management #
####################
[docs]
def update_prompt(self):
    """Rebuild ``self.prompt`` from the current selection.

    The prompt names the project, then the database and activity where
    selected, followed by the selected method path in square brackets
    and finally the ``>> `` invite.
    """
    self.invite = ">> "
    self.prompt = ""
    if self.activity:
        # Space left for the activity name on the line.
        room = 76 - 8 - len(self.database)
        dataset = get_activity(self.activity)
        label = dataset.get("name", "Unknown")
        cats = dataset.get("categories", []) or []
        if len(label) > room:
            label = label[:room]
        values = {
            "pj": self.project,
            "db": self.database,
            "n": label,
            "categories": cats,
        }
        self.prompt = "%(pj)s@(%(db)s) %(n)s %(categories)s" % values
    elif self.database:
        values = {"pj": self.project, "name": self.database}
        self.prompt = "%(pj)s@(%(name)s) " % values
    elif self.project:
        self.prompt = "%(pj)s " % {"pj": self.project}

    if self.method:
        if not self.category:
            method_part = "[%(method)s/] " % {"method": self.method}
        elif not self.subcategory:
            method_part = "[%(method)s/%(category)s] " % {
                "method": self.method,
                "category": self.category,
            }
        else:
            method_part = "[%(method)s/%(category)s/%(subcategory)s] " % {
                "method": self.method,
                "category": self.category,
                "subcategory": self.subcategory,
            }
        self.prompt += method_part
    self.prompt += self.invite
##############
# Formatting #
##############
[docs]
def format_history(self, command):
    """Return the display text for one ``(kind, object)`` history entry."""
    kind, obj = command
    if kind == "activity":
        return "Act: %(act)s" % {"act": self.format_activity(obj)}
    if kind == "database":
        return "Db: %(name)s" % {"name": obj}
    # Every other kind is shown as recorded.
    return f"{kind}: {obj}"
[docs]
def reformat_history(self, json_data):
    """Convert lists to tuples (from JSON serialization).

    Activity entries become ``("activity", key_tuple)``; every other
    entry is turned into a plain tuple.
    """
    restored = []
    for entry in json_data:
        if entry[0] == "activity":
            restored.append((entry[0], tuple(entry[1])))
        else:
            restored.append(tuple(entry))
    return restored
[docs]
def print_cfs(self, current_methods, activity=None):
    """Print cfs for a list of methods, and optionally only for an activity.

    ``current_methods`` is an iterable of method keys. When ``activity``
    is given and the current database name contains "biosphere", only the
    characterization factors of that flow are listed. The table is printed
    and its TSV rendering is kept in ``self.tabulate_data`` for ``do_tsv``.
    """

    def flow_of(cf):
        # in bw2, the first element of the cf data is a key -> tuple('db', 'id')
        # in bw25, the first element is single int id of the activity
        # this looks hackish, but it allows to keep 1 code-base for both
        # versions of bw (bw2 & bw25)
        if isinstance(cf[0], int):
            return get_activity(cf[0])
        return get_activity((cf[0][0], cf[0][1]))

    # Loop invariants: evaluated once instead of once per CF row.
    namespaced = has_namespaced_methods()
    only_this_flow = bool(activity) and "biosphere" in self.database

    table_lines = []
    for m in current_methods:
        method_ = Method(m)
        unit = method_.metadata["unit"]
        for cf in method_.load():
            flow = flow_of(cf)
            if only_this_flow and flow.key != activity:
                continue
            flow_cat_tup = flow.get("categories", ("",)) or ("",)
            flow_cat = flow_cat_tup[0]
            # The second entry is the subcategory whenever there is one.
            flow_subcat = flow_cat_tup[1] if len(flow_cat_tup) > 1 else None
            if namespaced:
                method_cols = [m[0], m[1], m[2], m[3]]
            else:
                method_cols = [m[0], m[1], m[2]]
            table_lines.append(
                method_cols + [cf[1], flow["name"], flow_cat, flow_subcat, unit]
            )

    if not table_lines:
        print("Not characterized by method")
        return

    headers = [
        "method",
        "category",
        "indicator",
        "cf",
        "flow",
        "flow_category",
        "flow_subcategory",
        "unit",
    ]
    if namespaced:
        headers.insert(0, "namespace")
    print("CFS")
    self.tabulate_data = tabulate(table_lines, headers=headers, tablefmt="tsv")
    print(tabulate(table_lines, headers=headers))
#######################
# Project management #
#######################
[docs]
def choose_project(self, project):
    """Switch to ``project`` (opened read-only) and list its databases.

    Does nothing when ``project`` is already current. Otherwise the
    database and activity selection is dropped and, with autosave on,
    the choice is persisted together with the last ten history entries.
    """
    if self.project == project:
        return

    self.project = project
    projects.set_current(self.project, writable=False)
    self.history.append(("project", project))

    if self.autosave:
        config.p["ab_project"] = self.project
        config.p["ab_history"] = self.history[-10:]
        config.save_preferences()

    self.set_current_options(None)
    # A new project invalidates whatever was selected before.
    self.activity = None
    self.database = None
    self.list_databases()
    self.update_prompt()
[docs]
def load_project(self, project):
    """Select the start-up project.

    A named project is used when it exists; an unknown name is reported
    and handled like no name at all. Without a name the saved
    ``ab_project`` preference is used, and failing that the project is
    unset and the available projects are listed.
    """
    if not project:
        if config.p.get("ab_project", False):
            self.project = config.p["ab_project"]
        else:
            self.project = None
            self.list_projects()
        return

    if project in projects:
        self.project = project
        projects.set_current(self.project, writable=False)
    else:
        print("Project %(name)s not found" % {"name": project})
        self.load_project(None)
[docs]
def list_projects(self):
    """Offer every known project as a selectable option and print the list."""
    names = [entry.name for entry in projects]
    labels = ["%(name)s" % {"name": name} for name in names]
    listing = {"type": "projects", "options": names, "formatted": labels}
    self.set_current_options(listing)
    self.print_current_options("Projects")
#######################
# Database management #
#######################
[docs]
def choose_database(self, database):
    """Make ``database`` current and record the choice in the history.

    The current activity is kept when it lives in ``database``; otherwise
    the saved ``ab_activity`` is selected if it belongs to ``database``,
    and failing that the activity is unset.
    """
    already_there = self.activity and self.activity[0] == database
    if not already_there:
        if config.p.get("ab_activity", [0, 0])[0] == database:
            self.choose_activity(config.p["ab_activity"])
        else:
            self.unknown_activity()

    self.database = database
    self.history.append(("database", database))
    if self.autosave:
        config.p["ab_database"] = self.database
        config.p["ab_history"] = self.history[-10:]
        config.save_preferences()
    self.set_current_options(None)
    self.update_prompt()
[docs]
def load_database(self, database):
    """Select the start-up database.

    A named database is used when it exists; an unknown name is reported
    and handled like no name at all. Without a name the saved
    ``ab_database`` preference is used, and failing that the database
    is unset.
    """
    if not database:
        if config.p.get("ab_database", False):
            self.database = config.p["ab_database"]
        else:
            self.database = None
        return

    if database in databases:
        self.database = database
    else:
        print("Database %(name)s not found" % {"name": database})
        self.load_database(None)
[docs]
def list_databases(self):
    """Offer the databases, sorted by name, as options and print the list.

    Each entry shows the ``number`` recorded in the database metadata,
    or "unknown" when that is missing.
    """
    names = sorted(databases.list)
    labels = []
    for name in names:
        count = databases[name].get("number", "unknown")
        labels.append(
            "%(name)s (%(number)s activities/flows)" % {"name": name, "number": count}
        )
    self.set_current_options(
        {"type": "databases", "options": names, "formatted": labels}
    )
    self.print_current_options("Databases")
#######################
# Activity management #
#######################
[docs]
def load_activity(self, activity):
    """Load given or default activity on start.

    A string is taken as an activity code in the current database;
    otherwise the saved ``ab_activity`` is restored if there is one,
    and failing that the activity is unset.
    """
    if isinstance(activity, str):
        # Input parameter
        key = (self.database, activity)
        self.choose_activity(key)
        return

    if config.p.get("ab_activity", None):
        self.choose_activity(config.p["ab_activity"], restored=True)
    else:
        self.unknown_activity()
[docs]
def choose_activity(self, key, restored=False):
    """Make the activity ``key`` (and its database) current.

    The choice is added to the history. With autosave on it is also
    persisted, unless ``restored`` marks it as coming from the saved
    preferences in the first place.
    """
    self.database, self.activity = key[0], key
    self.history.append(("activity", key))
    persist = self.autosave and not restored
    if persist:
        config.p["ab_activity"] = key
        config.p["ab_history"] = self.history[-10:]
        config.save_preferences()
    self.set_current_options(None)
    self.update_prompt()
[docs]
def get_downstream_exchanges(self, activity):
    """Get the exchanges that consume this activity's product.

    Returns one dict per consuming exchange (type, consumer as ``input``,
    amount, consumer code as ``key``, name), sorted by name. Exchanges
    whose output is the activity itself are left out.
    """
    producer = get_activity(activity)
    consumers = [
        {
            "type": exc.get("type", "Unknown"),
            "input": exc["output"],
            "amount": exc["amount"],
            "key": exc["output"][1],
            "name": exc.get("name", "Unknown"),
        }
        for exc in producer.upstream()
        if producer == exc["input"] and not producer == exc["output"]
    ]
    return sorted(consumers, key=lambda row: row["name"])
[docs]
def unknown_activity(self):
    """Unset the current activity."""
    setattr(self, "activity", None)
########################
# Method management #
########################
[docs]
def load_method(self, method):
    """Select the start-up method.

    ``method`` is a method key; its first two elements become the current
    method and category. An unknown key is reported and handled like no
    key at all, in which case the saved ``ab_method`` preference is used
    if present.

    Every method-related attribute is given a value on every path, so
    later readers (``update_prompt``, ``do_cfs``, ``choose_subcategory``)
    never hit a missing attribute.
    """
    self.method = None
    self.category = None
    self.subcategory = None
    if not hasattr(self, "method_namespace"):
        # Nothing else initialises the namespace before it is first read.
        self.method_namespace = None

    if method:
        if method not in methods:
            print("Method %(name)s not found" % {"name": method})
            self.load_method(None)
        else:
            self.method = method[0]
            self.category = method[1]
    elif config.p.get("ab_method", False):
        self.method = config.p["ab_method"]
[docs]
def list_methods(self):
    """List the first element of every method key as selectable options.

    With namespaced methods these are offered as method namespaces,
    otherwise as methods. Without a project the options are cleared.
    """
    if not self.project:
        self.set_current_options(None)
    else:
        all_keys = sorted(methods)
        top_level = sorted({key[0] for key in all_keys})
        if len(all_keys) > 0 and has_namespaced_methods():
            kind, title = "method_namespaces", "Method namespaces"
        else:
            kind, title = "methods", "Methods"
        self.set_current_options(
            {
                "type": kind,
                "options": list(top_level),
                "formatted": ["%(name)s" % {"name": name} for name in top_level],
            }
        )
        self.print_current_options(title)
    self.update_prompt()
[docs]
def choose_method_namespace(self, method_namespace):
    """Select a method namespace and list the methods it contains.

    Resets method, category and subcategory, records the choice in the
    history and, with autosave on, persists the namespace.
    """
    self.method_namespace = method_namespace
    self.method = self.category = self.subcategory = None
    self.history.append(("method_namespace", method_namespace))
    if self.autosave:
        # Store the namespace itself; ``self.method`` was just reset to None.
        config.p["ab_method_namespace"] = self.method_namespace
        config.p["ab_history"] = self.history[-10:]
        config.save_preferences()
    method_names = sorted({m[1] for m in methods if m[0] == method_namespace})
    self.set_current_options(
        {
            "type": "methods",
            "options": list(method_names),
            "formatted": ["%(name)s" % {"name": name} for name in method_names],
        }
    )
    self.print_current_options("Methods")
    self.update_prompt()
[docs]
def choose_method(self, method):
    """Select ``method`` and list its categories.

    Resets category and subcategory, records the choice in the history
    and, with autosave on, persists it.
    """
    self.method = method
    self.category = self.subcategory = None
    self.history.append(("method", method))
    if self.autosave:
        config.p["ab_method"] = self.method
        config.p["ab_history"] = self.history[-10:]
        config.save_preferences()

    all_keys = sorted(methods)
    if has_namespaced_methods():
        # Namespaced keys carry the method in second position.
        found = {
            key[2]
            for key in all_keys
            if key[0] == self.method_namespace and key[1] == method
        }
    else:
        found = {key[1] for key in all_keys if key[0] == method}
    categories = sorted(found)

    self.set_current_options(
        {
            "type": "categories",
            "options": list(categories),
            "formatted": ["%(name)s" % {"name": name} for name in categories],
        }
    )
    self.print_current_options("Categories")
    self.update_prompt()
[docs]
def choose_category(self, category):
    """Select ``category`` of the current method and list its subcategories.

    Records the choice in the history and, with autosave on, persists it.
    The subcategories are offered in sorted order, like the lists built by
    the other choosers, so option numbers are stable between runs.
    """
    self.category = category
    self.history.append(("category", category))
    if self.autosave:
        config.p["ab_category"] = self.category
        config.p["ab_history"] = self.history[-10:]
        config.save_preferences()

    if has_namespaced_methods():
        found = {
            m[3]
            for m in methods
            if m[0] == self.method_namespace
            and m[1] == self.method
            and m[2] == category
        }
    else:
        found = {m[2] for m in methods if m[0] == self.method and m[1] == category}
    subcategories = sorted(found)

    self.set_current_options(
        {
            "type": "subcategories",
            "options": list(subcategories),
            "formatted": ["%(name)s" % {"name": name} for name in subcategories],
        }
    )
    self.print_current_options("Subcategories")
    self.update_prompt()
[docs]
def choose_subcategory(self, subcategory):
    """Select ``subcategory`` and record the choice in the history.

    When the current activity lives in a biosphere database, the
    characterization factors of that flow for the now fully specified
    method are printed.
    """
    self.subcategory = subcategory
    self.history.append(("subcategory", subcategory))
    # using ecoinvent_interface creates biosphere dbs that are not only called
    # "biosphere3" so we test now only against a substring, not the exact name
    if (
        self.activity and "biosphere" in self.database
    ):  # TODO: recover generic name instead of hard coded one
        # The namespace only exists once one has been chosen; treat a
        # missing attribute like "no namespace".
        namespace = getattr(self, "method_namespace", None)
        if namespace:
            mkey = (namespace, self.method, self.category, self.subcategory)
        else:
            mkey = (self.method, self.category, self.subcategory)
        self.print_cfs([mkey], self.activity)
    self.update_prompt()
#################################
# GROUP / Parameters Management #
#################################
[docs]
def dehydrate_params(self, params, fields):
    """Reduce each parameter's ``dict`` to the entries named in ``fields``."""
    trimmed = []
    for param in params:
        kept = {key: value for key, value in param.dict.items() if key in fields}
        trimmed.append(kept)
    return trimmed
[docs]
def acquire_params(self, full_cols, the_group):
    """Return ``(project, database, activity)`` parameter rows as dicts.

    ``full_cols`` keeps every column; otherwise only the basic columns
    are kept. ``the_group`` narrows the result: "project" (any case)
    keeps project parameters only, any other name keeps the database
    parameters of that database and the activity parameters of that group.
    """
    if full_cols:

        def rows_of(model, _columns):
            return [row.dict for row in model.select()]

    else:

        def rows_of(model, columns):
            return self.dehydrate_params(model.select(), columns)

    pparams = rows_of(ProjectParameter, ["name", "formula", "amount"])
    dparams = rows_of(DatabaseParameter, ["database", "name", "formula", "amount"])
    aparams = rows_of(
        ActivityParameter,
        ["database", "code", "group", "name", "formula", "amount"],
    )

    if not the_group:
        return pparams, dparams, aparams
    if the_group.lower() == "project":
        return pparams, [], []
    return (
        [],
        [row for row in dparams if row["database"] == the_group],
        [row for row in aparams if row["group"] == the_group],
    )
[docs]
def choose_group(self, group_id):
    """Print the basic parameter columns of the group with id ``group_id``.

    Only non-empty parameter tables are printed; afterwards the current
    options are cleared.
    """
    group = Group.get_by_id(group_id)
    pparams, dparams, aparams = self.acquire_params(False, group.name)
    sections = (
        ("Project Parameters", pparams),
        ("Database Parameters", dparams),
        ("Activity Parameters", aparams),
    )
    for title, rows in sections:
        if len(rows) > 0:
            print(title)
            print(tabulate(rows, headers="keys"))
    self.set_current_options(None)
########################
# Default user actions #
########################
[docs]
def default(self, line):
    """No ``do_foo`` command - try to select from options.

    With options on screen the line is read as an option number; anything
    that fails, or any input while no options are listed, earns a grumpy
    reply.
    """
    if not self.current_options["type"]:
        print(next(GRUMPY) + line)
        return
    try:
        self.choose_option(int(line))
    except Exception:
        print(next(GRUMPY) + line)
[docs]
def emptyline(self):
    """No command entered!

    Prints the next remark from ``QUIET`` followed by a pointer to the help.
    """
    remark = next(QUIET)
    print(remark + "\n(? for help)")
#######################
# Custom user actions #
#######################
[docs]
def do_a(self, arg):
    """Go to activity id ``arg``.

    An integer ``arg`` is looked up as a numerical id (used in bw25) and
    needs no current database. Anything else is taken as an activity code
    in the current database; without a database the command stops after
    asking for one.
    """
    try:
        # Support the use of int ids (used in bw25)
        activity_ref = int(arg)
    except ValueError:
        if not self.database:
            print("Please choose a database first")
            return
        activity_ref = (self.database, arg)
    try:
        activity = get_activity(activity_ref)
    except UnknownObject:
        print(f"Invalid activity id {arg}")
    else:
        self.choose_activity(activity.key)
[docs]
def do_autosave(self, arg):
    """Toggle autosave behaviour.

    If autosave is on, the current database or activity is written to
    config.p each time it changes. The new setting itself is saved
    immediately and reported.
    """
    enabled = not self.autosave
    self.autosave = enabled
    config.p["ab_autosave"] = self.autosave
    config.save_preferences()
    print("Autosave is now %s" % get_autosave_text(self.autosave))
[docs]
def do_b(self, arg):
    """List biosphere flows of the current activity as options."""
    if not self.activity:
        print("Need to choose an activity first")
        return
    exchanges = get_activity(self.activity).exchanges()
    self.format_exchanges_as_options(exchanges, "biosphere")
    self.print_current_options("Biosphere flows")
[docs]
def do_cfs(self, arg):
    """Print cfs of biosphere flows or method.

    Needs a selected biosphere flow, a method or a method namespace;
    otherwise it reports that nothing is selected and returns ``False``.
    The methods are narrowed step by step by namespace, method, category
    and subcategory, each filter refining the previous one. With only a
    biosphere flow selected, all methods are considered.
    """
    # The namespace attribute only exists once a namespace has been chosen.
    namespace = getattr(self, "method_namespace", None)
    # Support multiple biosphere databases in one project
    flow_selected = bool(self.activity) and "biosphere" in self.database
    if not (flow_selected or self.method or namespace):
        print("No method currently selected")  # Alternative: cfs for all methods?
        return False

    namespaced = has_namespaced_methods()
    # Namespaced method keys carry one extra leading element.
    namespace_shift = 1 if namespaced else 0

    current_methods = list(methods)
    if namespaced and namespace:
        current_methods = [m for m in current_methods if m[0] == namespace]
    if self.method:
        current_methods = [
            m for m in current_methods if m[0 + namespace_shift] == self.method
        ]
    if self.category:  # show cfs for current cat, current act
        current_methods = [
            m for m in current_methods if m[1 + namespace_shift] == self.category
        ]
    if self.subcategory:
        current_methods = [
            m for m in current_methods if m[2 + namespace_shift] == self.subcategory
        ]
    self.print_cfs(current_methods, self.activity)
    self.update_prompt()
[docs]
def do_tsv(self, arg):
    """Write the latest table created as tsv file.

    ``arg`` is the target file name; "output.tsv" is used when it is
    empty. If no table has been produced yet, nothing is written and a
    short notice is printed instead.
    """
    output_filename = arg if arg else "output.tsv"
    # ``tabulate_data`` only exists after a command has produced a table.
    table = getattr(self, "tabulate_data", None)
    if not table:
        print("No table to export yet")
        return
    # Explicit encoding: the table may contain non-ASCII flow names.
    with open(output_filename, "w", encoding="utf-8") as f:
        f.write(table)
[docs]
def do_cp(self, arg):
    """Clear preferences. Only for development.

    Removes every browser preference that is present in ``config.p``
    (absent keys are skipped instead of raising ``KeyError``), saves the
    preferences and resets the whole current selection.
    """
    self.autosave = False
    saved_keys = (
        "ab_autosave",
        "ab_project",
        "ab_method_namespace",
        "ab_method",
        "ab_category",
        "ab_database",
        "ab_activity",
        "ab_history",
    )
    for key in saved_keys:
        if key in config.p:
            del config.p[key]
    config.save_preferences()
    self.project = self.database = self.activity = None
    self.method_namespace = None
    self.method = self.category = self.subcategory = None
    self.update_prompt()
[docs]
def do_d(self, arg):
    """Load downstream activities (consumers of the current activity)."""
    if not self.activity:
        print("Need to choose an activity first")
        return
    dataset = get_activity(self.activity)
    unit = dataset.get("unit", "")
    consumers = self.get_downstream_exchanges(self.activity)
    self.format_exchanges_as_options(consumers, "technosphere", unit)
    self.print_current_options("Downstream consumers")
[docs]
def do_db(self, arg):
    """Switch to a different database.

    Echoes ``arg`` and switches to it when it names a known database;
    otherwise reports it as invalid.
    """
    print(arg)
    if arg in databases:
        self.choose_database(arg)
    else:
        print("'%(db)s' not a valid database" % {"db": arg})
[docs]
def do_h(self, arg):
    """Pretty print history of databases & activities, newest first.

    The entries become the current options, so one can be re-selected
    by its number.
    """
    newest_first = self.history[::-1]
    labels = [self.format_history(entry) for entry in newest_first]
    self.set_current_options(
        {"type": "history", "options": newest_first, "formatted": labels}
    )
    self.print_current_options("Browser history")
[docs]
def do_help(self, args):
    """Print the help screen; ``args`` is ignored."""
    help_screen = HELP_TEXT
    print(help_screen)
[docs]
def do_i(self, arg):
    """Info on current activity.

    Prints name, database, id, product, production amount, location,
    classifications and the number of technosphere inputs, biosphere
    flows and downstream consumers of the current activity.

    TODO: Colors could be improved."""
    if not self.activity:
        print("No current activity")
    else:
        ds = get_activity(self.activity)
        # Exchanges whose input is the activity itself.
        prod = [x for x in ds.exchanges() if x["input"] == self.activity]
        # Amount: explicit "production amount", else the single
        # self-referencing exchange, else 1.0.
        if "production amount" in ds and ds["production amount"]:
            amount = ds["production amount"]
        elif len(prod) == 1:
            amount = prod[0]["amount"]
        else:
            amount = 1.0
        print(
            """\n%(name)s
Database: %(database)s
ID: %(id)s
numerical_id: %(n_id)s
Product: %(product)s
Production amount: %(amount).2g %(unit)s
Location: %(location)s
Classifications:
%(classifications)s
Technosphere inputs: %(tech)s
Biosphere flows: %(bio)s
Reference flow used by: %(consumers)s\n"""
            % {
                "name": ds.get("name", "Unknown"),
                # Falls back to the activity name when no reference product is set.
                "product": ds.get("reference product") or ds.get("name", "Unknown"),
                "database": self.activity[0],
                "id": self.activity[1],
                # numerical ids are a feature of bw25
                "n_id": ds.get("id") or "NA",
                "amount": amount,
                "unit": ds.get("unit", ""),
                # NOTE(review): assumes classifications is a sequence of
                # (scheme, value) pairs — confirm.
                "classifications": "\n\t\t\t".join(
                    [
                        "{}: {}".format(c[0], c[1])
                        for c in ds.get("classifications", [])
                    ]
                ),
                "location": ds.get("location", config.global_location),
                "tech": len(
                    [x for x in ds.exchanges() if x["type"] == "technosphere"]
                ),
                "bio": len([x for x in ds.exchanges() if x["type"] == "biosphere"]),
                "consumers": len(self.get_downstream_exchanges(self.activity)),
            }
        )
[docs]
def do_ii(self, arg):
    """Extended Info on current activity.

    Prints the same summary as ``do_i`` under an "Extended info" heading,
    followed by every remaining field of the dataset, wrapped to a fixed
    line width and indented.

    TODO: Colors could be improved."""
    if not self.activity:
        print("No current activity")
    else:
        ds = get_activity(self.activity)
        # Exchanges whose input is the activity itself.
        prod = [x for x in ds.exchanges() if x["input"] == self.activity]
        # Amount: explicit "production amount", else the single
        # self-referencing exchange, else 1.0.
        if "production amount" in ds and ds["production amount"]:
            amount = ds["production amount"]
        elif len(prod) == 1:
            amount = prod[0]["amount"]
        else:
            amount = 1.0
        print(
            """Extended info \n%(name)s
Database: %(database)s
ID: %(id)s
numerical_id: %(n_id)s
Product: %(product)s
Production amount: %(amount).2g %(unit)s
Location: %(location)s
Classifications:
%(classifications)s
Technosphere inputs: %(tech)s
Biosphere flows: %(bio)s
Reference flow used by: %(consumers)s\n"""
            % {
                "name": ds.get("name", "Unknown"),
                # Falls back to the activity name when no reference product is set.
                "product": ds.get("reference product") or ds.get("name", "Unknown"),
                "database": self.activity[0],
                "id": self.activity[1],
                # numerical ids are a feature of bw25
                "n_id": ds.get("id") or "NA",
                "amount": amount,
                "unit": ds.get("unit", ""),
                # NOTE(review): assumes classifications is a sequence of
                # (scheme, value) pairs — confirm.
                "classifications": "\n\t\t\t".join(
                    [
                        "{}: {}".format(c[0], c[1])
                        for c in ds.get("classifications", [])
                    ]
                ),
                "location": ds.get("location", config.global_location),
                "tech": len(
                    [x for x in ds.exchanges() if x["type"] == "technosphere"]
                ),
                "bio": len([x for x in ds.exchanges() if x["type"] == "biosphere"]),
                "consumers": len(self.get_downstream_exchanges(self.activity)),
            }
        )
        indentation_char = " " * 4
        line_length = 50  # TODO: use dynamic line length or take from prefs
        t_wrapper = textwrap.TextWrapper()
        t_wrapper.width = line_length
        # Remaining fields: every dataset key not in the exclusion list below.
        for field in [
            k
            for k in ds.keys()
            if k
            not in [
                "name",
                "product",
                "database",
                "location",
                "unit",
                "classifications",
                "production amount",
                "code",
            ]
        ]:
            if field.casefold() == "comment".casefold() and ds.get(field):
                # Comments keep their own line breaks: each non-blank line
                # is wrapped separately.
                t_wrapper.replace_whitespace = False
                contents = "\n".join(
                    [
                        "\n".join(t_wrapper.wrap(line))
                        for line in ds[field].splitlines()
                        if line.strip() != ""
                    ]
                )
                print(
                    "%(tab)s%(field)s:" % {"tab": indentation_char, "field": field}
                )
                for line in contents.splitlines():
                    print(
                        "%(tab)s%(line)s"
                        % {"tab": indentation_char * 2, "line": line}
                    )
            else:
                # The wrapper is shared across fields, so the settings made
                # here stay in effect for the fields that follow.
                if isinstance(ds[field], str):
                    t_wrapper.replace_whitespace = False
                    field_contents = t_wrapper.wrap(ds[field])
                else:
                    # Non-string values are shown via ``repr``.
                    t_wrapper.break_long_words = False
                    field_contents = t_wrapper.wrap(repr(ds[field]))
                print(
                    "%(tab)s%(field)s:" % {"tab": indentation_char, "field": field}
                )
                for line in field_contents:
                    print(
                        "%(tab)s%(line)s"
                        % {"tab": indentation_char * 2, "line": line}
                    )
[docs]
def do_l(self, arg):
    """List current options, or say that there are none."""
    if not self.current_options["type"]:
        print("No current options")
        return
    self.print_current_options()
[docs]
def do_lm(self, arg):
    """List methods; ``arg`` is ignored."""
    lister = self.list_methods
    lister()
[docs]
def do_lpj(self, arg):
    """List available projects; ``arg`` is ignored."""
    lister = self.list_projects
    lister()
[docs]
def do_backup(self, target_dir=None):
    """Backup the current project. Optionally specify a target directory.

    A given directory (``~`` is expanded) must already exist and be a
    directory; otherwise the command reports the problem and stops.
    Without one, ``None`` is handed to ``backup_project_directory`` as
    ``dir_backup``. Failures are reported rather than raised.
    """
    if not self.project:
        print("Please select a project first")
        return

    requested = target_dir.strip() if target_dir else ""
    destination = None
    if requested:
        destination = os.path.expanduser(requested)
        # Check if directory exists before calling backup function
        if not os.path.exists(destination):
            print("Backup directory does not exist: %s" % destination)
            return
        if not os.path.isdir(destination):
            print("Backup path is not a directory: %s" % destination)
            return

    try:
        archive = backup_project_directory(self.project, dir_backup=destination)
        print(
            "Project '%s' backed up successfully to: %s" % (self.project, archive)
        )
    except ValueError as err:
        print("Error: Project does not exist or invalid project name: %s" % err)
    except FileNotFoundError as err:
        print("Error: Backup directory not found: %s" % err)
    except PermissionError as err:
        print("Error: Permission denied. Cannot write to backup directory: %s" % err)
    except Exception as err:
        print("Error during backup: %s" % err)
        traceback.print_exc()
[docs]
def do_restore(self, arg):
    """Restore a project from a backup archive."""
    usage = "Usage: restore <archive> [--project NAME] [--overwrite]"

    try:
        tokens = shlex.split(arg)
    except ValueError as exc:
        print("Error parsing arguments: %s" % exc)
        return
    if not tokens:
        print(usage)
        return

    archive_path = None
    project_name = None
    overwrite = False
    extra_args = []

    # Walk the tokens with an iterator so that "--project" can consume the
    # token that follows it.
    missing = object()
    remaining = iter(tokens)
    for token in remaining:
        if token in ("--project", "-p"):
            value = next(remaining, missing)
            if value is missing:
                print("Missing value for %s" % token)
                return
            project_name = value
        elif token in ("--overwrite", "-o"):
            overwrite = True
        elif archive_path is None:
            # First positional token is the archive.
            archive_path = token
        elif project_name is None:
            # Second positional token doubles as the project name.
            project_name = token
        else:
            extra_args.append(token)

    if extra_args:
        print("Unrecognized arguments: %s" % ", ".join(extra_args))
        print(usage)
        return
    if not archive_path:
        print(usage)
        return

    archive_path = os.path.expanduser(archive_path)
    if not os.path.exists(archive_path):
        print("Archive not found: %s" % archive_path)
        return
    if not os.path.isfile(archive_path):
        print("Archive path is not a file: %s" % archive_path)
        return

    if project_name and project_name in projects and not overwrite:
        print(
            "Project '%s' already exists. Use --overwrite to replace it."
            % project_name
        )
        return

    if overwrite:
        # Overwriting is destructive, so ask for explicit confirmation.
        confirm_name = project_name or "the project stored in the archive"
        prompt = "You are about to overwrite %s. Continue? [y/N]: " % confirm_name
        answer = input(prompt).strip().lower()
        if answer not in ("y", "yes"):
            print("Restore cancelled.")
            return

    try:
        restored_project = restore_project_directory(
            archive_path,
            project_name=project_name,
            overwrite_existing=overwrite,
            switch=True,
        )
        print(
            "Project '%s' restored successfully from %s"
            % (restored_project, archive_path)
        )
        self.choose_project(restored_project)
    except FileNotFoundError as exc:
        print("Project archive not found, Error: %s" % exc)
    except ValueError as exc:
        print("Project archive is invalid, Error: %s" % exc)
    except Exception as exc:
        print("Error during restore: %s" % exc)
        traceback.print_exc()
[docs]
def do_ldb(self, arg):
    """List available databases"""
    # Pure delegation: the argument string is ignored and nothing is returned.
    self.list_databases()
[docs]
def do_mi(self, arg):
    """Show method information"""
    if not (self.method and self.category and self.subcategory):
        print("No current method selected")
        return

    # The key gets the namespace as its first element only when namespaced
    # methods are in use and a namespace is selected.
    key_parts = [self.method, self.category, self.subcategory]
    if has_namespaced_methods() and self.method_namespace:
        key_parts.insert(0, self.method_namespace)
    m_key = tuple(key_parts)

    try:
        method_obj = Method(m_key)
        printer = pprint.PrettyPrinter(indent=4)
        printer.pprint(method_obj.metadata)
    except UnknownObject:
        print(f"Method {m_key} not found")
[docs]
def do_n(self, arg):
    """Go to next page in paged options"""
    if not self.current_options["type"]:
        print("Not in page mode")
        return
    if self.page == self.max_page:
        print("No next page")
        return
    # Advance one page and redraw the option list.
    self.page += 1
    self.print_current_options()
[docs]
def do_p(self, arg):
    """Go to previous page in paged options, or to page <n> if a number is given"""
    if not self.current_options["type"]:
        print("Not in page mode")
        return

    if arg:
        # Only the conversion is guarded: a failure while rendering the page
        # must not be reported as a bad page number.
        try:
            page = int(arg)
        except (TypeError, ValueError):
            print("Can't convert page number %(page)s" % {"page": arg})
            return
        if page < 0 or page > self.max_page:
            print("Invalid page number")
        else:
            self.page = page
            self.print_current_options()
    elif self.page == 0:
        print("Already page 0")
    else:
        self.page -= 1
        self.print_current_options()
[docs]
def do_q(self, args):
    """Exit the activity browser."""
    # NOTE(review): presumably a truthy return value ends the command loop of
    # the enclosing class - confirm against the class definition.
    return True
[docs]
def do_quit(self, args):
    """Exit the activity browser."""
    # Long-form spelling of the "q" command; returns the same value.
    return True
[docs]
def do_r(self, arg):
    """Choose an activity at random"""
    if not self.database:
        print("Please choose a database first")
        return
    # Let the database pick a random entry, then select it.
    random_key = Database(self.database).random()
    self.choose_activity(random_key)
[docs]
def do_s(self, arg):
    """Search activity names."""
    if not self.database:
        print("Please choose a database first")
        return

    # (flag, criterion label, hint printed when the "{value}" part is missing).
    # Order matters: the first flag found in the argument wins.
    filters = (
        (
            "-loc",
            "location",
            "Missing location in curly braces in command: -loc {MX} ...",
        ),
        (
            "-cat",
            "category",
            "Missing category in curly braces in command: -cat {water} ...",
        ),
        (
            "-cas",
            "CAS number",
            # Implicit concatenation instead of a backslash continuation, so
            # source indentation can never leak into the message.
            "Missing CAS Number in curly braces in command: "
            "-cas {000095-50-1} ...",
        ),
        (
            "-rp",
            "reference product",
            "Missing reference product in curly braces in command: -rp "
            "{electricity high voltage} ...",
        ),
    )
    selected = next((entry for entry in filters if entry[0] in arg), None)

    search_criterion = None
    criterion_value = None
    needle = arg  # Find the needle in the haystack

    # The "{value} rest" pattern is only meaningful after a filter flag.
    # Without a flag the whole argument is the search text; previously a
    # query containing "x{...}" matched a two-group pattern and crashed on
    # m.group(3).
    if selected is not None:
        flag, search_criterion, missing_hint = selected
        pattern = re.compile(
            r"(" + flag + r"\s)"  # the flag followed by one whitespace char
            + r"(\{.*\})"  # the filter value in curly braces
            + r"(?:\s(.+))?",  # optional: a space, then the search text
            re.IGNORECASE | re.DOTALL,
        )
        m = pattern.search(arg)
        if m is None:
            print(missing_hint)
            return
        if "-cas" in arg and "biosphere" not in self.database:
            print("CAS Number search only for biosphere dbs.")
            return
        criterion_value = m.group(2).strip("{}")
        # May be None when only a filter and no search text was given.
        needle = m.group(3)
        print(
            "Filtering for {} {} after search".format(
                search_criterion, criterion_value
            )
        )

    results = search_bw2(
        search_criterion,
        criterion_value,
        self.database,
        needle,
        self.search_limit,
    )
    results_keys = [r.key for r in results]
    self.set_current_options(
        {
            "type": "activities",
            "options": results_keys,
            "formatted": [self.format_activity(key) for key in results],
        }
    )
    self.print_current_options("Search results for %(query)s" % {"query": needle})
[docs]
def do_u(self, arg):
    """List upstream processes"""
    if not self.activity:
        print("Need to choose an activity first")
        return
    # Technosphere exchanges of the current activity become the option list.
    exchanges = get_activity(self.activity).technosphere()
    self.format_exchanges_as_options(exchanges, "technosphere")
    self.print_current_options("Upstream inputs")
[docs]
def do_up(self, arg):
    """List upstream processes with pedigree information"""
    if not self.activity:
        print("Need to choose an activity first")
        return
    # Same listing as the "u" command, with show_pedigree switched on.
    es = get_activity(self.activity).technosphere()
    self.format_exchanges_as_options(es, "technosphere", show_pedigree=True)
    self.print_current_options("Upstream inputs")
[docs]
def do_uu(self, arg):
    """List upstream processes extra info (formulas)"""
    if not self.activity:
        print("Need to choose an activity first")
        return
    # Technosphere exchanges of the current activity, formatted with formulas.
    exchanges = get_activity(self.activity).technosphere()
    self.format_exchanges_as_options(exchanges, "technosphere", show_formulas=True)
    self.print_current_options("Upstream inputs")
[docs]
def do_un(self, arg):
    """Display uncertainty info of upstream activities if available"""
    if not self.activity:
        print("Need to choose an activity first")
        return
    # Same listing as the "u" command, with show_uncertainty switched on.
    es = get_activity(self.activity).technosphere()
    self.format_exchanges_as_options(es, "technosphere", show_uncertainty=True)
    self.print_current_options("Upstream inputs")
[docs]
def do_wh(self, arg):
    """Write the command history to a text file in the project export directory"""
    output_dir = projects.request_directory("export")
    # time.ctime() contains ':' characters, which are not allowed in Windows
    # file names; swap them for '-' so the export works on every platform.
    stamp = time.ctime().replace(":", "-")
    fp = os.path.join(output_dir, "browser history.%s.txt" % stamp)
    # newline="" writes "\n" untranslated, matching what codecs.open produced.
    with open(fp, "w", encoding="utf-8", newline="") as f:
        f.writelines(line + "\n" for line in self.history)
    print("History exported to %(fp)s" % {"fp": fp})
[docs]
def build_method_key_list(self):
    """Return the sorted list of method keys matching the current selection.

    The selection is a sequence of levels: (namespace,) method, category,
    subcategory. When every level is set, the single fully specified key is
    returned (without checking ``methods``). When only a leading part is set
    and the next level is ``None``, every key in ``methods`` that starts with
    that part is returned. Any other combination yields an empty list.
    """
    if has_namespaced_methods():
        levels = (
            self.method_namespace,
            self.method,
            self.category,
            self.subcategory,
        )
    else:
        levels = (self.method, self.category, self.subcategory)

    # Fully specified selection: use it directly.
    if all(levels):
        return [levels]

    # Partial selection: find the prefix that is set and directly followed by
    # an unset (None) level. Handling every prefix length uniformly also
    # covers namespace + method + category without subcategory, which the
    # namespaced branch previously dropped (returning an empty list).
    for depth in range(len(levels) - 1, 0, -1):
        prefix = levels[:depth]
        if all(prefix) and levels[depth] is None:
            return sorted(m for m in methods if tuple(m[:depth]) == prefix)
    return []
[docs]
def do_G(self, arg):
    """Do an LCIA of the selected activity + method[s]"""
    if not (self.activity and self.method):
        print("Select at least a method first")
        return

    selected_activity = get_activity(self.activity)

    def label_values(*attrs):
        # Collect the values stored under the given attribute names of
        # bd.labels; attributes that are missing or None are skipped.
        values = set()
        for attr in attrs:
            value = getattr(bd.labels, attr, None)
            if value is None:
                continue
            if isinstance(value, (set, tuple, list)):
                values.update(value)
            else:
                values.add(value)
        return values

    product_like_types = label_values(
        "product_node_type",
        "product_node_types",
        "chimaera_node_default",
        "chimaera_node_defaults",
    )
    plain_process_types = label_values(
        "process_node_type",
        "process_node_types",
        "process_node_default",
        "process_node_defaults",
    )
    # Compatibility fallback if none of the label attributes is available
    if not plain_process_types:
        plain_process_types = {"process"}

    selected_type = selected_activity.get("type")
    if (
        selected_type in plain_process_types
        and selected_type not in product_like_types
    ):
        # Offer the products of this process instead of running the LCIA.
        production_exchanges = [
            exc
            for exc in selected_activity.exchanges()
            if exc.get("type") == "production"
        ]
        print(
            "Selected node is a plain process (%s). LCIA can only be performed on products."
            % selected_activity.id
        )
        if production_exchanges:
            self.format_exchanges_as_options(production_exchanges, "production")
            self.print_current_options(
                "Products produced by current process (choose one, then run `G` again)"
            )
        else:
            print(
                "No production exchanges found for this process. Use `lprods` to list product nodes."
            )
        return

    method_key_list = self.build_method_key_list()
    if not method_key_list:
        # Nothing to calculate; avoids indexing into an empty result below.
        print("No methods match the current selection")
        return

    namespaced = has_namespaced_methods()
    namespace_shift = 1 if namespaced else 0

    if (
        bc.__version__
        and isinstance(bc.__version__, str)
        and version.parse(bc.__version__) >= version.parse("2.0.DEV10")
    ):
        # Named method_config so the imported bw2data `config` is not shadowed.
        method_config = {"impact_categories": method_key_list}
        activities = [selected_activity]
        func_units = {a["name"]: {a.id: 1.0} for a in activities}
        data_objs = get_multilca_data_objs(
            functional_units=func_units, method_config=method_config
        )
        mlca = bc.MultiLCA(
            demands=func_units, method_config=method_config, data_objs=data_objs
        )
        try:
            mlca.lci()
            mlca.lcia()
        except ValueError as exc:
            print(
                "Can't run LCIA for the selected node. Please select a product node (try `lprods`). Error: %s"
                % exc
            )
            return
        formatted_res = []
        for (method, _), score in mlca.scores.items():
            has_extra_level = len(method) == 5
            formatted_res_item = [
                method[0 + namespace_shift],
                method[1 + namespace_shift],
            ]
            if has_extra_level:
                # Five-element keys carry one more level before the indicator.
                formatted_res_item.append(method[2 + namespace_shift])
                formatted_res_item.append(method[3 + namespace_shift])
            else:
                formatted_res_item.append(method[2 + namespace_shift])
            formatted_res_item.append(Method(method).metadata["unit"])
            formatted_res_item.append(score)
            if namespaced:
                formatted_res_item.insert(0, method[0])
            formatted_res.append(formatted_res_item)
    else:
        bw2browser_cs = {
            "inv": [{selected_activity: 1}],
            "ia": method_key_list,
        }
        # NOTE(review): the temporary calculation setup is stored under a
        # fresh uuid and never removed here - confirm whether it should be
        # deleted after the calculation.
        tmp_cs_id = uuid.uuid1()
        calculation_setups[str(tmp_cs_id)] = bw2browser_cs
        mlca = bc.MultiLCA(str(tmp_cs_id))
        formatted_res = [
            [
                mlca.methods[i][0],
                mlca.methods[i][1],
                mlca.methods[i][2],
                Method(mlca.methods[i]).metadata["unit"],
                score.pop(),
            ]
            for i, score in enumerate(mlca.results.T.tolist())
        ]

    if not formatted_res:
        print("No LCIA results to display")
        return

    headers = ["method", "category", "subcategory", "unit", "score"]
    if namespaced:
        headers.insert(0, "namespace")
    # Rows built from five-element method keys have one more column than the
    # base headers; label it so "unit" and "score" stay over the right data.
    if any(len(row) > len(headers) for row in formatted_res):
        headers.insert(len(headers) - 2, "indicator")

    self.tabulate_data = tabulate(
        formatted_res,
        headers=headers,
        tablefmt="tsv",
    )
    print(tabulate(formatted_res, headers=headers))
[docs]
def do_GC(self, arg):
    """Do an LCIA of all activities in the temporary list with a fully specified method."""
    namespaced = has_namespaced_methods()

    # A full method specification is required.
    if namespaced:
        method_id = (
            getattr(self, "method_namespace", None),
            self.method,
            self.category,
            self.subcategory,
        )
        incomplete = "Please select a full method specification: namespace, method, category, and subcategory"
    else:
        method_id = (self.method, self.category, self.subcategory)
        incomplete = "Please select a full method specification: method, category, and subcategory"
    if not all(method_id):
        print(incomplete)
        return
    method_key_list = [method_id]

    if not self.temp_activities:
        print(
            "Temporary activities list is empty. Use 'add' to add activities first."
        )
        return

    activities = [get_activity(key) for key in self.temp_activities]
    namespace_shift = 1 if namespaced else 0

    headers = ["method", "category", "subcategory", "unit", "score"]
    if namespaced:
        headers.insert(0, "namespace")
    export_headers = headers + ["activity"]

    def make_row(method, score):
        # One table row for a method key; the namespace (first key element)
        # gets its own leading column when methods are namespaced.
        row = [
            method[0 + namespace_shift],
            method[1 + namespace_shift],
            method[2 + namespace_shift],
            Method(method).metadata["unit"],
            score,
        ]
        if namespaced:
            row.insert(0, method[0])
        return row

    # Rows of every activity plus the aggregated rows, each with a trailing
    # activity identifier column; used for export and by the GCH command.
    all_results_for_export = []
    aggregated_results = []

    if (
        bc.__version__
        and isinstance(bc.__version__, str)
        and version.parse(bc.__version__) >= version.parse("2.0.DEV10")
    ):
        # One functional unit per activity, keyed by the activity id as text.
        func_units = {str(a.id): {a.id: 1.0} for a in activities}
        # Named method_config so the imported bw2data `config` is not shadowed.
        method_config = {"impact_categories": method_key_list}
        data_objs = get_multilca_data_objs(
            functional_units=func_units, method_config=method_config
        )
        mlca = bc.MultiLCA(
            demands=func_units, method_config=method_config, data_objs=data_objs
        )
        mlca.lci()
        mlca.lcia()

        # mlca.scores is keyed by (method, functional_unit_name).
        methods_seen = {method for (method, _unit_name) in mlca.scores}

        print(
            "LCA results for %d activities in temporary list:"
            % len(self.temp_activities)
        )
        for activity in activities:
            print("\nActivity: %s" % activity)
            activity_results = []
            for method in methods_seen:
                score = mlca.scores.get((method, str(activity.id)), 0)
                result_row = make_row(method, score)
                activity_results.append(result_row)
                all_results_for_export.append(result_row + [activity])
            print(tabulate(activity_results, headers=headers))

        print("\nAggregated results (sum of all activities):")
        for method in methods_seen:
            total_score = sum(
                mlca.scores.get((method, str(activity.id)), 0)
                for activity in activities
            )
            result_row = make_row(method, total_score)
            aggregated_results.append(result_row)
            all_results_for_export.append(result_row + ["AGGREGATED"])

        gc_results = {
            "activities": activities,
            "activity_results": all_results_for_export,
            "aggregated_results": aggregated_results,
            "methods_seen": methods_seen,
            "headers": headers,
            "namespace_shift": namespace_shift,
        }
    else:
        # Legacy API: one functional unit per activity in a temporary setup.
        bw2browser_cs = {
            "inv": [{activity: 1} for activity in activities],
            "ia": method_key_list,
        }
        tmp_cs_id = uuid.uuid1()
        calculation_setups[str(tmp_cs_id)] = bw2browser_cs
        mlca = bc.MultiLCA(str(tmp_cs_id))

        # mlca.results is indexed [functional unit, method] - the `G` command
        # relies on the same layout when it iterates results.T per method.
        # The previous [method, activity] indexing reported 0 for every
        # activity after the first and summed the wrong axis.
        print(
            "LCA results for %d activities in temporary list:"
            % len(self.temp_activities)
        )
        for idx, activity in enumerate(activities):
            activity_name = activity.get("name", "Unknown")
            print("\nActivity %d: %s" % (idx + 1, activity_name))
            activity_results = []
            for i, method in enumerate(mlca.methods):
                score = mlca.results[idx, i] if mlca.results.shape[0] > idx else 0
                result_row = make_row(method, score)
                activity_results.append(result_row)
                all_results_for_export.append(result_row + [activity_name])
            print(tabulate(activity_results, headers=headers))

        print("\nAggregated results (sum of all activities):")
        for i, method in enumerate(mlca.methods):
            total_score = (
                mlca.results[:, i].sum() if mlca.results.shape[0] > 0 else 0
            )
            result_row = make_row(method, total_score)
            aggregated_results.append(result_row)
            all_results_for_export.append(result_row + ["AGGREGATED"])

        gc_results = {
            "activities": activities,
            "activity_results": all_results_for_export,
            "aggregated_results": aggregated_results,
            "mlca": mlca,
            "headers": headers,
        }

    # Combined table with the activity column, kept for export.
    self.tabulate_data = tabulate(
        all_results_for_export,
        headers=export_headers,
        tablefmt="tsv",
    )
    print(tabulate(aggregated_results, headers=headers))
    # Store results for the GCH command.
    self.gc_results = gc_results
[docs]
def do_GCH(self, arg):
    """Display ASCII bar charts for GC command results."""
    if not self.gc_results:
        print("No GC results available. Please run GC command first.")
        return

    results = self.gc_results
    headers = results.get("headers", [])
    # The method name sits after the namespace column when there is one.
    method_col_idx = 1 if "namespace" in headers else 0

    # Group (score, method label) pairs by activity identifier, in the order
    # the identifiers first appear. Row layout: [..., score, activity id].
    grouped = {}
    for row in results["activity_results"]:
        activity_id = row[-1]
        if activity_id == "AGGREGATED":
            continue
        try:
            score = float(row[-2])
        except (ValueError, TypeError):
            score = 0.0
        if len(row) > method_col_idx + 1:
            method_name = f"{row[method_col_idx]}/{row[method_col_idx+1]}"
        else:
            method_name = str(row[method_col_idx])
        grouped.setdefault(activity_id, []).append((score, method_name))

    known = None

    def known_activities():
        # Built on first use only: identifiers that are key tuples or
        # activity objects never need these database lookups.
        nonlocal known
        if known is None:
            mapping = {}
            for activity in results.get("activities", ()):
                if hasattr(activity, "key"):
                    mapping[activity] = activity.key
                    mapping[activity.key] = activity.key
            temp = []
            for key in self.temp_activities:
                activity_obj = get_activity(key)
                temp.append((key, activity_obj))
                # Entries from the temporary list take precedence.
                mapping[activity_obj] = key
                mapping[key] = key
            known = (mapping, temp)
        return known

    def label_for(activity_id):
        # Resolve an identifier to the text used in the chart label.
        if isinstance(activity_id, tuple) and len(activity_id) == 2:
            activity_key = activity_id
        elif hasattr(activity_id, "key"):
            return self.format_activity(activity_id.key, max_length=200)
        else:
            mapping, temp = known_activities()
            activity_key = mapping.get(activity_id)
            if activity_key is None and isinstance(activity_id, str):
                # The legacy API stores the activity name as identifier.
                activity_key = next(
                    (
                        key
                        for key, activity_obj in temp
                        if activity_obj.get("name") == activity_id
                    ),
                    None,
                )
        if activity_key:
            return self.format_activity(activity_key, max_length=200)
        # Last resort: plain string representation.
        return str(activity_id)

    all_scores = []
    all_labels = []
    for activity_id, pairs in grouped.items():
        activity_str = label_for(activity_id)
        for score, method_name in pairs:
            all_scores.append(score)
            all_labels.append(f"{activity_str} - {method_name}")

    if not all_scores:
        print("No activity results to display.")
        return

    # Display ASCII bar chart (always use simple ASCII, no plotext)
    self._simple_ascii_chart(all_scores, all_labels)
[docs]
def _simple_ascii_chart(self, all_scores=None, all_labels=None):
"""Fallback simple ASCII bar chart without external dependencies."""
if not self.gc_results:
return
# If scores and labels are provided, use them directly
if all_scores is not None and all_labels is not None:
print("\n" + "=" * 80)
print("ASCII Bar Chart for GC Results (all activities)")
print("=" * 80)
print()
# Calculate max label length for alignment
max_label_length = (
max(len(label) for label in all_labels) if all_labels else 0
)
max_label_length = min(
max_label_length, 70
) # Cap at 70 characters for better display
# Find max score for scaling
max_score = max(all_scores) if all_scores else 1
bar_width = 50 # Width of bar in characters
# Create labeled bar chart with activity/method names
header_label = "Activity/Method"
header_padding = " " * max(0, max_label_length - len(header_label))
print(
f"{header_label}{header_padding} │ Bar Chart"
+ " " * (bar_width - 9)
+ "Score"
)
print("-" * (max_label_length + bar_width + 20))
# Display all scores in a single chart
for label, score in zip(all_labels, all_scores):
# Truncate label if needed
display_label = label
if len(label) > max_label_length:
display_label = label[: max_label_length - 3] + "..."
# Calculate bar length
bar_length = (
int((score / max_score) * bar_width) if max_score > 0 else 0
)
bar = "â–ˆ" * bar_length
# Print label, bar, and score
print(
f"{display_label:<{max_label_length}} │ {bar:<{bar_width}} {score:.4f}"
)
print("-" * (max_label_length + bar_width + 20))
print(f"Max score: {max_score:.4f}")
return
# Fallback: extract from gc_results if not provided
results = self.gc_results
activity_results = results["activity_results"]
# Create activity mapping similar to main function
activities_map = {}
activity_keys_map = {}
if "activities" in results:
for activity in results["activities"]:
activities_map[activity] = activity
if hasattr(activity, "key"):
activities_map[activity.key] = activity
activity_keys_map[activity] = activity.key
activity_keys_map[activity.key] = activity.key
# Map from temp_activities
for key in self.temp_activities:
activity_obj = get_activity(key)
activities_map[activity_obj] = key
activities_map[key] = key
activity_keys_map[activity_obj] = key
activity_keys_map[key] = key
# Group by activity
activity_scores = {}
activity_methods = {}
headers = results.get("headers", [])
method_col_idx = 1 if "namespace" in headers else 0
for row in activity_results:
activity_id = row[-1]
if activity_id == "AGGREGATED":
continue
try:
score = float(row[-2])
except (ValueError, TypeError):
score = 0.0
if activity_id not in activity_scores:
activity_scores[activity_id] = []
activity_methods[activity_id] = []
activity_scores[activity_id].append(score)
# Extract method name
if len(row) > method_col_idx:
method_name = str(row[method_col_idx])
if len(row) > method_col_idx + 1:
method_name = f"{row[method_col_idx]}/{row[method_col_idx+1]}"
activity_methods[activity_id].append(method_name)
# Combine all scores and labels
combined_scores = []
combined_labels = []
for activity_id, scores in activity_scores.items():
# Get the activity key for formatting
activity_key = None
# Check if activity_id is already a key (tuple)
if isinstance(activity_id, tuple) and len(activity_id) == 2:
activity_key = activity_id
# Check if it's an activity object
elif hasattr(activity_id, "key"):
activity_key = activity_id.key
# Check if we have it in our maps
elif activity_id in activity_keys_map:
activity_key = activity_keys_map[activity_id]
elif activity_id in activities_map:
mapped = activities_map[activity_id]
if isinstance(mapped, tuple) and len(mapped) == 2:
activity_key = mapped
elif hasattr(mapped, "key"):
activity_key = mapped.key
# Check if it's a string (legacy API stored activity name as string)
elif isinstance(activity_id, str) and activity_id != "AGGREGATED":
# Try to find the activity by name in temp_activities
for key in self.temp_activities:
activity_obj = get_activity(key)
if activity_obj.get("name") == activity_id:
activity_key = key
break
# Get string representation of activity
if activity_key:
# Use format_activity to get full string representation
activity_str = self.format_activity(activity_key, max_length=200)
elif hasattr(activity_id, "key"):
# It's an activity object, use its key
activity_str = self.format_activity(activity_id.key, max_length=200)
else:
# Fallback: try to get activity object and format it
try:
if hasattr(activity_id, "get") and hasattr(activity_id, "key"):
activity_str = self.format_activity(
activity_id.key, max_length=200
)
else:
activity_str = str(activity_id)
except:
activity_str = str(activity_id)
method_names = activity_methods.get(
activity_id, [f"Method {i+1}" for i in range(len(scores))]
)
for score, method_name in zip(scores, method_names):
combined_scores.append(score)
combined_labels.append(f"{activity_str} - {method_name}")
if not combined_scores:
return
print("\n" + "=" * 80)
print("ASCII Bar Chart for GC Results (all activities)")
print("=" * 80)
print()
# Calculate max label length for alignment
max_label_length = (
max(len(label) for label in combined_labels) if combined_labels else 0
)
max_label_length = min(
max_label_length, 70
) # Cap at 70 characters for better display
# Find max score for scaling
max_score = max(combined_scores) if combined_scores else 1
bar_width = 50 # Width of bar in characters
# Create labeled bar chart with activity/method names
header_label = "Activity/Method"
header_padding = " " * max(0, max_label_length - len(header_label))
print(
f"{header_label}{header_padding} │ Bar Chart"
+ " " * (bar_width - 9)
+ "Score"
)
print("-" * (max_label_length + bar_width + 20))
# Display all scores in a single chart
for label, score in zip(combined_labels, combined_scores):
# Truncate label if needed
display_label = label
if len(label) > max_label_length:
display_label = label[: max_label_length - 3] + "..."
# Calculate bar length
bar_length = int((score / max_score) * bar_width) if max_score > 0 else 0
bar = "â–ˆ" * bar_length
# Print label, bar, and score
print(
f"{display_label:<{max_label_length}} │ {bar:<{bar_width}} {score:.4f}"
)
print("-" * (max_label_length + bar_width + 20))
print(f"Max score: {max_score:.4f}")
[docs]
def do_ta(self, arg):
    """Display top activities if an activity + method are selected."""
    if not self.activity:
        print("Select an activity ")
        return
    if not (self.method and self.category and self.subcategory):
        print("Select at least a method first")
        return

    method_key = (self.method, self.category, self.subcategory)
    # Namespaced method keys start with the namespace, as in the `mi` and
    # `GC` commands; without it the key would not identify a method.
    if has_namespaced_methods():
        namespace = getattr(self, "method_namespace", None)
        if namespace:
            method_key = (namespace,) + method_key

    a = get_activity(self.activity)
    lca = a.lca(method_key)
    top_a = bwa.ContributionAnalysis().annotated_top_processes(lca)
    print(tabulate(top_a, headers=["score", "supply", "Activity"]))
[docs]
def do_te(self, arg):
    """Display top emissions if an activity + method are selected."""
    if not self.activity:
        print("Select an activity ")
        return
    if not (self.method and self.category and self.subcategory):
        print("Select at least a method first")
        return

    method_key = (self.method, self.category, self.subcategory)
    # Namespaced method keys start with the namespace, as in the `mi` and
    # `GC` commands; without it the key would not identify a method.
    if has_namespaced_methods():
        namespace = getattr(self, "method_namespace", None)
        if namespace:
            method_key = (namespace,) + method_key

    a = get_activity(self.activity)
    lca = a.lca(method_key)
    # Older bw2analyzer versions go through the compatibility helper.
    if is_legacy_bwa():
        top_e = bw2_compat_annotated_top_emissions(lca)
    else:
        top_e = bwa.ContributionAnalysis().annotated_top_emissions(lca)
    print(tabulate(top_e, headers=["score", "supply", "Activity"]))
[docs]
def do_aa(self, arg):
    """List all activities in the current database."""
    if not self.database:
        print("Please choose a database first")
        return

    db = Database(self.database)
    activities = [activity for activity in db]
    # Optional argument "name": sort the listing by activity name.
    if arg and isinstance(arg, str) and arg.lower() == "name":
        # A missing name sorts as "" instead of raising TypeError when None
        # is compared with a string.
        activities.sort(key=lambda a: a.get("name") or "")
    activity_keys = [(self.database, activity["code"]) for activity in activities]
    formatted_activities = [self.format_activity(key) for key in activity_keys]
    self.set_current_options(
        {
            "type": "activities",
            "options": activity_keys,
            "formatted": formatted_activities,
        }
    )
    self.print_current_options("Activities in database")
[docs]
def do_lprods(self, arg):
    """List product or chimaera activities in the current database."""
    if not self.database:
        print("Please choose a database first")
        return

    # Collect the node types to list from whichever of these label
    # attributes exist on bd.labels.
    product_types = set()
    for attr in (
        "product_node_type",
        "product_node_types",
        "chimaera_node_default",
        "chimaera_node_defaults",
    ):
        value = getattr(bd.labels, attr, None)
        if value is None:
            continue
        if isinstance(value, (set, tuple, list)):
            product_types.update(value)
        else:
            product_types.add(value)
    # Compatibility fallback for older/newer Brightway label APIs.
    if not product_types:
        product_types = {"product", "process_with_reference_product"}

    db = Database(self.database)
    activities = [
        activity for activity in db if activity.get("type") in product_types
    ]
    # Optional argument "name": sort the listing by activity name.
    if arg and isinstance(arg, str) and arg.lower() == "name":
        # A missing name sorts as "" instead of raising TypeError when None
        # is compared with a string.
        activities.sort(key=lambda a: a.get("name") or "")
    activity_keys = [(self.database, activity["code"]) for activity in activities]
    formatted_activities = [self.format_activity(key) for key in activity_keys]
    self.set_current_options(
        {
            "type": "activities",
            "options": activity_keys,
            "formatted": formatted_activities,
        }
    )
    self.print_current_options("Product or chimaera activities in database")
[docs]
def do_lpam(self, arg):
    """List all (Project, Database, Activity) parameters.

    Options recognised in ``arg``:

    * ``-f``: sets the ``full_cols`` flag passed to ``acquire_params``.
    * ``-g {GROUP}``: passes ``GROUP`` to ``acquire_params`` as the group
      to filter on.

    A ``-g`` without a group in curly braces prints a usage hint and returns.
    Each non-empty parameter table is printed under its own heading.
    """
    group_pattern = re.compile(
        r"(-f\s)?(-g\s)(\{.*\})", re.IGNORECASE | re.DOTALL
    )
    match = group_pattern.search(arg)
    if match is None and "-g" in arg:
        print("Missing group in curly braces in command: -g {DANCE} ...")
        return
    the_group = None
    leftover = arg
    if match:
        the_group = match.group(3).strip("{}")
        print("Filtering for group {} after search".format(the_group))
        # Remove the matched option so that a "-f" inside a group name is
        # not mistaken for the flag below.
        leftover = arg[: match.start()] + " " + arg[match.end() :]
    # "-f" must also work on its own or after the group option; previously
    # it was only honoured when it directly preceded "-g {GROUP}".
    full_cols = (match is not None and match.group(1) is not None) or (
        re.search(r"(?:^|\s)-f(?:\s|$)", leftover, re.IGNORECASE) is not None
    )
    if not self.project:
        print("Please choose a project first")
        return
    pparams, dparams, aparams = self.acquire_params(full_cols, the_group)
    for title, rows in (
        ("Project Parameters", pparams),
        ("Database Parameters", dparams),
        ("Activity Parameters", aparams),
    ):
        if len(rows) > 0:
            print(title)
            print(tabulate(rows, headers="keys"))
[docs]
def do_lpamg(self, arg):
    """List parameter groups.

    Every row returned by ``Group.select()`` contributes its ``id`` to the
    selectable options and its ``name`` to the displayed labels.
    """
    group_ids = []
    group_names = []
    for group in Group.select():
        group_ids.append(group.id)
        group_names.append(group.name)
    listing = {
        "type": "groups",
        "options": group_ids,
        "formatted": group_names,
    }
    self.set_current_options(listing)
    self.print_current_options("Parameter groups: ")
[docs]
def do_ap(self, arg):
    """If an activity is selected, show its parameters.

    Activity parameters are looked up by the selected activity's code and the
    current database. When ``arg`` contains ``-f`` each parameter's ``dict``
    attribute is tabulated; otherwise the query is passed through
    ``dehydrate_params`` with a fixed list of fields.
    """
    if not self.activity:
        print("Please select an activity first")
        return
    query = ActivityParameter.select().where(
        (ActivityParameter.code == self.activity[1])
        & (ActivityParameter.database == self.database)
    )
    if "-f" in arg:
        rows = [parameter.dict for parameter in query]
    else:
        rows = self.dehydrate_params(
            query,
            ["database", "code", "group", "name", "formula", "amount"],
        )
    if len(rows) > 0:
        print(tabulate(rows, headers="keys"))
    else:
        print("No Activity parameters for {}".format(get_activity(self.activity)))
[docs]
def do_dp(self, arg):
    """List all database parameters of the current database.

    When ``arg`` contains ``-f`` each parameter's ``dict`` attribute is
    tabulated; otherwise the query is passed through ``dehydrate_params``
    with the fields ``name``, ``formula`` and ``amount``.
    """
    if not self.database:
        print("Please select a database first")
        return
    query = DatabaseParameter.select().where(
        DatabaseParameter.database == self.database
    )
    if "-f" in arg:
        rows = [parameter.dict for parameter in query]
    else:
        rows = self.dehydrate_params(query, ["name", "formula", "amount"])
    if len(rows) > 0:
        print(tabulate(rows, headers="keys"))
    else:
        print("No database parameters in database {}".format(self.database))
[docs]
def do_pp(self, arg):
    """List all project parameters.

    When ``arg`` contains ``-f`` each parameter's ``dict`` attribute is
    tabulated; otherwise the query is passed through ``dehydrate_params``
    with a fixed list of fields.
    """
    if not self.project:
        print("Please select a project first")
        return
    query = ProjectParameter.select()
    if "-f" in arg:
        rows = [parameter.dict for parameter in query]
    else:
        rows = self.dehydrate_params(
            query, ["database", "name", "formula", "amount"]
        )
    if len(rows) > 0:
        print(tabulate(rows, headers="keys"))
    else:
        print("No project parameters in {}".format(self.project))
[docs]
def do_fp(self, arg):
    """Find a specific parameter by name.

    All project, database and activity parameters are fetched with
    ``acquire_params(False, None)``, tagged with a ``parameter type`` entry,
    and those whose ``name`` equals ``arg`` exactly are tabulated. Nothing is
    printed when no parameter matches.
    """
    if not self.project:
        print("Please select a project first")
        return
    pparams, dparams, aparams = self.acquire_params(False, None)
    tagged = (
        ("project", pparams),
        ("database", dparams),
        ("activity", aparams),
    )
    for kind, rows in tagged:
        for row in rows:
            row.update({"parameter type": kind})
    hits = [row for row in pparams + dparams + aparams if row["name"] == arg]
    if len(hits) > 0:
        print(tabulate(hits, headers="keys"))
[docs]
def do_sp(self, arg):
    """Search for a parameter by name, accepting wildcards in arg.

    ``arg`` is applied with the ``%`` operator to the ``name`` field of the
    project, database and activity parameter tables. Matches are passed
    through ``dehydrate_params``, tagged with a ``parameter type`` entry and
    tabulated together. Nothing is printed when there are no matches.
    """
    if not self.project:
        print("Please select a project first")
        return
    project_query = ProjectParameter.select().where(ProjectParameter.name % arg)
    database_query = DatabaseParameter.select().where(
        DatabaseParameter.name % arg
    )
    activity_query = ActivityParameter.select().where(
        ActivityParameter.name % arg
    )
    pparams = self.dehydrate_params(
        project_query, ["database", "name", "formula", "amount"]
    )
    dparams = self.dehydrate_params(database_query, ["name", "formula", "amount"])
    aparams = self.dehydrate_params(
        activity_query,
        ["database", "code", "group", "name", "formula", "amount"],
    )
    tagged = (
        ("project", pparams),
        ("database", dparams),
        ("activity", aparams),
    )
    for kind, rows in tagged:
        for row in rows:
            row.update({"parameter type": kind})
    matches = list(pparams + dparams + aparams)
    if len(matches) > 0:
        print(tabulate(matches, headers="keys"))
[docs]
def do_add(self, arg):
    """Add the currently selected activity to the temporary activities list.

    Prints a notice instead when no activity is selected or when the activity
    is already in the list; ``arg`` is ignored.
    """
    current = self.activity
    if not current:
        print("No activity currently selected")
        return
    if current in self.temp_activities:
        print(
            "Activity already in temporary list: %s" % self.format_activity(current)
        )
        return
    self.temp_activities.append(current)
    print("Added activity to temporary list: %s" % self.format_activity(current))
    print("Temporary list now contains %d activities" % len(self.temp_activities))
[docs]
def do_clear(self, arg):
    """Clear the temporary activities list.

    The list is replaced by a new empty list and the number of removed
    activities is reported; ``arg`` is ignored.
    """
    removed = len(self.temp_activities)
    self.temp_activities = []
    message = "Cleared temporary activities list (%d activities removed)" % removed
    print(message)
[docs]
def do_lt(self, arg):
    """List all activities in the temporary activities list.

    Each entry is printed with its zero-based position and the label produced
    by ``format_activity``; ``arg`` is ignored.
    """
    stored = self.temp_activities
    if not stored:
        print("Temporary activities list is empty")
        return
    print("Temporary activities list (%d activities):" % len(stored))
    for position, key in enumerate(stored):
        print("[%d]: %s" % (position, self.format_activity(key)))
[docs]
def do_ca(self, arg):
    """Print the recursive calculation of an LCA, accepting cutoff as arg.

    Requires a selected activity and a fully selected method (method,
    category and subcategory). With an empty ``arg`` the calculation is
    printed with the library's default cutoff; otherwise ``arg`` must be a
    number and is passed as ``cutoff``. A non-numeric ``arg`` is reported to
    the user instead of raising ``ValueError`` out of the command handler.
    """
    if not (all([self.method, self.category, self.subcategory]) and self.activity):
        print("Please select a method and an activity first.")
        return
    method_key = (self.method, self.category, self.subcategory)
    if not arg:
        bwa.print_recursive_calculation(self.activity, method_key)
        return
    try:
        cutoff = float(arg)
    except ValueError:
        print("Invalid cutoff value: {!r} (expected a number)".format(arg))
        return
    bwa.print_recursive_calculation(self.activity, method_key, cutoff=cutoff)
[docs]
def do_sc(self, arg):
    """Print the supply chain of an activity, accepting cutoff as arg.

    Requires a selected activity. With an empty ``arg`` the supply chain is
    printed with the library's default cutoff; otherwise ``arg`` must be a
    number and is passed as ``cutoff``. A non-numeric ``arg`` is reported to
    the user instead of raising ``ValueError`` out of the command handler.
    """
    if not self.activity:
        print("Please select an activity first.")
        return
    if not arg:
        bwa.print_recursive_supply_chain(self.activity)
        return
    try:
        cutoff = float(arg)
    except ValueError:
        print("Invalid cutoff value: {!r} (expected a number)".format(arg))
        return
    bwa.print_recursive_supply_chain(self.activity, cutoff=cutoff)
[docs]
def do_pe(self, arg):
    """Show production exchanges if they exist.

    The selected activity's exchanges are handed to
    ``format_exchanges_as_options`` with the ``production`` label, and the
    resulting options are printed; ``arg`` is ignored.
    """
    if not self.activity:
        print("Need to choose an activity first")
        return
    exchanges = get_activity(self.activity).exchanges()
    self.format_exchanges_as_options(exchanges, "production")
    self.print_current_options("production exchanges")
[docs]
def do_pei(self, arg):
    """Print detailed information on the production exchanges.

    For every exchange of the selected activity whose ``type`` is
    ``production``, each property/value pair of ``as_dict()`` is printed on
    its own line, followed by a blank line; ``arg`` is ignored.
    """
    if not self.activity:
        print("Need to choose an activity first")
        return
    # Collect the exchanges before printing the header, so a lookup failure
    # leaves no partial output.
    production = []
    for exchange in get_activity(self.activity).exchanges():
        if exchange["type"] == "production":
            production.append(exchange)
    print("\n Production Exchange information\n")
    for exchange in production:
        for prop, value in exchange.as_dict().items():
            print(f"\t {prop}: {value}")
        print("")
[docs]
def bw2_compat_annotated_top_emissions(lca, names=True, **kwargs):
    """Get the top biosphere flows of an LCA as annotated tuples.

    Returns a list of tuples ``(lca score, inventory amount, activity)``.
    If ``names`` is False, the last element is the key taken from the
    reversed biosphere dictionary instead of the activity object. Extra
    keyword arguments are forwarded to ``ContributionAnalysis.top_emissions``.
    """
    # This is a temporary fix, until
    # https://github.com/brightway-lca/brightway2-analyzer/issues/27
    # gets correctly handled for bw2 branch
    # The only difference in the actual code is the casting of indices to ints.
    print("Using compat mode annotated_top_emissions")
    _, _, reverse_biosphere = lca.reverse_dict()
    ranked = bwa.ContributionAnalysis().top_emissions(
        lca.characterized_inventory, **kwargs
    )
    results = []
    for score, index in ranked:
        row = int(index)
        amount = lca.inventory[row, :].sum()
        results.append((score, amount, reverse_biosphere[row]))
    if names:
        results = [
            (score, amount, get_activity(key)) for score, amount, key in results
        ]
    return results
[docs]
def is_legacy_bwa():
    """Return True when the first two items of ``bwa.__version__`` are 0 and 10."""
    bwa_version = bwa.__version__
    if bwa_version[0] != 0:
        return False
    return bwa_version[1] == 10
[docs]
def is_legacy_bc():
    """Return True when ``bc.__version__`` is a tuple rather than a string."""
    bc_version = bc.__version__
    return isinstance(bc_version, tuple)
[docs]
def is_legacy_bd():
    """Return True when the bw2data version predates FTS5_ENABLED_BD_VERSION.

    A tuple-valued version always counts as legacy. A string version counts
    as legacy when it parses to less than ``FTS5_ENABLED_BD_VERSION``. Any
    other version type is reported as not legacy.
    """
    if isinstance(bd_version, tuple):
        return True
    if not isinstance(bd_version, str):
        return False
    return version.parse(bd_version) < version.parse(FTS5_ENABLED_BD_VERSION)
[docs]
def has_namespaced_methods():
    """Return True when the first method key has at least four elements.

    Only the first key yielded by ``methods`` is inspected. When ``methods``
    is empty the function returns False instead of raising ``IndexError``.
    """
    first_method = next(iter(methods), None)
    if first_method is None:
        return False
    return len(first_method) >= 4
[docs]
def search_bw2(search_criterion, criterion_value, database, needle, search_limit):
    """Search and then filter by criteria.

    Parameters:
        search_criterion: ``"location"``, ``"reference product"``,
            ``"CAS number"`` or ``"category"``; may be empty/None.
        criterion_value: value the criterion must equal, compared
            case-insensitively. For ``"category"`` the levels are separated
            by ``::`` and compared against the record's ``categories``.
        database: name of the database to search.
        needle: search text; ``None`` is treated as an empty string.
        search_limit: passed as ``limit`` to ``Database.search``.

    Returns:
        A list of matching records when a recognised criterion is applied,
        otherwise the unfiltered result of ``Database.search``.

    When a criterion is applied and ``needle`` is empty, every record of the
    database is a candidate. An unrecognised criterion falls back to the
    plain search instead of failing with ``UnboundLocalError``.
    """
    if needle is None:
        needle = ""
    db = Database(database)
    if not (search_criterion and criterion_value):
        return db.search(needle, limit=search_limit)

    if search_criterion == "category":
        wanted = tuple(part.casefold() for part in criterion_value.split("::"))

        def matches(record):
            categories = record.get("categories")
            return (
                bool(categories)
                and tuple(level.casefold() for level in categories) == wanted
            )

    elif search_criterion in ("location", "reference product", "CAS number"):
        wanted = criterion_value.casefold()

        def matches(record):
            value = record.get(search_criterion)
            return bool(value) and value.casefold() == wanted

    else:
        # Unknown criterion: there is nothing to filter on, so behave like a
        # search without criteria.
        return db.search(needle, limit=search_limit)

    # Without a needle the whole database is scanned.
    candidates = db.search(needle, limit=search_limit) if needle else db
    return [record for record in candidates if matches(record)]
[docs]
def main():
    """Command-line entry point.

    Parses the command line with docopt against the module docstring, creates
    an ``ActivityBrowser``, initialises it with the optional project, database
    and activity id, and starts its command loop.
    """
    cli_args = docopt(__doc__, version="Brightway2 Activity Browser 2.0")
    browser = ActivityBrowser()
    startup = {
        "project": cli_args["<project>"],
        "database": cli_args["<database>"],
        "activity": cli_args["<activity-id>"],
    }
    browser._init(**startup)
    browser.cmdloop()


# Start the browser only when the file is run as a script, not on import.
if __name__ == "__main__":
    main()