Initial commit (Clean history)

This commit is contained in:
anhduy-tech
2025-12-30 11:27:14 +07:00
commit ef48c93de0
19255 changed files with 3248867 additions and 0 deletions

View File

@@ -0,0 +1,23 @@
"""Provides objects that can characterize image streams.
That characterization is as to content type and size, as a required step in including
them in a document.
"""
from docx.image.bmp import Bmp
from docx.image.gif import Gif
from docx.image.jpeg import Exif, Jfif
from docx.image.png import Png
from docx.image.tiff import Tiff
# Table used to recognize an image format from the leading bytes of a stream. Each
# entry names the header class to use when `signature_bytes` is found at `offset`.
# Entries are tried in the order listed and the first match wins.
SIGNATURES = (
    # class, offset, signature_bytes
    (Png, 0, b"\x89PNG\x0d\x0a\x1a\x0a"),
    (Jfif, 6, b"JFIF"),
    (Exif, 6, b"Exif"),
    (Gif, 0, b"GIF87a"),
    (Gif, 0, b"GIF89a"),
    (Tiff, 0, b"MM\x00*"),  # big-endian (Motorola) TIFF
    (Tiff, 0, b"II*\x00"),  # little-endian (Intel) TIFF
    (Bmp, 0, b"BM"),
)

View File

@@ -0,0 +1,43 @@
from .constants import MIME_TYPE
from .helpers import LITTLE_ENDIAN, StreamReader
from .image import BaseImageHeader
class Bmp(BaseImageHeader):
    """Image header parser for BMP images."""

    @classmethod
    def from_stream(cls, stream):
        """Return |Bmp| instance having header properties parsed from the BMP image in
        `stream`.

        Width, height and the two pixels-per-meter values are read as little-endian
        longs from offsets 0x12, 0x16, 0x26 and 0x2A respectively.
        """
        stream_rdr = StreamReader(stream, LITTLE_ENDIAN)

        px_width = stream_rdr.read_long(0x12)
        # The height field is a *signed* 32-bit value; a negative height marks a
        # top-down bitmap. `read_long()` is unsigned, so fold the value back into its
        # magnitude rather than reporting a height of ~4 billion pixels.
        px_height = cls._magnitude_of_signed_long(stream_rdr.read_long(0x16))

        horz_px_per_meter = stream_rdr.read_long(0x26)
        vert_px_per_meter = stream_rdr.read_long(0x2A)

        horz_dpi = cls._dpi(horz_px_per_meter)
        vert_dpi = cls._dpi(vert_px_per_meter)

        return cls(px_width, px_height, horz_dpi, vert_dpi)

    @property
    def content_type(self):
        """MIME content type for this image, unconditionally `image/bmp` for BMP
        images."""
        return MIME_TYPE.BMP

    @property
    def default_ext(self):
        """Default filename extension, always 'bmp' for BMP images."""
        return "bmp"

    @staticmethod
    def _dpi(px_per_meter):
        """Return the integer pixels per inch from `px_per_meter`, defaulting to 96 if
        `px_per_meter` is zero."""
        if px_per_meter == 0:
            return 96
        return int(round(px_per_meter * 0.0254))

    @staticmethod
    def _magnitude_of_signed_long(unsigned_value):
        """Return the absolute value of a signed 32-bit int read as unsigned.

        Values with the high bit set are two's-complement negatives; their magnitude
        is returned. Other values are returned unchanged.
        """
        if unsigned_value >= 0x80000000:
            return 0x100000000 - unsigned_value
        return unsigned_value

View File

@@ -0,0 +1,172 @@
"""Constants specific the the image sub-package."""
class JPEG_MARKER_CODE:
    """Single-byte JPEG marker codes, with groupings and display names for them.

    Each code is a length-1 `bytes` value, the byte that follows the 0xFF prefix of a
    marker in the stream.
    """

    TEM = b"\x01"
    DHT = b"\xc4"
    DAC = b"\xcc"
    JPG = b"\xc8"

    # -- start-of-frame (SOFn) codes --
    SOF0 = b"\xc0"
    SOF1 = b"\xc1"
    SOF2 = b"\xc2"
    SOF3 = b"\xc3"
    SOF5 = b"\xc5"
    SOF6 = b"\xc6"
    SOF7 = b"\xc7"
    SOF9 = b"\xc9"
    SOFA = b"\xca"
    SOFB = b"\xcb"
    SOFD = b"\xcd"
    SOFE = b"\xce"
    SOFF = b"\xcf"

    # -- restart (RSTn) codes --
    RST0 = b"\xd0"
    RST1 = b"\xd1"
    RST2 = b"\xd2"
    RST3 = b"\xd3"
    RST4 = b"\xd4"
    RST5 = b"\xd5"
    RST6 = b"\xd6"
    RST7 = b"\xd7"

    SOI = b"\xd8"
    EOI = b"\xd9"
    SOS = b"\xda"
    DQT = b"\xdb"  # Define Quantization Table(s)
    DNL = b"\xdc"
    DRI = b"\xdd"
    DHP = b"\xde"
    EXP = b"\xdf"

    # -- application (APPn) codes --
    APP0 = b"\xe0"
    APP1 = b"\xe1"
    APP2 = b"\xe2"
    APP3 = b"\xe3"
    APP4 = b"\xe4"
    APP5 = b"\xe5"
    APP6 = b"\xe6"
    APP7 = b"\xe7"
    APP8 = b"\xe8"
    APP9 = b"\xe9"
    APPA = b"\xea"
    APPB = b"\xeb"
    APPC = b"\xec"
    APPD = b"\xed"
    APPE = b"\xee"
    APPF = b"\xef"

    # markers that are not followed by a segment
    STANDALONE_MARKERS = (
        TEM,
        SOI,
        EOI,
        RST0,
        RST1,
        RST2,
        RST3,
        RST4,
        RST5,
        RST6,
        RST7,
    )

    SOF_MARKER_CODES = (
        SOF0, SOF1, SOF2, SOF3,
        SOF5, SOF6, SOF7,
        SOF9, SOFA, SOFB,
        SOFD, SOFE, SOFF,
    )

    # display names, keyed by marker code; only a subset of the codes is named
    marker_names = {
        b"\x00": "UNKNOWN",
        SOF0: "SOF0",
        SOF2: "SOF2",
        DHT: "DHT",
        SOS: "SOS",  # start of scan
        SOI: "SOI",  # start of image
        EOI: "EOI",  # end of image
        DQT: "DQT",
        APP0: "APP0",
        APP1: "APP1",
        APP2: "APP2",
        APPD: "APP13",
        APPE: "APP14",
    }

    @classmethod
    def is_standalone(cls, marker_code):
        """Return True if `marker_code` is one of the `STANDALONE_MARKERS`."""
        return marker_code in cls.STANDALONE_MARKERS
class MIME_TYPE:
    """Image content types.

    One MIME-type string per image format; the image header classes return these
    from their `content_type` property.
    """

    BMP = "image/bmp"
    GIF = "image/gif"
    JPEG = "image/jpeg"
    PNG = "image/png"
    TIFF = "image/tiff"
