¿Qué es currying y de verdad es útil?
En esta ocasión les hablaré del currying un concepto bastante usado en programación funcional. básicamente currying o currificación en español es un mecanismo para la invocación parcial de funciones, esto quiere decir que si una función sin currificación le entregamos una serie de parámetros esta nos devolverá un resultado. En cambio, una función currificada se invocara parcialmente por cada argumento que le queramos entregar es decir si le pasamos uno de esos argumentos esta función nos devolverá otra función que aceptara el siguientes argumentos y así hasta obtener el resultado final
Suena algo enredado pero un ejemplo siempre aclarará las ideas. En internet tenemos el típico ejemplo de las sumas.
1
2
3
function sum(value1, value2) {
return value1 + value2;
}
Esta función suma 2 números pero podemos currificarla de la siguiente manera
1
2
3
4
5
6
7
function sum(value1) {
return function(value2) {
return function(value3) {
return value1 + value2 + value3
};
};
}
Para probar nuestra función tenemos que hacer esto
1
2
3
4
5
6
7
var add7 = sum(7);
var add10 = add7(3);
var result10Plus5 = add10(5);
var result10Plus10 = add10(10);
// Results (10 + 5) = 15 (10 + 10) = 20
console.log(`Results (10 + 5) = ${result10Plus5} (10 + 10) = ${result10Plus10}`)
Como vemos una función currificada nos entrega una serie de funciones en donde vamos agregando los parámetros para una ejecución parcial.
Una ligera ventaja de este ejemplo es que podemos reutilizar funciones como en las funciones definidas para la suma del tercer parámetro, pero siendo sinceros este ejemplo es bastante inútil, y a menudo los verás para explicar la currificación, pero en esta ocasión te mostraré un ejemplo más real, los hola mundo en estos post son de otro mundo.
Función currying para un proceso ETL
En esta ocasión vamos a crear un pequeño ETL que puede ser usado en jobs para la extracción, transformación, y carga de una fuente de datos, la cual puede ser definida de acuerdo a nuestras necesidades.
Esta función currificada se compone de una ejecución parcial de una función extractora, seguida de una función transformadora, de carga y finalmente la función que se encargara de ejecutar el proceso.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def extract(extract_fn):
extracted_data = extract_fn()
print(f'extracted-data {extracted_data}')
def transform(transform_fn):
transformed_data = [transform_fn(d) for d in extracted_data]
print(f'transformed-data {transformed_data}')
def load(load_fn):
def execute():
load_fn(transformed_data)
return execute
return load
return transform
¿Cómo funciona nuestro ETL?
Para ejecutar nuestro código crearemos una función extractora que devolverá los datos de la fuente que estimemos conveniente, puede ser una api una base de datos un archivo o lo que sea
Antes que todo definimos un ambiente virtual e instalamos las siguientes dependencias
1
2
3
4
5
6
7
# virtual env optional
python3 -m venv venv
# dependencies
pip3 install requests
pip3 install pandas
Esta es nuestra función extractora la cual nos devolverá una lista de aves chilenas.
1
2
3
def get_chilean_birds():
response = requests.get('https://aves.ninjas.cl/api/birds')
return response.json()
Ejecutaremos nuestra función currificada extract la cual devolverá nuestra función transformadora
1
to_transform = extract(get_chilean_birds)
Nuestra función transformadora recibirá un item de la respuesta http y devolveremos un diccionario solo con los datos que nos interesan.
1
2
3
4
5
6
7
def transform_json_item(json):
return {
'id': json['uid'],
'name': json['name']['spanish'],
'scientific_name': json['name']['latin'],
'image': json['images']['full']
}
Ahora invocamos la función extractora y esta nos retornara una función cargadora con la cual nos devolverá todos los datos ya procesados por extract y transform
1
to_load = to_transform(transform_json_item)
Ahora crearemos 2 funciones de carga una para imprimir las aves y otra función para guardar los datos en un archivo CSV.
1
2
3
4
5
6
7
def load_printing_birds(birds):
for b in birds:
print(b)
def load_birds_to_csv(birds):
df = pd.DataFrame(birds)
df.to_csv('chiliean_birds.csv', index=False)
finalmente ejecutamos
1
2
3
create_csv = to_load(load_birds_to_csv)
create_csv()
Finalmente nuestro ejemplo completo sera el siguiente
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import requests
import pandas as pd
def get_chilean_birds():
response = requests.get('https://aves.ninjas.cl/api/birds')
return response.json()
def transform_json_item(json):
return {
'id': json['uid'],
'name': json['name']['spanish'],
'scientific_name': json['name']['latin'],
# 'image': json['images']['full']
}
def load_printing_birds(birds):
for b in birds:
print(b)
def load_birds_to_csv(birds):
df = pd.DataFrame(birds)
df.to_csv('chiliean_birds.csv', index=False)
def extract(extract_fn):
extracted_data = extract_fn()
print(f'extracted-data {extracted_data}')
def transform(transform_fn):
transformed_data = [transform_fn(d) for d in extracted_data]
print(f'transformed-data {transformed_data}')
def load(load_fn):
def execute():
load_fn(transformed_data)
return execute
return load
return transform
# setting transform function
to_transform = extract(get_chilean_birds)
# setting loading function
to_load = to_transform(transform_json_item)
# etl to print birds
print_birds = to_load(load_printing_birds)
print_birds()
# create csv
create_csv = to_load(load_birds_to_csv)
create_csv()
Para un proceso ETL donde los flujos sean simples como este ejemplo sirve perfectamente el rendimiento y la eficiencia de este ETL dependerá de las funciones que definamos como parámetros por ejemplo como lo haríamos cuando queramos traernos datos desde una base de datos con una tabla con una cantidad considerable de registros ¿podra nuestra función currificada aguantar una carga de 500.000 registros o 1 millon?
Respuesta corta: SI SE PUEDE debemos definir una función extractora con un algoritmo apropiado para leer una tabla con una gran cantidad de registros.
Instalamos psycopg2
1
pip3 install psycopg2
Definimos nuestra función que se conectara con la base de datos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import psycopg2
def posgres_connection():
return psycopg2.connect(user = "postgres",
password = "postgres",
host = "localhost",
port = "5432",
database = "postgres")
def select(query):
try:
connection = posgres_connection()
cursor = connection.cursor()
cursor.execute(query)
records = cursor.fetchall()
return records
except (Exception, psycopg2.Error) as error :
print ("Error while connecting to PostgreSQL", error)
finally:
#closing database connection.
if(connection):
cursor.close()
connection.close()
print("PostgreSQL connection is closed")
Y ahora viene la magia, la siguiente función crea un bucle simple el cual va generando los parámetros limit y offset para una consulta sql de esta manera la carga de la base de datos no se vera comprometida por el tamaño de los datos recibidos por la consulta, nosotros vamos a traernos la data por partes hasta completar la totalidad de los datos.
1
2
3
4
5
6
7
8
9
10
def extractor_by_chunk(limit, extractor_fn):
offset = 0
records = []
while True:
chunk = extractor_fn(limit, offset)
records.extend(chunk)
if len(chunk) == 0:
break
offset += limit
return records
Ahora definimos nuestra función extractora
1
2
3
4
5
6
7
8
9
10
11
def birds_db_extractor():
def get_birds(limit, offset):
query = f"""
SELECT * FROM birds
ORDER BY name
LIMIT {limit} OFFSET {offset}
"""
return select(query)
return extractor_by_chunk(1000, get_birds)
Finalmente creamos nuestro ETL de la misma forma del primer ejemplo
1
2
3
4
5
6
7
# setting transform function
to_transform = extract(birds_db_extractor)
# setting loading function
to_load = to_transform(transform_json_item)
# create csv
create_csv = to_load(load_birds_to_csv)
create_csv()
Conclusiones
Naaa memes