Pular para conteúdo

Referencia

A classe Athena estende a funcionalidade de CursorIterator para executar consultas SQL no Athena, gerenciar os resultados e fornecer métodos para exportar os dados para diversos formatos (CSV, Parquet, Iceberg, etc.).

Ela permite a execução de consultas SQL, a obtenção dos resultados de forma iterativa e a conversão desses resultados para estruturas de dados como Pandas DataFrame, Parquet ou Arrow. A classe também oferece métodos para escrever dados de volta ao banco de dados ou armazená-los em formatos como Parquet e Iceberg, com suporte para operações de particionamento e compressão.

O tipo de cursor utilizado pode ser um dos seguintes
  • CursorPython: Para operações com Python.
  • CursorParquet: Para operações com dados Parquet em integração com Pyarrow.
  • CursorParquetDuckdb: Para operações com dados Parquet em integração com DuckDB.
Métodos
  • execute: Executa uma consulta SQL, com parâmetros opcionais.
  • fetchone: Retorna a próxima linha do resultado.
  • fetchall: Retorna todas as linhas do resultado.
  • fetchmany: Retorna um número especificado de linhas.
  • to_arrow: Converte os resultados para um formato Arrow.
  • to_parquet: Converte os resultados para o formato Parquet.
  • to_csv: Converte os resultados para um arquivo CSV.
  • to_create_table_db: Cria uma tabela no banco de dados usando os resultados.
  • to_partition_create_table_db: Cria uma tabela particionada no banco de dados.
  • to_insert_table_db: Insere dados em uma tabela do banco de dados.
  • write_dataframe: Escreve um DataFrame em uma tabela no banco de dados.
  • write_arrow: Escreve um Table Arrow em uma tabela no banco de dados.
  • write_parquet: Escreve dados em formato Parquet no banco de dados.
  • write_table_iceberg: Escreve dados em formato Iceberg no banco de dados.
  • merge_table_iceberg: Realiza uma operação de merge em uma tabela Iceberg.
  • to_pandas: Converte os resultados para um DataFrame Pandas.
  • close: Libera os recursos utilizados pela consulta.
  • enter / exit: Suporte para contexto (gerenciamento de recursos).
Exceções
  • ProgrammingError: Lançada quando um método não é implementado para o tipo de cursor atual.
Exemplo de uso

Supondo que o cursor seja uma instância válida de um dos tipos: CursorPython, CursorParquet ou CursorParquetDuckdb

from athena_mvsh import CursorPython, CursorParquet, CursorParquetDuckdb

cursor = CursorPython(...)  # ou CursorParquet(...) ou CursorParquetDuckdb(...)

Criando uma instância da classe Athena

with Athena(cursor) as athena:
    # Executando uma consulta SQL
    athena.execute("SELECT * FROM sales_data WHERE region = 'US'")

    # Obtendo os resultados
    results = athena.fetchall()
    for row in results:
        print(row)

    # Convertendo os resultados para um DataFrame Pandas
    df = athena.to_pandas()
    print(df.head())

    # Exportando os resultados para um arquivo CSV
    athena.to_csv("sales_data_us.csv", delimiter=";", include_header=True)

    # Escrevendo dados em uma tabela no banco de dados
    athena.write_parquet(
        file="sales_data.parquet",
        table_name="sales_table",
        schema="public"
    )

Athena

Bases: CursorIterator

Classe responsável por executar consultas SQL no AWS Athena ou em cursores alternativos, como CursorParquetDuckdb, CursorPython ou CursorParquet. A classe suporta gerenciamento automático de recursos com o uso de um gerenciador de contexto.

Attributes:

Name Type Description
cursor CursorParquetDuckdb | CursorPython | CursorParquet

Instância do cursor utilizado para executar consultas.

row_cursor

Armazena o resultado da execução da query.

description property

Retorna os metadados da consulta executada.

Esta propriedade retorna uma lista de tuplas que representam as colunas da consulta executada. Cada tupla contém as seguintes informações: - Nome da coluna - Tipo da coluna - (Valor não utilizado, sempre None) - (Valor não utilizado, sempre None) - Precisão da coluna - Escala da coluna - Se a coluna pode ser nula (Nullable)

Retorno

list[tuple] | None: Uma lista de tuplas contendo os metadados de cada coluna ou None se os metadados não estiverem disponíveis.

Exemplo
cursor = CursorPython(...)
with Athena(cursor) as athena:
    athena.execute("SELECT id, name FROM users")
    print(athena.description)
[('id', 'INTEGER', None, None, 10, 0, True), ('name', 'STRING', None, None, None, None, True)]