class PNG_CHUNK_TYPE:
    """PNG chunk type names.

    These are `str` values, compared against the four-character chunk type decoded
    from the stream.
    """

    IHDR = "IHDR"
    pHYs = "pHYs"
    IEND = "IEND"
class TIFF_FLD_TYPE:
    """Field-type codes for TIFF Image File Directory (IFD) entries."""

    BYTE = 1
    ASCII = 2
    SHORT = 3
    LONG = 4
    RATIONAL = 5

    # display name for each field-type code above
    field_type_names = {
        1: "BYTE",
        2: "ASCII char",
        3: "SHORT",
        4: "LONG",
        5: "RATIONAL",
    }


# shorter alias for the same class
TIFF_FLD = TIFF_FLD_TYPE
class TIFF_TAG:
    """Tag codes for TIFF Image File Directory (IFD) entries."""

    # tags given a symbolic name here
    IMAGE_WIDTH = 0x0100
    IMAGE_LENGTH = 0x0101
    X_RESOLUTION = 0x011A
    Y_RESOLUTION = 0x011B
    RESOLUTION_UNIT = 0x0128

    # display name for each tag code; covers more tags than the constants above
    tag_names = {
        0x00FE: "NewSubfileType",
        0x0100: "ImageWidth",
        0x0101: "ImageLength",
        0x0102: "BitsPerSample",
        0x0103: "Compression",
        0x0106: "PhotometricInterpretation",
        0x010E: "ImageDescription",
        0x010F: "Make",
        0x0110: "Model",
        0x0111: "StripOffsets",
        0x0112: "Orientation",
        0x0115: "SamplesPerPixel",
        0x0117: "StripByteCounts",
        0x011A: "XResolution",
        0x011B: "YResolution",
        0x011C: "PlanarConfiguration",
        0x0128: "ResolutionUnit",
        0x0131: "Software",
        0x0132: "DateTime",
        0x0213: "YCbCrPositioning",
        0x8769: "ExifTag",
        0x8825: "GPS IFD",
        0xC4A5: "PrintImageMatching",
    }

View File

@@ -0,0 +1,13 @@
"""Exceptions specific the the image sub-package."""
class InvalidImageStreamError(Exception):
    """The recognized image stream appears to be corrupted.

    Raised, for example, when a PNG stream contains no IHDR chunk.
    """
class UnexpectedEndOfFileError(Exception):
    """EOF was unexpectedly encountered while reading an image stream.

    Raised when a read returns fewer bytes than were requested.
    """
class UnrecognizedImageError(Exception):
    """The provided image stream could not be recognized.

    Raised when the leading bytes of the stream match none of the known image
    signatures.
    """

View File

@@ -0,0 +1,38 @@
from struct import Struct
from .constants import MIME_TYPE
from .image import BaseImageHeader
class Gif(BaseImageHeader):
    """Image header parser for GIF images.

    GIF carries no resolution (DPI) information, so both the horizontal and the
    vertical DPI are reported as 72.
    """

    @classmethod
    def from_stream(cls, stream):
        """Return a |Gif| instance whose pixel dimensions are read from the GIF image
        in `stream`."""
        width, height = cls._dimensions_from_stream(stream)
        return cls(width, height, 72, 72)

    @property
    def content_type(self):
        """MIME content type for this image, always `image/gif`."""
        return MIME_TYPE.GIF

    @property
    def default_ext(self):
        """Default filename extension, always 'gif'."""
        return "gif"

    @classmethod
    def _dimensions_from_stream(cls, stream):
        """Return the (px_width, px_height) pair for the GIF image in `stream`.

        The values are two little-endian unsigned shorts unpacked from the four bytes
        starting at offset 6.
        """
        stream.seek(6)
        return Struct("<HH").unpack(stream.read(4))

View File

@@ -0,0 +1,86 @@
from struct import Struct
from .exceptions import UnexpectedEndOfFileError
BIG_ENDIAN = ">"
LITTLE_ENDIAN = "<"


class StreamReader:
    """Wraps a file-like object to provide access to structured data from a binary file.

    Byte-order is configurable. `base_offset` is added to any base value provided to
    calculate actual location for reads.
    """

    def __init__(self, stream, byte_order, base_offset=0):
        super(StreamReader, self).__init__()
        self._stream = stream
        # anything other than LITTLE_ENDIAN is treated as big-endian
        self._byte_order = LITTLE_ENDIAN if byte_order == LITTLE_ENDIAN else BIG_ENDIAN
        self._base_offset = base_offset

    def read(self, count):
        """Allow pass-through read() call."""
        return self._stream.read(count)

    def read_byte(self, base, offset=0):
        """Return the int value of the byte at the file position defined by
        self._base_offset + `base` + `offset`.

        If `base` is None, the byte is read from the current position in the stream.
        """
        return self._read_int("B", base, offset)

    def read_long(self, base, offset=0):
        """Return the int value of the four bytes at the file position defined by
        self._base_offset + `base` + `offset`.

        If `base` is None, the long is read from the current position in the stream. The
        endian setting of this instance is used to interpret the byte layout of the
        long.
        """
        # `_byte_order` is always "<" or ">", which is exactly the struct prefix
        return self._read_int(self._byte_order + "L", base, offset)

    def read_short(self, base, offset=0):
        """Return the int value of the two bytes at the file position determined by
        `base` and `offset`, similarly to ``read_long()`` above."""
        return self._read_int(self._byte_order + "H", base, offset)

    def read_str(self, char_count, base, offset=0):
        """Return a string containing the `char_count` bytes at the file position
        determined by self._base_offset + `base` + `offset`.

        The bytes are decoded as UTF-8. If `base` is None, the bytes are read from the
        current position in the stream.
        """
        struct = Struct("%ds" % char_count)
        chars = self._unpack_item(struct, base, offset)
        return chars.decode("UTF-8")

    def seek(self, base, offset=0):
        """Move the stream position to self._base_offset + `base` + `offset`."""
        location = self._base_offset + base + offset
        self._stream.seek(location)

    def tell(self):
        """Allow pass-through tell() call."""
        return self._stream.tell()

    def _read_bytes(self, byte_count, base, offset):
        """Return `byte_count` bytes read at the position given by `base` and `offset`.

        When `base` is None no seek is performed and `offset` is ignored, so the bytes
        come from the current stream position, as the `read_*()` docstrings promise.
        Raises |UnexpectedEndOfFileError| if fewer than `byte_count` bytes are
        available.
        """
        if base is not None:
            self.seek(base, offset)
        bytes_ = self._stream.read(byte_count)
        if len(bytes_) < byte_count:
            raise UnexpectedEndOfFileError
        return bytes_

    def _read_int(self, fmt, base, offset):
        """Return the single value unpacked with struct format `fmt` at the position
        given by `base` and `offset`."""
        struct = Struct(fmt)
        return self._unpack_item(struct, base, offset)

    def _unpack_item(self, struct, base, offset):
        """Return the first item unpacked by `struct` from the bytes at the position
        given by `base` and `offset`."""
        bytes_ = self._read_bytes(struct.size, base, offset)
        return struct.unpack(bytes_)[0]

