# BSD 2-Clause License
#
# Copyright (c) 2025, Andrea Zoppi
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
#
# Based on the original C implementation:
#
# /**************************************************************
# LZSS.C -- A Data Compression Program
# (tab = 4 spaces)
# ***************************************************************
# 4/6/1989 Haruhiko Okumura
# Use, distribute, and modify this program freely.
# Please send me your improved versions.
# PC-VAN SCIENCE
# NIFTY-Serve PAF01022
# CompuServe 74050,1022
# **************************************************************/
"""Lempel-Ziv-Storer-Szymanski via Words compression algorithm.
This module implements the LZSSW compression algorithm, a variant of LZSS.
The algorithm maintains a ring buffer and uses bit flags to indicate compressed
vs uncompressed data.
The compressor parameters can be tuned for different use cases:
- Ring buffer size controls how far back to look for matches
- Maximum match length affects compression ratio vs processing time
- Common byte value can be optimized for specific data types
"""
import argparse
import io
import math
import os
import sys
from typing import IO
from typing import Generator
from .base import BaseCompressor
from .base import BaseDecompressor
from .base import ByteIterable
from .base import CodecFile
from .base import codec_compress
from .base import codec_decompress
from .base import codec_open
_CodecGenerator = Generator[None, (int | type(Ellipsis) | None), None]
RING_SIZE_MIN = 0x0200
"""Minimum ring buffer size."""
RING_SIZE_MAX = 0x1000
"""Maximum ring buffer size."""
RING_SIZE_DEF = RING_SIZE_MAX
"""Default uses maximum size."""
MAX_MATCH_LEN_MIN = 0x10
"""Minimum match length limit."""
MAX_MATCH_LEN_MAX = 0x80
"""Maximum match length limit."""
MAX_MATCH_LEN_DEF = MAX_MATCH_LEN_MIN
"""Default match length."""
COMMON_BYTE_DEF = 0x20
"""Default byte value used to fill the initial ring buffer."""
[docs]
class LZSSWException(Exception):
"""Exception raised for LZSSW compression/decompression errors."""
pass
[docs]
class LZSSWCompressor(BaseCompressor):
"""LZSSW compression implementation.
This compressor implements the LZSSW algorithm using a sliding window approach.
It maintains a ring buffer and binary tree structure to efficiently find repeated
sequences in previously processed data.
The compressor can be tuned via three main parameters:
- Ring buffer size: Controls look-back distance for finding matches
- Maximum match length: Limits how long matched sequences can be
- Common byte: Value used to initialize the ring buffer
"""
[docs]
def __init__(
self,
ringsize: int = RING_SIZE_DEF,
maxmatchlen: int = MAX_MATCH_LEN_DEF,
commonbyte: int = COMMON_BYTE_DEF,
) -> None:
"""Initializes a new LZSSW compressor instance.
Args:
ringsize: Size of the ring buffer, must be between RING_SIZE_MIN and
RING_SIZE_MAX. Larger sizes allow finding matches further back.
maxmatchlen: Maximum length of matched sequences, between MAX_MATCH_LEN_MIN
and MAX_MATCH_LEN_MAX. Larger values can improve compression
but increase processing time.
commonbyte: Value used to fill the initial ring buffer, must be 0-255.
Should be set to a commonly occurring byte value in the input.
Raises:
ValueError: If any parameters are out of their valid ranges.
"""
ringsize = ringsize.__index__()
maxmatchlen = maxmatchlen.__index__()
commonbyte = commonbyte.__index__()
ringbits = math.ceil(math.log2(ringsize))
matchbits = math.ceil(math.log2(maxmatchlen))
if not RING_SIZE_MIN <= ringsize <= RING_SIZE_MAX:
raise ValueError(f'not ({RING_SIZE_MIN = }) '
f'<= ({ringsize = }) '
f'<= ({RING_SIZE_MAX = })')
if not MAX_MATCH_LEN_MIN <= maxmatchlen <= MAX_MATCH_LEN_MAX:
raise ValueError(f'not ({MAX_MATCH_LEN_MIN = }) '
f'<= ({maxmatchlen = }) '
f'<= ({MAX_MATCH_LEN_MAX = })')
if ringbits + matchbits > 16:
raise ValueError(f'({ringbits = }) + ({matchbits = }) > 16')
if not 0x00 <= commonbyte <= 0xFF:
raise ValueError(f'not 0x00 <= ({commonbyte = }) <= 0xFF')
self._ringsize = ringsize
self._ringbits = ringbits
self._maxmatchlen = 2 + maxmatchlen
self._matchbits = matchbits
self._commonbyte = commonbyte
self._textbuf = bytearray(ringsize + 1 + maxmatchlen)
self._outbuf = bytearray()
self._textsize = 0
self._matchpos = 0
self._matchlen = 0
self._lhs = [0] * (ringsize + 1)
self._rhs = [0] * (ringsize + 1 + 0x100)
self._upr = [0] * (ringsize + 1)
self._initstate = 0
self._encoder = self._iter_encode()
[docs]
def _init_tree(self) -> None:
"""Initializes the binary tree for match finding.
This internal method sets up the initial state of the binary tree by:
1. Setting right-hand links for all leaf nodes to point to ringsize
2. Setting all parent pointers to ringsize to mark nodes as unconnected
The binary tree structure is used to efficiently find matches in the ring
buffer during compression. Each node in the tree represents a position in
the buffer, with the tree organized to enable quick string comparisons.
"""
ringsize = self._ringsize
rhs = self._rhs
upr = self._upr
for i in range(ringsize + 1, ringsize + 1 + 0x100):
rhs[i] = ringsize
for i in range(ringsize):
upr[i] = ringsize
[docs]
def _insert_node(self, r: int) -> None:
"""Inserts a new node into the binary tree.
This internal method adds a new position in the ring buffer to the binary
tree, maintaining the tree's balance and match-finding properties.
Args:
r: Position in the ring buffer to insert into the tree.
The method also updates the match length and position if it finds a better
match while traversing the tree during insertion.
"""
ringsize = self._ringsize
maxmatchlen = self._maxmatchlen
textbuf = self._textbuf
lhs = self._lhs
rhs = self._rhs
upr = self._upr
cmp = 1
key = r
p = ringsize + 1 + textbuf[key]
rhs[r] = ringsize
lhs[r] = ringsize
self._matchlen = 0
while 1:
if cmp >= 0:
if rhs[p] != ringsize:
p = rhs[p]
else:
rhs[p] = r
upr[r] = p
return
else:
if lhs[p] != ringsize:
p = lhs[p]
else:
lhs[p] = r
upr[r] = p
return
for i in range(maxmatchlen):
cmp = textbuf[key + i] - textbuf[p + i]
if cmp != 0:
break
else:
i = maxmatchlen
if i > self._matchlen:
self._matchlen = i
self._matchpos = p
if i >= maxmatchlen:
break
upr[r] = upr[p]
lhs[r] = lhs[p]
rhs[r] = rhs[p]
upr[lhs[p]] = r
upr[rhs[p]] = r
if rhs[upr[p]] == p:
rhs[upr[p]] = r
else:
lhs[upr[p]] = r
upr[p] = ringsize
[docs]
def _delete_node(self, p: int) -> None:
"""Removes a node from the binary tree.
This internal method removes a position from the binary tree when it
moves out of the sliding window, maintaining the tree's structure.
Args:
p: Position in the ring buffer to remove from the tree.
The method handles all cases of node removal (leaf nodes, nodes with
one child, and nodes with two children) while preserving the tree balance.
"""
ringsize = self._ringsize
upr = self._upr
if upr[p] == ringsize:
return
lhs = self._lhs
rhs = self._rhs
if rhs[p] == ringsize:
q = lhs[p]
elif lhs[p] == ringsize:
q = rhs[p]
else:
q = lhs[p]
if rhs[q] != ringsize:
q = rhs[q]
while rhs[q] != ringsize:
q = rhs[q]
rhs[upr[q]] = lhs[q]
upr[lhs[q]] = upr[q]
lhs[q] = lhs[p]
upr[lhs[p]] = q
rhs[q] = rhs[p]
upr[rhs[p]] = q
upr[q] = upr[p]
if rhs[upr[p]] == p:
rhs[upr[p]] = q
else:
lhs[upr[p]] = q
upr[p] = ringsize
[docs]
def _iter_encode(self) -> _CodecGenerator:
"""Generator that implements the main compression loop.
This internal method is the core of the LZSSW compression algorithm. It:
1. Initializes the sliding window with the common byte
2. Processes input bytes one at a time
3. Maintains the binary tree to find matches
4. Outputs literal bytes or match position/length pairs
5. Updates the ring buffer as compression progresses
Yields:
None on each iteration, accepting the next input byte or None/Ellipsis
to indicate end of input.
"""
self._initstate = 1
ringsize = self._ringsize
maxmatchlen = self._maxmatchlen
textbuf = self._textbuf
outbuf = self._outbuf
codebuf = bytearray(maxmatchlen - 1)
posshift = 16 - self._ringbits
self._init_tree()
codebuf[0] = 0
codebufptr = 1
mask = 1
s = 0
r = ringsize - maxmatchlen
commonbyte = self._commonbyte
for i in range(s, r):
textbuf[i] = commonbyte
for len in range(maxmatchlen):
c = yield # getc()
if c is Ellipsis: # EOF
self._initstate = -1
break
assert isinstance(c, int)
textbuf[r + len] = c
else:
len = maxmatchlen
if len == 0:
return
for i in range(1, 1 + maxmatchlen):
self._insert_node(r - i)
self._insert_node(r)
while len > 0:
if self._matchlen > len:
self._matchlen = len
if self._matchlen < 3:
self._matchlen = 1
codebuf[0] |= mask
codebuf[codebufptr] = textbuf[r]
codebufptr += 1
else:
code = self._matchpos
codebuf[codebufptr] = code & 0x00FF
codebufptr += 1
code = (code >> 8) << posshift
code |= self._matchlen - 3
codebuf[codebufptr] = code
codebufptr += 1
mask = (mask << 1) & 0xFF
if mask == 0:
outbuf.extend(codebuf[i] for i in range(codebufptr))
codebuf[0] = 0
codebufptr = 1
mask = 1
lastmatchlen = self._matchlen
for i in range(lastmatchlen):
if self._initstate < 0:
break
c = yield # getc()
if c is Ellipsis: # EOF
self._initstate = -1
break
self._delete_node(s)
assert isinstance(c, int)
textbuf[s] = c
if s < maxmatchlen - 1:
textbuf[ringsize + s] = c
s = (s + 1) % ringsize
r = (r + 1) % ringsize
self._insert_node(r)
else:
i = lastmatchlen
while i < lastmatchlen:
self._delete_node(s)
s = (s + 1) % ringsize
r = (r + 1) % ringsize
len -= 1
if len:
self._insert_node(r)
i += 1
else:
i += 1
if codebufptr > 1:
outbuf.extend(codebuf[i] for i in range(codebufptr))
[docs]
def reset(self) -> None:
"""Resets the compressor to its initial state.
Clears all internal buffers, match tracking variables, and the binary tree.
This allows the compressor to be reused for a new compression task.
"""
self._outbuf.clear()
self._textsize = 0
self._matchpos = 0
self._matchlen = 0
self._initstate = 0
self._encoder.close()
self._encoder = self._iter_encode()
[docs]
def compress(self, data: ByteIterable) -> bytearray:
"""Compresses the given data using LZSSW encoding.
The method feeds data to the compression generator, which looks for matches
in the sliding window and outputs either literal bytes or match references.
Args:
data: Input data to compress.
Returns:
Compressed data as bytearray.
Raises:
LZSSWException: If the compressor has already been flushed.
"""
if self._initstate < 0:
raise LZSSWException('already flushed')
encoder = self._encoder
send = encoder.send
if self._initstate == 0: # first
send(None)
for b in data:
send(b)
outbuf = self._outbuf
chunk = outbuf[:]
outbuf.clear()
return chunk
[docs]
def flush(self) -> bytearray:
"""Flushes the compressor and returns any remaining compressed data.
This method signals the end of input to the compressor, which may output
additional compressed data based on its internal state. After calling flush,
the compressor cannot be used for further compression without calling reset.
Returns:
Any remaining compressed data as a bytearray, or empty if already flushed.
"""
if self._initstate < 0:
return bytearray()
else:
if self._initstate == 0: # first
self._encoder.send(None)
try:
self._encoder.send(Ellipsis) # EOF
except StopIteration:
pass
outbuf = self._outbuf
chunk = outbuf[:]
outbuf.clear()
self._initstate = -1
return chunk
@property
def eof(self) -> bool:
"""Indicates whether the compressor has reached end of file state.
Returns:
bool: True if in EOF state (initialization state < 0), False otherwise.
"""
return self._initstate < 0
[docs]
class LZSSWDecompressor(BaseDecompressor):
"""LZSSW decompression implementation.
This decompressor handles data compressed by the LZSSW algorithm, expanding
match references back into their original sequences using a ring buffer that
mirrors the one used during compression.
The decompressor parameters must match those used for compression:
- Ring buffer size controls the maximum back-reference distance
- Maximum match length affects how much data a single match can represent
- Common byte is used to initialize the ring buffer
"""
[docs]
def __init__(
self,
ringsize: int = RING_SIZE_DEF,
maxmatchlen: int = MAX_MATCH_LEN_DEF,
commonbyte: int = COMMON_BYTE_DEF,
) -> None:
"""Initializes a new LZSSW decompressor instance.
Args:
ringsize: Size of the ring buffer, must match the compressor setting.
maxmatchlen: Maximum match length, must match the compressor setting.
commonbyte: Initial ring buffer fill value, must match compressor setting.
Raises:
LZSSWException: If any parameters are out of their valid ranges.
"""
ringsize = ringsize.__index__()
maxmatchlen = maxmatchlen.__index__()
commonbyte = commonbyte.__index__()
ringbits = math.ceil(math.log2(ringsize))
matchbits = math.ceil(math.log2(maxmatchlen))
if not RING_SIZE_MIN <= ringsize <= RING_SIZE_MAX:
raise ValueError(f'not ({RING_SIZE_MIN = }) '
f'<= ({ringsize = }) '
f'<= ({RING_SIZE_MAX = })')
if not MAX_MATCH_LEN_MIN <= maxmatchlen <= MAX_MATCH_LEN_MAX:
raise ValueError(f'not ({MAX_MATCH_LEN_MIN = }) '
f'<= ({maxmatchlen = }) '
f'<= ({MAX_MATCH_LEN_MAX = })')
if ringbits + matchbits > 16:
raise ValueError(f'({ringbits = }) + ({matchbits = }) > 16')
if not 0x00 <= commonbyte <= 0xFF:
raise ValueError(f'not 0x00 <= ({commonbyte = }) <= 0xFF')
self._ringsize = ringsize
self._ringbits = ringbits
self._maxmatchlen = maxmatchlen + 2
self._matchbits = matchbits
self._commonbyte = commonbyte
self._textbuf = bytearray(ringsize)
self._outbuf = bytearray()
self._ahead = bytearray()
self._initstate = 0
self._decoder = self._iter_decode()
[docs]
def _iter_decode(self) -> _CodecGenerator:
"""Generator that implements the main decompression loop.
This internal method drives the LZSSW decompression process:
1. Initializes the ring buffer with the common byte
2. Processes flag bits to determine what follows (literal/match)
3. Copies literal bytes directly to output
4. Expands match references using the ring buffer
5. Maintains the ring buffer for future matches
Yields:
None on each iteration, accepting compressed data or None/Ellipsis
to indicate end of input.
"""
self._initstate = 1
ringsize = self._ringsize
maxmatchlen = self._maxmatchlen
textbuf = self._textbuf
outbuf = self._outbuf
posshift = 8 - (16 - self._ringbits)
lenmask = (1 << self._matchbits) - 1
commonbyte = self._commonbyte
for i in range(ringsize - maxmatchlen):
textbuf[i] = commonbyte
r = ringsize - maxmatchlen
flags = 0
while 1:
flags >>= 1
if (flags & 0x0100) == 0:
c = yield
if c is Ellipsis: # EOF
break
assert isinstance(c, int)
flags = c | 0xFF00
if (flags & 0x0001) != 0:
c = yield
if c is Ellipsis: # EOF
break
assert isinstance(c, int)
outbuf.append(c)
textbuf[r] = c
r = (r + 1) % ringsize
else:
i = yield
if i is Ellipsis: # EOF
break
j = yield
if j is Ellipsis: # EOF
break
assert isinstance(i, int)
assert isinstance(j, int)
i |= (j << posshift) & 0xFF00
j = (j & lenmask) + 2
for k in range(j + 1):
p = (i + k) % ringsize
c = textbuf[p]
outbuf.append(c)
textbuf[r] = c
r = (r + 1) % ringsize
self._initstate = -1
[docs]
def decompress(
self,
data: ByteIterable,
max_length: int = -1,
/,
) -> bytearray:
"""Decompresses LZSSW-encoded data.
The method feeds compressed data to the decompression generator, which
expands match references and copies literal bytes to rebuild the original
data.
Args:
data: Compressed input data to decompress.
max_length: Maximum number of bytes to decompress. Default -1 means no limit.
Returns:
Decompressed data as bytearray.
Raises:
LZSSWException: If the decompressor has been flushed.
"""
if self._initstate < 0:
raise LZSSWException('already flushed')
max_length = max_length.__index__()
if max_length < 0:
max_length = -1
total = -2
limited = False
else:
total = 0
limited = True
outbuf = self._outbuf
ahead = self._ahead
ahead.extend(data)
ahead_len = len(ahead)
ahead_idx = 0
decoder = self._decoder
send = decoder.send
if self._initstate == 0: # first
send(None)
while total < max_length and ahead_idx < ahead_len:
send(ahead[ahead_idx])
ahead_idx += 1
if max_length > 0:
total = len(outbuf)
del ahead[:ahead_idx]
if limited:
chunk = outbuf[:max_length]
del outbuf[:max_length]
else:
chunk = outbuf[:]
outbuf.clear()
return chunk
[docs]
def flush(self) -> bytearray:
"""Flushes any remaining data from the decompressor.
This method finalizes the decompression process by:
1. Processing any remaining input data
2. Signaling EOF to the decoder
3. Returning any remaining decompressed data
Returns:
Any remaining decompressed data, or empty bytearray if already flushed.
"""
if self._initstate < 0:
return bytearray()
chunk = self.decompress(b'')
try:
self._decoder.send(Ellipsis) # EOF
except StopIteration:
pass
return chunk
[docs]
def reset(self) -> None:
"""Resets the decompressor to its initial state.
Clears all internal buffers and state variables, allowing the decompressor
to be reused for a new decompression task. This includes:
- Clearing output and ahead buffers
- Resetting state variables
- Creating a new decoder generator
"""
self._outbuf.clear()
self._ahead.clear()
self._initstate = 0
self._decoder.close()
self._decoder = self._iter_decode()
@property
def eof(self) -> bool:
"""Whether the decompressor has reached the end of the compressed stream.
Returns:
True if all input has been processed and flushed, False otherwise.
"""
return self._initstate < 0
@property
def unused_data(self) -> bytearray:
"""Gets any unprocessed data remaining after decompression.
Returns any data that was found after the end of the compressed stream
or None if decompression is not yet complete.
Returns:
Remaining unprocessed data if eof is True, empty bytearray otherwise.
"""
return self._ahead if self.eof else bytearray()
@property
def needs_input(self) -> bool:
"""Checks if more input data is needed to continue decompression.
This property indicates whether the decompressor needs more input data
to make progress. It returns False only when the end of the compressed
stream has been reached.
Returns:
True if more input data is needed, False if decompression is complete.
"""
return not self.eof
[docs]
class LZSSWFile(CodecFile):
"""File-like object for reading/writing LZSSW compressed files.
This class provides a high-level interface for working with LZSSW compressed
files, supporting both reading and writing operations with automatic
compression/decompression.
"""
[docs]
def __init__(
self,
filename: str | bytes | os.PathLike | IO,
mode: str = 'r',
) -> None:
"""Creates a new LZSSW file object.
Args:
filename: Path to the file or an existing file object.
mode: File open mode ('r'/'rb' for reading, 'w'/'wb'/'x'/'xb'/'a'/'ab' for writing).
"""
comp = LZSSWCompressor()
decomp = LZSSWDecompressor()
super().__init__(filename, mode=mode, comp=comp, decomp=decomp)
[docs]
def compress(
data: ByteIterable,
ringsize: int = RING_SIZE_DEF,
maxmatchlen: int = MAX_MATCH_LEN_DEF,
commonbyte: int = COMMON_BYTE_DEF,
) -> bytes:
"""Compresses data using the LZSSW algorithm.
This is a convenience function that creates a compressor with specified
parameters, compresses the data, and returns the result.
Args:
data: Data to compress.
ringsize: Size of the ring buffer, must be between RING_SIZE_MIN and
RING_SIZE_MAX. Larger sizes allow finding matches further back.
maxmatchlen: Maximum length of matched sequences, between MAX_MATCH_LEN_MIN
and MAX_MATCH_LEN_MAX.
commonbyte: Value used to fill the initial ring buffer, must be 0-255.
Returns:
Compressed data as bytes.
Raises:
LZSSWException: If any parameters are out of their valid ranges.
"""
comp = LZSSWCompressor(ringsize, maxmatchlen, commonbyte)
return codec_compress(data, comp)
[docs]
def decompress(
data: ByteIterable,
ringsize: int = RING_SIZE_DEF,
maxmatchlen: int = MAX_MATCH_LEN_DEF,
commonbyte: int = COMMON_BYTE_DEF,
) -> bytes:
"""Decompresses LZSSW-compressed data.
This is a convenience function that creates a decompressor with specified
parameters, decompresses the data, and returns the result. The parameters
must match those used during compression.
Args:
data: LZSSW-compressed data to decompress.
ringsize: Size of the ring buffer, must match compression setting.
maxmatchlen: Maximum length of matched sequences, must match compression.
commonbyte: Value used to fill the initial ring buffer, must match compression.
Returns:
Decompressed data as bytes.
Raises:
LZSSWException: If any parameters are out of their valid ranges.
"""
decomp = LZSSWDecompressor(ringsize, maxmatchlen, commonbyte)
return codec_decompress(data, decomp)
[docs]
def open(
filename: str | bytes | IO,
mode: str = 'r',
encoding: str | None = None,
errors: str | None = None,
newline: str | None = None,
) -> CodecFile | io.TextIOWrapper:
"""Opens an LZSSW compressed file.
This provides a high-level interface similar to the built-in open() function
but for LZSSW compressed files. It supports both binary and text modes with
default compression parameters.
Args:
filename: Path to file or file object.
mode: File open mode ('r'/'rb' for reading, 'w'/'wb'/'x'/'xb'/'a'/'ab' for writing).
encoding: Text encoding for text mode.
errors: How to handle encoding/decoding errors in text mode.
newline: How to handle newlines in text mode.
Returns:
A CodecFile for binary mode or TextIOWrapper for text mode.
Raises:
ValueError: For invalid mode combinations.
"""
comp = LZSSWCompressor()
decomp = LZSSWDecompressor()
return codec_open(
filename,
mode=mode,
encoding=encoding,
errors=errors,
newline=newline,
comp=comp,
decomp=decomp,
)
[docs]
def main() -> None:
"""Command-line interface for LZSSW compression/decompression.
Provides a command-line tool for compressing or decompressing files using
the LZSSW algorithm. Supports reading from standard input and writing to
standard output, with configurable compression parameters.
"""
parser = argparse.ArgumentParser()
parser.add_argument('-d', '--decompress', action='store_true',
help='Preform decompression instead of compression.')
parser.add_argument('-r', '--ring-buffer-size', type=int, default=RING_SIZE_DEF,
help='LZSS ring buffer size.')
parser.add_argument('-m', '--max-match-size', type=int, default=MAX_MATCH_LEN_DEF,
help='Maximum LZSS match size.')
parser.add_argument('-f', '--fill-value', type=int, default=COMMON_BYTE_DEF,
help='Fill byte value; default: ASCII space (0x20).')
parser.add_argument('infile', nargs='?', type=argparse.FileType('rb'), default=sys.stdin,
help='Input binary file; default: STDIN.')
parser.add_argument('outfile', nargs='?', type=argparse.FileType('wb'), default=sys.stdout,
help='Output binary file; default: STDOUT.')
args = parser.parse_args()
if args.decompress:
decomp = LZSSWDecompressor(ringsize=args.ring_buffer_size,
maxmatchlen=args.max_match_size,
commonbyte=args.fill_value)
out_file = args.outfile
with codec_open(args.infile, mode='rb', decomp=decomp) as in_file:
out_file.write(in_file.read())
else:
comp = LZSSWCompressor(ringsize=args.ring_buffer_size,
maxmatchlen=args.max_match_size,
commonbyte=args.fill_value)
in_file = args.infile
with codec_open(args.outfile, mode='wb', comp=comp) as out_file:
out_file.write(in_file.read())
if __name__ == '__main__': # pragma: no cover
main()