rowcount property

Retorna o número total de registros afetados pela execução da consulta.

Este método recupera o número de registros afetados pela última execução de consulta, como determinado pelo cursor utilizado. O valor é fornecido pela implementação do cursor, seguindo as especificações da DB API Python.

Returns:

Name Type Description
int int

O número total de registros afetados pela última execução de consulta.

Notes
  • O valor retornado pode variar dependendo do tipo de consulta executada (como SELECT, INSERT, UPDATE, DELETE).
  • O método rowcount depende da implementação do cursor específico. Por exemplo, cursors como CursorPython, CursorParquetDuckdb ou outros tipos podem ter diferentes formas de calcular os registros afetados.
  • Se a consulta não afetar nenhum registro, o valor retornado será -1.
Exemplo
from athena_mvsh import Athena, CursorPython

cursor = CursorPython(...)

with Athena(cursor) as athena:
    athena.execute("UPDATE tabela SET coluna = 'valor' WHERE condicao = 'verdadeira'")
    print(f"Registros afetados: {athena_cursor.rowcount}")

__init__(cursor)

Inicializa a classe Athena com um cursor, permitindo a execução de consultas SQL.

Parameters:

Name Type Description Default
cursor CursorParquetDuckdb | CursorPython | CursorParquet

Instância do cursor que define o backend para execução das queries.

required

execute(query, parameters=None, *, result_reuse_enable=False)

Executa uma consulta SQL no backend configurado pelo cursor.

Parameters:

Name Type Description Default
query str

A string da consulta SQL a ser executada.

required
parameters tuple | dict

Parâmetros para a consulta. - Se for uma tupla, é interpretada como argumentos posicionais. - Se for um dicionário, é interpretado como argumentos nomeados. - Padrão é None.

None
result_reuse_enable bool

Habilita a reutilização de resultados da consulta armazenados em cache (se suportado pelo cursor). Padrão é False.

False
Retorno

self: A instância atual da classe Athena, permitindo chamadas encadeadas.

Notes
  • A classe é compatível com os cursores:
  • CursorParquetDuckdb
  • CursorPython
  • CursorParquet
  • Cada cursor deve ser inicializado com os seguintes parâmetros obrigatórios:

    • s3_staging_dir (str): Diretório no Amazon S3 usado como área de staging.
    • schema_name (str, optional): Nome do schema do banco de dados.
    • catalog_name (str, optional): Nome do catálogo no Athena.
    • poll_interval (float, optional): Intervalo em segundos entre verificações de consulta. Padrão: 1.0.
    • result_reuse_enable (bool, optional): Habilita reutilização de resultados.
  • Caso as configurações do AWS CLI não estejam disponíveis, os parâmetros opcionais para autenticação podem ser passados como kwargs ao instanciar os cursores:

    • region_name (str): Região da AWS.
    • aws_access_key_id (str): Chave de acesso da AWS.
    • aws_secret_access_key (str): Chave secreta de acesso da AWS.
  • Se a consulta for um comando DDL (ex.: CREATE, ALTER, DROP), o método retorna o resultado de fetchone() imediatamente.

Exemplo
from athena_mvsh import Athena, CursorParquetDuckdb

Inicializando CursorParquetDuckdb com credenciais AWS

cursor = CursorParquetDuckdb(
    s3_staging_dir="s3://my-bucket/staging/",
    schema_name="default",
    catalog_name="awsdatacatalog",
    region_name="us-west-2",
    aws_access_key_id="my-access-key",
    aws_secret_access_key="my-secret-key"
)

with Athena(cursor) as athena:
    athena.execute("SELECT * FROM example_table").fetchall()

Inicializando CursorPython sem credenciais adicionais (usa AWS CLI configurado)

cursor = CursorPython(
    s3_staging_dir="s3://my-bucket/staging/"
 )
with Athena(cursor) as athena:
    athena.execute("SELECT COUNT(*) FROM parquet_data")

Inicializando CursorParquet com credenciais

cursor = CursorParquet(
    s3_staging_dir="s3://my-bucket/staging/",
    schema_name="analytics",
    region_name="us-west-1",
    aws_access_key_id="my-access-key",
    aws_secret_access_key="my-secret-key"
)

with Athena(cursor) as athena:
    result = athena.execute("SELECT * FROM parquet_table").fetchone()
print(result)
(1000,)

fetchall()

Retorna todas as linhas do cursor como uma lista.

Este método converte todas as linhas disponíveis no cursor em uma lista de tuplas, onde cada tupla representa uma linha do resultado da consulta executada.

Retorno

