diff --git a/tests/test_pysnooper.py b/tests/test_pysnooper.py index 78dbc80..3a80d52 100644 --- a/tests/test_pysnooper.py +++ b/tests/test_pysnooper.py @@ -8,6 +8,8 @@ import time import types import os import sys +import zipfile +import importlib from pysnooper.utils import truncate import pytest @@ -1914,3 +1916,147 @@ def test_exception_on_entry(): with pytest.raises(TypeError): f() + + +def test_valid_zipfile(): + with mini_toolbox.create_temp_folder(prefix='pysnooper') as folder, \ + mini_toolbox.TempSysPathAdder(str(folder)): + module_name = 'my_valid_zip_module' + zip_name = 'valid.zip' + zip_base_path = 'ansible/modules' + python_file_path = folder / ('%s/%s/%s.py' % (zip_name, zip_base_path, module_name)) + os.makedirs(folder / ('%s/%s' % (zip_name, zip_base_path))) + sys.path.insert(0, str(folder / (zip_name))) + content = textwrap.dedent(u''' + import pysnooper + @pysnooper.snoop(color=False) + def f(x): + return x + ''') + with python_file_path.open('w') as python_file: + python_file.write(content) + + module = importlib.import_module('%s.%s' % ('.'.join(zip_base_path.split('/')), module_name)) + + with zipfile.ZipFile(folder / 'foo_bar.zip', 'w') as myZipFile: + myZipFile.write(folder.joinpath('%s/%s/%s.py' % (zip_name, zip_base_path, module_name)), '%s/%s.py' % (zip_base_path, module_name,), zipfile.ZIP_DEFLATED) + + python_file_path.unlink() + os.rename(folder.joinpath(zip_name), folder.joinpath('%s.delete' % (zip_name)),) + os.rename(folder / 'foo_bar.zip', folder.joinpath(zip_name),) + + with mini_toolbox.OutputCapturer(stdout=False, + stderr=True) as output_capturer: + result = getattr(module, 'f')(7) + assert result == 7 + output = output_capturer.output + + assert_output( + output, + ( + SourcePathEntry(), + VariableEntry(stage='starting'), + CallEntry('def f(x):'), + LineEntry('return x'), + ReturnEntry('return x'), + ReturnValueEntry('7'), + ElapsedTimeEntry(), + ) + ) + + +def test_invalid_zipfile(): + with mini_toolbox.create_temp_folder(prefix='pysnooper') as folder, \ + mini_toolbox.TempSysPathAdder(str(folder)): + module_name = 'my_invalid_zip_module' + zip_name = 'invalid.zip' + zip_base_path = 'invalid/modules/path' + python_file_path = folder / ('%s/%s/%s.py' % (zip_name, zip_base_path, module_name)) + os.makedirs(folder / ('%s/%s' % (zip_name, zip_base_path))) + sys.path.insert(0, str(folder / (zip_name))) + content = textwrap.dedent(u''' + import pysnooper + @pysnooper.snoop(color=False) + def f(x): + return x + ''') + with python_file_path.open('w') as python_file: + python_file.write(content) + + module = importlib.import_module('%s.%s' % ('.'.join(zip_base_path.split('/')), module_name)) + + with zipfile.ZipFile(folder / 'foo_bar.zip', 'w') as myZipFile: + myZipFile.write(folder.joinpath('%s/%s/%s.py' % (zip_name, zip_base_path, module_name)), '%s/%s.py' % (zip_base_path, module_name,), zipfile.ZIP_DEFLATED) + + python_file_path.unlink() + os.rename(folder.joinpath(zip_name), folder.joinpath('%s.delete' % (zip_name)),) + os.rename(folder / 'foo_bar.zip', folder.joinpath(zip_name),) + + with mini_toolbox.OutputCapturer(stdout=False, + stderr=True) as output_capturer: + result = getattr(module, 'f')(7) + assert result == 7 + output = output_capturer.output + + assert_output( + output, + ( + SourcePathEntry(), + VariableEntry(stage='starting'), + CallEntry('SOURCE IS UNAVAILABLE'), + LineEntry('SOURCE IS UNAVAILABLE'), + ReturnEntry('SOURCE IS UNAVAILABLE'), + ReturnValueEntry('7'), + ElapsedTimeEntry(), + ) + ) + + +def test_valid_damaged_zipfile(): + with mini_toolbox.create_temp_folder(prefix='pysnooper') as folder, \ + mini_toolbox.TempSysPathAdder(str(folder)): + module_name = 'my_damaged_module' + zip_name = 'damaged.zip' + zip_base_path = 'ansible/modules' + python_file_path = folder / ('%s/%s/%s.py' % (zip_name, zip_base_path, module_name)) + os.makedirs(folder / ('%s/%s' % (zip_name, zip_base_path))) + sys.path.insert(0, str(folder / (zip_name))) + content = textwrap.dedent(u''' + import pysnooper + @pysnooper.snoop(color=False) + def f(x): + return x + ''') + with python_file_path.open('w') as python_file: + python_file.write(content) + + module = importlib.import_module('%s.%s' % ('.'.join(zip_base_path.split('/')), module_name)) + + python_file_path.unlink() + os.rename(folder.joinpath(zip_name), folder.joinpath('%s.delete' % (zip_name)),) + + content_damaged_zip = textwrap.dedent(u''' + I am not a zip file + ''') + + with folder.joinpath(zip_name).open('w') as damaged_zip_file: + damaged_zip_file.write(content_damaged_zip) + + with mini_toolbox.OutputCapturer(stdout=False, + stderr=True) as output_capturer: + result = getattr(module, 'f')(7) + assert result == 7 + output = output_capturer.output + + assert_output( + output, + ( + SourcePathEntry(), + VariableEntry(stage='starting'), + CallEntry('SOURCE IS UNAVAILABLE'), + LineEntry('SOURCE IS UNAVAILABLE'), + ReturnEntry('SOURCE IS UNAVAILABLE'), + ReturnValueEntry('7'), + ElapsedTimeEntry(), + ) + )