class GoogleOcrProvider(BaseOCRProvider):
name = "gcp_documentai"
capabilities = [
PageLevelCapabilities.PAGE_TEXT_OCR,
PageLevelCapabilities.PAGE_LAYOUT_OCR,
PageLevelCapabilities.PAGE_RASTERIZATION,
]
max_bytes_per_request: ClassVar[int] = (
1024 * 1024 * 20
) # 20MB is the max size for a single sync request
max_page_count: ClassVar[int] = 15
project_id: str = Field(...)
processor_id: str = Field(...)
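# Credentials: provide either `service_account_info` (the parsed service-account
# JSON as a mapping) or `service_account_file` (a path to the key file);
# `service_account_info` takes precedence when both are set. `location` selects
# the Document AI region used for the API endpoint (e.g. "us" or "eu").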
service_account_info: Optional[Dict[str, str]] = Field(None)
service_account_file: Optional[str] = Field(None)
location: str = Field("us")
max_workers: int = Field(multiprocessing.cpu_count() * 2)
exclude_bounding_poly: bool = Field(False)
return_images: bool = Field(False)
return_image_quality_scores: bool = Field(False)
_documentai: "documentai.DocumentProcessorServiceClient" = PrivateAttr()
def __init__(
self,
project_id: str,
processor_id: str,
**kwargs,
):
super().__init__(project_id=project_id, processor_id=processor_id, **kwargs)
self.service_account_info = self._default_invoke_kwargs.get(
"service_account_info", None
)
self.service_account_file = self._default_invoke_kwargs.get(
"service_account_file", None
)
try:
from google.cloud import documentai
self._documentai = documentai
except ImportError:
raise ImportError(
"Please install 'google-cloud-documentai' to use the GoogleCloudVisionTextExtractionProvider"
)
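# Construct a DocumentProcessorServiceClient from whichever credential source
# was configured, pointed at the regional Document AI endpoint.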
def get_documentai_client(self, client_option_kwargs: Optional[dict] = None, **kwargs):
from google.api_core.client_options import ClientOptions
# Route requests to the regional endpoint that matches the configured location.
opts = ClientOptions(
**{
"api_endpoint": f"{self.location}-documentai.googleapis.com",
**(client_option_kwargs or {}),
}
)
base_service_client_kwargs = {
**kwargs,
"client_options": opts,
}
if self.service_account_info is not None:
return self._documentai.DocumentProcessorServiceClient.from_service_account_info(
info=self.service_account_info,
**base_service_client_kwargs,
)
elif self.service_account_file is not None:
with service_account_file_read_lock:
return self._documentai.DocumentProcessorServiceClient.from_service_account_file(
filename=self.service_account_file,
**base_service_client_kwargs,
)
else:
raise ValueError("Missing account info and service file path.")
def _get_process_options(self):
if not self.return_image_quality_scores:
return None
return self._documentai.ProcessOptions(
ocr_config=self._documentai.OcrConfig(
enable_image_quality_scores=True,
)
)
def _process_document_sync(self, document: Document):
"""
Split the document into chunks of 15 pages or less, and process each chunk
synchronously.
"""
client = self.get_documentai_client()
processor_name = client.processor_path(
project=self.project_id,
location=self.location,
processor=self.processor_id,
)
documents: List["documentai.Document"] = []
file_bytes = document.get_bytes()
@default_retry_decorator
def process_byte_chunk(split_bytes: bytes) -> "documentai.Document":
raw_document = self._documentai.RawDocument(
content=split_bytes,
mime_type="application/pdf",
)
field_mask = (
"text,pages.layout,pages.words,pages.lines,pages.tokens,pages.blocks"
)
if self.return_images:
field_mask += ",pages.image"
if self.return_image_quality_scores:
field_mask += ",image_quality_scores"
request = self._documentai.ProcessRequest(
name=processor_name,
raw_document=raw_document,
field_mask=field_mask,
process_options=self._get_process_options(),
)
result = client.process_document(request=request)
return result.document
with tqdm.tqdm(
total=len(file_bytes), unit="B", unit_scale=True, desc="Processing document"
) as pbar:
for split_bytes in pdf_split_iter_with_max_bytes(
file_bytes,
max_page_count=self.max_page_count,
max_bytes=self.max_bytes_per_request,
):
gcp_document = process_byte_chunk(split_bytes)
documents.append(gcp_document)
pbar.update(len(split_bytes))
return gcp_documents_to_result(
documents,
self.name,
document_name=document.name,
file_hash=document.document_hash,
exclude_bounding_poly=self.exclude_bounding_poly,
return_images=self.return_images,
)
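# Concurrent path: split the PDF into chunks that each fit a single sync request,
# fan the chunks out across a thread pool, then stitch the per-chunk Document AI
# responses back together in page order.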
def _process_document_concurrent(
self,
document: Document,
start: Optional[int] = None,
stop: Optional[int] = None,
include_raster: bool = False,
):
# Process page chunks concurrently
client = self.get_documentai_client()
processor_name = client.processor_path(
project=self.project_id,
location=self.location,
processor=self.processor_id,
)
file_bytes = document.file_bytes
if document.bytes_per_page > 1024 * 1024 * 2:
logger.info("Document has few pages but is large, compressing first")
file_bytes = document.to_compressed_bytes()
logger.info("Splitting document into chunks...")
document_byte_splits = list(
pdf_split_iter_with_max_bytes(
file_bytes,
max_page_count=self.max_page_count,
max_bytes=self.max_bytes_per_request,
)
)
max_workers = min(len(document_byte_splits), self.max_workers)
@default_retry_decorator
def process_byte_chunk(split_bytes: bytes):
raw_document = self._documentai.RawDocument(
content=split_bytes,
mime_type="application/pdf",
)
field_mask = (
"text,pages.layout,pages.words,pages.lines,pages.tokens,pages.blocks"
)
if self.return_images:
field_mask += ",pages.image"
if self.return_image_quality_scores:
field_mask += ",image_quality_scores"
request = self._documentai.ProcessRequest(
name=processor_name,
raw_document=raw_document,
field_mask=field_mask,
process_options=self._get_process_options(),
)
result = client.process_document(request=request)
return result.document
logger.info(f"Processing {len(document_byte_splits)} chunks...")
with tqdm.tqdm(
total=len(document_byte_splits), desc="Processing document"
) as pbar:
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_index = {
executor.submit(process_byte_chunk, split): index
for index, split in enumerate(document_byte_splits)
}
documents: List["documentai.Document"] = [None] * len(
document_byte_splits
) # type: ignore
for future in as_completed(future_to_index):
index = future_to_index[future]
documents[index] = future.result()
pbar.update(1)
logger.info("Recombining OCR results...")
return gcp_documents_to_result(
documents,
self.name,
document_name=document.name,
file_hash=document.document_hash,
exclude_bounding_poly=self.exclude_bounding_poly,
return_images=self.return_images,
)
def _invoke(
self,
input: List[PdfDocument],
config: None = None,
start: Optional[int] = None,
stop: Optional[int] = None,
**kwargs,
):
if len(input) != 1:
raise ValueError(
"GoogleOcrProvider only supports processing a single document at a time."
)
return self._process_document_concurrent(input[0], start=start, stop=stop)
def process_document_node(
self,
document_node: "DocumentNode",
task_config: None = None,
start: Optional[int] = None,
stop: Optional[int] = None,
contribute_to_document: bool = True,
**kwargs,
) -> Dict[int, OcrPageResult]:
base_result = self.invoke(
[document_node.document], start=start, stop=stop, **kwargs
)
# For OCR, we also need to populate the ocr_results for powered search
self._populate_ocr_results(document_node, base_result)
return base_result
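# Illustrative usage sketch (not part of the provider). The project ID, processor
# ID, and key path below are placeholders, and `document_node` is assumed to be a
# DocumentNode built elsewhere from a PDF; exact construction helpers depend on
# the surrounding codebase.
#
#   provider = GoogleOcrProvider(
#       project_id="my-gcp-project",
#       processor_id="0123456789abcdef",
#       service_account_file="/path/to/service-account.json",
#   )
#   page_results = provider.process_document_node(document_node)
#   # `page_results` maps page numbers to OcrPageResult objects.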