list: Uma lista de tuplas representando todas as linhas retornadas pela consulta.

Exemplo
cursor = CursorPython(...)
with Athena(cursor) as athena:
    athena.execute("SELECT id, name FROM users")
    rows = athena.fetchall()
    print(rows)  # Exemplo: [(1, 'Alice'), (2, 'Bob'), (3, 'Charlie')]

fetchmany(size=1)

Retorna um número especificado de linhas do cursor.

Este método retorna até size linhas do cursor, como uma lista de tuplas. Se o número de linhas disponíveis for menor que o valor de size, ele retorna o número de linhas restantes. O valor padrão de size é 1.

Parameters:

Name Type Description Default
size (int, opcional)

O número de linhas a serem retornadas. O valor padrão é 1.

1
Retorno

list: Uma lista de tuplas representando as linhas retornadas pelo cursor.

Exemplo
cursor = CursorPython(...)
with Athena(cursor) as athena:
    athena.execute("SELECT id, name FROM users")
    rows = athena.fetchmany(2)
    print(rows)  # Exemplo: [(1, 'Alice'), (2, 'Bob')]

fetchone()

Retorna a próxima linha do cursor.

Este método tenta obter a próxima linha do cursor. Se houver uma linha disponível, ela é retornada. Caso contrário, retorna None quando o cursor atingir o final dos resultados.

Retorno

tuple | None: Retorna a próxima linha como uma tupla ou None se não houver mais linhas.

Exemplo
cursor = CursorPython(...)
with Athena(cursor) as athena:
    athena.execute("SELECT id, name FROM users")
    print(athena.fetchone())  # Exemplo de uma linha retornada: (1, 'Alice')
    print(athena.fetchone())  # Exemplo de uma linha retornada: (2, 'Bob')
    print(athena.fetchone())  # Exemplo de retorno `None`, se não houver mais dados

merge_table_iceberg(target_table, source_data, schema, predicate, delete_condition=None, update_condition=None, insert_condition=None, alias=('t', 's'), sync_schema=False, location=None, catalog_name='awsdatacatalog')

Executa um merge (UPSERT) em uma tabela Iceberg no Athena, utilizando uma tabela temporária criada no Athena.

Este método usa o DuckDB para criar uma tabela temporária no Athena com os dados fornecidos e, em seguida, executa um merge (UPSERT) entre a tabela de destino e os dados da tabela temporária. A operação permite atualizar ou inserir registros na tabela de destino com base em uma condição de junção (predicate). Pode-se utilizar um DataFrame pandas, Table Arrow ou arquivos Parquet como dados de origem.

Parameters:

Name Type Description Default
target_table str

O nome da tabela de destino do tipo Iceberg no Athena, onde os dados serão mesclados.

required
source_data DataFrame | list[str | Path] | str | Path | Table

Dados de origem a serem mesclados com a tabela de destino. Pode ser um DataFrame pandas, Table Arrow ou um ou mais arquivos Parquet (caminhos em formato string ou Path).

required
schema str

O esquema onde a tabela Iceberg será manipulada no Athena.

required
predicate str

A condição de junção (predicate) que define a lógica de correspondência entre os dados de origem e os dados da tabela de destino para o merge (atualização ou inserção).

required
delete_condition (str, opcional)

Condição para excluir registros da tabela de destino. Se None, a cláusula DELETE não será aplicada.

None
update_condition (str, opcional)

Condição para atualizar registros existentes na tabela de destino. Se None, todos os registros correspondentes serão atualizados..

None
insert_condition (str, opcional)

Condição para inserir novos registros na tabela de destino. Se None, todos os registros correspondentes serão inseridos.

None
alias tuple

Alias para as tabelas de origem e destino no merge. O valor padrão é ('t', 's'), onde 't' é o alias da tabela de destino e 's' é o alias da tabela de origem.

('t', 's')
sync_schema bool

Indica se o esquema da tabela de destino deve ser sincronizado com os dados de origem. O valor padrão é False.

False
location str

O local no S3 onde os dados serão armazenados, se aplicável.

None
catalog_name str

O nome do catálogo de dados a ser utilizado. O valor padrão é 'awsdatacatalog'.

'awsdatacatalog'
Exceções

ProgrammingError: Se a função for chamada com um cursor incompatível.

Detalhes
  • O método cria uma tabela temporária no Athena com os dados fornecidos (DataFrame pandas, Table Arrow ou arquivos Parquet) e executa um merge (UPSERT) entre a tabela de destino (Iceberg) e a tabela temporária.
  • O predicate define a condição de junção para o merge, permitindo atualizar ou inserir registros na tabela de destino.
  • O parâmetro alias é utilizado para dar nomes alias às tabelas de origem e destino durante a execução do SQL gerado.
  • O parâmetro catalog_name permite configurar o catálogo de dados, sendo 'awsdatacatalog' o valor padrão.
