Manejo de ETL’s
Python
R
SQL
Automatización de Procesos ETL
En este proyecto vamos a automatizar un proceso ETL, el mismo que tiene los siguientes pasos
Extraer los: En nuestro caso utilizaremos los datos de “World Happiness report”
Transformar los datos
Cargar los datos: se los cargarara en una base de SQLite
Viaualización del dataset
Código
import pandas as pd
import sqlite3
import logging
# Configuración básica de logging
logging.basicConfig(filename='etl_processP.log', level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
def extract_data():
try:
df = pd.read_csv('world_happiness_report.csv')
logging.info("Extracción de datos completada con éxito.")
return df
except Exception as e:
logging.error("Error durante la extracción de datos: %s", e)
raise
def transform_data(df):
try:
df = df.drop(columns=['Standard Error'])
df.iloc[:,2:].fillna(df.iloc[:,2:].mean(), inplace=True)
df = df.drop_duplicates()
df['Economy (GDP per Capita)'] = df['Economy (GDP per Capita)'] / df['Economy (GDP per Capita)'].max()
df['Happiness Level'] = df['Happiness Score'].apply(lambda x: 'High' if x > df['Happiness Score'].mean() else 'Low')
logging.info("Transformación de datos completada con éxito.")
return df
except Exception as e:
logging.error("Error durante la transformación de datos: %s", e)
raise
def load_data(df):
try:
conn = sqlite3.connect('world_happiness.db')
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS happiness_data (
Country TEXT,
Region TEXT,
Happiness_Rank INTEGER,
Happiness_Score REAL,
GDP_per_capita REAL,
Happiness_Level TEXT
)
''')
conn.commit()
df.to_sql('happiness_data', conn, if_exists='replace', index=False)
conn.close()
logging.info("Carga de datos completada con éxito.")
except Exception as e:
logging.error("Error durante la carga de datos: %s", e)
raise
def etl_process():
try:
logging.info("Inicio del proceso ETL.")
data = extract_data()
transformed_data = transform_data(data)
load_data(transformed_data)
logging.info("Proceso ETL completado con éxito.")
except Exception as e:
logging.critical("El proceso ETL falló: %s", e)
raise
if __name__ == "__main__":
etl_process()Código
# Cargar las bibliotecas necesarias
#install.packages("dplyr")
#install.packages("readr")
#install.packages("DBI")
#install.packages("RSQLite")
library(dplyr)
library(readr)
library(DBI)
library(RSQLite)
# Configuración básica de logging
log_file <- "etl_processR.log"
log_message <- function(level, message) {
cat(sprintf("%s - %s - %s\n", Sys.time(), level, message), file = log_file, append = TRUE)
}
# Función para extraer datos
extract_data <- function() {
tryCatch({
df <- read_csv('world_happiness_report.csv')
log_message("INFO", "Extracción de datos completada con éxito.")
return(df)
}, error = function(e) {
log_message("ERROR", paste("Error durante la extracción de datos:", e$message))
stop(e)
})
}
# Función para transformar datos
transform_data <- function(df) {
tryCatch({
df <- df %>%
select(-`Standard Error`) %>%
mutate(across(everything(), ~ifelse(is.na(.), mean(., na.rm = TRUE), .))) %>%
distinct() %>%
mutate(`Economy (GDP per Capita)` = `Economy (GDP per Capita)` / max(`Economy (GDP per Capita)`, na.rm = TRUE)) %>%
mutate(`Happiness Level` = ifelse(`Happiness Score` > mean(`Happiness Score`, na.rm = TRUE), 'High', 'Low'))
log_message("INFO", "Transformación de datos completada con éxito.")
return(df)
}, error = function(e) {
log_message("ERROR", paste("Error durante la transformación de datos:", e$message))
stop(e)
})
}
# Función para cargar datos en SQLite
load_data <- function(df) {
tryCatch({
conn <- dbConnect(SQLite(), dbname = "world_happiness.db")
dbExecute(conn, '
CREATE TABLE IF NOT EXISTS happiness_data (
Country TEXT,
Region TEXT,
Happiness_Rank INTEGER,
Happiness_Score REAL,
GDP_per_capita REAL,
Happiness_Level TEXT
)')
dbWriteTable(conn, "happiness_data", df, overwrite = TRUE, row.names = FALSE)
dbDisconnect(conn)
log_message("INFO", "Carga de datos completada con éxito.")
}, error = function(e) {
log_message("ERROR", paste("Error durante la carga de datos:", e$message))
stop(e)
})
}
# Función principal del proceso ETL
etl_process <- function() {
tryCatch({
log_message("INFO", "Inicio del proceso ETL.")
data <- extract_data()
transformed_data <- transform_data(data)
load_data(transformed_data)
log_message("INFO", "Proceso ETL completado con éxito.")
}, error = function(e) {
log_message("CRITICAL", paste("El proceso ETL falló:", e$message))
stop(e)
})
}
# Ejecutar el proceso ETL
if (interactive()) {
etl_process()
}