MLflow Model Registry Webhooks REST API Example (Python)

Loading...

MLflow Model Registry Webhooks REST API Example

Install MLflow and set up the Python REST API wrapper

%pip install mlflow
# This Python wrapper facilitates the use of webhooks by encapsulating the REST API calls.
 
import json
import urllib
import urllib.parse
import urllib.request

import mlflow
 
class HttpClient:
  """Small wrapper around the MLflow Model Registry webhooks REST endpoints.

  Every request carries the token as an ``Authorization: Bearer`` header and
  every call returns the JSON-decoded response body. Errors raised by
  ``urllib.request.urlopen`` (e.g. ``urllib.error.HTTPError``) and by
  ``json.load`` propagate to the caller unchanged.
  """

  def __init__(self, base_url, token):
    """Remember the workspace base URL and the access token.

    Args:
      base_url: Root URL that endpoint paths are appended to. A trailing
        slash is tolerated when URLs are built.
      token: Access token sent as the bearer credential.
    """
    self.base_url = base_url
    self.token = token

  def createWebhook(self, request):
    """POST ``request`` to ``registry-webhooks/create`` and return the parsed reply."""
    return self._post('api/2.0/mlflow/registry-webhooks/create', request)

  def updateWebhook(self, request):
    """PATCH ``request`` to ``registry-webhooks/update`` and return the parsed reply."""
    return self._patch('api/2.0/mlflow/registry-webhooks/update', request)

  def listWebhooks(self, request):
    """GET ``registry-webhooks/list`` with ``request`` as query parameters."""
    return self._get('api/2.0/mlflow/registry-webhooks/list', request)

  def deleteWebhook(self, request):
    """Send DELETE with ``request`` as JSON body to ``registry-webhooks/delete``."""
    return self._delete('api/2.0/mlflow/registry-webhooks/delete', request)

  def testWebhook(self, request):
    """POST ``request`` to ``registry-webhooks/test`` and return the parsed reply."""
    return self._post('api/2.0/mlflow/registry-webhooks/test', request)

  def _get(self, uri, params):
    """Issue a GET to ``uri`` with ``params`` URL-encoded into the query string."""
    return self._execute(self._build_get_request(uri, params))

  def _post(self, uri, body):
    """Issue a POST to ``uri`` with ``body`` serialized as JSON."""
    return self._execute(self._build_json_request('POST', uri, body))

  def _patch(self, uri, body):
    """Issue a PATCH to ``uri`` with ``body`` serialized as JSON."""
    return self._execute(self._build_json_request('PATCH', uri, body))

  def _delete(self, uri, body):
    """Issue a DELETE to ``uri`` with ``body`` serialized as JSON."""
    return self._execute(self._build_json_request('DELETE', uri, body))

  def _url(self, uri):
    """Join ``base_url`` and ``uri`` with exactly one slash between them."""
    # Strip here rather than in __init__ so the public attribute keeps the
    # exact value the caller supplied.
    return f"{self.base_url.rstrip('/')}/{uri}"

  def _auth_headers(self):
    """Return a fresh header dict holding the bearer token."""
    return {'Authorization': f'Bearer {self.token}'}

  def _build_get_request(self, uri, params):
    """Build (without sending) the GET request for ``uri`` and ``params``.

    Args:
      uri: Endpoint path relative to ``base_url``.
      params: Mapping of query parameters; sequence values are allowed.

    Returns:
      A ``urllib.request.Request`` ready to be opened.
    """
    # doseq=True expands list values into repeated keys (events=A&events=B);
    # without it the Python repr of the list would be sent as a single value.
    query = urllib.parse.urlencode(params, doseq=True)
    return urllib.request.Request(
        f'{self._url(uri)}/?{query}', headers=self._auth_headers())

  def _build_json_request(self, method, uri, body):
    """Build (without sending) a request that carries ``body`` as JSON.

    Args:
      method: HTTP verb, e.g. ``'POST'``, ``'PATCH'`` or ``'DELETE'``.
      uri: Endpoint path relative to ``base_url``.
      body: JSON-serializable payload.

    Returns:
      A ``urllib.request.Request`` ready to be opened.
    """
    headers = self._auth_headers()
    # urllib labels any payload as form-encoded unless told otherwise.
    headers['Content-Type'] = 'application/json'
    payload = json.dumps(body).encode('utf-8')
    return urllib.request.Request(
        self._url(uri), data=payload, headers=headers, method=method)

  def _execute(self, req):
    """Send ``req`` and return its JSON-decoded body, closing the response."""
    with urllib.request.urlopen(req) as response:
      return json.load(response)

Create webhooks, test them, and confirm their presence in the list of all webhooks