Exemplo
import pandas as pd
from athena_mvsh import Athena, CursorParquetDuckdb

cursor = CursorParquetDuckdb(...)

# Exemplo de DataFrame
df = pd.DataFrame({
    "id": [1, 2, 3],
    "nome": ["A", "B", "C"]
})

# Realiza um merge (UPSERT) na tabela Iceberg no Athena, utilizando uma tabela temporária
with Athena(cursor) as athena:
    athena.merge_table_iceberg(
        target_table="tabela_destino",
        source_data=df,
        schema="meu_esquema",
        predicate="t.id = s.id",
        alias=("t", "s"),
        location="s3://meu-bucket/dados/",
        catalog_name="meu_catalogo"
    )
Notas
  • Certifique-se de que o cursor utilizado seja instância de CursorParquetDuckdb.
  • O predicate deve ser uma expressão lógica válida para a junção dos dados da origem com os dados da tabela de destino.
  • A operação de merge cria uma tabela temporária no Athena para facilitar a execução do merge. O uso de uma tabela temporária pode otimizar o processo, especialmente quando os dados de origem são grandes.

to_arrow()

Converte os resultados da consulta para um pa.Table (PyArrow Table).

Este método converte os dados retornados pela consulta executada em um objeto pa.Table (tabela do PyArrow). A conversão só é realizada se o cursor utilizado for uma instância de CursorParquet ou CursorParquetDuckdb. Caso contrário, uma exceção ProgrammingError será levantada.

Exceções

ProgrammingError: Se o cursor não for uma instância de CursorParquet ou CursorParquetDuckdb.

Retorno

pa.Table: Um objeto pa.Table contendo os resultados da consulta.

Exemplo
cursor = CursorParquet(...)
with Athena(cursor) as athena:
    athena.execute("SELECT id, name FROM users")
    table = athena.to_arrow()
    print(table)  # Exemplo de saída: <pyarrow.table>

to_create_table_db(table_name, *, database='db.duckdb')

Cria uma tabela no banco de dados DuckDB com base nos resultados da consulta executada.

Este método cria uma tabela no banco de dados DuckDB especificado usando os dados retornados pela consulta executada. A tabela será criada com o nome fornecido como table_name.

O método só pode ser utilizado com o cursor do tipo CursorParquetDuckdb. O nome do banco de dados pode ser especificado como um argumento, e se não fornecido, o banco de dados padrão será 'db.duckdb'.

Parameters:

Name Type Description Default
table_name str

O nome da tabela a ser criada no banco de dados.

required
database (str, opcional)

O nome do banco de dados onde a tabela será criada. O valor padrão é 'db.duckdb'.

'db.duckdb'
Exceções

ProgrammingError: Se o cursor não for uma instância de CursorParquetDuckdb.

Retorno

None: Este método não retorna valor, mas cria a tabela no banco de dados.

Exemplo
cursor = CursorParquetDuckdb(...)
with Athena(cursor) as athena:
    athena.execute("SELECT id, name FROM users")
    athena.to_create_table_db('users_table')

to_csv(output_file, delimiter=';', include_header=True)

Converte os resultados da consulta para um arquivo CSV.

Este método converte os dados retornados pela consulta executada e os salva em um arquivo CSV. A conversão é realizada apenas se o cursor utilizado for do tipo CursorParquet ou CursorParquetDuckdb. Dependendo do tipo do cursor, o método chama a função to_csv correspondente.

  • Se o cursor for uma instância de CursorParquetDuckdb, o método chamará diretamente o método to_csv do cursor, passando os parâmetros apropriados para ele.

  • Para o cursor CursorParquet, os dados são primeiro convertidos para um pa.Table e então salvos como CSV, com opções de escrita personalizáveis.

Parameters:

Name Type Description Default
output_file str

O caminho do arquivo CSV de saída.

required
delimiter (str, opcional)

O delimitador a ser usado no arquivo CSV. O valor padrão é ';'.

';'
include_header (bool, opcional)

Indica se o cabeçalho deve ser incluído no CSV. O valor padrão é True.

True
Exceções

ProgrammingError: Se o cursor não for uma instância de CursorParquet ou CursorParquetDuckdb.

Retorno

None: Este método não retorna valor, mas realiza a conversão para o formato CSV.

