Coverage for src/xlbudget/rwxlb.py: 32.10%

199 statements  

« prev     ^ index     » next       coverage.py v7.2.5, created at 2024-01-05 07:44 +0000

1"""xlbudget file reading and writing.""" 

2 

3import calendar 

4from logging import getLogger 

5from typing import Dict, List, NamedTuple 

6 

7import pandas as pd 

8from openpyxl import Workbook 

9from openpyxl.chart import BarChart, Reference 

10from openpyxl.styles import Alignment, Font 

11from openpyxl.utils import get_column_letter 

12from openpyxl.utils.cell import column_index_from_string, coordinate_from_string 

13from openpyxl.worksheet.table import Table, TableStyleInfo 

14from openpyxl.worksheet.worksheet import Worksheet 

15 

16logger = getLogger(__name__) 

17 

18FORMAT_ACCOUNTING = '_($* #,##0.00_);_($* (#,##0.00);_($* "-"??_);_(@_)' 

19FORMAT_DATE = "MM/DD/YYYY" 

20FORMAT_NUMBER = '_($* #,##0_);_($* (#,##0);_($* "-"??_);_(@_)' 

21 

22MONTH_NAME_0_IND = calendar.month_name[1:] 

23MONTH_TABLES_ROW = 17 

24MONTH_TABLES_COL = 6 

25 

26 

27class ColumnSpecs(NamedTuple): 

28 name: str 

29 format: str 

30 width: int 

31 

32 

33MONTH_COLUMNS = [ 

34 ColumnSpecs(name="Date", format=FORMAT_DATE, width=12), 

35 ColumnSpecs(name="Description", format="", width=20), 

36 ColumnSpecs(name="Amount", format=FORMAT_ACCOUNTING, width=12), 

37] 

38SUMMARY_COLUMNS = [ 

39 ColumnSpecs(name="Month", format="", width=12), 

40 ColumnSpecs(name="Incomes", format=FORMAT_ACCOUNTING, width=12), 

41 ColumnSpecs(name="Expenses", format=FORMAT_ACCOUNTING, width=12), 

42 ColumnSpecs(name="Net", format=FORMAT_ACCOUNTING, width=12), 

43] 

44 

45 

46class TablePosition: 

47 """The state and bounds of a worksheet table. 

48 Read-only fields were implemented with properties that return mangled variables. 

49 """ 

50 

51 def __init__(self, ref: str) -> None: 

52 # excel ref format: "<top left cell coordinate>:<bottom right cell coordinate>" 

53 start, end = ref.split(":") 

54 

55 self.__first_col, self.__header_row = coordinate_from_string(start) 

56 self.next_row = self.__header_row + 1 

57 self.__first_col_ind = column_index_from_string(self.__first_col) 

58 

59 self.__last_col, self.__initial_last_row = coordinate_from_string(end) 

60 

61 @property 

62 def header_row(self) -> int: 

63 return self.__header_row 

64 

65 @property 

66 def first_col(self) -> int: 

67 return self.__first_col_ind 

68 

69 @property 

70 def initial_last_row(self) -> int: 

71 return self.__initial_last_row 

72 

73 def __repr__(self) -> str: 

74 return ( 

75 f"{self.__class__.__name__}(next_row={self.next_row}, " 

76 f"first_col={self.first_col}, initial_last_row={self.initial_last_row}, " 

77 f"header_row={self.header_row})" 

78 ) 

79 

80 def get_ref(self) -> str: 

81 # Excel tables must have at least 2 rows: 1 header and 1+ data. `last_row` is 

82 # implemented as follows so that `next_row` can be incremented consistently. 

83 last_row = ( 

84 self.next_row - 1 

85 if self.next_row - 1 >= self.header_row + 1 

86 else self.header_row + 1 

87 ) 

88 return f"{self.__first_col}{self.header_row}:{self.__last_col}{last_row}" 

89 

90 

91def create_year_sheet(wb: Workbook, year: int) -> None: 

92 """Creates a year sheet, with a table for each month. 

93 

94 Args: 

95 wb (openpyxl.workbook.workbook.Workbook): The workbook to create the sheet in. 

96 year (int): The year. 

97 

98 Raises: 

99 ValueError: If year sheet `year` already exists in the workbook `wb`. 

100 """ 

101 index = 0 

