PrCore Documentation
AI Assistant GitHub Toggle Dark/Light/Auto mode Toggle Dark/Light/Auto mode Toggle Dark/Light/Auto mode Back to homepage

Stream Data Example

Here you can find a Python script that automatically handles the uploading of data, defining the project, triggering the simulation, and getting the results.

flowchart TB
    upload(Upload the event log file) --> set(Set the columns definition)
    set --> create(Create the project)
    create --> get(Get the project status)
    get --> start(Start the simulation)
    start --> read(Read the simulation result)

Prerequisites

Before you start, make sure you have the following packages installed:

python3 -m venv ./venv
./venv/bin/pip install requests sseclient-py

Example script

You can also download the script from here.

The script expects the uploaded event log to be this one, unless you manually modify the columns definition and the other project settings accordingly.

Please change EVENT_LOG_FILE to the correct path of your local event log file.

Warning
The outcome and treatment of the project defined in the example script are only for demonstration purposes. They don’t represent the actual outcome and treatment of the domain, so you can modify them to fit your needs.
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
import json
import pprint
import requests
import sseclient
from requests import Response
from time import sleep

# Path of the event log archive to upload; point this at your own file.
EVENT_LOG_FILE = "./bpic2012-CSV.zip"

# Address of the server; please change this to your local instance address.
BASE_URL = "http***********"
API_TOKEN = "UaJW0QvkMA1cVnOXB89E0NbLf3JRRoHwv2wWmaY5v=QYpaxr1UD9/FupeZ85sa2r"

# Authorization-only headers.
HEADERS = {"Authorization": "Bearer " + API_TOKEN}
# Same authorization, plus the content type for requests with a JSON body.
REQUEST_HEADERS = {**HEADERS, "Content-Type": "application/json"}

# Placeholder for a project ID; nothing has been created at import time.
PROJECT_ID = None


def upload_file(file_path) -> Response:
    """Upload an event log file to the server.

    The file is sent as a multipart form field named ``file``, together with
    a ``separator`` form field set to ``,``.

    Args:
        file_path: Path of the local event log file to upload.

    Returns:
        The response of the POST request. The HTTP status is not checked
        here; callers decide how to handle errors.

    Raises:
        OSError: If the file cannot be opened.
    """
    url = f"{BASE_URL}/event_log"
    # Open the file in a context manager so the handle is closed as soon as
    # the request has finished, even when the request raises.
    with open(file_path, "rb") as event_log:
        # NOTE: the file name and content type reported to the server are
        # fixed, regardless of the actual name of the local file.
        files = [
            ("file", ("bpic2012-CSV.zip", event_log, "application/zip"))
        ]
        response = requests.post(url, files=files, headers=HEADERS, data={"separator": ","})
    return response


def set_columns_definition(event_log_id) -> Response:
    """Send the column-type mapping for an uploaded event log.

    Args:
        event_log_id: ID of the event log whose columns are being defined.

    Returns:
        The response of the PUT request; the HTTP status is not checked here.
    """
    # Maps each column name of the event log to the type the server should
    # treat it as.
    column_types = {
        "Case ID": "CASE_ID",
        "start_time": "START_TIMESTAMP",
        "end_time": "END_TIMESTAMP",
        "AMOUNT_REQ": "NUMBER",
        "REG_DATE": "DATETIME",
        "Activity": "ACTIVITY",
        "Resource": "RESOURCE",
    }
    payload = {"columns_definition": column_types}
    return requests.put(
        f"{BASE_URL}/event_log/{event_log_id}",
        json=payload,
        headers=REQUEST_HEADERS,
    )


