LangFuse API Client in R

Overview

Sources:

  • **

Code

#  ------------------------------------------------------------------------
#
# Title : Langfuse Integrations
#    By : Jimmy Briggs
#  Date : 2025-03-12
#
#  ------------------------------------------------------------------------
 
#' Langfuse API Client
#'
#' @name langfuse
#'
#' @description
#' A collection of functions for tracking and evaluating LLM interactions
#' using Langfuse.
#'
#' Below are the functions grouped by category:
#'
#' ## Core Client Functions
#'
#' - `lf_client()`: Create a Langfuse API client configuration.
#' - `lf_ingestion()`: Send data to the Langfuse ingestion endpoint.
#'
#' ## Chat Tracing
#'
#' - `lf_trace_chat()`: Trace an individual ellmer chat interaction.
#' - `lf_trace_chat_session()`: Trace a chat interaction within a session.
#' - `lf_add_feedback()`: Add user feedback for a traced chat interaction.
#'
#' ## Session Management
#'
#' - `lf_create_session()`: Create a new session for grouped interactions.
#' - `lf_create_observation()`: Add an observation to a trace.
#' - `lf_create_evaluation()`: Add an evaluation score to a trace.
#'
#' ## Prompt Management
#'
#' - `lf_add_prompt()`: Add a prompt template to Langfuse.
#' - `lf_get_prompt()`: Retrieve a prompt template from Langfuse.
#'
#' ## Dataset & Experimentation
#'
#' - `lf_add_dataset()`: Create a new dataset for testing.
#' - `lf_add_dataset_item()`: Add an item to a dataset.
#' - `lf_run_experiment()`: Run an experiment using a dataset.
#'
#' @importFrom cli cli_alert_warning cli_alert_danger
#' @importFrom httr2 request req_url_path_append req_method req_headers req_auth_basic
#' @importFrom httr2 req_body_json req_perform resp_check_status resp_body_json
#' @importFrom uuid UUIDgenerate
NULL
 
#' Create a Langfuse API Client
#'
#' @description
#' This function creates a Langfuse API client configuration.
#'
#' @param secret_key Langfuse API Secret Key. Defaults to `LANGFUSE_SECRET_KEY` environment variable.
#' @param public_key Langfuse API Public Key. Defaults to `LANGFUSE_PUBLIC_KEY` environment variable.
#' @param host Langfuse API Host URL. Defaults to `LANGFUSE_HOST` environment variable.
#'   If not set, defaults to `https://us.cloud.langfuse.com` (US region).
#' @param enabled Whether tracing is enabled. Defaults to `TRUE`.
#'
#' @return A list containing the Langfuse client configuration.
#'
#' @export
#'
#' @examples
#' # Create client with environment variables
#' client <- lf_client()
#'
#' # Create client with explicit keys
#' client <- lf_client(
#'   secret_key = "your_secret_key",
#'   public_key = "your_public_key"
#' )
lf_client <- function(
    secret_key = Sys.getenv("LANGFUSE_SECRET_KEY"),
    public_key = Sys.getenv("LANGFUSE_PUBLIC_KEY"),
    host = Sys.getenv("LANGFUSE_HOST", "https://us.cloud.langfuse.com"),
    enabled = TRUE
) {
 
  if (nzchar(secret_key) && nzchar(public_key)) {
    enabled <- enabled
  } else {
    enabled <- FALSE
    cli::cli_alert_warning("Langfuse credentials missing. Tracing disabled.")
  }
 
  list(
    secret_key = secret_key,
    public_key = public_key,
    host = host,
    enabled = enabled
  )
}
 
#' Send Data to Langfuse Ingestion Endpoint
#'
#' @description
#' This function sends data to the Langfuse `/ingestion` endpoint.
#'
#' @param batch The batch data to send as a list
#' @param client A Langfuse client configuration created with `lf_client()`
#'
#' @return The response from Langfuse on success, FALSE on failure
#'
#' @export
#'
lf_ingestion <- function(batch, client) {
  if (is.null(client) || !isTRUE(client$enabled)) {
    return(FALSE)
  }
 
  req <- httr2::request(client$host) |>
    httr2::req_url_path_append("api", "public", "ingestion") |>
    httr2::req_method("POST") |>
    httr2::req_headers("Content-Type" = "application/json") |>
    httr2::req_auth_basic(username = client$public_key, password = client$secret_key) |>
    httr2::req_body_json(list(batch = batch))
 
  tryCatch({
    resp <- httr2::req_perform(req)
    httr2::resp_check_status(resp)
    httr2::resp_body_json(resp)
  }, error = function(e) {
    cli::cli_alert_danger("Failed to ingest data: {conditionMessage(e)}")
    return(FALSE)
  })
}
 
#' Create a Trace Event
#'
#' @description
#' Creates a trace event for Langfuse.
#'
#' @param name Name of the trace
#' @param user_id User ID
#' @param input Input text
#' @param output Output text
#' @param metadata Additional metadata as a list
#'
#' @return A list representing a trace event
#'
#' @keywords internal
lf_create_trace_event <- function(name, user_id = NULL, input = NULL, output = NULL, metadata = NULL) {
  id <- uuid::UUIDgenerate()
  timestamp <- format(Sys.time(), "%Y-%m-%dT%H:%M:%OS6Z")
 
  event <- list(
    id = id,
    timestamp = timestamp,
    type = "trace-create",
    body = list(
      id = id,
      timestamp = timestamp,
      name = name,
      environment = "production"
    )
  )
 
  if (!is.null(user_id)) event$body$userId <- user_id
  if (!is.null(input)) event$body$input <- input
  if (!is.null(output)) event$body$output <- output
  if (!is.null(metadata)) event$body$metadata <- metadata
 
  return(event)
}
 
