Merge commit '57e889fb45a31abda48930fecf8006ef1db4ffcc' as 'python/fsb5'

This commit is contained in:
Haoyu Xu
2025-05-01 10:40:57 +08:00
13 changed files with 31707 additions and 0 deletions

View File

@@ -0,0 +1,230 @@
from collections import namedtuple
from enum import IntEnum
from io import BytesIO
from .utils import BinaryReader
__version__ = "1.0"
__author__ = "Simon Pinfold"
__email__ = "simon@uint8.me"
class SoundFormat(IntEnum):
NONE = 0
PCM8 = 1
PCM16 = 2
PCM24 = 3
PCM32 = 4
PCMFLOAT = 5
GCADPCM = 6
IMAADPCM = 7
VAG = 8
HEVAG = 9
XMA = 10
MPEG = 11
CELT = 12
AT9 = 13
XWMA = 14
VORBIS = 15
@property
def file_extension(self):
if self == SoundFormat.MPEG:
return "mp3"
elif self == SoundFormat.VORBIS:
return "ogg"
elif self.is_pcm:
return "wav"
return "bin"
@property
def is_pcm(self):
return self in (SoundFormat.PCM8, SoundFormat.PCM16, SoundFormat.PCM32)
FSB5Header = namedtuple("FSB5Header", [
"id",
"version",
"numSamples",
"sampleHeadersSize",
"nameTableSize",
"dataSize",
"mode",
"zero",
"hash",
"dummy",
"unknown",
"size"
])
Sample = namedtuple("Sample", [
"name",
"frequency",
"channels",
"dataOffset",
"samples",
"metadata",
"data"
])
frequency_values = {
1: 8000,
2: 11000,
3: 11025,
4: 16000,
5: 22050,
6: 24000,
7: 32000,
8: 44100,
9: 48000
}
class MetadataChunkType(IntEnum):
CHANNELS = 1
FREQUENCY = 2
LOOP = 3
XMASEEK = 6
DSPCOEFF = 7
XWMADATA = 10
VORBISDATA = 11
chunk_data_format = {
MetadataChunkType.CHANNELS : "B",
MetadataChunkType.FREQUENCY: "I",
MetadataChunkType.LOOP: "II"
}
VorbisData = namedtuple("VorbisData", ["crc32", "unknown"])
def bits(val, start, len):
stop = start + len
r = val & ((1<<stop)-1)
return r >> start
class FSB5:
def __init__(self, data):
buf = BinaryReader(BytesIO(data), endian="<")
magic = buf.read(4)
if magic != b"FSB5":
raise ValueError("Expected magic header 'FSB5' but got %r" % (magic))
buf.seek(0)
self.header = buf.read_struct_into(FSB5Header, "4s I I I I I I 8s 16s 8s")
if self.header.version == 0:
self.header = self.header._replace(unknown=buf.read_type("I"))
self.header = self.header._replace(mode=SoundFormat(self.header.mode), size=buf.tell())
self.raw_size = self.header.size + self.header.sampleHeadersSize + self.header.nameTableSize + self.header.dataSize
self.samples = []
for i in range(self.header.numSamples):
raw = buf.read_type("Q")
next_chunk = bits(raw, 0, 1)
frequency = bits(raw, 1, 4)
channels = bits(raw, 1+4, 1) + 1
dataOffset = bits(raw, 1+4+1, 28) * 16
samples = bits(raw, 1+4+1+28, 30)
chunks = {}
while next_chunk:
raw = buf.read_type("I")
next_chunk = bits(raw, 0, 1)
chunk_size = bits(raw, 1, 24)
chunk_type = bits(raw, 1+24, 7)
try:
chunk_type = MetadataChunkType(chunk_type)
except ValueError:
pass
if chunk_type == MetadataChunkType.VORBISDATA:
chunk_data = VorbisData(
crc32 = buf.read_type("I"),
unknown = buf.read(chunk_size-4)
)
elif chunk_type in chunk_data_format:
fmt = chunk_data_format[chunk_type]
if buf.struct_calcsize(fmt) != chunk_size:
err = "Expected chunk %s of size %d, SampleHeader specified %d" % (
chunk_type, buf.struct_calcsize(fmt), chunk_size
)
raise ValueError(err)
chunk_data = buf.read_struct(fmt)
else:
chunk_data = buf.read(chunk_size)
chunks[chunk_type] = chunk_data
if MetadataChunkType.FREQUENCY in chunks:
frequency = chunks[MetadataChunkType.FREQUENCY][0]
elif frequency in frequency_values:
frequency = frequency_values[frequency]
else:
raise ValueError("Frequency value %d is not valid and no FREQUENCY metadata chunk was provided")
self.samples.append(Sample(
name="%04d" % (i),
frequency=frequency,
channels=channels,
dataOffset=dataOffset,
samples=samples,
metadata=chunks,
data=None
))
if self.header.nameTableSize:
nametable_start = buf.tell()
samplename_offsets = []
for i in range(self.header.numSamples):
samplename_offsets.append(buf.read_type("I"))
for i in range(self.header.numSamples):
buf.seek(nametable_start + samplename_offsets[i])
name = buf.read_string(maxlen=self.header.nameTableSize)
self.samples[i] = self.samples[i]._replace(name=name.decode("utf-8"))
buf.seek(self.header.size + self.header.sampleHeadersSize + self.header.nameTableSize)
for i in range(self.header.numSamples):
data_start = self.samples[i].dataOffset
data_end = data_start + self.header.dataSize
if i < self.header.numSamples-1:
data_end = self.samples[i+1].dataOffset
self.samples[i] = self.samples[i]._replace(data=buf.read(data_end - data_start))
def rebuild_sample(self, sample):
if sample not in self.samples:
raise ValueError("Sample to decode did not originate from the FSB archive decoding it")
if self.header.mode == SoundFormat.MPEG:
return sample.data
elif self.header.mode == SoundFormat.VORBIS:
# import here as vorbis.py requires native libraries
from . import vorbis
return vorbis.rebuild(sample)
elif self.header.mode.is_pcm:
from .pcm import rebuild
if self.header.mode == SoundFormat.PCM8:
width = 1
elif self.header.mode == SoundFormat.PCM16:
width = 2
else:
width = 4
return rebuild(sample, width)
raise NotImplementedError("Decoding samples of type %s is not supported" % (self.header.mode))
def get_sample_extension(self):
return self.header.mode.file_extension
def load(data):
return FSB5(data)

