Scientific Data Pipelines Framework - DataJoint
Data Stewards Roundtable
2026-01-27
The Problem: Scientific Data Pipelines
Raw Data → Process Step 1 → Process Step 2 → Process Step 3 → Analysis
The Problem: Challenges
Manual dependency tracking
Hard to reproduce results
Lost provenance information
Difficult to scale workflows
Error-prone manual steps
The Problem: Typical Workflow
# Step 1: check whether the raw data exists
if not os.path.exists('raw_data.h5'):
    raise FileNotFoundError("Missing raw data")
# Step 2: check whether step 1 has run
if not os.path.exists('step1_result.npy'):
    process_step1()
# Step 3: check whether step 2 has run
if not os.path.exists('step2_result.npy'):
    process_step2()  # But did step 1 actually run?
# Step 4: manual tracking
# Which version? Which parameters?
What is DataJoint?
Database schema = Executable workflow specification
Image source: docs.datajoint.com
Relational Workflow Model
Tables = Workflow steps
Foreign keys = Dependencies
Schema = Executable specification
Table Tiers
Manual: entered by hand (no prefix in the database table name)
#Lookup: small reference data (# prefix)
_Imported: loaded from external files (_ prefix)
__Computed: derived within the pipeline (__ prefix)
Table Structure
class Subject(dj.Manual):
    definition = """
    subject_id: int
    ---
    subject_name: varchar(100)
    species: varchar(50)
    """
Manual Tables
class Session(dj.Manual):
    definition = """
    -> Subject          # foreign key
    session_id: int     # primary key
    ---
    session_date: date  # secondary attributes
    notes: varchar(500)
    """
Lookup Tables
class StimulusType(dj.Lookup):  # stored as #stimulus_type in the database
    definition = """
    stimulus_type: varchar(20)
    ---
    description: varchar(200)
    """
    contents = [
        ('visual', 'Visual stimulus'),
        ('auditory', 'Auditory stimulus'),
    ]
Imported Tables
class _RawData(dj.Imported):
    definition = """
    -> Session
    ---
    file_path: varchar(255)
    data: longblob
    """

    def make(self, key):
        # file_path is a secondary attribute, so it is determined here
        # (find_file is a placeholder for your file-locating logic)
        file_path = find_file(key)
        data = load_file(file_path)
        self.insert1({**key, 'file_path': file_path, 'data': data})
Computed Tables
class __Processed(dj.Computed):
    definition = """
    -> _RawData
    ---
    processed_data: longblob
    computed_at: datetime
    """

    def make(self, key):
        raw = (_RawData & key).fetch1()
        processed = process(raw['data'])
        self.insert1({**key,
                      'processed_data': processed,
                      'computed_at': datetime.now()})  # fill every secondary attribute
Master-Part Tables
class Session(dj.Manual):
    definition = """
    -> Subject
    session_id: int
    ---
    session_date: date
    """

    class Trial(dj.Part):  # part tables are nested inside their master
        definition = """
        -> master
        trial_id: int
        ---
        trial_start: float
        trial_end: float
        """
Complete Pipeline Example
Tables from the code examples:

#StimulusType [Lookup]
Subject [Manual] → Session [Manual]
Session → Session.Trial [Part]
Session → _RawData [Imported]
_RawData → __Processed [Computed]
Foreign Keys & Dependencies
Subject → Session → _RawData → __Processed
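Conceptually, the foreign-key edges form a directed acyclic graph, and a valid population order is a topological sort of that graph. A minimal pure-Python sketch (the table names mirror the diagram; the dependency dict is an illustration, not DataJoint API):

```python
from graphlib import TopologicalSorter

# Foreign-key dependencies: each table maps to the tables it references
deps = {
    "Subject": set(),
    "Session": {"Subject"},
    "_RawData": {"Session"},
    "__Processed": {"_RawData"},
}

# A valid population order must respect every foreign key
order = list(TopologicalSorter(deps).static_order())
print(order)  # ['Subject', 'Session', '_RawData', '__Processed']
```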
Query Operations
# Restrict: filter rows matching a condition
Session & {'subject_id': 1}   # only sessions for subject 1

# Join: combine tables on their shared attributes
Session * Session.Trial

# Project: select specific attributes
Subject.proj('subject_name')
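The semantics of these operators can be sketched in plain Python over lists of dicts. This is a didactic model only; real DataJoint queries compile to SQL, and the sample rows below are invented:

```python
sessions = [
    {"subject_id": 1, "session_id": 1, "session_date": "2026-01-10"},
    {"subject_id": 2, "session_id": 1, "session_date": "2026-01-12"},
]
trials = [
    {"subject_id": 1, "session_id": 1, "trial_id": 1, "trial_start": 0.0},
]

# Restrict: keep only rows matching the condition
restricted = [r for r in sessions if r["subject_id"] == 1]

# Join: merge rows that agree on their shared attributes
joined = [
    {**s, **t}
    for s in sessions
    for t in trials
    if s["subject_id"] == t["subject_id"] and s["session_id"] == t["session_id"]
]

# Project: keep only the named attributes
# (DataJoint's proj also always keeps the primary key)
projected = [{"subject_id": r["subject_id"]} for r in sessions]
```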
Auto-Population
# Automatically compute missing data
__Processed.populate()
# Check progress
__Processed.progress()
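Conceptually, populate() finds the upstream primary keys that have no matching row downstream and calls make() for each missing key. A simplified pure-Python model of that loop (the dicts and the "computation" are illustrative, not the DataJoint internals):

```python
# (subject_id, session_id) -> raw data
raw = {(1, 1): [3, 1, 2], (1, 2): [5, 4]}
# keys already computed downstream
processed = {(1, 1): [1, 2, 3]}

def make(key):
    processed[key] = sorted(raw[key])  # stand-in for the real computation

# populate: compute every upstream key that is missing downstream
for key in raw:
    if key not in processed:
        make(key)

print(sorted(processed))  # both keys now present
```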
Architecture
Python Schema → DataJoint Framework
DataJoint Framework → SQL Database
DataJoint Framework → Object Storage
Real-World Example: Ephys Pipeline
Element Array Electrophysiology - Neuropixels analysis with Kilosort
Subject → EphysSession ← #Probe
EphysSession → EphysSession.Channel [Part]
EphysSession → _RawEphys → __SpikeSorting → __QualityMetrics
Ephys Pipeline Resources
📦 GitHub Repository: https://github.com/datajoint/element-array-ephys
📖 Documentation: https://docs.datajoint.com/elements/element-array-ephys/
Alternatives: Raw SQL
conn = mysql.connector.connect(...)
cursor = conn.cursor(dictionary=True)  # return rows as dicts
cursor.execute("SELECT * FROM raw_data WHERE processed=0")
for row in cursor.fetchall():
    result = process(row['data'])
    cursor.execute("INSERT INTO processed ...")
conn.commit()
Alternatives: SQLAlchemy
# ORM for data access
class Session(Base):
    __tablename__ = 'sessions'
    id = Column(Integer, primary_key=True)
    subject_id = Column(Integer, ForeignKey('subjects.id'))
DataJoint Limitations
Limitations:
Schema-driven (less flexible)
Learning curve for new concepts
Not designed for web apps
Domain-specific (scientific focus)