# The processColumn API
The `pframes.processColumn` function is a powerful utility in the Workflow SDK for performing mapping and aggregation operations on a p-column. It lets you iterate over a p-column's data, apply custom logic to each entry (or group of entries) via a separate Tengo template, and generate one or more new p-columns or other artifacts as output.
## Key Use Cases
This function is ideal for a wide range of complex tasks, including:
- Parallelizing Tool Execution: A primary use case is processing a file-based p-column in parallel. For example, if you have a p-column where each partition contains FASTQ files for a specific sample, you can use `processColumn` to run a bioinformatics tool (such as MiXCR or STAR) on each sample's data concurrently.
- Complex Aggregation: It can group data by specific axes and then execute a template for each group to calculate aggregate statistics (e.g., sums, means, counts) or perform other complex operations on the grouped data.
- Generating File-Based Artifacts: It can be used to transform data from a p-column into a different format, such as generating a summary TSV report for each group in an aggregated operation.
## Function Signature
The basic signature is:
```tengo
processColumn(input, bodyTpl, outputs, opts)
```
- `input`: The source p-column to process. This is an object containing the p-column's `spec` and a reference to its `data`.
- `bodyTpl`: A reference to another Tengo template file that will be executed for each data entry (or group of entries).
- `outputs`: An array that defines the structure and specs of the p-columns or other artifacts you want to create.
- `opts`: An optional map of settings to control the operation, such as aggregation rules or tracing.
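To make the argument shapes concrete, here is a hedged sketch of a call with all four arguments filled in. The template name (`:my-body`), output name (`summary`), and spec placeholders are hypothetical, chosen for illustration rather than taken from the SDK:

```tengo
// Hypothetical sketch; ":my-body" and "summary" are illustrative
// placeholders, not real SDK identifiers.
myBodyTpl := assets.importTemplate(":my-body")

result := pframes.processColumn(
    // input: the source p-column's spec plus a reference to its data
    { spec: inputSpec, data: inputData },
    // bodyTpl: the template executed for each entry (or group)
    myBodyTpl,
    // outputs: one entry per artifact the body template returns
    [
        { type: "Resource", name: "summary", spec: { /* output spec */ } }
    ],
    // opts: optional settings, e.g. axes to aggregate over
    { aggregate: [ { name: "pl7.app/sequencing/lane", optional: true } ] }
)
```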
## How it Works
For each entry in the input p-column, `processColumn` executes the `bodyTpl` template. The `bodyTpl` receives the value of the entry and can perform any calculation or transformation; when aggregation is used, it receives the entire group of data instead. The result of each `bodyTpl` execution is then used to populate the new p-column(s) defined in the `outputs` array.
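The contract between `processColumn` and its body template can be sketched minimally as follows. The structure mirrors the SDK's `tpl` module usage shown later in this document (`defineOutputs`, `body`, `inputs.__value__`); the template name and the `doubled` output name are hypothetical:

```tengo
// Hypothetical minimal body template (e.g. double.tpl.tengo).
self := import("@platforma-sdk/workflow-tengo:tpl")

// Declare the output names; they must match the `name` fields of the
// `outputs` array passed to processColumn. "doubled" is illustrative.
self.defineOutputs("doubled")

self.body(func(inputs) {
    // `inputs.__value__` holds the current entry's value, or the whole
    // group of data when aggregation is enabled.
    value := inputs.__value__

    // Return a map keyed by the declared output names.
    return { doubled: value * 2 }
})
```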
## Example: Parallelizing MiXCR Analysis
This real-world example from the `mixcr-clonotyping-2` block demonstrates how to run the MiXCR tool in parallel for each sample.
### 1. The Main Workflow (`process.tpl.tengo`)

The main workflow script calls `processColumn` with:

- `input`: A file-based p-column where each partition contains FASTQ files for a sample.
- `aggregate`: Groups the input files by sample using their shared axes (`lane` and `readIndex`).
- `bodyTpl`: The `mixcr-analyze.tpl.tengo` template, which will run once for each sample.
- `outputs`: Defines the new p-columns to be created from the MiXCR results (e.g., log files, QC reports, and the main `.clns` file).
```tengo
// In main workflow file: process.tpl.tengo
// ... imports and setup ...

mixcrAnalyzeTpl := assets.importTemplate(":mixcr-analyze")

targetOutputs := [
    { type: "Resource", name: "qc", spec: { /* qc spec */ } },
    { type: "Resource", name: "log", spec: { /* log spec */ } },
    { type: "Resource", name: "clns", spec: { /* clns spec */ } }
    // ... more outputs
]

mixcrResults := pframes.processColumn(
    // 1. Input: A file-based p-column of FASTQ files
    { spec: inputSpec, data: inputs.inputData },
    // 2. Body Template: The template to execute for each group
    mixcrAnalyzeTpl,
    // 3. Outputs: The artifacts to be generated
    targetOutputs,
    // 4. Options: Aggregate by sample
    {
        aggregate: [
            { name: "pl7.app/sequencing/lane", optional: true },
            { name: "pl7.app/sequencing/readIndex", optional: true }
        ]
    }
)
// ...
```
### 2. The Body Template (`mixcr-analyze.tpl.tengo`)
This template is executed for each aggregated group (i.e., for each sample). It receives the sample's data, constructs a command-line call to MiXCR, runs it, and returns the resulting files.
```tengo
// In body template file: mixcr-analyze.tpl.tengo
self := import("@platforma-sdk/workflow-tengo:tpl")
exec := import("@platforma-sdk/workflow-tengo:exec")
// ... other imports

// Define the outputs this template will produce, matching the 'name' fields
// from the targetOutputs array in the main workflow.
self.defineOutputs("qc", "reports", "log", "clns")

self.body(func(inputs) {
    // `inputs.__value__` contains the PColumnData resource for the current group.
    inputData := inputs.__value__

    // ... logic to get file paths from inputData and determine file names ...

    // Build the `mixcr analyze` command
    mixcrCmdBuilder := exec.builder().
        software(mixcrSw).
        arg("analyze").
        // ... add species, preset, and other arguments ...
        addFile("input_R1.fastq", inputFileR1).
        addFile("input_R2.fastq", inputFileR2).
        arg("input_{{R}}.fastq").
        arg("result"). // output prefix
        saveFile("result.qc.json").
        saveFile("result.clns")

    // Run the command
    mixcrCmd := mixcrCmdBuilder.run()

    // Return a map where keys match the defined outputs and values
    // are the file resources generated by the command.
    return {
        qc: mixcrCmd.getFile("result.qc.json"),
        log: mixcrCmd.getStdoutStream(),
        clns: mixcrCmd.getFile("result.clns")
        // ... (the "reports" output is elided in this excerpt)
    }
})
```
The `processColumn` function collects the `qc`, `log`, and `clns` results from each parallel execution and assembles them into the final output p-columns defined in `targetOutputs`.