Integrating data sources and publishing open data with fastETL & Airflow

Augusto Herrmann
19.4.2023

csv,conf,v7

Who am I

  • Augusto Herrmann – https://herrmann.tech
    • background in Open Data (2010-2018)
    • data engineering (2020-2023)

Where to find this presentation

https://herrmann.tech/slide-decks/2023/04/integrating-data-sources-and-publishing-open-data-with-fastetl-and-airflow

Licensed under a Creative Commons Attribution 4.0 International License.

The team who made it

Data engineering team
Secretariat for Management and Innovation
Ministry of Management and Innovation in Public Services, Brazil

What is it

  • a free and open source plugin for Apache Airflow
  • has features that are useful for making ETL pipelines 🛠
  • speeds up the data pipeline development process 🏃

Why we made it

  • to develop data pipelines faster
  • promote code reuse
  • code we wrote and had been using extensively since 2019 could be useful to others who use Airflow
  • we 💓 free and open source software
  • share knowledge with other teams, expand our network of developers 👥
  • get feedback & contributions 👂

About Airflow

Apache Airflow is

  • free and open source software with a robust and active community
  • task scheduler and orchestrator: directed acyclic graphs (DAGs)
  • visibility into ETL processes and notifications: what broke, when and why
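
To make the idea of a directed acyclic graph of tasks concrete, here is a minimal sketch that uses only Python's standard library (`graphlib`), not Airflow itself. The task names are hypothetical. Airflow builds scheduling, retries, logging and a web UI on top of the same idea.

```python
from graphlib import TopologicalSorter

# Hypothetical tasks, each mapped to the set of tasks it depends on.
dependencies = {
    "collect": set(),
    "clean": {"collect"},
    "load": {"clean"},
    "publish": {"load"},
    "notify": {"load"},
}


def execution_order(deps: dict[str, set[str]]) -> list[str]:
    """Return one valid order in which every task runs after its dependencies."""
    return list(TopologicalSorter(deps).static_order())


order = execution_order(dependencies)
print(order)
```

A cycle in `dependencies` would raise `graphlib.CycleError`, which is why the graph has to be acyclic.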

We use Airflow

and fastETL to

  • periodically synchronize data sources in the data lake
    • sources are databases, spreadsheets (Excel, Google, Sharepoint) and others
    • data is used for analysis, auditing and to create dashboards
  • publish open data on the open data portal
  • notify people about publications in the official gazette (Ro-DOU project)
  • notifications by email and Slack
  • manage sprint stories and tasks using the Trello API

Features

  • Full or incremental replication of tables in SQL Server and Postgres databases (MySQL is supported as a source only)
  • Load data from GSheets and from spreadsheets on Samba/Windows networks
  • Extract CSV from SQL
  • Clean data using custom data patching tasks (e.g. for messy geographical coordinates, mapping canonical values for columns, etc.)
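
As an illustration of the "CSV from SQL" item, the sketch below runs a query and serializes the result as CSV. It uses the standard library's `sqlite3` and `csv` modules with an in-memory table invented for the example. It is not fastETL's implementation, which works with Airflow connections.

```python
import csv
import io
import sqlite3


def query_to_csv(conn: sqlite3.Connection, sql: str) -> str:
    """Run a query and return its result as CSV text with a header row."""
    cursor = conn.execute(sql)
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    # cursor.description holds one tuple per column; item 0 is the column name.
    writer.writerow(col[0] for col in cursor.description)
    writer.writerows(cursor)
    return buffer.getvalue()


# Hypothetical table, just to have something to export.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE rides (id INTEGER, city TEXT)")
conn.executemany(
    "INSERT INTO rides VALUES (?, ?)",
    [(1, "Brasília"), (2, "Rio de Janeiro")],
)
csv_text = query_to_csv(conn, "SELECT id, city FROM rides ORDER BY id")
print(csv_text)
```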

Features

  • Querying the Brazilian National Official Gazette's (DOU's) API
  • Using an Open Source Routing Machine (OSRM) service to calculate route distances
  • Using CKAN or dados.gov.br's API to update dataset metadata
  • Using Frictionless Tabular Data Packages to write data dictionaries in OpenDocument Text format
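
For the route distance item, the sketch below shows what a request to OSRM's HTTP route service looks like and how the distance can be read from its JSON response. The server address and the coordinates are assumptions made for the example, and no request is actually sent. fastETL's own wrapper may work differently.

```python
from urllib.parse import urlencode


def osrm_route_url(
    base_url: str,
    origin: tuple[float, float],
    destination: tuple[float, float],
) -> str:
    """Build an OSRM route URL. Points are given as (latitude, longitude),
    but OSRM expects them in longitude,latitude order."""
    coords = ";".join(f"{lon},{lat}" for lat, lon in (origin, destination))
    query = urlencode({"overview": "false"})  # skip the route geometry
    return f"{base_url.rstrip('/')}/route/v1/driving/{coords}?{query}"


def route_distance_km(response: dict) -> float:
    """Return the distance of the first route, converted from metres to km."""
    if response.get("code") != "Ok" or not response.get("routes"):
        raise ValueError(f"no route found: {response.get('code')}")
    return response["routes"][0]["distance"] / 1000


# Hypothetical local OSRM server and two illustrative points.
url = osrm_route_url(
    "http://localhost:5000", (-15.7997, -47.8645), (-15.8697, -47.9172)
)
print(url)

# Made-up response in the shape OSRM returns, for illustration only.
sample_response = {"code": "Ok", "routes": [{"distance": 12345.6}]}
distance_km = route_distance_km(sample_response)
```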

Example: replicating a database table

t0 = DbToDbOperator(
    task_id="copy_data",
    source={
        "conn_id": airflow_source_conn_id,
        "schema": source_schema,
        "table": table_name,
    },
    destination={
        "conn_id": airflow_dest_conn_id,
        "schema": dest_schema,
        "table": table_name,
    },
    destination_truncate=True,
    copy_table_comments=True,
    chunksize=10000,
)

DbToDbOperator

  • Copies can be:
    • full data
    • incremental
  • Destination databases can be:
    • Postgres
    • SQL Server
  • Source databases can be:
    • Postgres
    • SQL Server
    • MySQL
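
The difference between a full and an incremental copy can be sketched with two in-memory SQLite databases. This is not how DbToDbOperator is implemented. The table, the `updated_at` column used as a watermark, and the upsert strategy are all assumptions made for the illustration.

```python
import sqlite3


def full_copy(src: sqlite3.Connection, dst: sqlite3.Connection) -> int:
    """Empty the destination table and copy every row from the source."""
    dst.execute("DELETE FROM rides")
    rows = src.execute("SELECT id, status, updated_at FROM rides").fetchall()
    dst.executemany("INSERT INTO rides VALUES (?, ?, ?)", rows)
    dst.commit()
    return len(rows)


def incremental_copy(src: sqlite3.Connection, dst: sqlite3.Connection) -> int:
    """Copy only rows changed after the newest timestamp in the destination."""
    (watermark,) = dst.execute(
        "SELECT COALESCE(MAX(updated_at), '') FROM rides"
    ).fetchone()
    rows = src.execute(
        "SELECT id, status, updated_at FROM rides WHERE updated_at > ?",
        (watermark,),
    ).fetchall()
    # Upsert: rows whose primary key already exists are replaced.
    dst.executemany("INSERT OR REPLACE INTO rides VALUES (?, ?, ?)", rows)
    dst.commit()
    return len(rows)


DDL = "CREATE TABLE rides (id INTEGER PRIMARY KEY, status TEXT, updated_at TEXT)"
src, dst = sqlite3.connect(":memory:"), sqlite3.connect(":memory:")
for conn in (src, dst):
    conn.execute(DDL)
src.executemany(
    "INSERT INTO rides VALUES (?, ?, ?)",
    [(1, "done", "2023-04-01"), (2, "open", "2023-04-02")],
)
copied_full = full_copy(src, dst)

# Later, one row changes and one row is added in the source.
src.execute("UPDATE rides SET status = 'done', updated_at = '2023-04-03' WHERE id = 2")
src.execute("INSERT INTO rides VALUES (3, 'open', '2023-04-03')")
copied_incremental = incremental_copy(src, dst)
print(copied_full, copied_incremental)
```

The incremental run transfers only the changed and the new row, at the cost of needing a trustworthy change marker in the source table.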

Example use case: TaxiGov

  • taxi ride system for public officials
  • over 900k rides since 2016
  • 35k+ users
  • used by 163 public organizations in federal, state and municipal governments

TaxiGov: the data sources

  • 29 heterogeneous databases with various table schemas
  • 5 different suppliers
  • many fields of unstructured text (motive, status, organization) – messy!

TaxiGov: the data pipelines

  1. collect from the 29 data sources (starts daily at 7:45 am)
  2. clean the data with fastETL patchwork and load it into a unified schema
  3. select data with no privacy restrictions and publish it as open data (finished before 8:30 am 🕣)
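
Step 3 can be pictured as a filter applied to each record before publication. The sketch below keeps an allow-list of columns and partially masks the CPF (the Brazilian taxpayer number), in the same `***.ddd.ddd-**` shape seen in the published data. The allow-list, the `telefone` column and the sample record are hypothetical. The actual selection rules of the pipeline are not shown in this deck.

```python
import re

# Hypothetical allow-list of columns considered safe to publish.
PUBLIC_COLUMNS = ["base_origem", "nome_passageiro", "cpf_solicitante", "valor"]


def mask_cpf(cpf: str) -> str:
    """Keep only the six middle digits of a CPF, e.g. ***.456.789-**."""
    digits = re.sub(r"\D", "", cpf)
    if len(digits) != 11:
        raise ValueError("a CPF must have 11 digits")
    return f"***.{digits[3:6]}.{digits[6:9]}-**"


def to_public_record(record: dict) -> dict:
    """Drop columns that are not on the allow-list and mask the CPF."""
    public = {key: record[key] for key in PUBLIC_COLUMNS if key in record}
    if public.get("cpf_solicitante"):
        public["cpf_solicitante"] = mask_cpf(public["cpf_solicitante"])
    return public


# Made-up record, for illustration only.
record = {
    "base_origem": "TAXIGOV_DF_1",
    "nome_passageiro": "FULANO DE TAL",
    "cpf_solicitante": "123.456.789-09",
    "telefone": "+55 61 0000-0000",  # hypothetical private column
    "valor": 25.5,
}
public_record = to_public_record(record)
print(public_record)
```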

TaxiGov: cleaning data with patchwork

Example: geographical coordinates in the sources sometimes have

  • wrong decimal separator
  • wrong sign
  • values multiplied by 1,000 or 10,000
  • etc.
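
A rough idea of how such problems can be repaired is sketched below. This is not GeoPointDataCleaner's actual logic. It is a simple heuristic that tries each fix from the list above and accepts the first candidate that falls inside an expected range. The bounding box for Brazil is an approximation chosen for the example, and some inputs are inherently ambiguous (for instance, a value that fits the range under two different scale factors).

```python
from typing import Optional

# Approximate bounding box for Brazil (assumption for this example).
LAT_RANGE = (-34.0, 5.5)
LON_RANGE = (-74.0, -34.0)


def clean_coordinate(raw, low: float, high: float) -> Optional[float]:
    """Try to repair one coordinate so that it falls inside [low, high].

    Returns None when no repair yields a plausible value.
    """
    try:
        # Wrong decimal separator: "-15,7997" becomes "-15.7997".
        value = float(str(raw).strip().replace(",", "."))
    except ValueError:
        return None
    # Values multiplied by 1,000 or 10,000: try undoing each factor.
    for factor in (1, 1_000, 10_000):
        # Wrong sign: try the value as is, then with the sign flipped.
        for candidate in (value / factor, -value / factor):
            if low <= candidate <= high:
                return candidate
    return None


fixed_separator = clean_coordinate("-15,7997", *LAT_RANGE)
fixed_sign = clean_coordinate(47.8645, *LON_RANGE)
fixed_scale = clean_coordinate(-157997, *LAT_RANGE)
unrecoverable = clean_coordinate("n/a", *LAT_RANGE)
print(fixed_separator, fixed_sign, fixed_scale, unrecoverable)
```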

fastETL patchwork

using GeoPointDataCleaner

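import logging
import os

# DataPatch and GeoPointDataCleaner come from fastETL's patchwork;
# TaxiGovPipeline is the project's own class (imports not shown on the slide).


def clean_coordinates(tmp_dir: str, source: str, columns: list):
    """Clean the geographical coordinate columns of one data source."""
    source_config = TaxiGovPipeline.get_config(source)

    # Load the data previously extracted for this source.
    patch = DataPatch.from_file(
        file_name=os.path.join(tmp_dir, f'{source}.zip'),
        source_id=source,
        schema=source_config['schema'],
        table=source_config['table'],
        primary_keys=source_config['primary_keys'])

    logging.info('%d rows read for source "%s".', len(patch.df), source)
    # Repair the coordinates in the given columns and write out the result.
    cleaner = GeoPointDataCleaner(
        patch.df,
        source_id=source,
        schema_name=source_config['schema'],
        table_name=source_config['table'],
        primary_keys=source_config['primary_keys'],
        columns=columns)
    cleaner.clean()
    cleaner.write(tmp_dir)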

table metadata

read column descriptions from the database with fastETL's TableComments

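# Read the column comments stored in the database.
table_metadata = TableComments(
    conn_id=MSSQL_CONN_ID, schema="TAXIGOV", table="corridas"
)
df = table_metadata.table_comments  # DataFrame with "name" and "comment" columns
# `schema` is the table schema being documented (defined elsewhere).
for field in schema.fields:
    # Fill in only the descriptions that are still missing.
    if not field.description and field.name in df["name"].values:
        field.description = df.loc[df["name"] == field.name, "comment"].iloc[0]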

and record them in a Tabular Data Package
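
The same merge can be expressed with plain dictionaries, which may help to see the rule being applied: a description already present in the schema wins, and database comments only fill the gaps. The field names below come from the TaxiGov schema, but the comment texts are placeholders invented for the example.

```python
def fill_missing_descriptions(
    fields: list[dict], comments: dict[str, str]
) -> list[dict]:
    """Return copies of the schema fields, adding a description taken from
    the database comments wherever a field has none."""
    filled = []
    for field in fields:
        field = dict(field)  # copy, so the caller's data is not mutated
        if not field.get("description") and field["name"] in comments:
            field["description"] = comments[field["name"]]
        filled.append(field)
    return filled


# Placeholder texts, for illustration only.
fields = [
    {"name": "base_origem", "description": "Written by hand in the schema."},
    {"name": "qru_corrida"},
    {"name": "orgao_nome", "description": ""},
]
comments = {
    "base_origem": "Comment stored in the database.",
    "qru_corrida": "Ride code, from the database comment.",
}
merged = fill_missing_descriptions(fields, comments)
print(merged)
```

Here `base_origem` keeps its hand-written description, `qru_corrida` receives the database comment, and `orgao_nome` stays empty because the database has no comment for it.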

table metadata

from a Tabular Data Package

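schema:
  fields:
    # base_origem ("Source database"): identification of the source database.
    # It usually contains the state abbreviation and a version number,
    # e.g. TAXIGOV_DF_3.
    - name: base_origem
      type: string
      title: Base de origem
      description: Identificação da base de dados de origem. Normalmente contém
        a sigla do Estado e um número de versão, ex. TAXIGOV_DF_3.
      constraints:
        required: true
        minLength: 10
        maxLength: 100
    # qru_corrida ("Ride code"): code that uniquely identifies the ride within
    # the source database. Absence of collision with identifiers in other
    # source databases is not guaranteed.
    - name: qru_corrida
      type: integer
      title: Código da Corrida
      description: Código que identifica a corrida unicamente dentro da base de
        origem. Não é garantida a ausência de colisão com outros identificadores
        em outras bases de origem.
      constraints:
        required: true
        minimum: 0
    # orgao_nome ("Name of the agency or entity"): full name of the agency,
    # without abbreviations or acronyms, as recorded in SIORG.
    - name: orgao_nome
      type: string
      title: Nome do órgão ou entidade
      description: Nome do órgão, por extenso, sem abreviações ou siglas, conforme
        consta no SIORG.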
to a data dictionary document with fastETL's DocumentTemplateToDataDictionaryOperator
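
To give a feel for the transformation, the sketch below turns the fields of a table schema into the rows of a data dictionary and renders them as plain text. It is only an illustration. The real operator fills an OpenDocument Text template, and the short English descriptions used here are placeholders rather than the published ones.

```python
def data_dictionary_rows(schema: dict) -> list[tuple[str, str, str, str]]:
    """Build one (name, type, required, description) row per schema field."""
    rows = []
    for field in schema["fields"]:
        required = field.get("constraints", {}).get("required", False)
        rows.append(
            (
                field["name"],
                field.get("type", ""),
                "yes" if required else "no",
                field.get("description", ""),
            )
        )
    return rows


def render_table(rows: list[tuple[str, str, str, str]]) -> str:
    """Render the rows as pipe-separated text lines under a header."""
    header = ("Field", "Type", "Required", "Description")
    return "\n".join(" | ".join(row) for row in [header, *rows])


# Field names and constraints follow the schema excerpt above;
# the descriptions are placeholders.
schema = {
    "fields": [
        {"name": "base_origem", "type": "string",
         "description": "Source database.", "constraints": {"required": True}},
        {"name": "qru_corrida", "type": "integer",
         "description": "Ride code.", "constraints": {"required": True}},
        {"name": "orgao_nome", "type": "string",
         "description": "Name of the agency."},
    ]
}
rows = data_dictionary_rows(schema)
print(render_table(rows))
```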

TaxiGov: open data publication

example workflow of a DAG in Airflow

Using open data with Frictionless

In [1]: from frictionless import Package

In [2]: package = Package("https://repositorio.dados.gov.br/seges/taxigov/v2/datapackage.yaml")

In [3]: package.resource_names
Out[3]: 
['corridas-7-dias',
 'passageiros-solicitantes',
 'corridas',
 'corridas-2017',
 'corridas-2018',
 'corridas-2019',
 'corridas-2020',
 'corridas-2021',
 'corridas-2022',
 'corridas-2023']

In [4]: df = package.get_resource("passageiros-solicitantes").to_pandas()

In [5]: df[df["nome_passageiro"].str.upper().str.startswith("AUGUSTO HERRMANN") == True]
Out[5]: 
         base_origem           nome_passageiro          nome_solicitante cpf_solicitante   ano  mes  distancia   valor  quantidade
105925  TAXIGOV_DF_1  AUGUSTO HERRMANN BATISTA  AUGUSTO HERRMANN BATISTA  ***.303.276-**  2018   11     30.735  125.65           4
111136  TAXIGOV_DF_1  AUGUSTO HERRMANN BATISTA  AUGUSTO HERRMANN BATISTA  ***.303.276-**  2018   10     26.615   97.50           3
115336  TAXIGOV_DF_1  AUGUSTO HERRMANN BATISTA  AUGUSTO HERRMANN BATISTA  ***.303.276-**  2018    9     35.175  125.07           4
134833  TAXIGOV_DF_1  AUGUSTO HERRMANN BATISTA  AUGUSTO HERRMANN BATISTA  ***.303.276-**  2018    5     33.043  112.87           3
143470  TAXIGOV_DF_1  AUGUSTO HERRMANN BATISTA  AUGUSTO HERRMANN BATISTA  ***.303.276-**  2018    3     49.065  173.81           5

Visualizing open data

Available at: https://gestaogovbr.github.io/taxigovviz/

Where to find fastETL

https://github.com/gestaogovbr/FastETL

Installing

  • install the msodbcsql17 and unixodbc-dev libraries to your Airflow environment, then

Credits

  • fastETL logo by Moisés Lima
  • photo by Alex Lvrs on Unsplash
  • photo by Vinicius "amnx" Amano on Unsplash
  • presentation done in Markdown using Marp by Yuki Hattori

Contact

Questions & feedback

👆❓ (também em português / también en español)