U
    ڲgE                     @  s  d Z ddlmZ ddlZddlmZ ddlmZ ddlm	Z	m
Z
mZmZmZmZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZmZmZmZmZmZmZ ddlmZ ddl m!Z!m"Z"m#Z#m$Z$ ddl%m&Z& ddl'm(Z( ddl)m*Z* ddl+m,Z,m-Z- ddl+m.Z.m/Z/ ddl0m1Z1 ddl2m3Z3 ddl4m5Z5 ddl6m7Z7 ddl8m9Z9 ddl:m;Z; ddl<m=Z=m>Z>m?Z? ddl@Z@ddlAZAddlAmBZB ddl)mCZC erddlDmEZE dZFejGjHZHejGjIZIejJjKZKejJjLZLejJjMZMeAjNeOfZPeQdZRG d d! d!e=e! ZSdS )"z+
Psycopg connection object (async version)
    )annotationsN)	monotonic)TracebackType)AnyAsyncGeneratorAsyncIteratorcastoverloadTYPE_CHECKING)asynccontextmanager   )pq)errors)waiting)AdaptContextConnDict	ConnParamParamsPQGenQueryRV)Xid)RowAsyncRowFactory	tuple_rowargs_row)AdaptersMap)IsolationLevel)Self)make_conninfoconninfo_to_dict)conninfo_attempts_asynctimeout_from_conninfo)AsyncPipeline)notifies)AsyncTransaction)AsyncCursor)capabilities)AsyncServerCursor)BaseConnection	CursorRowNotify)Lock)	to_thread)PGconng?psycopgc                      s  e Zd ZU dZdZded< ded< ded< d	ed
