top of page

Data Ingestion Pipelines With Custom Python Scripts

Fivetran is among the best of the best when it comes to data ingestion platforms. It has a wide array of toolsets, functionalities, and a whopping 300+ pre-built data connectors. These pre-built data connectors are all plug-and-play, meaning you can readily use them with the relevant data sources and set up data pipelines with zero coding effort.


While Fivetran provides an extensive range of connectors and functionalities to cover all the fundamental data integration requirements, you may still require some customization for certain cases. You might be using a new data source unique to your project, and there might not be any pre-built connector for it. Or you might have to adapt a connector to suit your particular data formats. Customization options allow you to code your data ingestion pipelines to align with your particular policies and processes.




While some data integration platforms build custom integrations and connectors on demand, Fivetran allows you to integrate your own custom connectors into its framework. This gives you a higher level of control over your customizations. You can always make further updates to your custom connectors as and when required without relying on Fivetran for every update.


How to Integrate Your Custom Code in Fivetran?


Data Solutions Consulting Inc. recommends three approaches to utilizing your custom code with Fivetran:

  1. Making use of Fivetran functions

  2. Using Python, Airflow, and a cloud vendor such as Google

  3. Using Snowpark to perform data integrations in Snowflake environments directly.

Let us take a deeper look into each of these approaches.


Fivetran Functions


The Fivetran functions are a way to embed your custom-coded data connectors as an extension of Fivetran. This helps avoid the complications of developing a complete data pipeline from scratch. Instead, you can use the larger Fivetran framework and only integrate your custom code for the required functionality. You only need to code the part where you extract the data from the data source. The rest of the data pipeline tasks are taken care of by the Fivetran platform.


On top of simplifying customization, Fivetran also provides several templates you can build from to make the development process easier. The basic logic on how these custom connectors work is enumerated below:

  • A sync initiation request from Fivetran must be handled by the connector.

  • On receiving a request, the connector should authenticate using an API key.

  • On successful authentication, the response from the API is processed and returned to Fivetran. Fivetran loads the data response into the corresponding destination or data warehouse solution.

You will have to take care of the error handling, authentication, and the cost accrued for moving the data across the cloud services. You should first gain a good understanding of the data source API and the cloud functions you will be making use of. You must write your own custom error-handling logic and set up your cloud infrastructure to move the data across the pipeline. You can use cloud services like AWS, Azure, and GCP or run the connectors with your virtual private cloud setups. To ensure cost optimizations, your code should be highly optimized and set the necessary threshold limits to avoid excessive billing.


Fivetran functions provide the same additional functionalities provided by a pre-built data connector, such as:

  • Incremental updates

  • Data type inference

  • Automated schema updates

  • Data idempotency

  • Monitoring logs and troubleshooting reports

  • Soft delete

This also helps you test your connectors with built-in testing features and unified logging systems. You get access to the multi-language support that Fivetran provides as well.


Using Python, Airflow, and a Cloud Service


While you can use a range of programming languages to build your custom connector, Python remains the most popular language to develop your data pipelines. With Python, you can access many open-source libraries, thriving community support, and open-source resources. Fivetran also provides excellent documentation and sample Python code to help you get started with custom connector development using Python.

You can set up your custom data pipeline with all the necessary modules, such as data ingestion, storage, orchestration, and container deployment in Fivetran.


Here are the common steps to creating a custom ETL pipeline using Python:

  • Build your data ingestion module using Python

Your coding should take care of the data extraction process from the data source. Thus, you will have to handle the data source API to fetch the data in the required format, process it, and then store it in the data storage platform of your choice.

  • Make use of cloud services such as Google Cloud Storage and Big Query as your data storage module

You will have to load your data to a cloud storage solution. Cloud service platforms provide the necessary infrastructure to move your data from the source to your data warehousing solution. They are fully managed and maintained, thus letting you focus on the data connector logic. Depending upon the selected cloud service, you must code your Python file to load the data into the particular cloud storage properly. This involves the basic authentication, security, and other types of error handling relevant to your selected cloud service. You should also take into consideration the cost it takes and storage limits you might have to deal with.


  • Use Apache Airflow for automating and scheduling your data pipelines

Apache Airflow is a plug-and-play tool that lets you automate tasks and code executions on any cloud platform. By integrating Airflow into your data pipeline, you can set up automated executions of your data extraction and loading tasks. Airflow is also based on Python and thus provides a seamless coding experience when working with Python-based programming for your data pipeline project.


With Apache Airflow, you can import the necessary Python libraries and set up a code to schedule DAGs. A DAG is a schedule interval that can be defined to run tasks on the specified interval per day, per month, per hour, and so on.

  • Make use of Docker to deploy your data integration pipeline

Once you have set up the data integration pipeline, you can deploy it using containerized dockers. To do so, first, create a dockerfile. The dockerfile should contain all the specifications for your container, including the environment setup, executables, and working directory. The next step is to build your docker container and run the resulting docker image.


Snowpark To Directly Perform Data Integrations in Snowflake Environments


There is yet another approach to developing custom data connectors and integrations if you use the Snowflake warehousing solution. Snowflake has built-in support for various data formats such as CSV, JSON, Parquet, Avro, ORC, and XML. Data can be extracted from varied sources and processed into any of the file formats that Snowflake can work with. But when you need to work with data formats other than the ones that Snowflake has native support for, you might need some customizations, and Snowpark lets you do that.


The Snowpark feature lets you access files of any format and store them in Snowflake directly. It allows you to program the data extraction process using either Python or Java programming languages. You just need to use the Snowpark libraries for the respective programming frameworks and use them to code your customized data ingestion programs. Snowpark thus provides the following benefits:

  • Support for specialized file formats

You can work with specialized file formats such as health data stored in HL7 formats, medical imaging data in DICOM format, Sports data in FIT format, Excel sheets, binary data, and XML data exceeding the 16 MB row limit.

  • Support for unstructured and semi-structured data

It allows you to process unstructured data such as raster image files, OCR documents, NLP on textual data, and more. With Snowpark, you can perform advanced AI and ML operations on unstructured data.


Snowpark allows you to use stored procedures or UDTF (User Defined Table Function) to extract and process your data into a Snowflake. These two methods can be used interchangeably or based on the specific use case to optimize your data pipeline with Snowpark.


The stored procedures are similar to any arbitrary code that you can run. You can use this type of programming to specify multiple tasks at a time and execute your data extraction processes.


The UDTF is more frequently used for parallelism and optimized data loading. It returns a table with a predefined schema.


Snowpark lets you bypass long developmental stages and have your warehouse and analytical apps directly access your data files. It greatly reduces developmental efforts and costs and lets you focus on insights and derive value from your data.


In a Nutshell

Thus, there are three approaches through which you can create data ingestion pipelines with custom scripts - Fivetran functions for simplified customization, using Python, Airflow, and cloud services for flexibility, and Snowpark for specialized Snowflake environments. These options let you tailor data pipelines to your unique needs, ensuring efficient data integration and analysis.


That said, if you want to develop custom data ingestion pipelines and are unsure how to get started, we are here to help. Contact us today to learn more.


14 views0 comments

Comments


bottom of page