U
    ڲg'                     @  s0  d Z ddlmZ ddlmZmZ ddlmZ ddlm	Z	m
Z
mZmZ ddlmZ ddlmZ dd	lmZ dd
lmZmZmZmZ ddlmZmZ ddlmZmZmZmZ erddlm Z  ddl!m"Z" ddl#m$Z$ ej%j&Z&ej%j'Z'ej(j)Z)G dd ded Z*G dd deZ+G dd de+Z,G dd de,Z-dS )z7
Objects to support the COPY protocol (async version).
    )annotations)ABCabstractmethod)TracebackType)AnyAsyncIteratorSequenceTYPE_CHECKING   )pq)errors)Self)BaseCopyMAX_BUFFER_SIZE
QUEUE_SIZEPREFER_FLUSH)copy_tocopy_end)aspawnagatherAQueueAWorker)Buffer)AsyncCursor)AsyncConnectionc                      s   e Zd ZU dZdZded< dddddd	d
 fddZddddZdddddddZddddZddddZ	dddd Z
d!dd"d#Zd$dd%d&d'Zd(dd)d*d+Zddd,d-d.Z  ZS )/	AsyncCopyaj  Manage an asynchronous :sql:`COPY` operation.

    :param cursor: the cursor where the operation is performed.
    :param binary: if `!True`, write binary format.
    :param writer: the object to write to destination. If not specified, write
        to the `!cursor` connection.

    Choosing `!binary` is not necessary if the cursor has executed a
    :sql:`COPY` operation, because the operation result describes the format
    too. The parameter is useful when a `!Copy` object is created manually and
    no operation is performed on the cursor, such as when using ``writer=``\
    `~psycopg.copy.FileWriter`.
    ZpsycopgAsyncWriterwriterN)binaryr   AsyncCursor[Any]zbool | NonezAsyncWriter | None)cursorr   r   c                  s.   t  j||d |st|}|| _|j| _d S )N)r   )super__init__AsyncLibpqWriterr   write_write)selfr    r   r   	__class__ 7/tmp/pip-unpacked-wheel-b_ea6rx_/psycopg/_copy_async.pyr"   2   s
    zAsyncCopy.__init__r   returnc                   s   |    | S N)_enterr&   r)   r)   r*   
__aenter__@   s    zAsyncCopy.__aenter__ztype[BaseException] | NoneBaseException | NonezTracebackType | NoneNone)exc_typeexc_valexc_tbr,   c                   s   |  |I d H  d S r-   )finish)r&   r3   r4   r5   r)   r)   r*   	__aexit__D   s    zAsyncCopy.__aexit__zAsyncIterator[Buffer]c                 C s    |   I dH }|sq|V  q dS )z5Implement block-by-block iteration on :sql:`COPY TO`.N)readr&   datar)   r)   r*   	__aiter__N   s    zAsyncCopy.__aiter__r   c                   s   | j |  I dH S )z
        Read an unparsed row after a :sql:`COPY TO` operation.

        Return an empty string when the data is finished.
        N)
connectionwaitZ	_read_genr/   r)   r)   r*   r8   V   s    zAsyncCopy.readzAsyncIterator[tuple[Any, ...]]c                 C s$   |   I dH }|dkrq |V  q dS )z
        Iterate on the result of a :sql:`COPY TO` operation record by record.

        Note that the records returned will be tuples of unparsed strings or
        bytes, unless data types are specified using `set_types()`.
        N)read_row)r&   recordr)   r)   r*   rows^   s    zAsyncCopy.rowsztuple[Any, ...] | Nonec                   s   | j |  I dH S )a  
        Read a parsed row of data from a table after a :sql:`COPY TO` operation.

        Return `!None` when the data is finished.

        Note that the records returned will be tuples of unparsed strings or
        bytes, unless data types are specified using `set_types()`.
        N)r<   r=   Z_read_row_genr/   r)   r)   r*   r>   k   s    	zAsyncCopy.read_rowzBuffer | str)bufferr,   c                   s$   | j |}|r | |I dH  dS )z
        Write a block of data to a table after a :sql:`COPY FROM` operation.

        If the :sql:`COPY` is in binary format `!buffer` must be `!bytes`. In
        text mode it can be either `!bytes` or `!str`.
        N)	formatterr$   r%   )r&   rA   r:   r)   r)   r*   r$   v   s    zAsyncCopy.writezSequence[Any])rowr,   c                   s$   | j |}|r | |I dH  dS )z=Write a record to a table after a :sql:`COPY FROM` operation.N)rB   	write_rowr%   )r&   rC   r:   r)   r)   r*   rD      s    zAsyncCopy.write_rowexcr,   c                   s   | j tkrB| j }|r(| |I dH  | j|I dH  d| _n>|sJdS | jj	t
krZdS | j I dH  | j|  I dH  dS )a  Terminate the copy operation and free the resources allocated.

        You shouldn't need to call this function yourself: it is usually called
        by exit. It is available if, despite what is documented, you end up
        using the `Copy` object outside a block.
        NT)Z
