Source code for bekk.bekk_estimation

#!/usr/bin/env python
# -*- coding: utf-8 -*-
r"""
BEKK estimation
===============

Estimation is performed using the Quasi Maximum Likelihood (QML) method.
Specifically, the individual contribution to the Gaussian log-likelihood is

.. math::
    l_{t}\left(\theta\right)=
    -\ln\left|H_{t}\right|-u_{t}^{\prime}H_{t}^{-1}u_{t}.

"""
from __future__ import print_function, division

import time
import itertools

import numpy as np
import pandas as pd
import scipy.linalg as scl
import scipy.stats as scs

from scipy.optimize import minimize, basinhopping
from functools import partial

from bekk import ParamStandard, ParamSpatial, BEKKResults
from .utils import (estimate_uvar, likelihood_python, filter_var_python,
                    take_time)
try:
    from .recursion import filter_var
    from .likelihood import likelihood_gauss
except ImportError:
    print('Failed to import cython modules. '
          + 'Temporary hack to compile documentation.')

__all__ = ['BEKK']
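
# The helper below is an illustrative sketch only and is not part of the
# estimation code (its name and signature are hypothetical). It spells out
# the per-observation Gaussian quasi-log-likelihood contribution from the
# module docstring,
#     l_t = -ln|H_t| - u_t' H_t^{-1} u_t,
# via a Cholesky factorization of H_t; likelihood_gauss / likelihood_python
# are assumed to aggregate terms of this form over all observations.
def _loglik_contribution_sketch(hvar_t, innov_t):
    """Return the single-observation QML contribution l_t."""
    # Cholesky factor L of the conditional covariance H_t = L L'
    cho, lower = scl.cho_factor(hvar_t, lower=True, check_finite=False)
    # log|H_t| = 2 * sum(log(diag(L)))
    logdet = 2 * np.log(np.diag(cho)).sum()
    # Quadratic form u_t' H_t^{-1} u_t via triangular solves
    quad = innov_t.dot(scl.cho_solve((cho, lower), innov_t,
                                     check_finite=False))
    return -logdet - quad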


class BEKK(object):

    """BEKK estimation class.

    Attributes
    ----------
    innov
        Return innovations
    hvar
        Conditional variance

    Methods
    -------
    estimate
        Estimate parameters of the model
    collect_losses
        Collect forecast losses using rolling window

    """

    def __init__(self, innov):
        """Initialize the class.

        Parameters
        ----------
        innov : (nobs, nstocks) array
            Return innovations

        """
        self.innov = innov
        self.hvar = None

    def likelihood(self, theta, model='standard', restriction='full',
                   target=None, cfree=False, groups=None, cython=True,
                   use_penalty=False):
        """Compute the conditional log-likelihood function.

        Parameters
        ----------
        theta : 1dim array
            Dimension depends on the model restriction
        model : str
            Specific model to estimate.

            Must be
                - 'standard'
                - 'spatial'

        restriction : str
            Restriction on parameters.

            Must be
                - 'full'
                - 'diagonal'
                - 'scalar'

        target : (nstocks, nstocks) array
            Estimate of unconditional variance matrix
        cfree : bool
            Whether to leave C matrix free (True) or not (False)
        groups : list of lists of tuples
            Encoded groups of items
        cython : bool
            Whether to use Cython optimizations (True) or not (False)
        use_penalty : bool
            Whether to include penalty term in the likelihood

        Returns
        -------
        float
            The value of the minus log-likelihood function.
            If some regularity conditions are violated, a large penalty
            value (1e10) is returned instead.

        """
        try:
            if model == 'standard':
                param = ParamStandard.from_theta(theta=theta, target=target,
                                                 nstocks=self.innov.shape[1],
                                                 restriction=restriction)
            elif model == 'spatial':
                param = ParamSpatial.from_theta(theta=theta, target=target,
                                                cfree=cfree,
                                                restriction=restriction,
                                                groups=groups)
            else:
                raise NotImplementedError('The model is not implemented!')

            # TODO: Temporary hack to exclude errors in optimization
            if isinstance(param, np.ndarray):
                return 1e10
            if param.constraint() >= 1:
                return 1e10
            # if param.uvar_bad():
            #     return 1e10

            args = [self.hvar, self.innov,
                    param.amat, param.bmat, param.cmat]
            penalty = param.penalty() if use_penalty else 0

            if cython:
                filter_var(*args)
                return likelihood_gauss(self.hvar, self.innov) + penalty
            else:
                filter_var_python(*args)
                return likelihood_python(self.hvar, self.innov) + penalty

        except Exception:
            return 1e10

    def estimate(self, param_start=None, restriction='scalar', cfree=False,
                 use_target=False, model='standard', groups=None,
                 method='SLSQP', cython=True, use_penalty=False):
        """Estimate parameters of the BEKK model.

        Parameters
        ----------
        param_start : ParamStandard or ParamSpatial instance
            Starting parameters. See Notes for more details.
        model : str
            Specific model to estimate.

            Must be
                - 'standard'
                - 'spatial'

        restriction : str
            Restriction on parameters.

            Must be
                - 'full'
                - 'diagonal'
                - 'group' (only applicable with 'spatial' model)
                - 'scalar'

        use_target : bool
            Whether to use variance targeting (True) or not (False)
        cfree : bool
            Whether to leave C matrix free (True) or not (False)
        groups : list of lists of tuples
            Encoded groups of items
        method : str
            Optimization method. See scipy.optimize.minimize
        cython : bool
            Whether to use Cython optimizations (True) or not (False)
        use_penalty : bool
            Whether to include penalty term in the likelihood

        Returns
        -------
        BEKKResults instance
            Estimation results object

        Notes
        -----
        If no param_start is given, the program will estimate parameters
        in the order 'from simple to more complicated' (from scalar to
        diagonal to full) while always using variance targeting.

        """
        # Start timer for the whole optimization
        time_start = time.time()

        # Check for incompatible inputs
        if use_target and cfree:
            raise ValueError('use_target and cfree are incompatible!')
        # if (groups is not None) and (model != 'spatial'):
        #     raise ValueError('The model is incompatible with weights!')

        # Update default settings
        nobs, nstocks = self.innov.shape
        var_target = estimate_uvar(self.innov)
        self.hvar = np.zeros((nobs, nstocks, nstocks), dtype=float)
        self.hvar[0] = var_target.copy()

        # Check for existence of initial guess among arguments.
        # Otherwise, initialize.
        if param_start is None:
            common = {'restriction': restriction, 'method': method,
                      'use_penalty': use_penalty, 'use_target': use_target}
            if model == 'standard':
                param_start = self.init_param_standard(**common)
            elif model == 'spatial':
                param_start = self.init_param_spatial(groups=groups,
                                                      cfree=cfree, **common)
            else:
                raise NotImplementedError('The model is not implemented!')

        # Get vector of parameters to start optimization
        theta_start = param_start.get_theta(restriction=restriction,
                                            use_target=use_target,
                                            cfree=cfree)
        if use_target:
            target = var_target
        else:
            target = None

        # Optimization options
        options = {'disp': False, 'maxiter': int(1e6)}
        if method == 'Nelder-Mead':
            options['maxfev'] = 3000
        # Likelihood arguments
        kwargs = {'model': model, 'target': target, 'cfree': cfree,
                  'restriction': restriction, 'groups': groups,
                  'cython': cython, 'use_penalty': use_penalty}
        # Likelihood function
        likelihood = partial(self.likelihood, **kwargs)

        # Run optimization
        if method == 'basin':
            opt_out = basinhopping(likelihood, theta_start, niter=100,
                                   disp=options['disp'],
                                   minimizer_kwargs={'method': 'Nelder-Mead'})
        else:
            opt_out = minimize(likelihood, theta_start, method=method,
                               options=options)
        # How much time did the optimization take (in seconds)?
        time_delta = time.time() - time_start

        # Store optimal parameters in the corresponding class
        if model == 'standard':
            param_final = ParamStandard.from_theta(theta=opt_out.x,
                                                   restriction=restriction,
                                                   target=target,
                                                   nstocks=nstocks)
        elif model == 'spatial':
            param_final = ParamSpatial.from_theta(theta=opt_out.x,
                                                  restriction=restriction,
                                                  target=target, cfree=cfree,
                                                  groups=groups)
        else:
            raise NotImplementedError('The model is not implemented!')

        return BEKKResults(innov=self.innov, hvar=self.hvar, cython=cython,
                           var_target=var_target, model=model, method=method,
                           use_target=use_target, cfree=cfree,
                           restriction=restriction, param_start=param_start,
                           param_final=param_final, time_delta=time_delta,
                           opt_out=opt_out)
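
    # Illustrative sketch (hypothetical helper, not part of the original
    # package API): a pure-numpy version of the variance recursion that the
    # compiled filter_var / filter_var_python routines are assumed to
    # implement, namely
    #     H_t = C C' + A u_{t-1} u_{t-1}' A' + B H_{t-1} B',
    # which matches the one-step forecast in BEKK.forecast_one below.
    @staticmethod
    def _filter_hvar_sketch(innov, param, hvar0):
        """Filter conditional variances H_t (illustrative only).

        innov is (nobs, nstocks), hvar0 is the (nstocks, nstocks) starting
        variance, and param exposes cmat, amat, and bmat attributes as in
        ParamStandard.

        """
        nobs, nstocks = innov.shape
        hvar = np.zeros((nobs, nstocks, nstocks))
        # Initialize the recursion with the supplied starting variance
        hvar[0] = hvar0
        # Constant intercept C * C'
        intercept = param.cmat.dot(param.cmat.T)
        for t in range(1, nobs):
            # ARCH term A u u' A' plus GARCH term B H B'
            hvar[t] = (intercept
                       + param.amat.dot(BEKK.sqinnov(innov[t-1]))
                       .dot(param.amat.T)
                       + param.bmat.dot(hvar[t-1]).dot(param.bmat.T))
        return hvar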

    def init_param_standard(self, restriction='scalar', use_target=False,
                            method='SLSQP', use_penalty=False):
        """Estimate starting parameters for the standard BEKK model.

        Parameters
        ----------
        restriction : str
            Restriction on parameters.

            Must be
                - 'full'
                - 'diagonal'
                - 'scalar'

        method : str
            Optimization method. See scipy.optimize.minimize

        Returns
        -------
        ParamStandard instance
            Parameter object

        """
        param = ParamStandard(nstocks=self.innov.shape[1],
                              target=estimate_uvar(self.innov),
                              abstart=(.2, .6))
        if restriction == 'scalar':
            return param

        kwargs = {'model': 'standard', 'use_penalty': use_penalty,
                  'use_target': use_target, 'method': method}
        est_partial = partial(self.estimate, **kwargs)

        if restriction in ('diagonal', 'full'):
            result = est_partial(param_start=param, restriction='scalar')
            param = result.param_final

        if restriction in ('full',):
            result = est_partial(param_start=param, restriction='diagonal')
            param = result.param_final

        return param

    def init_param_spatial(self, restriction='shomo', groups=None,
                           use_target=False, method='SLSQP', cfree=False,
                           use_penalty=False):
        """Estimate starting parameters for the spatial BEKK model.

        Parameters
        ----------
        restriction : str
            Restriction on parameters.

            Must be
                - 'hetero' (heterogeneous)
                - 'ghomo' (group homogeneous)
                - 'homo' (homogeneous)
                - 'shomo' (scalar homogeneous)

        cfree : bool
            Whether to leave C matrix free (True) or not (False)
        groups : list of lists of tuples
            Encoded groups of items
        method : str
            Optimization method. See scipy.optimize.minimize

        Returns
        -------
        ParamSpatial instance
            Parameter object

        """
        param = ParamSpatial.from_groups(groups=groups,
                                         target=estimate_uvar(self.innov),
                                         abstart=(.2, .7))
        if restriction == 'shomo':
            return param

        kwargs = {'groups': groups, 'use_target': use_target,
                  'use_penalty': use_penalty, 'model': 'spatial',
                  'cfree': cfree, 'method': method}
        est_partial = partial(self.estimate, **kwargs)

        if restriction in ('homo', 'ghomo', 'hetero'):
            result = est_partial(param_start=param, restriction='shomo')
            param = result.param_final

        if restriction in ('ghomo', 'hetero'):
            result = est_partial(param_start=param, restriction='homo')
            param = result.param_final

        if restriction in ('hetero',):
            result = est_partial(param_start=param, restriction='ghomo')
            param = result.param_final

        return param

    def estimate_loop(self, model='standard', use_target=True, groups=None,
                      restriction='scalar', cfree=False, method='SLSQP',
                      ngrid=2, use_penalty=False):
        """Estimate parameters starting from a grid of a and b.

        Parameters
        ----------
        model : str
            Specific model to estimate.

            Must be
                - 'standard'
                - 'spatial'

        restriction : str
            Restriction on parameters.

            Must be
                - 'full'
                - 'diagonal'
                - 'group'
                - 'scalar'

        groups : list of lists of tuples
            Encoded groups of items
        use_target : bool
            Whether to use variance targeting (True) or not (False)
        cfree : bool
            Whether to leave C matrix free (True) or not (False)
        method : str
            Optimization method. See scipy.optimize.minimize
        ngrid : int
            Number of starting values in one dimension
        use_penalty : bool
            Whether to include penalty term in the likelihood

        Returns
        -------
        BEKKResults instance
            Estimation results object

        """
        target = estimate_uvar(self.innov)
        nstocks = self.innov.shape[1]
        achoice = np.linspace(.01, .5, ngrid)
        bchoice = np.linspace(.1, .9, ngrid)
        out = dict()
        for abstart in itertools.product(achoice, bchoice):
            if model == 'spatial':
                param = ParamSpatial.from_groups(groups=groups, target=target,
                                                 abstart=abstart)
            if model == 'standard':
                param = ParamStandard(nstocks=nstocks, target=target,
                                      abstart=abstart)
            if param.constraint() >= 1:
                continue
            result = self.estimate(param_start=param, method=method,
                                   use_target=use_target, cfree=cfree,
                                   model=model, restriction=restriction,
                                   groups=groups, use_penalty=use_penalty)
            out[abstart] = (result.opt_out.fun, result)

        df = pd.DataFrame.from_dict(out, orient='index')
        return df.sort_values(by=0).iloc[0, 1]

    @staticmethod
    def forecast_one(hvar=None, innov=None, param=None):
        """One step ahead volatility forecast.

        Parameters
        ----------
        hvar : (nstocks, nstocks) array
            Current variance/covariances
        innov : (nstocks, ) array
            Current innovations
        param : ParamStandard or ParamSpatial instance
            Parameter object

        Returns
        -------
        (nstocks, nstocks) array
            Volatility forecast

        """
        forecast = param.cmat.dot(param.cmat.T)
        forecast += param.amat.dot(BEKK.sqinnov(innov)).dot(param.amat.T)
        forecast += param.bmat.dot(hvar).dot(param.bmat.T)
        return forecast

    @staticmethod
    def sqinnov(innov):
        """Volatility proxy. Square returns.

        Parameters
        ----------
        innov : (nstocks, ) array
            Current innovations

        Returns
        -------
        (nstocks, nstocks) array
            Volatility proxy

        """
        return innov * innov[:, np.newaxis]

    @staticmethod
    def weights_equal(nstocks):
        """Equal weights.

        Parameters
        ----------
        nstocks : int
            Number of stocks

        Returns
        -------
        (nstocks, ) array

        """
        return np.ones(nstocks) / nstocks

    @staticmethod
    def weights_minvar(hvar):
        """Minimum variance weights.

        Parameters
        ----------
        hvar : (nstocks, nstocks) array
            Variance/covariance matrix

        Returns
        -------
        (nstocks, ) array

        """
        nstocks = hvar.shape[0]
        inv_hvar = np.linalg.solve(hvar, np.ones(nstocks))
        return inv_hvar / inv_hvar.sum()

    @staticmethod
    def weights(nstocks=None, hvar=None, kind='equal'):
        """Portfolio weights.

        Parameters
        ----------
        nstocks : int
            Number of stocks
        hvar : (nstocks, nstocks) array
            Variance/covariance matrix (required for 'minvar' weights)
        kind : str
            Either 'equal' or 'minvar' (minimum variance).

        Returns
        -------
        (nstocks, ) array

        """
        if kind == 'equal':
            return BEKK.weights_equal(nstocks)
        elif kind == 'minvar':
            return BEKK.weights_minvar(hvar)
        else:
            raise ValueError('Weight choice is not supported!')

    @staticmethod
    def pret(innov, weights=None):
        """Portfolio return.

        Parameters
        ----------
        innov : (nstocks, ) array
            Current innovations
        weights : (nstocks, ) array
            Portfolio weights

        Returns
        -------
        float
            Portfolio return

        """
        if weights is None:
            nstocks = innov.shape[0]
            weights = BEKK.weights(nstocks=nstocks)
        else:
            weights = np.array(weights) / np.sum(weights)
        return np.sum(innov * weights)

    @staticmethod
    def pvar(var, weights=None):
        """Portfolio variance.

        Parameters
        ----------
        var : (nstocks, nstocks) array
            Variance matrix of returns
        weights : (nstocks, ) array
            Portfolio weights

        Returns
        -------
        float
            Portfolio variance

        """
        if weights is None:
            nstocks = var.shape[0]
            weights = BEKK.weights(nstocks=nstocks)
        else:
            weights = np.array(weights) / np.sum(weights)
        return np.sum(weights * var.dot(weights))

    @staticmethod
    def loss_eucl(forecast=None, proxy=None):
        """Euclidean loss function.

        Parameters
        ----------
        forecast : (nstocks, nstocks) array
            Volatility forecast
        proxy : (nstocks, nstocks) array
            Proxy for actual volatility

        Returns
        -------
        float

        """
        diff = (forecast - proxy)[np.tril_indices_from(forecast)]
        return np.linalg.norm(diff)**2

    @staticmethod
    def loss_frob(forecast=None, proxy=None):
        """Frobenius loss function.

        Parameters
        ----------
        forecast : (nstocks, nstocks) array
            Volatility forecast
        proxy : (nstocks, nstocks) array
            Proxy for actual volatility

        Returns
        -------
        float

        """
        diff = forecast - proxy
        return np.trace(diff.T.dot(diff))

    @staticmethod
    def loss_stein(forecast=None, proxy=None):
        """Stein loss function for non-degenerate proxy.

        Parameters
        ----------
        forecast : (nstocks, nstocks) array
            Volatility forecast
        proxy : (nstocks, nstocks) array
            Proxy for actual volatility

        Returns
        -------
        float

        """
        nstocks = forecast.shape[0]
        ratio = np.linalg.solve(forecast, proxy)
        return np.trace(ratio) - np.log(np.linalg.det(ratio)) - nstocks

    @staticmethod
    def loss_stein2(forecast=None, innov=None):
        """Stein loss function.

        Parameters
        ----------
        forecast : (nstocks, nstocks) array
            Volatility forecast
        innov : (nstocks, ) array
            Returns

        Returns
        -------
        float

        """
        lower = True
        forecast, lower = scl.cho_factor(forecast, lower=lower,
                                         check_finite=False)
        norm_innov = scl.cho_solve((forecast, lower), innov,
                                   check_finite=False)
        return (np.log(np.diag(forecast)**2) + norm_innov * innov).sum()

    @staticmethod
    def portf_lscore(forecast=None, innov=None, weights=None):
        """Portfolio log-score loss function.

        Parameters
        ----------
        forecast : (nstocks, nstocks) array
            Volatility forecast
        innov : (nstocks, ) array
            Returns
        weights : (nstocks, ) array
            Portfolio weights

        Returns
        -------
        float

        """
        if weights is None:
            nstocks = forecast.shape[0]
            weights = BEKK.weights(nstocks=nstocks)
        else:
            weights = np.array(weights) / np.sum(weights)
        pret = BEKK.pret(innov, weights=weights)
        pvar = BEKK.pvar(forecast, weights=weights)
        return (np.log(pvar) + pret**2 / pvar) / 2

    @staticmethod
    def portf_mse(forecast=None, proxy=None, weights=None):
        """Portfolio MSE loss function.

        Parameters
        ----------
        forecast : (nstocks, nstocks) array
            Volatility forecast
        proxy : (nstocks, nstocks) array
            Proxy for actual volatility
        weights : (nstocks, ) array
            Portfolio weights

        Returns
        -------
        float

        """
        if weights is None:
            nstocks = forecast.shape[0]
            weights = BEKK.weights(nstocks=nstocks)
        else:
            weights = np.array(weights) / np.sum(weights)
        pvar_exp = BEKK.pvar(forecast, weights=weights)
        pvar_real = BEKK.pvar(proxy, weights=weights)
        return (pvar_exp - pvar_real) ** 2

    @staticmethod
    def portf_qlike(forecast=None, proxy=None, weights=None):
        """Portfolio QLIKE loss function.

        Parameters
        ----------
        forecast : (nstocks, nstocks) array
            Volatility forecast
        proxy : (nstocks, nstocks) array
            Proxy for actual volatility
        weights : (nstocks, ) array
            Portfolio weights

        Returns
        -------
        float

        """
        if weights is None:
            nstocks = forecast.shape[0]
            weights = BEKK.weights(nstocks=nstocks)
        else:
            weights = np.array(weights) / np.sum(weights)
        pvar_exp = BEKK.pvar(forecast, weights=weights)
        pvar_real = BEKK.pvar(proxy, weights=weights)
        return np.log(pvar_exp) + pvar_real**2 / pvar_exp

    @staticmethod
    def portf_var(forecast=None, alpha=.05, weights=None):
        """Portfolio Value-at-Risk.

        Parameters
        ----------
        forecast : (nstocks, nstocks) array
            Volatility forecast
        alpha : float
            Risk level. Usually 1% or 5%.
        weights : (nstocks, ) array
            Portfolio weights

        Returns
        -------
        float

        """
        if weights is None:
            nstocks = forecast.shape[0]
            weights = BEKK.weights(nstocks=nstocks)
        else:
            weights = np.array(weights) / np.sum(weights)
        return scs.norm.ppf(alpha) * BEKK.pvar(forecast, weights=weights)**.5

    @staticmethod
    def var_error(innov=None, forecast=None, alpha=.05, weights=None):
        """Portfolio Value-at-Risk error.

        Parameters
        ----------
        innov : (nstocks, ) array
            Returns
        forecast : (nstocks, nstocks) array
            Volatility forecast
        alpha : float
            Risk level. Usually 1% or 5%.
        weights : (nstocks, ) array
            Portfolio weights

        Returns
        -------
        float

        """
        if weights is None:
            nstocks = forecast.shape[0]
            weights = BEKK.weights(nstocks=nstocks)
        else:
            weights = np.array(weights) / np.sum(weights)
        var = BEKK.portf_var(forecast=forecast, alpha=alpha, weights=weights)
        pret = BEKK.pret(innov, weights=weights)
        return pret - var

    @staticmethod
    def var_exception(error=None):
        """Exception associated with portfolio Value-at-Risk.

        Parameters
        ----------
        error : float
            VaR error

        Returns
        -------
        float

        """
        if error < 0:
            return 1
        else:
            return 0

    @staticmethod
    def loss_var(error=None):
        """Loss associated with portfolio Value-at-Risk.

        Parameters
        ----------
        error : float
            VaR error

        Returns
        -------
        float

        """
        if error < 0:
            return 1 + error ** 2
        else:
            return 0.

    @staticmethod
    def loss_qntl(error=None, alpha=.05):
        """Loss associated with portfolio Value-at-Risk
        as a quantile function.

        Parameters
        ----------
        error : float
            VaR error
        alpha : float
            Risk level. Usually 1% or 5%.

        Returns
        -------
        float

        """
        return (alpha - float(error < 0)) * error

    @staticmethod
    def all_losses(forecast=None, proxy=None, innov=None, alpha=.05,
                   kind='equal'):
        """Collect all loss functions.

        Parameters
        ----------
        forecast : (nstocks, nstocks) array
            Volatility forecast
        proxy : (nstocks, nstocks) array
            Proxy for actual volatility
        innov : (nstocks, ) array
            Returns
        alpha : float
            Risk level. Usually 1% or 5%.
        kind : str
            Either 'equal' or 'minvar' (minimum variance).

        Returns
        -------
        dict

        """
        nstocks = forecast.shape[0]
        weights = BEKK.weights(nstocks=nstocks, hvar=forecast, kind=kind)
        var_error = BEKK.var_error(innov=innov, forecast=forecast,
                                   alpha=alpha, weights=weights)
        return {'eucl': BEKK.loss_eucl(forecast=forecast, proxy=proxy),
                'frob': BEKK.loss_frob(forecast=forecast, proxy=proxy),
                'stein': BEKK.loss_stein2(forecast=forecast, innov=innov),
                'lscore': BEKK.portf_lscore(forecast=forecast, innov=innov),
                'mse': BEKK.portf_mse(forecast=forecast, proxy=proxy),
                'qlike': BEKK.portf_qlike(forecast=forecast, proxy=proxy),
                'pret': BEKK.pret(innov, weights=weights),
                'var': BEKK.portf_var(forecast=forecast, alpha=alpha,
                                      weights=weights),
                'var_exception': BEKK.var_exception(error=var_error),
                'var_loss': BEKK.loss_var(error=var_error),
                'qntl_loss': BEKK.loss_qntl(error=var_error, alpha=alpha)}

    @staticmethod
    def collect_losses(param_start=None, innov_all=None, window=1000,
                       model='standard', use_target=False,
                       groups=('NA', 'NA'), restriction='scalar', cfree=False,
                       method='SLSQP', use_penalty=False, ngrid=5, alpha=.05,
                       kind='equal', tname='losses', path=None):
        """Collect forecast losses using rolling window.

        Parameters
        ----------
        param_start : ParamStandard or ParamSpatial instance
            Initial parameters for estimation
        innov_all : (nobs, nstocks) array
            Innovations
        window : int
            Window length for in-sample estimation
        model : str
            Specific model to estimate.

            Must be
                - 'standard'
                - 'spatial'

        restriction : str
            Restriction on parameters.

            Must be
                - 'full'
                - 'diagonal'
                - 'group' (for 'spatial' model only)
                - 'scalar'

        groups : tuple
            First item is the string code.
            Second is spatial groups specification.
        use_target : bool
            Whether to use variance targeting (True) or not (False)
        cfree : bool
            Whether to leave C matrix free (True) or not (False)
        ngrid : int
            Number of starting values in one dimension
        use_penalty : bool
            Whether to include penalty term in the likelihood
        alpha : float
            Risk level. Usually 1% or 5%.
        kind : str
            Portfolio weighting scheme.
            Either 'equal' or 'minvar' (minimum variance).
        tname : str
            Name to be used while writing data to the disk

        Returns
        -------
        pandas.DataFrame
            Losses for the last rolling window

        """
        nobs = innov_all.shape[0]
        nstocks = innov_all.shape[1]
        common = {'groups': groups[1], 'use_target': use_target,
                  'model': model, 'restriction': restriction, 'cfree': cfree,
                  'use_penalty': use_penalty}
        loc_name = tname + '_' + model + '_' + restriction + '_' + groups[0]
        fname = path + '/' + loc_name + '.h5'

        for first in range(nobs - window):
            loop = 0
            last = window + first
            innov = innov_all[first:last]
            bekk = BEKK(innov)
            time_start = time.time()
            if first == 0:
                result = bekk.estimate(method='basin', **common)
            else:
                result = bekk.estimate(param_start=param_start, method=method,
                                       **common)
            if result.opt_out.fun == 1e10:
                loop = 1
                result = bekk.estimate(param_start=param_start,
                                       method='basin', **common)
            if result.opt_out.fun == 1e10:
                loop = 2
                result = bekk.estimate_loop(ngrid=ngrid, method=method,
                                            **common)
            time_delta = time.time() - time_start
            param_start = result.param_final

            forecast = BEKK.forecast_one(hvar=result.hvar[-1],
                                         innov=innov[-1],
                                         param=result.param_final)
            proxy = BEKK.sqinnov(innov_all[last])

            data = BEKK.all_losses(forecast=forecast, proxy=proxy,
                                   innov=innov_all[last], alpha=alpha,
                                   kind=kind)
            data['logl'] = result.opt_out.fun
            data['time_delta'] = time_delta
            data['loop'] = loop

            ids = [[model], [restriction], [groups[0]], [first]]
            names = ['model', 'restriction', 'group', 'first']
            index = pd.MultiIndex.from_arrays(ids, names=names)
            losses = pd.DataFrame(data, index=index)
            append = False if first == 0 else True
            losses.to_hdf(fname, tname, format='t', append=append,
                          min_itemsize=10)

        return losses
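

if __name__ == '__main__':

    # Minimal usage sketch (not part of the library): fit a scalar BEKK with
    # variance targeting to simulated Gaussian innovations, then produce a
    # one-step forecast and the associated losses. The simulated data and
    # settings below are arbitrary, and the defaults assume the compiled
    # Cython modules (recursion, likelihood) are available.
    np.random.seed(0)
    nobs, nstocks = 500, 2
    innov = np.random.normal(size=(nobs, nstocks))

    bekk = BEKK(innov)
    result = bekk.estimate(restriction='scalar', use_target=True,
                           model='standard', method='SLSQP')
    print(result.param_final)

    # One-step-ahead variance forecast and squared-return proxy
    forecast = BEKK.forecast_one(hvar=result.hvar[-1], innov=innov[-1],
                                 param=result.param_final)
    proxy = BEKK.sqinnov(innov[-1])
    print(BEKK.all_losses(forecast=forecast, proxy=proxy, innov=innov[-1]))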