Content-Length: 457476 | pFad | http://github.com/postgrespro/testgres/pull/272/files

2D A support of 'cwd' parameter is added to OsOperations::exec_command by dmitry-lipetsk · Pull Request #272 · postgrespro/testgres · GitHub
Skip to content

A support of 'cwd' parameter is added to OsOperations::exec_command #272

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 38 additions & 8 deletions testgres/operations/local_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,13 @@ def _process_output(encoding, temp_file_path):
output = output.decode(encoding)
return output, None # In Windows stderr writing in stdout

def _run_command__nt(self, cmd, shell, input, stdin, stdout, stderr, get_process, timeout, encoding, exec_env=None):
def _run_command__nt(
self, cmd, shell, input, stdin, stdout, stderr, get_process, timeout, encoding,
exec_env: typing.Optional[dict],
cwd: typing.Optional[str],
):
assert exec_env is None or type(exec_env) == dict # noqa: E721
assert cwd is None or type(cwd) == str # noqa: E721

# TODO: why don't we use the data from input?

Expand Down Expand Up @@ -104,6 +109,7 @@ def _run_command__nt(self, cmd, shell, input, stdin, stdout, stderr, get_process
stdin=stdin or subprocess.PIPE if input is not None else None,
stdout=stdout,
stderr=stderr,
cwd=cwd,
**extParams,
)
if get_process:
Expand All @@ -116,8 +122,13 @@ def _run_command__nt(self, cmd, shell, input, stdin, stdout, stderr, get_process
output, error = self._process_output(encoding, temp_file_path)
return process, output, error

def _run_command__generic(self, cmd, shell, input, stdin, stdout, stderr, get_process, timeout, encoding, exec_env=None):
def _run_command__generic(
self, cmd, shell, input, stdin, stdout, stderr, get_process, timeout, encoding,
exec_env: typing.Optional[dict],
cwd: typing.Optional[str],
):
assert exec_env is None or type(exec_env) == dict # noqa: E721
assert cwd is None or type(cwd) == str # noqa: E721

input_prepared = None
if not get_process:
Expand Down Expand Up @@ -154,6 +165,7 @@ def _run_command__generic(self, cmd, shell, input, stdin, stdout, stderr, get_pr
stdin=stdin or subprocess.PIPE if input is not None else None,
stdout=stdout or subprocess.PIPE,
stderr=stderr or subprocess.PIPE,
cwd=cwd,
**extParams
)
assert not (process is None)
Expand All @@ -173,26 +185,44 @@ def _run_command__generic(self, cmd, shell, input, stdin, stdout, stderr, get_pr
error = error.decode(encoding)
return process, output, error

def _run_command(self, cmd, shell, input, stdin, stdout, stderr, get_process, timeout, encoding, exec_env=None):
def _run_command(
self, cmd, shell, input, stdin, stdout, stderr, get_process, timeout, encoding,
exec_env: typing.Optional[dict],
cwd: typing.Optional[str],
):
"""Execute a command and return the process and its output."""

assert exec_env is None or type(exec_env) == dict # noqa: E721
assert cwd is None or type(cwd) == str # noqa: E721

if os.name == 'nt' and stdout is None: # Windows
method = __class__._run_command__nt
else: # Other OS
method = __class__._run_command__generic

return method(self, cmd, shell, input, stdin, stdout, stderr, get_process, timeout, encoding, exec_env=exec_env)
return method(self, cmd, shell, input, stdin, stdout, stderr, get_process, timeout, encoding, exec_env, cwd)

def exec_command(self, cmd, wait_exit=False, verbose=False, expect_error=False, encoding=None, shell=False,
text=False, input=None, stdin=None, stdout=None, stderr=None, get_process=False, timeout=None,
ignore_errors=False, exec_env=None):
def exec_command(
self, cmd, wait_exit=False, verbose=False, expect_error=False, encoding=None, shell=False,
text=False, input=None, stdin=None, stdout=None, stderr=None, get_process=False, timeout=None,
ignore_errors=False,
exec_env: typing.Optional[dict] = None,
cwd: typing.Optional[str] = None
):
"""
Execute a command in a subprocess and handle the output based on the provided parameters.
"""
assert type(expect_error) == bool # noqa: E721
assert type(ignore_errors) == bool # noqa: E721
assert exec_env is None or type(exec_env) == dict # noqa: E721
assert cwd is None or type(cwd) == str # noqa: E721

process, output, error = self._run_command(
cmd, shell, input, stdin, stdout, stderr, get_process, timeout, encoding,
exec_env,
cwd
)

process, output, error = self._run_command(cmd, shell, input, stdin, stdout, stderr, get_process, timeout, encoding, exec_env=exec_env)
if get_process:
return process

Expand Down
60 changes: 44 additions & 16 deletions testgres/operations/remote_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,13 @@ def __init__(self, conn_params: ConnectionParams):
def __enter__(self):
return self

def exec_command(self, cmd, wait_exit=False, verbose=False, expect_error=False,
encoding=None, shell=True, text=False, input=None, stdin=None, stdout=None,
stderr=None, get_process=None, timeout=None, ignore_errors=False,
exec_env=None):
def exec_command(
self, cmd, wait_exit=False, verbose=False, expect_error=False,
encoding=None, shell=True, text=False, input=None, stdin=None, stdout=None,
stderr=None, get_process=None, timeout=None, ignore_errors=False,
exec_env: typing.Optional[dict] = None,
cwd: typing.Optional[str] = None
):
"""
Execute a command in the SSH session.
Args:
Expand All @@ -74,28 +77,29 @@ def exec_command(self, cmd, wait_exit=False, verbose=False, expect_error=False,
assert type(expect_error) == bool # noqa: E721
assert type(ignore_errors) == bool # noqa: E721
assert exec_env is None or type(exec_env) == dict # noqa: E721
assert cwd is None or type(cwd) == str # noqa: E721

input_prepared = None
if not get_process:
input_prepared = Helpers.PrepareProcessInput(input, encoding) # throw

assert input_prepared is None or (type(input_prepared) == bytes) # noqa: E721

if type(cmd) == str: # noqa: E721
cmd_s = cmd
elif type(cmd) == list: # noqa: E721
cmd_s = subprocess.list2cmdline(cmd)
else:
raise ValueError("Invalid 'cmd' argument type - {0}".format(type(cmd).__name__))
cmds = []

assert type(cmd_s) == str # noqa: E721
if cwd is not None:
assert type(cwd) == str # noqa: E721
cmds.append(__class__._build_cmdline(["cd", cwd]))

cmds.append(__class__._build_cmdline(cmd, exec_env))

cmd_items = __class__._make_exec_env_list(exec_env=exec_env)
cmd_items.append(cmd_s)
assert len(cmds) >= 1

env_cmd_s = ';'.join(cmd_items)
cmdline = ";".join(cmds)
assert type(cmdline) == str # noqa: E721
assert cmdline != ""

ssh_cmd = ['ssh', self.ssh_dest] + self.ssh_args + [env_cmd_s]
ssh_cmd = ['ssh', self.ssh_dest] + self.ssh_args + [cmdline]

process = subprocess.Popen(ssh_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
assert not (process is None)
Expand Down Expand Up @@ -710,7 +714,31 @@ def _is_port_free__process_1(error: str) -> bool:
return True

@staticmethod
def _make_exec_env_list(exec_env: typing.Dict) -> typing.List[str]:
def _build_cmdline(cmd, exec_env: typing.Dict = None) -> str:
cmd_items = __class__._create_exec_env_list(exec_env)

assert type(cmd_items) == list # noqa: E721

cmd_items.append(__class__._ensure_cmdline(cmd))

cmdline = ';'.join(cmd_items)
assert type(cmdline) == str # noqa: E721
return cmdline

@staticmethod
def _ensure_cmdline(cmd) -> typing.List[str]:
if type(cmd) == str: # noqa: E721
cmd_s = cmd
elif type(cmd) == list: # noqa: E721
cmd_s = subprocess.list2cmdline(cmd)
else:
raise ValueError("Invalid 'cmd' argument type - {0}".format(type(cmd).__name__))

assert type(cmd_s) == str # noqa: E721
return cmd_s

@staticmethod
def _create_exec_env_list(exec_env: typing.Dict) -> typing.List[str]:
env: typing.Dict[str, str] = dict()

# ---------------------------------- SYSTEM ENV
Expand Down
17 changes: 17 additions & 0 deletions tests/test_os_ops_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,23 @@ def test_exec_command_with_exec_env(self, os_ops: OsOperations):
assert type(response) == bytes # noqa: E721
assert response == b'\n'

def test_exec_command_with_cwd(self, os_ops: OsOperations):
assert isinstance(os_ops, OsOperations)

RunConditions.skip_if_windows()

cmd = ["pwd"]

response = os_ops.exec_command(cmd, cwd="/tmp")
assert response is not None
assert type(response) == bytes # noqa: E721
assert response == b'/tmp\n'

response = os_ops.exec_command(cmd)
assert response is not None
assert type(response) == bytes # noqa: E721
assert response != b'/tmp\n'

def test_exec_command__test_unset(self, os_ops: OsOperations):
assert isinstance(os_ops, OsOperations)

Expand Down








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: http://github.com/postgrespro/testgres/pull/272/files

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy