GraphFrames guia do usuário - Scala

Este artigo demonstra exemplos do guia GraphFrames User.

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._

Criando GraphFrames

Você pode criar GraphFrames a partir de DataFrames de vértice e borda.

  • Vertex DataFrame: Um vértice DataFrame deve conter uma coluna especial chamada id que especifica IDs exclusivos para cada vértice no gráfico.

  • Edge DataFrame: Um Edge DataFrame deve conter duas colunas especiais: src (ID do vértice de origem da borda) e dst (ID do vértice de destino da borda).

Ambos os DataFrames podem ter outras colunas arbitrárias. Essas colunas podem representar atributos de vértice e borda.

Crie os vértices e arestas

// Vertex DataFrame
val v = spark.createDataFrame(List(
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
  ("d", "David", 29),
  ("e", "Esther", 32),
  ("f", "Fanny", 36),
  ("g", "Gabby", 60)
)).toDF("id", "name", "age")
// Edge DataFrame
val e = spark.createDataFrame(List(
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
  ("f", "c", "follow"),
  ("e", "f", "follow"),
  ("e", "d", "friend"),
  ("d", "a", "friend"),
  ("a", "e", "friend")
)).toDF("src", "dst", "relationship")

Vamos criar um gráfico a partir desses vértices e dessas arestas:

val g = GraphFrame(v, e)
// This example graph also comes with the GraphFrames package.
// val g = examples.Graphs.friends

Gráfico básico e queryDataFrame

GraphFrames fornecem query gráfica simples, como grau de nó.

Além disso, como os GraphFrames representam gráficos como pares de DataFrames de vértice e borda, é fácil fazer query poderosas diretamente nos DataFrames de vértice e borda. Esses DataFrames estão disponíveis como campos de vértices e arestas no GraphFrame.

display(g.vertices)
display(g.edges)

O grau de entrada dos vértices:

display(g.inDegrees)

O grau de saída dos vértices:

display(g.outDegrees)

O grau dos vértices:

display(g.degrees)

Você pode executar query diretamente nos vértices DataFrame. Por exemplo, podemos encontrar a idade da pessoa mais jovem no gráfico:

val youngest = g.vertices.groupBy().min("age")
display(youngest)

Da mesma forma, você pode executar query nas bordas DataFrame. Por exemplo, vamos contar o número de relacionamentos 'follow' no gráfico:

val numFollows = g.edges.filter("relationship = 'follow'").count()

descoberta de motivos

Construa relações mais complexas envolvendo arestas e vértices usando motivos. A célula a seguir encontra os pares de vértices com arestas em ambas as direções entre eles. O resultado é um DataFrame, no qual os nomes das colunas são motif key.

Confira o guia do usuário do GraphFrames para obter mais detalhes sobre a API.

// Search for pairs of vertices with edges in both directions between them.
val motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
display(motifs)

Como o resultado é um DataFrame, você pode criar query mais complexas sobre o motivo. Vamos encontrar todos os relacionamentos recíprocos em que uma pessoa tem mais de 30 anos:

val filtered = motifs.filter("b.age > 30")
display(filtered)

queryde estado

A maioria query de motivos não tem estado e é simples de expressar, como nos exemplos acima. Os próximos exemplos demonstram query mais complexas que carregam o estado ao longo de um caminho no motivo. Expresse essas query combinando a localização do motivo GraphFrames com filtros no resultado, onde os filtros usam operações de sequência para construir uma série de colunas DataFrame.

Por exemplo, suponha que você queira identificar uma cadeia de 4 vértices com alguma propriedade definida por uma sequência de funções. Ou seja, entre cadeias de 4 vértices a->b->c->d, identifique o subconjunto de cadeias correspondentes a este filtro complexo:

  • Inicialize o estado no caminho.

  • Estado de atualização com base no vértice a.

  • Estado de atualização com base no vértice b.

  • Etc. para c e d.

  • Se o estado final corresponder a alguma condição, o filtro aceita a cadeia.

Os trechos de código a seguir demonstram esse processo, onde identificamos cadeias de 4 vértices de forma que pelo menos 2 das 3 arestas sejam relacionamentos “amigos”. Neste exemplo, o estado é a contagem atual de arestas “amigas”; em geral, pode ser qualquer coluna DataFrame.

// Find chains of 4 vertices.
val chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")

