Monitoring Airflow workflows

This topic describes how to set up Unravel Server to monitor Airflow workflows so you can see them in Unravel Web UI.

Important

Due to a bug in Airflow v1.10.0, Unravel supports only Airflow v1.10.1 and later.
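
To confirm which Airflow version you are running, you can check it from the Python environment that runs Airflow (or run the airflow version CLI command). A minimal sketch:

    # Print the installed Airflow version; it must be 1.10.1 or later.
    import airflow
    print(airflow.__version__)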

Note

Before you start, ensure the Unravel Server host and the server that runs the Airflow web service are in the same cluster.

All of the following steps are performed on the Unravel Server host that runs the unravel_jcs2 daemon.

Connecting to your Airflow Web UI
  1. In /usr/local/unravel/etc/unravel.properties, update or add these properties:

    HTTP for Airflow Web UI access

    If your Airflow Web UI uses HTTP, set these properties:

    com.unraveldata.airflow.protocol=http
    com.unraveldata.airflow.server.url=airflow-ui-url
    com.unraveldata.airflow.available=true
    

    HTTPS for Airflow Web UI access

    If your Airflow Web UI uses HTTPS, set these properties:

    com.unraveldata.airflow.protocol=https
    com.unraveldata.airflow.server.url=airflow-ui-url
    com.unraveldata.airflow.available=true
    com.unraveldata.airflow.login.name=airflow-ui-username
    com.unraveldata.airflow.login.password=airflow-ui-password
    
  2. Restart the unravel_jcs2 daemon.

    sudo /etc/init.d/unravel_jcs2 restart
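
After restarting, you can optionally verify that the Airflow Web UI is reachable from the Unravel Server host before moving on. The following is a minimal sketch, assuming Python and the requests library are available on the host; the URL is a placeholder for your own com.unraveldata.airflow.server.url value.

    # Quick reachability check for the Airflow Web UI from the Unravel Server host.
    import requests

    # Placeholder; use the same URL configured in com.unraveldata.airflow.server.url.
    resp = requests.get("http://airflow-ui-url", timeout=10)
    print(resp.status_code)  # an HTTP response (for example, 200) means the UI is reachable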
Changing the Monitoring Window

By default, Unravel Server ingests all workflows that started within the last five (5) days. You can change this window to the last X days.

  1. Open /usr/local/unravel/etc/unravel.properties and update the following property. If you can't find it, add it. Note there’s a “-” (minus sign) in the value.

    airflow.look.back.num.days=-X
  2. Restart the unravel_jcs2 daemon.

    sudo /etc/init.d/unravel_jcs2 restart
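
For example, to ingest only the workflows that started within the last ten days, set:

    airflow.look.back.num.days=-10

then restart the unravel_jcs2 daemon as shown above.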
Enabling Airflow

Here is a sample DAG definition script, spark-test.py, for running a Spark application.

spark-test.py

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import subprocess


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

In Airflow, workflows are represented by directed acyclic graphs (DAGs). For example,

dag = DAG('spark-test', default_args=default_args)
  1. Add hooks for Unravel instrumentation.

    The following script, example-hdp-client.sh, adds hooks for Unravel instrumentation by setting three Unravel-specific configuration parameters for Spark applications.

    • spark.driver.extraJavaOptions

    • spark.executor.extraJavaOptions

    • spark.unravel.server.hostport

    We recommend setting these parameters on a per-application basis, only when you want to monitor or profile specific applications rather than every application running in the cluster. Alternatively, you can specify these parameters in spark-defaults.conf (a sketch appears at the end of this topic).

    You can invoke this script from Airflow to submit a Spark application via spark-submit. It references the following variables, which you must change to values that are valid for your environment.

    • PATH_TO_SPARK_EXAMPLE_JAR=/usr/hdp/2.3.6.0-3796/spark/lib/spark-examples-*.jar

    • UNRAVEL_SERVER_IP_PORT=10.20.30.40:4043

    • SPARK_EVENT_LOG_DIR=hdfs://ip-10-0-0-21.ec2.internal:8020/user/ec2-user/eventlog

    example-hdp-client.sh

    hdfs dfs -rmr pair.parquet
    spark-submit \
    --class org.apache.spark.examples.sql.RDDRelation \
    --master yarn-cluster \
    --conf "spark.driver.extraJavaOptions=-javaagent:/usr/local/unravel_client/btrace-agent.jar=unsafe=true,stdout=false,noServer=true,startupRetransform=false,bootClassPath=/usr/local/unravel_client/unravel-boot.jar,systemClassPath=/usr/local/unravel_client/unravel-sys.jar,scriptOutputFile=/dev/null,script=DriverProbe.class:SQLProbe.class -Dcom.sun.btrace.FileClient.flush=-1 -Dcom.unraveldata.spark.sensor.disableLiveUpdates=true" \
    --conf "spark.executor.extraJavaOptions=-javaagent:/usr/local/unravel_client/btrace-agent.jar=unsafe=true,stdout=false,noServer=true,startupRetransform=false,bootClassPath=/usr/local/unravel_client/unravel-boot.jar,systemClassPath=/usr/local/unravel_client/unravel-sys.jar,scriptOutputFile=/dev/null,script=ExecutorProbe.class -Dcom.sun.btrace.FileClient.flush=-1" \
    --conf "spark.unravel.server.hostport=$UNRAVEL_SERVER_IP_PORT" \
    --conf "spark.eventLog.dir=$SPARK_EVENT_LOG_DIR" \
    --conf "spark.eventLog.enabled=true" \
    $PATH_TO_SPARK_EXAMPLE_JAR
  2. Submit the workflow.

    Operators define the individual tasks in a DAG, and their dependencies determine execution order. In the example below, t1 and t2 are tasks created with BashOperator and PythonOperator; each invokes example-hdp-client.sh, which submits the Spark application for execution.

    Note

    The path name of example-hdp-client.sh is relative to the current directory, not to ~/airflow/dags as in the Airflow operator below.

    t1 = BashOperator(
        task_id='example-hdp-client',
        bash_command="example-scripts/example-hdp-client.sh",
        retries=3,
        dag=dag)


    def spark_callback(**kwargs):
        # Run the spark-submit wrapper script and print its combined stdout/stderr.
        sp = subprocess.Popen(['/bin/bash',
                               'airflow/dags/example-scripts/example-hdp-client.sh'],
                              stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        print(sp.stdout.read())


    t2 = PythonOperator(
        task_id='example-python-call',
        provide_context=True,
        python_callable=spark_callback,
        retries=1,
        dag=dag)


    t2.set_upstream(t1)

    Tip

    You can test individual tasks first. For example, with the Airflow CLI (airflow test requires an execution date; the one below matches the DAG's start_date):

    airflow test spark-test example-python-call 2015-06-01
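
If you prefer to instrument every Spark application in the cluster rather than individual jobs, the same three settings can be placed in spark-defaults.conf, as mentioned in step 1 of Enabling Airflow. The following is an illustrative sketch only: the -javaagent strings are the same values passed with --conf in example-hdp-client.sh, and the host:port is the placeholder Unravel Server address used earlier.

    # Illustrative spark-defaults.conf entries (values copied from example-hdp-client.sh above;
    # replace the host:port with your own Unravel Server address).
    spark.driver.extraJavaOptions    -javaagent:/usr/local/unravel_client/btrace-agent.jar=unsafe=true,stdout=false,noServer=true,startupRetransform=false,bootClassPath=/usr/local/unravel_client/unravel-boot.jar,systemClassPath=/usr/local/unravel_client/unravel-sys.jar,scriptOutputFile=/dev/null,script=DriverProbe.class:SQLProbe.class -Dcom.sun.btrace.FileClient.flush=-1 -Dcom.unraveldata.spark.sensor.disableLiveUpdates=true
    spark.executor.extraJavaOptions  -javaagent:/usr/local/unravel_client/btrace-agent.jar=unsafe=true,stdout=false,noServer=true,startupRetransform=false,bootClassPath=/usr/local/unravel_client/unravel-boot.jar,systemClassPath=/usr/local/unravel_client/unravel-sys.jar,scriptOutputFile=/dev/null,script=ExecutorProbe.class -Dcom.sun.btrace.FileClient.flush=-1
    spark.unravel.server.hostport    10.20.30.40:4043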