#' Create a Generation Event
#'
#' @description
#' Creates a generation event for Langfuse.
#'
#' @param trace_id Parent trace ID
#' @param model Model name
#' @param prompt Prompt text
#' @param completion Completion text
#' @param metadata Additional metadata as a list
#'
#' @return A list representing a generation event
#'
#' @keywords internal
lf_create_generation_event <- function(trace_id, model, prompt, completion = NULL, metadata = NULL) {
  id <- uuid::UUIDgenerate()
  timestamp <- format(Sys.time(), "%Y-%m-%dT%H:%M:%OS6Z")
 
  event <- list(
    id = id,
    timestamp = timestamp,
    type = "generation-create",
    body = list(
      id = id,
      traceId = trace_id,
      startTime = timestamp,
      model = model,
      prompt = list(
        role = "user",
        content = prompt
      ),
      metadata = metadata %||% list()
    )
  )
 
  if (!is.null(completion)) {
    event$body$completion <- completion
    event$body$endTime <- timestamp
  }
 
  return(event)
}
 
#' Create a Score Event
#'
#' @description
#' Creates a score event for Langfuse.
#'
#' @param trace_id Parent trace ID
#' @param value Score value (0-1)
#' @param comment Optional comment
#'
#' @return A list representing a score event
#'
#' @keywords internal
lf_create_score_event <- function(trace_id, value, comment = NULL) {
  id <- uuid::UUIDgenerate()
  timestamp <- format(Sys.time(), "%Y-%m-%dT%H:%M:%OS6Z")
 
  event <- list(
    id = id,
    timestamp = timestamp,
    type = "score-create",
    body = list(
      id = id,
      traceId = trace_id,
      name = "user_feedback",
      value = value
    )
  )
 
  if (!is.null(comment)) event$body$comment <- comment
 
  return(event)
}
 
#' Trace an Ellmer Chat Interaction
#'
#' @description
#' Sends a message to an ellmer chat and tracks the interaction with Langfuse.
#'
#' @param chat An ellmer chat object
#' @param message User message
#' @param langfuse_client Langfuse client configuration from `lf_client()`
#' @param user_id Optional user ID
#' @param metadata Optional metadata
#'
#' @return The chat response with trace_id attribute
#'
#' @export
#'
#' @examples
#' \dontrun{
#' # Create a langfuse client
#' lf_client <- lf_client()
#'
#' # Create a chat
#' chat <- ellmer::chat_openai(model = "gpt-4")
#'
#' # Trace a chat interaction
#' response <- lf_trace_chat(
#'   chat = chat,
#'   message = "Tell me about R programming",
#'   langfuse_client = lf_client
#' )
#' }
lf_trace_chat <- function(chat, message, langfuse_client, user_id = NULL, metadata = NULL) {
  if (is.null(langfuse_client) || !isTRUE(langfuse_client$enabled)) {
    return(chat$chat(message))
  }
 
  # Create trace
  trace_event <- lf_create_trace_event(
    name = "chat_interaction",
    user_id = user_id,
    input = message,
    metadata = metadata
  )
 
  # Get trace ID
  trace_id <- trace_event$body$id
 
  # Start timing
  start_time <- Sys.time()
 
  # Start generation
  gen_event <- lf_create_generation_event(
    trace_id = trace_id,
    model = chat$get_model(),
    prompt = message,
    metadata = list(
      provider = class(chat$provider)[1],
      start_time = format(start_time, "%Y-%m-%dT%H:%M:%OS6Z")
    )
  )
 
  # Send events to Langfuse
  lf_ingestion(list(trace_event, gen_event), langfuse_client)
 
  # Call the chat function
  result <- tryCatch({
    chat$chat(message)
  }, error = function(e) {
    # Log error
    error_gen_event <- lf_create_generation_event(
      trace_id = trace_id,
      model = chat$get_model(),
      prompt = message,
      metadata = list(
        error = conditionMessage(e),
        duration_ms = as.numeric(difftime(Sys.time(), start_time, units = "secs")) * 1000
      )
    )
    lf_ingestion(list(error_gen_event), langfuse_client)
    stop(e)
  })
 
  # End timing
  end_time <- Sys.time()
  duration_ms <- as.numeric(difftime(end_time, start_time, units = "secs")) * 1000
 
  # Get the completion text
  completion <- chat$last_turn()@text
 
  # Send completion
  completion_gen_event <- lf_create_generation_event(
    trace_id = trace_id,
    model = chat$get_model(),
    prompt = message,
    completion = completion,
    metadata = list(
      duration_ms = duration_ms
    )
  )
 
  # Update trace with output
  update_trace_event <- list(
    id = uuid::UUIDgenerate(),
    timestamp = format(end_time, "%Y-%m-%dT%H:%M:%OS6Z"),
    type = "trace-update",
    body = list(
      id = trace_id,
      output = completion
    )
  )
 
  # Send events to Langfuse
  lf_ingestion(list(completion_gen_event, update_trace_event), langfuse_client)
 
  # Store trace ID as an attribute
  attr(result, "trace_id") <- trace_id
 
  return(result)
}
 
#' Add Feedback for a Chat Interaction
#'
#' @description
#' Adds user feedback for a traced chat interaction.
#'
#' @param result The result returned from `lf_trace_chat()`
#' @param score Score value (0-1)
#' @param comment Optional comment
#' @param langfuse_client Langfuse client configuration from `lf_client()`
#'
#' @return Invisibly returns whether the feedback was sent
#'
#' @export
#'
#' @examples
#' \dontrun{
#' # First trace a chat interaction
#' response <- lf_trace_chat(chat, "Tell me about R", lf_client)
#'
#' # Then add feedback
#' lf_add_feedback(
#'   result = response,
#'   score = 0.95,
#'   comment = "Very helpful response",
#'   langfuse_client = lf_client
#' )
#' }
lf_add_feedback <- function(result, score, comment = NULL, langfuse_client) {
  if (is.null(langfuse_client) || !isTRUE(langfuse_client$enabled)) {
    return(invisible(FALSE))
  }
 
  trace_id <- attr(result, "trace_id")
  if (is.null(trace_id)) {
    cli::cli_alert_warning("No trace ID found. Was this result created with lf_trace_chat()?")
    return(invisible(FALSE))
  }
 
  # Validate score
  if (!is.numeric(score) || score < 0 || score > 1) {
    cli::cli_alert_danger("Score must be a numeric value between 0 and 1")
    return(invisible(FALSE))
  }
 
  # Create score event
  score_event <- lf_create_score_event(
    trace_id = trace_id,
    value = score,
    comment = comment
  )
 
  # Send to Langfuse
  result <- lf_ingestion(list(score_event), langfuse_client)
 
  return(invisible(!isFALSE(result)))
}
 