View File

@@ -0,0 +1,234 @@
"""Provides objects that can characterize image streams.
That characterization is as to content type and size, as a required step in including
them in a document.
"""
from __future__ import annotations
import hashlib
import io
import os
from typing import IO, Tuple
from docx.image.exceptions import UnrecognizedImageError
from docx.shared import Emu, Inches, Length, lazyproperty
class Image:
    """Graphical image stream such as JPEG, PNG, or GIF with properties and methods
    required by ImagePart."""

    def __init__(self, blob: bytes, filename: str, image_header: BaseImageHeader):
        super(Image, self).__init__()
        self._blob = blob
        self._filename = filename
        self._image_header = image_header

    @classmethod
    def from_blob(cls, blob: bytes) -> Image:
        """Return a new |Image| subclass instance parsed from the image binary contained
        in `blob`."""
        stream = io.BytesIO(blob)
        return cls._from_stream(stream, blob)

    @classmethod
    def from_file(cls, image_descriptor: str | os.PathLike[str] | IO[bytes]):
        """Return a new |Image| subclass instance loaded from the image file identified
        by `image_descriptor`, a path or file-like object.

        A path may be given as a `str` or as any `os.PathLike` object such as
        `pathlib.Path`. Anything else is treated as a file-like object; it is rewound
        to its start and read in full.
        """
        if isinstance(image_descriptor, (str, os.PathLike)):
            path = os.fspath(image_descriptor)
            with open(path, "rb") as f:
                blob = f.read()
            stream = io.BytesIO(blob)
            filename = os.path.basename(path)
        else:
            stream = image_descriptor
            stream.seek(0)
            blob = stream.read()
            # no name is available; `_from_stream()` generates one
            filename = None
        return cls._from_stream(stream, blob, filename)

    @property
    def blob(self):
        """The bytes of the image 'file'."""
        return self._blob

    @property
    def content_type(self) -> str:
        """MIME content type for this image, e.g. ``'image/jpeg'`` for a JPEG image."""
        return self._image_header.content_type

    @lazyproperty
    def ext(self):
        """The file extension for the image, taken from its filename.

        When the image was loaded from an anonymous stream or blob, the filename is a
        generated one built from the default extension for the image type, so that
        extension is what is returned. Does not contain the leading period, e.g.
        'jpg', not '.jpg'.
        """
        return os.path.splitext(self._filename)[1][1:]

    @property
    def filename(self):
        """Original image file name, if loaded from disk, or a generic filename if
        loaded from an anonymous stream."""
        return self._filename

    @property
    def px_width(self) -> int:
        """The horizontal pixel dimension of the image."""
        return self._image_header.px_width

    @property
    def px_height(self) -> int:
        """The vertical pixel dimension of the image."""
        return self._image_header.px_height

    @property
    def horz_dpi(self) -> int:
        """Integer dots per inch for the width of this image, as reported by the image
        header."""
        return self._image_header.horz_dpi

    @property
    def vert_dpi(self) -> int:
        """Integer dots per inch for the height of this image, as reported by the image
        header."""
        return self._image_header.vert_dpi

    @property
    def width(self) -> Inches:
        """A |Length| value representing the native width of the image, calculated from
        the values of `px_width` and `horz_dpi`."""
        return Inches(self.px_width / self.horz_dpi)

    @property
    def height(self) -> Inches:
        """A |Length| value representing the native height of the image, calculated from
        the values of `px_height` and `vert_dpi`."""
        return Inches(self.px_height / self.vert_dpi)

    def scaled_dimensions(
        self, width: int | Length | None = None, height: int | Length | None = None
    ) -> Tuple[Length, Length]:
        """(cx, cy) pair representing scaled dimensions of this image.

        The native dimensions of the image are scaled by applying the following rules to
        the `width` and `height` arguments.

        * If both `width` and `height` are specified, the return value is (`width`,
          `height`); no scaling is performed.
        * If only one is specified, it is used to compute a scaling factor that is then
          applied to the unspecified dimension, preserving the aspect ratio of the image.
        * If both `width` and `height` are |None|, the native dimensions are returned.

        The native dimensions are calculated from the pixel size and the
        dots-per-inch (dpi) values of the image. The returned values are both |Length|
        objects.
        """
        if width is None and height is None:
            return self.width, self.height

        if width is None:
            assert height is not None  # narrowing only; guaranteed by the check above
            scaling_factor = float(height) / float(self.height)
            width = round(self.width * scaling_factor)

        if height is None:
            scaling_factor = float(width) / float(self.width)
            height = round(self.height * scaling_factor)

        return Emu(width), Emu(height)

    @lazyproperty
    def sha1(self):
        """SHA1 hash digest of the image blob."""
        return hashlib.sha1(self._blob).hexdigest()

    @classmethod
    def _from_stream(
        cls,
        stream: IO[bytes],
        blob: bytes,
        filename: str | None = None,
    ) -> Image:
        """Return an instance of the |Image| subclass corresponding to the format of the
        image in `stream`.

        When `filename` is None, a generic name of the form ``image.<ext>`` is used,
        where ``<ext>`` is the default extension reported by the image header.
        """
        image_header = _ImageHeaderFactory(stream)
        if filename is None:
            filename = "image.%s" % image_header.default_ext
        return cls(blob, filename, image_header)
def _ImageHeaderFactory(stream: IO[bytes]):
    """Return the |BaseImageHeader| subclass instance parsed from the image in `stream`.

    The first 32 bytes of `stream` are compared against each known signature in turn;
    the header class of the first match parses the stream. Raises
    |UnrecognizedImageError| when no signature matches.
    """
    from docx.image import SIGNATURES

    stream.seek(0)
    leading_bytes = stream.read(32)

    for header_cls, start, magic in SIGNATURES:
        if leading_bytes[start : start + len(magic)] == magic:
            return header_cls.from_stream(stream)

    raise UnrecognizedImageError
class BaseImageHeader:
    """Base class for image header subclasses like |Jpeg| and |Tiff|.

    Stores the pixel dimensions and DPI values supplied at construction and leaves
    `content_type` and `default_ext` for subclasses to provide.
    """

    def __init__(self, px_width: int, px_height: int, horz_dpi: int, vert_dpi: int):
        self._px_width, self._px_height = px_width, px_height
        self._horz_dpi, self._vert_dpi = horz_dpi, vert_dpi

    @property
    def content_type(self) -> str:
        """MIME content type of the image; abstract, subclasses must override."""
        raise NotImplementedError(
            "content_type property must be implemented by all subclasses of BaseImageHeader"
        )

    @property
    def default_ext(self) -> str:
        """Default filename extension for images of this type; abstract, subclasses
        must override."""
        msg = "default_ext property must be implemented by all subclasses of BaseImageHeader"
        raise NotImplementedError(msg)

    @property
    def px_width(self):
        """The horizontal pixel dimension of the image."""
        return self._px_width

    @property
    def px_height(self):
        """The vertical pixel dimension of the image."""
        return self._px_height

    @property
    def horz_dpi(self):
        """Integer dots per inch for the width of this image, as supplied at
        construction."""
        return self._horz_dpi

    @property
    def vert_dpi(self):
        """Integer dots per inch for the height of this image, as supplied at
        construction."""
        return self._vert_dpi