## SETUP: Fill in variables
# Every placeholder below must be replaced before the calls that follow are run.
# TOKEN authenticates the REST calls and is also handed to the job webhook as
# its "access_token".
TOKEN = '<INSERT YOUR ACCESS TOKEN HERE>'
# Workspace base URL; used as the HttpClient base URL and as the job webhook's
# "workspace_url".
DOGFOOD_URL = '<INSERT YOUR DATABRICKS URL HERE (e.g. https://my-databricks-workspace.com)'
# Registered model the webhooks are attached to (also read by the later
# transition-request calls).
model_name = '<INSERT YOUR REGISTERED MODEL NAME HERE>'
job_id = 0 # INSERT ID OF PRE-DEFINED JOB
slack_url = '<INSERT YOUR SLACK INCOMING WEBHOOK URL HERE (DIRECTIONS: https://api.slack.com/incoming-webhooks)>' # can also use custom URL endpoint here
# Shared client instance; reused by the cleanup code further down.
httpClient = HttpClient(DOGFOOD_URL, TOKEN)
# Create a Job webhook: registered for TRANSITION_REQUEST_CREATED events on
# model_name, pointing at job_id in the workspace given by DOGFOOD_URL.
job_webhook = httpClient.createWebhook({
  "model_name": model_name,
  "events": ["TRANSITION_REQUEST_CREATED"],
  "status": "ACTIVE",
  "job_spec": {
    "job_id": job_id,
    "workspace_url": DOGFOOD_URL,
    "access_token": TOKEN
  }
})
# Test the Job webhook, addressing it by the id returned from the create call
httpClient.testWebhook({
  "id": job_webhook['webhook']['id']
})
# Create an HTTP webhook that posts to slack_url for three event types:
# transition requests created, model versions created, and stage transitions.
http_webhook = httpClient.createWebhook({
  "model_name": model_name,
  "events": ["TRANSITION_REQUEST_CREATED", "MODEL_VERSION_CREATED", "MODEL_VERSION_TRANSITIONED_STAGE"],
  "status": "ACTIVE",
  "http_url_spec": {"url": slack_url}
})
# Test the HTTP webhook
httpClient.testWebhook({
  "id": http_webhook['webhook']['id']
})
# List webhooks for the three event types. The request filters on events only
# (no model_name), so presumably the result is not limited to the two webhooks
# created above -- TODO confirm against the API.
webhooks = httpClient.listWebhooks({
  "events": ["MODEL_VERSION_CREATED", "MODEL_VERSION_TRANSITIONED_STAGE", "TRANSITION_REQUEST_CREATED"],
})
# Bare expression: shows the list response when this is the last line of a notebook cell.
webhooks

Create a transition request to trigger webhooks and then clean up webhooks

import mlflow
from mlflow.utils.rest_utils import http_request
import json
def client():
  """Build and return a new ``MlflowClient`` using MLflow's default configuration."""
  mlflow_client = mlflow.tracking.client.MlflowClient()
  return mlflow_client
 
# Host credentials used by mlflow_call_endpoint for its raw REST calls.
# NOTE(review): this goes through the private `_tracking_client` attribute of
# MlflowClient, so it may break across MLflow versions -- confirm on upgrade.
host_creds = client()._tracking_client.store.get_host_creds()
def mlflow_call_endpoint(endpoint, method, body='{}'):
  """Call an MLflow REST endpoint under ``/api/2.0/mlflow/`` and return its JSON reply.

  Args:
    endpoint: Path below ``/api/2.0/mlflow/``, e.g. ``'transition-requests/create'``.
    method: HTTP verb. ``'GET'`` sends the decoded body as query parameters;
      any other verb sends it as the JSON request body.
    body: JSON-encoded string holding the request payload.

  Returns:
    The value of ``response.json()`` for the reply.
  """
  full_endpoint = f"/api/2.0/mlflow/{endpoint}"
  payload = json.loads(body)
  # Only the keyword that carries the payload differs between GET and the rest.
  payload_keyword = 'params' if method == 'GET' else 'json'
  reply = http_request(
      host_creds=host_creds,
      endpoint=full_endpoint,
      method=method,
      **{payload_keyword: payload})
  return reply.json()
# Create a transition request to staging and then approve the request.
# Targets version 1 of model_name -- assumes that version exists; TODO confirm.
transition_request_body = {'name': model_name, 'version': 1, 'stage': 'Staging'}
mlflow_call_endpoint('transition-requests/create', 'POST', json.dumps(transition_request_body))
# Same target as above, plus archive_existing_versions.
# NOTE(review): the flag is sent as the string 'true', not a boolean -- confirm
# the endpoint accepts that form.
transition_request_body = {'name': model_name, 'version': 1, 'stage': 'Staging', 'archive_existing_versions': 'true'}
mlflow_call_endpoint('transition-requests/approve', 'POST', json.dumps(transition_request_body))
# Delete all webhooks: every entry of the earlier list response held in
# `webhooks` is deleted. That list was filtered by event type only, so
# presumably it can include webhooks this notebook did not create -- verify
# before running in a shared workspace.
for webhook in webhooks['webhooks']:
  httpClient.deleteWebhook({"id": webhook['id']})
# Verify webhook deletion by listing again with the same event filter
webhooks = httpClient.listWebhooks({
  "events": ["MODEL_VERSION_CREATED", "MODEL_VERSION_TRANSITIONED_STAGE", "TRANSITION_REQUEST_CREATED"],
})
# Bare expression: shows the post-cleanup list response as the cell output.
webhooks