diff --git a/cavepedia-v2.sh b/docker.sh
similarity index 72%
rename from cavepedia-v2.sh
rename to docker.sh
index be6071c..338cca0 100755
--- a/cavepedia-v2.sh
+++ b/docker.sh
@@ -7,9 +7,7 @@ up () {
         --detach \
         --name cp2-pg \
         --restart unless-stopped \
-        --env POSTGRES_DB=cavepediav2_db \
-        --env POSTGRES_PASSWORD=cavepediav2_pw \
-        --env POSTGRES_USER=cavepediav2_user \
+        --env-file $HOME/scripts-private/lech/cavepedia-v2/cp2-pg.env \
         --volume /mammoth/cp2/cp2-pg/data:/var/lib/postgresql/data:rw \
         --publish 127.0.0.1:4010:5432 \
         --network pew-net \
diff --git a/poller/getobject.py b/poller/getobject.py
new file mode 100644
index 0000000..0eead51
--- /dev/null
+++ b/poller/getobject.py
@@ -0,0 +1,53 @@
+from pgvector.psycopg import register_vector, Bit
+from psycopg.rows import dict_row
+from urllib.parse import unquote
+from pypdf import PdfReader, PdfWriter
+import anthropic
+import cohere
+import dotenv
+import datetime
+import io
+import json
+import minio
+import numpy as np
+import os
+import psycopg
+import time
+import logging
+from pythonjsonlogger.json import JsonFormatter
+
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.INFO)
+logHandler = logging.StreamHandler()
+formatter = JsonFormatter("{asctime}{message}", style="{")
+logHandler.setFormatter(formatter)
+logger.addHandler(logHandler)
+
+#####
+
+dotenv.load_dotenv('/home/paul/scripts-private/lech/cavepedia-v2/poller.env')
+
+COHERE_API_KEY = os.getenv('COHERE_API_KEY')
+MINIO_ACCESS_KEY = os.getenv('MINIO_ACCESS_KEY')
+MINIO_SECRET_KEY = os.getenv('MINIO_SECRET_KEY')
+
+s3 = minio.Minio(
+    's3.bigcavemaps.com',
+    access_key=MINIO_ACCESS_KEY,
+    secret_key=MINIO_SECRET_KEY,
+    region='kansascity',
+)
+
+def getobject():
+    bucket = 'cavepedia-v2'
+    key = 'public/var/fyi/VAR-FYI 1982-01.pdf'
+    with s3.get_object(bucket, key) as obj:
+        with open('/tmp/file.pdf', 'wb') as f:
+            while True:
+                chunk = obj.read(1024)
+                if not chunk:
+                    break
+                f.write(chunk)
+
+if __name__ == '__main__':
+    getobject()
diff --git a/poller/main.py b/poller/main.py
index 3e43519..c8b8dac 100644
--- a/poller/main.py
+++ b/poller/main.py
@@ -47,9 +47,8 @@ conn = psycopg.connect(
     row_factory=dict_row,
 )
 
-BACKOFF = False
-
 ## init