def create_project(event_log_id) -> Response:
    """Create a project on top of an uploaded event log.

    The request carries the positive outcome, the treatment, and the extra
    settings for the ``plugin-causallift-resource-allocation`` plugin.

    Args:
        event_log_id: ID of the event log the project is based on.

    Returns:
        The response of the POST request; the HTTP status is not checked here.
    """
    def activity_equals(activity):
        # One group containing a single "Activity EQUAL <activity>" condition.
        return [[{"column": "Activity", "operator": "EQUAL", "value": activity}]]

    resources = [
        "Resource_A", "Resource_B", "Resource_C", "Resource_D", "Resource_E",
        "Resource_F", "Resource_G", "Resource_H", "Resource_I", "Resource_J",
        "Resource_K", "Resource_L", "Resource_M", "Resource_N", "Resource_O",
        "Resource_P", "Resource_Q", "Resource_R", "Resource_S", "Resource_T",
        "Resource_U", "Resource_V", "Resource_W", "Resource_X", "Resource_Y",
        "Resource_Z",
    ]
    payload = {
        "event_log_id": event_log_id,
        "positive_outcome": activity_equals("A_APPROVED"),
        "treatment": activity_equals("O_SENT_BACK"),
        "additional_info": {
            "plugin-causallift-resource-allocation": {
                "available_resources": resources,
                "treatment_duration": "1h",
            },
        },
    }
    return requests.post(f"{BASE_URL}/project", json=payload, headers=REQUEST_HEADERS)


def get_project(project_id) -> Response:
    """Fetch the definition of a project.

    Args:
        project_id: ID of the project to look up.

    Returns:
        The response of the GET request; the HTTP status is not checked here.
    """
    return requests.get(f"{BASE_URL}/project/{project_id}", headers=HEADERS)


def start_simulation(project_id) -> Response:
    """Ask the server to start the simulation of a project.

    Args:
        project_id: ID of the project to simulate.

    Returns:
        The response of the PUT request; the HTTP status is not checked here.
    """
    return requests.put(
        f"{BASE_URL}/project/{project_id}/stream/start/simulating",
        headers=HEADERS,
    )


