From 2ee69544ded62696d14b2c5de03c63d4f79d26fe Mon Sep 17 00:00:00 2001 From: Robert Smallshire Date: Thu, 23 Oct 2014 14:16:17 +0200 Subject: [PATCH] Toolkit functions for writing headers. --- toolkit.py | 110 +++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 102 insertions(+), 8 deletions(-) diff --git a/toolkit.py b/toolkit.py index cb92e8e..9024194 100644 --- a/toolkit.py +++ b/toolkit.py @@ -11,14 +11,18 @@ from binary_reel_header_definition import HEADER_DEF from ibm_float import ibm2ieee, ieee2ibm from revisions import canonicalize_revision from trace_header_definition import TRACE_HEADER_DEF -from util import file_length, batched +from util import file_length, batched, pad, round_up from portability import EMPTY_BYTE_STRING -TEXTUAL_HEADER_NUM_BYTES = 3200 +CARD_LENGTH = 80 +CARDS_PER_HEADER = 40 + +TEXTUAL_HEADER_NUM_BYTES = CARD_LENGTH * CARDS_PER_HEADER BINARY_HEADER_NUM_BYTES = 400 REEL_HEADER_NUM_BYTES = TEXTUAL_HEADER_NUM_BYTES + BINARY_HEADER_NUM_BYTES TRACE_HEADER_NUM_BYTES = 240 +END_TEXT_STANZA = "((SEG: EndText))" def extract_revision(binary_reel_header): """Obtain the SEG Y revision from the reel header. @@ -131,7 +135,7 @@ def read_textual_reel_header(fh, encoding=None): if encoding is None: encoding = guess_encoding(raw_header) - lines = tuple(bytes(raw_line).decode(encoding) for raw_line in batched(raw_header, 80)) + lines = tuple(bytes(raw_line).decode(encoding) for raw_line in batched(raw_header, CARD_LENGTH)) return lines @@ -159,13 +163,13 @@ def has_end_text_stanza(ext_header): """Determine whether the header is the end text stanza. Args: - ext_header: A sequence of forty 80 character Unicode strings. + ext_header: A sequence of forty CARD_LENGTH character Unicode strings. Returns: True if the header is the SEG Y Revision 1 end text header, otherwise False. """ - return "((SEG: EndText))" in ext_header[0] + return END_TEXT_STANZA in ext_header[0] def read_extended_headers_until_end(fh, encoding): @@ -181,7 +185,7 @@ def read_extended_headers_until_end(fh, encoding): Typically 'cp037' for EBCDIC or 'ascii' for ASCII. Returns: - A list of tuples each containing forty 80-character Unicode strings. + A list of tuples each containing forty CARD_LENGTH character Unicode strings. """ extended_headers = [] while True: @@ -210,7 +214,7 @@ def read_extended_headers_counted(fh, num_expected, encoding): Typically 'cp037' for EBCDIC or 'ascii' for ASCII. Returns: - A list of tuples each containing forty 80-character Unicode strings. + A list of tuples each containing forty CARD_LENGTH -character Unicode strings. """ assert num_expected >= 0 extended_headers = [] @@ -472,13 +476,43 @@ def unpack_values(buf, count, item_size, fmt, endian='>'): # swapping ourselves. +def write_textual_reel_header(fh, lines, encoding): + """Write the SEG Y card image header, also known as the textual header + + Args: + fh: A file-like object open in binary mode positioned such that the + beginning of the textual header will be the next byte to read. + + lines: An iterable series of forty lines, each of which must be a + Unicode string of CARD_LENGTH characters. The first three characters + of each line are often "C 1" to "C40" (as required by the SEG Y + standard) although this is not enforced by this function, since + many widespread SEG Y readers and writers do not adhere to this + constraint. To produce a SEG Y compliant series of header lines + consider using the standard_textual_header() function. + + Any lines longer than CARD_LENGTH characters will be truncated without + warning. Any excess lines over CARDS_PER_HEADER will be discarded. Short + or omitted lines will be padded with spaces. + + encoding: Typically 'cp037' for EBCDIC or 'ascii' for ASCII. + """ + padded_lines = [line.encode(encoding).ljust(CARD_LENGTH, ' '.encode(encoding))[:CARD_LENGTH] + for line in pad(lines, padding='', size=CARDS_PER_HEADER)] + header = ''.join(padded_lines) + assert len(header) == 3200 + fh.write(header) + + def write_binary_reel_header(fh, binary_reel_header, endian='>'): """Write the binary_reel_header to the given file-like object. Args: fh: A file-like object open in binary mode for writing. - binary_reel_header: A dictionary of + binary_reel_header: A dictionary of values using a subset of the keys + in binary_reel_header_definition.HEADER_DEF associated with + compatible values. """ for key in HEADER_DEF: pos = HEADER_DEF[key]['pos'] @@ -487,6 +521,66 @@ def write_binary_reel_header(fh, binary_reel_header, endian='>'): write_binary_values(fh, [value], ctype, pos) +def page_buffer(padded_buffer, page_size): + return [padded_buffer[i:i + page_size] for i in + range(0, len(padded_buffer), page_size)] + + +def format_extended_textual_header(text, encoding, include_text_stop=False): + """Format an extended textual header into 3200 byte pages. + + Args: + text: A Unicode string to be written to the extended headers. + + encoding: Typically 'cp037' for EBCDIC or 'ascii' for ASCII. + + include_text_stop: If True, a text-stop header will be written. + + Returns: + A sequence of byte strings, each of which will be exactly 3200 bytes in length. + """ + buffer = text.encode(encoding) + padded_buffer = buffer.ljust(round_up(len(buffer), TEXTUAL_HEADER_NUM_BYTES), ' '.encode(encoding)) + pages = page_buffer(padded_buffer, TEXTUAL_HEADER_NUM_BYTES) + + if include_text_stop: + pages.append(text_stop_page(encoding)) + return pages + + +def write_extended_textual_headers(fh, pages): + """Write extended textual headers. + + Args: + fh: fh: A file-like object open in binary mode for writing. + + pages: A sequence of byte strings each of which is exactly + TEXTUAL_HEADER_NUM_BYTES in length. To produce such a + sequence of pages, consider calling the + format_extended_textual_header() function. + """ + if any(len(page) != TEXTUAL_HEADER_NUM_BYTES for page in pages): + raise ValueError("Page length must be {} bytes".format(TEXTUAL_HEADER_NUM_BYTES)) + for page in pages: + fh.write(page) + + +_text_stop_pages = {} + + +def text_stop_page(encoding): + """Produce a text-stop extended textual header page. + + Args: + encoding: Typically 'cp037' for EBCDIC or 'ascii' for ASCII. + """ + if encoding not in _text_stop_pages: + _text_stop_pages[encoding] = (END_TEXT_STANZA + '\r\n') \ + .encode(encoding) \ + .ljust(TEXTUAL_HEADER_NUM_BYTES, ' '.encode(encoding)) + return _text_stop_pages[encoding] + + def write_trace_header(fh, trace_header, trace_header_format, pos=None): """Write a TraceHeader to file.