View File

@@ -0,0 +1,425 @@
"""Objects related to parsing headers of JPEG image streams.
Includes both JFIF and Exif sub-formats.
"""
import io
from docx.image.constants import JPEG_MARKER_CODE, MIME_TYPE
from docx.image.helpers import BIG_ENDIAN, StreamReader
from docx.image.image import BaseImageHeader
from docx.image.tiff import Tiff
class Jpeg(BaseImageHeader):
    """Common base for the JFIF and Exif header classes."""

    @property
    def content_type(self):
        """MIME content type for this image, always `image/jpeg`."""
        return MIME_TYPE.JPEG

    @property
    def default_ext(self):
        """Default filename extension, always 'jpg'."""
        return "jpg"
class Exif(Jpeg):
    """Image header parser for Exif image format."""

    @classmethod
    def from_stream(cls, stream):
        """Return an |Exif| instance parsed from the Exif image in `stream`.

        Pixel dimensions come from the start-of-frame marker and DPI values from the
        APP1 marker.
        """
        markers = _JfifMarkers.from_stream(stream)
        sof = markers.sof
        app1 = markers.app1
        return cls(sof.px_width, sof.px_height, app1.horz_dpi, app1.vert_dpi)
class Jfif(Jpeg):
    """Image header parser for JFIF image format."""

    @classmethod
    def from_stream(cls, stream):
        """Return a |Jfif| instance parsed from the JFIF image in `stream`.

        Pixel dimensions come from the start-of-frame marker and DPI values from the
        APP0 marker.
        """
        markers = _JfifMarkers.from_stream(stream)
        sof = markers.sof
        app0 = markers.app0
        return cls(sof.px_width, sof.px_height, app0.horz_dpi, app0.vert_dpi)
class _JfifMarkers:
"""Sequence of markers in a JPEG file, perhaps truncated at first SOS marker for
performance reasons."""
def __init__(self, markers):
super(_JfifMarkers, self).__init__()
self._markers = list(markers)
def __str__(self): # pragma: no cover
"""Returns a tabular listing of the markers in this instance, which can be handy
for debugging and perhaps other uses."""
header = " offset seglen mc name\n======= ====== == ====="
tmpl = "%7d %6d %02X %s"
rows = []
for marker in self._markers:
rows.append(
tmpl
% (
marker.offset,
marker.segment_length,
ord(marker.marker_code),
marker.name,
)
)
lines = [header] + rows
return "\n".join(lines)
@classmethod
def from_stream(cls, stream):
"""Return a |_JfifMarkers| instance containing a |_JfifMarker| subclass instance
for each marker in `stream`."""
marker_parser = _MarkerParser.from_stream(stream)
markers = []
for marker in marker_parser.iter_markers():
markers.append(marker)
if marker.marker_code == JPEG_MARKER_CODE.SOS:
break
return cls(markers)
@property
def app0(self):
"""First APP0 marker in image markers."""
for m in self._markers:
if m.marker_code == JPEG_MARKER_CODE.APP0:
return m
raise KeyError("no APP0 marker in image")
@property
def app1(self):
"""First APP1 marker in image markers."""
for m in self._markers:
if m.marker_code == JPEG_MARKER_CODE.APP1:
return m
raise KeyError("no APP1 marker in image")
@property
def sof(self):
"""First start of frame (SOFn) marker in this sequence."""
for m in self._markers:
if m.marker_code in JPEG_MARKER_CODE.SOF_MARKER_CODES:
return m
raise KeyError("no start of frame (SOFn) marker in image")
class _MarkerParser:
    """Service class that knows how to parse a JFIF stream and iterate over its
    markers."""

    def __init__(self, stream_reader):
        super(_MarkerParser, self).__init__()
        self._stream = stream_reader

    @classmethod
    def from_stream(cls, stream):
        """Return a |_MarkerParser| instance reading `stream` as big-endian data."""
        return cls(StreamReader(stream, BIG_ENDIAN))

    def iter_markers(self):
        """Generate a marker object for each marker in the JPEG stream, in the order
        they occur, ending after the EOI marker has been produced."""
        finder = _MarkerFinder.from_stream(self._stream)
        search_from = 0
        while True:
            code, segment_offset = finder.next(search_from)
            marker = _MarkerFactory(code, self._stream, segment_offset)
            yield marker
            if code == JPEG_MARKER_CODE.EOI:
                return
            # resume the search just past this marker's segment
            search_from = segment_offset + marker.segment_length
class _MarkerFinder:
"""Service class that knows how to find the next JFIF marker in a stream."""
def __init__(self, stream):
super(_MarkerFinder, self).__init__()
self._stream = stream
@classmethod
def from_stream(cls, stream):
"""Return a |_MarkerFinder| instance to find JFIF markers in `stream`."""
return cls(stream)
def next(self, start):
"""Return a (marker_code, segment_offset) 2-tuple identifying and locating the
first marker in `stream` occuring after offset `start`.
The returned `segment_offset` points to the position immediately following the
2-byte marker code, the start of the marker segment, for those markers that have
a segment.
"""
position = start
while True:
# skip over any non-\xFF bytes
position = self._offset_of_next_ff_byte(start=position)
# skip over any \xFF padding bytes
position, byte_ = self._next_non_ff_byte(start=position + 1)
# 'FF 00' sequence is not a marker, start over if found
if byte_ == b"\x00":
continue
# this is a marker, gather return values and break out of scan
marker_code, segment_offset = byte_, position + 1
break
return marker_code, segment_offset
def _next_non_ff_byte(self, start):
"""Return an offset, byte 2-tuple for the next byte in `stream` that is not
'\xff', starting with the byte at offset `start`.
If the byte at offset `start` is not '\xff', `start` and the returned `offset`
will be the same.
"""
self._stream.seek(start)
byte_ = self._read_byte()
while byte_ == b"\xff":
byte_ = self._read_byte()
offset_of_non_ff_byte = self._stream.tell() - 1
return offset_of_non_ff_byte, byte_
def _offset_of_next_ff_byte(self, start):
"""Return the offset of the next '\xff' byte in `stream` starting with the byte
at offset `start`.
Returns `start` if the byte at that offset is a hex 255; it does not necessarily
advance in the stream.
"""
self._stream.seek(start)
byte_ = self._read_byte()
while byte_ != b"\xff":
byte_ = self._read_byte()
offset_of_ff_byte = self._stream.tell() - 1
return offset_of_ff_byte
def _read_byte(self):
"""Return the next byte read from stream.
Raise Exception if stream is at end of file.
"""
byte_ = self._stream.read(1)
if not byte_: # pragma: no cover
raise Exception("unexpected end of file")
return byte_
def _MarkerFactory(marker_code, stream, offset):
    """Return the marker object for the marker at `offset` in `stream`.

    APP0, APP1 and start-of-frame codes each get their dedicated marker class; every
    other `marker_code` gets a generic |_Marker|.
    """
    dedicated = {
        JPEG_MARKER_CODE.APP0: _App0Marker,
        JPEG_MARKER_CODE.APP1: _App1Marker,
    }
    marker_cls = dedicated.get(marker_code)
    if marker_cls is None:
        is_sof = marker_code in JPEG_MARKER_CODE.SOF_MARKER_CODES
        marker_cls = _SofMarker if is_sof else _Marker
    return marker_cls.from_stream(stream, marker_code, offset)
