Monitoring Airflow workflows

This topic describes how to set up Unravel Server to monitor Airflow workflows so you can see them in the Unravel Web UI.

Important

Due to a bug in Airflow v1.10.0, Unravel supports only Airflow v1.10.1 and later.

Note

Before you start, ensure the Unravel Server host and the server that runs the Airflow web service are in the same cluster.

Perform all the following steps on the Unravel Server host that runs the unravel_jcs2 daemon.

Connecting to your Airflow Web UI
  1. Stop Unravel

    <Unravel installation directory>/unravel/manager stop
    
  2. Set the following properties:

    HTTP for Airflow Web UI access

    If your Airflow Web UI uses HTTP, set these properties:

    <Unravel installation directory>/unravel/manager config properties set com.unraveldata.airflow.protocol http
    <Unravel installation directory>/unravel/manager config properties set com.unraveldata.airflow.server.url <airflow-ui-url>
    <Unravel installation directory>/unravel/manager config properties set com.unraveldata.airflow.available true
    

    HTTPS for Airflow Web UI access

    If your Airflow Web UI uses HTTPS, set these properties:

    <Unravel installation directory>/unravel/manager config properties set com.unraveldata.airflow.protocol https
    <Unravel installation directory>/unravel/manager config properties set com.unraveldata.airflow.server.url <airflow-ui-url>
    <Unravel installation directory>/unravel/manager config properties set com.unraveldata.airflow.available true
    <Unravel installation directory>/unravel/manager config properties set com.unraveldata.airflow.login.name <airflow-ui-username>
    <Unravel installation directory>/unravel/manager config properties set com.unraveldata.airflow.login.password <airflow-ui-password>

    Refer to Airflow properties for the complete list.

  3. Apply the changes.

    <Unravel installation directory>/unravel/manager config apply
    
  4. Start Unravel

    <Unravel installation directory>/unravel/manager start
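
A complete worked example may help. Assuming Unravel is installed under a hypothetical /opt directory (so the manager script lives at /opt/unravel/manager) and the Airflow Web UI is served over plain HTTP at http://airflow.example.com:8080 (both values are placeholders for illustration only), the full sequence looks like this:

    /opt/unravel/manager stop
    /opt/unravel/manager config properties set com.unraveldata.airflow.protocol http
    /opt/unravel/manager config properties set com.unraveldata.airflow.server.url http://airflow.example.com:8080
    /opt/unravel/manager config properties set com.unraveldata.airflow.available true
    /opt/unravel/manager config apply
    /opt/unravel/manager start
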
Changing the Monitoring Window

By default, Unravel Server ingests all the workflows that started within the last five (5) days. You can change this window to the last X days as follows.

  1. Stop Unravel

    <Unravel installation directory>/unravel/manager stop
    
  2. Set the following properties:

    <Unravel installation directory>/unravel/manager config properties set airflow.look.back.num.days -X
    
  3. Apply the changes.

    <Unravel installation directory>/unravel/manager config apply
    
  4. Start Unravel

    <Unravel installation directory>/unravel/manager start
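
For example, to ingest workflows that started within the last ten days, keep the leading minus sign from the -X convention above. Again assuming the hypothetical /opt installation directory used earlier:

    /opt/unravel/manager stop
    /opt/unravel/manager config properties set airflow.look.back.num.days -10
    /opt/unravel/manager config apply
    /opt/unravel/manager start
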
Enabling Airflow

Here is a sample Airflow DAG script, spark-test.py, that runs a Spark application.

spark-test.py

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import subprocess


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

In Airflow, workflows are represented by directed acyclic graphs (DAGs). For example,

dag = DAG('spark-test', default_args=default_args)
  1. Add hooks for Unravel instrumentation.

    The following script, example-hdp-client.sh, adds hooks for Unravel instrumentation by setting three configuration parameters for Spark applications:

    • spark.driver.extraJavaOptions

    • spark.executor.extraJavaOptions

    • spark.unravel.server.hostport

    We recommend setting these parameters per application, only for the applications you want to monitor or profile, rather than for every application running in the cluster. Alternatively, you can apply them cluster-wide in spark-defaults.conf (see the sketch after the script below).

    You can invoke this script from an Airflow task to submit a Spark application via spark-submit. It references the following variables, which you must change to values valid for your environment.

    • PATH_TO_SPARK_EXAMPLE_JAR=/usr/hdp/2.3.6.0-3796/spark/lib/spark-examples-*.jar

    • UNRAVEL_SERVER_IP_PORT=10.20.30.40:4043

    • SPARK_EVENT_LOG_DIR=hdfs://ip-10-0-0-21.ec2.internal:8020/user/ec2-user/eventlog

    example-hdp-client.sh

    hdfs dfs -rmr pair.parquet
    spark-submit \
    --class org.apache.spark.examples.sql.RDDRelation \
    --master yarn-cluster \
    --conf "spark.driver.extraJavaOptions=-javaagent:/usr/local/unravel_client/btrace-agent.jar=unsafe=true,stdout=false,noServer=true,startupRetransform=false,bootClassPath=/usr/local/unravel_client/unravel-boot.jar,systemClassPath=/usr/local/unravel_client/unravel-sys.jar,scriptOutputFile=/dev/null,script=DriverProbe.class:SQLProbe.class -Dcom.sun.btrace.FileClient.flush=-1 -Dcom.unraveldata.spark.sensor.disableLiveUpdates=true" \
    --conf "spark.executor.extraJavaOptions=-javaagent:/usr/local/unravel_client/btrace-agent.jar=unsafe=true,stdout=false,noServer=true,startupRetransform=false,bootClassPath=/usr/local/unravel_client/unravel-boot.jar,systemClassPath=/usr/local/unravel_client/unravel-sys.jar,scriptOutputFile=/dev/null,script=ExecutorProbe.class -Dcom.sun.btrace.FileClient.flush=-1" \
    --conf "spark.unravel.server.hostport=$UNRAVEL_SERVER_IP_PORT" \
    --conf "spark.eventLog.dir=$SPARK_EVENT_LOG_DIR" \
    --conf "spark.eventLog.enabled=true" \
    $PATH_TO_SPARK_EXAMPLE_JAR
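
    If you prefer the cluster-wide alternative mentioned above, the same three parameters can be placed in spark-defaults.conf instead of being passed on the spark-submit command line. A minimal sketch, reusing the sensor paths and the Unravel host:port from this example (adjust both for your environment):

    spark.driver.extraJavaOptions      -javaagent:/usr/local/unravel_client/btrace-agent.jar=unsafe=true,stdout=false,noServer=true,startupRetransform=false,bootClassPath=/usr/local/unravel_client/unravel-boot.jar,systemClassPath=/usr/local/unravel_client/unravel-sys.jar,scriptOutputFile=/dev/null,script=DriverProbe.class:SQLProbe.class -Dcom.sun.btrace.FileClient.flush=-1 -Dcom.unraveldata.spark.sensor.disableLiveUpdates=true
    spark.executor.extraJavaOptions    -javaagent:/usr/local/unravel_client/btrace-agent.jar=unsafe=true,stdout=false,noServer=true,startupRetransform=false,bootClassPath=/usr/local/unravel_client/unravel-boot.jar,systemClassPath=/usr/local/unravel_client/unravel-sys.jar,scriptOutputFile=/dev/null,script=ExecutorProbe.class -Dcom.sun.btrace.FileClient.flush=-1
    spark.unravel.server.hostport      10.20.30.40:4043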
  2. Submit the workflow.

    Operators define the tasks in a workflow, and the dependencies you set between tasks determine execution order. In the example below, t1 and t2 are tasks created with BashOperator and PythonOperator, respectively. Both invoke example-hdp-client.sh, which submits the Spark application for execution.

    Note

    The path of example-hdp-client.sh is relative to the current directory, not to ~/airflow/dags as in the Airflow operator above.

    t1 = BashOperator(
        task_id='example-hdp-client',
        bash_command="example-scripts/example-hdp-client.sh",
        retries=3,
        dag=dag)


    def spark_callback(**kwargs):
        # Run the client script and capture its combined stdout/stderr.
        sp = subprocess.Popen(
            ['/bin/bash', 'airflow/dags/example-scripts/example-hdp-client.sh'],
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        print(sp.stdout.read())


    t2 = PythonOperator(
        task_id='example-python-call',
        provide_context=True,
        python_callable=spark_callback,
        retries=1,
        dag=dag)


    t2.set_upstream(t1)


    Tip

    You can test each task on its own before scheduling the DAG. For example, in Airflow:

    airflow test spark-test example-python-call 2015-06-01
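
    Once the individual tasks pass, you can run the whole workflow. A minimal sketch, assuming spark-test.py is in the default ~/airflow/dags folder and the scheduler is running (Airflow 1.10-style CLI):

    airflow test spark-test example-hdp-client 2015-06-01
    airflow unpause spark-test
    airflow trigger_dag spark-test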