11
python/fsb5/fsb5/pcm.py Normal file
View File

@@ -0,0 +1,11 @@
import wave
from io import BytesIO
def rebuild(sample, width):
data = sample.data[:sample.samples * width]
ret = BytesIO()
with wave.open(ret, "wb") as wav:
wav.setparams((sample.channels, width, sample.frequency, 0, "NONE", "NONE"))
wav.writeframes(data)
return ret.getvalue()

76
python/fsb5/fsb5/utils.py Normal file
View File

@@ -0,0 +1,76 @@
import os
import ctypes
import struct
class BinaryReader:
def __init__(self, buf, endian="<"):
self.buf = buf
self.endian = endian
self.seek(0, 2)
self.size = self.tell()
self.seek(0)
def read(self, *args):
return self.buf.read(*args)
def seek(self, *args):
return self.buf.seek(*args)
def tell(self):
return self.buf.tell()
def finished(self):
return self.tell() == self.size
def read_string(self, maxlen=0):
r = []
start = self.tell()
while maxlen == 0 or len(r) <= maxlen:
c = self.read(1)
if not c:
raise ValueError("Unterminated string starting at %d" % (start))
if c == b"\0":
break
r.append(c)
return b"".join(r)
def struct_calcsize(self, fmt):
return struct.calcsize(fmt)
def read_struct(self, fmt, endian=None):
fmt = (endian or self.endian) + fmt;
fmtlen = struct.calcsize(fmt)
data = self.read(fmtlen)
if len(data) != fmtlen:
raise ValueError("Not enough bytes left in buffer to read struct")
return struct.unpack(fmt, data)
def read_struct_into(self, dest, fmt, endian=None):
fields = self.read_struct(fmt, endian=endian)
fields = list(fields) + [None] * (len(dest._fields) - len(fields))
return dest._make(fields)
def read_type(self, type_fmt, endian=None):
r = self.read_struct(type_fmt, endian=endian)
if len(r) != 1:
raise ValueError("Format %r did not describe a single type" % (type_fmt))
return r[0]
class LibraryNotFoundException(OSError):
pass
def load_lib(*names):
for name in names:
try:
libname = ctypes.util.find_library(name)
if libname:
return ctypes.CDLL(libname)
else:
dll_path = os.path.join(os.getcwd(), "lib%s.dll" % (name))
return ctypes.CDLL(dll_path)
except OSError:
pass
raise LibraryNotFoundException("Could not load the library %r" % (names[0]))

