cavepediav2/poller/main.py

import io
import logging
import os
import time
from urllib.parse import unquote

import anthropic
import cohere
import dotenv
import minio
import psycopg
from pgvector.psycopg import register_vector
from psycopg.rows import dict_row
from pypdf import PdfReader, PdfWriter
from pythonjsonlogger.json import JsonFormatter
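# Emit structured JSON log lines so downstream collectors can parse them.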
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logHandler = logging.StreamHandler()
formatter = JsonFormatter("{asctime}{message}", style="{")
logHandler.setFormatter(formatter)
logger.addHandler(logHandler)
#####
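# Load the Cohere API key and MinIO credentials from the poller's env file.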
dotenv.load_dotenv('/home/paul/scripts-private/lech/cavepedia-v2/poller.env')
COHERE_API_KEY = os.getenv('COHERE_API_KEY')
MINIO_ACCESS_KEY = os.getenv('MINIO_ACCESS_KEY')
MINIO_SECRET_KEY = os.getenv('MINIO_SECRET_KEY')
s3 = minio.Minio(
    's3.bigcavemaps.com',
    access_key=MINIO_ACCESS_KEY,
    secret_key=MINIO_SECRET_KEY,
    region='kansascity',
)
co = cohere.ClientV2(COHERE_API_KEY)
conn = psycopg.connect(
    host='127.0.0.1',
    port=4010,
    dbname='cavepediav2_db',
    user='cavepediav2_user',
    password='cavepediav2_pw',
    row_factory=dict_row,
)
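# Set by process_events() when an unexpected error occurs; the main loop then
# sleeps an extra 10 minutes before the next pass.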
BACKOFF = False
## init
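# One row per split PDF page. The vector(1536) column is sized for the
# embeddings returned by Cohere's embed-v4.0 model used in embed() below.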
def create_tables():
    commands = (
        "CREATE EXTENSION IF NOT EXISTS vector",
        """
        CREATE TABLE IF NOT EXISTS embeddings (
            bucket TEXT,
            key TEXT,
            content TEXT,
            embedding vector(1536),
            PRIMARY KEY (bucket, key)
        )
        """,
    )
    for command in commands:
        conn.execute(command)
    conn.commit()
    register_vector(conn)
## splitting
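# The events table is populated elsewhere (this module never writes to it);
# each row is assumed to hold an S3/MinIO bucket-notification payload in
# event_data. For every uploaded PDF, split it into single-page PDFs, upload
# each page, and queue a placeholder embeddings row for processing.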
def split_pdfs():
    rows = conn.execute('SELECT * FROM events')
    for row in rows:
        with conn.cursor() as cur:
            for record in row['event_data']['Records']:
                bucket = record['s3']['bucket']['name']
                key = record['s3']['object']['key']
                key = unquote(key)
                logger.info(f'SPLITTING bucket: {bucket}, key: {key}')
                ##### get pdf #####
                with s3.get_object(bucket, key) as obj:
                    with open('/tmp/file.pdf', 'wb') as f:
                        while True:
                            chunk = obj.read(1024)
                            if not chunk:
                                break
                            f.write(chunk)
                ##### split #####
                with open('/tmp/file.pdf', 'rb') as f:
                    reader = PdfReader(f)
                    for i in range(len(reader.pages)):
                        writer = PdfWriter()
                        writer.add_page(reader.pages[i])
                        with io.BytesIO() as bs:
                            writer.write(bs)
                            bs.seek(0)
                            s3.put_object('cavepedia-v2-pages', f'{key}/page-{i}.pdf', bs, len(bs.getvalue()))
                            cur.execute('INSERT INTO embeddings (bucket, key) VALUES (%s, %s);', (f'{bucket}-pages', f'{key}/page-{i}.pdf'))
            cur.execute('DELETE FROM events WHERE event_time = %s', (row['event_time'],))
        conn.commit()
## processing
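# Use Claude as the OCR engine: hand it a presigned URL for a single-page PDF
# as a document content block and ask it to transcribe the text verbatim.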
def ocr(bucket, key):
    url = s3.presigned_get_object(bucket, unquote(key))
    client = anthropic.Anthropic()
    message = client.messages.create(
        model='claude-sonnet-4-20250514',
        max_tokens=4000,
        temperature=1,
        messages=[
            {
                'role': 'user',
                'content': [
                    {
                        'type': 'document',
                        'source': {
                            'type': 'url',
                            'url': url,
                        },
                    },
                    {
                        'type': 'text',
                        'text': 'Extract all text from this document. Do not include any summary or conclusions of your own.',
                    },
                ],
            },
        ],
    )
    return message
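# Walk every page that has no embedding yet: OCR it, embed the text, and store
# both. Retries with exponential backoff when the Anthropic API reports it is
# overloaded; any other error sets BACKOFF so the main loop slows down.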
def process_events():
    global BACKOFF
    rows = conn.execute('SELECT * FROM embeddings WHERE embedding IS NULL')
    for row in rows:
        bucket = row['bucket']
        key = row['key']
        logger.info(f'PROCESSING bucket: {bucket}, key: {key}')
        # tier 1 limit: 4k tokens/min
        # single pdf = 2-3k tokens
        max_retries = 5
        retry_delay = 30
        for attempt in range(max_retries):
            try:
                ai_ocr = ocr(bucket, key)
                text = ai_ocr.content[0].text
                text = text.replace('\n', ' ')
                embedding = embed(text, 'search_document')
                conn.execute('UPDATE embeddings SET content = %s, embedding = %s::vector WHERE bucket = %s AND key = %s;', (text, embedding, bucket, key))
                conn.commit()
                break
            except anthropic.APIStatusError as e:
                # 529 is the documented status code for overloaded_error
                if e.status_code == 529:
                    if attempt < max_retries - 1:
                        sleep_time = retry_delay * (2 ** attempt)
                        logger.info(f"Overload error. Retrying in {sleep_time:.2f} seconds...")
                        time.sleep(sleep_time)
                    else:
                        logger.info('Max retries reached.')
                        raise
                else:
                    raise
            except Exception as e:
                logger.error(f"An unexpected error occurred: {e}")
                BACKOFF = True
                break
### embeddings
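# input_type is 'search_document' when indexing pages; a query-side caller
# would pass 'search_query' so Cohere embeds the text for retrieval.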
def embed(text, input_type):
    resp = co.embed(
        texts=[text],
        model='embed-v4.0',
        input_type=input_type,
        embedding_types=['float'],
    )
    return resp.embeddings.float[0]
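# One-off migration helper: shifts the page numbering of caves-of-virginia.pdf
# up by one (page-0 becomes page-1, and so on), walking from the highest page
# down so keys never collide. Not called from the main loop.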
def fix_pages():
    i = 766
    while i > 0:
        conn.execute('UPDATE embeddings SET key = %s WHERE key = %s', (f'public/va/caves-of-virginia.pdf/page-{i}.pdf', f'public/va/caves-of-virginia.pdf/page-{i-1}.pdf'))
        conn.commit()
        i -= 1
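# Poll forever: split any newly uploaded PDFs, then OCR and embed pending
# pages. Sleeps 5 minutes between passes, plus 10 more after an unexpected
# error was flagged via BACKOFF.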
if __name__ == '__main__':
    create_tables()
    while True:
        BACKOFF = False
        split_pdfs()
        process_events()
        if BACKOFF:
            logger.info('BACKOFF')
            time.sleep(10 * 60)
        time.sleep(5 * 60)