< eee efddd fddZ	e
ddddddddddddddddddZddd d!Zd"d#d$d%d&d'd(Ze
dd)d*d+d,d-Zd%dd.d/Zedd0dd1d2d3d4Zedd0dd5d6d7d8d4Zedddd9ddd:dd;d<d=d4Zedddd9ddd5d:dd>d?d@d4ZddddddAdddBd:ddCd?dDd4ZddddEdFdGd:dd1dHdIdJZd%ddKdLZd%ddMdNZdOdPdQd%dRdSdTZdUdPdQd%dRdVdWZeddXddYdZd[d\Zddd]d^dd_d`dadbZedcddddeZefdfd^dgdhdidjZdd%dkdldmZdd%dkdndoZdpd%dkdqdrZdpd%dkdsdtZ d:d%dkdudvZ!d:d%dkdwdxZ"d:d%dkdydzZ#d:d%dkd{d|Z$dd%d}d~dZ%dd%dddZ&d%dddZ'ddd%dddZ(ddd%dddZ)ddddZ*  Z+S )AsyncConnectionz3
    Wrapper for a connection to the database.
    r/   ztype[AsyncCursor[Row]]cursor_factoryztype[AsyncServerCursor[Row]]server_cursor_factoryzAsyncRowFactory[Row]row_factoryzAsyncPipeline | None	_pipeliner.   )pgconnr3   c                   s*   t  | || _t | _t| _t| _d S N)	super__init__r3   r,   lockr&   r1   r(   r2   )selfr5   r3   	__class__ </tmp/pip-unpacked-wheel-b_ea6rx_/psycopg/connection_async.pyr8   J   s
    zAsyncConnection.__init__ F   N)
autocommitprepare_thresholdcontextr3   r1   strboolz
int | NonezAdaptContext | NonezAsyncRowFactory[Row] | Noneztype[AsyncCursor[Row]] | Noner   r   )conninforA   rB   rC   r3   r1   kwargsreturnc                  sJ  t jdkr(t }t|tjr(td| j|f|I dH }	t	|	}
d}t
|	I dH }|D ]}z0td|}| j||
d}tj|tdI dH }W n\ tjk
r } z<t|dkrtd|d	|d
|dt| |}W 5 d}~X Y qZX  qqZ|s|st|dt||_|r"||_|r.||_|r@t|j|_||_|S )z[
        Connect to a database server and return a new `AsyncConnection` instance.
        win32zPsycopg cannot use the 'ProactorEventLoop' to run in async mode. Please use a compatible event loop, for instance by setting 'asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())'Nr?   timeoutintervalr   z=connection attempt failed: host: %r port: %r, hostaddr %r: %shostportZhostaddr)r?   ) sysplatformasyncioZget_running_loop
isinstanceZProactorEventLoopeZInterfaceError_get_connection_paramsr"   r!   r   Z_connect_genr   wait_conn_async_WAIT_INTERVAL_NO_TRACEBACKlenloggerdebuggetrD   AssertionErrorwith_tracebackrE   Z_autocommitr3   r1   r   adaptersZ	_adaptersrB   )clsrF   rA   rB   rC   r3   r1   rG   ZloopparamsrK   rvattemptsattemptgenexZlast_exr=   r=   r>   connectU   sL    




zAsyncConnection.connect)rH   c                   s   | S r6   r=   r:   r=   r=   r>   
__aenter__   s    zAsyncConnection.__aenter__ztype[BaseException] | NonezBaseException | NonezTracebackType | NoneNone)exc_typeexc_valexc_tbrH   c              
     s   | j r
d S |rTz|  I d H  W qb tk
rP } ztd| | W 5 d }~X Y qbX n|  I d H  t| dd s||  I d H  d S )Nz#error ignored in rollback on %s: %s_pool)closedrollback	ExceptionrZ   warningcommitgetattrclose)r:   rk   rl   rm   Zexc2r=   r=   r>   	__aexit__   s    "zAsyncConnection.__aexit__r   r   )rF   rG   rH   c                   s   t |f|S )z3Manipulate connection parameters before connecting.)r    )r`   rF   rG   r=   r=   r>   rU      s    z&AsyncConnection._get_connection_paramsc                   s   | j r
dS d| _| j  dS )zClose the database connection.NT)ro   _closedr5   finishrh   r=   r=   r>   ru      s    zAsyncConnection.close)binaryzAsyncCursor[Row])ry   rH   c                C  s   d S r6   r=   )r:   ry   r=   r=   r>   cursor   s    zAsyncConnection.cursorzAsyncRowFactory[CursorRow]zAsyncCursor[CursorRow])ry   r3   rH   c                C  s   d S r6   r=   )r:   ry   r3   r=   r=   r>   rz      s    )ry   
scrollablewithholdzbool | NonezAsyncServerCursor[Row])namery   r{   r|   rH   c                C  s   d S r6   r=   )r:   r}   ry   r{   r|   r=   r=   r>   rz      s    zAsyncServerCursor[CursorRow])r}   ry   r3   r{   r|   rH   c                C  s   d S r6   r=   )r:   r}   ry   r3   r{   r|   r=   r=   r>   rz      s    	)ry   r3   r{   r|   zAsyncRowFactory[Any] | Nonez)AsyncCursor[Any] | AsyncServerCursor[Any]c                C  sH   |    |s| j}|r,| j| ||||d}n| j| |d}|rDt|_|S )z\
        Return a new `AsyncCursor` to send commands and queries to the connection.
        )r}   r3   r{   r|   r3   )_check_connection_okr3   r2   r1   BINARYformat)r:   r}   ry   r3   r{   r|   curr=   r=   r>   rz      s    )preparery   r   zParams | None)queryra   r   ry   rH   c             
     s^   z*|   }|rt|_|j|||dI dH W S  tjk
rX } z|dW 5 d}~X Y nX dS )z8Execute a query and return a cursor to read its results.)r   N)rz   r   r   executerT   rX   r^   )r:   r   ra   r   ry   r   rf   r=   r=   r>   r      s    	zAsyncConnection.executec              
     s8   | j 4 I dH  | |  I dH  W 5 Q I dH R X dS )z/Commit any pending transaction to the database.N)r9   waitZ_commit_genrh   r=   r=   r>   rs     s    zAsyncConnection.commitc              
     s8   | j 4 I dH  | |  I dH  W 5 Q I dH R X dS )z2Roll back to the start of any pending transaction.N)r9   r   Z_rollback_genrh   r=   r=   r>   rp     s    zAsyncConnection.rollbackg      >@rJ   float)rK   rH   c                  sF   |   sdS t r2tj| j|dtdI dH  nt| jI dH  dS )a  Cancel the current operation on the connection.

        :param timeout: raise a `~errors.CancellationTimeout` if the
            cancellation request does not succeed within `timeout` seconds.

        Note that a successful cancel attempt on the client is not a guarantee
        that the server will successfully manage to cancel the operation.

        This is a non-blocking version of `~Connection.cancel()` which
        leverages a more secure and improved cancellation feature of the libpq,
        which is only available from version 17.

        If the underlying libpq is older than version 17, the method will fall
        back to using the same implementation of `!cancel()`.
        NrJ   rL   )	Z_should_cancelr'   Zhas_cancel_safer   rV   Z_cancel_genrW   r-   cancel)r:   rK   r=   r=   r>   cancel_safe  s    
 zAsyncConnection.cancel_safe      @c             
     sJ   z| j |dI d H  W n. tk
