Files
api/path/to/venv/lib/python3.12/site-packages/psycopg/_pipeline.py
2025-12-30 11:27:14 +07:00

82 lines
2.1 KiB
Python

# WARNING: this file is auto-generated by 'async_to_sync.py'
# from the original file '_pipeline_async.py'
# DO NOT CHANGE! Change the original file instead.
"""
Psycopg Pipeline object implementation.
"""
# Copyright (C) 2021 The Psycopg Team
from __future__ import annotations
import logging
from types import TracebackType
from typing import TYPE_CHECKING, Any
from . import errors as e
from ._compat import Self
from ._pipeline_base import BasePipeline
# Connection is imported under the TYPE_CHECKING guard only; in this module
# it is referenced solely inside annotations.
if TYPE_CHECKING:
    from .connection import Connection

# Module-level logger, obtained under the "psycopg" name.
logger = logging.getLogger("psycopg")
class _DummyLock:
def __enter__(self) -> None:
pass
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
pass
class Pipeline(BasePipeline):
    """Handler for a (sync) connection operating in pipeline mode.

    Used as a context manager: entering runs the base class' enter generator
    on the connection, leaving runs the exit generator and then the base
    class' `_exit()` cleanup.
    """

    __module__ = "psycopg"

    _conn: Connection[Any]

    def __init__(self, conn: Connection[Any], _no_lock: bool = False) -> None:
        """Bind the pipeline to *conn*.

        :param conn: the connection the pipeline operates on.
        :param _no_lock: if true, use a no-op lock instead of `conn.lock`.
        """
        super().__init__(conn)
        self._lock = conn.lock if not _no_lock else _DummyLock()

    def sync(self) -> None:
        """Sync the pipeline.

        Send any pending command, then receive and process all the results
        available.
        """
        try:
            with self._lock:
                self._conn.wait(self._sync_gen())
        except e._NO_TRACEBACK as err:
            # Re-raise the same exception with its traceback dropped.
            raise err.with_traceback(None)

    def __enter__(self) -> Self:
        """Run the enter generator under the lock and return the pipeline."""
        with self._lock:
            self._conn.wait(self._enter_gen())
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Run the exit generator under the lock, then `_exit()` in any case.

        An error raised while exiting is re-raised without traceback, unless
        the ``with`` block itself already raised: in that case the error is
        only logged, so that the original exception is not replaced.
        """
        try:
            with self._lock:
                self._conn.wait(self._exit_gen())
        except Exception as exit_err:
            if not exc_val:
                # The block ended cleanly: this is the error to report.
                raise exit_err.with_traceback(None)
            # Don't clobber the exception raised in the block with this one.
            logger.warning("error ignored terminating %r: %s", self, exit_err)
        finally:
            # Executed whether or not the exit step succeeded.
            self._exit(exc_val)