mirror of
https://gitlab.alpinelinux.org/alpine/aports.git
synced 2025-07-13 03:09:51 +03:00
438 lines
18 KiB
Diff
438 lines
18 KiB
Diff
Compatibility with newer python versions
|
|
|
|
--- a/xxxswf/xxxswf.py
|
|
+++ b/xxxswf/xxxswf.py
|
|
@@ -43,10 +43,10 @@
|
|
import zlib
|
|
import hashlib
|
|
import re
|
|
-import imp
|
|
+import importlib
|
|
import os
|
|
from optparse import OptionParser
|
|
-from StringIO import StringIO
|
|
+from io import BytesIO
|
|
|
|
|
|
class BitStream(object):
|
|
@@ -83,8 +83,8 @@
|
|
return struct.unpack('<I', c)[0]
|
|
|
|
def check_type(self, swf):
|
|
- if type(swf) is str:
|
|
- swf = StringIO(swf)
|
|
+ if type(swf) is bytes:
|
|
+ swf = BytesIO(swf)
|
|
return swf
|
|
|
|
def header_info(self, swf):
|
|
@@ -92,11 +92,11 @@
|
|
swf = self.check_type(swf)
|
|
header = {}
|
|
header['signature'] = swf.read(3)
|
|
- if header['signature'] == "FWS":
|
|
+ if header['signature'] == b'FWS':
|
|
header['compression'] = None
|
|
- elif header['signature'] == "CWS":
|
|
+ elif header['signature'] == b'CWS':
|
|
header['compression'] = "zlib"
|
|
- elif header['signature'] == "ZWS":
|
|
+ elif header['signature'] == b'ZWS':
|
|
header['compression'] = "lzma"
|
|
header['version'] = self.read_ui8(swf.read(1))
|
|
header['file_length'] = self.read_ui32(swf.read(4))
|
|
@@ -105,13 +105,13 @@
|
|
swf.seek(3)
|
|
vfl = swf.read(5)
|
|
swf.read(4)
|
|
- swf = "FWS" + vfl + pylzma.decompress(swf.read())
|
|
+ swf = b'FWS' + vfl + pylzma.decompress(swf.read())
|
|
swf = self.check_type(swf)
|
|
swf.seek(8)
|
|
elif header['compression'] == 'zlib':
|
|
swf.seek(3)
|
|
vfl = swf.read(5)
|
|
- swf = "FWS" + vfl + zlib.decompress(swf.read())
|
|
+ swf = b'FWS' + vfl + zlib.decompress(swf.read())
|
|
swf = self.check_type(swf)
|
|
swf.seek(8)
|
|
tmp = swf.tell()
|
|
@@ -134,22 +134,22 @@
|
|
def print_header(self):
|
|
header = self.header
|
|
if header == None:
|
|
- print '\t[HEADER] Error could not read header'
|
|
+ print('\t[HEADER] Error could not read header')
|
|
return
|
|
- print '\t[HEADER] File Header: %s' % header['signature']
|
|
+ print('\t[HEADER] File Header: %s' % header['signature'])
|
|
if header['compression'] is not None:
|
|
- print '\t[HEADER] Compression Type: %s' % header['compression']
|
|
- if header['compression'] is 'lzma':
|
|
- print '\t[HEADER] Compressed Data Length: %s' % header['compressed_len']
|
|
- print '\t[HEADER] File Veader: %i' % header['version']
|
|
- print '\t[HEADER] File Size: %i' % header['file_length']
|
|
- print '\t[HEADER] Rect Nbit: %i' % header['nbits']
|
|
- print '\t[HEADER] Rect Xmin: %i' % header['xmin']
|
|
- print '\t[HEADER] Rect Xmax: %i' % header['xmax']
|
|
- print '\t[HEADER] Rect Ymin: %i' % header['ymin']
|
|
- print '\t[HEADER] Rect Ymax: %i' % header['ymax']
|
|
- print '\t[HEADER] Frame Rate: %i' % header['frame_rate']
|
|
- print '\t[HEADER] Frace Count: %i' % header['frame_count']
|
|
+ print('\t[HEADER] Compression Type: %s' % header['compression'])
|
|
+ if header['compression'] == 'lzma':
|
|
+ print('\t[HEADER] Compressed Data Length: %s' % header['compressed_len'])
|
|
+ print('\t[HEADER] File Veader: %i' % header['version'])
|
|
+ print('\t[HEADER] File Size: %i' % header['file_length'])
|
|
+ print('\t[HEADER] Rect Nbit: %i' % header['nbits'])
|
|
+ print('\t[HEADER] Rect Xmin: %i' % header['xmin'])
|
|
+ print('\t[HEADER] Rect Xmax: %i' % header['xmax'])
|
|
+ print('\t[HEADER] Rect Ymin: %i' % header['ymin'])
|
|
+ print('\t[HEADER] Rect Ymax: %i' % header['ymax'])
|
|
+ print('\t[HEADER] Frame Rate: %i' % header['frame_rate'])
|
|
+ print('\t[HEADER] Frace Count: %i' % header['frame_count'])
|
|
|
|
|
|
class xxxswf:
|
|
@@ -174,14 +174,14 @@
|
|
|
|
def lzma_installed(self):
|
|
try:
|
|
- imp.find_module('pylzma')
|
|
+ importlib.util.find_spec('pylzma')
|
|
self.lzma_install = True
|
|
except ImportError:
|
|
self.lzma_install = False
|
|
|
|
def find_swf(self, data_stream):
|
|
'searches a data stream for the headers of SWF files'
|
|
- return [tmp.start() for tmp in re.finditer('CWS|FWS|ZWS', data_stream.read())]
|
|
+ return [tmp.start() for tmp in re.finditer(br'CWS|FWS|ZWS', data_stream.read())]
|
|
|
|
def walk_path_find_swf(self, file_path):
|
|
'returns a [path, [address, address]]'
|
|
@@ -189,14 +189,14 @@
|
|
r = path_addr * 0
|
|
if os.path.isdir(file_path) != True and file_path != '':
|
|
if self.show_errors:
|
|
- print "\t[ERROR] File path must be a directory"
|
|
+ print("\t[ERROR] File path must be a directory")
|
|
for root, dirs, files, in os.walk(file_path):
|
|
for name in files:
|
|
try:
|
|
fh = open(os.path.join(root,name), "rb")
|
|
except:
|
|
if self.show_errors:
|
|
- print "\t[ERROR] Could not open file %s " % os.path.join(root,name)
|
|
+ print("\t[ERROR] Could not open file %s " % os.path.join(root,name))
|
|
swf_addr = self.find_swf(fh)
|
|
if len(swf_addr) != 0:
|
|
path_addr[0] = os.path.join(root, name)
|
|
@@ -217,7 +217,7 @@
|
|
"""uncompress lzma compressed stream"""
|
|
if not self.lzma_install:
|
|
if self.show_errors:
|
|
- print "\t[ERROR] pylzma module not installed - aborting validation/decompression"
|
|
+ print("\t[ERROR] pylzma module not installed - aborting validation/decompression")
|
|
return None
|
|
else:
|
|
data = data[4:]
|
|
@@ -236,101 +236,101 @@
|
|
|
|
def verify_swf(self, stream, addr):
|
|
"""carve and verify embedded SWF"""
|
|
- if type(stream) is str:
|
|
- stream = StringIO(stream)
|
|
+ if type(stream) is bytes:
|
|
+ stream = BytesIO(stream)
|
|
# set index to address of the header
|
|
stream.seek(addr)
|
|
# read the header. index is address of version
|
|
header = stream.read(3)
|
|
# verify header. Should never happen but will test anyway
|
|
- if header not in ["FWS", "CWS", "ZWS"]:
|
|
+ if header not in [b'FWS', b'CWS', b'ZWS']:
|
|
if self.debug:
|
|
- print '\t\t[DEBUG] Header not found',
|
|
+ print('\t\t[DEBUG] Header not found %s' % header)
|
|
return None
|
|
# read version. index is file length
|
|
version = struct.unpack("<b", stream.read(1))[0]
|
|
# verify version
|
|
if version > self.valid_version:
|
|
if self.debug:
|
|
- print '\n\t\t[DEBUG] Invalid Version',
|
|
+ print('\n\t\t[DEBUG] Invalid Version')
|
|
return None
|
|
# read size
|
|
size = struct.unpack("<i", stream.read(4))[0]
|
|
# len(header) = 3, len(version) = 1, len(size) = 4, Total 8
|
|
- if header == "FWS":
|
|
+ if header == b'FWS':
|
|
if self.cmd_run:
|
|
- print "- FWS Header"
|
|
+ print("- FWS Header")
|
|
if size < 10:
|
|
if self.debug:
|
|
- print '\t\t[DEBUG] FWS Size Invalid',
|
|
+ print('\t\t[DEBUG] FWS Size Invalid')
|
|
return None
|
|
stream.seek(addr)
|
|
try:
|
|
return stream.read(size)
|
|
except:
|
|
if self.debug:
|
|
- print '\t\t[DEBUG] FWS Size Invalid',
|
|
+ print('\t\t[DEBUG] FWS Size Invalid')
|
|
return None
|
|
- elif header == "CWS":
|
|
+ elif header == b'CWS':
|
|
if self.cmd_run:
|
|
- print "- CWS Header"
|
|
+ print("- CWS Header")
|
|
uncompress_data = self.uncompress_zlib(stream.read())
|
|
if uncompress_data == None:
|
|
if self.debug:
|
|
- print '\t\t[DEBUG] Zlib decompession failed',
|
|
+ print('\t\t[DEBUG] Zlib decompession failed')
|
|
return None
|
|
# set index to version, skipping over the header
|
|
stream.seek(addr+3)
|
|
- return "FWS" + stream.read(5) + uncompress_data[:size-8]
|
|
- elif header == "ZWS":
|
|
+ return b'FWS' + stream.read(5) + uncompress_data[:size-8]
|
|
+ elif header == b'ZWS':
|
|
if self.cmd_run:
|
|
- print "- ZWS Header"
|
|
+ print("- ZWS Header")
|
|
uncompress_lzma = self.uncompress_lzma(stream.read())
|
|
if uncompress_lzma == None:
|
|
if self.debug:
|
|
- print '\t\t[DEBUG] lzma decompession failed',
|
|
+ print('\t\t[DEBUG] lzma decompession failed')
|
|
return None
|
|
stream.seek(addr+3)
|
|
- return "FWS" + stream.read(5) + uncompress_lzma[:size-8]
|
|
+ return b'FWS' + stream.read(5) + uncompress_lzma[:size-8]
|
|
return None
|
|
|
|
def yara_scan(self, _data):
|
|
"""scan uncompressed SWF with Yara"""
|
|
try:
|
|
- imp.find_module('yara')
|
|
+ importlib.util.find_spec('yara')
|
|
import yara
|
|
except ImportError:
|
|
if self.show_errors:
|
|
- print "\t[ERROR] Yara module not installed - aborting scan"
|
|
+ print("\t[ERROR] Yara module not installed - aborting scan")
|
|
return None
|
|
try:
|
|
rule = yara.compile(r'rules.yara')
|
|
except:
|
|
if self.show_errors:
|
|
- print "\t[ERROR] Yara compile error - aborting scan"
|
|
+ print("\t[ERROR] Yara compile error - aborting scan")
|
|
return None
|
|
matches = rule.match(data=_data)
|
|
for each in rule:
|
|
- print '\t[BAD] Yara Signature Hit:', each
|
|
+ print('\t[BAD] Yara Signature Hit:', each)
|
|
|
|
def yara_md5_scan(self, _data):
|
|
hashed = self.md5_hash_buffer(_data)
|
|
try:
|
|
- imp.find_module('yara')
|
|
+ importlib.util.find_spec('yara')
|
|
import yara
|
|
except ImportError:
|
|
if self.show_errors:
|
|
- print "\t[ERROR] Yara module not installed - aborting scan"
|
|
+ print("\t[ERROR] Yara module not installed - aborting scan")
|
|
return None
|
|
try:
|
|
rule = yara.compile(r'md5.yara')
|
|
except:
|
|
if self.show_errors:
|
|
- print "\t[ERROR] Yara compile error - aborting scan"
|
|
+ print("\t[ERROR] Yara compile error - aborting scan")
|
|
return None
|
|
matches = rule.match(data=hashed)
|
|
for each in rule:
|
|
- print '\t[BAD] Yara MD5 Hit:', each
|
|
+ print('\t[BAD] Yara MD5 Hit:', each)
|
|
|
|
def pre_file_scan(self, data, file_scan):
|
|
"""executes a user defined function"""
|
|
@@ -340,7 +340,7 @@
|
|
modified = func(data)
|
|
except:
|
|
if self.show_errors:
|
|
- print "\t[ERROR] Could not call pre-file-scan function"
|
|
+ print("\t[ERROR] Could not call pre-file-scan function")
|
|
return None
|
|
if modified == None:
|
|
continue
|
|
@@ -352,14 +352,14 @@
|
|
"""executes a user defined function"""
|
|
if type(swf_scan) is not list:
|
|
if self.show_errors:
|
|
- print "\t[ERROR] SWF-scan functions not a list"
|
|
+ print("\t[ERROR] SWF-scan functions not a list")
|
|
return None
|
|
for func in self.data:
|
|
try:
|
|
modified = func(data)
|
|
except:
|
|
if self.show_errors:
|
|
- print "\t[ERROR] Could not call SWF-scan function"
|
|
+ print("\t[ERROR] Could not call SWF-scan function")
|
|
return None
|
|
if modified == None:
|
|
continue
|
|
@@ -369,17 +369,17 @@
|
|
|
|
def compress_lzma(self, swf):
|
|
"""compress a SWF with LZMA"""
|
|
- if type(swf) is str:
|
|
- swf = StringIO(swf)
|
|
+ if type(swf) is bytes:
|
|
+ swf = BytesIO(swf)
|
|
if not self.lzma_install:
|
|
if self.show_errors:
|
|
- print "\t[ERROR] pylzma module not installed - aborting validation/decompression"
|
|
+ print("\t[ERROR] pylzma module not installed - aborting validation/decompression")
|
|
return None
|
|
try:
|
|
signature = swf.read(3)
|
|
- if signature != 'FWS':
|
|
+ if signature != b'FWS':
|
|
if self.show_errors:
|
|
- print "\t[ERROR] FWS Header not found, aborting lzma compression"
|
|
+ print("\t[ERROR] FWS Header not found, aborting lzma compression")
|
|
return None
|
|
else:
|
|
vfl = swf.read(5)
|
|
@@ -387,25 +387,25 @@
|
|
# TEST
|
|
import pylzma
|
|
lzma_data = pylzma.compress(swf.read())
|
|
- return "ZWS" + vfl + struct.pack("<I", len(lzma_data)-5) + lzma_data
|
|
+ return b'ZWS' + vfl + struct.pack("<I", len(lzma_data)-5) + lzma_data
|
|
except:
|
|
return None
|
|
|
|
def compress_zlib(self, swf):
|
|
- if type(swf) is str:
|
|
- swf = StringIO(swf)
|
|
+ if type(swf) is bytes:
|
|
+ swf = BytesIO(swf)
|
|
self.show_errors = True
|
|
try:
|
|
signature = swf.read(3)
|
|
- if signature != 'FWS':
|
|
+ if signature != b'FWS':
|
|
if self.show_errors:
|
|
- print "\t[ERROR] FWS Header not found, aborting zlib compression"
|
|
+ print("\t[ERROR] FWS Header not found, aborting zlib compression")
|
|
return None
|
|
vfl = swf.read(5)
|
|
- return 'CWS' + vfl + zlib.compress(swf.read())
|
|
+ return b'CWS' + vfl + zlib.compress(swf.read())
|
|
except:
|
|
if self.show_errors:
|
|
- print "\t[ERROR] Zlib compression failed"
|
|
+ print("\t[ERROR] Zlib compression failed")
|
|
return None
|
|
|
|
def get_arguments(self):
|
|
@@ -461,7 +461,7 @@
|
|
f = open(sys.argv[len(sys.argv)-1],'rb+')
|
|
file_name = sys.argv[len(sys.argv)-1]
|
|
except Exception:
|
|
- print '[ERROR] File can not be opended/accessed'
|
|
+ print('[ERROR] File can not be opended/accessed')
|
|
return
|
|
self.process(f, file_name)
|
|
f.close()
|
|
@@ -474,7 +474,7 @@
|
|
while os.path.exists( name + '.' + str(count) + '.' + ext):
|
|
if count == 50:
|
|
if self.show_errors:
|
|
- print '\t[ERROR] Skipped 50 Matching MD5 SWFs'
|
|
+ print('\t[ERROR] Skipped 50 Matching MD5 SWFs')
|
|
return None
|
|
count += 1
|
|
if count == None:
|
|
@@ -484,11 +484,11 @@
|
|
|
|
def md5_hash_buffer(self, data):
|
|
"""MD5 hashes a buffer"""
|
|
- if type(data) is str:
|
|
- data = StringIO(data)
|
|
- if data == None or data == '':
|
|
+ if type(data) is bytes:
|
|
+ data = BytesIO(data)
|
|
+ if data == None or data == b'':
|
|
if self.show_errors:
|
|
- print '\t[ERROR] Empty buffer, hashing exiting'
|
|
+ print('\t[ERROR] Empty buffer, hashing exiting')
|
|
return None
|
|
md5 = hashlib.md5()
|
|
while True:
|
|
@@ -503,14 +503,14 @@
|
|
if name == None:
|
|
return
|
|
if self.cmd_run:
|
|
- print '\t\t[FILE] Carved SWF MD5: %s' % name
|
|
+ print('\t\t[FILE] Carved SWF MD5: %s' % name)
|
|
try:
|
|
o = open(name, 'wb+')
|
|
o.write(swf)
|
|
o.close()
|
|
- except IOError, e:
|
|
+ except IOError as e:
|
|
if self.cmd_run:
|
|
- print '\t[ERROR] Could Not Create %s ' % e
|
|
+ print('\t[ERROR] Could Not Create %s ' % e)
|
|
return
|
|
|
|
def process(self, stream, file_name = None):
|
|
@@ -520,14 +520,14 @@
|
|
if temp_swf is not None:
|
|
self.write_swf(temp_swf)
|
|
else:
|
|
- print "\t[ERROR] Zlib compression failed"
|
|
+ print("\t[ERROR] Zlib compression failed")
|
|
return
|
|
if self.opt_zcompress is not None:
|
|
temp_swf = self.compress_lzma(stream)
|
|
if temp_swf is not None:
|
|
self.write_swf(temp_swf)
|
|
else:
|
|
- print "\t[ERROR] lzma compression failed"
|
|
+ print("\t[ERROR] lzma compression failed")
|
|
return
|
|
# execute pre-parsing functions
|
|
for func in self.stream_func:
|
|
@@ -540,17 +540,17 @@
|
|
stream.seek(0)
|
|
# print the number of found SWF Headers, FPs included
|
|
if self.cmd_run:
|
|
- print "\n[SUMMARY] Potentially %d SWF(s) in MD5 %s:%s" % (len(swf_data), self.md5_hash_buffer(stream), file_name)
|
|
+ print("\n[SUMMARY] Potentially %d SWF(s) in MD5 %s:%s" % (len(swf_data), self.md5_hash_buffer(stream), file_name))
|
|
# for each SWF in the file
|
|
for index, swf_addr in enumerate(swf_data):
|
|
if self.cmd_run:
|
|
- print "\t[ADDR] SWF %d at %s" % (index+1, hex(swf_addr)),
|
|
+ print("\t[ADDR] SWF %d at %s" % (index+1, hex(swf_addr)))
|
|
# set read to the address of the SWF header found
|
|
stream.seek(swf_addr)
|
|
# verify and extract SWF.
|
|
swf = self.verify_swf(stream, swf_addr)
|
|
if swf == None:
|
|
- print
|
|
+ print()
|
|
continue
|
|
for func in self.stream_swf:
|
|
temp_swf = func(swf)
|