This blog covers how to call IICS mapping tasks from a third-party scheduler. I will be using Apache Airflow as the third-party scheduler and the IICS REST API to trigger the mapping tasks.
For this blog, imagine a scenario where we have 3 CDI tasks and 1 CDI-Elastic task, and we need to execute the CDI tasks in parallel and, upon their successful completion, trigger the CDI-Elastic task.
For more details on CDI-Elastic, please visit this page: Big Data Integration Scalability? Get Cloud Data Integration Elastic | Informatica
Apache Airflow overview
Airflow is a platform to programmatically author, schedule and monitor workflows.
Airflow is not a data streaming solution. Tasks do not move data from one to the other (though tasks can exchange metadata!). Airflow is not in the Spark Streaming or Storm space, it is more comparable to Oozie or Azkaban.
Generally, Airflow works in a distributed environment, as you can see in the diagram below. The Airflow scheduler schedules jobs according to the dependencies defined in directed acyclic graphs (DAGs), and the Airflow workers pick up and run jobs with their loads properly balanced. All job information is stored in the meta DB, which is updated in a timely manner. Users can monitor their jobs via the Airflow web UI and/or the logs.
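To make the DAG concept concrete, here is a minimal sketch of a DAG with two dependent tasks (the DAG and task names are purely illustrative):

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG('hello_airflow',
          start_date=datetime(2021, 1, 1),
          schedule_interval='@daily')

extract = DummyOperator(task_id='extract', dag=dag)
load = DummyOperator(task_id='load', dag=dag)
extract >> load  # 'load' runs only after 'extract' succeeds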

The blog Informatica DataEngineering jobs execution using Apache Airflow has detailed steps on installing and configuring Apache Airflow.
Creating a DAG for IICS Mapping Tasks
The DAG code is written so that it dynamically creates the Airflow tasks (a DAG is the equivalent of a taskflow in IICS).
In the code below, under the IICS Parameters section, update the code with the IICS org username and password. Rather than hard-coding it, the password can be stored encrypted using Airflow Variables; please refer to the Airflow documentation for details on Airflow Variables.
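For example (a minimal sketch; it assumes an Airflow Variable named iics_password has already been created, e.g. via the CLI: airflow variables --set iics_password '<password>'):

from airflow.models import Variable

# "iics_password" is a hypothetical Variable name -- create it first via the
# Airflow UI (Admin > Variables) or the CLI command shown above.
iics_password = Variable.get("iics_password")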
In the CDI_task_name list provide the list of all CDI tasks, and in the CDI_E_task_name list provide the list of CDI-Elastic tasks. The code dynamically parses the task lists and creates the dependencies so that all CDI tasks run in parallel and, upon their successful completion, the CDI-Elastic tasks are triggered.
The code not only triggers the IICS mapping tasks but also retrieves the task log for every run so that it can be viewed through the Airflow web UI.
For the demo we have 3 CDI tasks and 1 CDI-Elastic task:
- Task_Items, Task_Store_Sales, and Task_Date_Dim can run in parallel
- Upon successful completion of the above tasks, Task_Total_Store_Sales_IWDEMO will be triggered
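In Airflow's bitshift dependency notation, the shape we are after is conceptually the following (the actual task ids in the DAG below carry IICS_CDI_ and IICS_CDI_E_ prefixes):

cdi_start >> [Task_Items, Task_Store_Sales, Task_Date_Dim] >> cdi_end
cdi_end >> Task_Total_Store_Sales_IWDEMO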

""
## Sample Airflow DAG to trigger IICS mappings
Data Integration TaskType , use one of the following codes:
DMASK. Masking task.
DRS. Replication task.
DSS. Synchronization task.
MTT. Mapping task.
PCS. PowerCenter task.
"""
import json
import sys
import time
from datetime import datetime, timedelta
import requests
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
######### IICS Parameters Start ##########
iics_username = "iicsuser_name"   # IICS org username
iics_password = "iics_password"   # better: pull from an Airflow Variable (see above)
task_type = 'MTT'                 # MTT = mapping task (see docstring for other codes)
# v2 login endpoint; dm-us is the US POD -- adjust for your org's region
base_url = "https://dm-us.informaticacloud.com/ma/api/v2/user/login"
CDI_task_name = ["Task_Date_Dim", "Task_Items", "Task_Store_Sales"]
CDI_E_task_name = ["Task_Total_Store_Sales_IWDEMO"]
######### IICS Parameters End ##########
# Airflow Parameters -- these args will get passed on to each operator
# you can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'infa',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    # Note: Airflow generally recommends a static start_date.
    'start_date': datetime.now() - timedelta(seconds=10)
}
def get_session_id(un, pw):
    session_id = ''
    data = {'@type': 'login', 'username': un, 'password': pw}
    url = base_url
    headers = {'Content-Type': 'application/json', 'Accept': 'application/json'}
    r = requests.post(url, data=json.dumps(data), headers=headers)
    # print('Session Id API Response Status Code: ' + str(r.status_code))
    if r.status_code == 200:
        session_id = r.json()["icSessionId"]
        server_url = r.json()["serverUrl"]
        # print('Session Id: ' + session_id)
    else:
        print('API call failed:')
        print(r.headers)
        print(r.json())
        sys.exit(1)
    return session_id, server_url
def start_job(session_id, server_url, taskname, taskType):
    '''Use Session Id and Server URL from the user login API
    and start the specified job'''
    job_start_url = server_url + "/api/v2/job"
    headers = {'Content-Type': 'application/json',
               'icSessionId': session_id, 'Accept': 'application/json'}
    data = {'@type': 'job', 'taskName': taskname, 'taskType': taskType}
    r = requests.post(job_start_url, data=json.dumps(data), headers=headers)
    if r.status_code == 200:
        response_content = json.loads(r.text)
        taskid = response_content['taskId']
        runid = response_content['runId']
        tname = response_content['taskName']
        print("Job " + taskname + " has been successfully started")
        return taskid, runid, tname
    else:
        print('Job failed to start with status: ' + str(r.status_code))
        print(r.content)
def get_status(server_url, session_id):
    '''Query the activity monitor. Returns details of the first job
    listed, or None once the monitor is empty (no jobs still running).'''
    job_activity_url = server_url + "/api/v2/activity/activityMonitor"
    headers = {'Content-Type': 'application/json', 'icSessionId': session_id, 'Accept': 'application/json'}
    r = requests.get(job_activity_url, headers=headers)
    if r.status_code == 200:
        response_content = json.loads(r.text)
        for obj in response_content:
            tn = obj['taskName']
            tid = obj['taskId']
            exec_state = obj['executionState']
            rid = obj['runId']
            print("Status of job " + tn + " is " + exec_state)
            return tid, exec_state, tn, rid
    else:
        print('Failed to get activity monitor : ' + str(r.status_code))
        print(r.content)
def execute_task(task_name):
    '''Airflow-callable wrapper: log in, start the IICS task, poll the
    activity monitor until the run finishes, then print the session log
    so it is captured in the Airflow task log.'''
    login_response = get_session_id(iics_username, iics_password)
    session_id = login_response[0]
    server_url = login_response[1]
    # Capture taskId/runId from the job-start response so that the correct
    # activity log entry is fetched even when several tasks run in parallel.
    start_response = start_job(session_id, server_url, task_name, task_type)
    task_id = str(start_response[0])
    run_id = start_response[1]
    log_url = server_url + "/api/v2/activity/activityLog/"
    headers = {'Content-Type': 'application/json', 'icSessionId': session_id, 'Accept': 'application/json'}
    while True:
        time.sleep(15)
        new_status = get_status(server_url, session_id)
        if new_status is None:
            # Activity monitor is empty -- the run has finished. Fetch the
            # activity log entry and print the session log.
            url = log_url + "?taskId=" + task_id + "&runId=" + str(run_id)
            r = requests.get(url, headers=headers)
            response_content = json.loads(r.text)
            for obj in response_content:
                t_id = obj['id']
                task_log = requests.get(log_url + t_id + "/sessionLog", headers=headers)
                print(task_log.text)
            break
# Airflow DAG
dag = DAG(
    'IICS_Airflow_Demo',
    default_args=default_args,
    schedule_interval='@daily',
    description='A Sample IICS Airflow DAG')

cdi_start = DummyOperator(
    task_id='cdi_start',
    dag=dag)

cdi_end = DummyOperator(
    task_id='cdi_end',
    dag=dag)

# Fan out: each CDI task runs in parallel between cdi_start and cdi_end.
for i in CDI_task_name:
    cdi_task = PythonOperator(
        task_id='IICS_CDI_' + i,
        python_callable=execute_task,
        op_kwargs={'task_name': i},
        dag=dag)
    cdi_start >> cdi_task >> cdi_end

# Fan in: CDI-Elastic tasks start only after all CDI tasks succeed.
for j in CDI_E_task_name:
    cdi_e_task = PythonOperator(
        task_id='IICS_CDI_E_' + j,
        python_callable=execute_task,
        op_kwargs={'task_name': j},
        dag=dag)
    cdi_end >> cdi_e_task
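As a quick sanity check, the REST helpers can also be exercised outside Airflow by calling execute_task directly (this assumes the IICS Parameters section above has been filled in with valid credentials):

if __name__ == "__main__":
    # Trigger a single CDI task and print its session log to stdout.
    execute_task("Task_Items")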
Save the above code as IICS_Airflow_Sample.py under the /opt/infa/airflow/dags folder.
Restart the Airflow webserver and the IICS_Airflow_Demo DAG will appear in the list of DAGs.
Click on the DAG and go to Graph View.

Run the DAG and you will see the status of the task runs in the Airflow UI as well as in the IICS monitor. In the IICS monitor task details you can see that the job was triggered via the IICS REST API.

To view session logs, in the Airflow web UI click on any task run and click the "View Log" button to retrieve the mapping details and session log.