#' Create a Session in Langfuse
#'
#' @description
#' Creates a new session for grouping related traces together.
#'
#' @param name Session name
#' @param user_id Optional user ID
#' @param metadata Optional metadata
#' @param langfuse_client Langfuse client configuration from `lf_client()`
#'
#' @return Session ID if successful, NULL otherwise
#'
#' @export
#'
#' @examples
#' \dontrun{
#' # Create a session
#' session_id <- lf_create_session(
#'   name = "User Learning Session",
#'   user_id = "user-123",
#'   metadata = list(app_version = "1.0.0"),
#'   langfuse_client = lf_client
#' )
#' }
lf_create_session <- function(name, user_id = NULL, metadata = NULL, langfuse_client) {
  if (is.null(langfuse_client) || !isTRUE(langfuse_client$enabled)) {
    return(NULL)
  }
 
  session_id <- uuid::UUIDgenerate()
  timestamp <- format(Sys.time(), "%Y-%m-%dT%H:%M:%OS6Z")
 
  event <- list(
    id = uuid::UUIDgenerate(),
    timestamp = timestamp,
    type = "session-create",
    body = list(
      id = session_id,
      name = name,
      timestamp = timestamp
    )
  )
 
  if (!is.null(user_id)) event$body$userId <- user_id
  if (!is.null(metadata)) event$body$metadata <- metadata
 
  result <- lf_ingestion(list(event), langfuse_client)
 
  if (!isFALSE(result)) {
    return(session_id)
  } else {
    return(NULL)
  }
}
 
#' Create an Observation in Langfuse
#'
#' @description
#' Adds an observation to a trace, such as retrieval results or intermediate calculations.
#'
#' @param trace_id Parent trace ID
#' @param type Observation type (e.g., "completion", "prompt", "retrieval")
#' @param input Input text or data
#' @param output Output text or data
#' @param metadata Optional metadata
#' @param langfuse_client Langfuse client configuration from `lf_client()`
#'
#' @return Observation ID if successful, NULL otherwise
#'
#' @export
#'
#' @examples
#' \dontrun{
#' # First trace a chat interaction
#' response <- lf_trace_chat(chat, "Tell me about R", lf_client)
#' trace_id <- attr(response, "trace_id")
#'
#' # Add an observation
#' lf_create_observation(
#'   trace_id = trace_id,
#'   type = "analysis",
#'   input = "User is asking about R",
#'   output = "User seems to be a beginner",
#'   langfuse_client = lf_client
#' )
#' }
lf_create_observation <- function(trace_id, type, input = NULL, output = NULL, metadata = NULL, langfuse_client) {
  if (is.null(langfuse_client) || !isTRUE(langfuse_client$enabled)) {
    return(NULL)
  }
 
  observation_id <- uuid::UUIDgenerate()
  timestamp <- format(Sys.time(), "%Y-%m-%dT%H:%M:%OS6Z")
 
  event <- list(
    id = uuid::UUIDgenerate(),
    timestamp = timestamp,
    type = "observation-create",
    body = list(
      id = observation_id,
      traceId = trace_id,
      type = type,
      timestamp = timestamp
    )
  )
 
  if (!is.null(input)) event$body$input <- input
  if (!is.null(output)) event$body$output <- output
  if (!is.null(metadata)) event$body$metadata <- metadata
 
  result <- lf_ingestion(list(event), langfuse_client)
 
  if (!isFALSE(result)) {
    return(observation_id)
  } else {
    return(NULL)
  }
}
 
#' Create an Evaluation in Langfuse
#'
#' @description
#' Adds a quality evaluation score to a trace.
#'
#' @param trace_id Parent trace ID
#' @param name Evaluation name
#' @param value Numeric score value (0-1)
#' @param comment Optional comment
#' @param metadata Optional metadata
#' @param langfuse_client Langfuse client configuration from `lf_client()`
#'
#' @return Evaluation ID if successful, NULL otherwise
#'
#' @export
#'
#' @examples
#' \dontrun{
#' # First trace a chat interaction
#' response <- lf_trace_chat(chat, "Tell me about R", lf_client)
#' trace_id <- attr(response, "trace_id")
#'
#' # Add an evaluation
#' lf_create_evaluation(
#'   trace_id = trace_id,
#'   name = "response_quality",
#'   value = 0.95,
#'   comment = "Excellent explanation",
#'   langfuse_client = lf_client
#' )
#' }
lf_create_evaluation <- function(trace_id, name, value, comment = NULL, metadata = NULL, langfuse_client) {
  if (is.null(langfuse_client) || !isTRUE(langfuse_client$enabled)) {
    return(NULL)
  }
 
  eval_id <- uuid::UUIDgenerate()
  timestamp <- format(Sys.time(), "%Y-%m-%dT%H:%M:%OS6Z")
 
  event <- list(
    id = uuid::UUIDgenerate(),
    timestamp = timestamp,
    type = "score-create",
    body = list(
      id = eval_id,
      traceId = trace_id,
      name = name,
      value = value,
      timestamp = timestamp
    )
  )
 
  if (!is.null(comment)) event$body$comment <- comment
  if (!is.null(metadata)) event$body$metadata <- metadata
 
  result <- lf_ingestion(list(event), langfuse_client)
 
  if (!isFALSE(result)) {
    return(eval_id)
  } else {
    return(NULL)
  }
}
 
