Autor/a

Joel Burbano

Fecha de publicación

8 de agosto de 2024

Automatización de Procesos ETL

En este proyecto vamos a automatizar un proceso ETL, el mismo que tiene los siguientes pasos

  • Extraer los: En nuestro caso utilizaremos los datos de “World Happiness report”

  • Transformar los datos

  • Cargar los datos: se los cargarara en una base de SQLite

Viaualización del dataset

Código

import pandas as pd
import sqlite3
import logging


# Configuración básica de logging
logging.basicConfig(filename='etl_processP.log', level=logging.INFO, 
                    format='%(asctime)s - %(levelname)s - %(message)s')

def extract_data():
    try:
        df = pd.read_csv('world_happiness_report.csv')
        logging.info("Extracción de datos completada con éxito.")
        return df
    except Exception as e:
        logging.error("Error durante la extracción de datos: %s", e)
        raise

def transform_data(df):
    try:
        df = df.drop(columns=['Standard Error'])
        df.iloc[:,2:].fillna(df.iloc[:,2:].mean(), inplace=True)
        df = df.drop_duplicates()
        df['Economy (GDP per Capita)'] = df['Economy (GDP per Capita)'] / df['Economy (GDP per Capita)'].max()
        df['Happiness Level'] = df['Happiness Score'].apply(lambda x: 'High' if x > df['Happiness Score'].mean() else 'Low')
        logging.info("Transformación de datos completada con éxito.")
        return df
    except Exception as e:
        logging.error("Error durante la transformación de datos: %s", e)
        raise

def load_data(df):
    try:
        conn = sqlite3.connect('world_happiness.db')
        cursor = conn.cursor()
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS happiness_data (
            Country TEXT,
            Region TEXT,
            Happiness_Rank INTEGER,
            Happiness_Score REAL,
            GDP_per_capita REAL,
            Happiness_Level TEXT
        )
        ''')
        conn.commit()
        df.to_sql('happiness_data', conn, if_exists='replace', index=False)
        conn.close()
        logging.info("Carga de datos completada con éxito.")
    except Exception as e:
        logging.error("Error durante la carga de datos: %s", e)
        raise

def etl_process():
    try:
        logging.info("Inicio del proceso ETL.")
        data = extract_data()
        transformed_data = transform_data(data)
        load_data(transformed_data)
        logging.info("Proceso ETL completado con éxito.")
    except Exception as e:
        logging.critical("El proceso ETL falló: %s", e)
        raise

if __name__ == "__main__":
    etl_process()
Código
# Cargar las bibliotecas necesarias
#install.packages("dplyr")
#install.packages("readr")
#install.packages("DBI")
#install.packages("RSQLite")

library(dplyr)
library(readr)
library(DBI)
library(RSQLite)

# Configuración básica de logging
log_file <- "etl_processR.log"
log_message <- function(level, message) {
  cat(sprintf("%s - %s - %s\n", Sys.time(), level, message), file = log_file, append = TRUE)
}

# Función para extraer datos
extract_data <- function() {
  tryCatch({
    df <- read_csv('world_happiness_report.csv')
    log_message("INFO", "Extracción de datos completada con éxito.")
    return(df)
  }, error = function(e) {
    log_message("ERROR", paste("Error durante la extracción de datos:", e$message))
    stop(e)
  })
}

# Función para transformar datos
transform_data <- function(df) {
  tryCatch({
    df <- df %>% 
      select(-`Standard Error`) %>%
      mutate(across(everything(), ~ifelse(is.na(.), mean(., na.rm = TRUE), .))) %>%
      distinct() %>%
      mutate(`Economy (GDP per Capita)` = `Economy (GDP per Capita)` / max(`Economy (GDP per Capita)`, na.rm = TRUE)) %>%
      mutate(`Happiness Level` = ifelse(`Happiness Score` > mean(`Happiness Score`, na.rm = TRUE), 'High', 'Low'))
    
    log_message("INFO", "Transformación de datos completada con éxito.")
    return(df)
  }, error = function(e) {
    log_message("ERROR", paste("Error durante la transformación de datos:", e$message))
    stop(e)
  })
}

# Función para cargar datos en SQLite
load_data <- function(df) {
  tryCatch({
    conn <- dbConnect(SQLite(), dbname = "world_happiness.db")
    
    dbExecute(conn, '
    CREATE TABLE IF NOT EXISTS happiness_data (
      Country TEXT,
      Region TEXT,
      Happiness_Rank INTEGER,
      Happiness_Score REAL,
      GDP_per_capita REAL,
      Happiness_Level TEXT
    )')
    
    dbWriteTable(conn, "happiness_data", df, overwrite = TRUE, row.names = FALSE)
    dbDisconnect(conn)
    log_message("INFO", "Carga de datos completada con éxito.")
  }, error = function(e) {
    log_message("ERROR", paste("Error durante la carga de datos:", e$message))
    stop(e)
  })
}

# Función principal del proceso ETL
etl_process <- function() {
  tryCatch({
    log_message("INFO", "Inicio del proceso ETL.")
    data <- extract_data()
    transformed_data <- transform_data(data)
    load_data(transformed_data)
    log_message("INFO", "Proceso ETL completado con éxito.")
  }, error = function(e) {
    log_message("CRITICAL", paste("El proceso ETL falló:", e$message))
    stop(e)
  })
}

# Ejecutar el proceso ETL
if (interactive()) {
  etl_process()
}