Writing custom extensions

cohortBuilder package is adapted to work with various data sources and custom backends. Currently there exists one official extension cohortBuilder.db package that allows you to use cohortBuilder with database connections.

The goal of this document is to explain how to create custom extensions to cohortBuilder.

In general to create the custom layer you need to:

  1. Define set of S3 methods operating on your custom source.
  2. Define selected filters (see vignette("custom-filters")).

It’s recommended to include all of the methods in your custom R package.

Before you start creating a new layer, you need to choose what data (connection) should your layer operate on.

For example, cohortBuilder uses tblist class object to operate on list of data frames , or db class for operating on database connections.

To start with create a function that will take required parameters to define data connection, such as tblist or dbtables in case of cohortBuilder.db. The function should return an object of selected class which is used to define required extension methods.

Below we describe all the required and optional methods you need to define within the created package.

  1. set_source - method used for defining a new source

Required parameters:

Details:

Example:

set_source.tblist <- function(dtconn, primary_keys = NULL, binding_keys = NULL, 
                           source_code = NULL, description = NULL, ...) {
  Source$new(
    dtconn, primary_keys = primary_keys,
    binding_keys = binding_keys, source_code = source_code,
    ...
  )
}
  1. .init_source - structure data passed between filtering steps

Required parameters:

Details:

Examples:

.init_step.tblist <- function(source, ...) {
  source$dtconn
}

cohortBuilder.db operates on db class object which is list of connection, tables and schema fields.

.init_step.db <- function(source) {
  purrr::map(
    stats::setNames(source$dtconn$tables, source$dtconn$tables),
    function(table) {
      tbl_conn <- dplyr::tbl(
        source$dtconn$connection,
        dbplyr::in_schema(source$dtconn$schema, table)
      )
      attr(tbl_conn, "tbl_name") <- table
      tbl_conn
    }
  )
}
  1. .pre_filtering (optional) - modify data object before filtering

Required parameters:

Details:

Examples:

.pre_filtering.tblist <- function(source, data_object, step_id) {
  for (dataset in names(data_object)) {
    attr(data_object[[dataset]], "filtered") <- FALSE
  }
  return(data_object)
}
.pre_filtering.db <- function(source, data_object, step_id) {
  purrr::map(
    stats::setNames(source$dtconn$tables, source$dtconn$tables),
    function(table) {
      table_name <- tmp_table_name(attr(data_object[[table]], "tbl_name"), step_id)
      DBI::dbRemoveTable(source$dtconn$connection, table_name, temporary = TRUE, fail_if_missing = FALSE)
      data_object[[table]] <- dplyr::compute(
        data_object[[table]],
        name = table_name
      )
      attr(data_object[[table]], "filtered") <- FALSE
      return(data_object[[table]])
    }
  )
}
  1. .post_filtering (optional) - data object modification after filtering (before running binding).

Required parameters:

  1. .collect_data - define how to collect data object into R.

Required parameters:

Details:

Examples:

.collect_data.tblist <- function(source, data_object) {
  data_object
}
.collect_data.db <- function(source, data_object) {
  purrr::map(
    stats::setNames(source$dtconn$tables, source$dtconn$tables),
    ~dplyr::collect(data_object[[.x]])
  )
}
  1. .get_stats - collect data object stats

Required parameters:

Details:

Examples:

.get_stats.tblist <- function(source, data_object) {
  dataset_names <- names(source$dtconn)
  dataset_names %>%
    purrr::map(
      ~ list(n_rows = nrow(data_object[[.x]]))
    ) %>%
    stats::setNames(dataset_names)
}
.get_stats.db <- function(source, data_object) {
  dataset_names <- source$dtconn$tables
  dataset_names %>%
    purrr::map(
      ~ list(
        n_rows = data_object[[.x]] %>%
          dplyr::summarise(n = n()) %>%
          dplyr::collect() %>%
          dplyr::pull(n) %>%
          as.integer()
      )
    ) %>%
    stats::setNames(dataset_names)
}
  1. .run_binding - method defining how binding should be handled

Required parameters:

Details:

Examples:

.run_binding.tblist <- function(source, binding_key, data_object_pre, data_object_post, ...) {
  binding_dataset <- binding_key$update$dataset
  dependent_datasets <- names(binding_key$data_keys)
  active_datasets <- data_object_post %>%
    purrr::keep(~ attr(., "filtered")) %>%
    names()

  if (!any(dependent_datasets %in% active_datasets)) {
    return(data_object_post)
  }

  key_values <- NULL
  common_key_names <- paste0("key_", seq_along(binding_key$data_keys[[1]]$key))
  for (dependent_dataset in dependent_datasets) {
    key_names <- binding_key$data_keys[[dependent_dataset]]$key
    tmp_key_values <- dplyr::distinct(data_object_post[[dependent_dataset]][, key_names, drop = FALSE]) %>%
      stats::setNames(common_key_names)
    if (is.null(key_values)) {
      key_values <- tmp_key_values
    } else {
      key_values <- dplyr::inner_join(key_values, tmp_key_values, by = common_key_names)
    }
  }

  data_object_post[[binding_dataset]] <- dplyr::inner_join(
    switch(
      as.character(binding_key$post),
      "FALSE" = data_object_pre[[binding_dataset]],
      "TRUE" = data_object_post[[binding_dataset]]
    ),
    key_values,
    by = stats::setNames(common_key_names, binding_key$update$key)
  )
  if (binding_key$activate) {
    attr(data_object_post[[binding_dataset]], "filtered") <- TRUE
  }

  return(data_object_post)
}
  1. .get_attrition_count - define how to get metric used for attrition data plot

Required parameters:

Details:

Examples:

.get_attrition_count.tblist <- function(source, data_stats, dataset, ...) {
  data_stats %>%
    purrr::map_int(~.[[dataset]][["n_rows"]])
}
  1. .get_attrition_label - define label displayed in attrition plot for the specified step

Required parameters:

Details:

Examples:

get_attrition_label.tblist <- function(source, step_id, step_filters, dataset, ...) {
  pkey <- source$primary_keys
  binding_keys <- source$binding_keys
  if (step_id == "0") {
    if (is.null(pkey)) {
      return(dataset)
    } else {
      dataset_pkey <- .get_item(pkey, "dataset", dataset)[1][[1]]$key
      if (is.null(dataset_pkey)) return(dataset)
      return(glue::glue("{dataset}\n primary key: {paste(dataset_pkey, collapse = ', ')}"))
    }
  }
  filters_section <- step_filters %>%
    purrr::keep(~.$dataset == dataset) %>%
    purrr::map(~get_attrition_filter_label(.$name, .$value_name, .$value)) %>%
    paste(collapse = "\n")
  bind_keys_section <- ""
  if (!is.null(binding_keys)) {
    dependent_datasets <- .get_item(
      binding_keys, attribute = "update", value = dataset,
      operator = function(value, target) {
        value == target$dataset
      }
    ) %>%
      purrr::map(~names(.[["data_keys"]])) %>%
      unlist() %>%
      unique()
    if (length(dependent_datasets) > 0) {
      bind_keys_section <- glue::glue(
        "\nData linked with external datasets: {paste(dependent_datasets, collapse = ', ')}",
        .trim = FALSE
      )
    }
  }
  gsub(
    "\n$",
    "",
    glue::glue("Step: {step_id}\n{filters_section}{bind_keys_section}")
  )
}