feat: Add W&B Weave Tracing Integration (#14262)
Signed-off-by: Yuichiro Utsumi <utsumi.yuichiro@fujitsu.com> Signed-off-by: -LAN- <laipz8200@outlook.com> Signed-off-by: yihong0618 <zouzou0208@gmail.com> Signed-off-by: kenwoodjw <blackxin55+@gmail.com> Signed-off-by: ChengZi <chen.zhang@zilliz.com> Signed-off-by: cl <cailue@apache.org> Co-authored-by: Yu Chun Chang <changyuchun159630@gmail.com> Co-authored-by: Kyle Chang <kylechang@91app.com> Co-authored-by: Lick-liu <51771897+Lick-liu@users.noreply.github.com> Co-authored-by: crazywoola <427733928@qq.com> Co-authored-by: Yuichiro Utsumi <81412151+utsumi-fj@users.noreply.github.com> Co-authored-by: NFish <douxc512@gmail.com> Co-authored-by: Yeuoly <45712896+Yeuoly@users.noreply.github.com> Co-authored-by: Wu Tianwei <30284043+WTW0313@users.noreply.github.com> Co-authored-by: DDDDD12138 <43703884+DDDDD12138@users.noreply.github.com> Co-authored-by: Jyong <76649700+JohnJyong@users.noreply.github.com> Co-authored-by: -LAN- <laipz8200@outlook.com> Co-authored-by: Novice <857526207@qq.com> Co-authored-by: yihong <zouzou0208@gmail.com> Co-authored-by: Kalo Chin <91766386+fdb02983rhy@users.noreply.github.com> Co-authored-by: zxhlyh <jasonapring2015@outlook.com> Co-authored-by: jiangbo721 <365065261@qq.com> Co-authored-by: 刘江波 <jiangbo721@163.com> Co-authored-by: Lam <scau_ljw@126.com> Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com> Co-authored-by: Mars <524574386@qq.com> Co-authored-by: mars <linjx2@by-health.com> Co-authored-by: Joe <79627742+ZhouhaoJiang@users.noreply.github.com> Co-authored-by: Rafael Carvalho <r.carvalho@me.com> Co-authored-by: Joel <iamjoel007@gmail.com> Co-authored-by: 非法操作 <hjlarry@163.com> Co-authored-by: kenwoodjw <blackxin55+@gmail.com> Co-authored-by: codingjaguar <codingjaguar@gmail.com> Co-authored-by: ChengZi <chen.zhang@zilliz.com> Co-authored-by: Fei He <droxer.he@gmail.com> Co-authored-by: Arcaner <52057416+lrhan321@users.noreply.github.com> Co-authored-by: Xiyuan Chen <52963600+GareArc@users.noreply.github.com> Co-authored-by: KVOJJJin <jzongcode@gmail.com> Co-authored-by: XiaoBa <94062266+XiaoBa-Yu@users.noreply.github.com> Co-authored-by: Xiaoba Yu <xb1823725853@gmail.com> Co-authored-by: zhangyuhang <2827528315@qq.com> Co-authored-by: yuhang2.zhang <yuhang2.zhang@ly.com> Co-authored-by: 诗浓 <nyaashino@gmail.com> Co-authored-by: RookieAgent <42060616+Sakura4036@users.noreply.github.com> Co-authored-by: sho-takano-dev <shota.takano.dev@gmail.com> Co-authored-by: 過世秋風 <1040926235@qq.com> Co-authored-by: Yi Feng <66539215+bigyifeng@users.noreply.github.com> Co-authored-by: QuantumGhost <obelisk.reg+git@gmail.com> Co-authored-by: Yongtao Huang <99629139+hyongtao-db@users.noreply.github.com> Co-authored-by: ShadowJobs <794878115@qq.com> Co-authored-by: LinYing <linying@momenta.ai> Co-authored-by: Benjamin <benjaminx@gmail.com> Co-authored-by: LiuBodong <liubodong2010@126.com> Co-authored-by: huangzhuo1949 <167434202+huangzhuo1949@users.noreply.github.com> Co-authored-by: huangzhuo <huangzhuo1@xiaomi.com> Co-authored-by: csurong <csurong1@gmail.com> Co-authored-by: 傻笑zz <43721571+shaxiaozz@users.noreply.github.com> Co-authored-by: L8ng <straydragonl@foxmail.com> Co-authored-by: Bowen Liang <liangbowen@gf.com.cn> Co-authored-by: Novice Lee <novicelee@NoviPro.local> Co-authored-by: GuanMu <ballmanjq@gmail.com> Co-authored-by: LittleFish-15 <58618983+LittleFish-15@users.noreply.github.com> Co-authored-by: 诗浓 <844670992@qq.com> Co-authored-by: luckylhb90 <luckylhb90@gmail.com> Co-authored-by: hobo.l <hobo.l@binance.com> Co-authored-by: Gen Sato <52241300+halogen22@users.noreply.github.com> Co-authored-by: twwu <twwu@dify.ai> Co-authored-by: StoneFancyX <53338920+StoneFancyX@users.noreply.github.com> Co-authored-by: StoneFancyX <kindbin@qq.com> Co-authored-by: Naoki KOBAYASHI <naotama@gmail.com> Co-authored-by: kurokobo <kuro664@gmail.com> Co-authored-by: cyflhn <cyflhn@163.com> Co-authored-by: Yingchun Lai <laiyingchun@apache.org> Co-authored-by: jimmyfen <757343258@qq.com> Co-authored-by: Xuetao Song <xuetaomagicsong@gmail.com> Co-authored-by: Panpan <wurui.dev@gmail.com> Co-authored-by: wyy-holding <59436937+wyy-holding@users.noreply.github.com> Co-authored-by: リイノ Lin <sorphwer@gmail.com> Co-authored-by: Ning <accelerator314@gmail.com> Co-authored-by: Linh Nguyen <55907715+batman0911@users.noreply.github.com> Co-authored-by: Junjie.M <118170653@qq.com> Co-authored-by: Ron <svcvit@gmail.com> Co-authored-by: Novice <novice12185727@gmail.com> Co-authored-by: NanoNova <kid1412621@gmail.com> Co-authored-by: JaydenZhou <380774082@qq.com> Co-authored-by: dotdotdot <823150982@qq.com> Co-authored-by: Good Wood <slm_1990@126.com> Co-authored-by: Ryosei Karaki <38310693+karamaru-alpha@users.noreply.github.com> Co-authored-by: chenhuan0728 <54611342+chenhuan0728@users.noreply.github.com> Co-authored-by: chenhuan <huan.chen0728@foxmail> Co-authored-by: lenbo <islenbo@qq.com> Co-authored-by: Jiang <65766008+AlwaysBluer@users.noreply.github.com> Co-authored-by: jiangzhijie <jiangzhijie.jzj@alibaba-inc.com> Co-authored-by: Yongtao Huang <yongtaoh2022@gmail.com> Co-authored-by: zhangkun-21 <sephiroth0932@gmail.com> Co-authored-by: hsiong <37357447+hsiong@users.noreply.github.com> Co-authored-by: 李远军 <4842@9ji.com> Co-authored-by: yourchanges <yourchanges@gmail.com> Co-authored-by: David <guyuezhuying@126.com> Co-authored-by: liuzhenghua <1090179900@qq.com> Co-authored-by: taokuizu <taokuizu@qq.com> Co-authored-by: Hanqing Zhao <sherry9277@gmail.com> Co-authored-by: JimintheBox <gjwlals111@gmail.com> Co-authored-by: wlleiiwang <1025164922@qq.com> Co-authored-by: wlleiiwang <wlleiiwang@tencent.com> Co-authored-by: Alex <32982705+AlexYuan997@users.noreply.github.com> Co-authored-by: yuanlong <yuanlong@boco.com.cn> Co-authored-by: wanttobeamaster <45583625+wanttobeamaster@users.noreply.github.com> Co-authored-by: xiaozhiqing.xzq <xiaozhiqing.xzq@alibaba-inc.com> Co-authored-by: Chenhe Gu <guchenhe@gmail.com> Co-authored-by: tyounami <vkbo@qq.com> Co-authored-by: bo.zhao <bo.zhao@iglooinsure.com> Co-authored-by: ClSlaid <cailue@apache.org> Co-authored-by: adru <106513264+adpanru@users.noreply.github.com> Co-authored-by: horochx <32632779+horochx@users.noreply.github.com>
This commit is contained in:

committed by
GitHub

parent
f6305858a5
commit
0a20210a59
@@ -7,6 +7,7 @@ class TracingProviderEnum(Enum):
|
||||
LANGFUSE = "langfuse"
|
||||
LANGSMITH = "langsmith"
|
||||
OPIK = "opik"
|
||||
WEAVE = "weave"
|
||||
|
||||
|
||||
class BaseTracingConfig(BaseModel):
|
||||
@@ -88,5 +89,26 @@ class OpikConfig(BaseTracingConfig):
|
||||
return v
|
||||
|
||||
|
||||
class WeaveConfig(BaseTracingConfig):
|
||||
"""
|
||||
Model class for Weave tracing config.
|
||||
"""
|
||||
|
||||
api_key: str
|
||||
entity: str | None = None
|
||||
project: str
|
||||
endpoint: str = "https://trace.wandb.ai"
|
||||
|
||||
@field_validator("endpoint")
|
||||
@classmethod
|
||||
def set_value(cls, v, info: ValidationInfo):
|
||||
if v is None or v == "":
|
||||
v = "https://trace.wandb.ai"
|
||||
if not v.startswith("https://"):
|
||||
raise ValueError("endpoint must start with https://")
|
||||
|
||||
return v
|
||||
|
||||
|
||||
OPS_FILE_PATH = "ops_trace/"
|
||||
OPS_TRACE_FAILED_KEY = "FAILED_OPS_TRACE"
|
||||
|
@@ -20,6 +20,7 @@ from core.ops.entities.config_entity import (
|
||||
LangSmithConfig,
|
||||
OpikConfig,
|
||||
TracingProviderEnum,
|
||||
WeaveConfig,
|
||||
)
|
||||
from core.ops.entities.trace_entity import (
|
||||
DatasetRetrievalTraceInfo,
|
||||
@@ -34,7 +35,9 @@ from core.ops.entities.trace_entity import (
|
||||
)
|
||||
from core.ops.langfuse_trace.langfuse_trace import LangFuseDataTrace
|
||||
from core.ops.langsmith_trace.langsmith_trace import LangSmithDataTrace
|
||||
from core.ops.opik_trace.opik_trace import OpikDataTrace
|
||||
from core.ops.utils import get_message_data
|
||||
from core.ops.weave_trace.weave_trace import WeaveDataTrace
|
||||
from extensions.ext_database import db
|
||||
from extensions.ext_storage import storage
|
||||
from models.model import App, AppModelConfig, Conversation, Message, MessageFile, TraceAppConfig
|
||||
@@ -43,8 +46,6 @@ from tasks.ops_trace_task import process_trace_tasks
|
||||
|
||||
|
||||
def build_opik_trace_instance(config: OpikConfig):
|
||||
from core.ops.opik_trace.opik_trace import OpikDataTrace
|
||||
|
||||
return OpikDataTrace(config)
|
||||
|
||||
|
||||
@@ -67,6 +68,12 @@ provider_config_map: dict[str, dict[str, Any]] = {
|
||||
"other_keys": ["project", "url", "workspace"],
|
||||
"trace_instance": lambda config: build_opik_trace_instance(config),
|
||||
},
|
||||
TracingProviderEnum.WEAVE.value: {
|
||||
"config_class": WeaveConfig,
|
||||
"secret_keys": ["api_key"],
|
||||
"other_keys": ["project", "entity", "endpoint"],
|
||||
"trace_instance": WeaveDataTrace,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
|
0
api/core/ops/weave_trace/__init__.py
Normal file
0
api/core/ops/weave_trace/__init__.py
Normal file
0
api/core/ops/weave_trace/entities/__init__.py
Normal file
0
api/core/ops/weave_trace/entities/__init__.py
Normal file
97
api/core/ops/weave_trace/entities/weave_trace_entity.py
Normal file
97
api/core/ops/weave_trace/entities/weave_trace_entity.py
Normal file
@@ -0,0 +1,97 @@
|
||||
from typing import Any, Optional, Union
|
||||
|
||||
from pydantic import BaseModel, Field, field_validator
|
||||
from pydantic_core.core_schema import ValidationInfo
|
||||
|
||||
from core.ops.utils import replace_text_with_content
|
||||
|
||||
|
||||
class WeaveTokenUsage(BaseModel):
|
||||
input_tokens: Optional[int] = None
|
||||
output_tokens: Optional[int] = None
|
||||
total_tokens: Optional[int] = None
|
||||
|
||||
|
||||
class WeaveMultiModel(BaseModel):
|
||||
file_list: Optional[list[str]] = Field(None, description="List of files")
|
||||
|
||||
|
||||
class WeaveTraceModel(WeaveTokenUsage, WeaveMultiModel):
|
||||
id: str = Field(..., description="ID of the trace")
|
||||
op: str = Field(..., description="Name of the operation")
|
||||
inputs: Optional[Union[str, dict[str, Any], list, None]] = Field(None, description="Inputs of the trace")
|
||||
outputs: Optional[Union[str, dict[str, Any], list, None]] = Field(None, description="Outputs of the trace")
|
||||
attributes: Optional[Union[str, dict[str, Any], list, None]] = Field(
|
||||
None, description="Metadata and attributes associated with trace"
|
||||
)
|
||||
exception: Optional[str] = Field(None, description="Exception message of the trace")
|
||||
|
||||
@field_validator("inputs", "outputs")
|
||||
@classmethod
|
||||
def ensure_dict(cls, v, info: ValidationInfo):
|
||||
field_name = info.field_name
|
||||
values = info.data
|
||||
if v == {} or v is None:
|
||||
return v
|
||||
usage_metadata = {
|
||||
"input_tokens": values.get("input_tokens", 0),
|
||||
"output_tokens": values.get("output_tokens", 0),
|
||||
"total_tokens": values.get("total_tokens", 0),
|
||||
}
|
||||
file_list = values.get("file_list", [])
|
||||
if isinstance(v, str):
|
||||
if field_name == "inputs":
|
||||
return {
|
||||
"messages": {
|
||||
"role": "user",
|
||||
"content": v,
|
||||
"usage_metadata": usage_metadata,
|
||||
"file_list": file_list,
|
||||
},
|
||||
}
|
||||
elif field_name == "outputs":
|
||||
return {
|
||||
"choices": {
|
||||
"role": "ai",
|
||||
"content": v,
|
||||
"usage_metadata": usage_metadata,
|
||||
"file_list": file_list,
|
||||
},
|
||||
}
|
||||
elif isinstance(v, list):
|
||||
data = {}
|
||||
if len(v) > 0 and isinstance(v[0], dict):
|
||||
# rename text to content
|
||||
v = replace_text_with_content(data=v)
|
||||
if field_name == "inputs":
|
||||
data = {
|
||||
"messages": [
|
||||
dict(msg, **{"usage_metadata": usage_metadata, "file_list": file_list}) for msg in v
|
||||
]
|
||||
if isinstance(v, list)
|
||||
else v,
|
||||
}
|
||||
elif field_name == "outputs":
|
||||
data = {
|
||||
"choices": {
|
||||
"role": "ai",
|
||||
"content": v,
|
||||
"usage_metadata": usage_metadata,
|
||||
"file_list": file_list,
|
||||
},
|
||||
}
|
||||
return data
|
||||
else:
|
||||
return {
|
||||
"choices": {
|
||||
"role": "ai" if field_name == "outputs" else "user",
|
||||
"content": str(v),
|
||||
"usage_metadata": usage_metadata,
|
||||
"file_list": file_list,
|
||||
},
|
||||
}
|
||||
if isinstance(v, dict):
|
||||
v["usage_metadata"] = usage_metadata
|
||||
v["file_list"] = file_list
|
||||
return v
|
||||
return v
|
420
api/core/ops/weave_trace/weave_trace.py
Normal file
420
api/core/ops/weave_trace/weave_trace.py
Normal file
@@ -0,0 +1,420 @@
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import uuid
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Any, Optional, cast
|
||||
|
||||
import wandb
|
||||
import weave
|
||||
|
||||
from core.ops.base_trace_instance import BaseTraceInstance
|
||||
from core.ops.entities.config_entity import WeaveConfig
|
||||
from core.ops.entities.trace_entity import (
|
||||
BaseTraceInfo,
|
||||
DatasetRetrievalTraceInfo,
|
||||
GenerateNameTraceInfo,
|
||||
MessageTraceInfo,
|
||||
ModerationTraceInfo,
|
||||
SuggestedQuestionTraceInfo,
|
||||
ToolTraceInfo,
|
||||
TraceTaskName,
|
||||
WorkflowTraceInfo,
|
||||
)
|
||||
from core.ops.weave_trace.entities.weave_trace_entity import WeaveTraceModel
|
||||
from extensions.ext_database import db
|
||||
from models.model import EndUser, MessageFile
|
||||
from models.workflow import WorkflowNodeExecution
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class WeaveDataTrace(BaseTraceInstance):
|
||||
def __init__(
|
||||
self,
|
||||
weave_config: WeaveConfig,
|
||||
):
|
||||
super().__init__(weave_config)
|
||||
self.weave_api_key = weave_config.api_key
|
||||
self.project_name = weave_config.project
|
||||
self.entity = weave_config.entity
|
||||
|
||||
# Login with API key first
|
||||
login_status = wandb.login(key=self.weave_api_key, verify=True, relogin=True)
|
||||
if not login_status:
|
||||
logger.error("Failed to login to Weights & Biases with the provided API key")
|
||||
raise ValueError("Weave login failed")
|
||||
|
||||
# Then initialize weave client
|
||||
self.weave_client = weave.init(
|
||||
project_name=(f"{self.entity}/{self.project_name}" if self.entity else self.project_name)
|
||||
)
|
||||
self.file_base_url = os.getenv("FILES_URL", "http://127.0.0.1:5001")
|
||||
self.calls: dict[str, Any] = {}
|
||||
|
||||
def get_project_url(
|
||||
self,
|
||||
):
|
||||
try:
|
||||
project_url = f"https://wandb.ai/{self.weave_client._project_id()}"
|
||||
return project_url
|
||||
except Exception as e:
|
||||
logger.debug(f"Weave get run url failed: {str(e)}")
|
||||
raise ValueError(f"Weave get run url failed: {str(e)}")
|
||||
|
||||
def trace(self, trace_info: BaseTraceInfo):
|
||||
logger.debug(f"Trace info: {trace_info}")
|
||||
if isinstance(trace_info, WorkflowTraceInfo):
|
||||
self.workflow_trace(trace_info)
|
||||
if isinstance(trace_info, MessageTraceInfo):
|
||||
self.message_trace(trace_info)
|
||||
if isinstance(trace_info, ModerationTraceInfo):
|
||||
self.moderation_trace(trace_info)
|
||||
if isinstance(trace_info, SuggestedQuestionTraceInfo):
|
||||
self.suggested_question_trace(trace_info)
|
||||
if isinstance(trace_info, DatasetRetrievalTraceInfo):
|
||||
self.dataset_retrieval_trace(trace_info)
|
||||
if isinstance(trace_info, ToolTraceInfo):
|
||||
self.tool_trace(trace_info)
|
||||
if isinstance(trace_info, GenerateNameTraceInfo):
|
||||
self.generate_name_trace(trace_info)
|
||||
|
||||
def workflow_trace(self, trace_info: WorkflowTraceInfo):
|
||||
trace_id = trace_info.message_id or trace_info.workflow_run_id
|
||||
if trace_info.start_time is None:
|
||||
trace_info.start_time = datetime.now()
|
||||
|
||||
if trace_info.message_id:
|
||||
message_attributes = trace_info.metadata
|
||||
message_attributes["workflow_app_log_id"] = trace_info.workflow_app_log_id
|
||||
|
||||
message_attributes["message_id"] = trace_info.message_id
|
||||
message_attributes["workflow_run_id"] = trace_info.workflow_run_id
|
||||
message_attributes["trace_id"] = trace_id
|
||||
message_attributes["start_time"] = trace_info.start_time
|
||||
message_attributes["end_time"] = trace_info.end_time
|
||||
message_attributes["tags"] = ["message", "workflow"]
|
||||
|
||||
message_run = WeaveTraceModel(
|
||||
id=trace_info.message_id,
|
||||
op=str(TraceTaskName.MESSAGE_TRACE.value),
|
||||
inputs=dict(trace_info.workflow_run_inputs),
|
||||
outputs=dict(trace_info.workflow_run_outputs),
|
||||
total_tokens=trace_info.total_tokens,
|
||||
attributes=message_attributes,
|
||||
exception=trace_info.error,
|
||||
file_list=[],
|
||||
)
|
||||
self.start_call(message_run, parent_run_id=trace_info.workflow_run_id)
|
||||
self.finish_call(message_run)
|
||||
|
||||
workflow_attributes = trace_info.metadata
|
||||
workflow_attributes["workflow_run_id"] = trace_info.workflow_run_id
|
||||
workflow_attributes["trace_id"] = trace_id
|
||||
workflow_attributes["start_time"] = trace_info.start_time
|
||||
workflow_attributes["end_time"] = trace_info.end_time
|
||||
workflow_attributes["tags"] = ["workflow"]
|
||||
|
||||
workflow_run = WeaveTraceModel(
|
||||
file_list=trace_info.file_list,
|
||||
total_tokens=trace_info.total_tokens,
|
||||
id=trace_info.workflow_run_id,
|
||||
op=str(TraceTaskName.WORKFLOW_TRACE.value),
|
||||
inputs=dict(trace_info.workflow_run_inputs),
|
||||
outputs=dict(trace_info.workflow_run_outputs),
|
||||
attributes=workflow_attributes,
|
||||
exception=trace_info.error,
|
||||
)
|
||||
|
||||
self.start_call(workflow_run, parent_run_id=trace_info.message_id)
|
||||
|
||||
# through workflow_run_id get all_nodes_execution
|
||||
workflow_nodes_execution_id_records = (
|
||||
db.session.query(WorkflowNodeExecution.id)
|
||||
.filter(WorkflowNodeExecution.workflow_run_id == trace_info.workflow_run_id)
|
||||
.all()
|
||||
)
|
||||
|
||||
for node_execution_id_record in workflow_nodes_execution_id_records:
|
||||
node_execution = (
|
||||
db.session.query(
|
||||
WorkflowNodeExecution.id,
|
||||
WorkflowNodeExecution.tenant_id,
|
||||
WorkflowNodeExecution.app_id,
|
||||
WorkflowNodeExecution.title,
|
||||
WorkflowNodeExecution.node_type,
|
||||
WorkflowNodeExecution.status,
|
||||
WorkflowNodeExecution.inputs,
|
||||
WorkflowNodeExecution.outputs,
|
||||
WorkflowNodeExecution.created_at,
|
||||
WorkflowNodeExecution.elapsed_time,
|
||||
WorkflowNodeExecution.process_data,
|
||||
WorkflowNodeExecution.execution_metadata,
|
||||
)
|
||||
.filter(WorkflowNodeExecution.id == node_execution_id_record.id)
|
||||
.first()
|
||||
)
|
||||
|
||||
if not node_execution:
|
||||
continue
|
||||
|
||||
node_execution_id = node_execution.id
|
||||
tenant_id = node_execution.tenant_id
|
||||
app_id = node_execution.app_id
|
||||
node_name = node_execution.title
|
||||
node_type = node_execution.node_type
|
||||
status = node_execution.status
|
||||
if node_type == "llm":
|
||||
inputs = (
|
||||
json.loads(node_execution.process_data).get("prompts", {}) if node_execution.process_data else {}
|
||||
)
|
||||
else:
|
||||
inputs = json.loads(node_execution.inputs) if node_execution.inputs else {}
|
||||
outputs = json.loads(node_execution.outputs) if node_execution.outputs else {}
|
||||
created_at = node_execution.created_at or datetime.now()
|
||||
elapsed_time = node_execution.elapsed_time
|
||||
finished_at = created_at + timedelta(seconds=elapsed_time)
|
||||
|
||||
execution_metadata = (
|
||||
json.loads(node_execution.execution_metadata) if node_execution.execution_metadata else {}
|
||||
)
|
||||
node_total_tokens = execution_metadata.get("total_tokens", 0)
|
||||
attributes = execution_metadata.copy()
|
||||
attributes.update(
|
||||
{
|
||||
"workflow_run_id": trace_info.workflow_run_id,
|
||||
"node_execution_id": node_execution_id,
|
||||
"tenant_id": tenant_id,
|
||||
"app_id": app_id,
|
||||
"app_name": node_name,
|
||||
"node_type": node_type,
|
||||
"status": status,
|
||||
}
|
||||
)
|
||||
|
||||
process_data = json.loads(node_execution.process_data) if node_execution.process_data else {}
|
||||
if process_data and process_data.get("model_mode") == "chat":
|
||||
attributes.update(
|
||||
{
|
||||
"ls_provider": process_data.get("model_provider", ""),
|
||||
"ls_model_name": process_data.get("model_name", ""),
|
||||
}
|
||||
)
|
||||
attributes["tags"] = ["node_execution"]
|
||||
attributes["start_time"] = created_at
|
||||
attributes["end_time"] = finished_at
|
||||
attributes["elapsed_time"] = elapsed_time
|
||||
attributes["workflow_run_id"] = trace_info.workflow_run_id
|
||||
attributes["trace_id"] = trace_id
|
||||
node_run = WeaveTraceModel(
|
||||
total_tokens=node_total_tokens,
|
||||
op=node_type,
|
||||
inputs=inputs,
|
||||
outputs=outputs,
|
||||
file_list=trace_info.file_list,
|
||||
attributes=attributes,
|
||||
id=node_execution_id,
|
||||
exception=None,
|
||||
)
|
||||
|
||||
self.start_call(node_run, parent_run_id=trace_info.workflow_run_id)
|
||||
self.finish_call(node_run)
|
||||
|
||||
self.finish_call(workflow_run)
|
||||
|
||||
def message_trace(self, trace_info: MessageTraceInfo):
|
||||
# get message file data
|
||||
file_list = cast(list[str], trace_info.file_list) or []
|
||||
message_file_data: Optional[MessageFile] = trace_info.message_file_data
|
||||
file_url = f"{self.file_base_url}/{message_file_data.url}" if message_file_data else ""
|
||||
file_list.append(file_url)
|
||||
attributes = trace_info.metadata
|
||||
message_data = trace_info.message_data
|
||||
if message_data is None:
|
||||
return
|
||||
message_id = message_data.id
|
||||
|
||||
user_id = message_data.from_account_id
|
||||
attributes["user_id"] = user_id
|
||||
|
||||
if message_data.from_end_user_id:
|
||||
end_user_data: Optional[EndUser] = (
|
||||
db.session.query(EndUser).filter(EndUser.id == message_data.from_end_user_id).first()
|
||||
)
|
||||
if end_user_data is not None:
|
||||
end_user_id = end_user_data.session_id
|
||||
attributes["end_user_id"] = end_user_id
|
||||
|
||||
attributes["message_id"] = message_id
|
||||
attributes["start_time"] = trace_info.start_time
|
||||
attributes["end_time"] = trace_info.end_time
|
||||
attributes["tags"] = ["message", str(trace_info.conversation_mode)]
|
||||
message_run = WeaveTraceModel(
|
||||
id=message_id,
|
||||
op=str(TraceTaskName.MESSAGE_TRACE.value),
|
||||
input_tokens=trace_info.message_tokens,
|
||||
output_tokens=trace_info.answer_tokens,
|
||||
total_tokens=trace_info.total_tokens,
|
||||
inputs=trace_info.inputs,
|
||||
outputs=trace_info.outputs,
|
||||
exception=trace_info.error,
|
||||
file_list=file_list,
|
||||
attributes=attributes,
|
||||
)
|
||||
self.start_call(message_run)
|
||||
|
||||
# create llm run parented to message run
|
||||
llm_run = WeaveTraceModel(
|
||||
id=str(uuid.uuid4()),
|
||||
input_tokens=trace_info.message_tokens,
|
||||
output_tokens=trace_info.answer_tokens,
|
||||
total_tokens=trace_info.total_tokens,
|
||||
op="llm",
|
||||
inputs=trace_info.inputs,
|
||||
outputs=trace_info.outputs,
|
||||
attributes=attributes,
|
||||
file_list=[],
|
||||
exception=None,
|
||||
)
|
||||
self.start_call(
|
||||
llm_run,
|
||||
parent_run_id=message_id,
|
||||
)
|
||||
self.finish_call(llm_run)
|
||||
self.finish_call(message_run)
|
||||
|
||||
def moderation_trace(self, trace_info: ModerationTraceInfo):
|
||||
if trace_info.message_data is None:
|
||||
return
|
||||
|
||||
attributes = trace_info.metadata
|
||||
attributes["tags"] = ["moderation"]
|
||||
attributes["message_id"] = trace_info.message_id
|
||||
attributes["start_time"] = trace_info.start_time or trace_info.message_data.created_at
|
||||
attributes["end_time"] = trace_info.end_time or trace_info.message_data.updated_at
|
||||
|
||||
moderation_run = WeaveTraceModel(
|
||||
id=str(uuid.uuid4()),
|
||||
op=str(TraceTaskName.MODERATION_TRACE.value),
|
||||
inputs=trace_info.inputs,
|
||||
outputs={
|
||||
"action": trace_info.action,
|
||||
"flagged": trace_info.flagged,
|
||||
"preset_response": trace_info.preset_response,
|
||||
"inputs": trace_info.inputs,
|
||||
},
|
||||
attributes=attributes,
|
||||
exception=getattr(trace_info, "error", None),
|
||||
file_list=[],
|
||||
)
|
||||
self.start_call(moderation_run, parent_run_id=trace_info.message_id)
|
||||
self.finish_call(moderation_run)
|
||||
|
||||
def suggested_question_trace(self, trace_info: SuggestedQuestionTraceInfo):
|
||||
message_data = trace_info.message_data
|
||||
if message_data is None:
|
||||
return
|
||||
attributes = trace_info.metadata
|
||||
attributes["message_id"] = trace_info.message_id
|
||||
attributes["tags"] = ["suggested_question"]
|
||||
attributes["start_time"] = (trace_info.start_time or message_data.created_at,)
|
||||
attributes["end_time"] = (trace_info.end_time or message_data.updated_at,)
|
||||
|
||||
suggested_question_run = WeaveTraceModel(
|
||||
id=str(uuid.uuid4()),
|
||||
op=str(TraceTaskName.SUGGESTED_QUESTION_TRACE.value),
|
||||
inputs=trace_info.inputs,
|
||||
outputs=trace_info.suggested_question,
|
||||
attributes=attributes,
|
||||
exception=trace_info.error,
|
||||
file_list=[],
|
||||
)
|
||||
|
||||
self.start_call(suggested_question_run, parent_run_id=trace_info.message_id)
|
||||
self.finish_call(suggested_question_run)
|
||||
|
||||
def dataset_retrieval_trace(self, trace_info: DatasetRetrievalTraceInfo):
|
||||
if trace_info.message_data is None:
|
||||
return
|
||||
attributes = trace_info.metadata
|
||||
attributes["message_id"] = trace_info.message_id
|
||||
attributes["tags"] = ["dataset_retrieval"]
|
||||
attributes["start_time"] = (trace_info.start_time or trace_info.message_data.created_at,)
|
||||
attributes["end_time"] = (trace_info.end_time or trace_info.message_data.updated_at,)
|
||||
|
||||
dataset_retrieval_run = WeaveTraceModel(
|
||||
id=str(uuid.uuid4()),
|
||||
op=str(TraceTaskName.DATASET_RETRIEVAL_TRACE.value),
|
||||
inputs=trace_info.inputs,
|
||||
outputs={"documents": trace_info.documents},
|
||||
attributes=attributes,
|
||||
exception=getattr(trace_info, "error", None),
|
||||
file_list=[],
|
||||
)
|
||||
|
||||
self.start_call(dataset_retrieval_run, parent_run_id=trace_info.message_id)
|
||||
self.finish_call(dataset_retrieval_run)
|
||||
|
||||
def tool_trace(self, trace_info: ToolTraceInfo):
|
||||
attributes = trace_info.metadata
|
||||
attributes["tags"] = ["tool", trace_info.tool_name]
|
||||
attributes["start_time"] = trace_info.start_time
|
||||
attributes["end_time"] = trace_info.end_time
|
||||
|
||||
tool_run = WeaveTraceModel(
|
||||
id=str(uuid.uuid4()),
|
||||
op=trace_info.tool_name,
|
||||
inputs=trace_info.tool_inputs,
|
||||
outputs=trace_info.tool_outputs,
|
||||
file_list=[cast(str, trace_info.file_url)] if trace_info.file_url else [],
|
||||
attributes=attributes,
|
||||
exception=trace_info.error,
|
||||
)
|
||||
message_id = trace_info.message_id or getattr(trace_info, "conversation_id", None)
|
||||
message_id = message_id or None
|
||||
self.start_call(tool_run, parent_run_id=message_id)
|
||||
self.finish_call(tool_run)
|
||||
|
||||
def generate_name_trace(self, trace_info: GenerateNameTraceInfo):
|
||||
attributes = trace_info.metadata
|
||||
attributes["tags"] = ["generate_name"]
|
||||
attributes["start_time"] = trace_info.start_time
|
||||
attributes["end_time"] = trace_info.end_time
|
||||
|
||||
name_run = WeaveTraceModel(
|
||||
id=str(uuid.uuid4()),
|
||||
op=str(TraceTaskName.GENERATE_NAME_TRACE.value),
|
||||
inputs=trace_info.inputs,
|
||||
outputs=trace_info.outputs,
|
||||
attributes=attributes,
|
||||
exception=getattr(trace_info, "error", None),
|
||||
file_list=[],
|
||||
)
|
||||
|
||||
self.start_call(name_run)
|
||||
self.finish_call(name_run)
|
||||
|
||||
def api_check(self):
|
||||
try:
|
||||
login_status = wandb.login(key=self.weave_api_key, verify=True, relogin=True)
|
||||
if not login_status:
|
||||
raise ValueError("Weave login failed")
|
||||
else:
|
||||
print("Weave login successful")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.debug(f"Weave API check failed: {str(e)}")
|
||||
raise ValueError(f"Weave API check failed: {str(e)}")
|
||||
|
||||
def start_call(self, run_data: WeaveTraceModel, parent_run_id: Optional[str] = None):
|
||||
call = self.weave_client.create_call(op=run_data.op, inputs=run_data.inputs, attributes=run_data.attributes)
|
||||
self.calls[run_data.id] = call
|
||||
if parent_run_id:
|
||||
self.calls[run_data.id].parent_id = parent_run_id
|
||||
|
||||
def finish_call(self, run_data: WeaveTraceModel):
|
||||
call = self.calls.get(run_data.id)
|
||||
if call:
|
||||
self.weave_client.finish_call(call=call, output=run_data.outputs, exception=run_data.exception)
|
||||
else:
|
||||
raise ValueError(f"Call with id {run_data.id} not found")
|
Reference in New Issue
Block a user