diff --git a/poller/README.md b/poller/README.md index 374914c..39aefe8 100644 --- a/poller/README.md +++ b/poller/README.md @@ -3,6 +3,4 @@ https://min.io/docs/minio/linux/developers/python/API.html#presigned-get-object-bucket-name-object-name-expires-timedelta-days-7-response-headers-none-request-date-none-version-id-none-extra-query-params-none ## TODO -- cavepedia-v2 -> -- split pdfs -> chunk and write to cavepedia-v2-pages -> -- cohere embedding limits TODO +- claude exponential backoff diff --git a/poller/main.py b/poller/main.py index c9dc0be..5728198 100644 --- a/poller/main.py +++ b/poller/main.py @@ -36,11 +36,12 @@ conn = psycopg.connect( row_factory=dict_row, ) +BACKOFF = False + ## init def create_tables(): commands = ( "CREATE EXTENSION IF NOT EXISTS vector", - "DROP TABLE IF EXISTS embeddings", """ CREATE TABLE IF NOT EXISTS embeddings ( bucket TEXT, @@ -128,21 +129,39 @@ def process_events(): rows = conn.execute('SELECT * FROM embeddings WHERE embedding IS NULL') for row in rows: - for record in row['event_data']['Records']: - bucket = record['s3']['bucket']['name'] - key = record['s3']['object']['key'] - print(f'PROCESSING event_time: {row["event_time"]}, bucket: {bucket}, key: {key}') - print() + bucket = row['bucket'] + key = row['key'] + print(f'PROCESSING bucket: {bucket}, key: {key}') - ai_ocr = ocr(bucket, key) - text = ai_ocr.content[0].text - text = text.replace('\n',' ') + # tier 1 limit: 4k tokens/min + # single pdf = 2-3k tokens + max_retries = 5 + retry_delay = 30 + for attempt in range(max_retries): + try: + ai_ocr = ocr(bucket, key) + text = ai_ocr.content[0].text + text = text.replace('\n',' ') - embedding=embed(text, 'search_document') - with conn.cursor() as cur: - cur.execute('INSERT INTO embeddings (bucket, key, embedding) VALUES (%s, %s, %s::vector);', (bucket, key, embedding)) - cur.execute('DELETE FROM events WHERE event_time = %s', (row['event_time'],)) - conn.commit() + embedding=embed(text, 'search_document') + conn.execute('UPDATE embeddings SET content = %s, embedding = %s::vector WHERE bucket = %s AND key = %s;', (text, embedding, bucket, key)) + conn.commit() + break + except anthropic.APIStatusError as e: + if e.type == 'overloaded_error': + if attempt < max_retries - 1: + sleep_time = retry_delay * (2 ** attempt) + print(f"Overload error. Retrying in {sleep_time:.2f} seconds...") + time.sleep(sleep_time) + else: + print('Max retries reached.') + raise + else: + raise + except Exception as e: + print(f"An unexpected error occurred: {e}") + BACKOFF = True + break ### embeddings def embed(text, input_type): @@ -156,5 +175,13 @@ def embed(text, input_type): if __name__ == '__main__': create_tables() - split_pdfs() -# process_events() + while True: + BACKOFF = False + + split_pdfs() + process_events() + + if BACKOFF: + print('BACKOFF') + time.sleep(10 * 60) + time.sleep(5 * 60) diff --git a/poller/search.py b/poller/search.py index 245062f..7f93eb6 100644 --- a/poller/search.py +++ b/poller/search.py @@ -36,7 +36,7 @@ def embed(text, input_type): return resp.embeddings.float[0] def search(): - query = 'sex' + query = 'caves locations in bath county' query_embedding = embed(query, 'search_query') rows = conn.execute('SELECT * FROM embeddings ORDER BY embedding <=> %s::vector LIMIT 5', (query_embedding,)).fetchall()