Source code for inventory

[docs]class InventoryOptim(object): """ :param df: the `DataFrame` containing data point :param units_costs: a list of pairs :math:`(G_i, C_i)`. :param date_fld: `string` the name of the column keeping each row's date :param start_date: `None` or `datetime`the start date of the analysis; if `None` the minimum date found in `date_fld` is used. :param num_intrvl: `2-tuple` the numerical range to be used for converting dates to numbers :param projectioni_date: `datetime` the target date of the analysis :param c_limit: float between 0 and 1, the confidence interval :param min_samples: `int` minimum number of samples to perform Monte Carlo sampling :param error_tol: `float` error tolerance """ def __init__( self, df, units_costs, date_fld="date", start_date=None, num_intrvl=(0.0, 10.0), projection_date=None, c_limit=0.95, min_samples=5, error_tol=1.0e-4, ): clmns = list(df.columns) if date_fld in clmns: self.date_fld = date_fld else: raise Exception("'%s' is not a column of the given DataFrame" % date_fld) dts = list(df[self.date_fld]) self.MinDate = min(dts) self.MaxDate = max(dts) if start_date is None: self.start_date = self.MinDate else: if (start_date < self.MinDate) or (self.MaxDate < start_date): raise Exception("The given start date is out of the DataFrame's scope") self.start_date = start_date self.num_intrvl = num_intrvl self.min_samples = min_samples self.error_tol = error_tol self.unit_flds = [] self.cost_flds = [] self.unit_cost = [] for uc in units_costs: if len(uc) != 2: raise Exception("'units_costs' must be a lista of pairs.") if (uc[0] in clmns) and (uc[1] in clmns): self.unit_flds.append(uc[0]) self.cost_flds.append(uc[1]) self.unit_cost += list(uc) self.df = df[[self.date_fld] + self.unit_cost].sort_values(by=self.date_fld) mp = [self.date2num(_) for _ in list(self.df[self.date_fld])] self.df["T"] = mp self.df["TotalUnits"] = self.df.apply( lambda x, flds=tuple(self.unit_flds): sum([x[_] for _ in flds]), axis=1 ) self.df["TotalCost"] = self.df.apply( lambda x, cst=tuple(self.cost_flds), unt=tuple(self.unit_flds): sum( [x[cst[_]] * x[unt[_]] for _ in range(len(cst))] ), axis=1, ) self.training_size = sum([1 if _ >= 0 else 0 for _ in mp]) if projection_date is not None: self.projection_date = projection_date self.FT = self.date2num(projection_date) else: from datetime import timedelta projection_date = self.MaxDate + timedelta(50) self.projection_date = projection_date self.FT = self.date2num(projection_date) self.c_limit = (c_limit + 1.0) / 2.0 self.colors = [ "#549dce", "#d86950", "#28b789", "#49dd1c", "#864daf", "#ef6f34", "#db99d1", "#4442c4", "#4286f4", "#46d6cc", "#46d6cc", ] # set the default regressors from sklearn.linear_model import LinearRegression self.unit_regressor = LinearRegression() self.cost_regressor = LinearRegression() self.unt_reg = [] self.cst_reg = [] self.tu_reg = None self.tc_reg = None self.fitted = False self.analyzed = False self.variance_from_trend = {} self.constraints = [] self.bound_flds = set() self.init_x = {} self.init_y = {} self.init_fits = {} self.partial_fits = {} self.result = None self.budget = lambda t: 0.0
[docs] def date2num(self, dt): """ Converts a `datetime` to a number according to `self.num_intrvl` :param dt: `datetime` """ slope = ( float(self.num_intrvl[1] - self.num_intrvl[0]) / (self.MaxDate - self.start_date).days ) y = slope * (dt - self.start_date).days + self.num_intrvl[0] return y
[docs] def set_unit_count_regressor(self, regressor): """ Sets the regressor for unit counts. Any regression inherited from `sk-learn.RegressorMixin` is acceptable :param regressor: `RegressorMixin` """ self.unit_regressor = regressor
[docs] def set_cost_regressor(self, regressor): """ Sets the regressor for unit costs. Any regression inherited from `sk-learn.RegressorMixin` is acceptable :param regressor: `RegressorMixin` """ self.cost_regressor = regressor
[docs] def fit_regressors(self): """ Initializes the regression objects and fit them on training data """ from numpy import reshape from copy import copy n_flds = len(self.unit_flds) t_df = self.df[self.df["T"] >= 0] X = reshape(t_df["T"].values, (-1, 1)) for idx in range(n_flds): y_u = t_df[self.unit_flds[idx]].values y_c = t_df[self.cost_flds[idx]].values reg_u = copy(self.unit_regressor) reg_c = copy(self.cost_regressor) reg_u.fit(X, y_u) reg_c.fit(X, y_c) self.init_fits[self.unit_flds[idx]] = copy(reg_u) self.init_fits[self.cost_flds[idx]] = copy(reg_c) self.unt_reg.append(copy(reg_u)) self.cst_reg.append(copy(reg_c)) y_u = t_df["TotalUnits"].values y_c = t_df["TotalCost"].values reg_u = copy(self.unit_regressor) reg_c = copy(self.cost_regressor) reg_u.fit(X, y_u) reg_c.fit(X, y_c) self.tu_reg = copy(reg_u) self.tc_reg = copy(reg_c) self.fitted = True
def _conf_ints(self): """ Calculates the confidence intervals for regression curves """ from numpy import reshape, power, sqrt, linspace from scipy.stats import t u_conf = [] unts = [] p_unts = [] c_conf = [] csts = [] p_csts = [] n_flds = len(self.unit_flds) t_df = self.df[self.df["T"] >= 0] x = t_df["T"].values X = reshape(x, (-1, 1)) mean_x = x.mean() n = X.shape[0] tstat = t.ppf(self.c_limit, n - 1) fx = linspace(0.0, self.FT, 100) rfx = reshape(fx, (-1, 1)) for idx in range(n_flds): y_u = t_df[self.unit_flds[idx]].values unts.append(y_u) p_unts.append(self.unt_reg[idx].predict(rfx)) s_err_u = sum(power(y_u - self.unt_reg[idx].predict(X), 2)) self.variance_from_trend[self.unit_flds[idx]] = sqrt(s_err_u / n) conf_u = ( tstat * sqrt((s_err_u / (n - 2))) * ( 1.0 / n + ( power(fx - mean_x, 2) / ((sum(power(x, 2))) - n * (power(mean_x, 2))) ) ) ) u_conf.append(conf_u) y_c = t_df[self.cost_flds[idx]].values csts.append(y_c) p_csts.append(self.cst_reg[idx].predict(rfx)) s_err_c = sum(power(y_c - self.cst_reg[idx].predict(X), 2)) self.variance_from_trend[self.cost_flds[idx]] = sqrt(s_err_c / n) conf_c = ( tstat * sqrt((s_err_c / (n - 2))) * ( 1.0 / n + ( power(fx - mean_x, 2) / ((sum(power(x, 2))) - n * (power(mean_x, 2))) ) ) ) c_conf.append(conf_c) return x, fx, unts, p_unts, u_conf, csts, p_csts, c_conf
[docs] def plot_init_system(self): """ Plots the initial data points and regression curves for projection date """ import matplotlib.pyplot as plt # from matplotlib import colors as mcolors plt.figure(figsize=(30, 20)) # self.colors = list(mcolors.CSS4_COLORS.keys())[5:] if not self.fitted: self.fit_regressors() _, fx, _, p_u, cnf_u, _, p_c, cnf_c = self._conf_ints() n_flds = len(self.unit_flds) all_x = self.df["T"].values fig, axes = plt.subplots( nrows=2, ncols=1, figsize=(20, 20), sharex=False, sharey=False ) for idx in range(n_flds): axes[0].plot(fx, p_u[idx], color=self.colors[idx % len(self.colors)]) axes[0].scatter( all_x, self.df[self.unit_flds[idx]].values, color=self.colors[idx % len(self.colors)], s=6, ) axes[0].fill_between( fx, p_u[idx] - abs(cnf_u[idx]), p_u[idx] + abs(cnf_u[idx]), color=self.colors[idx % len(self.colors)], alpha=0.1, ) axes[0].grid(True) axes[1].plot(fx, p_c[idx], color=self.colors[idx % len(self.colors)]) axes[1].scatter( all_x, self.df[self.cost_flds[idx]].values, color=self.colors[idx % len(self.colors)], s=5, ) axes[1].fill_between( fx, p_c[idx] - abs(cnf_c[idx]), p_c[idx] + abs(cnf_c[idx]), color=self.colors[idx % len(self.colors)], alpha=0.1, ) axes[1].grid(True) axes[0].legend(self.unit_flds) axes[1].legend(self.cost_flds) return fig
[docs] def plot_analysis(self): """ Plots the outcome of the adjustment. """ from numpy import array, multiply import matplotlib.pyplot as plt plt.figure(figsize=(40, 20)) if not self.analyzed: self.adjust_system("b") _, fx, _, p_u, cnf_u, _, p_c, cnf_c = self._conf_ints() tot_trend_cost = None tot_actual_cost = None tot_changed_cost = None n_flds = len(self.unit_flds) all_x = self.df["T"].values fig, axes = plt.subplots( nrows=2, ncols=2, figsize=(20, 20), sharex=False, sharey=False ) for idx in range(n_flds): # axes[0, 0].plot( fx, p_u[idx], color=self.colors[idx % len(self.colors)], ls=":" ) axes[0, 0].scatter( all_x, self.df[self.unit_flds[idx]].values, color=self.colors[idx % len(self.colors)], s=6, ) axes[0, 0].fill_between( fx, p_u[idx] - abs(cnf_u[idx]), p_u[idx] + abs(cnf_u[idx]), color=self.colors[idx % len(self.colors)], alpha=0.1, ) ys_u = array([self.partial_fits[self.unit_flds[idx]](_) for _ in fx]) axes[0, 0].plot(fx, ys_u, color=self.colors[idx % len(self.colors)]) axes[0, 0].grid(True) for cns in self.constraints: if cns[0] in self.unit_flds: axes[0, 0].scatter( [self.date2num(cns[2])], [cns[1]], cmap="cubehelix", alpha=0.2 ) axes[0, 1].plot( fx, p_c[idx], color=self.colors[idx % len(self.colors)], ls=":" ) axes[0, 1].scatter( all_x, self.df[self.cost_flds[idx]].values, color=self.colors[idx % len(self.colors)], s=5, ) axes[0, 1].fill_between( fx, p_c[idx] - abs(cnf_c[idx]), p_c[idx] + abs(cnf_c[idx]), color=self.colors[idx % len(self.colors)], alpha=0.1, ) ys_c = array([self.partial_fits[self.cost_flds[idx]](_) for _ in fx]) axes[0, 1].plot(fx, ys_c, color=self.colors[idx % len(self.colors)]) axes[0, 1].grid(True) for cns in self.constraints: if cns[0] in self.cost_flds: axes[0, 1].scatter( [self.date2num(cns[2])], [cns[1]], cmap="cubehelix", alpha=0.2 ) # trend_cost = multiply(p_u[idx], p_c[idx]) actual_cost = multiply( self.df[self.unit_flds[idx]].values, self.df[self.cost_flds[idx]].values ) changed_cost = multiply(ys_u, ys_c) if tot_trend_cost is None: tot_trend_cost = trend_cost tot_actual_cost = actual_cost tot_changed_cost = changed_cost else: tot_trend_cost += trend_cost tot_actual_cost += actual_cost tot_changed_cost += changed_cost axes[1, 0].plot( fx, trend_cost, color=self.colors[idx % len(self.colors)], ls=":" ) axes[1, 0].scatter( all_x, actual_cost, color=self.colors[idx % len(self.colors)], s=6 ) axes[1, 0].plot(fx, changed_cost, color=self.colors[idx % len(self.colors)]) axes[1, 0].grid(True) # budget = array([self.budget(_) for _ in fx]) residual = budget - tot_changed_cost gain = tot_trend_cost - tot_changed_cost axes[1, 1].plot( fx, tot_trend_cost, color=self.colors[n_flds % len(self.colors)], ls=":" ) axes[1, 1].scatter( all_x, tot_actual_cost, color=self.colors[(n_flds + 1) % len(self.colors)], s=6, ) axes[1, 1].plot( fx, tot_changed_cost, color=self.colors[(n_flds + 2) % len(self.colors)] ) axes[1, 1].plot(fx, budget, color=self.colors[(n_flds + 3) % len(self.colors)]) axes[1, 1].plot( fx, residual, color=self.colors[(n_flds + 4) % len(self.colors)] ) axes[1, 1].plot(fx, gain, color=self.colors[(n_flds + 6) % len(self.colors)]) axes[1, 1].grid(True) # axes[0, 0].legend( [ item for sublist in [["_nolegend_", loc] for loc in self.unit_flds] for item in sublist ] ) axes[0, 0].set_title("Capacity") axes[0, 1].legend( [ item for sublist in [["_nolegend_", loc] for loc in self.cost_flds] for item in sublist ] ) axes[0, 1].set_title("Unit Costs") axes[1, 0].legend( [item for sublist in [["_nolegend_", self.unit_flds[_] + "*" + self.cost_flds[_]] for _ in range(n_flds)] for item in sublist] ) axes[1, 0].set_title("Costs") axes[1, 1].legend( [ "Trend of total cost", "Proj. expected cost", "Budget", "Residual", "Gain", "Total cost", ] ) axes[1, 1].set_title("Total Costs") return plt, fig, axes
[docs] def constraint(self, fld, value, dt): """ Suggest a constraint for future. :param fld: `str` the column whose values is about to be adjusted :param value: `float` the suggested value for the given date :param dt: `datetime` the suggested date for adjustment """ self.constraints.append((fld, value, dt)) self.bound_flds.add(fld)
[docs] def make_date_interval_val(self, dt, n_days): """ Converts the outcome of `self.make_date_interval` into a list of floats """ from datetime import timedelta return [ self.date2num(dt + timedelta(days=_)) for _ in range(-n_days, n_days + 1) ]
[docs] def refit(self, fld, val, dt, n_points): """ Refits the regressor of the `fld` after producing `n_points` samples points around `dt` using a normal distribution centered at `val` :param fld: the regression associated to `fld` will be refitted :param val: the suggested value for the regression curve at `dt` :param dt: the suggested `datetime` to make adjustments to the values of `fld` :param n_points: number of samples to be generated for refitting """ from numpy import array, append from numpy.random import normal from copy import copy date_interval = array(self.make_date_interval_val(dt, n_points)).reshape( (-1, 1) ) y_sample = normal(val, self.variance_from_trend[fld], 2 * n_points + 1) X = append(self.init_x[fld], date_interval).reshape((-1, 1)) y = append(self.init_y[fld], y_sample) if fld in self.unit_flds: regressor = copy(self.unit_regressor) else: regressor = copy(self.cost_regressor) regressor.fit(X, y) return lambda t, reg=copy(regressor): reg.predict(array([t]).reshape(-1, 1))[0]
[docs] def adjust_system(self, tbo="u"): """ Forms and solves the optimization problem for trend adjustment :param tbo: `char` if 'u' only trends will be adjusted regardless of unit costs. if 'b' costs of units will be used to adjust trends """ from numpy import array, append, reshape from numpy.random import normal from scipy.optimize import minimize from copy import copy if not self.fitted: self.fit_regressors() num_points = max( self.min_samples, int(self.training_size * (1.0 - self.c_limit)) ) t_df = self.df[self.df["T"] >= 0] np_ft = array([self.FT]).reshape(-1, 1) ######################## # add cost constraints # ######################## for fld in self.cost_flds: if fld not in self.bound_flds: val = self.init_fits[fld].predict(np_ft) self.constraint(fld, val, self.projection_date) ######################## x_ = t_df["T"].values X = reshape(x_, (-1, 1)) if tbo == "u": sel_flds = self.unit_flds tfn = self.tu_reg.predict([[self.FT]])[0] elif tbo == "c": sel_flds = self.cost_flds tfn = self.tc_reg.predict(np_ft)[0] else: sel_flds = self.unit_cost tfn = self.tu_reg.predict(np_ft)[0] for fld in sel_flds: self.init_x[fld] = X self.init_y[fld] = t_df[fld].values for cns in self.constraints: fld = cns[0] if fld not in sel_flds: continue date_interval = array( self.make_date_interval_val(cns[2], num_points) ).reshape((-1, 1)) y_sample = normal(cns[1], self.variance_from_trend[fld], 2 * num_points + 1) self.init_x[fld] = append(self.init_x[fld], date_interval).reshape((-1, 1)) self.init_y[fld] = append(self.init_y[fld], y_sample) if fld in self.unit_flds: regressor = copy(self.unit_regressor) else: regressor = copy(self.cost_regressor) regressor.fit(self.init_x[fld], self.init_y[fld]) self.partial_fits[fld] = lambda t, reg=copy(regressor): reg.predict( array([t]).reshape(-1, 1) )[0] def to_be_optimized(x, tbo="u"): from numpy import array from scipy.integrate import quad idx = 0 fns = {} if tbo == "u": selected_flds = self.unit_flds elif tbo == "c": selected_flds = self.cost_flds else: selected_flds = self.unit_cost for fld_ in selected_flds: if fld_ not in self.bound_flds: fns[fld_] = self.refit(fld_, x[idx], self.projection_date, num_points) idx += 1 else: fns[fld_] = lambda t, fld=fld_: self.partial_fits[fld](t) obj = quad( lambda t, fns=fns: sum( [ ( fns[fld](t) - self.init_fits[fld].predict(array([t]).reshape(-1, 1)) ) ** 2 for fld in self.unit_flds ] ), 0.0, 1.0, )[0] cost_obj = 0.0 if tbo == "b": cost_obj = quad( lambda t: sum( [ fns[self.unit_flds[_]](t) * fns[self.cost_flds[_]](t) - self.budget(t) for _ in range(len(self.unit_flds)) ] ), 0.0, self.FT, )[0] return obj + cost_obj residual = 0.0 cns = () for fld in sel_flds: if fld in self.bound_flds: if fld in self.unit_flds: residual += self.partial_fits[fld](self.FT) def cost_residual(x, sel_flds): cst_res = 0.0 fld_idx = {} idx = 0 for fld_ in sel_flds: if fld_ not in self.bound_flds: fld_idx[fld_] = idx idx += 1 for fld_ in sel_flds: t_cst = 0.0 if fld_ in self.unit_flds: u_fld = fld_ c_fld = self.cost_flds[self.unit_flds.index(fld_)] else: c_fld = fld_ u_fld = self.unit_flds[self.cost_flds.index(fld_)] if u_fld in self.bound_flds: t_cst = self.partial_fits[u_fld](self.FT) else: t_cst = x[fld_idx[u_fld]] if c_fld in self.bound_flds: t_cst *= self.partial_fits[c_fld](self.FT) else: t_cst *= x[fld_idx[c_fld]] cst_res += t_cst res = self.budget(self.FT) - cst_res / 2.0 return res if tbo in ["u", "b"]: cns = ( { "type": "ineq", "fun": lambda x, rsdl=residual, tfn=tfn: sum(x) + rsdl - tfn + self.error_tol, }, { "type": "ineq", "fun": lambda x, rsdl=residual, tfn=tfn: -(sum(x) + rsdl - tfn) + self.error_tol, }, { "type": "ineq", "fun": lambda x, sel_flds=tuple(sel_flds): cost_residual(x, sel_flds), }, ) x0 = [] idx_flds = [] for fld in sel_flds: if fld not in self.bound_flds: idx_flds.append(fld) x0.append(self.init_fits[fld].predict(np_ft)[0]) res = minimize( to_be_optimized, x0=array(x0), method="COBYLA", constraints=cns, args=(tbo) ) self.result = res adj_x = res.x for fld in idx_flds: self.partial_fits[fld] = self.refit( fld, adj_x[idx_flds.index(fld)], self.projection_date, num_points ) self.analyzed = True