class _Marker:
"""Base class for JFIF marker classes.
Represents a marker and its segment occuring in a JPEG byte stream.
"""
def __init__(self, marker_code, offset, segment_length):
super(_Marker, self).__init__()
self._marker_code = marker_code
self._offset = offset
self._segment_length = segment_length
@classmethod
def from_stream(cls, stream, marker_code, offset):
"""Return a generic |_Marker| instance for the marker at `offset` in `stream`
having `marker_code`."""
if JPEG_MARKER_CODE.is_standalone(marker_code):
segment_length = 0
else:
segment_length = stream.read_short(offset)
return cls(marker_code, offset, segment_length)
@property
def marker_code(self):
"""The single-byte code that identifies the type of this marker, e.g. ``'\xe0'``
for start of image (SOI)."""
return self._marker_code
@property
def name(self): # pragma: no cover
return JPEG_MARKER_CODE.marker_names[self._marker_code]
@property
def offset(self): # pragma: no cover
return self._offset
@property
def segment_length(self):
"""The length in bytes of this marker's segment."""
return self._segment_length
class _App0Marker(_Marker):
    """Represents a JFIF APP0 marker segment."""

    def __init__(self, marker_code, offset, length, density_units, x_density, y_density):
        super(_App0Marker, self).__init__(marker_code, offset, length)
        self._density_units = density_units
        self._x_density = x_density
        self._y_density = y_density

    @property
    def horz_dpi(self):
        """Horizontal dots per inch specified in this marker, 72 when the density
        units are neither inches nor centimeters."""
        return self._dpi(self._x_density)

    @property
    def vert_dpi(self):
        """Vertical dots per inch specified in this marker, 72 when the density units
        are neither inches nor centimeters."""
        return self._dpi(self._y_density)

    def _dpi(self, density):
        """Return dots per inch corresponding to `density` value."""
        units = self._density_units
        if units == 1:  # dots per inch already
            return density
        if units == 2:  # dots per centimeter
            return int(round(density * 2.54))
        return 72

    @classmethod
    def from_stream(cls, stream, marker_code, offset):
        """Return an |_App0Marker| instance for the APP0 marker at `offset` in
        `stream`."""
        # field               off  type   notes
        # ------------------  ---  -----  -------------------
        # segment length        0  short
        # JFIF identifier       2  5 chr  'JFIF' plus a NUL byte
        # major JPEG version    7  byte   typically 1
        # minor JPEG version    8  byte   typically 1 or 2
        # density units         9  byte   1=inches, 2=cm
        # horz dots per unit   10  short
        # vert dots per unit   12  short
        # ------------------  ---  -----  -------------------
        length = stream.read_short(offset)
        units = stream.read_byte(offset, 9)
        horz = stream.read_short(offset, 10)
        vert = stream.read_short(offset, 12)
        return cls(marker_code, offset, length, units, horz, vert)
class _App1Marker(_Marker):
    """Represents a JFIF APP1 (Exif) marker segment."""

    def __init__(self, marker_code, offset, length, horz_dpi, vert_dpi):
        super(_App1Marker, self).__init__(marker_code, offset, length)
        self._horz_dpi = horz_dpi
        self._vert_dpi = vert_dpi

    @classmethod
    def from_stream(cls, stream, marker_code, offset):
        """Return an |_App1Marker| carrying the horizontal and vertical dots-per-inch
        values for the APP1 segment at `offset` in `stream`.

        A segment without the Exif signature gets 72 for both values; otherwise the
        values come from the TIFF data embedded in the segment.
        """
        # field             off  len  notes
        # ----------------  ---  ---  ---------------------------------
        # segment length      0    2  short
        # Exif identifier     2    6  'Exif' followed by two NUL bytes
        # TIFF data           8       remainder of segment, parsed by Tiff
        # ----------------  ---  ---  ---------------------------------
        length = stream.read_short(offset)
        if cls._is_non_Exif_APP1_segment(stream, offset):
            horz_dpi = vert_dpi = 72
        else:
            tiff = cls._tiff_from_exif_segment(stream, offset, length)
            horz_dpi, vert_dpi = tiff.horz_dpi, tiff.vert_dpi
        return cls(marker_code, offset, length, horz_dpi, vert_dpi)

    @property
    def horz_dpi(self):
        """Horizontal dots per inch for this marker, as given at construction."""
        return self._horz_dpi

    @property
    def vert_dpi(self):
        """Vertical dots per inch for this marker, as given at construction."""
        return self._vert_dpi

    @classmethod
    def _is_non_Exif_APP1_segment(cls, stream, offset):
        """Return True if the APP1 segment at `offset` in `stream` is NOT an Exif
        segment, that is, the six bytes at offset 2 in the segment are not 'Exif'
        followed by two NUL bytes."""
        stream.seek(offset + 2)
        return stream.read(6) != b"Exif\x00\x00"

    @classmethod
    def _tiff_from_exif_segment(cls, stream, offset, segment_length):
        """Return a |Tiff| instance parsed from the Exif APP1 segment of
        `segment_length` at `offset` in `stream`."""
        # the TIFF data starts 8 bytes into the segment; hand it to Tiff as a
        # stream of its own
        stream.seek(offset + 8)
        tiff_bytes = stream.read(segment_length - 8)
        return Tiff.from_stream(io.BytesIO(tiff_bytes))
class _SofMarker(_Marker):
    """Represents a JFIF start of frame (SOFx) marker segment."""

    def __init__(self, marker_code, offset, segment_length, px_width, px_height):
        super(_SofMarker, self).__init__(marker_code, offset, segment_length)
        self._px_width, self._px_height = px_width, px_height

    @classmethod
    def from_stream(cls, stream, marker_code, offset):
        """Return an |_SofMarker| instance for the SOFn marker at `offset` in stream."""
        # field             off  type   notes
        # ----------------  ---  -----  ---------
        # segment length      0  short
        # data precision      2  byte
        # vertical lines      3  short  px_height
        # horizontal lines    5  short  px_width
        # ----------------  ---  -----  ---------
        length = stream.read_short(offset)
        height = stream.read_short(offset, 3)
        width = stream.read_short(offset, 5)
        return cls(marker_code, offset, length, width, height)

    @property
    def px_height(self):
        """Image height in pixels."""
        return self._px_height

    @property
    def px_width(self):
        """Image width in pixels."""
        return self._px_width

