Feat: Support re-segmentation (#114)

Co-authored-by: John Wang <takatost@gmail.com>
Co-authored-by: Jyong <718720800@qq.com>
Co-authored-by: 金伟强 <iamjoel007@gmail.com>
KVOJJJin authored on 2023-06-01 23:19:36 +08:00, committed by GitHub
parent f65a3ad1cc
commit c67f626b66
61 changed files with 1166 additions and 759 deletions


@@ -12,7 +12,7 @@ from events.dataset_event import dataset_was_deleted
from events.document_event import document_was_deleted
from extensions.ext_database import db
from models.account import Account
from models.dataset import Dataset, Document, DatasetQuery, DatasetProcessRule, AppDatasetJoin
from models.dataset import Dataset, Document, DatasetQuery, DatasetProcessRule, AppDatasetJoin, DocumentSegment
from models.model import UploadFile
from services.errors.account import NoPermissionError
from services.errors.dataset import DatasetNameDuplicateError
@@ -20,6 +20,7 @@ from services.errors.document import DocumentIndexingError
from services.errors.file import FileNotExistsError
from tasks.deal_dataset_vector_index_task import deal_dataset_vector_index_task
from tasks.document_indexing_task import document_indexing_task
from tasks.document_indexing_update_task import document_indexing_update_task
class DatasetService:
@@ -276,6 +277,14 @@ class DocumentService:
        return document

    @staticmethod
    def get_document_by_id(document_id: str) -> Optional[Document]:
        document = db.session.query(Document).filter(
            Document.id == document_id
        ).first()

        return document

    @staticmethod
    def get_document_file_detail(file_id: str):
        file_detail = db.session.query(UploadFile). \
@@ -355,8 +364,79 @@ class DocumentService:
        if dataset.indexing_technique == 'high_quality':
            IndexBuilder.get_default_service_context(dataset.tenant_id)
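        # re-segmentation: an original_document_id in the payload means the existing document is updated and re-indexed instead of creating a new one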
        if 'original_document_id' in document_data and document_data["original_document_id"]:
            document = DocumentService.update_document_with_dataset_id(dataset, document_data, account)
        else:
            # save process rule
            if not dataset_process_rule:
                process_rule = document_data["process_rule"]
                if process_rule["mode"] == "custom":
                    dataset_process_rule = DatasetProcessRule(
                        dataset_id=dataset.id,
                        mode=process_rule["mode"],
                        rules=json.dumps(process_rule["rules"]),
                        created_by=account.id
                    )
                elif process_rule["mode"] == "automatic":
                    dataset_process_rule = DatasetProcessRule(
                        dataset_id=dataset.id,
                        mode=process_rule["mode"],
                        rules=json.dumps(DatasetProcessRule.AUTOMATIC_RULES),
                        created_by=account.id
                    )
                db.session.add(dataset_process_rule)
                db.session.commit()

            file_name = ''
            data_source_info = {}
            if document_data["data_source"]["type"] == "upload_file":
                file_id = document_data["data_source"]["info"]
                file = db.session.query(UploadFile).filter(
                    UploadFile.tenant_id == dataset.tenant_id,
                    UploadFile.id == file_id
                ).first()

                # raise error if file not found
                if not file:
                    raise FileNotExistsError()

                file_name = file.name
                data_source_info = {
                    "upload_file_id": file_id,
                }

            # save document
            position = DocumentService.get_documents_position(dataset.id)
            document = Document(
                tenant_id=dataset.tenant_id,
                dataset_id=dataset.id,
                position=position,
                data_source_type=document_data["data_source"]["type"],
                data_source_info=json.dumps(data_source_info),
                dataset_process_rule_id=dataset_process_rule.id,
                batch=time.strftime('%Y%m%d%H%M%S') + str(random.randint(100000, 999999)),
                name=file_name,
                created_from=created_from,
                created_by=account.id,
                # created_api_request_id = db.Column(UUID, nullable=True)
            )

            db.session.add(document)
            db.session.commit()

            # trigger async task
            document_indexing_task.delay(document.dataset_id, document.id)

        return document
    @staticmethod
    def update_document_with_dataset_id(dataset: Dataset, document_data: dict,
                                        account: Account, dataset_process_rule: Optional[DatasetProcessRule] = None,
                                        created_from: str = 'web'):
        document = DocumentService.get_document(dataset.id, document_data["original_document_id"])
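        # only a document that is currently available can be re-segmented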
        if document.display_status != 'available':
            raise ValueError("Document is not available")
        # save process rule
        if not dataset_process_rule:
            if 'process_rule' in document_data and document_data['process_rule']:
                process_rule = document_data["process_rule"]
                if process_rule["mode"] == "custom":
                    dataset_process_rule = DatasetProcessRule(
@@ -374,46 +454,48 @@ class DocumentService:
                    )
                db.session.add(dataset_process_rule)
                db.session.commit()
                document.dataset_process_rule_id = dataset_process_rule.id
        # update document data source
        if 'data_source' in document_data and document_data['data_source']:
            file_name = ''
            data_source_info = {}
            if document_data["data_source"]["type"] == "upload_file":
                file_id = document_data["data_source"]["info"]
                file = db.session.query(UploadFile).filter(
                    UploadFile.tenant_id == dataset.tenant_id,
                    UploadFile.id == file_id
                ).first()
        file_name = ''
        data_source_info = {}
        if document_data["data_source"]["type"] == "upload_file":
            file_id = document_data["data_source"]["info"]
            file = db.session.query(UploadFile).filter(
                UploadFile.tenant_id == dataset.tenant_id,
                UploadFile.id == file_id
            ).first()
            # raise error if file not found
            if not file:
                raise FileNotExistsError()
            file_name = file.name
            data_source_info = {
                "upload_file_id": file_id,
            }
        # save document
        position = DocumentService.get_documents_position(dataset.id)
        document = Document(
            tenant_id=dataset.tenant_id,
            dataset_id=dataset.id,
            position=position,
            data_source_type=document_data["data_source"]["type"],
            data_source_info=json.dumps(data_source_info),
            dataset_process_rule_id=dataset_process_rule.id,
            batch=time.strftime('%Y%m%d%H%M%S') + str(random.randint(100000, 999999)),
            name=file_name,
            created_from=created_from,
            created_by=account.id,
            # created_api_request_id = db.Column(UUID, nullable=True)
        )
                # raise error if file not found
                if not file:
                    raise FileNotExistsError()
                file_name = file.name
                data_source_info = {
                    "upload_file_id": file_id,
                }
            document.data_source_type = document_data["data_source"]["type"]
            document.data_source_info = json.dumps(data_source_info)
            document.name = file_name
        # update document to be waiting
        document.indexing_status = 'waiting'
        document.completed_at = None
        document.processing_started_at = None
        document.parsing_completed_at = None
        document.cleaning_completed_at = None
        document.splitting_completed_at = None
        document.updated_at = datetime.datetime.utcnow()
        document.created_from = created_from
        db.session.add(document)
        db.session.commit()
        # update document segment
        update_params = {
            DocumentSegment.status: 're_segment'
        }
        DocumentSegment.query.filter_by(document_id=document.id).update(update_params)
        db.session.commit()
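        # segments flagged 're_segment' are expected to be replaced by the async update task during re-indexing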
        # trigger async task
        document_indexing_task.delay(document.dataset_id, document.id)
        document_indexing_update_task.delay(document.dataset_id, document.id)

        return document
@@ -443,6 +525,21 @@ class DocumentService:
    @classmethod
    def document_create_args_validate(cls, args: dict):
        if 'original_document_id' not in args or not args['original_document_id']:
            DocumentService.data_source_args_validate(args)
            DocumentService.process_rule_args_validate(args)
        else:
            if ('data_source' not in args or not args['data_source']) \
                    and ('process_rule' not in args or not args['process_rule']):
raise ValueError("Data source or Process rule is required")
else:
if 'data_source' in args and args['data_source']:
DocumentService.data_source_args_validate(args)
if 'process_rule' in args and args['process_rule']:
DocumentService.process_rule_args_validate(args)
    @classmethod
    def data_source_args_validate(cls, args: dict):
        if 'data_source' not in args or not args['data_source']:
            raise ValueError("Data source is required")
@@ -459,6 +556,8 @@ class DocumentService:
        if 'info' not in args['data_source'] or not args['data_source']['info']:
            raise ValueError("Data source info is required")

    @classmethod
    def process_rule_args_validate(cls, args: dict):
        if 'process_rule' not in args or not args['process_rule']:
            raise ValueError("Process rule is required")