Exemplo
cursor = CursorParquet(...)
with Athena(cursor) as athena:
    athena.execute("SELECT id, name FROM users")
    athena.to_csv('/path/to/file.csv')

to_insert_table_db(table_name, *, database='db.duckdb')

Insere os resultados de uma consulta executada no Amazon Athena em uma tabela no DuckDB.

Esta função utiliza o cursor específico da biblioteca, CursorParquetDuckdb, para transferir os dados resultantes de uma consulta executada no Amazon Athena para uma tabela no banco de dados DuckDB. Caso o cursor utilizado não seja compatível, um erro será lançado.

Parameters:

Name Type Description Default
table_name str

O nome da tabela de destino no DuckDB onde os dados serão inseridos.

required
database str

O caminho ou nome do banco de dados DuckDB onde a tabela será criada ou atualizada. O valor padrão é 'db.duckdb'.

'db.duckdb'
Exceções

ProgrammingError: Se a função for chamada com um cursor incompatível.

Exemplo
from athena_mvsh import Athena, CursorParquetDuckdb

cursor = CursorParquetDuckdb(...)

with Athena(cursor) as athena:
    athena.to_insert_table_db(
        table_name="tabela_destino",
        database="meu_banco.duckdb"
    )

to_pandas(*args, **kwargs)

Exporta os resultados de uma consulta para um DataFrame pandas.

Este método converte os resultados da consulta executada em diferentes tipos de cursor para um DataFrame pandas. Ele suporta cursors do tipo CursorParquetDuckdb, CursorPython e CursorParquet, adaptando-se conforme o tipo de cursor utilizado.

Parameters:

Name Type Description Default
*args

Argumentos adicionais que serão passados para o método to_pandas do cursor.

()
**kwargs

Argumentos adicionais que serão passados para o método to_pandas do cursor.

{}
Retorno

pd.DataFrame: Um DataFrame pandas contendo os resultados da consulta executada.

Exceções

ProgrammingError: Se o cursor não for do tipo compatível (nenhum dos tipos mencionados acima).

Detalhes
  • Se o cursor for do tipo CursorParquetDuckdb, o método utilizará o método to_pandas do cursor para exportar os resultados da consulta.
  • Se o cursor for do tipo CursorPython, os resultados da consulta serão obtidos via fetchall() e então convertidos para um DataFrame pandas com os nomes das colunas extraídos da descrição da consulta.
  • Se o cursor for do tipo CursorParquet, os resultados serão convertidos para o formato Arrow e, em seguida, para um DataFrame pandas utilizando o método apropriado.
Exemplo
from athena_mvsh import Athena, CursorPython

cursor = CursorPython(...)

with Athena(cursor) as athena:
    athena.execute("SELECT id, name FROM users")
    df = athena.to_pandas()

print(df.head())
Notas
  • A função é adaptável para diferentes tipos de cursor e utiliza os métodos específicos de cada um para converter os resultados para um DataFrame pandas.
  • A compatibilidade do cursor é verificada antes de tentar acessar o método to_pandas específico de cada tipo de cursor.
  • A funcionalidade depende de cada tipo de cursor implementado (como CursorParquetDuckdb, CursorPython e CursorParquet).

to_parquet(*args, **kwargs)

Converte os resultados da consulta para o formato Parquet.

Este método converte os dados retornados pela consulta executada em um arquivo Parquet. A conversão é realizada apenas se o cursor utilizado for do tipo CursorParquet ou CursorParquetDuckdb. Dependendo do tipo do cursor, o método chama a função to_parquet correspondente.

  • Se o cursor for uma instância de CursorParquetDuckdb, o método chamará diretamente o método to_parquet do cursor, passando os argumentos fornecidos para ele.

  • Para o cursor CursorParquet, os dados são primeiro convertidos para um pa.Table e então são gravados como Parquet.

Parameters:

Name Type Description Default
*args

Argumentos adicionais passados para a função to_parquet do cursor. - Para detalhes sobre os argumentos esperados, consulte a documentação do DuckDB e PyArrow: - DuckDB: https://duckdb.org/docs - PyArrow: https://arrow.apache.org/docs/python/

()
**kwargs

Argumentos de palavra-chave adicionais passados para a função

{}
Exceções

ProgrammingError: Se o cursor não for uma instância de CursorParquet ou CursorParquetDuckdb.

Retorno

None: Este método não retorna valor, mas realiza a conversão para o formato Parquet.

Exemplo
cursor = CursorParquet(...)
with Athena(cursor) as athena:
    athena.execute("SELECT id, name FROM users")
    athena.to_parquet('/path/to/file.parquet')

to_partition_create_table_db(table_name, *, database='db.duckdb', workers=WORKERS)