View File

@@ -0,0 +1,253 @@
from .constants import MIME_TYPE, PNG_CHUNK_TYPE
from .exceptions import InvalidImageStreamError
from .helpers import BIG_ENDIAN, StreamReader
from .image import BaseImageHeader
class Png(BaseImageHeader):
    """Image header parser for PNG images."""

    @property
    def content_type(self):
        """MIME content type for this image, always `image/png`."""
        return MIME_TYPE.PNG

    @property
    def default_ext(self):
        """Default filename extension, always 'png'."""
        return "png"

    @classmethod
    def from_stream(cls, stream):
        """Return a |Png| instance whose pixel dimensions and DPI values are parsed
        from the PNG image in `stream`."""
        parsed = _PngParser.parse(stream)
        return cls(parsed.px_width, parsed.px_height, parsed.horz_dpi, parsed.vert_dpi)
class _PngParser:
"""Parses a PNG image stream to extract the image properties found in its chunks."""
def __init__(self, chunks):
super(_PngParser, self).__init__()
self._chunks = chunks
@classmethod
def parse(cls, stream):
"""Return a |_PngParser| instance containing the header properties parsed from
the PNG image in `stream`."""
chunks = _Chunks.from_stream(stream)
return cls(chunks)
@property
def px_width(self):
"""The number of pixels in each row of the image."""
IHDR = self._chunks.IHDR
return IHDR.px_width
@property
def px_height(self):
"""The number of stacked rows of pixels in the image."""
IHDR = self._chunks.IHDR
return IHDR.px_height
@property
def horz_dpi(self):
"""Integer dots per inch for the width of this image.
Defaults to 72 when not present in the file, as is often the case.
"""
pHYs = self._chunks.pHYs
if pHYs is None:
return 72
return self._dpi(pHYs.units_specifier, pHYs.horz_px_per_unit)
@property
def vert_dpi(self):
"""Integer dots per inch for the height of this image.
Defaults to 72 when not present in the file, as is often the case.
"""
pHYs = self._chunks.pHYs
if pHYs is None:
return 72
return self._dpi(pHYs.units_specifier, pHYs.vert_px_per_unit)
@staticmethod
def _dpi(units_specifier, px_per_unit):
"""Return dots per inch value calculated from `units_specifier` and
`px_per_unit`."""
if units_specifier == 1 and px_per_unit:
return int(round(px_per_unit * 0.0254))
return 72
class _Chunks:
"""Collection of the chunks parsed from a PNG image stream."""
def __init__(self, chunk_iterable):
super(_Chunks, self).__init__()
self._chunks = list(chunk_iterable)
@classmethod
def from_stream(cls, stream):
"""Return a |_Chunks| instance containing the PNG chunks in `stream`."""
chunk_parser = _ChunkParser.from_stream(stream)
chunks = list(chunk_parser.iter_chunks())
return cls(chunks)
@property
def IHDR(self):
"""IHDR chunk in PNG image."""
match = lambda chunk: chunk.type_name == PNG_CHUNK_TYPE.IHDR # noqa
IHDR = self._find_first(match)
if IHDR is None:
raise InvalidImageStreamError("no IHDR chunk in PNG image")
return IHDR
@property
def pHYs(self):
"""PHYs chunk in PNG image, or |None| if not present."""
match = lambda chunk: chunk.type_name == PNG_CHUNK_TYPE.pHYs # noqa
return self._find_first(match)
def _find_first(self, match):
"""Return first chunk in stream order returning True for function `match`."""
for chunk in self._chunks:
if match(chunk):
return chunk
return None
class _ChunkParser:
    """Walks a PNG image stream and produces a chunk object for each chunk in it."""

    def __init__(self, stream_rdr):
        super().__init__()
        self._stream_rdr = stream_rdr

    @classmethod
    def from_stream(cls, stream):
        """Return a |_ChunkParser| reading `stream` through a big-endian
        |StreamReader|."""
        return cls(StreamReader(stream, BIG_ENDIAN))

    def iter_chunks(self):
        """Generate one chunk object per chunk in the PNG stream, in stream order.

        Each object is produced by `_ChunkFactory` from the chunk type and the
        offset of the chunk's data.
        """
        for type_name, data_offset in self._iter_chunk_offsets():
            yield _ChunkFactory(type_name, self._stream_rdr, data_offset)

    def _iter_chunk_offsets(self):
        """Generate a (chunk_type, chunk_data_offset) pair for each chunk in the PNG
        stream.

        The IEND chunk is the last pair generated; nothing after it is read.
        """
        reader = self._stream_rdr
        position = 8  # the first chunk starts right after the 8-byte PNG signature
        reached_end = False
        while not reached_end:
            data_len = reader.read_long(position)
            type_name = reader.read_str(4, position, 4)
            # chunk data begins after the 4-byte length and 4-byte type fields
            yield type_name, position + 8
            reached_end = type_name == "IEND"
            # step over length (4), type (4), data and CRC (4)
            position += 12 + data_len
def _ChunkFactory(chunk_type, stream_rdr, offset):
    """Build the chunk object matching `chunk_type` from the data found in
    `stream_rdr` at `offset`.

    IHDR and pHYs chunks get their dedicated classes; every other chunk type is
    represented by the generic |_Chunk|.
    """
    if chunk_type == PNG_CHUNK_TYPE.IHDR:
        chunk_cls = _IHDRChunk
    elif chunk_type == PNG_CHUNK_TYPE.pHYs:
        chunk_cls = _pHYsChunk
    else:
        chunk_cls = _Chunk
    return chunk_cls.from_offset(chunk_type, stream_rdr, offset)
class _Chunk:
    """Generic PNG chunk and base class for the specialized chunk types.

    Instances of this class itself stand in for any chunk type that has no
    dedicated subclass.
    """

    def __init__(self, chunk_type):
        super().__init__()
        self._chunk_type = chunk_type

    @classmethod
    def from_offset(cls, chunk_type, stream_rdr, offset):
        """Return a chunk that records only `chunk_type`.

        `stream_rdr` and `offset` are accepted so the signature matches the
        subclasses, but nothing is read from the stream here.
        """
        return cls(chunk_type)

    @property
    def type_name(self):
        """Name of this chunk's type, e.g. 'IHDR', 'pHYs', etc."""
        return self._chunk_type
class _IHDRChunk(_Chunk):
    """IHDR chunk, the source of the image's pixel dimensions."""

    def __init__(self, chunk_type, px_width, px_height):
        super().__init__(chunk_type)
        self._px_width, self._px_height = px_width, px_height

    @classmethod
    def from_offset(cls, chunk_type, stream_rdr, offset):
        """Return an |_IHDRChunk| holding the pixel width and height read from
        `stream_rdr`.

        The width is the long at `offset`; the height is the long four bytes
        after it.
        """
        return cls(
            chunk_type,
            stream_rdr.read_long(offset),
            stream_rdr.read_long(offset, 4),
        )

    @property
    def px_width(self):
        """Image width in pixels, as read from the chunk."""
        return self._px_width

    @property
    def px_height(self):
        """Image height in pixels, as read from the chunk."""
        return self._px_height
