|
| 1 | +import threading |
| 2 | +from typing import List |
| 3 | +import base64 |
| 4 | + |
| 5 | +from .SnapshotValue import SnapshotValue |
| 6 | +from .ConvertToWindowsNewlines import ConvertToWindowsNewlines |
| 7 | +from .ParseException import ParseException |
| 8 | +from .SnapshotReader import SnapshotReader |
| 9 | +from .SnapshotValueReader import SnapshotValueReader |
| 10 | + |
| 11 | + |
| 12 | + |
| 13 | +class SnapshotFile: |
| 14 | + HEADER_PREFIX = "📷 " |
| 15 | + END_OF_FILE = "[end of file]" |
| 16 | + |
| 17 | + def __init__(self): |
| 18 | + self.unix_newlines = True |
| 19 | + self.metadata = None |
| 20 | + self._snapshots = {} |
| 21 | + self._lock = threading.Lock() |
| 22 | + self._was_set_at_test_time = False |
| 23 | + |
| 24 | + @property |
| 25 | + def snapshots(self): |
| 26 | + return self._snapshots |
| 27 | + |
| 28 | + @snapshots.setter |
| 29 | + def snapshots(self, value): |
| 30 | + with self._lock: |
| 31 | + self._snapshots = value |
| 32 | + |
| 33 | + @property |
| 34 | + def was_set_at_test_time(self): |
| 35 | + return self._was_set_at_test_time |
| 36 | + |
| 37 | + def set_at_test_time(self, key, snapshot): |
| 38 | + with self._lock: |
| 39 | + old_snapshots = self._snapshots.copy() |
| 40 | + self._snapshots[key] = snapshot |
| 41 | + self._was_set_at_test_time = True if self._snapshots != old_snapshots else self._was_set_at_test_time |
| 42 | + |
| 43 | + def serialize(self, value_writer_raw): |
| 44 | + value_writer = value_writer_raw if self.unix_newlines else ConvertToWindowsNewlines(value_writer_raw) |
| 45 | + if self.metadata: |
| 46 | + self.write_entry(value_writer, f"📷 {self.metadata[0]}", None, SnapshotValue.of(self.metadata[1])) |
| 47 | + for key, snapshot in self._snapshots.items(): |
| 48 | + self.write_entry(value_writer, key, None, snapshot.subject) |
| 49 | + for facet_key, facet_value in snapshot.facets.items(): |
| 50 | + self.write_entry(value_writer, key, facet_key, facet_value) |
| 51 | + self.write_entry(value_writer, "", "end of file", SnapshotValue.of("")) |
| 52 | + |
| 53 | + def write_entry(value_writer, key, facet, value): |
| 54 | + value_writer.write("╔═ ") |
| 55 | + value_writer.write(SnapshotValueReader.nameEsc.escape(key)) |
| 56 | + if facet is not None: |
| 57 | + value_writer.write("[") |
| 58 | + value_writer.write(SnapshotValueReader.nameEsc.escape(facet)) |
| 59 | + value_writer.write("]") |
| 60 | + value_writer.write(" ═╗") |
| 61 | + if value.is_binary: |
| 62 | + binary_length = len(value.value_binary()) |
| 63 | + value_writer.write(f" base64 length {binary_length} bytes") |
| 64 | + value_writer.write("\n") |
| 65 | + |
| 66 | + if key == "" and facet == "end of file": |
| 67 | + return |
| 68 | + |
| 69 | + if value.is_binary: |
| 70 | + # Base64 encoding and replacing \r with an empty string |
| 71 | + binary_data = value.value_binary() |
| 72 | + encoded = base64.b64encode(binary_data).decode('utf-8') |
| 73 | + # Assuming efficientReplace is a more efficient method for replacing characters |
| 74 | + # Here, we just use the regular replace method for simplicity |
| 75 | + escaped = encoded.replace("\r", "") |
| 76 | + value_writer.write(escaped) |
| 77 | + else: |
| 78 | + # For string values, applying specific escape logic and then replacing "\n╔" with a special sequence |
| 79 | + text_data = value.value_string() |
| 80 | + # Assuming body_escape function handles the escaping as in SnapshotValueReader.bodyEsc.escape |
| 81 | + escaped = body_escape(text_data).replace("\n╔", "\n\uDF41") |
| 82 | + value_writer.write(escaped) |
| 83 | + value_writer.write("\n") |
| 84 | + |
| 85 | + @staticmethod |
| 86 | + def parse(value_reader): |
| 87 | + try: |
| 88 | + result = SnapshotFile() |
| 89 | + result.unix_newlines = value_reader.unix_newlines |
| 90 | + reader = SnapshotReader(value_reader) |
| 91 | + |
| 92 | + # Check if the first value starts with 📷 |
| 93 | + if reader.peek_key() and reader.peek_key().startswith(SnapshotFile.HEADER_PREFIX): |
| 94 | + metadata_name = reader.peek_key()[len(SnapshotFile.HEADER_PREFIX):] |
| 95 | + metadata_value = reader.value_reader.next_value().value_string() |
| 96 | + # Assuming 'entry' function creates a dictionary entry in Python |
| 97 | + result.metadata = (metadata_name, metadata_value) |
| 98 | + |
| 99 | + while reader.peek_key() is not None: |
| 100 | + key = reader.peek_key() |
| 101 | + snapshot = reader.next_snapshot() |
| 102 | + # Update snapshots dictionary with new key-value pair |
| 103 | + result.snapshots.update({key: snapshot}) |
| 104 | + |
| 105 | + return result |
| 106 | + |
| 107 | + except ValueError as e: |
| 108 | + if isinstance(e, ParseException): |
| 109 | + raise e |
| 110 | + else: |
| 111 | + raise ParseException(value_reader.line_reader, e) from None |
| 112 | + |
| 113 | + |
| 114 | + @staticmethod |
| 115 | + def create_empty_with_unix_newlines(unix_newlines): |
| 116 | + result = SnapshotFile() |
| 117 | + result.unix_newlines = unix_newlines |
| 118 | + return result |
| 119 | + |
| 120 | + def remove_all_indices(self, indices: List[int]): |
| 121 | + if not indices: |
| 122 | + return |
| 123 | + self._was_set_at_test_time = True |
| 124 | + self.snapshots = self.snapshots.minus_sorted_indices(indices) |
0 commit comments