-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscript.py
More file actions
executable file
·325 lines (292 loc) · 10.1 KB
/
script.py
File metadata and controls
executable file
·325 lines (292 loc) · 10.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
#!/usr/bin/env -S uv run --script
#
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "requests>=2.32,<3",
# "click>=8.2,<9",
# "click-loglevel>=0.6,<1",
# "pydantic>=2.11,<3",
# ]
# ///
import logging
import re
import sys
import urllib.parse
from collections.abc import Iterator
import click
import click_loglevel
import pydantic
import requests
# The script logs through the root logger. Attach a stream handler bound to
# stderr so that log records are written there.
LOGGER = logging.getLogger()
LOGGER.addHandler(logging.StreamHandler(stream=sys.stderr))
def process_api_response[T: object](
r: requests.Response,
*,
expected_type: type[T],
) -> T:
"""
Process the response from the firefly api, if the response contains an
error, raise an explicit exception containing the api response, otherwise
try to validate the returned data and convert it to the expected python
type.
:param r: The response from the api.
:param expected_type: The expected type of the response.
"""
try:
data = r.json()
except requests.JSONDecodeError as e:
raise ValueError(f"Unexpected response format: {r.text}") from e
match data:
case {"message": str() as message, "exception": str() as exception}:
raise RuntimeError(exception + ": " + message)
case {"data": object() as actual_data}:
adapter = pydantic.TypeAdapter(expected_type)
return adapter.validate_python(actual_data)
case _:
raise ValueError(f"Unexpected response format: {data}")
def process_paginated_api_response[T: object](
r: requests.Response,
*,
expected_type: type[T],
session: requests.Session,
) -> Iterator[T]:
"""
Similar to process_api_response, excepts that the data of the response is
always expected to be a list, and may contain multiple pages. For this
function, the expected type is not the one of the data, but of the items
inside the data list. All pages will be accessed, lazily, when reaching
the end of the previous page.
"""
try:
data = r.json()
except requests.JSONDecodeError as e:
raise ValueError(f"Unexpected response format: {r.text}") from e
yield from process_api_response(r, expected_type=list[expected_type])
match data:
case {"links": {"next": str() as next}}:
yield from process_paginated_api_response(
session.get(next),
expected_type=expected_type,
session=session,
)
case _:
pass
class FireflySession(requests.Session):
    """
    Session class to interact with the firefly api. This session offers some
    helpers to facilitate the interaction with the api:

    - every request carries the api token as a bearer ``Authorization``
      header;
    - request urls that are not absolute are resolved against the base url
      of the firefly instance.
    """

    def __init__(self, base_url: str, api_token: str):
        """
        :param base_url: The url where the firefly instance can be reached.
            It may contain a path prefix, which is preserved for relative
            request urls.
        :param api_token: The token sent as bearer token with every request.
        """
        super().__init__()
        self.base_url = base_url
        self.headers.update(
            {
                "Authorization": f"Bearer {api_token}",
            },
        )

    def request(
        self, method: str, url: str, *args: object, **kwargs: object
    ) -> requests.Response:
        """
        Send a request, after resolving the url against the firefly base url.

        :param method: The http method, forwarded as is.
        :param url: Either a path (i.e. ``/api/v1/about/user``), which is
            appended to the base url, or a url carrying its own scheme or
            host (such as a pagination link), which is resolved with
            ``urljoin``.
        :returns: The response of the parent session.
        """
        parts = urllib.parse.urlsplit(url)
        if parts.scheme or parts.netloc:
            # The url already designates its own host, let urljoin resolve it
            url = urllib.parse.urljoin(self.base_url, url)
        else:
            # urljoin would replace the whole path of the base url when the
            # requested path starts with a slash, which drops the prefix of
            # an instance served under a sub-path. Concatenate instead.
            url = f"{self.base_url.rstrip('/')}/{url.lstrip('/')}"
        return super().request(method, url, *args, **kwargs)
