Merge pull request #2 from benoit-pierre/fix_win_py38

Fix 64bits Windows support
This commit is contained in:
Ted Morin 2021-10-09 17:48:13 -04:00 committed by GitHub
commit 3d2ea4ef69
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -73,7 +73,7 @@ VENDOR_ID = 0x112b
MAX_READ = 0x200 # Arbitrary read limit
class StenoPacket(object):
class StenoPacket:
"""
Stenograph StenoPacket helper
@ -210,7 +210,7 @@ class StenoPacket(object):
return strokes
class AbstractStenographMachine(object):
class AbstractStenographMachine:
"""Simple interface to connect with and send data to a Stenograph machine"""
def connect(self) -> bool:
@ -225,150 +225,254 @@ class AbstractStenographMachine(object):
"""Send a StenoPacket to the machine and return the response or None"""
raise NotImplementedError('send_receive() is not implemented')
if sys.platform.startswith('win32'):
from ctypes import (
Structure,
POINTER,
c_ulonglong,
windll,
create_string_buffer,
sizeof,
byref,
pointer,
c_char,
)
from ctypes.wintypes import DWORD, HANDLE, BYTE
# For Windows we directly call Windows API functions.
from ctypes import windll, wintypes
import ctypes
import uuid
# Class GUID for Stenograph USB Writer
USB_WRITER_GUID = uuid.UUID('{c5682e20-8059-604a-b761-77c4de9d5dbf}')
GUID = wintypes.BYTE * 16
HDEVINFO = wintypes.HANDLE
class DeviceInterfaceData(Structure):
# Stubs.
LPOVERLAPPED = wintypes.LPVOID
LPSECURITY_ATTRIBUTES = wintypes.LPVOID
PSP_DEVINFO_DATA = wintypes.LPVOID
# Class GUID for Stenograph USB Writer.
USB_WRITER_GUID = GUID(*uuid.UUID('{c5682e20-8059-604a-b761-77c4de9d5dbf}').bytes)
class SP_DEVICE_INTERFACE_DATA(ctypes.Structure):
_fields_ = [
('cbSize', DWORD),
('InterfaceClassGuid', BYTE * 16),
('Flags', DWORD),
('Reserved', POINTER(c_ulonglong))
('cbSize', wintypes.DWORD),
('InterfaceClassGuid', GUID),
('Flags', wintypes.DWORD),
('Reserved', wintypes.PULONG),
]
PSP_DEVICE_INTERFACE_DATA = ctypes.POINTER(SP_DEVICE_INTERFACE_DATA)
class SP_DEVICE_INTERFACE_DETAIL_DATA_A(ctypes.Structure):
_fields_ = [
('cbSize', wintypes.DWORD),
('_DevicePath', wintypes.CHAR * 1),
]
@property
def DevicePath(self):
return ctypes.string_at(ctypes.byref(self, ctypes.sizeof(wintypes.DWORD)))
PSP_DEVICE_INTERFACE_DETAIL_DATA_A = ctypes.POINTER(SP_DEVICE_INTERFACE_DETAIL_DATA_A)
# For Windows we directly call Windows API functions
SetupDiGetClassDevs = windll.setupapi.SetupDiGetClassDevsA
SetupDiGetClassDevs.argtypes = [
ctypes.POINTER(GUID), # ClassGuid
wintypes.LPCWSTR, # Enumerator
wintypes.HWND, # hwndParent
wintypes.DWORD, # Flags
]
SetupDiGetClassDevs.restype = HDEVINFO
SetupDiDestroyDeviceInfoList = windll.setupapi.SetupDiDestroyDeviceInfoList
SetupDiDestroyDeviceInfoList.argtypes = [
HDEVINFO, # DeviceInfoSet
]
SetupDiDestroyDeviceInfoList.restype = wintypes.BOOL
SetupDiEnumDeviceInterfaces = windll.setupapi.SetupDiEnumDeviceInterfaces
SetupDiGetInterfaceDeviceDetail = (
windll.setupapi.SetupDiGetDeviceInterfaceDetailA)
SetupDiEnumDeviceInterfaces.argtypes = [
HDEVINFO, # DeviceInfoSet
PSP_DEVINFO_DATA, # DeviceInfoData
ctypes.POINTER(GUID), # InterfaceClassGuid
wintypes.DWORD, # MemberIndex
PSP_DEVICE_INTERFACE_DATA, # DeviceInterfaceData
]
SetupDiEnumDeviceInterfaces.restype = wintypes.BOOL
SetupDiGetDeviceInterfaceDetail = windll.setupapi.SetupDiGetDeviceInterfaceDetailA
SetupDiGetDeviceInterfaceDetail.argtypes = [
HDEVINFO, # DeviceInfoSet
PSP_DEVICE_INTERFACE_DATA, # DeviceInterfaceData
PSP_DEVICE_INTERFACE_DETAIL_DATA_A, # DeviceInterfaceDetailData
wintypes.DWORD, # DeviceInterfaceDetailDataSize
wintypes.PDWORD, # RequiredSize
PSP_DEVINFO_DATA, # DeviceInfoData
]
SetupDiGetDeviceInterfaceDetail.restype = wintypes.BOOL
CreateFile = windll.kernel32.CreateFileA
CreateFile.argtypes = [
wintypes.LPCSTR, # lpFileName
wintypes.DWORD, # dwDesiredAccess
wintypes.DWORD, # dwShareMode
LPSECURITY_ATTRIBUTES, # lpSecurityAttributes
wintypes.DWORD, # dwCreationDisposition
wintypes.DWORD, # dwFlagsAndAttributes
wintypes.HANDLE, # hTemplateFile
]
CreateFile.restype = wintypes.HANDLE
ReadFile = windll.kernel32.ReadFile
ReadFile.argtypes = [
wintypes.HANDLE, # hFile
wintypes.LPVOID, # lpBuffer
wintypes.DWORD, # nNumberOfBytesToRead
wintypes.LPDWORD, # lpNumberOfBytesRead
LPOVERLAPPED, # lpOverlapped
]
ReadFile.restype = wintypes.BOOL
WriteFile = windll.kernel32.WriteFile
WriteFile.argtypes = [
wintypes.HANDLE, # hFile
wintypes.LPCVOID, # lpBuffer
wintypes.DWORD, # nNumberOfBytesToWrite
wintypes.LPDWORD, # lpNumberOfBytesWritten
LPOVERLAPPED, # lpOverlapped
]
WriteFile.restype = wintypes.BOOL
CloseHandle = windll.kernel32.CloseHandle
GetLastError = windll.kernel32.GetLastError
CloseHandle.argtypes = [
wintypes.HANDLE, # hObject
]
CloseHandle.restype = wintypes.BOOL
# Defines.
CREATE_ALWAYS = 2
CREATE_NEW = 1
DIGCF_DEVICEINTERFACE = 0x00000010
DIGCF_PRESENT = 0x00000002
ERROR_INSUFFICIENT_BUFFER = 0x0000007A
ERROR_NO_MORE_ITEMS = 0x00000103
FILE_ATTRIBUTE_NORMAL = 0x80
FILE_SHARE_READ = 0x00000001
FILE_SHARE_WRITE = 0x00000002
GENERIC_READ = 0x80000000
GENERIC_WRITE = 0x40000000
INVALID_HANDLE_VALUE = -1
ERROR_INSUFFICIENT_BUFFER = 122
class StenographMachine(object):
class StenographMachine:
def __init__(self):
self._usb_device = HANDLE(0)
self._read_buffer = create_string_buffer(MAX_READ + StenoPacket.HEADER_SIZE)
self._usb_device = INVALID_HANDLE_VALUE
self._read_buffer = ctypes.create_string_buffer(MAX_READ + StenoPacket.HEADER_SIZE)
@staticmethod
def _open_device_instance(device_info, guid):
dev_interface_data = DeviceInterfaceData()
dev_interface_data.cbSize = sizeof(dev_interface_data)
dev_interface_data = SP_DEVICE_INTERFACE_DATA()
dev_interface_data.cbSize = ctypes.sizeof(SP_DEVICE_INTERFACE_DATA)
status = SetupDiEnumDeviceInterfaces(
device_info, None, guid.bytes, 0, byref(dev_interface_data))
if status == 0:
log.debug('status is zero')
if not SetupDiEnumDeviceInterfaces(
device_info, None, ctypes.byref(guid),
0, ctypes.byref(dev_interface_data)
):
if ctypes.GetLastError() != ERROR_NO_MORE_ITEMS:
log.error('SetupDiEnumDeviceInterfaces: %s', ctypes.WinError())
return INVALID_HANDLE_VALUE
request_length = DWORD(0)
# Call with None to see how big a buffer we need for detail data.
SetupDiGetInterfaceDeviceDetail(
request_length = wintypes.DWORD(0)
status = SetupDiGetDeviceInterfaceDetail(
device_info,
byref(dev_interface_data),
ctypes.byref(dev_interface_data),
# Call with (None, 0) to see how big a buffer is needed.
None, 0,
ctypes.pointer(request_length),
None,
0,
pointer(request_length),
None
)
err = GetLastError()
if err != ERROR_INSUFFICIENT_BUFFER:
log.debug('last error not insufficient buffer')
if status or ctypes.GetLastError() != ERROR_INSUFFICIENT_BUFFER:
log.debug('last error not insufficient buffer: %s', ctypes.WinError())
return INVALID_HANDLE_VALUE
characters = request_length.value
class DeviceDetailData(Structure):
_fields_ = [('cbSize', DWORD),
('DevicePath', c_char * characters)]
dev_detail_data = DeviceDetailData()
dev_detail_data.cbSize = 5
dev_detail_data_buffer = ctypes.create_string_buffer(request_length.value)
dev_detail_data_ptr = ctypes.cast(dev_detail_data_buffer, PSP_DEVICE_INTERFACE_DETAIL_DATA_A)
dev_detail_data_ptr[0].cbSize = ctypes.sizeof(SP_DEVICE_INTERFACE_DETAIL_DATA_A)
# Now put the actual detail data into the buffer
status = SetupDiGetInterfaceDeviceDetail(
device_info, byref(dev_interface_data), byref(dev_detail_data),
characters, pointer(request_length), None
)
if not status:
log.debug('not status')
if not SetupDiGetDeviceInterfaceDetail(
device_info,
ctypes.byref(dev_interface_data),
dev_detail_data_ptr,
ctypes.sizeof(dev_detail_data_buffer),
None,
None,
):
log.error('SetupDiGetDeviceInterfaceDetail: %s', ctypes.WinError())
return INVALID_HANDLE_VALUE
log.debug('okay, creating file')
return CreateFile(
dev_detail_data.DevicePath,
0xC0000000, 0x3, 0, 0x3, 0x80, 0
)
device_path = dev_detail_data_ptr[0].DevicePath
log.debug('okay, creating file, device path: %s', device_path)
handle = CreateFile(device_path,
GENERIC_READ | GENERIC_WRITE,
FILE_SHARE_READ | FILE_SHARE_WRITE,
None,
CREATE_ALWAYS | CREATE_NEW,
FILE_ATTRIBUTE_NORMAL,
None)
if handle == INVALID_HANDLE_VALUE:
log.error('CreateFile: %s', ctypes.WinError())
return handle
@staticmethod
def _open_device_by_class_interface_and_instance(class_guid):
device_info = SetupDiGetClassDevs(class_guid.bytes, 0, 0, 0x12)
device_info = SetupDiGetClassDevs(ctypes.byref(class_guid), None, None,
DIGCF_DEVICEINTERFACE | DIGCF_PRESENT)
if device_info == INVALID_HANDLE_VALUE:
log.debug('dev info is invalid handle')
log.error('SetupDiGetClassDevs: %s', ctypes.WinError())
return INVALID_HANDLE_VALUE
usb_device = StenographMachine._open_device_instance(
device_info, class_guid)
usb_device = StenographMachine._open_device_instance(device_info, class_guid)
if not SetupDiDestroyDeviceInfoList(device_info):
log.error('SetupDiDestroyDeviceInfoList: %s', ctypes.WinError())
return usb_device
def _usb_write_packet(self, request):
bytes_written = DWORD(0)
bytes_written = wintypes.DWORD(0)
request_packet = request.pack()
WriteFile(
self._usb_device,
request_packet,
StenoPacket.HEADER_SIZE + request.data_length,
byref(bytes_written),
None
)
if not WriteFile(self._usb_device,
request_packet,
StenoPacket.HEADER_SIZE + request.data_length,
ctypes.byref(bytes_written),
None):
log.error('WriteFile: %s', ctypes.WinError())
return 0
return bytes_written.value
def _usb_read_packet(self):
bytes_read = DWORD(0)
ReadFile(
self._usb_device,
byref(self._read_buffer),
MAX_READ + StenoPacket.HEADER_SIZE,
byref(bytes_read),
None
)
bytes_read = wintypes.DWORD(0)
if not ReadFile(self._usb_device,
self._read_buffer,
MAX_READ + StenoPacket.HEADER_SIZE,
ctypes.byref(bytes_read),
None):
log.error('ReadFile: %s', ctypes.WinError())
return None
# Return None if not enough data was read.
if bytes_read.value < StenoPacket.HEADER_SIZE:
log.error('ReadFile: short read, %u < %u',
bytes_read.value, StenoPacket.HEADER_SIZE)
return None
writer_packet = StenoPacket.unpack(self._read_buffer)
return writer_packet
def disconnect(self):
CloseHandle(self._usb_device)
if not CloseHandle(self._usb_device):
log.error('CloseHandle: %s', ctypes.WinError())
self._usb_device = INVALID_HANDLE_VALUE
def connect(self):
# If already connected, disconnect first.
if self._usb_device != INVALID_HANDLE_VALUE:
self.disconnect()
self._usb_device = (
self._open_device_by_class_interface_and_instance(
USB_WRITER_GUID))
self._usb_device = self._open_device_by_class_interface_and_instance(USB_WRITER_GUID)
return self._usb_device != INVALID_HANDLE_VALUE
def send_receive(self, request):
@ -379,13 +483,15 @@ if sys.platform.startswith('win32'):
return None
writer_packet = self._usb_read_packet()
return writer_packet
else:
from usb import core, util
class StenographMachine(AbstractStenographMachine):
def __init__(self):
super(StenographMachine, self).__init__()
super().__init__()
self._usb_device = None
self._endpoint_in = None
self._endpoint_out = None
@ -403,11 +509,7 @@ else:
return self._connected
# Copy the default configuration.
try:
usb_device.set_configuration()
except core.USBError as e:
log.warning('Error connecting: %s', e)
self._error()
usb_device.set_configuration()
config = usb_device.get_active_configuration()
interface = config[(0, 0)]
@ -461,27 +563,22 @@ else:
class ProtocolViolationException(Exception):
"""The writer did something unexpected"""
pass
class UnableToPerformRequestException(Exception):
"""The writer cannot perform the action requested"""
pass
class FileNotAvailableException(Exception):
"""The writer cannot read from the current file"""
pass
class NoRealtimeFileException(Exception):
"""The realtime file doesn't exist, likely because the user hasn't started writing"""
pass
class FinishedReadingClosedFileException(Exception):
"""The closed file being read is complete and cannot be read further"""
pass
class Stenograph(ThreadedStenotypeBase):
@ -496,7 +593,7 @@ class Stenograph(ThreadedStenotypeBase):
KEYMAP_MACHINE_TYPE = 'Stentura'
def __init__(self, params):
super(Stenograph, self).__init__()
super().__init__()
self._machine = StenographMachine()
def _on_stroke(self, keys):
@ -507,7 +604,7 @@ class Stenograph(ThreadedStenotypeBase):
def start_capture(self):
self.finished.clear()
self._initializing()
"""Begin listening for output from the stenotype machine."""
# Begin listening for output from the stenotype machine.
if not self._connect_machine():
log.warning('Stenograph machine is not connected')
self._error()
@ -516,17 +613,12 @@ class Stenograph(ThreadedStenotypeBase):
self.start()
def _connect_machine(self):
connected = False
try:
connected = self._machine.connect()
except ValueError:
log.warning('Libusb must be installed.')
return self._machine.connect()
except Exception:
log.warning('Error connecting', exc_info=True)
self._error()
except AssertionError as e:
log.warning('Error connecting: %s', e)
self._error()
finally:
return connected
return False
def _reconnect(self):
self._initializing()
@ -542,29 +634,30 @@ class Stenograph(ThreadedStenotypeBase):
response = self._machine.send_receive(request)
log.debug('Response from Stenograph: %s', response)
if response is None:
"""No response implies device connection issue."""
# No response implies device connection issue.
raise IOError()
elif response.packet_id == StenoPacket.ID_ERROR:
"""Writer may reply with an error packet"""
if response.packet_id == StenoPacket.ID_ERROR:
# Writer may reply with an error packet.
error_number = response.p1
if error_number == 3:
raise UnableToPerformRequestException()
elif error_number == 7:
if error_number == 7:
raise FileNotAvailableException()
elif error_number == 8:
if error_number == 8:
raise NoRealtimeFileException()
elif error_number == 9:
if error_number == 9:
raise FinishedReadingClosedFileException()
else:
"""Writer has returned a packet"""
if (response.packet_id != request.packet_id
or response.sequence_number != request.sequence_number):
raise ProtocolViolationException()
return response
raise RuntimeError('unknown response error: %u' % error_number)
# Writer has returned a packet.
if (response.packet_id != request.packet_id
or response.sequence_number != request.sequence_number):
raise ProtocolViolationException()
return response
def run(self):
class ReadState(object):
class ReadState:
def __init__(self):
self.realtime = False # Not realtime until we get a 0-length response
self.realtime_file_open = False # We are reading from a file
@ -585,7 +678,7 @@ class Stenograph(ThreadedStenotypeBase):
StenoPacket.make_read_request(file_offset=state.offset)
)
except IOError as e:
log.warning(u'Stenograph machine disconnected, reconnecting…')
log.warning('Stenograph machine disconnected, reconnecting…')
log.debug('Stenograph exception: %s', e)
# User could start a new file while disconnected.
state.reset()
@ -593,15 +686,19 @@ class Stenograph(ThreadedStenotypeBase):
log.warning('Stenograph reconnected.')
self._ready()
except NoRealtimeFileException:
log.debug('NoRealtimeFileException')
# User hasn't started writing, just keep opening the realtime file
state.reset()
except FinishedReadingClosedFileException:
log.debug('FinishedReadingClosedFileException')
# File closed! Open the realtime file.
state.reset()
else:
log.debug('response length: %u', response.data_length)
if response.data_length:
state.offset += response.data_length
elif not state.realtime:
log.debug('state realtime')
state.realtime = True
if response.data_length and state.realtime:
for stroke in response.strokes():
@ -611,6 +708,6 @@ class Stenograph(ThreadedStenotypeBase):
def stop_capture(self):
"""Stop listening for output from the stenotype machine."""
super(Stenograph, self).stop_capture()
super().stop_capture()
self._machine = None
self._stopped()