#' Add a Prompt to Langfuse
#'
#' @description
#' Adds a prompt template to Langfuse for version control and reuse.
#'
#' @param name Prompt name
#' @param prompt Prompt text or array of message objects (depends on type)
#' @param type Prompt type, either "text" or "chat"
#' @param config Optional configuration parameters (model, temperature, etc.)
#' @param labels Optional list of labels for organizing prompts
#' @param tags Optional list of tags for categorizing prompts
#' @param commit_message Optional message describing changes (for versioning)
#' @param langfuse_client Langfuse client configuration from `lf_client()`
#'
#' @return Prompt ID if successful, NULL otherwise
#'
#' @export
#'
#' @examples
#' \dontrun{
#' # Add a text prompt template
#' prompt_id <- lf_add_prompt(
#'   name = "r_tutorial_text",
#'   prompt = "You are a helpful R programming tutor. Explain {{topic}} with examples.",
#'   type = "text",
#'   langfuse_client = lf_client
#' )
#'
#' # Add a chat prompt template
#' prompt_id <- lf_add_prompt(
#'   name = "r_tutorial_chat",
#'   prompt = list(
#'     list(role = "system", content = "You are a helpful R programming tutor."),
#'     list(role = "user", content = "Explain {{topic}} in simple terms with examples.")
#'   ),
#'   type = "chat",
#'   config = list(model = "gpt-4", temperature = 0.7),
#'   labels = c("tutorial", "r-programming"),
#'   langfuse_client = lf_client
#' )
#' }
lf_add_prompt <- function(name, prompt, type = c("chat", "text"), config = NULL,
                          labels = NULL, tags = NULL, commit_message = NULL,
                          langfuse_client) {
  if (is.null(langfuse_client) || !isTRUE(langfuse_client$enabled)) {
    return(NULL)
  }
 
  type <- match.arg(type)
 
  # Prepare the request body
  body <- list(
    name = name,
    type = type
  )
 
  # Handle the prompt format based on type
  if (type == "text") {
    # For text prompts, ensure it's a simple string
    if (!is.character(prompt) || length(prompt) != 1) {
      cli::cli_alert_danger("For text prompts, 'prompt' must be a single character string")
      return(NULL)
    }
    body$prompt = prompt
  } else if (type == "chat") {
    # For chat prompts, ensure it's an array of messages with role/content
    if (is.character(prompt) && length(prompt) == 1) {
      # Convert simple string to a chat format with user role
      body$prompt = list(list(role = "user", content = prompt))
    } else if (is.list(prompt)) {
      # Validate the list structure
      valid <- all(sapply(prompt, function(msg) {
        is.list(msg) && !is.null(msg$role) && !is.null(msg$content)
      }))
 
      if (!valid) {
        cli::cli_alert_danger("Chat prompts must be a list of objects with 'role' and 'content' fields")
        return(NULL)
      }
      body$prompt = prompt
    } else {
      cli::cli_alert_danger("Invalid prompt format for chat type")
      return(NULL)
    }
  }
 
  # Add optional parameters if provided
  if (!is.null(config)) body$config <- config
  if (!is.null(labels)) body$labels <- as.list(labels)
  if (!is.null(tags)) body$tags <- as.list(tags)
  if (!is.null(commit_message)) body$commitMessage <- commit_message
 
  # Create the API request
  req <- httr2::request(langfuse_client$host) |>
    httr2::req_url_path_append("api", "public", "v2", "prompts") |>
    httr2::req_method("POST") |>
    httr2::req_headers("Content-Type" = "application/json") |>
    httr2::req_auth_basic(username = langfuse_client$public_key, password = langfuse_client$secret_key) |>
    httr2::req_body_json(body)
 
  tryCatch({
    resp <- httr2::req_perform(req)
    httr2::resp_check_status(resp)
    result <- httr2::resp_body_json(resp)
    return(result$id)
  }, error = function(e) {
    cli::cli_alert_danger("Failed to create prompt: {conditionMessage(e)}")
    return(NULL)
  })
}
 
