U
    ڲgi$                     @  sV  U d Z ddlmZ ddlZddlmZ ddlmZmZ ddl	m
Z
 ddl	mZ dd	lmZmZ dd
lmZmZmZ ddlmZ ddlmZmZmZ ddlmZ erddlmZ ddlmZ ddl m!Z!m"Z" ddl#m$Z$ ddl%m&Z& ddl'm(Z( dZ)de*d< e
j+j,Z,e
j+j-Z-e
j.j/Z/e
j0j1Z1e2dZ3G dd dZ4G dd de4Z5G dd de4Z6dS )z
commands pipeline management
    )annotationsN)TracebackType)AnyTYPE_CHECKING   )pq)errors)PipelineCommandPQGen)DequeSelf	TypeAlias)connection_summary)pipeline_communicate
fetch_manysend)capabilities)PGresult)
Connection)KeyPrepare)
BaseCursor)BaseConnection)AsyncConnectionzEtuple[BaseCursor[Any, Any], tuple[Key, Prepare, bytes] | None] | Noner   PendingResultpsycopgc                   @  s   e Zd ZU ded< ded< ddddd	Zd
dddZeddddZeddddZ	ddddZ
dddddZddddZddddZdddd Zddd!d"d#Zd$d%dd&d'd(Zddd)d*Zd+S ),BasePipelinezDeque[PipelineCommand]command_queuezDeque[PendingResult]result_queuezBaseConnection[Any]Noneconnreturnc                 C  s0   || _ |j| _tt  | _tt  | _d| _d S )Nr   )_connpgconnr   r	   r   r   r   levelselfr!    r(   5/tmp/pip-unpacked-wheel-b_ea6rx_/psycopg/_pipeline.py__init__/   s
    zBasePipeline.__init__strr"   c                 C  s@   | j j d| j j }t| jj}d| d| dt| ddS )N.< z at 0xx>)	__class__
__module____qualname__r   r#   r$   id)r'   clsinfor(   r(   r)   __repr__6   s    zBasePipeline.__repr__zpq.PipelineStatusc                 C  s   t | jjS N)r   ZPipelineStatusr$   Zpipeline_statusr'   r(   r(   r)   status;   s    zBasePipeline.statusboolc                 C  s   t  S )zCReturn `!True` if the psycopg libpq wrapper supports pipeline mode.)r   has_pipeline)r6   r(   r(   r)   is_supported?   s    zBasePipeline.is_supportedzPQGen[None]c                 c  sT   t jdd | jdkr"| j  n | js4| jjtkrB|  E d H  |  jd7  _d S )NT)checkr   r   )	r   r=   r%   r$   Zenter_pipeline_moder   Ztransaction_statusACTIVE	_sync_genr:   r(   r(   r)   
_enter_genD   s    
zBasePipeline._enter_genBaseException | None)excr"   c              
   C  sz   |  j d8  _ | j dkrv| jjtkrvz| j  W nB tjk
