#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from copy import deepcopy
import numpy as np
import pandas as pd
from pyam import filter_by_meta, META_IDX
from pyam.utils import isstr, islistable
[docs]class Statistics(object):
"""This class provides a wrapper for descriptive statistics
of IAMC-style timeseries data.
Parameters
----------
df: pyam.IamDataFrame
an IamDataFrame from which to retrieve metadata for grouping, filtering
groupby: str or dict
a column of `df.meta` to be used for the groupby feature,
or a dictionary of `{column: list}`, where `list` is used for ordering
filters: list of tuples
arguments for filtering and describing, either `((index, dict)` or
`((index[0], index[1]), dict)`, when also using `groupby`, index must
haev length 2.
percentiles: list-like of numbers, optional
The percentiles to include in the output of `pandas.describe()`.
All should fall between 0 and 1. The default is `[.25, .5, .75]`,
which returns the 25th, 50th, and 75th percentiles.
"""
def __init__(self, df, groupby=None, filters=None, rows=False,
percentiles=[0.25, 0.5, 0.75]):
self.df = df
self.idx_depth = None
# assing `groupby` settings and check that specifications are valid
self.col = None
self.groupby = None
if isstr(groupby):
self.col = groupby
self.groupby = {groupby: None}
elif isinstance(groupby, dict) and len(groupby) == 1:
self.col = list(groupby.keys())[0]
self.groupby = groupby
self.idx_depth = 2
elif groupby is not None:
raise ValueError('arg `{}` not valid `groupby`'.format(groupby))
if self.col is not None and self.col not in df.meta.columns:
raise ValueError('column `{}` not in `df.meta`'.format(self.col))
# if neither groupby nor filters is given, use filters to describe all
# and assume that rows are used
if groupby is None and filters is None:
self.filters = [('', {})]
rows = True
else:
self.filters = filters if filters is not None else []
# set lists to sort index and subindex
self._idx = [] if self.col is None else [self.col]
self._sub_idx = self.groupby[self.col] or self.df[self.col].unique() \
if self.col is not None else []
self._headers, self._subheaders = ([], [])
# assing `filters` settings and check that specifications are valid
for (idx, _filter) in self.filters:
# check that index in tuple is valid
if isstr(idx):
self._add_to_index(idx)
else:
if not (isinstance(idx, tuple) and len(idx) == 2 and
isstr(idx[0]) or not isstr(idx[1])):
raise ValueError('`{}` is not a valid index'.format(idx))
self._add_to_index(idx[0], idx[1])
# check that filters in tuple are valid
if not isinstance(_filter, dict):
raise ValueError('`{}` is not a valid filter'.format(_filter))
elif not (set(_filter) - set(META_IDX)).issubset(df.meta):
raise ValueError('column `{}` not in `df.meta`'.format(
set(_filter) - set(META_IDX) - set(df.meta)))
self.stats = None
self.rows = [] if rows else None
# percentiles for passing to `pandas.describe()`
self.percentiles = list(percentiles)
self._describe_cols = (['count', 'mean', 'std', 'min'] +
['{:.0%}'.format(i) for i in self.percentiles] +
['max'])
def _add_to_index(self, idx, sub_idx=None):
# assign index depth if not set
if self.idx_depth is None:
self.idx_depth = 1 if sub_idx is None else 2
# check that index matches depth
if self.groupby is not None and sub_idx is None:
msg = 'if `groupby` is used, index `{}` must have format `{}`'
raise ValueError(msg.format(idx, '(idx0, idx1)'))
if self.idx_depth == 1 and sub_idx is not None:
raise ValueError('index depth set to 1, found `({}, {})`'
.format(idx, sub_idx))
if self.idx_depth == 2 and sub_idx is None:
raise ValueError('index depth set to 2, found `({})`'.format(idx))
# append to lists for sorting index
if idx not in self._idx:
self._idx.append(idx)
if self.idx_depth == 2 and sub_idx not in self._sub_idx:
self._sub_idx.append(sub_idx)
def _add_to_header(self, header, subheader):
if header not in self._headers:
self._headers.append(header)
if islistable(subheader):
for s in subheader:
if s not in self._subheaders:
self._subheaders.append(s)
elif subheader not in self._subheaders:
self._subheaders.append(subheader)
[docs] def add(self, data, header, row=None, subheader=None):
"""Filter `data` by arguments of this SummaryStats instance,
then apply `pd.describe()` and format the statistics
Parameters
----------
data : pd.DataFrame or pd.Series
data for which summary statistics should be computed
header : str
column name for descriptive statistics
row : str
row name for descriptive statistics
(required if `pyam.Statistics(rows=True)`)
subheader : str, optional
column name (level=1) if data is a unnamed `pd.Series`
"""
# verify validity of specifications
if self.rows is not None and row is None:
raise ValueError('row specification required')
if self.rows is None and row is not None:
raise ValueError('row arg illegal for this `Statistics` instance')
if isinstance(data, pd.Series):
if subheader is not None:
data.name = subheader
elif data.name is None:
msg = '`data` must be named `pd.Series` or provide `subheader`'
raise ValueError(msg)
data = pd.DataFrame(data)
if self.rows is not None and row not in self.rows:
self.rows.append(row)
_stats = None
# describe with groupby feature
if self.groupby is not None:
filter_args = dict(data=data, df=self.df, join_meta=True)
filter_args.update(self.groupby)
_stats = (
filter_by_meta(**filter_args).groupby(self.col)
.describe(percentiles=self.percentiles)
)
_stats = pd.concat([_stats], keys=[self.col], names=[''], axis=0)
if self.rows:
_stats['row'] = row
_stats.set_index('row', append=True, inplace=True)
_stats.index.names = [''] * 3 if self.rows else [''] * 2
# describe with filter feature
for (idx, _filter) in self.filters:
filter_args = dict(data=data, df=self.df)
filter_args.update(_filter)
_stats_f = (
filter_by_meta(**filter_args)
.describe(percentiles=self.percentiles)
)
_stats_f = pd.DataFrame(_stats_f.unstack()).T
if self.idx_depth == 1:
levels = [[idx]]
else:
levels = [[idx[0]], [idx[1]]]
lvls, lbls = (levels, [[0]] * self.idx_depth) if not self.rows \
else (levels + [[row]], [[0]] * (self.idx_depth + 1))
_stats_f.index = pd.MultiIndex(levels=lvls, labels=lbls)
_stats = _stats_f if _stats is None else _stats.append(_stats_f)
# add header
_stats = pd.concat([_stats], keys=[header], names=[''], axis=1)
subheader = _stats.columns.get_level_values(1).unique()
self._add_to_header(header, subheader)
# set statistics
if self.stats is None:
self.stats = _stats
else:
self.stats = _stats.combine_first(self.stats)
[docs] def reindex(self, copy=True):
"""Reindex the summary statistics dataframe"""
ret = deepcopy(self) if copy else self
ret.stats = ret.stats.reindex(index=ret._idx, level=0)
if ret.idx_depth == 2:
ret.stats = ret.stats.reindex(index=ret._sub_idx, level=1)
if ret.rows is not None:
ret.stats = ret.stats.reindex(index=ret.rows, level=ret.idx_depth)
ret.stats = ret.stats.reindex(columns=ret._headers, level=0)
ret.stats = ret.stats.reindex(columns=ret._subheaders, level=1)
ret.stats = ret.stats.reindex(columns=ret._describe_cols, level=2)
if copy:
return ret
[docs] def summarize(self, center='mean', fullrange=None, interquartile=None,
custom_format='{:.2f}'):
"""Format the compiled statistics to a concise string output
Parameter
---------
center : str, default `mean`
what to return as 'center' of the summary: `mean`, `50%`, `median`
fullrange : bool, default None
return full range of data if True or `fullrange`, `interquartile`
and `format_spec` are None
interquartile : bool, default None
return interquartile range if True
custom_format : formatting specifications
"""
# call `reindex()` to reorder index and columns
self.reindex(copy=False)
center = 'median' if center == '50%' else center
if fullrange is None and interquartile is None:
fullrange = True
return self.stats.apply(format_rows, center=center,
fullrange=fullrange,
interquartile=interquartile,
custom_format=custom_format,
axis=1, raw=False)
# %% auxiliary functions
def format_rows(row, center, fullrange=None, interquartile=None,
custom_format='{:.2f}'):
"""Format a row with `describe()` columns to a concise string"""
if (fullrange or 0) + (interquartile or 0) == 1:
legend = '{} ({})'.format(center, 'max, min' if fullrange is True
else 'interquartile range')
index = row.index.droplevel(2).drop_duplicates()
count_arg = dict(tuples=[('count', '')], names=[None, legend])
else:
msg = 'displaying multiple range formats simultaneously not supported'
raise NotImplementedError(msg)
ret = pd.Series(index=pd.MultiIndex.from_tuples(**count_arg).append(index))
row = row.sort_index()
center = '50%' if center == 'median' else center
# get maximum of `count` and write to first entry of return series
count = max([i for i in row.loc[(slice(None), slice(None), 'count')]
if not np.isnan(i)])
ret.loc[('count', '')] = ('{:.0f}'.format(count)) if count > 1 else ''
# set upper and lower for the range
upper, lower = ('max', 'min') if fullrange is True else ('75%', '25%')
# format `describe()` columns to string output
for i in index:
x = row.loc[i]
_count = x['count']
if np.isnan(_count) or _count == 0:
s = ''
elif _count > 1:
s = '{f} ({f}, {f})'.format(f=custom_format)\
.format(x[center], x[upper], x[lower])
elif _count == 1:
s = '{f}'.format(f=custom_format).format(x['50%'])
# add count of this section as `[]` if different from count_max
if 0 < _count < count:
s += ' [{:.0f}]'.format(_count)
ret.loc[i] = s
return ret