-
Notifications
You must be signed in to change notification settings - Fork 179
Description
Hey there, I wanted to use this framework in a larger application, where some actions trigger a read or write, which are in a side thread of the program. The issue is that the read or write locks the thread and can not be canceled because the read or write action may not be executed. When doing a read or write action again after not finishing the old one, it's no longer possible to read / write. A message "Segmentation Error" is shown when exiting the program.
Basic example code can be found below, full code can be found here. In summary, a thread is spun up which should run a loop of reading or writing. The cancel function breaks the loop. This does not work tough, because .read() and .write() will lock until the read or write did happen. Even if the component got cleaned up by the garbage collector, I assume some lower level read/write code is still running, and if using one of the both functions again will cause the described error. If I always read or write a card/chip after using the function (which is not always the case because it's optional) the program runs as intended.
Is there any option or possibility to cancel the read progress over the SimpleMFRC522, so I can implement an optional read into my program?
from mfrc522 import SimpleMFRC522
class BasicMFRC522():
def __init__(self) -> None:
self.rfid = SimpleMFRC522()
def read_card(self) -> Union[str, None]:
_, text = self.rfid.read()
if text is not None:
text = text.strip()
return text
def write_card(self, text: str) -> bool:
_id, _ = self.rfid.write(text)
return _id is not None
class RFIDReader:
def __init__(self) -> None:
self.is_active = False
self.rfid = BasicMFRC522()
def read_rfid(self, side_effect: Callable[[str], None]):
"""Start the rfid reader, calls an side effect with the read value"""
if self.is_active:
return
self.is_active = True
rfid_thread = Thread(target=self._read_thread, args=(side_effect,), daemon=True)
rfid_thread.start()
def _read_thread(self, side_effect: Callable[[str], None]):
"""Execute the reading until reads a value or got canceled"""
text = None
while self.is_active:
text = self.rfid.read_card()
if text is not None:
side_effect(text)
break
time.sleep(0.5)
self.is_active = False
def write_rfid(self, value: str, side_effect: Optional[Callable[[str], None]] = None):
"""Writes the value to the RFID"""
if self.is_active:
return
self.is_active = True
rfid_thread = Thread(target=self._write_thread, args=(value, side_effect,), daemon=True)
rfid_thread.start()
def _write_thread(self, text: str, side_effect: Optional[Callable[[str], None]] = None):
"""Executes the writing until successful or canceled"""
while self.is_active:
success = self.rfid.write_card(text)
if success:
if side_effect:
side_effect(text)
break
time.sleep(0.1)
self.is_active = False
def cancel_reading(self):
"""Cancels the reading loop"""
self.is_active = FalseThanks for the time.
Update: After some more diving into the source code and talking to a friend, I think read_no_block and write_no_block instead of the read and write function should do the general job? Is there anything to look out for or do in addition when interrupting the read/write?