Lê arquivos Parquet gerados a partir da consulta executada e insere os dados na tabela do banco de dados DuckDB.

Este método executa uma consulta no Athena para gerar arquivos Parquet que são armazenados no S3. Após a consulta, os arquivos Parquet são lidos e os dados são inseridos na tabela especificada no DuckDB. A inserção dos dados é paralelizada utilizando múltiplos trabalhadores (threads) para otimizar o desempenho.

O método realiza os seguintes passos: - Executa a consulta fornecida no Athena para gerar os arquivos Parquet. - Descarrega os arquivos Parquet do S3 a partir do manifest gerado pela consulta. - Para cada arquivo Parquet, insere os dados na tabela especificada no DuckDB, utilizando ThreadPoolExecutor para paralelizar as inserções.

Parameters:

Name Type Description Default
table_name str

O nome da tabela onde os dados serão inseridos no DuckDB.

required
database (str, opcional)

O nome do banco de dados DuckDB. O valor padrão é 'db.duckdb'.

'db.duckdb'
workers (int, opcional)

O número de trabalhadores (threads) a serem usados para paralelizar a inserção dos arquivos Parquet. O valor padrão é WORKERS.

WORKERS
Exceções

ProgrammingError: Se o cursor não for do tipo CursorParquetDuckdb. Exception: Se ocorrer um erro durante a execução da inserção dos dados.

Retorno

None: O método não retorna valor, mas insere os dados na tabela do banco de dados DuckDB.

Exemplo
cursor = CursorParquetDuckdb(...)
with Athena(cursor) as athena:
    athena.to_partition_create_table_db(
        table_name='target_table',
        database='db.duckdb',
        workers=4
    )

write_arrow(tbl, table_name, schema, location=None, partitions=None, catalog_name='awsdatacatalog', compression='ZSTD')

Escreve um Table Arrow em uma tabela externa no Athena usando o DuckDB.

Este método utiliza o DuckDB para escrever um Table Arrow como uma tabela externa no Amazon Athena, permitindo especificar o esquema, particionamento e compressão dos dados. Caso o cursor utilizado não seja compatível, um erro será lançado.

Parameters:

Name Type Description Default
tbl Table

O Table Arrow que será escrito na tabela externa no Athena.

required
table_name str

O nome da tabela externa de destino no Athena.

required
schema str

O esquema onde a tabela será criada ou atualizada no Athena.

required
location str

O local onde os dados serão armazenados (geralmente um bucket S3), se aplicável.

None
partitions list[str]

Lista de colunas para particionamento dos dados na tabela.

None
catalog_name str

O nome do catálogo de dados a ser utilizado. O valor padrão é 'awsdatacatalog'.

'awsdatacatalog'
compression Literal['ZSTD', 'SNAPPY', 'GZIP']

O algoritmo de compressão a ser utilizado nos dados. O valor padrão é 'ZSTD'.

'ZSTD'
Exceções

ProgrammingError: Se a função for chamada com um cursor incompatível.

Detalhes
  • O Table Arrow fornecido será convertido e escrito como uma tabela externa no Athena.
  • A opção de particionamento permite distribuir os dados em várias pastas ou arquivos, facilitando o gerenciamento de grandes volumes de dados no S3.
  • A compressão ZSTD pode ser usada para reduzir o tamanho do arquivo gerado, com outras opções como SNAPPY também sendo suportadas.
  • O catálogo de dados awsdatacatalog pode ser alterado para refletir configurações específicas do ambiente de execução.
Exemplo
import pandas as pd
from athena_mvsh import Athena, CursorParquetDuckdb

cursor = CursorParquetDuckdb(...)

# Exemplo de DataFrame
df = pd.DataFrame({
    "coluna1": [1, 2, 3],
    "coluna2": ["A", "B", "C"]
})

# Escreve o Table Arrow em uma tabela externa no Athena
with Athena(cursor) as athena:
    athena.write_arrow(
        tbl,
        table_name="tabela_destino",
        schema="meu_esquema",
        location="s3://meu-bucket/dados/",
        partitions=["coluna1"],
        catalog_name="meu_catalogo",
        compression="ZSTD"
    )
Notas
  • Certifique-se de que o cursor utilizado seja instância de CursorParquetDuckdb.
  • O particionamento é útil para grandes volumes de dados, permitindo consultas mais eficientes no Athena.
  • A compressão pode ser ajustada para balancear entre desempenho e tamanho do arquivo.

write_dataframe(df, table_name, schema, location=None, partitions=None, catalog_name='awsdatacatalog', compression='ZSTD')