102 year_str = str(year) 

103 if year_str in wb.sheetnames: 

104 raise ValueError(f"Year sheet {year_str} already exists") 

105 

106 logger.debug(f"Creating sheet {year_str} at {index=}") 

107 ws = wb.create_sheet(year_str, index) 

108 num_tables = len(MONTH_NAME_0_IND) 

109 

110 for c_start in range( 

111 MONTH_TABLES_COL, 

112 (len(MONTH_COLUMNS) + 1) * num_tables + MONTH_TABLES_COL, 

113 len(MONTH_COLUMNS) + 1, 

114 ): 

115 month_ind = (c_start - MONTH_TABLES_COL) // (len(MONTH_COLUMNS) + 1) 

116 month = MONTH_NAME_0_IND[month_ind] 

117 table_name = _get_month_table_name(month, year_str) 

118 logger.debug(f"creating {table_name} table") 

119 

120 _add_table( 

121 ws, table_name, c_start, r_start=MONTH_TABLES_ROW, columns=MONTH_COLUMNS 

122 ) 

123 

124 logger.debug("Creating summary table") 

125 summ_table_name = _get_summary_table_name(year_str) 

126 _add_table(ws, summ_table_name, c_start=1, r_start=1, columns=SUMMARY_COLUMNS) 

127 summ_tab = ws.tables[summ_table_name] 

128 summ_tab_pos = TablePosition(ref=summ_tab.ref) 

129 

130 for month in MONTH_NAME_0_IND: 

131 month_table_name = _get_month_table_name(month, year_str) 

132 table_range = f"{month_table_name}[{MONTH_COLUMNS[-1].name}]" 

133 

134 # set month cell 

135 ws.cell(row=summ_tab_pos.next_row, column=summ_tab_pos.first_col).value = month 

136 

137 # set incomes cell 

138 incomes_cell = ws.cell( 

139 row=summ_tab_pos.next_row, column=summ_tab_pos.first_col + 1 

140 ) 

141 incomes_cell.value = f'=SUMIFS({table_range}, {table_range}, ">0")' 

142 incomes_cell.number_format = SUMMARY_COLUMNS[1].format 

143 

144 # set expenses cell 

145 expenses_cell = ws.cell( 

146 row=summ_tab_pos.next_row, column=summ_tab_pos.first_col + 2 

147 ) 

148 expenses_cell.value = f'=-SUMIFS({table_range}, {table_range}, "<=0")' 

149 expenses_cell.number_format = SUMMARY_COLUMNS[2].format 

150 

151 # set net cell 

152 net_cell = ws.cell(row=summ_tab_pos.next_row, column=summ_tab_pos.first_col + 3) 

153 net_cell.value = f"={incomes_cell.coordinate}-{expenses_cell.coordinate}" 

154 net_cell.number_format = SUMMARY_COLUMNS[3].format 

155 

156 summ_tab_pos.next_row += 1 

157 

158 summ_tab.ref = summ_tab_pos.get_ref() 

159 

160 # compute totals 

161 # set month cell 

162 ws.cell(row=summ_tab_pos.next_row, column=summ_tab_pos.first_col).value = "Total" 

163 

164 # set other cells and create charts 

165 for i in range(1, len(SUMMARY_COLUMNS)): 

166 cell = ws.cell(row=summ_tab_pos.next_row, column=summ_tab_pos.first_col + i) 

167 cell.value = f"=SUM({summ_table_name}[{SUMMARY_COLUMNS[i].name}])" 

168 cell.number_format = SUMMARY_COLUMNS[i].format 

169 

170 chart = BarChart() 

171 data = Reference( 

172 ws, 

173 min_col=i + 1, 

174 min_row=summ_tab_pos.header_row, 

175 max_row=summ_tab_pos.next_row - 1, 

176 ) 

177 cats = Reference( 

178 ws, 

179 min_col=summ_tab_pos.first_col, 

180 min_row=summ_tab_pos.header_row + 1, 

181 max_row=summ_tab_pos.next_row - 1, 

182 ) 

183 chart.add_data(data, titles_from_data=True) 

184 chart.set_categories(cats) 

185 chart.legend = None 

186 chart.y_axis.numFmt = FORMAT_NUMBER 

187 chart.height = 7.5 

188 chart.width = 8.5 # type: ignore[assignment] 

