mirror of
https://github.com/cool-RR/PySnooper.git
synced 2026-01-23 02:14:04 +00:00
Add support for Ansible zipped source files (#226)
Add support for Ansible zipped source files
This commit is contained in:
parent
bea7c7a965
commit
4e277a5a1f
3 changed files with 232 additions and 0 deletions
|
|
@ -20,6 +20,7 @@ if pycompat.PY2:
|
||||||
|
|
||||||
|
|
||||||
ipython_filename_pattern = re.compile('^<ipython-input-([0-9]+)-.*>$')
|
ipython_filename_pattern = re.compile('^<ipython-input-([0-9]+)-.*>$')
|
||||||
|
ansible_filename_pattern = re.compile(r'^(.+\.zip)[/|\\](ansible[/|\\]modules[/|\\].+\.py)$')
|
||||||
|
|
||||||
|
|
||||||
def get_local_reprs(frame, watch=(), custom_repr=(), max_length=None, normalize=False):
|
def get_local_reprs(frame, watch=(), custom_repr=(), max_length=None, normalize=False):
|
||||||
|
|
@ -67,6 +68,7 @@ def get_path_and_source_from_frame(frame):
|
||||||
source = source.splitlines()
|
source = source.splitlines()
|
||||||
if source is None:
|
if source is None:
|
||||||
ipython_filename_match = ipython_filename_pattern.match(file_name)
|
ipython_filename_match = ipython_filename_pattern.match(file_name)
|
||||||
|
ansible_filename_match = ansible_filename_pattern.match(file_name)
|
||||||
if ipython_filename_match:
|
if ipython_filename_match:
|
||||||
entry_number = int(ipython_filename_match.group(1))
|
entry_number = int(ipython_filename_match.group(1))
|
||||||
try:
|
try:
|
||||||
|
|
@ -77,6 +79,13 @@ def get_path_and_source_from_frame(frame):
|
||||||
source = source_chunk.splitlines()
|
source = source_chunk.splitlines()
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
elif ansible_filename_match:
|
||||||
|
try:
|
||||||
|
import zipfile
|
||||||
|
archive_file = zipfile.ZipFile(ansible_filename_match.group(1), 'r')
|
||||||
|
source = archive_file.read(ansible_filename_match.group(2).replace('\\', '/')).splitlines()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
with open(file_name, 'rb') as fp:
|
with open(file_name, 'rb') as fp:
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,7 @@ import time
|
||||||
import types
|
import types
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
|
import zipfile
|
||||||
|
|
||||||
from pysnooper.utils import truncate
|
from pysnooper.utils import truncate
|
||||||
import pytest
|
import pytest
|
||||||
|
|
@ -1914,3 +1915,153 @@ def test_exception_on_entry():
|
||||||
|
|
||||||
with pytest.raises(TypeError):
|
with pytest.raises(TypeError):
|
||||||
f()
|
f()
|
||||||
|
|
||||||
|
|
||||||
|
def test_valid_zipfile():
|
||||||
|
with mini_toolbox.create_temp_folder(prefix='pysnooper') as folder, \
|
||||||
|
mini_toolbox.TempSysPathAdder(str(folder)):
|
||||||
|
module_name = 'my_valid_zip_module'
|
||||||
|
zip_name = 'valid.zip'
|
||||||
|
zip_base_path = mini_toolbox.pathlib.Path('ansible/modules')
|
||||||
|
python_file_path = folder / zip_name / zip_base_path / ('%s.py' % (module_name))
|
||||||
|
os.makedirs(str(folder / zip_name / zip_base_path))
|
||||||
|
try:
|
||||||
|
sys.path.insert(0, str(folder / zip_name / zip_base_path))
|
||||||
|
content = textwrap.dedent(u'''
|
||||||
|
import pysnooper
|
||||||
|
@pysnooper.snoop(color=False)
|
||||||
|
def f(x):
|
||||||
|
return x
|
||||||
|
''')
|
||||||
|
|
||||||
|
python_file_path.write_text(content)
|
||||||
|
|
||||||
|
module = __import__(module_name)
|
||||||
|
|
||||||
|
with zipfile.ZipFile(str(folder / 'foo_bar.zip'), 'w') as myZipFile:
|
||||||
|
myZipFile.write(str(folder / zip_name / zip_base_path / ('%s.py' % (module_name))), \
|
||||||
|
'%s/%s.py' % (zip_base_path, module_name,), \
|
||||||
|
zipfile.ZIP_DEFLATED)
|
||||||
|
|
||||||
|
python_file_path.unlink()
|
||||||
|
folder.joinpath(zip_name).rename(folder.joinpath('%s.delete' % (zip_name)))
|
||||||
|
folder.joinpath('foo_bar.zip').rename(folder.joinpath(zip_name))
|
||||||
|
|
||||||
|
with mini_toolbox.OutputCapturer(stdout=False,
|
||||||
|
stderr=True) as output_capturer:
|
||||||
|
result = getattr(module, 'f')(7)
|
||||||
|
assert result == 7
|
||||||
|
output = output_capturer.output
|
||||||
|
|
||||||
|
assert_output(
|
||||||
|
output,
|
||||||
|
(
|
||||||
|
SourcePathEntry(),
|
||||||
|
VariableEntry(stage='starting'),
|
||||||
|
CallEntry('def f(x):'),
|
||||||
|
LineEntry('return x'),
|
||||||
|
ReturnEntry('return x'),
|
||||||
|
ReturnValueEntry('7'),
|
||||||
|
ElapsedTimeEntry(),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
finally:
|
||||||
|
sys.path.remove(str(folder / zip_name / zip_base_path))
|
||||||
|
|
||||||
|
|
||||||
|
def test_invalid_zipfile():
|
||||||
|
with mini_toolbox.create_temp_folder(prefix='pysnooper') as folder, \
|
||||||
|
mini_toolbox.TempSysPathAdder(str(folder)):
|
||||||
|
module_name = 'my_invalid_zip_module'
|
||||||
|
zip_name = 'invalid.zip'
|
||||||
|
zip_base_path = mini_toolbox.pathlib.Path('invalid/modules/path')
|
||||||
|
python_file_path = folder / zip_name / zip_base_path / ('%s.py' % (module_name))
|
||||||
|
os.makedirs(str(folder / zip_name / zip_base_path))
|
||||||
|
try:
|
||||||
|
sys.path.insert(0, str(folder / zip_name / zip_base_path))
|
||||||
|
content = textwrap.dedent(u'''
|
||||||
|
import pysnooper
|
||||||
|
@pysnooper.snoop(color=False)
|
||||||
|
def f(x):
|
||||||
|
return x
|
||||||
|
''')
|
||||||
|
python_file_path.write_text(content)
|
||||||
|
|
||||||
|
module = __import__(module_name)
|
||||||
|
|
||||||
|
with zipfile.ZipFile(str(folder / 'foo_bar.zip'), 'w') as myZipFile:
|
||||||
|
myZipFile.write(str(folder / zip_name / zip_base_path / ('%s.py' % (module_name))), \
|
||||||
|
str(zip_base_path / ('%s.py' % (module_name,))), \
|
||||||
|
zipfile.ZIP_DEFLATED)
|
||||||
|
|
||||||
|
python_file_path.unlink()
|
||||||
|
folder.joinpath(zip_name).rename(folder.joinpath('%s.delete' % (zip_name)))
|
||||||
|
folder.joinpath('foo_bar.zip').rename(folder.joinpath(zip_name))
|
||||||
|
|
||||||
|
with mini_toolbox.OutputCapturer(stdout=False,
|
||||||
|
stderr=True) as output_capturer:
|
||||||
|
result = getattr(module, 'f')(7)
|
||||||
|
assert result == 7
|
||||||
|
output = output_capturer.output
|
||||||
|
|
||||||
|
assert_output(
|
||||||
|
output,
|
||||||
|
(
|
||||||
|
SourcePathEntry(),
|
||||||
|
VariableEntry(stage='starting'),
|
||||||
|
CallEntry('SOURCE IS UNAVAILABLE'),
|
||||||
|
LineEntry('SOURCE IS UNAVAILABLE'),
|
||||||
|
ReturnEntry('SOURCE IS UNAVAILABLE'),
|
||||||
|
ReturnValueEntry('7'),
|
||||||
|
ElapsedTimeEntry(),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
finally:
|
||||||
|
sys.path.remove(str(folder / zip_name / zip_base_path))
|
||||||
|
|
||||||
|
|
||||||
|
def test_valid_damaged_zipfile():
|
||||||
|
with mini_toolbox.create_temp_folder(prefix='pysnooper') as folder, \
|
||||||
|
mini_toolbox.TempSysPathAdder(str(folder)):
|
||||||
|
module_name = 'my_damaged_module'
|
||||||
|
zip_name = 'damaged.zip'
|
||||||
|
zip_base_path = mini_toolbox.pathlib.Path('ansible/modules')
|
||||||
|
python_file_path = folder / zip_name / zip_base_path / ('%s.py' % (module_name))
|
||||||
|
os.makedirs(str(folder / zip_name / zip_base_path))
|
||||||
|
try:
|
||||||
|
sys.path.insert(0, str(folder / zip_name / zip_base_path))
|
||||||
|
content = textwrap.dedent(u'''
|
||||||
|
import pysnooper
|
||||||
|
@pysnooper.snoop(color=False)
|
||||||
|
def f(x):
|
||||||
|
return x
|
||||||
|
''')
|
||||||
|
python_file_path.write_text(content)
|
||||||
|
|
||||||
|
module = __import__(module_name)
|
||||||
|
|
||||||
|
python_file_path.unlink()
|
||||||
|
folder.joinpath(zip_name).rename(folder.joinpath('%s.delete' % (zip_name)))
|
||||||
|
|
||||||
|
folder.joinpath(zip_name).write_text(u'I am not a zip file')
|
||||||
|
|
||||||
|
with mini_toolbox.OutputCapturer(stdout=False,
|
||||||
|
stderr=True) as output_capturer:
|
||||||
|
result = getattr(module, 'f')(7)
|
||||||
|
assert result == 7
|
||||||
|
output = output_capturer.output
|
||||||
|
|
||||||
|
assert_output(
|
||||||
|
output,
|
||||||
|
(
|
||||||
|
SourcePathEntry(),
|
||||||
|
VariableEntry(stage='starting'),
|
||||||
|
CallEntry('SOURCE IS UNAVAILABLE'),
|
||||||
|
LineEntry('SOURCE IS UNAVAILABLE'),
|
||||||
|
ReturnEntry('SOURCE IS UNAVAILABLE'),
|
||||||
|
ReturnValueEntry('7'),
|
||||||
|
ElapsedTimeEntry(),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
finally:
|
||||||
|
sys.path.remove(str(folder / zip_name / zip_base_path))
|
||||||
|
|
|
||||||
72
tests/test_utils/test_regex.py
Normal file
72
tests/test_utils/test_regex.py
Normal file
|
|
@ -0,0 +1,72 @@
|
||||||
|
# Copyright 2022 Ram Rachum and collaborators.
|
||||||
|
# This program is distributed under the MIT license.
|
||||||
|
|
||||||
|
import pysnooper
|
||||||
|
from pysnooper.tracer import ansible_filename_pattern
|
||||||
|
|
||||||
|
def test_ansible_filename_pattern():
|
||||||
|
archive_file = '/tmp/ansible_my_module_payload_xyz1234/ansible_my_module_payload.zip'
|
||||||
|
source_code_file = 'ansible/modules/my_module.py'
|
||||||
|
file_name = '%s/%s' % (archive_file, source_code_file)
|
||||||
|
assert ansible_filename_pattern.match(file_name).group(1) == archive_file
|
||||||
|
assert ansible_filename_pattern.match(file_name).group(2) == source_code_file
|
||||||
|
|
||||||
|
archive_file = '/tmp/ansible_my_module_payload_xyz1234/ansible_my_module_with.zip_name.zip'
|
||||||
|
source_code_file = 'ansible/modules/my_module.py'
|
||||||
|
file_name = '%s/%s' % (archive_file, source_code_file)
|
||||||
|
assert ansible_filename_pattern.match(file_name).group(1) == archive_file
|
||||||
|
assert ansible_filename_pattern.match(file_name).group(2) == source_code_file
|
||||||
|
|
||||||
|
archive_file = '/my/new/path/payload.zip'
|
||||||
|
source_code_file = 'ansible/modules/my_module.py'
|
||||||
|
file_name = '%s/%s' % (archive_file, source_code_file)
|
||||||
|
assert ansible_filename_pattern.match(file_name).group(1) == archive_file
|
||||||
|
assert ansible_filename_pattern.match(file_name).group(2) == source_code_file
|
||||||
|
|
||||||
|
archive_file = '/tmp/ansible_my_module_payload_xyz1234/ansible_my_module_payload.zip'
|
||||||
|
source_code_file = 'ansible/modules/in/new/path/my_module.py'
|
||||||
|
file_name = '%s/%s' % (archive_file, source_code_file)
|
||||||
|
assert ansible_filename_pattern.match(file_name).group(1) == archive_file
|
||||||
|
assert ansible_filename_pattern.match(file_name).group(2) == source_code_file
|
||||||
|
|
||||||
|
archive_file = '/tmp/ansible_my_module_payload_xyz1234/ansible_my_module_payload.zip'
|
||||||
|
source_code_file = 'ansible/modules/my_module_is_called_.py.py'
|
||||||
|
file_name = '%s/%s' % (archive_file, source_code_file)
|
||||||
|
assert ansible_filename_pattern.match(file_name).group(1) == archive_file
|
||||||
|
assert ansible_filename_pattern.match(file_name).group(2) == source_code_file
|
||||||
|
|
||||||
|
archive_file = 'C:\\Users\\vagrant\\AppData\\Local\\Temp\\pysnooperw5c2lg35\\valid.zip'
|
||||||
|
source_code_file = 'ansible\\modules\\my_valid_zip_module.py'
|
||||||
|
file_name = '%s\\%s' % (archive_file, source_code_file)
|
||||||
|
assert ansible_filename_pattern.match(file_name).group(1) == archive_file
|
||||||
|
assert ansible_filename_pattern.match(file_name).group(2) == source_code_file
|
||||||
|
|
||||||
|
archive_file = '/tmp/ansible_my_module_payload_xyz1234/ansible_my_module_payload.zip'
|
||||||
|
source_code_file = 'ANSIBLE/modules/my_module.py'
|
||||||
|
file_name = '%s/%s' % (archive_file, source_code_file)
|
||||||
|
assert ansible_filename_pattern.match(file_name) is None
|
||||||
|
|
||||||
|
archive_file = '/tmp/ansible_my_module_payload_xyz1234/ansible_my_module_payload.zip'
|
||||||
|
source_code_file = 'ansible/modules/my_module.PY'
|
||||||
|
file_name = '%s/%s' % (archive_file, source_code_file)
|
||||||
|
assert ansible_filename_pattern.match(file_name) is None
|
||||||
|
|
||||||
|
archive_file = '/tmp/ansible_my_module_payload_xyz1234/ansible_my_module_payload.Zip'
|
||||||
|
source_code_file = 'ansible/modules/my_module.py'
|
||||||
|
file_name = '%s/%s' % (archive_file, source_code_file)
|
||||||
|
assert ansible_filename_pattern.match(file_name) is None
|
||||||
|
|
||||||
|
archive_file = '/tmp/ansible_my_module_payload_xyz1234/ansible_my_module_payload.zip'
|
||||||
|
source_code_file = 'ansible/my_module.py'
|
||||||
|
file_name = '%s/%s' % (archive_file, source_code_file)
|
||||||
|
assert ansible_filename_pattern.match(file_name) is None
|
||||||
|
|
||||||
|
archive_file = '/tmp/ansible_my_module_payload_xyz1234/ansible_my_module_payload.zip'
|
||||||
|
source_code_file = ''
|
||||||
|
file_name = '%s/%s' % (archive_file, source_code_file)
|
||||||
|
assert ansible_filename_pattern.match(file_name) is None
|
||||||
|
|
||||||
|
archive_file = ''
|
||||||
|
source_code_file = 'ansible/modules/my_module.py'
|
||||||
|
file_name = '%s/%s' % (archive_file, source_code_file)
|
||||||
|
assert ansible_filename_pattern.match(file_name) is None
|
||||||
Loading…
Add table
Add a link
Reference in a new issue