U
    ڲg&                     @  s0  d Z ddlmZ ddlmZmZ ddlmZ ddlm	Z	m
Z
mZmZ ddlmZ ddlmZ dd	lmZ dd
lmZmZmZmZ ddlmZmZ ddlmZmZmZmZ erddlm Z  ddl!m"Z" ddl#m$Z$ ej%j&Z&ej%j'Z'ej(j)Z)G dd ded Z*G dd deZ+G dd de+Z,G dd de,Z-dS )z6
Objects to support the COPY protocol (sync version).
    )annotations)ABCabstractmethod)TracebackType)AnyIteratorSequenceTYPE_CHECKING   )pq)errors)Self)BaseCopyMAX_BUFFER_SIZE
QUEUE_SIZEPREFER_FLUSH)copy_tocopy_end)spawngatherQueueWorker)Buffer)Cursor)
Connectionc                      s   e Zd ZU dZdZded< dddddd	d
 fddZddddZdddddddZddddZddddZ	dddd Z
d!dd"d#Zd$dd%d&d'Zd(dd)d*d+Zddd,d-d.Z  ZS )/Copyaj  Manage an asynchronous :sql:`COPY` operation.

    :param cursor: the cursor where the operation is performed.
    :param binary: if `!True`, write binary format.
    :param writer: the object to write to destination. If not specified, write
        to the `!cursor` connection.

    Choosing `!binary` is not necessary if the cursor has executed a
    :sql:`COPY` operation, because the operation result describes the format
    too. The parameter is useful when a `!Copy` object is created manually and
    no operation is performed on the cursor, such as when using ``writer=``\
    `~psycopg.copy.FileWriter`.
    ZpsycopgWriterwriterN)binaryr   Cursor[Any]zbool | NonezWriter | None)cursorr   r   c                  s.   t  j||d |st|}|| _|j| _d S )N)r   )super__init__LibpqWriterr   write_write)selfr    r   r   	__class__ 1/tmp/pip-unpacked-wheel-b_ea6rx_/psycopg/_copy.pyr"   5   s
    zCopy.__init__r   returnc                 C  s   |    | S N)_enterr&   r)   r)   r*   	__enter__C   s    zCopy.__enter__ztype[BaseException] | NoneBaseException | NonezTracebackType | NoneNone)exc_typeexc_valexc_tbr,   c                 C  s   |  | d S r-   )finish)r&   r3   r4   r5   r)   r)   r*   __exit__G   s    zCopy.__exit__zIterator[Buffer]c                 c  s   |   }|sq|V  q dS )z5Implement block-by-block iteration on :sql:`COPY TO`.N)readr&   datar)   r)   r*   __iter__Q   s    zCopy.__iter__r   c                 C  s   | j |  S )z
        Read an unparsed row after a :sql:`COPY TO` operation.

        Return an empty string when the data is finished.
        )
connectionwaitZ	_read_genr/   r)   r)   r*   r8   Y   s    z	Copy.readzIterator[tuple[Any, ...]]c                 c  s   |   }|dkrq|V  q dS )z
        Iterate on the result of a :sql:`COPY TO` operation record by record.

        Note that the records returned will be tuples of unparsed strings or
        bytes, unless data types are specified using `set_types()`.
        N)read_row)r&   recordr)   r)   r*   rowsa   s    z	Copy.rowsztuple[Any, ...] | Nonec                 C  s   | j |  S )a  
        Read a parsed row of data from a table after a :sql:`COPY TO` operation.

        Return `!None` when the data is finished.

        Note that the records returned will be tuples of unparsed strings or
        bytes, unless data types are specified using `set_types()`.
        )r<   r=   Z_read_row_genr/   r)   r)   r*   r>   n   s    	zCopy.read_rowzBuffer | str)bufferr,   c                 C  s   | j |}|r| | dS )z
        Write a block of data to a table after a :sql:`COPY FROM` operation.

        If the :sql:`COPY` is in binary format `!buffer` must be `!bytes`. In
        text mode it can be either `!bytes` or `!str`.
        N)	formatterr$   r%   )r&   rA   r:   r)   r)   r*   r$   y   s    z
Copy.writezSequence[Any])rowr,   c                 C  s   | j |}|r| | dS )z=Write a record to a table after a :sql:`COPY FROM` operation.N)rB   	write_rowr%   )r&   rC   r:   r)   r)   r*   rD      s    zCopy.write_rowexcr,   c                 C  sl   | j tkr6| j }|r"| | | j| d| _n2|s>dS | jj	t
krNdS | j  | j|   dS )a  Terminate the copy operation and free the resources allocated.

        You shouldn't need to call this function yourself: it is usually called
        by exit. It is available if, despite what is documented, you end up
        using the `Copy` object outside a block.
        TN)Z
