chore: bump version 0.7.29

This commit is contained in:
Caren Thomas 2025-06-01 11:36:58 -07:00
parent c609574ba8
commit c1b28204f2
3 changed files with 22 additions and 23 deletions

View File

@ -1,4 +1,4 @@
__version__ = "0.7.28"
__version__ = "0.7.29"
# import clients
from letta.client.client import RESTClient

View File

@ -4,11 +4,10 @@ from typing import Optional
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from sqlalchemy import text
from letta.jobs.llm_batch_job_polling import poll_running_llm_batches
from letta.log import get_logger
from letta.server.db import db_registry
from letta.server.db import db_context
from letta.server.server import SyncServer
from letta.settings import settings
@ -35,16 +34,18 @@ async def _try_acquire_lock_and_start_scheduler(server: SyncServer) -> bool:
acquired_lock = False
try:
# Use a temporary connection context for the attempt initially
async with db_registry.async_session() as session:
raw_conn = await session.connection()
with db_context() as session:
engine = session.get_bind()
# Get raw connection - MUST be kept open if lock is acquired
raw_conn = engine.raw_connection()
cur = raw_conn.cursor()
# Try to acquire the advisory lock
sql = text("SELECT pg_try_advisory_lock(CAST(:lock_key AS bigint))")
result = await session.execute(sql, {"lock_key": ADVISORY_LOCK_KEY})
acquired_lock = result.scalar_one()
cur.execute("SELECT pg_try_advisory_lock(CAST(%s AS bigint))", (ADVISORY_LOCK_KEY,))
acquired_lock = cur.fetchone()[0]
if not acquired_lock:
await raw_conn.close()
cur.close()
raw_conn.close()
logger.info("Scheduler lock held by another instance.")
return False
@ -105,14 +106,14 @@ async def _try_acquire_lock_and_start_scheduler(server: SyncServer) -> bool:
# Clean up temporary resources if lock wasn't acquired or error occurred before storing
if cur:
try:
await cur.close()
except Exception as e:
logger.warning(f"Error closing cursor: {e}")
cur.close()
except:
pass
if raw_conn:
try:
await raw_conn.close()
except Exception as e:
logger.warning(f"Error closing connection: {e}")
raw_conn.close()
except:
pass
async def _background_lock_retry_loop(server: SyncServer):
@ -160,9 +161,7 @@ async def _release_advisory_lock():
try:
if not lock_conn.closed:
if not lock_cur.closed:
# Use SQLAlchemy text() for raw SQL
unlock_sql = text("SELECT pg_advisory_unlock(CAST(:lock_key AS bigint))")
lock_cur.execute(unlock_sql, {"lock_key": ADVISORY_LOCK_KEY})
lock_cur.execute("SELECT pg_advisory_unlock(CAST(%s AS bigint))", (ADVISORY_LOCK_KEY,))
lock_cur.fetchone() # Consume result
lock_conn.commit()
logger.info(f"Executed pg_advisory_unlock for lock {ADVISORY_LOCK_KEY}")
@ -176,12 +175,12 @@ async def _release_advisory_lock():
# Ensure resources are closed regardless of unlock success
try:
if lock_cur and not lock_cur.closed:
await lock_cur.close()
lock_cur.close()
except Exception as e:
logger.error(f"Error closing advisory lock cursor: {e}", exc_info=True)
try:
if lock_conn and not lock_conn.closed:
await lock_conn.close()
lock_conn.close()
logger.info("Closed database connection that held advisory lock.")
except Exception as e:
logger.error(f"Error closing advisory lock connection: {e}", exc_info=True)
@ -253,4 +252,4 @@ async def shutdown_scheduler_and_release_lock():
try:
scheduler.shutdown(wait=False)
except:
pass
pass

View File

@ -1,6 +1,6 @@
[tool.poetry]
name = "letta"
version = "0.7.28"
version = "0.7.29"
packages = [
{include = "letta"},
]