Escreve um DataFrame pandas em uma tabela externa no Athena usando o DuckDB.

Este método utiliza o DuckDB para escrever um DataFrame pandas como uma tabela externa no Amazon Athena, permitindo especificar o esquema, particionamento e compressão dos dados. Caso o cursor utilizado não seja compatível, um erro será lançado.

Parameters:

Name Type Description Default
df DataFrame

O DataFrame pandas que será escrito na tabela externa no Athena.

required
table_name str

O nome da tabela externa de destino no Athena.

required
schema str

O esquema onde a tabela será criada ou atualizada no Athena.

required
location str

O local onde os dados serão armazenados (geralmente um bucket S3), se aplicável.

None
partitions list[str]

Lista de colunas para particionamento dos dados na tabela.

None
catalog_name str

O nome do catálogo de dados a ser utilizado. O valor padrão é 'awsdatacatalog'.

'awsdatacatalog'
compression Literal['ZSTD', 'SNAPPY', 'GZIP']

O algoritmo de compressão a ser utilizado nos dados. O valor padrão é 'ZSTD'.

'ZSTD'
Exceções

ProgrammingError: Se a função for chamada com um cursor incompatível.

Detalhes
  • O DataFrame pandas fornecido será convertido e escrito como uma tabela externa no Athena.
  • A opção de particionamento permite distribuir os dados em várias pastas ou arquivos, facilitando o gerenciamento de grandes volumes de dados no S3.
  • A compressão ZSTD pode ser usada para reduzir o tamanho do arquivo gerado, com outras opções como SNAPPY também sendo suportadas.
  • O catálogo de dados awsdatacatalog pode ser alterado para refletir configurações específicas do ambiente de execução.
Exemplo
import pandas as pd
from athena_mvsh import Athena, CursorParquetDuckdb

cursor = CursorParquetDuckdb(...)

# Exemplo de DataFrame
df = pd.DataFrame({
    "coluna1": [1, 2, 3],
    "coluna2": ["A", "B", "C"]
})

# Escreve o DataFrame em uma tabela externa no Athena
with Athena(cursor) as athena:
    athena.write_dataframe(
        df,
        table_name="tabela_destino",
        schema="meu_esquema",
        location="s3://meu-bucket/dados/",
        partitions=["coluna1"],
        catalog_name="meu_catalogo",
        compression="ZSTD"
    )
Notas
  • Certifique-se de que o cursor utilizado seja instância de CursorParquetDuckdb.
  • O particionamento é útil para grandes volumes de dados, permitindo consultas mais eficientes no Athena.
  • A compressão pode ser ajustada para balancear entre desempenho e tamanho do arquivo.

write_parquet(file, table_name, schema, location=None, partitions=None, catalog_name='awsdatacatalog', compression='ZSTD')

Cria uma tabela externa no Athena a partir de um ou vários arquivos Parquet.

Este método utiliza o DuckDB para criar uma tabela externa no Amazon Athena a partir de arquivos Parquet, permitindo especificar o esquema, particionamento e compressão dos dados. Caso o cursor utilizado não seja compatível, um erro será lançado.

Parameters:

Name Type Description Default
file list[str | Path] | str | Path

Um ou vários caminhos para arquivos Parquet ou diretórios contendo arquivos Parquet. Pode ser uma lista ou uma única string/Path.

required
table_name str

O nome da tabela externa a ser criada no Athena.

required
schema str

O esquema onde a tabela será criada ou atualizada no Athena.

required
location str

O local no S3 onde os arquivos Parquet serão armazenados, se aplicável.

None
partitions list[str]

Lista de colunas para particionamento dos dados na tabela.

None
catalog_name str

O nome do catálogo de dados a ser utilizado. O valor padrão é 'awsdatacatalog'.

'awsdatacatalog'
compression Literal['ZSTD', 'SNAPPY', 'GZIP']

O algoritmo de compressão a ser utilizado nos dados. O valor padrão é 'ZSTD'.

'ZSTD'
Exceções

ProgrammingError: Se a função for chamada com um cursor incompatível.

Detalhes
  • O(s) arquivo(s) Parquet fornecido(s) serão usados para criar uma tabela externa no Athena.
  • A opção de particionamento permite distribuir os dados em várias pastas ou arquivos no S3, facilitando o gerenciamento de grandes volumes de dados.
  • A compressão ZSTD pode ser usada para reduzir o tamanho dos arquivos Parquet gerados, com outras opções como SNAPPY também sendo suportadas.
  • O catálogo de dados awsdatacatalog pode ser alterado para refletir configurações específicas do ambiente de execução.