#' Get a Prompt from Langfuse
#'
#' @description
#' Retrieves a prompt template from Langfuse.
#'
#' @param prompt_name Name of the prompt to retrieve
#' @param version Specific version number to retrieve (if NULL, latest version will be fetched)
#' @param label Label of the prompt to retrieve (optional)
#' @param langfuse_client Langfuse client configuration from `lf_client()`
#'
#' @return Prompt object if successful, NULL otherwise
#'
#' @export
#'
#' @examples
#' \dontrun{
#' # Get the latest version of a prompt
#' prompt <- lf_get_prompt(
#'   prompt_name = "r_tutorial_chat",
#'   langfuse_client = lf_client
#' )
#'
#' # Get a specific version of a prompt
#' prompt <- lf_get_prompt(
#'   prompt_name = "r_tutorial_chat",
#'   version = 2,
#'   langfuse_client = lf_client
#' )
#'
#' # Get a prompt with a specific label
#' prompt <- lf_get_prompt(
#'   prompt_name = "r_tutorial_chat",
#'   label = "development",
#'   langfuse_client = lf_client
#' )
#' }
lf_get_prompt <- function(prompt_name, version = NULL, label = NULL, langfuse_client) {
  if (is.null(langfuse_client) || !isTRUE(langfuse_client$enabled)) {
    return(NULL)
  }
 
  # Validate prompt name
  if (is.null(prompt_name) || !is.character(prompt_name) || nchar(prompt_name) == 0) {
    cli::cli_alert_danger("'prompt_name' must be a non-empty string")
    return(NULL)
  }
 
  # If version is not provided, try to get a list of versions first
  if (is.null(version) && is.null(label)) {
    # Get all prompt versions
    versions_req <- httr2::request(langfuse_client$host) |>
      httr2::req_url_path_append("api", "public", "v2", "prompts", prompt_name, "versions") |>
      httr2::req_method("GET") |>
      httr2::req_auth_basic(username = langfuse_client$public_key, password = langfuse_client$secret_key)
 
    versions_result <- tryCatch({
      resp <- httr2::req_perform(versions_req)
      httr2::resp_check_status(resp)
      httr2::resp_body_json(resp)
    }, error = function(e) {
      cli::cli_alert_warning("Failed to get prompt versions: {conditionMessage(e)}")
      return(NULL)
    })
 
    # If we got versions, use the latest one
    if (!is.null(versions_result) && length(versions_result) > 0) {
      # Sort by version number (descending)
      version_numbers <- sapply(versions_result, function(v) as.integer(v$version))
      latest_version <- max(version_numbers)
      cli::cli_alert_info("Using latest version: {latest_version}")
      version <- latest_version
    } else {
      # Default to version 1 if we couldn't get a list of versions
      cli::cli_alert_info("Could not determine latest version, defaulting to version 1")
      version <- 1
    }
  }
 
  # Build the request for a specific version or label
  req <- httr2::request(langfuse_client$host) |>
    httr2::req_url_path_append("api", "public", "v2", "prompts", prompt_name) |>
    httr2::req_method("GET") |>
    httr2::req_auth_basic(username = langfuse_client$public_key, password = langfuse_client$secret_key)
 
  # Add optional query parameters
  if (!is.null(version)) {
    req <- httr2::req_url_query(req, version = as.integer(version))
  }
 
  if (!is.null(label)) {
    req <- httr2::req_url_query(req, label = label)
  }
 
  tryCatch({
    resp <- httr2::req_perform(req)
    httr2::resp_check_status(resp)
    result <- httr2::resp_body_json(resp)
    return(result)
  }, error = function(e) {
    cli::cli_alert_danger("Failed to get prompt: {conditionMessage(e)}")
    return(NULL)
  })
}
 
#' Add a Dataset to Langfuse
#'
#' @description
#' Creates a new dataset for testing and evaluation.
#'
#' @param name Dataset name
#' @param description Dataset description (optional)
#' @param metadata Additional metadata as a list (optional)
#' @param langfuse_client Langfuse client configuration from `lf_client()`
#'
#' @return Dataset ID if successful, NULL otherwise
#'
#' @export
#'
#' @examples
#' \dontrun{
#' # Create a dataset
#' dataset_id <- lf_add_dataset(
#'   name = "R Programming Questions",
#'   description = "Common questions about R programming for testing",
#'   langfuse_client = lf_client
#' )
#' }
lf_add_dataset <- function(name, description = NULL, metadata = NULL, langfuse_client) {
  if (is.null(langfuse_client) || !isTRUE(langfuse_client$enabled)) {
    return(NULL)
  }
 
  # Build request body
  body <- list(name = name)
  if (!is.null(description)) body$description <- description
  if (!is.null(metadata)) body$metadata <- metadata
 
  req <- httr2::request(langfuse_client$host) |>
    httr2::req_url_path_append("api", "public", "v2", "datasets") |>
    httr2::req_method("POST") |>
    httr2::req_headers("Content-Type" = "application/json") |>
    httr2::req_auth_basic(username = langfuse_client$public_key, password = langfuse_client$secret_key) |>
    httr2::req_body_json(body)
 
  tryCatch({
    resp <- httr2::req_perform(req)
    httr2::resp_check_status(resp)
    result <- httr2::resp_body_json(resp)
    return(result$id)
  }, error = function(e) {
    cli::cli_alert_danger("Failed to create dataset: {conditionMessage(e)}")
    return(NULL)
  })
}
 
#' Add an Item to a Dataset
#'
#' @description
#' Adds a test item to a Langfuse dataset.
#'
#' @param dataset_name Name of the dataset
#' @param input Input text or data
#' @param expected_output Expected output (optional)
#' @param metadata Optional metadata as a list
#' @param source_trace_id Optional trace ID that this item is based on
#' @param source_observation_id Optional observation ID that this item is based on
#' @param id Optional custom ID for the dataset item (must be unique)
#' @param status Item status ("ACTIVE" or "ARCHIVED", defaults to "ACTIVE")
#' @param langfuse_client Langfuse client configuration from `lf_client()`
#'
#' @return Item ID if successful, NULL otherwise
#'
#' @export
#'
#' @examples
#' \dontrun{
#' # Add an item to a dataset
#' lf_add_dataset_item(
#'   dataset_name = "R Programming Questions",
#'   input = "What are the differences between a list and a data frame in R?",
#'   expected_output = "A detailed comparison of lists and data frames",
#'   metadata = list(category = "data_structures"),
#'   langfuse_client = lf_client
#' )
#'
#' # Add an item with a custom ID
#' lf_add_dataset_item(
#'   dataset_name = "R Programming Questions",
#'   input = "How do you create a ggplot2 visualization?",
#'   id = "ggplot2-question",
#'   langfuse_client = lf_client
#' )
#' }
lf_add_dataset_item <- function(dataset_name, input, expected_output = NULL,
                                metadata = NULL, source_trace_id = NULL,
                                source_observation_id = NULL, id = NULL,
                                status = "ACTIVE", langfuse_client) {
  if (is.null(langfuse_client) || !isTRUE(langfuse_client$enabled)) {
    return(NULL)
  }
 
  # Validate required parameters
  if (is.null(dataset_name) || !is.character(dataset_name) || nchar(dataset_name) == 0) {
    cli::cli_alert_danger("'dataset_name' must be a non-empty string")
    return(NULL)
  }
 
  # Build the request body
  body <- list(
    datasetName = dataset_name,
    input = input,
    status = status
  )
 
  # Add optional fields
  if (!is.null(expected_output)) body$expectedOutput <- expected_output
  if (!is.null(metadata)) body$metadata <- metadata
  if (!is.null(source_trace_id)) body$sourceTraceId <- source_trace_id
  if (!is.null(source_observation_id)) body$sourceObservationId <- source_observation_id
  if (!is.null(id)) body$id <- id
 
  req <- httr2::request(langfuse_client$host) |>
    httr2::req_url_path_append("api", "public", "dataset-items") |>
    httr2::req_method("POST") |>
    httr2::req_headers("Content-Type" = "application/json") |>
    httr2::req_auth_basic(username = langfuse_client$public_key, password = langfuse_client$secret_key) |>
    httr2::req_body_json(body)
 
  tryCatch({
    resp <- httr2::req_perform(req)
    httr2::resp_check_status(resp)
    result <- httr2::resp_body_json(resp)
    return(result$id)
  }, error = function(e) {
    cli::cli_alert_danger("Failed to add dataset item: {conditionMessage(e)}")
    return(NULL)
  })
}
 
#' Run an Experiment on a Dataset
#'
#' @description
#' Runs an experiment by processing items from a dataset and recording the results.
#'
#' @param name Experiment name
#' @param dataset_name Name of the dataset to use
#' @param chat_function Function to process each item (should accept input and return output)
#' @param langfuse_client Langfuse client configuration from `lf_client()`
#' @param max_items Maximum number of items to process (0 = all)
#' @param filter_status Filter items by status ("ACTIVE" or "ARCHIVED")
#'
#' @return Experiment results as a list
#'
#' @export
#'
#' @examples
#' \dontrun{
#' # Run an experiment with a fresh chat instance for each item
#' experiment_results <- lf_run_experiment(
#'   name = "R Tutor Test",
#'   dataset_name = "R Programming Questions",
#'   chat_function = function(input) {
#'     # Create a fresh chat for each item
#'     item_chat <- ellmer::chat_openai(model = "gpt-4o")
#'     # The chat() method returns the text response directly
#'     return(item_chat$chat(input))
#'   },
#'   langfuse_client = lf_client,
#'   max_items = 10
#' )
#' }
lf_run_experiment <- function(name, dataset_name, chat_function, langfuse_client,
                              max_items = 0, filter_status = "ACTIVE") {
  if (is.null(langfuse_client) || !isTRUE(langfuse_client$enabled)) {
    cli::cli_alert_warning("Langfuse client not configured or disabled")
    return(NULL)
  }
 
  # Get dataset items
  req <- httr2::request(langfuse_client$host) |>
    httr2::req_url_path_append("api", "public", "dataset-items") |>
    httr2::req_method("GET") |>
    httr2::req_auth_basic(username = langfuse_client$public_key, password = langfuse_client$secret_key) |>
    httr2::req_url_query(datasetName = dataset_name)
 
  # Add status filter if provided
  if (!is.null(filter_status)) {
    req <- httr2::req_url_query(req, status = filter_status)
  }
 
  items_response <- tryCatch({
    resp <- httr2::req_perform(req)
    httr2::resp_check_status(resp)
    httr2::resp_body_json(resp)
  }, error = function(e) {
    cli::cli_alert_danger("Failed to get dataset items: {conditionMessage(e)}")
    return(NULL)
  })
 
  # Handle different possible response formats from the API
  items <- NULL
  if (is.null(items_response)) {
    items <- NULL
  } else if (is.list(items_response) && length(items_response) > 0) {
    if ("items" %in% names(items_response) && is.list(items_response$items)) {
      # If response has an "items" field that's a list, use that
      items <- items_response$items
    } else {
      # Otherwise assume the response itself is the list of items
      items <- items_response
    }
  }
 
  if (is.null(items) || length(items) == 0) {
    cli::cli_alert_warning("No items found in dataset")
    return(NULL)
  }
 
  # Limit items if requested
  if (max_items > 0 && max_items < length(items)) {
    items <- items[1:max_items]
  }
 
  # Create experiment session
  session_id <- lf_create_session(
    name = paste0("Experiment: ", name),
    metadata = list(
      dataset_name = dataset_name,
      item_count = length(items)
    ),
    langfuse_client = langfuse_client
  )
 
  # Process each item
  results <- lapply(items, function(item) {
    # Extract and validate item ID
    item_id <- "unknown"
    if (!is.null(item$id) && is.character(item$id) && nchar(item$id) > 0) {
      item_id <- item$id
    }
 
    # Make sure we have a valid input
    if (is.null(item$input) || !is.character(item$input) || nchar(item$input) == 0) {
      cli::cli_alert_warning("Item {item_id} has no valid input, skipping")
      return(list(
        item_id = item_id,
        error = "No valid input found",
        skipped = TRUE
      ))
    }
 
    cli::cli_alert_info("Processing item {item_id}")
 
    # Create a trace for this item
    trace_event <- lf_create_trace_event(
      name = paste0("experiment_item_", item_id),
      metadata = list(
        dataset_name = dataset_name,
        item_id = item_id,
        session_id = session_id,
        experiment_name = name
      )
    )
    trace_id <- trace_event$body$id
 
    # Send the trace event
    lf_ingestion(list(trace_event), langfuse_client)
 
    # Process the item using the provided function
    start_time <- Sys.time()
    output <- NULL
    error <- NULL
 
    # Use tryCatch to properly capture any errors
    tryCatch({
      output <- chat_function(item$input)
 
      # Ensure output is a character string
      if (!is.character(output)) {
        output <- as.character(output)
      }
    }, error = function(e) {
      error <- conditionMessage(e)
      cli::cli_alert_danger("Error processing item {item_id}: {error}")
    })
 
    end_time <- Sys.time()
    duration_ms <- as.numeric(difftime(end_time, start_time, units = "secs")) * 1000
 
    # Create appropriate events based on success or failure
    if (is.null(error)) {
      # Create a generation event with the successful output
      gen_event <- lf_create_generation_event(
        trace_id = trace_id,
        model = "experiment",
        prompt = item$input,
        completion = output,
        metadata = list(
          duration_ms = duration_ms
        )
      )
 
      # Update trace with output
      update_trace_event <- list(
        id = uuid::UUIDgenerate(),
        timestamp = format(end_time, "%Y-%m-%dT%H:%M:%OS6Z"),
        type = "trace-update",
        body = list(
          id = trace_id,
          output = output
        )
      )
 
      # Send events to Langfuse
      lf_ingestion(list(gen_event, update_trace_event), langfuse_client)
 
      # If expected output exists, create an evaluation
      if (!is.null(item$expectedOutput)) {
        # Here you could add an automated evaluation
      }
    } else {
      # Log the error to Langfuse
      error_gen_event <- lf_create_generation_event(
        trace_id = trace_id,
        model = "experiment",
        prompt = item$input,
        metadata = list(
          error = error,
          duration_ms = duration_ms
        )
      )
 
      # Update trace with error info
      error_trace_event <- list(
        id = uuid::UUIDgenerate(),
        timestamp = format(end_time, "%Y-%m-%dT%H:%M:%OS6Z"),
        type = "trace-update",
        body = list(
          id = trace_id,
          output = paste("Error:", error),
          status = "error"
        )
      )
 
      lf_ingestion(list(error_gen_event, error_trace_event), langfuse_client)
    }
 
    return(list(
      item_id = item_id,
      input = item$input,
      output = output,
      error = error,
      trace_id = trace_id,
      duration_ms = duration_ms
    ))
  })
 
  return(list(
    experiment_name = name,
    dataset_name = dataset_name,
    session_id = session_id,
    results = results
  ))
}
 
#' Trace a Chat Interaction within a Session
#'
#' @description
#' Sends a message to an ellmer chat and tracks the interaction with Langfuse
#' within a defined session.
#'
#' @param chat An ellmer chat object
#' @param message User message
#' @param session_id Session ID for grouped traces from `lf_create_session()`
#' @param langfuse_client Langfuse client configuration from `lf_client()`
#' @param user_id Optional user ID
#' @param metadata Optional metadata
#'
#' @return The chat response with trace_id and session_id attributes
#'
#' @export
#'
#' @examples
#' \dontrun{
#' # Create a langfuse client and session
#' lf_client <- lf_client()
#' session_id <- lf_create_session(name = "Learning Session", langfuse_client = lf_client)
#'
#' # Create a chat
#' chat <- ellmer::chat_openai(model = "gpt-4")
#'
#' # Trace chat interactions in the session
#' response1 <- lf_trace_chat_session(
#'   chat = chat,
#'   message = "What are the main features of R?",
#'   session_id = session_id,
#'   langfuse_client = lf_client
#' )
#'
#' response2 <- lf_trace_chat_session(
#'   chat = chat,
#'   message = "Tell me more about data visualization in R",
#'   session_id = session_id,
#'   langfuse_client = lf_client
#' )
#' }
lf_trace_chat_session <- function(chat, message, session_id, langfuse_client, user_id = NULL, metadata = NULL) {
  if (is.null(langfuse_client) || !isTRUE(langfuse_client$enabled)) {
    return(chat$chat(message))
  }
 
  # Create trace with session ID
  trace_event <- lf_create_trace_event(
    name = "chat_interaction",
    user_id = user_id,
    input = message,
    metadata = metadata
  )
 
  # Add session ID to trace
  trace_event$body$sessionId <- session_id
 
  # Get trace ID
  trace_id <- trace_event$body$id
 
  # Start timing
  start_time <- Sys.time()
 
  # Start generation
  gen_event <- lf_create_generation_event(
    trace_id = trace_id,
    model = chat$get_model(),
    prompt = message,
    metadata = list(
      provider = class(chat$provider)[1],
      start_time = format(start_time, "%Y-%m-%dT%H:%M:%OS6Z")
    )
  )
 
  # Send events to Langfuse
  lf_ingestion(list(trace_event, gen_event), langfuse_client)
 
  # Call the chat function
  result <- tryCatch({
    chat$chat(message)
  }, error = function(e) {
    # Log error
    error_gen_event <- lf_create_generation_event(
      trace_id = trace_id,
      model = chat$get_model(),
      prompt = message,
      metadata = list(
        error = conditionMessage(e),
        duration_ms = as.numeric(difftime(Sys.time(), start_time, units = "secs")) * 1000
      )
    )
    lf_ingestion(list(error_gen_event), langfuse_client)
    stop(e)
  })
 
  # End timing
  end_time <- Sys.time()
  duration_ms <- as.numeric(difftime(end_time, start_time, units = "secs")) * 1000
 
  # Get the completion text
  completion <- chat$last_turn()@text
 
  # Send completion
  completion_gen_event <- lf_create_generation_event(
    trace_id = trace_id,
    model = chat$get_model(),
    prompt = message,
    completion = completion,
    metadata = list(
      duration_ms = duration_ms
    )
  )
 
  # Update trace with output
  update_trace_event <- list(
    id = uuid::UUIDgenerate(),
    timestamp = format(end_time, "%Y-%m-%dT%H:%M:%OS6Z"),
    type = "trace-update",
    body = list(
      id = trace_id,
      output = completion
    )
  )
 
  # Send events to Langfuse
  lf_ingestion(list(completion_gen_event, update_trace_event), langfuse_client)
 
  # Store trace ID and session ID as attributes
  attr(result, "trace_id") <- trace_id
  attr(result, "session_id") <- session_id
 
  return(result)
}

Example

Example Usage:

#  ------------------------------------------------------------------------
#
# Title : Langfuse Workflow Example
#    By : Jimmy Briggs
#  Date : 2025-03-12
#
#  ------------------------------------------------------------------------
 
library(R6)
library(ellmer)
library(httr2)
library(uuid)
library(cli)
 
