Top.Mail.Ru
Ответы

Питон и БД - пните в верном направлении

Задача: читаем таблицу, меняем в ней некоторые значения, пишем изменения назад. На PL/SQL это бы делалось через CURSOR ... FOR UPDATE или BULK COLLECT, если по-быстрому, но через память. На C# можно считать DataSet, поменять все что надо, и сказать ему записать обновления в базу. А как что-то такое сделать на питоне?
Если это важно, то используется Snowpark Session, из которой есть доступ к Snowflake Connector, если приспичит. Этот Snowpark оперирует DataFrame, насколько я понимаю, это какой-то стандартный подход к работе с данными в питоне.
Пока сумасшедшая идея состоит в том, чтобы все считать в память, сформировать новый список строк (поскольку объект Row нельзя модифицировать, надо делать новый), и перезаписать его в исходную таблицу. Хотелось бы этого все же по возможности избежать. Размер памяти тут не является преградой, там облачный сервер, съест что угодно.

По дате
По рейтингу
Аватар пользователя
Оракул
1мес

Snowpark-DataFrame ленивый: все, что вы над ним пишете, превращается в единый SQL-план и исполняется прямо в кластере Snowflake. Поэтому самый прямой путь – описать, как меняется столбец, и вызвать update/merge/overwrite, не вытаскивая данные из базы.

Если изменение можно выразить обычным SQL, то банально:

12345
 session.sql(""" 
    update my_schema.my_table 
       set price = price * 1.2 
     where category = 'books' 
""").collect() 

Никакой памяти на клиенте, выполняется распределённо.

Когда логика на чистом SQL не помещается и хочется Python, используем Snowpark-UDF. Пишем функцию, регистрируем как UDF, цепляем её в DataFrame и делаем update:

12345678910
 from snowflake.snowpark.functions import col, udf 
 
@udf(name="normalize_phone", replace=True) 
def normalize_phone(phone: str) -> str: 
    import re 
    return re.sub(r'\D', '', phone)  # пример «сложной» питоновской логики 
 
df = session.table("contacts") 
df_to_update = df.with_column("phone", normalize_phone(col("phone"))) 
df_to_update.update({"phone": col("phone")})  # будет сгенерирован UPDATE ... SET phone = normalize_phone(phone) 

Метод update у DataFrame появился начиная с версии 1.1 Snowpark и генерирует обычный DML, так что всё опять же выполняется на стороне Snowflake.

Если трансформация затрагивает сразу много столбцов, проще создать временную таблицу и потом сделать MERGE. В коде это выглядит так:

1234567891011121314151617
 orig = session.table("sales") 
transformed = (orig 
               .with_column("net_amount", col("gross_amount") - col("discount")) 
               .with_column("updated_at", current_timestamp())) 
 
transformed.write.mode("overwrite").save_as_table("sales_tmp") 
 
session.sql(""" 
merge into sales t 
using sales_tmp s 
   on t.id = s.id 
when matched then update 
     set net_amount = s.net_amount, 
         updated_at = s.updated_at 
""").collect() 
 
session.sql("drop table sales_tmp").collect() 

При желании можно сделать SWAP вместо MERGE, если нужно полностью заменить таблицу без блокировки читателей: alter table sales_tmp swap with sales.

Читать всё в память, строить список Row и обратно грузить смысла нет, разве что вы тренируете модель в Pandas или нужен scikit-learn. Но для типового «обнови значения в таблице» правильный вектор такой: описываем изменение либо чистым UPDATE, либо через Snowpark-UDF, либо через цепочку DataFrame→save_as_table→MERGE, и даём Snowflake выполнить работу внутри себя. Вся магия DataSet из C# на самом деле как раз к этому и сводится, только Snowflake делает это эффективнее и без лишних раунд-трипов.

Аватар пользователя
Искусственный Интеллект
1мес

Если я правильно понимаю вопрос, то судя по доке Snowflake, это делается через SELECT ... FOR UPDATE:
https://docs.snowflake.com/en/sql-reference/constructs/for-update
Но это напрямую если запросами, а как это реализовано в используемых обёртках - не в курсе.

Аватар пользователя
Гуру
1мес

Нейросети пробовал?