From 5aba0148413ec819c1e30c082bf57e76763f77f8 Mon Sep 17 00:00:00 2001 From: Nimelli Date: Sun, 25 Oct 2020 15:01:28 +0100 Subject: [PATCH 1/2] standard python gitignore added --- .gitignore | 140 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 140 insertions(+) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..fa2c4bb --- /dev/null +++ b/.gitignore @@ -0,0 +1,140 @@ +my_test_file.py + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ From deb211240705e4b46e5034f1ff247f5835222985 Mon Sep 17 00:00:00 2001 From: Nimelli Date: Sun, 25 Oct 2020 15:07:28 +0100 Subject: [PATCH 2/2] bugfix when using high level api. Find device from vid and pid only (no name field), removed index of handle_winusb for write,read,etc --- README.md | 33 ++++++++++++++++++++++++++++---- winusbpy/winusbpy.py | 42 ++++++++++++++++++++++++++--------------- winusbpy/winusbutils.py | 10 +++++++--- 3 files changed, 63 insertions(+), 22 deletions(-) diff --git a/README.md b/README.md index 6ccad05..d926c80 100755 --- a/README.md +++ b/README.md @@ -56,7 +56,7 @@ Built on top of the low level wrapper is a more usable api to perform common USB def list_usb_devices(self, **kwargs): # vid and pid must be str, returns True if device was correctly initialized and False otherwise -def init_winusb_device(self, vid, pid): +def init_winusb_device(self, vid, pid): # Returns True if device was correctly closed and False otherwise. def close_winusb_device(self): @@ -82,7 +82,7 @@ def query_pipe(self, pipe_index): # it returns a dict with the response and with the buffer under the keywords 'result' and 'buffer' def control_transfer(self, setup_packet, buff=None): -#Send Bulk data to the Usb device, write_buffer must be a str buffer +#Send Bulk data to the Usb device, write_buffer must be of type "bytes" def write(self, pipe_id, write_buffer): #Read Bulk data from the Usb device, Returns of a buffer not greater than length_buffer length @@ -99,8 +99,33 @@ pid = "pid_device" api = WinUsbPy() result = api.list_usb_devices(deviceinterface=True, present=True) if result: - if api.init_winusb_device(pl2303_vid, pl2303_pid): - api.write(0x02, "hello") + if api.init_winusb_device(vid, pid): + + # print device interface 0 descriptors + interface_descriptor = api.query_interface_settings(0) + if interface_descriptor != None: + print("bLength: " + str(interface_descriptor.b_length)) + print("bDescriptorType: " + str(interface_descriptor.b_descriptor_type)) + print("bInterfaceNumber: " + str(interface_descriptor.b_interface_number)) + print("bAlternateSetting: " + str(interface_descriptor.b_alternate_setting)) + print("bNumEndpoints " + str(interface_descriptor.b_num_endpoints)) + print("bInterfaceClass " + str(interface_descriptor.b_interface_class)) + print("bInterfaceSubClass: " + str(interface_descriptor.b_interface_sub_class)) + print("bInterfaceProtocol: " + str(interface_descriptor.b_interface_protocol)) + print("iInterface: " + str(interface_descriptor.i_interface)) + + # print device endpoint descriptors + pipe_info_list = map(api.query_pipe, range(interface_descriptor.b_num_endpoints)) + for item in pipe_info_list: + print("PipeType: " + str(item.pipe_type)) + print("PipeId: " + str(item.pipe_id)) + print("MaximumPacketSize: " + str(item.maximum_packet_size)) + print("Interval: " + str(item.interval)) + + api.write(0x02, b"hello") # send bulk packet on OUT endpoint 2 + + # close + api.close_winusb_device() ``` Real examples diff --git a/winusbpy/winusbpy.py b/winusbpy/winusbpy.py index 71eb05f..2bc071a 100644 --- a/winusbpy/winusbpy.py +++ b/winusbpy/winusbpy.py @@ -105,17 +105,29 @@ def list_usb_devices(self, **kwargs): return self.device_paths def find_device(self, path): - return is_device(self._name, self._vid, self._pid, path) + return is_device(self._vid, self._pid, path, self._name) - def init_winusb_device(self, name, vid, pid): + def extract_device_from_vid_pid(self, vid, pid, dev_list): + name = None + path = None + + for dev in dev_list: + dev_path = dev_list[dev] + if( dev_path.find("vid_{:04x}&pid_{:04x}".format( int(str(vid), 16), int(str(pid), 16)) ) != -1 ): + # dev found + name = dev + path = dev_path + break + + return (name, path) + + def init_winusb_device(self, vid, pid): self._vid = vid self._pid = pid - self._name = name - try: - ftr = filter(self.find_device, self.device_paths) - paths = [p for p in ftr] - path = self.device_paths[paths[0]] - except IndexError: + + # extract device (name, path) from device_list + _ , path = self.extract_device_from_vid_pid(vid, pid, self.device_paths) + if(path == None): return False self.handle_file = self.api.exec_function_kernel32(CreateFile, path, GENERIC_WRITE | GENERIC_READ, @@ -207,14 +219,14 @@ def control_transfer(self, setup_packet, buff=None): def write(self, pipe_id, write_buffer): write_buffer = create_string_buffer(write_buffer) written = c_ulong(0) - self.api.exec_function_winusb(WinUsb_WritePipe, self.handle_winusb[self._index], c_ubyte(pipe_id), write_buffer, + self.api.exec_function_winusb(WinUsb_WritePipe, self.handle_winusb, c_ubyte(pipe_id), write_buffer, c_ulong(len(write_buffer) - 1), byref(written), None) return written.value def read(self, pipe_id, length_buffer): read_buffer = create_string_buffer(length_buffer) read = c_ulong(0) - result = self.api.exec_function_winusb(WinUsb_ReadPipe, self.handle_winusb[self._index], c_ubyte(pipe_id), + result = self.api.exec_function_winusb(WinUsb_ReadPipe, self.handle_winusb, c_ubyte(pipe_id), read_buffer, c_ulong(length_buffer), byref(read), None) if result != 0: if read.value != length_buffer: @@ -237,12 +249,12 @@ class POLICY_TYPE: policy_type = c_ulong(POLICY_TYPE.PIPE_TRANSFER_TIMEOUT) value_length = c_ulong(4) value = c_ulong(int(timeout * 1000)) # in ms - result = self.api.exec_function_winusb(WinUsb_SetPipePolicy, self.handle_winusb[self._index], c_ubyte(pipe_id), + result = self.api.exec_function_winusb(WinUsb_SetPipePolicy, self.handle_winusb, c_ubyte(pipe_id), policy_type, value_length, byref(value)) return result def flush(self, pipe_id): - result = self.api.exec_function_winusb(WinUsb_FlushPipe, self.handle_winusb[self._index], c_ubyte(pipe_id)) + result = self.api.exec_function_winusb(WinUsb_FlushPipe, self.handle_winusb, c_ubyte(pipe_id)) return result def _overlapped_read_do(self,pipe_id): @@ -251,14 +263,14 @@ def _overlapped_read_do(self,pipe_id): self.olread_ol.Offset = 0 self.olread_ol.OffsetHigh = 0 self.olread_ol.Pointer = 0 - self.olread_ol.hEvent = 0 - result = self.api.exec_function_winusb(WinUsb_ReadPipe, self.handle_winusb, c_ubyte(pipe_id), self.olread_buf, + self.olread_ol.hEvent = 0 + result = self.api.exec_function_winusb(WinUsb_ReadPipe, self.handle_winusb, c_ubyte(pipe_id), self.olread_buf, c_ulong(self.olread_buflen), byref(c_ulong(0)), byref(self.olread_ol)) if result != 0: return True else: return False - + def overlapped_read_init(self, pipe_id, length_buffer): self.olread_ol = Overlapped() self.olread_buf = create_string_buffer(length_buffer) diff --git a/winusbpy/winusbutils.py b/winusbpy/winusbutils.py index e3ebe68..47608ad 100644 --- a/winusbpy/winusbutils.py +++ b/winusbpy/winusbutils.py @@ -53,7 +53,7 @@ def get_winusb_functions(windll): # BOOL __stdcall WinUsb_ControlTransfer(_In_ WINUSB_INTERFACE_HANDLE InterfaceHandle,_In_ WINUSB_SETUP_PACKET SetupPacket, _Out_ PUCHAR Buffer,_In_ ULONG BufferLength,_Out_opt_ PULONG LengthTransferred,_In_opt_ LPOVERLAPPED Overlapped); winusb_functions[WinUsb_ControlTransfer] = windll.WinUsb_ControlTransfer # winusb_restypes[WinUsb_ControlTransfer] = BOOL - # winusb_argtypes[WinUsb_ControlTransfer] = [c_void_p, UsbSetupPacket, POINTER(c_ubyte), c_ulong, POINTER(c_ulong), LpOverlapped] + # winusb_argtypes[WinUsb_ControlTransfer] = [c_void_p, UsbSetupPacket, POINTER(c_ubyte), c_ulong, POINTER(c_ulong), LpOverlapped] # BOOL __stdcall WinUsb_GetDescriptor(_In_ WINUSB_INTERFACE_HANDLE InterfaceHandle,_In_ UCHAR DescriptorType,_In_ UCHAR Index,_In_ USHORT LanguageID,_Out_ PUCHAR Buffer,_In_ ULONG BufferLength,_Out_ PULONG LengthTransferred); winusb_functions[WinUsb_GetDescriptor] = windll.WinUsb_GetDescriptor @@ -108,12 +108,12 @@ def get_winusb_functions(windll): winusb_restypes[WinUsb_GetAssociatedInterface] = BOOL winusb_argtypes[WinUsb_GetAssociatedInterface] = [c_void_p, c_ubyte, POINTER(c_void_p)] - winusb_dict["functions"] = winusb_functions + winusb_dict["functions"] = winusb_functions winusb_dict["restypes"] = winusb_restypes winusb_dict["argtypes"] = winusb_argtypes return winusb_dict - + def get_kernel32_functions(kernel32): kernel32_dict = {} kernel32_functions = {} @@ -209,6 +209,10 @@ def get_setupapi_functions(setupapi): def is_device(vid, pid, path, name=None): + # this is not working + # not used + # replaced by extract_device_from_vid_pid + if name and name.lower() == path.lower(): return True if vid and pid: