Databricks OpenAI LLM Model - Stored Procedure - Part 2

One-time setup: create the HTTP connection to your APIM / Azure OpenAI endpoint (run once per workspace)
 
%sql
-- One-time setup: HTTP connection used by http_request() inside the stored
-- procedure. The bearer_token is resolved from a Databricks secret scope.
-- FIX: the OPTIONS list was never closed — added the terminating ");".
CREATE OR REPLACE CONNECTION aoai_apim
TYPE HTTP
OPTIONS (
  host         'https://xxxxxxxxxxxxxxxxxx.a03.azurefd.net',
  port         '443',
  base_path    '/xxxxxdatabricksapps',
  bearer_token secret('digitalAI','digitalAIKey')
);
 
Create the stored procedure

 

-- Backfills LLM-generated answers for rows whose answer is missing or stale.
-- Flow: (1) ensure answer columns exist (idempotent ALTERs), (2) stage up to
-- max_rows_per_run pending rows, (3) call Azure OpenAI per row over the
-- 'aoai_apim' HTTP connection, (4) MERGE successful answers back and stamp
-- answer_key = query_key so already-answered rows are skipped next run,
-- (5) return an attempted/updated/skipped summary row.
CREATE OR REPLACE PROCEDURE _xxxxx_assetdigital_dev.xxxxx.sp_llm_backfill_answers (
    IN max_rows_per_run INT    DEFAULT 50,                    -- max rows processed per invocation
    IN default_model    STRING DEFAULT 'gpt-4o',              -- fallback deployment when the row's llm column is NULL
    IN api_version      STRING DEFAULT '2025-03-01-preview',  -- Azure OpenAI REST api-version query parameter
    IN secret_scope     STRING DEFAULT 'digitalAI',           -- Databricks secret scope holding the API key
    IN secret_key       STRING DEFAULT 'digitalAIKey'         -- key name within that scope
)
LANGUAGE SQL
SQL SECURITY INVOKER
AS
BEGIN
  --------------------------------------------------------------------
  -- Config & constants
  --------------------------------------------------------------------
  -- NOTE(review): the _pending view below hard-codes this same table name
  -- (a local variable cannot be used as an identifier in CREATE VIEW without
  -- EXECUTE IMMEDIATE / IDENTIFIER()); keep the two in sync.
  DECLARE table_fqn STRING DEFAULT '_rtio_assetdigital_dev.beltscan.Asset_Digital_AI';
  DECLARE cat STRING DEFAULT split_part(table_fqn, '.', 1);  -- catalog part
  DECLARE sch STRING DEFAULT split_part(table_fqn, '.', 2);  -- schema part
  DECLARE tbl STRING DEFAULT split_part(table_fqn, '.', 3);  -- table part

  --------------------------------------------------------------------
  -- Ensure required answer columns exist (idempotent)
  -- NOTE(review): Unity Catalog information_schema typically stores object
  -- names lower-cased; comparing table_name = tbl with the mixed-case
  -- 'Asset_Digital_AI' may never match, which would re-run ADD COLUMNS every
  -- call and fail on the second invocation — confirm, else compare on
  -- lower(...) both sides.
  --------------------------------------------------------------------
  IF NOT EXISTS (
    SELECT 1 FROM system.information_schema.columns
     WHERE table_catalog = cat AND table_schema = sch AND table_name = tbl
       AND column_name = 'answer'
  ) THEN
    EXECUTE IMMEDIATE 'ALTER TABLE ' || table_fqn || ' ADD COLUMNS (answer STRING)';
  END IF;

  IF NOT EXISTS (
    SELECT 1 FROM system.information_schema.columns
     WHERE table_catalog = cat AND table_schema = sch AND table_name = tbl
       AND column_name = 'answer_key'
  ) THEN
    EXECUTE IMMEDIATE 'ALTER TABLE ' || table_fqn || ' ADD COLUMNS (answer_key STRING)';
  END IF;

  IF NOT EXISTS (
    SELECT 1 FROM system.information_schema.columns
     WHERE table_catalog = cat AND table_schema = sch AND table_name = tbl
       AND column_name = 'answer_datetime'
  ) THEN
    EXECUTE IMMEDIATE 'ALTER TABLE ' || table_fqn || ' ADD COLUMNS (answer_datetime TIMESTAMP)';
  END IF;

  --------------------------------------------------------------------
  -- Stage: pick pending rows (STRING keys only)
  -- A row is "pending" when it was never answered (answer_key IS NULL) or its
  -- question changed since it was answered (answer_key <> query_key).
  -- NOTE(review): LIMIT without ORDER BY selects an arbitrary subset, so the
  -- batch picked each run is nondeterministic — acceptable for backfill, but
  -- add an ORDER BY if priority matters.
  --------------------------------------------------------------------
  CREATE OR REPLACE TEMP VIEW _pending AS
  SELECT
      query       AS q,
      query_key   AS join_key,
      COALESCE(llm, default_model) AS mdl
  FROM _rtio_assetdigital_dev.beltscan.Asset_Digital_AI
  WHERE answer_key IS NULL OR answer_key <> query_key
  LIMIT max_rows_per_run;

  --------------------------------------------------------------------
  -- Call Azure OpenAI per-row using http_request() and parse content
  -- Endpoint pattern: /openai/deployments/{deployment}/chat/completions?api-version=...
  -- Header uses Databricks SQL secret() to fetch the API key.
  -- NOTE(review): secret() may require constant literal arguments; confirm it
  -- accepts the secret_scope/secret_key variables here. Also note the
  -- connection already carries a bearer_token, so requests send both a bearer
  -- token and an api-key header — confirm the gateway expects both.
  -- Failed calls (non-200, or empty/NULL content) are silently dropped here;
  -- they surface only as "skipped" in the summary below.
  --------------------------------------------------------------------
  CREATE OR REPLACE TEMP VIEW _updates AS
  WITH calls AS (
    SELECT
      p.join_key,
      http_request(
        CONN    => 'aoai_apim',
        METHOD  => 'POST',
        PATH    => concat('/openai/deployments/', p.mdl, '/chat/completions'),
        PARAMS  => map('api-version', api_version),
        HEADERS => map(
                      'Content-Type', 'application/json',
                      'api-key',      secret(secret_scope, secret_key)
                   ),
        JSON    => to_json(named_struct(
                      'model',      p.mdl,
                      'messages',   array(
                                       named_struct('role','system','content','You are a helpful assistant.'),
                                       named_struct('role','user','content', p.q)
                                     ),
                      'temperature', 0.2
                   ))
      ) AS resp
    FROM _pending p
  )
  SELECT
    join_key,
    trim(get_json_object(resp.text, '$.choices[0].message.content')) AS answer,
    current_timestamp() AS answer_datetime
  FROM calls
  WHERE resp.status_code = 200
    AND trim(get_json_object(resp.text, '$.choices[0].message.content')) IS NOT NULL
    AND length(trim(get_json_object(resp.text, '$.choices[0].message.content'))) > 0;

  --------------------------------------------------------------------
  -- Merge answers back; stamp answer_key for idempotency
  -- (answer_key = query_key marks the row as answered for the current query,
  -- so it no longer matches the _pending predicate on the next run)
  --------------------------------------------------------------------
  EXECUTE IMMEDIATE '
    MERGE INTO ' || table_fqn || ' AS t
    USING _updates AS u
      ON t.query_key = u.join_key
    WHEN MATCHED THEN UPDATE SET
      t.answer           = u.answer,
      t.answer_datetime  = u.answer_datetime,
      t.answer_key       = t.query_key
  ';

  --------------------------------------------------------------------
  -- Return summary result set
  --------------------------------------------------------------------
  DECLARE attempted  INT;  -- rows staged this run
  DECLARE updated    INT;  -- rows that received an answer

  -- NOTE(review): Databricks SQL scripting documents SELECT ... INTO with the
  -- INTO clause after the select list (SELECT count(*) INTO attempted FROM _pending);
  -- confirm this FROM-then-INTO ordering actually parses on your runtime.
  SELECT count(*) FROM _pending INTO attempted;
  SELECT count(*) FROM _updates INTO updated;

  -- skipped = staged rows whose HTTP call failed or returned empty content
  SELECT attempted AS attempted, updated AS updated, (attempted - updated) AS skipped;
END;
 
 
How to run it

 

-- Call with defaults (50 rows; gpt-4o deployment name per-row or default)
CALL _xxxxx_assetdigital_dev.xxxxx.sp_llm_backfill_answers();

-- Or override any parameter:
-- FIX: schema name corrected from 'xxxxxx' (six x's) to 'xxxxx' so both
-- examples target the same procedure created above.
CALL _xxxxx_assetdigital_dev.xxxxx.sp_llm_backfill_answers(
  max_rows_per_run => 25,
  default_model    => 'gpt-4o',                  -- your Azure OpenAI deployment name
  api_version      => '2025-03-01-preview',
  secret_scope     => 'digitalAI',
  secret_key       => 'digitalAIKey'
);