_directionCOPY_INrB   endr%   r   r6   	_finished_pgconnZtransaction_statusACTIVEr<   Z_try_cancelr=   Z_end_copy_out_gen)r&   rF   r:   r)   r)   r*   r6      s    



zCopy.finish)__name__
__module____qualname____doc____annotations__r"   r0   r7   r;   r8   r@   r>   r$   rD   r6   __classcell__r)   r)   r'   r*   r   "   s   

r   zConnection[Any]c                   @  s6   e Zd ZdZedddddZdddd	d
dZdS )r   zG
    A class to write copy data somewhere (for async connections).
    r   r2   r:   r,   c                 C  s   dS )zWrite some data to destination.Nr)   r9   r)   r)   r*   r$      s    zWriter.writeNr1   rE   c                 C  s   dS )z
        Called when write operations are finished.

        If operations finished with an error, it will be passed to ``exc``.
        Nr)   r&   rF   r)   r)   r*   r6      s    zWriter.finish)N)rL   rM   rN   rO   r   r$   r6   r)   r)   r)   r*   r      s   r   c                   @  sD   e Zd ZdZdZddddZddd	d
dZddddddZdS )r#   z@
    An `Writer` to write copy data to a Postgres database.
    psycopg.copyr   r    c                 C  s   || _ |j| _| jj| _d S r-   )r    r<   ZpgconnrJ   r&   r    r)   r)   r*   r"      s    zLibpqWriter.__init__r   r2   rR   c              	   C  sd   t |tkr&| jt| j|td n:tdt |tD ](}| jt| j|||t  td q6d S )Nflushr   )lenr   r<   r=   r   rJ   r   ranger&   r:   ir)   r)   r*   r$      s      zLibpqWriter.writeNr1   rE   c                 C  st   |r,dt |j d| }|| jjd}nd }z| jt| j|}W n tj	k
rd   |s` Y nX |g| j
_d S )Nzerror from Python: z - replace)typerN   encoderJ   	_encodingr<   r=   r   eZQueryCanceledr    Z_results)r&   rF   msgZbmsgresr)   r)   r*   r6      s    zLibpqWriter.finish)N)rL   rM   rN   rO   r"   r$   r6   r)   r)   r)   r*   r#      s
   r#   c                      s^   e Zd ZdZdZdd fddZddd	d
ZdddddZdddd fddZ  ZS )QueuedLibpqWriterz
    `Writer` using a buffer to queue data to write.

    `write()` returns immediately, so that the main thread can be CPU-bound
    formatting messages, while a worker thread can be IO-bound waiting to write
    on the connection.
    rT   r   rU   c                   s(   t  | ttd| _d | _d | _d S )N)maxsize)r!   r"   r   r   _queue_worker_worker_errorrV   r'   r)   r*   r"      s    zQueuedLibpqWriter.__init__r2   r+   c              
   C  s\   z.| j  }|sq,| jt| j|td qW n( tk
rV } z
|| _W 5 d}~X Y nX dS )zPush data to the server when available from the copy queue.

        Terminate reading when the queue receives a false-y value, or in case
        of error.

        The function is designed to be run in a separate task.
        rW   N)	rf   getr<   r=   r   rJ   r   BaseExceptionrh   )r&   r:   exr)   r)   r*   worker   s    
zQueuedLibpqWriter.workerr   rR   c                 C  sj   | j st| j| _ | jr| jt|tkr8| j| n.tdt|tD ]}| j|||t   qHd S )Nr   )	rg   r   rl   rh   rY   r   rf   putrZ   r[   r)   r)   r*   r$     s    zQueuedLibpqWriter.writeNr1   rE   c                   s>   | j d | jr"t| j d | _| jr.| jt | d S )N    )rf   rm   rg   r   rh   r!   r6   rS   r'   r)   r*   r6     s    
zQueuedLibpqWriter.finish)N)	rL   rM   rN   rO   r"   rl   r$   r6   rQ   r)   r)   r'   r*   rd      s   rd   N).rO   
__future__r   abcr   r   typesr   typingr   r   r   r	    r   r   ra   _compatr   Z
_copy_baser   r   r   r   
generatorsr   r   Z_acompatr   r   r   r   r   r    r   r<   r   Z
ExecStatusrG   ZCOPY_OUTZTransactionStatusrK   r   r   r#   rd   r)   r)   r)   r*   <module>   s,    /