Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 37 additions & 4 deletions nsv/ensv.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
"""ENSV -- metadata layer over NSV."""

from typing import List, Iterable

from .reader import Reader
from .writer import Writer
from .reader import Reader as NSVReader
from .writer import Writer as NSVWriter


def lift(seqseq: Iterable[Iterable[str]]) -> List[str]:
Expand All @@ -18,7 +20,7 @@ def lift(seqseq: Iterable[Iterable[str]]) -> List[str]:
if not first:
result.append('')
for cell in row:
result.append(Writer.escape(cell))
result.append(NSVWriter.escape(cell))
first = False
return result

Expand All @@ -37,9 +39,40 @@ def unlift(seq: Iterable[str]) -> List[List[str]]:
row = []
for element in seq:
if element != '':
row.append(Reader.unescape(element))
row.append(NSVReader.unescape(element))
else:
rows.append(row)
row = []
rows.append(row)
return rows


class Reader:
def __init__(self, file_obj):
self._inner = NSVReader(file_obj)
self.meta = []
for row in self._inner:
if not row:
break
self.meta.append(row)

def __iter__(self):
return self

def __next__(self):
return next(self._inner)


class Writer:
def __init__(self, file_obj):
self._inner = NSVWriter(file_obj)

def write_meta(self, meta):
self._inner.write_rows(meta)
self._inner.write_row([])

def write_row(self, row):
self._inner.write_row(row)

def write_rows(self, rows):
self._inner.write_rows(rows)
114 changes: 114 additions & 0 deletions tests/test_ensv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import io
import unittest

from nsv.ensv import Reader, Writer


class TestReader(unittest.TestCase):

def _make(self, text):
return Reader(io.StringIO(text))

def test_meta_and_data(self):
# meta: [columns: a b] [types: str int], separator, data: [Alice 100] [Bob 200]
r = self._make('columns:\na\nb\n\ntypes:\nstr\nint\n\n\nAlice\n100\n\nBob\n200\n\n')
self.assertEqual(r.meta, [['columns:', 'a', 'b'], ['types:', 'str', 'int']])
self.assertEqual(list(r), [['Alice', '100'], ['Bob', '200']])

def test_single_meta_row(self):
r = self._make('columns:\nx\n\n\nhello\n\n')
self.assertEqual(r.meta, [['columns:', 'x']])
self.assertEqual(list(r), [['hello']])

def test_no_data(self):
r = self._make('columns:\na\n\n\n')
self.assertEqual(r.meta, [['columns:', 'a']])
self.assertEqual(list(r), [])

def test_empty_meta(self):
# empty row right away = no meta forms
r = self._make('\nhello\n\n')
self.assertEqual(r.meta, [])
self.assertEqual(list(r), [['hello']])

def test_meta_preserved_for_later(self):
r = self._make('columns:\nname\n\ntypes:\nstr\n\n\nAlice\n\n')
meta = r.meta
_ = list(r)
self.assertEqual(meta, [['columns:', 'name'], ['types:', 'str']])
self.assertIs(r.meta, meta)

def test_streaming(self):
r = self._make('columns:\nx\n\n\n1\n\n2\n\n3\n\n')
self.assertEqual(r.meta, [['columns:', 'x']])
self.assertEqual(next(r), ['1'])
self.assertEqual(next(r), ['2'])
self.assertEqual(next(r), ['3'])

def test_no_separator_all_meta(self):
# no empty row separator = everything is meta
r = self._make('a\nb\n\nc\nd\n\n')
self.assertEqual(r.meta, [['a', 'b'], ['c', 'd']])
self.assertEqual(list(r), [])


class TestWriter(unittest.TestCase):

def _write(self, meta, data):
buf = io.StringIO()
w = Writer(buf)
w.write_meta(meta)
w.write_rows(data)
return buf.getvalue()

def test_meta_and_data(self):
text = self._write(
[['columns:', 'a', 'b']],
[['1', '2'], ['3', '4']],
)
r = Reader(io.StringIO(text))
self.assertEqual(r.meta, [['columns:', 'a', 'b']])
self.assertEqual(list(r), [['1', '2'], ['3', '4']])

def test_multiple_meta_rows(self):
text = self._write(
[['columns:', 'x'], ['types:', 'int']],
[['42']],
)
r = Reader(io.StringIO(text))
self.assertEqual(r.meta, [['columns:', 'x'], ['types:', 'int']])
self.assertEqual(list(r), [['42']])

def test_empty_data(self):
text = self._write([['columns:', 'a']], [])
r = Reader(io.StringIO(text))
self.assertEqual(r.meta, [['columns:', 'a']])
self.assertEqual(list(r), [])

def test_write_row_by_row(self):
buf = io.StringIO()
w = Writer(buf)
w.write_meta([['columns:', 'x']])
w.write_row(['a'])
w.write_row(['b'])
r = Reader(io.StringIO(buf.getvalue()))
self.assertEqual(r.meta, [['columns:', 'x']])
self.assertEqual(list(r), [['a'], ['b']])


class TestRoundTrip(unittest.TestCase):

def test_roundtrip(self):
meta = [['columns:', 'name', 'score'], ['types:', 'str', 'int']]
data = [['Alice', '100'], ['Bob', '200']]
buf = io.StringIO()
w = Writer(buf)
w.write_meta(meta)
w.write_rows(data)
r = Reader(io.StringIO(buf.getvalue()))
self.assertEqual(r.meta, meta)
self.assertEqual(list(r), data)


if __name__ == '__main__':
unittest.main()
Loading