Content-Length: 700896 | pFad | http://github.com/postgrespro/testgres/pull/255/files

62 [#247] PortManager__Generic uses lock-dirs for reserved ports by dmitry-lipetsk · Pull Request #255 · postgrespro/testgres · GitHub
Skip to content

[#247] PortManager__Generic uses lock-dirs for reserved ports #255

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
99e645e
[#247] PortManager__Generic uses lock-dirs for reserved ports
dmitry-lipetsk May 6, 2025
c6f4b4d
PortManager__Generic is refactored
dmitry-lipetsk May 7, 2025
f085b70
[#256] A used port range is [1024 ... 65535]
dmitry-lipetsk May 7, 2025
c9b4bbf
PortManager__Generic is refactored [consts, asserts]
dmitry-lipetsk May 7, 2025
862c79f
Merge branch 'master' into master-fix247--v001
dmitry-lipetsk May 7, 2025
b5e6f25
Merge branch 'master' into master-fix247--v001
dmitry-lipetsk May 7, 2025
be3cc11
Merge branch 'master' into master-fix247--v001
dmitry-lipetsk May 12, 2025
cc8333c
Merge branch 'master' into master-fix247--v001
dmitry-lipetsk May 12, 2025
d15ecdb
PortManager__Generic sends debug messages about its operations.
dmitry-lipetsk May 29, 2025
8e0869d
[attention] OsOperations::create_lock_fs_obj is added
dmitry-lipetsk May 31, 2025
82b46b3
Code style is fixed
dmitry-lipetsk Jun 1, 2025
a188a63
Merge branch 'master' into master-fix247--v001
dmitry-lipetsk Jun 22, 2025
597f699
PortManager__Generic is synchronized with master
dmitry-lipetsk Jun 24, 2025
b34837e
[FIX] PortManager__Generic::release_port sends a debug message under …
dmitry-lipetsk Jun 24, 2025
0648926
PortManager__Generic::reserve_port is updated (reordered)
dmitry-lipetsk Jun 24, 2025
8c5e340
PortManager__Generic::helper__send_debug_msg is corrected (assert)
dmitry-lipetsk Jun 24, 2025
ea25215
PortManager__Generic::reserve_port is updated (comment)
dmitry-lipetsk Jun 24, 2025
aa677d2
PortManager__Generic::__init__ is updated
dmitry-lipetsk Jun 24, 2025
1dfac90
Merge remote-tracking branch 'pgpro/master' into master-fix247--v001
dmitry-lipetsk Jun 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions testgres/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
from .utils import \
reserve_port, \
release_port, \
bound_ports, \
get_bin_path, \
get_pg_config, \
get_pg_version
Expand All @@ -51,6 +50,7 @@
from .config import testgres_config

from .operations.os_ops import OsOperations, ConnectionParams
from .operations.os_ops import OsLockObj
from .operations.local_ops import LocalOperations
from .operations.remote_ops import RemoteOperations

Expand All @@ -64,7 +64,8 @@
"XLogMethod", "IsolationLevel", "NodeStatus", "ProcessType", "DumpFormat",
"PostgresNode", "NodeApp",
"PortManager",
"reserve_port", "release_port", "bound_ports", "get_bin_path", "get_pg_config", "get_pg_version",
"reserve_port", "release_port", "get_bin_path", "get_pg_config", "get_pg_version",
"First", "Any",
"OsOperations", "LocalOperations", "RemoteOperations", "ConnectionParams"
"OsOperations", "LocalOperations", "RemoteOperations", "ConnectionParams",
"OsLockObj",
]
4 changes: 4 additions & 0 deletions testgres/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
TMP_CACHE = 'tgsc_'
TMP_BACKUP = 'tgsb_'

TMP_TESTGRES = "testgres"

TMP_TESTGRES_PORTS = TMP_TESTGRES + "/ports"

# path to control file
XLOG_CONTROL_FILE = "global/pg_control"

Expand Down
58 changes: 51 additions & 7 deletions testgres/impl/port_manager__generic.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from ..operations.os_ops import OsOperations
from ..operations.os_ops import OsLockObj

from ..port_manager import PortManager
from ..exceptions import PortForException
from .. import consts

import os
import threading
import random
import typing
Expand All @@ -17,7 +20,9 @@ class PortManager__Generic(PortManager):
_guard: object
# TODO: is there better to use bitmap fot _available_ports?
_available_ports: typing.Set[int]
_reserved_ports: typing.Set[int]
_reserved_ports: typing.Dict[int, OsLockObj]

_lock_dir: str

def __init__(self, os_ops: OsOperations):
assert __class__._C_MIN_PORT_NUMBER <= __class__._C_MAX_PORT_NUMBER
Expand All @@ -33,15 +38,28 @@ def __init__(self, os_ops: OsOperations):
assert len(self._available_ports) == (
(__class__._C_MAX_PORT_NUMBER - __class__._C_MIN_PORT_NUMBER) + 1
)
self._reserved_ports = set()
self._reserved_ports = dict()
self._lock_dir = None
return

def reserve_port(self) -> int:
assert self._guard is not None
assert type(self._available_ports) == set # noqa: E721t
assert type(self._reserved_ports) == set # noqa: E721
assert type(self._available_ports) == set # noqa: E721
assert type(self._reserved_ports) == dict # noqa: E721
assert isinstance(self._os_ops, OsOperations)

with self._guard:
if self._lock_dir is None:
temp_dir = self._os_ops.get_tempdir()
assert type(temp_dir) == str # noqa: E721
lock_dir = os.path.join(temp_dir, consts.TMP_TESTGRES_PORTS)
assert type(lock_dir) == str # noqa: E721
self._os_ops.makedirs(lock_dir)
self._lock_dir = lock_dir

assert self._lock_dir is not None
assert type(self._lock_dir) == str # noqa: E721

t = tuple(self._available_ports)
assert len(t) == len(self._available_ports)
sampled_ports = random.sample(t, min(len(t), 100))
Expand All @@ -58,7 +76,22 @@ def reserve_port(self) -> int:
if not self._os_ops.is_port_free(port):
continue

self._reserved_ports.add(port)
try:
lock_path = self.helper__make_lock_path(port)
lock_obj = self._os_ops.create_lock_fs_obj(lock_path) # raise
except: # noqa: 722
continue

assert isinstance(lock_obj, OsLockObj)
assert self._os_ops.path_exists(lock_path)

try:
self._reserved_ports[port] = lock_obj
except: # noqa: 722
assert not (port in self._reserved_ports)
lock_obj.release()
raise

self._available_ports.discard(port)
assert port in self._reserved_ports
assert not (port in self._available_ports)
Expand All @@ -73,15 +106,17 @@ def release_port(self, number: int) -> None:
assert number <= __class__._C_MAX_PORT_NUMBER

assert self._guard is not None
assert type(self._reserved_ports) == set # noqa: E721
assert type(self._reserved_ports) == dict # noqa: E721

with self._guard:
assert number in self._reserved_ports
assert not (number in self._available_ports)
self._available_ports.add(number)
self._reserved_ports.discard(number)
lock_obj = self._reserved_ports.pop(number)
assert not (number in self._reserved_ports)
assert number in self._available_ports
assert isinstance(lock_obj, OsLockObj)
lock_obj.release()
__class__.helper__send_debug_msg("Port {} is released.", number)
return

Expand All @@ -95,3 +130,12 @@ def helper__send_debug_msg(msg_template: str, *args) -> None:
s = "[port manager] "
s += msg_template.format(*args)
logging.debug(s)

def helper__make_lock_path(self, port_number: int) -> str:
assert type(port_number) == int # noqa: E721
# You have to call the reserve_port at first!
assert type(self._lock_dir) == str # noqa: E721

result = os.path.join(self._lock_dir, str(port_number) + ".lock")
assert type(result) == str # noqa: E721
return result
22 changes: 22 additions & 0 deletions testgres/operations/local_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from ..exceptions import ExecUtilException
from ..exceptions import InvalidOperationException
from .os_ops import ConnectionParams, OsOperations, get_default_encoding
from .os_ops import OsLockObj
from .raise_error import RaiseError
from .helpers import Helpers

Expand All @@ -28,6 +29,23 @@
CMD_TIMEOUT_SEC = 60


class LocalOsLockFsObj(OsLockObj):
_path: str

def __init__(self, path: str):
assert type(path) == str # noqa: str
self._path = path
os.mkdir(path) # throw
assert os.path.exists(path)
self._path = path

def release(self) -> None:
assert type(self._path) == str # noqa: str
assert os.path.exists(self._path)
os.rmdir(self._path)
self._path = None


class LocalOperations(OsOperations):
sm_single_instance: OsOperations = None
sm_single_instance_guard = threading.Lock()
Expand Down Expand Up @@ -535,3 +553,7 @@ def get_tempdir(self) -> str:
assert type(r) == str # noqa: E721
assert os.path.exists(r)
return r

def create_lock_fs_obj(self, path: str) -> OsLockObj:
assert type(path) == str # noqa: E721
return LocalOsLockFsObj(path)
9 changes: 9 additions & 0 deletions testgres/operations/os_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ def get_default_encoding():
return locale.getencoding() or 'UTF-8'


class OsLockObj:
def release(self) -> None:
raise NotImplementedError()


class OsOperations:
def __init__(self, username=None):
self.ssh_key = None
Expand Down Expand Up @@ -133,3 +138,7 @@ def is_port_free(self, number: int):

def get_tempdir(self) -> str:
raise NotImplementedError()

def create_lock_fs_obj(self, path: str) -> OsLockObj:
assert type(path) == str # noqa: E721
raise NotImplementedError()
32 changes: 32 additions & 0 deletions testgres/operations/remote_ops.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import getpass
import os
import platform
Expand All @@ -10,6 +12,7 @@
from ..exceptions import ExecUtilException
from ..exceptions import InvalidOperationException
from .os_ops import OsOperations, ConnectionParams, get_default_encoding
from .os_ops import OsLockObj
from .raise_error import RaiseError
from .helpers import Helpers

Expand Down Expand Up @@ -40,6 +43,31 @@ def cmdline(self):
return cmdline.split()


class RemoteOsLockFsObj(OsLockObj):
_os_ops: RemoteOperations
_path: str

def __init__(self, os_ops: RemoteOperations, path: str):
assert isinstance(os_ops, RemoteOperations)
assert type(path) == str # noqa: str

os_ops.makedir(path) # throw
assert os_ops.path_exists(path)

self._os_ops = os_ops
self._path = path

def release(self) -> None:
assert type(self._path) == str # noqa: str
assert isinstance(self._os_ops, RemoteOperations)
assert self._os_ops.path_exists(self._path)

self._os_ops.rmdir(self._path) # throw

self._path = None
self._os_ops = None


class RemoteOperations(OsOperations):
def __init__(self, conn_params: ConnectionParams):
if not platform.system().lower() == "linux":
Expand Down Expand Up @@ -687,6 +715,10 @@ def get_tempdir(self) -> str:
assert type(temp_dir) == str # noqa: E721
return temp_dir

def create_lock_fs_obj(self, path: str) -> OsLockObj:
assert type(path) == str # noqa: E721
return RemoteOsLockFsObj(self, path)

@staticmethod
def _is_port_free__process_0(error: str) -> bool:
assert type(error) == str # noqa: E721
Expand Down
5 changes: 1 addition & 4 deletions testgres/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@
#
_old_port_manager = PortManager__Generic(LocalOperations.get_single_instance())

# ports used by nodes
bound_ports = _old_port_manager._reserved_ports


# re-export version type
class PgVer(Version):
Expand All @@ -46,7 +43,7 @@ def __init__(self, version: str) -> None:

def internal__reserve_port():
"""
Generate a new port and add it to 'bound_ports'.
Reserve a port.
"""
return _old_port_manager.reserve_port()

Expand Down
35 changes: 35 additions & 0 deletions tests/test_os_ops_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from testgres import InvalidOperationException
from testgres import ExecUtilException
from testgres.operations.os_ops import OsLockObj

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import Future as ThreadFuture
Expand Down Expand Up @@ -1113,3 +1114,37 @@ class tadWorkerData:

logging.info("Test is finished! Total error count is {}.".format(nErrors))
return

def test_create_lock_fs_obj(self, os_ops: OsOperations):
assert isinstance(os_ops, OsOperations)

tmp = os_ops.mkdtemp()
assert type(tmp) == str # noqa: E721
assert os_ops.path_exists(tmp)
logging.info("tmp dir is [{}]".format(tmp))

p1 = os.path.join(tmp, "a.lock")
obj1 = os_ops.create_lock_fs_obj(p1)
assert obj1 is not None
assert isinstance(obj1, OsLockObj)
assert os_ops.path_exists(tmp)
assert os_ops.path_exists(p1)

while True:
try:
os_ops.create_lock_fs_obj(p1)
except Exception as e:
logging.info("OK. We got the error ({}): {}".format(type(e).__name__, e))
break
raise Exception("We wait the exception!")

assert isinstance(obj1, OsLockObj)
assert os_ops.path_exists(tmp)
assert os_ops.path_exists(p1)

obj1.release()
assert not os_ops.path_exists(p1)

assert os_ops.path_exists(tmp)
os_ops.rmdir(tmp)
assert not os_ops.path_exists(tmp)
35 changes: 0 additions & 35 deletions tests/test_testgres_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from testgres import get_pg_version

# NOTE: those are ugly imports
from testgres.utils import bound_ports
from testgres.utils import PgVer
from testgres.node import ProcessProxy

Expand Down Expand Up @@ -90,40 +89,6 @@ def test_pg_config(self):
b = get_pg_config()
assert (id(a) != id(b))

def test_ports_management(self):
assert bound_ports is not None
assert type(bound_ports) == set # noqa: E721

if len(bound_ports) != 0:
logging.warning("bound_ports is not empty: {0}".format(bound_ports))

stage0__bound_ports = bound_ports.copy()

with get_new_node() as node:
assert bound_ports is not None
assert type(bound_ports) == set # noqa: E721

assert node.port is not None
assert type(node.port) == int # noqa: E721

logging.info("node port is {0}".format(node.port))

assert node.port in bound_ports
assert node.port not in stage0__bound_ports

assert stage0__bound_ports <= bound_ports
assert len(stage0__bound_ports) + 1 == len(bound_ports)

stage1__bound_ports = stage0__bound_ports.copy()
stage1__bound_ports.add(node.port)

assert stage1__bound_ports == bound_ports

# check that port has been freed successfully
assert bound_ports is not None
assert type(bound_ports) == set # noqa: E721
assert bound_ports == stage0__bound_ports

def test_child_process_dies(self):
# test for FileNotFound exception during child_processes() function
cmd = ["timeout", "60"] if os.name == 'nt' else ["sleep", "60"]
Expand Down








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: http://github.com/postgrespro/testgres/pull/255/files

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy