将异步进程的结果记录到数据库字段

3周前#Python

我正在使用 Django 和 Celery 开发一个项目,该项目需要大文件并分多个步骤处理它们。 该文件的主要对象称为排序,每个文件只创建一个。

现在,对于每个排序,我们都会创建许多样本、变体等。其中一些已经在数据库中,因此我们使用 get 或 create。 在处理完每个 Sequencing 之后,我希望将创建的对象数量和重用对象的数量写入 Sequencing 对象中称为元数据的 json 字段中,但由于某些进程异步运行,因此无法实现。

部分代码摘录:

models.py:

class Sequencing(...):
    metadata = models.JSONField(
        default=dict,
        blank=True,
    )

    def log(self, level: str, msg: str | list, *args, **kwargs) -> None:
        """Records messages for users in metadata."""
        if "log" not in self.metadata:
            self.metadata["log"] = []
        self.metadata["log"].append(
            {
                "date": datetime.datetime.utcnow().isoformat(),
                "level": level,
                "msg": msg,
            }
        )
        self.save()

    def load(self) -> None:
        chain(
            ...
            sequencing_load_variants.si(self.id),
            ...
        )

    def load_variants(self) -> None:
        ...
        for contig in contigs:
            sequencing_load_variants_by_contig.delay(self.pk, contig=contig)

    def load_variants_by_contig(self, contig: str) -> None:
        # create samples
        # create variants

        ....
        self.log(
            "info",
            [f"Variants created for region {region.seqid}: {log_dict['variants_create']}"]
            + [f"Variants found for region {region.seqid}: {log_dict['variants_get']}"]
            + [f"Samplerecords created for region {region.seqid}: {log_dict['samplerecords_create']}"]
            + ["Samplerecords found for region {region.seqid}: {log_dict['samplerecords_get']}"],
        )
}

In tasks.py:

@shared_task(bind=True, queue="computation")
def sequencing_load_variants(self, sequencing_pk: int) -> None:
    from sequencings.models import Sequencing

    sequencing_obj = Sequencing.objects.get(pk=sequencing_pk)
    sequencing_obj.load_variants()

@shared_task(bind=True, queue="computation")
def sequencing_load_variants_by_contig(self, sequencing_pk: int, contig: str) -> None:
    from sequencings.models import Sequencing

    sequencing_obj = Sequencing.objects.get(pk=sequencing_pk)
    sequencing_obj.load_variants_by_contig(contig=contig)

保存排序后,元数据字段保存部分区域的信息,但不是全部。 有时我只能看到一个区域的日志,有时是 2 个区域,但也不总是相同的区域(我有区域 8、区域 19 和 22、区域 9 和 22 等的日志)。

我认为当异步函数“load_variants_by_contig”试图调用日志函数保存方法时会发生冲突,但这是我的理解停止的地方。 我该如何分析、调试和解决这个问题?

回答
暂无回答