From aeae900caead6d3ed1ccda62c475bbc760b44e1f Mon Sep 17 00:00:00 2001 From: Paul Walko Date: Mon, 26 May 2025 08:54:13 -0400 Subject: [PATCH] split pdf --- poller/README.md | 3 +- poller/main.py | 68 +++++++++++++++++++++++++++++++++---------- poller/pyproject.toml | 1 + poller/uv.lock | 11 +++++++ 4 files changed, 67 insertions(+), 16 deletions(-) diff --git a/poller/README.md b/poller/README.md index 6066b01..374914c 100644 --- a/poller/README.md +++ b/poller/README.md @@ -3,5 +3,6 @@ https://min.io/docs/minio/linux/developers/python/API.html#presigned-get-object-bucket-name-object-name-expires-timedelta-days-7-response-headers-none-request-date-none-version-id-none-extra-query-params-none ## TODO -- if pages > 100 -> chunk to cavepedia-v2-scratch -> collect content +- cavepedia-v2 -> +- split pdfs -> chunk and write to cavepedia-v2-pages -> - cohere embedding limits TODO diff --git a/poller/main.py b/poller/main.py index ae7627b..c9dc0be 100644 --- a/poller/main.py +++ b/poller/main.py @@ -1,10 +1,12 @@ from pgvector.psycopg import register_vector, Bit from psycopg.rows import dict_row from urllib.parse import unquote +from pypdf import PdfReader, PdfWriter import anthropic import cohere import dotenv import datetime +import io import json import minio import numpy as np @@ -18,6 +20,12 @@ COHERE_API_KEY = os.getenv('COHERE_API_KEY') MINIO_ACCESS_KEY = os.getenv('MINIO_ACCESS_KEY') MINIO_SECRET_KEY = os.getenv('MINIO_SECRET_KEY') +s3 = minio.Minio( + 's3.bigcavemaps.com', + access_key=MINIO_ACCESS_KEY, + secret_key=MINIO_SECRET_KEY, + region='kansascity', +) co = cohere.ClientV2(COHERE_API_KEY) conn = psycopg.connect( host='127.0.0.1', @@ -32,6 +40,7 @@ conn = psycopg.connect( def create_tables(): commands = ( "CREATE EXTENSION IF NOT EXISTS vector", + "DROP TABLE IF EXISTS embeddings", """ CREATE TABLE IF NOT EXISTS embeddings ( bucket TEXT, @@ -46,25 +55,53 @@ def create_tables(): conn.commit() register_vector(conn) +## splitting +def split_pdfs(): + rows = conn.execute('SELECT * FROM events') + + for row in rows: + with conn.cursor() as cur: + for record in row['event_data']['Records']: + bucket = record['s3']['bucket']['name'] + key = record['s3']['object']['key'] + key = unquote(key) + + print(f'SPLITTING bucket: {bucket}, key: {key}') + + ##### get pdf ##### + with s3.get_object(bucket, key) as obj: + with open('/tmp/file.pdf', 'wb') as f: + while True: + chunk = obj.read(1024) + if not chunk: + break + f.write(chunk) + + ##### split ##### + with open('/tmp/file.pdf', 'rb') as f: + reader = PdfReader(f) + + for i in range(len(reader.pages)): + writer = PdfWriter() + writer.add_page(reader.pages[i]) + + with io.BytesIO() as bs: + writer.write(bs) + bs.seek(0) + s3.put_object('cavepedia-v2-pages', f'{key}/page-{i}.pdf', bs, len(bs.getvalue())) + cur.execute('INSERT INTO embeddings (bucket, key) VALUES (%s, %s);', (f'{bucket}-pages', f'{key}/page-{i}.pdf')) + + cur.execute('DELETE FROM events WHERE event_time = %s', (row['event_time'],)) + conn.commit() + ## processing -def get_presigned_url(bucket, key) -> str: - client = minio.Minio( - 's3.bigcavemaps.com', - access_key=MINIO_ACCESS_KEY, - secret_key=MINIO_SECRET_KEY, - region='kansascity', - ) - - url = client.presigned_get_object(bucket, unquote(key)) - return url - def ocr(bucket, key): - url = get_presigned_url(bucket, key) + url = s3.presigned_get_object(bucket, unquote(key)) client = anthropic.Anthropic() message = client.messages.create( model='claude-sonnet-4-20250514', - max_tokens=1000, + max_tokens=4000, temperature=1, messages=[ { @@ -88,7 +125,7 @@ def ocr(bucket, key): return message def process_events(): - rows = conn.execute('SELECT * FROM events') + rows = conn.execute('SELECT * FROM embeddings WHERE embedding IS NULL') for row in rows: for record in row['event_data']['Records']: @@ -119,4 +156,5 @@ def embed(text, input_type): if __name__ == '__main__': create_tables() - process_events() + split_pdfs() +# process_events() diff --git a/poller/pyproject.toml b/poller/pyproject.toml index 9f13285..3ece2a9 100644 --- a/poller/pyproject.toml +++ b/poller/pyproject.toml @@ -11,6 +11,7 @@ dependencies = [ "mypy>=1.15.0", "pgvector>=0.4.1", "psycopg[binary]>=3.2.9", + "pypdf>=5.5.0", "python-dotenv>=1.1.0", "types-psycopg2>=2.9.21.20250516", ] diff --git a/poller/uv.lock b/poller/uv.lock index 71bf936..7889297 100644 --- a/poller/uv.lock +++ b/poller/uv.lock @@ -550,6 +550,7 @@ dependencies = [ { name = "mypy" }, { name = "pgvector" }, { name = "psycopg", extra = ["binary"] }, + { name = "pypdf" }, { name = "python-dotenv" }, { name = "types-psycopg2" }, ] @@ -562,6 +563,7 @@ requires-dist = [ { name = "mypy", specifier = ">=1.15.0" }, { name = "pgvector", specifier = ">=0.4.1" }, { name = "psycopg", extras = ["binary"], specifier = ">=3.2.9" }, + { name = "pypdf", specifier = ">=5.5.0" }, { name = "python-dotenv", specifier = ">=1.1.0" }, { name = "types-psycopg2", specifier = ">=2.9.21.20250516" }, ] @@ -743,6 +745,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/32/56/8a7ca5d2cd2cda1d245d34b1c9a942920a718082ae8e54e5f3e5a58b7add/pydantic_core-2.33.2-pp311-pypy311_pp73-win_amd64.whl", hash = "sha256:329467cecfb529c925cf2bbd4d60d2c509bc2fb52a20c1045bf09bb70971a9c1", size = 2066757, upload-time = "2025-04-23T18:33:30.645Z" }, ] +[[package]] +name = "pypdf" +version = "5.5.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/e0/c8/543f8ae1cd9e182e9f979d9ab1df18e3445350471abadbdabc0166ae5741/pypdf-5.5.0.tar.gz", hash = "sha256:8ce6a18389f7394fd09a1d4b7a34b097b11c19088a23cfd09e5008f85893e254", size = 5021690, upload-time = "2025-05-11T14:00:42.043Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/a1/4e/931b90b51e3ebc69699be926b3d5bfdabae2d9c84337fd0c9fb98adbf70c/pypdf-5.5.0-py3-none-any.whl", hash = "sha256:2f61f2d32dde00471cd70b8977f98960c64e84dd5ba0d070e953fcb4da0b2a73", size = 303371, upload-time = "2025-05-11T14:00:40.064Z" }, +] + [[package]] name = "python-dotenv" version = "1.1.0"