// Query on sequence, with state (cnt)
//  (a) Define method for updating state given the next element of the motif.
def sumFriends(cnt: Column, relationship: Column): Column = {
  when(relationship === "friend", cnt + 1).otherwise(cnt)
}
//  (b) Use sequence operation to apply method to sequence of elements in motif.
//      In this case, the elements are the 3 edges.
val condition = Seq("ab", "bc", "cd").
  foldLeft(lit(0))((cnt, e) => sumFriends(cnt, col(e)("relationship")))
//  (c) Apply filter to DataFrame.
val chainWith2Friends2 = chain4.where(condition >= 2)
display(chainWith2Friends2)

Subgráficos

GraphFrames fornece APIs para construir subgrafos filtrando em arestas e vértices. Esses filtros podem ser compostos juntos. Por exemplo, o subgrafo a seguir contém apenas pessoas que são amigas e têm mais de 30 anos.

// Select subgraph of users older than 30, and edges of type "friend"
val g2 = g
  .filterEdges("relationship = 'friend'")
  .filterVertices("age > 30")
  .dropIsolatedVertices()

Filtros tripletos complexos

O exemplo a seguir mostra como selecionar um subgrafo baseado em filtros tripletos que operam em uma aresta e seus vértices “src” e “dst”. Estender este exemplo para ir além dos trigêmeos usando motivos mais complexos é simples.

// Select subgraph based on edges "e" of type "follow"
// pointing from a younger user "a" to an older user "b".
val paths = g.find("(a)-[e]->(b)")
  .filter("e.relationship = 'follow'")
  .filter("a.age < b.age")
// "paths" contains vertex info. Extract the edges.
val e2 = paths.select("e.src", "e.dst", "e.relationship")
// In Spark 1.5+, the user may simplify this call:
//  val e2 = paths.select("e.*")

// Construct the subgraph
val g2 = GraphFrame(g.vertices, e2)
display(g2.vertices)
display(g2.edges)

Algoritmos de gráfico padrão

Esta seção descreve os algoritmos de gráfico padrão incorporados ao GraphFrames.

Pesquisa em largura (BFS)

Pesquise em “Esther” para usuários com menos de 32 anos.

val paths: DataFrame = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32").run()
display(paths)

A pesquisa também pode limitar filtros de borda e comprimentos máximos de caminho.

val filteredPaths = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32")
  .edgeFilter("relationship != 'friend'")
  .maxPathLength(3)
  .run()
display(filteredPaths)

Componentes conectados

compute a associação de componentes conectados de cada vértice e retorne um gráfico com cada vértice atribuído a um ID de componente.

val result = g.connectedComponents.run() // doesn't work on Spark 1.4
display(result)

Componentes fortemente conectados

compute o componente fortemente conectado (SCC) de cada vértice e retorne um gráfico com cada vértice atribuído ao SCC que contém esse vértice.

val result = g.stronglyConnectedComponents.maxIter(10).run()
display(result.orderBy("component"))

Propagação de rótulos

execução static label Algoritmo de propagação para detecção de comunidades em redes.

Cada nó da rede é inicialmente atribuído à sua própria comunidade. A cada superpasso, os nós enviam sua afiliação à comunidade para todos os vizinhos e atualizam seu estado para o modo de afiliação à comunidade das mensagens recebidas.

LPA é um algoritmo de detecção de comunidade padrão para gráficos. É barato computacionalmente, embora (1) a convergência não seja garantida e (2) pode-se acabar com soluções triviais (todos os nós se identificam em uma única comunidade).

val result = g.labelPropagation.maxIter(5).run()
display(result.orderBy("label"))

Ranking da página

Identifique vértices importantes em um gráfico com base em conexões.

// Run PageRank until convergence to tolerance "tol".
val results = g.pageRank.resetProbability(0.15).tol(0.01).run()
display(results.vertices)
display(results.edges)
// Run PageRank for a fixed number of iterations.
val results2 = g.pageRank.resetProbability(0.15).maxIter(10).run()
display(results2.vertices)
// Run PageRank personalized for vertex "a"
val results3 = g.pageRank.resetProbability(0.15).maxIter(10).sourceId("a").run()
display(results3.vertices)

caminhos mais curtos

compute os caminhos mais curtos para o conjunto fornecido de vértices de marco, onde os marcos são especificados por ID de vértice.

val paths = g.shortestPaths.landmarks(Seq("a", "d")).run()
display(paths)

Contagem de triângulos

compute o número de triângulos que passam por cada vértice.

import org.graphframes.examples
val g: GraphFrame = examples.Graphs.friends  // get example graph

val results = g.triangleCount.run()
results.select("id", "count").show()