Exemplo
from pathlib import Path
from athena_mvsh import Athena, CursorParquetDuckdb

cursor = CursorParquetDuckdb(...)

# Caminho para os arquivos Parquet
arquivos_parquet = Path("/caminho/para/arquivos/*.parquet")

# Cria uma tabela externa no Athena a partir dos arquivos Parquet
with Athena(cursor) as athena:
    athena.write_parquet(
        file=arquivos_parquet,
        table_name="tabela_destino",
        schema="meu_esquema",
        location="s3://meu-bucket/dados/",
        partitions=["coluna1"],
        catalog_name="meu_catalogo"
    )
Notas
  • Certifique-se de que o cursor utilizado seja instância de CursorParquetDuckdb.
  • O particionamento é útil para grandes volumes de dados, permitindo consultas mais eficientes no Athena.
  • A compressão pode ser ajustada para balancear entre desempenho e tamanho do arquivo.

write_table_iceberg(data, table_name, schema, location=None, partitions=None, catalog_name='awsdatacatalog', compression='ZSTD', if_exists='replace', sync_schema=False)

Cria ou insere dados em uma tabela Iceberg no Athena a partir de um DataFrame pandas, Table Arrow ou arquivos Parquet.

Este método usa o DuckDB para criar ou inserir dados em uma tabela Iceberg no Amazon Athena, permitindo que os dados sejam inseridos a partir de um DataFrame pandas, Table Arrow ou múltiplos arquivos Parquet. A operação pode substituir a tabela existente ou adicionar novos dados, dependendo do valor de if_exists.

Parameters:

Name Type Description Default
data DataFrame | list[str | Path] | str | Path | Table

Dados a serem inseridos ou criados na tabela Iceberg. Pode ser um DataFrame pandas, Table Arrow ou um ou mais arquivos Parquet (caminhos em formato string ou Path).

required
table_name str

O nome da tabela Iceberg a ser criada ou inserida no Athena.

required
schema str

O esquema onde a tabela Iceberg será criada ou atualizada no Athena.

required
location str

O local no S3 onde os dados serão armazenados, se aplicável.

None
partitions list[str]

Lista de colunas para particionamento dos dados na tabela.

None
catalog_name str

O nome do catálogo de dados a ser utilizado. O valor padrão é 'awsdatacatalog'.

'awsdatacatalog'
compression Literal['ZSTD', 'SNAPPY', 'GZIP']

O algoritmo de compressão a ser utilizado nos dados. O valor padrão é 'ZSTD'.

'ZSTD'
if_exists Literal['replace', 'append']

Define se os dados devem substituir a tabela existente ('replace') ou ser adicionados ('append'). O valor padrão é 'replace'.

'replace'
sync_schema bool

Indica se o esquema da tabela de destino deve ser sincronizado com os dados de origem. O valor padrão é False.

False
Exceções

ProgrammingError: Se a função for chamada com um cursor incompatível.

Detalhes
  • A tabela Iceberg será criada ou atualizada no Athena com os dados fornecidos.
  • O particionamento permite distribuir os dados em várias pastas ou arquivos no S3, facilitando o gerenciamento de grandes volumes de dados.
  • A compressão ZSTD pode ser usada para reduzir o tamanho dos arquivos Parquet gerados, com outras opções como GZIP também sendo suportadas.
  • O catálogo de dados awsdatacatalog pode ser alterado para refletir configurações específicas do ambiente de execução.
  • O parâmetro if_exists controla se os dados devem substituir ou ser adicionados à tabela existente.
Exemplo
import pandas as pd
from athena_mvsh import Athena, CursorParquetDuckdb

cursor = CursorParquetDuckdb(...)

# Exemplo de DataFrame
df = pd.DataFrame({
    "coluna1": [1, 2, 3],
    "coluna2": ["A", "B", "C"]
})

# Cria ou insere dados na tabela Iceberg no Athena
with Athena(cursor) as athena:
    athena.write_table_iceberg(
        data=df,
        table_name="tabela_iceberg",
        schema="meu_esquema",
        location="s3://meu-bucket/dados/",
        partitions=["coluna1"],
        catalog_name="meu_catalogo",
        if_exists="append"
    )
Notas
  • Certifique-se de que o cursor utilizado seja instância de CursorParquetDuckdb.
  • O particionamento é útil para grandes volumes de dados, permitindo consultas mais eficientes no Athena.
  • A compressão pode ser ajustada para balancear entre desempenho e tamanho do arquivo.
  • O parâmetro if_exists permite escolher entre substituir ou adicionar dados à tabela existente, controlando o comportamento da operação.