class _pHYsChunk(_Chunk):
    """pHYs chunk, the source of the image's resolution (dpi) information."""

    def __init__(self, chunk_type, horz_px_per_unit, vert_px_per_unit, units_specifier):
        super().__init__(chunk_type)
        self._horz_px_per_unit = horz_px_per_unit
        self._vert_px_per_unit = vert_px_per_unit
        self._units_specifier = units_specifier

    @classmethod
    def from_offset(cls, chunk_type, stream_rdr, offset):
        """Return a |_pHYsChunk| holding the resolution values read from
        `stream_rdr`.

        Relative to `offset`, the horizontal density is the long at +0, the
        vertical density the long at +4 and the units specifier the byte at +8.
        """
        return cls(
            chunk_type,
            stream_rdr.read_long(offset),
            stream_rdr.read_long(offset, 4),
            stream_rdr.read_byte(offset, 8),
        )

    @property
    def horz_px_per_unit(self):
        """Horizontal pixel density, in pixels per unit, as read from the chunk."""
        return self._horz_px_per_unit

    @property
    def vert_px_per_unit(self):
        """Vertical pixel density, in pixels per unit, as read from the chunk."""
        return self._vert_px_per_unit

    @property
    def units_specifier(self):
        """Code identifying the unit the pixel densities are expressed in."""
        return self._units_specifier

View File

@@ -0,0 +1,289 @@
from .constants import MIME_TYPE, TIFF_FLD, TIFF_TAG
from .helpers import BIG_ENDIAN, LITTLE_ENDIAN, StreamReader
from .image import BaseImageHeader
class Tiff(BaseImageHeader):
    """Image header parser for TIFF images.

    Both big-endian and little-endian byte orders are handled.
    """

    @property
    def content_type(self):
        """MIME type of this image, always ``image/tiff`` for TIFF images."""
        return MIME_TYPE.TIFF

    @property
    def default_ext(self):
        """Default filename extension, always 'tiff' for TIFF images."""
        return "tiff"

    @classmethod
    def from_stream(cls, stream):
        """Return a |Tiff| instance carrying the pixel dimensions and resolution of
        the TIFF image in `stream`."""
        tiff = _TiffParser.parse(stream)
        return cls(tiff.px_width, tiff.px_height, tiff.horz_dpi, tiff.vert_dpi)
class _TiffParser:
    """Parses a TIFF image stream to extract the image properties found in its main
    image file directory (IFD)."""

    def __init__(self, ifd_entries):
        super().__init__()
        self._ifd_entries = ifd_entries

    @classmethod
    def parse(cls, stream):
        """Return an instance of |_TiffParser| containing the properties parsed from the
        TIFF image in `stream`."""
        stream_rdr = cls._make_stream_reader(stream)
        # the long at byte 4 of the header locates the first IFD
        ifd0_offset = stream_rdr.read_long(4)
        ifd_entries = _IfdEntries.from_stream(stream_rdr, ifd0_offset)
        return cls(ifd_entries)

    @property
    def horz_dpi(self):
        """The horizontal dots per inch value calculated from the XResolution and
        ResolutionUnit tags of the IFD; defaults to 72 if the resolution is not
        present or not usable."""
        return self._dpi(TIFF_TAG.X_RESOLUTION)

    @property
    def vert_dpi(self):
        """The vertical dots per inch value calculated from the YResolution and
        ResolutionUnit tags of the IFD; defaults to 72 if the resolution is not
        present or not usable."""
        return self._dpi(TIFF_TAG.Y_RESOLUTION)

    @property
    def px_height(self):
        """The number of stacked rows of pixels in the image, |None| if the IFD contains
        no ``ImageLength`` tag, the expected case when the TIFF is embedded in an Exif
        image."""
        return self._ifd_entries.get(TIFF_TAG.IMAGE_LENGTH)

    @property
    def px_width(self):
        """The number of pixels in each row in the image, |None| if the IFD contains no
        ``ImageWidth`` tag, the expected case when the TIFF is embedded in an Exif
        image."""
        return self._ifd_entries.get(TIFF_TAG.IMAGE_WIDTH)

    @classmethod
    def _detect_endian(cls, stream):
        """Return either BIG_ENDIAN or LITTLE_ENDIAN depending on the endian indicator
        found in the TIFF `stream` header, either 'MM' or 'II'.

        Any indicator other than 'MM' is treated as little-endian.
        """
        stream.seek(0)
        endian_str = stream.read(2)
        return BIG_ENDIAN if endian_str == b"MM" else LITTLE_ENDIAN

    def _dpi(self, resolution_tag):
        """Return the dpi value calculated for `resolution_tag`, which can be either
        TIFF_TAG.X_RESOLUTION or TIFF_TAG.Y_RESOLUTION.

        The calculation is based on the values of both that tag and the
        TIFF_TAG.RESOLUTION_UNIT tag in this parser's |_IfdEntries| instance.

        The default of 72 is returned whenever a usable value cannot be derived:
        the tag is absent, its value is not a positive finite number (for example
        zero, or a non-numeric placeholder for an unsupported field layout), the
        resolution unit is 1 (aspect ratio only), or the result rounds to zero.
        """
        ifd_entries = self._ifd_entries

        if resolution_tag not in ifd_entries:
            return 72

        dots_per_unit = ifd_entries[resolution_tag]
        # a zero or non-numeric resolution carries no information, and a dpi of
        # zero is unusable for computing a physical size, so fall back to the
        # default rather than returning 0 or raising TypeError
        if not isinstance(dots_per_unit, (int, float)):
            return 72
        if not 0 < dots_per_unit < float("inf"):
            return 72

        # resolution unit defaults to inches (2)
        resolution_unit = ifd_entries.get(TIFF_TAG.RESOLUTION_UNIT, 2)

        if resolution_unit == 1:  # aspect ratio only
            return 72

        # resolution_unit == 2 for inches, 3 for centimeters
        units_per_inch = 1 if resolution_unit == 2 else 2.54
        dpi = int(round(dots_per_unit * units_per_inch))
        # a tiny fractional resolution can still round down to zero
        return dpi if dpi > 0 else 72

    @classmethod
    def _make_stream_reader(cls, stream):
        """Return a |StreamReader| instance wrapping `stream` and having "endian-
        ness" determined by the 'MM' or 'II' indicator in the TIFF stream header."""
        endian = cls._detect_endian(stream)
        return StreamReader(stream, endian)