rt } z"|rZtd| | n
|	d W 5 d }~X Y nX d S )Nr   r   zerror ignored exiting %r: %s)
r%   r$   r;   BADZexit_pipeline_modeeZOperationalErrorloggerwarningwith_traceback)r'   rD   exc2r(   r(   r)   _exitR   s    zBasePipeline._exitc                 c  s,   |    |  E d H  | jddE d H  d S )NFflush)_enqueue_sync_communicate_gen
_fetch_genr:   r(   r(   r)   rA   `   s    zBasePipeline._sync_genc              	   c  s4   z|   |  E dH  W 5 | j ddE dH  X dS )z_
        Exit current pipeline by sending a Sync and fetch back all remaining results.
        TrL   N)rP   rN   rO   r:   r(   r(   r)   	_exit_gene   s    zBasePipeline._exit_genc                 c  s~   t | j| jE dH }d}|D ]P}| j }z| || W q tjk
rj } z|dkrZ|}W 5 d}~X Y qX q|dk	rz|dS )zqCommunicate with pipeline to send commands and possibly fetch
        results, which are then processed.
        N)r   r$   r   r   popleft_process_resultsrF   Error)r'   Zfetched	exceptionresultsqueuedrD   r(   r(   r)   rO   o   s    
zBasePipeline._communicate_gen)rM   r"   c             
   c  s   | j s
dS |r(| j  t| jE dH  d}| j rt| jE dH }|sHq| j  }z| || W q, tjk
r } z|dkr|}W 5 d}~X Y q,X q,|dk	r|dS )a^  Fetch available results from the connection and process them with
        pipeline queued items.

        If 'flush' is True, a PQsendFlushRequest() is issued in order to make
        sure results can be fetched. Otherwise, the caller may emit a
        PQpipelineSync() call to ensure the output buffer gets flushed before
        fetching.
        N)	r   r$   Zsend_flush_requestr   r   rR   rS   rF   rT   )r'   rM   rU   rV   rW   rD   r(   r(   r)   rP      s$    	

zBasePipeline._fetch_genr   zlist[PGresult])rW   rV   r"   c           	      C  s   |dkrB|\}|j tkr,tj|| jjdq|j tkrtdn>|\}}|rl|\}}}|jj	
|||| || || dS )ar  Process a results set fetched from the current pipeline.

        This matches 'results' with its respective element in the pipeline
        queue. For commands (None value in the pipeline queue), results are
        checked directly. For prepare statement creation requests, update the
        cache. Otherwise, results are attached to their respective cursor.
        N)encodingzpipeline aborted)r;   FATAL_ERRORrF   Zerror_from_resultr$   	_encodingPIPELINE_ABORTEDZPipelineAbortedr#   Z	_preparedvalidateZ_check_resultsZ_set_results)	r'   rW   rV   resultcursorZprepinfokeyprepnamer(   r(   r)   rS      s    



zBasePipeline._process_resultsc                 C  s    | j | jj | jd dS )z#Enqueue a PQpipelineSync() command.N)r   appendr$   Zpipeline_syncr   r:   r(   r(   r)   rN      s    zBasePipeline._enqueue_syncN)__name__r3   r4   __annotations__r*   r8   propertyr;   classmethodr>   rB   rK   rA   rQ   rO   rP   rS   rN   r(   r(   r(   r)   r   +   s    

 r   c                      sf   e Zd ZU dZdZded< ddd fddZdd	d
dZdd	ddZdddddddZ  Z	S )Pipelinez(Handler for connection in pipeline mode.r   zConnection[Any]r#   r   r    c                   s   t  | d S r9   superr*   r&   r2   r(   r)   r*      s    zPipeline.__init__r,   c              
   C  s\   z(| j j | j |   W 5 Q R X W n. tjk
rV } z|dW 5 d}~X Y nX dS )zkSync the pipeline, send any pending command and receive and process
        all available results.
        Nr#   lockwaitrA   rF   Z_NO_TRACEBACKrI   r'   exr(   r(   r)   sync   s
    
zPipeline.syncr   c              	   C  s(   | j j | j |   W 5 Q R X | S r9   r#   rl   rm   rB   r:   r(   r(   r)   	__enter__   s    
zPipeline.__enter__type[BaseException] | NonerC   TracebackType | Noneexc_typeexc_valexc_tbr"   c              
   C  s   znz(| jj | j|   W 5 Q R X W n@ tk
rj } z"|rPtd| | n
|d W 5 d }~X Y nX W 5 |  | X d S Nz error ignored terminating %r: %s	rK   r#   rl   rm   rQ   	ExceptionrG   rH   rI   r'   rv   rw   rx   rJ   r(   r(   r)   __exit__   s    
 zPipeline.__exit__)
rc   r3   r4   __doc__rd   r*   rp   rr   r}   __classcell__r(   r(   rj   r)   rg      s   

rg   c                      sf   e Zd ZU dZdZded< ddd fddZdd	d
dZdd	ddZdddddddZ  Z	S )AsyncPipelinez.Handler for async connection in pipeline mode.r   zAsyncConnection[Any]r#   r   r    c                   s   t  | d S r9   rh   r&   rj   r(   r)   r*      s    zAsyncPipeline.__init__r,   c              
     sp   z<| j j4 I d H  | j |  I d H  W 5 Q I d H R X W n. tjk
rj } z|d W 5 d }~X Y nX d S r9   rk   rn   r(   r(   r)   rp      s
    *zAsyncPipeline.syncr   c              
     s<   | j j4 I d H  | j |  I d H  W 5 Q I d H R X | S r9   rq   r:   r(   r(   r)   
__aenter__   s    &zAsyncPipeline.__aenter__rs   rC   rt   ru   c              
     s   zz<| jj4 I d H  | j|  I d H  W 5 Q I d H R X W n@ tk
r~ } z"|rdtd| | n
|d W 5 d }~X Y nX W 5 |  | X d S ry   rz   r|   r(   r(   r)   	__aexit__   s    * zAsyncPipeline.__aexit__)
rc   r3   r4   r~   rd   r*   rp   r   r   r   r(   r(   rj   r)   r      s   
r   )7r~   
__future__r   loggingtypesr   typingr   r    r   r   rF   abcr	   r
   _compatr   r   r   Zpq.miscr   
generatorsr   r   r   Z_capabilitiesr   Zpq.abcr   
connectionr   Z
_preparingr   r   Z_cursor_baser   Z_connection_baser   Zconnection_asyncr   r   rd   Z
ExecStatusrY   r[   Z
ConnStatusrE   ZTransactionStatusr@   	getLoggerrG   r   rg   r   r(   r(   r(   r)   <module>   s:   

 +