Skip to content

gcp

GoogleOcrProvider

Bases: BaseOCRProvider

Source code in docprompt/tasks/ocr/gcp.py
class GoogleOcrProvider(BaseOCRProvider):
    """OCR provider backed by Google Cloud Document AI.

    Documents are split into chunks that respect Document AI's synchronous
    request limits (``max_page_count`` pages / ``max_bytes_per_request``
    bytes), processed chunk-by-chunk (sequentially or concurrently), and the
    per-chunk results are recombined into a single OCR result.
    """

    name = "gcp_documentai"

    capabilities = [
        PageLevelCapabilities.PAGE_TEXT_OCR,
        PageLevelCapabilities.PAGE_LAYOUT_OCR,
        PageLevelCapabilities.PAGE_RASTERIZATION,
    ]

    # Hard limits for a single synchronous Document AI process request.
    max_bytes_per_request: ClassVar[int] = (
        1024 * 1024 * 20
    )  # 20MB is the max size for a single sync request
    max_page_count: ClassVar[int] = 15

    project_id: str = Field(...)
    processor_id: str = Field(...)

    # Exactly one of these two credential sources must be provided.
    service_account_info: Optional[Dict[str, str]] = Field(None)
    service_account_file: Optional[str] = Field(None)
    location: str = Field("us")
    max_workers: int = Field(multiprocessing.cpu_count() * 2)
    exclude_bounding_poly: bool = Field(False)
    return_images: bool = Field(False)
    return_image_quality_scores: bool = Field(False)

    # Holds the imported `google.cloud.documentai` module, not a client instance.
    _documentai: "documentai.DocumentProcessorServiceClient" = PrivateAttr()

    def __init__(
        self,
        project_id: str,
        processor_id: str,
        **kwargs,
    ):
        super().__init__(project_id=project_id, processor_id=processor_id, **kwargs)

        # Credentials may be supplied through the provider's default invoke kwargs.
        self.service_account_info = self._default_invoke_kwargs.get(
            "service_account_info", None
        )
        self.service_account_file = self._default_invoke_kwargs.get(
            "service_account_file", None
        )

        try:
            from google.cloud import documentai

            self._documentai = documentai
        except ImportError:
            # FIX: the message previously referenced the wrong class name
            # ("GoogleCloudVisionTextExtractionProvider").
            raise ImportError(
                "Please install 'google-cloud-documentai' to use the GoogleOcrProvider"
            )

    def get_documentai_client(self, client_option_kwargs: Optional[dict] = None, **kwargs):
        """Build a ``DocumentProcessorServiceClient`` from the configured credentials.

        Args:
            client_option_kwargs: Extra ``ClientOptions`` fields; overrides the
                default ``api_endpoint``. (FIX: was a mutable default ``{}``.)
            **kwargs: Passed through to the client constructor.

        Raises:
            ValueError: If neither ``service_account_info`` nor
                ``service_account_file`` is configured.
        """
        from google.api_core.client_options import ClientOptions

        # FIX: derive the regional endpoint from `self.location` instead of
        # hard-coding "us"; behavior is unchanged for the default location.
        opts = ClientOptions(
            **{
                "api_endpoint": f"{self.location}-documentai.googleapis.com",
                **(client_option_kwargs or {}),
            }
        )

        base_service_client_kwargs = {
            **kwargs,
            "client_options": opts,
        }

        if self.service_account_info is not None:
            return self._documentai.DocumentProcessorServiceClient.from_service_account_info(
                info=self.service_account_info,
                **base_service_client_kwargs,
            )
        elif self.service_account_file is not None:
            with service_account_file_read_lock:
                return self._documentai.DocumentProcessorServiceClient.from_service_account_file(
                    filename=self.service_account_file,
                    **base_service_client_kwargs,
                )
        else:
            raise ValueError("Missing account info and service file path.")

    def _get_process_options(self):
        """Return ProcessOptions enabling quality scores, or None if not requested."""
        if not self.return_image_quality_scores:
            return None

        return self._documentai.ProcessOptions(
            ocr_config=self._documentai.OcrConfig(
                enable_image_quality_scores=True,
            )
        )

    def _get_field_mask(self) -> str:
        """Comma-separated field mask limiting which fields Document AI returns.

        Extracted helper: the mask was previously built (identically) in two
        places and, due to a bug, never actually attached to the request.
        """
        field_mask = (
            "text,pages.layout,pages.words,pages.lines,pages.tokens,pages.blocks"
        )

        if self.return_images:
            field_mask += ",pages.image"

        if self.return_image_quality_scores:
            field_mask += ",image_quality_scores"

        return field_mask

    def _process_document_sync(self, document: Document):
        """
        Split the document into chunks of 15 pages or less, and process each chunk
        synchronously.
        """
        client = self.get_documentai_client()
        processor_name = client.processor_path(
            project=self.project_id,
            location=self.location,
            processor=self.processor_id,
        )

        documents: List["documentai.Document"] = []

        file_bytes = document.get_bytes()

        @default_retry_decorator
        def process_byte_chunk(split_bytes: bytes) -> "documentai.Document":
            raw_document = self._documentai.RawDocument(
                content=split_bytes,
                mime_type="application/pdf",
            )

            request = self._documentai.ProcessRequest(
                name=processor_name,
                raw_document=raw_document,
                # FIX: the field mask was computed but never sent with the request.
                field_mask=self._get_field_mask(),
                process_options=self._get_process_options(),
            )

            result = client.process_document(request=request)

            return result.document

        with tqdm.tqdm(
            total=len(file_bytes), unit="B", unit_scale=True, desc="Processing document"
        ) as pbar:
            for split_bytes in pdf_split_iter_with_max_bytes(
                file_bytes,
                max_page_count=self.max_page_count,
                max_bytes=self.max_bytes_per_request,
            ):
                # FIX: previously assigned to `document`, shadowing the parameter,
                # so the metadata passed to gcp_documents_to_result below referred
                # to the last processed chunk instead of the input document.
                chunk_document = process_byte_chunk(split_bytes)

                documents.append(chunk_document)

                pbar.update(len(split_bytes))

        return gcp_documents_to_result(
            documents,
            self.name,
            document_name=document.name,
            file_hash=document.document_hash,
            exclude_bounding_poly=self.exclude_bounding_poly,
            return_images=self.return_images,
        )

    def _process_document_concurrent(
        self,
        document: Document,
        start: Optional[int] = None,
        stop: Optional[int] = None,
        include_raster: bool = False,
    ):
        # NOTE(review): `start`, `stop` and `include_raster` are currently
        # accepted but not applied; the whole document is always processed.
        # Process page chunks concurrently.
        client = self.get_documentai_client()
        processor_name = client.processor_path(
            project=self.project_id,
            location=self.location,
            processor=self.processor_id,
        )

        file_bytes = document.file_bytes

        if document.bytes_per_page > 1024 * 1024 * 2:
            logger.info("Document has few pages but is large, compressing first")
            file_bytes = document.to_compressed_bytes()

        logger.info("Splitting document into chunks...")
        document_byte_splits = list(
            pdf_split_iter_with_max_bytes(
                file_bytes,
                max_page_count=self.max_page_count,
                max_bytes=self.max_bytes_per_request,
            )
        )

        max_workers = min(len(document_byte_splits), self.max_workers)

        @default_retry_decorator
        def process_byte_chunk(split_bytes: bytes):
            raw_document = self._documentai.RawDocument(
                content=split_bytes,
                mime_type="application/pdf",
            )

            request = self._documentai.ProcessRequest(
                name=processor_name,
                raw_document=raw_document,
                # FIX: the field mask was computed but never sent with the request.
                field_mask=self._get_field_mask(),
                process_options=self._get_process_options(),
            )

            result = client.process_document(request=request)

            return result.document

        logger.info(f"Processing {len(document_byte_splits)} chunks...")
        with tqdm.tqdm(
            total=len(document_byte_splits), desc="Processing document"
        ) as pbar:
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                future_to_index = {
                    executor.submit(process_byte_chunk, split): index
                    for index, split in enumerate(document_byte_splits)
                }

                # Pre-sized so results land at their chunk index regardless of
                # completion order.
                documents: List["documentai.Document"] = [None] * len(
                    document_byte_splits
                )  # type: ignore

                for future in as_completed(future_to_index):
                    index = future_to_index[future]
                    documents[index] = future.result()
                    pbar.update(1)

        logger.info("Recombining OCR results...")
        return gcp_documents_to_result(
            documents,
            self.name,
            document_name=document.name,
            file_hash=document.document_hash,
            exclude_bounding_poly=self.exclude_bounding_poly,
            return_images=self.return_images,
        )

    def _invoke(
        self,
        input: List[PdfDocument],
        config: None = None,
        start: Optional[int] = None,
        stop: Optional[int] = None,
        **kwargs,
    ):
        if len(input) != 1:
            raise ValueError(
                "GoogleOcrProvider only supports processing a single document at a time."
            )

        return self._process_document_concurrent(input[0], start=start, stop=stop)

    def process_document_node(
        self,
        document_node: "DocumentNode",
        task_config: None = None,
        start: Optional[int] = None,
        stop: Optional[int] = None,
        contribute_to_document: bool = True,
        **kwargs,
    ) -> Dict[int, OcrPageResult]:
        # FIX: `invoke` expects a list of document objects (see `_invoke`, whose
        # element is used as a Document with `.file_bytes`, `.name`, etc.);
        # previously the raw `file_bytes` were passed, which would fail inside
        # `_process_document_concurrent`.
        base_result = self.invoke(
            [document_node.document], start=start, stop=stop, **kwargs
        )

        # For OCR, we also need to populate the ocr_results for powered search
        self._populate_ocr_results(document_node, base_result)

        return base_result

text_from_layout(layout, document_text, offset=0)

Offset is used to account for the fact that text references are relative to the entire document.

Source code in docprompt/tasks/ocr/gcp.py
def text_from_layout(
    layout: Union["documentai.Document.Page.Layout", "documentai.Document.Page.Token"],
    document_text: str,
    offset: int = 0,
) -> str:
    """Extract the text a layout element refers to from the full document text.

    Text anchors store indices relative to the whole document; ``offset``
    shifts them when ``document_text`` is only a slice of that document.
    """
    ordered_segments = sorted(
        layout.text_anchor.text_segments, key=lambda seg: seg.end_index
    )

    pieces = []
    for seg in ordered_segments:
        # Proto segments may omit start_index when it is zero.
        begin = getattr(seg, "start_index", 0)
        pieces.append(document_text[begin - offset : seg.end_index - offset])

    return "".join(pieces)