in doc-architect/doc-architect-core/src/main/java/com/docarchitect/core/scanner/impl/python/SqlAlchemyScanner.java [250:363]
/**
 * Builds entity results for the parsed classes of a single Python source file.
 *
 * <p>The raw file text is read once up front; if that read fails with an
 * {@link IOException} a warning is logged and processing continues with
 * {@code null} content. Each class accepted by {@code isDatabaseTable} is then
 * walked field by field:
 * <ul>
 *   <li>fields named {@code TABLENAME_FIELD_NAME} or starting with {@code _} are ignored;</li>
 *   <li>fields whose value contains a relationship call are turned into a
 *       {@link Relationship} via {@code extractRelationship};</li>
 *   <li>every other field becomes a {@code DataEntity.Field}, and a
 *       {@code DEPENDS_ON} relationship is added as well when a foreign key
 *       target can be resolved from its value.</li>
 * </ul>
 * Any exception thrown while handling one field is logged as a warning and the
 * remainder of that field's processing is skipped. A class that ends up with no
 * column fields produces no result.
 *
 * @param file    path of the Python file the classes were parsed from
 * @param classes classes parsed from {@code file}
 * @return one {@code EntityResult} per accepted class that has at least one column field
 */
private List<EntityResult> extractEntitiesFromAST(Path file, List<PythonAst.PythonClass> classes) {
    List<EntityResult> results = new ArrayList<>();

    // Raw text is needed for __tablename__ extraction (AST parser skips dunder fields).
    String fileContent;
    try {
        fileContent = readFileContent(file);
    } catch (IOException e) {
        log.warn("Failed to read file content for tablename extraction: {} - {}", file, e.getMessage());
        fileContent = null;
    }

    for (PythonAst.PythonClass pythonClass : classes) {
        // Classes that do not represent a database table are ignored entirely.
        if (!isDatabaseTable(pythonClass, fileContent)) {
            continue;
        }

        String tableName = extractTableName(pythonClass, fileContent);

        List<DataEntity.Field> columns = new ArrayList<>();
        List<Relationship> links = new ArrayList<>();
        String primaryKey = null;

        // Restrict processing to fields declared on this class itself, not inherited ones.
        List<PythonAst.Field> ownFields = getFieldsDefinedInClass(pythonClass, fileContent);
        log.debug("Class {} has {} fields defined", pythonClass.name(), ownFields.size());

        for (PythonAst.Field field : ownFields) {
            try {
                String fieldName = field.name();
                log.debug("Processing field: {}.{}", pythonClass.name(), fieldName);

                // Dunder and private fields never become columns or relationships.
                if (fieldName.equals(TABLENAME_FIELD_NAME) || fieldName.startsWith("_")) {
                    continue;
                }

                // A relationship field is one assigned from relationship(...) or Relationship(...).
                boolean relationshipField = false;
                if (field.value() != null) {
                    relationshipField = field.value().contains(RELATIONSHIP_FUNCTION_NAME + "(")
                        || field.value().contains("Relationship(");
                }

                if (relationshipField) {
                    Relationship rel = extractRelationship(pythonClass.name(), field);
                    if (rel == null) {
                        continue;
                    }
                    links.add(rel);
                    log.debug("Found SQLAlchemy relationship: {} -> {}", pythonClass.name(), rel.targetId());
                    continue;
                }

                // Everything else is treated as a regular column.
                String sqlType = extractColumnType(field);
                boolean nullable = isNullable(field.value());
                boolean primaryKeyColumn = isPrimaryKey(field.value());

                // A foreign key declared on the column also yields a dependency relationship.
                String foreignKeyRef = extractForeignKey(field.value());
                log.debug("Field {}.{}: value='{}', foreignKeyRef='{}'",
                    pythonClass.name(), fieldName, field.value(), foreignKeyRef);

                if (foreignKeyRef != null) {
                    String targetEntity = extractTargetEntityFromForeignKey(foreignKeyRef);
                    log.debug("Extracted target entity from FK '{}': '{}'", foreignKeyRef, targetEntity);
                    if (targetEntity != null) {
                        links.add(new Relationship(
                            pythonClass.name(),
                            targetEntity,
                            RelationshipType.DEPENDS_ON,
                            "Foreign key reference",
                            SQLALCHEMY_TECHNOLOGY
                        ));
                        log.debug("Found foreign key: {} -> {}", pythonClass.name(), targetEntity);
                    }
                }

                columns.add(new DataEntity.Field(fieldName, sqlType, nullable, null));

                // Only the first column flagged as primary key is recorded.
                if (primaryKey == null && primaryKeyColumn) {
                    primaryKey = fieldName;
                }

                log.debug("Found SQLAlchemy field: {}.{} ({})", pythonClass.name(), fieldName, sqlType);
            } catch (Exception e) {
                log.warn("Error processing field {}.{}: {}", pythonClass.name(), field.name(), e.getMessage());
            }
        }

        // A class without any column fields is not reported.
        if (columns.isEmpty()) {
            continue;
        }

        results.add(new EntityResult(
            new DataEntity(
                pythonClass.name(),
                tableName,
                TABLE_TYPE,
                columns,
                primaryKey,
                SQLALCHEMY_MODEL_PREFIX + pythonClass.name()
            ),
            links
        ));
        log.debug("Found SQLAlchemy entity: {} -> table: {}", pythonClass.name(), tableName);
    }

    return results;
}