import io
import struct
from asyncio import iscoroutinefunction
import aiofiles
[docs]class GbxException(BaseException):
"""
Exception with parsing the Gbx file.
"""
pass
class _LookBackUtils:
PREDEFINED_STRINGS = {
11: 'Valley',
12: 'Canyon',
13: 'Lagoon',
17: 'TMCommon',
26: 'Stadium',
202: 'Storm',
299: 'SMCommon',
10003: 'Common',
}
def __init__(self, buffer):
self.buffer = buffer
self.store = list()
self.version = None
async def read_string(self):
length, = struct.unpack('<L', await self.buffer.read(4))
return struct.unpack('<{}s'.format(length), await self.buffer.read(length))[0].decode()
async def read_lookback_string(self):
if self.version is None:
# We should see the lookback version right now.
self.version, = struct.unpack('<L', await self.buffer.read(4))
# Get the index.
idx, = struct.unpack('<L', await self.buffer.read(4))
if idx == 0:
return None
# Check if this will be the first occurrence.
if idx & 0xc0000000 != 0 and idx & 0x3fffffff == 0:
value = await self.read_string()
self.store.append(value)
return value
# Check if idx is telling us that it's empty.
if idx == 0xffffffff:
return ''
# Check if it's a predefined value.
if idx & 0x3fffffff == idx:
return self.PREDEFINED_STRINGS[idx]
# Get from local store.
idx &= 0x3fffffff
if idx - 1 >= len(self.store):
raise GbxException('String not found in lookback list!. Offset: {}'.format(await self.buffer.tell))
return self.store[idx - 1]
def reset(self):
if self.version:
self.store = list()
self.version = None
class _AsyncBufferProxy:
def __init__(self, buffer):
"""
:param buffer: Buffer
:type buffer: io.BufferedIOBase
"""
self.buffer = buffer
async def read(self, size=1):
return self.buffer.read(size)
async def seek(self, offset, whence=io.SEEK_CUR):
return self.buffer.seek(offset, whence)
async def tell(self):
return self.buffer.tell()
[docs]class GbxParser:
"""
Async GBX Map Information Parser.
Author: Toffe.
"""
def __init__(self, file=None, buffer=None):
"""
Initiate a parser with either a file path or buffer.
:param file: File path.
:param buffer: Buffer
:type file: str
"""
super().__init__()
if file and not isinstance(file, str):
raise Exception('File should be a string, pointing to the file you want to load.')
if not file and not buffer:
raise Exception('File or buffer is required!')
self.file = file
if buffer:
if iscoroutinefunction(buffer.read):
self.buffer = buffer
else:
self.buffer = _AsyncBufferProxy(buffer)
else:
self.buffer = _AsyncBufferProxy(buffer)
self.strings = _LookBackUtils(self.buffer)
self.result = dict()
self.header = None
self.header_xml = None
self.header_length = 0
self.header_chunk_count = 0
self.header_chunks = dict()
self.parse_thumb = False
self.parse_header_xml = False
[docs] async def seek(self, offset):
"""
We need to override the second param to move from the current position.
:param offset: offset to move away.
:type offset: int
"""
return await self.buffer.seek(offset, io.SEEK_CUR)
async def parse(self, thumb=False, header_xml=False):
self.parse_thumb = thumb
self.parse_header_xml = header_xml
if self.file:
async with aiofiles.open(self.file, mode='rb') as self.buffer:
self.strings = _LookBackUtils(self.buffer)
return await self.__parse()
elif self.buffer:
return await self.__parse()
raise Exception('No buffer or file given at init.')
async def __parse(self):
# Skip until class reference.
await self.seek(9)
# Read class ID.
class_id, = struct.unpack('<I', await self.buffer.read(4))
if class_id != ((0x3 << 24) | (0x43 << 12)):
raise GbxException('Gbx file has no valid parser, only maps are supported right now.')
self.result.update(await self.__parse_header())
return self.result
async def __parse_header(self):
self.header_length, = struct.unpack('<I', await self.buffer.read(4))
self.header_chunk_count, = struct.unpack('<I', await self.buffer.read(4))
self.header_chunks = dict()
self.header = dict()
# Save header data from binary.
for nr in range(self.header_chunk_count):
chunk_id, = struct.unpack('<I', await self.buffer.read(4))
chunk_size, = struct.unpack('<I', await self.buffer.read(4))
self.header_chunks[chunk_id] = chunk_size & ~0x80000000
# Parse all header chunks.
for chunk_id, chunk_size in self.header_chunks.items():
self.strings.reset()
self.header.update(await self.__parse_chunk(chunk_id, chunk_size))
return self.header
async def __parse_chunk(self, chunk_id, chunk_size):
if chunk_id == 0x03043002:
version, = struct.unpack('<B', await self.buffer.read(1))
await self.seek(4)
time_bronze, time_silver, time_gold, time_author = struct.unpack('<LLLL', await self.buffer.read(16))
price, is_multilap, map_type = struct.unpack('<LLL', await self.buffer.read(12))
is_multilap = True if is_multilap == 1 else False
await self.seek(4)
author_score, editor = struct.unpack('<LL', await self.buffer.read(8))
editor = 'simple' if editor == 1 else 'advanced'
await self.seek(4)
checkpoints, laps = struct.unpack('<LL', await self.buffer.read(8))
return dict(
time_bronze=time_bronze, time_silver=time_silver, time_gold=time_gold, time_author=time_author,
price=price, is_multilap=is_multilap, map_type=map_type, author_score=author_score, editor=editor,
checkpoints=checkpoints, laps=laps
)
elif chunk_id == 0x03043003:
version, = struct.unpack('<B', await self.buffer.read(1))
uid = await self.strings.read_lookback_string()
environment = await self.strings.read_lookback_string()
author_login = await self.strings.read_lookback_string()
name = await self.strings.read_string()
await self.seek(5)
await self.strings.read_string() # Unknown, mostly empty.
mood = await self.strings.read_lookback_string()
decoration_env_id = await self.strings.read_lookback_string()
decoration_env_author = await self.strings.read_lookback_string()
await self.seek(4*4+16)
map_type = await self.strings.read_string()
map_style = await self.strings.read_string()
await self.seek(9)
title_id = await self.strings.read_lookback_string()
return dict(
uid=uid, environment=environment, author_login=author_login, name=name, mood=mood,
decoration_env_id=decoration_env_id, decoration_env_author=decoration_env_author,
map_type=map_type, map_style=map_style, title_id=title_id
)
elif chunk_id == 0x03043004:
version, = struct.unpack('<B', await self.buffer.read(1))
await self.seek(chunk_size - 1)
elif chunk_id == 0x03043005:
self.header_xml = await self.strings.read_string()
elif chunk_id == 0x03043007:
has_thumb = bool(struct.unpack('<L', await self.buffer.read(4))[0])
comment = None
thumb = None
if has_thumb:
thumb_size, = struct.unpack('<L', await self.buffer.read(4))
await self.seek(15) # Skip XML thumb tag.
if self.parse_thumb:
thumb = struct.unpack('<{}s'.format(thumb_size), await self.buffer.read(thumb_size))[0].decode()
else:
await self.seek(thumb_size)
await self.seek(16 + 10) # </Thumbnail.jpg></Comments>
comment_size, = struct.unpack('<L', await self.buffer.read(4))
if comment_size > 0:
comment = struct.unpack('<{}s'.format(comment_size), await self.buffer.read(comment_size))[0].decode()
await self.seek(11) # </Comments>
else:
await self.seek(chunk_size - 4)
return dict(has_thumb=has_thumb, thumb=thumb, comment=comment)
elif chunk_id == 0x03043008:
version, = struct.unpack('<L', await self.buffer.read(4))
author_version, = struct.unpack('<L', await self.buffer.read(4))
author_login = await self.strings.read_string()
author_nickname = await self.strings.read_string()
author_zone = await self.strings.read_string()
author_extra = await self.strings.read_string()
return dict(
author_version=author_version, author_login=author_login, author_nickname=author_nickname,
author_zone=author_zone, author_extra=author_extra
)
return dict()