Toolkit functions for writing headers.

This commit is contained in:
Robert Smallshire
2014-10-23 14:16:17 +02:00
parent 6f3aba6a4d
commit 2ee69544de
+102 -8
View File
@@ -11,14 +11,18 @@ from binary_reel_header_definition import HEADER_DEF
from ibm_float import ibm2ieee, ieee2ibm
from revisions import canonicalize_revision
from trace_header_definition import TRACE_HEADER_DEF
from util import file_length, batched
from util import file_length, batched, pad, round_up
from portability import EMPTY_BYTE_STRING
TEXTUAL_HEADER_NUM_BYTES = 3200
CARD_LENGTH = 80
CARDS_PER_HEADER = 40
TEXTUAL_HEADER_NUM_BYTES = CARD_LENGTH * CARDS_PER_HEADER
BINARY_HEADER_NUM_BYTES = 400
REEL_HEADER_NUM_BYTES = TEXTUAL_HEADER_NUM_BYTES + BINARY_HEADER_NUM_BYTES
TRACE_HEADER_NUM_BYTES = 240
END_TEXT_STANZA = "((SEG: EndText))"
def extract_revision(binary_reel_header):
"""Obtain the SEG Y revision from the reel header.
@@ -131,7 +135,7 @@ def read_textual_reel_header(fh, encoding=None):
if encoding is None:
encoding = guess_encoding(raw_header)
lines = tuple(bytes(raw_line).decode(encoding) for raw_line in batched(raw_header, 80))
lines = tuple(bytes(raw_line).decode(encoding) for raw_line in batched(raw_header, CARD_LENGTH))
return lines
@@ -159,13 +163,13 @@ def has_end_text_stanza(ext_header):
"""Determine whether the header is the end text stanza.
Args:
ext_header: A sequence of forty 80 character Unicode strings.
ext_header: A sequence of forty CARD_LENGTH character Unicode strings.
Returns:
True if the header is the SEG Y Revision 1 end text header,
otherwise False.
"""
return "((SEG: EndText))" in ext_header[0]
return END_TEXT_STANZA in ext_header[0]
def read_extended_headers_until_end(fh, encoding):
@@ -181,7 +185,7 @@ def read_extended_headers_until_end(fh, encoding):
Typically 'cp037' for EBCDIC or 'ascii' for ASCII.
Returns:
A list of tuples each containing forty 80-character Unicode strings.
A list of tuples each containing forty CARD_LENGTH character Unicode strings.
"""
extended_headers = []
while True:
@@ -210,7 +214,7 @@ def read_extended_headers_counted(fh, num_expected, encoding):
Typically 'cp037' for EBCDIC or 'ascii' for ASCII.
Returns:
A list of tuples each containing forty 80-character Unicode strings.
A list of tuples each containing forty CARD_LENGTH -character Unicode strings.
"""
assert num_expected >= 0
extended_headers = []
@@ -472,13 +476,43 @@ def unpack_values(buf, count, item_size, fmt, endian='>'):
# swapping ourselves.
def write_textual_reel_header(fh, lines, encoding):
"""Write the SEG Y card image header, also known as the textual header
Args:
fh: A file-like object open in binary mode positioned such that the
beginning of the textual header will be the next byte to read.
lines: An iterable series of forty lines, each of which must be a
Unicode string of CARD_LENGTH characters. The first three characters
of each line are often "C 1" to "C40" (as required by the SEG Y
standard) although this is not enforced by this function, since
many widespread SEG Y readers and writers do not adhere to this
constraint. To produce a SEG Y compliant series of header lines
consider using the standard_textual_header() function.
Any lines longer than CARD_LENGTH characters will be truncated without
warning. Any excess lines over CARDS_PER_HEADER will be discarded. Short
or omitted lines will be padded with spaces.
encoding: Typically 'cp037' for EBCDIC or 'ascii' for ASCII.
"""
padded_lines = [line.encode(encoding).ljust(CARD_LENGTH, ' '.encode(encoding))[:CARD_LENGTH]
for line in pad(lines, padding='', size=CARDS_PER_HEADER)]
header = ''.join(padded_lines)
assert len(header) == 3200
fh.write(header)
def write_binary_reel_header(fh, binary_reel_header, endian='>'):
"""Write the binary_reel_header to the given file-like object.
Args:
fh: A file-like object open in binary mode for writing.
binary_reel_header: A dictionary of
binary_reel_header: A dictionary of values using a subset of the keys
in binary_reel_header_definition.HEADER_DEF associated with
compatible values.
"""
for key in HEADER_DEF:
pos = HEADER_DEF[key]['pos']
@@ -487,6 +521,66 @@ def write_binary_reel_header(fh, binary_reel_header, endian='>'):
write_binary_values(fh, [value], ctype, pos)
def page_buffer(padded_buffer, page_size):
return [padded_buffer[i:i + page_size] for i in
range(0, len(padded_buffer), page_size)]
def format_extended_textual_header(text, encoding, include_text_stop=False):
"""Format an extended textual header into 3200 byte pages.
Args:
text: A Unicode string to be written to the extended headers.
encoding: Typically 'cp037' for EBCDIC or 'ascii' for ASCII.
include_text_stop: If True, a text-stop header will be written.
Returns:
A sequence of byte strings, each of which will be exactly 3200 bytes in length.
"""
buffer = text.encode(encoding)
padded_buffer = buffer.ljust(round_up(len(buffer), TEXTUAL_HEADER_NUM_BYTES), ' '.encode(encoding))
pages = page_buffer(padded_buffer, TEXTUAL_HEADER_NUM_BYTES)
if include_text_stop:
pages.append(text_stop_page(encoding))
return pages
def write_extended_textual_headers(fh, pages):
"""Write extended textual headers.
Args:
fh: fh: A file-like object open in binary mode for writing.
pages: A sequence of byte strings each of which is exactly
TEXTUAL_HEADER_NUM_BYTES in length. To produce such a
sequence of pages, consider calling the
format_extended_textual_header() function.
"""
if any(len(page) != TEXTUAL_HEADER_NUM_BYTES for page in pages):
raise ValueError("Page length must be {} bytes".format(TEXTUAL_HEADER_NUM_BYTES))
for page in pages:
fh.write(page)
_text_stop_pages = {}
def text_stop_page(encoding):
"""Produce a text-stop extended textual header page.
Args:
encoding: Typically 'cp037' for EBCDIC or 'ascii' for ASCII.
"""
if encoding not in _text_stop_pages:
_text_stop_pages[encoding] = (END_TEXT_STANZA + '\r\n') \
.encode(encoding) \
.ljust(TEXTUAL_HEADER_NUM_BYTES, ' '.encode(encoding))
return _text_stop_pages[encoding]
def write_trace_header(fh, trace_header, trace_header_format, pos=None):
"""Write a TraceHeader to file.