pkgload::load_all()
 
langfuse_config <- config::get("langfuse")
 
Sys.setenv("LANGFUSE_SECRET_KEY" = langfuse_config$secret_key)
Sys.setenv("LANGFUSE_PUBLIC_KEY" = langfuse_config$public_key)
 
# Initialize a Langfuse client
lf_client <- lf_client(
  secret_key = Sys.getenv("LANGFUSE_SECRET_KEY"),
  public_key = Sys.getenv("LANGFUSE_PUBLIC_KEY")
)
 
# Create a regular ellmer chat
chat <- ellmer::chat_openai(
  model = "gpt-5",
  system_prompt = "You are a helpful assistant."
)
 
# Create a new session
session_id <- lf_create_session(
  name = "User Learning Session",
  user_id = "user-123",
  metadata = list(
    app_version = "1.0.0",
    session_type = "tutorial"
  ),
  langfuse_client = lf_client
)
 
# Have a conversation in a session
result1 <- lf_trace_chat_session(
  chat = chat,
  message = "What are the main features of R?",
  session_id = session_id,
  langfuse_client = lf_client,
  user_id = "user-123"
)
 
# Continue the conversation
result2 <- lf_trace_chat_session(
  chat = chat,
  message = "Can you explain more about data visualization in R?",
  session_id = session_id,
  langfuse_client = lf_client,
  user_id = "user-123"
)
 
# Add some observations about the conversation
lf_create_observation(
  trace_id = attr(result2, "trace_id"),
  type = "analysis",
  input = "User is asking follow-up questions about R visualization",
  output = "User seems engaged and interested in data visualization",
  metadata = list(
    topic = "data_visualization",
    expertise_level = "beginner"
  ),
  langfuse_client = lf_client
)
 
# Add evaluation for the quality of response
lf_create_evaluation(
  trace_id = attr(result2, "trace_id"),
  name = "response_quality",
  value = 0.95,
  comment = "Excellent explanation with good examples",
  metadata = list(
    evaluated_by = "system"
  ),
  langfuse_client = lf_client
)
 
# Add a text prompt to Langfuse for reuse
text_prompt_id <- lf_add_prompt(
  name = "r_tutorial_text",
  prompt = "You are a helpful R programming tutor. Explain {{topic}} in simple terms with examples.",
  type = "text",
  langfuse_client = lf_client
)
 
# Add a chat-style prompt to Langfuse
chat_prompt_id <- lf_add_prompt(
  name = "r_tutorial_chat",
  prompt = list(
    list(role = "system", content = "You are a helpful R programming tutor."),
    list(role = "user", content = "Explain {{topic}} in simple terms with examples.")
  ),
  type = "chat",
  config = list(model = "gpt-4o", temperature = 0.7),
  labels = c("tutorial", "r-programming"),
  langfuse_client = lf_client
)
 
# Create a dataset for testing
dataset_id <- lf_add_dataset(
  name = "R Programming Questions",
  description = "Common questions about R programming for testing",
  metadata = list(
    created_by = "noclocksai",
    category = "programming"
  ),
  langfuse_client = lf_client
)
 
# Add items to the dataset
lf_add_dataset_item(
  dataset_name = "R Programming Questions",
  input = "What are the differences between a list and a data frame in R?",
  expected_output = "A detailed comparison of lists and data frames",
  metadata = list(
    category = "data_structures",
    difficulty = "beginner"
  ),
  langfuse_client = lf_client
)
 
lf_add_dataset_item(
  dataset_name = "R Programming Questions",
  input = "How do you create a ggplot2 visualization with multiple layers?",
  expected_output = "Step-by-step explanation of ggplot2 layers",
  metadata = list(
    category = "visualization",
    difficulty = "intermediate"
  ),
  langfuse_client = lf_client
)
 
# Add an item with a custom ID
lf_add_dataset_item(
  dataset_name = "R Programming Questions",
  input = "Explain R's apply family of functions",
  expected_output = "Overview of apply, lapply, sapply, etc.",
  id = "apply-functions",
  metadata = list(
    category = "functions",
    difficulty = "intermediate"
  ),
  langfuse_client = lf_client
)
 
# Retrieve the latest version of a prompt automatically
chat_prompt <- lf_get_prompt(
  prompt_name = "r_tutorial_chat",
  langfuse_client = lf_client
)
 
# Retrieve a specific version of a prompt explicitly
text_prompt_v1 <- lf_get_prompt(
  prompt_name = "r_tutorial_text",
  version = 1,
  langfuse_client = lf_client
)
 
# Run an experiment on the dataset with direct text capture
experiment_results <- lf_run_experiment(
  name = "R Tutor Test",
  dataset_name = "R Programming Questions",
  chat_function = function(input) {
    # Create a fresh chat for each item to avoid state issues
    item_chat <- ellmer::chat_openai(
      model = "gpt-4o",
      system_prompt = "You are a helpful R programming assistant."
    )
 
    # Call the chat function and capture the response directly
    response <- item_chat$chat(input)
 
    # The chat() method returns the text response directly
    return(response)
  },
  langfuse_client = lf_client,
  max_items = 2  # Process maximum 2 items
)
 
# Create a regular ellmer chat
chat <- ellmer::chat_openai(
  model = "gpt-4o",
  system_prompt = "You are a helpful assistant."
)
 
# Use the chat with tracing
result <- lf_trace_chat(
  chat = chat,
  message = "What are the main features of R?",
  langfuse_client = lf_client,
  user_id = "user-123",
  metadata = list(
    session_type = "learning",
    client_version = "1.0.0"
  )
)
 
# Display the result
print(result)
 
# Add feedback
lf_add_feedback(
  result = result,
  score = 0.9,
  comment = "Very helpful response",
  langfuse_client = lf_client
)

Details


Appendix

Note created on 2025-12-24 and last modified on 2025-12-24.

See Also


(c) No Clocks, LLC | 2025