379
python/fsb5/fsb5/vorbis.py Normal file
View File

@@ -0,0 +1,379 @@
import ctypes
import ctypes.util
import os
from enum import IntEnum
from io import BytesIO
from . import *
from .utils import BinaryReader, load_lib
from .vorbis_headers import lookup as vorbis_header_lookup
vorbis = load_lib('vorbis')
ogg = load_lib('ogg')
class VorbisInfo(ctypes.Structure):
"""
https://xiph.org/vorbis/doc/libvorbis/vorbis_info.html
"""
_fields_ = [
('version', ctypes.c_int),
('channels', ctypes.c_int),
('rate', ctypes.c_long),
('bitrate_upper', ctypes.c_long),
('bitrate_nominal', ctypes.c_long),
('bitrate_lower', ctypes.c_long),
('bitrate_window', ctypes.c_long),
('codec_setup', ctypes.c_void_p),
]
def __init__(self):
super().__init__()
vorbis.vorbis_info_init(self)
def __del__(self):
vorbis.vorbis_info_clear(self)
class VorbisComment(ctypes.Structure):
"""
https://xiph.org/vorbis/doc/libvorbis/vorbis_info.html
"""
_fields_ = [
('user_comments', ctypes.POINTER(ctypes.c_char_p)),
('comment_lengths', ctypes.POINTER(ctypes.c_int)),
('comments', ctypes.c_int),
('vendor', ctypes.c_char_p)
]
def __init__(self):
super().__init__()
vorbis.vorbis_comment_init(self)
def __del__(self):
vorbis.vorbis_comment_clear(self)
class VorbisDSPState(ctypes.Structure):
"""
https://svn.xiph.org/trunk/vorbis/include/vorbis/codec.h
"""
_fields_ = [
('analysisp', ctypes.c_int),
('vi', ctypes.c_void_p),
('pcm', ctypes.POINTER(ctypes.POINTER(ctypes.c_float))),
('pcmret', ctypes.POINTER(ctypes.POINTER(ctypes.c_float))),
('pcm_storage', ctypes.c_int),
('pcm_current', ctypes.c_int),
('pcm_returned', ctypes.c_int),
('preextrapolate', ctypes.c_int),
('eofflag', ctypes.c_int),
('lW', ctypes.c_long),
('W', ctypes.c_long),
('nW', ctypes.c_long),
('centerW', ctypes.c_long),
('granulepos', ctypes.c_longlong),
('sequence', ctypes.c_longlong),
('glue_bits', ctypes.c_longlong),
('time_bits', ctypes.c_longlong),
('floor_bits', ctypes.c_longlong),
('res_bits', ctypes.c_longlong),
('backend_state', ctypes.c_void_p)
]
class OggStreamState(ctypes.Structure):
"""
https://xiph.org/ogg/doc/libogg/ogg_stream_state.html
"""
_fields_ = [
('body_data', ctypes.POINTER(ctypes.c_char)),
('body_storage', ctypes.c_long),
('body_fill', ctypes.c_long),
('body_returned', ctypes.c_long),
('lacing_vals', ctypes.POINTER(ctypes.c_int)),
('granule_vals', ctypes.POINTER(ctypes.c_longlong)),
('lacing_storage', ctypes.c_long),
('lacing_fill', ctypes.c_long),
('lacing_packet', ctypes.c_long),
('lacing_returned', ctypes.c_long),
('header', ctypes.c_char * 282),
('header_fill', ctypes.c_int),
('e_o_s', ctypes.c_int),
('b_o_s', ctypes.c_int),
('serialno', ctypes.c_long),
('pageno', ctypes.c_int),
('packetno', ctypes.c_longlong),
('granulepos', ctypes.c_longlong)
]
def __init__(self, serialno):
super().__init__()
ogg.ogg_stream_init(self, serialno)
def __del__(self):
ogg.ogg_stream_clear(self)
class OggPacket(ctypes.Structure):
"""
https://xiph.org/ogg/doc/libogg/ogg_packet.html
"""
_fields_ = [
('packet', ctypes.POINTER(ctypes.c_char)),
('bytes', ctypes.c_long),
('b_o_s', ctypes.c_long),
('e_o_s', ctypes.c_long),
('granulepos', ctypes.c_longlong),
('packetno', ctypes.c_longlong)
]
class OggpackBuffer(ctypes.Structure):
"""
https://xiph.org/ogg/doc/libogg/oggpack_buffer.html
"""
_fields_ = [
('endbyte', ctypes.c_long),
('endbit', ctypes.c_int),
('buffer', ctypes.POINTER(ctypes.c_char)),
('ptr', ctypes.POINTER(ctypes.c_char)),
('storage', ctypes.c_long)
]
def __init__(self):
super().__init__()
ogg.oggpack_writeinit(self)
def __del__(self):
ogg.oggpack_writeclear(self)
class OggPage(ctypes.Structure):
"""
https://xiph.org/ogg/doc/libogg/oggpack_buffer.html
"""
_fields_ = [
('header', ctypes.POINTER(ctypes.c_char)),
('header_len', ctypes.c_long),
('body', ctypes.POINTER(ctypes.c_char)),
('body_len', ctypes.c_long)
]
def errcheck(result, func, arguments):
if result != 0:
raise OSError('Call to %s(%s) returned %d (error)' % (func.__name__, ', '.join(str(x) for x in arguments), result))
return result == 0
######## libvorbis functions ########
vorbis.vorbis_info_init.argtypes = [ctypes.POINTER(VorbisInfo)]
vorbis.vorbis_info_init.restype = None
vorbis.vorbis_info_clear.argtypes = [ctypes.POINTER(VorbisInfo)]
vorbis.vorbis_info_clear.restype = None
vorbis.vorbis_comment_init.argtypes = [ctypes.POINTER(VorbisComment)]
vorbis.vorbis_comment_init.restype = None
vorbis.vorbis_comment_clear.argtypes = [ctypes.POINTER(VorbisComment)]
vorbis.vorbis_comment_clear.restype = None
vorbis.vorbis_analysis_init.argtypes = [ctypes.POINTER(VorbisDSPState), ctypes.POINTER(VorbisInfo)]
vorbis.vorbis_analysis_init.errcheck = errcheck
vorbis.vorbis_analysis_headerout.argtypes = [
ctypes.POINTER(VorbisDSPState),
ctypes.POINTER(VorbisComment),
ctypes.POINTER(OggPacket),
ctypes.POINTER(OggPacket),
ctypes.POINTER(OggPacket)
]
vorbis.vorbis_analysis_headerout.errcheck = errcheck
vorbis.vorbis_dsp_clear.argtypes = [ctypes.POINTER(VorbisDSPState)]
vorbis.vorbis_dsp_clear.restype = None
vorbis.vorbis_commentheader_out.argtypes = [ctypes.POINTER(VorbisComment), ctypes.POINTER(OggPacket)]
vorbis.vorbis_commentheader_out.errcheck = errcheck
vorbis.vorbis_synthesis_headerin.argtypes = [
ctypes.POINTER(VorbisInfo),
ctypes.POINTER(VorbisComment),
ctypes.POINTER(OggPacket)
]
vorbis.vorbis_synthesis_headerin.errcheck = errcheck
def vorbis_packet_blocksize_errcheck(result, func, arguments):
if result < 0:
errcheck(result, func, arguments)
return result
vorbis.vorbis_packet_blocksize.argtypes = [ctypes.POINTER(VorbisInfo), ctypes.POINTER(OggPacket)]
vorbis.vorbis_packet_blocksize.errcheck = vorbis_packet_blocksize_errcheck
######## libogg functions ########
ogg.ogg_stream_init.argtypes = [ctypes.POINTER(OggStreamState), ctypes.c_int]
ogg.ogg_stream_init.errcheck = errcheck
ogg.ogg_stream_clear.argtypes = [ctypes.POINTER(OggStreamState)]
ogg.ogg_stream_clear.restype = ctypes.c_int
ogg.oggpack_writeinit.argtypes = [ctypes.POINTER(OggpackBuffer)]
ogg.oggpack_writeinit.restype = None
ogg.oggpack_write.argtypes = [ctypes.POINTER(OggpackBuffer), ctypes.c_ulong, ctypes.c_int]
ogg.oggpack_write.restype = None
ogg.oggpack_writeclear.argtypes = [ctypes.POINTER(OggpackBuffer)]
ogg.oggpack_writeclear.restype = None
ogg.oggpack_bytes.argtypes = [ctypes.POINTER(OggpackBuffer)]
ogg.oggpack_bytes.restype = ctypes.c_int
ogg.oggpack_writeclear.argtypes = [ctypes.POINTER(OggpackBuffer)]
ogg.oggpack_writeclear.restype = None
ogg.ogg_packet_clear.argtypes = [ctypes.POINTER(OggPacket)]
ogg.ogg_packet_clear.restype = None
ogg.ogg_stream_packetin.argtypes = [ctypes.POINTER(OggStreamState), ctypes.POINTER(OggPacket)]
ogg.ogg_stream_packetin.errcheck = errcheck
ogg.ogg_stream_pageout.argtypes = [ctypes.POINTER(OggStreamState), ctypes.POINTER(OggPage)]
ogg.ogg_stream_pageout.restype = ctypes.c_int
ogg.ogg_stream_flush.argtypes = [ctypes.POINTER(OggStreamState), ctypes.POINTER(OggPage)]
ogg.ogg_stream_flush.restype = ctypes.c_int
if hasattr(ogg, 'oggpack_writecheck'):
ogg.oggpack_writecheck.argtypes = [ctypes.POINTER(OggpackBuffer)]
ogg.oggpack_writecheck.errcheck = errcheck
def rebuild(sample):
if MetadataChunkType.VORBISDATA not in sample.metadata:
raise ValueError('Expected sample header to contain a VORBISDATA chunk but none was found')
crc32 = sample.metadata[MetadataChunkType.VORBISDATA].crc32
try:
setup_packet_buff = vorbis_header_lookup[crc32]
except KeyError as e:
raise ValueError('Could not find header info for crc32=%d' % crc32) from e
info = VorbisInfo()
comment = VorbisComment()
state = OggStreamState(1)
outbuf = BytesIO()
id_header = rebuild_id_header(sample.channels, sample.frequency, 0x100, 0x800)
comment_header = rebuild_comment_header()
setup_header = rebuild_setup_header(setup_packet_buff)
vorbis.vorbis_synthesis_headerin(info, comment, id_header)
vorbis.vorbis_synthesis_headerin(info, comment, comment_header)
vorbis.vorbis_synthesis_headerin(info, comment, setup_header)
ogg.ogg_stream_packetin(state, id_header)
write_packets(state, outbuf)
ogg.ogg_stream_packetin(state, comment_header)
write_packets(state, outbuf)
ogg.ogg_stream_packetin(state, setup_header)
write_packets(state, outbuf)
write_packets(state, outbuf, func=ogg.ogg_stream_flush)
packetno = setup_header.packetno
granulepos = 0
prev_blocksize = 0
inbuf = BinaryReader(BytesIO(sample.data))
packet_size = inbuf.read_type('H')
while packet_size:
packetno += 1
packet = OggPacket()
buf = ctypes.create_string_buffer(inbuf.read(packet_size), packet_size)
packet.packet = ctypes.cast(buf, ctypes.POINTER(ctypes.c_char))
packet.bytes = packet_size
packet.packetno = packetno
try:
packet_size = inbuf.read_type('H')
except ValueError:
packet_size = 0
packet.e_o_s = 1 if not packet_size else 0
blocksize = vorbis.vorbis_packet_blocksize(info, packet)
assert blocksize
granulepos = int(granulepos + (blocksize + prev_blocksize) / 4) if prev_blocksize else 0
packet.granulepos = granulepos
prev_blocksize = blocksize
ogg.ogg_stream_packetin(state, packet)
write_packets(state, outbuf)
return outbuf.getbuffer()
def write_packets(state, buf, func=ogg.ogg_stream_pageout):
page = OggPage()
while func(state, page):
buf.write(bytes(page.header[:page.header_len]))
buf.write(bytes(page.body[:page.body_len]))
def rebuild_id_header(channels, frequency, blocksize_short, blocksize_long):
packet = OggPacket()
buf = OggpackBuffer()
ogg.oggpack_write(buf, 0x01, 8)
for c in 'vorbis':
ogg.oggpack_write(buf, ord(c), 8)
ogg.oggpack_write(buf, 0, 32)
ogg.oggpack_write(buf, channels, 8)
ogg.oggpack_write(buf, frequency, 32)
ogg.oggpack_write(buf, 0, 32)
ogg.oggpack_write(buf, 0, 32)
ogg.oggpack_write(buf, 0, 32)
ogg.oggpack_write(buf, len(bin(blocksize_short)) - 3, 4)
ogg.oggpack_write(buf, len(bin(blocksize_long)) - 3, 4)
ogg.oggpack_write(buf, 1, 1)
if hasattr(ogg, 'oggpack_writecheck'):
ogg.oggpack_writecheck(buf)
packet.bytes = ogg.oggpack_bytes(buf)
buf = ctypes.create_string_buffer(bytes(buf.buffer[:packet.bytes]), packet.bytes)
packet.packet = ctypes.cast(ctypes.pointer(buf), ctypes.POINTER(ctypes.c_char))
packet.b_o_s = 1
packet.e_o_s = 0
packet.granulepos = 0
packet.packetno = 0
return packet
def rebuild_comment_header():
packet = OggPacket()
ogg.ogg_packet_clear(packet)
comment = VorbisComment()
vorbis.vorbis_commentheader_out(comment, packet)
return packet
def rebuild_setup_header(setup_packet_buff):
packet = OggPacket()
packet.packet = ctypes.cast(ctypes.pointer(ctypes.create_string_buffer(setup_packet_buff, len(setup_packet_buff))), ctypes.POINTER(ctypes.c_char))
packet.bytes = len(setup_packet_buff)
packet.b_o_s = 0
packet.e_o_s = 0
packet.granulepos = 0
packet.packetno = 2
return packet

File diff suppressed because it is too large Load Diff