rD } ztd| W 5 d }~X Y nX d S )NrJ   zquery cancellation failed: %s)r   rq   rZ   rr   )r:   rK   rf   r=   r=   r>   _try_cancel5  s    zAsyncConnection._try_cancelz
str | NonezAsyncIterator[AsyncTransaction])savepoint_nameforce_rollbackrH   c                 C s   t | ||}| jr||  4 I dH L |4 I dH . |  4 I dH  |V  W 5 Q I dH R X W 5 Q I dH R X W 5 Q I dH R X n$|4 I dH  |V  W 5 Q I dH R X dS )a  
        Start a context block with a new transaction or nested transaction.

        :param savepoint_name: Name of the savepoint used to manage a nested
            transaction. If `!None`, one will be chosen automatically.
        :param force_rollback: Roll back the transaction at the end of the
            block even if there were no error (e.g. to try a no-op process).
        :rtype: AsyncTransaction
        N)r%   r4   pipeline)r:   r   r   Ztxr=   r=   r>   transaction;  s    28zAsyncConnection.transaction)rK   
stop_afterzfloat | NonezAsyncGenerator[Notify, None])rK   r   rH   c                C s  |dk	rt  | }t|t}nd}t}d}| j4 I dH  | jj}z| jt| j|dI dH }W n. tj	k
r } z|
dW 5 d}~X Y nX |D ]0}	t|	j||	j||	j}
|
V  |d7 }q|dk	r||krq|rBtt|t   }|dk rBqqBW 5 Q I dH R X dS )a  
        Yield `Notify` objects as soon as they are received from the database.

        :param timeout: maximum amount of time to wait for notifications.
            `!None` means no timeout.
        :param stop_after: stop after receiving this number of notifications.
            You might actually receive more than this number if more than one
            notifications arrives in the same packet.
        Nr   rL   r   g        )r   minrW   r9   r5   	_encodingr   r$   rT   rX   r^   r+   ZrelnamedecodeextraZbe_pid)r:   rK   r   deadlinerM   Z	nreceivedencnsrf   Zpgnnr=   r=   r>   r$   P  s4    

 
 
zAsyncConnection.notifieszAsyncIterator[AsyncPipeline]c                 C s   | j 4 I dH * |   | j}|dkr4t|  }| _W 5 Q I dH R X z(|4 I dH  |V  W 5 Q I dH R X W 5 |jdkr| j 4 I dH  || jkstd| _W 5 Q I dH R X X dS )z<Context manager to switch the connection into pipeline mode.Nr   )r9   r   r4   r#   levelr]   )r:   r   r=   r=   r>   r     s    
zAsyncConnection.pipelinez	PQGen[RV]r   )re   rM   rH   c                   s   zt j|| jj|dI dH W S  tk
r   | jjtkr| jddI dH  zt j|| jj|dI dH  W n tj	k
r~   Y nX  Y nX dS )z
        Consume a generator operating on the connection.

        The function must be used on generators that don't change connection
        fd (i.e. not on connect and reset).
        rL   Nr   rJ   )
r   Z
wait_asyncr5   socket_INTERRUPTEDtransaction_statusACTIVEr   rT   ZQueryCanceled)r:   re   rM   r=   r=   r>   r     s    zAsyncConnection.wait)valuerH   c                 C  s   |  d d S )NrA   _no_set_asyncr:   r   r=   r=   r>   _set_autocommit  s    
zAsyncConnection._set_autocommitc              
     s:   | j 4 I dH  | | |I dH  W 5 Q I dH R X dS )z6Method version of the `~Connection.autocommit` setter.N)r9   r   Z_set_autocommit_genr   r=   r=   r>   set_autocommit  s    zAsyncConnection.set_autocommitzIsolationLevel | Nonec                 C  s   |  d d S )NZisolation_levelr   r   r=   r=   r>   _set_isolation_level  s    