def get_transactions_with_notes(
    session: requests.Session, account: str
) -> Iterator[dict]:
    """
    Search the withdrawals of the given account whose notes contain the
    text "Original account name", i.e. the ones that name another
    beneficiary.

    :param session: The session used to query the api.
    :param account: The name of the account to search transactions for.
    :returns: An iterator over the entries of ``attributes.transactions`` of
        every search result, across all result pages.
    """
    filters = {
        "account_is": account,
        "type": "withdrawal",
        "notes_contain": "Original account name",
    }
    # The search endpoint takes one query string of `operator:"value"` terms
    search_expression = " ".join(
        f'{operator}:"{operand}"' for operator, operand in filters.items()
    )
    first_page = session.get(
        "/api/v1/search/transactions",
        params={"query": search_expression},
    )
    results = process_paginated_api_response(
        first_page,
        expected_type=dict,
        session=session,
    )
    for result in results:
        yield from result["attributes"]["transactions"]
def create_fixing_rule(
    session: requests.Session, account: str, beneficiary: str, group: str
) -> dict:
    """
    Create, then trigger, a rule that moves withdrawals made to the given
    account towards the beneficiary named in their notes.

    The rule matches withdrawals whose destination is ``account`` and whose
    notes contain ``Original account name: <beneficiary>``, and sets their
    destination account to the beneficiary. It is titled after the
    beneficiary and placed in the given group.

    :param session: The session used to call the api.
    :param account: The name of the account the transactions currently go to.
    :param beneficiary: The name of the rightful beneficiary. It is cut at
        the first double whitespace, if any.
    :param group: The title of the rule group receiving the rule.
    :returns: The rule, as returned by the api when it was created.
    """
    kept, double_space, _ = beneficiary.partition(" " * 2)
    if double_space:
        # More than one consecutive whitespace in the beneficiary name
        # The full original account expression will be impossible to match
        # because of how the search engine works and it will not be fixed
        # cf. https://github.com/firefly-iii/firefly-iii/issues/6121#issuecomment-1143079568 # noqa: E501
        # Only workaround is to remove what comes after the double whitespace
        # and hope we don't get any overlap with another account
        LOGGER.warning(
            "Beneficiary name %s contains multiple consecutive whitespaces. "
            "The name will be reduced to %s.",
            repr(beneficiary),
            repr(kept),
        )
        beneficiary = kept

    LOGGER.debug("Creating rule %s in group %s", beneficiary, group)

    # All three conditions must hold, the rule is strict
    triggers = [
        {
            "type": "transaction_type",
            "value": "withdrawal",
        },
        {
            "type": "to_account_is",
            "value": account,
        },
        {
            "type": "notes_contains",
            "value": f"Original account name: {beneficiary}",
        },
    ]
    actions = [
        {
            "type": "set_destination_account",
            "value": beneficiary,
        },
    ]
    payload = {
        "title": beneficiary,
        "rule_group_title": group,
        "strict": True,
        "trigger": "store-journal",
        "triggers": triggers,
        "actions": actions,
    }
    creation = session.post("/api/v1/rules", json=payload)
    rule = process_api_response(creation, expected_type=dict)

    # Trigger the rule to apply the change to the transactions that match it
    rule_id = rule["id"]
    LOGGER.debug("Triggering newly created rule %s (%s)", rule_id, beneficiary)
    triggering = session.post(f"/api/v1/rules/{rule_id}/trigger", json={})
    triggering.raise_for_status()

    return rule
