Refactor code

This commit is contained in:
Ted Morin 2018-02-26 22:20:17 -05:00
commit 4d3017912e
3 changed files with 388 additions and 272 deletions

3
.gitignore vendored
View file

@ -1,3 +1,4 @@
/*.egg-info/
/build/
/dist/
/dist/
__pycache__

View file

@ -1,61 +1,241 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2016 Ted Morin & Keith McCready
# See LICENSE.txt for details.
"Thread-based monitoring of the stenograph machine."
from itertools import compress
from struct import *
from time import sleep
import sys
from time import sleep
from plover import log
from typing import Optional
from more_itertools import grouper
from plover.machine.base import ThreadedStenotypeBase
from plover import log
'''
Packet format:
--------------
Name: Size: Value Range:
----------------------------------
Sync 2 bytes "SG"
Sequence # 4 bytes 0 - 0xFFFFFFFF (will wrap from 0xFFFFFFFF to 0x00000000)
Packet ID 2 bytes As Defined (used as packet type)
Data Length 4 bytes 0 - size (limited in most writers to 65536 bytes)
Parameter 1 4 bytes As Defined
Parameter 2 4 bytes As Defined
Parameter 3 4 bytes As Defined
Parameter 4 4 bytes As Defined
Parameter 5 4 bytes As Defined
Command 0x13 Read Bytes
-----------------------
Request (from PC)
Description Packet ID Data Length Param 1 Param 2 Param 3 Param 4 Param 5
------------------------------------------------------------------------------------------------------
Read Bytes 0x0013 00000000 File Offset Byte Count 00000000 00000000 00000000
-Parameter 1 contains the file offset from which the Mira should start returning bytes (or stroke number * 8 since there are 8 bytes returned per stroke (see details of Response))
-Parameter 2 contains the maximum number of bytes the Host wants the Mira to send in response to this request
-The Mira will respond to this packet with a successful Read Bytes packet or an Error packet.
Response (from Mira)
Description Packet ID Data Length Param 1 Param 2 Param 3 Param 4 Param 5
------------------------------------------------------------------------------------------------------
Read Bytes 0x0013 Number of Bytes File Offset 00000000 00000000 00000000 00000000
-Parameter 1 contains the file offset from which the Mira is returning bytes
-For real-time the data is four bytes of steno and 4 bytes of timestamp - 8 bytes per stroke - repeating for the number of strokes returned.
The format of the eight bytes will be:
-Byte 0: 11^#STKP
-Byte 1: 11WHRAO*
-Byte 2: 11EUFRPB
-Byte 3: 11LGTSDZ
-Bytes 4-7: 'timestamp'
-The steno is in the (very) old SmartWriter format where the top two bits of each of the four bytes are set to 1 and the bottom 6 bits as set according to the keys pressed.
-If the Data Length is zero that indicates there are no more bytes available (real-time).
-If the file has been closed (on the writer) an Error packet (error: FileClosed) will be sent in response to Read Bytes.
Description Packet ID Data Length Param 1 Param 2 Param 3 Param 4 Param 5
------------------------------------------------------------------------------------------------------
Open File 0x0012 Number of Bytes Disk ID
- Parameter 1 is the disk ID that the file you wish to open is on (disk A for all intents and purposes)
- Data is the filename, probably 'REALTIME.000'
'''
# ^ is the "stenomark"
STENO_KEY_CHART = (('^', '#', 'S-', 'T-', 'K-', 'P-'),
('W-', 'H-', 'R-', 'A-', 'O-', '*'),
('-E', '-U', '-F', '-R', '-P', '-B'),
('-L', '-G', '-T', '-S', '-D', '-Z'),
)
STENO_KEY_CHART = (
('^', '#', 'S-', 'T-', 'K-', 'P-'),
('W-', 'H-', 'R-', 'A-', 'O-', '*'),
('-E', '-U', '-F', '-R', '-P', '-B'),
('-L', '-G', '-T', '-S', '-D', '-Z'),
)
VENDOR_ID = 0x112b
MAX_OFFSET = 0xFFFFFFFF
HEADER_BYTES = 32
PACKET_ERROR = 0x06
READ_BYTES = 0x13
MAX_READ = 0x200 # Arbitrary read limit
class StenoPacket(object):
"""
Stenograph StenoPacket helper
Can be used to create packets to send to the writer, as well as
decode a packet from the writer.
"""
_SYNC = b'SG'
"""
Packet header format:
'SG' sequence number packet ID data length p1,p2,p3,p4,p5
2 chars 4 bytes 2 bytes 4 bytes 4 bytes each
"""
_STRUCT_FORMAT = '<2sIH6I'
HEADER_SIZE = calcsize(_STRUCT_FORMAT)
_STRUCT = Struct(_STRUCT_FORMAT)
ID_ERROR = 0x6
ID_OPEN = 0x12
ID_READ = 0x13
sequence_number = 0
def __init__(self, sequence_number=None, packet_id=0, data_length=None,
p1=0, p2=0, p3=0, p4=0, p5=0, data=b''):
"""Create a USB Packet
sequence_number -- ideally unique, if not passed one will be assigned sequentially.
packet_id -- type of packet.
data_length -- length of the additional data, calculated if not provided.
p1, p2, p3, p4, p5 -- 4 byte parameters that have different roles based on packet_id
data -- data to be appended to the end of the packet, used for steno strokes from the writer.
"""
if sequence_number is None:
sequence_number = StenoPacket.sequence_number
StenoPacket._increment_sequence_number()
if data_length is None:
data_length = len(data)
self.sequence_number = sequence_number
self.packet_id = packet_id
self.data_length = data_length
self.p1 = p1
self.p2 = p2
self.p3 = p3
self.p4 = p4
self.p5 = p5
self.data = data
def __str__(self):
return (
'StenoPacket(sequence_number=%s, '
'packet_id=%s, data_length=%s, '
'p1=%s, p2=%s, p3=%s, p4=%s, p5=%s, data=%s)'
% (hex(self.sequence_number), hex(self.packet_id),
self.data_length, hex(self.p1), hex(self.p2),
hex(self.p3), hex(self.p4), hex(self.p5),
self.data[:self.data_length])
)
def pack(self):
"""Convert this USB Packet into something that can be sent to the writer."""
return self._STRUCT.pack(
self._SYNC, self.sequence_number, self.packet_id, self.data_length,
self.p1, self.p2, self.p3, self.p4, self.p5
) + (
pack('%ss' % len(self.data), self.data)
)
@staticmethod
def _increment_sequence_number():
StenoPacket.sequence_number = (StenoPacket.sequence_number + 1) % 0xFFFFFFFF
@staticmethod
def unpack(usb_packet):
"""Create a USBPacket from raw data"""
packet = StenoPacket(
# Drop sync when unpacking.
*StenoPacket._STRUCT.unpack(usb_packet[:StenoPacket.HEADER_SIZE])[1:]
)
if packet.data_length:
packet.data, = unpack(
'%ss' % packet.data_length,
usb_packet[StenoPacket.HEADER_SIZE:StenoPacket.HEADER_SIZE + packet.data_length]
)
return packet
@staticmethod
def make_open_request(file_name=b'REALTIME.000', disk_id=b'A'):
"""Request to open a file on the writer, defaults to the realtime file."""
return StenoPacket(
packet_id=StenoPacket.ID_OPEN,
p1=ord(disk_id),
data=file_name,
)
@staticmethod
def make_read_request(file_offset=1, byte_count=MAX_READ):
"""Request to read from the writer, defaults to settings required when reading from realtime file."""
return StenoPacket(
packet_id=StenoPacket.ID_READ,
p1=file_offset,
p2=byte_count,
)
def strokes(self):
"""Get list of strokes represented in this packet's data"""
# Expecting 8-byte chords (4 bytes of steno, 4 of timestamp.)
assert self.data_length % 8 == 0
# Steno should only be present on ACTION_READ packets
assert self.packet_id == self.ID_READ
strokes = []
for stroke_data in grouper(8, self.data, 0):
stroke = []
# Get 4 bytes of steno, ignore timestamp.
for steno_byte, key_chart_row in zip(stroke_data, STENO_KEY_CHART):
assert steno_byte >= 0b11000000
# Only interested in right 6 values
key_mask = [int(i) for i in bin(steno_byte)[-6:]]
stroke.extend(compress(key_chart_row, key_mask))
if stroke:
strokes.append(stroke)
return strokes
class AbstractStenographMachine(object):
"""Simple interface to connect with and send data to a Stenograph machine"""
def connect(self) -> bool:
"""Connect to machine, returns connection status"""
raise NotImplementedError('connect() is not implemented')
def disconnect(self):
"""Disconnect from the machine"""
raise NotImplementedError('disconnect() is not implemented')
def send_receive(self, request: StenoPacket) -> Optional[StenoPacket]:
"""Send a StenoPacket to the machine and return the response or None"""
raise NotImplementedError('send_receive() is not implemented')
if sys.platform.startswith('win32'):
from ctypes import *
from ctypes.wintypes import DWORD, HANDLE, WORD, BYTE
from ctypes.wintypes import DWORD, HANDLE, BYTE
import uuid
class GUID(Structure):
_fields_ = [("Data1", DWORD),
("Data2", WORD),
("Data3", WORD),
("Data4", BYTE * 8)]
class SP_DEVICE_INTERFACE_DATA(Structure):
_fields_ = [('cbSize',DWORD),
('InterfaceClassGuid', BYTE * 16),
('Flags', DWORD),
('Reserved', POINTER(c_ulonglong))]
class USB_Packet(Structure):
_fields_ = [
# Hack to pack structure correctly (without importing 'struct')
("SyncSeqType", c_ubyte * 8),
("uiDataLen", DWORD),
("uiFileOffset", DWORD),
("uiByteCount", DWORD),
("uiParam3", DWORD),
("uiParam4", DWORD),
("uiParam5", DWORD),
("data", c_ubyte * 1024)
] # limiting data to 1024 bytes (should only ask for 512 at a time)
# Class GUID / UUID for Stenograph USB Writer
# Class GUID for Stenograph USB Writer
USB_WRITER_GUID = uuid.UUID('{c5682e20-8059-604a-b761-77c4de9d5dbf}')
class DeviceInterfaceData(Structure):
_fields_ = [
('cbSize', DWORD),
('InterfaceClassGuid', BYTE * 16),
('Flags', DWORD),
('Reserved', POINTER(c_ulonglong))
]
# For Windows we directly call Windows API functions
SetupDiGetClassDevs = windll.setupapi.SetupDiGetClassDevsA
SetupDiEnumDeviceInterfaces = windll.setupapi.SetupDiEnumDeviceInterfaces
@ -67,30 +247,23 @@ if sys.platform.startswith('win32'):
CloseHandle = windll.kernel32.CloseHandle
GetLastError = windll.kernel32.GetLastError
# USB Writer 'defines'
INVALID_HANDLE_VALUE = -1
ERROR_INSUFFICIENT_BUFFER = 122
USB_NO_RESPONSE = -9
RT_FILE_ENDED_ON_WRITER = -8
class WindowsStenographMachine(object):
class StenographMachine(object):
def __init__(self):
# Allocate memory for sending and receiving USB data
self._host_packet = USB_Packet()
self._writer_packet = USB_Packet()
self._sequence_number = 0
self._connected = False
self._usb_device = HANDLE(0)
self._read_buffer = create_string_buffer(MAX_READ + StenoPacket.HEADER_SIZE)
@staticmethod
def _open_device_instance(device_info, guid):
dev_interface_data = SP_DEVICE_INTERFACE_DATA()
dev_interface_data.cbSize=sizeof(dev_interface_data)
dev_interface_data = DeviceInterfaceData()
dev_interface_data.cbSize = sizeof(dev_interface_data)
status = SetupDiEnumDeviceInterfaces(
device_info, None, guid.bytes, 0, byref(dev_interface_data))
if status == 0:
log.debug('status is zero')
return INVALID_HANDLE_VALUE
request_length = DWORD(0)
@ -103,17 +276,19 @@ if sys.platform.startswith('win32'):
pointer(request_length),
None
)
error = GetLastError()
if error != ERROR_INSUFFICIENT_BUFFER:
err = GetLastError()
if err != ERROR_INSUFFICIENT_BUFFER:
log.debug('last error not insufficient buffer')
return INVALID_HANDLE_VALUE
characters = request_length.value
class PSP_INTERFACE_DEVICE_DETAIL_DATA(Structure):
class DeviceDetailData(Structure):
_fields_ = [('cbSize', DWORD),
('DevicePath', c_char * characters)]
dev_detail_data = PSP_INTERFACE_DEVICE_DETAIL_DATA()
dev_detail_data.cbSize = 5 # DWORD + 4 byte pointer
dev_detail_data = DeviceDetailData()
dev_detail_data.cbSize = 5
# Now put the actual detail data into the buffer
status = SetupDiGetInterfaceDeviceDetail(
@ -121,93 +296,52 @@ if sys.platform.startswith('win32'):
characters, pointer(request_length), None
)
if not status:
log.debug('not status')
return INVALID_HANDLE_VALUE
log.debug('okay, creating file')
return CreateFile(
dev_detail_data.DevicePath,
0xC0000000, 0x3, 0, 0x3, 0x80, 0
)
@staticmethod
def _open_device_by_class_interface_and_instance(classguid):
device_info = SetupDiGetClassDevs(classguid.bytes, 0, 0, 0x12)
def _open_device_by_class_interface_and_instance(class_guid):
device_info = SetupDiGetClassDevs(class_guid.bytes, 0, 0, 0x12)
if device_info == INVALID_HANDLE_VALUE:
log.debug('dev info is invalid handle')
return INVALID_HANDLE_VALUE
usb_device = WindowsStenographMachine._open_device_instance(
device_info, classguid)
usb_device = StenographMachine._open_device_instance(
device_info, class_guid)
return usb_device
def _usb_write_packet(self):
self._host_packet.SyncSeqType[0] = ord('S')
self._host_packet.SyncSeqType[1] = ord('G')
self._host_packet.SyncSeqType[2] = self._sequence_number % 255
self._host_packet.SyncSeqType[6] = READ_BYTES
if self._usb_device == INVALID_HANDLE_VALUE:
return 0
def _usb_write_packet(self, request):
bytes_written = DWORD(0)
request_packet = request.pack()
WriteFile(
self._usb_device,
byref(self._host_packet),
32 + self._host_packet.uiDataLen,
request_packet,
StenoPacket.HEADER_SIZE + request.data_length,
byref(bytes_written),
None
)
return bytes_written.value
def _usb_read_packet(self):
assert self._usb_device != INVALID_HANDLE_VALUE, 'device not open'
bytes_read = DWORD(0)
# Always read this maximum amount.
# The header will tell me how much to pay attention to.
ReadFile(
bytes_read = DWORD(0)
ReadFile(
self._usb_device,
byref(self._writer_packet),
32 + 1024,
byref(self._read_buffer),
MAX_READ + StenoPacket.HEADER_SIZE,
byref(bytes_read),
None
)
if bytes_read.value == 0:
return 0
if bytes_read.value < 32: # returned without a full packet
return 0
return 32 + self._writer_packet.uiDataLen
)
# Return None if not enough data was read.
if bytes_read.value < StenoPacket.HEADER_SIZE:
return None
def _read_steno(self, file_offset):
self._host_packet.SyncSeqType[6] = READ_BYTES
self._host_packet.uiDataLen = 0
self._host_packet.uiParam3 = 0
self._host_packet.uiParam4 = 0
self._host_packet.uiParam5 = 0
self._host_packet.uiFileOffset = file_offset
self._host_packet.uiByteCount = 512
if self._usb_write_packet() == 0:
return USB_NO_RESPONSE
# listen for response
amount_read = self._usb_read_packet()
if amount_read > 0:
amount_read -= HEADER_BYTES
else:
# No bytes means we've probably disconnected.
return USB_NO_RESPONSE
# If the sequence number is not the same it is junk
if (self._writer_packet.SyncSeqType[2] ==
self._host_packet.SyncSeqType[2]):
self._sequence_number += 1
if self._writer_packet.SyncSeqType[6] == READ_BYTES:
return self._writer_packet.uiDataLen
else:
# Could check the error code for more specific errors here
if self._writer_packet.SyncSeqType[6] == PACKET_ERROR:
return RT_FILE_ENDED_ON_WRITER
else:
self._usb_read_packet() # toss out any junk
return USB_NO_RESPONSE
writer_packet = StenoPacket.unpack(self._read_buffer)
return writer_packet
def disconnect(self):
CloseHandle(self._usb_device)
@ -222,77 +356,63 @@ if sys.platform.startswith('win32'):
USB_WRITER_GUID))
return self._usb_device != INVALID_HANDLE_VALUE
def read(self, file_offset):
result = self._read_steno(file_offset)
print('result --', result)
sys.stdout.flush()
if result > 0: # Got one or more steno strokes
print('got something:', result)
sys.stdout.flush()
return self._writer_packet.data[:result]
elif not result:
return []
elif result == RT_FILE_ENDED_ON_WRITER:
raise EOFError(
'No open file on writer, open file and reconnect')
elif result == USB_NO_RESPONSE:
# Prompt a reconnect
raise IOError('No response from Stenograph device')
StenographMachine = WindowsStenographMachine
def send_receive(self, request):
assert self._usb_device != INVALID_HANDLE_VALUE, 'device not open'
written = self._usb_write_packet(request)
if written < StenoPacket.HEADER_SIZE:
# We were not able to write the request.
return None
writer_packet = self._usb_read_packet()
return writer_packet
else:
from usb import core, util
class LibUSBStenographMachine(object):
class StenographMachine(AbstractStenographMachine):
def __init__(self):
super(StenographMachine, self).__init__()
self._usb_device = None
self._endpoint_in = None
self._endpoint_out = None
self._sequence_number = 0
self._packet = bytearray(
[0x53, 0x47, # SG → sync (static)
0, 0, 0, 0, # Sequence number
READ_BYTES, 0, # Action (static)
0, 0, 0, 0, # Data length
0, 0, 0, 0, # File offset
0, 0x02, 0, 0, # Requested byte count (static 512)
0, 0, 0, 0, # Parameter 3
0, 0, 0, 0, # Parameter 4
0, 0, 0, 0, # Parameter 5
]
)
self._connected = False
def connect(self):
"""Attempt to and return connection"""
# Disconnect device if it's already connected.
if self._connected:
self.disconnect()
# Find the device by the vendor ID.
self._usb_device = core.find(idVendor=VENDOR_ID)
if not self._usb_device: # Device not found
usb_device = core.find(idVendor=VENDOR_ID)
if not usb_device: # Device not found
return self._connected
# Copy the default configuration.
self._usb_device.set_configuration()
config = self._usb_device.get_active_configuration()
usb_device.set_configuration()
config = usb_device.get_active_configuration()
interface = config[(0, 0)]
# Get the write endpoint.
self._endpoint_out = util.find_descriptor(
endpoint_out = util.find_descriptor(
interface,
custom_match=lambda e:
util.endpoint_direction(e.bEndpointAddress) ==
util.ENDPOINT_OUT)
assert self._endpoint_out is not None, 'cannot find write endpoint'
util.ENDPOINT_OUT
)
assert endpoint_out is not None, 'cannot find write endpoint'
# Get the read endpoint.
self._endpoint_in = util.find_descriptor(
endpoint_in = util.find_descriptor(
interface,
custom_match=lambda e:
util.endpoint_direction(e.bEndpointAddress) ==
util.ENDPOINT_IN)
assert self._endpoint_in is not None, 'cannot find read endpoint'
util.ENDPOINT_IN
)
assert endpoint_in is not None, 'cannot find read endpoint'
self._usb_device = usb_device
self._endpoint_in = endpoint_in
self._endpoint_out = endpoint_out
self._connected = True
return self._connected
@ -303,34 +423,50 @@ else:
self._endpoint_in = None
self._endpoint_out = None
def read(self, file_offset):
def send_receive(self, request):
assert self._connected, 'cannot read from machine if not connected'
self._sequence_number = (self._sequence_number + 1) % MAX_OFFSET
for i in range(4):
self._packet[2 + i] = self._sequence_number >> 8 * i & 255
for i in range(4):
self._packet[12 + i] = file_offset >> 8 * i & 255
try:
self._endpoint_out.write(self._packet)
response = self._endpoint_in.read(1024, 3000)
self._endpoint_out.write(request.pack())
response = self._endpoint_in.read(
MAX_READ + StenoPacket.HEADER_SIZE, 3000)
except core.USBError:
raise IOError('Machine read or write failed')
return None
else:
if response and len(response) >= HEADER_BYTES:
writer_action = response[6]
if writer_action == PACKET_ERROR:
raise EOFError(
'No open file on writer, open file and reconnect')
elif writer_action == READ_BYTES:
return response[HEADER_BYTES:]
return response
StenographMachine = LibUSBStenographMachine
if response and len(response) >= StenoPacket.HEADER_SIZE:
writer_packet = StenoPacket.unpack(response)
# Ignore data if sequence numbers don't match.
if writer_packet.sequence_number == request.sequence_number:
return writer_packet
return None
class ProtocolViolationException(Exception):
"""The writer did something unexpected"""
pass
class UnableToPerformRequestException(Exception):
"""The writer cannot perform the action requested"""
pass
class FileNotAvailableException(Exception):
"""The writer cannot read from the current file"""
pass
class NoRealtimeFileException(Exception):
"""The realtime file doesn't exist, likely because the user hasn't started writing"""
pass
class FinishedReadingClosedFileException(Exception):
"""The closed file being read is complete and cannot be read further"""
pass
class Stenograph(ThreadedStenotypeBase):
KEYMAP_MACHINE_TYPE = 'Stentura'
KEYS_LAYOUT = '''
# # # # # # # # # #
S- T- P- H- * -F -P -L -T -D
@ -338,38 +474,12 @@ class Stenograph(ThreadedStenotypeBase):
A- O- -E -U
^
'''
KEYMAP_MACHINE_TYPE = 'Stentura'
def __init__(self, params):
super(Stenograph, self).__init__()
self._machine = StenographMachine()
# State variables used while reading steno from machine
"""
Stenograph writers have "files" which are indexed from 0.
Each line is a stroke. When the machine is connected, we
don't know what stroke the stenographer is on. We can't
assume zero because there may already be content from before.
Strategy is to read up the indexes until we reach zero. Then,
we consider ourselves "realtime" and further data should be
interpreted.
"""
self._realtime = False
"""
There is a bug in the stenograph firmware. After the steno
"file" is closed on the machine, we receive a flag to tell us.
However, straight after we start receiving 0 length responses,
which are the same format as what we'd usually use to tell whether
the machine is in realtime or not. If the stenographer opens a file
that has previous content in it, we will then blurt out all that
content.
The strategy here is to take note of the first response that is exactly
8 bytes -- a single stroke -- before beginning output. Combined with the
0 length stroke check, hopefully we will not accidentally read back and
blurt an entire steno file."""
self._read_exactly_8 = False
self._file_offset = 0
def _on_stroke(self, keys):
steno_keys = self.keymap.keys_to_actions(keys)
if steno_keys:
@ -394,7 +504,7 @@ class Stenograph(ThreadedStenotypeBase):
log.warning('Libusb must be installed.')
self._error()
except AssertionError as e:
log.warning('Error connecting: %s', str(e))
log.warning('Error connecting: %s', e)
self._error()
finally:
return connected
@ -407,78 +517,81 @@ class Stenograph(ThreadedStenotypeBase):
connected = self._connect_machine()
return connected
def _reset_state(self):
self._realtime = False
self._read_exactly_8 = False
self._file_offset = 0
def _send_receive(self, request):
"""Send a StenoPacket and return the response or raise exceptions."""
log.debug('Requesting from Stenograph: %s', request)
response = self._machine.send_receive(request)
log.debug('Response from Stenograph: %s', response)
if response is None:
"""No response implies device connection issue."""
raise IOError()
elif response.packet_id == StenoPacket.ID_ERROR:
"""Writer may reply with an error packet"""
error_number = response.p1
if error_number == 3:
raise UnableToPerformRequestException()
elif error_number == 7:
raise FileNotAvailableException()
elif error_number == 8:
raise NoRealtimeFileException()
elif error_number == 9:
raise FinishedReadingClosedFileException()
else:
"""Writer has returned a packet"""
if (response.packet_id != request.packet_id
or response.sequence_number != request.sequence_number):
raise ProtocolViolationException()
return response
def run(self):
self._reset_state()
class ReadState(object):
def __init__(self):
self.realtime = False # Not realtime until we get a 0-length response
self.realtime_file_open = False # We are reading from a file
self.offset = 0 # File offset to read from
def reset(self):
self.__init__()
state = ReadState()
while not self.finished.isSet():
try:
response = self._machine.read(self._file_offset)
if not state.realtime_file_open:
# Open realtime file
self._send_receive(StenoPacket.make_open_request())
state.realtime_file_open = True
response = self._send_receive(
StenoPacket.make_read_request(file_offset=state.offset)
)
except IOError as e:
log.warning(u'Stenograph machine disconnected, reconnecting…')
log.debug('Stenograph exception: %s', str(e))
self._reset_state()
log.debug('Stenograph exception: %s', e)
# User could start a new file while disconnected.
state.reset()
if self._reconnect():
log.warning('Stenograph reconnected.')
self._ready()
except EOFError:
# File ended -- will resume normal operation after new file
self._reset_state()
except NoRealtimeFileException:
# User hasn't started writing, just keep opening the realtime file
state.reset()
except FinishedReadingClosedFileException:
# File closed! Open the realtime file.
state.reset()
else:
if response is None:
continue
if not self._read_exactly_8 and len(response) == 8:
self._read_exactly_8 = True
content = len(response) > 0
self._file_offset += len(response)
if not self._realtime and not content:
self._realtime = True
elif self._realtime and content and self._read_exactly_8:
chords = Stenograph.process_steno_packet(response)
for keys in chords:
if keys:
self._on_stroke(keys)
if response.data_length:
state.offset += response.data_length
elif not state.realtime:
state.realtime = True
if response.data_length and state.realtime:
for stroke in response.strokes():
self._on_stroke(stroke)
self._machine.disconnect()
def stop_capture(self):
"""Stop listening for output from the stenotype machine."""
super(Stenograph, self).stop_capture()
self._machine = None
self._stopped()
@staticmethod
def process_steno_packet(steno):
# Expecting 8 byte chords.
# Bytes 0-3 are steno, 4-7 are timestamp.
chords = []
for chord_index in range(len(steno) // 8):
keys = []
chord = steno[chord_index * 8: chord_index * 8 + 4]
for byte_number, byte in enumerate(chord):
if byte is None:
continue
byte_keys = STENO_KEY_CHART[byte_number]
for i in range(6):
if (byte >> i) & 1:
key = byte_keys[-i + 5]
if key:
keys.append(key)
if keys:
chords.append(keys)
return chords
if __name__ == '__main__':
print(Stenograph.process_steno_packet(
[255, 255, 255, 255, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
None, None, None, None, None, None, None, None,
0b11000000, 0b11000000, 0b11000000, 0b11000000, 255, 255, 255, 255,
0b11000001, 0b11000000, 0b11000000, 0b11000000, 255, 255, 255, 255,
0b11000000, 0b11100001, 0b11000000, 0b11000000, 255, 255, 255, 255,
0b11000001, 0b11000000, 0b11100001, 0b11000000, 255, 255, 255, 255,
0b11100000, 0b11000000, 0b11000000, 0b11100001, 255, 255, 255, 255,
]
))
self._stopped()

View file

@ -25,6 +25,8 @@ tests_require =
mock
install_requires =
plover>=4.0.0.dev6
more_itertools
pyusb
py_modules =
plover_stenograph_usb