diff --git a/plover_stenograph_usb.py b/plover_stenograph_usb.py index 7d738d0..8d4a94f 100644 --- a/plover_stenograph_usb.py +++ b/plover_stenograph_usb.py @@ -73,7 +73,7 @@ VENDOR_ID = 0x112b MAX_READ = 0x200 # Arbitrary read limit -class StenoPacket(object): +class StenoPacket: """ Stenograph StenoPacket helper @@ -210,7 +210,7 @@ class StenoPacket(object): return strokes -class AbstractStenographMachine(object): +class AbstractStenographMachine: """Simple interface to connect with and send data to a Stenograph machine""" def connect(self) -> bool: @@ -225,150 +225,254 @@ class AbstractStenographMachine(object): """Send a StenoPacket to the machine and return the response or None""" raise NotImplementedError('send_receive() is not implemented') + if sys.platform.startswith('win32'): - from ctypes import ( - Structure, - POINTER, - c_ulonglong, - windll, - create_string_buffer, - sizeof, - byref, - pointer, - c_char, - ) - from ctypes.wintypes import DWORD, HANDLE, BYTE + + # For Windows we directly call Windows API functions. + + from ctypes import windll, wintypes + import ctypes import uuid - # Class GUID for Stenograph USB Writer - USB_WRITER_GUID = uuid.UUID('{c5682e20-8059-604a-b761-77c4de9d5dbf}') + GUID = wintypes.BYTE * 16 + HDEVINFO = wintypes.HANDLE - class DeviceInterfaceData(Structure): + # Stubs. + LPOVERLAPPED = wintypes.LPVOID + LPSECURITY_ATTRIBUTES = wintypes.LPVOID + PSP_DEVINFO_DATA = wintypes.LPVOID + + # Class GUID for Stenograph USB Writer. + USB_WRITER_GUID = GUID(*uuid.UUID('{c5682e20-8059-604a-b761-77c4de9d5dbf}').bytes) + + class SP_DEVICE_INTERFACE_DATA(ctypes.Structure): _fields_ = [ - ('cbSize', DWORD), - ('InterfaceClassGuid', BYTE * 16), - ('Flags', DWORD), - ('Reserved', POINTER(c_ulonglong)) + ('cbSize', wintypes.DWORD), + ('InterfaceClassGuid', GUID), + ('Flags', wintypes.DWORD), + ('Reserved', wintypes.PULONG), ] + PSP_DEVICE_INTERFACE_DATA = ctypes.POINTER(SP_DEVICE_INTERFACE_DATA) + + class SP_DEVICE_INTERFACE_DETAIL_DATA_A(ctypes.Structure): + _fields_ = [ + ('cbSize', wintypes.DWORD), + ('_DevicePath', wintypes.CHAR * 1), + ] + @property + def DevicePath(self): + return ctypes.string_at(ctypes.byref(self, ctypes.sizeof(wintypes.DWORD))) + PSP_DEVICE_INTERFACE_DETAIL_DATA_A = ctypes.POINTER(SP_DEVICE_INTERFACE_DETAIL_DATA_A) - # For Windows we directly call Windows API functions SetupDiGetClassDevs = windll.setupapi.SetupDiGetClassDevsA + SetupDiGetClassDevs.argtypes = [ + ctypes.POINTER(GUID), # ClassGuid + wintypes.LPCWSTR, # Enumerator + wintypes.HWND, # hwndParent + wintypes.DWORD, # Flags + ] + SetupDiGetClassDevs.restype = HDEVINFO + + SetupDiDestroyDeviceInfoList = windll.setupapi.SetupDiDestroyDeviceInfoList + SetupDiDestroyDeviceInfoList.argtypes = [ + HDEVINFO, # DeviceInfoSet + ] + SetupDiDestroyDeviceInfoList.restype = wintypes.BOOL + SetupDiEnumDeviceInterfaces = windll.setupapi.SetupDiEnumDeviceInterfaces - SetupDiGetInterfaceDeviceDetail = ( - windll.setupapi.SetupDiGetDeviceInterfaceDetailA) + SetupDiEnumDeviceInterfaces.argtypes = [ + HDEVINFO, # DeviceInfoSet + PSP_DEVINFO_DATA, # DeviceInfoData + ctypes.POINTER(GUID), # InterfaceClassGuid + wintypes.DWORD, # MemberIndex + PSP_DEVICE_INTERFACE_DATA, # DeviceInterfaceData + ] + SetupDiEnumDeviceInterfaces.restype = wintypes.BOOL + + SetupDiGetDeviceInterfaceDetail = windll.setupapi.SetupDiGetDeviceInterfaceDetailA + SetupDiGetDeviceInterfaceDetail.argtypes = [ + HDEVINFO, # DeviceInfoSet + PSP_DEVICE_INTERFACE_DATA, # DeviceInterfaceData + PSP_DEVICE_INTERFACE_DETAIL_DATA_A, # DeviceInterfaceDetailData + wintypes.DWORD, # DeviceInterfaceDetailDataSize + wintypes.PDWORD, # RequiredSize + PSP_DEVINFO_DATA, # DeviceInfoData + ] + SetupDiGetDeviceInterfaceDetail.restype = wintypes.BOOL + CreateFile = windll.kernel32.CreateFileA + CreateFile.argtypes = [ + wintypes.LPCSTR, # lpFileName + wintypes.DWORD, # dwDesiredAccess + wintypes.DWORD, # dwShareMode + LPSECURITY_ATTRIBUTES, # lpSecurityAttributes + wintypes.DWORD, # dwCreationDisposition + wintypes.DWORD, # dwFlagsAndAttributes + wintypes.HANDLE, # hTemplateFile + ] + CreateFile.restype = wintypes.HANDLE + ReadFile = windll.kernel32.ReadFile + ReadFile.argtypes = [ + wintypes.HANDLE, # hFile + wintypes.LPVOID, # lpBuffer + wintypes.DWORD, # nNumberOfBytesToRead + wintypes.LPDWORD, # lpNumberOfBytesRead + LPOVERLAPPED, # lpOverlapped + ] + ReadFile.restype = wintypes.BOOL + WriteFile = windll.kernel32.WriteFile + WriteFile.argtypes = [ + wintypes.HANDLE, # hFile + wintypes.LPCVOID, # lpBuffer + wintypes.DWORD, # nNumberOfBytesToWrite + wintypes.LPDWORD, # lpNumberOfBytesWritten + LPOVERLAPPED, # lpOverlapped + ] + WriteFile.restype = wintypes.BOOL + CloseHandle = windll.kernel32.CloseHandle - GetLastError = windll.kernel32.GetLastError + CloseHandle.argtypes = [ + wintypes.HANDLE, # hObject + ] + CloseHandle.restype = wintypes.BOOL + + # Defines. + + CREATE_ALWAYS = 2 + CREATE_NEW = 1 + + DIGCF_DEVICEINTERFACE = 0x00000010 + DIGCF_PRESENT = 0x00000002 + + ERROR_INSUFFICIENT_BUFFER = 0x0000007A + ERROR_NO_MORE_ITEMS = 0x00000103 + + FILE_ATTRIBUTE_NORMAL = 0x80 + + FILE_SHARE_READ = 0x00000001 + FILE_SHARE_WRITE = 0x00000002 + + GENERIC_READ = 0x80000000 + GENERIC_WRITE = 0x40000000 INVALID_HANDLE_VALUE = -1 - ERROR_INSUFFICIENT_BUFFER = 122 - class StenographMachine(object): + class StenographMachine: + def __init__(self): - self._usb_device = HANDLE(0) - self._read_buffer = create_string_buffer(MAX_READ + StenoPacket.HEADER_SIZE) + self._usb_device = INVALID_HANDLE_VALUE + self._read_buffer = ctypes.create_string_buffer(MAX_READ + StenoPacket.HEADER_SIZE) @staticmethod def _open_device_instance(device_info, guid): - dev_interface_data = DeviceInterfaceData() - dev_interface_data.cbSize = sizeof(dev_interface_data) + dev_interface_data = SP_DEVICE_INTERFACE_DATA() + dev_interface_data.cbSize = ctypes.sizeof(SP_DEVICE_INTERFACE_DATA) - status = SetupDiEnumDeviceInterfaces( - device_info, None, guid.bytes, 0, byref(dev_interface_data)) - if status == 0: - log.debug('status is zero') + if not SetupDiEnumDeviceInterfaces( + device_info, None, ctypes.byref(guid), + 0, ctypes.byref(dev_interface_data) + ): + if ctypes.GetLastError() != ERROR_NO_MORE_ITEMS: + log.error('SetupDiEnumDeviceInterfaces: %s', ctypes.WinError()) return INVALID_HANDLE_VALUE - request_length = DWORD(0) - # Call with None to see how big a buffer we need for detail data. - SetupDiGetInterfaceDeviceDetail( + request_length = wintypes.DWORD(0) + status = SetupDiGetDeviceInterfaceDetail( device_info, - byref(dev_interface_data), + ctypes.byref(dev_interface_data), + # Call with (None, 0) to see how big a buffer is needed. + None, 0, + ctypes.pointer(request_length), None, - 0, - pointer(request_length), - None ) - err = GetLastError() - if err != ERROR_INSUFFICIENT_BUFFER: - log.debug('last error not insufficient buffer') + if status or ctypes.GetLastError() != ERROR_INSUFFICIENT_BUFFER: + log.debug('last error not insufficient buffer: %s', ctypes.WinError()) return INVALID_HANDLE_VALUE - characters = request_length.value - - class DeviceDetailData(Structure): - _fields_ = [('cbSize', DWORD), - ('DevicePath', c_char * characters)] - - dev_detail_data = DeviceDetailData() - dev_detail_data.cbSize = 5 + dev_detail_data_buffer = ctypes.create_string_buffer(request_length.value) + dev_detail_data_ptr = ctypes.cast(dev_detail_data_buffer, PSP_DEVICE_INTERFACE_DETAIL_DATA_A) + dev_detail_data_ptr[0].cbSize = ctypes.sizeof(SP_DEVICE_INTERFACE_DETAIL_DATA_A) # Now put the actual detail data into the buffer - status = SetupDiGetInterfaceDeviceDetail( - device_info, byref(dev_interface_data), byref(dev_detail_data), - characters, pointer(request_length), None - ) - if not status: - log.debug('not status') + if not SetupDiGetDeviceInterfaceDetail( + device_info, + ctypes.byref(dev_interface_data), + dev_detail_data_ptr, + ctypes.sizeof(dev_detail_data_buffer), + None, + None, + ): + log.error('SetupDiGetDeviceInterfaceDetail: %s', ctypes.WinError()) return INVALID_HANDLE_VALUE - log.debug('okay, creating file') - return CreateFile( - dev_detail_data.DevicePath, - 0xC0000000, 0x3, 0, 0x3, 0x80, 0 - ) + + device_path = dev_detail_data_ptr[0].DevicePath + + log.debug('okay, creating file, device path: %s', device_path) + + handle = CreateFile(device_path, + GENERIC_READ | GENERIC_WRITE, + FILE_SHARE_READ | FILE_SHARE_WRITE, + None, + CREATE_ALWAYS | CREATE_NEW, + FILE_ATTRIBUTE_NORMAL, + None) + if handle == INVALID_HANDLE_VALUE: + log.error('CreateFile: %s', ctypes.WinError()) + return handle @staticmethod def _open_device_by_class_interface_and_instance(class_guid): - device_info = SetupDiGetClassDevs(class_guid.bytes, 0, 0, 0x12) + device_info = SetupDiGetClassDevs(ctypes.byref(class_guid), None, None, + DIGCF_DEVICEINTERFACE | DIGCF_PRESENT) if device_info == INVALID_HANDLE_VALUE: - log.debug('dev info is invalid handle') + log.error('SetupDiGetClassDevs: %s', ctypes.WinError()) return INVALID_HANDLE_VALUE - - usb_device = StenographMachine._open_device_instance( - device_info, class_guid) + usb_device = StenographMachine._open_device_instance(device_info, class_guid) + if not SetupDiDestroyDeviceInfoList(device_info): + log.error('SetupDiDestroyDeviceInfoList: %s', ctypes.WinError()) return usb_device def _usb_write_packet(self, request): - bytes_written = DWORD(0) + bytes_written = wintypes.DWORD(0) request_packet = request.pack() - WriteFile( - self._usb_device, - request_packet, - StenoPacket.HEADER_SIZE + request.data_length, - byref(bytes_written), - None - ) + if not WriteFile(self._usb_device, + request_packet, + StenoPacket.HEADER_SIZE + request.data_length, + ctypes.byref(bytes_written), + None): + log.error('WriteFile: %s', ctypes.WinError()) + return 0 return bytes_written.value def _usb_read_packet(self): - bytes_read = DWORD(0) - ReadFile( - self._usb_device, - byref(self._read_buffer), - MAX_READ + StenoPacket.HEADER_SIZE, - byref(bytes_read), - None - ) + bytes_read = wintypes.DWORD(0) + if not ReadFile(self._usb_device, + self._read_buffer, + MAX_READ + StenoPacket.HEADER_SIZE, + ctypes.byref(bytes_read), + None): + log.error('ReadFile: %s', ctypes.WinError()) + return None # Return None if not enough data was read. if bytes_read.value < StenoPacket.HEADER_SIZE: + log.error('ReadFile: short read, %u < %u', + bytes_read.value, StenoPacket.HEADER_SIZE) return None - writer_packet = StenoPacket.unpack(self._read_buffer) return writer_packet def disconnect(self): - CloseHandle(self._usb_device) + if not CloseHandle(self._usb_device): + log.error('CloseHandle: %s', ctypes.WinError()) self._usb_device = INVALID_HANDLE_VALUE def connect(self): # If already connected, disconnect first. if self._usb_device != INVALID_HANDLE_VALUE: self.disconnect() - self._usb_device = ( - self._open_device_by_class_interface_and_instance( - USB_WRITER_GUID)) + self._usb_device = self._open_device_by_class_interface_and_instance(USB_WRITER_GUID) return self._usb_device != INVALID_HANDLE_VALUE def send_receive(self, request): @@ -379,13 +483,15 @@ if sys.platform.startswith('win32'): return None writer_packet = self._usb_read_packet() return writer_packet + else: + from usb import core, util class StenographMachine(AbstractStenographMachine): def __init__(self): - super(StenographMachine, self).__init__() + super().__init__() self._usb_device = None self._endpoint_in = None self._endpoint_out = None @@ -403,11 +509,7 @@ else: return self._connected # Copy the default configuration. - try: - usb_device.set_configuration() - except core.USBError as e: - log.warning('Error connecting: %s', e) - self._error() + usb_device.set_configuration() config = usb_device.get_active_configuration() interface = config[(0, 0)] @@ -461,27 +563,22 @@ else: class ProtocolViolationException(Exception): """The writer did something unexpected""" - pass class UnableToPerformRequestException(Exception): """The writer cannot perform the action requested""" - pass class FileNotAvailableException(Exception): """The writer cannot read from the current file""" - pass class NoRealtimeFileException(Exception): """The realtime file doesn't exist, likely because the user hasn't started writing""" - pass class FinishedReadingClosedFileException(Exception): """The closed file being read is complete and cannot be read further""" - pass class Stenograph(ThreadedStenotypeBase): @@ -496,7 +593,7 @@ class Stenograph(ThreadedStenotypeBase): KEYMAP_MACHINE_TYPE = 'Stentura' def __init__(self, params): - super(Stenograph, self).__init__() + super().__init__() self._machine = StenographMachine() def _on_stroke(self, keys): @@ -507,7 +604,7 @@ class Stenograph(ThreadedStenotypeBase): def start_capture(self): self.finished.clear() self._initializing() - """Begin listening for output from the stenotype machine.""" + # Begin listening for output from the stenotype machine. if not self._connect_machine(): log.warning('Stenograph machine is not connected') self._error() @@ -516,17 +613,12 @@ class Stenograph(ThreadedStenotypeBase): self.start() def _connect_machine(self): - connected = False try: - connected = self._machine.connect() - except ValueError: - log.warning('Libusb must be installed.') + return self._machine.connect() + except Exception: + log.warning('Error connecting', exc_info=True) self._error() - except AssertionError as e: - log.warning('Error connecting: %s', e) - self._error() - finally: - return connected + return False def _reconnect(self): self._initializing() @@ -542,29 +634,30 @@ class Stenograph(ThreadedStenotypeBase): response = self._machine.send_receive(request) log.debug('Response from Stenograph: %s', response) if response is None: - """No response implies device connection issue.""" + # No response implies device connection issue. raise IOError() - elif response.packet_id == StenoPacket.ID_ERROR: - """Writer may reply with an error packet""" + if response.packet_id == StenoPacket.ID_ERROR: + # Writer may reply with an error packet. error_number = response.p1 if error_number == 3: raise UnableToPerformRequestException() - elif error_number == 7: + if error_number == 7: raise FileNotAvailableException() - elif error_number == 8: + if error_number == 8: raise NoRealtimeFileException() - elif error_number == 9: + if error_number == 9: raise FinishedReadingClosedFileException() - else: - """Writer has returned a packet""" - if (response.packet_id != request.packet_id - or response.sequence_number != request.sequence_number): - raise ProtocolViolationException() - return response + raise RuntimeError('unknown response error: %u' % error_number) + # Writer has returned a packet. + if (response.packet_id != request.packet_id + or response.sequence_number != request.sequence_number): + raise ProtocolViolationException() + return response def run(self): - class ReadState(object): + class ReadState: + def __init__(self): self.realtime = False # Not realtime until we get a 0-length response self.realtime_file_open = False # We are reading from a file @@ -585,7 +678,7 @@ class Stenograph(ThreadedStenotypeBase): StenoPacket.make_read_request(file_offset=state.offset) ) except IOError as e: - log.warning(u'Stenograph machine disconnected, reconnecting…') + log.warning('Stenograph machine disconnected, reconnecting…') log.debug('Stenograph exception: %s', e) # User could start a new file while disconnected. state.reset() @@ -593,15 +686,19 @@ class Stenograph(ThreadedStenotypeBase): log.warning('Stenograph reconnected.') self._ready() except NoRealtimeFileException: + log.debug('NoRealtimeFileException') # User hasn't started writing, just keep opening the realtime file state.reset() except FinishedReadingClosedFileException: + log.debug('FinishedReadingClosedFileException') # File closed! Open the realtime file. state.reset() else: + log.debug('response length: %u', response.data_length) if response.data_length: state.offset += response.data_length elif not state.realtime: + log.debug('state realtime') state.realtime = True if response.data_length and state.realtime: for stroke in response.strokes(): @@ -611,6 +708,6 @@ class Stenograph(ThreadedStenotypeBase): def stop_capture(self): """Stop listening for output from the stenotype machine.""" - super(Stenograph, self).stop_capture() + super().stop_capture() self._machine = None self._stopped()