class _IfdEntries:
    """Image File Directory of a TIFF image, exposing its tag values through a
    mapping-like (dict) interface keyed by tag code."""

    def __init__(self, entries):
        super().__init__()
        self._entries = entries

    def __contains__(self, key):
        """Support membership tests such as ``tag in ifd_entries``."""
        return key in self._entries

    def __getitem__(self, key):
        """Support subscript access such as ``tag_value = ifd_entries[tag_code]``."""
        return self._entries[key]

    @classmethod
    def from_stream(cls, stream, offset):
        """Return a new |_IfdEntries| instance built from the directory entries
        found in `stream` starting at `offset`."""
        values_by_tag = {}
        for entry in _IfdParser(stream, offset).iter_entries():
            values_by_tag[entry.tag] = entry.value
        return cls(values_by_tag)

    def get(self, tag_code, default=None):
        """Return the value stored for `tag_code`, or `default` when the directory
        has no entry with that tag."""
        return self._entries.get(tag_code, default)
class _IfdParser:
    """Service object that extracts the directory entries of an Image File
    Directory (IFD) from a stream reader."""

    def __init__(self, stream_rdr, offset):
        super().__init__()
        self._stream_rdr = stream_rdr
        self._offset = offset

    def iter_entries(self):
        """Generate an |_IfdEntry| instance for each entry in the directory, in
        directory order."""
        # entries are laid out back to back, 12 bytes apart, starting 2 bytes
        # past the directory offset
        entry_offset = self._offset + 2
        for _ in range(self._entry_count):
            yield _IfdEntryFactory(self._stream_rdr, entry_offset)
            entry_offset += 12

    @property
    def _entry_count(self):
        """Number of directory entries, the short read at the directory offset."""
        return self._stream_rdr.read_short(self._offset)
def _IfdEntryFactory(stream_rdr, offset):
    """Build the |_IfdEntry| subclass instance for the directory entry located at
    `offset` in `stream_rdr`.

    The class is chosen from the entry's field type, the short two bytes into the
    entry; field types without a dedicated class fall back to |_IfdEntry|.
    """
    entry_cls_by_field_type = {
        TIFF_FLD.ASCII: _AsciiIfdEntry,
        TIFF_FLD.SHORT: _ShortIfdEntry,
        TIFF_FLD.LONG: _LongIfdEntry,
        TIFF_FLD.RATIONAL: _RationalIfdEntry,
    }
    field_type = stream_rdr.read_short(offset, 2)
    if field_type in entry_cls_by_field_type:
        entry_cls = entry_cls_by_field_type[field_type]
    else:
        entry_cls = _IfdEntry
    return entry_cls.from_stream(stream_rdr, offset)
class _IfdEntry:
    """Base class for IFD entry classes.

    Each subclass handles one value type, e.g. ASCII, long int, etc.
    """

    def __init__(self, tag_code, value):
        super().__init__()
        self._tag_code = tag_code
        self._value = value

    @classmethod
    def from_stream(cls, stream_rdr, offset):
        """Return an instance of this class holding the tag code and value of the
        directory entry found in `stream_rdr` at `offset`.

        This method is shared by all subclasses; they customize value decoding by
        overriding ``_parse_value()``.
        """
        # entry layout used here: tag (short at +0), count (long at +4),
        # value-or-offset (long at +8)
        tag_code = stream_rdr.read_short(offset, 0)
        count = stream_rdr.read_long(offset, 4)
        value_or_offset = stream_rdr.read_long(offset, 8)
        parsed = cls._parse_value(stream_rdr, offset, count, value_or_offset)
        return cls(tag_code, parsed)

    @classmethod
    def _parse_value(cls, stream_rdr, offset, value_count, value_offset):
        """Return the value of this entry parsed from `stream_rdr`.

        This base implementation returns a fixed placeholder string; subclasses
        are expected to override it.
        """
        return "UNIMPLEMENTED FIELD TYPE"  # pragma: no cover

    @property
    def tag(self):
        """Short int code identifying this IFD entry."""
        return self._tag_code

    @property
    def value(self):
        """Value of this entry; its type depends on the tag."""
        return self._value
class _AsciiIfdEntry(_IfdEntry):
    """IFD entry having the form of a NULL-terminated ASCII string."""

    @classmethod
    def _parse_value(cls, stream_rdr, offset, value_count, value_offset):
        """Return the ASCII string for this entry parsed from `stream_rdr`.

        The length of the string, including a terminating '\\x00' (NUL) character, is
        in `value_count`; the terminator is not part of the returned string.

        A string longer than four bytes (terminator included) is read from
        `value_offset`. A string of four bytes or fewer is stored inline in the
        entry's own value/offset field, so it is read from eight bytes past
        `offset` instead. An empty string is returned when `value_count` is zero.
        """
        if value_count < 1:
            # no characters at all, not even the terminator; avoid asking the
            # reader for a negative number of characters
            return ""

        char_count = value_count - 1  # drop the trailing NUL

        if value_count <= 4:
            # TIFF stores values that fit in four bytes directly (left-justified)
            # in the value/offset field rather than pointing at them, so
            # `value_offset` is not a stream position in this case
            return stream_rdr.read_str(char_count, offset, 8)

        return stream_rdr.read_str(char_count, value_offset)
class _ShortIfdEntry(_IfdEntry):
    """IFD entry whose value is a short (2-byte) integer."""

    @classmethod
    def _parse_value(cls, stream_rdr, offset, value_count, value_offset):
        """Return the short int stored in the value/offset field of this entry,
        i.e. the short eight bytes past `offset`.

        Only a single value is supported; for any other `value_count` a
        placeholder string is returned.
        """
        if value_count != 1:  # pragma: no cover
            return "Multi-value short integer NOT IMPLEMENTED"
        return stream_rdr.read_short(offset, 8)
class _LongIfdEntry(_IfdEntry):
    """IFD entry whose value is a long (4-byte) integer."""

    @classmethod
    def _parse_value(cls, stream_rdr, offset, value_count, value_offset):
        """Return the long int stored in the value/offset field of this entry,
        i.e. the long eight bytes past `offset`.

        Only a single value is supported; for any other `value_count` a
        placeholder string is returned.
        """
        if value_count != 1:  # pragma: no cover
            return "Multi-value long integer NOT IMPLEMENTED"
        return stream_rdr.read_long(offset, 8)
class _RationalIfdEntry(_IfdEntry):
    """IFD entry expressed as a numerator, denominator pair."""

    @classmethod
    def _parse_value(cls, stream_rdr, offset, value_count, value_offset):
        """Return the rational (numerator / denominator) value at `value_offset` in
        `stream_rdr` as a floating-point number.

        The numerator is the long at `value_offset` and the denominator the long
        four bytes after it. When the denominator is zero the value is undefined
        and 0.0 is returned, so that one malformed entry does not abort parsing
        of the whole image with a |ZeroDivisionError|.

        Only supports single values at present; for any other `value_count` a
        placeholder string is returned.
        """
        if value_count != 1:  # pragma: no cover
            return "Multi-value Rational NOT IMPLEMENTED"

        numerator = stream_rdr.read_long(value_offset)
        denominator = stream_rdr.read_long(value_offset, 4)

        if denominator == 0:
            # e.g. a 0/0 rational in a malformed or placeholder entry; report
            # "no value" rather than raising
            return 0.0

        return numerator / denominator