189 start_col = MONTH_TABLES_COL + (i - 1) * (len(MONTH_COLUMNS) + 1) 

190 anchor = f"{get_column_letter(start_col)}1" 

191 ws.add_chart(chart, anchor) 

192 

193 

194def _add_table( 

195 ws: Worksheet, 

196 table_name: str, 

197 c_start: int, 

198 r_start: int, 

199 columns: List[ColumnSpecs], 

200): 

201 # table title 

202 table_title = ws.cell(row=r_start, column=c_start) 

203 table_title.value = table_name 

204 table_title.font = Font(bold=True) 

205 table_title.alignment = Alignment(horizontal="center") 

206 ws.merge_cells( 

207 start_row=r_start, 

208 start_column=c_start, 

209 end_row=r_start, 

210 end_column=c_start + len(columns) - 1, 

211 ) 

212 

213 # table header and formating 

214 header_row = r_start + 1 

215 transactions_row = r_start + 2 

216 for i in range(len(columns)): 

217 c = c_start + i 

218 

219 # header 

220 ws.cell(row=header_row, column=c).value = columns[i].name 

221 

222 # column format 

223 cell = ws.cell(row=transactions_row, column=c) 

224 if columns[i].format: 

225 cell.number_format = columns[i].format 

226 

227 # column width 

228 ws.column_dimensions[get_column_letter(c)].width = columns[i].width 

229 

230 # create table 

231 c_start_ltr = get_column_letter(c_start) 

232 c_end_ltr = get_column_letter(c_start + len(columns) - 1) 

233 ref = f"{c_start_ltr}{header_row}:{c_end_ltr}{transactions_row}" 

234 logger.debug(f"creating table {table_name} with {ref=}") 

235 tab = Table(displayName=table_name, ref=ref) 

236 

237 # add a default style with striped rows and banded columns 

238 style = TableStyleInfo( 

239 name="TableStyleMedium9", 

240 showFirstColumn=False, 

241 showLastColumn=False, 

242 showRowStripes=True, 

243 showColumnStripes=True, 

244 ) 

245 tab.tableStyleInfo = style 

246 

247 ws.add_table(tab) 

248 

249 

250def update_xlbudget(wb: Workbook, df: pd.DataFrame): 

251 """Updates an xlbudget file. 

252 

253 Args: 

254 wb (openpyxl.workbook.workbook.Workbook): The xlbudget workbook. 

255 df (pd.DataFrame): The input file dataframe. 

256 """ 

257 oldest_date, newest_date = df[df.columns[0]].agg(["min", "max"]) 

258 logger.debug(f"{oldest_date=}, {newest_date=}") 

259 

260 # create year sheets as needed 

261 for year in range(oldest_date.year, newest_date.year + 1): 

262 if str(year) not in wb.sheetnames: 

263 logger.info(f"Creating {year} sheet") 

264 create_year_sheet(wb, year) 

265 

266 # initialize table positions dictionary 

267 # maps worksheet names to dictionaries that map table names to their position. 

268 table_pos: Dict[str, Dict[str, TablePosition]] = {} 

269 for year in range(oldest_date.year, newest_date.year + 1): 

270 sheet_name = str(year) 

271 table_pos[sheet_name] = {} 

272 

273 start_month = oldest_date.month if year == oldest_date.year else 1 

274 end_month = newest_date.month if year == newest_date.year else 12 

275 for month in range(start_month, end_month + 1): 

276 month_name = calendar.month_name[month] 

277 table_name = _get_month_table_name(month=month_name, year=sheet_name) 

278 logger.debug(f"Initializing table {table_name} in sheet {sheet_name}") 

279 ref = wb[sheet_name].tables[table_name].ref 

280 table_pos[sheet_name][table_name] = TablePosition(ref) 

281 

282 # update df with transactions in wb 

283 logger.debug(f"{df.shape=} before checking existing transactions") 

284 for sheet_name in table_pos.keys(): 

285 ws = wb[sheet_name] 

286 

287 for pos in table_pos[sheet_name].values(): 

288 is_populated = bool(ws.cell(row=pos.next_row, column=pos.first_col).value) 

289 if is_populated: 

290 for r in range(pos.next_row, pos.initial_last_row + 1): 

291 transaction = [] 

