Coverage for src/xlbudget/inputformat.py: 52.59%

84 statements  

« prev     ^ index     » next       coverage.py v7.2.5, created at 2024-01-05 07:44 +0000

1"""Input file format definitions.""" 

2 

3import io 

4import sys 

5from argparse import Action 

6from datetime import datetime 

7from logging import getLogger 

8from typing import Callable, Dict, List, NamedTuple, Optional 

9 

10import numpy as np 

11import pandas as pd 

12 

13from xlbudget.rwxlb import MONTH_COLUMNS, df_drop_ignores, df_drop_na 

14 

15logger = getLogger(__name__) 

16 

17 

class InputFormat(NamedTuple):
    """Describes how one kind of input file is read and normalised.

    Attributes:
        header (int): The 0-indexed row of the header in the input file.
        names (List[str]): The names given to the input's columns.
        usecols (List[int]): Indices of the columns to load. The leading
            entries select, in order, the columns that map to `MONTH_COLUMNS`;
            any indices after those are for columns that are only required
            for post-processing.
        ignores (List[str]): Regex patterns identifying transactions to ignore.
        pre_processing (Callable): Called as `pre_processing(input, year)`
            before `pd.read_csv()`. The default returns the input unchanged.
        post_processing (Callable): Called with the dataframe after
            `pd.read_csv()`. The default returns the dataframe unchanged.
        seperator (str): The field separator (the spelling is part of the
            field's name). Defaults to a comma.
    """

    header: int
    names: List[str]
    usecols: List[int]
    ignores: List[str]
    pre_processing: Callable = lambda input, year: input
    post_processing: Callable = lambda df: df
    seperator: str = ","

    def get_usecols_names(self):
        """Return the names of the first three `usecols` columns, in order."""
        # NOTE(review): the width 3 presumably equals len(MONTH_COLUMNS) - confirm.
        leading_indices = self.usecols[:3]
        return list(map(self.names.__getitem__, leading_indices))

43 

44 

45# define pre-processing functions below 

46 

47 

def _adobe_date_to_iso(text: str, year: str) -> str:
    """Convert a statement date such as "Feb. 29" into "<year>-02-29"."""
    # Parse the date together with its year. A bare "%b. %d" parse defaults to
    # the year 1900, which is not a leap year, so "Feb. 29" would be rejected.
    parsed = datetime.strptime(f"{year} {text}", "%Y %b. %d")
    return year + parsed.strftime("-%m-%d")


def bmo_cc_adobe_pre_processing(_input: Optional[str], year: str) -> io.StringIO:
    """Create CSV from input with each element on a new line.

    The input holds one element per line, four elements per record, with the
    first record being the header. A record may be followed by a line that
    contains only "CR"; the amount of every other (non-header) record is
    prefixed with a minus sign.

    Args:
        _input (Optional[str]): The file to process, if `None` then read from stdin.
        year (str): The four-digit year of all transactions.

    Raises:
        ValueError: If a date does not match the "%b. %d" format or is not a
            valid date in `year`.

    Returns:
        A[n] `io.StringIO` holding tab-separated rows, one per record.
    """
    # get lines from stdin or file; both sources are stripped the same way so
    # that a "CR" marker is recognised regardless of surrounding whitespace
    if _input is None:
        lines = [line.strip() for line in sys.stdin]
    else:
        with open(_input) as f:
            lines = [line.strip() for line in f.read().splitlines()]

    # trailing blank lines would otherwise be read as an incomplete record
    while lines and not lines[-1]:
        lines.pop()

    rows = []
    i = 0
    is_header = True
    while i < len(lines):
        elems = lines[i : i + 4]

        # reformat dates and add year
        if not is_header:
            elems[0] = _adobe_date_to_iso(elems[0], year)
            elems[1] = _adobe_date_to_iso(elems[1], year)

        # add negative sign to amounts that are not credited (CR on next line)
        if i + 4 < len(lines) and lines[i + 4] == "CR":
            i += 5
        else:
            if not is_header:
                elems[-1] = "-" + elems[-1]
            i += 4

        rows.append("\t".join(elems) + "\n")
        is_header = False

    return io.StringIO("".join(rows))

95 

96 

97# define post-processing functions below 

98 

99 

def bmo_acct_web_post_processing(df: pd.DataFrame) -> pd.DataFrame:
    """Merge the "Money in" column into "Amount".

    Both columns are converted to floats after removing "$" and ","
    characters. Wherever "Money in" holds a value it replaces the "Amount"
    value of that row, then "Money in" is dropped.

    Args:
        df (pd.DataFrame): The dataframe to process; its "Amount" and
            "Money in" columns are overwritten with their float versions.

    Returns:
        A[n] `pd.DataFrame` with the combined "Amount" column and without the
        "Money in" column.
    """
    # strip currency formatting before the float conversion
    for column in ("Amount", "Money in"):
        df[column] = df[column].replace("[$,]", "", regex=True).astype(float)

    has_money_in = df["Money in"].notna()
    df["Amount"] = np.where(has_money_in, df["Money in"], df["Amount"])
    return df.drop(columns="Money in")

114 

115 

def bmo_cc_web_post_processing(df: pd.DataFrame) -> pd.DataFrame:
    """Convert the "Money in/out" column to floats.

    Args:
        df (pd.DataFrame): The dataframe to process; it is modified in place.

    Returns:
        The same `pd.DataFrame`, with "$" and "," removed from
        "Money in/out" and the column cast to float.
    """
    column = "Money in/out"
    without_currency_marks = df[column].replace("[$,]", "", regex=True)
    df[column] = without_currency_marks.astype(float)
    return df