+# events table is created by minio upon creation of the event destination
 def create_tables():
     commands = (
         "CREATE EXTENSION IF NOT EXISTS vector",
@@ -73,10 +72,11 @@ def split_pdfs():
 
     for row in rows:
         with conn.cursor() as cur:
-            for record in row['event_data']['Records']:
+            for record in row['value']['Records']:
                 bucket = record['s3']['bucket']['name']
                 key = record['s3']['object']['key']
                 key = unquote(key)
+                key = key.replace('+',' ')
 
                 logger.info(f'SPLITTING bucket: {bucket}, key: {key}')
 
@@ -100,10 +100,10 @@ def split_pdfs():
                 with io.BytesIO() as bs:
                     writer.write(bs)
                     bs.seek(0)
-                    s3.put_object('cavepedia-v2-pages', f'{key}/page-{i}.pdf', bs, len(bs.getvalue()))
-                    cur.execute('INSERT INTO embeddings (bucket, key) VALUES (%s, %s);', (f'{bucket}-pages', f'{key}/page-{i}.pdf'))
+                    s3.put_object(f'{bucket}-pages', f'{key}/page-{i + 1}.pdf', bs, len(bs.getvalue()))
+                    cur.execute('INSERT INTO embeddings (bucket, key) VALUES (%s, %s);', (f'{bucket}-pages', f'{key}/page-{i + 1}.pdf'))
 
-            cur.execute('DELETE FROM events WHERE event_time = %s', (row['event_time'],))
+            cur.execute('DELETE FROM events WHERE key = %s', (row['key'],))
             conn.commit()
 
 ## processing
@@ -137,42 +137,31 @@ def ocr(bucket, key):
     return message
 
 def process_events():
-    rows = conn.execute('SELECT * FROM embeddings WHERE embedding IS NULL')
+    rows = conn.execute("SELECT COUNT(*) FROM embeddings WHERE embedding IS NULL")
+    row = rows.fetchone()
+    logger.info(f'Found {row["count"]} ready to be processed')
+
+    rows = conn.execute("SELECT * FROM embeddings WHERE embedding IS NULL")
 
     for row in rows:
         bucket = row['bucket']
         key = row['key']
 
         logger.info(f'PROCESSING bucket: {bucket}, key: {key}')
 
-        # tier 1 limit: 4k tokens/min
-        # single pdf = 2-3k tokens
-        max_retries = 5
-        retry_delay = 30
-        for attempt in range(max_retries):
-            try:
-                ai_ocr = ocr(bucket, key)
-                text = ai_ocr.content[0].text
-                text = text.replace('\n',' ')
+        ## claude 4 sonnet ##
+        # tier 1 limit: 8k tokens/min
+        # tier 2: enough
+        # single pdf page: up to 2k tokens
+        try:
+            ai_ocr = ocr(bucket, key)
+            text = ai_ocr.content[0].text
 
-                embedding=embed(text, 'search_document')
-                conn.execute('UPDATE embeddings SET content = %s, embedding = %s::vector WHERE bucket = %s AND key = %s;', (text, embedding, bucket, key))
-                conn.commit()
-                break
-            except anthropic.APIStatusError as e:
-                if e.type == 'overloaded_error':
-                    if attempt < max_retries - 1:
-                        sleep_time = retry_delay * (2 ** attempt)
-                        logger.info(f"Overload error. Retrying in {sleep_time:.2f} seconds...")
-                        time.sleep(sleep_time)
-                    else:
-                        logger.info('Max retries reached.')
-                        raise
-                else:
-                    raise
-            except Exception as e:
-                logger.error(f"An unexpected error occurred: {e}")
-                BACKOFF = True
-                break
+            embedding = embed(text, 'search_document')
+            conn.execute('UPDATE embeddings SET content = %s, embedding = %s::vector WHERE bucket = %s AND key = %s;', (text, embedding, bucket, key))
+            conn.commit()
+        except Exception as e:
+            logger.error(f"An unexpected error occurred: {e}")
+            return True
 
 ### embeddings
 def embed(text, input_type):
@@ -198,9 +187,10 @@ if __name__ == '__main__':
         BACKOFF = False
 
         split_pdfs()
-        process_events()
+        BACKOFF = process_events()
 
         if BACKOFF:
-            logger.info('BACKOFF')
-            time.sleep(10 * 60)
+            logger.info('backoff detected, sleeping an extra 5 minutes')
+            time.sleep(5 * 60)
+
+        logger.info('sleeping 5 minutes')
         time.sleep(5 * 60)
diff --git a/poller/search.py b/poller/search.py
index b174797..b8399b7 100644
--- a/poller/search.py
+++ b/poller/search.py
@@ -36,7 +36,7 @@ def embed(text, input_type):
     return resp.embeddings.float[0]
 
 def search():
-    query = 'tazwell county caves'
+    query = 'links trip with not more than 2 people'
     query_embedding = embed(query, 'search_query')
 
     rows = conn.execute('SELECT * FROM embeddings ORDER BY embedding <=> %s::vector LIMIT 5', (query_embedding,)).fetchall()