_directionCOPY_INrB   endr%   r   r6   	_finished_pgconnZtransaction_statusACTIVEr<   Z_try_cancelr=   Z_end_copy_out_gen)r&   rF   r:   r)   r)   r*   r6      s    

zAsyncCopy.finish)__name__
__module____qualname____doc____annotations__r"   r0   r7   r;   r8   r@   r>   r$   rD   r6   __classcell__r)   r)   r'   r*   r      s   

r   zAsyncConnection[Any]c                   @  s6   e Zd ZdZedddddZdddd	d
dZdS )r   zG
    A class to write copy data somewhere (for async connections).
    r   r2   r:   r,   c                   s   dS )zWrite some data to destination.Nr)   r9   r)   r)   r*   r$      s    zAsyncWriter.writeNr1   rE   c                   s   dS )z
        Called when write operations are finished.

        If operations finished with an error, it will be passed to ``exc``.
        Nr)   r&   rF   r)   r)   r*   r6      s    zAsyncWriter.finish)N)rL   rM   rN   rO   r   r$   r6   r)   r)   r)   r*   r      s   r   c                   @  sD   e Zd ZdZdZddddZddd	d
dZddddddZdS )r#   zE
    An `AsyncWriter` to write copy data to a Postgres database.
    psycopg.copyr   r    c                 C  s   || _ |j| _| jj| _d S r-   )r    r<   ZpgconnrJ   r&   r    r)   r)   r*   r"      s    zAsyncLibpqWriter.__init__r   r2   rR   c              	     sp   t |tkr,| jt| j|tdI d H  n@tdt |tD ].}| jt| j|||t  tdI d H  q<d S )Nflushr   )lenr   r<   r=   r   rJ   r   ranger&   r:   ir)   r)   r*   r$      s       zAsyncLibpqWriter.writeNr1   rE   c                   sz   |r,dt |j d| }|| jjd}nd }z| jt| j|I d H }W n tj	k
rj   |sf Y nX |g| j
_d S )Nzerror from Python: z - replace)typerN   encoderJ   	_encodingr<   r=   r   eZQueryCanceledr    Z_results)r&   rF   msgZbmsgresr)   r)   r*   r6      s    zAsyncLibpqWriter.finish)N)rL   rM   rN   rO   r"   r$   r6   r)   r)   r)   r*   r#      s
   r#   c                      s^   e Zd ZdZdZdd fddZddd	d
ZdddddZdddd fddZ  ZS )AsyncQueuedLibpqWriterz
    `AsyncWriter` using a buffer to queue data to write.

    `write()` returns immediately, so that the main thread can be CPU-bound
    formatting messages, while a worker thread can be IO-bound waiting to write
    on the connection.
    rT   r   rU   c                   s(   t  | ttd| _d | _d | _d S )N)maxsize)r!   r"   r   r   _queue_worker_worker_errorrV   r'   r)   r*   r"      s    zAsyncQueuedLibpqWriter.__init__r2   r+   c              
     sh   z:| j  I dH }|sq8| jt| j|tdI dH  qW n( tk
rb } z
|| _W 5 d}~X Y nX dS )zPush data to the server when available from the copy queue.

        Terminate reading when the queue receives a false-y value, or in case
        of error.

        The function is designed to be run in a separate task.
        NrW   )	rf   getr<   r=   r   rJ   r   BaseExceptionrh   )r&   r:   exr)   r)   r*   worker   s    zAsyncQueuedLibpqWriter.workerr   rR   c                   sv   | j st| j| _ | jr| jt|tkr>| j|I d H  n4tdt|tD ]"}| j|||t  I d H  qNd S )Nr   )	rg   r   rl   rh   rY   r   rf   putrZ   r[   r)   r)   r*   r$     s    zAsyncQueuedLibpqWriter.writeNr1   rE   c                   sP   | j dI d H  | jr.t| jI d H  d | _| jr:| jt |I d H  d S )N    )rf   rm   rg   r   rh   r!   r6   rS   r'   r)   r*   r6      s    zAsyncQueuedLibpqWriter.finish)N)	rL   rM   rN   rO   r"   rl   r$   r6   rQ   r)   r)   r'   r*   rd      s   rd   N).rO   
__future__r   abcr   r   typesr   typingr   r   r   r	    r   r   ra   _compatr   Z
_copy_baser   r   r   r   
generatorsr   r   Z_acompatr   r   r   r   r   Zcursor_asyncr   Zconnection_asyncr   Z
ExecStatusrG   ZCOPY_OUTZTransactionStatusrK   r   r   r#   rd   r)   r)   r)   r*   <module>   s,    /