Apache Airflow

  • Prerequisites
    • Python
    • Docker or Linux/Mac machine
  • Astronomer (astronomer.io) offers a managed service for running Airflow in the cloud.
  • Install uv, an extremely fast Python package and project manager written in Rust.
    • A single tool to replace pip, pip-tools, pipx, Poetry, pyenv, twine, virtualenv, etc.
  • For data engineering projects, refer to the following website.
  • Apache Airflow provides the following benefits to our data applications.
    • Organisation
      • Airflow helps us set the order of our tasks.
      • Airflow makes sure each task starts only when the previous one is complete.
      • It controls the timing of our entire data process.
      • Our tasks are built from methods with predefined functionality, and Airflow synchronises them.
      • It acts as an automated coordinator of our data tasks.
      • Example: we need to collect data from a database, clean it, perform some calculations, and then generate a report.
        • Airflow helps us define this sequence and makes sure each step happens in the correct order, even if some tasks take longer than others.
    • Visibility (our control tower)
      • Airflow gives us a bird’s-eye view of our data tasks.
      • It helps us monitor the progress of our workflows.
      • Quickly identify and troubleshoot issues.
      • Understand dependencies between tasks.
      • Example: if we are running multiple workflows for different projects, Airflow provides a dashboard where we can see the status of each pipeline at a glance.
      • If one task fails, we can easily spot it and take action, rather than discovering the problem hours later when the report doesn’t arrive.
    • Flexibility and scalability
      • Airflow is like a Swiss Army knife for data workflows.
      • It is versatile enough to handle a large variety of tasks and can grow with your needs.
      • This flexibility allows us to connect to many data sources and tools.
      • Start small and grow as your project gets bigger.
      • Customise your workflows to fit your exact needs.
      • You might start by using Airflow to schedule simple database queries.
      • As your needs grow, you can add more complex tasks, like training AI models, checking data quality, or even starting external programs by triggering external APIs, all with the same Airflow system.
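The Organisation example (collect, clean, calculate, report) can be sketched as a dependency graph. This is a plain-Python model, not Airflow's API. Each hypothetical step lists the steps it depends on. The standard library's `graphlib.TopologicalSorter` (Python 3.9+) then produces an order in which every step comes after its dependencies.

```python
from graphlib import TopologicalSorter

# Hypothetical steps from the example: each key maps to the set of steps it depends on.
pipeline = {
    "collect": set(),
    "clean": {"collect"},
    "calculate": {"clean"},
    "report": {"calculate"},
}

def execution_order(dependencies):
    """Return an order in which every step appears after all of its dependencies."""
    return list(TopologicalSorter(dependencies).static_order())

order = execution_order(pipeline)
print(order)  # ['collect', 'clean', 'calculate', 'report']
```

In a real DAG, Airflow derives the order from the dependencies you declare between tasks, for example with the `>>` operator.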
  • Apache Airflow is an open-source platform to programmatically author, schedule and monitor workflows.
  • Apache Airflow is a tool that helps you create, organise, and keep track of all your data tasks automatically.
  • It is like a very smart to-do list for your data work that runs itself, for example extracting data from a database, processing the data, and loading the data.
  • Airflow provides the following benefits
    • Dynamic
      • Airflow can adapt and change based on what’s happening.
      • It is Python-based, so it is easy to use and powerful: you write your workflows in Python.
      • Dynamic tasks help us generate tasks based on dynamic inputs.
        • For example, if today we have data from two sources to process and tomorrow we have data from three sources, Airflow can generate one task per source dynamically without any change to the code.
      • Dynamic workflows help us generate workflows based on static inputs.
        • For example, we can generate our workflows from configuration files.
      • Branching helps us execute a different set of tasks based on a condition or result.
      • Example: we are analysing the quotations we receive. If we receive more quotations today, Airflow automatically adds extra tasks to process every quotation, without us having to change anything manually.
    • Scalability
      • Airflow can handle both small and large amounts of work.
      • Airflow can manage a few simple tasks or hundreds of large ones.
      • Airflow provides different execution modes, which depend on your infrastructure and budget.
      • For example, we might use Airflow to process data from one point of sale. As our business grows and the number of points of sale increases, Airflow can scale up to handle data from all the newly added ones, even hundreds, without needing a new system.
    • Fully functional user interface
      • Airflow has a visual dashboard where we can see and control our tasks and workflows.
      • It is like having a control panel for your data tasks, where we can see what is happening and make changes.
      • We can monitor and troubleshoot our workflows.
      • Highlight relationships between workflows and tasks.
        • Identify dependencies between workflows.
      • Identify bottlenecks and performance metrics.
      • Manage users and roles of the Airflow instance.
      • Example: we have a task that updates our sales report. In the Airflow UI, we can see if today’s update is running, check when it last ran successfully, and even pause or restart it if needed.
    • Extensibility
      • We can add new features or connect Airflow to other tools easily.
      • We can add new applications to do more things.
      • We can connect it to other data tools we use.
      • Many provider packages offer functions to interact with a tool or service (AWS, Snowflake, et cetera).
      • Customisable user interface.
      • Possibility to customise existing functions.
        • We can add an abstraction layer above a function to hide the complexity of using it.
      • Example: we are using a new cloud storage service. Even if Airflow doesn’t work with it out of the box, we can write a small piece of code to connect Airflow to this new service, thus extending its capabilities.
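Two of the Dynamic benefits, dynamic tasks and branching, can be modelled in plain Python. This sketch is not Airflow's API. The source names, task names and the threshold of 10 quotations are hypothetical assumptions chosen only for illustration.

```python
# Plain-Python model of two ideas from the "Dynamic" benefit; not Airflow's API.

def make_tasks(sources):
    """Dynamic tasks: build one processing task per input source."""
    # Each task is a (name, callable) pair; src=src binds the current source to the lambda.
    return [(f"process_{src}", lambda src=src: f"processed {src}") for src in sources]

def choose_branch(quotation_count, threshold=10):
    """Branching: pick which downstream task runs based on a condition."""
    return "bulk_review" if quotation_count > threshold else "standard_review"

today = make_tasks(["crm", "erp"])
tomorrow = make_tasks(["crm", "erp", "web"])   # one more source, no code change
print([name for name, _ in today])      # ['process_crm', 'process_erp']
print([name for name, _ in tomorrow])   # ['process_crm', 'process_erp', 'process_web']
print(tomorrow[2][1]())                 # processed web
print(choose_branch(25))                # bulk_review
```

In Airflow itself, the corresponding features are dynamic task mapping, for example `.expand()` on a TaskFlow task, and branching, for example the `@task.branch` decorator.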
  • Core Components
    • The metadata database
      • This is a database that stores information about your tasks and their status.
      • It keeps track of all the important details about your workflows, users, variables, et cetera.
      • It is like Airflow’s memory: Airflow uses the metadata database to remember which tasks have run, when they ran, and their results.
      • The metadata database is used to store information regarding workflows, users, et cetera.
    • The scheduler
      • The scheduler is responsible for determining when tasks should run.
      • It ensures your tasks run at the right time and in the correct order.
      • The scheduler is used to schedule the tasks in your workflows.
    • DAG file processor
      • The DAG file processor parses DAG files and serialises them into the metadata database.
      • It used to be part of the scheduler in Airflow 2, but in Airflow 3 it is a separate component for scalability and security reasons.
      • The DAG processor transforms Python workflow definitions into structured execution plans that the Airflow scheduler can run efficiently.
    • The executor
      • The executor determines how your tasks will run.
      • It manages the execution of your tasks, deciding whether to run them in sequence or in parallel, and on which system.
      • For example, if we want to run our tasks on a Kubernetes cluster, we use the Kubernetes executor.
      • The executor does not execute our tasks itself. It defines how, and on which system, our tasks will run.
      • The executor decides how to run your tasks to optimise performance.
    • The API server
      • The API server provides endpoints for task operations and serves the UI.
      • Without the API server, we cannot run tasks and we cannot access the Airflow user interface.
      • It handles task operations sent by the workers and allows you to view, manage, and monitor your workflows through a web browser.
      • We use the Airflow UI provided by the API server to see the status of our tasks, start and stop workflows, and view logs.
      • The API server is used to run tasks and to access the user interface.
    • The worker
      • Workers are processes that actually perform the tasks.
      • They do the actual work defined in your tasks.
      • The workers take tasks from the queue and execute them.
      • Workers run our tasks.
    • The queue
      • The queue is the list of tasks waiting to be executed.
      • It helps manage the order of task execution, especially when there are many tasks to run.
      • The queue in Airflow ensures tasks are processed in an orderly manner.
    • The triggerer
      • The triggerer is responsible for managing deferrable tasks, that is, tasks that wait for external events.
      • It allows Airflow to efficiently handle tasks that depend on external factors without blocking other processes or tasks.
      • The triggerer keeps an eye on events, notices when an event occurs, and resumes the task execution.
  • In a single node setup, the executor and queue are internal; we don’t see them when we run Airflow.
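The flow between the components above can be illustrated with a toy model built on Python's `queue` and `threading` modules. It is a sketch under stated assumptions, not Airflow internals. A "scheduler" function marks tasks queued and puts them on a queue. Worker threads pull the tasks and report state changes through a hypothetical `api_server_update` function rather than writing to the dictionary that stands in for the metadata database. The task names are placeholders, and in this sketch every task is assumed to succeed.

```python
import queue
import threading

# Toy model of the components above; all names are illustrative, not Airflow internals.
metadata_db = {}              # stands in for the metadata database: task_id -> state
db_lock = threading.Lock()

def api_server_update(task_id, state):
    """Hypothetical API server: the only code path that writes to the 'database'."""
    with db_lock:
        metadata_db[task_id] = state

def scheduler(task_ids, task_queue):
    """Scheduler + executor: mark each task queued and hand it to the queue."""
    for task_id in task_ids:
        api_server_update(task_id, "queued")
        task_queue.put(task_id)

def worker(task_queue):
    """Worker: pull tasks from the queue, 'run' them, and report the result."""
    while True:
        task_id = task_queue.get()
        if task_id is None:               # sentinel: no more work for this worker
            task_queue.task_done()
            break
        api_server_update(task_id, "running")
        api_server_update(task_id, "success")   # assume the task succeeded
        task_queue.task_done()

task_queue = queue.Queue()
workers = [threading.Thread(target=worker, args=(task_queue,)) for _ in range(2)]
for w in workers:
    w.start()
scheduler(["task_a", "task_b", "task_c"], task_queue)
for _ in workers:
    task_queue.put(None)                  # one sentinel per worker
for w in workers:
    w.join()
print(metadata_db)  # every task ends in the 'success' state
```

In a real deployment these roles are separate Airflow processes. With the CeleryExecutor, the queue is an external system such as Redis or RabbitMQ.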
  • Core concepts
    • DAG (Directed Acyclic Graph)
      • A DAG is a collection of all the tasks you want to run, organised in a way that reflects their dependencies and relationships.
      • It helps you define the structure of your entire workflow, showing which tasks need to happen before others.
      • A DAG in Airflow lists all the tasks needed to complete your data workflow, in the right sequence.
      • A graph with a loop is not a directed acyclic graph. An example is a graph where a task has a cyclic dependency, so a set of tasks would repeat again after it.
        • For example, B depends on A, and A in turn depends on B.
      • If there is no cycle in our graph, then it is a DAG (Directed Acyclic Graph).
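The acyclic rule can be checked with the standard library. This is a minimal sketch that assumes dependencies are given as "task: set of upstream tasks". `graphlib.TopologicalSorter.prepare()` raises `CycleError` when the graph contains a cycle, such as the A/B example above. Airflow runs its own cycle check when it processes a DAG file, so a cyclic DAG fails to load.

```python
from graphlib import CycleError, TopologicalSorter

def is_dag(dependencies):
    """Return True if the dependency graph has no cycle, i.e. it is a DAG."""
    try:
        # prepare() raises CycleError if the graph contains a cycle.
        TopologicalSorter(dependencies).prepare()
        return True
    except CycleError:
        return False

print(is_dag({"B": {"A"}, "C": {"B"}}))   # True:  A -> B -> C
print(is_dag({"B": {"A"}, "A": {"B"}}))   # False: A -> B -> A is a cycle
```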
    • Operator
      • An operator defines a single, ideally idempotent, task in your DAG.
      • Idempotent means that we can run a task as many times as we want, and for the same input we get the same output every time.
      • Operators allow us to break down our workflow into discrete, manageable pieces of work.
      • Example: an "extract data" operator could be a single task in your DAG that pulls data from a specific source.
      • Airflow has thousands of operators
        • The PythonOperator executes a Python function (a callable).
        • The BashOperator executes a Bash script or command.
        • The SQLExecuteQueryOperator executes a SQL query against a database.
        • The FileSensor waits for a file; it doesn’t execute anything.
      • https://registry.astronomer.io/
        • Here we can find a list of all providers. A provider is a Python package with functions and operators that interact with a tool or service from our workflow.
        • To interact with Airbyte, choose the Airbyte provider, and we can see which operators it provides for use in our workflows.
      • An operator is a single task, like one step in a workflow.
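Idempotency matters because a task may run more than once, for example when `retries` is set on it or when we rerun it from the UI. This plain-Python sketch writes the same hypothetical "load daily sales" step in two ways, using a dict as stand-in storage. Only the overwrite version gives the same result however many times it runs.

```python
# Hypothetical "load daily sales" step written two ways; the storage is a plain dict.

def load_append(store, day, rows):
    """Not idempotent: every rerun appends the same rows again."""
    store.setdefault(day, []).extend(rows)

def load_overwrite(store, day, rows):
    """Idempotent: a rerun for the same day replaces that day's data."""
    store[day] = list(rows)

rows = [("pos-1", 120), ("pos-2", 80)]
appended, overwritten = {}, {}
for _ in range(3):                       # simulate three runs of the same task
    load_append(appended, "2024-01-01", rows)
    load_overwrite(overwritten, "2024-01-01", rows)

print(len(appended["2024-01-01"]))      # 6 (duplicated data)
print(len(overwritten["2024-01-01"]))   # 2 (same result every time)
```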
    • Task/Task instance
      • A task is a specific instance of an operator.
      • When an operator is assigned to a DAG, it becomes a task.
      • Tasks are the actual units of work that get executed when our DAG runs.
      • A task instance is the actual execution of that step at a specific time.
      • In short, a task is one step of the workflow, and a task instance is a specific execution of that step at a particular time.
    • Workflow
      • A workflow is the entire process defined by your DAG, including all the tasks and their dependencies.
      • It represents your entire data pipeline, showing how all the pieces fit together to achieve your goal.
      • For example, the daily sales report may be our workflow, which includes tasks like
        • Extracting sales data
        • Processing it
        • Generating a report
        • Emailing it to stakeholders
      • Each task in the sequence depends on the previous task.
      • A workflow is also referred to as a DAG.
      • The DAG is the overall workflow structure for our data pipeline.
      • The workflow is the entire process defined by the DAG, from start to finish.
  • Disadvantages
    • Airflow is not meant for heavy data processing; it is not a data processing framework.
      • Airflow is not designed to process large amounts of data.
      • It is an orchestrator, not a data processing engine.
    • Airflow is not a real-time streaming solution.
      • Airflow is built for batch processing and scheduling, not for handling real-time data streams.
    • Airflow is not a data storage system.
      • Airflow uses a database to store metadata, but it is not meant to be used as a data warehouse or database for your actual data.
    • Airflow cannot be used for high-frequency, sub-minute scheduling.
      • If we need to run tasks every few seconds, Airflow is not designed for such high-frequency scheduling.
      • Airflow is better for tasks that run at intervals of minutes, hours, or days.
    • When we need to process large datasets directly.
      • If we need to perform complex calculations on terabytes of data, Airflow is not the tool to do the processing.
      • Airflow orchestrates jobs and does not perform them.
      • We can use Airflow to trigger a Spark job rather than processing the data in Airflow.
      • However, depending on the infrastructure and compute resources, we may be able to process data in Airflow at a larger scale.
    • Real-time data processing
      • We cannot use Airflow for real-time data streams such as a trading platform, et cetera.
      • Airflow operates on a batch schedule and is not designed for continuous real-time data processing.
      • We can schedule our data pipelines and DAGs based on data events, but it is still not streaming.
    • For a simple linear workflow with few dependencies, Airflow may be overkill.
      • The power of Airflow comes from managing complex dependencies and parallelism, which simple flows don’t need.
      • For simpler workflows, we can use simpler tools like cron jobs, et cetera.
  • Airflow is less suitable for high-frequency sub-minute tasks, direct large-scale data processing, real-time reactions, or overly simple workflows.
  • Apache Airflow architecture
    • Single node architecture
      • A node is a single computer or server.
      • A single node architecture means all components of Airflow run on one machine.
      • This is the typical setup when we install and run Airflow.
      • The API server serves the user interface and provides endpoints for task operations that are used by other components of Airflow.
      • The scheduler checks for tasks that are ready to be triggered.
      • The executor defines how, and on which system, our tasks will run.
        • We use the LocalExecutor in a single node architecture.
      • In the metadata database, Airflow stores all the metadata related to our environment, for example our task instances, DAG runs, the users of the Airflow instance, et cetera.
        • This will be a relational database.
      • We have one or more workers, which run our tasks; in a single node architecture, these are subprocesses of the scheduler.
      • The triggerer is used for deferrable operator tasks. It also exists in a single node architecture, but it is only used if we use deferrable operators.
        • A deferrable operator is a special kind of task.
      • The last, and also important, component is the DAG file processor.
      • The DAG file processor parses and serialises your workflows, that is, your DAG files.
        • DAG files are in the DAG folder, which could be a Git repository.
      • The triggerer and workers communicate with the API server. They never communicate directly with the metadata database, for security and scalability reasons.
      • The API server communicates with the metadata database.
      • The scheduler also communicates with the metadata database.
      • The DAG file processor, which serialises our workflows into the metadata database, communicates with the metadata database.
      • Pros and cons of single node architecture
        • Great for getting started with small workflows.
        • Simple to set up and manage.
        • We can’t scale a single node architecture as our workflows grow and become more complex.
    • Multi node architecture
      • For better performance and scalability, we may have to move to a multi node setup.
      • A node is a single computer or server.
      • Multi node refers to running Airflow across multiple computers or servers.
      • This setup is used when we want to handle larger workloads, or when we want more reliability.
      • This is the typical setup when Airflow runs in production, to ensure performance, scalability and reliability.
      • We have multiple nodes, that is, multiple computers, in the architecture.
      • The API server provides the user interface and the endpoints that the workers and the triggerer use to communicate with the Airflow metadata database.
      • We can have more than one API server, which helps many clients access the Airflow user interface.
        • The clients can be people or workers that run tasks.
        • We can add a load balancer in front of the API servers to maintain resilience.
      • The metadata database should run on a dedicated machine for scalability and security reasons.
      • There may be more than one scheduler in a multi node architecture. This improves resilience: if one scheduler is down, another one can take over as a backup.
        • The executors are part of the schedulers.
        • The schedulers communicate with the metadata database directly.
      • The DAG file processors are responsible for parsing and serialising our DAGs (our Python files) into the metadata database.
        • We can have more than one DAG file processor.
        • If we have many DAGs to parse, this is how we can scale up our Airflow instance.
      • The queue is specific to the CeleryExecutor. Celery is a Python framework that distributes our tasks among multiple machines through a queue.
        • The queue can be RabbitMQ or Redis, which is an external queue.
      • The workers execute our tasks.
        • We have one worker per node.
        • The workers pull the tasks to run from the queue.
        • The queue receives the tasks to run from the executor.
        • Once a task is complete, the worker communicates with the API server to update the status of the task in the Airflow metadata database.
        • The workers never communicate with the metadata database directly.
      • We can organise the components of a multi node Airflow setup in many ways.
      • It ultimately depends on our architecture, budget, and the resources we have.
      • Benefits of multi node architecture
        • Scalable: we can add more workers to handle larger workloads.
        • Reliable: if one machine fails, others can take over its work.
        • Performance: by distributing work across machines, we can process more tasks simultaneously.
      • This architecture may be overkill for smaller workflows, as it is complex to manage.
      • This multi node architecture is great for larger organisations with complex workflows, high reliability needs, or large volumes of tasks.
      • It allows Airflow to scale up to meet demanding workloads while providing better fault tolerance.
  • Working
    • Add a new Python file to the DAG folder.
    • The DAG file processor scans the DAG folder for new DAG files every five minutes by default.
    • When it detects a new DAG in the DAG folder, the DAG file processor serialises the DAG file into the metadata database. That serialised representation of your DAG is used both by the API server, to show the code of the DAG in the Airflow UI, and by the scheduler, to check if there are tasks to run.
      • As soon as there is a DAG file, the scheduler checks for tasks to run. If there are tasks, it creates a DAG run and the tasks are submitted to the executor.
    • If there is a task ready to run, the scheduler creates a DAG run object, an instance of the DAG, and by default the state of the DAG run object is queued.
      • The tasks are then queued. The worker pulls the tasks out of the queue in order to run them, and then communicates with the API server to update the states of the tasks in the metadata database.
    • As soon as Airflow can run your DAG, the DAG run object moves to the running state.
    • Now the scheduler creates the task instance objects corresponding to the tasks to run, with the state scheduled by default, and then submits the tasks to the executor.
    • The executor defines how, and on which system, the task should run, and the task instance object is now in the queued state.
      • There is always a queue to run our tasks in order.
      • The queue can be external or internal, depending on the architecture we have set up for the Airflow instance.
      • If the queue is internal, it exists, but we can’t see it.
    • So now the task instance is in the queued state.
    • Next, the worker process that runs the task picks up the task instance object from the queue.
    • The task instance is now in the running state.
    • If the task completes successfully, the worker communicates with the API server to report the current state of the task instance as success.
      • The scheduler updates the DAG run based on the states of its tasks.
    • The API server updates the corresponding state of the task instance object in the metadata database.
    • The scheduler then learns from the task’s status that it succeeded, and the DAG run also completes in the success state.
    • In a single node architecture, all the components run on the same machine.
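The happy path described above can be written as a small transition table. This is a simplified sketch, not Airflow's state machine: real Airflow has more task instance states, such as failed, up_for_retry and deferred, and this table covers only the success path from these notes.

```python
# Happy-path task instance states from the notes, as a simple transition table.
# Real Airflow has more states (e.g. failed, up_for_retry, deferred); not modelled here.
TRANSITIONS = {
    None: "scheduled",        # scheduler creates the task instance
    "scheduled": "queued",    # scheduler submits it to the executor / queue
    "queued": "running",      # a worker picks it up from the queue
    "running": "success",     # worker reports success via the API server
}

def run_to_completion(state=None):
    """Follow the happy path and return every state the task instance passes through."""
    history = []
    while state in TRANSITIONS:
        state = TRANSITIONS[state]
        history.append(state)
    return history

print(run_to_completion())  # ['scheduled', 'queued', 'running', 'success']
```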
  • Installation
    • Check that your Python version is compatible with your Airflow version, using the constraints file
      • https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt
      • https://raw.githubusercontent.com/apache/airflow/constraints-3.1.8/constraints-3.13.txt
      • AIRFLOW_VERSION - Airflow version (e.g. 3.1.8), or main, 2-0, for the latest development version
      • PYTHON_VERSION - Python version (e.g. 3.10, 3.11)
    • To use a downgraded Python version
      • Install the version => sudo dnf install python3.13
      • Create a directory that will host the environment and Airflow
        • Create a directory named airflow in your home directory.
      • Create directories for the components
        • mkdir -p ./dags ./logs ./plugins ./config
      • Create a virtual environment => python3.13 -m venv my_venv_name
      • Activate the environment => source my_venv_name/bin/activate
    • Install using pip/uv
      • pip install "apache-airflow[celery]==3.1.8" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.1.8/constraints-3.13.txt"
      • pip3 install apache-airflow
        • Installing without a constraints file may fail or produce an unusable Airflow installation, because dependency versions are not pinned.
    • Use the following Docker Compose file to install using Docker.
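    • Set up a home directory
      • export AIRFLOW_HOME=~/airflow
    • Initialise the backend/database
      • airflow db init
      • If the above does not work, use "airflow db migrate" (airflow db init was deprecated in Airflow 2.7 and removed in Airflow 3, where airflow db migrate is the command to use)
      • To purge old records from the database, use
        • airflow db clean --clean-before-timestamp <timestamp>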
    • If you are using Postgres, change the following property in airflow.cfg (in the [database] section) in the Airflow home directory
      • sql_alchemy_conn = postgresql+psycopg2://postgres:postgres@localhost/airflow_db
      • sql_alchemy_conn = postgresql://postgres:postgres@localhost/airflow_db
      • When you use SQLAlchemy 1.4.0+, you need to use postgresql:// as the database in the sql_alchemy_conn.
      • For MySQL use
        • mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
    • The command airflow providers list shows the installed providers.
    • The simple auth manager is the default auth manager in Airflow 3
      • https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/auth-manager/simple/index.html
      • To use the Flask AppBuilder (FAB) auth manager instead, which was the default in Airflow 2 and supports creating users from the CLI
        • Check for and install the provider => pip install "apache-airflow-providers-fab"
        • In the Airflow configuration file (airflow.cfg, [core] section), set
          • auth_manager=airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager
        • Create a user using
          • airflow users create --role Admin --username admin --email admin --firstname admin --lastname admin --password admin
    • Start the API server
      • airflow api-server -p 8081
    • Start the scheduler
      • airflow scheduler
    • Start the DAG processor
      • airflow dag-processor
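The constraints URL pattern and the pip command from the installation steps can be generated with a small helper. This helper is hypothetical and not part of Airflow. When no Python version is given, it defaults to the interpreter that runs it, which helps avoid the version mismatch the first installation step warns about.

```python
import sys

def constraints_url(airflow_version, python_version=None):
    """Build the constraints-file URL following the pattern in the installation notes."""
    if python_version is None:
        # Default to the interpreter running this script, e.g. "3.13".
        python_version = f"{sys.version_info.major}.{sys.version_info.minor}"
    return (
        "https://raw.githubusercontent.com/apache/airflow/"
        f"constraints-{airflow_version}/constraints-{python_version}.txt"
    )

def pip_install_command(airflow_version, extras="celery", python_version=None):
    """Return the pip command from the notes, with the matching constraints file."""
    url = constraints_url(airflow_version, python_version)
    return (f'pip install "apache-airflow[{extras}]=={airflow_version}" '
            f'--constraint "{url}"')

print(pip_install_command("3.1.8", python_version="3.13"))
```

With Airflow 3.1.8 and Python 3.13, this prints the same pip command as the one in the "Install using pip/uv" step.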
  • Airflow views
    • The Home view
      • At the top, we have the health status of each Airflow component.
        • The metadata database
        • The scheduler
        • The triggerer
        • The DAG processor
      • Any state in red means the component is down.
      • Above the health section, we have the stats, which include failed DAGs, DAG import errors, running DAGs, and active DAGs.
        • Active DAGs are DAGs that are unpaused, that is, scheduled, but not necessarily running.
      • Below the health section, we have the history of the last 24 hours by default.
        • We can change the time range to a custom one as well.
        • The history shows the counts of our DAG runs and task instances by state.
      • Assets are logical groupings of data; an asset can be a file or a table in your database.
      • Whenever a DAG materialises an asset, that is, it creates the asset, an asset event is produced. It is shown on the right side of the dashboard.
      • If we want to quickly access our DAG runs in the success or failed state, or our task instances in the upstream failed, failed or success state, we just need to click on the corresponding state.
    • The DAGs view
      • At the top of the DAGs view, we have three tabs: DAGs, DAG runs, and task instances.
      • In the DAGs tab, all the DAGs are listed, page by page.
      • We can also search for our DAG.
      • Below the search bar, we have different states. These are useful because they let us filter our DAGs based on their latest run.
      • If we want to display DAGs whose last run is in the success state, we click on success.
      • Next to it, we can filter for all, active, or paused DAGs.
        • Active DAGs are DAGs that are ready to be scheduled.
        • Paused DAGs are not ready to be scheduled.
      • We can also filter DAGs by tags on this page.
      • We can display DAGs as tiles or as a list.
      • We can sort DAGs by DAG ID, next run, latest run, or start date.
      • The display name is shown first; by default it is the DAG ID, the unique identifier of our DAG.
      • Under that, we have the schedule, which defines when the DAG will run.
      • We have the latest run and the next run.
      • At the end, we have some bars. Each bar corresponds to a DAG run.
        • When we hover over a bar, we see additional information such as the state of that DAG run, its run after date, start date, end date, and duration.
        • Run after, start date, and end date are timestamps.
        • The higher the bar, the longer that DAG run took to complete.
        • Click on a bar to view more information about that particular DAG run.
      • We can click on the toggle to switch the DAG off, which pauses the DAG so that it cannot be scheduled.
      • The trigger button manually triggers our DAG so that we can run it.
        • We can use manual runs to monitor, test, or debug our DAGs.
      • Next to the DAGs tab, we have the DAG runs tab.
        • We can filter by state, that is, DAG runs in the queued, running, failed, or success state.
        • We can filter by run type, that is, backfill, manual, scheduled, or asset triggered.
        • We have the list of DAG runs in a table with the DAG ID, run after, state, run type, start date, end date, and duration of every DAG run, and the version of the DAG with which that DAG run was executed.
          • If we change the DAG and run it again, we get a new version.
        • We can rerun a DAG run or mark it as failed with the next two buttons.
        • If we are an admin, we also get the option to delete DAG runs.
      • Next, we have the task instances tab, which shows a list of task instances. We have the DAG ID, so we know which DAG each task belongs to.
        • Next, we have the DAG run, the task ID, the start date, and the end date.
        • The map index is filled in if the task is dynamically generated.
        • The try number shows how many times that task instance was tried.
        • The operator the task uses, for example the Python decorated operator, PythonOperator, or BashOperator, et cetera.
        • The duration of the task and the DAG version with which that task was executed.
        • We can restart the task or mark it as failed.
        • We can search for a specific task from the search bar.
        • We can filter tasks by state.
    • The Assets view
      • An asset is a logical grouping of data, for example a CSV file or a table in a database.
      • Before Airflow 3.0, assets were called datasets.
      • We can create a group of assets if we have multiple assets related to each other.
        • For example, we can create a group with the user and user location assets.
      • The number of DAGs that consume an asset is shown in the corresponding column.
        • When we hover over the number, we see the DAG names.
      • DAGs that consume an asset depend on it, so this column shows which DAGs depend on each asset.
      • The producing tasks column shows the number of tasks that produce the asset.
      • If we want to create an asset event, we have a button with a play sign. When we click it, we have two options: we can materialise the asset, or we can just create an asset event.
        • When we materialise an asset, we actually trigger the DAG behind that asset.
          • Behind the scenes, such an asset is a DAG with one task.
          • We run that one-task DAG to materialise our asset.
          • If our asset is a file produced by extracting data from a source, then materialising it executes the logic that produces the file, and so the asset.
        • If we only want to create an asset event, to simulate that an asset has been materialised, we click manual.
          • This can be useful if we have downstream assets or DAGs that depend on that asset, and we don’t want to materialise it again.
          • With manual, we can pass extra parameters that we can retrieve in the downstream assets or DAGs.
          • When we create an event, it triggers the downstream dependencies.
      • We can search for specific assets from the top.
      • If we click on an asset, we get a graph view, like the graph view of DAGs but for assets.
        • We have the DAG with one task that materialises the asset.
        • We are shown the DAGs and the assets they materialise.
        • The DAGs have toggles on them because an asset can be materialised on a specific schedule.
          • For example, a DAG can materialise an asset at midnight every day.
        • When an asset depends on another asset, the dependent asset’s DAG is triggered as soon as the parent asset is materialised.
        • On the right, depending on the asset chosen, we get its information: the group, the producing tasks, and the consuming DAGs.
        • We also have asset events on the right-hand side. Whenever we create an asset event, or whenever our asset is materialised, we see an event.
      • This view shows the dependencies between our assets and is used to create asset events that trigger downstream DAGs or downstream assets.
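Asset-driven triggering can be modelled as a simple publish/subscribe registry. This is a toy sketch, not Airflow's API, and the asset and DAG names are hypothetical. Consuming DAGs register on an asset, and every asset event, whether from materialisation or created manually, triggers each of them with any extra parameters attached to the event.

```python
from collections import defaultdict

# Toy model of asset-driven scheduling; names are hypothetical, not Airflow's API.
consumers = defaultdict(list)     # asset name -> DAG IDs scheduled on that asset
triggered = []                    # record of the DAG runs we "triggered"

def schedule_on(asset, dag_id):
    """Register a DAG that should run whenever the asset gets a new event."""
    consumers[asset].append(dag_id)

def emit_asset_event(asset, extra=None):
    """An asset event (materialisation or manual) triggers every consuming DAG."""
    for dag_id in consumers[asset]:
        triggered.append((dag_id, asset, extra or {}))

schedule_on("user", "build_user_report")
schedule_on("user", "sync_user_location")
emit_asset_event("user", extra={"source": "manual"})
print(triggered)
```

In Airflow 3 itself, a DAG is scheduled on an asset by passing the asset in its schedule, for example `schedule=[Asset("user")]` with `Asset` imported from `airflow.sdk`.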

    • The XCom view
      • Go to Browse and select XComs.
      • We get the list of XComs in our Airflow instance.
      • An XCom is a little box that holds a value; the box is created by a task.
      • We can share values between our tasks by creating XComs.
      • One task creates an XCom with a value in it, and another task can pull that XCom to get the value.
      • The key is the identifier of the XCom, that is, the name of the box.
      • The DAG column shows the DAG that created the XCom.
      • The run ID is the unique identifier of the run of the DAG that created the XCom.
      • The task ID is the name of the task that created the XCom.
      • The map index is related to an advanced concept called dynamic task mapping.
      • The last column shows the value of the XCom.
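To make the XCom columns concrete, here is a minimal in-memory sketch in which each XCom is identified by the same fields the list shows (DAG, run ID, task ID, key, map index). It is a toy, not how Airflow stores XComs (they live in the metadata database); XComId, ToyXComStore and the IDs used are hypothetical. Two details follow Airflow's conventions: a value returned by a task is stored under the key "return_value", and a task that is not mapped has map index -1.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class XComId:
    """The columns that together identify one row in the XCom list."""
    dag_id: str
    run_id: str
    task_id: str
    key: str = "return_value"  # key used for a task's return value
    map_index: int = -1        # -1 means the task is not dynamically mapped


class ToyXComStore:
    """In-memory stand-in for the XCom table (hypothetical, not Airflow's API)."""

    def __init__(self) -> None:
        self._rows: dict[XComId, Any] = {}

    def push(self, xcom_id: XComId, value: Any) -> None:
        """A task puts a value into the box."""
        self._rows[xcom_id] = value

    def pull(self, xcom_id: XComId) -> Optional[Any]:
        """Another task reads the box; None if no such box exists."""
        return self._rows.get(xcom_id)


if __name__ == "__main__":
    store = ToyXComStore()
    box = XComId(dag_id="user_processing", run_id="run_1", task_id="extract_user")
    store.push(box, {"id": 1})  # extract_user returns a value
    print(store.pull(box))      # process_user pulls it: {'id': 1}
```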
    • The variable view
      • The variable view can be accessed from the admin menu.
      • A variable is useful when you want to share a value across multiple DAGs, because any of our DAGs can pull it.
      • For example, if we want to share an API endpoint, we create a variable.
      • Click Add Variable, enter a key (the unique identifier), a value, and a description, then save.
      • The variable, identified by its key, is now available across all of our DAGs.
      • To edit the variable, click the edit button.
      • To delete it, click the delete button.
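Inside a DAG, a variable is normally read with Variable.get("<key>"). Besides the UI, Airflow can also pick up a variable from an environment variable named AIRFLOW_VAR_ followed by the upper-cased key, and by default it checks environment variables before the metadata database. The stdlib sketch below mimics only that lookup order; the get_variable helper, the dictionary standing in for the metadata database, and the endpoint URLs are hypothetical.

```python
from __future__ import annotations

import os
from typing import Mapping, Optional

ENV_PREFIX = "AIRFLOW_VAR_"  # prefix Airflow uses for variables set as environment variables


def get_variable(
    key: str, metastore: Mapping[str, str], default: Optional[str] = None
) -> Optional[str]:
    """Resolve a variable: environment variable first, then the (toy) metadata database."""
    env_value = os.environ.get(ENV_PREFIX + key.upper())
    if env_value is not None:
        return env_value
    return metastore.get(key, default)


if __name__ == "__main__":
    # Dict standing in for variables created through Admin > Variables (hypothetical values).
    ui_variables = {"api_endpoint": "https://api.example.com/users"}
    print(get_variable("api_endpoint", ui_variables))
```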
    • The providers view
      • Each time we want to interact with an external tool or service, we need to install the corresponding provider.
      • A provider is a Python package that extends the functionality of your Airflow instance so that it can interact with specific services and tools.
      • For example, the Amazon provider package lets you interact with Amazon Web Services such as S3 and Redshift.
      • We can also see the version of each provider.
      • So whenever we want to interact with an external service or tool, we first install the corresponding provider.
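Because a provider is an ordinary Python distribution (for example apache-airflow-providers-postgres or apache-airflow-providers-amazon), the version shown in the providers view can also be checked from Python with the standard library. This is a sketch: the provider_version helper is hypothetical, and for an Airflow installation the airflow providers list CLI command shows the same information.

```python
from __future__ import annotations

from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def provider_version(distribution: str) -> Optional[str]:
    """Return the installed version of a provider distribution, or None if it is missing."""
    try:
        return version(distribution)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    for dist in ("apache-airflow-providers-postgres", "apache-airflow-providers-amazon"):
        print(dist, "->", provider_version(dist) or "not installed")
```

A None result is the stdlib counterpart of a provider missing from the providers view, which is also why its connection type would not appear in the connection form.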
    • The connection view
      • To interact with a tool or service, we also need to create a connection. Connections are listed in the connection view, which we open by going to Admin and clicking Connections.
      • To add a new connection, we enter a connection ID, which is the unique identifier of our connection.
      • We then select the connection type of the service we want to interact with.
      • If we don’t see a particular connection type, for example AWS, the corresponding provider is not properly installed.
      • Next, fill in the fields required by the service for which the connection is being created.
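The form fields map onto the parts of a connection URI, a format Airflow also accepts, for example in an environment variable named AIRFLOW_CONN_ followed by the upper-cased connection ID. The sketch below only illustrates that mapping with urllib.parse; parse_conn_uri, ConnectionFields and the credentials are hypothetical. The database goes into the field Airflow calls schema, special characters in the password must be percent-encoded, and Airflow's own parser handles more cases (such as extras passed in the query string).

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Optional
from urllib.parse import unquote, urlsplit


@dataclass
class ConnectionFields:
    """The fields shown in the connection form."""
    conn_type: str
    host: Optional[str]
    login: Optional[str]
    password: Optional[str]
    port: Optional[int]
    schema: str  # for Postgres, this holds the database name


def parse_conn_uri(uri: str) -> ConnectionFields:
    """Split a connection URI into the form fields (toy parser, not Airflow's)."""
    parts = urlsplit(uri)
    return ConnectionFields(
        conn_type=parts.scheme,
        host=parts.hostname,
        login=unquote(parts.username) if parts.username else None,
        password=unquote(parts.password) if parts.password else None,
        port=parts.port,
        schema=parts.path.lstrip("/"),
    )


if __name__ == "__main__":
    # "%40" is the percent-encoded "@" inside the hypothetical password.
    print(parse_conn_uri("postgres://airflow:s3cr%40t@localhost:5432/users_db"))
```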
    • From the user menu, we can switch between light mode and dark mode.
    • We can make the graph view the default view when we click on a DAG.
      • This is good if we prefer the graph view.
    • We can also change how times are displayed in the UI, for example the time zone.
  • Example data pipeline flow
    • Create table
      • In our data pipeline, we create a table in a Postgres database. We will interact with the Postgres database using Airflow.
      • Creating a task to create a table in the Postgres database:
        • Earlier, we used the PostgresOperator.
        • An operator is a template for a predefined task that you can simply define declaratively inside your DAG.
        • We used the PostgresOperator to run a SQL query.
        • Now we use the SQLExecuteQueryOperator, which is a generic operator for any SQL database.
        • We no longer use database-specific operators.
        • Import the operator in your DAG file.
        • Add it to your DAG function and pass arguments such as the task ID (task_id), the connection ID (conn_id), and the SQL query (sql).
        • For the connection ID, create a connection in the Airflow UI with the same connection ID.
        • Go to Admin and then Connections.
          • Click Add Connection.
          • Provide a connection ID.
          • Select the connection type.
          • Fill in the host, username, password, port, and database.
          • If you are unable to see the necessary connection type, you may need to install the corresponding provider.
          • For example, for Postgres, use the following command:
            • &
          • For MySQL, use the following command:
            • &
        • Airflow is built in a modular way: the core of Airflow provides the core scheduler functionality, which lets you write some basic tasks.
        • The capabilities of Airflow can be extended by installing additional packages called providers.
        • Select the provider on registry.astronomer.io and click Use Provider to get the command that installs it.
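The operator-as-template idea can be sketched without Airflow. In a real DAG file the task would be SQLExecuteQueryOperator(task_id=..., conn_id=..., sql=...), imported from airflow.providers.common.sql.operators.sql; the toy class below only mimics that shape, and it runs against SQLite instead of Postgres so that it works with the standard library alone. The class name, the "postgres" connection ID and the users columns are hypothetical.

```python
from __future__ import annotations

import sqlite3
from dataclasses import dataclass
from typing import Mapping

# Hypothetical DDL; the columns are illustrative only.
CREATE_USERS_SQL = """
CREATE TABLE IF NOT EXISTS users (
    id        INTEGER PRIMARY KEY,
    firstname TEXT NOT NULL,
    lastname  TEXT NOT NULL,
    email     TEXT NOT NULL
)
"""


@dataclass
class ToySQLQueryOperator:
    """Toy SQL operator: a reusable task template filled in declaratively."""
    task_id: str
    conn_id: str
    sql: str

    def execute(self, connections: Mapping[str, sqlite3.Connection]) -> None:
        # Look up the connection by its ID, as conn_id refers to Admin > Connections.
        if self.conn_id not in connections:
            raise KeyError(f"connection {self.conn_id!r} is not defined")
        conn = connections[self.conn_id]
        conn.execute(self.sql)
        conn.commit()


if __name__ == "__main__":
    create_table = ToySQLQueryOperator(
        task_id="create_table", conn_id="postgres", sql=CREATE_USERS_SQL
    )
    db = sqlite3.connect(":memory:")  # SQLite stands in for the Postgres database
    create_table.execute({"postgres": db})
    create_table.execute({"postgres": db})  # IF NOT EXISTS makes a rerun harmless
    print(db.execute("SELECT name FROM sqlite_master WHERE type = 'table'").fetchall())
```

The point of the template is that only task_id, conn_id and sql change from one task to the next; the "how to run a query" logic lives once in the operator.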
    • Is API available
      • Next, we check whether the API is available. We will use a special kind of task here.
      • Create a dummy API.
      • We can use a sensor, an operator that waits for a condition to be true before the next task is executed.
        • For example, if we are waiting for a file or for an API to become available, we can use a sensor.
        • A sensor checks at regular intervals whether the condition is true.
        • If the condition becomes true, the sensor succeeds; otherwise, once its timeout is reached, the sensor fails.
      • Import the task decorator to create a task.
      • From the sensor module, import PokeReturnValue.
      • Create a new task to check if the API is available or not.
      • Use the @task.sensor decorator.
        • This indicates that the Python function we decorate is a sensor.
        • @task is a decorator that corresponds to the PythonOperator. If we want to execute Python code, we add this decorator.
        • @task.sensor means that the Python code we decorate is a sensor.
        • So a sensor is a special operator that waits for a condition to be true in order to succeed.
        • The function should return a PokeReturnValue.
      • The sensor has two parameters:
        • poke_interval
          • The interval, in seconds, between two checks of the condition.
        • timeout
          • The time, in seconds, after which the sensor fails.
      • PokeReturnValue is returned with two parameters:
        • is_done, which is set to the result of the condition.
        • xcom_value, the value pushed as an XCom when the sensor succeeds, so that downstream tasks can pull it.
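The poke loop behind poke_interval and timeout can be simulated in plain Python. In Airflow the check would be a function decorated with @task.sensor(poke_interval=..., timeout=...) that returns PokeReturnValue(is_done=..., xcom_value=...); the sketch below re-implements only the loop, with a fake API that comes up after a few checks. ToyPokeReturnValue, run_sensor, SensorTimeout and make_fake_api_check are hypothetical names.

```python
from __future__ import annotations

import time
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class ToyPokeReturnValue:
    """Mirrors the two fields of PokeReturnValue described above."""
    is_done: bool                     # result of the condition
    xcom_value: Optional[Any] = None  # would be pushed as an XCom once the sensor succeeds


class SensorTimeout(Exception):
    """Raised when the condition is still false after `timeout` seconds."""


def run_sensor(
    poke: Callable[[], ToyPokeReturnValue], poke_interval: float, timeout: float
) -> Any:
    """Call `poke` every `poke_interval` seconds until it is done or `timeout` expires."""
    started = time.monotonic()
    while True:
        result = poke()
        if result.is_done:
            return result.xcom_value
        if time.monotonic() - started >= timeout:
            raise SensorTimeout(f"condition still false after {timeout} s")
        time.sleep(poke_interval)


def make_fake_api_check(available_after: int) -> Callable[[], ToyPokeReturnValue]:
    """Fake 'is the API available?' check that succeeds on the N-th poke."""
    calls = 0

    def poke() -> ToyPokeReturnValue:
        nonlocal calls
        calls += 1
        is_up = calls >= available_after
        return ToyPokeReturnValue(is_done=is_up, xcom_value={"status": "up"} if is_up else None)

    return poke


if __name__ == "__main__":
    print(run_sensor(make_fake_api_check(3), poke_interval=0.1, timeout=5))
```

A small poke_interval reacts faster but checks the service more often; the timeout bounds how long a sensor can wait before it fails.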
    • Extract user
      • We will extract a user from that API.
    • Process user
      • We will process the user.
    • Store user
      • The processed user is stored in Postgres.
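The last three steps pass data from one task to the next. The sketch below chains them as plain functions; in Airflow each would be a task, and each return value would travel to the next task as an XCom. The payload shape is an assumption (the format of the dummy API is not given here), the function names and table columns are hypothetical, and SQLite stands in for Postgres.

```python
from __future__ import annotations

import sqlite3
from typing import Any

# Hypothetical table layout; in the pipeline, the create-table task creates it in Postgres.
USERS_DDL = (
    "CREATE TABLE IF NOT EXISTS users ("
    "id INTEGER PRIMARY KEY, firstname TEXT, lastname TEXT, email TEXT)"
)


def extract_user() -> dict[str, Any]:
    """Stand-in for calling the dummy API; the payload shape is assumed."""
    return {"id": 1, "name": {"first": "Jane", "last": "Doe"}, "email": "jane.doe@example.com"}


def process_user(user: dict[str, Any]) -> dict[str, Any]:
    """Flatten the nested payload into one row for the users table."""
    return {
        "id": user["id"],
        "firstname": user["name"]["first"],
        "lastname": user["name"]["last"],
        "email": user["email"],
    }


def store_user(row: dict[str, Any], conn: sqlite3.Connection) -> None:
    """Insert (or overwrite) the processed row so reruns do not fail on the primary key."""
    conn.execute(
        "INSERT OR REPLACE INTO users (id, firstname, lastname, email) "
        "VALUES (:id, :firstname, :lastname, :email)",
        row,
    )
    conn.commit()


if __name__ == "__main__":
    db = sqlite3.connect(":memory:")
    db.execute(USERS_DDL)  # plays the role of the create-table task
    # Each return value is what Airflow would hand to the next task as an XCom.
    store_user(process_user(extract_user()), db)
    print(db.execute("SELECT * FROM users").fetchall())
```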
  • Defining a DAG
    • The very first step in creating a data pipeline in Airflow is to define a DAG.
      • Create a Python file in the DAGs folder.
      • A DAG file is a Python file.
      • Define imports
        • Import the dag decorator from airflow.sdk.
          • We use the @dag decorator to define the function that describes our DAG.
          • The function is placed directly below the decorator.
          • The function name is the unique identifier of the DAG.
          • It must be unique across all DAGs in our Airflow instance.
          • In Python, a decorator takes another function as an argument, extends its behaviour without explicitly modifying it, and returns the modified function.
          • The @dag decorator changes the default behaviour of a Python function so that it becomes a DAG in Airflow.
        • Within this function, we define the tasks.
        • Call the DAG function at the end of the file so that the DAG shows up in the Airflow UI.
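The decorator mechanics can be shown in plain Python. In Airflow 3 the real code applies the dag decorator from airflow.sdk to a function and calls that function at the end of the file; the toy decorator below (toy_dag, ToyDag and DAG_REGISTRY are hypothetical) imitates only two points from these notes: the decorated function is replaced by a new callable, and the function name becomes an identifier that must be unique. The duplicate check is a simplification, not how Airflow reports duplicate DAG IDs.

```python
from __future__ import annotations

import functools
from typing import Callable, Dict

DAG_REGISTRY: Dict[str, "ToyDag"] = {}  # dag_id -> DAG; IDs must stay unique


class ToyDag:
    """What the toy decorator produces instead of a plain function result."""

    def __init__(self, dag_id: str, definition: Callable[[], None]) -> None:
        self.dag_id = dag_id
        self.definition = definition


def toy_dag(func: Callable[[], None]) -> Callable[[], ToyDag]:
    """Takes a function, extends its behaviour without editing it, returns a new callable."""

    @functools.wraps(func)
    def wrapper() -> ToyDag:
        dag_id = func.__name__  # the function name becomes the identifier
        if dag_id in DAG_REGISTRY:
            raise ValueError(f"duplicate DAG id: {dag_id!r}")
        func()  # the original body, where the tasks would be defined
        dag = ToyDag(dag_id, func)
        DAG_REGISTRY[dag_id] = dag
        return dag

    return wrapper


@toy_dag
def user_processing() -> None:
    """Tasks would be declared here."""


user_processing()  # like calling the DAG function at the end of the DAG file
print(sorted(DAG_REGISTRY))
```

Without the final call, nothing is registered, which mirrors why the DAG function has to be called for the DAG to show up.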
  • Test-run your task
    • To test-run a task, we use a special command in the Airflow command-line interface (CLI).
    • Whenever we call an operator, a task is returned.
    • We can test-run a task using airflow tasks test <dag_id> <task_id>.