def stop_simulation(project_id) -> Response:
    """Ask the server to stop the stream of a project and report success.

    Args:
        project_id: ID of the project whose simulation should be stopped.

    Returns:
        The response of the PUT request.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    stop_url = f"{BASE_URL}/project/{project_id}/stream/stop"
    result = requests.put(stop_url, headers=HEADERS)
    # Only announce success once the server has confirmed it.
    result.raise_for_status()
    print("The simulation has been stopped!")
    return result


def printing_streaming_response(project_id):
    """Print the prescriptions streamed by the server for a project.

    Opens the project's result stream, reads it as Server-Sent Events with
    ``sseclient`` and, for every ``message`` event, pretty-prints those
    prescriptions of the first item in the payload whose ``output`` is
    truthy. Runs until the stream ends or the caller is interrupted.

    Args:
        project_id: ID of the project whose results should be streamed.

    Raises:
        requests.HTTPError: If the server answers the stream request with an
            error status.
    """
    url = f"{BASE_URL}/project/{project_id}/stream/result"
    # The context manager releases the streaming connection when the loop
    # ends or is interrupted (e.g. by Ctrl+C) instead of leaving it open.
    with requests.get(url, stream=True, headers=HEADERS) as response:
        # Fail with a clear error instead of parsing an error page as events.
        response.raise_for_status()
        client = sseclient.SSEClient(response)

        print("Waiting for events...")

        for event in client.events():
            if event.event != "message":
                continue

            event_data = json.loads(event.data)
            if not event_data:
                # An empty payload has no first item to report on.
                continue

            prescriptions = event_data[0]["prescriptions"]
            # NOTE: the truthiness test also drops outputs such as 0 or "".
            prescriptions_with_output = [
                prescription
                for prescription in prescriptions.values()
                if prescription["output"]
            ]

            if not prescriptions_with_output:
                continue

            print(f"Received message: {event.event}")
            print(f"ID: {event.id}")

            pprint.pprint(prescriptions_with_output, width=120)

            print("-" * 24)



def main():
    """Run the complete workflow against the server.

    Steps: upload the event log, set its columns definition, create the
    project, poll the project once per second until its status is
    ``TRAINED``, start the simulation and print the streamed results.

    Errors and Ctrl+C are reported on stdout rather than propagated. If a
    project was created, its simulation is asked to stop before returning.
    """
    print("\nStaring the client...\n")

    # Known only once the project has been created. Initialised up front so
    # the cleanup in ``finally`` can always tell whether there is anything to
    # stop, even when an earlier step failed.
    project_id = None

    try:
        # Upload the event log file
        print("Uploading the event log file...")
        response = upload_file(EVENT_LOG_FILE)
        response.raise_for_status()
        event_log_id = response.json()["event_log_id"]
        print(f"Event log {event_log_id} has been uploaded!\n")

        # Set the columns definition
        print("Setting the columns definition...")
        response = set_columns_definition(event_log_id)
        response.raise_for_status()
        print("The columns definition has been set!\n")

        # Create the project
        print("Creating the project...")
        response = create_project(event_log_id)
        response.raise_for_status()
        project_id = response.json()["project"]["id"]
        print(f"Project {project_id} has been created!\n")

        # Get the project status
        print("Getting the project status...")
        i = 1
        while True:
            response = get_project(project_id)
            # Report an HTTP error as such instead of failing on a missing key.
            response.raise_for_status()
            project = response.json()["project"]
            project_status = project["status"]
            if project_status == "TRAINED":
                break
            plugins = project["plugins"]
            if plugins:
                plugin_statuses = ", ".join([plugin["status"] for plugin in plugins])
                print(f"[{i:03d}] Now the project status is {project_status}, and its plugins have statuses {plugin_statuses}")
            else:
                print(f"[{i:03d}] Now the project status is {project_status}")
            sleep(1)
            i += 1
        print("The project has been trained!\n")

        # Start the simulation
        print("Starting the simulation...")
        response = start_simulation(project_id)
        response.raise_for_status()
        print("The simulation has been started!\n")

        # Get the streaming response
        print("Now we are going to get the streaming response...")
        printing_streaming_response(project_id)
    except KeyboardInterrupt:
        print("Interrupted by user\n")
    except Exception as e:
        print(f"Error: {e}\n")
    finally:
        if project_id is not None:
            # Best-effort cleanup: a failed stop request is reported but must
            # not mask the outcome above or skip the final message.
            try:
                stop_simulation(project_id)
            except requests.RequestException as e:
                print(f"Error while stopping the simulation: {e}\n")

    print("Done!\n")


# Run the workflow only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()

Running the script

To run the script, simply execute the following command:

./venv/bin/python workflow-example.py

Example output

Here is a snippet of the output of the script:

Staring the client...

Uploading the event log file...
Event log 17 has been uploaded!

Setting the columns definition...
The columns definition has been set!

Creating the project...
Project 16 has been created!

Getting the project status...
Now the project status is PREPROCESSING. Waiting for 5 seconds...
Now the project status is PREPROCESSING. Waiting for 5 seconds...
Now the project status is PREPROCESSING. Waiting for 5 seconds...
Now the project status is PREPROCESSING. Waiting for 5 seconds...
Now the project status is PREPROCESSING. It's plugins have status PREPROCESSING, PREPROCESSING, TRAINED. Waiting for 5 seconds...
Now the project status is PREPROCESSING. It's plugins have status PREPROCESSING, PREPROCESSING, TRAINED. Waiting for 5 seconds...
Now the project status is PREPROCESSING. It's plugins have status PREPROCESSING, TRAINED, TRAINED. Waiting for 5 seconds...
Now the project status is PREPROCESSING. It's plugins have status PREPROCESSING, TRAINED, TRAINED. Waiting for 5 seconds...
Now the project status is TRAINING. It's plugins have status TRAINING, TRAINED, TRAINED. Waiting for 5 seconds...
Now the project status is TRAINING. It's plugins have status TRAINING, TRAINED, TRAINED. Waiting for 5 seconds...
The project has been trained!

Starting the simulation...
The simulation has been started!

Now we are going to get the streaming response...
Waiting for events...
Received message: NEW_RESULT
ID: 365
[{'date': '2023-02-14T10:03:25.053610',
  'output': 'W_Completeren aanvraag',
  'plugin': {'accuracy': 0.5669,
             'f1_score': 0.5115,
             'model': 3,
             'name': 'KNN next activity prediction',
             'precision': 0.936,
             'recall': 0.5669},
  'type': 'NEXT_ACTIVITY'},
 {'date': '2023-02-14T10:03:25.064882',
  'output': 0.6355,
  'plugin': {'accuracy': 0.6527,
             'f1_score': 0.5155,
             'model': 3,
             'name': 'Random forest negative outcome probability',
             'precision': 0.7733,
             'recall': 0.6527},
  'type': 'ALARM'},
 {'date': '2023-02-14T10:03:25.682665',
  'output': {'cate': 0.6872,
             'proba_if_treated': 0.6873,
             'proba_if_untreated': 0.0001,
             'treatment': [[{'column': 'Activity', 'operator': 'EQUAL', 'value': 'O_SENT_BACK'}]]},
  'plugin': {'model': 3, 'name': 'CasualLift treatment effect'},
  'type': 'TREATMENT_EFFECT'}]
------------------------
Received message: NEW_RESULT
ID: 371
[{'date': '2023-02-14T10:03:55.610696',
  'output': 'W_Afhandelen leads',
  'plugin': {'accuracy': 0.5669,
             'f1_score': 0.5115,
             'model': 3,
             'name': 'KNN next activity prediction',
             'precision': 0.936,
             'recall': 0.5669},
  'type': 'NEXT_ACTIVITY'},
 {'date': '2023-02-14T10:03:55.622616',
  'output': 0.6406,
  'plugin': {'accuracy': 0.6527,
             'f1_score': 0.5155,
             'model': 3,
             'name': 'Random forest negative outcome probability',
             'precision': 0.7733,
             'recall': 0.6527},
  'type': 'ALARM'},
 {'date': '2023-02-14T10:03:56.230732',
  'output': {'cate': 0.6828,
             'proba_if_treated': 0.6829,
             'proba_if_untreated': 0.0001,
             'treatment': [[{'column': 'Activity', 'operator': 'EQUAL', 'value': 'O_SENT_BACK'}]]},
  'plugin': {'model': 3, 'name': 'CasualLift treatment effect'},
  'type': 'TREATMENT_EFFECT'}]
------------------------
Received message: NEW_RESULT
ID: 372
[{'date': '2023-02-14T10:04:00.686650',
  'output': 'W_Completeren aanvraag',
  'plugin': {'accuracy': 0.6769,
             'f1_score': 0.555,
             'model': 4,
             'name': 'KNN next activity prediction',
             'precision': 0.782,
             'recall': 0.6769},
  'type': 'NEXT_ACTIVITY'},
 {'date': '2023-02-14T10:04:00.700571',
  'output': 0.6455,
  'plugin': {'accuracy': 0.6468,
             'f1_score': 0.5081,
             'model': 4,
             'name': 'Random forest negative outcome probability',
             'precision': 0.7716,
             'recall': 0.6468},
  'type': 'ALARM'},
 {'date': '2023-02-14T10:04:01.397980',
  'output': {'cate': 0.679,
             'proba_if_treated': 0.6791,
             'proba_if_untreated': 0.0001,
             'treatment': [[{'column': 'Activity', 'operator': 'EQUAL', 'value': 'O_SENT_BACK'}]]},
  'plugin': {'model': 4, 'name': 'CasualLift treatment effect'},
  'type': 'TREATMENT_EFFECT'}]
------------------------
Received message: NEW_RESULT
ID: 373
[{'date': '2023-02-14T10:04:05.760306',
  'output': 'W_Completeren aanvraag',
  'plugin': {'accuracy': 0.6769,
             'f1_score': 0.555,
             'model': 4,
             'name': 'KNN next activity prediction',
             'precision': 0.782,
             'recall': 0.6769},
  'type': 'NEXT_ACTIVITY'},
 {'date': '2023-02-14T10:04:05.771450',
  'output': 0.63,
  'plugin': {'accuracy': 0.6468,
             'f1_score': 0.5081,
             'model': 4,
             'name': 'Random forest negative outcome probability',
             'precision': 0.7716,
             'recall': 0.6468},
  'type': 'ALARM'},
 {'date': '2023-02-14T10:04:06.475218',
  'output': {'cate': 0.6873,
             'proba_if_treated': 0.6874,
             'proba_if_untreated': 0.0001,
             'treatment': [[{'column': 'Activity', 'operator': 'EQUAL', 'value': 'O_SENT_BACK'}]]},
  'plugin': {'model': 4, 'name': 'CasualLift treatment effect'},
  'type': 'TREATMENT_EFFECT'}]
------------------------
Interrupted by user

Done!

The example above is tested with the BPI Challenge 2012 dataset.