129 

130 

131# define input formats below 

132 

133 

# Ignore pattern shared by every credit-card format below.
# NOTE(review): presumably matches transfers between the owner's own accounts,
# identified by the trailing digits - confirm.
_CC_IGNORE_PATTERN = r"^TRSF FROM.*(?:285|493|593)$"

BMO_ACCT = InputFormat(
    header=3,
    names=[
        "First Bank Card",
        "Transaction Type",
        "Date Posted",
        "Transaction Amount",
        "Description",
    ],
    # date, description, amount
    usecols=[2, 4, 3],
    ignores=[r"^\[CW\] TF.*(?:285|493|593|625)$"],
)

BMO_ACCT_WEB = InputFormat(
    header=0,
    seperator="\t",
    names=[
        "Date",
        "Description",
        "Amount",  # actually named "Money out", but matches after post-processing
        "Money in",
        "Balance",
    ],
    # the fourth column ("Money in") is only needed by the post-processing step
    usecols=[0, 1, 2, 3],
    ignores=[r"^TF.*(?:285|493|593|625)$"],
    post_processing=bmo_acct_web_post_processing,
)

BMO_CC = InputFormat(
    header=2,
    names=[
        "Item #",
        "Card #",
        "Transaction Date",
        "Posting Date",
        "Transaction Amount",
        "Description",
    ],
    # date, description, amount
    usecols=[2, 5, 4],
    ignores=[_CC_IGNORE_PATTERN],
)

BMO_CC_WEB = InputFormat(
    header=0,
    seperator="\t",
    names=["Transaction date", "Description", "Money in/out"],
    usecols=[0, 1, 2],
    ignores=[_CC_IGNORE_PATTERN],
    post_processing=bmo_cc_web_post_processing,
)

BMO_CC_ADOBE = InputFormat(
    header=0,
    seperator="\t",
    names=["Transaction Date", "Posting Date", "Description", "Amount"],
    # date, description, amount ("Posting Date" is not loaded)
    usecols=[0, 2, 3],
    ignores=[_CC_IGNORE_PATTERN],
    pre_processing=bmo_cc_adobe_pre_processing,
)

202 

203 

204# define input formats above 

205 

206 

class GetInputFormats(Action):
    """Argparse action that resolves a format name to its `InputFormat`.

    Adapted from [this Stack Overflow answer](https://stackoverflow.com/a/50799463).

    Attributes:
        input_formats (Dict[str, InputFormat]): Maps the name of every
            module-level `InputFormat` defined before this class to its value.
    """

    input_formats: Dict[str, InputFormat] = {
        name: value
        for name, value in globals().items()
        if isinstance(value, InputFormat)
    }

    def __call__(self, parser, namespace, values, option_string=None):
        """Store the `InputFormat` named by `values` on the namespace.

        Raises:
            KeyError: If `values` is not a key of `input_formats`.
        """
        chosen_format = self.input_formats[values]
        setattr(namespace, self.dest, chosen_format)

221 

222 

223def parse_input( 

224 input: Optional[str], format: InputFormat, year: Optional[str] 

225) -> pd.DataFrame: 

226 """Parses an input. 

227 

228 Args: 

229 input (Optional[str]): The path to the input file, if None parse from stdin. 

230 format (InputFormat): The input format. 

231 year (Optional[str]): The year of all transactions. 

232 

233 Raises: 

234 ValueError: If input contains duplicate transactions. 

235 

236 Returns: 

237 A[n] `pd.DataFrame` where the columns match the xlbudget file's column names. 

238 """ 

239 input_initially_none = input is None 

240 if input_initially_none: 240 ↛ 241line 240 didn't jump to line 241, because the condition on line 240 was never true

241 print("Paste your transactions here (CTRL+D twice on a blank line to end):") 

242 

243 input = format.pre_processing(input, year) 

244 

245 df = pd.read_csv( 

246 input if input is not None else sys.stdin, 

247 sep=format.seperator, 

248 index_col=False, 

249 names=format.names, 

250 header=format.header if input is not None else None, 

251 usecols=format.usecols, 

252 parse_dates=[0], 

253 skip_blank_lines=False, 

254 ) 

255 

256 if input_initially_none: 256 ↛ 257line 256 didn't jump to line 257, because the condition on line 256 was never true

257 print("---End of transactions---") 

258 

259 df = format.post_processing(df) 

260 

261 # convert first column to datetime and replace any invalid values with NaT 

262 df[df.columns[0]] = pd.to_datetime(df[df.columns[0]], errors="coerce") 

263 

264 df = df_drop_na(df) 

265 

266 df.columns = df.columns.str.strip() 

267 

268 # order columns to match `MONTH_COLUMNS` 

269 df = df[format.get_usecols_names()] 

270 

271 # rename columns to match `MONTH_COLUMNS` 

272 df = df.set_axis([c.name for c in MONTH_COLUMNS], axis="columns") 

273 

274 # sort rows by date 

275 df = df.sort_values(by=list(df.columns), ascending=True) 

276 

277 # strip whitespace from descriptions 

278 df["Description"] = df["Description"].str.strip() 

279 

280 # drop ignored transactions 

281 df = df_drop_ignores(df, "|".join(format.ignores)) 

282 

283 # TODO: write issues to make ignoring identical transactions interactive 

284 # TODO: investigate autocompletions 

285 if df.duplicated().any(): 285 ↛ 286line 285 didn't jump to line 286, because the condition on line 285 was never true

286 logger.warning( 

287 "The following transactions are identical:\n" 

288 f"{df[df.duplicated(keep=False)]}" 

289 ) 

290 

291 return df