"Where'd our candidate pipeline go?!?!"

a small-scale research project involving data collection

On a bike ride with the Missoula 55-Plus group (I'm 50, but my wife is a bit older, so I get invited to these things 😊 ), I met Mark, who runs a research laboratory here in town. His employer productizes lab tests for infectious diseases into field test kits, similar to COVID test kits, so that fewer people in poor countries die needlessly for want of testing. At some point in our chat, he mentioned that his company's hiring pipeline almost completely dried up during and after the pandemic. Pre-pandemic, they would publish an ad in places where recent biochemistry program graduates would see it and very reliably get over 100 applicants; post-pandemic, the same ad in the same venues would get closer to 10. I asked if anybody there had done any research to figure out why. They hadn't. This struck me as a pretty interesting small-scale research project, so I spent a few hours seeing what I could turn up on my own. To sample a subset of the larger population of interest (recent biochemistry graduates), I looked at a California school (Mark's company is headquartered in Los Angeles) and chose U.C. Davis.

LinkedIn Sales Navigator (LISN) makes it easy to search for U.C. Davis graduates, but it has no filter for when they graduated. However, this page, which you don't even need a special LinkedIn subscription to view, does provide what I wanted: https://www.linkedin.com/school/uc-davis/people/?educationEndYear=2022&educationStartYear=2018&keywords=Biochemistry. It allows a keyword search for the program and an actual filter for start and end years, which is good enough for what I'm trying to do here.
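
For reference, that page is just the school's /people/ URL with three query parameters, so building one for any school, program keyword, and year range is easy. A tiny helper (hypothetical, just to make the URL structure explicit; the function name is mine):

from urllib.parse import urlencode

def school_people_url(school_slug, keyword, start_year, end_year):
    # Builds a URL like the U.C. Davis example above
    params = {
        "educationStartYear": start_year,
        "educationEndYear": end_year,
        "keywords": keyword,
    }
    return f"https://www.linkedin.com/school/{school_slug}/people/?{urlencode(params)}"

# Reproduces the search used in this post
print(school_people_url("uc-davis", "Biochemistry", 2018, 2022))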

That page, plus a Browserflow script to get through the infinite scroll, plus this Claude-generated Tampermonkey script, yielded 912 LinkedIn profile URLs.

// ==UserScript==
// @name         Extract University Graduates
// @namespace    http://tampermonkey.net/
// @version      1.3
// @description  Extracts LinkedIn profile URLs from this sort of LinkedIn search results page: https://www.linkedin.com/school/uc-davis/people/?educationEndYear=2022&educationStartYear=2018&keywords=Biochemistry
// @author       You
// @match        *://www.linkedin.com/*
// @grant        none
// ==/UserScript==
(function() {
  'use strict';
  function createButton() {
      // Remove ALL possible duplicate buttons
      const existingButtons = document.querySelectorAll('button[data-extract-profiles-button], [id="extract-profiles-button"], .extract-profiles-button');
      existingButtons.forEach(button => button.remove());

      // Also remove any buttons that match our text content
      document.querySelectorAll('button').forEach(button => {
          if (button.textContent === 'Extract LinkedIn Profiles') {
              button.remove();
          }
      });

      const button = document.createElement('button');
      // Add multiple identifiers to ensure we can find it later
      button.setAttribute('data-extract-profiles-button', 'true');
      button.id = 'extract-profiles-button';
      button.className = 'extract-profiles-button';
      button.textContent = 'Extract LinkedIn Profiles';

      // Enhanced styling
      Object.assign(button.style, {
          position: 'fixed',
          bottom: '20px',
          right: '20px',
          zIndex: '2147483647', // Maximum possible z-index
          padding: '30px',
          fontSize: '18px',
          backgroundColor: '#39FF14',
          color: 'black',
          border: 'none',
          borderRadius: '5px',
          cursor: 'pointer',
          boxShadow: '0 0 10px #39FF14',
          fontFamily: 'Arial, sans-serif',
          pointerEvents: 'auto',
          display: 'block',
          margin: '0',
          textAlign: 'center',
          userSelect: 'none'
      });

      button.addEventListener('click', extractProfiles);

      // Only append to document.body if it exists and button isn't already present
      if (document.body && !document.querySelector('[data-extract-profiles-button]')) {
          document.body.appendChild(button);
      }
  }

  function extractProfiles() {
      const profileLinks = document.querySelectorAll(
          '.org-people-profile-card__profile-title a[href*="/in/"], ' +
          'a.app-aware-link[href*="/in/"], ' +
          '.profile-card__name a[href*="/in/"]'
      );
      const profileUrls = new Set();
      profileLinks.forEach(link => {
          const href = link.href;
          if (href) {
              try {
                  const url = new URL(href);
                  const match = url.pathname.match(/\/in\/[^/?]+/);
                  if (match) {
                      const cleanUrl = 'https://www.linkedin.com' + match[0];
                      profileUrls.add(cleanUrl);
                  }
              } catch (e) {
                  console.error('Error processing URL:', href, e);
              }
          }
      });
      const urlList = Array.from(profileUrls).sort();
      const resultText = urlList.join('\n');
      if (urlList.length === 0) {
          alert('No LinkedIn profiles found. Check if you are on a page with profile cards.');
          return;
      }
      navigator.clipboard.writeText(resultText)
          .then(() => {
              alert(`Copied ${urlList.length} profiles!`);
              console.log('Extracted profiles:', resultText);
          })
          .catch(err => {
              console.error('Failed to copy:', err);
              const textarea = document.createElement('textarea');
              textarea.value = resultText;
              document.body.appendChild(textarea);
              textarea.select();
              document.execCommand('copy');
              document.body.removeChild(textarea);
              alert(`Copied ${urlList.length} profiles. Check console for details.`);
              console.error('Extracted profiles:', resultText, err);
          });
  }

  // Remove any existing buttons before creating new one
  function cleanup() {
      const existingButtons = document.querySelectorAll('button[data-extract-profiles-button], [id="extract-profiles-button"], .extract-profiles-button');
      existingButtons.forEach(button => button.remove());
  }

  // Initial cleanup
  cleanup();

  // Create button with delay and ensure it runs only once
  let buttonCreated = false;
  function initializeButton() {
      if (!buttonCreated) {
          buttonCreated = true;
          createButton();
      }
  }

  // Wait for body to be available and run only once
  if (document.body) {
      setTimeout(initializeButton, 2000);
  } else {
      document.addEventListener('DOMContentLoaded', () => {
          setTimeout(initializeButton, 2000);
      });
  }
})();
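
Incidentally, the Browserflow script's only job is to keep scrolling until LinkedIn stops loading more result cards. If you want to cut that SaaS out too, a rough Playwright equivalent might look like this (a sketch I haven't battle-tested against LinkedIn's current markup; the selector and cookie handling are assumptions):

# Hypothetical sketch: drive the infinite scroll on a LinkedIn school "people" page,
# then pull out profile URLs much like the Tampermonkey script above does.
import asyncio
from playwright.async_api import async_playwright

SEARCH_URL = "https://www.linkedin.com/school/uc-davis/people/?educationEndYear=2022&educationStartYear=2018&keywords=Biochemistry"

async def scroll_to_bottom(page, max_rounds=200, settle_seconds=2):
    last_height = 0
    for _ in range(max_rounds):
        await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
        await asyncio.sleep(settle_seconds)  # give the next batch of cards time to load
        height = await page.evaluate("document.body.scrollHeight")
        if height == last_height:
            break  # nothing new loaded; assume we've reached the end
        last_height = height

async def main():
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=False)
        context = await browser.new_context()
        # You still need an authenticated session, e.g. the li_at cookie used in the scraper below
        await context.add_cookies([{"name": "li_at", "value": "YOUR_LI_AT_COOKIE",
                                    "domain": ".linkedin.com", "path": "/"}])
        page = await context.new_page()
        await page.goto(SEARCH_URL)
        await scroll_to_bottom(page)
        # Same idea as the Tampermonkey selector: any anchor pointing at /in/
        urls = await page.eval_on_selector_all(
            'a[href*="/in/"]', "els => [...new Set(els.map(e => e.href))]"
        )
        print("\n".join(urls))
        await browser.close()

if __name__ == "__main__":
    asyncio.run(main())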

The next step is enriching that list of LinkedIn profiles. I could use something like Phantombuster to do that, but I'm on a bit of a SaaS diet lately, and I always enjoy (and benefit from) seeing what it takes to accomplish what a commercial SaaS charges for, so I promptgrammed this scraper:

# this file is ./scrape_list_of_linkedin_person_profiles_2.py; coding assistant: please always include this comment in your output

import asyncio
import json
import os
import random
import re
from typing import Dict
from urllib.parse import urlparse

from logger import logger
from playwright.async_api import TimeoutError as PlaywrightTimeoutError
from playwright.async_api import async_playwright

from telegram_notification import TelegramNotification

# Configuration
with open("linkedin_profiles.txt", "r") as file:
  PROFILE_URLS = [line.strip() for line in file.readlines()]

OUTPUT_FILE = "linkedin_profiles_output.json"
SCREENSHOTS_DIR = "linkedin_screenshots"
LI_AT_COOKIE = "LINKEDIN_LI_AT_COOKIE"  # Replace with your LinkedIn li_at cookie value
HEADLESS = True  # Set to False to see the browser while scraping
PAGE_TIMEOUT = 30000  # 30 seconds timeout for loading
RETRY_DELAY = 3  # Seconds to wait between retries
MAX_RETRIES = 2  # Maximum number of retry attempts

# Configuration section
config = {
  "global_config": {
      "telegram_bot_token": os.getenv("TELEGRAM_BOT_TOKEN"),
      "telegram_chat_id": os.getenv("TELEGRAM_CHAT_ID"),
  }
}


class LinkedInScraper:
  def __init__(self, li_at_cookie=None, config=None):
      self.li_at_cookie = li_at_cookie
      self.config = config  # Add config parameter for Telegram notifications
      self.browser = None
      self.context = None
      self.page = None

  @staticmethod
  def sanitize_filename(url: str) -> str:
      """Convert URL to a valid filename"""
      parsed = urlparse(url)
      path_parts = parsed.path.strip("/").split("/")
      profile_id = path_parts[-1] if path_parts else "unknown"
      sanitized = re.sub(r'[<>:"/\\|?*]', "_", profile_id)
      return sanitized

  async def wait_for_content(self, timeout=10000):
      """Wait for the page content to be fully loaded"""
      try:
          # Wait for any content to appear
          await self.page.wait_for_selector(
              "body div",  # More general selector
              timeout=timeout,
              state="attached",
          )

          # Brief delay for any dynamic content
          await asyncio.sleep(0.5)

          return True
      except PlaywrightTimeoutError:
          logger.warning("Timeout waiting for page content")
          return False

  async def take_screenshot(self, url: str, section: str, attempt: int = 0) -> str:
      """Take a screenshot of the current page with retry logic"""
      os.makedirs(SCREENSHOTS_DIR, exist_ok=True)

      base_filename = self.sanitize_filename(url)
      filename = f"{base_filename}_{section}.png"
      filepath = os.path.join(SCREENSHOTS_DIR, filename)

      try:
          # Check if there's any content on the page
          content_loaded = await self.wait_for_content()

          if not content_loaded and attempt < MAX_RETRIES:
              logger.warning(
                  f"Content not fully loaded, retrying... (attempt {attempt + 1})"
              )
              await asyncio.sleep(RETRY_DELAY)
              return await self.take_screenshot(url, section, attempt + 1)

          # Take full page screenshot, scroll first to ensure all content is loaded
          await self.page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
          await asyncio.sleep(0.5)  # Brief pause after scrolling
          await self.page.evaluate("window.scrollTo(0, 0)")  # Scroll back to top
          await self.page.screenshot(path=filepath, full_page=True)
          logger.info(f"Saved screenshot: {filepath}")
          return filepath

      except Exception as e:
          logger.error(f"Error taking screenshot: {str(e)}")
          if attempt < MAX_RETRIES:
              logger.info(f"Retrying screenshot... (attempt {attempt + 1})")
              await asyncio.sleep(RETRY_DELAY)
              return await self.take_screenshot(url, section, attempt + 1)
          return None

  async def start_browser(self, headless=True):
      """Initialize the browser session with specific fingerprint settings"""
      self.playwright = await async_playwright().start()

      self.browser = await self.playwright.chromium.launch(
          headless=headless,
          args=[
              "--disable-dev-shm-usage",
              "--disable-blink-features=AutomationControlled",
              "--window-size=3440,1440",
          ],
      )

      # Configure context with supported fingerprint settings
      self.context = await self.browser.new_context(
          viewport={"width": 3440, "height": 1440},
          user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36",
          color_scheme="no-preference",
          reduced_motion="reduce",
          forced_colors="none",
          device_scale_factor=1,
          is_mobile=False,
          has_touch=False,
          locale="en-US",
          timezone_id="America/Denver",
          permissions=["geolocation"],
      )

      # Add fingerprint properties via JavaScript
      await self.context.add_init_script("""
          Object.defineProperty(navigator, 'deviceMemory', {
              get: () => 8
          });
          Object.defineProperty(navigator, 'hardwareConcurrency', {
              get: () => 12
          });
          Object.defineProperty(screen, 'colorDepth', {
              get: () => 24
          });
          Object.defineProperty(navigator, 'platform', {
              get: () => 'Win32'
          });
          Object.defineProperty(navigator, 'vendor', {
              get: () => 'Google Inc.'
          });
          Object.defineProperty(screen, 'width', {
              get: () => 3440
          });
          Object.defineProperty(screen, 'height', {
              get: () => 1440
          });
          Object.defineProperty(screen, 'availWidth', {
              get: () => 3440
          });
          Object.defineProperty(screen, 'availHeight', {
              get: () => 1440
          });
      """)

      await self.context.add_cookies(
          [
              {
                  "name": "li_at",
                  "value": self.li_at_cookie,
                  "domain": ".linkedin.com",
                  "path": "/",
              }
          ]
      )

      self.page = await self.context.new_page()
      self.page.on("pageerror", lambda err: logger.error(f"Page error: {err}"))
      self.page.on(
          "console", lambda msg: logger.debug(f"Console {msg.type}: {msg.text}")
      )

  async def close_browser(self):
      """Close all browser resources"""
      if self.page:
          await self.page.close()
      if self.context:
          await self.context.close()
      if self.browser:
          await self.browser.close()
      if self.playwright:
          await self.playwright.stop()

  async def check_login_status(self):
      """Check if we're still logged into LinkedIn"""
      try:
          # Check for login page redirects or elements
          login_selectors = [
              "input[name='session_key']",  # Email input field
              ".login__form",  # Login form
              "button[data-id='sign-in-form__submit-btn']",  # Sign in button
              ".join-form",  # Join now form
          ]

          for selector in login_selectors:
              if await self.page.locator(selector).count() > 0:
                  logger.error("Detected LinkedIn logout - session expired")
                  if self.config:
                      notification = TelegramNotification(
                          config=self.config,
                          current_client="LinkedIn Scraper",
                          message="⚠️ LinkedIn session expired - script halted. Please update the li_at cookie.",
                      )
                      notification.send()
                  return False
          return True
      except Exception as e:
          logger.error(f"Error checking login status: {e}")
          return False

  async def navigate_with_retry(self, url: str, attempt: int = 0) -> bool:
      """Navigate to a URL with retry logic and login check"""
      try:
          response = await self.page.goto(
              url,
              wait_until="load",
              timeout=PAGE_TIMEOUT,
          )

          # Check login status after navigation
          is_logged_in = await self.check_login_status()
          if not is_logged_in:
              logger.error("Session expired during navigation")
              return False

          # page.goto can return None in some cases, so guard before checking .ok
          if (response is None or not response.ok) and attempt < MAX_RETRIES:
              status = response.status if response else "no response"
              logger.warning(
                  f"Navigation failed (status {status}), retrying..."
              )
              await asyncio.sleep(RETRY_DELAY)
              return await self.navigate_with_retry(url, attempt + 1)

          return response is not None and response.ok

      except Exception as e:
          logger.error(f"Navigation error: {str(e)}")
          if attempt < MAX_RETRIES:
              await asyncio.sleep(RETRY_DELAY)
              return await self.navigate_with_retry(url, attempt + 1)
          return False

  async def capture_profile(self, profile_url: str) -> Dict:
      """Navigate to profile sections and capture screenshots"""
      profile_data = {"screenshots": {"experience": None, "education": None}}

      try:
          profile_id = profile_url.rstrip("/").split("/")[-1]

          # Check login status before capturing each section
          is_logged_in = await self.check_login_status()
          if not is_logged_in:
              return profile_data

          # Capture experience section
          experience_url = (
              f"https://www.linkedin.com/in/{profile_id}/details/experience/"
          )
          if await self.navigate_with_retry(experience_url):
              profile_data["screenshots"]["experience"] = await self.take_screenshot(
                  profile_url, "experience"
              )

          await asyncio.sleep(2)

          # Check login status again before education section
          is_logged_in = await self.check_login_status()
          if not is_logged_in:
              return profile_data

          # Capture education section
          education_url = (
              f"https://www.linkedin.com/in/{profile_id}/details/education/"
          )
          if await self.navigate_with_retry(education_url):
              profile_data["screenshots"]["education"] = await self.take_screenshot(
                  profile_url, "education"
              )

          return profile_data

      except Exception as e:
          logger.error(f"Error capturing LinkedIn profile {profile_url}: {str(e)}")
          return profile_data


async def main():
  os.makedirs(SCREENSHOTS_DIR, exist_ok=True)

  existing_data = {}
  if os.path.exists(OUTPUT_FILE):
      with open(OUTPUT_FILE, "r") as f:
          existing_data = json.load(f)
          logger.info(
              f"Loaded {len(existing_data)} existing profiles from {OUTPUT_FILE}"
          )

  scraper = LinkedInScraper(LI_AT_COOKIE, config)
  try:
      await scraper.start_browser(headless=HEADLESS)

      total_profiles = len(PROFILE_URLS)
      for idx, profile_url in enumerate(PROFILE_URLS, 1):
          if profile_url in existing_data:
              logger.info(
                  f"[{idx}/{total_profiles}] Skipping already processed profile: {profile_url}"
              )
              continue

          try:
              logger.info(
                  f"[{idx}/{total_profiles}] Capturing screenshots for profile: {profile_url}"
              )
              profile_data = await scraper.capture_profile(profile_url)

              # Check if we got logged out during capture
              if not await scraper.check_login_status():
                  break

              existing_data[profile_url] = profile_data
              with open(OUTPUT_FILE, "w") as f:
                  json.dump(existing_data, f, indent=2)

              successful_screenshots = sum(
                  1
                  for screenshot in profile_data["screenshots"].values()
                  if screenshot
              )
              if successful_screenshots == 2:
                  logger.info(
                      f"[{idx}/{total_profiles}] Successfully captured all screenshots for {profile_url}"
                  )
              else:
                  logger.warning(
                      f"[{idx}/{total_profiles}] Captured {successful_screenshots}/2 screenshots for {profile_url}"
                  )

              delay = random.uniform(108, 120)
              logger.info(f"Sleeping for {delay:.2f} seconds to mimic human behavior")
              await asyncio.sleep(delay)

          except Exception as e:
              logger.error(
                  f"[{idx}/{total_profiles}] Error processing {profile_url}: {str(e)}"
              )
              with open(OUTPUT_FILE, "w") as f:
                  json.dump(existing_data, f, indent=2)

  finally:
      await scraper.close_browser()


if __name__ == "__main__":
  try:
      asyncio.run(main())
  except KeyboardInterrupt:
      pass
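
The two local imports in that script (logger and telegram_notification) aren't shown in this post. If you want to reproduce it, minimal stand-ins that match how the scraper calls them might look like this (my sketch of the interface, not the actual modules):

# logger.py -- minimal stand-in; the scraper only needs a module-level `logger`
import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)
logger = logging.getLogger("linkedin_scraper")


# telegram_notification.py -- hypothetical sketch matching the constructor/send() usage above
import requests

class TelegramNotification:
    def __init__(self, config, current_client, message):
        self.token = config["global_config"]["telegram_bot_token"]
        self.chat_id = config["global_config"]["telegram_chat_id"]
        self.text = f"{current_client}: {message}"

    def send(self):
        # Telegram Bot API sendMessage endpoint
        requests.post(
            f"https://api.telegram.org/bot{self.token}/sendMessage",
            data={"chat_id": self.chat_id, "text": self.text},
            timeout=10,
        )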

Trying to properly extract the education and job experience text from the detail pages of each person's profile proved... elusive, so I flintstones'd it by grabbing screenshots of the relevant pages and using this script to get Gemini 1.5 Flash 8b to extract the text for me for free:

import json
import os
import time

import google.generativeai as genai
from json_repair import repair_json

# Configuration
API_KEY = "GOOGLE_API_KEY"  # Replace with your Google API key
SCREENSHOTS_DIR = "linkedin_screenshots"
PROFILES_FILE = "linkedin_profiles.txt"
OUTPUT_FILE = "linkedin_analysis_results.json"

# Rate limit configuration
REQUESTS_PER_MINUTE = 2
SECONDS_BETWEEN_REQUESTS = 60 / REQUESTS_PER_MINUTE

# Configure Gemini
genai.configure(api_key=API_KEY)
model = genai.GenerativeModel("gemini-1.5-flash-8b")


def analyze_image(image_path, prompt):
    # Upload and analyze the image using Gemini
    image_file = genai.upload_file(image_path)
    try:
        response = model.generate_content([image_file, prompt])
        # Repair any malformed JSON in the response
        repaired_json = repair_json(response.text)
        # Parse the repaired JSON string into a Python dictionary
        parsed_data = json.loads(repaired_json)
        return parsed_data
    except Exception as e:
        print(f"Error analyzing image: {e}")
        return None


def read_profiles(file_path):
    with open(file_path, "r") as file:
        return [line.strip() for line in file if line.strip()]


def main():
    # Load existing results if they exist
    if os.path.exists(OUTPUT_FILE):
        with open(OUTPUT_FILE, "r") as f:
            try:
                results = json.load(f)
            except json.JSONDecodeError:
                # If the output file is corrupted, repair it and parse the repaired JSON string
                f.seek(0)
                results = json.loads(repair_json(f.read()))
    else:
        results = {}

    profiles = read_profiles(PROFILES_FILE)

    for profile_url in profiles:
        profile_id = profile_url.rstrip("/").split("/")[-1]  # must match the screenshot filenames

        # Skip if profile was already processed
        if profile_id in results:
            print(f"Skipping already processed profile: {profile_id}")
            continue

        results[profile_id] = {
            "linkedin_url": profile_url  # Store the full LinkedIn URL
        }

        for section in ["education", "experience"]:
            screenshot_path = os.path.join(
                SCREENSHOTS_DIR, f"{profile_id}_{section}.png"
            )
            if os.path.exists(screenshot_path):
                prompt = f"Extract the {section} information from this LinkedIn profile screenshot. Output a JSON object with the {section} information."
                analysis = analyze_image(screenshot_path, prompt)
                if analysis:
                    results[profile_id][section] = analysis
                    # Save after each section is processed
                    with open(OUTPUT_FILE, "w") as f:
                        json.dump(results, f, indent=4)
                time.sleep(SECONDS_BETWEEN_REQUESTS)  # Respect rate limits

    print(f"Analysis complete. Results saved to {OUTPUT_FILE}")


if __name__ == "__main__":
    main()

This did a thoroughly respectable job! Ex:

The next step was a few days of low-speed scraping and image-to-text data extraction.

Then I repeated the whole process for a cohort 10 years senior to the first group, so that I could do the simplest possible comparison: look for differences between the two groups. My starting question for this analysis was: is there a difference in the distribution of job titles between these two cohorts? And if so, is it meaningful or interesting in any way?
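
For the curious, once the job titles are tabulated per cohort, the mechanics of that first-pass comparison are simple; something along these lines (a sketch with placeholder titles, not the scraped data):

# Sketch: compare job-title distributions between two cohorts with a chi-square test.
# The title lists below are placeholders, not the data collected above.
from collections import Counter
from scipy.stats import chi2_contingency

cohort_recent = ["Research Associate", "Lab Technician", "PhD Student", "Research Associate"]
cohort_senior = ["Scientist", "PhD Student", "Research Associate", "Scientist"]

titles = sorted(set(cohort_recent) | set(cohort_senior))
recent_counts = Counter(cohort_recent)
senior_counts = Counter(cohort_senior)

# Contingency table: one row per cohort, one column per job title
table = [
    [recent_counts[t] for t in titles],
    [senior_counts[t] for t in titles],
]
chi2, p_value, dof, _ = chi2_contingency(table)
print(f"chi2={chi2:.2f}, p={p_value:.3f}, dof={dof}")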

Stay tuned for part 2...

