1
0
Fork 0
mirror of https://github.com/opentx/opentx.git synced 2025-07-14 20:10:08 +03:00

PEP8 radio/util/sportparse.py

Remove unused import, fix whitespace, indentation, and comments
This commit is contained in:
Sean Vig 2015-11-16 15:55:30 -06:00
parent 0743785126
commit bcc7982b3b

View file

@ -5,173 +5,178 @@
from __future__ import division, print_function from __future__ import division, print_function
import os
import sys import sys
import struct import struct
START_STOP = '\x7e' START_STOP = '\x7e'
BYTESTUFF = '\x7d' BYTESTUFF = '\x7d'
STUFF_MASK = '\x20' STUFF_MASK = '\x20'
DATA_FRAME = 0x10 DATA_FRAME = 0x10
lineNumber = 0 lineNumber = 0
sportDataBuff = "" sportDataBuff = ""
quiet = False quiet = False
def ParseFlVSS(packet, dataId, prim, appId, data, crc): def ParseFlVSS(packet, dataId, prim, appId, data, crc):
# if dataId != 0xa1: # if dataId != 0xa1:
# return # return
print("packet: %s (%4d)" % (dump(packet), lineNumber), end=' ') print("packet: %s (%4d)" % (dump(packet), lineNumber), end=' ')
cells = (data & 0xF0) >> 4 cells = (data & 0xF0) >> 4
battnumber = data & 0xF; battnumber = data & 0xF
voltage1 = ((data & 0x000FFF00) >> 8) // 5 voltage1 = ((data & 0x000FFF00) >> 8) // 5
voltage2 = ((data & 0xFFF00000) >> 20) // 5 voltage2 = ((data & 0xFFF00000) >> 20) // 5
print(" FLVSS: no cells: %d, cell: %d: voltages: %0.2f %0.2f" % (cells, battnumber, voltage1/100., voltage2/100.)) print(" FLVSS: no cells: %d, cell: %d: voltages: %0.2f %0.2f" % (cells, battnumber, voltage1 / 100., voltage2 / 100.))
def ParseRSSI(packet, dataId, prim, appId, data, crc): def ParseRSSI(packet, dataId, prim, appId, data, crc):
print("packet: %s (%4d)" % (dump(packet), lineNumber), end=' ') print("packet: %s (%4d)" % (dump(packet), lineNumber), end=' ')
print(" RSSI: %d" % (data & 0xFF)) print(" RSSI: %d" % (data & 0xFF))
def ParseAdc(packet, dataId, prim, appId, data, crc): def ParseAdc(packet, dataId, prim, appId, data, crc):
print("packet: %s (%4d)" % (dump(packet), lineNumber), end=' ') print("packet: %s (%4d)" % (dump(packet), lineNumber), end=' ')
print(" A%d: %d" % (dataId - 0xf102, data & 0xFF)) print(" A%d: %d" % (dataId - 0xf102, data & 0xFF))
def ParseBatt(packet, dataId, prim, appId, data, crc): def ParseBatt(packet, dataId, prim, appId, data, crc):
print("packet: %s (%4d)" % (dump(packet), lineNumber), end=' ') print("packet: %s (%4d)" % (dump(packet), lineNumber), end=' ')
print(" Batt: %d" % (data & 0xFF)) print(" Batt: %d" % (data & 0xFF))
def ParseSWR(packet, dataId, prim, appId, data, crc): def ParseSWR(packet, dataId, prim, appId, data, crc):
print("packet: %s (%4d)" % (dump(packet), lineNumber), end=' ') print("packet: %s (%4d)" % (dump(packet), lineNumber), end=' ')
print(" SWR: %d" % (data & 0xFF)) print(" SWR: %d" % (data & 0xFF))
def ParseAirSpeed(packet, dataId, prim, appId, data, crc): def ParseAirSpeed(packet, dataId, prim, appId, data, crc):
print "packet: %s (%4d)" % (dump(packet), lineNumber) , print("packet: %s (%4d)" % (dump(packet), lineNumber), end=' ')
print " Aspd: %.1f km/h" % (data/10.0) print(" Aspd: %.1f km/h" % (data / 10.0))
appIdParsers = (
(0x0300, 0x030f, ParseFlVSS), appIdParsers = (
(0xf101, 0xf101, ParseRSSI), (0x0300, 0x030f, ParseFlVSS),
(0xf102, 0xf103, ParseAdc), (0xf101, 0xf101, ParseRSSI),
(0xf104, 0xf104, ParseBatt), (0xf102, 0xf103, ParseAdc),
(0xf105, 0xf105, ParseSWR), (0xf104, 0xf104, ParseBatt),
(0x0a00, 0x0a0f, ParseAirSpeed), (0xf105, 0xf105, ParseSWR),
) )
def dump(data, maxLen = None):
if maxLen and len(data) > maxLen: def dump(data, maxLen=None):
data = data[:maxLen] if maxLen and len(data) > maxLen:
return ":".join("{:02x}".format(ord(c)) for c in data) data = data[:maxLen]
return ":".join("{:02x}".format(ord(c)) for c in data)
def CheckSportCrc(packet): def CheckSportCrc(packet):
return True return True
def ParseSportPacket(packet): def ParseSportPacket(packet):
global lineNumber global lineNumber
(dataId, prim, appId, data, crc) = struct.unpack('<BBHIB', packet) (dataId, prim, appId, data, crc) = struct.unpack('<BBHIB', packet)
#print "dataId:%02x, prim:%02x, appId:%04x, data:%08x, crc:%02x)\n" % (dataId, prim, appId, data, crc) # print "dataId:%02x, prim:%02x, appId:%04x, data:%08x, crc:%02x)\n" % (dataId, prim, appId, data, crc)
if prim != DATA_FRAME: if prim != DATA_FRAME:
print("unknown prim: %02x for packet %s in line %s" % (prim, dump(packet), lineNumber)) print("unknown prim: %02x for packet %s in line %s" % (prim, dump(packet), lineNumber))
#proces according to appId # proces according to appId
for (firstId, lastId, parser) in appIdParsers: for (firstId, lastId, parser) in appIdParsers:
if appId >= firstId and appId <= lastId: if appId >= firstId and appId <= lastId:
parser(packet, dataId, prim, appId, data, crc) parser(packet, dataId, prim, appId, data, crc)
return return
#no parser found # no parser found
if not quiet: if not quiet:
print("\tdataId:%02x, prim:%02x, appId:%04x, data:%08x, crc:%02x)" % (dataId, prim, appId, data, crc)) print("\tdataId:%02x, prim:%02x, appId:%04x, data:%08x, crc:%02x)" % (dataId, prim, appId, data, crc))
print("\tparser for appId %02x not implemented" % appId) print("\tparser for appId %02x not implemented" % appId)
def ParsePacket(packet): def ParsePacket(packet):
global lineNumber global lineNumber
#unstuff packet # unstuff packet
while True: while True:
pos = packet.find(BYTESTUFF) pos = packet.find(BYTESTUFF)
if pos == -1: break if pos == -1:
#print "found stuff at %d in %s" % (pos, dump(packet, 20)) break
#if we have nex char, then unstuff it # print "found stuff at %d in %s" % (pos, dump(packet, 20))
if len(packet) > pos+1: # if we have nex char, then unstuff it
unstuffed = ord(packet[pos]) ^ ord(STUFF_MASK); if len(packet) > pos + 1:
#print "unstuffed data: %02x" % unstuffed unstuffed = ord(packet[pos]) ^ ord(STUFF_MASK)
#buffer[pos] = chr(unstuffed) # print "unstuffed data: %02x" % unstuffed
packet = packet[:pos] + chr(unstuffed) + packet[pos+2:] # buffer[pos] = chr(unstuffed)
#print "buff after unstuff %s" % dump(packet, 20) packet = packet[:pos] + chr(unstuffed) + packet[pos + 2:]
else: # print "buff after unstuff %s" % dump(packet, 20)
#missin data, wait for more data else:
print("unstuff missing data") # missing data, wait for more data
return print("unstuff missing data")
return
#print "packet: %s" % dump(packet, 10) # print "packet: %s" % dump(packet, 10)
if len(packet) == 9: if len(packet) == 9:
# valid sport packet length # valid sport packet length
#print "\npacket: %s @%d" % (dump(packet), lineNumber) # print "\npacket: %s @%d" % (dump(packet), lineNumber)
#check crc # check crc
if not CheckSportCrc(packet): if not CheckSportCrc(packet):
print("error: wrong CRC for packet %s at line %d" % (dump(packet), lineNumber)) print("error: wrong CRC for packet %s at line %d" % (dump(packet), lineNumber))
ParseSportPacket(packet) ParseSportPacket(packet)
else: else:
if len(packet) > 1: if len(packet) > 1:
print("warning: wrong length %s for packet %s at line %d" % (len(packet), dump(packet, 10), lineNumber)) print("warning: wrong length %s for packet %s at line %d" % (len(packet), dump(packet, 10), lineNumber))
def ParseSportData(data): def ParseSportData(data):
global sportDataBuff global sportDataBuff
#convert from hex # convert from hex
parts = data.split(' ') parts = data.split(' ')
binData = [chr(int(hex, 16)) for hex in parts] binData = [chr(int(hex, 16)) for hex in parts]
#print binData # print binData
data = ''.join(binData) data = ''.join(binData)
sportDataBuff += data sportDataBuff += data
#process whole packets # process whole packets
while True: while True:
#find whole frame # find whole frame
# remove all data before START_STOP # remove all data before START_STOP
# find next START_STOP # find next START_STOP
#print "start: %s" % dump(sportDataBuff, 10) # print "start: %s" % dump(sportDataBuff, 10)
posStop = sportDataBuff.find(START_STOP) posStop = sportDataBuff.find(START_STOP)
# process pacet between two START_STOPs # process pacet between two START_STOPs
if posStop > -1: if posStop > -1:
#print "found stop at %d" % posStop # print "found stop at %d" % posStop
ParsePacket(sportDataBuff[:posStop]) ParsePacket(sportDataBuff[:posStop])
# remove processed data # remove processed data
sportDataBuff = sportDataBuff[posStop+1:] sportDataBuff = sportDataBuff[posStop + 1:]
#print "after: %s" % dump(sportDataBuff, 10) # print "after: %s" % dump(sportDataBuff, 10)
else: else:
break break
inputFile = None inputFile = None
if len(sys.argv) > 1: if len(sys.argv) > 1:
inputFile = sys.argv[1] inputFile = sys.argv[1]
#open input # open input
if inputFile: if inputFile:
inp = open(inputFile, 'r') inp = open(inputFile, 'r')
else: else:
inp = sys.stdin inp = sys.stdin
while True: while True:
line = inp.readline() line = inp.readline()
lineNumber += 1 lineNumber += 1
if len(line) == 0: break if len(line) == 0:
line = line.strip('\r\n') break
if len(line) == 0: continue line = line.strip('\r\n')
#print line if len(line) == 0:
parts = line.split(': ') continue
if len(parts) < 2: # print line
print("weird data: \"%s\" at line %d" % (line, lineNumber)) parts = line.split(': ')
continue if len(parts) < 2:
sportData = parts[1].strip() print("weird data: \"%s\" at line %d" % (line, lineNumber))
# print "sd: %s" % sportData continue
ParseSportData(sportData) sportData = parts[1].strip()
# print "sd: %s" % sportData
ParseSportData(sportData)