Something like this is probably what we want. First some definitions, simplified here, should include everything (`mrd_types.py`):
```python=
from enum import Enum
from typing import Union
import ismrmrd
class MessageType(Enum):
HEADER = 3
CLOSE = 4
ACQUISITION = 1008
IMAGE = 1022
Message = Union[ismrmrd.xsd.ismrmrdHeader, ismrmrd.Acquisition, ismrmrd.Image]
```
Then a source:
```python=
import struct
from typing import BinaryIO, Iterable
from mrd_types import MessageType, Message
import ismrmrd
import ismrmrd.xsd
class IsmrmrdSource():
def __init__(self, input_reader: BinaryIO) -> None:
super().__init__()
self._input_reader = input_reader
def __iter__(self) -> Iterable[Message]:
while True:
message_type = MessageType(struct.unpack('H', self._input_reader.read(2))[0])
if message_type == MessageType.HEADER:
header_size = struct.unpack('I', self._input_reader.read(4))[0]
doc = self._input_reader.read(header_size)
yield ismrmrd.xsd.CreateFromDocument(doc)
elif message_type == MessageType.ACQUISITION:
yield ismrmrd.Acquisition.deserialize_from(self._input_reader.read)
elif message_type == MessageType.IMAGE:
yield ismrmrd.Image.deserialize_from(self._input_reader.read)
elif message_type == MessageType.CLOSE:
if len(self._input_reader.read(1)) != 0:
raise Exception("Unexpected data after close message")
return
else:
raise Exception(f"Unexpected message {message_type}")
```
And a sink:
```python=
class IsmrmrdSink():
def __init__(self, output_writer: BinaryIO) -> None:
super().__init__()
self._output_writer = output_writer
def _write_type(self, message_type: MessageType) -> None:
self._output_writer.write(struct.pack('H', message_type.value))
def write(self, seq: Iterable[Message]) -> None:
for message in seq:
if isinstance(message, ismrmrdHeader):
self._write_type(MessageType.HEADER)
xml_string = ismrmrd.xsd.ToXML(message, 'utf-8')
xml_bytes = bytes(xml_string, 'utf-8')
self._output_writer.write(struct.pack('I', len(xml_bytes)))
self._output_writer.write(xml_bytes)
elif isinstance(message, Acquisition):
self._write_type(MessageType.ACQUISITION)
message.serialize_into(self._output_writer.write)
elif isinstance(message, Image):
self._write_type(MessageType.IMAGE)
message.serialize_into(self._output_writer.write)
else:
raise Exception(f"Unexpected message type {type(message)}")
self._write_type(MessageType.CLOSE)
self._output_writer.flush()
return
```
This has not been tested, but should be the basic idea.