292 for i in range(len(MONTH_COLUMNS)): 

293 c = pos.first_col + i 

294 transaction.append(ws.cell(row=r, column=c).value) 

295 

296 logger.debug(f"Appending {transaction=} to dataframe") 

297 # ignore mypy error and implicitly cast to df.dtypes 

298 df.loc[len(df) + 1] = transaction # type: ignore[call-overload] 

299 df = df_drop_duplicates(df) 

300 # re-sort transactions to make the oldest transactions come first 

301 df = df.sort_values(by=list(df.columns), ascending=True) 

302 logger.debug(f"{df.shape=} after checking existing transactions") 

303 

304 # write dataframe to wb 

305 for row in df.itertuples(index=False): 

306 logger.debug(f"Writing transaction {row} to workbook") 

307 

308 # get worksheet and table position 

309 sheet_name, month_name = str(row.Date.year), calendar.month_name[row.Date.month] 

310 table_name = _get_month_table_name(month=month_name, year=sheet_name) 

311 ws, pos = wb[sheet_name], table_pos[sheet_name][table_name] 

312 

313 # set date cell 

314 date_cell = ws.cell(row=pos.next_row, column=pos.first_col) 

315 date_cell.value = row.Date 

316 date_cell.number_format = MONTH_COLUMNS[0].format 

317 

318 # set description cell 

319 ws.cell(row=pos.next_row, column=pos.first_col + 1).value = row.Description 

320 

321 # set amount cell 

322 amount_cell = ws.cell(row=pos.next_row, column=pos.first_col + 2) 

323 amount_cell.value = row.Amount 

324 amount_cell.number_format = MONTH_COLUMNS[2].format 

325 

326 pos.next_row += 1 

327 

328 # update table refs 

329 for sheet_name in table_pos.keys(): 

330 for table_name, pos in table_pos[sheet_name].items(): 

331 tab = wb[sheet_name].tables[table_name] 

332 ref = pos.get_ref() 

333 if ref != tab.ref: 

334 logger.debug( 

335 f"Updating ref of table {tab.name} from {tab.ref} to {ref}" 

336 ) 

337 tab.ref = pos.get_ref() 

338 

339 

340def df_drop_duplicates(df: pd.DataFrame) -> pd.DataFrame: 

341 """Checks for duplicate rows, dropping them in place if any. 

342 

343 Args: 

344 df (pd.DataFrame): The original dataframe. 

345 

346 Returns: 

347 A[n] `pd.DataFrame` without any duplicate rows. 

348 """ 

349 duplicated = df.duplicated() 

350 duplicates = df[duplicated] 

351 if not duplicates.empty: 

352 logger.warning(f"Dropping duplicate transactions:\n{duplicates}") 

353 return df[~duplicated] 

354 return df 

355 

356 

357def df_drop_ignores(df: pd.DataFrame, ignore: str) -> pd.DataFrame: 

358 """Checks for rows containing `ignore`, dropping them in place if any. 

359 

360 Args: 

361 df (pd.DataFrame): The original dataframe. 

362 ignore (str): The regex pattern that is in descriptions to ignore. 

363 

364 Returns: 

365 A[n] `pd.DataFrame` without any rows containing `ignore`. 

366 """ 

367 ignored = df["Description"].str.contains(ignore) 

368 ignores = df[ignored] 

369 if not ignores.empty: 

370 logger.warning(f"Dropping ignored transactions:\n{ignores}") 

371 return df[~ignored].reset_index(drop=True) 

372 return df 

373 

374 

375def df_drop_na(df: pd.DataFrame) -> pd.DataFrame: 

376 """Checks for rows that contain only `na` values, dropping them in place if any. 

377 

378 Args: 

379 df (pd.DataFrame): The original dataframe. 

380 

381 Returns: 

382 A[n] `pd.DataFrame` without any rows that are entirely `na`. 

383 """ 

384 na = df.isna().all(axis=1) 

385 nas = df[na] 

386 if not nas.empty: 

387 logger.info(f"Dropping rows that contain only `na` values:\n{nas}") 

388 return df[~na].reset_index(drop=True) 

389 return df 

390 

391 

392def _get_month_table_name(month: str, year: str): 

393 return f"_{month}{year}" 

394 

395 

396def _get_summary_table_name(year: str): 

397 return f"_Summary{year}"