我正在使用 Django 和 Celery 开发一个项目,该项目需要大文件并分多个步骤处理它们。 该文件的主要对象称为排序,每个文件只创建一个。
现在,对于每个排序,我们都会创建许多样本、变体等。其中一些已经在数据库中,因此我们使用 get 或 create。 在处理完每个 Sequencing 之后,我希望将创建的对象数量和重用对象的数量写入 Sequencing 对象中称为元数据的 json 字段中,但由于某些进程异步运行,因此无法实现。
部分代码摘录:
models.py:
class Sequencing(...):
metadata = models.JSONField(
default=dict,
blank=True,
)
def log(self, level: str, msg: str | list, *args, **kwargs) -> None:
"""Records messages for users in metadata."""
if "log" not in self.metadata:
self.metadata["log"] = []
self.metadata["log"].append(
{
"date": datetime.datetime.utcnow().isoformat(),
"level": level,
"msg": msg,
}
)
self.save()
def load(self) -> None:
chain(
...
sequencing_load_variants.si(self.id),
...
)
def load_variants(self) -> None:
...
for contig in contigs:
sequencing_load_variants_by_contig.delay(self.pk, contig=contig)
def load_variants_by_contig(self, contig: str) -> None:
# create samples
# create variants
....
self.log(
"info",
[f"Variants created for region {region.seqid}: {log_dict['variants_create']}"]
+ [f"Variants found for region {region.seqid}: {log_dict['variants_get']}"]
+ [f"Samplerecords created for region {region.seqid}: {log_dict['samplerecords_create']}"]
+ ["Samplerecords found for region {region.seqid}: {log_dict['samplerecords_get']}"],
)
}
In tasks.py:
@shared_task(bind=True, queue="computation")
def sequencing_load_variants(self, sequencing_pk: int) -> None:
from sequencings.models import Sequencing
sequencing_obj = Sequencing.objects.get(pk=sequencing_pk)
sequencing_obj.load_variants()
@shared_task(bind=True, queue="computation")
def sequencing_load_variants_by_contig(self, sequencing_pk: int, contig: str) -> None:
from sequencings.models import Sequencing
sequencing_obj = Sequencing.objects.get(pk=sequencing_pk)
sequencing_obj.load_variants_by_contig(contig=contig)
保存排序后,元数据字段保存部分区域的信息,但不是全部。 有时我只能看到一个区域的日志,有时是 2 个区域,但也不总是相同的区域(我有区域 8、区域 19 和 22、区域 9 和 22 等的日志)。
我认为当异步函数“load_variants_by_contig”试图调用日志函数保存方法时会发生冲突,但这是我的理解停止的地方。 我该如何分析、调试和解决这个问题?