z$AsyncConnection._set_isolation_levelc              
     s:   | j 4 I dH  | | |I dH  W 5 Q I dH R X dS )z;Method version of the `~Connection.isolation_level` setter.N)r9   r   Z_set_isolation_level_genr   r=   r=   r>   set_isolation_level  s    z#AsyncConnection.set_isolation_levelc                 C  s   |  d d S )NZ	read_onlyr   r   r=   r=   r>   _set_read_only  s    
zAsyncConnection._set_read_onlyc              
     s:   | j 4 I dH  | | |I dH  W 5 Q I dH R X dS )z5Method version of the `~Connection.read_only` setter.N)r9   r   Z_set_read_only_genr   r=   r=   r>   set_read_only  s    zAsyncConnection.set_read_onlyc                 C  s   |  d d S )NZ
deferrabler   r   r=   r=   r>   _set_deferrable  s    
zAsyncConnection._set_deferrablec              
     s:   | j 4 I dH  | | |I dH  W 5 Q I dH R X dS )z6Method version of the `~Connection.deferrable` setter.N)r9   r   Z_set_deferrable_genr   r=   r=   r>   set_deferrable  s    zAsyncConnection.set_deferrable)	attributerH   c                 C  s   t d|d| dd S )Nz'the zD property is read-only on async connections: please use 'await .set_z()' instead.)AttributeError)r:   r   r=   r=   r>   r     s    zAsyncConnection._no_set_asyncz	Xid | str)xidrH   c              
     s:   | j 4 I dH  | | |I dH  W 5 Q I dH R X dS )zO
        Begin a TPC transaction with the given transaction ID `!xid`.
        N)r9   r   Z_tpc_begin_genr:   r   r=   r=   r>   	tpc_begin  s    zAsyncConnection.tpc_beginc              
     sr   z8| j 4 I dH  | |  I dH  W 5 Q I dH R X W n4 tjk
rl } ztt|dW 5 d}~X Y nX dS )zV
        Perform the first phase of a transaction started with `tpc_begin()`.
        N)r9   r   Z_tpc_prepare_genrT   ZObjectNotInPrerequisiteStateZNotSupportedErrorrD   )r:   rf   r=   r=   r>   tpc_prepare  s
    (zAsyncConnection.tpc_preparezXid | str | Nonec              
     s<   | j 4 I dH  | | d|I dH  W 5 Q I dH R X dS )z:
        Commit a prepared two-phase transaction.
        NZCOMMITr9   r   Z_tpc_finish_genr   r=   r=   r>   
tpc_commit  s    zAsyncConnection.tpc_commitc              
     s<   | j 4 I dH  | | d|I dH  W 5 Q I dH R X dS )z=
        Roll back a prepared two-phase transaction.
        NZROLLBACKr   r   r=   r=   r>   tpc_rollback  s    zAsyncConnection.tpc_rollbackz	list[Xid]c              
     s   |    | jj}| jttjd4 I d H (}|t I d H  |	 I d H }W 5 Q I d H R X |t
kr| jjtkr|  I d H  |S )Nr~   )Z
_check_tpcinfor   rz   r   r   Z_from_recordr   Z_get_recover_queryZfetchallIDLEINTRANSrp   )r:   statusr   resr=   r=   r>   tpc_recover  s    zAsyncConnection.tpc_recover)r?   )r?   )N)NF)N)N),__name__
__module____qualname____doc____annotations__r   r   r   r   r8   classmethodrg   ri   rv   rU   ru   r	   rz   r   rs   rp   r   r   r   r   r$   r   rW   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   __classcell__r=   r=   r;   r>   r0   >   s   
 "=
	 %     2
r0   )Tr   
__future__r   loggingtimer   typesr   typingr   r   r   r   r	   r
   
contextlibr   r?   r   r   rT   r   abcr   r   r   r   r   r   r   Z_tpcr   Zrowsr   r   r   r   Zadaptr   Z_enumsr   _compatr   rF   r   r    r!   r"   r4   r#   
generatorsr$   r   r%   Zcursor_asyncr&   Z_capabilitiesr'   Zserver_cursorr(   Z_connection_baser)   r*   r+   rP   rR   r,   r-   Zpq.abcr.   rW   ZFormatZTEXTr   ZTransactionStatusr   r   r   ZCancelledErrorKeyboardInterruptr   	getLoggerrZ   r0   r=   r=   r=   r>   <module>   sN    $