@click.command()
@click.option(
    "-l",
    "--log-level",
    type=click_loglevel.LogLevel(),
    default="INFO",
    help="Set logging level",
    show_default=True,
    show_envvar=True,
    envvar="LOG_LEVEL",
)
@click.option(
    "--url",
    "-u",
    required=True,
    envvar="FIREFLY_III_URL",
    help=(
        "The url where the firefly api can be reached, "
        "i.e. https://demo.firefly-iii.org/."
    ),
    show_envvar=True,
)
@click.option(
    "--access-token",
    "-t",
    required=True,
    envvar="FIREFLY_III_ACCESS_TOKEN",
    help="A user token to interact with the api",
    show_envvar=True,
)
@click.option(
    "--group",
    "-g",
    required=True,
    envvar="FIREFLY_III_RULE_GROUP",
    help="The name of the group in which the rules should be created",
    show_envvar=True,
)
@click.option(
    "--dry-run",
    is_flag=True,
    default=False,
    envvar="DRY_RUN",
    help=(
        "When set to true, only perform get requests to the api, and display "
        "an overview of the changes that would be made if the script was run "
        "without the flag."
    ),
    show_envvar=True,
)
@click.argument("account", envvar="ACCOUNT_NAME", nargs=1)
def main(
    log_level: int, url: str, access_token: str, group: str, dry_run: bool, account: str
) -> None:
    """
    This tool helps you manage and cleanup transactions imported using gocardless for
    which the destination account is a common payment platform such as visa, by updating
    the destination account to match the actual business you made the payment to.
    """
    # NOTE: the docstring above is the help text of the command, keep the
    # developer documentation in comments.
    #
    # Steps: authenticate, collect the beneficiary names found in the notes
    # of the transactions of the account, report them, then (unless in
    # dry-run mode) create and trigger one rule per beneficiary.
    LOGGER.setLevel(log_level)

    # Hoisted out of the loop, captures the rest of the line after the label
    beneficiary_pattern = re.compile(r"Original account name: ([^\n]*)")

    with FireflySession(
        url,
        access_token,
    ) as session:
        # Get and print the current user, to validate that the api setup works
        user = process_api_response(
            session.get("/api/v1/about/user"),
            expected_type=dict,
        )
        LOGGER.info("Authenticated as %s", user["attributes"]["email"])

        # Look for transactions of the account that contain a note
        # describing the true payment beneficiary
        missing_rules: set[str] = set()
        for transaction in get_transactions_with_notes(session, account):
            # The notes may be null, in which case there is nothing to match
            notes = transaction.get("notes")
            LOGGER.debug(
                "Transaction %s: %s",
                transaction["transaction_journal_id"],
                notes,
            )

            # Extract the beneficiary name from the notes of the transaction
            matched = beneficiary_pattern.search(notes or "")
            beneficiary = matched.group(1).strip() if matched else ""
            if not beneficiary:
                # Either the label is absent, or no name follows it. An
                # empty name can't be used as a rule title or account name.
                LOGGER.error(
                    "Failed to match note of transaction %s: %s",
                    transaction["transaction_journal_id"],
                    notes,
                )
                continue

            # Add the account name to the set of missing rules
            missing_rules.add(beneficiary)

        if not missing_rules:
            LOGGER.info(
                "No transaction towards account %s need fixing, no rule to create",
                account,
            )
            return

        # Sort the names so that the report and the order in which the rules
        # are created are the same from one run to the next
        beneficiaries = sorted(missing_rules)

        LOGGER.info(
            "Account %s contains transactions towards %d other beneficiaries:\n- %s",
            account,
            len(beneficiaries),
            "\n- ".join(beneficiaries),
        )
        LOGGER.info("New rules will be created in rule group %s", group)

        if dry_run:
            # Nothing else to do, we are in dry-run mode
            return

        # Create all the rules, each one is triggered right after its creation
        for beneficiary in beneficiaries:
            rule = create_fixing_rule(session, account, beneficiary, group)
            LOGGER.info(
                "Successfully created rule %s (%s)",
                rule["attributes"]["title"],
                rule["id"],
            )
# Only run the command line interface when the file is executed as a script,
# so that importing the module does not trigger any api call.
if __name__ == "__main__":
    main()